Replacing BlockingCollection<T> with TPL Dataflow BufferBlock<T>
For years now, I've been using BlockingCollection<T> as a way to implement a simple work queue for background services. Here's a dummy example:
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class MyBackgroundService
    {
        private readonly BlockingCollection<int> _queue = new BlockingCollection<int>();

        public void AddItem(int item)
        {
            _queue.Add(item);
        }

        // Call this when the app starts
        public void Start()
        {
            Task.Factory.StartNew(() =>
            {
                // Blocks the dedicated thread until an item is available
                foreach (var item in _queue.GetConsumingEnumerable())
                {
                    ProcessItem(item);
                }
            }, TaskCreationOptions.LongRunning);
        }

        // Call this when the app stops
        public void Stop()
        {
            _queue.CompleteAdding();
        }

        private void ProcessItem(int item)
        {
            // Do some processing
            Thread.Sleep(item);
        }
    }
The idea is to force the creation of a dedicated background thread for processing queue items using Task.Factory.StartNew(..., TaskCreationOptions.LongRunning). We need a dedicated thread as opposed to a thread from the thread pool, because GetConsumingEnumerable() blocks until an item is available. Blocking thread pool threads is generally something to be avoided.
Items added to the queue with AddItem(int item) will eventually be yielded by the enumerable and processed in ProcessItem(int item).
This works well as long as the code is synchronous. However, when introducing async code with async/await, we get a weird mix of blocking code and async code.
    public class MyBackgroundService
    {
        ...
        public void Start()
        {
            // Yuck!
            Task.Factory.StartNew(async () =>
            {
                foreach (var item in _queue.GetConsumingEnumerable())
                {
                    await ProcessItem(item);
                }
            }, TaskCreationOptions.LongRunning);
        }
        ...
        private Task ProcessItem(int item)
        {
            // Do some async processing
            return Task.Delay(item);
        }
    }
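This looks harmless, but we were actually experiencing problems with this particular piece of code. The foreach loop would just get stuck and stop processing items under specific conditions. Note that with an async lambda, the dedicated thread requested by TaskCreationOptions.LongRunning is only used up to the first await. After that, the loop typically resumes on a thread pool thread, so GetConsumingEnumerable() ends up blocking a pool thread after all.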
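To see what the mixed version does with its dedicated thread, here is a minimal sketch. The class name LongRunningAsyncDemo and the 50 ms delay are made up for illustration, and it assumes an environment without a SynchronizationContext (a console app, for example). It records whether the code runs on a thread pool thread before and after the first await.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original service.
public static class LongRunningAsyncDemo
{
    public static async Task<(bool Before, bool After)> RunAsync()
    {
        // With an async lambda, StartNew returns a nested task: the outer task
        // completes as soon as the lambda reaches its first await.
        var outer = Task.Factory.StartNew(async () =>
        {
            // Runs on the thread created for TaskCreationOptions.LongRunning.
            bool before = Thread.CurrentThread.IsThreadPoolThread;

            await Task.Delay(50);

            // The continuation is scheduled wherever the awaiter decides,
            // which is normally the thread pool when there is no SynchronizationContext.
            bool after = Thread.CurrentThread.IsThreadPoolThread;

            return (Before: before, After: after);
        }, TaskCreationOptions.LongRunning);

        // Unwrap() yields a task that represents the whole async lambda.
        return await outer.Unwrap();
    }
}
```

Under those assumptions, Before should be false and After should be true: the dedicated thread is gone after the first await, and any blocking call that follows ties up a pool thread.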
Anyway, after some research we determined that this could be reimplemented using the TPL Dataflow BufferBlock<T>, which is pretty much a BlockingCollection<T> with an async API.
Here is the code:
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class MyBackgroundService
    {
        // BufferBlock<T> replaces the BlockingCollection<T>
        private readonly BufferBlock<int> _queue = new BufferBlock<int>();

        public void AddItem(int item)
        {
            _queue.Post(item);
        }

        public void Start()
        {
            Task.Run(async () =>
            {
                // Asynchronously waits for items; completes with false once
                // the block is completed and empty
                while (await _queue.OutputAvailableAsync())
                {
                    var item = await _queue.ReceiveAsync();
                    await ProcessItem(item);
                }
            });
        }

        public Task Stop()
        {
            _queue.Complete();
            return _queue.Completion;
        }

        private Task ProcessItem(int item)
        {
            // Do some async processing
            return Task.Delay(item);
        }
    }
It works exactly as before, except the code never blocks to wait for queue items. It fixed our issues, and it piqued my interest in the TPL Dataflow library. This article goes into more detail about this queue implementation using BufferBlock<T>.
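One detail worth keeping in mind: the Completion task of a BufferBlock<T> finishes once the block is completed and its buffer has been drained, which can happen while the last item is still being processed. If the caller needs to wait for that last item too, one option is to keep a reference to the consumer task and await it when stopping. The following is only a sketch of that idea: the class name TrackedBackgroundService, the StopAsync method and the Processed list are hypothetical additions for illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical variant of the service that tracks its consumer loop.
public class TrackedBackgroundService
{
    private readonly BufferBlock<int> _queue = new BufferBlock<int>();
    private readonly List<int> _processed = new List<int>();
    private Task _consumer = Task.CompletedTask;

    // Items whose processing has finished, in processing order.
    public IReadOnlyList<int> Processed => _processed;

    // Post returns false once the block has been completed.
    public bool AddItem(int item) => _queue.Post(item);

    public void Start()
    {
        // Keep the task so StopAsync can wait for the loop itself.
        _consumer = Task.Run(() => ConsumeAsync());
    }

    public async Task StopAsync()
    {
        _queue.Complete();
        // Finishes only after the loop has processed every remaining item.
        await _consumer;
    }

    private async Task ConsumeAsync()
    {
        while (await _queue.OutputAvailableAsync())
        {
            int item = await _queue.ReceiveAsync();
            await ProcessItemAsync(item);
        }
    }

    private async Task ProcessItemAsync(int item)
    {
        // Stand-in for real async work.
        await Task.Delay(item);
        _processed.Add(item);
    }
}
```

With a single consumer, items are processed one at a time in the order they were posted, so after awaiting StopAsync() the Processed list contains every accepted item.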
TPL Dataflow WAS the best library I was not using, and I will certainly consider it the next time I do concurrent programming.