Posts in  commodities

6.20.2013

Part 3 - Integrating with ddfplus - Reactive benefits with commodity option barrier events

Barchart Market Data Solutions provides real-time market data feeds (ddfplus quote service) for equities, indices, futures and foreign exchange markets. I've worked with their platform in the past which makes this a great opportunity to share how these techniques work with a real system.

Creating the observable quote stream

In Part 1 I described how to wrap an event based interface to create an observable quote stream. The ddfplus API comes with a simple Client interface exposing several events, one of which we can use to be notified of new quotes (NewQuote). Here's how we create the quote stream:

    private static IObservable<Quote> CreateQuoteStream(Client client)
    {
        return Observable
            .FromEventPattern<Client.NewQuoteEventHandler, Client.NewQuoteEventArgs>(h => client.NewQuote += h, h => client.NewQuote -= h)
            .Select(e => e.EventArgs.Quote);
    }

Thanks to using a real system and running an integration test, I discovered that FromEventPattern is the overload to use, the reason: use FromEventPattern if the events follow the standard .net (sender, args) convention. Lastly, I added a Select to project just the quote into my new quote stream.

Testing

If you would like to add some confidence in this transformation, consider a smoke test. The client will always return the "last" quote per symbol upon connecting. This will give you the ability to expect some data in a smoke test.

An alternative or complementary strategy would be to setup notifications if you don't receive quotes when you expect them. This is a great way to proactively address risk in a real time system. I'll address techniques to add this monitoring in subsequent posts.

Beyond this there's not much to test as the Rx framework is doing the heavy lifting and is well tested.

ddfplus quote source

Obviously hooking up to the event isn't enough, we need to tell the client to start listening for data and what to listen for. Fortunately this is very simple:

public class ddfplusQuoteSource
{
    private readonly Client _Client;
    public readonly IObservable<Quote> QuoteStream;

    public ddfplusQuoteSource()
    {
        _Client = new Client();
        QuoteStream = CreateQuoteStream(_Client);
        SetupConnection();
    }

    private static IObservable<Quote> CreateQuoteStream(Client client)

    private void SetupConnection()
    {
        Connection.Username = Config.UserName;
        Connection.Password = Config.Password;
    }

    public void Start(IEnumerable<string> symbols)
    {
        _Client.Symbols = String.Join(",", symbols);
    }

    public void Stop()
    {
        _Client.Symbols = string.Empty;
        Connection.Close();
    }
}

Steps

  • Create a Client instance.
  • Create the observable using our CreateQuoteStream above.
  • Set user name and password.
  • To start, provide a list of comma delimited symbols.
  • To stop, clear the symbol list.

Testing this out

Now let's put this to use to print quotes to the console:

var source = new ddfplusQuoteSource();
source.QuoteStream.Subscribe(PrintQuote);
var symbols = new[] {"ZC^F"};
source.Start(symbols);

Steps

  • Create our observable adapter ddfplusQuoteSource
  • Subscribe and print quotes using PrintQuote
  • Start listening for CME Globex Corn futures, ZC indicates CME Globex Corn, ^F indicates all futures contracts

Here's what we're printing:

private static void PrintQuote(Quote quote)
{
    var currentSession = quote.Sessions["combined"];
    Console.WriteLine(new {quote.Symbol, currentSession.Day, currentSession.Timestamp, currentSession.High, currentSession.Low});
}

Quotes have two sessions: current (called "combined") and previous. For new lows and highs we are only interested in the current session.

Next

Creating a simple observable adapter is pretty straightforward with the ddfplus API. Next, we'll look at adding some separation and testing between the ddfplus API and our application, like we did in Part 2.

6.7.2013

Part 2 - Testing: Reactive benefits with commodity option barrier events

In my last post I introduced some benefits of using reactive programming techniques. This post will start us down the journey of testing so that subsequent topics can easily be expressed and verified.

Extracting the Anti Corruption Layer

As previously mentioned, when working with APIs it's helpful to introduce an anti-corruption layer. This layer allows you to build strong separation between an external system and your own. Think of this layer as a condom, practice safe development! This layer is a great place to transform and filter interactions to constrain how you use the external system and to inject a layer that can be very helpful for testing.

