Showing posts with label thread. Show all posts
Showing posts with label thread. Show all posts

Friday, March 20, 2015

Producer Consumer problem in Java using wait and notify

A classic problem in concurrent programming is the producer-consumer problem. We have a data buffer, one or more producers of data that save it in the buffer and one or more consumers of data that take it from the buffer.
As the buffer is a shared data structure, we have to control the access to it using a synchronization mechanism such as the synchronized keyword, but we have more limitations. A producer can't save data in the buffer if it's full and the consumer can't take data from the buffer if it's empty




For these types of situations, Java provides the wait(), notify(), and notifyAll() methods implemented in the Object class. A thread can call the wait() method inside a synchronized block of code. If it calls the wait() method outside a synchronized block of code, the JVM throws an IllegalMonitorStateException exception. When the thread calls the wait() method, the JVM puts the thread to sleep and releases the object that controls the synchronized block of code that it's executing and allows the other threads to execute other blocks of synchronized code protected by that object. To wake up the thread, you must call the notify() or notifyAll() method inside a block of code protected by the same object.

In this article, you will learn how to implement the producer-consumer problem using the synchronized keyword and the wait(), notify(), and notifyAll() methods

Storage class
It has two attributes: an int attribute called maxSize and a LinkedList<Date> attribute called list.

Implement the synchronized method set() to store an event in the storage. First, check if the storage is full or not. If it's full, it calls the wait() method until the storage has empty space. At the end of the method, we call the notifyAll() method to wake up all the threads that are sleeping in the wait() method.

Implement the synchronized method get() to get an event for the storage. First, check if the storage has events or not. If it has no events, it calls the wait() method until the storage has some events. At the end of the method, we call the notifyAll() method to wake up all the threads that are sleeping in the wait() method.

Producer and Consumer
Create a class named Producer and Consumer and specify that it implements the Runnable interface. 
Main Class
Sample Output


How it works...
The key to this example is the set() and get() methods of the EventStorage class. First of all, the set() method checks if there is free space in the storage attribute. If it's full, it calls the wait() method to wait for free space. When the other thread calls the notifyAll() method, the thread wakes up and checks the condition again. The notifyAll() method doesn't guarantee that the thread will wake up. This process is repeated until there is free space in the storage and it can generate a new event and store it.

The behavior of the get() method is similar. First, it checks if there are events on the storage. If the EventStorage class is empty, it calls the wait() method to wait for events. Where the other thread calls the notifyAll() method, the thread wakes up and checks the condition again until there are some events in the storage.

If you run this example, you will see how producer and consumer are setting and getting the events, but the storage never has more than 10 events.




If you know anyone who has started learning java, why not help them out! Just share this post with them. Thanks for studying today!...

Wednesday, November 7, 2012

ThreadLocal in Java

One of the most critical aspects of a concurrent application is shared data. This has special importance in those objects that extend the Thread class or implement the Runnable interface. If you create an object of a class that implements the Runnable interface and then start various Thread objects using the same Runnable object, all the threads share the same attributes. This means that, if you change an attribute in a thread, all the threads will be affected by this change.
Sometimes, you will be interested in having an attribute that won't be shared between all the threads that run the same object. The Java Concurrency API provides a clean mechanism called thread-local variables with a very good performance.

We use ThreadLocal when you have some object that is not thread-safe, but you want to avoid synchronizing access to that object. Instead, we give each thread its own instance of the object. There are many Different ways of implementing thread safe operation like :
What is Thread Safe
Thread is a single line of process when there is multi-threading applications, we means there are multiple process that run through the same line of code. In such situation, there is a chance that one thread will modify/accessing the data of another thread. When data cannot be shared like this, then we make the operation thread safe.

Core concept of ThreadLocal is that every thread that access the ThreadLocal variable through get and set methods of ThreadLocal has own, independently initialized copy of the variable. This class provide the thread-local variables.
How does this thread-local variable differ from their counterpart normal variable is that in each thread that access thread-local variable via its methods has its own,independently initialized copy of the variable. ThreadLocal instance are privately static fields in classes that wish to associate state with a thread.It can be accessed from anywhere inside a thread.

