Multicore processors are now widespread across server, desktop, and laptop hardware, and even on smaller devices such as smartphones and tablets. They open new possibilities for concurrent programming because the threads of a process can be executed on several cores in parallel. One important technique for achieving maximal performance in applications is the ability to split intensive tasks into chunks that can be performed in parallel to maximize the use of computational power.
In this post, we'll start with a brief recap of the low-level mechanisms for concurrent programming, then look at the rich primitives added by the java.util.concurrent packages, before moving on to fork/join tasks.
Concurrent Programming with plain old threads
As we already know, we can write threads using the java.lang.Thread class and the java.lang.Runnable interface. We then have to make sure they behave in a correct and consistent way with respect to shared mutable objects, avoiding incorrect read/write operations without creating deadlocks.
Here is an example of basic thread manipulation :
All the code in this example does is create a thread that calculates the sum. The main thread waits for the created (child) thread to complete by calling join().
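A minimal sketch of such a program could look like the following. The class and method names are hypothetical, and the range 1 to 1_000_000 is an assumption borrowed from the later executor example.

```java
// Hypothetical example class; names and the summed range are assumptions.
class ThreadSumExample {

    // Sums the integers 1..n on a child thread and returns the result.
    static long sumOnChildThread(int n) {
        final long[] result = new long[1]; // written by the child, read after join()
        Thread child = new Thread(() -> {
            long sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            result[0] = sum;
        });
        child.start();
        try {
            // join() blocks until the child finishes; it also guarantees that
            // the child's write to result[0] is visible to this thread.
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for child thread", e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("Sum = " + sumOnChildThread(1_000_000)); // prints Sum = 500000500000
    }
}
```

Even in this tiny case, the result has to be passed back through shared state, which is exactly the kind of plumbing that gets harder as more threads are involved.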
Directly manipulating threads this way is fine for simple examples, but with concurrent programming, such code can quickly become error-prone, especially when several threads need to cooperate to perform a larger task. In such cases, their control flow needs to be coordinated.
For example, the completion of a thread’s execution might depend on other threads having completed their execution. This requirement can be addressed through shared state and condition queues, but you still have to synchronize by calling java.lang.Object.notify() and java.lang.Object.wait() on shared-state objects, which is easy to get wrong.
Finally, a common pitfall is to use synchronized to provide mutual exclusion over large pieces of code or even whole methods. While this approach leads to thread-safe code, it usually yields poor performance because exclusion stays in effect too long and limits parallelism.
Rich Primitives with the java.util.concurrent Packages
This family of packages offers the following concurrent programming primitives, collections, and features:
- Executors, which are an enhancement over plain old threads because they abstract away thread pool management. They execute tasks similar to those passed to threads. Several implementations are provided with thread pooling and scheduling strategies. Also, execution results can be fetched both in a synchronous and asynchronous manner.
- Thread-safe queues that allow for passing data between concurrent tasks.
- Rich synchronization patterns that go beyond the mutual exclusion provided by low-level synchronized blocks in Java. These patterns comprise common idioms such as semaphores or synchronization barriers.
- Efficient, concurrent data collections (maps, lists, and sets) that often yield superior performance in multithreaded contexts through the use of copy-on-write and fine-grained locks.
- Atomic variables that shield developers from the need to perform synchronized access by themselves.
- A wide range of locks that go beyond the lock/notify capabilities offered by intrinsic locks, for example, support for re-entrance, read/write locking, timeouts, or poll-based locking attempts.
As an example, we compute the sum of the integers from 1 to 1_000_000 using java.util.concurrent features.
This example uses an executor that dispatches work over two threads. The ExecutorService.invokeAll() method takes a collection of Callable instances and waits for the completion of all of them before returning. It returns a list of Future objects, which all represent the “future” result of the computation. If we were to work in an asynchronous fashion, we could test each Future object to check whether its corresponding Callable has finished its work and check whether it threw an exception, and we could even cancel it.
By contrast, when using plain old threads, you must encode cancellation logic through a shared mutable boolean flag and clutter the code with periodic checks of this flag. Because invokeAll() is blocking, we can directly iterate over the Future instances and fetch their computed sums.
Also note that an executor service must be shut down. If it is not shut down, the Java Virtual Machine will not exit when the main method does, because there will still be active threads around.
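The executor-based version described above might be sketched as follows. The class name, the helper rangeSum(), and the way the range is split into two halves are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class; names and the two-way split are assumptions.
class ExecutorSumExample {

    // Builds a Callable that sums the integers in [from, to].
    static Callable<Long> rangeSum(final long from, final long to) {
        return () -> {
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        };
    }

    // Sums 1..n by dispatching two partial sums over a pool of two threads.
    static long sum(long n) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            tasks.add(rangeSum(1, n / 2));
            tasks.add(rangeSum(n / 2 + 1, n));

            long total = 0;
            // invokeAll() blocks until every Callable has completed.
            for (Future<Long> future : executor.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Without shutdown(), the pool's threads keep the JVM alive.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum = " + sum(1_000_000)); // prints Sum = 500000500000
    }
}
```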
Fork/Join Tasks Overview
Executors are a big step forward compared to plain old threads because executors ease the management of concurrent tasks. Some types of algorithms exist that require tasks to create subtasks and communicate with each other to complete. Those are the "divide and conquer" algorithms.
An easy example would be a huge array of integers for which you would like to compute the sum. Given that addition is associative and commutative, one may split the array into smaller portions where concurrent threads compute partial sums. The partial sums can then be added to compute the total sum. Because threads can operate independently on different areas of an array for this algorithm, you will see a clear performance boost on multicore architectures compared to a single-threaded algorithm that would iterate over each integer in the array.
Solving the problem above with executors is easy: Divide the array into the number n of available physical processing units, create Callable instances to compute each partial sum, submit them to an executor managing a pool of n threads, and collect the result to compute the final sum.
The problem with the executors for implementing divide and conquer algorithms is not related to creating subtasks, because a Callable is free to submit a new subtask to its executor and wait for its result in a synchronous or asynchronous fashion. The issue is that of parallelism: When a Callable waits for the result of another Callable, it is put in a waiting state, thus wasting an opportunity to handle another Callable queued for execution.
The fork/join framework added to the java.util.concurrent package in Java SE 7 fills that gap. The Java SE 5 and Java SE 6 versions of java.util.concurrent helped in dealing with concurrency, and the additions in Java SE 7 help with parallelism.
Parallel Fork/Join Framework
The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.
- The Fork/Join framework in the java.util.concurrent package helps simplify writing parallelized code.
- This framework is very useful for modeling divide-and-conquer problems. This approach is suitable for tasks that can be divided recursively and computed on a smaller scale; the computed results are then combined.
- The framework is an implementation of the ExecutorService interface and provides an easy-to-use concurrent platform in order to exploit multiple processors.
Terms useful for this framework
- Forking: dividing a task into smaller tasks.
- Joining: merging the results from the smaller tasks.
The center of the fork/join framework is the ForkJoinPool class, an extension of AbstractExecutorService. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask instances. Both ForkJoinPool and ForkJoinTask are classes.
The key is to recursively subdivide the task into smaller chunks that can be processed by separate threads.
The Fork/Join algorithm is designed as follows:
- split tasks
- fork the tasks
- join the tasks
- compose the results
In pseudo-code:
doRecursiveTask(input) {
    if (the task is small enough to be handled by a single thread) {
        compute the small task directly;
        if there is a result to return, return it;
    } else {
        divide the task into two subtasks;
        fork() the second subtask so that it runs asynchronously;
        call compute() on the first subtask, then join() the second;
        combine both results and return;
    }
}
Additions for Supporting Parallelism
The core addition is a new ForkJoinPool executor that is dedicated to running instances implementing ForkJoinTask. ForkJoinTask objects support the creation of subtasks plus waiting for the subtasks to complete. With those clear semantics, the executor is able to dispatch tasks among its internal thread pool by “stealing” jobs when a task is waiting for another task to complete and there are pending tasks to be run.
ForkJoinTask objects feature two specific methods:
- The fork() method allows a ForkJoinTask to be scheduled for asynchronous execution. This allows a new ForkJoinTask to be launched from an existing one.
- In turn, the join() method allows a ForkJoinTask to wait for the completion of another one.
Cooperation among tasks happens through fork() and join().
There are two types of ForkJoinTask specializations:
- Instances of RecursiveAction represent executions that do not yield a return value.
- In contrast, instances of RecursiveTask yield return values.
Useful Classes of the Fork/Join
- ForkJoinPool
- ForkJoinTask
- RecursiveTask
- RecursiveAction
ForkJoinPool
ForkJoinPool is the most important class in the Fork/Join framework. It is a thread pool for running fork/join tasks - it executes an instance of ForkJoinTask. It executes tasks and manages their life-cycle.
A ForkJoinPool provides the entry point for submissions from non-ForkJoinTask clients, as well as management and monitoring operations.
A ForkJoinPool is constructed with a given target parallelism level; by default, equal to the number of available processors. The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked IO or other unmanaged synchronization.
In addition to execution and life-cycle control methods, this class provides status check methods (for example getStealCount()) that are intended to aid in developing, tuning, and monitoring fork/join applications. Also, method toString() returns indications of pool state in a convenient form for informal monitoring.
Here, "non-ForkJoinTask clients" means callers that are not already engaged in fork/join computations in the current pool.
As is the case with other ExecutorServices, there are three main task execution methods summarized in the following table.
| | Call from non-fork/join clients | Call from within fork/join computations |
|---|---|---|
| Arrange async execution | execute(ForkJoinTask) | ForkJoinTask.fork() |
| Await and obtain result | invoke(ForkJoinTask) | ForkJoinTask.invoke() |
| Arrange exec and obtain Future | submit(ForkJoinTask) | ForkJoinTask.fork() (ForkJoinTasks are Futures) |
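To make the three client-side entry points concrete, here is a small sketch that calls each of them from ordinary (non-fork/join) code. The Square task and the class name are hypothetical and exist only to give the pool something trivial to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class; the Square task is an assumption for illustration.
class SubmissionMethodsExample {

    // Trivial task: returns the square of its input without forking anything.
    static class Square extends RecursiveTask<Integer> {
        private final int value;

        Square(int value) {
            this.value = value;
        }

        @Override
        protected Integer compute() {
            return value * value;
        }
    }

    static int[] runAll() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke(): awaits completion and returns the result directly.
            int viaInvoke = pool.invoke(new Square(2));

            // submit(): arranges execution and returns the task as a Future.
            ForkJoinTask<Integer> future = pool.submit(new Square(3));
            int viaSubmit = future.join();

            // execute(): arranges asynchronous execution and returns nothing,
            // so we keep our own reference to read the result later.
            Square task = new Square(4);
            pool.execute(task);
            int viaExecute = task.join();

            return new int[] {viaInvoke, viaSubmit, viaExecute};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = runAll();
        System.out.println(results[0] + " " + results[1] + " " + results[2]); // prints 4 9 16
    }
}
```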
Because ForkJoinPool uses threads in daemon mode, there is typically no need to explicitly shut down such a pool upon program exit. The implementation restricts the maximum number of running threads to 32767. Attempts to create pools with more than the maximum number result in an IllegalArgumentException.
ForkJoinTask
This is the abstract base class for tasks that run within a ForkJoinPool. A ForkJoinTask is a thread-like entity that is much lighter weight than a normal thread. It represents a task and defines methods such as fork() and join().
When it comes into the picture
A "main" ForkJoinTask begins execution when submitted to a ForkJoinPool. Once started, it will usually in turn start other subtasks. As indicated by the name of this class, many programs using ForkJoinTask employ only methods fork() and join(), or derivatives such as invokeAll. However, this class also provides a number of other methods that can come into play in advanced usages, as well as extension mechanics that allow support of new forms of fork/join processing.
RecursiveTask<V> is a task that can run in a ForkJoinPool; the compute() method returns a value of type V. It inherits from ForkJoinTask.
RecursiveAction is a task that can run in a ForkJoinPool; its compute() method performs the actual computation steps in the task. It is similar to RecursiveTask, but does not return a value.
How to use Fork/Join Framework
Here are the steps to use the framework:
- First, check whether the problem is suitable for the Fork/Join framework or not. Remember: the Fork/Join framework is not suitable for all kinds of tasks. This framework is suitable if your problem fits this description:
  - The problem can be designed as a recursive task where the task can be subdivided into smaller units and the results can be combined together.
  - The subdivided tasks are independent and can be computed separately without the need for communication between the tasks when computation is in process. (Of course, after the computation is over, you will need to join them together.)
- If the problem you want to solve can be modeled recursively, then define a task class that extends either RecursiveTask or RecursiveAction. If a task returns a result, extend from RecursiveTask; otherwise extend from RecursiveAction.
- Override the compute() method in the newly defined task class. The compute() method either performs the task directly, if the task is small enough to be executed, or splits the task into subtasks and invokes them. The subtasks can be invoked with either the invokeAll() or the fork() method (use fork() when the subtask returns a value). Use the join() method to get the computed results (if you used the fork() method earlier).
- Merge the results, if computed from the subtasks.
- Then instantiate ForkJoinPool, create an instance of the task class, and start the execution of the task using the invoke() method on the ForkJoinPool instance.
- That’s it—you are done.
How Fork/Join Worker Thread Pools Work
Fork/Join uses per-thread queuing with work-stealing:
- It is normally best to have one worker thread per CPU. Because the design is robust, you can also have more workers than CPUs.
- Each new task is queued in the current worker thread's deque. In addition, there is a global queue for new tasks from clients.
- Workers run tasks from their own deques in a stack-based LIFO manner.
- If a worker is idle, it steals a task in FIFO order from another thread's deque.
Example 1 : This class illustrates how we can compute the sum of the numbers 1..N using the fork/join framework. The range of numbers is divided in half until the range can be handled by a thread. Once the summation of each range completes, the partial results are added together.
In the compute() method, you decide whether to compute the sum for the range or subdivide the task further using the following condition:
(to - from) <= N / NUM_THREADS
You use this "threshold" value in this computation. In other words, if the range of values is within the threshold that can be handled by a task, then you perform the computation; otherwise you recursively divide the task into two parts. You use a simple for loop to find the sum of the values in that range. In the other case, you divide the range similarly to how you divide the range in a binary search algorithm: for the range from .. to, you find the mid-point and create two sub-ranges from .. mid and mid + 1 .. to. You call fork() on the first subtask so that it is scheduled asynchronously, compute the second half directly, and then call join() to wait for the first subtask's sum.
In this program, you arbitrarily assumed that the number of threads to use was ten. This was to simplify the logic of this program. A better approach to decide the threshold value is to divide the data size by the number of available processors. In other words,
threshold value = (data size) / (number of available processors);
How do you programmatically get the number of available processors? For that you can use the method Runtime.getRuntime().availableProcessors();
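Since the original listing is not reproduced here, the following is a sketch of what Example 1 could look like. The class names SumOfNUsingForkJoin and RecursiveSumOfN are assumptions, and the threshold is derived from availableProcessors() as suggested above rather than from a fixed NUM_THREADS.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical reconstruction of Example 1; all names are assumptions.
class SumOfNUsingForkJoin {

    // Sums the integers in [from, to], splitting the range while it is too large.
    static class RecursiveSumOfN extends RecursiveTask<Long> {
        private final long from;
        private final long to;
        private final long threshold;

        RecursiveSumOfN(long from, long to, long threshold) {
            this.from = from;
            this.to = to;
            this.threshold = threshold;
        }

        @Override
        protected Long compute() {
            if (to - from <= threshold) {
                // Small enough: sum the range with a plain loop.
                long sum = 0;
                for (long i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            }
            // Too large: split at the mid-point, as in binary search.
            long mid = (from + to) / 2;
            RecursiveSumOfN first = new RecursiveSumOfN(from, mid, threshold);
            RecursiveSumOfN second = new RecursiveSumOfN(mid + 1, to, threshold);
            first.fork();                          // schedule the first half asynchronously
            long secondResult = second.compute();  // compute the second half in this thread
            return first.join() + secondResult;    // wait for the first half and combine
        }
    }

    static long sum(long n) {
        int processors = Runtime.getRuntime().availableProcessors();
        long threshold = Math.max(1, n / processors);
        ForkJoinPool pool = new ForkJoinPool(processors);
        try {
            return pool.invoke(new RecursiveSumOfN(1, n, threshold));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum = " + sum(1_000_000)); // prints Sum = 500000500000
    }
}
```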
Example 2 : This example illustrates how we can search for a key within N numbers using the fork/join framework. The range of numbers is divided in half until the range can be handled by a thread.
The key difference between the two programs above is that you used RecursiveAction in the latter instead of RecursiveTask. You made several changes to extend the task class from RecursiveAction. The first change is that the compute() method does not return anything. Another change is that you used the invokeAll() method to submit the subtasks for execution. Another obvious change is that you carried out a search in the compute() method instead of the summation in the earlier case.
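A possible shape for Example 2 is sketched below. Because a RecursiveAction returns nothing, this sketch records the match in a shared AtomicInteger; that holder, the class names, and the threshold rule are all assumptions rather than the original code.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reconstruction of Example 2; all names are assumptions.
class SearchUsingForkJoin {

    // Searches data[from, to) for key and records a matching index, if any.
    static class SearchTask extends RecursiveAction {
        private final int[] data;
        private final int from;
        private final int to; // exclusive
        private final int key;
        private final int threshold;
        private final AtomicInteger foundIndex;

        SearchTask(int[] data, int from, int to, int key,
                   int threshold, AtomicInteger foundIndex) {
            this.data = data;
            this.from = from;
            this.to = to;
            this.key = key;
            this.threshold = threshold;
            this.foundIndex = foundIndex;
        }

        @Override
        protected void compute() {
            if (to - from <= threshold) {
                // Small enough: linear scan of this slice.
                for (int i = from; i < to; i++) {
                    if (data[i] == key) {
                        foundIndex.compareAndSet(-1, i); // keep the first match reported
                        return;
                    }
                }
                return;
            }
            int mid = (from + to) >>> 1;
            // invokeAll() forks both halves and waits for both to finish.
            invokeAll(new SearchTask(data, from, mid, key, threshold, foundIndex),
                      new SearchTask(data, mid, to, key, threshold, foundIndex));
        }
    }

    // Returns an index of key in data, or -1 if the key is absent.
    static int indexOf(int[] data, int key) {
        int processors = Runtime.getRuntime().availableProcessors();
        int threshold = Math.max(1, data.length / processors);
        AtomicInteger foundIndex = new AtomicInteger(-1);
        ForkJoinPool pool = new ForkJoinPool(processors);
        try {
            pool.invoke(new SearchTask(data, 0, data.length, key, threshold, foundIndex));
            return foundIndex.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = 2 * i; // even numbers only
        }
        System.out.println(indexOf(data, 8484)); // prints 4242
        System.out.println(indexOf(data, 7));    // prints -1
    }
}
```

If the array contains duplicates, this sketch reports whichever matching index is recorded first, which is not necessarily the smallest one.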
Example 3 : As a more concrete simple example, the following task increments each element of an array:
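The listing below is a sketch modeled on the increment example in the RecursiveAction Javadoc. The THRESHOLD value of 1000 and the incrementAll() helper are assumptions added so the example can be run on its own.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Increments every element of array[lo, hi) in place.
class IncrementTask extends RecursiveAction {
    static final int THRESHOLD = 1000; // assumed cut-off for sequential work

    final long[] array;
    final int lo;
    final int hi; // exclusive

    IncrementTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            // Small slice: increment sequentially.
            for (int i = lo; i < hi; ++i) {
                array[i]++;
            }
        } else {
            // Large slice: split in two and process both halves.
            int mid = (lo + hi) >>> 1;
            invokeAll(new IncrementTask(array, lo, mid),
                      new IncrementTask(array, mid, hi));
        }
    }

    // Hypothetical convenience helper: runs the task over the whole array.
    static void incrementAll(long[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new IncrementTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        incrementAll(values);
        System.out.println(values[0] + " " + values[9_999]); // prints 1 1
    }
}
```

The subtasks write to disjoint slices of the array, so no synchronization is needed between them.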
Example 4 : In this example, we'll find the max element using fork/join
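One way to write Example 4 is sketched here. The class names and the THRESHOLD value are assumptions; the structure is the same fork, compute, join pattern used for the sum.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical reconstruction of Example 4; all names are assumptions.
class MaxUsingForkJoin {
    static final int THRESHOLD = 1_000; // assumed cut-off for sequential work

    // Finds the maximum of data[from, to); the slice must not be empty.
    static class MaxTask extends RecursiveTask<Integer> {
        private final int[] data;
        private final int from;
        private final int to; // exclusive

        MaxTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            if (to - from <= THRESHOLD) {
                // Small slice: linear scan.
                int max = data[from];
                for (int i = from + 1; i < to; i++) {
                    max = Math.max(max, data[i]);
                }
                return max;
            }
            int mid = (from + to) >>> 1;
            MaxTask left = new MaxTask(data, from, mid);
            MaxTask right = new MaxTask(data, mid, to);
            left.fork();                           // run the left half asynchronously
            int rightMax = right.compute();        // run the right half in this thread
            return Math.max(left.join(), rightMax);
        }
    }

    static int max(int[] data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("array must not be empty");
        }
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new MaxTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i % 1000;
        }
        data[7_777] = 123_456;
        System.out.println("Max = " + max(data)); // prints Max = 123456
    }
}
```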
How does ForkJoinPool differ from other ExecutorService implementations?
As with any ExecutorService, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.
Points to Remember
- It is possible to achieve what the Fork/Join framework offers using basic concurrency constructs such as start() and join(). However, the Fork/Join framework abstracts many lower-level details and thus is easier to use. In addition, it is usually more efficient to use the Fork/Join framework instead of handling the threads at lower levels. Furthermore, for recursive divide-and-conquer workloads, ForkJoinPool manages its threads efficiently and typically performs better than conventional thread pools. For all these reasons, you are encouraged to use the Fork/Join framework.
- Each worker thread in the Fork/Join framework has a work queue, which is implemented using a Deque. Each time a new task (or subtask) is created, it is pushed to the head of the creating thread's own queue. When a thread completes a task and executes a join on another task that is not completed yet, it works smart: the thread pops a new task from the head of its queue and starts executing it rather than sleeping while it waits. If the queue of a thread is empty, the thread takes a task from the tail of the queue belonging to another thread. This is the work-stealing algorithm.
- It may seem natural to call fork() for both subtasks (if you are splitting into two subtasks) and call join() two times. It is correct but inefficient. Why? Well, basically you are creating more parallel tasks than are useful. In this case, the original thread will be waiting for the other two tasks to complete, which is inefficient considering the task creation cost. That is why you call fork() once and call compute() for the second task.
- The placement of the fork() and join() calls is very important. For instance, let’s assume that you place the calls in the following order:
first.fork();
resultFirst = first.join();
resultSecond = second.compute();
This usage is a serial execution of the two tasks, since the second task starts executing only after the first is complete. It is therefore even less efficient than a purely sequential version, because it also pays the cost of task creation. The take-away: watch the placement of your fork() and join() calls.
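For contrast, here is a sketch of the ordering that does allow the two subtasks to overlap: fork the first, compute the second, and only then join the first. Fibonacci is used purely as a compact illustration and is an assumption of this sketch; its subtasks are far too small for fork/join to pay off in practice.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class; Fibonacci is chosen only to keep the sketch short.
class ForkJoinOrderingExample {

    static class Fibonacci extends RecursiveTask<Integer> {
        private final int n;

        Fibonacci(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1) {
                return n;
            }
            Fibonacci first = new Fibonacci(n - 1);
            Fibonacci second = new Fibonacci(n - 2);
            first.fork();                         // 1. schedule the first task asynchronously
            int resultSecond = second.compute();  // 2. do the second task in this thread meanwhile
            int resultFirst = first.join();       // 3. only now wait for the first task
            return resultFirst + resultSecond;
        }
    }

    static int fib(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new Fibonacci(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fib(20)); // prints 6765
    }
}
```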