Trying to understand how threads synchronize and communicate with each other with monitors, guarded blocks and what not. God, this stuff is complex to understand. Unlike many other things with software where understanding 90% of something means I can do that thing perhaps 80% effectively, programming is binary - understand the concept completely and you can write the program; understand "more or less" and you are stuck forever!
Why all this philosophy? Because I tried writing the code for the Producer and Consumer example given in the java tutorial
here and it was a struggle to get it working because I didn't understand that the threads have to synchronize on a common object to get their work done. Unlike in the example where the synchronization happens on the Drop object (that is a better way of coding because it allows for multiple producers and consumers), I tried to synchronize Producer and Consumer with each other. At last got it to work at least:-)
All this stuff sounds so very similar to the locking concepts in databases and why not? Both deal with concurrent applications. Hopefully, understanding one will help understanding the other also.
Here is the inefficient code anyway...
// The Drop object contains the packet that must be passed from Producer to Consumer
import java.util.Random;
class Drop {
private boolean empty=true;
private String msg=null;
public synchronized boolean isEmpty() {
return empty;
}
public synchronized void put(String info) throws Exception {
if (!isEmpty()) {
System.out.println("ERROR! TRYING TO OVERWRITE!!!");
throw new Exception("OVERWRITE");
}
msg = info;
empty = false;
}
public synchronized String get() throws Exception {
if (isEmpty()) {
System.out.println("ERROR! EMPTY BOX!!!");
throw new Exception("EMPTY BOX");
}
empty = true;
return msg;
}
}
class Producer extends Thread {
private Drop box;
private int numMsg;
Consumer myConsumer;
Producer(Drop pkg, int n) {
box = pkg;
numMsg = n;
}
public void run() {
try {
sendMessages();
} catch(Exception e) {
System.out.println("ERROR in producer");
e.printStackTrace();
}
System.out.println("Producer exiting");
}
void start(Consumer c) {
myConsumer = c;
start();
}
private void sendMessages() throws Exception {
String msg;
for(int i=0; i<=numMsg; i++) {
// Sleep randomly
try {
Random rand = new Random();
Thread.sleep(rand.nextInt(1000));
}
catch (InterruptedException e) {
// Sleep interrupted, never mind, continue
}
if (i==numMsg) {
msg = new String("DONE");
} else {
msg = new String("Message "+(i+1));
}
writeMessage(msg);
System.out.println("PROD: Wrote:"+msg);
}
}
synchronized void writeMessage(String msg) throws Exception {
if(box.isEmpty()) {
System.out.println("PROD: Box is empty, try putting a message.");
box.put(msg);
System.out.println("PROD: Put message, notifying all.");
synchronized (myConsumer) {
myConsumer.notifyAll();
System.out.println("PROD: Notified consumer.");
}
return;
}
while(!box.isEmpty()) {
try {
System.out.println("PROD: Box is full, going to wait.");
wait();
} catch(InterruptedException e) {
}
if(box.isEmpty()) {
box.put(msg);
synchronized (myConsumer) {
myConsumer.notifyAll();
System.out.println("PROD: Put message, notifying all.");
}
return;
}
}
}
}
class Consumer extends Thread {
private Drop box;
Producer myProducer;
Consumer(Drop pkg) {
box = pkg;
}
public void run() {
try {
getMessages();
} catch(Exception e) {
System.out.println("ERROR in consumer");
e.printStackTrace();
}
System.out.println("Consumer exiting");
}
void start(Producer p) {
myProducer = p;
start();
}
private void getMessages() throws Exception {
String msg="EMPTY";
while(!msg.equals("DONE")) {
// Sleep randomly
try {
Random rand = new Random();
Thread.sleep(rand.nextInt(1000));
}
catch (InterruptedException e) {
// Sleep interrupted, never mind, continue
}
System.out.println("CONS: Trying to read msg...");
msg = new String(readMessage());
System.out.println("CONS: Read:"+msg);
}
}
synchronized String readMessage() throws Exception {
String msg=null;
if(!box.isEmpty()) {
msg = new String(box.get());
synchronized (myProducer) {
myProducer.notifyAll();
System.out.println("CONS: Notified producer.");
}
}
else {
while(box.isEmpty()) {
try {
System.out.println("CONS: Empty box, will wait till woken up...");
wait();
} catch(InterruptedException e) {
}
if(!box.isEmpty()) {
msg = new String(box.get());
synchronized (myProducer) {
myProducer.notifyAll();
System.out.println("CONS: Notified producer.");
}
break;
}
}
}
return msg;
}
}
public class ProducerConsumer {
public static void main(String[] args) {
Drop box = new Drop();
System.out.println("MAIN: Starting a loop of "+args[0]);
Producer p = new Producer(box, Integer.parseInt(args[0]));
Consumer c = new Consumer(box);
c.start(p);
p.start(c);
System.out.println("MAIN: Ended.");
}
}