Introduction to Reactive Extensions

Recently I got an opportunity to work on a FX  trading (client facing) application. I specifically mention client facing because in this money world trading application has to be very responsive and fast. When I started working on this application, along with the other challenges the major one was to learn and understand the Reactive extensions (Rx). Fx trading application is a Reactive application and it has to be looking at its business domain and the software solution it provides.

But what do you mean by Reactive Application?
If we try to briefly understand the Reactive Applications, they are

  • Event driven – React to events
  • Scalable – React to loads
  • Resilient – React to failure
  • Responsive – React to users

All the above features imply how the application and environment works together. The reactive application passively waits for the environment to send the data and then reacts to it. This behavior can be achieved by Microsoft reactive extensions.

This article is intended to give an Introduction to Reactive Extensions(Rx).
To study the Rx to its depth you can find good material at below links

Definition

The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequence and LINQ-style query operators.

Essentially Rx is built upon the foundations of the Observer pattern and the interfaces IObserver<T> and IObservable<T> forms the fundamental building blocks for it.

IObservable<T>

//Defines a provider for push-based notification.
public interface IObservable<out T>
{
//Notifies the provider the that an observer is to receive notifications.
IDisposable Subscribe(IObservable<T> observer);
}

It is a simple interface with just a Subscribe method. We can think anything that implements IObservable<T> as a streaming sequence of T objects (sequence of data in motion). Example IObservable<Price> refers srteam of Prices.

IObserver<T>

//Provides a mechanism for receiving push-based notifications.
public interface IObserver<in T>
{
//Provides the observer with new data.
void OnNext(T value);

//Notifies the observer that provider has experienced an error condition.
void OnError(Exception error);

//Notifies the observer that provider has finished sending push-based notification.
void OnCompleted();
}