FuturesQuoteClient publishes FuturesQuotes via an event. First, we'll build a small anti-corruption layer with transformed events. Then we can do the same with observables. The anti-corruption layer will transform a FuturesQuote to QuoteWithContract and exclude quotes with invalid contracts.

Anti-corruption event layer

Stubbing the real client

The FuturesQuoteClient for reference:

public class FuturesQuoteClient
{
    public delegate void QuoteHandler(object sender, FuturesQuote quote);

    public event QuoteHandler Quotes;

    ...
}

Most APIs that use events won't have the ability to stub them for testing. Also, in c#, only the defining class of an event can raise it. Therefore, to separate the real client we'll need a wrapper around it. This is like a wrapper before our anti-corruption wrapper :)

/// <summary>
///     An interface to abstract the quote source.
/// </summary>
public interface IFuturesQuoteClient
{
    event FuturesQuoteClient.QuoteHandler Quotes;
}

This allows us to abstract an interface which we can stub during testing. Here's an implementation of the wrapper for the real client:

/// <summary>
///     Implementation of real wrapper around quote source.
/// </summary>
public class FuturesQuoteClientWrapper : IFuturesQuoteClient
{
    public FuturesQuoteClientWrapper(FuturesQuoteClient client)
    {
        client.Quotes += (sender, quote) =>
            {
                var handler = Quotes;
                if (handler != null) handler(this, quote);
            };
    }

    public event FuturesQuoteClient.QuoteHandler Quotes;
}

The wrapper takes a FuturesQuoteClient and forwards quotes to the IFuturesQuoteClient.Quotes event interface. I'm not going to test this wrapper, I only included it to show the extra dimension of complexity required. This wrapper would require an integration test with the real quote service.

Testing the anti-corruption event layer

Now that we can isolate the anti-corruption layer from the real client, we can begin to build the functionality through tests.

First, we want to transform FuturesQuote to QuoteWithContract:

[Test]
public void OnFuturesQuote_AValidContractOnAFuturesQuote_TriggersQuoteWithContract()
{
    var validFuturesQuote = new FuturesQuote
        {
            Symbol = "CZ2013"
        };
    var futuresQuoteClient = MockRepository.GenerateStub<IFuturesQuoteClient>();
    var quotesWithContractClient = new AntiCorruptionLayerEventClient(futuresQuoteClient);
    var quotes = new List<NotifyOnBarrierEventsReactive.QuoteWithContract>();
    quotesWithContractClient.Quotes += (sender, quote) => quotes.Add(quote);

    futuresQuoteClient.Raise(c => c.Quotes += null, null, validFuturesQuote);

    quotes.Should().HaveCount(1);
    var quoteWithContract = quotes.Single();
    quoteWithContract.Quote.ShouldBeEquivalentTo(validFuturesQuote);
}

  • Setup
    • We start with a valid FuturesQuote.
    • I'm using Rhino.Mocks to generate a stub of IFuturesQuoteClient instead of creating this class manually.
    • AntiCorruptionLayerEventClient is the class that will implement the anti-corruption layer, it doesn't exist yet. It requires a client of type IFuturesQuoteClient.
    • To detect what events are raised, we subscribe to the transformed event AntiCorruptionLayerEventClient.Quotes and put the quotes into a list.
  • Act
    • Next, we raise the event on our IFuturesQuoteClient stub with the validFuturesQuote event argument.
  • Assert
    • Finally, we want to make sure only one QuoteWithContract arrives for the original validFuturesQuote.

Implementing AntiCorruptionLayerEventClient

First we need our transformed event:

public class AntiCorruptionLayerEventClient
{
    public delegate void QuoteWithContractHandler(object sender, NotifyOnBarrierEventsReactive.QuoteWithContract args);

    public event QuoteWithContractHandler Quotes;

    protected virtual void OnQuotes(NotifyOnBarrierEventsReactive.QuoteWithContract quote)
    {
        var handler = Quotes;
        if (handler != null) handler(this, quote);
    }

    ...
}

