I have a HostedService which inherits from BackgroundService. It loops and dequeues items from a Queue.

These items are placed on the queue by a Controller in response to an HTTP request.

If I set the Postman runner to fire a request every 500 milliseconds, for up to 60 items, the early items are dequeued within seconds, but later items can take up to 10 seconds to be dequeued.

I have tried: Queue, ConcurrentQueue and BlockingCollection. All with the same results.

Any ideas? Is this a valid use case for any of the Queue types I mentioned?

Here are the implementation details:

Registration:

    services.AddSimpleInjector(_container, options =>
    {
        options.AddAspNetCore().AddControllerActivation();
        options.AddHostedService<QueuedHostedService>();
    });

The Background service:

    public class QueuedHostedService : BackgroundService
    {
        private readonly IApiLog _apiLog;

        public QueuedHostedService(IBackgroundTaskQueue taskQueue,
            IApiLog apiLog)
        {
            TaskQueue = taskQueue;
            _apiLog = apiLog;
        }

        public IBackgroundTaskQueue TaskQueue { get; }

        protected override async Task ExecuteAsync(
            CancellationToken cancellationToken)
        {
            _apiLog.Log(new LogEntry("Queued Hosted Service is starting."));

            while (!cancellationToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.Dequeue(cancellationToken);

                _apiLog.Log(new LogEntry($"Dequeuing work-item: {nameof(workItem)}"));

                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception exception)
                {
                    _apiLog.Log(new LogEntry(exception, $"Error occurred executing {nameof(workItem)}."));
                }
            }

            _apiLog.Log(new LogEntry("Queued Hosted Service is stopping."));
        }
    }

The queue:

    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
        private readonly IApiLog _apiLog;
        private readonly Queue<Func<CancellationToken, Task>> _items = new Queue<Func<CancellationToken, Task>>();

        public BackgroundTaskQueue(IApiLog apiLog)
        {
            _apiLog = apiLog;
        }

        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem, string eventId, string correlationId)
        {
            try
            {
                if (workItem == null)
                {
                    throw new ArgumentNullException(nameof(workItem));
                }

                _items.Enqueue(workItem);
                _apiLog.Log(new LogEntry($"BackgroundWorkItem has been enqueued for EventId={eventId}, CorrelationId={correlationId}"));
            }
            catch (Exception exception)
            {
                _apiLog.Log(new LogEntry(exception, exception.Message));
            }
            finally
            {
                _signal.Release();
            }
        }

        public async Task<Func<CancellationToken, Task>> Dequeue(CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);

            _items.TryDequeue(out var workItem);

            return workItem;
        }
    }

And in the controller, the method that ultimately places items on the queue:

    public void AddRequestToQueue(MyEvent myEvent, string correlationId, string userName)
    {
        if (string.IsNullOrEmpty(correlationId)) correlationId = Guid.NewGuid().ToString();

        _apiLog.Log(new LogEntry($"Adding Update Event Request to Queue. OperationType={OperationType.ToString()}, EventId={myEvent.Id}, CorrelationId={ correlationId }"));

        BackGroundTaskQueue.QueueBackgroundWorkItem(async token =>
        {
            _apiLog.Log(new LogEntry($"Update Event Request Dequeued. OperationType={OperationType.ToString()}, EventId={myEvent.Id}, CorrelationId={ correlationId }"));

            await AddSomethingToDatabase(myEvent, correlationId, userName);

            // "event" is a reserved C# keyword, so the local needs a different name.
            var convertedEvent = _converter.Convert<SomethingElse>(myEvent);

            await SendSomethingToRabbit(convertedEvent, correlationId, OperationType);
        }, myEvent.Id.ToString(), correlationId);
    }

I am seeing up to 10 seconds between the log lines:

Adding Update Event Request to Queue

and

Update Event Request Dequeued

  • With up to 10s, do you mean the first await is fast and the later ones are slow? Your QueuedHostedService.ExecuteAsync will dequeue the next item only when the previous one has finished executing. The loop is async but processes one item at a time, after all. I would skip the database operations and try an empty await with just the log call to confirm that the item arrived. I strongly suspect your AddSomethingToDatabase and SendSomethingToRabbit have something to do with the delay. In any case, profiling will tell you where the bottleneck is. Commented Nov 5, 2020 at 19:20
  • @Alois Kraus The later items take longer to be dequeued after they are enqueued, i.e. the await workItem doesn't begin to execute until 10 seconds after being queued. But I thought that once the semaphore signals, the item should be dequeued and executed instantly. Instead I see logs of a lot of queuing happening, but no dequeuing until later. I thought the HTTP POST thread(s) queue the items, and the BackgroundService thread loops and dequeues asynchronously. Therefore I believed the database and Rabbit methods would not hold things up. I will add thread IDs to the logs tomorrow, comment things out, and analyse further. Commented Nov 5, 2020 at 21:38
  • @AloisKraus You pointed me in the right direction. Because of await workItem(cancellationToken); in the while loop of QueuedHostedService, we were waiting for each work item's AddSomethingToDatabase and SendSomethingToRabbit calls to finish before the while loop could loop around and dequeue the next item, i.e. dequeuing was held up. Changing the await to _ = Task.Run(() => workItem(cancellationToken), cancellationToken); i.e. fire and forget on another thread, allows the loop to continue and dequeuing to happen, so work items are always processed on independent threads. Commented Nov 10, 2020 at 11:25
  • For the coordination of work between your Controller and your Background service, I'd recommend looking at System.Threading.Channels.Channel<T>. It seems like a perfect fit; check out a blog post explaining its usage here: devblogs.microsoft.com/dotnet/… Commented Nov 18, 2020 at 4:11
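
The two suggestions in the comments above (do not await each work item inside the dequeue loop, and use Channel<T> for the hand-off) can be combined. What follows is only a minimal sketch, assuming .NET Core 3.0 or later with C# 8. The names ChannelTaskQueue, Enqueue, Complete, ProcessAsync and maxConcurrency are hypothetical and not part of the original code, and Console stands in for IApiLog. Unlike a bare fire-and-forget Task.Run, it caps how many work items run at once and logs their exceptions.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for BackgroundTaskQueue, backed by Channel<T>.
public sealed class ChannelTaskQueue
{
    // Unbounded channel: producers (controller threads) never block on write.
    private readonly Channel<Func<CancellationToken, Task>> _channel =
        Channel.CreateUnbounded<Func<CancellationToken, Task>>(
            new UnboundedChannelOptions { SingleReader = true });

    public void Enqueue(Func<CancellationToken, Task> workItem)
    {
        if (workItem == null) throw new ArgumentNullException(nameof(workItem));

        // On an unbounded channel, TryWrite fails only after Complete() was called.
        if (!_channel.Writer.TryWrite(workItem))
            throw new InvalidOperationException("The queue has been completed.");
    }

    public void Complete() => _channel.Writer.Complete();

    // Consumer loop: dequeues continuously, running at most maxConcurrency items at once.
    public async Task ProcessAsync(int maxConcurrency, CancellationToken cancellationToken)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        try
        {
            // ReadAllAsync ends once the writer is completed and the channel is drained.
            await foreach (var workItem in _channel.Reader.ReadAllAsync(cancellationToken))
            {
                // Wait for a free slot, not for the previous item to finish.
                await gate.WaitAsync(cancellationToken);
                _ = Task.Run(() => RunAsync(workItem, gate, cancellationToken));
            }
        }
        finally
        {
            // Take every slot back so in-flight items finish before the gate is disposed.
            for (var i = 0; i < maxConcurrency; i++)
                await gate.WaitAsync();
        }
    }

    private static async Task RunAsync(
        Func<CancellationToken, Task> workItem, SemaphoreSlim gate, CancellationToken cancellationToken)
    {
        try
        {
            await workItem(cancellationToken);
        }
        catch (Exception exception)
        {
            // Fire-and-forget tasks must handle their own exceptions.
            Console.Error.WriteLine($"Error occurred executing work item: {exception.Message}");
        }
        finally
        {
            gate.Release();
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var queue = new ChannelTaskQueue();
        var consumer = queue.ProcessAsync(4, CancellationToken.None);

        for (var i = 0; i < 8; i++)
        {
            var id = i;
            queue.Enqueue(async token =>
            {
                await Task.Delay(200, token); // stands in for the database and RabbitMQ calls
                Console.WriteLine($"Work item {id} finished.");
            });
        }

        queue.Complete();
        await consumer;
    }
}
```

In a hosted service, the body of ExecuteAsync would call something like ProcessAsync with the host's stopping token. Note that running items concurrently gives up the strict ordering that the original sequential loop provided, so this only fits if the work items are independent of each other.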
