
I'm using a ConcurrentQueue<Result> to store results of a certain activity that occurs in my system (ASP.NET Core 6). It works well.

But now I have a new requirement: I need a Result.Id property - a long value, similar to a sequential database primary key. When I enqueue an item, it must get the next value in the sequence, i.e. queue.Count + 1.

When I enqueue items, I don't use locks because ConcurrentQueue does that for me. But now that I need the Id, I must either (see the sketch after these lists):

  • lock access to Enqueue()
  • determine the queue's size
  • increment by one
  • set the item's Id
  • enqueue the item

or:

  • lock access to Enqueue()
  • enqueue the item
  • determine the queue's new size
  • update the item's Id (but it will be in an inconsistent state until then)
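For concreteness, here is a minimal sketch of the first option; the wrapper class and its names are hypothetical:

using System.Collections.Concurrent;

public class Result
{
    public long Id { get; set; }
    // ... other result data
}

public class ResultStore
{
    private readonly ConcurrentQueue<Result> _queue = new();
    private readonly object _gate = new();

    public void Enqueue(Result result)
    {
        lock (_gate)
        {
            // Note: ConcurrentQueue<T>.Count takes a snapshot and is not a cheap O(1) read.
            result.Id = _queue.Count + 1;
            _queue.Enqueue(result);
        }
    }
}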

Can I do this without locking the Enqueue() method? Or maybe there's a better way? (Though I prefer to continue using ConcurrentQueue, if possible, as it already works well for what I have.)

  • What will you do to the queue next? You can set the Id when the item is dequeued. Commented Jun 29, 2022 at 4:01
  • I suggest reconsidering your implementation: if you read the source of ConcurrentQueue<T>.Count you may be surprised by its complexity, so a normal queue + lock may even be better. Commented Jun 29, 2022 at 5:14
  • @TheodorZoulias It is a problem, and there are other race conditions and side effects too. I was convinced by the comments above and the answer below to redesign, probably using a simpler collection and doing the locking myself. Commented Jun 29, 2022 at 9:20
  • If you get an IEnumerator<T> and start enumerating it while the Queue<T> is mutated in parallel by other threads, the behavior is undefined. You might get random exceptions, corrupted data, whatever. So protecting the GetEnumerator method alone with a lock is not enough. Commented Jun 29, 2022 at 10:10
  • @TheodorZoulias Your comment resulted in lots of research and yak shaving (thanks / no thanks :-)) and I realised it was an even bigger problem than the one here, so I moved it to a different question. Thanks for your insights. Commented Jun 29, 2022 at 11:36

2 Answers


Instead of assigning the Id when adding the items, you can assign it after you have finished adding items to the queue. You can use a Select with index, e.g.:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public class Item {
    public int Id { get; set; }
    public int Value { get; set; }
}

public class Program
{
    public static void Main()
    {
        var q = new ConcurrentQueue<Item>();
        // Fill the queue concurrently; the enqueue order is nondeterministic.
        Parallel.For(0, 10, i => q.Enqueue(new Item() { Value = i + 100 }));
        // Deferred execution: Ids are assigned from queue positions on each enumeration.
        var withIds = q.Select((x, i) => {
            x.Id = i;
            return x;
        });

        Console.WriteLine(string.Join(", ", withIds.Select(x => string.Format("{0} -> {1}", x.Id, x.Value))));
    }
}

This leads to the following output:

0 -> 100, 1 -> 102, 2 -> 103, 3 -> 105, 4 -> 101, 5 -> 106, 6 -> 107, 7 -> 104, 8 -> 108, 9 -> 109

Depending on the size of the queue and the number of times you iterate it, you can also add a ToArray after the Select in order to avoid assigning the Ids multiple times.
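For example, a variation on the snippet above (same hypothetical names) that materializes the sequence once:

// ToArray forces a single enumeration, so each Id is assigned exactly once.
var withIds = q.Select((x, i) => { x.Id = i; return x; }).ToArray();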

It's important to note that this only works if you can clearly separate the phase of adding items to the queue from the phase of processing it.


3 Comments

Thank you Markus, that is a very clever approach! I am still trying to understand the tricky bits. Am I right in assuming 1) this enumerates the queue on every enqueue, 2) between the enqueue and the Select, the last item is in an inconsistent state (is that what you meant by "clearly separate the phase...")? Even so, despite these issues it's still a nice way to avoid a lock, as the side effects are not too bad for my use case.
@lonix no, the queue is enumerated only when the Select is executed (LINQ deferred execution). If you only want to enumerate it once, add a .ToArray() after the Select, but keep in mind that this means more memory consumption. By "separate the phases" I mean that if you enqueue items, enumerate them, and then enqueue or dequeue again, you might get different ids, because the id is only assigned in the Select. In the sample, the phases are separated: first the items are enqueued in the Parallel.For; when this is finished, the ids are assigned; after that, no items are de- or enqueued.
Ah I see, I need to enqueue items one by one and ensure the ids are correct at any given point in time, so the enumeration must be performed each time. I think there are drawbacks or race conditions no matter which solution I choose, maybe a ConcurrentQueue is the wrong data structure for my use case. But your solution does answer the question, thank you!

You could consider storing the IDs in Lazy<long> instances and making these instances part of the state of your Result type. Or you could store them side by side with the results in the ConcurrentQueue<T> as value tuples, like this:

long idSeed = 0;
// Thread-safe sequence: each call returns the next value (1, 2, 3, ...).
Func<long> idFactory = () => Interlocked.Increment(ref idSeed);

ConcurrentQueue<(Result, Lazy<long>)> queue = new();

//...

// No ID is generated here; the Lazy<long> defers it until first access.
queue.Enqueue((new Result(), new Lazy<long>(idFactory)));

//...

if (queue.TryDequeue(out var entry))
{
    var (result, lazy) = entry;
    long id = lazy.Value; // Generated on first read; every reader sees the same value.
}

The Lazy<T> class guarantees that the valueFactory will be invoked only once for each Lazy<T> instance. So even if multiple threads are racing to read the ID of a specific Result, all threads will get the same ID.
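For completeness, a minimal sketch of the first alternative, with the Lazy<long> embedded in the Result type (the class layout here is an assumption, not code from the question):

using System;
using System.Threading;

public class Result
{
    // Shared seed for all Result instances.
    private static long s_idSeed = 0;

    // The factory runs at most once per instance, on the first access of Id.
    private readonly Lazy<long> _id =
        new(() => Interlocked.Increment(ref s_idSeed));

    public long Id => _id.Value;
}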

2 Comments

Thanks, that is another good and very clever approach!
@lonix you could also take a look at the source code of the Task class, and see how Microsoft has implemented the Id property.
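Roughly, the pattern there is a lazily assigned ID; a simplified sketch (illustrative names, not the actual Task source):

using System.Threading;

public class Result
{
    private static int s_idCounter;
    private int m_id; // 0 means "no Id assigned yet"

    public int Id
    {
        get
        {
            if (m_id == 0)
            {
                int newId;
                // Skip 0, which is reserved for "unassigned".
                do { newId = Interlocked.Increment(ref s_idCounter); }
                while (newId == 0);
                // First writer wins; racing readers then agree on a single value.
                Interlocked.CompareExchange(ref m_id, newId, 0);
            }
            return m_id;
        }
    }
}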
