I created priority queue which stores messages sent from asp net core controller and then processes them in BackgroundService
The thread which loops through items is blocked when collection is empty.
It might look like reinventing the wheel as there is already BlockingCollectionOfT but I am clueless how to extend that thing to my scenario.
Are there any code smells, concurrency hazards?
Perhaps there might be even better way to do it.
I would like to hear your opinion
Thanks
Implementation
internal class BPQueue : IBPQueue, IDisposable
{
private readonly BaseMessage[] _items;
private readonly object _syncRoot = new();
private readonly ManualResetEventSlim _manualResetEventSlim;
private int _head;
public BPQueue ()
{
_items = new BaseMessage[100];
_head = -1;
_manualResetEventSlim = new ();
}
public bool TryEnqueue(BaseMessage message)
{
lock (_syncRoot)
{
if(_head + 1 == _items.Length)
{
return false;
}
_head++;
_items[_head] = message;
}
if (!_manualResetEventSlim.IsSet)
{
_manualResetEventSlim.Set();
}
return true;
}
public IEnumerable<BaseMessage> GetEnumerable(CancellationToken token = default)
{
while(true)
{
token.ThrowIfCancellationRequested();
if(_head == -1)
{
_manualResetEventSlim.Reset();
_manualResetEventSlim.Wait(token);
}
BaseMessage item;
lock (_syncRoot)
{
item = _items[_head];
_items[_head] = null;
_head--;
}
yield return item;
}
}
public void Dispose()
{
_manualResetEventSlim.Dispose();
}
}
Message base class
public abstract class BaseMessage : IComparable<BaseMessage>
{
public Priority Priority { get; }
public BaseMessage(Priority priority)
{
Priority = priority;
}
public int CompareTo(BaseMessage? other)
{
if (other == null)
{
return -1;
}
if (Priority < other.Priority)
{
return 1;
}
if (Priority > other.Priority)
{
return -1;
}
return 0;
}
How is it used
Example controller
public class QueueController : ControllerBase
{
private readonly IBPQueue _queue;
public QueueController(IBPQueue queue)
{
_queue = queue;
}
[HttpPost]
public IActionResult AddMessage([FromBody] SampleMessage sampleMessage)
{
var enqueued = _queue.TryEnqueue(sampleMessage);
if (!enqueued)
{
return BadRequest();
}
return Ok();
}
}
Background service
internal class MessageProcessorHostedService : BackgroundService
{
private readonly ILogger _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IBPQueue _queue;
public TaskQueueHostedService(
ILogger<MessageProcessorHostedService> logger,
IServiceScopeFactory serviceScopeFactory,
IBPQueue queue)
{
_logger = logger;
_serviceScopeFactory = serviceScopeFactory;
_queue = queue;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Run(async () =>
{
foreach (var message in _blockingPriorityQueue.GetEnumerable(stoppingToken))
{
try
{
stoppingToken.ThrowIfCancellationRequested();
using var serviceScope = _serviceScopeFactory.CreateScope();
var serviceProvider = serviceScope.ServiceProvider;
var messageHandler = serviceProvider.GetMessageHandler(message);
await messageHandler.HandleAsync(message);
stoppingToken.ThrowIfCancellationRequested();
}
catch (OperationCanceledException)
{
_logger.LogInformation("Queue has been stopped");
break;
}
catch (Exception ex)
{
_logger.LogError("An error occurred while processing queue", ex);
}
}
}, stoppingToken);
}
}
Priority
public enum Priority : byte
{
High = 0,
Medium = 1,
Low = 2
}
Priority? \$\endgroup\$IAsyncEnumerablebut I couldn't find good mechanism inSystem.Threadingwhich would block and return task. \$\endgroup\$