As we know, threads are lighter weight than processes, but creating them still consumes significant resources.
Creating a new thread for each task consumes more memory, because each thread has its own stack for storing local variables, references, etc., and the CPU also spends more time on context switching.
Creating threads without any bound can cause an application to run out of memory.
A thread pool is a better solution: a finite number of threads are pooled and reused.
Let's start with the basic interfaces and classes in the java.util.concurrent package:
Interface Executor:
This interface contains only one method:
void execute(Runnable command)
An Executor is an object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc.
The examples below show how this decoupling works.
An Executor is normally used instead of explicitly creating threads.
For example:
rather than invoking new Thread(new RunnableTask()).start() for each of a set of tasks, you might use
Executor e=anExecutor;
e.execute(new RunnableTask1());
e.execute(new RunnableTask2());
However, the Executor interface does not strictly require that execution be asynchronous. An implementation can take any of the following forms:
- the executor runs the submitted task immediately in the caller's thread (DirectExecutor.java)
- the executor runs each task in a thread other than the caller's thread (ThreadPerTaskExecutor.java)
- the executor serializes the submission of tasks to a second executor (SerialExecutor.java)
Let's look at each one with an example:
RunnableTask.java : the task to be done
This class sums the integers from 0 to 99 (the result is 4950) and prints it together with the name of the thread that ran it.
package javalatte;
public class RunnableTask implements Runnable {
    int sum = 0;
    @Override
    public void run() {
        // Adds 0 + 1 + ... + 99
        for (int i = 0; i < 100; i++) {
            sum = sum + i;
        }
        System.out.println(Thread.currentThread().getName() + " sum=" + sum);
    }
}
DirectExecutor.java : this class implements the Executor interface and provides the execute method body according to point 1.
package javalatte;
import java.util.concurrent.Executor;
public class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        // Runs the task synchronously in the caller's thread
        r.run();
    }
}
main1.java :
package javalatte;
public class main1 {
    public static void main(String a[]) {
        DirectExecutor e = new DirectExecutor();
        for (int i = 0; i < 100; i++) {
            e.execute(new RunnableTask());
        }
    }
}
Output :
main sum=4950
main sum=4950
main sum=4950
main sum=4950
main sum=4950
...
...
===============================================
ThreadPerTaskExecutor.java : this class implements the Executor interface and provides the execute method body according to point 2.
package javalatte;
import java.util.concurrent.Executor;
public class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        // Starts a new thread for every submitted task
        new Thread(r).start();
    }
}
main2.java :
package javalatte;
public class main2 {
    public static void main(String a[]) {
        ThreadPerTaskExecutor e = new ThreadPerTaskExecutor();
        for (int i = 0; i < 100; i++) {
            e.execute(new RunnableTask());
        }
    }
}
Output : (the order may vary from run to run)
Thread-0 sum=4950
Thread-2 sum=4950
Thread-1 sum=4950
Thread-3 sum=4950
Thread-5 sum=4950
Thread-7 sum=4950
....
...
================================
SerialExecutor.java : this class implements the Executor interface and provides the execute method body according to point 3.
package javalatte;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
public class SerialExecutor implements Executor {
    // Tasks waiting to be handed to the second executor, in submission order
    final Queue<Runnable> task = new ArrayDeque<Runnable>();
    final Executor executor;
    // The task currently handed to the second executor, or null if there is none
    Runnable active;
    public SerialExecutor(Executor executor) {
        this.executor = executor;
    }
    @Override
    public synchronized void execute(final Runnable r) {
        // Wrap the task so that the next queued task is scheduled when this one finishes
        task.offer(
            new Runnable() {
                @Override
                public void run() {
                    try {
                        r.run();
                    } finally {
                        next();
                    }
                }
            });
        // Nothing is running yet, so start the first task
        if (active == null) {
            next();
        }
    }
    synchronized void next() {
        // Hand the next queued task, if any, to the second executor
        if ((active = task.poll()) != null) {
            executor.execute(active);
        }
    }
}
main3.java : in this class we pass the tasks on to a second executor; here ThreadPerTaskExecutor is used as the second executor.
package javalatte;
public class main3 {
    public static void main(String a[]) {
        SerialExecutor e = new SerialExecutor(new ThreadPerTaskExecutor());
        for (int i = 0; i < 1000; i++) {
            e.execute(new RunnableTask());
        }
    }
}
Output :
Thread-0 sum=4950
Thread-1 sum=4950
Thread-2 sum=4950
Thread-3 sum=4950
Thread-4 sum=4950
Thread-5 sum=4950
...
..
The three examples above show how the Executor interface decouples task submission from the way each task is executed.
============================
Interface ExecutorService :
public interface ExecutorService extends Executor
Because it extends the Executor interface, an ExecutorService is an Executor that also provides methods to manage termination and methods that produce a Future for tracking the progress of one or more asynchronous tasks.
Class Executors :
This class provides factory methods for the Executor, ExecutorService, ScheduledExecutorService, ThreadFactory and Callable classes defined in the java.util.concurrent package.
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html
Here is an example of a fixed thread pool that uses the classes explained above:
In this example I have used a fixed number of threads, i.e. 3. Only 3 threads will be created to complete the tasks.
package javalatte;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
    private static final int NO_OF_THREAD = 3;
    public static void main(String javalatte[]) {
        ExecutorService executor = Executors.newFixedThreadPool(NO_OF_THREAD);
        for (int i = 0; i < 1000; i++) {
            Runnable worker = new RunnableTask();
            executor.execute(worker);
        }
        /**
         * Method of the ExecutorService interface: no new tasks are accepted,
         * but tasks that were already submitted still run.
         */
        executor.shutdown();
        /**
         * Busy-wait until all submitted tasks have completed after shutdown.
         */
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished");
    }
}
Output :
pool-1-thread-1 sum=4950
pool-1-thread-1 sum=4950
pool-1-thread-1 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
....
......
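The example above polls isTerminated() in an empty loop, which keeps the main thread spinning while the pool works. A minimal sketch of a blocking alternative uses awaitTermination() from the ExecutorService interface. The class name AwaitTerminationExample, the runTasks helper and the 10-second timeout are assumptions made for this illustration, not part of the original example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; the 10-second timeout is an arbitrary choice.
class AwaitTerminationExample {

    // Runs taskCount small tasks on a 3-thread pool and returns how many completed.
    static int runTasks(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    completed.incrementAndGet();
                }
            });
        }
        // Stop accepting new tasks; tasks already submitted still run.
        executor.shutdown();
        try {
            // Block (without spinning) until all tasks finish or the timeout elapses.
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                // Timed out: ask the pool to interrupt whatever is still running.
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed " + runTasks(1000) + " tasks");
        System.out.println("Finished");
    }
}
```

awaitTermination() returns false when the timeout elapses first, so the caller can decide whether to keep waiting or to call shutdownNow().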
Note : the void run() method of the Runnable interface has no way of returning a result to the main method (see RunnableTask.java).
The java.util.concurrent package therefore provides the Callable interface, whose call() method returns a value. This means an asynchronous task can return a value once it has finished processing.
Before going to the Callable example, let's first look at the Future interface.
Future Interface:
A Future represents the result of an asynchronous computation.
Methods are provided to:
- check whether the computation is complete
- wait for its completion
- retrieve the result of the computation
The result can be retrieved with the get() method once the computation is complete; get() blocks if necessary until the result is ready (we'll see this in the example).
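As a small illustration of the Future methods listed above, the sketch below submits one Callable and then uses isDone() and the timed variant of get(). The class name FutureMethodsExample, the sumWithFuture helper and the 5-second timeout are assumptions chosen for this sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class for illustration only.
class FutureMethodsExample {

    // Computes 0 + 1 + ... + 99 on a worker thread and returns the result.
    static int sumWithFuture() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    int sum = 0;
                    for (int i = 0; i < 100; i++) {
                        sum = sum + i;
                    }
                    return sum;
                }
            });
            // Non-blocking check; may print true or false depending on timing.
            System.out.println("Done before get()? " + future.isDone());
            // Waits up to 5 seconds (an arbitrary limit) for the result.
            int result = future.get(5, TimeUnit.SECONDS);
            // Once get() has returned normally, the computation is complete.
            System.out.println("Done after get()? " + future.isDone());
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // call() threw an exception; its cause is the original exception
            throw new IllegalStateException(e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum=" + sumWithFuture());
    }
}
```

The timed get() is useful when the caller cannot afford to block indefinitely; the untimed get() used later in this post waits for as long as the task needs.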
CallableTask.java
package javalatte;
import java.util.concurrent.Callable;
public class CallableTask implements Callable<String> {
    @Override
    public String call() {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum = sum + i;
        }
        return Thread.currentThread().getName() + " Sum=" + sum;
    }
}
FixedThreadPoolCallable.java
package javalatte;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FixedThreadPoolCallable {
    private static final int NO_OF_THREAD = 3;
    public static void main(String javalatte[]) {
        ExecutorService executor = Executors.newFixedThreadPool(NO_OF_THREAD);
        List<Future<String>> list = new ArrayList<Future<String>>();
        // Submit the tasks and keep the Future returned for each one
        for (int i = 0; i < 100; i++) {
            Callable<String> worker = new CallableTask();
            Future<String> submit = executor.submit(worker);
            list.add(submit);
        }
        // get() blocks until the corresponding task has finished
        for (Future<String> future : list) {
            try {
                System.out.println("Thread:" + future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
    }
}
output :
Thread:pool-1-thread-1 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-3 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
...
...
The Executors factory methods provide thread pools with different execution policies, for example:
- newFixedThreadPool
Creates threads as tasks are submitted, up to the maximum pool size, and then attempts to keep the pool size constant.
- newCachedThreadPool
Adds threads when demand increases and reuses idle threads; there is no bound on the size of the pool.
- newSingleThreadExecutor
Uses a single worker thread to process tasks. Tasks are guaranteed to execute sequentially, in the order imposed by the task queue.
- newScheduledThreadPool
A fixed-size pool that supports delayed and periodic task execution.
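Two of the policies listed above can be sketched briefly: the sequential execution of newSingleThreadExecutor and a one-shot delayed task on newScheduledThreadPool. The class name ExecutorPoliciesExample, the helper methods runInOrder and runDelayed, and the timeout and delay values are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration only.
class ExecutorPoliciesExample {

    // newSingleThreadExecutor: one worker thread runs the tasks one at a time,
    // in the order they were queued, so the ids are recorded as 0, 1, ..., n-1.
    static List<Integer> runInOrder(int n) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        final List<Integer> seen = Collections.synchronizedList(new ArrayList<Integer>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            single.execute(new Runnable() {
                @Override
                public void run() {
                    seen.add(id);
                }
            });
        }
        single.shutdown();
        try {
            single.awaitTermination(10, TimeUnit.SECONDS); // arbitrary timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    // newScheduledThreadPool: runs the task once, no earlier than the given delay.
    static String runDelayed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future = scheduler.schedule(new Callable<String>() {
                @Override
                public String call() {
                    return "delayed task ran";
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
        System.out.println(runDelayed(100));
    }
}
```

For periodic work, ScheduledExecutorService also offers scheduleAtFixedRate() and scheduleWithFixedDelay(), which are not shown here.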
Executor framework : it is based on the producer-consumer design pattern, where the threads that submit tasks are the producers and the threads that execute tasks are the consumers.
In the examples above, the main thread is the producer, as it loops and submits the tasks to the pool.
The pool's worker threads are the consumers: they execute the submitted RunnableTask and CallableTask instances.
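To make the producer-consumer idea concrete, here is a deliberately simplified sketch with one hand-written worker thread and a BlockingQueue. It is not how the JDK thread pools are implemented; the class name ProducerConsumerSketch, the STOP sentinel and the produceAndConsume helper are all assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified illustration of the pattern; not the JDK implementation.
class ProducerConsumerSketch {

    // Sentinel task that tells the consumer thread to stop.
    private static final Runnable STOP = new Runnable() {
        @Override
        public void run() {
        }
    };

    // The calling thread produces taskCount tasks; one worker thread consumes them.
    // Returns the number of tasks the worker executed.
    static int produceAndConsume(int taskCount) {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        final AtomicInteger executed = new AtomicInteger();

        // Consumer: takes tasks off the queue and runs them until it sees STOP.
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        Runnable task = queue.take(); // blocks while the queue is empty
                        if (task == STOP) {
                            return;
                        }
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "worker-1");
        consumer.start();

        // Producer: the calling thread only submits work, it does not run it.
        for (int i = 0; i < taskCount; i++) {
            queue.add(new Runnable() {
                @Override
                public void run() {
                    executed.incrementAndGet();
                }
            });
        }
        queue.add(STOP);

        try {
            consumer.join(); // wait for the worker to drain the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("Executed " + produceAndConsume(100) + " tasks");
    }
}
```

An ExecutorService hides this queue and the worker loop behind execute() and submit(), which is why the submitting code in the earlier examples never touches a thread directly.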