I have a situation where different threads populate a queue (producers) and one consumer retrieve element from this queue. My problem is that when one of these elements are retrieved from the queue some is missed (missing signal?). The producers code is:
class Producer implements Runnable {
private Consumer consumer;
Producer(Consumer consumer) { this.consumer = consumer; }
@Override
public void run() {
consumer.send("message");
}
}
and they are created and run with:
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
executor.execute(new Producer(consumer));
}
Consumer code is:
class Consumer implements Runnable {
private Queue<String> queue = new ConcurrentLinkedQueue<String>();
void send(String message) {
synchronized (queue) {
queue.add(message);
System.out.println("SIZE: " + queue.size());
queue.notify();
}
}
@Override
public void run() {
int counter = 0;
synchronized (queue) {
while(true) {
try {
System.out.println("SLEEP");
queue.wait(10);
} catch (InterruptedException e) {
Thread.interrupted();
}
System.out.println(counter);
if (!queue.isEmpty()) {
queue.poll();
counter++;
}
}
}
}
}
When the code is run I get sometimes 20 elements added and 20 retrieved, but in other cases the elements retrieved are less than 20. Any idea how to fix that?
I'd suggest you use a BlockingQueue instead of a Queue. A LinkedBlockingDeque might be a good candidate for you.
Your code would look like this:
void send(String message) {
synchronized (queue) {
queue.put(message);
System.out.println("SIZE: " + queue.size());
}
}
and then you'd need to just
queue.take()
on your consumer thread
The idea is that .take() will block until an item is available in the queue and then return exactly one (which is where I think your implementation suffers: missing notification while polling). .put() is responsible for doing all the notifications for you. No wait/notifies needed.
BlockingQueue
you should a) get rid of your wait/notify calls and b) use .put()
and .take()
instead of .add()
and .poll()
- charisis 2012-04-04 07:57
The issue in your code is probably because you are using notify
instead of notifyAll
. The former will only wake up a single thread, if there is one waiting on the lock. This allows a race condition where no thread is waiting and the signal is lost. A notifyAll will force correctness at a minor performance cost by requiring all threads to wake up to check whether they can obtain the lock.
This is best explained in Effective Java 1st ed (see p.150). The 2nd edition removed this tip since programmers are expected to use java.util.concurrent which provides stronger correctness guarantees.
It looks like bad idea to use ConcurrentLinkedQueue and synchronization both at the same time. It defies the purpose of concurrent data structures in the first place.
There is no problem with ConcurrentLinkedQueue data structure and replacing it with BlockingQueue will solve the problem but this is not the root cause.
Problem is with queue.wait(10). This is timed wait method. It will acquire lock again once 10ms elapses.
Notification (queue.notify() ) will get lost because there is no consumer thread waiting on it if 10ms has elapsed.
Producer will not be able to add to the queue since they can't acquire lock because lock is claimed again by the consumer.
Moving to BlockingQueue solved your problem because you removed your wait(10) code and wait and notify was taken care by BlockingQueue data structure.
wait
,notify
) and high-level ones (ConcurrentLinkedQueue
,ExecutorService
). Use one or the other - artbristol 2012-04-04 07:32