Hi,
I am trying to implement a PreDealCheck application that lets a trader decide whether doing a trade would breach the limits assigned against a counterparty.
I have identified the data streams as tradeStream (an event arrives whenever a pre-deal check is issued), portfolioExposureStream (portfolio-level exposures, e.g. counterparty-level exposure, product-level exposure, etc.; these values come from a compute grid every night) and LimitsStream (limits assigned against counterparties, products, etc.; these come from the database).
I had defined the trade payload as
public class Trade
{
    public String Counterparty { get; set; }
    public String Product { get; set; }
    public String AssetGroup { get; set; }
    public Double ReceiveAmount { get; set; }
    public Double Exposure { get; set; }
}
I had defined the reference data payload as
public class PortfolioExposure
{
    public String PortfolioName { get; set; }
    public Dictionary<String, String> RiskVector { get; set; }
    public String PortfolioType { get; set; }
}
The idea is: whenever a pre-deal check is issued, take the trade, join it with all the PortfolioExposure objects that it falls under, then multiply the risk vectors of those PortfolioExposure objects by the trade amount * some factor, and see whether the adjusted risk breaches the thresholds by joining with the limit objects.
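To make the intended check concrete, here is a minimal sketch in plain C# (no StreamInsight involved). It assumes the RiskVector values are doubles stored as invariant-culture strings, and that limits are available as a dictionary keyed by the same risk-vector keys. The RiskCheck class, both method names and the shape of the limits dictionary are hypothetical, and the scaling factor is just a parameter since its real value is not settled here.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class RiskCheck
{
    // Scales every risk-vector entry by tradeAmount * factor.
    // Assumption: each string value parses as an invariant-culture double.
    public static Dictionary<string, double> Adjust(
        Dictionary<string, string> riskVector, double tradeAmount, double factor)
    {
        var adjusted = new Dictionary<string, double>();
        foreach (var entry in riskVector)
        {
            double value = double.Parse(entry.Value, CultureInfo.InvariantCulture);
            adjusted[entry.Key] = value * tradeAmount * factor;
        }
        return adjusted;
    }

    // Returns the keys whose adjusted value exceeds the limit for that key.
    // Keys that have no limit assigned are ignored (an assumption of this sketch).
    public static List<string> Breaches(
        Dictionary<string, double> adjusted, Dictionary<string, double> limits)
    {
        var breached = new List<string>();
        foreach (var entry in adjusted)
        {
            double limit;
            if (limits.TryGetValue(entry.Key, out limit) && entry.Value > limit)
            {
                breached.Add(entry.Key);
            }
        }
        return breached;
    }
}
```

Calling RiskCheck.Adjust on the RiskVector of a joined PortfolioExposure and passing the result to RiskCheck.Breaches would give the list of breached keys for that portfolio.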
So a trade maps to multiple PortfolioExposure objects. What does this mean?
Say that trade T1 is against counterparty "A" and the trade falls under the category "FXForward". There could be a PortfolioExposure object PE1 that contains the exposure of the bank against "A" and another PortfolioExposure object PE2 that contains the exposure of the bank against "A\FXForward". What should happen is that trade T1 is joined with both PE1 and PE2, resulting in
{T1, PE1}, {T1, PE2}
This isn't possible if one trade object has to map to multiple PortfolioExposure objects, because LINQ won't let me use an "or" clause in the join condition. So I decided to transform the received payload into a PdcTrade payload. When a PDC for trade T1 is issued, I transform the trade into PDCT1 { Portfolio="A", PortfolioType="Counterparty" } and PDCT2 { Portfolio="A\FXForward", PortfolioType="Product" }.
public class PdcTrade
{
    public String Portfolio { get; set; }
    public Double Exposure { get; set; }
    public String PortfolioType { get; set; }
}
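The fan-out from one trade to several PdcTrade objects could look roughly like the sketch below. It only covers the two categories spelled out in the example (counterparty and counterparty\product); the other categories are left out because their key format is not described here. PdcTradeSplitter and Split are hypothetical names, the Trade class is trimmed to the fields the sketch needs, and copying Exposure across unchanged is an assumption.

```csharp
using System;
using System.Collections.Generic;

public class Trade
{
    public string Counterparty { get; set; }
    public string Product { get; set; }
    public double Exposure { get; set; }
}

public class PdcTrade
{
    public string Portfolio { get; set; }
    public double Exposure { get; set; }
    public string PortfolioType { get; set; }
}

public static class PdcTradeSplitter
{
    // Hypothetical helper: turns one trade into one PdcTrade per portfolio category,
    // so that each PdcTrade can be equi-joined with exactly one PortfolioExposure.
    public static List<PdcTrade> Split(Trade trade)
    {
        return new List<PdcTrade>
        {
            // Counterparty-level portfolio, e.g. "A"
            new PdcTrade
            {
                Portfolio = trade.Counterparty,
                PortfolioType = "Counterparty",
                Exposure = trade.Exposure
            },
            // Product-level portfolio under the counterparty, e.g. "A\FXForward"
            new PdcTrade
            {
                Portfolio = trade.Counterparty + "\\" + trade.Product,
                PortfolioType = "Product",
                Exposure = trade.Exposure
            }
        };
    }
}
```

The PortfolioType strings have to match whatever values the reference data carries, since the join compares them.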
So whenever the tradeInputAdapter gets a trade, it transforms it into 4 PDC trades, creates point events for them (we have 4 portfolio categories at the moment, hence 4 PDC trades, each with one of the portfolio categories as its portfolio type) and enqueues a CTI event after all 4 PDC trades are enqueued.
In this case, tradeStream is a point stream that has 4 inserts (e1, e2, e3, e4) with start times of (2013-10-09 08:53:04.9479941, 2013-10-09 08:53:05.0129979, 2013-10-09 08:53:05.0129979, 2013-10-09 08:53:05.0139979), followed by an enqueued CTI event with timestamp 2013-10-09 08:53:05.0139979.
PortfolioReferenceDataStream loads all the portfolio-level exposures, creates point events for them (timestamps from 2013-10-09 08:53:33.9856550 to 2013-10-09 08:53:34.0416582) and finally enqueues a CTI event with timestamp 2013-10-09 08:53:34.0446584.
Now I import the time settings from tradeStream into portfolioExposureStream, as tradeStream is the fast stream. However, it turned out in my POC that loading the reference data took longer than loading the trades, as there are few trades and a large amount of reference data. This only happens during the initial load. Hence the CTI in the trade stream is earlier than even the first event in the reference data stream. Importing the CTI from the trade stream into the reference data stream inserted the trade stream CTI as the first entry in the reference data stream.
The join didn't produce any data. I opened the Event Flow Debugger and noticed that the imported CTI from the trade stream (08:53:05.0139979) is earlier than even the first portfolio exposure reference data event (08:53:33.9856550). I thought this was perhaps the reason why the join was not generating any data. Is this assumption correct?
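One way to sanity-check the timestamps above outside StreamInsight: as far as I understand, a join only matches events whose lifetimes overlap in application time, and a point event lives for a single tick. The sketch below is plain C# and only models that overlap rule; it is not the engine's implementation. LifetimeOverlap, Ts, Overlaps and PointJoinsSignal are hypothetical names, and the reference signal is assumed to run from its start time to DateTime.MaxValue.

```csharp
using System;
using System.Globalization;

public static class LifetimeOverlap
{
    // Parses timestamps in the format used in this post (7 fractional digits).
    public static DateTime Ts(string text)
    {
        return DateTime.ParseExact(text, "yyyy-MM-dd HH:mm:ss.fffffff",
            CultureInfo.InvariantCulture);
    }

    // Two half-open lifetimes [start, end) overlap when each starts before the other ends.
    public static bool Overlaps(DateTime start1, DateTime end1,
                                DateTime start2, DateTime end2)
    {
        return start1 < end2 && start2 < end1;
    }

    // A point event is modelled as occupying one tick: [start, start + 1 tick).
    public static bool PointJoinsSignal(DateTime pointStart,
                                        DateTime signalStart, DateTime signalEnd)
    {
        return Overlaps(pointStart, pointStart.AddTicks(1), signalStart, signalEnd);
    }
}
```

With the last trade at 2013-10-09 08:53:05.0139979 and the first reference event starting at 2013-10-09 08:53:33.9856550, PointJoinsSignal returns false, because the trade's lifetime ends before the reference signal begins. Under this model a trade arriving after the reference data has loaded would overlap and could join.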
What I tried next was not to enqueue any CTIs in tradeStream and instead to import the reference data time settings into the trade stream; however, that didn't generate any data either. I wanted to upload the trace files but can't seem to find a place to do it. Let me know how to send the trace files across; I will be happy to do so.
Below is the code that I am using:
static void Main(string[] args)
{
    XmlConfigurator.Configure();
    using (Server cepServer = Server.Create("Default"))
    {
        try
        {
            // Host the queries in the "PDC" application
            var cepApp = cepServer.CreateApplication("Pdc");

            // Create a data stream of random activity events
            var inputCfg = new GeneratorConfig()
            {
                CtiFrequency = 1,
                EventInterval = 1000,
                EventIntervalVariance = 250
            };

            // We want time to advance as new activity updates arrive. In order to do this
            // we need to link the reference stream's CTIs to the activity stream via a
            // time import setting
            var tradeStreamTimeImportSettings = new AdvanceTimeSettings(null,
                new AdvanceTimeImportSettings("tradeStream"), AdvanceTimePolicy.Adjust);
            var referenceDataStreamTimeImportSettings = new AdvanceTimeSettings(null,
                new AdvanceTimeImportSettings("exposureReferenceStream"), AdvanceTimePolicy.Adjust);

            var tradeStream = CepStream<PdcTrade>.Create(cepApp, "tradeStream",
                typeof(GeneratorFactory), inputCfg, EventShape.Point);

            var referenceCfg = new PortfolioExposureInputConfig
            {
                PollingInterval = TimeSpan.Parse("00:00:30"),
                Directory = ConfigurationManager.AppSettings["ResultsPath"]
            };

            // Use the time import settings to link CTI values from the tradeStream
            // stream
            var referenceStream = CepStream<PortfolioExposure>.Create(
                "exposureReferenceStream", typeof(PortfolioTypedInputAdapterFactory),
                referenceCfg, EventShape.Point, tradeStreamTimeImportSettings);

            //var referenceStream = CepStream<PortfolioExposure>.Create(
            //    "exposureReferenceStream", typeof(PortfolioTypedInputAdapterFactory),
            //    referenceCfg, EventShape.Point);

            // Convert the reference point stream into a reference signal
            var referenceSignal = referenceStream
                .AlterEventDuration(e => TimeSpan.MaxValue)
                .ClipEventDuration(referenceStream, (e1, e2) =>
                    ((e1.PortfolioName == e2.PortfolioName) && (e1.PortfolioType == e2.PortfolioType)));

            var passThroughTradeStream = from trade in tradeStream
                                         select new { Trade = trade };
            var passThroughReferenceStream = from referenceData in referenceStream
                                             select new { ReferenceData = referenceData };

            // Join trade stream and portfolioStream
            var enrichedStream = from trade in tradeStream
                                 join pflioReferenceData in referenceSignal on
                                 //trade.Portfolio equals pflioReferenceData.PortfolioName
                                 new
                                 {
                                     Portfolio = trade.Portfolio.ToLower(),
                                     PortfolioType = trade.PortfolioType.ToLower()
                                 }
                                 equals new
                                 {
                                     Portfolio = pflioReferenceData.PortfolioName.ToLower(),
                                     PortfolioType = pflioReferenceData.PortfolioType.ToLower()
                                 }
                                 select new { Trade = trade, RiskVector = pflioReferenceData };

            // Pump these to the console for visibility
            var outputCfg = new TracerConfig()
            {
                DisplayCtiEvents = true,
                SingleLine = true,
                TracerKind = TracerKind.Console,
                TraceName = "OUTPUT"
            };

            StartQuery(cepApp, enrichedStream, outputCfg);
            Console.WriteLine("Press <enter> to close application");
            Console.ReadLine();
        }
        catch (Exception ex0)
        {
            Console.WriteLine("Error: " + ex0.ToString());
        }
    }
}

private static void StartQuery<AT0>(Application cepApp, CepStream<AT0> enrichedStream, TracerConfig outputCfg)
{
    var q = enrichedStream.ToQuery(cepApp, "enrichedData", "", typeof(TracerFactory),
        outputCfg, EventShape.Point, StreamEventOrder.FullyOrdered);
    q.Start();
    Console.WriteLine("Press <enter> to close application");
    Console.ReadLine();
    q.Stop();
}
- Edited by vijay_g Wednesday, October 09, 2013 2:38 PM