java concurrency: multi-producer one-consumer


5

I have a situation where several threads (producers) populate a queue and a single consumer retrieves elements from it. My problem is that some of the elements are never retrieved from the queue (a missed signal?). The producer code is:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
    public void run() {
        consumer.send("message");
    }
}

and they are created and run with:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

Consumer code is:

class Consumer implements Runnable {

    private Queue<String> queue = new ConcurrentLinkedQueue<String>();

    void send(String message) {
        synchronized (queue) {
            queue.add(message);
            System.out.println("SIZE: " + queue.size());
            queue.notify();
        }
    }

    @Override
    public void run() {
        int counter = 0;
        synchronized (queue) {
            while (true) {
                try {
                    System.out.println("SLEEP");
                    queue.wait(10);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
                System.out.println(counter);
                if (!queue.isEmpty()) {
                    queue.poll();
                    counter++;
                }
            }
        }
    }
}

When I run the code I sometimes get 20 elements added and 20 retrieved, but in other cases fewer than 20 are retrieved. Any idea how to fix that?

2012-04-04 07:25
by Randomize
You are using an odd mixture of low-level synchronization constructs (wait, notify) and high-level ones (ConcurrentLinkedQueue, ExecutorService). Use one or the other - artbristol 2012-04-04 07:32
I did, but in both cases I have the same problem - Randomize 2012-04-04 07:40
I can't see the code that actually runs Consumer - dhblah 2012-04-04 07:50
Just a normal new Thread(consumer).start() - Randomize 2012-04-04 08:13


10

I'd suggest you use a BlockingQueue instead of a Queue. A LinkedBlockingDeque might be a good candidate for you.

Your code would look like this:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

and then you'd need to just

queue.take()

on your consumer thread

The idea is that .take() will block until an item is available in the queue and then return exactly one (which is where I think your implementation suffers: missing notification while polling). .put() is responsible for doing all the notifications for you. No wait/notifies needed.
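To make the put/take idea concrete, here is a minimal sketch of the whole producer/consumer setup with no synchronized blocks and no wait/notify. The class name BlockingQueueDemo, the method runOnce, and the pool size of 4 are assumptions made up for this illustration; they are not from the question.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, not the asker's code.
class BlockingQueueDemo {

    // Submits `producers` tasks that each put one message, then consumes
    // them on the calling thread. Returns the number of messages consumed.
    static int runOnce(int producers) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>();
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is an arbitrary choice
        for (int i = 0; i < producers; i++) {
            final int id = i;
            executor.execute(() -> {
                try {
                    queue.put("message " + id); // the queue does its own locking and signalling
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt status
                }
            });
        }
        int counter = 0;
        try {
            while (counter < producers) {
                queue.take(); // blocks until a message is available
                counter++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("consumed: " + runOnce(20));
    }
}
```

Note that put() and take() both throw InterruptedException, so each call has to handle or declare it.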

2012-04-04 07:37
by charisis
Tried LinkedBlockingDeque but I still got the same problem - Randomize 2012-04-04 07:43
@Randomize can you post an example of the problematic code utilizing a BlockingQueue? The Consumer code should be enough - charisis 2012-04-04 07:45
I'm reusing exactly the same code above I have just replaced ConcurrentLinkedQueue with LinkedBlockingDeque - Randomize 2012-04-04 07:54
As I described above, with a BlockingQueue you should a) get rid of your wait/notify calls and b) use .put() and .take() instead of .add() and .poll() - charisis 2012-04-04 07:57
Using BlockingQueue and removing synchronized/wait/notify/add/poll worked fine, thanks :) - Randomize 2012-04-04 08:12
Why are you synchronizing around the queue.put? - Roland 2017-05-22 07:58


2

The issue in your code is probably because you are using notify instead of notifyAll. The former will only wake up a single thread, if there is one waiting on the lock. This allows a race condition where no thread is waiting and the signal is lost. A notifyAll will force correctness at a minor performance cost by requiring all threads to wake up to check whether they can obtain the lock.

This is best explained in Effective Java 1st ed (see p.150). The 2nd edition removed this tip since programmers are expected to use java.util.concurrent which provides stronger correctness guarantees.
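For comparison, the usual low-level idiom is to call wait() inside a loop that re-checks the condition while holding the lock. With that pattern a notification sent before the consumer starts waiting does no harm, because the consumer looks at the queue state first. A minimal sketch, assuming a hypothetical Mailbox class that is not part of the question:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical class illustrating the guarded-wait idiom.
class Mailbox {
    private final Queue<String> queue = new ArrayDeque<>();

    synchronized void send(String message) {
        queue.add(message);
        notifyAll(); // wake every waiter; each one re-checks the condition
    }

    synchronized String receive() throws InterruptedException {
        while (queue.isEmpty()) { // loop guards against early and spurious wakeups
            wait();               // releases the monitor while waiting
        }
        return queue.remove();
    }

    // Starts one sender thread per message and receives them all on the
    // calling thread. Returns the number of messages received.
    static int demo(int messages) {
        Mailbox mailbox = new Mailbox();
        for (int i = 0; i < messages; i++) {
            final int id = i;
            new Thread(() -> mailbox.send("message " + id)).start();
        }
        int received = 0;
        try {
            while (received < messages) {
                mailbox.receive();
                received++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("received: " + demo(20));
    }
}
```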

2012-04-04 08:27
by Ben Manes
I used notifyAll but it didn't work - Randomize 2012-04-04 10:25
There is a single consumer, so notify / notifyAll doesn't make a difference - Amrish Pandey 2014-09-03 17:02


2

Using ConcurrentLinkedQueue and explicit synchronization at the same time looks like a bad idea. It defeats the purpose of a concurrent data structure in the first place.

There is nothing wrong with ConcurrentLinkedQueue itself, and replacing it with a BlockingQueue will solve the problem, but the queue type is not the root cause.

The problem is queue.wait(10). This is a timed wait: once 10 ms have elapsed, the consumer reacquires the lock even if it was never notified.

  1. A notification (queue.notify()) is lost if the 10 ms have already elapsed, because no consumer thread is waiting on the queue at that moment.

  2. Producers cannot add to the queue while the consumer holds the lock, which it reclaims every time the wait ends.

Moving to BlockingQueue solved your problem because you removed the wait(10) code, and the waiting and notification are now handled by the BlockingQueue itself.
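If the consumer really does need to wake up periodically, which is what wait(10) seems to be for, BlockingQueue has a timed poll that returns null on timeout instead of relying on notify. A rough sketch, where the class TimedPollDemo and the method drain are names made up for this example and the 10 ms timeout is borrowed from the question:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not the asker's code.
class TimedPollDemo {

    // Removes elements until the queue stays empty for 10 ms.
    // Returns the number of elements removed.
    static int drain(BlockingQueue<String> queue) {
        int counter = 0;
        try {
            // poll(timeout, unit) returns null if nothing arrives in time
            while (queue.poll(10, TimeUnit.MILLISECONDS) != null) {
                counter++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
        return counter;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 20; i++) {
            queue.add("message " + i);
        }
        System.out.println("drained: " + drain(queue));
    }
}
```

Here the timeout only decides when the consumer gives up on an empty queue; an element that is already in the queue is returned immediately, so nothing depends on a signal arriving at the right moment.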

2014-09-03 17:16
by Amrish Pandey