TPL Dataflow is a powerful .NET framework that makes concurrent and asynchronous data processing a breeze. It provides a set of blocks for building pipelines, handling transformations, and managing message flow efficiently. For a deeper dive, see these great introductions: The Magic of .NET Dataflow and An Introduction to TPL Dataflow.
Despite its versatility, TPL Dataflow has its limitations. For example, it can be hard to add new types of blocks yourself, you cannot apply a bounded capacity to a composed block, and batch sizes and block capacities are fixed at construction, with no way to resize them dynamically.
To address these gaps, we created the ComposableDataflowBlocks library. It makes it easy to extend TPL Dataflow with new block types while retaining full composability. With it, we’ve been able to build blocks like:
- **ResizableBufferBlock**, whose bounded capacity can be changed at runtime
- **ResizableBatchTransformBlock**, which can automatically tune its batch size based on throughput
- **PriorityBufferBlock**, which delivers the highest-priority items first
- **ParallelBlock**, which fans messages out to multiple subblocks and recombines their outputs
…and many others, enabling pipelines that are adaptive, efficient, and easier to maintain.
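To give a feel for the fan-out-and-recombine pattern mentioned above, here is a minimal sketch that uses only built-in TPL Dataflow blocks (`BroadcastBlock`, `TransformBlock`, `JoinBlock`). It is not how the library's ParallelBlock is implemented, which this post does not describe; the class name `FanOutSketch` and the two toy transforms are assumptions made purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration built from standard blocks only.
public static class FanOutSketch
{
    // Sends every input to two workers and pairs their results back up, in order.
    public static List<Tuple<int, int>> Run(IEnumerable<int> inputs)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var fanOut = new BroadcastBlock<int>(x => x);            // offers each message to all linked targets
        var doubled = new TransformBlock<int, int>(x => x * 2);  // first worker (unbounded)
        var shifted = new TransformBlock<int, int>(x => x + 10); // second worker (unbounded)
        var join = new JoinBlock<int, int>();                    // emits one tuple per pair of results

        fanOut.LinkTo(doubled, link);
        fanOut.LinkTo(shifted, link);
        doubled.LinkTo(join.Target1, link);
        shifted.LinkTo(join.Target2, link);

        foreach (var x in inputs)
        {
            fanOut.Post(x);
        }
        fanOut.Complete();

        var results = new List<Tuple<int, int>>();
        // OutputAvailableAsync yields false once the join has completed and is drained.
        while (join.OutputAvailableAsync().GetAwaiter().GetResult())
        {
            while (join.TryReceive(out var pair))
            {
                results.Add(pair);
            }
        }
        return results;
    }
}
```

Note that none of the blocks in this sketch is bounded. Giving the whole arrangement a single capacity limit is exactly the kind of problem the rest of this post is about.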
All of these blocks are included in the library and available for immediate reuse, and we believe they are useful in many scenarios. However, the main value of ComposableDataflowBlocks may lie less in the blocks themselves than in how easy the library makes it to create your own custom blocks. Whether you need a specialized batching strategy, a priority queue, or a completely new processing pattern, the framework provides the tools to build it without having to do the plumbing.
What’s the Problem with Dataflow Blocks and Compositionality, Anyway?
It all started when our team wanted to generalize an if-then-else block that would reroute messages to either a “then” or “else” branch based on a predicate. We hoped to expose it through a simple interface, something like:
```csharp
IPropagatorBlock<I, O> IfThenElseBlock<I, O>(
    Predicate<I> predicate,
    IPropagatorBlock<I, O> thenBlock,
    IPropagatorBlock<I, O> elseBlock,
    int boundedCapacity)
```
In practice, implementing this is tricky. The resulting block needs to implement ITargetBlock<I> so it can accept messages, but we cannot always forward a message immediately to the appropriate sub-branch. When dataflow blocks are full, they start postponing messages, so we needed to add an **input BufferBlock<I>** to hold incoming messages. Likewise, we needed an **output BufferBlock<O>** to store messages produced by the thenBlock and elseBlock until they can be forwarded.
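To make the postponing behaviour concrete, here is a minimal sketch using only a built-in `BufferBlock<int>` with `BoundedCapacity = 2`. The class name `BoundedCapacityDemo` and the tuple field names are assumptions for illustration. `Post` is declined immediately once the block is full, while `SendAsync` is postponed and only completes after a slot is freed.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; uses only standard TPL Dataflow calls.
public static class BoundedCapacityDemo
{
    public static (bool First, bool Second, bool Third, bool PendingWhileFull, bool SentAfterReceive) Run()
    {
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });

        bool first = buffer.Post(1);   // accepted
        bool second = buffer.Post(2);  // accepted, the buffer is now full
        bool third = buffer.Post(3);   // declined: Post never waits for space

        // SendAsync offers the message; the full block postpones it instead of declining.
        Task<bool> pending = buffer.SendAsync(4);
        bool pendingWhileFull = !pending.IsCompleted;

        // Taking one item out frees a slot, so the postponed message can be consumed.
        buffer.Receive();
        bool sentAfterReceive = pending.GetAwaiter().GetResult();

        return (first, second, third, pendingWhileFull, sentAfterReceive);
    }
}
```

This is why a composed block cannot simply hand each message straight to a full sub-branch: something has to hold the message in the meantime.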
Now we faced another challenge: we need to set bounded capacities on both the input and output BufferBlock to prevent them from growing indefinitely. Ideally, we wouldn’t want to expose both of these internal limits to the user. Instead, the user should be able to specify just a single size limit for the resulting composed block: the fact that it internally consists of multiple buffers should remain an implementation detail that the user doesn’t need to worry about.
This left us with two unpleasant options:
- Be inflexible: Use fixed bounded capacities and accept they will not be optimal in every use case
- Be unmaintainable: Expose separate parameters for all internal buffers, for example:
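```csharp
IPropagatorBlock<I, O> IfThenElseBlock<I, O>(
    Predicate<I> predicate,
    IPropagatorBlock<I, O> thenBlock,   // "else" is a reserved word in C#, so the branches are named thenBlock/elseBlock
    IPropagatorBlock<I, O> elseBlock,
    int boundedCapacityOfInputBuffer,
    int boundedCapacityOfOutputBuffer)
```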
But with this approach we would need to add separate parameters for every internal subblock of every abstraction we build. Users end up managing a tangle of limits for internal buffers, which defeats the purpose of creating simple, reusable abstractions in the first place.
And this is one of the issues that ComposableDataflowBlocks solves. The magic is in the last line of this example:
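```csharp
IPropagatorBlock<I, O> IfThenElseBlock<I, O>(
    Predicate<I> predicate,
    IPropagatorBlock<I, O> thenBlock,
    IPropagatorBlock<I, O> elseBlock,
    int boundedCapacity)
{
    // The internal buffers are unbounded; the limit is enforced on the composed block as a whole.
    var inputBuffer = new BufferBlock<I>(new DataflowBlockOptions()
    {
        BoundedCapacity = DataflowBlockOptions.Unbounded
    });
    var outputBuffer = new BufferBlock<O>(new DataflowBlockOptions()
    {
        BoundedCapacity = DataflowBlockOptions.Unbounded
    });

    // Route each message to the "then" or "else" branch based on the predicate.
    inputBuffer.LinkTo(
        thenBlock,
        new DataflowLinkOptions() { PropagateCompletion = true },
        predicate);
    inputBuffer.LinkTo(
        elseBlock,
        new DataflowLinkOptions() { PropagateCompletion = true },
        m => !predicate(m));

    // Collect the results of both branches in the output buffer,
    // and complete it only once both branches have finished.
    thenBlock.LinkTo(outputBuffer);
    elseBlock.LinkTo(outputBuffer);
    Task.WhenAll(thenBlock.Completion, elseBlock.Completion)
        .ContinueWith(t => outputBuffer.Complete(), TaskScheduler.Default);

    return DataflowBlock
        .Encapsulate(inputBuffer, outputBuffer)
        .WithBoundedCapacity(boundedCapacity); // <-- this is the magic.
}
```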
That is, ComposableDataflowBlocks offers a WithBoundedCapacity extension. Using it, we can simply compose multiple internal blocks (an input buffer, the then and else branches, and an output buffer) while exposing a single bounded capacity to the user.
The WithBoundedCapacity extension handles in-flight message tracking and enforces the limit across all internal buffers, solving the problems of inflexibility, unmaintainability, and lack of scalability.
Users no longer need to worry about internal buffers or configure each one individually: the composed block behaves like a standard ITargetBlock<I> or ISourceBlock<O> with a well-defined capacity.
More importantly, this lets us finally create our own blocks as primitives, just like the built-in TPL Dataflow blocks, by composing existing blocks.
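The idea of tracking in-flight messages across a whole composition can be sketched with a plain counter. The following is a deliberately simplified illustration and not the library's implementation: `InFlightLimiter` is a hypothetical helper, it assumes every input produces exactly one output, and unlike the real extension it is not itself a dataflow block that can be linked to others.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration; not part of ComposableDataflowBlocks.
public sealed class InFlightLimiter<TIn, TOut>
{
    private readonly IPropagatorBlock<TIn, TOut> _inner;
    private readonly SemaphoreSlim _slots;

    public InFlightLimiter(IPropagatorBlock<TIn, TOut> inner, int boundedCapacity)
    {
        _inner = inner; // may consist of any number of unbounded internal blocks
        _slots = new SemaphoreSlim(boundedCapacity, boundedCapacity);
    }

    // Number of additional messages that can currently be accepted.
    public int FreeSlots => _slots.CurrentCount;

    // Non-blocking: declines when the composed block as a whole is full.
    public bool TryPost(TIn item)
    {
        if (!_slots.Wait(0)) return false;
        if (_inner.Post(item)) return true;
        _slots.Release(); // the inner block declined, so give the slot back
        return false;
    }

    // Waits asynchronously until a slot is free.
    public async Task<bool> SendAsync(TIn item)
    {
        await _slots.WaitAsync();
        if (await _inner.SendAsync(item)) return true;
        _slots.Release();
        return false;
    }

    // A message only stops counting against the limit once it leaves the composition.
    public async Task<TOut> ReceiveAsync()
    {
        TOut result = await _inner.ReceiveAsync();
        _slots.Release();
        return result;
    }
}
```

The point of the sketch is the accounting: a slot is taken when a message enters and released only when its result leaves, regardless of which internal buffer the message is sitting in at any moment.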
Once We Fixed Composability, Things Got Fun
One of the handy things we could do once composability was solved was make blocks that change their capacity on the fly. For example, we wrapped a regular BufferBlock in a BoundedPropagatorBlock and suddenly had a ResizableBufferBlock that can resize its bounded capacity at runtime, without touching the internal block or rebuilding anything. In fact, its implementation is probably exactly what you’d expect it to be:
```csharp
public sealed class ResizableBufferBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>
{
    private readonly BoundedPropagatorBlock<T, T> _inner;

    public ResizableBufferBlock(DataflowBlockOptions options)
    {
        // make an unbounded buffer
        var bufferBlock = new BufferBlock<T>(new DataflowBlockOptions
        {
            CancellationToken = options.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        // and wrap it as a BoundedPropagatorBlock
        _inner = new BoundedPropagatorBlock<T, T>(
            bufferBlock,
            options.BoundedCapacity
        );
    }

    public int Count => _inner.Count;

    // and allow the user to dynamically change the bounded capacity
    public int BoundedCapacity
    {
        get => _inner.BoundedCapacity;
        set => _inner.BoundedCapacity = value;
    }

    // and then delegate all ITargetBlock and ISourceBlock members to _inner
}
```
The Batch Block That Learns on the Fly
Once we had the basics of composability and resizable blocks, we started dreaming bigger.
One of the coolest blocks we built was the ResizableBatchTransformBlock with auto-scaling. The idea is simple: many workloads have an “ideal” batch size, and tuning it manually is tedious. Even worse, the optimal batch size typically changes at runtime, depending on, for example, the workload.
This block measures throughput at runtime and adjusts its batch size on the fly. We had fun feeding in thousands of messages and watching it gradually converge to the optimal batch size. It felt like the block was learning on its own, and it turned tedious manual tuning into a completely hands-off process.
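This post does not describe the tuning algorithm the library uses, so the following is only a sketch of one simple way such auto-scaling could work: plain hill climbing on measured throughput. `HillClimbingBatchTuner`, its parameters, and the strategy itself are assumptions for illustration, not the ResizableBatchTransformBlock implementation.

```csharp
using System;

// Hypothetical illustration; not the algorithm used by ResizableBatchTransformBlock.
public sealed class HillClimbingBatchTuner
{
    private readonly int _min;
    private readonly int _max;
    private int _step;
    private double? _lastThroughput;

    public int BatchSize { get; private set; }

    public HillClimbingBatchTuner(int initial, int min, int max, int step)
    {
        _min = min;
        _max = max;
        _step = step;
        BatchSize = Math.Clamp(initial, min, max);
    }

    // Report the throughput (e.g. items per second) measured with the current batch size.
    // Returns the batch size to try next.
    public int Report(double throughput)
    {
        // If the last change made things worse, turn around and search in the other direction.
        if (_lastThroughput.HasValue && throughput < _lastThroughput.Value)
        {
            _step = -_step;
        }
        _lastThroughput = throughput;
        BatchSize = Math.Clamp(BatchSize + _step, _min, _max);
        return BatchSize;
    }
}
```

With a fixed step, a tuner like this ends up oscillating around the best batch size rather than settling exactly on it. A more careful version would shrink the step over time and smooth out noisy measurements.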
Where to Find the Library
If you’d like to try ComposableDataflowBlocks yourself, the library is open source and ready to use. You can find it on GitLab here:
https://gitlab.com/counterpointcollective/composabledataflowblocks
It is also available as a package on nuget.org:
https://www.nuget.org/packages/CounterpointCollective.ComposableDataflowBlocks
The repository includes all the blocks we’ve talked about, such as BoundedPropagatorBlock, ResizableBatchTransformBlock, PriorityBufferBlock, and ParallelBlock, and more besides. You can reuse them immediately, or use the library to create your own custom blocks tailored to your pipelines.
We’d love for other developers to try it out, give feedback, or contribute. Whether you need dynamic capacities, auto-scaling batches, or fully composable primitives, this library aims to fill the gaps left by standard TPL Dataflow.