Streaminsight Join not producing any output

Hi,

I am trying to implement a PreDealCheck application that lets a trader decide if doing a trade would breach the limits assigned against a counterparty or not.

I have identified the data streams as tradeStream (comes when ever a predeal check is issued) , portfolioExposureStream(portfolio level Exposures , Eg: Counterparty level exposure , Product level exposure etc : these values come from a compute grid everynight) and LimitsStream (Limits assigned against counterparties , products etc: Comes from the database)

I had defined the trade payload as

   public class Trade
    {
        public String Counterparty { get; set; }
        public String Product { get; set; }
        public String  AssetGroup { get; set; }
        public Double ReceiveAmount { get; set; }
        public Double Exposure { get; set; }
    }

I had defined the reference data payload as

 public class PortfolioExposure
    {
        public String PortfolioName { get; set; }
        public Dictionary<String, String> RiskVector { get; set; }
        public String PortfolioType { get; set; }
     }

The idea is whenever a predeal check is issued. Take the trade join it with all the portfolioExposure Objects that it falls under then multiply the riskvectors of the portfolioExposure objects by the tradeamount * some factor and see if the adjusted risk is breaching the thresholds by joining with limit objects.

So a trade maps to multiple PortfolioExposure Objects , What does this mean ?

Say that trade T1 is against a counterparty "A" and the trade falls under the category of "FXForward". There could a portfolioExposure object PE1 that contains the exposure of the bank against "A" and another portfolioExposure object PE2 that contains the exposure of the bank against "A\FXForward" . What should happen is trade T1 should be joined with both PE1 & PE2 resulting in

{T1 ,PE1},{T1 ,PE2}

This won't be possible if I have one trade object that maps to multiple PortfolioExposure objects as Linq wouldn't let me use an or clause. So I decided to transform the received payload in to PdcTrade payload. So when pdc for a trade T1 is issued i transform the trade in to PDCT1 { Portfolio="A", PortfolioType ="Counteparty"} and PDCT2 {Portfolio="A\FXForward", PortfolioType ="Product"}

public class PdcTrade
    {
        public String Portfolio { get; set; }
        public Double Exposure { get; set; }
        public String PortfolioType { get; set; }
    }