ThreadLocal is a thread-scope like scopes in JSP(session scope).You can set any object in ThreadLocal and this object will be global and local to specific thread which is accessing the object.

  • Global means they can be accessed from anywhere inside the thread if a thread call a methods from different classes, then all the methods can see the ThreadLocal variable set by others methods because they are executing in a same thread. 
  • Local means that each thread will have its own thread local variable.One thread can not access/modify other thread local variable.

Getting ready..
In order to understand the concept, we will develop a program that has the problem exposed as discussed above and another program that solves this problem using the thread-local variables mechanism.
Output:
Starting thread 8 : Sat Aug 09 13:39:32 IST 2014
Starting thread 9 : Sat Aug 09 13:39:34 IST 2014
Starting thread 10 : Sat Aug 09 13:39:36 IST 2014
Thread finished 8 : Sat Aug 09 13:39:36 IST 2014
Starting thread 11 : Sat Aug 09 13:39:38 IST 2014
Thread finished 9 : Sat Aug 09 13:39:38 IST 2014
Starting thread 12 : Sat Aug 09 13:39:40 IST 2014
Thread finished 10 : Sat Aug 09 13:39:40 IST 2014
Thread finished 11 : Sat Aug 09 13:39:40 IST 2014
Thread finished 12 : Sat Aug 09 13:39:40 IST 2014

In the above output, you can see the results of this program's execution. Each Thread has a different start time but, when they finish, all have the same value in its startDate attribute.

Now,we are going to use the thread-local variables mechanism to solve this problem.
Output:
Starting thread 8 : Sat Aug 09 13:49:25 IST 2014
Starting thread 9 : Sat Aug 09 13:49:27 IST 2014
Starting thread 10 : Sat Aug 09 13:49:29 IST 2014
Thread finished 8 : Sat Aug 09 13:49:25 IST 2014
Starting thread 11 : Sat Aug 09 13:49:31 IST 2014
Thread finished 9 : Sat Aug 09 13:49:27 IST 2014
Starting thread 12 : Sat Aug 09 13:49:33 IST 2014
Thread finished 10 : Sat Aug 09 13:49:29 IST 2014
Thread finished 11 : Sat Aug 09 13:49:31 IST 2014
Thread finished 12 : Sat Aug 09 13:49:33 IST 2014

Now, the Thread objects have their own value of the startDate attribute.

Thread-local variables store a value of an attribute for each Thread that uses one of these variables. You can read the value using the get() method and change the value using the set() method. The first time you access the value of a thread-local variable, if it has no value for the Thread object that it is calling, the thread-local variable calls the initialValue() method to assign a value for that Thread and returns the initial value.

When to use Thread Local
Today Whatever the application be like banking, telecommunications etc we need some sort of transaction id, user id for every request. Consider an example, you have servlet that call some business methods. You have requirement to generate a unique ID for every transaction that this servlet will process and then you need to pass this transaction ID to some other business methods, for instance, logging purpose.
One solution will be passing this transaction ID in the parameters to all the business methods.  Suppose if you have more than 10+ methods ,you will pass the transaction ID in the parameters to each methods that will redundant and unnecessary

To solve this, use can use ThreadLocal. First you can generate the transaction ID may be in servlet or filter and sets in a thread local. After this whatever business method this servlet call, can access the transaction ID from ThreadLocal. Servlet can be processing more than one request. Since each request is processed in a separate thread, the transaction ID will be unique to each thread(local) and will be accessible from all over the thread execution.

Example: 
TransactionContext.java : hold the transaction ID

MyThreadLocal.java : container to hold TransactionContext  object.

BuisnessClass.java : contain the business method

BuisnessClass1.java : contain the business method

ThreadLocalExample.java : main class that first generate the ID, then set by calling the set() method.


Output
Thread-0 JavaLatte-91996810
Thread-1 JavaLatte-1049236749
Thread-0 JavaLatte-91996810
Thread-1 JavaLatte-1049236749

If you look at the output, once we set the transaction ID for one thread, then it can be accessible from any business method that it will call.

There's more...
The thread-local class also provides the remove() method that deletes the value stored in the thread-local variable for the thread that it's calling.

