Wednesday, March 26, 2014

Executor, ExecutorService, ThreadPool, Callable vs Runnable, Thread Factory, ThreadLocalRandom and Future in Java

Executor|ExecutorService|ThreadPool|Thread Factory|ThreadLocalRandom
You can directly create and manage threads in the application by creating Thread objects. However, if you want to abstract away the low-level details of multi-threaded programming, you can make use of the Executor interface. In this post, we'll see how to use the Executor, ExecutorService, Callable, and Future interfaces and thread pools, with examples.

Executor Interface
Executor is an interface that declares only one method, execute(Runnable).

An Executor is an object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads. This may not look like a big interface by itself, but its subinterfaces and implementations, such as ExecutorService, ThreadPoolExecutor, and ForkJoinPool, support useful functionality.

For example, rather than invoking new Thread(new RunnableTask()).start() for each of a set of tasks, you might use:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

Executor Example

In MyExecutor, tasks are executed in some thread other than the caller's thread. The executor below spawns a new thread for each task. You can make another implementation where an executor can run the submitted task immediately in the caller's thread.
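
The MyExecutor listing itself is not reproduced here, so the following is only a minimal sketch of what the paragraph describes. MyExecutor is the name used in the text; DirectExecutor and MyExecutorDemo are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Executor;

// Demo entry point (hypothetical class name).
class MyExecutorDemo {
    public static void main(String[] args) {
        Runnable task = () ->
                System.out.println("Running in " + Thread.currentThread().getName());

        new MyExecutor().execute(task);      // runs in a freshly started thread
        new DirectExecutor().execute(task);  // runs in the caller's (main) thread
    }
}

// Spawns a new thread for every submitted task.
class MyExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

// Alternative policy: run the task immediately in the caller's thread.
class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}
```

Both classes satisfy the same one-method contract, which is the point of the interface: the caller only submits work, and the executor decides where it runs.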

Many Executor implementations impose some sort of limitation on how and when tasks are scheduled. The executor below serializes the submission of tasks to a second executor, illustrating a composite executor.
In this example, we'll use an ArrayDeque to store the Runnable tasks.
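
A sketch of such a serializing executor, modelled on the pattern shown in the java.util.concurrent.Executor documentation. The demo class and the thread-per-task second executor are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

class SerialExecutorDemo {
    public static void main(String[] args) {
        // Second executor (an assumption for this demo): one new thread per task.
        Executor threadPerTask = command -> new Thread(command).start();
        Executor serial = new SerialExecutor(threadPerTask);
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            serial.execute(() -> System.out.println("Task " + id));
        }
    }
}

// Queues tasks and hands them to the wrapped executor strictly one at a time.
class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor executor;
    private Runnable active;

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override
    public synchronized void execute(Runnable command) {
        // Wrap the task so that finishing it triggers the next queued task.
        tasks.add(() -> {
            try {
                command.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            executor.execute(active);
        }
    }
}
```

Because the next task is only scheduled when the previous one finishes, tasks run in submission order even though the wrapped executor may use a different thread for each of them.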

Thread Pool

Source : Wikipedia
A thread pool is a group of threads, created up front, that wait for jobs and execute them. The idea is to keep the threads alive so that we don't pay the overhead of creating a thread for every job.

Callable interface
A task that returns a result and may throw an exception. Implementors define a single method with no arguments called call.
Because you cannot pass a Callable to a Thread to execute, you instead use an ExecutorService to execute the Callable object. The service accepts Callable objects to run by way of the submit() method.

Difference between the Runnable and Callable interface
The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

To execute a task using the Callable object, you first create a thread pool. A thread pool is a collection of threads that can execute tasks. You create a thread pool using the Executors utility class. This class provides methods to get instances of thread pools, thread factories, etc.

ExecutorService interface
An ExecutorService is an Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks. The ExecutorService interface extends the Executor interface and provides services such as shutting down the service and producing Future objects. Some tasks may take considerable execution time to complete. So, when you submit a task to the executor service, you get a Future object.

Interface Future
A Future represents the result of an asynchronous computation: a value that a task will return at some point in the future (i.e., once the task completes). You can use the isDone() method of the Future interface to check if the task is complete and then use the get() method to fetch the task result. If you call the get() method directly while the task is not complete, the method blocks until it completes and returns the value once available.
In the next example, we create a Factorial class that implements Callable so that it can be passed to an ExecutorService and executed as a task.

In this program, you have a Factorial class that implements Callable. Since the task is to compute the factorial of a number N, the task needs to return a result. 
Inside the Factorial class, you define the call() method that actually performs the task. In the CallableExample class, you first create an instance of the Factorial class. You then need to execute this task. For the sake of simplicity, you get a single-threaded executor by calling the newSingleThreadExecutor() method in the Executors class. Note that you could use other methods such as newFixedThreadPool(nThreads) to create a thread pool with multiple threads depending on the level of parallelism you need.

Once you get an ExecutorService, you submit the task for execution. ExecutorService abstracts details such as when the task is executed, how the task is assigned to the threads, etc. You get a reference to Future<Long> when you call the submit(task) method. From this future reference, you call the get() method to fetch the result after completing the task.
If the task is still executing when you call future.get(), this get() method will block until the task execution completes.
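
The original listing is not reproduced here; the following is a minimal sketch that follows the description above. Factorial and CallableExample are the names used in the text, while the input value 10 and the argument check are assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableExample {
    public static void main(String[] args) throws Exception {
        Factorial task = new Factorial(10);
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            // submit() returns immediately with a Future for the pending result.
            Future<Long> future = service.submit(task);
            // get() blocks until the task has finished.
            System.out.println("Factorial of 10 is " + future.get());
        } finally {
            service.shutdown();
        }
    }
}

// A task that returns a result: the factorial of n.
class Factorial implements Callable<Long> {
    private final long n;

    Factorial(long n) {
        this.n = n;
    }

    @Override
    public Long call() {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        long result = 1;
        for (long i = 2; i <= n; i++) {
            result *= i;
        }
        return result;
    }
}
```

Calling shutdown() in the finally block matters: the worker thread of the executor is not a daemon thread, so without it the JVM would not exit.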

Advantage of ExecutorService 
Executor may be a simple interface, but it forms the basis for a flexible and powerful framework for asynchronous task execution that supports a wide variety of task execution policies.
  • The executor service manages threads asynchronously.
  • Use Callable to get a result back after the task completes.
  • shutdown() lets all previously submitted tasks complete before the service terminates.
  • It provides mechanisms for safely starting, closing down, submitting, executing, and blocking on the successful or abrupt termination of tasks.
  • It provides a standard means of decoupling task submission from task execution, describing tasks as Runnable.
  • The Executor implementations also provide lifecycle support and hooks for adding statistics gathering, application management, and monitoring.
  • Rather than spending your time implementing the underlying infrastructure for parallelism, the concurrent framework allows you to instead focus on structuring tasks, dependencies, and potential parallelism.
  • It makes managing and scheduling several threads easy.
  • This is especially useful if your program needs to run several threads at once, for example when you want to execute two threads at a time.

newFixedThreadPool factory method
At any point, at most n threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shut down.

Example: How ExecutorService is useful when you need to run several threads at once.
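
As a hedged sketch of this idea (the original listing is not reproduced here), the following runs six short tasks on a pool of two threads, so at most two tasks execute at a time. The class name, the task count, and the 10-second timeout are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolExample {
    // Runs taskCount short tasks on a pool of poolSize threads
    // and returns how many of them completed.
    static int runTasks(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 1; i <= taskCount; i++) {
            final int id = i;
            pool.execute(() -> {
                System.out.println("Task " + id + " on " + Thread.currentThread().getName());
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // no new tasks; already submitted ones still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed: " + runTasks(2, 6));
    }
}
```

The thread names in the output show only two distinct workers, while the remaining tasks wait in the pool's queue until a worker is free.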

ExecutorService is a full Producer-Consumer implementation
An ExecutorService backed by a thread pool is not only a thread pool; it is a full Producer-Consumer implementation. It has an internal queue, which is in fact a thread-safe queue of Runnables (FutureTask, to be precise) holding the tasks you submit(). All the idle threads in the pool are blocked on that queue, waiting for tasks to execute. When you submit() a task, exactly one thread will pick it up and run it. Of course, submit() does not wait for a thread in the pool to finish processing. On the other hand, if you submit a huge number of tasks (or long-running ones), you might end up with all threads in the pool being occupied and some tasks waiting in the queue. Once any thread is done with its task, it will immediately pick the first one from the queue.

ThreadFactory interface
ThreadFactory is an interface for an object that creates new threads on demand, so that code does not have to create threads explicitly by calling new Thread(). Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.

For example, assume that you often create high-priority threads. You can create a MaxPriorityThreadFactory to set the default priority of threads created by that factory to the maximum priority.
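
A minimal sketch of such a factory. MaxPriorityThreadFactory is the name used in the text; the demo class is hypothetical.

```java
import java.util.concurrent.ThreadFactory;

class ThreadFactoryDemo {
    public static void main(String[] args) {
        ThreadFactory factory = new MaxPriorityThreadFactory();
        Thread worker = factory.newThread(() ->
                System.out.println("Priority: " + Thread.currentThread().getPriority()));
        worker.start();
    }
}

// Every thread created by this factory starts with the maximum priority.
class MaxPriorityThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setPriority(Thread.MAX_PRIORITY);
        return thread;
    }
}
```

The same factory can be handed to a pool, for example Executors.newFixedThreadPool(4, new MaxPriorityThreadFactory()), so that every pool thread is configured in one place.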

With the use of ThreadFactory, you can reduce boilerplate code to set thread priority, name, thread-pool, etc.

ThreadLocalRandom Class
In concurrent programming, you’ll find that there is often a need to generate random numbers.
Math.random() is not efficient in concurrent programs because all threads share a single underlying generator and contend for it. For this reason, the java.util.concurrent package provides the ThreadLocalRandom class (added in Java 7), which gives each thread its own generator. You can call ThreadLocalRandom.current() and then methods such as nextInt() and nextFloat() to generate random numbers.
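
A short sketch of typical usage. The class name and the die-rolling scenario are assumptions made for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

class ThreadLocalRandomExample {
    // Returns a value from 1 to 6; the upper bound of nextInt is exclusive.
    static int rollDie() {
        return ThreadLocalRandom.current().nextInt(1, 7);
    }

    public static void main(String[] args) {
        // Each thread obtains its own generator through current().
        Runnable roller = () -> System.out.println(
                Thread.currentThread().getName() + " rolled " + rollDie());
        new Thread(roller).start();
        new Thread(roller).start();
        System.out.println("Random float: " + ThreadLocalRandom.current().nextFloat());
    }
}
```

Always call ThreadLocalRandom.current() from the thread that will use the generator rather than sharing one instance between threads.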

Final example: In this program, we create a class that sums the values from 1 to N, where N is a large number. We divide the work of summing the numbers among 10 threads. Once the computation is complete, we add up the results of all the threads.
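
The original program is not reproduced here; the following is one possible sketch of it. The class name SumOfN, the method name parallelSum, and the value of N are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SumOfN {
    // Splits 1..n into `workers` contiguous ranges, sums each range in its
    // own task, and adds up the partial sums.
    static long parallelSum(long n, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            long chunk = n / workers;
            List<Future<Long>> parts = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                final long from = i * chunk + 1;
                // The last worker also takes the remainder of the range.
                final long to = (i == workers - 1) ? n : (i + 1) * chunk;
                Callable<Long> partialSum = () -> {
                    long sum = 0;
                    for (long v = from; v <= to; v++) {
                        sum += v;
                    }
                    return sum;
                };
                parts.add(pool.submit(partialSum));
            }
            long total = 0;
            for (Future<Long> part : parts) {
                total += part.get(); // blocks until that partial sum is ready
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println("Sum of 1.." + n + " = " + parallelSum(n, 10));
    }
}
```

The result can be checked against the closed form N(N+1)/2.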

If you know anyone who has started learning Java, why not help them out! Just share this post with them. 
Thanks for studying today!...

Friday, March 21, 2014

Stream and Lambda examples in Java 8

Stream|Parallel Stream|Lambda examples Java 8
In this post, we'll see how the Stream concept works in Java 8, its characteristics, and how pipeline operations work in a Stream, with examples. This post helps you understand the basics of writing Streams, their usefulness, and how they work.

What is a Stream in Java
"Streams are wrappers around collections that support many convenient and high-performance operations expressed compactly but clearly with lambdas."
Streams are not collections: they do not manage their own data. Instead, they are wrappers around existing data structures. When you make or transform a Stream, it does not copy the underlying data. Instead, it just builds a pipeline of operations. How many times that pipeline will be invoked depends on what you later do with the stream.

Characteristics of Streams

  • No storage : A stream is not a data structure that stores elements; instead, it conveys elements from a source such as a data structure, an array, a generator function, or an I/O channel, through a pipeline of computational operations.
  • Designed for lambdas : Most Stream operations take lambdas as arguments.
  • Functional in nature : An operation on a stream produces a result, but does not modify its source. For example, filtering a Stream obtained from a collection produces a new Stream without the filtered elements, rather than removing elements from the source collection.
  • Possibly unbounded : While collections have a finite size, streams need not. Short-circuiting operations such as limit(n) or findFirst() can allow computations on infinite streams to complete in finite time.
  • Consumable : The elements of a stream are only visited once during the life of a stream. Like an Iterator, a new stream must be generated to revisit the same elements of the source.
  • Do not support indexed access : You can ask for the first element, but not the second or third or last element. But, see next bullet.
  • Laziness-seeking : Many Stream operations are postponed until it is known how much data is eventually needed.
  • Parallelizable : If you designate a Stream as parallel, then operations on it will automatically be done concurrently, without having to write explicit multi-threading code.

How Stream operations and pipelines work

Stream operations are divided into two kinds:

  1. Intermediate operations
  2. Terminal operations

These are combined to form stream pipelines.

A stream pipeline consists of three parts:

  1. A source (such as a Collection, an array, a generator function, or an I/O channel).
  2. Followed by zero or more intermediate operations, such as Stream.filter.
  3. A terminal operation such as Stream.forEach or Stream.reduce.

Intermediate operations
Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate.
Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.
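
The laziness described above can be observed with a small sketch that counts how often the filter predicate runs. The class name, method name, and sample data are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamExample {
    // Returns {predicate calls before the terminal operation, calls after it}.
    static int[] countPredicateCalls(List<String> names) {
        AtomicInteger calls = new AtomicInteger();
        // Building the pipeline does not test any element yet.
        Stream<String> filtered = names.stream().filter(name -> {
            calls.incrementAndGet();
            return name.startsWith("A");
        });
        int before = calls.get();
        // The terminal operation triggers the traversal.
        List<String> matches = filtered.collect(Collectors.toList());
        int after = calls.get();
        System.out.println("Matches: " + matches);
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = countPredicateCalls(Arrays.asList("Ann", "Bob", "Alice"));
        System.out.println("Predicate calls before terminal operation: " + calls[0]);
        System.out.println("Predicate calls after terminal operation: " + calls[1]);
    }
}
```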
Intermediate operations are further divided into two kinds:

  1. Stateless operations
  2. Stateful operations
Stateless operations, such as filter and map, retain no state from previously seen elements when processing a new element -- each element can be processed independently of operations on other elements.

Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements. Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream.

As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering.

Intermediate methods

  • map (and related mapToInt, flatMap, etc.)
  • filter
  • distinct
  • sorted
  • peek
  • limit
  • skip
  • parallel
  • sequential
  • unordered

Terminal operations
Terminal operations, such as Stream.forEach or IntStream.sum, may traverse the stream to produce a result or a side-effect. After the terminal operation is performed, the stream pipeline is considered consumed, and can no longer be used; if you need to traverse the same data source again, you must return to the data source to get a new stream.
In almost all cases, terminal operations are eager, completing their traversal of the data source and processing of the pipeline before returning. Only the terminal operations iterator() and spliterator() are not.

Processing streams lazily allows for significant efficiencies. It allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. 

Further, some operations are deemed short-circuiting operations.

  • An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result.
  • A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time.
Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.
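
A brief sketch of short-circuiting on an infinite stream, using Stream.iterate as the unbounded source. The class and method names are assumptions made for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ShortCircuitExample {
    // limit() is a short-circuiting intermediate operation:
    // it turns the infinite stream 1, 2, 3, ... into a finite one.
    static List<Integer> firstEvens(int howMany) {
        return Stream.iterate(1, n -> n + 1)
                .filter(n -> n % 2 == 0)
                .limit(howMany)
                .collect(Collectors.toList());
    }

    // findFirst() is a short-circuiting terminal operation:
    // traversal stops as soon as one matching element is found.
    static int firstMultipleOf(int k) {
        return Stream.iterate(1, n -> n + 1)
                .filter(n -> n % k == 0)
                .findFirst()
                .get();
    }

    public static void main(String[] args) {
        System.out.println(firstEvens(3));
        System.out.println(firstMultipleOf(7));
    }
}
```

A short-circuiting operation alone is not sufficient: placing a stateful sorted() before limit() in the same pipeline would force the stream to consume the entire infinite input before producing anything.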

Terminal methods

  • forEach
  • forEachOrdered
  • toArray
  • reduce
  • collect
  • min
  • max
  • count
  • anyMatch
  • allMatch
  • noneMatch
  • findFirst
  • findAny
  • iterator
Short-circuit methods
  • anyMatch
  • allMatch
  • noneMatch
  • findFirst
  • findAny
  • limit

How Stream can be obtained
Streams can be obtained in a number of ways:
  1. From a Collection via the stream() and parallelStream() methods.
  2. From an array via Arrays.stream(Object[]).
  3. From static factory methods on the stream classes, such as Stream.of(Object[]), IntStream.range(int, int) or Stream.iterate(Object, UnaryOperator).
  4. The lines of a file can be obtained from BufferedReader.lines().
  5. Streams of file paths can be obtained from methods in Files.
  6. Streams of random numbers can be obtained from Random.ints().
  7. Numerous other stream-bearing methods in the JDK, including BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), and JarFile.stream().
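
A few of these sources side by side, as a hedged sketch. The class name, helper methods, and sample values are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StreamSourcesExample {
    // Static factory on a stream class: IntStream.range(from, toExclusive).
    static int rangeSum(int from, int toExclusive) {
        return IntStream.range(from, toExclusive).sum();
    }

    // Another stream-bearing JDK method: Pattern.splitAsStream.
    static List<String> splitOnComma(String text) {
        return Pattern.compile(",").splitAsStream(text).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("a", "b", "c");
        Stream<String> fromCollection = list.stream();
        Stream<String> fromArray = Arrays.stream(new String[] {"x", "y"});
        Stream<Integer> fromFactory = Stream.of(1, 2, 3);
        Stream<Integer> iterated = Stream.iterate(1, n -> n * 2).limit(5);
        IntStream random = new Random().ints(3, 0, 10); // three values in [0, 10)

        System.out.println(fromCollection.collect(Collectors.toList()));
        System.out.println(fromArray.collect(Collectors.toList()));
        System.out.println(fromFactory.collect(Collectors.toList()));
        System.out.println(iterated.collect(Collectors.toList()));
        System.out.println(random.boxed().collect(Collectors.toList()));
        System.out.println(rangeSum(0, 5));
        System.out.println(splitOnComma("a,b,c"));
    }
}
```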

To understand Streams, we start with the most basic source: a Collection.

Collection is an interface containing methods such as stream() and parallelStream(), which are inherited by all the collections such as ArrayList, LinkedList, etc.
Stream is an interface with many useful methods. Before moving to our first example, let's understand a few methods of the Stream interface.

Here Predicate<T> is a functional interface with a single abstract method:
boolean test(T t)
So in place of a Predicate we can use a lambda expression that takes one input and returns a boolean result. If you don't have an idea about lambda expressions, check this post.

Here Consumer<T> is a functional interface with a single abstract method:
void accept(T t)
So in place of a Consumer we can use a lambda expression that takes one input and returns nothing. If you don't have an idea about lambda expressions, check this post.
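
As a small sketch of both interfaces in use, the following passes a Predicate to Stream.filter and a Consumer to Stream.forEach. The class name, method name, and sample numbers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class PredicateConsumerExample {
    static List<Integer> evens(List<Integer> numbers) {
        // Predicate: takes one input and returns a boolean.
        Predicate<Integer> isEven = n -> n % 2 == 0;

        List<Integer> result = new ArrayList<>();
        // Consumer: takes one input and returns nothing.
        Consumer<Integer> collect = result::add;

        numbers.stream()
                .filter(isEven)     // filter expects a Predicate
                .forEach(collect);  // forEach expects a Consumer
        return result;
    }

    public static void main(String[] args) {
        System.out.println(evens(Arrays.asList(1, 2, 3, 4, 5)));

        // The same pipeline with the lambdas written inline.
        Arrays.asList(1, 2, 3, 4, 5).stream()
                .filter(n -> n % 2 == 0)
                .forEach(n -> System.out.println("Even: " + n));
    }
}
```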

Examples to explain the above concept.

User defined Stream examples:

Map( ) in Stream
Stream operations include map(), which applies a function across each element present within a Stream to produce a result out of each element. So, for example, we can obtain the age of each Student in the collection by applying a simple function to retrieve the age out of each Student.

ToIntFunction, ToDoubleFunction, and ToLongFunction are functional interfaces that accept one argument and return an int, double, or long result, respectively.
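
A sketch of the Student example described above. The Student class, its fields, and the sample data are hypothetical, since the original listing is not reproduced here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StudentMapExample {
    // map applies Student::getAge to every element: Stream<Student> -> Stream<Integer>.
    static List<Integer> ages(List<Student> students) {
        return students.stream()
                .map(Student::getAge)
                .collect(Collectors.toList());
    }

    // mapToInt takes a ToIntFunction and yields an IntStream,
    // which offers numeric operations such as average().
    static double averageAge(List<Student> students) {
        return students.stream()
                .mapToInt(Student::getAge)
                .average()
                .orElse(0.0);
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(new Student("Asha", 20), new Student("Ben", 22));
        System.out.println(ages(students));
        System.out.println(averageAge(students));
    }
}

// Hypothetical domain class used only for this illustration.
class Student {
    private final String name;
    private final int age;

    Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    String getName() {
        return name;
    }

    int getAge() {
        return age;
    }
}
```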

Making Streams from Primitives

  • Stream.of(val1, val2, ...)
  • Stream.of(someArray)

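You always have to be cautious when making Streams from primitives: passing a primitive array such as int[] to Stream.of yields a Stream with a single int[] element, not a stream of its values.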
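
The pitfall with primitive arrays can be seen in a short sketch. The class and method names are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PrimitiveStreamExample {
    // Stream.of(int[]) treats the whole array as ONE element: Stream<int[]>.
    static long countViaStreamOf(int[] values) {
        return Stream.of(values).count();
    }

    // Arrays.stream(int[]) produces an IntStream over the individual values.
    static long countViaArraysStream(int[] values) {
        return Arrays.stream(values).count();
    }

    // IntStream.of(int...) is the primitive counterpart of Stream.of.
    static int sum(int[] values) {
        return IntStream.of(values).sum();
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3};
        System.out.println(countViaStreamOf(values));
        System.out.println(countViaArraysStream(values));
        System.out.println(sum(values));
    }
}
```

With an Integer[] (an array of objects) Stream.of behaves as expected and streams the individual elements; the surprise only occurs with primitive arrays.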

How it works
Suppose we sum a list of integers such as 1, 2, 3, 4, and 5, starting from the seed 0. The seed 0 is added to 1 and the result (1) is stored as the accumulated value, which then serves as the left-hand operand of the next addition, with the next number in the stream as the right-hand operand (1+2). The result (3) is stored as the accumulated value and used in the next addition (3+3). The result (6) is stored and used in the next addition (6+4), and the result is used in the final addition (10+5), yielding the final result 15.
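
This accumulation is presumably what Stream.reduce does with a seed (identity) of 0 and an addition function; a minimal sketch under that assumption, with a hypothetical class name:

```java
import java.util.Arrays;
import java.util.List;

class ReduceExample {
    // reduce(identity, accumulator): the accumulated value is the left
    // argument, the next stream element is the right argument.
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, (accumulated, next) -> accumulated + next);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Print each step to follow the trace in the text.
        int total = numbers.stream().reduce(0, (accumulated, next) -> {
            System.out.println(accumulated + " + " + next + " = " + (accumulated + next));
            return accumulated + next;
        });
        System.out.println("Total: " + total);
    }
}
```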

All stream operations can execute either in serial or in parallel. The stream implementations in the JDK create serial streams unless parallelism is explicitly requested.

For example, Collection has the methods Collection.stream() and Collection.parallelStream(), which produce sequential and parallel streams respectively; other stream-bearing methods such as IntStream.range(int, int) produce sequential streams, but these streams can be efficiently parallelized by invoking their BaseStream.parallel() method.

The only difference between the serial and parallel versions of this example is the creation of the initial stream, using "parallelStream()" instead of "stream()".
When the terminal operation is initiated, the stream pipeline is executed sequentially or in parallel depending on the mode of the stream on which it is invoked. Whether a stream will execute in serial or parallel can be determined with the isParallel() method, and the mode of a stream can be modified with the BaseStream.sequential() and BaseStream.parallel() operations.
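
A sketch of the same pipeline in serial and in parallel form. The class name, helper methods, and data are assumptions made for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamExample {
    // Builds the list 1..n as sample data.
    static List<Integer> oneTo(int n) {
        return IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
    }

    static long countEvensSerial(List<Integer> numbers) {
        return numbers.stream().filter(x -> x % 2 == 0).count();
    }

    // Identical pipeline; only the creation of the initial stream differs.
    static long countEvensParallel(List<Integer> numbers) {
        return numbers.parallelStream().filter(x -> x % 2 == 0).count();
    }

    public static void main(String[] args) {
        List<Integer> numbers = oneTo(1000);
        System.out.println("stream().isParallel(): " + numbers.stream().isParallel());
        System.out.println("parallelStream().isParallel(): " + numbers.parallelStream().isParallel());
        System.out.println("stream().parallel().isParallel(): " + numbers.stream().parallel().isParallel());
        System.out.println(countEvensSerial(numbers) + " / " + countEvensParallel(numbers));
    }
}
```

Both versions return the same count because the filter lambda is stateless and does not touch the source list.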

Most stream operations accept parameters that describe user-specified behavior, which are often lambda expressions. To preserve correct behavior, these behavioral parameters must be non-interfering, and in most cases must be stateless. Such parameters are always instances of a functional interface such as Function, and are often lambda expressions or method references.
Here, non-interfering means that the behavioral parameters do not modify the stream's data source: for most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline.


Wednesday, March 19, 2014

Metaspace in Java 8

Metaspace | Removal of Permanent Generation Java 8
In this post, we'll look at one of the JVM updates in Java 8, i.e., the removal of the Permanent Generation. We'll see why the Permanent Generation needed to be removed and look at its alternative, Metaspace. This is the continuation of the previous post on memory management & garbage collection.

This is how the heap structure looks in Java 6

Permanent Generation
The pool containing all the reflective data of the virtual machine itself, such as class and method objects. With Java VMs that use class data sharing, this generation is divided into read-only and read-write areas.
The Permanent generation contains metadata required by the JVM to describe the classes and methods used in the application. The permanent generation is populated by the JVM at runtime based on classes in use by the application. In addition, Java SE library classes and methods may be stored here.
Classes may get collected (unloaded) if the JVM finds they are no longer needed and space may be needed for other classes. The permanent generation is included in a full garbage collection.
  • Region of Java Heap for JVM Class Metadata.
  • Hotspot’s internal representation of Java Classes.
  • Class hierarchy information, fields, names
  • Method compilation information and bytecodes
  • Variables
  • Constant pool and symbolic resolution

PermGen Size

  • Limited to MaxPermSize – default ~64M - 85M
  • Contiguous with Java Heap : Identifying young references from old gen and permgen would be more expensive and complicated with a non-contiguous heap – card table (a kind of remembered set that records where oops have changed in a generation).
  • Once exhausted throws OutOfMemoryError "PermGen space".
    • Application could clear references to cause class unloading.
    • Restart with larger MaxPermSize.
  • Size needed depends on number of classes, size of methods, size of constant pools.

Why was PermGen Eliminated?
  • Fixed size at startup – difficult to tune.
    • -XX:MaxPermSize=?
  • Internal Hotspot types were Java objects : Could move with full GC, opaque, not strongly typed and hard to debug, needed meta-metadata.
  • Simplify full collections : Special iterators for metadata for each collector
  • Want to deallocate class data concurrently and not during GC pause
  • Enable future improvements that were limited by PermGen.

Where did JVM Metadata go now?

The Permanent Generation (PermGen) space has been completely removed and is effectively replaced by a new space called Metaspace.
One consequence of the PermGen removal is that the PermSize and MaxPermSize JVM arguments are ignored and you will never get a java.lang.OutOfMemoryError: PermGen space error.
The JDK 8 HotSpot JVM now uses native memory for the representation of class metadata; this area is called Metaspace.

  • Take advantage of Java Language Specification property : Classes and associated metadata lifetimes match class loader’s.
  • Per loader storage area – Metaspace
  • Linear allocation only
  • No individual reclamation (except for RedefineClasses and class  loading failure)
  • No GC scan or compaction
  • No relocation for metaspace objects
  • Reclamation en-masse when class loader found dead by GC

In Metaspace memory allocation model

  • Most allocations for the class metadata are now allocated out of native memory.
  • The classes that were used to describe class metadata have been removed.
  • Multiple mapped virtual memory spaces allocated for metadata.
  • Allocate per-class loader chunk lists
    • Chunk sizes depend on type of class loader.
    • Smaller chunks for sun/reflect/Delegating ClassLoader.
  • Return chunks to free chunk lists.
  • Virtual memory spaces returned when emptied.
  • Strategies to minimize fragmentation.

This picture shows how virtual memory space is allocated for metadata and how it is allocated per class loader

You can see how the virtual memory spaces (vs1, vs2, vs3) are allocated and how the per-class-loader chunks are allocated. CL = class loader.

Understanding the _mark and _klass pointers
To understand the next diagram, you need to have an idea of these pointers.
In the JVM, every object has a pointer to its class, but only to its concrete class and not to its interface or abstract class.
For 32 bit JVM:
_mark : 4 byte constant
_klass : 4 byte pointer to class

The second field ( _klass ) in the object layout in memory (for a 32-bit JVM, the offset is 4, for a 64-bit JVM offset is 8 from the address of an object in memory) points to the class definition of object in memory.
For 64 bit JVM:
_mark : 8 byte constant
_klass : 8 byte pointer to class

For 64 bit JVM with compressed-oops:
_mark : 8 byte constant
_klass : 4 byte pointer to class

HotSpot Glossary of Terms

Java Object Memory Layout

Compressed Class Pointer Space
This is the case where the class pointer space is compressed; it applies only to 64 bit platforms.
For 64 bit platforms, a compressed class pointer space is introduced in order to compress the JVM _klass pointers in objects.

Java Object Memory Layout with Compressed Pointers

Summary of Compressed Pointers
  • Default for 64 bit platforms.
  • Compressed object pointers -XX:+UseCompressedOops
    • "oops" are "ordinary" object pointers.
    • Object pointers are compressed to 32 bits in objects in Java Heap.
    • Using a heap base (or zero if Java Heap is in lower 26G memory).
  • Compressed Class Pointers -XX:+UseCompressedClassPointers.
  • Objects have a pointer to VM Metadata class (2nd word) compressed to 32 bits.
  • Using a base to the compressed class pointer space.

Difference between Metaspace vs. Compressed Class Pointer Space
  • Compressed Class Pointer Space contains only class metadata.
    • InstanceKlass, ArrayKlass
      • Only when UseCompressedClassPointers true.
      • These include Java virtual tables for performance reasons.
      • We are still shrinking this metadata type.
  • Metaspace contains all other class metadata that can be large.
    • Methods, Bytecodes, ConstantPool ...

Metaspace Tuning
The maximum metaspace size can be set using the -XX:MaxMetaspaceSize flag, and the default is unlimited, which means that only your system memory is the limit. The -XX:MetaspaceSize tuning flag defines the initial size of metaspace. If you don't specify this flag, the Metaspace will dynamically re-size depending on the application demand at runtime.
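
To observe the effect of these flags from inside an application, one option is the standard java.lang.management API. The sketch below assumes a HotSpot JVM (Java 8 or later) that reports a memory pool whose name contains "Metaspace"; that pool name is an implementation detail rather than a documented guarantee, and the class name is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

class MetaspaceUsageExample {
    // Returns the bytes currently used by the Metaspace pool,
    // or -1 if this JVM does not expose such a pool.
    static long metaspaceUsedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            // Assumption: HotSpot names the pool "Metaspace".
            if (pool.getName().contains("Metaspace")) {
                MemoryUsage usage = pool.getUsage();
                return usage == null ? -1 : usage.getUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage();
            if (usage != null) {
                // getMax() returns -1 when no maximum is defined,
                // e.g. when -XX:MaxMetaspaceSize is not set.
                System.out.println(pool.getName()
                        + ": used=" + usage.getUsed()
                        + " committed=" + usage.getCommitted()
                        + " max=" + usage.getMax());
            }
        }
        System.out.println("Metaspace used bytes: " + metaspaceUsedBytes());
    }
}
```

Running the program once without flags and once with -XX:MaxMetaspaceSize set should show the difference in the reported maximum for the Metaspace pool.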

Tuning Flags - MaxMetaspaceSize

  • -XX:MaxMetaspaceSize={unlimited}
  • Metaspace is limited by the amount of memory on your machine.
  • Limit the memory used by class metadata before excess swapping and native allocation failure occur.
    • Use if suspected class loader memory leaks.
    • Use on 32 bit if address space could be exhausted.
  • Initial MetaspaceSize 21 MB – GC initial high water mark for doing a full GC to collect classes.
    • GC's are done to detect dead classloaders and unload classes.
  • Set to a higher limit if doing too many GC’s at startup.
  • Possibly use same value set by PermSize to delay initial GC.
  • High water mark increases with subsequent collections for a reasonable amount of head room before next Metaspace GC.
    • See MinMetaspaceFreeRatio and MaxMetaspaceFreeRatio
    • Interpreted similarly to analogous GC FreeRatio parameters

Tuning Flags - CompressedClassSpaceSize
  • Only valid if -XX:+UseCompressedClassPointers (default on 64 bit).
  • -XX:CompressedClassSpaceSize=1G.
  • Since this space is fixed at startup time currently, start out with large reservation.
  • Not committed until used.
  • Future work is to make this space growable.
    • Doesn’t need to be contiguous, only reachable from the base address.
    • Would rather shift more class metadata to Metaspace instead.
    • In future might set ergonomically based on PredictedLoadedClassCount (experimental flag now).
      •  Sets size of other internal JVM data structures, like dictionary of loaded classes.

Tools for Metaspace
  • jmap -permstat option renamed jmap -clstats
    • Prints class loader statistics of Java heap. For each class loader, its name, liveness, address, parent class loader, and the number and size of classes it has loaded are printed. In addition, the number and size of interned Strings are printed.
  • jstat -gc option shows Metaspace instead of PermGen.
  • jcmd <pid> GC.class_stats.
    • Gives detailed histogram of class metadata sizes.
    • Start java with -XX:+UnlockDiagnosticVMOptions

Improved GC Performance
If you understand the Metaspace concept well, it is easy to see the improvement in garbage collection:

  • During full collection, metadata to metadata pointers are not scanned.
    • A lot of complex code (particularly for CMS) for metadata scanning was removed.
  • Metaspace contains few pointers into the Java heap.
    • Pointer to java/lang/Class instance in class metadata
    • Pointer to component java/lang/Class in array class metadata
  • No compaction costs for metadata.
  • Reduces root scanning (no scanning of VM dictionary of loaded classes and other internal hashtables).
  • Improvements in full collection times.
  • Working on class unloading in G1 after concurrent marking cycle

  • Hotspot metadata is now allocated in Metaspace.
    • Chunks in mmap spaces based on liveness of class loader.
  • Compressed class pointer space is still fixed size but large.
  • Tuning flags available but not required.
  • Change enables other optimizations and features in the future
    • Application class data sharing.
    • Young collection optimizations, G1 class unloading.
    • Metadata size reductions and internal JVM footprint projects

For detail on Java Language enhancement, check this post.
