Introduction
The Concurrency and Coordination Runtime (CCR) is an asynchronous message-passing library for the .NET platform that provides a small but powerful set of primitives enabling a different approach to structuring applications. Effective use of the CCR results in an application that is more responsive, scales better and is more robust. The intriguing aspect is that it can deliver these benefits while reducing (and sometimes removing) the need for you to deal explicitly with threads (and failures on those threads), locks, mutexes and other low-level synchronization primitives.
If your application is currently single-threaded, it offers the possibility of increasing responsiveness and better utilizing the available cores, whilst retaining the conceptual simplicity of your original code-base. Alternatively, if your application is already multi-threaded, the CCR can match (and sometimes improve) your system throughput, whilst simplifying your codebase.
In particular the CCR provides:
- A simple, high-performance message-passing implementation. Extremely lightweight, type-safe channels connect objects in an actor-oriented view of the world.
- A set of primitive scheduling constraints. Scheduling is really what the CCR is about. You spawn tasks and post messages to other components in your process, and declare, via objects called Arbiters, the constraints under which you can deal with the results, as well as incoming requests. The CCR will ensure that these constraints are honored before executing your code.
- A better model to reason about failure. The CCR provides causalities, effectively a means of propagating a context through a series of related asynchronous sub-tasks, such that if any task fails (i.e. throws an exception), then that failure can be dealt with in isolation, in a single-place, without regard to the thread that originally raised it.
- Better utilization of available (and future) processing power. The CCR will schedule work over either the existing thread-pool or, if you prefer, its own custom implementation, which in some cases offers better performance. Nevertheless, your code-exposure to this mechanism is normally kept to a minimum.
- Easier integration of asynchronous I/O operations. The key to improving scalability and performance of individual processes is often down to improving the efficiency of I/O-bound operations. As these operations tend to be orders of magnitude slower than compute-bound tasks, any blocking I/O has the effect of suspending useful resources (in this case, threads), preventing them from being used for any other pending work. By dispatching I/O operations asynchronously, these resources are free to process other tasks until the operation completes. However, the sequencing of asynchronous operations often separates the initiation of each operation from its completion, resulting in source code that is difficult to follow. The CCR uses a novel implementation of C# iterators to bring these operations under control.
By 'asynchronous message-passing', we mean that components communicate by sending data to each other, and what's more, the data and any subsequent reply have no guaranteed temporal relationship. A sent message will probably be processed at some point in the future and any reply will probably be received at some time after that.
Whilst, in reality, purely intra-process use of the CCR provides somewhat stronger guarantees than this, such an approach is essential to most models of inter-process computation, where the possibility of failure is everywhere. This makes the CCR primitives useful not only for I/O at a low level, but also as a building-block for constructing scalable, distributed systems.
The Fundamental Types of the CCR
There are a small number of basic types that make up the CCR:
- Tasks. Arbitrary pieces of (your) code to execute.
- TaskQueues. Or DispatcherQueues to be exact. Just a list of tasks awaiting execution. Threads from either the vanilla CLR thread pool or a custom CCR thread-pool (known as a Dispatcher) will remove tasks from these queues and execute them.
- Ports. These are the in-process message queues that connect components. Essentially nothing more than linked lists into which producers place messages. The CCR provides generic overloads for type-safety.
- Arbiters. These are the primitives of the CCR and they provide the glue between a port and a task. They define the constraints under which the task should be created when a message arrives on a port and which task queue will receive it. There are various arbiters that ship with the CCR and many of them can be composed to form higher-level constructs.
With these basic concepts in mind, let's look at some simple CCR code. First, let's define a simple C# console application that we will use to host all the examples. Note that in this application, we use a custom CCR thread-pool (the Dispatcher) and tie our task queue to it. This means that tasks from this queue will execute over threads belonging to this custom pool.
static void Main(string[] args)
{
    using (var dr = new Dispatcher())
    {
        using (var taskQueue = new DispatcherQueue("samples", dr))
        {
            // Examples will go here...

            // Need a blocking call to prevent the application exiting.
            Console.ReadLine();
        }
    }
}
Although the samples here only run on a single task queue, the use of multiple task queues is encouraged. The CCR will round-robin amongst the registered task queues when retrieving tasks, preventing a single task queue from starving others.
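For instance, a second queue could be created over the same Dispatcher used by the hosting application above (the queue name here is purely illustrative):

// A second task queue over the same custom Dispatcher ('dr' above).
// When both queues have pending work, the Dispatcher retrieves tasks
// from them in a round-robin fashion, so neither starves the other.
var backgroundQueue = new DispatcherQueue("background", dr);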
First, let's just enqueue a task directly. This is the simplest way to execute a task under the CCR and we don't even need a port. The Arbiter class contains a number of these convenience methods and FromHandler() just creates a task from a delegate - in this case, an anonymous one. This is then placed on our task queue to be executed by our dispatcher.
// Enqueue a task directly
taskQueue.Enqueue(Arbiter.FromHandler(() => Console.WriteLine("Hello, world.")));
It's not that common to enqueue tasks directly; normally, there's a port involved somewhere. In the next snippet, we'll define a port, an arbiter, and then we'll post to the port. In this case we have a typed port of String, and a handler whose signature takes a string.
// Post a message to a port to schedule a task.
var port = new Port<string>();
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
port.Post("Hello (again), world");
There are a number of things going on here and it pays to understand the process. The port.Receive() call creates the simplest possible arbiter, a 'receiver', that is satisfied as soon as a single message arrives on the port. On arrival, a task is created that essentially calls our handler with the message. The Arbiter.Activate() call ties the created task to the specified task queue.
The most critical thing to understand about CCR arbiters is that they do not block threads. Once activated, the receiver consumes nothing more than a few bytes of memory, and the thread is free to process other pending tasks.
An important CCR concept is that arbiters can be defined at any time. So, as illustrated below, we can take the above example and switch round the last two lines - the effect is the same.
// Post a message to a port to schedule a task.
var port = new Port<string>();
port.Post("Hello (again), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
Now, let's alter this sample slightly - let's put two messages into the port before we activate the receiver and see what happens…
// Post a message to a port to schedule a task.
var port = new Port<string>();
port.Post("Hello (again), world");
port.Post("Hello (thrice), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
Now, if you run this, you'll see that only the first message is printed out. This is because the port.Receive() call is an extension method that simplifies, but is equivalent to, the following syntax:
Arbiter.Activate(
    taskQueue,
    Arbiter.Receive(false, port, Console.WriteLine));
The critical parameter is the first (Boolean) parameter passed to Arbiter.Receive(). This indicates that the receiver is transient; after processing a single message it will be discarded. If we want to process all messages that arrive on the port, we can flip the value of this parameter.
// Post a message to a port to schedule a task.
var port = new Port<string>();
port.Post("Hello (again), world");
port.Post("Hello (thrice), world");
Arbiter.Activate(
    taskQueue,
    Arbiter.Receive(true, port, Console.WriteLine));
Now, the above code can occasionally produce a surprising result - the lines may be printed out of order! What's going on here?
In the CCR, as soon as an arbiter (in this case, a Receive) is satisfied, it generates a task to process the relevant message. Unless the arbiter is nested as part of a larger composition, these tasks will be placed on a task queue where they will be scheduled for execution. In our example above, the persistent receiver immediately results in two tasks, one for each item. If there are sufficient threads available, these tasks may run concurrently and hence may run out of order.
The CCR thread-pool implementation differs from the CLR thread-pool in a number of ways. Most importantly, it has a fixed number of threads, determined at construction. As it expects no blocking operations to be executed on its threads, this doesn’t typically matter, but if you must make blocking calls, schedule them to a task queue over the CLR thread-pool which can grow and shrink dynamically. Such a task queue can be created by calling the default DispatcherQueue constructor.
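For example, a minimal sketch of such a queue - the Thread.Sleep is just a stand-in for a genuinely blocking call:

// A task queue created with the default constructor is serviced by the
// CLR thread pool rather than a fixed-size CCR Dispatcher, so it can
// tolerate the occasional blocking call (requires System.Threading).
var clrQueue = new DispatcherQueue();
clrQueue.Enqueue(Arbiter.FromHandler(() => Thread.Sleep(1000)));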
There are a number of ways to constrain this behavior so that ordering is preserved. Perhaps the simplest is to sit in a loop with a transient receiver, processing one message at a time. Fortunately for us, the CCR contains a very powerful mechanism, called Iterative Tasks, that allows us to express this in a fairly natural way by exploiting C# iterator functionality. Let's look at an example.
First, let's replace the existing Arbiter.Activate() call with this one:
Arbiter.Activate(
    taskQueue,
    new IterativeTask<Port<string>>(port, ProcessMessages));
This schedules a new iterative task called ProcessMessages, defined below:
static IEnumerator<ITask> ProcessMessages(Port<string> port)
{
    while (true)
        yield return port.Receive(Console.WriteLine);
}
This method loops indefinitely, waiting (but not blocking) for the receive to be satisfied. As soon as it is, the handler is invoked and the loop continues. If we wanted to loop until the empty string arrives on the port, we could have written (note the use of lambda syntax to handle the message within an anonymous delegate):
static IEnumerator<ITask> ProcessMessages(Port<string> port)
{
    bool fDone = false;
    while (!fDone)
    {
        yield return port.Receive(message =>
        {
            if (String.IsNullOrEmpty(message))
                fDone = true;
            else
                Console.WriteLine(message);
        });
    }
    Console.WriteLine("Finished");
}
Iterators are an extremely powerful tool within the CCR toolset - they make it possible to write a sequence of asynchronous, non-blocking operations in a manner close to their synchronous counterparts. Iterative tasks may also be nested - for example, a higher level task could yield to ProcessMessages() as follows:
static IEnumerator<ITask> TopLevelTask(Port<string> port)
{
    // Yield to the nested task
    yield return new IterativeTask<Port<string>>(port, ProcessMessages);
    Console.WriteLine("Finished nested task.");
}
So far, we've looked only at simple receivers - arbiters that schedule tasks when a single item is posted to a single port. It's time to introduce some higher-level arbitrations - these provide the glue that nests receivers into more powerful constructs.
The most common of these is Choice. It simply chooses one and only one of n receivers to activate. For example, the following iterative task waits for either a string or a signal before proceeding:
static IEnumerator<ITask> ProcessChoice(PortSet<string, EmptyValue> portSet)
{
    bool fDone = false;
    while (!fDone)
    {
        yield return portSet.Choice(
            message => Console.WriteLine(message),
            signal => fDone = true);
    }
    Console.WriteLine("Finished");
}
Choice is most often used to determine the success or failure of an asynchronous operation, but it can be used to select one of an arbitrary number of receivers.
A PortSet is a wrapper around one or more independent ports that makes it convenient to pass them around as a single entity. A common example from the CCR is the SuccessFailurePort, itself derived from PortSet<SuccessResult, Exception>.
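As a rough sketch of that success/failure pattern - BeginSomeOperation is a hypothetical method, not part of the CCR, assumed to post either a SuccessResult or an Exception to the port when it completes:

static IEnumerator<ITask> CallAndCheck(SuccessFailurePort resultPort)
{
    // Hypothetical asynchronous operation that posts either a
    // SuccessResult or an Exception to resultPort on completion.
    BeginSomeOperation(resultPort);

    // Choose whichever outcome arrives - exactly one handler will run.
    yield return resultPort.Choice(
        success => Console.WriteLine("Operation succeeded"),
        failure => Console.WriteLine("Operation failed: " + failure.Message));
}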
The other common arbiter is Join. This is satisfied when both nested receivers are satisfied. The following example illustrates the principle:
var port1 = new Port<int>();
var port2 = new Port<int>();
port1.Post(1);
port2.Post(3);
Arbiter.Activate(
    taskQueue,
    Arbiter.JoinedReceive(false, port1, port2,
        (x, y) => { Console.WriteLine(x + y); }));
Join is very useful for throttling access to a limited resource. The first port contains incoming requests for a resource, the second contains available resources. By using a Join, we can constrain requests to only run when there are resources available to service them.
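A minimal sketch of that throttling idea follows - the port names and the token scheme are purely illustrative, not a CCR API:

var requestPort = new Port<string>();
var tokenPort = new Port<int>();

// Seed the token port with two tokens - at most two requests in flight.
tokenPort.Post(0);
tokenPort.Post(1);

// A persistent join: fires only when a request AND a token are available.
Arbiter.Activate(
    taskQueue,
    Arbiter.JoinedReceive(true, requestPort, tokenPort, (request, token) =>
    {
        Console.WriteLine("Processing " + request);

        // Return the token so that another pending request can proceed.
        tokenPort.Post(token);
    }));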
The other common higher-level arbitration is the Interleave. It's conceptually similar to a reader-writer lock, only with asynchronous non-blocking semantics. Reader tasks can execute concurrently with other readers, but writer tasks (which take priority over readers) may only execute when no other tasks are running. The following is the declaration of an interleave protecting some notional 'cache'.
var updatePort = new Port<UpdateCache>();
var queryPort = new Port<QueryCache>();
var stopPort = new Port<Shutdown>();

var interleave = new Interleave(
    new TeardownReceiverGroup(
        Arbiter.Receive(false, stopPort, ClearDownCache)),
    new ExclusiveReceiverGroup(
        Arbiter.Receive(true, updatePort, OnUpdateCache)),
    new ConcurrentReceiverGroup(
        Arbiter.Receive(true, queryPort, OnQueryCache)));

Arbiter.Activate(taskQueue, interleave);
Here, persistent receivers are placed within the appropriate receiver groups, which indicate their allowed interleavings. Any receivers placed within the ConcurrentReceiverGroup can have their associated tasks run concurrently with each other. Conversely, receivers placed within the ExclusiveReceiverGroup have their tasks run in isolation from all others. In addition, by placing receivers within this group, we can constrain the order of their task execution to the order of posting. Any receiver in the TeardownReceiverGroup will shut down the interleave and be the last handler that it will execute - for this reason such receivers cannot be persistent.
The Interleave will round-robin amongst its receivers where possible in order to ensure fair scheduling. Additionally, ordering of processing, even within the ExclusiveReceiverGroup is only on a per-port basis. Two messages on independent ports will not necessarily be processed in the same order as they were posted relative to each other.
As mentioned earlier, CCR iterative tasks allow us to write logical sequences of asynchronous non-blocking operations in a style that's closer to how we'd write them as simple synchronous blocking operations. Frequently, these asynchronous operations will be I/O-bound, perhaps from a web request, a database operation or even basic file I/O. By enabling these operations to come more easily under our control, we have the opportunity to use asynchronous I/O more effectively and significantly increase the throughput and scalability of our applications.
An important pattern for bridging between the APM world of BeginXXX/EndXXX and the CCR is based on the observation that the AsyncCallback delegate, which has the form:
public delegate void AsyncCallback(IAsyncResult ar);
is satisfied by the Post operation of Port<IAsyncResult>. Using this pattern, we can write an asynchronous file copy as the following IterativeTask.
static IEnumerator<ITask> Copy(FileStream source, FileStream target)
{
    var buffer = new byte[128 * 1024];
    var port = new Port<IAsyncResult>();
    var bytesRead = 0;
    do
    {
        source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
        yield return Arbiter.Receive(
            false, port, iar => bytesRead = source.EndRead(iar));

        target.BeginWrite(buffer, 0, bytesRead, port.Post, null);
        yield return Arbiter.Receive(
            false, port, iar => target.EndWrite(iar));
    } while (bytesRead > 0);
}
Essentially the asynchronous operations are instructed to post the IAsyncResult to our port upon completion. Once underway, we yield until the receive on the port is satisfied, at which point we process the completion and proceed to the next step. This is a fully asynchronous implementation (we could potentially spawn thousands of these operations across just a few threads), but notice how the intent of the code remains clear - we read a block, then write a block and then repeat until done.
In order to keep the file copy example as simple as possible, exception-handling has been omitted, but more robust code would have to deal with failure of either the read or the write. Normal try/catch won't cut it here, because this form of exception-handling has thread-affinity and yet CCR tasks could be running on any of the available threads - indeed, in the case of iterative tasks, each 'step' of the task may run on a different thread to the previous one.
There are essentially two approaches to exception-handling within the CCR. The first is to explicitly deal with errors by encoding operations to catch and propagate them through ports. However, this can cause considerable code-bloat for both the caller and the callee. In the previous example, the file copy would require substantial changes to explicitly deal with failure. The following sample shows just the changes required to the read side.
static IEnumerator<ITask> Copy(
    FileStream source, FileStream target, SuccessFailurePort resultPort)
{
    var buffer = new byte[128 * 1024];
    var port = new Port<IAsyncResult>();
    var bytesRead = 0;
    do
    {
        // Deal with a failure on the BeginRead
        try
        {
            source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
        }
        catch (Exception e)
        {
            resultPort.Post(e);
            yield break;
        }

        // Deal with a failure on the EndRead
        yield return Arbiter.Receive(
            false, port, iar =>
            {
                try
                {
                    bytesRead = source.EndRead(iar);
                }
                catch (Exception e)
                {
                    resultPort.Post(e);
                }
            });

        // Stop processing if there is a failure.
        if (bytesRead == 0)
            yield break;

        // And so on for the write...
    } while (bytesRead > 0);

    resultPort.Post(new SuccessResult());
}
It should be fairly obvious that this approach is unwieldy and error-prone - not desirable characteristics in any code, let alone error-handling routines.
Fortunately, the CCR has cleaner and much more powerful support for exception-handling through a mechanism called Causalities. Not only do these allow our code to remain free from the explicit handling outlined above, but once declared, they flow down the execution paths of our tasks, regardless of thread, allowing us to declare a single point of failure-handling for an arbitrarily complex graph of asynchronous operations.
To declare and use a causality, the typical pattern is to instantiate it, attach a handler to its exception port and inform the CCR about it e.g.:
var exceptionPort = new Port<Exception>();
Arbiter.Activate(taskQueue, exceptionPort.Receive(e =>
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine("Caught " + e.ToString());
    Console.ResetColor();
}));
Dispatcher.AddCausality(new Causality("Example", exceptionPort));
Having done this, we can then just spawn our tasks or post our messages. The causality will flow with these, and any exception not explicitly caught and handled by a task in the subsequent execution graph will be routed, via the causality, to our exception port and on to our handler.
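As a minimal illustration, continuing the example above, a task enqueued after the causality has been declared that then throws should have its exception delivered to the handler attached to exceptionPort rather than tearing down a thread:

// Enqueued from within the causality's context, so the unhandled
// exception below should be routed to exceptionPort rather than
// crashing the thread it happens to run on.
taskQueue.Enqueue(Arbiter.FromHandler(() =>
{
    throw new InvalidOperationException("Something went wrong");
}));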
Conclusion
In conclusion, the CCR offers some substantial opportunities to express your application in terms of its fundamental data-dependencies, describing schedules of tasks that the runtime then distributes seamlessly over the available cores. This simultaneously frees you from explicitly dealing with threads and locks whilst enabling your application to take full advantage of the increasingly powerful multi-core machines that software will run on in the future.
Note: All opinions expressed here are those of the author and not of his employer.
All of the samples in this article may be run against the freely available 'Microsoft Robotics Developer Studio 2008 Express Edition'. Please check the license-agreement for further details.