So whenever the tradeInputAdapter gets a trade, it transforms them in to 4 PDC trades ,creates point events (we have 4 portfolio categories at the moment and hence 4 trades with each trade's portfolio type as one of the portfolio category) and enqueues a CTIEvent after all the 4 pdc trades are enqueued.

In this case

tradeStream is a point stream that has 4 inserts ( e1,e2,e3,e4) with start times of (2013-10-09 08:53:04.9479941,2013-10-09 08:53:05.0129979,2013-10-09 08:53:05.0129979,2013-10-09 08:53:05.0139979) , EnqueuedCti event timestamp    2013-10-09 08:53:05.0139979   

PortfolioReferenceDataStream loads all portfolio level exposure and creates point events for these and finally enqueues a cti event once done (timestamps from 2013-10-09 08:53:33.9856550 to 2013-10-09 08:53:34.0416582) finally a

EnqueuedCti event timestamp 2013-10-09 08:53:34.0446584

Now i import the timeSettings from tradeStream in to portfolioExposureStream as tradeStream is the fast stream.However as it turned out in my poc that loading reference data took longer than trades as there are fewer trades and a large amount of reference data. This phenomenon only happens during the initial load.Hence the CTI in trade stream is smaller than even the first event in reference data stream. Importing CTI from time stream in to reference data stream inserted the trade stream CTI as the first entry in reference data stream.

The join didn't produce any data. I openend event flow debugger and noticed that the imported CTI from trade stream 08:53:05.0139979  is less than even the first portfolio exposure reference data event 08:53:33.9856550. I thought this was  perhaps the reason why the join was not generating any data. Is this assumption correct ?

So what i tried later was not to enqueue any CTI's in tradeStream. I imported referencedata time settings in to trade stream however that didnt generate any data too.  I wanted to upload the trace files but cant seem to find a place to do it. Let me know how to send the trace files across i will be happy to do so.


Below is the code that iam using

 
  static void Main(string[] args)
        {

            XmlConfigurator.Configure();
            using (Server cepServer = Server.Create("Default"))
            {
                try
                {
                    // Host the queries in the "PDC" application
                    var cepApp = cepServer.CreateApplication("Pdc");

                    // Create a data stream of random activity events
                    var inputCfg = new GeneratorConfig()
                    {
                        CtiFrequency = 1,
                        EventInterval = 1000,
                        EventIntervalVariance = 250
                    };

                

                  
                    // We want time to advance as new activity updates arrive.  In order to do this
                    // we need to link the reference stream's CTIs to the activity stream via a 
                    // time import setting
                    var tradeStreamTimeImportSettings = new AdvanceTimeSettings(null,
                        new AdvanceTimeImportSettings("tradeStream"),
                        AdvanceTimePolicy.Adjust);

                    var referenceDataStreamTimeImportSettings = new AdvanceTimeSettings(null,
                       new AdvanceTimeImportSettings("exposureReferenceStream"),
                       AdvanceTimePolicy.Adjust);

                    var tradeStream = CepStream<PdcTrade>.Create(cepApp,
                      "tradeStream", typeof(GeneratorFactory),
                      inputCfg, EventShape.Point);


                    var referenceCfg = new PortfolioExposureInputConfig
                                           {
                                               PollingInterval = TimeSpan.Parse("00:00:30"),
                                               Directory = ConfigurationManager.AppSettings["ResultsPath"]
                                           };

                    // Use the time import settings to link CTI values from the tradeStream
                    // stream
                    var referenceStream = CepStream<PortfolioExposure>.Create(
                        "exposureReferenceStream", typeof(PortfolioTypedInputAdapterFactory),
                        referenceCfg, EventShape.Point, tradeStreamTimeImportSettings);

                    //var referenceStream = CepStream<PortfolioExposure>.Create(
                    //    "exposureReferenceStream", typeof(PortfolioTypedInputAdapterFactory),
                    //    referenceCfg, EventShape.Point);


                    // Convert the reference point stream into a reference signal
                    var referenceSignal = referenceStream
                        .AlterEventDuration(e => TimeSpan.MaxValue)
                        .ClipEventDuration(referenceStream, (e1, e2) => ((e1.PortfolioName == e2.PortfolioName) &&
                                                                                             (e1.PortfolioType == e2.PortfolioType)));

                    var passThroughTradeStream = from trade in tradeStream
                                                 select new {Trade = trade};

                    var passThroughReferenceStream = from referenceData in referenceStream
                                                 select new { ReferenceData = referenceData };

                    // Join trade stream and portfolioStream
                    var enrichedStream =
                        from trade in tradeStream
                        join pflioReferenceData in referenceSignal on  //trade.Portfolio  equals pflioReferenceData.PortfolioName
                        new { Portfolio = trade.Portfolio.ToLower(), PortfolioType = trade.PortfolioType.ToLower() } 
                        equals
                        new { Portfolio = pflioReferenceData.PortfolioName.ToLower(), PortfolioType = pflioReferenceData.PortfolioType.ToLower() }
                        select new
                        {
                            Trade=trade,
                            RiskVector=pflioReferenceData
                        };


                    // Pump these to the console for visibility
                    var outputCfg = new TracerConfig()
                    {
                        DisplayCtiEvents = true,
                        SingleLine = true,
                        TracerKind = TracerKind.Console,
                        TraceName = "OUTPUT"
                    };

                    StartQuery(cepApp, enrichedStream, outputCfg);

                    Console.WriteLine("Press <enter> to close application");
                    Console.ReadLine();
                }
                catch (Exception ex0)
                {
                    Console.WriteLine("Error: " + ex0.ToString());
                }
            }

        }

        private static void StartQuery<AT0>(Application cepApp, CepStream<AT0> enrichedStream, TracerConfig outputCfg)
        {
            var q = enrichedStream.ToQuery(cepApp,
                                           "enrichedData", "",
                                           typeof(TracerFactory), outputCfg,
                                           EventShape.Point, StreamEventOrder.FullyOrdered);
            q.Start();
            Console.WriteLine("Press <enter> to close application");
            Console.ReadLine();
            q.Stop();
        }




  • Edited by vijay_g Wednesday, October 09, 2013 2:38 PM
October 9th, 2013 1:20pm

Thanks for the quick reply DevBiker.

My comments

DevBiker: First, LINQ will allow you to join on an on. However, you can't do it with the join statement - you accomplish this with multiple from and then where clauses for your joins. Using your join as an example, it would look like the following:

So are you saying that streaminsight join on multiple fields is accomplished using multiple wheres but not using the below syntax ?

from trade in tradeStream

 join pflioReferenceData in referenceSignal on 

                        new{Portfolio=trade.Portfolio.ToLower(),PortfolioType=trade.PortfolioType.ToLower()}

                        equals

                        new{Portfolio=pflioReferenceData.PortfolioName.ToLower(),PortfolioType=pflioReferenceData.PortfolioType.ToLower()}

                       

DevBiker: Next ... you're reference stream CTI settings has Adjust for the violation policy BUT the stream is a point. I'm going to guess that you are expecting the policy to adjust the start time of the points (I did once upon a time, too) but that's not how it works. Adjust will only adjust the start time of an interval and only if that interval's duration crosses over the most recent CTI

Agree but thats the reason why I alter the eventduration to Max infinity which means the endtime should be infinity.That means this event shouldn't be expired yet. So shouldnt the Adjust take care of moving the start time of the event such that it is after the last CTI timestamp so that this event doesnt get dropped?

    // Convert the reference point stream into a reference signal
                    var referenceSignal = referenceStream
                        .AlterEventDuration(e => TimeSpan.MaxValue)
                        .ClipEventDuration(referenceStream, (e1, e2) => ((e1.PortfolioName == e2.PortfolioName) &&
                                                                                             (e1.PortfolioType == e2.PortfolioType)));

The reason why I also need this is because referencedata in our case would be updated on a daily basis. Portfolio level exposures change end of every business day. So the live stream for the new business date has to use the newer reference data. I took this approach after looking at a similar concept mentioned here  by Mark Simms 

Excerpt from this blog for your convenience.

// Convert the point events from the reference stream into edge events

var edgeEvents = from e in metadataStream

                        .AlterEventDuration(e => TimeSpan.MaxValue)

                        .ClipEventDuration(metadataStream, (e1, e2) => (e1.SensorId == e2.SensorId))

                    select e;

What this bit of code does is:

  • Stretch the point events out to infinity (i.e. these reference values are good forever)
  • Clip each point event by any arriving event with the same sensor ID.  For example, given a value of (1001, SensorId_1001), if another event later arrives with the values (1001, MySensor), the initial event will be clipped off and the new value will be MySensor

Then, putting the whole thing together:

// Convert the point events from the reference stream into edge events

var edgeEvents = from e in metadataStream

                        .AlterEventDuration(e => TimeSpan.MaxValue)

                        .ClipEventDuration(metadataStream, (e1, e2) => (e1.SensorId == e2.SensorId))

                    select e;

    ////////////////////////////////////////////////////////////////////

    // Create a join of the two streams, and bind the output to the

    // console

var joinedQuery = from e1 in dataStream

                    join e2 in edgeEvents

                    on e1.SensorId equals e2.SensorId

                    select new

                    {

                        SensorId = e1.SensorId, Name = e2.Name, Value = e1.Value};

DevBiker: For reference data, I typically use an interval input adapter and set the start time to something absurdly far in the past (Jan 1, 1800, for example) and the end time absurdly in the future (Dec 31, 2300, for example) and then slowly increment these as I refresh - say by 10 seconds. This works very well when importing and adjusting as the event duration is guaranteed to span the CTI whether it's "live" data with the current time or recorded data being played back with the original time.

Well with this approach how would I update reference data ?

DevBiker: As for missing the first few "live" events because the reference buffer is still "loading" ... yes, that's pretty common. With the pre-2.1 Query Model (that you are using), this can be resolved by starting the reference query first and getting the events in there. You may need to pause a second to wait for the buffer to "fill" before starting the data adapter (or not).

Right now starting the query would call Start method on input adapters involved in the query isnt it?

This means I might have to use some kind of signalling between the dataStream input adapter and referenceStream input adapter?  dataStream input  adapter waits on an a waithandle , referenceStream input adapter signals this handle once its done inserting all the reference data events , once the dataStream input adapter receives the signal it starts calling CreateInsertEvent() for all the data events produced in the interim. Does that sound right ?

Or is there a better approach ?

DevBiker: Don't enqueue any CTIs from the reference data and rely on the data stream to provide the CTIs.

This bit is clear. Will do Thanks.








  • Edited by vijay_g Thursday, October 10, 2013 8:06 AM
Free Windows Admin Tool Kit Click here and download it now
October 10th, 2013 10:49am

Which version of StreamInsight are you using? I know that you are working with the "legacy" model (non-Reactive) ... but are you using 2.1?

Your scenario ... output being feedback in to input ... is best accomplished with StreamInsight 2.1 and a subject. That said, you could accomplish the same thing with adapters in pre-2.1 code; you'd have a matched set of output/input adapters that are subjects.

And no, you can't have a dictionary in a payload. You'll have to find another way to accomplish what you are trying to do here. Typically I'd suggest that you join to another stream and then group to analyze together ... using a UDA or UDO if necessary ... but I don't know enough about your requirements and use cases to really say much.

October 18th, 2013 7:25pm

I am using Streaminsight 2.1

DevBiker:Your scenario ... output being feedback in to input ... is best accomplished with StreamInsight 2.1 and a subject. That said, you could accomplish the same thing with adapters in pre-2.1 code; you'd have a matched set of output/input adapters that are subjects.

Could you please point me to some examples?

DevBiker:And no, you can't have a dictionary in a payload. You'll have to find another way to accomplish what you are trying to do here. Typically I'd suggest that you join to another stream and then group to analyze together

My reference data payload is (exposureReferenceDataStream)

 public class PortfolioExposure

    {

        public String PortfolioName { get; set; }

        public Dictionary<DateTime, Double> RiskVector { get; set; }

        public String PortfolioType { get; set; }

    }  

Risk vector is an array of Dates and amount that the counterparty owes to the bank at that date. Its called Risk because should the counterparty default, our bank would lose that money

So if a trade matures in 20 years, its risk is calculated across different time points starting today, 3days, 1week, 10 days etc until its maturity (20 years). A computation engine running on the grid will typically would calculate the exposure of the trades at different timepoints and writes them to a file. Time points at which risk is calculated for each trade is same and predetermined. 

For example see the below line from the computation engine

ABC,PFE_AUD,0,82892.000839224813,245466.59923476045,300068.953861769,349921.11832370894,436074.72958423389,558130.65831297624,910528.39035136625,889980.06361973297,981172.67517056817,906442.07013014564,758654.68989876215,703250.43773963454,647454.25240147696,957770.18425958371,1089072.855011757,1383040.6446861608,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0

This means that for Counterparty ABC , our bank has exposure of 0 today (ideally when they get in to trade none of the parties are at profit) 3days from today exposure is 82892.000839224813 Australian dollars ,so on and soforth

When I prime load the application I would want the reference data input adapter to parse this file , read each line and generate portfolio Exposure objects.

 

I have a CreditLimit object that contains the Limits (Maximum amount that the bank can trade against a counterparty) across these timepoints. I intend to write one more reference data input adapter for the below creditlimit objects (creditLimitStream)

public class CreditLimit

    {

        public String PortfolioName { get; set; }

        public Dictionary<DateTime, Double> CreditLimits { get; set; }

        public String PortfolioType { get; set; }

    }

I have a PdcTrade object that represents the pre deal check trade. This is the fast moving stream

 [EventGenerator(typeof(PdcTradeFactory))]

    public class PdcTrade

    {

        public String Portfolio { get; set; }

        public Double AddOnExposure { get; set; }

        public String PortfolioType { get; set; }

    }

I import the CTIs in to both the reference data streams from the trade stream.

Now the problem statement here

whenever there is a pdcTrade coming in to the system, it should be joined with the right exposurestream and creditlimit stream based on the counterparty that the trade belongs to(PortfolioName). Once joined the adjusted exposure for that counterparty(PortfolioName) would be sum of trades exposure and the riskvalue at each time point from the exposureStream.

This resulting adjusted exposure should be compared with the CreditLimit Object from the creditLimitStream and if there is atleast one time point where the exposure exceeds the allowed creditLimit then flag that this PDC has failed

So what should my Linq query be to accomplish this ?

 



  • Edited by vijay_g Tuesday, October 22, 2013 10:12 AM
Free Windows Admin Tool Kit Click here and download it now
October 22nd, 2013 11:14am

First, and to reiterate, you cannot have a dictionary as your payload. It won't work. Period.

Instead, look at having multiple individual reference events, one for each dictionary entry. Perhaps something like the following:

 public class PortfolioExposure

    {
        public String PortfolioName { get; set; }
        public DateTime RiskVectorDate {get; set;}
        public Double RiskVectorValue {get; set;}
        public String PortfolioType { get; set; }
    }  

I know you need to analyze the exposures (all of them) together with the trade. You'll do this with an aggregate window ... in your case, probably a snapshot window. Since this is likely more complex than a simple, built-in aggregate function, you would use a UDO or a UDA to do the calculations (depending on what the return needs to look like for the adjusted exposure; single value would be a UDA, multi-value payload would be a UDO). Take the loop that you would typically do in your code and either encapsulate that in the UDO/UDA or in a query, depending on how you can do the logic. Or - since the adjusted exposure is a sum, you can calculate the exposure for each of the risk elements independently and then sum in a snapshot window.

Credit limit would be the same ... you'll have multiple events, one for each dictionary member. This part is easy since if there is one that exceeds the allowed credit limit, you return a result. That's may be as simple as a join between the streams with a where clause comparing the credit limit and the adjusted exposure.

I'm not sure of how the DateTimes factor into the credit limit WRT the adjusted exposure.

If you mock it up in LinqPad and post that (either here or on a SkyDrive share) we can get into more details.

October 22nd, 2013 2:10pm

I flattened my payloads and got rid of the dictionaries.

   public class RiskValueAtPoint
    {
        public DateTime TimePoint { get; set; }
        public double RiskValue { get; set; }
        public String PortfolioName { get; set; }
        public String PortfolioType { get; set; }
        
   }
 public class GranularCreditLimit
     {
         public String PortfolioName { get; set; }
         public DateTime TimePoint { get; set; }
         public double CreditLimit { get; set; }
         public String PortfolioType { get; set; }
        

     }


 public class PdcTrade
    {
        public String Portfolio { get; set; }
        public double AddOnExposure { get; set; }
        public String PortfolioType { get; set; }
        
    }


class Program
    {
        private static ILog Log = LogManager.GetLogger(typeof(Program));
        static void Main(string[] args)
        {

            XmlConfigurator.Configure();
            using (Server cepServer = Server.Create("Default"))
            {
                try
                {
                    // Host the queries in the "PDC" application
                    var cepApp = cepServer.CreateApplication("Pdc");

                  



                    // Create a data stream of random activity events
                    var tradeInputCfg = new GeneratorConfig()
                    {
                        EventInterval = TimeSpan.Parse("00:00:10"),
                        EventIntervalVariance = 250,
                        Delay = TimeSpan.Parse("00:00:10")
                    };



                    var tradeStream = CepStream<PdcTrade>.Create(cepApp,
                        "tradeStream", typeof(GeneratorFactory),
                        tradeInputCfg, EventShape.Point);

                    // We want time to advance as new activity updates arrive.  In order to do this
                    // we need to link the reference stream's CTIs to the activity stream via a 
                    // time import setting
                    var timeImportSettings = new AdvanceTimeSettings(null,
                        new AdvanceTimeImportSettings("tradeStream"),
                        AdvanceTimePolicy.Adjust);


                    var referenceCfg = new GeneratorConfig()
                                           {
                                               Directory = ConfigurationManager.AppSettings["ResultsPath"],
                                               EventInterval = TimeSpan.Parse("00:00:30"),
                                               EventIntervalVariance = 0,
                                               Delay = TimeSpan.Parse("00:00:01")

                                           };

                    // Use the time import settings to link CTI values from the tradeStream
                    // stream
                    var referenceStream = CepStream<RiskValueAtPoint>.Create(
                        "exposureReferenceStream", typeof(GeneratorFactory),
                        referenceCfg, EventShape.Interval, timeImportSettings);

                    // Convert the reference point stream into a reference signal
                    var referenceSignal = referenceStream
                        .AlterEventDuration(e => TimeSpan.MaxValue)
                        .ClipEventDuration(referenceStream, (e1, e2) => ((e1.PortfolioName == e2.PortfolioName) &&
                                                                                             (e1.PortfolioType == e2.PortfolioType)));

                    var creditLimitCfg = new GeneratorConfig()
                    {
                        Directory = ConfigurationManager.AppSettings["ResultsPath"],
                        EventInterval = TimeSpan.Parse("00:12:00"),
                        EventIntervalVariance = 0,
                        Delay = TimeSpan.Parse("00:00:01")

                    };
                    var creditLimitStream = CepStream<GranularCreditLimit>.Create(
                        "creditLimitStream", typeof(GeneratorFactory),
                        creditLimitCfg, EventShape.Interval, timeImportSettings);

                    // Convert the creditlimit point stream into a reference signal
                    var creditLimitSignal = creditLimitStream
                        .AlterEventDuration(e => TimeSpan.MaxValue)
                        .ClipEventDuration(referenceStream, (e1, e2) => ((e1.PortfolioName == e2.PortfolioName) &&
                                                                                             (e1.PortfolioType == e2.PortfolioType)));

                 
                    // Join trade stream and riskExposureStream
                    var adjustedRiskStream =
                        from riskValueAtPoint in referenceSignal group riskValueAtPoint by new {PortfolioName=riskValueAtPoint.PortfolioName,PortfolioType=riskValueAtPoint.PortfolioType} into portfolioGroup
                        from riskValue in portfolioGroup
                        from trade in tradeStream
                        where new {PortfolioName=trade.Portfolio,PortfolioType=trade.PortfolioType} == portfolioGroup.Key
                        select new
                                   {
                                       Trade = trade,
                                       OriginalRiskValue = riskValue.RiskValue,
                                       TimePoint = riskValue.TimePoint,
                                       AdjustedRiskValue = CustomFunctions.GetAdjustedRiskVector(trade.AddOnExposure,riskValue.RiskValue) ,
                                       PortfolioName=trade.Portfolio,
                                       PortfolioType=trade.PortfolioType,
                                       
                                   };

                    // Join the AdjustedRiskStream with creditLimit stream
                    var creditLimitCheckStream = from creditLimit in creditLimitSignal
                                                 group creditLimit by
                                                     new
                                                         {
                                                             PortfolioName = creditLimit.PortfolioName,
                                                             PortfolioType = creditLimit.PortfolioType
                                                         }
                                                 into creditLimitGroup
                                                 from limit in creditLimitGroup
                                                 from adjustedTrade in adjustedRiskStream
                                                 where
                                                     new
                                                         {
                                                             PortfolioName = adjustedTrade.PortfolioName,
                                                             PortfolioType = adjustedTrade.PortfolioType
                                                         } ==
                                                     creditLimitGroup.Key
                                                 where adjustedTrade.TimePoint == limit.TimePoint
                                                 select new
                                                            {
                                                                TimePoint = adjustedTrade.TimePoint,
                                                                PortfolioName = adjustedTrade.PortfolioName,
                                                                PortfolioType = adjustedTrade.PortfolioType,
                                                                OriginalRiskValue = adjustedTrade.OriginalRiskValue,
                                                                AdjustedRiskValue = adjustedTrade.AdjustedRiskValue,
                                                                CreditLimit = limit.CreditLimit
                                                            };
                                                

                    var limitBreachesStream= from joinedStream in creditLimitCheckStream
                                              where joinedStream.AdjustedRiskValue >= joinedStream.CreditLimit 
                                              select new  {  TimePoint = joinedStream.TimePoint,
                                                                PortfolioName = joinedStream.PortfolioName,
                                                                PortfolioType = joinedStream.PortfolioType,
                                                                OriginalRiskValue = joinedStream.OriginalRiskValue,
                                                                AdjustedRiskValue = joinedStream.AdjustedRiskValue,
                                                                CreditLimit = joinedStream.CreditLimit
                                                            };

                    // Pump these to the console for visibility
                    var outputCfg = new TracerConfig()
                    {
                        DisplayCtiEvents = true,
                        SingleLine = false,
                        TracerKind = TracerKind.Console,
                        TraceName = "OUTPUT"
                    };

                    StartQuery(cepApp, limitBreachesStream, outputCfg);

                    Console.WriteLine("Press <enter> to close application");
                    Console.ReadLine();
                }
                catch (Exception ex0)
                {
                    Console.WriteLine("Error: " + ex0.ToString());
                }
            }

        }

        private static void StartQuery<AT0>(Application cepApp, CepStream<AT0> enrichedStream, TracerConfig outputCfg)
        {
            var q = enrichedStream.ToQuery(cepApp,
                                           "enrichedData", "",
                                           typeof(TracerFactory), outputCfg,
                                           EventShape.Point, StreamEventOrder.FullyOrdered);
            q.Start();
            Console.WriteLine("Press <enter> to close application");
            Console.ReadLine();
            q.Stop();
        }
    }

DevBiker: I know you need to analyze the exposures (all of them) together with the trade. You'll do this with an aggregate window ... in your case, probably a snapshot window

What is the need of a snapshot window ? Does it mean what i am doing above is incorrect. I seem to be getting the right results ?

DevBiker:I'm not sure of how the DateTimes factor into the credit limit WRT the adjusted exposure.

Risk against a counterparty is always for a duration and it changes as the duration changes, Credit Limits are assigned for a specific duration. So Adjusted Risk value for a time point should be compared with the Allowed Credit limit across that duration. My Join query uses timepoint to correlate risk and creditlimit. It would become clear to you after you look at the code.

Free Windows Admin Tool Kit Click here and download it now
October 25th, 2013 10:04am

Well, if you're getting the correct results, then you're good, right?

I thought that you needed to do a sum on the results somewhere in there; I don't see it in the code listing above. But if you don't need it ... that's fine. A snapshot window will allow you to do the aggregate every time you get a new result, rather than based on a time window.

October 27th, 2013 12:07am

This topic is archived. No further replies will be accepted.

Other recent topics Other recent topics