Синхронизация между Threads

При стартирането на две или повече нишки те започват да работят асинхронно и независимо една от друга. Когато споделят обаче един и същи ресурс е необходимо да не се позволи на една нишка да повреди или смени данните, които в момента се обработват от друга нишка - синхронизация.

Производител- консуматор. Разпределяне на ресурсите без синхронизация

Във една релация производител - консуматор,  една нишка производител вика метод за писане които записва последователност от числа ( 1, 2, 3, ...)  в поделена зона от паметта. Нишката консуматор чете тези данни:

public class IntShare {
    private int intShared = -1;
   public void setIntShared( int val ){
      System.out.println( Thread.currentThread().getName() +
                 " set intShared: " + val );
      intShared = val;
   }
    public int getIntShared(){
       System.out.println( Thread.currentThread().getName() +
                 " get intShared " + intShared );
       return intShared;
   }
}
public class SetShared extends Thread {
	   private IntShare pGuarde;
	   public SetShared( IntShare h ){
	       super( "Set Shared" );
	       pGuarde = h;
	   }
	   public void run(){
	      for ( int counter = 1; counter <= 10; counter++ ) {
	          // sleep fot random time
	          try {Thread.sleep( (int) ( Math.random() * 3000 ) );}
	          catch( InterruptedException e ) {
	              System.err.println( e.toString() );
	          }
	          pGuarde.setIntShared( counter );
	      }
	      System.out.println( getName() + " finished"  );
	   }
}
public class GetShared extends Thread {
	   private IntShare cGuarde;
	   public GetShared( IntShare h ){
	      super( "GetShared" );
	      cGuarde = h;
	   }
	   public void run(){
	      int val, sum = 0;
	      do {
	         // sleep fot random time
	         try {Thread.sleep( (int) ( Math.random() * 1000 ) );}
	         catch( InterruptedException e ) {
	              System.err.println( e.toString() );
	         }
	         val = cGuarde.getIntShared();
	         sum += val;
	       } while ( val != 10 );
	       System.out.println(
	           getName() + " Total: " + sum);
	   }
}
public class SharedMem {
	   public static void main( String args[] ){
	      IntShare h =new IntShare();
	      SetShared p = new SetShared( h );
	      GetShared c = new GetShared( h );
	      p.start();
	      c.start();
	   }
}
GetShared get intShared -1
GetShared get intShared -1
Set Shared set intShared: 1
GetShared get intShared 1
GetShared get intShared 1
GetShared get intShared 1
GetShared get intShared 1
Set Shared set intShared: 2
GetShared get intShared 2
Set Shared set intShared: 3
GetShared get intShared 3
GetShared get intShared 3
GetShared get intShared 3
GetShared get intShared 3
GetShared get intShared 3
Set Shared set intShared: 4
GetShared get intShared 4
GetShared get intShared 4
Set Shared set intShared: 5
GetShared get intShared 5
Set Shared set intShared: 6
GetShared get intShared 6
GetShared get intShared 6
GetShared get intShared 6
GetShared get intShared 6
Set Shared set intShared: 7
GetShared get intShared 7
GetShared get intShared 7
Set Shared set intShared: 8
GetShared get intShared 8
GetShared get intShared 8
GetShared get intShared 8
GetShared get intShared 8
Set Shared set intShared: 9
GetShared get intShared 9
GetShared get intShared 9
GetShared get intShared 9
Set Shared set intShared: 10
Set Shared finished
GetShared get intShared 10
GetShared Total: 139

 

Синхронизация

Кода на програмата, който може да предизвика повреждане или промяна на данните ползвани от друга нишка се нарича критична секция.

 Java Използва така наречените "монитори" за синхронизация между нишките. Мониторът представлява обект, който става собственост на нишката навлязла в критична секция и който й позволява да изпълнява критичната секция. Нарича се заключване на обекта, метода, кода.

За заключване на критичната секция се използва ключовата дума  synchronized  - за метод:

public synchronized int method1{
      ...
}

инструкция (или блок) 

synchronized : System.out.println("...");

или обект

synchronized(object1){
    object1.method2();

}

Обектът се заключва когато някоя нишка влиза в метода и програмния код. Всичко останали нишки, които искат да използват същия метод или код трябва да чакат завършването на метода или кода от заключилата я нишка. След освобождаването нишката с най-висок приоритет се опитва да вземе ресурса.

За всеки обект в който има синхронизиран метод (или самия обект е синхронизиран)се генерира един монитор.

Трябва да се отбележи, че за всеки обект съществува само по един монитор. Ако има няколко критични метода, то всички те се заключват при изпълнение на един от тях.

Ако се синхронизира статичен метод то заключването се извършва за всички обекти от класа.

Ако една нишка е заключила обект и поради някаква причина не може да продължи, тя трябва да извика  wait() , да премине в състояние "блокирана" и да освободи заключването на обекта. За да бъде ре-активирана, тя трябва да получи от друга нишка съобщение  notyfy().

Ако множество нишки чакат за даден монитор Java runtime system избира една от тях без задължение или гаранция коя нишка ще бъде избрана.

 

notifyAll();

