TPL Dataflow for a RESTful example
TPL Dataflow can be useful in a RESTful application for scenarios where you need to handle concurrent requests efficiently, perform asynchronous processing of incoming data, and manage the flow of data through various stages of processing. Let's consider a simple example scenario where TPL Dataflow could be applied:
Imagine you have a RESTful API endpoint that accepts image uploads. Upon receiving an image, you want to perform the following steps asynchronously:
- Resize the image to multiple sizes.
- Apply various image filters to each resized image.
- Save the processed images to different storage locations.
- Respond to the client with the URLs of the processed images.
In this scenario, TPL Dataflow can help you design a pipeline to efficiently handle these tasks concurrently and asynchronously.
Here's how you could structure it using TPL Dataflow:
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
/// <summary>
/// Essentially like ImageProcessingPipeline but with buffer
/// </summary>
public class DemoBuffer
{
private readonly BufferBlock<Stream> bufferBlock;
private readonly TransformBlock<Stream, ProcessedImages> resizeBlock;
private readonly TransformBlock<ProcessedImages, ProcessedImages> filterBlock;
private readonly ActionBlock<ProcessedImages> saveBlock;
public DemoBuffer()
{
bufferBlock = new BufferBlock<Stream>();
resizeBlock = new TransformBlock<Stream, ProcessedImages>(async stream =>
{
// Resize the image and return processed images
Console.WriteLine("TransformBlock will resize image");
return await ResizeImageAsync(stream);
});
filterBlock = new TransformBlock<ProcessedImages, ProcessedImages>(images =>
{
// Apply filters to each resized image
Console.WriteLine("TransformBlock will apply filters");
return ApplyFilters(images);
});
saveBlock = new ActionBlock<ProcessedImages>(images =>
{
// Save processed images to storage
Console.WriteLine($"ActionBlock will save image");
SaveImages(images);
});
// Link the blocks together, "entry" point is ProcessImageAsync
bufferBlock.LinkTo(resizeBlock, new DataflowLinkOptions { PropagateCompletion = true });
resizeBlock.LinkTo(filterBlock, new DataflowLinkOptions { PropagateCompletion = true });
filterBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public async Task ProcessImageAsync(Stream imageStream)
{
await bufferBlock.SendAsync(imageStream);
}
public async Task CompleteAsync()
{
bufferBlock.Complete();
await saveBlock.Completion;
}
private async Task<ProcessedImages> ResizeImageAsync(Stream imageStream)
{
// Simulate resizing the image asynchronously
await Task.Delay(1000);
return new ProcessedImages();
}
private ProcessedImages ApplyFilters(ProcessedImages images)
{
// Simulate applying filters to the images
return images;
}
private void SaveImages(ProcessedImages images)
{
// Simulate saving the processed images to storage
Console.WriteLine("Processed images saved successfully.");
}
private class ProcessedImages
{
// Define properties or fields to hold processed image data
}
// usage for demo...
public async Task Run()
{
var pipeline = new DemoBuffer();
// Simulate receiving image uploads
for (int i = 0; i < 5; i++)
{
// Simulate receiving an image stream
var imageStream = new MemoryStream();
Console.WriteLine($"Sending to pipeline (buffer) the image {i + 1}");
// Process the image asynchronously
await pipeline.ProcessImageAsync(imageStream);
}
// Signal the completion of processing
await pipeline.CompleteAsync();
}
}
In this example:
The DemoBuffer class is where the pipeline is defined. This class contains four blocks that make up the pipeline:
- bufferBlock: A BufferBlock<Stream> that accepts image streams and buffers them.
- resizeBlock: A TransformBlock<Stream, ProcessedImages> that resizes each image.
- filterBlock: A TransformBlock<ProcessedImages, ProcessedImages> that applies filters to each resized image.
- saveBlock: An ActionBlock<ProcessedImages> that saves each processed image.
When a new DemoBuffer object is created, its constructor links these blocks in the order listed above using the LinkTo method, passing DataflowLinkOptions { PropagateCompletion = true }. With this option, when the preceding block completes (or faults), that state propagates to the next block in the pipeline, so completing the first block eventually completes the last one.
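To see completion propagation in isolation, here is a minimal sketch with two made-up blocks (doubler and adder are illustrative names, not part of DemoBuffer). It assumes a .NET project with top-level statements:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Two hypothetical blocks, linked the same way as the blocks in DemoBuffer.
var doubler = new TransformBlock<int, int>(n => n * 2);
var sum = 0;
var adder = new ActionBlock<int>(n => sum += n);

doubler.LinkTo(adder, new DataflowLinkOptions { PropagateCompletion = true });

for (var i = 1; i <= 3; i++)
{
    doubler.Post(i);
}

// Only the head block is completed explicitly. Once doubler has
// drained its items, the link completes adder as well.
doubler.Complete();
await adder.Completion;

Console.WriteLine(sum); // prints 12 (2 + 4 + 6)
```

Without PropagateCompletion = true, adder would never be told that input has ended, and awaiting adder.Completion would not finish.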
The ProcessImageAsync method sends an image stream to the bufferBlock with SendAsync. Its task completes as soon as the block accepts the stream, not when the image has been fully processed. The CompleteAsync method signals to the bufferBlock that no more data will be sent, then waits for the saveBlock to finish processing all of its input.
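The effect of Complete on later sends can be checked with a small standalone sketch. The string payloads are placeholders for image streams, and the variable names are illustrative only:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<string>();

// SendAsync yields true when the block accepts the item.
bool acceptedBefore = await buffer.SendAsync("image-1");

// After Complete, the block declines any further input.
buffer.Complete();
bool acceptedAfter = await buffer.SendAsync("image-2");

Console.WriteLine($"{acceptedBefore} {acceptedAfter}"); // prints "True False"
Console.WriteLine(buffer.Count);                        // prints 1
```

DemoBuffer.ProcessImageAsync discards this boolean, so a caller that might send after CompleteAsync would not notice the rejected item unless the result is checked.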
The Run method in the DemoBuffer class demonstrates how to use the pipeline. It posts five simulated image uploads to the bufferBlock, then calls CompleteAsync to signal the completion of data posting and waits for all processing to complete.
The ResizeImageAsync, ApplyFilters, and SaveImages methods are stubs for the actual image processing work. ResizeImageAsync stands in for resizing the image and returns the processed images, ApplyFilters stands in for applying filters to the resized images, and SaveImages stands in for saving them. None of these methods does any real image processing here: ResizeImageAsync only waits one second, ApplyFilters returns its input unchanged, and SaveImages only writes a message to the Console.
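The last step of the scenario, responding to the client with the URLs of the processed images, is not covered by DemoBuffer, because the pipeline ends in an ActionBlock and returns nothing to the caller. One possible approach, shown here only as a sketch, is to send a TaskCompletionSource through the pipeline together with each item and complete it in the final stage. The ProcessAndGetUrlAsync helper, the tuple shape, and the URL format are all hypothetical:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Final stage: "saves" the image and reports a URL back to the caller.
var saveBlock = new ActionBlock<(string Name, TaskCompletionSource<string> Done)>(job =>
{
    // Placeholder for the real save; the URL format is made up.
    job.Done.SetResult($"https://example.invalid/images/{job.Name}");
});

// Hypothetical helper a request handler could await before responding.
async Task<string> ProcessAndGetUrlAsync(string name)
{
    var done = new TaskCompletionSource<string>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    await saveBlock.SendAsync((name, done));
    return await done.Task; // completes once the final stage has run
}

var url = await ProcessAndGetUrlAsync("cat.png");
Console.WriteLine(url); // prints https://example.invalid/images/cat.png
```

In a real pipeline the TaskCompletionSource would travel through every stage, and a failing stage would need to call SetException so the waiting request does not hang.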