That's a lot of typing, events aren't fun!

To get the test to pass we have to transform and forward the quotes:

public class AntiCorruptionLayerEventClient
{
    ...

    public AntiCorruptionLayerEventClient(IFuturesQuoteClient client)
    {
        client.Quotes += TransformToQuoteWithContract;
        // neglects handling unsubscribing from the event and cascading that up the chain :)
    }

    private void TransformToQuoteWithContract(object sender, FuturesQuote quote)
    {
        OnQuotes(new NotifyOnBarrierEventsReactive.QuoteWithContract(quote));
    }
}

The constructor takes IFuturesQuoteClient, subscribes to FuturesQuote events and transforms them to QuoteWithContract. Finally, it raises the transformed event.

Pain points

  • Awkward to assert what events are raised.
  • Extra layer of indirection in IFuturesQuoteClient abstraction.
  • Excessive code to setup new events in the anti-corruption layer.
  • These code samples would also need unsubscribe and disposal taken into consideration.
  • Obsession with how (imperative) versus what (declarative).

Side Note: Unit testing transforms

The above example demonstrates testing the composition of the anti-corruption layer. Testing the actual transform is better left to a unit test where the infrastructure of the anti-corruption layer doesn't get in the way of more detailed testing. Here's one possible test:

[Test]
public void MapFromFuturesQuote()
{
    var quote = new FuturesQuote
        {
            Symbol = "CZ2013"
        };

    var commodityContract = NotifyOnBarrierEventsReactive.QuoteWithContract.ContractFromQuote(quote);

    commodityContract.ContractMonth.Should().Be(12);
    commodityContract.ContractYear.Should().Be(2013);
    commodityContract.ProductCode.Should().Be("C");
}

Notice how simple this test is! Regardless if we use events or observables these tests will look the same.

Anti-corruption observable alternative

Stubbing the real client

Now let's look at the same scenario with an anti-corruption layer built on observables. To isolate the client we still have to apply a wrapper, fortunately the reactive extensions provides this for us (as shown in the last post):

private static IObservable<FuturesQuote> FuturesQuotesSource(FuturesQuoteClient client)
{
    return Observable
        .FromEvent<FuturesQuoteClient.QuoteHandler, FuturesQuote>(h => client.Quotes += h, h => client.Quotes -= h);
}

Again, I'm not testing the wrapper to isolate our anti-corruption layer, that would be done in an integration test. However, since we're leveraging a built in wrapper, we don't have to worry about as many test cases as we would with our implementation of FuturesQuoteClientWrapper.

Testing the anti-corruption observable layer

Here's the equivalent test to verify quotes are transformed:

[Test]
public void OnFuturesQuote_WithAValidContract_StreamsQuoteWithContract()
{
    var validFuturesQuote = new FuturesQuote
        {
            Symbol = "CZ2013"
        };
    var scheduler = new TestScheduler();
    var futuresQuotes = scheduler.CreateColdObservable(ReactiveTest.OnNext(0, validFuturesQuote));
    var quotesWithContractClient = new AntiCorruptionLayerObservableClient(futuresQuotes);

    var quotesWithContracts = scheduler.Start(() => quotesWithContractClient.Quotes);

    quotesWithContracts.Messages.Should().HaveCount(1);
    var quoteWithContract = quotesWithContracts.Messages.Single().Value.Value;
    quoteWithContract.Quote.ShouldBeEquivalentTo(validFuturesQuote);
}

  • Setup
    • Again we start with a valid FuturesQuote.
    • The reactive extensions comes with testing abstractions in Rx-Testing. One of these is a TestScheduler
      • Schedulers dispatch work in the reactive framework
      • TestScheduler is optimized to control and monitor the testing process.
    • Our new client AntiCorruptionLayerObservableClient requires an IObservable<FuturesQuote>
      • TestScheduler.CreateColdObservable creates this observable for testing, there's more to learn about this API but for now those details aren't relevant.
      • We setup the validFuturesQuote message in the test observable.
  • Act
    • To run the test we tell the scheduler to Start which causes it to dispatch our validFuturesQuote message
    • The Start method takes an IObservable<T> and returns an ITestableObserver<T> where T is our transformed QuoteWithContract
      • The testable observer, note observer not observable, records message type, timing and values.
      • Time is irrelevant for Select transforms but will be invaluable later on.
  • Assert
    • Finally we assert the same condition as before, that only one quote was transformed for the provided validFuturesQuote

