Thursday Night

Paul Betts’s personal website / blog / what-have-you

ReactiveXaml Series: On combining notifications

I was excited today when I got my first email post to the Google Group for RxXaml, and even better, it was a great question: what the poster was asking about really strikes at the core of why Rx and ReactiveXaml are compelling in my mind. In my experimentation, I’ve come across a number of useful patterns that I should’ve mentioned earlier. I showed you how to get the notifications, but not how to use them!

When I say ‘notification’, you have to read the term very broadly and stretch your brain a bit. As a reminder, here are some examples of notifications in Rx and RxXaml:

  • A simple .NET event (i.e. via Observable.FromEvent)
  • Whenever a property changes (via ReactiveObject)
  • When an ICommand is invoked (ReactiveCommand)
  • Any time any sort of asynchronous operation completes (via Observable.FromAsyncCommand, ReactiveAsyncCommand, or QueuedAsyncMRUCache)
  • In response to an explicit notification (via a Subject)

Combining notifications in meaningful ways

One of the most powerful parts of the Reactive Extensions is its ability to combine single events compositionally – when I describe what Rx is to people, I often use the description, “Rx gives you the ability to take simple events and combine them together into something more specific and useful – I don’t really care when the ‘MouseUp’ and ‘KeyDown’ events happen, I want to know when the ‘User dropped a file on the top left corner’ happens – tell me about that.”

To this effect, there are several tricks we can use. The first is to remember that ReactiveObject fires its IObservable when any property changes, which makes it very easy to watch an entire object. Often, this is useful enough; when it isn’t, Where helps you out:

ReactiveObject Toaster;

// Any change will print something
Toaster.Subscribe(x => Console.WriteLine("{0} changed!", x.PropertyName));

// This is an observable that only notifies when the Foo property changes
var FooChanged = Toaster.Where(x => x.PropertyName == "Foo");

Merge, CombineLatest, and Zip – the ‘And’ and ‘Or’ of Rx

To combine several IObservables, we have a few useful methods that stand out. The first is Observable.Merge: as its name implies, Merge takes several IObservables of the same type and returns an IObservable that fires when any one of its inputs fires. Thinking in a boolean sense, Merge is kind of like Or. Having to be of the same type isn’t as onerous a requirement as it sounds, because Select can project every input to a common type first:

IObservable<float> O1;
IObservable<int> O2;
IObservable<string> O3;

// Tell me when *any* of these 3 send a notification
var result = Observable.Merge(
    O1.Select(_ => true), O2.Select(_ => true), O3.Select(_ => true)
);

One difficulty with Merge that can sometimes bite you is that it is stateless: when you get a notification about O1, you don’t have any knowledge about what items came in on O2 or O3. For two IObservables, we have a handy method called Observable.CombineLatest. This method will “remember” the last item that came in on both sides, so when O1 changes, it will give you the new O1 and the latest value of O2. Nothing is produced until both sides have sent at least one item. Furthermore, we can take the result and expose it as a change-notifying property via ObservableAsPropertyHelper.

// Subjects are just IObservables that we can trigger by hand
// They’re the mutable variables of Rx
var s1 = new Subject<int>();
var s2 = new Subject<int>();

// Combine s1 with s2 and write its output to Console
s1.CombineLatest(s2, (a,b) => a * b).Subscribe(Console.WriteLine);

s1.OnNext(5); // Nothing happens, no value for s2

s2.OnNext(10); // 10 came in, combine the 10 with whatever s1 was (5)
>>> 50

s2.OnNext(20); // 20 came in, still use s1's latest value (5)
>>> 100

s1.OnNext(2); // s1 is now 2, take s2's latest value (20)
>>> 40

Finally, we have Observable.Zip. Like the other two, this function also combines observables, but like its IEnumerable counterpart, it is only concerned with pairs of items: the first item from one side is matched with the first item from the other, the second with the second, and so on. This makes it more like “And” than the other two (remember that it’s extremely unlikely that notifications will come in at the exact same time, so an “Observable.And” wouldn’t make much sense). Zip will not yield an element until it has both of its “slots” filled for the next item.

var s1 = new Subject<int>();
var s2 = new Subject<int>();

s1.Zip(s2, (a,b) => a * b).Subscribe(Console.WriteLine);

s1.OnNext(2); // Nothing, no pair yet
s1.OnNext(5); // Still no pair
s2.OnNext(10); // We’ve got a pair (2,10), let’s send it down
>>> 20

s1.OnNext(10); // s2's empty, no pair
s2.OnNext(1); // 5 * 1
>>> 5
s2.OnNext(10); // 10*10
>>> 100
s2.OnNext(100); // s1's empty, no output

Combining Notifications for Visual State Manager

Here’s another clever trick that I really like. Often, we need to change the visual state in response to a variety of notifications that have different types and are otherwise unrelated. Here’s how to do it:

IObservable<int> SomethingToWatch, SomethingElse;
IObservable<float> AThirdThing;
var state = Observable.Merge(
    SomethingToWatch.Select(_ => "State1"),
    SomethingElse.Select(_ => "State2"),
    AThirdThing.Select(_ => "State3")
);

state.Subscribe(x =>
    VisualStateManager.GoToState(this, x, true));

Observable.Merge can also be used along with Scan to keep a reference count. Check out this example from ReactiveAsyncCommand, where we take two observables, Select them to 1 and -1, then keep a running count via Scan.
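To make the Merge-plus-Scan idea concrete, here is a minimal sketch of a running count. It is not the actual ReactiveAsyncCommand code. The subjects `started` and `finished` are hypothetical stand-ins for "an operation began" and "an operation completed", and the snippet assumes a current Rx.NET build (the System.Reactive NuGet package) and C# top-level statements rather than the 2010-era Rx release.

```csharp
// Assumption: the System.Reactive NuGet package is referenced.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical notification sources; the payload is ignored.
var started = new Subject<string>();
var finished = new Subject<string>();

// Project each start to +1 and each finish to -1, merge the two
// streams, then let Scan accumulate a running total.
IObservable<int> inFlight = Observable.Merge(
        started.Select(_ => 1),
        finished.Select(_ => -1))
    .Scan(0, (count, delta) => count + delta);

// Record every running total that Scan produces.
var counts = new List<int>();
inFlight.Subscribe(x => counts.Add(x));

started.OnNext("a");   // one operation in flight
started.OnNext("b");   // two in flight
finished.OnNext("a");  // back to one
finished.OnNext("b");  // none left

Console.WriteLine(string.Join(", ", counts)); // prints "1, 2, 1, 0"
```

Scan emits its accumulator after every input, so subscribers see the count change as it happens. The seed value (0) is never emitted on its own.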

Written by Paul Betts

August 14th, 2010 at 7:56 pm