An implementation of the IObserver<T> may have zero or more calls to OnNext(T) followed optionally by a call to either OnError (Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by OnError(Exception) or by OnCompleted(). However this protocol does not demand that OnNext(T), OnError(Exception) or OnCompleted() ever be called.This enables to concept of empty and infinite sequences.

ISubject<T>

If we create our own implementation of IObservable<T> we may find that while we want to publicly expose the IObervable characteristics we still need to be able to publish items to the subscribers, throw errors and notify when the sequence is complete. That sounds more like the methods in IObserver<T>. So ISubject<TSource, TResult> implements both the IObservable<T> and IObserver<T> interfaces. The implementation of ISubject<T> reduces the learning curve for the developers new to Rx.


//Represents an object that is both an obsrevable sequence as well as an observer
public interface ISubject<in TSource, out TResult> : IOBserver<TSource>, IObservable<TResult>
{

}

Subscribing

The Subscribe extension method of observable allows us to

  1. Pass an action to be performed when OnNext is invoked for each incoming data.
  2. Throws an Exception in case of OnError notification.
  3. Pass an action OnCompeted when the source completes

There are different overloads of the Subscribe extension method but below is for the complete implementation.

IDisposable Subscribe<TSource>(this IObservable<TSource> source,
Action<TSource> onNext,
Action<Exception> onError,
Action onCompleted);

Unsubscribing

The return type of subscription is of type IDisposable. This disposable can be consider as the subscription itself, or perhaps a token representing the subscription. Disposing it will dispose the subscription and effectively unsubscribe.

The instance of IDisposable that is returned from subscription does not have a finalizer and will not be disposed when it goes out of scope. If you call a Subscribe method and ignore the return value you will lose your only handle to unsubscribe. The subscription will still exist and you will lose the access to this resource which could result in leaking memory  and running unwanted processes.

Rx Example

var tickingTimespan = TimeSpan.FromSeconds(5);

//source is the observable stream which gets the price for every 5 secs.
IObserverable<Price> source = Observable.Interval().Select(_=> GetPrice());

//Observable sourceis subscribed to react to incoming price.
//For every new price DisplayPrice method is called.
//On error if any can be handled with action 'ActionOnError'.
//On completion of the source stream ActionOnCompleted will be called.
IDisposable priceSubscription = source.Subscribe(DisplayPrice() ,ActionOnError(), ActionOnCompleted());

//Dispose is used to unsubscribe the priceSubscription.
priceSubscription.Dispose();

Flavors of Reactive Extensions

We also have Rx in different languages as below

  • Rx.NET: The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.
  • RxJS: The Reactive Extensions for JavaScript (RxJS) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in JavaScript which can target both the browser and Node.js.
  • Rx++: The Reactive Extensions for Native (RxC) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
  • RxJava: A library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Get File CheckSum

The following C# code returns the checksum of the file given the path in strFilePath string.

using System.Security.Cryptography;
public string GetCheckSum(string strFilePath)
{
byte[] checksum = new byte[2000];
MD5 oMD5 = MD5.Create();
string strchecksum = “”;

try
{
using (FileStream stream = File.OpenRead(strFilePath))
{
checksum = oMD5.ComputeHash(stream);
}
strchecksum = BitConverter.ToString(checksum).Replace(“-“, String.Empty);
}
catch (Exception ex)
{
//Handle Exception
}

return strchecksum;
}

Comparing files using MD5

Following C# function takes the two files path as arguments and return true if these two file are
same or false in other case.

public bool CampareFiles(string strFilePath1, string strFilePath2)
{
bool bReturn = false;
System.Security.Cryptography.MD5 oMD5 = MD5.Create();
byte[] checksum = null;
string strchecksum1 = ” “;
string strchecksum2 = ” “;

try
{
//Getting Checksum for file 1 in string format
using (FileStream stream = File.OpenRead(strFilePath1))
{
checksum = oMD5.ComputeHash(stream);
//Converting in string format
strchecksum1 = BitConverter.ToString(checksum).Replace(“-“, String.Empty);
}

using (FileStream stream = File.OpenRead(strFilePath2))
{
checksum = oMD5.ComputeHash(stream);
strchecksum2 = BitConverter.ToString(checksum).Replace(“-“, String.Empty);
}

//Comparing Checksum
if (strchecksum1 == strchecksum2)
{
bReturn = false;
}
return bReturn;
}
catch (Exception ex)
{
//Catch Error here if any
}
return bReturn;
}

Using zlib.net

In one of our project,
we required to compress a file using zlib.net and store it in database in byte array format,
and on retrieving it, uncompressed byte array and create a file on disc.

Hope this post helps others …. !

download Zlib.net click here

Add reference of above dll.

using zlib;

public void CopyStream(Stream input, Stream output)
{
byte[] buffer = new byte[2000];
int len;
while ((len = input.Read(buffer, 0, 2000)) &gt; 0)
{
output.Write(buffer, 0, len);
}
output.Flush();
}

//compress function gets the file path to be compresss and fills in byte array arrCompressed
public void compress( ref byte[] arrCompressed, string strFileSrcPath)
{
MemoryStream oOutStream = new MemoryStream();
ZOutputStream ostream = new ZOutputStream(oOutStream, zlib.zlibConst.Z_DEFAULT_COMPRESSION);
FileStream inStream = new FileStream(strFileSrcPath, FileMode.Open);
try
{
CopyStream(inStream, ostream);
ostream.finish();
arrCompressed = oOutStream.ToArray();
}
finally
{
ostream.Close();
oOutStream.Close();
inStream.Close();
}
}

//uncompress function gets back file from compressed btye Array
public void uncompress( byte[] arrCompressed, string strFileDesName)
{
MemoryStream oInStream = new MemoryStream(arrCompressed);
ZInputStream oZInstream = new ZInputStream(oInStream);
MemoryStream oOutStream = new MemoryStream();

byte[] buffer = new byte[2000];
int len;
while ((len = oZInstream.read(buffer, 0, 2000)) > 0)
{
oOutStream.Write(buffer, 0, len);
}
oOutStream.Flush();
byte[] arrUncompressed = oOutStream.ToArray();
oZInstream.Close();
oOutStream.Close();
writeByteArrayToFile(arrUncompressed, strFileDesName);
}

//Writes Byte Array to Disc
public void writeByteArrayToFile(byte[] buff, string fileName)
{
try
{
FileStream fs = new FileStream(fileName, FileMode.Create, FileAccess.ReadWrite);
BinaryWriter bw = new BinaryWriter(fs);
bw.Write(buff);
bw.Close();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}