Pain points

  • The TestScheduler is designed to help with testing situations where time is involved. There's a bit of overhead to test Select operations where timing is irrelevant
    • Have to specify time (0 above) when setting up the observable scenario (scheduler.CreateColdObservable(ReactiveTest.OnNext(0, validFuturesQuote)))
    • Have to dig into messages skipping timing and message type to get to the value you want to verify (Messages.Single().Value.Value)

Benefits

  • Testing abstractions are built in
  • No awkward hacks to capture messages, it's part of the framework.
  • Focus on what (declarative), not how (imperative)

Implementing AntiCorruptionLayerObservableClient

There's one constructor to create the AntiCorruptionLayerObservableClient from the FuturesQuoteClient, and one to create this from IObservable<FuturesQuote>. These are equivalent to FuturesQuoteClientWrapper and AntiCorruptionLayerEventClient.ctor(IFuturesQuoteClient) respectively:

public class AntiCorruptionLayerObservableClient
{
    ...

    public AntiCorruptionLayerObservableClient(FuturesQuoteClient client)
        : this(FuturesQuotesSource(client))
    {
    }


    public AntiCorruptionLayerObservableClient(IObservable<FuturesQuote> quotes)
    {
        Quotes = quotes
            .Select(q => new NotifyOnBarrierEventsReactive.QuoteWithContract(q));
    }

    public IObservable<NotifyOnBarrierEventsReactive.QuoteWithContract> Quotes { get; private set; }
}

The only real code involved is in transforming the source observable, one call to Select!

Anti-corruption layer filtering

In the anti-corruption layer event sample I didn't go into excluding quotes with invalid contracts. It's trivial and I think the reactive sample is much more expressive:

[Test]
public void OnFuturesQuote_WithAnInvalidContract_StreamsNothing()
{
    var invalidFuturesQuote = new FuturesQuote
        {
            Symbol = "invalidcontract"
        };
    var scheduler = new TestScheduler();
    var futuresQuotes = scheduler.CreateColdObservable(ReactiveTest.OnNext(0, invalidFuturesQuote));
    var quotesWithContractClient = new AntiCorruptionLayerObservableClient(futuresQuotes);

    var quotesWithContracts = scheduler.Start(() => quotesWithContractClient.Quotes);

    quotesWithContracts.Messages.Should().BeEmpty();
}

To get this test to pass we simply add a filter:

public AntiCorruptionLayerObservableClient(IObservable<FuturesQuote> quotes)
{
    Quotes = quotes
        .Select(q => new NotifyOnBarrierEventsReactive.QuoteWithContract(q))
        .Where(NotifyOnBarrierEventsReactive.IsValidContract);
}

Conclusion

Implementing an anti-corruption layer around information streams with reactive programming offers several benefits over imperative approaches. I'll admit the benefits aren't as exciting without the time dependent aspects. However, when working with time it will become patently obvious that there's no real comparison.

6.7.2013

Reactive benefits with commodity option barrier events

Reactive programming offers many advantages over traditional imperative techniques when handling information streams like commodity market data. In this and several follow up posts I want to demonstrate some of the advantages I've found using these techniques. I'll be demonstrating this with c# and the Reactive Extensions framework.

Reactive Programming

What is reactive programming? From Wikipedia:

"reactive programming is a programming paradigm oriented around data flows and the propagation of change." en.wikipedia.org/wiki/Reactive_programming

Spreadsheet calculations are a great example of reactive techniques. If you are unfamiliar and want to know more look at the Wikipedia article or follow along in this series as I demonstrate some of the advantages.

Barrier options on commodity futures

I'm going to use barrier options to demonstrate the value of reactive programming. From Wikipedia:

a barrier option is an exotic derivative typically an option on the underlying asset whose price breaching the pre-set barrier level either springs the option into existence or extinguishes an already existing option. en.wikipedia.org/wiki/Barrier_option

If you aren't familiar with finance, an option is easy to think of as an insurance policy. One purpose is to protect a future purchase from major price changes. Imagine if you could buy insurance to cover the cost of gas if it rose above an extreme level, say $8/gallon.

To protect against major price swings you may need to pay a hefty premium. One way to reduce the premium is to purchase a barrier option. One way a barrier can work is to cancel the insurance policy. Say you are worried about gas rising above $8 a gallon but you doubt it will rise about $12. You might be willing to pay a reduced premium in exchange for the small chance that gas will breach $12 a gallon causing the policy to be canceled. Barriers in this case reduce the seller's risk and hence reduce premium.

If a breach occurs you might want to find out ASAP to mitigate the risk. This is the scenario I will use to demonstrate reactive techniques.

Traditional imperative approach

First I'll start with an imperative approach that is pretty common to come across.

Note: Most of the following code can be found in a more complete fashion here https://github.com/g0t4/blog-samples/tree/master/reactive/src.

Quote client

Many APIs that provide quotes in c# have been around a while and therefore use an event based interface:

var client = new FuturesQuoteClient();
client.Quotes += OnQuote;

Simply attach an OnQuote handler to process quotes.

Active barrier options by contract

In this example I'm going to assume there is a "cache" of options that gets updated periodically. The details of this are irrelevant.

_ActiveBarrierOptionsByContract = GetActiveBarrierOptionsByContract();

OnQuote handler

Here is an example of the OnQuote handler. I tried to leverage extracted methods and LINQ to show even in an imperative fashion, the code can be concise and clean (at least in my opinion):

private void OnQuote(object sender, FuturesQuote quote)
{
    var contract = ContractFromQuote(quote);
    if (IsValidContract(contract))
    {
        return;
    }
    IEnumerable<CommodityBarrierOption> activeBarrierOptionsForContract;
    if (!_ActiveBarrierOptionsByContract.TryGetValue(contract, out activeBarrierOptionsForContract))
    {
        return;
    }
    activeBarrierOptionsForContract
        .Where(option => BarrierIsBreached(option, quote))
        .Where(NoticeNotAlreadySent)
        .ToList()
        .ForEach(option => NotifyTheHumans(option, quote));
}

When the quote arrives, we parse the contract with ContractFromQuote. If the contract isn't valid we ignore the quote. When you interface with an external API there are going to be things you want to transform. This is part of your "Anti-Corruption layer" if you are familiar with domain driven design.

Without going into the details of options on futures, suffice to say the contract describes what type of gas (say regular versus premium) and when you will be buying it (a month). I'm really simplifying this as the location would matter too.

If _ActiveBarrierOptionsByContract doesn't contain the contract, we ignore the quote. We only care about quotes for contracts that we insured.

Next, we check each option to see if the BarrierIsBreached and if NoticeNotAlreadySent. If these conditions are met we'll NotifyTheHumans.

This example ignores any performance considerations. I'll address those in subsequent posts.

Refactoring towards reactive

Now let's refactor to incorporate reactive patterns. Starting imperative means the reactive solution will be somewhat "tainted" but will suffice to move us in the reactive direction.

Reactive benefit: decouple handler from source transforms

The imperative solution makes it difficult to decouple transforms and filters from handlers. Instead of receiving all FuturesQuotes, we only want to know about quotes with valid contracts. We could create new events and wire up conditions to trigger them. However, leveraging the reactive framework makes this possible with much less typing and in a much more expressive fashion.

Observables

First we have to wrap the FuturesQuote event to produce an observable. Think of this as a stream of FuturesQuotes:

private static IObservable<FuturesQuote> FuturesQuotesSource(FuturesQuoteClient client)
{
    return Observable
        .FromEvent<FuturesQuoteClient.QuoteHanlder, FuturesQuote>(h => client.Quotes += h, h => client.Quotes -= h);
}

This creates the observable:

var client = new FuturesQuoteClient();
FuturesQuotesSource(client)

We now have a stream of FuturesQuotes.

Transform

