StreamInsight join not producing any output

Hi,

I am trying to implement a PreDealCheck application that lets a trader decide whether doing a trade would breach the limits assigned against a counterparty.

I have identified three data streams: tradeStream (arrives whenever a pre-deal check is issued), portfolioExposureStream (portfolio-level exposures, e.g. counterparty-level exposure, product-level exposure etc.; these values come from a compute grid every night) and LimitsStream (limits assigned against counterparties, products etc.; these come from the database).

I defined the trade payload as:

    using System;

    public class Trade
    {
        public String Counterparty { get; set; }
        public String Product { get; set; }
        public String AssetGroup { get; set; }
        public Double ReceiveAmount { get; set; }
        public Double Exposure { get; set; }
    }

I defined the reference data payload as:

    using System;
    using System.Collections.Generic;

    public class PortfolioExposure
    {
        public String PortfolioName { get; set; }
        public Dictionary<String, String> RiskVector { get; set; }
        public String PortfolioType { get; set; }
    }

The idea is that whenever a pre-deal check is issued, I take the trade, join it with all the PortfolioExposure objects it falls under, multiply the risk vectors of those PortfolioExposure objects by the trade amount * some factor, and check whether the adjusted risk breaches the thresholds by joining with the limit objects.
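A minimal sketch of the check as the paragraph above states it, in plain C# and outside StreamInsight. It takes the description literally (each risk value scaled by trade amount * factor, then compared with a limit) and assumes the risk vector maps a future time point (int) to a risk value (double), as described later in the thread, rather than the Dictionary<String, String> in the payload. `PreDealCheckSketch`, the `factor` value and a single `limit` per portfolio are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper illustrating the pre-deal check arithmetic.
// Assumption: risk vector = time point (int) -> risk value (double).
public static class PreDealCheckSketch
{
    // Scales every risk value by tradeAmount * factor, as the description states.
    public static Dictionary<int, double> AdjustRisk(
        IReadOnlyDictionary<int, double> riskVector, double tradeAmount, double factor)
    {
        return riskVector.ToDictionary(kv => kv.Key, kv => kv.Value * tradeAmount * factor);
    }

    // Returns the time points whose adjusted risk exceeds the (assumed single) limit.
    public static List<int> BreachedTimePoints(
        IReadOnlyDictionary<int, double> adjustedRisk, double limit)
    {
        return adjustedRisk.Where(kv => kv.Value > limit)
                           .Select(kv => kv.Key)
                           .OrderBy(t => t)
                           .ToList();
    }
}
```

An empty result from `BreachedTimePoints` would mean the trade passes the check for that portfolio.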

So a trade maps to multiple PortfolioExposure objects. What does this mean?

Say that trade T1 is against counterparty "A" and falls under the "FXForward" category. There could be a PortfolioExposure object PE1 that contains the bank's exposure against "A" and another PortfolioExposure object PE2 that contains the bank's exposure against "A\FXForward". Trade T1 should be joined with both PE1 and PE2, resulting in

{T1 ,PE1},{T1 ,PE2}

This won't be possible if I have one trade object that maps to multiple PortfolioExposure objects, as LINQ wouldn't let me use an or clause. So I decided to transform the received payload into a PdcTrade payload: when a PDC for trade T1 is issued, I transform the trade into PDCT1 { Portfolio="A", PortfolioType="Counterparty" } and PDCT2 { Portfolio="A\FXForward", PortfolioType="Product" }.

    using System;

    public class PdcTrade
    {
        public String Portfolio { get; set; }
        public Double Exposure { get; set; }
        public String PortfolioType { get; set; }
    }

So whenever the tradeInputAdapter gets a trade, it transforms it into 4 PDC trades (we have 4 portfolio categories at the moment, hence 4 trades, each with one of the categories as its portfolio type), creates a point event for each, and enqueues a CTI event after all 4 PDC trades are enqueued.
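A minimal sketch of that fan-out step in plain C#, outside any adapter. Only the two categories named in the post (Counterparty and Product) are shown, because the other two are not described. The `PdcFanOut` helper, the "Counterparty\Product" key format (taken from the "A\FXForward" example) and copying `Trade.Exposure` into each `PdcTrade` are assumptions.

```csharp
using System;
using System.Collections.Generic;

// Payloads as defined in the post.
public class Trade
{
    public String Counterparty { get; set; }
    public String Product { get; set; }
    public String AssetGroup { get; set; }
    public Double ReceiveAmount { get; set; }
    public Double Exposure { get; set; }
}

public class PdcTrade
{
    public String Portfolio { get; set; }
    public Double Exposure { get; set; }
    public String PortfolioType { get; set; }
}

// Hypothetical helper: one PdcTrade per portfolio category the trade falls under.
public static class PdcFanOut
{
    public static List<PdcTrade> ToPdcTrades(Trade trade)
    {
        return new List<PdcTrade>
        {
            // Counterparty-level portfolio, e.g. "A"
            new PdcTrade { Portfolio = trade.Counterparty, Exposure = trade.Exposure, PortfolioType = "Counterparty" },
            // Product-level portfolio, e.g. "A\FXForward" (assumed key format)
            new PdcTrade { Portfolio = trade.Counterparty + "\\" + trade.Product, Exposure = trade.Exposure, PortfolioType = "Product" },
            // The remaining two categories would be added here once their keys are known.
        };
    }
}
```

In the adapter, each returned element would then be enqueued as its own point event, as the post describes.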

In this case, tradeStream is a point stream with 4 inserts (e1, e2, e3, e4) with start times of (2013-10-09 08:53:04.9479941, 2013-10-09 08:53:05.0129979, 2013-10-09 08:53:05.0129979, 2013-10-09 08:53:05.0139979), followed by an enqueued CTI event with timestamp 2013-10-09 08:53:05.0139979.

PortfolioReferenceDataStream loads all portfolio-level exposures, creates point events for them (timestamps from 2013-10-09 08:53:33.9856550 to 2013-10-09 08:53:34.0416582) and finally, once done, enqueues a CTI event with timestamp 2013-10-09 08:53:34.0446584.

Now I import the time settings from tradeStream into portfolioExposureStream, as tradeStream is the fast stream. However, it turned out in my POC that loading the reference data took longer than the trades, as there are few trades and a large amount of reference data. This only happens during the initial load. Hence the CTI in the trade stream is earlier than even the first event in the reference data stream, and importing CTIs from the trade stream into the reference data stream inserted the trade stream CTI as the first entry in the reference data stream.

The join didn't produce any data. I opened the Event Flow Debugger and noticed that the CTI imported from the trade stream (08:53:05.0139979) is earlier than even the first portfolio exposure reference data event (08:53:33.9856550). I thought this was perhaps the reason why the join was not generating any data. Is this assumption correct?

So what I tried next was not enqueuing any CTIs in tradeStream, and importing the reference data time settings into the trade stream instead; however, that didn't generate any data either. I wanted to upload the trace files but can't seem to find a place to do it. Let me know how to send the trace files across and I will be happy to do so.


Below is the code that I am using:

 
  static void Main(string[] args)
        {

            XmlConfigurator.Configure();
            using (Server cepServer = Server.Create("Default"))
            {
                try
                {
                    // Host the queries in the "PDC" application
                    var cepApp = cepServer.CreateApplication("Pdc");

                    // Create a data stream of random activity events
                    var inputCfg = new GeneratorConfig()
                    {
                        CtiFrequency = 1,
                        EventInterval = 1000,
                        EventIntervalVariance = 250
                    };

                

                  
                    // We want application time to advance as new trades arrive. To do this,
                    // we link the trade stream's CTIs into the reference stream via a
                    // time import setting.
                    var tradeStreamTimeImportSettings = new AdvanceTimeSettings(null,
                        new AdvanceTimeImportSettings("tradeStream"),
                        AdvanceTimePolicy.Adjust);

                    var referenceDataStreamTimeImportSettings = new AdvanceTimeSettings(null,
                       new AdvanceTimeImportSettings("exposureReferenceStream"),
                       AdvanceTimePolicy.Adjust);

                    var tradeStream = CepStream<PdcTrade>.Create(cepApp,
                      "tradeStream", typeof(GeneratorFactory),
                      inputCfg, EventShape.Point);


                    var referenceCfg = new PortfolioExposureInputConfig
                                           {
                                               PollingInterval = TimeSpan.Parse("00:00:30"),
                                               Directory = ConfigurationManager.AppSettings["ResultsPath"]
                                           };

                    // Use the time import settings to link CTI values from the tradeStream
                    // stream
                    var referenceStream = CepStream<PortfolioExposure>.Create(
                        "exposureReferenceStream", typeof(PortfolioTypedInputAdapterFactory),
                        referenceCfg, EventShape.Point, tradeStreamTimeImportSettings);

                    //var referenceStream = CepStream<PortfolioExposure>.Create(
                    //    "exposureReferenceStream", typeof(PortfolioTypedInputAdapterFactory),
                    //    referenceCfg, EventShape.Point);


                    // Convert the reference point stream into a reference signal
                    var referenceSignal = referenceStream
                        .AlterEventDuration(e => TimeSpan.MaxValue)
                        .ClipEventDuration(referenceStream, (e1, e2) => ((e1.PortfolioName == e2.PortfolioName) &&
                                                                                             (e1.PortfolioType == e2.PortfolioType)));

                    var passThroughTradeStream = from trade in tradeStream
                                                 select new {Trade = trade};

                    var passThroughReferenceStream = from referenceData in referenceStream
                                                 select new { ReferenceData = referenceData };

                    // Join trade stream and portfolioStream
                    var enrichedStream =
                        from trade in tradeStream
                        join pflioReferenceData in referenceSignal on  //trade.Portfolio  equals pflioReferenceData.PortfolioName
                        new { Portfolio = trade.Portfolio.ToLower(), PortfolioType = trade.PortfolioType.ToLower() } 
                        equals
                        new { Portfolio = pflioReferenceData.PortfolioName.ToLower(), PortfolioType = pflioReferenceData.PortfolioType.ToLower() }
                        select new
                        {
                            Trade=trade,
                            RiskVector=pflioReferenceData
                        };


                    // Pump these to the console for visibility
                    var outputCfg = new TracerConfig()
                    {
                        DisplayCtiEvents = true,
                        SingleLine = true,
                        TracerKind = TracerKind.Console,
                        TraceName = "OUTPUT"
                    };

                    StartQuery(cepApp, enrichedStream, outputCfg);

                    Console.WriteLine("Press <enter> to close application");
                    Console.ReadLine();
                }
                catch (Exception ex0)
                {
                    Console.WriteLine("Error: " + ex0.ToString());
                }
            }

        }

        private static void StartQuery<AT0>(Application cepApp, CepStream<AT0> enrichedStream, TracerConfig outputCfg)
        {
            var q = enrichedStream.ToQuery(cepApp,
                                           "enrichedData", "",
                                           typeof(TracerFactory), outputCfg,
                                           EventShape.Point, StreamEventOrder.FullyOrdered);
            q.Start();
            Console.WriteLine("Press <enter> to close application");
            Console.ReadLine();
            q.Stop();
        }




  • Edited by vijay_g Wednesday, October 09, 2013 2:38 PM
October 9th, 2013 1:20pm

First, LINQ will allow you to join on an on. However, you can't do it with the join statement - you accomplish this with multiple from and then where clauses for your joins. Using your join as an example, it would look like the following:

var enrichedStream =
    from trade in tradeStream
    from pflioReferenceData in referenceSignal
    where trade.Portfolio.ToLower() == pflioReferenceData.PortfolioName.ToLower()
    where trade.PortfolioType.ToLower() == pflioReferenceData.PortfolioType.ToLower()
    select new
    {
        Trade = trade,
        RiskVector = pflioReferenceData
    };
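The same idea in plain LINQ to Objects, to show that multiple `from` clauses plus a `where` with `||` let a single trade match several portfolios (the T1/PE1/PE2 example from the question) without fanning it out first. This is not StreamInsight code and the `OrJoinSketch` types are hypothetical; in a CepStream query the matched pairs would additionally need overlapping event lifetimes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrJoinSketch
{
    public class Trade { public string Id; public string Counterparty; public string Product; }
    public class Exposure { public string Id; public string PortfolioName; }

    // Cross join + where: a join clause only supports "equals", but a where clause can use "||".
    public static List<string> Join(IEnumerable<Trade> trades, IEnumerable<Exposure> exposures)
    {
        var pairs =
            from t in trades
            from pe in exposures
            where pe.PortfolioName == t.Counterparty                      // counterparty level, e.g. "A"
               || pe.PortfolioName == t.Counterparty + "\\" + t.Product   // product level, e.g. "A\FXForward"
            select "{" + t.Id + "," + pe.Id + "}";
        return pairs.ToList();
    }
}
```

With T1 against "A"/"FXForward" and exposures PE1 ("A"), PE2 ("A\FXForward") and an unrelated PE3 ("B"), the query yields {T1,PE1} and {T1,PE2}.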

As to why you aren't getting data ... there are a couple of things that could be going on. CTIs are typically the culprit, but since you are importing them, that's not the issue here. ;-) Next ... your reference stream's CTI settings use Adjust as the violation policy, BUT the stream is a point stream. I'm going to guess that you are expecting the policy to adjust the start time of the points (I did once upon a time, too), but that's not how it works. Adjust only adjusts the start time of an interval, and only if that interval's duration crosses over the most recent CTI. An example ... let's say you have a CTI at 10:00:00. An interval event with a start time of 09:50:00 and an end time of 10:10:00 will be adjusted to have a start time of 10:00:00; the end time won't change. However, an interval event with a start time of 09:50:00 and an end time of 09:59:00 won't be adjusted: even if its start time were adjusted, it has already expired in the application timeline, so it's just dropped.

For reference data, I typically use an interval input adapter and set the start time to something absurdly far in the past (Jan 1, 1800, for example) and the end time absurdly far in the future (Dec 31, 2300, for example), and then slowly increment these as I refresh, say by 10 seconds. This works very well when importing and adjusting, as the event duration is guaranteed to span the CTI whether it's "live" data with the current time or recorded data being played back with the original time.
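The Adjust rules described in this reply can be modeled in a few lines of plain C#. This is a sketch of the behavior as described, not StreamInsight's implementation; `AdjustPolicySketch` is hypothetical, interval end times are treated as exclusive, and a point event is modeled as lasting a single tick.

```csharp
using System;

public static class AdjustPolicySketch
{
    public struct Interval
    {
        public DateTime Start;
        public DateTime End; // exclusive
        public Interval(DateTime start, DateTime end) { Start = start; End = end; }
    }

    // Applies the described Adjust behavior against the latest CTI:
    //  - starts at or after the CTI: passes unchanged
    //  - starts before the CTI but is still valid after it: start moved up to the CTI
    //  - already ended by the CTI (this includes any point event before it): dropped (null)
    public static Interval? Apply(Interval e, DateTime lastCti)
    {
        if (e.Start >= lastCti) return e;
        if (e.End > lastCti) return new Interval(lastCti, e.End);
        return null;
    }

    // A point event is valid for a single tick, so it can never cross a later CTI.
    public static Interval Point(DateTime t) { return new Interval(t, t.AddTicks(1)); }
}
```

With a CTI at 10:00:00, the 09:50-10:10 interval comes back as 10:00-10:10, the 09:50-09:59 interval and a 09:50 point both come back as null, and a 1800-2300 reference interval survives with its start moved to 10:00:00.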

As for missing the first few "live" events because the reference buffer is still "loading" ... yes, that's pretty common. With the pre-2.1 Query Model (that you are using), this can be resolved by starting the reference query first and getting the events in there. You may need to pause a second to wait for the buffer to "fill" before starting the data adapter (or not). Don't enqueue any CTIs from the reference data and rely on the data stream to provide the CTIs.
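To make the startup effect concrete with the timestamps from the question: a StreamInsight join only pairs events whose lifetimes overlap. The sketch below is a plain C# model of that rule, not StreamInsight code; `TemporalJoinSketch` is hypothetical and a point event is modeled as lasting one tick.

```csharp
using System;

// Plain C# model of the temporal join rule: two events can join only while both are valid.
public static class TemporalJoinSketch
{
    public struct Lifetime
    {
        public DateTime Start;
        public DateTime End; // exclusive
        public Lifetime(DateTime start, DateTime end) { Start = start; End = end; }

        // A point event is valid for a single tick.
        public static Lifetime Point(DateTime t) { return new Lifetime(t, t.AddTicks(1)); }
    }

    public static bool Overlaps(Lifetime a, Lifetime b)
    {
        return a.Start < b.End && b.Start < a.End;
    }
}
```

With the post's numbers, the trade point at 08:53:05.0139979 ends long before the reference signal (stretched to DateTime.MaxValue) starts at 08:53:33.9856550, so `Overlaps` returns false; a trade arriving after the reference load, say at 08:54:00, returns true.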

October 9th, 2013 6:21pm

Thanks for the quick reply DevBiker.

My comments

DevBiker: First, LINQ will allow you to join on an on. However, you can't do it with the join statement - you accomplish this with multiple from and then where clauses for your joins. Using your join as an example, it would look like the following:

So are you saying that a StreamInsight join on multiple fields is done with multiple where clauses, and not with the syntax below?

from trade in tradeStream
join pflioReferenceData in referenceSignal on
    new { Portfolio = trade.Portfolio.ToLower(), PortfolioType = trade.PortfolioType.ToLower() }
    equals
    new { Portfolio = pflioReferenceData.PortfolioName.ToLower(), PortfolioType = pflioReferenceData.PortfolioType.ToLower() }

DevBiker: Next ... your reference stream CTI settings have Adjust for the violation policy BUT the stream is a point. I'm going to guess that you are expecting the policy to adjust the start time of the points (I did once upon a time, too) but that's not how it works. Adjust will only adjust the start time of an interval and only if that interval's duration crosses over the most recent CTI

Agreed, but that's the reason why I alter the event duration to TimeSpan.MaxValue, which means the end time should be infinity, so this event shouldn't have expired yet. So shouldn't Adjust take care of moving the start time of the event to after the last CTI timestamp, so that the event doesn't get dropped?

    // Convert the reference point stream into a reference signal
    var referenceSignal = referenceStream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(referenceStream, (e1, e2) => ((e1.PortfolioName == e2.PortfolioName) &&
                                                          (e1.PortfolioType == e2.PortfolioType)));

I also need this because our reference data is updated daily: portfolio-level exposures change at the end of every business day, so the live stream for the new business date has to use the newer reference data. I took this approach after looking at a similar concept described by Mark Simms in his blog.

Here is an excerpt from the blog for your convenience:

// Convert the point events from the reference stream into edge events
var edgeEvents = from e in metadataStream
                     .AlterEventDuration(e => TimeSpan.MaxValue)
                     .ClipEventDuration(metadataStream, (e1, e2) => (e1.SensorId == e2.SensorId))
                 select e;

What this bit of code does is:

  • Stretch the point events out to infinity (i.e. these reference values are good forever)
  • Clip each point event by any arriving event with the same sensor ID.  For example, given a value of (1001, SensorId_1001), if another event later arrives with the values (1001, MySensor), the initial event will be clipped off and the new value will be MySensor
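The two bullets above can be modeled with plain collections: each point is stretched to DateTime.MaxValue and then clipped at the start of the next event with the same key. `SignalPatternSketch` and its types are hypothetical; the sketch illustrates the semantics of AlterEventDuration followed by ClipEventDuration and does not call StreamInsight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SignalPatternSketch
{
    public class Reading { public DateTime Time; public string Key; public string Value; }
    public class Signal { public DateTime Start; public DateTime End; public string Key; public string Value; }

    public static List<Signal> ToSignal(IEnumerable<Reading> points)
    {
        var ordered = points.OrderBy(p => p.Time).ToList();
        var result = new List<Signal>();
        for (int i = 0; i < ordered.Count; i++)
        {
            var p = ordered[i];
            // "AlterEventDuration(e => TimeSpan.MaxValue)": the value is good forever...
            var end = DateTime.MaxValue;
            // ..."ClipEventDuration": until the next later event with the same key arrives.
            var next = ordered.Skip(i + 1).FirstOrDefault(q => q.Key == p.Key && q.Time > p.Time);
            if (next != null) end = next.Time;
            result.Add(new Signal { Start = p.Time, End = end, Key = p.Key, Value = p.Value });
        }
        return result;
    }

    // The signal value valid for a key at time t (what a data event at t would join with).
    public static Signal ValueAt(IEnumerable<Signal> signal, string key, DateTime t)
    {
        return signal.FirstOrDefault(s => s.Key == key && s.Start <= t && t < s.End);
    }
}
```

Using the blog's example, a reading (1001, "SensorId_1001") followed an hour later by (1001, "MySensor") produces two signal segments: the first ends where the second begins, and the second stays valid indefinitely.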

Then, putting the whole thing together:

// Convert the point events from the reference stream into edge events
var edgeEvents = from e in metadataStream
                     .AlterEventDuration(e => TimeSpan.MaxValue)
                     .ClipEventDuration(metadataStream, (e1, e2) => (e1.SensorId == e2.SensorId))
                 select e;

////////////////////////////////////////////////////////////////////
// Create a join of the two streams, and bind the output to the
// console
var joinedQuery = from e1 in dataStream
                  join e2 in edgeEvents
                  on e1.SensorId equals e2.SensorId
                  select new
                  {
                      SensorId = e1.SensorId, Name = e2.Name, Value = e1.Value
                  };

DevBiker: For reference data, I typically use an interval input adapter and set the start time to something absurdly far in the past (Jan 1, 1800, for example) and the end time absurdly in the future (Dec 31, 2300, for example) and then slowly increment these as I refresh - say by 10 seconds. This works very well when importing and adjusting as the event duration is guaranteed to span the CTI whether it's "live" data with the current time or recorded data being played back with the original time.

Well, with this approach, how would I update the reference data?

DevBiker: As for missing the first few "live" events because the reference buffer is still "loading" ... yes, that's pretty common. With the pre-2.1 Query Model (that you are using), this can be resolved by starting the reference query first and getting the events in there. You may need to pause a second to wait for the buffer to "fill" before starting the data adapter (or not).

Right now, starting the query calls the Start method on the input adapters involved in the query, doesn't it?

Does this mean I might have to use some kind of signalling between the dataStream input adapter and the referenceStream input adapter? The dataStream input adapter waits on a wait handle, the referenceStream input adapter signals this handle once it's done inserting all the reference data events, and once the dataStream input adapter receives the signal, it starts calling CreateInsertEvent() for all the data events produced in the interim. Does that sound right?
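A sketch of that handshake in plain C#, using `ManualResetEventSlim` as the wait handle. The two `Task`s stand in for the reference and trade input adapters, and appending to a list stands in for enqueueing into the engine; `AdapterHandshakeSketch` is hypothetical and does not use the StreamInsight adapter API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AdapterHandshakeSketch
{
    // Returns the order in which events were "enqueued" into the simulated engine.
    public static List<string> Run(IList<string> referenceRows, IList<string> trades)
    {
        var enqueued = new List<string>();
        var gate = new object();
        using (var referenceLoaded = new ManualResetEventSlim(false))
        {
            // Stand-in for the reference input adapter: load everything, then signal.
            var referenceTask = Task.Run(() =>
            {
                foreach (var row in referenceRows)
                    lock (gate) enqueued.Add("ref:" + row);
                referenceLoaded.Set();
            });

            // Stand-in for the trade input adapter: trades produced meanwhile stay buffered
            // until the reference adapter signals, then are released in arrival order.
            var tradeTask = Task.Run(() =>
            {
                var pending = new Queue<string>(trades);
                referenceLoaded.Wait();
                while (pending.Count > 0)
                    lock (gate) enqueued.Add("data:" + pending.Dequeue());
            });

            Task.WaitAll(referenceTask, tradeTask);
        }
        return enqueued;
    }
}
```

Regardless of thread scheduling, every "ref:" entry precedes every "data:" entry, because the trade task only starts enqueueing after `Set()` has been called.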

Or is there a better approach ?

DevBiker: Don't enqueue any CTIs from the reference data and rely on the data stream to provide the CTIs.

This bit is clear. Will do, thanks.








  • Edited by vijay_g Thursday, October 10, 2013 8:06 AM
October 10th, 2013 10:49am

I meant join on an "or", not "on an on". The join statement is for equality only. That's a LINQ limitation, if I remember correctly. Using the multiple where statements, however, allows you to include or conditions ("||").

Altering the event duration doesn't help if the actual event enqueued violates the CTI. It never gets to the Alter operator; it just gets dropped by the Input Cleanse operator. My guess ... without seeing your trace but looking at your query ... is that you have events getting dropped because a) you are enqueuing them as points and b) their start time is before the last enqueued CTI event that you are importing from your data stream. You can alter and clip all you want ... if the event gets dropped by the Input Cleanse operator, you won't be affecting any events. The technique that I described handles issues with the input, making sure it's all set up and ready to go.

As for updating the reference data, you still Alter/Clip (the Signal Pattern) to keep the latest version "alive" in the stream. Using that interval for an input reference data event simply ensures that it doesn't get dropped on the way into the engine. I'm VERY familiar with Mark's blog and the techniques that he describes. I've also had the exact same challenges that you are running into here, and what I suggested to you simply builds on what Mark describes. Mark's blog also doesn't address how to handle replaying recorded events with the original timestamps at high speed.
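A small plain-C# model of the operator order described above: the imported CTI is applied at input cleansing first, so AlterEventDuration only ever sees the events that survive. `CleanseThenAlterSketch` is hypothetical, and its cleanse step models the Adjust rules as described earlier in the thread, not StreamInsight's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CleanseThenAlterSketch
{
    public class Evt { public string Name; public DateTime Start; public DateTime End; }

    // Input cleanse with Adjust-style rules; null means the event is dropped here.
    static Evt Cleanse(Evt e, DateTime lastCti)
    {
        if (e.Start >= lastCti) return e;
        if (e.End > lastCti) return new Evt { Name = e.Name, Start = lastCti, End = e.End };
        return null; // dropped: never reaches any later operator
    }

    // Models AlterEventDuration(e => TimeSpan.MaxValue) as "valid until DateTime.MaxValue".
    static Evt AlterToForever(Evt e)
    {
        return new Evt { Name = e.Name, Start = e.Start, End = DateTime.MaxValue };
    }

    // Cleanse first, then alter: the order is what makes the alter "too late".
    public static List<Evt> Run(IEnumerable<Evt> input, DateTime lastCti)
    {
        return input.Select(e => Cleanse(e, lastCti))
                    .Where(e => e != null)
                    .Select(AlterToForever)
                    .ToList();
    }
}
```

A reference point enqueued one second before the imported CTI produces nothing, even though the alter step would have stretched it to infinity, while a 1800-2300 interval comes through with its start adjusted to the CTI.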

As for starting the reference query first ... that won't work, actually. Since you're importing CTIs from the data stream, you'll get an exception if you try to Start() the reference query before the data query (the import stream won't be found). Adding some kind of signaling between the adapters might work, or you could simply buffer for, say, 5 seconds to allow the reference buffer to fill up, then release those initial 5 seconds of data in one batch (with the original times) and afterwards continue processing as normal. I'm not sure though ... typically we've just punted on this, because missing the reference joins on a couple of events right at startup wasn't a big enough deal to architect around, and when using 2.1 and subjects to "import" CTIs, there's a WHOLE lot more control over the import process and over how both CTIs and events get released to the stream.
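A sketch of the "buffer the first few seconds, then release in one batch" idea, in plain C# and independent of the adapter API. `StartupBuffer<T>` is hypothetical, and the current time is passed in explicitly so the behavior is easy to follow; a real adapter would also need a timer to flush the batch if no new event arrives after the hold period, which this sketch omits.

```csharp
using System;
using System.Collections.Generic;

public sealed class StartupBuffer<T>
{
    private readonly TimeSpan holdFor;
    private readonly List<KeyValuePair<DateTime, T>> held = new List<KeyValuePair<DateTime, T>>();
    private DateTime? firstArrival;
    private bool released;

    public StartupBuffer(TimeSpan holdFor) { this.holdFor = holdFor; }

    // Offers an event (with its original timestamp) observed at wall-clock time 'now'.
    // Returns the events to enqueue right now, in arrival order, original timestamps intact.
    public List<KeyValuePair<DateTime, T>> Offer(DateTime timestamp, T payload, DateTime now)
    {
        var item = new KeyValuePair<DateTime, T>(timestamp, payload);
        if (released) return new List<KeyValuePair<DateTime, T>> { item }; // normal processing

        if (firstArrival == null) firstArrival = now;
        held.Add(item);
        if (now - firstArrival.Value < holdFor) return new List<KeyValuePair<DateTime, T>>();

        // Hold period over: release everything buffered so far in one batch.
        released = true;
        var batch = new List<KeyValuePair<DateTime, T>>(held);
        held.Clear();
        return batch;
    }
}
```

With a 5-second hold, events offered at 0 s and 2 s return nothing, the event offered at 6 s returns all three in one batch, and every later event passes straight through.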

October 10th, 2013 4:34pm

An extension to this 

When a trader commits a PDC trade, I need to update the PortfolioExposure object with the new risk vector that this PDC trade causes. This would normally mean bumping the risk vector (risk values at different future time points) by the PDC trade's risk exposure.
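The bump itself can be sketched as a per-time-point addition, assuming "bumping" means adding the committed trade's risk at each time point (the post does not spell out the arithmetic) and modeling the risk vector as Dictionary<int, double>, as described below. `RiskVectorSketch.Bump` is a hypothetical helper; it returns a new vector rather than mutating the old one, which could fit re-inserting the result as a fresh reference event that the Alter/Clip signal pattern uses to retire the previous version.

```csharp
using System;
using System.Collections.Generic;

public static class RiskVectorSketch
{
    // Returns existing[t] + tradeRisk[t] for every time point present in either vector.
    // Assumption: a missing time point means zero risk at that point.
    public static Dictionary<int, double> Bump(
        IReadOnlyDictionary<int, double> existing, IReadOnlyDictionary<int, double> tradeRisk)
    {
        var result = new Dictionary<int, double>();
        foreach (var kv in existing) result[kv.Key] = kv.Value;
        foreach (var kv in tradeRisk)
        {
            double current;
            result.TryGetValue(kv.Key, out current); // current stays 0.0 if the point is new
            result[kv.Key] = current + kv.Value;
        }
        return result;
    }
}
```

Bumping {1: 100, 2: 200} by {2: 50, 3: 25} gives {1: 100, 2: 250, 3: 25}.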

So the dataStream affects the reference stream: the reference stream has to be updated with the new values that the dataStream events cause. How do I accomplish this?

I initially thought I would write an output adapter that captures the output values, and the PortfolioExposure input adapter would pick up these values from the output adapter. Is this a good approach?

If it is, there seems to be a problem with the typed output payload: my risk vector is a dictionary of int (time points) and double (risk value), and StreamInsight seems to have a problem with this, as outlined in http://social.msdn.microsoft.com/Forums/sqlserver/en-US/6e3b3077-4458-45fb-aed6-5e1de629a098/streaminsight-typedoutput-adapter-throws-an-error?forum=streaminsight, which you are aware of.

October 18th, 2013 2:05am
