Monday, 6 October 2014

Java Threads Tutorial

Terminology

Execution Core - The number of cores that your system has. For example, a CPU with four cores can run four operations simultaneously, and thus can run up to four threads simultaneously. If a CPU has only one core and you run four threads, they will do each of their basic CPU operations by turn (which is determined by various parameters).

Threads

A thread is a Java execution environment for running several units of code simultanously. From the Java programmer's point of view, a program starts with just one thread, called the main thread. This thread has the ability to create additional threads.

Threads Strategies

Each thread is associated with an instance of the class java.lang.Thread. There are two basic strategies for using Thread objects to create a concurrent application.

  • To control the creation and management of threads directly, simply by instantiate Thread each time the application needs to initiate an asynchronous task
  • To use a higher level API (JDK V5+) and pass the application's tasks to an executor (java.util.concurrent.Executor)

Controlling Threads Directly

Defining and Starting a Thread

There are two ways to do this:

  • Provide a java.lang.Runnable object. The Runnable interface defines a single method, run, meant to contain the code executed in the thread. The Runnable object is passed to the Thread constructor. For example:
    //Implements Runnable interface
    public class HelloRunnable implements Runnable {
    
        //Runnable entry point
        public void run() {
            System.out.println("Hello from a Runnable!");
        }
    
        //Program entry point (main thread)
        public static void main(String args[]) {
            (new Thread(new HelloRunnable())).start();
        }
    }
  • Subclass java.lang.Thread. The Thread class itself implements java.lang.Runnable, though its run method does nothing. An application can subclass Thread, providing its own implementation of run, as in the HelloThread example:

    //extends Thread interface
    public class HelloThread extends Thread {
    
        //Thread entry point
        public void run() {
            System.out.println("Hello from a thread!");
        }
    
        //Program entry point (main thread)
        public static void main(String args[]) {
            (new HelloThread()).start();
        }
    }

Notice that both examples invoke Thread.start in order to start the new thread.

Which Method To Use?

Using the Runnable interface is more general and flexible, because the class that implements Runnable can inherit another class while working as a Thread.

Pausing Execution with Sleep

Thread.sleep causes the current thread to suspend execution for a specified period. This is an efficient way to:

  • Save in processor when other threads and/or applications need it
  • Pacing and waiting for another thread with time requirements

Using sleep is not guaranteed to be precise because:

  • It is limited by the facilities provided by the underlying OS
  • It can be interrupted by interrupts.

In any case, you cannot assume that invoking sleep will suspend the thread for precisely the time period specified.

The following example uses sleep to print messages at four-second intervals:

//Print messages at four-second intervals
public class SleepMessages {

  //main thread
  public static void main(String args[]) throws InterruptedException {
      int messages[] = {1, 2, 3, 4};

      for (int i = 0; i < messages.length; i++) {
          //Print a message
          System.out.println(messages[i]);
          
          if (i != messages.length-1) {
              //Pause for 4 seconds
              System.out.println("//Pause for 4 seconds");
              Thread.sleep(4000);
          }
      }
  }
}

Interrupts

An interrupt is an indication to a thread to stop what it is doing and do something else. It's up to the programmer to decide how a thread responds to an interrupt.

A thread sends an interrupt by invoking java.lang.Thread#interrupt for the thread to be interrupted.

Note: For the interrupt mechanism to work correctly, the interrupted thread must support its own interruption.

Supporting Interruption

Supporting interruptions is done in two ways:

  • First of all, there are some Thread methods, that throws an InterruptedException when their thread is interrupted. So it can be handled in the catch clause of the method that is throwing the exception.
  • Secondly, we have to define exit points in our thread for code blocks that do not throw InterruptedException. This is done by two methods:
    • Thread.interrupted - A static method that is used inside a thread to ask if it was interrupted.
    • Thread.isInterrupted - An instance method that is used outside of a thread to ask about a thread if it was interrupted.

For example, here is an example that checks a heavy operation if it was interrupted, and also catches an InterruptedException

public class CheckInterrupts implements Runnable {

    public void run() {
        // This method takes about 4 seconds
        heavyOperation();
        while (true) {
            // Checks periodically if it was interrupted
            if (Thread.interrupted()) {
                System.out.println("interrupted");
                return;
            }
        }
    }

