Thread pool
3.14.1  Introduction. 

L'approche qu'on a utilisée jusqu'au maintenant consiste à créer un nouveau thread à chaque fois qu'une demande arrive et à traiter cette nouvelle demande dans le thread nouvellement créé. Si cette approche semble simple à mettre en œuvre, elle présente des inconvénients importants. Un programme qui crée un nouveau thread pour chaque demande passerait plus de temps et consommerait plus de ressources système pour créer et détruire des threads que pour traiter les demandes réelles.

Étant donné que les threads actifs consomment des ressources système, une JVM créant trop de threads en même temps peut entraîner une insuffisance de mémoire du système. Cela nécessite la limitation de nombre de threads créés.

Qu'est-ce que ThreadPool en Java?

Un pool de threads réutilise les threads précédemment créés pour exécuter les tâches en cours et offre une solution au problème de la surcharge du cycle des threads et de la destruction des ressources. Comme le thread existe déjà lorsque la demande arrive, le délai introduit par la création de thread est éliminé, ce qui rend l'application plus réactive.


Java 1.5   .
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)

corePoolSize
- the number of threads to keep in
the pool, even if they are idle

maximumPoolSize
- the maximum number of threads
to allow in the pool

keepAliveTime
- when the number of threads is
greater than the core, this is the maximum time
that excess idle threads will wait for new tasks
before terminating.

unit
- the time unit for the keepAliveTime
argument

workQueue
- the queue to use for holding tasks
before they are executed. This queue will hold
only the Runnable tasks submitted by the execute
 
method

      

   

Un ThreadPoolExecutor ajustera automatiquement la taille du pool (voir  getPoolSize()en fonction des limites définies par corePoolSize (voir getCorePoolSize()et maximumPoolSize (voir  getMaximumPoolSize()). 

En définissant corePoolSize et maximumPoolSize de la même manière, vous créez un pool de threads de taille fixe. En définissant maximumPoolSize sur une valeur essentiellement non limitée, telle que Integer.MAX_VALUE, vous autorisez le pool à gérer un nombre arbitraire de tâches simultanées. Généralement, les tailles de pool principale et maximale sont définies uniquement lors de la construction, mais elles peuvent également être modifiées de manière dynamique à l'aide  de   setCorePoolSize(int)  and  setMaximumPoolSize(int).


nombre de threads - important.

trop de threads  - perte de temps pour création des ressources non utilisés.

peu de threads - les tâches attendent.

pour plus d'information sur nombre des threads: ideal thread pool size

3.14.2  Exemple. 

import java.util.concurrent.*;
public class ThreadPoolTest {
    public static void main(String[] args) {
        int nTasks = 20;    // number of tasks to be submitted to pool
        long n = 30;       //Fibonacci number
        int tpSize = 5;  // corePoolSize
        LinkedBlockingQueue<Runnable> q;

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
            tpSize, tpSize, 50000L, TimeUnit.MILLISECONDS,
           ( q=new LinkedBlockingQueue<Runnable>( )));

/*     public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                  BlockingQueue<Runnable> workQueue)
*/
        System.out.println("Initial number of threads:"+tpe.getActiveCount());
        Task[] tasks = new Task[nTasks];
        for (int i = 0; i < nTasks; i++) {
            tasks[i] = new Task(n, "Task " + i,tpe);
            tpe.execute(tasks[i]);
            System.out.println("submittint task "+i+
                    " number of active threads "+tpe.getActiveCount()+
                    " number of task in the queue "+q.size());
        }
        tpe.shutdown( );
    }
}
---------------------------------------------------------------------------
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

public class Task implements Runnable {
    long n;
    String id; 
    ThreadPoolExecutor tpe;
    private long fib(long n) {
        if (n == 0)
            return 0L;
        if (n == 1)
            return 1L;
        return fib(n - 1) + fib(n - 2);
    }

    public Task(long n, String id, ThreadPoolExecutor tpe) {
        this.n = n;
        this.id = id;     
        this.tpe=tpe;
    }

    public void run( ) {
        Date d = new Date( );
        DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
        long startTime = System.currentTimeMillis( );
        d.setTime(startTime);
        System.out.println("Starting task " + id + " at " + df.format(d)+ "; active threads:"
                 +tpe.getActiveCount());
        System.out.println("\tfibonatchi "+ n+":"+ fib(n));
        long endTime = System.currentTimeMillis( );
        d.setTime(endTime);
        System.out.println("\tEnding task " + id + " at " + df.format(d) +" after "
                 + (endTime - startTime) + " milliseconds");
    }
}


3.14.3 Réalisation "piscine" avec Thread Pool


Resources - sans changement 

3.14.3.1 Cabine

public class Cabine {
    private int free;
    Cabine(int free){
        this.free = free;
    }
    synchronized void takeCabine(){
        while(free==0){
            System.out.println("there is no free cabine, "+Thread.currentThread().getName()+" waiting");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        free--;
        System.out.println("the cabine is taken by "+Thread.currentThread().getName()+", there is "+free+" free cabines");
    }
    synchronized void releaseCabine(){
        free++;
        System.out.println("the cabine is released by "+Thread.currentThread().getName()+", there is "+free+" free cabines");
        notifyAll();
    }
}

3.14.3.2 Panier

public class Basket {
    private  int free;
    Basket(int free){
        this.free = free;
    }
    synchronized void takeBasket(){
        while(free==0){
            System.out.println("there is no free basket, "+Thread.currentThread().getName()+" waiting");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        free--;
        System.out.println("the basket is taken by "+Thread.currentThread().getName()+", there is "+free+" free baskets");
    }
    synchronized void releaseBasket(){
        free++;
        System.out.println("the basket is released by "+Thread.currentThread().getName()+", there is "+free+" free baskets");
        notifyAll();
    }
}

3.14.3.3 Client - à la place de la  Thread, c'est une task  Runnable

public class Client implements Runnable{
    String  name;
    static int n=0;
    Cabine c;
    Basket b;
    Client(Cabine c, Basket b){
        name = "Client "+ ++n;
        this.c=c;
        this.b = b;
        try {
            System.out.println(" creating new client:"+name);
            Thread.sleep((int)(Math.random()*50));
        } catch (InterruptedException e){}
    }
    public void run(){
        try {
            System.out.println(this+" going to the swim pool");
            Thread.sleep((int)(Math.random()*50));
        } catch (InterruptedException e){}
        System.out.println(this+" try to take basket");
        b.takeBasket();
        try {
            System.out.println(this+" going to the cabine");
            Thread.sleep((int)(Math.random()*50));
        } catch (InterruptedException e){}
        System.out.println(this+" try to take cabine");
        c.takeCabine();
        try {
            System.out.println(this+" changing");
            Thread.sleep((int)(Math.random()*600));
        } catch (InterruptedException e){}
        System.out.println(this+" release cabin");
        c.releaseCabine();
        try {        
            System.out.println(this+" swimimg");
            Thread.sleep((int)(Math.random()*2000));
        } catch (InterruptedException e){}
        System.out.println(this+" try to take cabine");
        c.takeCabine();
        try {        
            System.out.println(this+" changing");
            Thread.sleep((int)(Math.random()*600));
        } catch (InterruptedException e){}
        System.out.println(this+" release cabin");
        c.releaseCabine();
        System.out.println(this+" release basket");
        b.releaseBasket();
        System.out.println(this+" going home");
    }
    public String toString(){
        return name;
    }
}

3.14.3.4 Thread Pool

import java.util.concurrent.*;
public class SwimPool {
    public static void main(String[] args) {
        int coreThr=7;
        LinkedBlockingQueue<Runnable> q;
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                coreThr, coreThr, 5000L, TimeUnit.MILLISECONDS,
                (q=new LinkedBlockingQueue<Runnable>( )));

        Cabine c=new Cabine(2);
        Basket b = new Basket(3);
        for(int i = 0; i<15;i++){
            tpe.execute(new Client(c,b));
            System.out.println("next client,there is "+q.size()+" elements in queue");
            try{
                Thread.sleep(50);
             } catch (InterruptedException e){}
        }
        tpe.shutdown();
    }
}





3.14.4 Réalisation "Train" avec Thread Pool


3.14.4.1 Mise en route du modèle - complètement réécrit.


20 voyageurs  généré par hasard, toutes les 100 ms
Deux
ThreadPoolExecutor avec tpsize actives threads pour les passagers
 tpsize-3  voyageurs maximum dans le train.

import java.util.concurrent.*;
public class Circ {
    public static void main(String arg[]){
        int tpsize = 8;  // corePoolSize
        Pass pass;
        ThreadPoolExecutor left, right;
        left = new ThreadPoolExecutor(tpsize,tpsize,20,TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>( ));
        right = new ThreadPoolExecutor(tpsize,tpsize,20,TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>( ));
        Monitor mt = new Monitor(tpsize-3);
        Train train = new Train(mt);
        train.start();
        for(int i=0;i<20;i++){
            pass = new Pass(mt,Math.random()>0.5?true:false);
            if (pass.aToB) {
                left.execute(pass);
            }
            else {
                right.execute(pass);
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e){}
        }
        left.shutdown();
        right.shutdown();
    }
}


3.14.4.2 Le train - sans modifications


public class Train extends Thread{
    Monitor mt;
    public Train(Monitor mt){
        this.mt=mt;   
    }
    public void run(){
        for(int i=0;i<3;i++){
            try{
                sleep(1000);
            }
            catch(InterruptedException e){}
            mt.leaveA();
            try{
                sleep(2000);
            }
            catch(InterruptedException e){}  
            mt.arriveB();
            try{
                sleep(1000);
            }
            catch(InterruptedException e){}
            mt.leaveB();
            try{
                sleep(2000);
            }
            catch(InterruptedException e){}  
            mt.arriveA();
        }
    }
}

3.14.4.3 Passager - à la place d'une  Thread, c'est une task  Runnable

public class Pass implements Runnable{
    boolean aToB;
    static int num=1;
    Monitor mt;
    String name;
    Pass(Monitor mt,boolean aToB){
        name = "pass"+num++ +(aToB?"(A to B)":"(B to A)");
       // super.setName(name);
        this.mt=mt;
        this.aToB = aToB;      
    }
    public void run(){
        Thread.currentThread().setName(name);
        if(aToB){
            mt.taketA();
            mt.leavetB();
        }
        else{
            mt.taketB();
            mt.leavetA();
        }
    }
}

3.14.4.4 La classe moniteur 
Modification: le train attend que tous les passagers descendent.


public class Monitor {
    private boolean onA, onB;
    private int passAB,passBA, passMax;
    public Monitor(int passMax ){
        onA=true;
        System.out.println("Train on A");
        onB = false;
        passAB=passBA=0;
        this.passMax=passMax;
    }
    public synchronized void leaveA(){
        while(passBA>0) {     //train wait for lazy passengers to descend
            System.out.println("Train wait "+passBA+" to leave it");
             try{     wait();   }
             catch(InterruptedException e){
                 System.err.println(e);
             }
        }
        onA=false;
        System.out.println("\t\t\t\tTrain travelling A ->B");
    }
    public synchronized void arriveB(){
        System.out.println("\t\t\t\tTrain arrive B");
        onB=true;
        notifyAll();
    }
    public synchronized void leaveB(){
        while(passAB>0) {   //train wait for lazy passengers to descend
            System.out.println("Train wait "+passAB+" to leave it");
             try{     wait();   }
             catch(InterruptedException e){
                 System.err.println(e);
             }
        }
        onB = false;
        System.out.println("\t\t\t\tTrain travelling B ->A");
    }
    public synchronized void arriveA(){
        System.out.println("\t\t\t\tTrain arrive A");
        onA=true;
        notifyAll();
    }
    public synchronized void taketA(){
        while(!onA||((passAB+passBA)>=passMax)){
            System.out.println("\t\t"+Thread.currentThread().getName()+" waiting train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.print(Thread.currentThread().getName()+" get the train;");
        passAB++;
        if(passAB+passBA >= passMax) System.out.print(" train is full");
        System.out.println("\t"+passAB+" passengers in the train traveling A-> B");
    }
    public synchronized void taketB(){
        while(!onB||((passAB+passBA)>=passMax)){
            System.out.println("\t\t"+Thread.currentThread().getName()+" waiting train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.print(Thread.currentThread().getName()+" get the train;");
        passBA++;
        if(passAB+passBA >= passMax) System.out.print(" train is full");
        System.out.println("\t"+passBA+" passengers in the train traveling B-> A");
    }
    public synchronized void leavetA(){
        while(!onA){
            //System.out.println("\t"+Thread.currentThread().getName()+" traveling in the train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }      
        System.out.print(Thread.currentThread().getName()+" leaving the train and going home; ");
        if(passBA>0)passBA--;
        System.out.println("\t"+passBA+" passengers B-> A still in the train");
        notifyAll();
    }
    public synchronized void leavetB(){
        while(!onB){
            //System.out.println(Thread.currentThread().getName()+" traveling in the train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.print(Thread.currentThread().getName()+" leaving the train and going home; ");
        if(passAB>0)passAB--;
        System.out.println("\t"+passAB+" passengers  A-> B still in the train");
        notifyAll();
    }
}


3.14.4.5 Modifier le programme de train afin que les passagers ne peuvent pas prendre le train avant que tous les passagers arrivés par le train à la gare sont descendus.