Recently I got an opportunity to work on a FX trading (client facing) application. I specifically mention client facing because in this money world trading application has to be very responsive and fast. When I started working on this application, along with the other challenges the major one was to learn and understand the Reactive extensions (Rx). Fx trading application is a Reactive application and it has to be looking at its business domain and the software solution it provides.
But what do you mean by Reactive Application?
If we try to briefly understand the Reactive Applications, they are
- Event driven – React to events
- Scalable – React to loads
- Resilient – React to failure
- Responsive – React to users
All the above features imply how the application and environment works together. The reactive application passively waits for the environment to send the data and then reacts to it. This behavior can be achieved by Microsoft reactive extensions.
This article is intended to give an Introduction to Reactive Extensions(Rx).
To study the Rx to its depth you can find good material at below links
Definition
The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequence and LINQ-style query operators.
Essentially Rx is built upon the foundations of the Observer pattern and the interfaces IObserver<T> and IObservable<T> forms the fundamental building blocks for it.
IObservable<T>
//Defines a provider for push-based notification. public interface IObservable<out T> { //Notifies the provider the that an observer is to receive notifications. IDisposable Subscribe(IObservable<T> observer); }
It is a simple interface with just a Subscribe method. We can think anything that implements IObservable<T> as a streaming sequence of T objects (sequence of data in motion). Example IObservable<Price> refers srteam of Prices.
IObserver<T>
//Provides a mechanism for receiving push-based notifications. public interface IObserver<in T> { //Provides the observer with new data. void OnNext(T value); //Notifies the observer that provider has experienced an error condition. void OnError(Exception error); //Notifies the observer that provider has finished sending push-based notification. void OnCompleted(); }
An implementation of the IObserver<T> may have zero or more calls to OnNext(T) followed optionally by a call to either OnError (Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by OnError(Exception) or by OnCompleted(). However this protocol does not demand that OnNext(T), OnError(Exception) or OnCompleted() ever be called.This enables to concept of empty and infinite sequences.
ISubject<T>
If we create our own implementation of IObservable<T> we may find that while we want to publicly expose the IObervable characteristics we still need to be able to publish items to the subscribers, throw errors and notify when the sequence is complete. That sounds more like the methods in IObserver<T>. So ISubject<TSource, TResult> implements both the IObservable<T> and IObserver<T> interfaces. The implementation of ISubject<T> reduces the learning curve for the developers new to Rx.
//Represents an object that is both an obsrevable sequence as well as an observer public interface ISubject<in TSource, out TResult> : IOBserver<TSource>, IObservable<TResult> { }
Subscribing
The Subscribe extension method of observable allows us to
- Pass an action to be performed when OnNext is invoked for each incoming data.
- Throws an Exception in case of OnError notification.
- Pass an action OnCompeted when the source completes
There are different overloads of the Subscribe extension method but below is for the complete implementation.
IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
Unsubscribing
The return type of subscription is of type IDisposable. This disposable can be consider as the subscription itself, or perhaps a token representing the subscription. Disposing it will dispose the subscription and effectively unsubscribe.
The instance of IDisposable that is returned from subscription does not have a finalizer and will not be disposed when it goes out of scope. If you call a Subscribe method and ignore the return value you will lose your only handle to unsubscribe. The subscription will still exist and you will lose the access to this resource which could result in leaking memory and running unwanted processes.
Rx Example
var tickingTimespan = TimeSpan.FromSeconds(5); //source is the observable stream which gets the price for every 5 secs. IObserverable<Price> source = Observable.Interval().Select(_=> GetPrice()); //Observable sourceis subscribed to react to incoming price. //For every new price DisplayPrice method is called. //On error if any can be handled with action 'ActionOnError'. //On completion of the source stream ActionOnCompleted will be called. IDisposable priceSubscription = source.Subscribe(DisplayPrice() ,ActionOnError(), ActionOnCompleted()); //Dispose is used to unsubscribe the priceSubscription. priceSubscription.Dispose();
Flavors of Reactive Extensions
We also have Rx in different languages as below
- Rx.NET: The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.
- RxJS: The Reactive Extensions for JavaScript (RxJS) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in JavaScript which can target both the browser and Node.js.
- Rx++: The Reactive Extensions for Native (RxC) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
- RxJava: A library for composing asynchronous and event-based programs using observable sequences for the Java VM.