
I have a scenario with dozens of producers and a single consumer. Timing is critical: for performance reasons I want to avoid any locking of the producers, and I want the consumer to wait as little as possible when no messages are ready.

I started out with a ConcurrentLinkedQueue, but I don't like calling sleep on the consumer when queue.poll() == null, because I could waste precious milliseconds, and I don't want to use yield, because I end up wasting CPU.

So I set out to implement a sort of ConcurrentBlockingQueue, so that the consumer can run something like:

T item = queue.poll();
if(item == null) {
    wait();
    item = queue.poll();
}
return item;

And the producers something like:

queue.offer(item);
notify();

Unfortunately, wait/notify only work inside a synchronized block, which in turn would drastically reduce producer performance. Is there any other implementation of the wait/notify mechanism that does not require synchronization?

I am aware of the risks related to not having wait and notify synchronized, and I managed to resolve them by having an external thread running the following:

while(true) {
    notify();
    sleep(100);
}
  • What's wrong with an actual BlockingQueue? Commented Feb 15, 2018 at 9:42
  • Probably relevant stackoverflow.com/questions/976940/… Commented Feb 15, 2018 at 9:44
  • Well, you could implement your own subclass of ConcurrentBlockingQueue and register observers on it - or use something like RxJava. Commented Feb 15, 2018 at 9:45
  • I think the title is a bit misleading though. There is no wait/notify without synchronized, and that's not really what you (the OP) want. You want low latency. Is this a trading platform? Commented Feb 15, 2018 at 9:47

2 Answers


I've started using a ConcurrentLinkedQueue, but I don't like to call sleep on the consumer when queue.poll() == null

You should check the BlockingQueue interface, which has a take method that blocks until an item becomes available.

It has several implementations as detailed in the javadoc, but ConcurrentLinkedQueue is not one of them:

All Known Implementing Classes:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue
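
As a rough illustration of take(), here is a minimal sketch. The class name TakeExample and the method produceAndTake are made up for this example, and LinkedBlockingQueue is just one possible choice among the implementations listed above. Note that LinkedBlockingQueue.offer() does acquire an internal lock, so this shows the blocking consumer side only, not a lock-free producer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the JDK.
class TakeExample {

    /** Starts one producer thread, then blocks in take() until its item arrives. */
    static String produceAndTake(String message) {
        // Unbounded queue: offer() never fails because of capacity.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> queue.offer(message));
        producer.start();

        try {
            // No sleep/poll loop: take() parks the consumer until an item is available.
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(produceAndTake("hello"));
    }
}
```

The consumer never sleeps for a fixed interval here; it is woken as soon as the producer's offer() completes.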


Are you aware of any of these implementations that does not lock the producer on insertion?
@Jack You can use the offer method which doesn't block and returns true/false to indicate if the item was inserted. If you want the call to always return true, just pick an unbounded queue, such as a LinkedBlockingQueue (with no capacity limit).
You mean that it doesn't block the caller, but I actually said "lock", and ArrayBlockingQueue.offer() does lock. I haven't checked the other implementations, which is why I was asking.
@Jack not sure what you mean by lock: offer returns immediately. Check the javadoc of BlockingQueue which has a nice table explaining how the different methods work. The only situation where offer returns false is if the queue is full - if you don't want that to happen, use an unbounded queue.
As far as I know, offer() uses a lock, meaning the producer can be blocked while it waits for its turn to insert a value into the queue. It doesn't matter that this behavior is transparent to the caller, as you rightly point out. But as I stated, "I want to avoid any locking of producers", which is why none of the BlockingQueue implementations work for me.

I came up with the following implementation:

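import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentBlockingQueue<T> {

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final Semaphore semaphore = new Semaphore(0);
    // Atomic because many producers increment concurrently; a plain int would lose updates.
    private final AtomicInteger size = new AtomicInteger();

    /** Called by any producer. */
    public void offer(T item) {
        size.incrementAndGet();
        queue.offer(item);
        semaphore.release(); // wakes the consumer if it is waiting in tryAcquire
    }

    /** Must only be called from the single consumer thread. */
    public T poll(long timeout, TimeUnit unit) {
        semaphore.drainPermits(); // discard permits left over from items already consumed
        T item = queue.poll();
        if (item == null) {
            try {
                semaphore.tryAcquire(timeout, unit);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            item = queue.poll();
        }
        if (item == null) {
            size.set(0);
        } else {
            size.updateAndGet(s -> Math.max(0, s - 1));
        }
        return item;
    }

    /** An approximate queue size with O(1) access. */
    public int size() {
        return size.get();
    }
}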

With the following properties:

  • producers never enter a sleeping state (which I think can happen with BlockingQueue implementations that use a Lock in offer(), or with synchronized blocks using wait/notify)
  • the consumer only goes to sleep when the queue is empty, and it is woken up as soon as a producer offers an item (no fixed-time sleep, no yield)
  • the consumer can sometimes be woken up even though the queue is empty, but it's acceptable here to waste a few CPU cycles
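
The wake-up behaviour listed above can also be sketched with LockSupport.park/unpark, which act like wait/notify but need no synchronized block. This is only a sketch under assumptions: the class ParkingQueue and its waiter field are hypothetical names, there is exactly one consumer thread, and take() returns null if that thread is interrupted.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical single-consumer queue; class and field names are illustrative only. */
class ParkingQueue<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    // The consumer thread that is waiting or about to wait, or null if none is.
    private volatile Thread waiter;

    /** Called by any producer; no lock or monitor is taken here. */
    void offer(T item) {
        queue.offer(item);
        Thread t = waiter;
        if (t != null) {
            // If the consumer has not parked yet, the permit is remembered
            // and its next park() returns immediately.
            LockSupport.unpark(t);
        }
    }

    /** Called by the single consumer; returns null only if the thread is interrupted. */
    T take() {
        T item;
        while ((item = queue.poll()) == null) {
            waiter = Thread.currentThread();
            // Re-check after publishing 'waiter' so an offer() racing with us is not missed.
            item = queue.poll();
            if (item != null) {
                break;
            }
            LockSupport.park(this); // may also return spuriously, hence the loop
            if (Thread.currentThread().isInterrupted()) {
                break; // leave the interrupt flag set for the caller
            }
        }
        waiter = null;
        return item;
    }
}
```

Compared with the Semaphore version, producers skip the wake-up call entirely while the consumer is busy, at the price of one volatile read per offer().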

Is there an equivalent implementation in the JDK that I'm not aware of? I'm open to criticism.
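
One JDK candidate that may be worth measuring is LinkedTransferQueue, which implements BlockingQueue and is unbounded. To my knowledge the OpenJDK implementation inserts with CAS operations rather than a lock, but the javadoc does not promise this, so treat it as an assumption to verify for your JDK. A minimal sketch, with a made-up class TransferQueueDemo and a made-up method run:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the JDK.
class TransferQueueDemo {

    /** Runs several producer threads and returns how many items the single consumer received. */
    static int run(int producers, int perProducer) {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();

        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.offer(i); // unbounded queue: never waits for free space
                }
            }).start();
        }

        int expected = producers * perProducer;
        int received = 0;
        try {
            while (received < expected) {
                // Timed poll: waits until an item arrives or the timeout elapses.
                Integer item = queue.poll(1, TimeUnit.SECONDS);
                if (item == null) {
                    break; // timed out with nothing to read
                }
                received++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return received;
    }
}
```

Calling TransferQueueDemo.run(4, 1000) exercises four producers against one consumer; whether its latency beats the Semaphore-based version would have to be benchmarked.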