The Reactive Extensions provides LINQ operators that work on observables. In the example above we parse the contract into a local variable, instead we could create a new type QuoteWithContract and store both the quote and the parsed contract:

public class QuoteWithContract
{
    public FuturesQuote Quote { get; set; }
    public CommodityContract Contract { get; set; }

    public QuoteWithContract(FuturesQuote quote)
    {
        Quote = quote;
        Contract = ContractFromQuote(quote);
    }

    ...
}

Now we can apply the Select LINQ operator to transform our stream from FuturesQuote to QuoteWithContract:

var client = new FuturesQuoteClient();
FuturesQuotesSource(client)
    .Select(q => new QuoteWithContract(q))

Filter

The other thing we want to ensure is that the contract was valid, we can do this with the Where LINQ operator:

FuturesQuotesSource(client)
    .Select(q => new QuoteWithContract(q))
    .Where(IsValidContract)

private static bool IsValidContract(QuoteWithContract quote)
{
    return quote.Contract != null;
}

I like extracting method conditions to increase readability of the stream composition. We now have a stream of only quotes with a valid contract.

Reactive benefit: Thinking in streams

At this point we could Subscribe to the quotes stream and execute our imperative code:

private void OnQuote(QuoteWithContract quote)
{
    IEnumerable<CommodityBarrierOption> activeBarrierOptionsForContract;
    if (!_ActiveBarrierOptionsByContract.TryGetValue(quote.Contract, out activeBarrierOptionsForContract))
    {
        return;
    }
    activeBarrierOptionsForContract
        .Where(option => BarrierIsBreached(option, quote.Quote))
        .Where(NoticeNotAlreadySent)
        .ToList()
        .ForEach(option => NotifyTheHumans(option, quote.Quote));
}

But why not keep with the concept of a stream and re-frame our solution as producing a stream of barrier breach notices! To do this I've taken the above code and extracted this method that can take a quote and create the resulting breach notices:

private IEnumerable<BarrierBreachNotice> BarrierBreachNotices(QuoteWithContract quote)
{
    IEnumerable<CommodityBarrierOption> options;
    if (!_ActiveBarrierOptionsByContract.TryGetValue(quote.Contract, out options))
    {
        return Enumerable.Empty<BarrierBreachNotice>();
    }
    return options
        .Where(option => BarrierIsBreached(option, quote.Quote))
        .Select(option => new BarrierBreachNotice(option, quote));
}

public class BarrierBreachNotice
{
    public BarrierBreachNotice(CommodityBarrierOption option, QuoteWithContract quote)
    {
        Option = option;
        Quote = quote;
    }

    public CommodityBarrierOption Option { get; set; }
    public QuoteWithContract Quote { get; set; }
}

Now we can transform a stream of quotes, along with our active barrier options "cache", into a stream of BarrierBreachNotice. We'll use the SelectMany operator as each quote could generate more than one notice:

FuturesQuotesSource(client)
    .Select(q => new QuoteWithContract(q))
    .Where(IsValidContract)
    .SelectMany(BarrierBreachNotices)

And we want to avoid duplicate notices:

FuturesQuotesSource(client)
    .Select(q => new QuoteWithContract(q))
    .Where(IsValidContract)
    .SelectMany(BarrierBreachNotices)
    .Where(NoticeNotAlreadySent)

Reactive benefit: Descriptive subscriptions

Our original goal was to notify the humans:

FuturesQuotesSource(client)
    .Select(q => new QuoteWithContract(q))
    .Where(IsValidContract)
    .SelectMany(BarrierBreachNotices)
    .Where(NoticeNotAlreadySent)
    .Subscribe(NotifyTheHumans);

private void NotifyTheHumans(BarrierBreachNotice notice)
{
    ...
}

This subscription is much less work, no awkward += on an event and object sender overhead, a simple call to Subscribe!

Notice how easy it is to continue the idea of a stream. Transforms and filters are very easy to use to partition event handling into an ordered, composable pipeline. This is one of the extremely powerful aspects of reactive programming abstractions.

Next up

Next I want to discuss testing, further thinking in streams, performance and other considerations and how reactive programming techniques apply.