Friday, October 4, 2013

CyclicBarrier in Java : Concurrency

The Java concurrency API provides a synchronizing utility that lets two or more threads synchronize at a determined point: the CyclicBarrier class. This class is similar to the CountDownLatch class but has some differences that make it more powerful.
There are many situations in concurrent programming where threads may need to wait at a predefined execution point until all other threads reach that point. CyclicBarrier helps provide such a synchronization point. The barrier is called cyclic because it can be re-used after the waiting threads are released.

Easy definition to remember CyclicBarrier
" If you and your friends are going on a road trip, you first need to meet at some common, fixed point from which you will all start your journey. "

The CyclicBarrier class is initialized with an integer, which is the number of threads that will be synchronized at a determined point. When one of those threads arrives at that point, it calls the await() method to wait for the other threads. When a thread calls that method, the CyclicBarrier class blocks it until the other threads arrive. When the last thread calls the await() method, the CyclicBarrier wakes up all the threads that were waiting, and every thread continues with its job.
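The behaviour described above can be sketched as follows. This is a minimal illustration, not code from the original example; the class name BarrierBasics and the method runOnce are made up for this sketch.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierBasics {
    // Starts `parties` threads that meet at one barrier and
    // returns how many of them got past it.
    static int runOnce(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger released = new AtomicInteger();
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    barrier.await();            // blocks until all parties have arrived
                    released.incrementAndGet(); // reached only after the barrier trips
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println("Threads released: " + runOnce(3)); // prints "Threads released: 3"
    }
}
```

No thread can pass await() until the third one has called it, so the counter always ends at the number of parties.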

In general, a barrier is used to make a group of threads meet at a barrier point. A thread from the group that arrives at the barrier waits until all threads in that group arrive. Once the last thread from the group arrives at the barrier, all threads in the group are released. You can use a barrier when you have a task that can be divided into subtasks; each subtask can be performed in a separate thread, and the threads must meet at a common point to combine their results. The following figure depicts how a barrier synchronizer lets a group of three threads meet at the barrier point and then lets them proceed.

One interesting advantage of the CyclicBarrier class is that you can pass an additional Runnable object as an initialization parameter. The CyclicBarrier class runs this object once, in the last thread to arrive, when all the threads have reached the common point and before any of them is released. This characteristic makes the class well suited to parallelizing tasks with the divide and conquer programming technique.
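As an illustration of the divide and conquer use, here is a small sketch in which each thread sums one slice of an array and the Runnable passed to the constructor combines the partial sums. The class BarrierActionSum and its method names are hypothetical and chosen only for this sketch.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BarrierActionSum {
    // Splits `data` into `parts` slices, sums each slice in its own thread,
    // and lets the barrier action combine the partial sums.
    static long parallelSum(int[] data, int parts) {
        long[] partial = new long[parts];
        long[] total = new long[1];
        // The action runs once, when the last thread arrives and before any is released.
        CyclicBarrier barrier = new CyclicBarrier(parts, () -> {
            for (long p : partial) total[0] += p;
        });
        int chunk = (data.length + parts - 1) / parts; // slice size, rounded up
        Thread[] workers = new Thread[parts];
        for (int i = 0; i < parts; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                int from = Math.min(data.length, id * chunk);
                int to = Math.min(data.length, from + chunk);
                for (int k = from; k < to; k++) partial[id] += data[k];
                try {
                    barrier.await(); // signal that this slice is done
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println("Total: " + parallelSum(data, 3)); // prints "Total: 55"
    }
}
```

Because the action runs after every worker has called await(), it can read all the partial sums without any extra locking.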

CyclicBarrier is somewhat similar to CountDownLatch. The main difference is that a CyclicBarrier can be re-used, whereas a CountDownLatch cannot. We'll see this in the following example before going further.

CyclicBarrier methods:
  • CyclicBarrier(int parties, Runnable barrierAction) : Creates a CyclicBarrier that trips when the given number of parties (threads) are waiting on it, and runs barrierAction when the barrier trips. Throws IllegalArgumentException if parties is less than 1.
  • int await() : Blocks until the specified number of threads have called await() on this barrier. The method returns the arrival index of this thread: getParties() - 1 for the first thread to arrive and 0 for the last. This method can throw an InterruptedException if the thread is interrupted while waiting for other threads, or a BrokenBarrierException if the barrier was broken for some reason.
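The two points above, the arrival index returned by await() and the argument check in the constructor, can be tried out with a short sketch. The class ArrivalIndexDemo and its methods are illustrative names, not part of the Java API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class ArrivalIndexDemo {
    // Collects the value returned by await() in each thread, sorted ascending.
    static List<Integer> arrivalIndices(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<Integer> indices = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            Thread t = new Thread(() -> {
                try {
                    indices.add(barrier.await()); // arrival index of this thread
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        List<Integer> sorted = new ArrayList<>(indices);
        Collections.sort(sorted);
        return sorted;
    }

    // Returns true if the constructor rejects a party count below 1.
    static boolean rejectsZeroParties() {
        try {
            new CyclicBarrier(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndices(3));    // prints "[0, 1, 2]"
        System.out.println(rejectsZeroParties()); // prints "true"
    }
}
```

Which thread gets which index depends on scheduling, but each index from 0 to parties - 1 is handed out exactly once per trip.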

Example 1

Suppose you organize a racing event in which exactly 4 cars are required to start a race. The race will not start until all the cars have arrived at the race track. We simulate this scenario with the help of a cyclic barrier.

Sample Output 
4 lanes are created, race will startas soon as all player arrives

Player Ferrai is ready to race
Player Ford is ready to race
Player Skoda is ready to race
Player BMW is ready to race
All player are ready to race
Player BMW k1200S is ready to race
Player Ducati 1098s is ready to race
Player Aprilla RSV 1000R is ready to race
Player Yamaha YZF R1 is ready to race
All player are ready to race

In the main() method you create a CyclicBarrier object. The constructor takes two arguments: the number of threads to wait for, and the Runnable to run when all the threads reach the barrier.
In this case, you have four players to wait for, so you create four threads, with each thread representing a player. The second argument for the CyclicBarrier constructor is the Race object, since it represents the game, which will start once all four players are ready.
Inside the run() method for each Player thread, you call the await() method on the CyclicBarrier object. Once the number of awaiting threads for the CyclicBarrier object reaches four, the run() method in Race is called.
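A program along the lines described above could look like the following sketch. The class names Player and Race come from the description, while RaceDemo, runRound, runTwoRounds and the racesStarted counter are assumptions added here so the sketch is self-contained. The player names loosely follow the sample output.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class RaceDemo {
    // Counts how many times the barrier action has run (added for this sketch).
    static final AtomicInteger racesStarted = new AtomicInteger();

    // Barrier action: runs once each time all four players have arrived.
    static class Race implements Runnable {
        @Override
        public void run() {
            racesStarted.incrementAndGet();
            System.out.println("All player are ready to race");
        }
    }

    // Each player announces itself and then waits at the barrier.
    static class Player implements Runnable {
        private final CyclicBarrier barrier;
        private final String name;

        Player(CyclicBarrier barrier, String name) {
            this.barrier = barrier;
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("Player " + name + " is ready to race");
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    // Starts one thread per name and waits for all of them to finish.
    static void runRound(CyclicBarrier barrier, String... names) {
        Thread[] threads = new Thread[names.length];
        for (int i = 0; i < names.length; i++) {
            threads[i] = new Thread(new Player(barrier, names[i]));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    // Runs a car round and a bike round on the same barrier;
    // returns how many times Race ran.
    static int runTwoRounds() {
        racesStarted.set(0);
        CyclicBarrier barrier = new CyclicBarrier(4, new Race());
        runRound(barrier, "Ferrari", "Ford", "Skoda", "BMW");
        runRound(barrier, "BMW K1200S", "Ducati 1098s", "Aprilia RSV 1000R", "Yamaha YZF R1");
        return racesStarted.get();
    }

    public static void main(String[] args) {
        System.out.println("4 lanes are created, race will start as soon as all players arrive");
        runTwoRounds();
    }
}
```

The second round uses the same barrier object as the first, which is what the word cyclic refers to. The order of the "is ready to race" lines within a round depends on thread scheduling.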

Example 2
Suppose you have split a large job into n * m tasks, distributed over n threads. Each of the m steps corresponds to a matrix row, and you have a total to compute for each row. In that case, the threads must be synchronized after each task ends so that the total for the row can be computed. A CyclicBarrier initialized with the number of threads n is used to wait for the end of each row computation (m times in all).

Sample Output
sum of 1 row: 22
sum of 2 row: 46
sum of 0 row: 10
sum of 3 row: 46
Sum of rows of matrix: 124
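The scheme described for this example can be sketched as below. This is not the original listing: the class MatrixRowSums is hypothetical, the matrix values are made up so that the row sums match the sample output, and the way columns are shared between threads is one possible choice.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class MatrixRowSums {
    // Computes the sum of every row with `n` threads that meet at the
    // same barrier once per row.
    static long[] rowSums(int[][] matrix, int n) {
        int rows = matrix.length;
        long[] partial = new long[n];   // one slot per worker, rewritten for each row
        long[] sums = new long[rows];
        int[] currentRow = {0};         // touched only by the barrier action
        CyclicBarrier barrier = new CyclicBarrier(n, () -> {
            for (long p : partial) sums[currentRow[0]] += p;
            currentRow[0]++;
        });
        Thread[] workers = new Thread[n];
        for (int w = 0; w < n; w++) {
            final int id = w;
            workers[w] = new Thread(() -> {
                try {
                    for (int r = 0; r < rows; r++) {
                        long s = 0;
                        // worker `id` handles columns id, id + n, id + 2n, ...
                        for (int c = id; c < matrix[r].length; c += n) s += matrix[r][c];
                        partial[id] = s;
                        barrier.await(); // the same barrier is reused for every row
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[w].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Made-up values chosen so the row sums are 10, 22, 46 and 46.
        int[][] matrix = {
            {1, 2, 3, 4},
            {4, 5, 6, 7},
            {10, 11, 12, 13},
            {13, 12, 11, 10}
        };
        long total = 0;
        long[] sums = rowSums(matrix, 2);
        for (int r = 0; r < sums.length; r++) {
            System.out.println("sum of " + r + " row: " + sums[r]);
            total += sums[r];
        }
        System.out.println("Sum of rows of matrix: " + total); // prints 124 for this matrix
    }
}
```

In this sketch the rows are processed in order, so the lines are printed in row order rather than in the scheduling-dependent order shown in the sample output.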

The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).

There's more...
The CyclicBarrier class has another version of the await() method:

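  • await(long time, TimeUnit unit): The thread waits until all the parties have arrived (the internal counter of CyclicBarrier reaches 0), until it is interrupted, or until the specified time passes. If the time passes first, the method throws a TimeoutException and the barrier is broken.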
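A quick way to see the timed version in action is to wait alone on a barrier that expects two parties. The class TimedAwaitDemo and the method awaitAlone are names invented for this sketch.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedAwaitDemo {
    // Waits alone on a two-party barrier, so the wait can only end by timing out.
    static String awaitAlone(long millis) {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            barrier.await(millis, TimeUnit.MILLISECONDS);
            return "tripped";
        } catch (TimeoutException e) {
            // A timeout also breaks the barrier for every other party.
            return "timed out, broken=" + barrier.isBroken();
        } catch (InterruptedException | BrokenBarrierException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAlone(100)); // prints "timed out, broken=true"
    }
}
```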

This class also provides the getNumberWaiting() method, which returns the number of threads that are currently blocked in the await() method, and the getParties() method, which returns the number of parties (threads) that are going to be synchronized with CyclicBarrier.
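Both methods can be observed with a small sketch in which two threads wait and the calling thread arrives last. WaitingCountDemo and observe are illustrative names, and the polling loop is just a simple way to wait for the two threads to reach the barrier.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class WaitingCountDemo {
    // Returns {getParties(), waiting count before the trip, waiting count after it}.
    static int[] observe() {
        CyclicBarrier barrier = new CyclicBarrier(3);
        Runnable waiter = () -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };
        Thread a = new Thread(waiter);
        Thread b = new Thread(waiter);
        a.start();
        b.start();
        try {
            // Poll until both helper threads are blocked in await().
            while (barrier.getNumberWaiting() < 2) Thread.sleep(10);
            int before = barrier.getNumberWaiting();
            barrier.await(); // the caller is the third party, so the barrier trips
            a.join();
            b.join();
            return new int[] {barrier.getParties(), before, barrier.getNumberWaiting()};
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "3 2 0"
    }
}
```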

Resetting a CyclicBarrier object
The CyclicBarrier class has some points in common with the CountDownLatch class, but they also have some differences. One of the most important differences is that a CyclicBarrier object can be reset to its initial state, assigning to its internal counter the value with which it was initialized.

This reset operation can be done using the reset() method of the CyclicBarrier class. When this occurs, all the threads that were waiting in the await() method receive a BrokenBarrierException. This exception was processed in the example presented in this recipe by printing the stack trace, but in a more complex application the threads could perform some other operation, such as restarting their execution or resuming their operation at the point where it was interrupted.
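The effect of reset() on a waiting thread can be sketched as follows. ResetDemo and resetWhileWaiting are hypothetical names used only here.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

class ResetDemo {
    // Returns {waiter saw BrokenBarrierException, barrier is broken after reset()}.
    static boolean[] resetWhileWaiting() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean sawBroken = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (BrokenBarrierException e) {
                sawBroken.set(true); // thrown because reset() was called while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            // Poll until the waiter is blocked in await(), then reset the barrier.
            while (barrier.getNumberWaiting() < 1) Thread.sleep(10);
            barrier.reset();
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new boolean[] {sawBroken.get(), barrier.isBroken()};
    }

    public static void main(String[] args) {
        boolean[] r = resetWhileWaiting();
        System.out.println(r[0] + " " + r[1]); // prints "true false"
    }
}
```

The waiting thread is thrown out with a BrokenBarrierException, but the barrier itself is back in its initial, unbroken state and can be used again.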

Broken CyclicBarrier objects
A CyclicBarrier object can be in a special state denoted by broken. When there are various threads waiting in the await() method and one of them is interrupted, this thread receives an InterruptedException exception, but the other threads that were waiting receive a BrokenBarrierException exception and CyclicBarrier is placed in the broken state.
The CyclicBarrier class provides the isBroken() method, which returns true if the object is in the broken state; otherwise it returns false.
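The broken state can be reproduced with a sketch like the one below, where one of two waiting threads is interrupted. BrokenBarrierDemo and interruptOneWaiter are names made up for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BrokenBarrierDemo {
    // Returns {outcome of the interrupted thread, outcome of the other thread, isBroken()}.
    static String[] interruptOneWaiter() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] outcome = new String[2];
        Thread[] waiters = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int id = i;
            waiters[i] = new Thread(() -> {
                try {
                    barrier.await();
                    outcome[id] = "released";
                } catch (InterruptedException e) {
                    outcome[id] = "InterruptedException";
                } catch (BrokenBarrierException e) {
                    outcome[id] = "BrokenBarrierException";
                }
            });
            waiters[i].start();
        }
        try {
            // Poll until both threads are blocked in await(), then interrupt one.
            while (barrier.getNumberWaiting() < 2) Thread.sleep(10);
            waiters[0].interrupt();
            waiters[0].join();
            waiters[1].join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new String[] {outcome[0], outcome[1], String.valueOf(barrier.isBroken())};
    }

    public static void main(String[] args) {
        // prints "InterruptedException, BrokenBarrierException, true"
        System.out.println(String.join(", ", interruptOneWaiter()));
    }
}
```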
