Thread Synchronization

Resource sharing without synchronization

In a producer - consumer relationship, a producer thread calls a producer method which deposits a sequence of numbers (let's use 1, 2, 3, ...) into a shared memory area (buffer). The consumer thread reads this data:

The buffer must be the same for the Producer and Consumer threads -  a single object (that has set and get methods) as a parameter for both Producer() and Consumer() constructors.

In our example, the buffer will be represented by an "IntToShare" class, which can store a single integer (intShared). This class provides two methods: to read (getIntegerShared()) and to modify (setIntegerShared()) the buffer's contents.

Both the producer  and consumer classes are derived directly from the Thread class. The producer generates 10 integers and stores them in the buffer. The consumer reads them in order and finishes when it reads the last number : 10.

Potential risks:

  1.Both the producer and the consumer may attempt to update the buffer contents simultaneously. This can lead to data loss or inconsistencies.
  2.The producer might be slower than the consumer, potentially reading the same value multiple times.
  3.The consumer might be slower than the producer, potentially losing generated values.

class IntToShare {
    private int intShared = -1;
    public void setIntShared( int val ){  //used by Producer
        System.out.println( Thread.currentThread().getName() +
                " set intShared " + val );
        intShared = val;
    }
    public int getIntShared(){        // used by Consumer
        System.out.println( Thread.currentThread().getName() +
                " get intShared " + intShared );
        return intShared;
    }
}

class Producer extends Thread {
    private IntToShare pGarde;
    public Producer( IntToShare h ){
        super( "Producer" );
        pGarde = h;
    }
    public void run(){
        for ( int counter = 1; counter <= 10; counter++ ) {
            // sleep for random time.
            try {Thread.sleep( (int) ( Math.random() * 30 ) );}
            catch( InterruptedException e ) {
                System.err.println( e.toString() );
            }
            pGarde.setIntShared( counter );
        }
        System.out.println( getName() + " is finish"  );
    }
}

class Consumer extends Thread {
    private IntToShare cGarde;
    public Consumer( IntToShare h ){
        super( "Consumer" );
        cGarde = h;
    }
    public void run(){
        int val, sum = 0;
        do {
            // sleep for a random time.
            try {Thread.sleep( (int) ( Math.random() * 50 ) );}
            catch( InterruptedException e ) {
                System.err.println( e.toString() );
            }
            val = cGarde.getIntShared();
            sum += val;
        } while ( val != 10 );
        System.out.println(
                getName() + " read values whose total sum is:" + sum);
    }
}

public class SharedCell {
    public static void main( String args[] ){
        IntToShare h =new IntToShare();
        Producer p = new Producer( h );
        Consumer c = new Consumer( h );
        p.start();
        c.start();
    }
}
Consumer get intShared -1
Producer set intShared 1
Consumer get intShared 1
Producer set intShared 2
Consumer get intShared 2
Producer set intShared 3
Producer set intShared 4
Producer set intShared 5
Consumer get intShared 4
Consumer get intShared 5
Producer set intShared 6
Producer set intShared 7
Consumer get intShared 7
Producer set intShared 8
Producer set intShared 9
Producer set intShared 10
Producer is finish
Consumer get intShared 10
Consumer read values whose total sum is:29

Synchronization

Java uses so-called "monitors" to ensure synchronization. The monitor represents an object that allows a single thread at a time to execute a method or block of code. This is accomplished by locking the object when invoking the method or block of code.

The keyword synchronized is used to mark that a method.

public synchronized int method1{
      ...
}

or a block of code

synchronized(objet)
    while(...) {
           ...
    }

must lock the object when a thread enters the method or the block. It is also said that the lock has been obtained. If there are multiple synchronized methods, only one of these methods is active at a time on an object, all other threads that attempt to invoke synchronized methods on the object must wait. When the synchronized method finishes executing, the lock on the object is released and the monitor lets the highest priority thread attempt to invoke execution of a synchronized method.

If a thread running in a synchronized method determines that it cannot continue, it must call the method wait () to go into the "blocked" state and release the object's lock. To exit this state it will have to receive notification (via methods notify/notifyAll) from another thread which exits a synchronized method on the same object.

We use the keyword synchronized to mark that a method

notify() / notifyAll();

Producer - consumer with synchronization:

class IntToShare {
    private int intShared = -1;
    private boolean writable= true;
    public synchronized void setIntShared( int val ){
        while(!writable) {
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.println( Thread.currentThread().getName() +
                " set intShared " + val );
        intShared = val;
        writable = false;
        notify();
    }
    public synchronized int getIntShared(){
        while(writable) {
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.println( Thread.currentThread().getName() +
                " get intShared " + intShared );
        writable = true;
        notify();
        return intShared;
    }
}
Producer set intShared 1
Consumer get intShared 1
Producer set intShared 2
Consumer get intShared 2
Producer set intShared 3
Consumer get intShared 3
Producer set intShared 4
Consumer get intShared 4
Producer set intShared 5
Consumer get intShared 5
Producer set intShared 6
Consumer get intShared 6
Producer set intShared 7
Consumer get intShared 7
Producer set intShared 8
Consumer get intShared 8
Producer set intShared 9
Consumer get intShared 9
Producer set intShared 10
Producer is finish
Consumer get intShared 10
Consumer read values whose total sum is:55


 

 

 Several producers and consumers

A consumer must finish when all producers have finished and there is no product value (no active producers). Exception to finish. “Timed waiting” so as not to miss the termination of the last producer.

class IntToShare {
    private int intPartage = -1;
    private int nomP =0;
    private boolean writable= true;
    public synchronized void incP(){        nomP++;    }
    public synchronized void decP(){        nomP--;    }
    public synchronized void setIntPartage( int val ){
        while(!writable) {
            System.out.println("\t"+Thread.currentThread().getName()+" waiting");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.println( Thread.currentThread().getName() +
                " produce intPartage  " + val );
        intPartage = val;
        writable = false;
        notifyAll();
    }
    public synchronized int getIntPartage() throws NoP{
        while(writable) {
            if(nomP==0)  throw new NoP()  ;
            System.out.println("\t"+Thread.currentThread().getName()+" waiting");
            try{     wait(1000);  }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.println( Thread.currentThread().getName() +
                " get intPartage " + intPartage );
        writable = true;
        notifyAll();
        return intPartage;
    }
}
//-----------------------------------------------------------------------------
class Producer extends Thread {
    private IntToShare pGarde;
    public Producer( IntToShare h ){
        super( "Producer " +(int)(Math.random()*1000));
        pGarde = h;
    }
    public void run(){
        pGarde.incP();
        System.out.println( getName() + " starting"  );
        for ( int counter = 1; counter <= 4; counter++ ) {
            // sleep for random time.
            try {Thread.sleep( (int) ( Math.random() * 3000 ) );}
            catch( InterruptedException e ) { System.err.println( e.toString() );   }
            pGarde.setIntPartage( counter );
        }
        pGarde.decP();
        System.out.println( getName() + " finished"  );  
    }
}
//-----------------------------------------------------------------------------
class Consumer extends Thread {
    private IntToShare cGarde;
    public Consumer( IntToShare h ){
        super( "Consumer "+(int)(Math.random()*1000) );
        cGarde = h;
    }
    public void run(){
        System.out.println( getName() + " starting"  );
        int val;
        do {
            try {Thread.sleep( (int) ( Math.random() * 1000 ) );}
            catch( InterruptedException e ) {
                System.err.println( e.toString() );
            }
            try{
                val = cGarde.getIntPartage();
            }catch (NoP exc){
                break;
            }
            System.out.println("\t\t"+this.getName()+" has read the value "+val);
        } while ( true );
        System.out.println( getName() + " finished");
    }
}
//----------------------------------------------------------------------------
class NoP extends Exception{
   
    private static final long serialVersionUID = 1L;

    NoP(){
        System.out.println("Exception NoP thrown");
    }

}
//---------------------------------------------------------------------------
public class SharedCellM {
    public static void main( String args[] ){
        IntToShare h =new IntToShare();
        for(int i=0;i<2;i++){
            (new Producer( h )).start();
        }
        System.out.println("All Producer started by: "+ Thread.currentThread().getName());
        for(int i=0;i<3;i++){
            (new Consumer(h)).start();
        }
        System.out.println("Thread "+Thread.currentThread().getName()+" finished");
    }
}
Producer 860 starting
Producer 711 starting
Producer 74 starting
All Producer started by: main
Thread main finished
Consumer 771 starting
Consumer 964 starting
    Consumer 771 waiting
    Consumer 964 waiting
    Consumer 771 waiting
    Consumer 964 waiting
Producer 711 produce intShared  1
Consumer 771 get intShared 1
        Consumer 771 has read the value 1
    Consumer 964 waiting
Producer 860 produce intShared  1
Consumer 964 get intShared 1
        Consumer 964 has read the value 1
Producer 74 produce intShared  1
Consumer 771 get intShared 1
        Consumer 771 has read the value 1
    Consumer 964 waiting
    Consumer 771 waiting
    Consumer 964 waiting
Producer 860 produce intShared  2
Consumer 771 get intShared 2
        Consumer 771 has read the value 2
    Consumer 964 waiting
Producer 711 produce intShared  2
Consumer 964 get intShared 2
        Consumer 964 has read the value 2
    Consumer 771 waiting
Producer 711 produce intShared  3
Consumer 771 get intShared 3
        Consumer 771 has read the value 3
    Consumer 964 waiting
    Consumer 771 waiting
    Consumer 964 waiting
Producer 74 produce intShared  2
Consumer 771 get intShared 2
        Consumer 771 has read the value 2
    Consumer 964 waiting
    Consumer 771 waiting
Producer 711 produce intShared  4
Producer 711 finished
Consumer 964 get intShared 4
        Consumer 964 has read the value 4
    Consumer 771 waiting
Producer 74 produce intShared  3
Consumer 771 get intShared 3
        Consumer 771 has read the value 3
    Consumer 964 waiting
Producer 74 produce intShared  4
Producer 74 finished
Consumer 964 get intShared 4
        Consumer 964 has read the value 4
    Consumer 771 waiting
Producer 860 produce intShared  3
Consumer 771 get intShared 3
        Consumer 771 has read the value 3
    Consumer 964 waiting
    Consumer 771 waiting
    Consumer 964 waiting
    Consumer 771 waiting
Producer 860 produce intShared  4
Producer 860 finished
Consumer 964 get intShared 4
        Consumer 964 has read the value 4
Exception NoP thrown
Consumer 771 finished
Exception NoP thrown
Consumer 964 finished

Exercise - In the previous example, try to make the buffer/queue include three elements instead of one.