CEP Client with WPF GridDataControl

Microsoft StreamInsight is a powerful platform for developing complex event processing (CEP) systems, and it supports several development models. In this post we will use the IObservable/IObserver model with the Reactive Extensions for .NET (Rx). Since this will be a real-time application, we will also use F# async workflows to pull stock data.

 


 

 

F# async workflows are the coolest part of using F# in a real-time application. They let us write concise code that (1) executes in parallel and (2) can be exposed to other .NET libraries with ease.
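To make the "executes in parallel" point concrete, here is a minimal sketch using Async.Parallel. The fetchQuote function is a hypothetical stand-in that only simulates latency; it is not part of the application and never touches the network.

```fsharp
// Hypothetical stand-in for a network call: waits briefly, then returns a fake quote line.
let fetchQuote (symbol: string) : Async<string> =
    async {
        do! Async.Sleep 100              // simulate latency
        return sprintf "%s,0.00" symbol
    }

// Async.Parallel composes the jobs so they can run concurrently and
// returns the results in the same order as the inputs.
let quotes : string[] =
    [ "MSFT"; "GOOG"; "AAPL" ]
    |> List.map fetchQuote
    |> Async.Parallel
    |> Async.RunSynchronously

quotes |> Array.iter (printfn "%s")
```

Async.RunSynchronously blocks the calling thread, which is fine for a console experiment but not for a UI thread; the worker shown later in this post starts the work with continuations instead.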

I won't go into detail about F# except for the async workflow used in this application. There is a three-part series on design patterns for F# async workflows; I have used Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports its progress through events. A modified version of the AsyncWorker<> code is shown below.

    open System
    open System.Threading

    type JobCompletedEventArgs<'T>(job:int, result:'T) =
        inherit EventArgs()
        member x.Job with get() = job
        member x.Result with get() = result

    type AsyncWorker<'T>(jobs: seq<Async<'T>>) =

        // These declare the F# events that we can raise
        let allCompleted  = new Event<'T[]>()
        let error         = new Event<exn>()
        // Assumption: the type argument is OperationCanceledException in both branches,
        // the type that Async.StartWithContinuations hands to its cancellation continuation.
#if WPF
        let canceled      = new Event<OperationCanceledException>()
#else
        let canceled      = new Event<OperationCanceledException>()
#endif
        let jobCompleted  = new Event<JobCompletedEventArgs<'T>>()
        let cancellationCapability = new CancellationTokenSource()

        /// Start an instance of the work
        member x.Start()    =
            // Capture the synchronization context to allow us to raise events back on the GUI thread.
            // CaptureCurrent and RaiseEvent are extension methods on SynchronizationContext,
            // assumed to come from the design-pattern series; they are not defined in this listing.
            let syncContext = SynchronizationContext.CaptureCurrent()

            // Mark up the jobs with numbers
            let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1))
            let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args

            // Run all jobs in parallel, reporting each result as it arrives
            let work =
                Async.Parallel
                   [ for (job,jobNumber) in jobs ->
                       async { let! result = job
                               syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<'T>(jobNumber, result))
                               return result } ]

            Async.StartWithContinuations
                ( work,
                  (fun res -> raiseEventOnGuiThread(allCompleted, res)),
                  (fun exn -> raiseEventOnGuiThread(error, exn)),
                  (fun exn -> raiseEventOnGuiThread(canceled, exn)),
                    cancellationCapability.Token)

        /// Raised when a particular job completes
        [<CLIEvent>]
        member x.JobCompleted = jobCompleted.Publish

        /// Raised when all jobs complete
        [<CLIEvent>]
        member x.AllCompleted = allCompleted.Publish

        /// Raised when the composition is cancelled successfully
        [<CLIEvent>]
        member x.Canceled = canceled.Publish

        /// Raised when the composition exhibits an error
        [<CLIEvent>]
        member x.Error = error.Publish

We have used [<CLIEvent>] attributes to mark these events so that they are exposed as standard .NET events to other CLI languages. Since we are using Rx, the event arguments need to inherit from System.EventArgs; JobCompletedEventArgs does that here. The AsyncWorker is now ready to be used as a library for running parallel code.
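As a smaller, self-contained illustration of the same idea, the sketch below declares an event with an explicit EventHandler<_> delegate type and publishes it through [<CLIEvent>]. The Ticker and PriceChangedEventArgs types are hypothetical and exist only for this example.

```fsharp
open System

// Hypothetical event payload; deriving from EventArgs follows the .NET event pattern.
type PriceChangedEventArgs(symbol: string, price: decimal) =
    inherit EventArgs()
    member x.Symbol = symbol
    member x.Price = price

type Ticker() =
    // Event<'Delegate, 'Args> carries an explicit delegate type, here EventHandler<_>.
    let priceChanged =
        new Event<EventHandler<PriceChangedEventArgs>, PriceChangedEventArgs>()

    /// Compiled as a standard .NET event because of [<CLIEvent>].
    [<CLIEvent>]
    member x.PriceChanged = priceChanged.Publish

    /// Raises the event with this instance as the sender.
    member x.Raise(symbol, price) =
        priceChanged.Trigger(x, PriceChangedEventArgs(symbol, price))

// F# callers can use Add; a C# caller would write `ticker.PriceChanged += ...`.
let ticker = Ticker()
let seen = ResizeArray<string>()
ticker.PriceChanged.Add(fun args -> seen.Add args.Symbol)
ticker.Raise("MSFT", 25.5m)
```

Without the attribute, the member would surface to C# as a property of type IEvent<_,_> rather than as an event that can be subscribed to with +=.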

Stock Quotes Reader

The Stock Quotes Reader is a wrapper that sends a request to the server (Yahoo Finance in this case) and pulls the stock quotes.

    open System
    open System.IO
    open System.Net

    type StockAvailableEventArgs(stocks:string[]) =
        inherit EventArgs()
        member x.Stocks with get() = stocks

    type StockQuotesReader(quotes:string) =

        let stockAvailableEvent = new Event<StockAvailableEventArgs>()

        // Requests the given URI asynchronously and returns the response as a list of lines
        let httpLines (uri:string) =
          async { let request = WebRequest.Create uri
                  use! response = Async.FromBeginEnd(request.BeginGetResponse, request.EndGetResponse)
                  use stream = response.GetResponseStream()
                  use reader = new StreamReader(stream)
                  let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]
                  return lines }

        // n - name, s - symbol, x - Stock Exchange, l1 - Last Trade, p2 - change in percent, h - high, l - low, o - open, p - previous close, v - volume
        // (p2 is not part of the f= string below)
        let yahooUri (quotes:string) =
            String.Format("http://finance.yahoo.com/d/quotes.csv?s={0}&f=nsxl1hlopv", quotes)

        // One async job that requests the whole quotes string
        member x.GetStocks() =
            [httpLines(yahooUri quotes)]

//        member x.TestAsync() =
//            let stocks = httpLines(yahooUri quotes)
//            Async.RunSynchronously(stocks)

        // Starts the request and raises StockAvailable when the job completes
        member x.PullStocks() =
            let stocks = x.GetStocks()
            let worker = new AsyncWorker<_>(stocks)
            worker.JobCompleted.Add(fun args ->
                stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray))
            )
            worker.Start()

        // Returns a worker that has not been started yet
        static member GetAsyncReader(quotes) =
            let reader = new StockQuotesReader(quotes)
            let stocks = reader.GetStocks()
            let worker = new AsyncWorker<_>(stocks)
            worker

        [<CLIEvent>]
        member x.StockAvailable = stockAvailableEvent.Publish

The above wrapper class does some interesting things:

  • It has an async block (httpLines) that reads the response stream and returns its lines.
  • PullStocks creates the async request and raises the StockAvailable event whenever the async job completes.
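The lines returned by the reader still have to be turned into typed quotes before they reach the grid. The sketch below shows one possible shape for that step. The StockQuote record and parseQuote function are hypothetical (the sample's own conversion code is not shown in this post), and the parser assumes the nine fields requested by f=nsxl1hlopv, in that order.

```fsharp
open System
open System.Globalization

// Hypothetical record; field order follows the f=nsxl1hlopv request.
type StockQuote =
    { Name: string
      Symbol: string
      Exchange: string
      LastTrade: decimal
      High: decimal
      Low: decimal
      Open: decimal
      PreviousClose: decimal
      Volume: int64 }

// Naive parser: assumes no field contains an embedded comma,
// which real company names can violate.
let parseQuote (line: string) : StockQuote option =
    let fields = line.Split(',') |> Array.map (fun f -> f.Trim().Trim('"'))
    let dec (s: string) = Decimal.Parse(s, CultureInfo.InvariantCulture)
    if fields.Length <> 9 then
        None
    else
        try
            Some
                { Name = fields.[0]
                  Symbol = fields.[1]
                  Exchange = fields.[2]
                  LastTrade = dec fields.[3]
                  High = dec fields.[4]
                  Low = dec fields.[5]
                  Open = dec fields.[6]
                  PreviousClose = dec fields.[7]
                  Volume = Int64.Parse(fields.[8], CultureInfo.InvariantCulture) }
        with :? FormatException ->
            None   // e.g. a numeric field reported as "N/A"

// Made-up sample line, not real market data.
let sample = "\"Example Corp\",\"EXMP\",\"NasdaqNM\",25.81,25.95,25.60,25.70,25.75,48000000"
let parsed = parseQuote sample
```

Returning an option keeps malformed lines from throwing inside an event handler; the caller can simply skip the None cases.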

CEP Client

On the CEP client we will be using the following things:

  • Syncfusion WPF GridDataControl: works well with high-speed data changes, keeping CPU usage to a minimum.
  • Rx: creates requests and updates the ViewModel bound to the grid.

Application Setup

The WPF application uses a simple MVVM (Model-View-ViewModel) setup: a StocksViewModel holds the stock data, and its Stocks collection is bound to the Syncfusion WPF GridDataControl.
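A rough sketch of what such a view model might look like is given below, written in F# for consistency with the earlier listings. Only the names StocksViewModel and Stocks come from this post; StockRow, its members, and the AddOrUpdate signature are assumptions. Property change notification (INotifyPropertyChanged), which a bound grid would need in order to refresh an updated row, is left out for brevity.

```fsharp
open System.Collections.ObjectModel

// Hypothetical row type; the grid would bind to these properties.
type StockRow(symbol: string, initialLastTrade: decimal) =
    let mutable lastTrade = initialLastTrade
    member x.Symbol = symbol
    member x.LastTrade
        with get () = lastTrade
        and set v = lastTrade <- v

type StocksViewModel() =
    // ObservableCollection notifies bound controls when rows are added or removed.
    let stocks = ObservableCollection<StockRow>()
    member x.Stocks = stocks

    /// Updates the row for a known symbol, or appends a new row.
    member x.AddOrUpdate(symbol: string, lastTrade: decimal) =
        match stocks |> Seq.tryFind (fun r -> r.Symbol = symbol) with
        | Some row -> row.LastTrade <- lastTrade
        | None -> stocks.Add(StockRow(symbol, lastTrade))

// Usage with made-up values.
let vm = StocksViewModel()
vm.AddOrUpdate("MSFT", 25.0m)
vm.AddOrUpdate("GOOG", 500.0m)
vm.AddOrUpdate("MSFT", 26.0m)   // updates the existing MSFT row
```

The linear search is fine for a handful of symbols; a dictionary keyed by symbol would be the usual choice for larger watch lists.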

  

        

Using Rx to create requests

This application needs fresh data pulled over the wire every 500 milliseconds. We will use IObservable to create a streaming request and repeat it after a time delay.

    private void RealTimeStocks(int delay, string quotes)
    {
        var stockReader = new StockQuotesReader(quotes);

        // Each subscription starts a pull and listens for the StockAvailable event
        var stockFeeds = Observable.Defer(() =>
        {
            stockReader.PullStocks();
            var evt = from e in Observable.FromEvent<StockAvailableEventArgs>(stockReader, "StockAvailable")
                      select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() };
            // Hand out the event stream after the delay, then complete so that Repeat() starts over
            var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay));
            return delayedEvt;
        }).Repeat();

        // Flatten the stream of streams into a stream of quote batches
        var stocks = from s in stockFeeds
                     from t in s
                     select t.Stocks;

        stocks.SubscribeOnDispatcher().Subscribe((stockQuotes) =>
        {
            this.AddOrUpdateModel(stockQuotes);
        });
    }
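For comparison, the pull, wait, repeat shape that Defer, Delay, and Repeat express above can also be sketched as a plain F# async loop without Rx. This is not what the sample does; pollRepeatedly and fakePull are hypothetical names, and the pull function here only counts instead of requesting quotes.

```fsharp
open System.Threading

// Runs pull, hands the result to onNext, waits delayMs, then repeats until cancelled.
let pollRepeatedly (delayMs: int) (pull: unit -> Async<'T>) (onNext: 'T -> unit) (token: CancellationToken) =
    let rec loop () =
        async {
            let! value = pull ()
            onNext value
            do! Async.Sleep delayMs
            return! loop ()
        }
    Async.Start(loop (), token)

// Hypothetical pull that just counts; a real one would request quotes.
let counter = ref 0
let fakePull () =
    async {
        counter.Value <- counter.Value + 1
        return counter.Value
    }

let received = ResizeArray<int>()
let cts = new CancellationTokenSource()
pollRepeatedly 50 fakePull (fun n -> lock received (fun () -> received.Add n)) cts.Token
Thread.Sleep 400   // let a few iterations run
cts.Cancel()       // stops the loop at its next async step
```

In a WPF application, onNext would still have to marshal its work onto the UI thread before touching the view model, which is the job the dispatcher-related Rx operator does in the listing above.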

We now have streaming real-time stock data pulled asynchronously over the Web and shown on the Syncfusion GridDataControl.

 


This also works with Essential Grid Silverlight in out-of-browser (OOB) mode. If you want to get hold of the sample, send me a request.

 
