Buffer Block
The BufferBlock<T> class in the Task Parallel Library's Dataflow library is essentially a queue. It lets you asynchronously post data to the block and receive data from it. It doesn't perform any action on the data; it simply holds it until something pulls the data out.
BufferBlock<T> is most commonly used in conjunction with other blocks, because it implements both the ISourceBlock<T> and ITargetBlock<T> interfaces. It stores incoming messages in a buffer that is unbounded by default and becomes bounded when BoundedCapacity is set in its options.
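The queue-like behaviour described above can be seen with a few lines of code. This is a minimal sketch, assuming a console project with top-level statements and a reference to the System.Threading.Tasks.Dataflow package; the variable names are illustrative only.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Unbounded by default: Post succeeds as long as the block has not been completed.
var buffer = new BufferBlock<int>();
for (int i = 1; i <= 3; i++)
{
    buffer.Post(i);
}
Console.WriteLine($"Buffered items: {buffer.Count}");

// Items come out unchanged and in the order they went in.
int first = await buffer.ReceiveAsync();
int second = await buffer.ReceiveAsync();
Console.WriteLine($"Received {first}, then {second}");

// A bounded buffer declines a Post once it already holds BoundedCapacity items.
var bounded = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });
bool acceptedFirst = bounded.Post(10);
bool acceptedSecond = bounded.Post(20);
bool acceptedThird = bounded.Post(30); // the buffer is already full here
Console.WriteLine($"Third Post accepted: {acceptedThird}");
```

Nothing is linked to either buffer here, so the items stay in the block until ReceiveAsync pulls them out.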
BufferBlock characteristics
- Offers each buffered message to the first consumer that can accept it
- Does not alter the messages
- Useful at the start and end of sub-workflows
- Acts as a message throttle in producer-consumer scenarios
A Buffer Block buffers messages but cannot change them. This might sound a little strange and raise the question of why we would use it. The answer has more to do with architecture and with aspects of the block's implementation that are not obvious at first. The purpose of any implementation is not just to make things work, but to make them work efficiently and in a way that helps developers understand and follow the code and makes it easier to extend or alter later. It is considered good practice to place Buffer Blocks at the start and end of sub-workflows, because doing so takes advantage of the throttling that the buffer can provide.
Producer-Consumer Scenario
The buffer offers items in a production-line style. It visits the first consumer and, if that consumer is available, hands over the message. If a consumer is not available, the buffer passes the message on to the next available consumer. If none is available, the buffer waits for one to become free and then starts over.
Message 1, all consumers idle:

| Buffer offers | Consumer | Result | Note |
|---|---|---|---|
| Message 1 | Consumer A | Accepted | Accepted, so it is not offered to the next available consumer |
| | Consumer B | Not offered | |
| | Consumer C | Not offered | |

Message 2, Consumer A still busy:

| Buffer offers | Consumer | Result | Note |
|---|---|---|---|
| Message 2 | Consumer A | Rejected | Busy with Message 1... |
| | Consumer B | Accepted | Accepted, so it is not offered to the next available consumer |
| | Consumer C | Not offered | |
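The two steps illustrated above can be reproduced in code. The following is a sketch under stated assumptions: each consumer is an ActionBlock<int> with BoundedCapacity = 1, and a hypothetical CreateConsumer helper keeps every consumer "busy" until a shared gate is released, so that Consumer A cannot take Message 2.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var handledBy = new ConcurrentDictionary<int, string>();
var started = new SemaphoreSlim(0);
var release = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

// Hypothetical helper: a consumer that records which message it received and
// then stays busy until the gate is released.
ActionBlock<int> CreateConsumer(string name) =>
    new ActionBlock<int>(async message =>
    {
        handledBy[message] = name;
        started.Release();
        await release.Task;
    },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var buffer = new BufferBlock<int>();
var consumerA = CreateConsumer("Consumer A");
var consumerB = CreateConsumer("Consumer B");
var consumerC = CreateConsumer("Consumer C");

// Consumers are offered messages in the order in which they were linked.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
buffer.LinkTo(consumerA, linkOptions);
buffer.LinkTo(consumerB, linkOptions);
buffer.LinkTo(consumerC, linkOptions);

buffer.Post(1);
buffer.Post(2);

// Wait until both messages have been picked up by some consumer.
await started.WaitAsync();
await started.WaitAsync();
Console.WriteLine($"Message 1 handled by {handledBy[1]}");
Console.WriteLine($"Message 2 handled by {handledBy[2]}");

// Let the consumers finish and shut the pipeline down.
release.SetResult(true);
buffer.Complete();
await Task.WhenAll(consumerA.Completion, consumerB.Completion, consumerC.Completion);
```

Because Consumer A still counts Message 1 against its capacity of one while it is processing, it cannot accept Message 2, and the buffer moves on to Consumer B. Consumer C is never offered anything in this run.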
Accepting and Processing
Accepting and processing a message are two independent steps: a consumer can accept a message into its input queue well before it processes it. When we have multiple consumers, we therefore need to specify the size of each consumer's input queue. By default that queue is unbounded, so the first consumer accepts every message before the others are offered any. Ideally, we would prefer each consumer to accept one message in turn until its queue is full.
This is done by bounding the queue: pass new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } to every consumer, and bound the buffer the same way. The posting code then needs to switch from Post() to SendAsync() with a continuation, for example bufferBlock.SendAsync(i).ContinueWith(a => { if (a.Result) { /* accepted */ } else { /* declined */ } });. The reason is that Post() returns false immediately when a bounded block is full, whereas SendAsync() waits until the block accepts or declines the message.
The following example sketches a pipeline that could be used for processing images.
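To make the effect of BoundedCapacity concrete, here is a small comparison. It is a sketch, not a benchmark: RunAsync is a hypothetical helper, the 50 ms delay stands in for real work, and await is used in place of ContinueWith to read the SendAsync result.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: pushes messageCount items through a buffer linked to two
// consumers and returns how many items each consumer processed.
async Task<(int First, int Second)> RunAsync(int capacity, int messageCount)
{
    var counts = new int[2];
    var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = capacity };
    var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = capacity });

    var consumerA = new ActionBlock<int>(async _ =>
    {
        await Task.Delay(50); // simulated work
        Interlocked.Increment(ref counts[0]);
    }, consumerOptions);
    var consumerB = new ActionBlock<int>(async _ =>
    {
        await Task.Delay(50); // simulated work
        Interlocked.Increment(ref counts[1]);
    }, consumerOptions);

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    buffer.LinkTo(consumerA, linkOptions);
    buffer.LinkTo(consumerB, linkOptions);

    for (int i = 0; i < messageCount; i++)
    {
        // SendAsync waits for room in a bounded buffer; the result is false
        // only if the buffer declined the message.
        bool accepted = await buffer.SendAsync(i);
        if (!accepted)
        {
            Console.WriteLine($"Message {i} was declined");
        }
    }

    buffer.Complete();
    await Task.WhenAll(consumerA.Completion, consumerB.Completion);
    return (counts[0], counts[1]);
}

var unbounded = await RunAsync(DataflowBlockOptions.Unbounded, 6);
var bounded = await RunAsync(1, 6);
Console.WriteLine($"Unbounded consumers: A={unbounded.First}, B={unbounded.Second}");
Console.WriteLine($"Bounded consumers:   A={bounded.First}, B={bounded.Second}");
```

With unbounded consumers, Consumer A, being linked first, takes every message. With a capacity of one, the split between A and B depends on timing, so the exact bounded counts will vary from run to run.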
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>
/// Essentially like ImageProcessingPipeline but with buffer
/// </summary>
public class DemoBuffer
{
    private readonly BufferBlock<Stream> bufferBlock;
    private readonly TransformBlock<Stream, ProcessedImages> resizeBlock;
    private readonly TransformBlock<ProcessedImages, ProcessedImages> filterBlock;
    private readonly ActionBlock<ProcessedImages> saveBlock;

    public DemoBuffer()
    {
        bufferBlock = new BufferBlock<Stream>();

        resizeBlock = new TransformBlock<Stream, ProcessedImages>(async stream =>
        {
            // Resize the image and return processed images
            Console.WriteLine("TransformBlock resized stream");
            return await ResizeImageAsync(stream);
        });

        filterBlock = new TransformBlock<ProcessedImages, ProcessedImages>(images =>
        {
            // Apply filters to each resized image
            Console.WriteLine("TransformBlock will apply filters");
            return ApplyFilters(images);
        });

        saveBlock = new ActionBlock<ProcessedImages>(images =>
        {
            // Save processed images to storage
            Console.WriteLine("ActionBlock will save image");
            SaveImages(images);
        });

        // Link the blocks together, "entry" point is ProcessImageAsync
        bufferBlock.LinkTo(resizeBlock, new DataflowLinkOptions { PropagateCompletion = true });
        resizeBlock.LinkTo(filterBlock, new DataflowLinkOptions { PropagateCompletion = true });
        filterBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public async Task ProcessImageAsync(Stream imageStream)
    {
        await bufferBlock.SendAsync(imageStream);
    }

    private async Task CompleteAsync()
    {
        // No more input will arrive; wait until the last block has drained
        bufferBlock.Complete();
        await saveBlock.Completion;
    }

    private async Task<ProcessedImages> ResizeImageAsync(Stream imageStream)
    {
        // Simulate resizing the image asynchronously
        await Task.Delay(1000);
        return new ProcessedImages();
    }

    private ProcessedImages ApplyFilters(ProcessedImages images)
    {
        // Simulate applying filters to the images
        return images;
    }

    private void SaveImages(ProcessedImages images)
    {
        // Simulate saving the processed images to storage
        Console.WriteLine("Processed images saved successfully.");
    }

    private class ProcessedImages
    {
        // Define properties or fields to hold processed image data
    }

    // usage for demo...
    public async Task Run()
    {
        var pipeline = new DemoBuffer();

        // Simulate receiving image uploads
        for (int i = 0; i < 5; i++)
        {
            // Simulate receiving an image stream
            var imageStream = new MemoryStream();
            Console.WriteLine($"Sending to pipeline (buffer) the image {i + 1}");

            // Process the image asynchronously
            await pipeline.ProcessImageAsync(imageStream);
        }

        // Signal the completion of processing
        await pipeline.CompleteAsync();
    }
}

// Calling code. If written as top-level statements, it must live in a separate
// file (or above the class), because top-level statements have to precede type
// declarations within a file.
var procImageWithBuffer = new DemoBuffer();
await procImageWithBuffer.Run();
The DemoBuffer class is where the pipeline is defined. This class contains four blocks that make up the pipeline:
- bufferBlock: A BufferBlock<Stream> that accepts image streams and buffers them.
- resizeBlock: A TransformBlock<Stream, ProcessedImages> that resizes each image.
- filterBlock: A TransformBlock<ProcessedImages, ProcessedImages> that applies filters to each resized image.
- saveBlock: An ActionBlock<ProcessedImages> that saves each processed image.
When a new DemoBuffer object is created, its constructor links these blocks together in the order they are listed above using the LinkTo method, with DataflowLinkOptions { PropagateCompletion = true } set on each link. This option ensures that when the preceding block completes, the next block in the pipeline also receives the completion signal.
The ProcessImageAsync method is used to post image data to the bufferBlock for processing. The CompleteAsync method signals to the bufferBlock that no more data will be posted, and it waits for the saveBlock to complete processing all of its input.
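The role of PropagateCompletion in this shutdown sequence can be checked in isolation. The sketch below is an assumption-labelled illustration, separate from DemoBuffer: it links one buffer to two hypothetical ActionBlock targets, one with PropagateCompletion and one without, and then completes the buffer.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<int>();
var withPropagation = new ActionBlock<int>(n => Console.WriteLine($"Processed {n}"));
var withoutPropagation = new ActionBlock<int>(n => Console.WriteLine($"Processed {n}"));

buffer.LinkTo(withPropagation, new DataflowLinkOptions { PropagateCompletion = true });
buffer.LinkTo(withoutPropagation); // default options: completion is not propagated

buffer.Post(1);
buffer.Complete();

// Completes once the buffer has drained and passed its completion along.
await withPropagation.Completion;

// This target never hears about the buffer's completion.
bool completedWithoutPropagation = withoutPropagation.Completion.IsCompleted;
Console.WriteLine($"Target without propagation completed: {completedWithoutPropagation}");

// It has to be completed explicitly instead.
withoutPropagation.Complete();
await withoutPropagation.Completion;
```

Awaiting the Completion of a block that was linked without PropagateCompletion, and never completed by hand, would wait forever, which is why every link in the pipeline sets the option.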
The Run method in the DemoBuffer class demonstrates how to use the pipeline. It posts five simulated image uploads to the bufferBlock, then calls CompleteAsync to signal the completion of data posting and waits for all processing to complete.
The ResizeImageAsync, ApplyFilters, and SaveImages methods are stubs for the actual image processing work. ResizeImageAsync stands in for resizing the image and returning the processed result, ApplyFilters for applying filters to the resized image, and SaveImages for saving the processed images. Note that these methods only simulate the work (a delay, a pass-through, and a console message); they do not actually process any images.