Join Block
Characteristics of JoinBlock:
• Joins two or three sources, which do not necessarily have the same type
• Produces a Tuple<T1,T2> for two sources, or a Tuple<T1,T2,T3> for three
• It requires ALL of its targets to have data before it emits a tuple
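The characteristics above can be checked with a small sketch. The class and method names (JoinBasics, TwoWay, ThreeWay) are hypothetical and exist only for illustration; the sketch assumes the default (greedy) JoinBlock options and uses a timeout on Receive so that it cannot hang.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, for illustration only.
public static class JoinBasics
{
    // Posts to one target first, checks that nothing is emitted yet,
    // then posts to the other target and receives the joined tuple.
    public static (bool emittedEarly, Tuple<int, string> pair) TwoWay()
    {
        var join = new JoinBlock<int, string>();

        join.Target1.Post(1);
        // Only Target1 has data, so no tuple can be available at this point.
        bool emittedEarly = join.TryReceive(out Tuple<int, string> early);

        join.Target2.Post("one");
        // Both targets now have data; Receive throws if nothing arrives in time.
        Tuple<int, string> pair = join.Receive(TimeSpan.FromSeconds(5));
        return (emittedEarly, pair);
    }

    // The three-source variant exposes Target1..Target3 and emits a Tuple<T1,T2,T3>.
    public static Tuple<int, int, string> ThreeWay()
    {
        var join = new JoinBlock<int, int, string>();
        join.Target1.Post(1);
        join.Target2.Post(2);
        join.Target3.Post("three");
        return join.Receive(TimeSpan.FromSeconds(5));
    }
}
```

Calling JoinBasics.TwoWay() should report that nothing was emitted while only one target had data, which is the "ALL targets" rule in practice.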
Consider the scenario
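We have three buffers: b1 and b2 of type BufferBlock<int>, and b3 of type BufferBlock<string>. We want to join b1 with b3 and b2 with b3. Because b1 and b2 carry int values and b3 carries string values, each join produces a Tuple<int, string>, and we would follow the steps below:
- Create two JoinBlock<int, string> instances, one for b1 and one for b2.
- Link each int buffer to its JoinBlock's Target1, and link b3 to Target2 of both JoinBlocks. A BufferBlock hands each item to only one linked target, so b3 has to fan out (for example through a BroadcastBlock<string>) if both JoinBlocks should see every string.
- Create two ActionBlock<Tuple<int, string>> instances, one per JoinBlock.
- Link each JoinBlock to its ActionBlock.
- As soon as both targets of a JoinBlock have received data, the join takes place.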
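The scenario can be sketched roughly as follows. This is one possible wiring, not the only one: the names ThreeBufferScenario, fanOut, join1, join2 and the output format are hypothetical. The BroadcastBlock<string> is an addition that the steps do not name as a block of its own; it is there because a BufferBlock delivers each item to a single linked target, so without it only one JoinBlock would receive the strings from b3.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, for illustration only.
public static class ThreeBufferScenario
{
    public static async Task<List<string>> RunAsync()
    {
        var b1 = new BufferBlock<int>();
        var b2 = new BufferBlock<int>();
        var b3 = new BufferBlock<string>();

        // Assumption: copy every b3 item to both joins via a BroadcastBlock.
        var fanOut = new BroadcastBlock<string>(s => s);
        var join1 = new JoinBlock<int, string>();
        var join2 = new JoinBlock<int, string>();

        // Collect results under a lock because the two ActionBlocks run independently.
        var results = new List<string>();
        var gate = new object();
        var action1 = new ActionBlock<Tuple<int, string>>(t =>
        {
            lock (gate) results.Add($"b1:{t.Item1}+{t.Item2}");
        });
        var action2 = new ActionBlock<Tuple<int, string>>(t =>
        {
            lock (gate) results.Add($"b2:{t.Item1}+{t.Item2}");
        });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        b1.LinkTo(join1.Target1, link);
        b2.LinkTo(join2.Target1, link);
        b3.LinkTo(fanOut, link);
        fanOut.LinkTo(join1.Target2, link);
        fanOut.LinkTo(join2.Target2, link);
        join1.LinkTo(action1, link);
        join2.LinkTo(action2, link);

        // One item per buffer: each join should emit exactly one tuple.
        b1.Post(1);
        b2.Post(2);
        b3.Post("x");

        // Completion flows down the links once each buffer has drained.
        b1.Complete();
        b2.Complete();
        b3.Complete();
        await Task.WhenAll(action1.Completion, action2.Completion);
        return results;
    }
}
```

The order of the two result entries is not guaranteed, since the two pipelines run independently of each other.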
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public interface IDataflowService<T1, T2>
{
    JoinBlock<T1, T2> JoinBlock { get; }

    // Queues one pair of items; may be called many times.
    Task ProcessAsync(T1 item1, T2 item2);

    // Signals that no more items will arrive and waits for the pipeline to drain.
    Task CompleteAsync();
}

public class DemoJoinBlock : IDataflowService<string, string>
{
    private readonly ActionBlock<Tuple<string, string>> _actionBlock;

    public JoinBlock<string, string> JoinBlock { get; }

    public DemoJoinBlock()
    {
        JoinBlock = new JoinBlock<string, string>();
        _actionBlock = new ActionBlock<Tuple<string, string>>(userInfo =>
            Console.WriteLine($"User with name {userInfo.Item1} and surname {userInfo.Item2} processed."));

        // PropagateCompletion lets completion of the JoinBlock flow on to the ActionBlock.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        JoinBlock.LinkTo(_actionBlock, linkOptions);
    }

    public async Task ProcessAsync(string userName, string userSurname)
    {
        // Do not complete the targets here: a completed block rejects every later item.
        // Items are paired by arrival order on each target.
        await JoinBlock.Target1.SendAsync(userName);
        await JoinBlock.Target2.SendAsync(userSurname);
        Console.WriteLine($"Queued for processing: {userName} - {userSurname}");
    }

    public async Task CompleteAsync()
    {
        JoinBlock.Complete();
        // Await the last block in the chain so that every tuple has been handled.
        await _actionBlock.Completion;
    }

    /// <summary>
    /// This is only for testing. In an API, register the interface and use ProcessAsync.
    /// </summary>
    public async Task Run()
    {
        var random = new Random();
        var userNames = new List<string> { "Alice", "Leo", "Christiano", "Eric" };
        var userSurnames = new List<string> { "Cooper", "Messi", "Ronaldo", "Cantona" };

        foreach (var userName in userNames)
        {
            string userSurname = userSurnames[random.Next(userSurnames.Count)];
            Console.WriteLine($"Sending for processing: {userName} - {userSurname}");
            await ProcessAsync(userName, userSurname);
        }

        // Complete once, after the last pair has been sent.
        await CompleteAsync();
    }

    // Register as Scoped, Transient or Singleton
    // services.AddSingleton<IDataflowService<string, string>, DemoJoinBlock>();
}

The C# code above uses TPL (Task Parallel Library) Dataflow, specifically a JoinBlock inside the DemoJoinBlock class, which implements the IDataflowService<string, string> interface. It demonstrates how to pair up two streams of data asynchronously using dataflow blocks.
The IDataflowService<T1, T2> interface defines a generic contract for services that follow this dataflow processing pattern.
The DemoJoinBlock class is the implementation of this interface. It includes:
- A JoinBlock<string, string> that joins two input streams of strings, received on its Target1 and Target2 targets. The JoinBlock emits a new Tuple<string, string> each time an item is available on both targets.
- An ActionBlock<Tuple<string, string>>, linked to the JoinBlock with a DataflowLinkOptions set to propagate completion. It receives each tuple from the JoinBlock and writes the userName and userSurname to the console. With its default options an ActionBlock handles one message at a time.
- The ProcessAsync method, which sends one userName and userSurname pair to the two targets. It does not complete them, because a completed block rejects every later item and only the first pair would ever be processed.
- The CompleteAsync method, which completes the JoinBlock once no more pairs will arrive and then awaits the ActionBlock's Completion. Awaiting the last block in the chain, rather than the JoinBlock itself, is what signifies that all data processing is complete.
- The Run method, which is only for testing. It generates a set of user names and randomly matched surnames, sends them for processing through ProcessAsync, and calls CompleteAsync at the end.
- A commented-out registration line that hints at how the DemoJoinBlock might be added to a dependency injection container in a real application.
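One detail worth keeping in mind when deciding where to complete a JoinBlock: once a target has been completed, it declines every later item, and Post reports this by returning false. The sketch below illustrates that behaviour in isolation; the class and method names (CompletionPitfall, Demo) are hypothetical.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, for illustration only.
public static class CompletionPitfall
{
    // Returns whether Post was accepted before and after completing the target.
    public static (bool beforeComplete, bool afterComplete) Demo()
    {
        var join = new JoinBlock<string, string>();

        // Accepted: the target is still open.
        bool beforeComplete = join.Target1.Post("Alice");

        // After this call the target permanently declines new items.
        join.Target1.Complete();

        // Rejected: Post returns false and "Leo" is dropped.
        bool afterComplete = join.Target1.Post("Leo");

        return (beforeComplete, afterComplete);
    }
}
```

For a service that is called repeatedly, this suggests completing the blocks only once, after the last item has been sent, and checking the return value of Post (or awaiting SendAsync) when a rejected item should not go unnoticed.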