Thursday Night

Paul Betts’s personal website / blog / what-have-you

Making Async I/O work for you, Reactive style

Earlier today, I read a fantastic article about the TPL by Scott Hanselman. In it, he describes how to take a fairly straightforward function to detect if a given Url responds, and write it in an asynchronous fashion. As soon as I read it, I knew that I had to write the Reactive Extensions for .NET version!

How do the TPL and Rx.NET relate?

Both of these technologies are intended to help make writing asynchronous and concurrent programs easier and more straightforward, so it’s really easy to be confused as to which one to use. You can often think of Task and IObservable for async calls as the same thing – an object that represents a future result that hasn’t completed yet – we saw this in a previous blog post. In an asynchronous function, we send out the request, but we don’t have the data – we have to return something that will allow us to eventually get the result.

When it comes down to it, Task is really a specialization of IObservable – Task is specifically designed to run on the TPL threadpool, whereas IObservable abstracts away where the code will execute unless you specify it explicitly.

Seeing the problem again

Let’s take a look at the synchronous version of the code again – we want to take this and rewrite it so that it doesn’t block:

Writing our initial stab at VerifyUrlAsync

Just like Scott’s Task-based async function, we’ll also define a function that returns a future result. However, instead of using Task as our return type, we’ll define a function that returns IObservable:

Now, let’s see the implementation:

How can we use this?

This method will not block, it will instantly return you an IObservable representing the future result. So, there are a couple of ways you can use the Observable to “unpack” the result:

Now, let’s see how we can do arrays:

The truly revolutionary thing about Rx.NET is how the same primitive you used in LINQ now take on awesome new meanings when applied to the domain of the future. So, the first thing we’ll do is take our array and convert it to an IObservable via AsObservable. This means that the resulting IObservable will produce n items, one for each element in the array, then OnComplete.

The natural thing we would do to get the result is someObservable.Select(x => ValidateUrlAsync(x)). However, there’s a problem – our type is now IObservable<IObservable<T>>; we now have a “future list of futures” (thinking of IObservable as a “future list” is a good analogy, whereas the web call is just a “future list” with only one item). We need a way to flatten down our future list back to IObservable<T> – so what’s the way to flatten a list in LINQ? SelectMany, of course! SelectMany is the secret behind writing async Rx code. Let’s see how to do it:

The code above is still asynchronous – at no time will we block, and it will instantly return. The SelectMany’s default IScheduler will run items on the TaskPool (actually in this case, we never used any synchronous method so we will never block, even on a Threadpool thread. To get the result, similar to the above method, we’d have to call First() on it.

If we were to dump the IObservables at every point of the computation, it’d look something like this:

[ "http://foo", "http://bar" ] ===>

[ {"http://foo", false}, {"http://bar", false} ] ===>

[ Dictionary ]

Cool! Where can I learn more?

  • The Rx Hands-on-lab is an awesome, thorough, and technically correct introduction to Rx.NET
  • The Rx.NET forums are full of really smart, helpful people – I’ve learned a ton by reading through the forum posts
  • The Rx.NET videos on Channel 9 are a great resource – the developers behind the library itself explain the concepts in an easy-to-understand way
  • My blog series on ReactiveXaml and Rx.NET is also a good way to understand many practical uses of Rx, especially if you’re writing desktop / Silverlight / WP7 apps.

Written by Paul Betts

November 16th, 2010 at 12:17 am