Класът Object притежава още един метод -- notifyAll() -- който събужда всички нишки, чакащи на същия монитор. В този случай нишките се състезават за монитора. След като една го получи другите отново минават в блокирано състояние.
 
Съществуват три метода
wait()
Чака безкрайно да бъде събуден с notyfy() или notifyAll().
wait(long timeout)
Чака събуждане докато изтекат timeout милисекунди.
wait(long timeout, int nanos)
Чака събуждане докато изтекат timeout милисекунди +  nanos наносекунди.
 
wait(long timeout) и wait(long timeout, int nanos) могат да бъдат използвани вместо sleep(). Разликата е, че sleep не може да бъде прекъснат преди изтичането на времето за разлика от wait.

 

Производител - консуматор със синхронизация:

public class IntShare {
    private int intShared = -1;
    private boolean writable= true;
   public synchronized void setIntShared( int val ){
      while(!writable) {
        try{     wait();   }
        catch(InterruptedException e){
            System.err.println(e);
        }
      }
      System.out.println( Thread.currentThread().getName() +
                 " set intShared: " + val );
      intShared = val;
      writable=false;
      notify();
   }
   public synchronized int getIntShared(){
       while(writable) {
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
       }
       System.out.println( Thread.currentThread().getName() +
                 " get intShared " + intShared );
       writable=true;
       notify();
       return intShared;
   }
}
Set Shared set intShared: 1
GetShared get intShared 1
Set Shared set intShared: 2
GetShared get intShared 2
Set Shared set intShared: 3
GetShared get intShared 3
Set Shared set intShared: 4
GetShared get intShared 4
Set Shared set intShared: 5
GetShared get intShared 5
Set Shared set intShared: 6
GetShared get intShared 6
Set Shared set intShared: 7
GetShared get intShared 7
Set Shared set intShared: 8
GetShared get intShared 8
Set Shared set intShared: 9
GetShared get intShared 9
Set Shared set intShared: 10
Set Shared finished
GetShared get intShared 10
GetShared Total: 55

 

 

Избягване на само-блокиране

Java runtime system позволява нишката да получи отново монитор, който вече притежава (Мониторите са re-entrant). По този начин се избягва възможността за само-блокиране на дадена нишка върху монитор, който тя притежава.

 

public class Reentrant {
    public static void main(String a[]){
        Reentrant r = new Reentrant();
        r.a();
    }
    public synchronized void a() {
	b();
	System.out.println("here I am, in a()");
    }
    public synchronized void b() {
	System.out.println("here I am, in b()");
    }
}

 

here I am, in b()
here I am, in a()

 

Явно заключване(Locks и Condition Variables)

Java 1.5 въвежда и един друг подход за синхронизация на критичните секции - явното им заключване. Явното заключване е по-гъвкаво от използването на synchronized защото не се ограничава до метод или блок.

За да се създаде явен ключ трябва да се създаде обект от клас наследяващ Lock  интерфейса - обикновено ReentrantLock. За да заключи, нишката трябва да извика lock() метода, а за да отключи - unlock()  метода. За да не остане кодът заключен при възникване на изключение, методите lock() и unlock() трябва да бъдат обхванати от try/finally клауза.

За да се изчака заключен код трябва да се създаде променлива за условие(condition variable -обект наследяващ интерфейса Condition ). Променливите за условие предоставят методи за изчакване (await ) и за сигнализиране (signalAll ) на чакащите нишки за освобождаване на критичната секция. Подобно на Object.wait методите, Condition.await има няколко варианта, които са цитирани в следващата таблица:

 

Метод Описание
await() Чака да бъде изпълнено условие
awaitNanos(long timeout) Чака да бъде изпълнено условие, до изтичане на времето декларирано в наносекунди.
await(long timeout, TimeUnit unit) Чака да бъде изпълнено условие до изтичане на времето декларирано в TimeUnit  единици (TimeUnit .SECONDS, TimeUnit.MILLISECONDS, TimeUnit.NANOSECONDS, TimeUnit.MICROSECONDS)
await(Date timeout) Чака да бъде изпълнено условие до зададената дата.
awaitInterruptibly() Чака да бъде изпълнено условие. Не може да се прекъсва.

 

Производител - консуматор със явно заключване:

import java.util.concurrent.locks.*;

public class IntShare {
    private int intShared = -1;
    private boolean writable= true;
    private Lock aLock = new ReentrantLock();
    private Condition condVar = aLock.newCondition();
   public void setIntShared( int val ){
   	aLock.lock();
	try{
      		while(!writable) {
      			try{     condVar.await();   }
      			catch(InterruptedException e){
      				System.err.println(e);
      			}
      		}
        		System.out.println( Thread.currentThread().getName() +" set intShared: " + val );
        		intShared = val;
        		condVar.signalAll();
      	}
      	finally {
        		writable=false;
        		aLock.unlock();
      	}    
   }
   public int getIntShared(){
   	aLock.lock();
   	try{
       		while(writable) {
            			try{     condVar.await();   }
            				catch(InterruptedException e){
                				System.err.println(e);
            				}
       		}
       		System.out.println( Thread.currentThread().getName() +  " get intShared " + intShared );
       		writable=true;
       		condVar.signalAll();
   	}
   	finally{
       		aLock.unlock();
       		return intShared;
   	}
   	
   }
}