    private void heavyOperation() {
        try {
            // Sleep for four seconds
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            System.out.println("Sleep interrupted");
            return;
        }
    }

    // Main thread
    public static void main(String args[]) throws InterruptedException {
        Thread t = new Thread(new CheckInterrupts());
        t.start();

        Thread.sleep(1000);

        // Interrupts Thread.sleep(), that is catched in a catch
        // block
        t.interrupt();
        Thread.sleep(1000);

        // Interrupts the thread's while loop, that is catched by
        // checking the Thread.interrupted() flag
        t.interrupt();
    }
}

The Interrupt Status Flag

Invoking Thread.interrupt() sets a flag on the thread, called the interrupt status flag. When checking for an interrupt with the static method Thread.interrupted(), the interrupt status is cleared. The non-static isInterrupted() method, which is used by one thread to query the interrupt status of another, does not change the interrupt status flag.

By convention, any internal java method that exits by throwing an InterruptedException clears the interrupt status flag. However, it's always possible that interrupt status will be set again, by another thread invoking interrupt().

Joins

The java.lang.Thread#join() causes the current thread to wait for the completion of another.

For example, if t is a thread that is currently running, t.join(); causes the current thread to pause execution until t's thread terminates.

You can also specify a waiting period, However, as with sleep(), join is dependent on the OS for timing, so it is not accurate.

Example

public class JoinTest implements Runnable {

    public void run() {
        try {
            // Sleep for four seconds
            System.out.println("Going to sleep");
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            System.out.println("Sleep interrupted");
            return;
        }
        System.out.println("Slept for four seconds");
        System.out.println("Exiting thread t");
    }

    // Main thread
    public static void main(String args[]) throws InterruptedException {
        Thread t = new Thread(new JoinTest());
        t.start();
        
        System.out.println("Waiting for thread t to finish");
        t.join();
        System.out.println("Thread t completed");

        System.out.println("Sleep 1 second");
        Thread.sleep(1000);
        System.out.println("Woke up again");
    }
}

Threads and Concurrency Challenges

Threads are running in concurrency and communicate primarily by sharing access to fields and objects references. This communication is extremely efficient, but makes two kinds of errors possible:

  • Thread Interference and
  • Memory Consistency Errors (aka Happens-Before Relationship)

To avoid these errors you need to synchronize access to shared objects using Synchronization.

Thread Interference

Interference happens when two operations are running simultaneously on different threads, while they are acting on the same data, and this causes them to interleave. This means that the two operations consist of multiple steps, and an operation from one thread runs before an operation from current thread finishes, so the sequences of steps overlap.

We need to keep in mind that every operation is translated to the basic building blocks of CPU commands, so even a simple operation in Java may be translated to several steps

Lets look on the following example:

//Simple Counter
class SimpleCounter {
    
    //Counter instance
    private int c = 0;

    
    //increment
    public void increment() {
        c++;
    }

    //increment
    public void decrement() {
        c--;
    }

    //decrement
    public int value() {
        return c;
    }

}

The single expression c++ can be decomposed into three steps:

  • Retrieve the current value of c.
  • Increment the retrieved value by 1.
  • Store the incremented value back in c.

If a Counter object is referenced from multiple threads, interference between threads may prevent the operation c++ from happening as expected.

The same goes for the expression c--.

If Thread A invokes increment() at about the same time Thread B invokes decrement, and the initial value of c is 0, their interleaved actions might follow this sequence:

  • Thread A: Retrieve c. Retrieved 0.
  • Thread A: Increment retrieved value; result is 1.
  • Thread B: Retrieve c. Retrieved 0.
  • Thread B: Decrement retrieved value; result is -1.
  • Thread A: Store result in c; c is now 1.
  • Thread B: Store result in c; c is now -1.

Because "thread interference bugs" are unpredictable, they can be difficult to detect and fix.

Memory Consistency Errors (aka Happens-Before Relationship)

Memory consistency errors, also known as happens-before relationship, occur when different threads have inconsistent views of what should be the same data. This may happen when a thread holds a value that is outdated because it was already changed by another thread.

The key to avoiding memory consistency errors is to understand which operations or steps happens first, and to make sure that later steps will have an updated data. Let's call it the happens-before relationship. This relationship is simply a guarantee that memory writes by one specific statement are visible to another specific statement.

To see this, consider the following example. Suppose a simple int field is defined and initialized:

int counter = 0;

The counter field is shared between two threads, A and B. Suppose thread A increments counter:

counter++;

Then, shortly afterwards, thread B prints out counter:

System.out.println(counter);

If the two statements had been executed in the same thread, it would be safe to assume that the value printed out would be "1". But if the two statements are executed in separate threads, the value printed out might well be "0", because there's no guarantee that thread A's change to counter will be visible to thread B.

There are many ways to prevent memory consistency errors. You can:

  • Invoke a thread (Thread.start) after the data was already updated.
  • Call Thread.join to wait for a thread to finish updating the memory, and then continue to work on the updated data.
  • Use Synchronization.

For a list of actions that prevent memory consistency errors, refer to the Summary page of the java.util.concurrent package (It may be referred in there as a happens-before relationships).

Using Synchronization

Using synchronization may be a good way to be used to prevent both thread interference and memory consistency errors.

However, synchronization can introduce other problem that need to be issued - thread contention - which occurs when two or more threads try to access the same resource simultaneously and cause the Java runtime to execute one or more threads more slowly, or even suspend their execution. There are two main forms of thread contention:

  • Starvation
  • Livelock

The Java programming language provides two basic synchronization idioms:

  • Synchronized Methods
  • Synchronized Statements.

Synchronized Methods

To make a method synchronized, simply add the synchronized keyword to its declaration:

//Synchronized Counter
public class SynchronizedCounter {
    
    //Counter instance
    private int c = 0;

    
    //increment
    public synchronized void increment() {
        c++;
    }

    //increment
    public synchronized void decrement() {
        c--;
    }

    //decrement
    public synchronized int value() {
        return c;
    }

}

Making these methods synchronized has two effects:

  • First, thread interference cannot happen on synchronized methods. it is not possible for two threads of synchronized methods on the same object to interleave. Only one thread can execute a synchronized method for an object. All other threads that invoke synchronized methods for the same object are blocked until the first thread is done.
  • Second, it prevents memory consistency errors.

Note: Constructors cannot be synchronized. Synchronizing constructors does not make sense, because only the thread that creates an object should have access to it while it is being constructed.

Warning: When constructing an object that will be shared between threads, you have to confirm that a reference to the object does not "leak" prematurely. For example, adding an object of class PrematureInstance to a java.util.List on the constructor of PrematureInstance:

//Premature Instance
public class PrematureInstance {
    
    //Adding a premature instance to list
    public PrematureInstance(List list) {
        list.add(this);
    }

}
But then other threads can access the instance PrematureInstance before construction is complete.

final fields can be safely read through non-synchronized methods, once the object is constructed.

Synchronized methods is an effective way to prevent thread interference and memory consistency errors, but can present problems with liveness, as we'll see later.

Intrinsic Locks (aka Monitor Locks, Monitor)

Intrinsic Lock, aka Monitor Lock or simply Monitor or Lock is the mechanism used when synchronizing access of threads.

Every object has an monitor (intrinsic lock) associated with it. A thread that accesses synchronized code has to acquire the object's monitor before accessing it, and then release the monitor when it is done. A thread is said to own the monitor between the time it has acquired it and released it. As long as a thread owns a monitor, no other thread can acquire the same monitor. The other thread will block when it attempts to acquire the monitor.

The monitor release occurs even if the code caused an uncaught exception.

When a thread invokes a synchronized method, monitor for that method's object and releases it when the method returns.

When a static synchronized method is invoked, the thread acquires the monitor for the class object associated with the class. Thus access to static fields is controlled by a monitor that's distinct from the monitor for any instance of the class.

Note: Reentrant Synchronization: A thread cannot acquire a lock owned by another thread, but it can acquire a lock that it already owns.

Synchronized Statements

Another way to create synchronized code is with synchronized statements. When using synchronized statements you must specify the object that provides the monitor (aka intrinsic lock):

//Synchronized Statement
public class SynchronizedStatement {

    private long lastTime; 
    
    //Synchronized statement on "this" (current instance)
    public void synchronizeCurrentInstance() {
        synchronized(this) {
            this.lastTime = System.currentTimeMillis();
        }
        System.out.println(lastTime);
    }

    //Unsynchronized get
    public long getLastTime() {
        System.out.println("returning lastTime: [" + this.lastTime + "]");
        return lastTime;
    }
}

The synchronized statement saves the current time. While in the synchronized statement, other threads cannot access any code on that instance. When the synchronized statement is finished, lastTime is printed in an unsynchronized code, so other threads can get the lastTime by invoking getLastTime().

Synchronizing Instance Fields

There are some cases when you want to synchronize only specific fields of an instance without locking the entire instance. The following idiom will do the work:

//Synchronized Fields
public class SynchronizedFields {
    
    private long counter1 = 0;
    private long counter2 = 0;
    //A monitor for counter 1
    private Object counter1Lock = new Object();
    //A monitor for counter 2
    private Object counter2Lock = new Object();

    //Increment counter 1
    public void incrementCounter1() {
        synchronized(counter1Lock) {
            counter1++;
        }
    }

    //Decrement counter 1
    public void decrementCounter1() {
        synchronized(counter1Lock) {
            counter1++;
        }
    }

    //Increment counter 2
    public void incrementCounter2() {
        synchronized(counter2Lock) {
            counter2++;
        }
    }

    //Decrement counter 2
    public void decrementCounter2() {
        synchronized(counter2Lock) {
            counter2++;
        }
    }
}

Use this idiom with extreme care. You must be absolutely sure that access to counter1 and counter2 are never used together, so that they never interleave.

Atomic Access

An atomic action is one that happens all at once. It either happens completely, or it doesn't happen at all.

Even simple expressions, such as an increment, c++, does not describe an atomic action, but is actually decomposed into other actions. However, there are actions you can specify that are atomic:

However, there are actions you can specify that are atomic:

  • Reads and writes for reference variables and for most primitive variables (all types except long and double).
  • Reads and writes for all variables declared volatile (including long and double variables).

Atomic actions cannot be interleaved, so they can be used without fear of thread interference.

Still, atomic actions does not prevent memory consistency errors. so you still need to synchronize them. Using volatile variables reduces the risk of memory consistency errors, because changes to a volatile variable are always visible to other threads.

Using atomic variable is more efficient than synchronized code, but requires more care by the programmer to avoid memory consistency errors.

Some of the classes in the java.util.concurrent package provide atomic methods that do not rely on synchronization.

Liveness

Liveness is the ability of a concurrent application, or its threads, to execute correctly and respond in a reasonable manner.

Common liveness problems are:

  • Deadlock
  • Starvation
  • Livelock.

Deadlock

Deadlock is a situation where two or more threads are blocked forever, waiting for each other. Here's an example.

Two adventurers are walking towards a gate, one from north and one from west. Two gatekeepers need to confirm their pass through the gate. The northern person needs to get a passport from the northern gate keeper and than from the southern gate keeper. The southern person needs to get a passport from the southern gate keeper and than from the northern gate keeper. Unfortunately, this two rules conflict with each other when two adventurers might come from north and south at the same time:

//Deadlock example 
public class Deadlock {
    
    
    //Person class
    static class Person {
    }

    //Northern gate keeper
    static class GateKeeperNorth {
        public static void pass(Person p) {
            System.out.println(
                    "[GateKeeperNorth]"
                    + " allows to pass for ["+p.toString()+"]"
                    );
        }
    }
    
    
    //Southern gate keeper
    static class GateKeeperSouth {
        public static void pass(Person p) {
            System.out.println(
                    "[GateKeeperSouth]"
                    + " allows to pass for ["+p.toString()+"]"
                    );
        }
    }

    
    //Each PassHandlerThread holds an instance of the person that wants to pass
    static abstract class PassHandlerThread implements Runnable{
        public Person person;
    }

    //Approves a person to pass
    static class PassHandlerThreadNorth extends PassHandlerThread {
        

        //Sets the person that should pass
        public PassHandlerThreadNorth(Person person) {
            this.person = person;
        }

