TPL Tricks of Trade


Options Filters and some messages from the past...

When adding options that alter the behavior of the Block like Append (ordering), MaxMessages, Filter(accepting in condition only), etc... Be cautious. It is easy to overlook, confuse yourself, and get some unwanted behavior. Whenever possible create an extension class to implement these options and take a minute to add some comments, notes, and warnings into your code, it is really easy to oversee little details in clustered code.

 

Filtering

Always pay attention or bring the attention to developers when adding options. Consider a workflow in which the two Consumers have different options. TPL is message guarantee architecture. That means Consumers do not discard messages by default as TPL operates as a push rather than pull message architecture. Now consider that these Consumers are in order first Consumer has a MaxMessage option set and the other Consumer has a condition, let's say string length. If we oversee these options and send all the messages to the first Consumer and cause it to fill the MaxMessages amount and then send some messages that do not satisfy the string length to the second Consumer, we will cause messages to not be processed or lost.

 

 

    public static class BlockExtensions

    {

        /// <summary>

        /// LinkTo and Add Propagation to source Block

        /// </summary>

        /// <typeparam name="T">SourceBlock</typeparam>

        /// <param name="producer">SourceBlock</param>

        /// <param name="consumer">TargetBlock</param>

        /// <returns></returns>

        public static IDisposable LinkToPropagation<T>(this ISourceBlock<T> producer, ITargetBlock<T> consumer)

        {

            return producer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        }

    }

 

 

Filter and fall back ActionBlock

One way to ensure that we are not accidentally missing something is to add a fallback ActionBlock that writes to our Log. This way if something is gone missing due to filtering for instance, then we could get it by checking the logs and check the traces from messages.

 

sourceBlock.LinkTo(new ActionBlock<T>(a=> Console.WriteLine($"Message {a} not processed.")));

 

 

Handling multiple Completions (highlight: make it visible)

//// When having many blocks

//Task.WhenAll(new[] {producerA.Completion, producerB.Completion, producer3.Completion})

.ContinueWith(consumer.Complete());

//await consumer.Completion;

 

 

target.Completion.ContinueWith(a =>

            {

                if (a.IsFaulted)

                {

                    Console.WriteLine($"Item faulted, exception message: {a.Exception}");

                   

                }

                else

                {

                    source.Complete();

                }

            });

 

 

 


No files yet, migration hasn't completed yet!