Transform Block
Characteristics:
• Accepts and sends data
• Has an output queue
• Map function like Func(T, U)
Transform Block Characteristics:
• Accepts and sends data
• Has an output queue
• Map function like Func<T, U>
• Default value for processing is 1 (no parallel execution, but can be overridden)
Receive Function Block
• Accepts an item
• Will make the Receive call
• Return it to the program and move on
Parallelism without Logic changes - Enable parallelism at the constructor
Transform process number items is set to 1 by default. We can specify it by providing the options at the constructor.
var transformBlock = new TransformBlock<int, string>(n =>
{
TaskDelary(TimeSpan.FromSeconds(1));
return n.ToString();
},
new ExecutionDataflowBlockOptions(){ MaxDegreeOfParallelism = 2}
);
This is an example of how TPL Dataflow allows parallelism implementation without having to change the Execution and/or Business Logic.
Item1 Item2 -> f(x) -> Item2 Item1
Input Queue -> f(x) -> Output Queue
class Program
{
static async Task Main(string[] args)
{
// Create a TransformBlock that doubles the input integer
var doublerBlock = new TransformBlock<int, int>(input => input * 2);
// Post some data to the block
for (int i = 1; i <= 10; i++)
{
await doublerBlock.SendAsync(i);
}
// Complete the block, indicating that no more data will be posted
doublerBlock.Complete();
// Read the output data from the block
while (await doublerBlock.OutputAvailableAsync())
{
var output = await doublerBlock.ReceiveAsync();
Console.WriteLine(output);
}
}
}
In this example:
- We create a TransformBlock<int, int> where the input type is int and the output type is also int. The block will double the input integer and produce it as output.
- We post integers from 1 to 10 into the block asynchronously using SendAsync.
- After all data has been posted, we complete the block using Complete(). This indicates that no more data will be sent to the block.
- Finally, we read the output from the block using OutputAvailableAsync() to check if there's output available and ReceiveAsync() to receive the output.
No files yet, migration hasn't completed yet!