Wednesday, November 7, 2012

Introduction to Executor Framework

Executor Framework (java.util.concurrent.executor) is framework for standardizing invocation, scheduling, execution & control of asynchronous tasks according to a set of execution policy .

As we know, thread are light weighted than creating a new process but still creating them utilizes a lot of resources.
Creating a new thread for each task will consume more stack memory as each thread will have its own stack for storing local variable, references etc. and also CPU spend more time in context switching.
Creating a lot of threads with no bounds on the thread creation cause an application to run of heap memory.
So creating a thread pool is a better solution as finite number of thread can pooled and reused. 

Let Start with a basic interface and classes used in the java.util.concurrent packages:

Interface Executor:
This interface contain only one method
                 void execute(Runnable command)
An Object that execute submitted Runnable tasks.This interface provide a way of decoupling task submission from the mechanism of how each task will be run including details of thread use, scheduling etc.
We will see the example how it is decoupling the task.

An Executor is used instead of explicitly creating threads.
For example:
rather than invoking new Thread(new(RunnableTask())).start() for each set of tasks, you might use 
Executor e=anExecutor;
e.execute(new RunnableTask1());
e.execute(new RunnableTask2());

However, an Executor interface does not strictly require that execution be asynchronous.It might be any of the following form:

  1. an executor can run the submitted task immediately in the caller's thread.(DirectExecutor.java)
  2. task is executed in some other thread other than caller's thread.(ThreadPerTaskExecutor.java)
  3. executor can serialize the task to a second executor.(SerialExecutor.java)

Let see one by one with example:

RunnableTask.java : task to be done
In this class I am doing the sum from number 1 to 100

package javalatte;
public class RunnableTask implements Runnable {
    int sum=0;
    @Override
    public void run() {
        for(int i=0;i<100;i++){
            sum=sum+i;
        }
        System.out.println(Thread.currentThread().getName()+" sum="+sum);
    }    
}


DirectExecutor.java : this class implements the Executor interface and providing the execute method body according to point 1.
package javalatte;
import java.util.concurrent.Executor;

public class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}


main1.java

package javalatte;
public class main1 {
    public static void main(String a[]) {
        DirectExecutor e = new DirectExecutor();
        for (int i = 0; i < 100; i++) {
            e.execute(new RunnableTask());
        }
    }
}


Output  :

main sum=4950
main sum=4950
main sum=4950
main sum=4950
main sum=4950
...
...
===============================================
ThreadPerTaskExecutor.java : this class implements the Executor interface and providing the execute method body according to point 2

package javalatte;
import java.util.concurrent.Executor;
public class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}


Main2.java 

package javalatte;
public class main2 {
    public static void main(String a[]) {
        ThreadPerTaskExecutor e = new ThreadPerTaskExecutor();
        for (int i = 0; i < 100; i++) {
            e.execute(new RunnableTask());
        }
    }
}


output : (it may vary with run)

Thread-0 sum=4950
Thread-2 sum=4950
Thread-1 sum=4950
Thread-3 sum=4950
Thread-5 sum=4950
Thread-7 sum=4950

....
...

================================
SerialExecutor.javathis class implements the Executor interface and providing the execute method body according to point 3


package javalatte;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
public class SerialExecutor implements Executor {
    final Queue<Runnable> task = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;
    public SerialExecutor(Executor executor) {
        this.executor = executor;
    }
    @Override
    public synchronized void execute(final Runnable r) {
        task.offer(
                new Runnable() {

                    @Override
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            next();
                        }
                    }
                });

        if (active == null) {
            next();
        }
    }
    synchronized void next() {
        if ((active = task.poll()) != null) {
            executor.execute(active);
        }
    }
}


main3.java : in this class we are passing the task to second executor we used here ThreadPerTaskExecutor as the second executor


package javalatte;
public class main3 {
    public static void main(String a[]){
        SerialExecutor e=new SerialExecutor(new ThreadPerTaskExecutor());
        for(int i=0;i<1000;i++){
            e.execute(new RunnableTask());
        }
    }
}


