This processes need to be synchronized so the producers will not try to put data into the buffer when the buffer is full and consumer processes will not try to pop data out of the buffer when the buffer is empty.
In Java the synchronization can be achieved using the synchronized keyword. When a thread enters a synchronized method, it will activate a lock on that method so no other thread may enter it. The lock will be deactivated when the thread reaches the end of that method.
First we shall create the BufferResource class who will act similar to a stack:
public class BufferResource 
{
 private int[] buffer;
 private int lastIndex;
 private boolean isFull;
 
 public BufferResource(int size)
 {
  isFull = false;
  lastIndex = 0;
  buffer = new int[size];
 }
 
 public synchronized void pushData(int data)
 {
  while(isFull==true)
  {
   try
   {
    wait();
   }catch(InterruptedException ex){}
  }
  buffer[lastIndex] = data;
  lastIndex++;
  if(lastIndex>=buffer.length)
  {
   isFull=true;
   notifyAll();
  }
 }
 
 public synchronized int popData()
 {
  while(isFull==false)
  {
   try
   {
    wait();
   }catch(InterruptedException ex){}
  }
  lastIndex--;
  if(lastIndex==0)
  {
   isFull = false;
   notifyAll();
  }
  return buffer[lastIndex];
 }
}
The resource class contains an array to hold the data called buffer, an index called lastIndex to help pushing and popping and a boolean flag that will be set/unset when the lastIndex will be equal to the size of the array, respectively when lastIndex will be equal to zero.The method void pushData(int data) will be called by the Producer class and will be used to fill the array with data. If the array is full, the Producer thread who entered the method will be blocked. When the last element of the array will be filled with data, the consumer threads will be waked up.
The method int popData() will be called by the Consumer class and will be used to remove data from the array. If the array is not full, the Consumer thread who entered the method will be blocked. When the array will be empty (after all data been popped), the producer threads will be waked up.
The Producer class will be implemented as:
public class Producer extends Thread
{
 private BufferResource buffer;
 private int iterations;
 
 public Producer(String name, int iterations, BufferResource buffer)
 {
  super(name);
  this.buffer = buffer;
  this.iterations = iterations;
 }
 
 public void run()
 {
  for(int i=0;i<iterations;i++)
  {
   buffer.pushData(i);
   System.out.println(this.getName() + " pushed " + i);
   try 
   {
    Thread.sleep((long)(Math.random()*1000));
   } catch (InterruptedException e) {}
  }
 }
}
The Producer class extends Thread and by doing so it needs to implement the interface Runnable. A Producer object needs to hold a reference to the common resource and may also hold a variable which specifies how many iterations it will do.The Producer void run() method contains all the logic for a producer thread. The producer will push data into the buffer according to the specified number of iterations. After each push it will take a break for a random amount of time (maximum 1 second).
The Consumer class will be implemented as:
public class Consumer extends Thread
{
 private BufferResource buffer;
 private int iterations;
 
 public Consumer(String name, int iterations, BufferResource buffer)
 {
  super(name);
  this.buffer = buffer;
  this.iterations = iterations;
 }
 
 public void run()
 {
  for(int i=0;i<iterations;i++)
  {
   int val = buffer.popData();
   System.out.println(this.getName() + " poped " + val);
   try 
   {
    Thread.sleep((long)(Math.random()*1000));
   } catch (InterruptedException e) {}
  }
 }
}
The Consumer class is very similar to the Producer class. The main exception is that the Consumer will take data out of the buffer using BufferResource's method int popData().The client for this application can be implemented as:
public class ProducerConsumerClient 
{
 public static void main(String args[])
 {
  BufferResource buffer = new BufferResource(5);
  Producer p1 = new Producer("Producer1",5,buffer);
  Producer p2 = new Producer("Producer2",5,buffer);
  Consumer c1 = new Consumer("Consumer1",5,buffer);
  Consumer c2 = new Consumer("Consumer2",5,buffer);
  p1.start();
  p2.start();
  c1.start();
  c2.start();
 }
}
It is very important to note, that in this implementation, in order to have all data consumed, the sum of iterations from the producers must be equal to the sum of iterations from the consumers.If you try running the client, your output will be like:
Producer1 pushed 0
Producer2 pushed 0
Producer2 pushed 1
Producer2 pushed 2
Producer1 pushed 1
Consumer2 poped 1
Consumer1 poped 2
Consumer1 poped 1
Consumer2 poped 0
Consumer2 poped 0
Producer1 pushed 2
Producer2 pushed 3
Producer1 pushed 3
Producer2 pushed 4
Producer1 pushed 4
Consumer2 poped 4
Consumer1 poped 4
Consumer1 poped 3
Consumer2 poped 3
Consumer1 poped 2
Download source here.
Further reading:
Implementation of the Producer-Consumer using a BlockingQueue
 


Nice post. By the way Producer consumer can be better addressed by Counting Semaphore or blocking queue. here is a solution of Producer consumer pattern with BlockingQueue
ReplyDeleteThank you for your comment! I will add a link to the post with the Producer-Consumer variant using a BlockingQueue.
Deletehow to Solve Producer Consumer Problem
ReplyDeletehttp://www.youtube.com/watch?v=dUwboVZ59KM
The producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue