Asynchronous Transfer of Control
From Suhrid.net Wiki
Intro
- An ATC occurs when the point of execution of one schedulable object (SO) is changed by the action of another SO.
- Therefore an SO may be executing one method and, through no action of its own, find itself executing another method.
- Controversial:
- Complicates language semantics.
- Makes it difficult to write correct code, as the code may be interfered with.
- Increases complexity of the RTJVM.
- Slows down code that does not use the feature.
 
Requirements for ATC
- Fundamental requirement is for a thread to respond quickly to a condition detected by another thread.
- Error recovery: To support coordinated error recovery between real-time threads.
- Where several threads are performing a computation, an error detected by one thread needs to be quickly and safely communicated to other threads.
 
- Mode changes: Where changes between modes are expected but cannot be planned.
- A fault may lead to an aircraft abandoning take-off and entering emergency mode.
- An accident in a manufacturing plant may require a mode change to shut down the plant.
- Threads must be quickly and safely informed that the mode in which they are operating has changed and they now need to perform different sets of actions.
 
- Scheduling using partial/imprecise computations: The accuracy of many algorithms depends on how much time is allocated to their calculation. When time is up, the thread must be interrupted to stop further refinement of the result.
- User interrupts: Users may want to cancel the current operation because they have detected an error condition and want to start again.
How to solve
- Polling: Polling for notifications is too slow.
- Aborts: Destroy the thread and allow another thread to perform some recovery. However, destroying a thread can be expensive and may be an extreme response. It can also leave the system in an inconsistent state (monitor locks may not be released).
- Therefore a controlled form of ATC is required.
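The cost of polling can be seen in a small plain-Java sketch (no RTSJ involved). The worker only notices an interruption at the points where it checks the flag, so its response time is bounded by the amount of work between two checks. The class name PollingSketch and the constant CHECK_EVERY are made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;

// Plain Java, no RTSJ: the worker notices an interruption only when it polls.
class PollingSketch {
    // Hypothetical tuning knob: loop iterations between two polls of the flag.
    static final int CHECK_EVERY = 1_000_000;

    // Computes until an interruption is noticed; returns the iterations completed.
    static long computeUntilInterrupted(CountDownLatch firstChunkDone) {
        long iterations = 0;
        while (true) {
            for (int i = 0; i < CHECK_EVERY; i++) {
                iterations++;                 // stand-in for one step of real work
            }
            firstChunkDone.countDown();       // tell the demo that work is under way
            // The only place where the request can be noticed.
            if (Thread.currentThread().isInterrupted()) {
                return iterations;
            }
        }
    }

    // Starts a worker, interrupts it, and returns how far the worker got.
    static long runDemo() {
        final long[] completed = new long[1];
        final CountDownLatch firstChunkDone = new CountDownLatch(1);
        Thread worker = new Thread(
                () -> completed[0] = computeUntilInterrupted(firstChunkDone));
        worker.start();
        try {
            firstChunkDone.await();
            worker.interrupt();               // sets a flag; nothing is thrown at the worker
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return completed[0];
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped after " + runDemo() + " iterations");
    }
}
```

The worker always stops on a multiple of CHECK_EVERY: a larger value means cheaper polling but a slower response, which is the trade-off that motivates ATC.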
RTSJ Basic Model
- Brings together Java exception handling and an extension of thread interruption semantics.
- When a RealtimeThread is interrupted, an asynchronous exception is thrown at the thread, rather than the thread having to poll for the interruption as in normal Java.
- How to program safely in the presence of asynchronous exceptions?
- Most exception handling mechanisms propagate exceptions using the termination model.
- For example, if a method throws an exception, the program continues executing in the context of the handler (e.g. the catch block). It does not return to the context where the exception was thrown. This makes it difficult to write code that is tolerant of an asynchronous exception (ASE) being thrown at it.
- This means that every method would need a handler for the root class of all ASEs.
 
- RTSJ solves this problem by requiring that all methods that are prepared to allow the delivery of an ASE place the ASE in their throws list.
- These methods are called Asynchronously Interruptible (AI) methods.
- If a method does not do this (it is not an AI method), the ASE is not delivered but is held pending until the SO is in a method that has the appropriate throws clause.
- Hence code written without being concerned with ATC can be used safely in an environment where ATCs are being used.
- To ensure ATCs are handled safely, RTSJ requires that:
- synchronized methods and synchronized blocks are ATC-deferred. Such code, along with non-AI methods, forms the ATC-deferred sections. [TODO: Can we have a synchronized AI method? What is the behaviour?]
- ATCs can only be handled in ATC-deferred sections. Why? To avoid the ATC handler being interrupted by another ATC being thrown?
 
- So:
- AI Methods allow delivery of an ATC.
- Non-AI methods and ATC-deferred regions allow handling of an ATC.
 
- Use of ATC requires
- AI Methods declare AsynchronouslyInterruptedException (AIE) in their throws list.
- Throwing an AIE at an SO. How?
- Calling interrupt() on a RealtimeThread throws the generic AIE.
 
 
- Example:
import javax.realtime.*;
public class ATC1 {
	public static void main(String[] args) throws InterruptedException {
		ATC1 a = new ATC1();
		a.go();
	}
	
	private void go() throws InterruptedException {
		
		final RealtimeThread rtt = new RealtimeThread() {
			public void run() {
				try {
					longRunningComputation();
				} catch (AsynchronouslyInterruptedException aie) {
					System.out.println("Caught AIE");
				}
			}
		};
		
		rtt.start();
		
		System.out.println("Waiting for 3 secs for RTT to finish..");
		Thread.sleep(3000);
		RealtimeThread interrupter = new RealtimeThread() {
			public void run() {
				System.out.println("Interrupting RTT");
				rtt.interrupt();
			}
		};
		interrupter.start();
		
	}
	
	private void longRunningComputation() throws AsynchronouslyInterruptedException {
		long l = 0;
		for(int i=0; i < Integer.MAX_VALUE; i++) {
			l = l + i;
		}
		System.out.println(l);
	}
}
AIE Semantics
- If t is executing in an ATC-deferred region (the AIE cannot be delivered there, only handled), the AIE is marked as pending.
- A pending AIE is delivered as soon as t returns to an AI method.
- If t is executing in an AI method and is blocked inside a call to sleep() or join(), t is rescheduled and the AIE is thrown.
- If t is executing in an ATC-deferred region and is blocked inside a call to wait(), sleep() or join(), t is rescheduled and the AIE is thrown as a synchronous exception and also marked as pending.
- Example for last case:
public class ATC1 {
	public static void main(String[] args) throws InterruptedException {
		ATC1 a = new ATC1();
		a.go();
	}
	
	private void go() throws InterruptedException {
		
		final RealtimeThread rtt = new RealtimeThread() {
			public void run() {
				try {
					launchSleep();
				} catch (AsynchronouslyInterruptedException aie) {
					System.out.println("Now the AIE is caught");
				}
			}
		};
		
		rtt.start();
		
		System.out.println("Waiting for 3 secs for RTT to finish..");
		Thread.sleep(3000);
		RealtimeThread interrupter = new RealtimeThread() {
			public void run() {
				System.out.println("Interrupting RTT");
				rtt.interrupt();
			}
		};
		interrupter.start();
		
	}
	
	private void launchSleep() throws AsynchronouslyInterruptedException {
		longRunningSleep();
		//AIE will be delivered here.
	}
	
	private void longRunningSleep() throws AsynchronouslyInterruptedException {
		try {
			Thread.sleep(10000);
		} catch (InterruptedException ie) {
			System.out.println("Interrupted from sleep"); // Won't print
		}
	}
}
- An example where the method is marked as AI but contains a synchronized block, which makes that block an ATC-deferred section.
public class ATC4 {
	public static void main(String[] args) throws InterruptedException {
		ATC4 a = new ATC4();
		a.go();
	}
	
	private void go() throws InterruptedException {
		
		final RealtimeThread rtt = new RealtimeThread() {
			public void run() {
				try {
					launchComp();
				} catch (AsynchronouslyInterruptedException aie) {
					System.out.println("Caught AIE");
				}
			}
		};
		
		rtt.start();
		
		System.out.println("Waiting for 3 secs for RTT to finish..");
		Thread.sleep(3000);
		RealtimeThread interrupter = new RealtimeThread() {
			public void run() {
				System.out.println("Interrupting RTT");
				rtt.interrupt(); //Attempt to deliver the AIE to the thread.
			}
		};
		interrupter.start();
		
	}
	
	private void launchComp() throws AsynchronouslyInterruptedException {
		longRunningComputation();
		//AIE delivered here 
	}
	
	// AI method, but the synchronized block is an ATC-deferred section, so the AIE cannot be delivered inside it.
	private void longRunningComputation() throws AsynchronouslyInterruptedException {
			synchronized (this) {
				long l = 0;
				for (int i = 0; i < Integer.MAX_VALUE / 2; i++) {
					l = l + i;
				}
				System.out.println(l);
			}
	}
}
Difference from Java Exception Handling
- Only explicitly naming AIE (and not any subclass) in the throws clause indicates method is AI.
- Therefore catch clauses also must name only the AIE.
- Handlers for AIEs do not automatically stop the propagation of AIEs. It is necessary to call the clear method.
- The catch clauses in ATC-deferred regions that name InterruptedException or Exception will handle an AIE, but they will not stop the propagation of the AIE.
- The catch clauses for AIE, InterruptedException and Exception in AI regions will not catch an AIE.
- finally clauses in AI methods are not executed when an ATC is thrown.
- When a synchronous exception is propagating in an AI method and an AIE is pending or thrown, the synchronous exception is lost.
Catching an AIE
- When a handler catches an AIE, it must determine whether it is the AIE it expects (many different AIEs can be thrown, but the catch block names only the generic AIE).
- If it is, AIE should be handled.
- Otherwise AIE must be propagated down the stack to the calling method.
- The clear method is used for this purpose.
Clear Semantics
- If the AIE is the current AIE, the AIE is no longer pending; true is returned.
- If the AIE is not the current AIE, the AIE is still pending; false is returned.
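The handler pattern described above (test with clear, then either handle or keep propagating) can be sketched with a toy, single-threaded model in plain Java. ToyAie, INNER, OUTER and deliver() are hypothetical names for this illustration; the model only mimics the two clear rules and is not the RTSJ class, and delivery is simulated with an ordinary throw.

```java
// Toy, single-threaded model in plain Java. ToyAie is a hypothetical stand-in
// for an AIE and is not the RTSJ class.
class ClearModel {
    static final class ToyAie extends Exception {
        private static ToyAie current;        // the AIE that is currently pending, if any

        void fire() { current = this; }       // model: this AIE becomes the pending one

        // Mirrors the two clear rules listed above.
        boolean clear() {
            if (current == this) {
                current = null;               // no longer pending
                return true;
            }
            return false;                     // a different AIE is still pending
        }
    }

    static final ToyAie INNER = new ToyAie();
    static final ToyAie OUTER = new ToyAie();

    // Inner handler: deal with INNER, pass anything else on to the caller.
    static void innerLevel(ToyAie fired, StringBuilder trace) throws ToyAie {
        try {
            fired.fire();
            throw fired;                      // model of the AIE being delivered
        } catch (ToyAie e) {
            if (INNER.clear()) {
                trace.append("inner handled");
            } else {
                trace.append("inner passed on, ");
                throw e;                      // still pending: keep propagating
            }
        }
    }

    // Outer handler: the same test, one level further out.
    static String deliver(ToyAie fired) {
        StringBuilder trace = new StringBuilder();
        try {
            innerLevel(fired, trace);
        } catch (ToyAie e) {
            if (OUTER.clear()) {
                trace.append("outer handled");
            }
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(deliver(INNER));   // inner handled
        System.out.println(deliver(OUTER));   // inner passed on, outer handled
    }
}
```

Each level asks its own AIE whether it is the pending one, so an AIE meant for an outer level passes through inner handlers untouched.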
Interruptible
- A class which wants to provide an interruptible method can do so by implementing the Interruptible interface.
- The Interruptible interface has two methods:
- public void run(AsynchronouslyInterruptedException) throws AsynchronouslyInterruptedException. This is the method that is interruptible.
- public void interruptAction(AsynchronouslyInterruptedException). This is the method that is called by the RTJVM when the run() method is interrupted.
 
- Any interruptible object can be passed as a parameter to the doInterruptible() method in the AIE class.
- The doInterruptible() method then calls the run() method of the object, passing the associated AIE as a parameter.
- If an SO is executing a method this way, it can be interrupted by calling the fire() method of the AIE (as opposed to calling RealtimeThread.interrupt(), which only fires the generic AIE).
- Note that only one doInterruptible method for a particular AsynchronouslyInterruptedException can be active at one time. If a call is outstanding, the doInterruptible() method returns immediately with a false value.
- An AIE can be further controlled using the enable() and disable() methods.
- Firing the AIE has no effect if there is no active doInterruptible() for that AIE.
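The interplay of doInterruptible(), run(), interruptAction() and fire() can be approximated in plain Java. The sketch below is a toy model under stated assumptions: ToyAie and ToyInterruptible are hypothetical stand-ins and not the RTSJ types, and the interruption is built on Thread.interrupt(), so it only takes effect at a blocking call rather than asynchronously as in a real RTJVM.

```java
import java.util.concurrent.CountDownLatch;

// Toy model in plain Java. ToyAie and ToyInterruptible are hypothetical stand-ins
// and are not the RTSJ classes; interruption here is cooperative, not asynchronous.
class ToyAtcDemo {
    interface ToyInterruptible {
        void run() throws InterruptedException;
        void interruptAction();
    }

    static final class ToyAie {
        private Thread active;                // thread inside doInterruptible(), or null

        // Returns false immediately if a call is already outstanding.
        boolean doInterruptible(ToyInterruptible logic) {
            synchronized (this) {
                if (active != null) {
                    return false;
                }
                active = Thread.currentThread();
            }
            try {
                logic.run();
            } catch (InterruptedException e) {
                logic.interruptAction();
            } finally {
                synchronized (this) {
                    active = null;
                }
                Thread.interrupted();         // drop a fire() that arrived too late
            }
            return true;
        }

        // Interrupts the active call; has no effect when there is none.
        synchronized boolean fire() {
            if (active == null) {
                return false;
            }
            active.interrupt();
            return true;
        }
    }

    static String runDemo() {
        final ToyAie aie = new ToyAie();
        final StringBuilder log = new StringBuilder();
        final CountDownLatch running = new CountDownLatch(1);

        boolean firedTooEarly = aie.fire();   // no active doInterruptible() yet

        Thread worker = new Thread(() -> aie.doInterruptible(new ToyInterruptible() {
            public void run() throws InterruptedException {
                running.countDown();
                Thread.sleep(60_000);         // stand-in for a long computation
                log.append("completed");
            }
            public void interruptAction() {
                log.append("interruptAction");
            }
        }));
        worker.start();
        try {
            running.await();
            boolean fired = aie.fire();       // the call is active now
            worker.join();
            return firedTooEarly + " " + fired + " " + log;
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());        // false true interruptAction
    }
}
```

The first fire() returns false because no doInterruptible() is active. The second one interrupts run(), so interruptAction() executes instead of the rest of run().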
Nested ATC
- Given that AsynchronouslyInterruptedException can be deferred, it is possible for multiple ATC requests to be deferred.
- Consider the example below:
- method3() uses aie3 to execute an Interruptible whose run() calls method2().
- method2() uses aie2 to execute an Interruptible whose run() calls the ATC-deferred method1().
- Now assume aie2 is fired while method1() is in progress.
- After a short while aie3 is fired. aie3 now becomes the current AIE because it is at a higher level of nesting; aie2's fire event is discarded.
- When method1() completes, aie2's interruptAction() is called; since aie3 is not aie2, aie3 is held pending until it reaches the interruptAction() of aie3.
public class NestedATC {
	
	private AsynchronouslyInterruptedException aie1 = new AsynchronouslyInterruptedException();
	private AsynchronouslyInterruptedException aie2 = new AsynchronouslyInterruptedException();
	private AsynchronouslyInterruptedException aie3 = new AsynchronouslyInterruptedException();
	public static void main(String[] args) throws InterruptedException {
		NestedATC n = new NestedATC();
		n.go();
	}
	
	// ATC-deferred method (no AIE in its throws clause)
	private void method1() {
		System.out.println("In method 1.. performing computation");
		long l = 0;
		for (int i = 0; i < Integer.MAX_VALUE / 2; i++) {
			l = l + i;
		}
		System.out.println(l);
	}
	
	private void method2() {
			aie2.doInterruptible(new Interruptible() {
				
				public void run(AsynchronouslyInterruptedException exception)
						throws AsynchronouslyInterruptedException {
					method1();
				}
				
				public void interruptAction(AsynchronouslyInterruptedException exception) {
					System.out.println("AIE2 Interrupt Action ");
				}
			});
	}
	
	private void method3() {
					aie3.doInterruptible(new Interruptible() {
						
						public void run(AsynchronouslyInterruptedException exception)
								throws AsynchronouslyInterruptedException {
								method2();
						}
						
						public void interruptAction(AsynchronouslyInterruptedException exception) {
							System.out.println("AIE3 Interrupt Action");
						}
					});
		
	}
	
	private void go() throws InterruptedException {
		RealtimeThread rtt = new RealtimeThread() {
			public void run() {
				method3();
			}
		};
		
		rtt.start();
		
		Thread.sleep(1000);
		System.out.println("Firing aie2 .. ");
		System.out.println("Returned :  " + aie2.fire());
		Thread.sleep(500);
		System.out.println("Firing aie3 .. ");
		System.out.println("Returned :  " + aie3.fire());
	}
	
}
Timed Class
- There is a subclass of AIE called Timed. A time value is passed to it when it is created.
- When doInterruptible() of the Timed object is called, the timer is started.
- The timer can be reset for the next call of doInterruptible() by using the resetTime() method.
- Consider the following example:
- It uses a Timed AIE to interrupt the optional part of a computation after some time.
import javax.realtime.*;
class ImpreciseResult {
	
	private long value;
	private boolean preciseResult;
	
	public long getValue() {
		return value;
	}
	
	public void setValue(long value) {
		this.value = value;
	}
	
	public boolean isPreciseResult() {
		return preciseResult;
	}
	
	public void setPreciseResult(boolean preciseResult) {
		this.preciseResult = preciseResult;
	}
	
}
public class ImpreciseComputation {
	
	private HighResolutionTime completionTime;
	private ImpreciseResult result;
	
	public ImpreciseComputation(HighResolutionTime T)
	{
		completionTime = T; 
		result = new ImpreciseResult();
	}
	
	private long compulsoryPart() {
		System.out.println("Compulsory Part started");
		long l = 0;
		for (int i = 0; i < Integer.MAX_VALUE / 3; i++) {
			l = l + i;
		}
		System.out.println("Compulsory Part done");
		return l;
	}
	
	public ImpreciseResult compute() {
		result.setPreciseResult(false);
		final long compValue = compulsoryPart();
		result.setValue(compValue);
		
		Interruptible optionalTask = new Interruptible() {
			
			public void run(AsynchronouslyInterruptedException exception)
					throws AsynchronouslyInterruptedException {
				System.out.println("Optional Part started");
				long val = 0;
				for (int i = 0; i < Integer.MAX_VALUE / 3; i++) {
					val = val + i;
				}
				System.out.println("Optional Part done");
				// Fold the optional refinement into the result (val was previously discarded)
				result.setValue(compValue + val);
				result.setPreciseResult(true);
			}
			
			public void interruptAction(AsynchronouslyInterruptedException exception) {
				System.out.println("Interrupted while doing optional part");
				result.setPreciseResult(false);
			}
		};
		
		Timed t = new Timed(completionTime);
		
		System.out.println("Calling optional task doI() returned : " + t.doInterruptible(optionalTask));
		
		return result;
	}
	public static void main(String[] args) {
		HighResolutionTime deadline = new RelativeTime(60000,0);
		final ImpreciseComputation ic = new ImpreciseComputation(deadline);
		RealtimeThread rtt = new RealtimeThread() {
			public void run() {
				ImpreciseResult result = ic.compute();
				System.out.println("Computed result ");
				System.out.println("Value : " + result.getValue());
				System.out.println("Precision : " + result.isPreciseResult());
			}
		};
		
		rtt.start();
		
	}
}
