
I have a database of messages that I want to send. Each has a unique message identifier and is placed in the database by another process. The only communication between the two processes is via the database table.

Every x minutes (let's say 15) I want to check this database and schedule the messages to send in a performant way, with control over how many messages are sent concurrently.

We may have 100,000+ messages to send at any one time, and to send all messages may take longer than the scheduled frequency (so individual jobs will overlap).

The target platform is C# on .NET 8.

I'm looking at utilising a Windows service that runs a standard IHost pattern.

My current thinking is to use both Quartz.NET and MassTransit, but I'm not sure this is the correct application of these frameworks and I'm open to alternatives.

Getting a Job to run on a regular cadence works nicely with Quartz.NET. The challenge I'm facing is setting up MassTransit for what I'm trying to accomplish.

My initial thinking was to have the Quartz.NET job publish each message, keyed by its unique message identifier, onto a simple in-memory bus (so no external services are needed), with a basic consumer picking each one up and doing the actual work to send.
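A sketch of that shape, to make the design concrete: a Quartz job that reads pending ids and publishes one event per message. `IPendingMessageRepository`, `GetPendingMessageIdsAsync`, and the `DeliverEvent` shape are placeholders for illustration, not part of Quartz.NET or MassTransit.

```csharp
using MassTransit;
using Quartz;

// Hypothetical Quartz job: reads pending message ids from the database
// table and publishes one DeliverEvent per id onto the bus.
public class DispatchPendingMessagesJob : IJob
{
    private readonly IBus _bus;
    private readonly IPendingMessageRepository _repository; // placeholder abstraction over the table

    public DispatchPendingMessagesJob(IBus bus, IPendingMessageRepository repository)
    {
        _bus = bus;
        _repository = repository;
    }

    public async Task Execute(IJobExecutionContext context)
    {
        // Read only messages still in the "pending" state.
        var pending = await _repository.GetPendingMessageIdsAsync(context.CancellationToken);

        foreach (var messageId in pending)
        {
            await _bus.Publish(new DeliverEvent { MessageId = messageId }, context.CancellationToken);
        }
    }
}

public record DeliverEvent
{
    public Guid MessageId { get; init; }
}

public interface IPendingMessageRepository
{
    Task<IReadOnlyList<Guid>> GetPendingMessageIdsAsync(CancellationToken cancellationToken);
}
```

Because runs can overlap, two job executions can publish the same id twice, which is exactly the duplication problem described below.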

What I now want to avoid is queuing multiple messages/events/commands for the same message identifier. I can add a simple check that the message is in the expected state and bail out if it's already been recorded as sent, but I'd rather not grow the queue longer than it needs to be.

Essentially I want a way to check that a given message identifier isn't already queued.

I tried directly setting the message id e.g.

await _bus.Publish(new DeliverEvent { }, message => message.MessageId = messageId);

And looked at partitions but I don't think they quite fit the requirements?

I think I could achieve this by using RabbitMQ and the deduplication plugin? But I am trying to keep it all within a single process.

Am I approaching this wrong? How could I achieve this either with MassTransit or an alternative library/framework?

Comments
  • What database are you using? Commented Aug 17, 2024 at 6:23
  • @GuruStron MS SQL Commented Aug 20, 2024 at 21:54

2 Answers


You could use:

  • MassTransit SQL Transport
  • The transactional outbox on consumers to ensure exactly-once message processing
  • Continue using Quartz.NET to schedule the recurring jobs that send off these messages

That combination seems like it would suit your requirements.
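A minimal wiring sketch of that combination. The method names (`UsingSqlServer`, `SqlTransportOptions`, `AddSqlServerMigrationHostedService`, `AddEntityFrameworkOutbox`) are from recent MassTransit v8 releases, so verify them against the docs for your version; `AppDbContext` and `DeliverEventConsumer` are placeholders.

```csharp
using MassTransit;

// Sketch: SQL Server as both the message transport and the outbox store,
// so everything stays on the database the two processes already share.
services.AddOptions<SqlTransportOptions>().Configure(options =>
{
    options.ConnectionString = "your SQL Server connection string";
});

// Creates the transport schema/tables on startup (assumed helper in the SQL transport package).
services.AddSqlServerMigrationHostedService(create: true, delete: false);

services.AddMassTransit(x =>
{
    x.AddConsumer<DeliverEventConsumer>();

    // Transactional outbox backed by an EF Core DbContext (AppDbContext is a placeholder).
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.UseSqlServer();
        o.UseBusOutbox();
    });

    // Use SQL Server itself as the transport; no external broker required.
    x.UsingSqlServer((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
```

This keeps the "single process, no broker" constraint from the question while making duplicate delivery a transport/outbox concern rather than an application-level one.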



What I managed to get working is the following filter, which tracks in-flight message ids in a hash set.

using System.Collections.Immutable;
using MassTransit;

public class DeduplicationMiddleware<T> : IFilter<ConsumeContext<T>> where T : class, IMessageEvent
{
    private readonly DeduplicationStore<T> _deduplicationStore;

    public DeduplicationMiddleware(DeduplicationStore<T> deduplicationStore)
    {
        _deduplicationStore = deduplicationStore;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope($"deduplication-{typeof(T).Name}");
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // TryAdd is a single atomic test-and-add; a separate Contains check
        // followed by Add would let two concurrent consumers both pass the
        // check before either recorded the id.
        if (!_deduplicationStore.TryAdd(context.Message.MessageId))
        {
            // Duplicate detected: this message id is already being consumed.
            return;
        }

        try
        {
            await next.Send(context);
        }
        finally
        {
            // Release the id so a later scheduled run can process the
            // message again if it is still pending.
            _deduplicationStore.Remove(context.Message.MessageId);
        }
    }
}

// The unused type parameter scopes one store instance per message type.
public class DeduplicationStore<T> where T : IMessageEvent
{
    private ImmutableHashSet<Guid> _messageIds = ImmutableHashSet<Guid>.Empty;

    // Returns false if the id was already present: ImmutableInterlocked.Update
    // reports whether the transformer actually changed the set.
    public bool TryAdd(Guid id)
    {
        return ImmutableInterlocked.Update(ref _messageIds, set => set.Add(id));
    }

    public void Remove(Guid id)
    {
        ImmutableInterlocked.Update(ref _messageIds, set => set.Remove(id));
    }
}

The filter is then registered during bus configuration:

config.UseConsumeFilter<DeduplicationMiddleware<DeliverEvent>>(context);
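One wiring detail worth noting: the store holds state across consumes, so it has to live in the container as a singleton. A sketch, assuming Microsoft.Extensions.DependencyInjection; an open-generic registration covers every message type with one line.

```csharp
// Singleton so the set of in-flight message ids is shared across all
// consumer instances; the open generic registration means each message
// type T gets its own DeduplicationStore<T> instance.
services.AddSingleton(typeof(DeduplicationStore<>));
```

A scoped or transient registration would silently defeat the deduplication, since each consume would see an empty set.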

It's working nicely; I stress-tested it with up to 1 million events and it held up.
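A side note on why `ImmutableInterlocked.Update` is a good fit here: it returns `true` only when the transformer actually changed the set, so adding an id that is already present reports `false`, giving an atomic test-and-add in a single call. A standalone illustration using only the BCL:

```csharp
using System.Collections.Immutable;

var ids = ImmutableHashSet<Guid>.Empty;
var id = Guid.NewGuid();

// First add changes the set, so Update returns true.
bool first = ImmutableInterlocked.Update(ref ids, set => set.Add(id));

// Second add of the same id leaves the set unchanged, so Update returns false.
bool second = ImmutableInterlocked.Update(ref ids, set => set.Add(id));

Console.WriteLine($"{first} {second}"); // True False
```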
