I am trying to work out the best way to go about a task which relys on multiple long running tasks taking place.
My use case is that I want to have multiple events running for set periods of time, where I have websocket connections to each event for that period.
My thoughts were that I keep a conurrent list of all active events, when a new event pops into the list, it spawns a new thread to handle the event, when the event pops off the list, this thread will be closed.
Is this a good way to go about it? I am trying to set up a proof of concept, where all I am doing is logging out the event ID to the console for now, it kind of works, but I haven't worked out a way to remove the thread yet etc.
Any advise anyone can give I would be really appreciative.
public class EventProcessingService : IHostedService, IDisposable
{
private readonly ILogger<EventProcessingService> _logger;
private readonly ICacheService _cacheService;
private const int MaxThreads = 10;
private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads);
public static readonly BlockingCollection<int> eventIds = new BlockingCollection<int>();
ConcurrentBag<int> EventIdsProcessing = new ConcurrentBag<int>();
private Timer _timer = null!;
public EventProcessingService(ILogger<EventProcessingService> logger, ICacheService cacheService)
{
_logger = logger;
_cacheService = cacheService;
for (int i = 0; i < MaxThreads; i++)
{
Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
}
}
public Task StartAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Timed Hosted Service running.");
_timer = new Timer(DoWork, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
private void DoWork(object? state)
{
var ids = _cacheService.GetCachedEventIds();
foreach (var id in ids)
{
if (!EventIdsProcessing.Contains(id))
{
EventIdsProcessing.Add(id);
eventIds.Add(id);
}
}
cde.Wait();
}
private async Task Process()
{
foreach (var id in eventIds.GetConsumingEnumerable())
{
cde.Signal();
while (true)
{
Console.WriteLine(id);
await Task.Delay(1000);
}
}
}
public Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Timed Hosted Service is stopping.");
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
public void Dispose()
{
_timer?.Dispose();
}
}