Difference between revisions of "Threads"

From Suhrid.net Wiki
Jump to navigationJump to search
 
(2 intermediate revisions by the same user not shown)
Line 857: Line 857:
 
* Thread can check if its interrupted by calling the static Thread.interrupted(). Calling this method clears the interrupt status. Threads invoking methods which dont throw interrupteexception should check this flag periodically. Based on this, it can choose to terminate or continue (In below example, the thread terminates.)
 
* Thread can check if its interrupted by calling the static Thread.interrupted(). Calling this method clears the interrupt status. Threads invoking methods which dont throw interrupteexception should check this flag periodically. Based on this, it can choose to terminate or continue (In below example, the thread terminates.)
 
* To check another thread's status the non static t.isInterrupted() method can be used - this does not reset the interrrupt flag.
 
* To check another thread's status the non static t.isInterrupted() method can be used - this does not reset the interrrupt flag.
 +
* By convention, any method that exits by throwing an InterruptedException clears interrupt status when it does so. SO in the catch block of an InterruptedException, the interrupt flag will be false.
  
 
<syntaxhighlight lang="java5">
 
<syntaxhighlight lang="java5">
Line 909: Line 910:
  
 
</syntaxhighlight>
 
</syntaxhighlight>
 
  
 
== Thread Exceptions ==
 
== Thread Exceptions ==
Line 918: Line 918:
  
 
[[Category:OCPJP]]
 
[[Category:OCPJP]]
 +
[[Category:RealtimeJava]]

Latest revision as of 01:52, 25 October 2012

Intro, Thread States

  • Think of Thread as the "worker" and Runnable as the job.
  • Define work to be done in a class that implements Runnable.
  • Instantiate the thread using the runnable object. (Thread is in the new state)
  • Then start() it. (Thread moves to the runnable state, eligible to run, perhaps waiting for the scheduler to run it)
class Job implements Runnable {
     public void run() {
         //work to be performed in  a separate thread.
     }
}

Job j = new Job();
Thread t = new Thread(j);
t.start();
  • Note if a Class extends Thread and overrides run() - then its run() method will be invoked when the start() method is called. This will hold true even when the Thread is constructed with a Runnable object.
  • Example:
class MyThread extends Thread {
	
	public MyThread() {
		
	}
	
	public MyThread(Runnable r) {
		super(r);
	}
	
	public void run() {
		System.out.println("MyThread");
	}
}

class MyRunnable implements Runnable {

	@Override
	public void run() {
		System.out.println("Venables");
	}
	
}

public class ThreadRunnable {

	public static void main(String[] args) {
		Runnable r = new MyRunnable();
		Thread t  = new MyThread(r);
		t.start(); //Prints MyThread and NOT MyRunnable
	}

}
  • When thread actually runs it is in the running state.
  • Always a thread object is start()'ed, NOT a runnable.
  • The thread can also go into waiting/blocked/sleeping state. e.g. waiting for an IO Resource such as a packet to arrive. In other words it is NOT runnable.
  • Once run() completes the Thread goes to the dead state. You cannot call start() again on it. Of course, the thread object itself can still be used.
  • Five States of a Thread
    • New
    • Runnable
    • Running
    • Waiting/Blocked/Sleeping
    • Dead

Sleep

  • Be careful of the Thread classes static methods such as sleep() and yield(). They refer to the current executing thread! Do not be misled when they are invoked using a thread object.
  • e.g. t1.sleep() will not cause Thread t1 to sleep(), it causes the current executing thread to sleep().
  • sleep() specifies that the Thread must go to sleep for at least the specified duration.
  • What does this mean, it means that when the sleep duration expires and thread wakes up, the thread immediately does not go to running - it goes to the runnable state.
  • so sleep() gives the minimum duration that the thread will not run. You cannot use it as an accurate timer!

Join

  • t.join() will take the current thread from which this code is called and join it to the end of t.
  • This means the thread from which join() is called will wait until t finishes before it runs again.
  • join() should be invoked after the call to start(), otherwise it has no effect. In the below example, Thread t1 will run to completion before main can start.
public class TJoin extends Thread {

	public static void main(String[] args) throws Exception {
		TJoin main = new TJoin();
		Thread t1 = new Thread(main, "T1");
		t1.start();
		t1.join();
		main.run();
	}
	
	public void run() {
		for(int x=0; x < 100; x++, System.out.println(Thread.currentThread().getName() + " : " + x));
	}

}

Name and Id

  • Every thread has a name and an id which is assigned by default.
  • Id is a positive long integer which remains unique during the lifetime of the thread. When a thread terminates this ID may be reused.
  • Names are also assigned by default they look like - "Thread-0", "Thread-1" etc. 0, 1 do not represent Thread IDs
  • Names can be explicitly assigned as an argument to the Thread constructor or using setName().
  • See example for generated thread names and id's :
for(int i=0; i<5; i++) {
			Thread t = new Thread();
			System.out.println(t.getName() + " , " + t.getId());
}
		
/* Output:
		
Thread-0 , 8
Thread-1 , 9
Thread-2 , 10
Thread-3 , 11
Thread-4 , 12

*/

Thread Lifetime

  • The JVM will continue to run till the last user thread is running.
  • Even if some user threads have an uncaught exception and they die, if other user threads are running they will continue to do so.
  • The JVM does not care whether daemon (background) threads are running or not, if all user threads finish - the JVM will exit.
  • A thread can be set as a daemon thread using the setDaemon(flag) method, this must be done before the thread is started.
  • In below example, main starts a thread to print integers, after a while main throws a FileNotFoundException and dies, but since t3 is a user thread,it will run to completion and the JVM will not exit.
  • If the setDaemon(true) call was uncommented, then as soon as main throws the exception, the JVM will exit and not wait for t3 to complete - since it is a daemon thread.
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;

public class T3 implements Runnable {

	public static void main(String[] args) throws FileNotFoundException {
		
		Thread t3 = new Thread(new T3());
		System.out.println("Starting T3 ... ");
		//t3.setDaemon(true);
		t3.start();
		System.out.println("Main sleeping");
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		BufferedReader br = new BufferedReader(new FileReader("nosuchfile")); 
		
	}

	@Override
	public void run() {
		for(int i=0; i < 25; i++) {
			try {
				Thread.sleep(1000);
				System.out.print(i + " ");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

Synchronization

  • Control access to shared object by multiple threads.
  • Only methods and code blocks can be synchronized.
  • Synchronized blocks:
    • all objects can be used for synchronization - even arrays. But not primitives.
    • the expression for a synchronized block can have anything that returns an object - including a method call.
    • It has to have curly braces even if there is only 1 line in the synchronized block.
    • Trying to use null as the object lock will result in an NPE.
  • Synchronization happens through object locks.
  • Suppose a synchronized method is called, then the lock of the object that invoked the method is used for synchronization.
  • When a thread enters a synchronized method, since an object has only one lock - even other synchronized methods in the class cannot be entered for that object by other threads.
  • The following program invokes three threads to each print a single different letter 100 times.
  • If the code is not synchronized, different threads can print the same letter - because they would start printing before the letter has got a chance to change.
  • Note that printLetter() is not synchronized. Why ? Because printLetter is invoked using a LetterPrintJob object and each thread uses a separate LetterPrintJob object.
  • So you have to synchronize on the lock of the shared object which in this case is the StringBuffer.
class LetterPrintJob implements Runnable {

	private StringBuffer strBuf;

	LetterPrintJob(StringBuffer strBuf) {
		this.strBuf = strBuf;
	}

	@Override
	public void run() {
		printLetter();
	}

	void printLetter() {
		synchronized(strBuf) {
			char letter = strBuf.charAt(0);
			for(int i=0; i < 100; i++) {
				System.out.println(Thread.currentThread().getName() + ":" + letter);
			}
			strBuf.setCharAt(0, ++letter);
		}
	}
}

public class SynchroTest {

	public static void main(String[] args) {
		StringBuffer strBuf = new StringBuffer("A");
		new Thread(new LetterPrintJob(strBuf),"T1").start();
		new Thread(new LetterPrintJob(strBuf),"T2").start();
		new Thread(new LetterPrintJob(strBuf),"T3").start();
	}

}


  • Since each thread has its own stack, two threads executing the same method concurrently will use different copies of local variables.
  • Shared data needs to be synchronized. For non-static members - usually accessed by non-static methods, we need to prevent threads that use the same object instance to invoke the method. Usually by marking

such methods as synchronized.

  • Threads using different instances is not a problem - since the non-static data is not shared.
  • Similarly for static fields that need to be synchronized, mark the static accessor methods as synchronized.
  • However problems arise when non-static methods access static fields and static methods access non-static members through instances !
  • Careful coding is necessary to make the class thread-safe.
  • See example below on how threads can cause even a Thread-safe class like a Vector to fail.
  • Individual methods like add() and remove() are synchronized - which prevents multiple threads using these methods concurrently on a single vector instance.
  • However, this will not prevent situations where a different thread can manipulate the vector in between two invocations of the synchronized methods.
class VectorJob implements Runnable {

	Vector<String> v;

	VectorJob(Vector<String> v) {
		this.v = v;
	}

	public void run() {
			try {
				if(v.size() > 0) {
					Thread.sleep(10);
					v.remove(0);
				}
			} catch (Exception e) {
				System.out.println("Oops ");
				e.printStackTrace();
			}
        }

public class SynchroTest1 {

	public static void main(String[] args) {

		Vector<String> v = new Vector<String>();
		v.add("Troll");
		new Thread(new VectorJob(v), "T1").start();
		new Thread(new VectorJob(v), "T2").start();
		
	}
}
  • What happens here is T1 checks Vector size is non-zero before removing the element and then goes to sleep
  • T2 gets a chance to run, checks size is non-zero and goes to sleep
  • T1 wakes up - removes the first element
  • T2 wakes up - proceeds to remove the first element ( T2 was sure that the size > 0 before sleeping) and an exception occurs.
  • Solution is to externally synchronize the code block on the vector element.

Deadlock

  • Example-1 using synchronized blocks. Straightforward to see how threads can get deadlocked.
public class Deadlock {
	
	String resourceA = "resourceA";
	String resourceB = "resourceB";

	public static void main(String[] args) {
		final Deadlock dl = new Deadlock();
		
		new Thread("Reader") {
			public void run() {
				dl.read();
			}
		}.start();
		
		new Thread("Writer") {
			public void run() {
				dl.write("AA", "BB");
			}
		}.start();
	}
	
	String read() {
		synchronized(resourceA) {
			synchronized(resourceB) {
				return resourceA + resourceB;
			}
		}
	}
	
	void write(String a, String b) {
		synchronized(resourceB) {
			synchronized(resourceA) {
				resourceA = a;
				resourceB = b;
			}
		}
	}

}
  • Example 2 - Same example using synchronized methods(). Notice, it is more difficult to spot the deadlock condition here.
  • How can a deadlock occur here ? A possible scenario:
  • Thread-A gets the lock for the thing1 object by calling thing1.foo(thing2)
  • Before Thread-A can call the lock for thing2 object by calling thing2.bar(), Thread-B has acquired the lock for thing2 by calling thing2.foo(thing1)
  • Since Thread-A has to call thing2.bar(), it waits for Thing2's lock being held by Thread-B.
  • Since Thread-B has to call thing1.bar(), it waits for Thing1's lock being held by Thread-A.
  • Deadlock !
class Thing {
	
	private String str;
	
	public Thing(String str) {
		this.str = str;
	}

	synchronized void foo(Thing t) {
		t.bar();
	}

	synchronized void bar() {
	}
	
	public String toString() {
		return str;
	}
	
}

public class Deadlock1 {

	public static void main(String[] args) {
		
		final Thing thing1 = new Thing("Thing1");
		final Thing thing2 = new Thing("Thing2");
		
		new Thread("Thread-A") {
			public void run() {
				thing1.foo(thing2);
			}
		}.start();
		
		new Thread("Thread-B") {
			public void run() {
				thing2.foo(thing1);
			}
		}.start();
	}
}

Thread Communication

  • Threads communicate with each other through their locking status on the objects.
  • The Object class has three methods wait(), notify() and notifyAll() which are used by Threads to communicate to each other.
  • wait(), notify() must be called from a synchronized context - the thread must own the object's lock before calling it.
  • wait() means the thread says - I am going to wait for some condition to be satisfied, so I will release this object's lock till another thread notify()'s me of the condition being fulfilled.
  • notify() means the thread says - I am done running the job after acquiring the object's lock, I will now release the lock, so that threads wait()'ing for this object can run.
  • NOTE: wait() will cause the thread to give up it's lock immediately, but not notify() - notify wakes up waiting thread(s), but the lock is released only when the current thread gives it up - e.g. reaching the end of the synchronized block/method.
  • Here are couple of Producer-Consumer examples. The producer thread writes some messages, the consumer thread will pick up the message and print it out.
  • The producer consumer threads communicate with each other on when to produce/consume messages.
  • The producer thread will wait() for the message buffer to become empty before filling it with messages. Once it fills the buffer, it notify()'s the consumer.
  • The consumer thread will wait() for the message buffer to become full before extracting messages. Once all the messages are extracted, it notify()'s the producer.
  • The first example uses synchronized blocks. Since the lock status of the message buffer is used to communicate between threads, the blocks are synchronized on the message buffer.
import java.util.*;

class Producer implements Runnable {

	private Queue<String> messages;
	private String[] msgStrs = {"A", "B", "C", "D", "E"};

	public Producer(Queue<String> messages) {
		this.messages = messages;
	}

	public void run() {
		while (true) {
			String logPrefix = Thread.currentThread().getName() + " : ";
			try {
				Thread.sleep(2000);
				System.out.println(logPrefix + "Waiting for messages lock");
				synchronized (messages) {
					System.out.println(logPrefix + "Got messages lock");
					while (!messages.isEmpty()) { //Consumer has not finished consuming the messages
						System.out
						.println(logPrefix
								+ "Messages still pending, let's wait for some... ");
						try {
							messages.wait(); //wait till consumer finishes;
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						System.out.println(logPrefix
								+ "Notified that messages are consumed");
					}
					System.out.println(logPrefix
							+ "Messages empty.. let's fill in some messages... ");
					//Wake up, consumer has finished. Populate the list
					List<String> msgList = Arrays.asList(msgStrs);
					Collections.shuffle(msgList);
					for (String s : msgList) {
						messages.offer(s);
					}
					System.out
					.println(logPrefix
							+ " Done filling in messages, notifying the consumer... ");
					//Now notify the consumer thread that the producer is done.
					messages.notify();
				}
			} catch (InterruptedException e) {
				System.out.println(logPrefix + "Interrupted ! Exiting..");
				break;
			}
		}
	}

}


class Consumer implements Runnable {

	private Queue<String> messages;

	public Consumer(Queue<String> messages) {
		this.messages = messages;
	}

	public void run() {
		while (true) {
			String logPrefix = Thread.currentThread().getName() + " : ";
			try {
				Thread.sleep(2000);
				System.out.println(logPrefix + "Waiting for messages lock");
				synchronized (messages) {
					System.out.println(logPrefix + "Got messages lock");
					while (messages.isEmpty()) { //Producer has not produced any messages
						System.out.println(logPrefix
								+ "No messages, let's wait for some... ");
						try {
							messages.wait(); //Wait on the messages object
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
					System.out.println(logPrefix
							+ "Notified, let's print some messages... ");
					//Woken up, print out the messages and empty the queue
					String msg = "";
					while ((msg = messages.poll()) != null) {
						System.out.println("Msg : " + msg);
					}
					//Done, notify the producer to add new messages.
					System.out.println(logPrefix
							+ " Done printing, notifying the producer... ");
					messages.notify();
				}
			} catch (InterruptedException e) {
				System.out.println(logPrefix + "Interrupted ! Exiting..");
				break;
			}
		}
	}
}


public class ProducerConsumer {

	public static void main(String[] args) {

		Queue<String> messages = new LinkedList<String>();
		final Thread producer = new Thread(new Producer(messages), "Producer");
		final Thread consumer = new Thread(new Consumer(messages), "Consumer");

		producer.start();
		consumer.start();
		
		class ConsoleReader implements Runnable {
			public void run() {
				Scanner scanner = new Scanner(System.in);
				String userMsg = scanner.next();
				if(userMsg.equalsIgnoreCase("Q")) {
					producer.interrupt();
					consumer.interrupt();
				}
			}
		}
		
		Thread consoleReader = new Thread(new ConsoleReader(), "ConsoleReader");
		consoleReader.start();
	}
	
}
  • The second example uses synchronized methods.
  • wait() and notify() are called as-is since they are called from within the synchronized methods of the Message class.
  • This means the the lock of the message instance that is being used to call the synchronized methods are used by the Producer and Consumer threads to communicate with each other.
import java.util.*;

class Message {

	private Queue<String> msgQ = new LinkedList<String>();
	private String[] msgStrs = {"A", "B", "C", "D", "E"};
	private List<String> list = new ArrayList<String>();

	synchronized void put() {
		while(true) {
			try {
				while (!msgQ.isEmpty()) {
					//wait for msgQ to be consumed;
					wait();
				}
				list = Arrays.asList(msgStrs);
				Collections.shuffle(list);
				msgQ.addAll(list);
				//msgQ full, notify.
				notify();
			} catch (InterruptedException e) {
				break;
			}
		}
	}

	synchronized void take() {
		while (true) {
			try {
				while (msgQ.isEmpty()) {
					//wait for msgQ to be full;
					wait();
				}
				String msg;
				while ((msg = msgQ.poll()) != null) {
					System.out.println("msg : " + msg);
				}
				//msgQ empty, notify
				notify();
			} catch (InterruptedException e) {
				break;
			}
		}
	}

}

public class PCMethod {

	public static void main(String[] args) {

		final Message m = new Message();

		final Thread consumer = new Thread("Consumer") {
			public void run() {
				m.take();
			}
		};
		
		consumer.start();

		final Thread producer = new Thread("Producer") {
			public void run() {
				m.put();
			}
		};
		
		producer.start();
		
		new Thread("Quitter") {
			public void run() {
				Scanner s = new Scanner(System.in);
				if(s.next().equalsIgnoreCase("Q")) {
					consumer.interrupt();
					producer.interrupt();
				}
			}
		}.start();

	}
}
  • Example using notifyAll() - which wakes up all the threads waiting for an object's lock.
class Adder {
	
	private int limit;
	private int total;
	private boolean isComplete = false;
	
	Adder(int limit) {
		this.limit = limit;
	}
	
	boolean isComplete() {
		return isComplete;
	}
	
	int getTotal() {
		return total;
	}
	
	synchronized void add() {
		for(int i=0; i < limit; i++) {
			total += i;
		}
		isComplete = true;
		notifyAll();
	}
	
}

class Subscriber implements Runnable {
	
	private Adder adder;
	
	Subscriber(Adder a) {
		adder = a;
	}
	
	public void run() {
		String logPrefix = Thread.currentThread().getName() + " : ";
		synchronized(adder) {
			while(!adder.isComplete()) {
				try {
					adder.wait();
				} catch (InterruptedException e) {
				}
			}
			System.out.println(logPrefix + "Total : " + adder.getTotal());
		}
	}
}


public class NotifyAll {

	public static void main(String[] args) {
		Adder a = new Adder(200);
		Thread sub1 = new Thread(new Subscriber(a), "S1");
		Thread sub2 = new Thread(new Subscriber(a), "S2");
		Thread sub3 = new Thread(new Subscriber(a), "S3");
		sub1.start();
		sub2.start();
		sub3.start();
		a.add();
	}
}

Using wait() in a loop

  • Apart from the fact that wait() must be called only when the current thread owns the current object's lock, you should always check for the condition in which to call wait().
  • Why ? See example below:
class Reader implements Runnable {

	private StringBuffer strBuf;

	public Reader(StringBuffer strBuf) {
		super();
		this.strBuf = strBuf;
	}

	public void run() {
		synchronized(strBuf) {
			try {
				strBuf.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("Numbers from 1 to 1000 : \n" + strBuf);
		}
	}
}

class Writer implements Runnable {

	private StringBuffer strBuf;

	public Writer(StringBuffer strBuf) {
		super();
		this.strBuf = strBuf;
	}

	public void run() {
		synchronized(strBuf) {
			for(int i= 1; i <= 1000; i++) {
				strBuf.append(i + ",");
			}
				strBuf.notify();
		}
	}
}

public class WaitNotify {

	public static void main(String[] args) {
		StringBuffer strBuf = new StringBuffer();
		Thread reader = new Thread(new Reader(strBuf), "Reader");
		Thread writer = new Thread(new Writer(strBuf), "Writer");
		reader.start();
		writer.start();
	}

}
  • Here the reader() thread waits on the strBuf object for it to be populated first.
  • However what can happen is that writer thread can start first and complete its run method thus calling notify() before any other thread is waiting.
  • When reader then gets a chance to run it will issue a call to wait() and wait indefinitely since no-one else will be calling notify.
  • What does this imply ? After calling wait(), Thread can get a chance to be runnable, only when someone else calls notify() ! It wont just happen if the object lock is released by another thread.
  • The solution is to check for the condition which must be satisfied to reader to start waiting. In this case, only if the strBuf is empty, should reader thread wait for it to be populated by writer.
  • A while check is preferred over an if condition. This is because the thread can be notified() outside of our code, e.g. by the JVM.
  • So as soon as the thread wakes up the condition should be checked again. If the condition is true then notify() has been called legitimately, otherwise it is a spurious wake-up and thread should go back into wait(). This is accomplished through use of a while() loop.
Wrong Right
synchronized(this) {
    if(!condition)
        wait();
    }
}
synchronized(this) {
    while(!condition)
        wait();
    }
}

Thread Interrupts

  • An interrupt is an indication to a thread that it should stop what it is doing and do something else.
  • It's up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate.
  • Calling methods such as sleep() will throw an interrupted exception - the thread can ignore or terminate on catching the exception.


package tred;

class Job implements Runnable {
	
	public void run() {
		for(int i=0; i < 100; i++) {
			try {
				Thread.sleep(1000);
			} catch(InterruptedException e) {
				System.out.println("Interrupted");
				System.out.println("Flag : " + Thread.currentThread().isInterrupted());
				return;
			}
			System.out.print(i + " ");
		}
	}
	
}

public class Inter {

	public static void main(String[] args) {
		
		Thread t1 = new Thread(new Job());
		t1.start();
		
		try {
			Thread.sleep(5000);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		System.out.println("Interrupting T1 : "); 
		t1.interrupt();
		
	}
}
  • Interrupt status is maintained as an internal flag in the thread class.
  • Thread can check if its interrupted by calling the static Thread.interrupted(). Calling this method clears the interrupt status. Threads invoking methods which dont throw interrupteexception should check this flag periodically. Based on this, it can choose to terminate or continue (In below example, the thread terminates.)
  • To check another thread's status the non static t.isInterrupted() method can be used - this does not reset the interrrupt flag.
  • By convention, any method that exits by throwing an InterruptedException clears interrupt status when it does so. SO in the catch block of an InterruptedException, the interrupt flag will be false.
class Job implements Runnable {
	
	PrintWriter pw;
	
	Job()  {
		try {
			pw = new PrintWriter("/tmp/U571");
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		}
	}
	
	public void run() {
		for(int i=0; i < 10000000; i++ ) {
			boolean b = Thread.interrupted();
			if(b) {
				System.out.println("Interrupted ! ");
				return;
			}
			pw.println("Mayday");
		}
		
		pw.close();
	}
	
}

public class Inter {

	public static void main(String[] args) {
		
		Thread t1 = new Thread(new Job());
		t1.start();
		
		try {
			Thread.sleep(100);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		System.out.println("Interrupting T1 : "); 
		t1.interrupt();
		System.out.println("T1 Interrupted Status " + t1.isInterrupted());
		
	}
}

Thread Exceptions

  • sleep(), join() and wait() throw a checked InterruptedException.
  • Whenever a thread goes into a waiting state - then it can be interrupted and therefore must handle the InterruptedException.
  • wait() and notify() also throw an unchecked IllegalMonitorStateException when the thread invoking them does not own the object lock.