The Java Concurrency API includes the InheritableThreadLocal class that provides inheritance of values for threads created from a thread. If a thread A has a value in a thread-local variable and it creates another thread B, the thread B will have the same value as the thread A in the thread-local variable. You can override the childValue() method that is called to initialize the value of the child thread in the thread-local variable. It receives the value of the parent thread in the thread-local variable as a parameter.



If you know anyone who has started learning Java, why not help them out! Just share this post with them. Thanks for studying today!...

Introduction to Executor Framework

Executor Framework (java.util.concurrent.executor) is framework for standardizing invocation, scheduling, execution & control of asynchronous tasks according to a set of execution policy .

As we know, thread are light weighted than creating a new process but still creating them utilizes a lot of resources.
Creating a new thread for each task will consume more stack memory as each thread will have its own stack for storing local variable, references etc. and also CPU spend more time in context switching.
Creating a lot of threads with no bounds on the thread creation cause an application to run of heap memory.
So creating a thread pool is a better solution as finite number of thread can pooled and reused. 

Let Start with a basic interface and classes used in the java.util.concurrent packages:

Interface Executor:
This interface contain only one method
                 void execute(Runnable command)
An Object that execute submitted Runnable tasks.This interface provide a way of decoupling task submission from the mechanism of how each task will be run including details of thread use, scheduling etc.
We will see the example how it is decoupling the task.

An Executor is used instead of explicitly creating threads.
For example:
rather than invoking new Thread(new(RunnableTask())).start() for each set of tasks, you might use 
Executor e=anExecutor;
e.execute(new RunnableTask1());
e.execute(new RunnableTask2());

However, an Executor interface does not strictly require that execution be asynchronous.It might be any of the following form:

  1. an executor can run the submitted task immediately in the caller's thread.(DirectExecutor.java)
  2. task is executed in some other thread other than caller's thread.(ThreadPerTaskExecutor.java)
  3. executor can serialize the task to a second executor.(SerialExecutor.java)

Let see one by one with example:

RunnableTask.java : task to be done
In this class I am doing the sum from number 1 to 100

package javalatte;
public class RunnableTask implements Runnable {
    int sum=0;
    @Override
    public void run() {
        for(int i=0;i<100;i++){
            sum=sum+i;
        }
        System.out.println(Thread.currentThread().getName()+" sum="+sum);
    }    
}


DirectExecutor.java : this class implements the Executor interface and providing the execute method body according to point 1.
package javalatte;
import java.util.concurrent.Executor;

public class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}


main1.java

package javalatte;
public class main1 {
    public static void main(String a[]) {
        DirectExecutor e = new DirectExecutor();
        for (int i = 0; i < 100; i++) {
            e.execute(new RunnableTask());
        }
    }
}


Output  :

main sum=4950
main sum=4950
main sum=4950
main sum=4950
main sum=4950
...
...
===============================================
ThreadPerTaskExecutor.java : this class implements the Executor interface and providing the execute method body according to point 2

package javalatte;
import java.util.concurrent.Executor;
public class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}


Main2.java 

package javalatte;
public class main2 {
    public static void main(String a[]) {
        ThreadPerTaskExecutor e = new ThreadPerTaskExecutor();
        for (int i = 0; i < 100; i++) {
            e.execute(new RunnableTask());
        }
    }
}


output : (it may vary with run)

Thread-0 sum=4950
Thread-2 sum=4950
Thread-1 sum=4950
Thread-3 sum=4950
Thread-5 sum=4950
Thread-7 sum=4950

....
...

================================
SerialExecutor.javathis class implements the Executor interface and providing the execute method body according to point 3


package javalatte;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
public class SerialExecutor implements Executor {
    final Queue<Runnable> task = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;
    public SerialExecutor(Executor executor) {
        this.executor = executor;
    }
    @Override
    public synchronized void execute(final Runnable r) {
        task.offer(
                new Runnable() {

                    @Override
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            next();
                        }
                    }
                });

        if (active == null) {
            next();
        }
    }
    synchronized void next() {
        if ((active = task.poll()) != null) {
            executor.execute(active);
        }
    }
}


main3.java : in this class we are passing the task to second executor we used here ThreadPerTaskExecutor as the second executor


