BoradCast Block
BroadcastBlock Characteristics
- Sends copy of a message to multiple consumers.
- Requires a cloning function
- Messages passed as reference
- Always returns a new message and never returns the state of the message
- Does not wait for the receiver to be available.
- Sends copies of Message to all Consumers.
Broadcastblock sends a copy of the Message to all connected Consumers and overwrites the current message with the next item in the Queue, meaning that does not hold the Messages.
|
|
|
|
|
|
|
-> |
Copy |
-> |
Consumer |
|
Input Message |
-> |
QueueItems |
QueueItems |
-> |
Meesage |
-> |
Copy |
-> |
Consumer |
|
|
|
|
|
|
|
-> |
Copy |
-> |
Consumer |
Note: BroadcastBlock<T> does not buffer or hold onto old messages. So, if there are no linked targets, or if the linked targets are not ready to accept the message, the message will be dropped.
var broadcaster = new BroadcastBlock<int>(null);
var printAction1 = new ActionBlock<int>(n => Console.WriteLine("Action 1 received {0}", n));
var printAction2 = new ActionBlock<int>(n => Console.WriteLine("Action 2 received {0}", n));
// Link broadcaster to action blocks
using (broadcaster.LinkTo(printAction1))
using (broadcaster.LinkTo(printAction2))
{
// Post to broadcaster
for (int i = 0; i < 10; i++)
{
broadcaster.Post(i);
}
// Wait for all messages to be processed
printAction1.Completion.Wait();
printAction2.Completion.Wait();
}
An example that could be used in RESTful API:
using System.Threading.Tasks.Dataflow;
namespace TplDataFlowDemo.DemoBlocks;
public class DemoBroadCastBlock
{
private readonly BroadcastBlock<string> _broadcast;
private readonly ActionBlock<string> _service1;
private readonly ActionBlock<string> _service2;
public DemoBroadCastBlock()
{
_broadcast = new BroadcastBlock<string>(message => message);
_service1 = new ActionBlock<string>(ProcessService1);
_service2 = new ActionBlock<string>(ProcessService2);
_broadcast.LinkTo(_service1, new DataflowLinkOptions { PropagateCompletion = true });
_broadcast.LinkTo(_service2, new DataflowLinkOptions { PropagateCompletion = true });
}
public async Task StartFlow(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
// Simulate getting a new message, normally this could be from event or queue.
var item = Guid.NewGuid().ToString();
await _broadcast.SendAsync(item);
// Simulate delay between each new item.
await Task.Delay(1000);
}
_broadcast.Complete();
await Task.WhenAll(_service1.Completion, _service2.Completion);
}
private async Task ProcessService1(string item)
{
// Simulate processing delay in Service 1.
await Task.Delay(200);
Console.WriteLine($"Service1: {item}");
}
private async Task ProcessService2(string item)
{
// Simulate processing delay in Service 2.
await Task.Delay(300);
Console.WriteLine($"Service2: {item}");
}
}
You can run it in a console application like this:
var broadCast = new DemoBroadCastBlock();
await broadCast.StartFlow(CancellationToken.None);
No files yet, migration hasn't completed yet!