        //Thread entry point
        @Override
        public void run() {
            System.out.println(
                    "[PassHandlerThreadNorth]"
                    + " is waiting to lock [GateKeeperNorth]"
                    );
            synchronized (GateKeeperNorth.class) {
                GateKeeperNorth.pass(this.person);
                System.out.println(
                        "[GateKeeperNorth]"
                        + " was locked by [PassHandlerThreadNorth]"
                        );
                
                // Give a chance for the other thread to lock the other gate keeper
                try { Thread.sleep(1000); }
                catch (InterruptedException e) {}
                
                System.out.println(
                        "[PassHandlerThreadNorth]"
                        + " is waiting to lock [GateKeeperSouth]"
                        );
                synchronized (GateKeeperSouth.class) {
                    GateKeeperSouth.pass(this.person);
                    System.out.println(
                            "[GateKeeperSouth]"
                            + " was locked by [PassHandlerThreadNorth]"
                            );
                }
            }
        }
    }
    

    //Approves a person to pass
    static class PassHandlerThreadSouth extends PassHandlerThread {

        //Sets the person that should pass
        public PassHandlerThreadSouth(Person person) {
            this.person = person;
        }
        
        //Thread entry point
        @Override
        public void run() {
            System.out.println("[PassHandlerThreadSouth] is waiting to lock [GateKeeperSouth]");
            synchronized (GateKeeperSouth.class) {
                GateKeeperSouth.pass(this.person);
                System.out.println("[GateKeeperSouth] was locked by [PassHandlerThreadSouth]");

                // Give a chance for the other thread to lock the other gate keeper
                try { Thread.sleep(1000); }
                catch (InterruptedException e) {}

                System.out.println("[PassHandlerThreadSouth] is waiting to lock [GateKeeperNorth]");
                synchronized (GateKeeperNorth.class) {
                    GateKeeperNorth.pass(this.person);
                    System.out.println("[GateKeeperNorth] was locked by [PassHandlerThreadSouth]");
                }
            }
        }
    }
    

    //Main thread
    public static void main(String[] arg) {
        new Thread(new PassHandlerThreadNorth(new Person())).start();
        new Thread(new PassHandlerThreadSouth(new Person())).start();
    }
}

Neither of the threads will ever end, because each thread is waiting for the other to release a monitor on a gate keeper.

Starvation and Livelock

Starvation and livelock are much less common problems in thread synchronization.

Starvation

Starvation happens when a thread is unable to gain access to a resource and is unable to make progress. This happens by "greedy" threads that holds a monitor (lock) for a resource for a long time. For example, If synchronized method that takes a long time is invoked frequently by different threads, the first thread will have access to the method but the later threads will be blocked until the call of the first thread will release the method's object monitor.

For example, suppose that the two adventurers are reaching a gate at the same time, and the gate keeper is not working there anymore and not only that, he took the key with him. So the process of giving a passport to a person takes a long time, as long as forever. This means that there are two threads that are in starvation: the first one is in the process of approval, and the second is waiting to be handled.

//Starvation example 
public class Starvation {
    
    
    //Person class
    static class Person {
    }

    
    //Gate keeper
    static class GateKeeper {
        public static synchronized void pass(Person p) {
            System.out.println(
                    "[GateKeeper]"
                    + " allows to pass for ["+p.toString()+"]"
                    );
        }
    }

    
    //Approves a person to pass
    static class PassHandlerThread implements Runnable {
        public Person person;
        
        //Sets the person that should pass
        public PassHandlerThread(Person person) {
            this.person = person;
        }

        //Thread entry point
        @Override
        public void run() {
            System.out.println(
                    "[PassHandlerThread]"
                    + " is waiting to lock [GateKeeper] for person [" + this.person + "]"
                    );
            synchronized (GateKeeper.class) {
                try {
                    GateKeeper.class.wait();
                    GateKeeper.pass(this.person);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }
        }
    }
    

    //Main thread
    public static void main(String[] arg) {
        new Thread(new PassHandlerThread(new Person())).start();
        new Thread(new PassHandlerThread(new Person())).start();
    }
}

Livelock

In livelock, threads are not blocked but instead they are stuck in an endless situation when they react to each other. For example, if an adventurer reaches a gate and two gatekeepers start to argue who will give a pass to the adventurer first, and none of them will let go, the two gatekeepes are stuck in a livelock.

//Livelock example 
public class Livelock {
    
    
    //Person class
    static class Person {
    }

