I am implementing background queues based on the example at https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-studio.
This is how my code looks.
In Startup.cs I add my hosted service and the background queue:
// Register QueuedHostedService as a hosted service.
services.AddHostedService<QueuedHostedService>();
// Register BackgroundTaskQueue as the single shared IBackgroundTaskQueue instance, so the
// services that enqueue work and the hosted service that dequeues it see the same queue.
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
Then I implement the scoped service, the hosted service and the background queue as follows:
namespace Services.Services {
/// <summary>
/// Background service that dequeues work items from an <see cref="IBackgroundTaskQueue"/>
/// and executes them one at a time until cancellation is requested.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly ILogger _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="serviceProvider">Service provider, stored for later use.</param>
    /// <param name="taskQueue">Queue the work items are read from.</param>
    /// <param name="loggerFactory">Factory used to create this service's logger.</param>
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory)
    {
        _serviceProvider = serviceProvider;
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    /// <summary>Queue this service consumes work items from.</summary>
    public IBackgroundTaskQueue TaskQueue { get; }

    /// <summary>
    /// Dequeues and awaits work items sequentially until <paramref name="cancellationToken"/>
    /// is signalled.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the dequeue call and to every work item.</param>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception ex)
            {
                // A failing work item must not stop the loop, but it must not vanish
                // silently either: record it so the failure can be diagnosed.
                _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }
}
}
/// <summary>
/// Contract for a queue of asynchronous work items. Each work item is a delegate that
/// takes a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface IBackgroundTaskQueue
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">Delegate to enqueue.</param>
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    /// <summary>Asynchronously obtains a work item from the queue.</summary>
    /// <param name="cancellationToken">Token supplied by the caller for the dequeue operation.</param>
    /// <returns>A task whose result is the dequeued work item.</returns>
    Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
namespace Services.Services {
/// <summary>
/// <see cref="IBackgroundTaskQueue"/> implementation backed by a
/// <see cref="ConcurrentQueue{T}"/>; a <see cref="SemaphoreSlim"/> is released once per
/// enqueued item and awaited once per dequeue.
/// </summary>
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    // Both fields are assigned only here, so they are readonly to prevent accidental reassignment.
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
        new ConcurrentQueue<Func<CancellationToken, Task>>();

    // Starts at 0 so DequeueAsync waits until something has been enqueued.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Enqueues a work item and releases the semaphore once.</summary>
    /// <param name="workItem">Delegate to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);
        _signal.Release();
    }

    /// <summary>
    /// Waits on the semaphore, then removes and returns the item at the head of the queue.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the semaphore wait.</param>
    /// <returns>
    /// The dequeued work item. The result of <c>TryDequeue</c> is not checked, so the value
    /// is null if the queue turned out to be empty.
    /// </returns>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}
}
// scoped service
namespace Services.Services {
/// <summary>
/// Import service that forwards direct imports to an <see cref="IFileProcessingService"/>
/// and queues file processing as background work items on an
/// <see cref="IBackgroundTaskQueue"/>.
/// </summary>
public class ImportService : BaseService, IImportService
{
    private readonly IFileProcessingService _scopedProcessingService;
    private readonly ConfigurationSettings _configurationSettings;

    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    /// <summary>Queue that background work items are added to.</summary>
    public IBackgroundTaskQueue Queue { get; }

    /// <summary>Service provider this instance was constructed with.</summary>
    public IServiceProvider Services { get; }

    /// <summary>
    /// Creates the service and resolves its collaborators from <paramref name="services"/>.
    /// </summary>
    /// <param name="services">Provider used to resolve settings and the processing service.</param>
    /// <param name="queue">Queue used by <c>ProcessFile</c> to schedule background work.</param>
    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services)
    {
        Services = services;
        _configurationSettings = services.GetService<ConfigurationSettings>();
        _scopedProcessingService = services.GetProcessingService();
        Queue = queue;
    }

    // ---- Main file
    /// <summary>Imports a file by delegating directly to the processing service.</summary>
    public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat)
    {
        await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
    }

    /// <summary>
    /// Determines the file format, obtains a temporary path for the upload and queues the
    /// file for background processing.
    /// </summary>
    /// <param name="file">Uploaded file.</param>
    /// <param name="userId">Id of the uploading user.</param>
    /// <param name="type">Kind of transactional data contained in the file.</param>
    public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type)
    {
        var fileFormat = GetFileFormat(file);
        var tempFilePath = await GetTemporaryPath(file);
        // NOTE(review): DateTime.Now is formatted with the current culture here, so the
        // resulting name varies by culture — confirm that is acceptable where it is used.
        var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
        // ....... //
        ProcessFile(tempFilePath, fileFormat, file, type, userId);
    }

    /// <summary>
    /// Queues a background work item that resolves an <see cref="IFileProcessingService"/>
    /// in its own DI scope and imports the file at <paramref name="tempFilePath"/>.
    /// </summary>
    /// <param name="tempFilePath">Path of the temporary copy of the upload.</param>
    /// <param name="fileFormat">Detected format of the file.</param>
    /// <param name="file">Uploaded file; only its length is read, before the work item is queued.</param>
    /// <param name="type">Kind of transactional data; selects the import branch.</param>
    /// <param name="userId">Id of the uploading user.</param>
    private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId)
    {
        var delimiter = ",";

        // Read everything needed from the request-bound objects now. The work item runs
        // later on the hosted service, after the request may have completed, so it must
        // not touch the IFormFile or the provider this service was constructed with.
        var fileSize = file.Length;
        var scopeFactory = Services.GetRequiredService<IServiceScopeFactory>();

        Queue.QueueBackgroundWorkItem(async token =>
        {
            // Create an independent scope so scoped dependencies live exactly as long as this work item.
            using (var scope = scopeFactory.CreateScope())
            {
                var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IFileProcessingService>();

                // do the processing
                // NOTE(review): `type` is a TransactionalDataFileType but is matched against
                // the string "csv" — confirm the intended case label against that type's definition.
                switch (type)
                {
                    case "csv":
                        await scopedProcessingService.ImportFile(tempFilePath, fileSize, userId, fileFormat, new Headers(), delimiter, "yyyy-MM-dd");
                        break;
                }
            }
        });
    }
}
}
I am adding elements to the queue on each request in the controller. Now I want to add another queue for processing other requests. Is it possible to use another queue with the same hosted service? I have trouble finding examples of how to do that. Should I just add another scoped service and another background queue?
QueuedHostedService and just add different tasks with Queue.QueueBackgroundWorkItem(async token ..... like in the ProcessFile() method? Or would it be better practice to have e.g. a FileProcessingHostedService and another CalculationsHostedService, something like here: learn.microsoft.com/en-us/dotnet/architecture/microservices/…? But then do I need a BackgroundTaskQueue for each? Maybe you know some other example resources that I could look at? Thank you in advance.