I am having a great deal of trouble joining a slow stream (with a variable frequency of 1-2 minutes) with a fast stream (with a frequency of 20 milliseconds).
I would like the result of the joined streams to output 1 aggregated result at the frequency of the slow stream. Below is an example that simulates my scenario at slower frequencies.
var streamableSlow = from s in slowSubject
                         .ToPointStreamable(
                             e => PointEvent<long>.CreateInsert(new DateTimeOffset(DateTime.UtcNow), e),
                             AdvanceTimeSettings.IncreasingStartTime)
                     select s;

var streamableFast = from s in fastSubject
                         .ToPointStreamable(
                             e => PointEvent<long>.CreateInsert(new DateTimeOffset(DateTime.UtcNow), e),
                             AdvanceTimeSettings.IncreasingStartTime)
                     select s;

var signalSlow = streamableSlow
    .AlterEventDuration(e => TimeSpan.MaxValue)
    .ClipEventDuration(streamableSlow, (e1, e2) => (true));

var signalFast = streamableFast
    .AlterEventDuration(e => TimeSpan.MaxValue)
    .ClipEventDuration(streamableSlow, (e1, e2) => (true));

var queryJoin = from s in signalSlow
                join f in signalFast on s equals f
                select new LongPair { slow = s, fast = f };

var query = from win in queryJoin.SnapshotWindow()
            select new LongPair { slow = win.Max(a => a.slow), fast = win.Count() };
I have 2 input streams that both emit a long. The slow stream emits at a 2-second interval and the fast stream at a 50-millisecond interval. The fast stream emits a random number between 1 and 4.
I first convert both inputs into point streams, and then alter and clip the event durations. I then join the streams together into a class LongPair and finally aggregate using a snapshot window.
Here is an example of my output:
slow=1, fast count=1 time=08:27:28
slow=1, fast count=2 time=08:27:28
slow=1, fast count=3 time=08:27:28
slow=1, fast count=4 time=08:27:28
slow=1, fast count=5 time=08:27:28
slow=1, fast count=6 time=08:27:28
slow=1, fast count=7 time=08:27:28
slow=1, fast count=8 time=08:27:28
slow=1, fast count=9 time=08:27:28
slow=2, fast count=1 time=08:27:30
slow=2, fast count=2 time=08:27:30
slow=3, fast count=1 time=08:27:32
slow=3, fast count=2 time=08:27:32
slow=4, fast count=1 time=08:27:34
slow=4, fast count=2 time=08:27:34
slow=4, fast count=3 time=08:27:34
slow=4, fast count=4 time=08:27:34
slow=4, fast count=5 time=08:27:34
slow=1, fast count=1 time=08:27:36
slow=1, fast count=2 time=08:27:36
slow=1, fast count=3 time=08:27:36
slow=2, fast count=1 time=08:27:38
slow=2, fast count=2 time=08:27:38
slow=3, fast count=1 time=08:27:40
slow=3, fast count=2 time=08:27:40
slow=3, fast count=3 time=08:27:40
slow=3, fast count=4 time=08:27:40
slow=4, fast count=1 time=08:27:42
slow=4, fast count=2 time=08:27:42
slow=4, fast count=3 time=08:27:42
slow=4, fast count=4 time=08:27:42
This is the output that I would like to produce:
slow=1, fast count=9 time=08:27:28
slow=2, fast count=2 time=08:27:30
slow=3, fast count=2 time=08:27:32
slow=4, fast count=5 time=08:27:34
slow=1, fast count=3 time=08:27:36
slow=2, fast count=2 time=08:27:38
slow=3, fast count=4 time=08:27:40
slow=4, fast count=4 time=08:27:42
As you can see, the actual output contains a result for every fast event, but I only want to output 1 aggregated event at the rate of the slow stream.
Now the crux of my problem is that in my real scenario the reference stream has a variable frequency (it is itself a snapshot window on each event received), so I can't use a hopping, tumbling or count window to calculate my fast count aggregate, because I don't have a fixed timespan or event count!
Is there a method of aggregating the fast stream over a variable timespan and variable number of fast events per slow stream event?
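To pin down the semantics being asked for, the desired result can be sketched outside StreamInsight with plain LINQ over in-memory lists. This is only an illustration of the intended aggregation, not a StreamInsight query: the `SlowFastAggregation` class, the `CountPerSlowEvent` method and the tuple shapes are hypothetical names made up for this sketch. It assumes each slow event opens an interval that lasts until the next slow event, and that a fast event is counted when its value equals the slow value (mirroring the `on s equals f` join above). The last slow event has no successor, so its interval is treated as open-ended here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlowFastAggregation
{
    // Hypothetical helper: for each slow event, count the fast events that
    // carry the same value and arrive in [slow.Time, nextSlow.Time).
    // One result is produced per slow event, however many fast events arrive.
    public static List<(long Slow, int FastCount, DateTime Time)> CountPerSlowEvent(
        IReadOnlyList<(DateTime Time, long Value)> slow,
        IReadOnlyList<(DateTime Time, long Value)> fast)
    {
        var results = new List<(long Slow, int FastCount, DateTime Time)>();

        for (int i = 0; i < slow.Count; i++)
        {
            DateTime start = slow[i].Time;
            // Assumption: the final slow event's interval never closes.
            DateTime end = i + 1 < slow.Count ? slow[i + 1].Time : DateTime.MaxValue;
            long value = slow[i].Value;

            int count = fast.Count(f => f.Time >= start && f.Time < end && f.Value == value);

            results.Add((value, count, start));
        }

        return results;
    }
}
```

The interval length and the number of fast events per interval are both derived from the slow events themselves, which is the variable-timespan, variable-count behaviour that fixed hopping, tumbling or count windows cannot express.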