    //Northern gate keeper
    static class GateKeeperNorth {
        public static void pass(Person p) {
            System.out.println("[GateKeeperNorth] allows to pass for ["+p.toString()+"]");
        }
        
        //An argue on who handles the person first
        public static String whoIsFirst() {
            return "Me first";
        }
    }
    
    
    //Southern gate keeper
    static class GateKeeperSouth {
        public static void pass(Person p) {
            System.out.println("[GateKeeperSouth] allows to pass for ["+p.toString()+"]");
        }
        
        //An argue on who handles the person first
        public static String whoIsFirst() {
            return "Me first";
        }
    }

    
    //Approves a person to pass
    static class PassHandlerThread implements Runnable {
        public Person person;
        

        //Sets the person that should pass
        public PassHandlerThread(Person person) {
            this.person = person;
        }

        //Thread entry point
        @Override
        public void run() {
            System.out.println("[PassHandlerThread] is waiting to lock [GateKeeperNorth]");
            
            synchronized(GateKeeperNorth.class) {
                synchronized(GateKeeperNorth.class) {
                    String northResponse = GateKeeperNorth.whoIsFirst();
                    String southResponse = GateKeeperSouth.whoIsFirst();
                    while (northResponse.equals("Me first") &&
                            southResponse.equals("Me first")) {
                      //wait until one gives up
                        System.out.println("North response: " + northResponse);
                        System.out.println("South response: " + southResponse);
                    }
                }
            }
        }
    }
    

    //Main thread
    public static void main(String[] arg) {
        new Thread(new PassHandlerThread(new Person())).start();
    }
}

Guarded Blocks

Guarded block is used to check for a condition before the block can proceed. A guarded block, if not written correctly, can consume unnecessary resources.

For example, the previous example in the Livelock section not only demonstrates a livelock, but also an incorrect coding of a guarded block. The thread will run and always check for one of the gate keepers to give up.

while (northResponse.equals("Me first") &&
        southResponse.equals("Me first")) {
    //wait until one gives up
    System.out.println("North response: " + northResponse);
    System.out.println("South response: " + southResponse);
}

A more efficient guard invokes Object.wait to suspend the current thread, until another thread will interrupt it. Still, a check should always be made so that the interrupt of the thread is caused from the right reason:

while (northResponse.equals("Me first") &&
        southResponse.equals("Me first")) {
    //wait until one gives up
    System.out.println("North response: " + northResponse);
    System.out.println("South response: " + southResponse);
    try {
        GateKeeperNorth.class.wait();
    } catch (InterruptedException e) {
        System.out.println("[" + GateKeeperNorth.class + "] was interrupted");
    }
}

Note: Always invoke wait inside a loop. A thread may be interrupted from an incorrect or unexpected reason and you have to check the guarding condition again before the thread continues to run.

When wait is invoked, the thread releases the monitor of the synchronized object or class, and suspends the execution of the running thread. Later on, another thread will interrupt the suspended thread, or aquire the suspended monitor and invoke Object.notifyAll, informing all threads waiting on that lock that something important has happened.

After the thread that interrupted or notified the waiting thread will release the lock, the waiting thread will reacquire the monitor and resumes by returning from the wait method.

Immutable Objects

An object is considered immutable if its state cannot be changed after it is constructed.

Using immutable objects is very useful in concurrent applications. Since they cannot change state, they cannot be corrupted by thread interference or memory consistency errors.

As for efficiency, many programmers worry about the cost of creating new objects. But in fact, object creation in many times is more efficient then the overhead of the code needed to protect mutable objects from corruption, in multi-threaded application.

When writing an immutable object you need to keep to the following rules:

  • Don't provide any "setter" methods
  • Make all fields final and private.
  • Prevent overiding your class. For example, declare the class as final; make the constructor private; construct instances in factory methods.
  • Prevent from references to mutable objects to be changed:
    • Don't provide methods that modify the mutable objects.
    • Don't share references to the mutable objects: Never store references to external, mutable objects passed to the constructor; if necessary, create copies, and store references to the copies. Similarly, create copies of your internal mutable objects when necessary to avoid returning the originals in your methods.

For example, take a look at the following ImmutablePerson class:

//Immutable Person
public final class ImmutablePerson {

    //final fields
    private final String name;
    private final int age;
    