package javalatte;
public class main3 {
    public static void main(String a[]){
        SerialExecutor e=new SerialExecutor(new ThreadPerTaskExecutor());
        for(int i=0;i<1000;i++){
            e.execute(new RunnableTask());
        }
    }
}


output : it will always execute the sequence wise 0,1,2,3,4....

Thread-0 sum=4950
Thread-1 sum=4950
Thread-2 sum=4950
Thread-3 sum=4950
Thread-4 sum=4950
Thread-5 sum=4950
...
..

From the above 3 example, hope you guys got the idea how it decouple the task.

============================

Interface ExecutorService :
public interface ExecutorService extends Executor 

As it is extending the Executor interface , so it become an Executor that provides methods to manage termination and methods that produce a Future for tracking progress of one or more thread in asynchronous tasks.


Class Executors :
This class provide the factory methdos for Executor,ExecutorService,ScheduledExecutorService,ThreadFactory and callable classes defind in the concurrent package.
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html


Now I am sharing the example of Fixed threadpool used the above classes that i have explained :
In this example , i have used fixed number of thread i.e. 3. Only 3 thread will be created to complete the task.


package javalatte;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    private static final int NO_OF_THREAD = 3;
   public static void main(String javalatte[]) {
        ExecutorService executor = Executors.newFixedThreadPool(NO_OF_THREAD);
            for(int i=0;i<1000;i++){
                Runnable worker=new RunnableTask();
                executor.execute(worker);
            }
            /**
             * Method of ExecutorService interface
             */
            executor.shutdown();
            /**
             * Waiting for thread to terminated
             */
            while(!executor.isTerminated()){
                
            }
            System.out.println("Finished");
    }
}


Output :

pool-1-thread-1 sum=4950
pool-1-thread-1 sum=4950
pool-1-thread-1 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
pool-1-thread-2 sum=4950
....
......

Note : Runnable interface void run() method has no way of returning any result back to the main method (RunnableTask.java)

Executor Interface has introduced the Callable interface that return a a value from call() method. This means asynchronous task will be able to return value once it done the processing.
Before going for the example of Callable , first we see that Future interface

Future Interface:
Future represent the result of asynchronous computation.
Methods are provides to check :

  • is computation is complete
  • wait for it completion 
  • to retrieve the result of computation 

 Result can be retrieved using get() method when computation is complete,blocking if necessary until it is ready.( we'll see in the example)

CallableTask.java

package javalatte;
import java.util.concurrent.Callable;
public class CallableTask implements Callable<String> {
    @Override
    public String call() {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum = sum + i;
        }
        return Thread.currentThread().getName() + " Sum=" + sum;
    }
}

FixedThreadPoolCallable.java

package javalatte;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
public class FixedThreadPoolCallable {
        private static final int NO_OF_THREAD = 3;
    public static void main(String javalatte[]){
        ExecutorService executor=Executors.newFixedThreadPool(NO_OF_THREAD);
        List<Future<String>> list=new ArrayList<Future<String>> ();
        
        for(int i=0;i<100;i++){
            Callable<String> worker=new CallableTask();
            Future<String> submit=executor.submit(worker);
            list.add(submit);            
        }
        for(Future<String> future :list){
            try{
            System.out.println("Thread:"+future.get());
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        executor.shutdown();
    }
}

output :

Thread:pool-1-thread-1 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-3 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
Thread:pool-1-thread-2 Sum=4950
...
...

The Executor implementations provides different execution policies to be set while executing the tasks ,example threadpool support the following polices:

  • newFixedThreadPool
Creates thread as task are submitted , up to maximum pool size and then attempt to keep the pool size constant.
  • newCachedThreadPool
Add threads when demand increase, no bounds on the size of the pool.
  • newSingleThreadExecutor
Single worker thread to process tasks. Guarantees order of execution based on the queue policy
  • newScheduledThreadPool
Fixed size ,support delay and periodic task

Executor framework :is based on producer consume design pattern ,where thread that submit the tasks are the producer and the thread that execute the task are consumer.

In the above example, main thread is the producer as it loops  through and submit the task to the worker thread.
RunnableRask, CallableTask is the consumer that execute the task.