TPL interfaces
Passing options directly to LinkTo, Complete, Completion, and similar calls is error-prone and invites bugs as the code grows more complicated. A better practice is to wrap these calls in extension methods, so that the options are set in one place.
Interfaces
Four main interfaces can be used to extend the completion and linking functions.
| Interface | Role | Description |
|---|---|---|
| ISourceBlock<> | Producers | Used by the Producers |
| ITargetBlock<> | Consumers | Used by the Consumers |
| IPropagatorBlock<,> | Transformer | Combines ISourceBlock and ITargetBlock and does not provide new functionality |
| IDataflowBlock | Lifecycle | Responsible for Block completion |
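To make the four roles concrete, here is a minimal sketch that holds each block through the interface matching its role. The class name InterfaceRoles and the Run method are hypothetical names chosen for illustration; the block types and interface members come from System.Threading.Tasks.Dataflow.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: shows which interface covers which role.
public static class InterfaceRoles
{
    public static List<string> Run()
    {
        var results = new List<string>();

        // Producer role: only the source side of the buffer is exposed.
        var buffer = new BufferBlock<int>();
        ISourceBlock<int> producer = buffer;

        // Transformer role: a target on the input side, a source on the output side.
        IPropagatorBlock<int, string> transformer =
            new TransformBlock<int, string>(n => $"value {n}");

        // Consumer role: only accepts messages.
        ITargetBlock<string> consumer = new ActionBlock<string>(s => results.Add(s));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        producer.LinkTo(transformer, options);
        transformer.LinkTo(consumer, options);

        for (var i = 1; i <= 3; i++)
        {
            buffer.Post(i);
        }

        // Lifecycle role: Complete and Completion are declared on IDataflowBlock.
        IDataflowBlock lifecycle = buffer;
        lifecycle.Complete();
        consumer.Completion.Wait();

        return results;
    }
}
```

Because every block is referenced only through an interface, extension methods written against ISourceBlock<>, ITargetBlock<>, or IDataflowBlock apply to any concrete block that plays that role.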
Options, Filters, and some messages from the past...
Be cautious when adding options that alter the behavior of a Block, such as Append (ordering), MaxMessages, or a filter (accepting messages only when a condition holds). It is easy to overlook one, confuse yourself, and end up with unwanted behavior. Whenever possible, create an extension class to implement these options, and take a minute to add comments, notes, and warnings to your code; little details are easy to overlook in cluttered code.
Filtering
Always pay attention, and draw other developers' attention, when adding options. Consider a workflow in which two Consumers have different options. TPL is a message-guarantee architecture: Consumers do not discard messages by default, because TPL operates as a push rather than a pull message architecture. Now suppose these Consumers are linked in order, the first with the MaxMessages option set and the second with a condition, say a minimum string length. If we overlook these options, send enough messages to the first Consumer to exhaust its MaxMessages limit, and then send messages that do not satisfy the string-length condition of the second Consumer, those messages are accepted by neither Consumer, so they are never processed and are effectively lost.
using System;
using System.Threading.Tasks.Dataflow;

public static class BlockExtensions
{
    /// <summary>
    /// Links the source Block to the target Block and propagates completion to the target.
    /// </summary>
    /// <typeparam name="T">Type of the messages passed from source to target</typeparam>
    /// <param name="producer">SourceBlock</param>
    /// <param name="consumer">TargetBlock</param>
    /// <returns>An IDisposable that removes the link when disposed</returns>
    public static IDisposable LinkToPropagation<T>(this ISourceBlock<T> producer, ITargetBlock<T> consumer)
    {
        return producer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }
}
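The two-Consumer scenario described in this section can be reproduced with a short sketch. It assumes a BufferBlock as the source, a MaxMessages limit of 2, and a minimum length of 5; those values, the message strings, and the names FilteringPitfall and Run are illustrative choices, not taken from the original workflow. The short sleep is only a crude way to let the asynchronous offering finish.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: reproduces messages stranded by MaxMessages plus a filter.
public static class FilteringPitfall
{
    // Returns how many messages are still sitting in the source buffer.
    public static int Run()
    {
        var source = new BufferBlock<string>();
        var first = new ActionBlock<string>(s => Console.WriteLine($"first: {s}"));
        var second = new ActionBlock<string>(s => Console.WriteLine($"second: {s}"));

        // This link is removed automatically after two messages have passed through it.
        source.LinkTo(first, new DataflowLinkOptions { MaxMessages = 2 });

        // This link only accepts strings of at least five characters (assumed condition).
        source.LinkTo(second, new DataflowLinkOptions(), s => s.Length >= 5);

        source.Post("a");           // goes to the first Consumer
        source.Post("b");           // goes to the first Consumer, which exhausts MaxMessages
        source.Post("hi");          // too short for the second link; nobody accepts it
        source.Post("long enough"); // waits in the buffer behind "hi"

        // Give the asynchronous propagation a moment to settle.
        Thread.Sleep(500);

        // Should be 2 here: "hi" and the message queued behind it.
        return source.Count;
    }
}
```

Note that the last message would satisfy the filter, yet it still waits: a BufferBlock offers its messages in order, so one unaccepted message holds back everything queued behind it.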
Filter and fallback ActionBlock
One way to make sure we are not accidentally losing messages is to add a fallback ActionBlock that writes to our log. If a message goes missing, for instance because of filtering, we can find it by checking the logs and tracing the message.
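// Fallback: link it last so that it only receives messages no earlier target accepted.
sourceBlock.LinkTo(new ActionBlock<T>(a => Console.WriteLine($"Message {a} not processed.")));

// When several producers feed one consumer, complete the consumer only after all of them finish:
// Task.WhenAll(producerA.Completion, producerB.Completion, producer3.Completion)
//     .ContinueWith(_ => consumer.Complete());
// await consumer.Completion;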
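As a fuller illustration of the fallback idea, the following sketch routes messages through a filtered link and catches the rest in a fallback block. The names FallbackDemo and RunAsync, the sample messages, and the minimum length of 5 are assumptions made for the example; a real workflow would write to its own logger instead of the console.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: a filtered Consumer plus a catch-all fallback.
public static class FallbackDemo
{
    public static async Task<(List<string> Processed, List<string> Rejected)> RunAsync()
    {
        var processed = new List<string>();
        var rejected = new List<string>();

        var source = new BufferBlock<string>();
        var consumer = new ActionBlock<string>(s => processed.Add(s));
        var fallback = new ActionBlock<string>(s =>
        {
            rejected.Add(s);
            Console.WriteLine($"Message {s} not processed.");
        });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // Filtered link first: only strings of at least five characters (assumed condition).
        source.LinkTo(consumer, propagate, s => s.Length >= 5);

        // Unfiltered fallback last: receives whatever the filtered link declined.
        source.LinkTo(fallback, propagate);

        foreach (var message in new[] { "hi", "hello world", "no", "dataflow" })
        {
            source.Post(message);
        }

        source.Complete();
        await Task.WhenAll(consumer.Completion, fallback.Completion);

        return (processed, rejected);
    }
}
```

The order of the LinkTo calls matters here: targets are offered each message in the order they were linked, so an unfiltered fallback linked first would take every message.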