    //The final fields are initialized on the constructor
    public ImmutablePerson(String name, int age) {
        this.name = name;
        this.age = age;
    }
    
    //name is a reference to an immutable object
    //(java.lang.String), and the reference is final,
    //so we can return it
    public String getName() {
        return name;
    }
    
    //age is a primitive value and not a reference;
    //so we can return it
    public int getAge() {
        return age;
    }
}

High Level Concurrency Objects

As of Java 5.0+, several higher-level building blocks were added to the Java platform. These building blocks are needed for more advanced tasks, such as massively concurrent applications.

Most of these new building blocks are implemented in the java.util.concurrent packages. There are also new concurrent data structures in the Java Collections Framework.

Here is a list of some of the new features:

  • Lock objects is a new set of APIs that provides several locking idioms to simplify synchronized and reentrant locks.
  • Executors is a high-level API for launching and managing threads and thread pool management, that are suitable for large-scale applications.
  • Concurrent collections structures for managing large collections of data that is accessed is concurrency.
  • Atomic variables is a new set of APIs that minimizes synchronization and avoid memory consistency errors.
  • ThreadLocalRandom (in JDK 7) provides safe generation of pseudorandom numbers in muti-threaded applications.

Lock Objects

Sophisticated locking idioms are provided in the java.util.concurrent.locks package. We will focus in here the most basic interface, java.util.concurrent.locks.Lock.

Lock objects work very much like the implicit locks (synchronized methods) used by synchronized code, but they also support a wait/notify mechanism, through their associated java.util.concurrent.locks.Condition objects.

The biggest advantage of Lock is their ability to back out of an attempt to acquire a lock:

  • The tryLock method backs out if the lock is not available immediately or before a timeout expires (if specified).
  • The lockInterruptibly method backs out if another thread sends an interrupt before the lock is acquired.

For example, examine the following idiom that solves the deadlock problem we saw earlier. The approve passport thread Alphonse and Gaston have trained themselves to notice when a friend is about to bow. We model this improvement by requiring that our Friend objects must acquire locks for both participants before proceeding with the bow. Here is the source code for the improved model, Safelock. To demonstrate the versatility of this idiom, we assume that Alphonse and Gaston are so infatuated with their newfound ability to bow safely that they can't stop bowing to each other:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//LockObjects example 
public class LockObjects {

    // Person class
    static class Person {
        
        private String name;
        
        public Person(String name) {
            this.name = name;
        }
        
        private String getName() {
            return this.name;

        }
    }

    // Northern guard
    static class GuardNorth {
        
        private final static Lock lock = new ReentrantLock();
        
        public static void approvePassport(Person p) {
            System.out.println("[GuardNorth]" + " approves passport of ["
                    + p.getName() + "]");
        }
    }

    // Southern guard
    static class GuardSouth {
        
        private final static Lock lock = new ReentrantLock();
        
        public static void approvePassport(Person p) {
            System.out.println("[GuardSouth]" + " approves passport of ["
                    + p.getName() + "]");
        }
    }


    // Approves a person to pass
    static class PassportApprovalThread implements Runnable {

        public Person person;

        // Sets the person that should pass
        public PassportApprovalThread(Person person) {
            this.person = person;
        }

        // Thread entry point
        @Override
        public void run() {
            System.out.println("Thread [" + Thread.currentThread().getName() + "]"
                    + " is waiting to lock [GuardNorth]");
            
            
            boolean lockedNorth = false;
            while ( ! lockedNorth) {
                lockedNorth = GuardNorth.lock.tryLock();
                if ( ! lockedNorth) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }

            
            System.out.println("Thread [" + Thread.currentThread().getName() + "] locked [GuardNorth]");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }


            System.out.println("Thread [" + Thread.currentThread().getName() + "]"
                    + " is waiting to lock [GuardSouth]");
            
            boolean lockedSouth = false;
            while ( ! lockedSouth) {
                lockedSouth = GuardSouth.lock.tryLock();
                if ( ! lockedSouth) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
                
            System.out.println("Thread [" + Thread.currentThread().getName() + "] locked [GuardSouth]");

            GuardNorth.approvePassport(this.person);
            GuardSouth.approvePassport(this.person);

            System.out.println("Thread [" + Thread.currentThread().getName() + "] "
                    + "allowed passport of [" + this.person.getName() + "]");

            GuardNorth.lock.unlock();
            GuardSouth.lock.unlock();
        }
    }

