Producer/Consumer using Semaphore; getting deadlock

0

Following http://en.wikipedia.org/wiki/Producer-consumer_problem, I want to simulate the producer/consumer problem using semaphores. I am getting a deadlock and I don't know what the problem is.

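import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// The enclosing class declaration was missing from the post; the name here is assumed.
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        CustomBlockingQueue blockingQueue = new CustomBlockingQueue();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}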

@SuppressWarnings("serial")
class CustomBlockingQueue extends LinkedList<Object> {
    private static final int MAX_SIZE = 10;

    private Semaphore mutex = new Semaphore(1);
    private Semaphore fillCount = new Semaphore(0);
    private Semaphore emptyCount = new Semaphore(MAX_SIZE);

    @Override
    public boolean offer(Object e) {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        boolean result = super.offer(e);
        System.out.println("offer " + size());
        try {
            fillCount.release();
            emptyCount.acquire();
            mutex.release();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        return result;
    }

    @Override
    public Object poll() {
        try {
            mutex.acquire();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        Object result = super.poll();
        System.out.println("poll  " + size());
        try {
            emptyCount.release();
            fillCount.acquire();
            mutex.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

class Producer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Producer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(2));
                blockingQueue.offer(new Object());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private CustomBlockingQueue blockingQueue;
    private Random random = new Random();

    public Consumer(CustomBlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(4));
                blockingQueue.poll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Using semaphores

Semaphores solve the problem of lost wakeup calls. In the solution below we use two semaphores, fillCount and emptyCount, to solve the problem. fillCount is the number of items to be read in the buffer, and emptyCount is the number of available spaces in the buffer where items could be written. fillCount is incremented and emptyCount decremented when a new item has been put into the buffer. If the producer tries to decrement emptyCount while its value is zero, the producer is put to sleep. The next time an item is consumed, emptyCount is incremented and the producer wakes up. The consumer works analogously.

2012-04-04 22:14
by ASD
save yourself headaches: use thread safe queues - jldupont 2012-04-04 22:16
Agreed - this is all implemented for you in the libraries - no need to do it the hard way - DNA 2012-04-04 22:17
I want to solve this problem myself, intentionally using semaphores - ASD 2012-04-04 22:17
BTW @ASD, I'd recommend doing your .release() calls in a finally {} block - Gray 2012-04-04 23:43


2

Your locking is in the wrong order.

For offer it needs to be:

        emptyCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        fillCount.release();

A similar change is needed for poll:

        fillCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        emptyCount.release();

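In your implementation you wait on the counting semaphores while holding the mutex. That causes a deadlock, because the other thread may be blocked on the mutex, which it needs before it can release the semaphore you are waiting for. For example, if the consumer polls an empty queue, it blocks on fillCount while holding the mutex, and the producer can never get in to release fillCount.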
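
Put together, the ordering above might look like the following minimal sketch. The class name SemaphoreBoundedQueue and the method names put and take are made up for illustration (the question overrides offer and poll on a LinkedList instead), and the mutex is taken with acquireUninterruptibly() only to keep the example short. The mutex release sits in a finally block so it is not skipped if the queue operation throws.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical class for illustration; not the asker's CustomBlockingQueue.
class SemaphoreBoundedQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final Semaphore mutex = new Semaphore(1);      // guards 'items'
    private final Semaphore fillCount = new Semaphore(0);  // items available to take
    private final Semaphore emptyCount;                    // free slots available to fill

    SemaphoreBoundedQueue(int capacity) {
        emptyCount = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        emptyCount.acquire();            // wait for a free slot BEFORE taking the mutex
        mutex.acquireUninterruptibly();  // short critical section
        try {
            items.addLast(item);
        } finally {
            mutex.release();             // always give the mutex back
        }
        fillCount.release();             // signal the consumer after leaving the mutex
    }

    T take() throws InterruptedException {
        fillCount.acquire();             // wait for an item BEFORE taking the mutex
        mutex.acquireUninterruptibly();
        T item;
        try {
            item = items.removeFirst();
        } finally {
            mutex.release();
        }
        emptyCount.release();            // signal the producer after leaving the mutex
        return item;
    }
}

class SemaphoreBoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SemaphoreBoundedQueue<Integer> queue = new SemaphoreBoundedQueue<>(10);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += queue.take();
        }
        producer.join();
        System.out.println(sum); // prints 4950 (0 + 1 + ... + 99)
    }
}
```

Neither thread ever blocks on a counting semaphore while holding the mutex, so the producer and consumer cannot wait on each other in a cycle.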

2012-04-04 22:18
by benmmurphy
+1 You have to try hard to get this order wrong - semaphore P-C queues have been taught in classrooms since the 60's and there are dozens of examples - Martin James 2012-04-04 23:21


2

You might instead consider using a BlockingQueue, which takes care of the mutex locking and waiting for you.
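
For comparison, here is a rough sketch of the same producer/consumer exchange on top of java.util.concurrent.ArrayBlockingQueue. The class name BlockingQueueDemo, the helper method transfer, and the count and capacity values are invented for this example. put() blocks while the queue is full and take() blocks while it is empty, so no explicit semaphores are needed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; names and sizes are illustrative only.
class BlockingQueueDemo {
    // Sends the integers 0..count-1 through a bounded queue and returns their sum.
    static int transfer(int count, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i);   // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += queue.take();    // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100, 10)); // prints 4950
    }
}
```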

As an aside, I've got an old page which demonstrates the producer/consumer race conditions (as opposed to the spurious interrupt). But my implementation does not use semaphores, so I'm not sure it will help you:

http://256stuff.com/gray/docs/misc/producer_consumer_race_conditions/

2012-04-04 22:16
by Gray
I know about BlockingQueue, but I want to solve this problem myself, intentionally using semaphores - ASD 2012-04-04 22:17
Figured @ASD. Thought I'd just point that out. I've also put a link to my producer/consumer page FYI - Gray 2012-04-04 22:19
Thanks for the reply, but I already solved this problem this way (though a little differently than you) - ASD 2012-04-04 22:25