3

I am implementing queues using https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-studio example.

this is how my code looks:

in startup.cs I am adding my hosted service and background queue

services.AddHostedService<QueuedHostedService>(); services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

then I implement scoped service, hosted service and background queue as following:

namespace Services.Services {
  public class QueuedHostedService: BackgroundService {
    private readonly ILogger _logger;
    private readonly IServiceProvider _serviceProvider;
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
      _serviceProvider = serviceProvider;
      TaskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger < QueuedHostedService > ();
    }
    public IBackgroundTaskQueue TaskQueue {
      get;
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      while (!cancellationToken.IsCancellationRequested) {
        var workItem = await TaskQueue.DequeueAsync(cancellationToken);

        try {
          await workItem(cancellationToken);
        } catch (Exception ex) {

        }
      }
    }
  }
}


public interface IBackgroundTaskQueue {
  void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);
  Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
}

namespace Services.Services {
  public class BackgroundTaskQueue: IBackgroundTaskQueue {
    private ConcurrentQueue < Func < CancellationToken, Task >> _workItems = new ConcurrentQueue < Func < CancellationToken, Task >> ();
    private SemaphoreSlim _signal = new SemaphoreSlim(0);
    public void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem) {
      if (workItem == null) {
        throw new ArgumentNullException(nameof(workItem));
      }
      _workItems.Enqueue(workItem);
      _signal.Release();
    }

    public async Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken) {
      await _signal.WaitAsync(cancellationToken);
      _workItems.TryDequeue(out
        var workItem);
      return workItem;
    }
  }
}


// scoped service
namespace Services.Services {
  public class ImportService: BaseService, IImportService {
    private readonly IFileProcessingService _scopedProcessingService;

    private readonly ConfigurationSettings _configurationSettings;
    public IBackgroundTaskQueue Queue {
      get;
    }
    private
    const string AZURE_BLOB_CONTAINER = "blobcontainer";

    public IServiceProvider Services {
      get;
    }

    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue): base(services) {
      Services = services;
      _configurationSettings = services.GetService < ConfigurationSettings > ();
      _scopedProcessingService = services.GetProcessingService();
      Queue = queue;
    }

    // ---- Main file
    public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) {
      await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
    }

    public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) {
      var fileFormat = GetFileFormat(file);
      var tempFilePath = await GetTemporaryPath(file);
      var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
      // ....... //

      ProcessFile(tempFilePath, fileFormat, file, type, userId);
    }

    private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, Tyoe type, int userId) {
      var delimiter = ",";
      Queue.QueueBackgroundWorkItem(async token => {
        using(var scope = Services.CreateScope()) {
          var scopedProcessingService =
            scope.ServiceProvider
            .GetRequiredService < IFileProcessingService > ();

          // do the processing
          switch (type) {
            case "csv":
              await scopedProcessingService.ImportFile(tempFilePath, file.Length, userId, fileFormat, new Headers(), delimiter ? ? ",", "yyyy-MM-dd");
              break;
          }
        }
      });
    }
  }
}

I am adding elemeents to queue on request in controller. Now I want to add another queue for pocessing other requests. Is it possible to use another queue using same Hosted service? I have trouble finding examples how to do that. Should I just add another scoped servide and another background queue?

6
  • Yes, adding another pair of services is one of ways how you can achieve this. Commented May 18, 2020 at 20:40
  • Is this the best way? What are other ways? Commented May 19, 2020 at 4:16
  • 1
    It depends on situation. Another is handling introducing concept of message types and handling different messages in one queue. Commented May 19, 2020 at 6:54
  • So I can have one QueuedHostedService and just add different tasks with Queue.QueueBackgroundWorkItem(async token ..... like in ProcessFile() method? Or better practice would be to have e.g. FileProcessingHostedService and another CalculationsHostedService? something like here learn.microsoft.com/en-us/dotnet/architecture/microservices/…? but then do I need BackgroundTaskQueue for each? Maybe you know some other example resources that I could look up to? thank you in advance Commented May 19, 2020 at 7:02
  • 1
    Yes, that's the idea. You can try to reuse BackgroundTaskQueue adding generics. If you want, I can elaborate on that later. Commented May 19, 2020 at 7:17

1 Answer 1

2
  1. The first option is the most straightforward - you just create bunch of classes and interfaces QueuedHostedServiceA, QueuedHostedServiceB, IBackgroundTaskQueueA.. (you can use inheritance to reduce code duplication)

  2. Also you can introduce concept of "handler" and make all this stuff generic:

interface IHandler<T> { Task Handle(T msg, CancelationToken ...)}
interface IBackgroundMessageQueue<T> {...} // same impl but with T instead of Func<CancellationToken,Task>
class IBackgroundMessageQueue<T> {...} // same impl but with T instead of Func<CancellationToken,Task>

class QueuedHostedService<T>
{
   public QueuedHostedService(..., IBackgroundMessageQueue<T> queue, IHandler<T> h) {... }
   protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      while (!cancellationToken.IsCancellationRequested) {
        T message = await queue.DequeueAsync(cancellationToken);

        try {
            using(var scp = serviceProvider.CreateScope())
            {
                var handler = ServiceProvider.GetRequiredService<IHandler<T>>;
              await handler.Handle(message, cancellationToken);
            }
        } catch (Exception ex) {

        }
      }
    }
}

And for each message type you create your own handler:

class ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, Tyoe type, int userId){}

FileProcessor: IHandler<ProcessFile> {implement your logic from ImportService.ProcessFile}

Then you register everything:

services.AddScoped<IHandler<ProcessFile>, FileProcessor>()
services.AddSingleton<IBackgroundTaskQueue<ProcessFile>, BackgroundTaskQueue<ProcessFile>>();

services.AddHostedService<QueuedHostedService<ProcessFile>>();

and in your ImportService you resolve typed queue:

public ImportService(IBackgroundMessageQueue<ProcessFile> queue) 

and enqueue message in it when needed.

Sign up to request clarification or add additional context in comments.

1 Comment

thank you so much for your answer! just one more question. What is class ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, Type type, int userId){}? it should only contain the constructor with the parameters needed for my processing? Sorry for asking so much I'm really trying to understand that. Also I can I call different services/repositories inside my FileProcessor: IHandler<ProcessFile> {implement your logic from ImportService.ProcessFile} since everything was inside scope in tis method before?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.