    // Main thread
    public static void main(String[] arg) {
        new Thread(
                new PassportApprovalThread(new Person("John")),
                "t1"
                ).start();
        new Thread(
                new PassportApprovalThread(new Person("Jane")),
                "t2"
                ).start();
    }
}

Executors

In large-scale applications, it is better to separate the management of thread's lifecycle and the rest of the application. A new Java API, called executors, is out to achieve this seperation. Three concepts are introduced in this API:

  • Task - is a concrete implementation of a thread, that usually implements java.lang.Runnable
  • Worker - is the code that runs a task, or a single thread
  • Executors - is the life-cycle management of workers, or in other words, life-cycle management of workers and tasks (threads)

And three types of algorithms:

  • Thread Pools - are the most common kind of executor implementation
  • Scheduled Executors - Running threads in a timely manner
  • Fork/Join - is a framework (new in JDK 7) for taking advantage of multiple processors.

Executor Interfaces

The java.util.concurrent package defines three executor interfaces:

 

  • Executor - a simple interface that supports launching new tasks.
  • ExecutorService - a sub-interface of Executor, which adds features that help manage the life-cycle, both of the individual tasks and of the executor itself.
  • ScheduledExecutorService - a sub-interface of ExecutorService that supports future and/or periodic execution of tasks.

Typically, referencing variables of executors is done by their interface and not by their concrete implementation.

The Executor Interface

The Executor interface is the main interface for implementations of thread executors.

It provides a single method:

void execute(Runnable command)

This method invokes common thread-creation idiom (what is actually done depends on the implementation).

For example, if runner is a Runnable object, and executor is an Executor object you can replace

(new Thread(runner)).start();

with

executor.execute(runner);

Bram, while the first idiom creates a new thread and launches it immediately, the later executor#execute() method may do the same thing, or use an existing worker thread to run runnable, or to place runnable in a queue to wait for a worker thread to become available. (We'll describe worker threads in the section on Thread Pools.)

The executor implementations in java.util.concurrent are designed to make full use of the more advanced ExecutorService and ScheduledExecutorService interfaces, although they also work with the base Executor interface.

The ExecutorService Interface

The ExecutorService extends the Executor interface and provides methods for thread management and lifecycle management:

ExecutorService provides:

  • A task to return a value by using Callable objects
  • Submitting large collections of Callable objects, and thus running large collections of tasks
  • Methods for managing the shutdown of an executor. To support immediate shutdown, tasks should handle interrupts correctly.

Here is a short description of the API:

  • Block the flow of the application until the execution of tasks ended:

    • All tasks have completed execution after a shutdown request
    • Timeout occurs
    • The current thread is interrupted
    • boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
  • Execute a thread and return the result of its execution

    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                             throws InterruptedException
    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                    long timeout,
                                    TimeUnit unit)
                             throws InterruptedException
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks)
               throws InterruptedException,
                      ExecutionException
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                      long timeout,
                      TimeUnit unit)
               throws InterruptedException,
                      ExecutionException,
                      TimeoutException
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks)
               throws InterruptedException,
                      ExecutionException
    • <T> Future<T> submit(Callable<T> task)
    • Future<?> submit(Runnable task)
    • <T> Future<T> submit(Runnable task,
                           T result)
  • Manage the life-cycle of the executor:

    • boolean isShutdown()
    • boolean isTerminated()
    • void shutdown()
    • List<Runnable> shutdownNow()

Note: While the submit method is similar to the execute method, it is more versatile. Like the execute method it accepts Runnable objects, but also accepts java.util.concurrent.Callable objects, which allow a task to return a value. Low-level concurrency idioms does not support a thread to return a value. The submit method returns a java.util.concurrent.Future object, which is used to retrieve the Callable return value and to manage the status of both Callable and Runnable tasks.

The ScheduledExecutorService Interface

The ScheduledExecutorService interface extends the ExecutorService with:

  • schedule method - which executes a Runnable or Callable task after a specified delay
  • scheduleAtFixedRate and scheduleWithFixedDelay methods - which executes specified tasks repeatedly, at defined intervals

No comments:

Post a Comment