How to join a slow and fast stream and output at the slowest frequency?

I am having a great deal of trouble joining a slow stream (which emits an event at a variable interval of 1-2 minutes) with a fast stream (which emits an event every 20 milliseconds).
I would like the joined streams to output 1 aggregated result at the frequency of the slow stream. Below is an example that simulates my scenario at slower frequencies.

var streamableSlow = from s in slowSubject
	.ToPointStreamable(e => PointEvent<long>.CreateInsert(new DateTimeOffset(DateTime.UtcNow), e), AdvanceTimeSettings.IncreasingStartTime)
	select s;

var streamableFast = from s in fastSubject
	.ToPointStreamable(e => PointEvent<long>.CreateInsert(new DateTimeOffset(DateTime.UtcNow), e), AdvanceTimeSettings.IncreasingStartTime)
	select s;


var signalSlow = streamableSlow.AlterEventDuration(e => TimeSpan.MaxValue)
	.ClipEventDuration(streamableSlow, (e1, e2) => (true));


var signalFast = streamableFast.AlterEventDuration(e => TimeSpan.MaxValue)
	.ClipEventDuration(streamableSlow, (e1, e2) => (true));



var queryJoin = from s in signalSlow
	join f in signalFast
		on s equals f
	select new LongPair
	{
		slow = s,
		fast = f
	};


var query = from win in queryJoin.SnapshotWindow()
	select new LongPair
	{
		slow = win.Max(a => a.slow),
		fast = win.Count()
	};

I have 2 input streams that both emit a long. The slow stream emits at a 2 second interval and the fast stream at a 50 millisecond interval. The fast stream emits a random number between 1 and 4.
I first convert both inputs into point streams, and then alter and clip the event durations so that each event lasts until the next slow event. I then join the streams together into a class LongPair and finally aggregate using a snapshot window.

Here is an example of my output:
slow=1, fast count=1   time=08:27:28
slow=1, fast count=2   time=08:27:28
slow=1, fast count=3   time=08:27:28
slow=1, fast count=4   time=08:27:28
slow=1, fast count=5   time=08:27:28
slow=1, fast count=6   time=08:27:28
slow=1, fast count=7   time=08:27:28
slow=1, fast count=8   time=08:27:28
slow=1, fast count=9   time=08:27:28
slow=2, fast count=1   time=08:27:30
slow=2, fast count=2   time=08:27:30
slow=3, fast count=1   time=08:27:32
slow=3, fast count=2   time=08:27:32
slow=4, fast count=1   time=08:27:34
slow=4, fast count=2   time=08:27:34
slow=4, fast count=3   time=08:27:34
slow=4, fast count=4   time=08:27:34
slow=4, fast count=5   time=08:27:34
slow=1, fast count=1   time=08:27:36
slow=1, fast count=2   time=08:27:36
slow=1, fast count=3   time=08:27:36
slow=2, fast count=1   time=08:27:38
slow=2, fast count=2   time=08:27:38
slow=3, fast count=1   time=08:27:40
slow=3, fast count=2   time=08:27:40
slow=3, fast count=3   time=08:27:40
slow=3, fast count=4   time=08:27:40
slow=4, fast count=1   time=08:27:42
slow=4, fast count=2   time=08:27:42
slow=4, fast count=3   time=08:27:42
slow=4, fast count=4   time=08:27:42

This is the output that I would like to produce:
slow=1, fast count=9   time=08:27:28
slow=2, fast count=2   time=08:27:30
slow=3, fast count=2   time=08:27:32
slow=4, fast count=5   time=08:27:34
slow=1, fast count=3   time=08:27:36
slow=2, fast count=2   time=08:27:38
slow=3, fast count=4   time=08:27:40
slow=4, fast count=4   time=08:27:42

As you can see, the query emits a result for every fast event, but I only want to output 1 aggregated event at the speed of the slow stream.
Now the crux of my problem is that in my real scenario the reference (slow) stream has a variable frequency (it is itself a snapshot window on each event received), so I can't use a hopping, tumbling or count window to calculate my fast count aggregate, because I don't have a fixed timespan or event count!

Is there a method of aggregating the fast stream over a variable timespan and variable number of fast events per slow stream event?
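
To pin down the semantics being asked for, here is a minimal sketch in plain C# using LINQ to Objects. It is not a StreamInsight query and does not answer how to express this with StreamInsight windows. It only shows the intended result: one output per slow event, counting the fast events that fall between that slow event and the next one and that carry the same value (mirroring the "on s equals f" join above). The class name SlowFastAggregation, the method AggregatePerSlowEvent and the Tuple-based event shape are hypothetical, and the sketch assumes the slow events are already sorted by timestamp.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlowFastAggregation
{
    // Same shape as the LongPair used in the queries above:
    // slow = value of the slow event, fast = count of matching fast events.
    public sealed class LongPair
    {
        public long slow;
        public long fast;
    }

    // Hypothetical helper: emits exactly one LongPair per slow event.
    // Assumption: slowEvents is sorted by timestamp in ascending order.
    public static List<LongPair> AggregatePerSlowEvent(
        IReadOnlyList<Tuple<DateTimeOffset, long>> slowEvents,
        IEnumerable<Tuple<DateTimeOffset, long>> fastEvents)
    {
        var fast = fastEvents.ToList();
        var results = new List<LongPair>();

        for (int i = 0; i < slowEvents.Count; i++)
        {
            // The interval owned by this slow event runs from its own timestamp
            // (inclusive) to the next slow event's timestamp (exclusive).
            // The last slow event owns everything after it.
            DateTimeOffset start = slowEvents[i].Item1;
            DateTimeOffset end = i + 1 < slowEvents.Count
                ? slowEvents[i + 1].Item1
                : DateTimeOffset.MaxValue;
            long slowValue = slowEvents[i].Item2;

            // Count fast events inside the interval whose value equals the slow value.
            long count = fast.Count(f =>
                f.Item1 >= start && f.Item1 < end && f.Item2 == slowValue);

            results.Add(new LongPair { slow = slowValue, fast = count });
        }

        return results;
    }
}
```

Because the interval boundaries come from the slow events themselves, the timespan and the number of fast events per result are both variable, which is the behaviour a fixed hopping, tumbling or count window cannot give.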

July 3rd, 2013 4:47am
