Thread pool

The approach used so far creates a new thread each time a request arrives and processes the request in that thread. Although this approach is simple to implement, it has significant drawbacks.
Creating and destroying a thread takes time and consumes system resources. Depending on the number of requests, a program can end up spending more time and resources on creating and destroying threads than on processing the actual requests.

Because active threads consume system resources, a JVM creating too many threads at the same time can cause the system to run out of memory. This requires limiting the number of threads created.

A thread pool reuses previously created threads to execute new tasks, which addresses both the overhead of the thread life cycle and the exhaustion of resources. Because the thread already exists when the request arrives, the delay introduced by thread creation is eliminated, making the application more responsive.
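The reuse described above can be observed directly. The following is a minimal sketch, not part of the original examples: the class name ReuseDemo and the method runTasks are hypothetical, and the pool is created with the standard factory Executors.newFixedThreadPool. It records the names of the threads that execute the tasks; there are never more distinct names than pool threads, however many tasks are submitted.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReuseDemo {
    // Runs nTasks short tasks on a pool of nThreads workers and returns
    // the names of the threads that actually executed them.
    static Set<String> runTasks(int nThreads, int nTasks) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nTasks; i++) {
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                       // no new tasks; queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(3, 12);
        System.out.println("12 tasks were executed by " + names.size()
                + " thread(s): " + names);
    }
}
```

The number of distinct names may be smaller than the pool size when the tasks are so short that the first workers finish before the others are needed.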


Java 1.5 introduced the class ThreadPoolExecutor (package java.util.concurrent). Its basic constructor is:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)

corePoolSize - the number of threads to keep in the pool, even if they are idle

maximumPoolSize - the maximum number of threads to allow in the pool

keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating

unit - the time unit for the keepAliveTime argument

workQueue - the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method

 
A ThreadPoolExecutor automatically adjusts the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a task is submitted and fewer than corePoolSize threads are running, a new thread is created, even if other threads are idle. Once corePoolSize threads are running, new tasks are placed in the queue, and a thread beyond the core size is created only when the queue is full. Setting corePoolSize and maximumPoolSize to the same value creates a fixed-size thread pool. Setting maximumPoolSize to an essentially unbounded value, such as Integer.MAX_VALUE, allows the pool to accommodate an arbitrary number of concurrent tasks. Typically, the core and maximum pool sizes are set only at construction, but they can also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
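As an illustration of the getters and setters named above, here is a minimal sketch; the class PoolSizeDemo and its methods are hypothetical names. It builds a fixed-size pool (core equal to maximum) and then enlarges it dynamically. The maximum is raised before the core size so that the core size never exceeds the maximum at any moment.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizeDemo {
    // Fixed-size pool: corePoolSize == maximumPoolSize.
    static ThreadPoolExecutor fixedPool(int size) {
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Resizes a fixed pool of 4 threads and returns {core, max, current pool size}.
    static int[] sizesAfterResize() {
        ThreadPoolExecutor tpe = fixedPool(4);
        tpe.setMaximumPoolSize(8);   // raise the maximum first ...
        tpe.setCorePoolSize(6);      // ... so that the core size never exceeds it
        int[] sizes = { tpe.getCorePoolSize(), tpe.getMaximumPoolSize(), tpe.getPoolSize() };
        tpe.shutdown();
        return sizes;
    }

    public static void main(String[] args) {
        int[] s = sizesAfterResize();
        System.out.println("core=" + s[0] + " max=" + s[1] + " current=" + s[2]);
        // prints: core=6 max=8 current=0
    }
}
```

The current pool size is 0 because no task has been submitted: by default the threads are created on demand, not when the pool is constructed.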



The choice of the number of threads is very important!

Too many threads - time and memory are wasted on creating threads that remain unused.

Too few threads - tasks wait in the queue.

For more information on the number of threads, see: ideal thread pool size
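A widely quoted rule of thumb derives the pool size from the number of processors and from the ratio between the time a task spends waiting (I/O, sleep, locks) and the time it spends computing: threads = cpus * utilization * (1 + wait / compute). The sketch below only illustrates this heuristic. The class PoolSizing and the method suggestedThreads are hypothetical names, and the ratio has to be measured for the real workload.

```java
public class PoolSizing {
    // Heuristic: threads = cpus * targetUtilization * (1 + waitTime / computeTime).
    // targetUtilization is the desired CPU usage between 0 and 1.
    static int suggestedThreads(int cpus, double targetUtilization, double waitToComputeRatio) {
        int n = (int) Math.round(cpus * targetUtilization * (1.0 + waitToComputeRatio));
        return Math.max(1, n);   // a pool needs at least one thread
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Pure computation (such as a recursive Fibonacci): about one thread per processor.
        System.out.println("CPU-bound tasks: " + suggestedThreads(cpus, 1.0, 0.0));
        // Tasks that wait nine times longer than they compute: many more threads are useful.
        System.out.println("Mostly waiting tasks: " + suggestedThreads(cpus, 1.0, 9.0));
    }
}
```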








Example - Fibonacci numbers, printing the execution time of each task and the state of the pool

The Fibonacci numbers are a sequence of numbers in mathematics named after Leonardo of Pisa, known as Fibonacci.
The first number of the sequence is 0, the second number is 1, and each subsequent number is the sum of the two numbers preceding it.

F0 F1 F2 F3 F4 F5 F6 F7 F8 F9 F10 F11 F12 F13 F14 F15 F16 F17 F18 F19 F20
0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765
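The definition can be checked against the table with a short iterative sketch (the class name FibCheck is hypothetical). Unlike the recursive version used in the tasks below, which is deliberately slow to keep the pool threads busy, this loop runs in linear time.

```java
public class FibCheck {
    // Iterative Fibonacci: F0 = 0, F1 = 1, Fn = F(n-1) + F(n-2).
    static long fib(int n) {
        long prev = 0L;   // F(i)
        long curr = 1L;   // F(i+1)
        for (int i = 0; i < n; i++) {
            long next = prev + curr;
            prev = curr;
            curr = next;
        }
        return prev;
    }

    public static void main(String[] args) {
        System.out.println("F20 = " + fib(20));   // prints: F20 = 6765
        System.out.println("F40 = " + fib(40));   // prints: F40 = 102334155
    }
}
```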


import java.util.concurrent.*;
public class ThreadPoolTest {
    public static void main(String[] args) {
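        int nTasks = 20;    // number of tasks to be submitted to the pool
        long n = 40;        // index of the Fibonacci number to compute
        int tpSize = 5;     // corePoolSize
        LinkedBlockingQueue<Runnable> q;

        // The queue is unbounded, so tasks are queued as soon as tpSize threads
        // are busy and the pool never grows to maximumPoolSize (tpSize+3).
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
            tpSize, tpSize+3, 50000L, TimeUnit.MILLISECONDS,
            (q = new LinkedBlockingQueue<Runnable>()));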

/*     public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                  BlockingQueue<Runnable> workQueue)
*/
        System.out.println("Initial number of threads:"+tpe.getActiveCount());
        Task[] tasks = new Task[nTasks];
        for (int i = 0; i < nTasks; i++) {
            tasks[i] = new Task(n, "Task " + i,tpe);
            tpe.execute(tasks[i]);
            System.out.println("submitting task "+i+
                    " number of active threads "+tpe.getActiveCount()+
                    " number of tasks in the queue "+q.size());
        }
        tpe.shutdown( );
    }
}
---------------------------------------------------------------------------

import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

public class Task implements Runnable {
    long n;
    String id;
    ThreadPoolExecutor tpe;
    private long fib(long n) {
        if (n == 0)
            return 0L;
        if (n == 1)
            return 1L;
        return fib(n - 1) + fib(n - 2);
    }

    public Task(long n, String id, ThreadPoolExecutor tpe) {
        this.n = n;
        this.id = id;     
        this.tpe=tpe;
    }

    public void run( ) {
        Date d = new Date( );
        DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
        long startTime = System.currentTimeMillis( );
        d.setTime(startTime);
        System.out.println("Starting task " + id + " at " + df.format(d)+ "; active threads:"
                 +tpe.getActiveCount()+" queue size "+tpe.getQueue().size());
        System.out.println("\tfibonacci "+ n+":"+ fib(n));
        long endTime = System.currentTimeMillis( );
        d.setTime(endTime);
        System.out.print("\tEnding task " + id + " at " + df.format(d) +" after "
                 + (endTime - startTime) + " milliseconds");
        System.out.println(
                " active threads: "+tpe.getActiveCount()+
                " queue size "+tpe.getQueue().size());
    }
}
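In ThreadPoolTest the pool is created with a maximum of tpSize+3 threads, but its queue is an unbounded LinkedBlockingQueue. Such a queue is never full, so the pool has no reason to create threads beyond the core size. The following sketch illustrates this under the same settings; the class UnboundedQueueDemo is a hypothetical name, and the tasks simply block on a latch instead of computing Fibonacci numbers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
    // Submits nTasks blocked tasks to a pool (core, max) with an unbounded queue
    // and returns the largest number of threads the pool ever had.
    static int largestPoolSize(int core, int max, int nTasks) {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < nTasks; i++) {
            tpe.execute(() -> {
                try {
                    release.await();               // keep the worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = tpe.getLargestPoolSize();
        release.countDown();                       // let all tasks finish
        tpe.shutdown();
        return largest;
    }

    public static void main(String[] args) {
        // Same sizes as ThreadPoolTest: 5 core threads, 8 maximum, 20 tasks.
        System.out.println("largest pool size: " + largestPoolSize(5, 8, 20));
        // prints: largest pool size: 5
    }
}
```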


Fibonacci numbers, printing the execution time and the time elapsed since the creation of the task


package pool;

import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

public class Task1 implements Runnable {
    long n;
    String id;
    ThreadPoolExecutor tpe;
    Date dt;
    long crTime;
    private long fib(long n) {
        if (n == 0)
            return 0L;
        if (n == 1)
            return 1L;
        return fib(n - 1) + fib(n - 2);
    }

    public Task1(long n, String id, ThreadPoolExecutor tpe) {
        this.n = n;
        this.id = id;     
        this.tpe=tpe;
        dt = new Date( );
        DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
        crTime = System.currentTimeMillis( );
        dt.setTime(crTime);
        System.out.println("Creating  task " + id + " at " + df.format(dt)+ "; active threads:"
                 +tpe.getActiveCount());
    }

    public void run( ) {
        Date d = new Date( );
        DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
        long startTime = System.currentTimeMillis( );
        d.setTime(startTime);
        System.out.println("Starting task " + id + " at " + df.format(d)+ "; active threads:"
                 +tpe.getActiveCount());
        System.out.println("\tfibonacci "+ n+":"+ fib(n));
        long endTime = System.currentTimeMillis( );
        d.setTime(endTime);
        System.out.println("\tEnding task " + id + " at " + df.format(d) +" after "
                 + (endTime - startTime) + " milliseconds");
        System.out.println("\t" + id + " finished " + (endTime - crTime)
                + " milliseconds after its creation");
    }
}
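The difference between the two durations printed by Task1 is the time the task spent waiting in the queue. The sketch below isolates that waiting time; it is an illustration with hypothetical names (WaitTimeDemo, secondTaskWaitMillis), and it uses a single-thread pool so that the second task necessarily waits for the first one.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class WaitTimeDemo {
    // Returns how long (in ms) the second of two 100 ms tasks waited in the
    // queue of a single-thread pool before it started running.
    static long secondTaskWaitMillis() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicLong waited = new AtomicLong(-1);
        pool.execute(() -> sleep(100));            // occupies the only thread
        long created = System.nanoTime();          // "creation" time of the second task
        pool.execute(() -> {
            waited.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - created));
            sleep(100);
        });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waited.get();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task waited about " + secondTaskWaitMillis() + " ms");
    }
}
```

The measured value is close to the 100 ms of the first task; the exact figure depends on the scheduler.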






One-way bridge with Thread Pool

Resources (Bridge) - without modification

public class Bridge {
    private int nVh;
    private boolean open;        // for the bridge
    private int consVehicle;     //consecutive cars in one direction
    private int limit;                 // max consecutive cars
    Bridge(int limit){
        // nVh>0 - vehicles crossing from right to left are on the bridge
        // nVh<0 - vehicles crossing from left to right are on the bridge
        nVh = consVehicle = 0;
        open =  true;
        this.limit = limit;
    }
    synchronized public int brN(){
        return nVh;  
    }
    synchronized public void takeB(boolean lr ){
        while((nVh>0)&& (lr==true)||
              (nVh<0) && (lr==false) || !open){
            System.out.println("\t"+Thread.currentThread().getName()+" waiting");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.println(Thread.currentThread().getName()+" on the bridge");
        if (lr) nVh--;
        else nVh++;
        if (++consVehicle >= limit) {
            open = false;
            System.out.println("The bridge is closed");
        }
    }
    synchronized public void leaveB(boolean lr ){
        if (nVh>0) nVh--;
        else nVh++;
        System.out.println("\t\t"+Thread.currentThread().getName()+" leaves the bridge");
        if (nVh == 0){
            open = true;
            consVehicle = 0;
            System.out.println("The bridge is open");
        }
        notifyAll();
    }
}

Vehicle - a Runnable task instead of a Thread. The name of the thread is set at the beginning of the run method and not in the constructor of the vehicle, because the constructor is executed by the main thread, whereas run is executed by a thread of the pool.

public class Vehicle implements Runnable{
    boolean lr;
    Bridge b;
    String name;
    static int num;
    Vehicle(boolean lr, Bridge b){
        this.lr=lr;
        this.b = b;
        name = "V "+ ++num + (lr?" left->":" <-right");
    }
    public void run(){
        Thread.currentThread().setName(name);
        try {           //arriving to bridge
            Thread.sleep(20);
        } catch (InterruptedException e){}
        b.takeB(lr);
        try {         // crossing the bridge
            Thread.sleep(300);
        } catch (InterruptedException e){}
        b.leaveB(lr);
    }
}
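A pool thread keeps the name given by the last task that renamed it. This is why every task sets the name at the beginning of run. The sketch below illustrates the effect with a single-thread pool; the class ThreadNameDemo and its method are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadNameDemo {
    // Runs two tasks on the same worker thread: the first renames the thread,
    // the second only reads the name. Returns the name seen by the second task.
    static String nameSeenByNextTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> readName = () -> Thread.currentThread().getName();
        try {
            pool.execute(() -> Thread.currentThread().setName("V 1 left->"));
            return pool.submit(readName).get();    // waits for the result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task runs in thread: " + nameSeenByNextTask());
        // prints: second task runs in thread: V 1 left->
    }
}
```

A task that does not set the name itself would therefore print the name of a vehicle that has already left.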


The application Circ creates the bridge and a thread pool with a fixed number of threads. It also generates the vehicles and submits them to the thread pool.

import java.util.concurrent.*;
public class Circ {
    public static void main(String arg[]){
        int tpSize = 5;  // corePoolSize

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
            tpSize, tpSize, 50000L, TimeUnit.MILLISECONDS,
           ( new LinkedBlockingQueue<Runnable>( )));
        
        Bridge b = new Bridge(5);
        for(int i = 0; i < 20; i++){
            try {          
                Thread.sleep(20);
            } catch (InterruptedException e){}
            tpe.execute(new Vehicle(Math.random() > 0.5, b));
        }
        tpe.shutdown();
    }
}
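Every example ends with tpe.shutdown(). This call does not stop the running vehicles: it only refuses new tasks, while the tasks already submitted, including the queued ones, are still executed. When the main thread needs to know that everything has finished, it can additionally call awaitTermination. A minimal sketch under these assumptions follows; the class ShutdownDemo is a hypothetical name and the tasks are replaced by short sleeps.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownDemo {
    // Submits nTasks short tasks to a fixed pool of 5 threads, shuts the pool
    // down and waits for completion. Returns the number of tasks that ran.
    static int runAndWait(int nTasks) {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < nTasks; i++) {
            tpe.execute(() -> {
                try {
                    Thread.sleep(10);              // stands in for crossing the bridge
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        tpe.shutdown();                            // no new tasks; queued tasks still run
        try {
            if (!tpe.awaitTermination(30, TimeUnit.SECONDS)) {
                tpe.shutdownNow();                 // give up: interrupt the workers
            }
        } catch (InterruptedException e) {
            tpe.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runAndWait(20));
        // prints: tasks completed: 20
    }
}
```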


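To demonstrate the pool's operation in detail, the following variant uses a bounded queue (capacity 13) and allows up to tpSize+2 threads. Each vehicle also receives the pool, so that it can print the number of active threads and the size of the queue: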

import java.util.concurrent.*;
public class Circ {
    public static void main(String arg[]){
        int tpSize = 5;  // corePoolSize

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
            tpSize, tpSize+2, 50000L, TimeUnit.MILLISECONDS,
           ( new LinkedBlockingQueue<Runnable>( 13)));
        
        Bridge b = new Bridge(5);
        for(int i = 0; i < 20; i++){
            try {          
                Thread.sleep(20);
            } catch (InterruptedException e){}
            tpe.execute(new Vehicle(Math.random() > 0.5, b, tpe));
            System.out.println("\t\t\t\tnew vehicle arrived, queue size:"+tpe.getQueue().size());
        }
        tpe.shutdown();
    }
}

//==========================================================

import java.util.concurrent.ThreadPoolExecutor;

public class Vehicle implements Runnable{
    boolean lr;
    Bridge b;
    String name;
    static int num;
    ThreadPoolExecutor tpe;
    Vehicle(boolean lr, Bridge b, ThreadPoolExecutor tpe){
        this.lr=lr;
        this.b = b;
        name = "V "+ ++num + (lr?" left->":" <-right");
        this.tpe=tpe;
    }
    public void run(){
        Thread.currentThread().setName(name);
        System.out.println("\t\t\t\tnew thread working: "+name);
        System.out.println("\t\t\t\tactive threads: "+tpe.getActiveCount()+
                "\tqueue size:" + tpe.getQueue().size());
        try {           //arriving to bridge
            Thread.sleep(20);
        } catch (InterruptedException e){}
        b.takeB(lr);
        try {         // crossing the bridge
            Thread.sleep(1500);
        } catch (InterruptedException e){}
        b.leaveB(lr);
    }
}
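The sizes in this variant are chosen so that the 20 vehicles just fit: 5 run in core threads, 13 wait in the queue, and the last 2 force the creation of the 2 extra threads. One more task would be rejected with a RejectedExecutionException. The following sketch reproduces this sequence with blocked tasks so that the result does not depend on timing; the class BoundedQueueDemo and the method trace are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    // Submits nTasks blocked tasks and records "poolSize/queueSize" after each
    // submission, or "rejected" when the pool refuses the task.
    static String trace(int core, int max, int capacity, int nTasks) {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(capacity));
        CountDownLatch release = new CountDownLatch(1);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < nTasks; i++) {
            try {
                tpe.execute(() -> await(release));
                sb.append(tpe.getPoolSize()).append('/')
                  .append(tpe.getQueue().size()).append(' ');
            } catch (RejectedExecutionException e) {
                sb.append("rejected ");
            }
        }
        release.countDown();                       // let all tasks finish
        tpe.shutdown();
        return sb.toString().trim();
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(trace(2, 4, 3, 8));
        // prints: 1/0 2/0 2/1 2/2 2/3 3/3 4/3 rejected
        System.out.println(trace(5, 7, 13, 20));   // the sizes used by Circ
    }
}
```

In the first line of output the pool stays at 2 threads while the queue fills up, grows to 4 threads only when the queue is full, and rejects the eighth task.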





Swimming pool with Thread Pool


Resources - without change

Cabin

public class Cabin {
    private int free;
    Cabin(int free){
        this.free = free;
    }
    synchronized void takeCabin(){
        while(free==0){
            System.out.println("there is no free cabin, "+Thread.currentThread().getName()+" waiting");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        free--;
        System.out.println("the cabin is taken by "+Thread.currentThread().getName()+", free cabins: "+free);
    }
    synchronized void releaseCabin(){
        free++;
        System.out.println("the cabin is released by "+Thread.currentThread().getName()+", free cabins: "+free);
        notifyAll();
    }
}

Basket

public class Basket {
    private  int free;
    Basket(int free){
        this.free = free;
    }
    synchronized void takeBasket(){
        while(free==0){
            System.out.println("there is no free basket, "+Thread.currentThread().getName()+" waiting");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        free--;
        System.out.println("the basket is taken by "+Thread.currentThread().getName()+", free baskets: "+free);
    }
    synchronized void releaseBasket(){
        free++;
        System.out.println("the basket is released by "+Thread.currentThread().getName()+", free baskets: "+free);
        notifyAll();
    }
}

Client - a Runnable task instead of a Thread

public class Client implements Runnable{
    String  name;
    static int n=0;
    Cabin c;
    Basket b;
    Client(Cabin c, Basket b){
        name = "Client "+ ++n;
        this.c=c;
        this.b = b;
        try {
            System.out.println(" creating new client:"+name);
            Thread.sleep((int)(Math.random()*50));
        } catch (InterruptedException e){}
    }
    public void run(){
        // Cabin and Basket print the name of the current thread, so the pool
        // thread takes the name of the client it is serving.
        Thread.currentThread().setName(name);
        try {
            System.out.println(this+" going to the swimming pool");
            Thread.sleep((int)(Math.random()*50));
        } catch (InterruptedException e){}
        System.out.println(this+" tries to take a basket");
        b.takeBasket();
        try {
            System.out.println(this+" going to the cabin");
            Thread.sleep((int)(Math.random()*50));
        } catch (InterruptedException e){}
        System.out.println(this+" tries to take a cabin");
        c.takeCabin();
        try {
            System.out.println(this+" changing");
            Thread.sleep((int)(Math.random()*600));
        } catch (InterruptedException e){}
        System.out.println(this+" releases the cabin");
        c.releaseCabin();
        try {
            System.out.println(this+" swimming");
            Thread.sleep((int)(Math.random()*2000));
        } catch (InterruptedException e){}
        System.out.println(this+" tries to take a cabin");
        c.takeCabin();
        try {
            System.out.println(this+" changing");
            Thread.sleep((int)(Math.random()*600));
        } catch (InterruptedException e){}
        System.out.println(this+" releases the cabin");
        c.releaseCabin();
        System.out.println(this+" releases the basket");
        b.releaseBasket();
        System.out.println(this+" going home");
    }
    public String toString(){
        return name;
    }
}

Thread Pool

import java.util.concurrent.*;
public class SwimPool {
    public static void main(String[] args) {
        int coreThr=7;
        LinkedBlockingQueue<Runnable> q;
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                coreThr, coreThr, 5000L, TimeUnit.MILLISECONDS,
                (q=new LinkedBlockingQueue<Runnable>( )));

        Cabin c=new Cabin(2);
        Basket b = new Basket(3);
        for(int i = 0; i<15;i++){
            tpe.execute(new Client(c,b));
            System.out.println("next client, elements in queue: "+q.size());
            try{
                Thread.sleep(50);
             } catch (InterruptedException e){}
        }
        tpe.shutdown();
    }
}
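Cabin and Basket are both counters of free resources protected by wait and notifyAll. The same behaviour can also be obtained with java.util.concurrent.Semaphore. The sketch below is an alternative technique, not the implementation used in this example. The class SemaphoreCabins and its method are hypothetical names, and only the cabins are modelled.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreCabins {
    // Lets nClients use nCabins cabins through a pool of nThreads workers and
    // returns the highest number of cabins occupied at the same time.
    static int maxOccupied(int nCabins, int nThreads, int nClients) {
        Semaphore cabins = new Semaphore(nCabins);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nClients; i++) {
            pool.execute(() -> {
                try {
                    cabins.acquire();              // like takeCabin(): blocks while none is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inside.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(20);              // changing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    cabins.release();              // like releaseCabin()
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // Same sizes as SwimPool: 2 cabins, 7 pool threads, 15 clients.
        System.out.println("cabins occupied at most: " + maxOccupied(2, 7, 15));
    }
}
```

Whatever the scheduling, the reported maximum never exceeds the number of cabins.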





Train with Thread Pool

Model startup - completely rewritten:
- 20 travelers are generated, one every 100 ms, each with a random direction
- two ThreadPoolExecutor instances, one per direction, with tpsize threads each
- at most tpsize-3 passengers on the train


import java.util.concurrent.*;
public class Circ {
    public static void main(String arg[]){
        int tpsize = 8;  // corePoolSize
        LinkedBlockingQueue<Runnable> ql,qr;
        Pass pass;
        ThreadPoolExecutor left, right;
        left = new ThreadPoolExecutor(tpsize,tpsize,20,TimeUnit.MILLISECONDS,
                ( ql=new LinkedBlockingQueue<Runnable>( )));
        right = new ThreadPoolExecutor(tpsize,tpsize,20,TimeUnit.MILLISECONDS,
                ( qr=new LinkedBlockingQueue<Runnable>( )));
        Monitor mt = new Monitor(tpsize-3);
        Train train = new Train(mt);
        train.start();
        for(int i=0;i<20;i++){
            pass = new Pass(mt, Math.random() > 0.5);
            if (pass.aToB) {
                left.execute(pass);
            }
            else {
                right.execute(pass);
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e){}
        }
        left.shutdown();
        right.shutdown();
    }
}
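A passenger blocks its pool thread until the train is at the right station. A plausible reason for using one pool per direction is that passengers blocked on one side cannot then occupy the threads needed by the other side. The sketch below illustrates this idea and is an assumption about the design, not a statement from the original model. The class TwoPoolsDemo is a hypothetical name, and a latch stands in for the arrival of the train.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TwoPoolsDemo {
    // Blocks every "left" task on a latch and returns how many "right" tasks
    // complete in the meantime, each side having its own pool.
    static int rightDoneWhileLeftBlocked(int poolSize, int tasksPerSide) {
        ExecutorService left = Executors.newFixedThreadPool(poolSize);
        ExecutorService right = Executors.newFixedThreadPool(poolSize);
        CountDownLatch trainArrives = new CountDownLatch(1);
        AtomicInteger rightDone = new AtomicInteger();
        for (int i = 0; i < tasksPerSide; i++) {
            left.execute(() -> {
                try {
                    trainArrives.await();          // waiting for the train
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            right.execute(rightDone::incrementAndGet);
        }
        right.shutdown();
        try {
            right.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int done = rightDone.get();                // left side is still blocked here
        trainArrives.countDown();                  // now the left tasks can finish
        left.shutdown();
        return done;
    }

    public static void main(String[] args) {
        System.out.println("right tasks done while left was blocked: "
                + rightDoneWhileLeftBlocked(2, 6));
        // prints: right tasks done while left was blocked: 6
    }
}
```

With a single shared pool of the same size, the blocked tasks could occupy all the threads and the other tasks would have to wait in the queue behind them.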

The train - without modifications
public class Train extends Thread{
    Monitor mt;
    public Train(Monitor mt){
        this.mt=mt;   
    }
    public void run(){
        for(int i=0;i<3;i++){
            try{
                sleep(1000);
            }
            catch(InterruptedException e){}
            mt.leaveA();
            try{
                sleep(2000);
            }
            catch(InterruptedException e){}  
            mt.arriveB();
            try{
                sleep(1000);
            }
            catch(InterruptedException e){}
            mt.leaveB();
            try{
                sleep(2000);
            }
            catch(InterruptedException e){}  
            mt.arriveA();
        }
    }
}

The passengers
public class Pass implements Runnable{
    boolean aToB;
    static int num=1;
    Monitor mt;
    String name;
    Pass(Monitor mt,boolean aToB){
        name = "pass"+num++ +(aToB?"(A to B)":"(B to A)");
       // super.setName(name);
        this.mt=mt;
        this.aToB = aToB;      
    }
    public void run(){
        Thread.currentThread().setName(name);
        if(aToB){
            mt.taketA();
            mt.leavetB();
        }
        else{
            mt.taketB();
            mt.leavetA();
        }
    }
}

Monitor class - without modification (train waits for all passengers to get off).
public class Monitor {
    private boolean onA, onB;
    private int passAB,passBA, passMax;
    public Monitor(int passMax ){
        onA=true;
        System.out.println("Train on A");
        onB = false;
        passAB=passBA=0;
        this.passMax=passMax;
    }
    public synchronized void leaveA(){
        while(passBA>0) {     // the train waits for slow passengers to get off
            System.out.println("Train waits for "+passBA+" passengers to leave it");
             try{     wait();   }
             catch(InterruptedException e){
                 System.err.println(e);
             }
        }
        onA=false;
        System.out.println("\t\t\t\tTrain travelling A ->B");
    }
    public synchronized void arriveB(){
        System.out.println("\t\t\t\tTrain arrive B");
        onB=true;
        notifyAll();
    }
    public synchronized void leaveB(){
        while(passAB>0) {   // the train waits for slow passengers to get off
            System.out.println("Train waits for "+passAB+" passengers to leave it");
             try{     wait();   }
             catch(InterruptedException e){
                 System.err.println(e);
             }
        }
        onB = false;
        System.out.println("\t\t\t\tTrain travelling B ->A");
    }
    public synchronized void arriveA(){
        System.out.println("\t\t\t\tTrain arrive A");
        onA=true;
        notifyAll();
    }
    public synchronized void taketA(){
        while(!onA||((passAB+passBA)>=passMax)){
            System.out.println("\t\t"+Thread.currentThread().getName()+" waiting train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.print(Thread.currentThread().getName()+" get the train;");
        passAB++;
        if(passAB+passBA >= passMax) System.out.print(" train is full");
        System.out.println("\t"+passAB+" passengers in the train traveling A-> B");
    }
    public synchronized void taketB(){
        while(!onB||((passAB+passBA)>=passMax)){
            System.out.println("\t\t"+Thread.currentThread().getName()+" waiting train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.print(Thread.currentThread().getName()+" get the train;");
        passBA++;
        if(passAB+passBA >= passMax) System.out.print(" train is full");
        System.out.println("\t"+passBA+" passengers in the train traveling B-> A");
    }
    public synchronized void leavetA(){
        while(!onA){
            //System.out.println("\t"+Thread.currentThread().getName()+" traveling in the train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }      
        System.out.print(Thread.currentThread().getName()+" leaving the train and going home; ");
        if(passBA>0)passBA--;
        System.out.println("\t"+passBA+" passengers B-> A still in the train");
        notifyAll();
    }
    public synchronized void leavetB(){
        while(!onB){
            //System.out.println(Thread.currentThread().getName()+" traveling in the train");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.print(Thread.currentThread().getName()+" leaving the train and going home; ");
        if(passAB>0)passAB--;
        System.out.println("\t"+passAB+" passengers  A-> B still in the train");
        notifyAll();
    }
}