output : it will always execute the sequence wise 0,1,2,3,4....

Thread-0 sum=4950
Thread-1 sum=4950
Thread-2 sum=4950
Thread-3 sum=4950
Thread-4 sum=4950
Thread-5 sum=4950
...
..

From the above 3 example, hope you guys got the idea how it decouple the task.

============================

Interface ExecutorService :
public interface ExecutorService extends Executor 

As it is extending the Executor interface , so it become an Executor that provides methods to manage termination and methods that produce a Future for tracking progress of one or more thread in asynchronous tasks.


Class Executors :
This class provide the factory methdos for Executor,ExecutorService,ScheduledExecutorService,ThreadFactory and callable classes defind in the concurrent package.
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html


Now I am sharing the example of Fixed threadpool used the above classes that i have explained :
In this example , i have used fixed number of thread i.e. 3. Only 3 thread will be created to complete the task.


package javalatte;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    private static final int NO_OF_THREAD = 3;
   public static void main(String javalatte[]) {
        ExecutorService executor = Executors.newFixedThreadPool(NO_OF_THREAD);
            for(int i=0;i<1000;i++){
                Runnable worker=new RunnableTask();
                executor.execute(worker);
            }
            /**
             * Method of ExecutorService interface
             */
            executor.shutdown();
            /**
             * Waiting for thread to terminated
             */
            while(!executor.isTerminated()){
                
            }
            System.out.println("Finished");
    }
}


Output :

pool-1-thread-1 sum=4950
pool-1-thread-1 sum=4950
pool-1-thread-1 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
....
......

Note : Runnable interface void run() method has no way of returning any result back to the main method (RunnableTask.java)

Executor Interface has introduced the Callable interface that return a a value from call() method. This means asynchronous task will be able to return value once it done the processing.
Before going for the example of Callable , first we see that Future interface

Future Interface:
Future represent the result of asynchronous computation.
Methods are provides to check :

  • is computation is complete
  • wait for it completion 
  • to retrieve the result of computation 

 Result can be retrieved using get() method when computation is complete,blocking if necessary until it is ready.( we'll see in the example)

CallableTask.java

package javalatte;
import java.util.concurrent.Callable;
public class CallableTask implements Callable<String> {
    @Override
    public String call() {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum = sum + i;
        }
        return Thread.currentThread().getName() + " Sum=" + sum;
    }
}

FixedThreadPoolCallable.java

package javalatte;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
public class FixedThreadPoolCallable {
        private static final int NO_OF_THREAD = 3;
    public static void main(String javalatte[]){
        ExecutorService executor=Executors.newFixedThreadPool(NO_OF_THREAD);
        List<Future<String>> list=new ArrayList<Future<String>> ();
        
        for(int i=0;i<100;i++){
            Callable<String> worker=new CallableTask();
            Future<String> submit=executor.submit(worker);
            list.add(submit);            
        }
        for(Future<String> future :list){
            try{
            System.out.println("Thread:"+future.get());
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        executor.shutdown();
    }
}

output :

Thread:pool-1-thread-1 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-3 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
...
...

The Executor implementations provides different execution policies to be set while executing the tasks ,example threadpool support the following polices:

  • newFixedThreadPool
Creates thread as task are submitted , up to maximum pool size and then attempt to keep the pool size constant.
  • newCachedThreadPool
Add threads when demand increase, no bounds on the size of the pool.
  • newSingleThreadExecutor
Single worker thread to process tasks. Guarantees order of execution based on the queue policy
  • newScheduledThreadPool
Fixed size ,support delay and periodic task

Executor framework :is based on producer consume design pattern ,where thread that submit the tasks are the producer and the thread that execute the task are consumer.

In the above example, main thread is the producer as it loops  through and submit the task to the worker thread.
RunnableRask, CallableTask is the consumer that execute the task.




1 comment: