I got into a Rx spree, so to speak, and this question is related to mine here and here. Nevertheless, maybe these are of help to someone as I could see them as useful variations of the same theme.
Question: How could one group a random stream of int
(say, on interval [0, 10] produced on random interval) objects into groups and provide for earch group a variable number of absence of events alarms (for the lack of better definition, for futher background see linked posts). More specifically with code, how could one define multipe throttle settings per group in the following:
var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));
Here the subscribe function will be called if there is more than one second of absence of IDs per group. What if one would like to define three different values for the absence of events (say, one second, five seconds and ten seconds) and all cancelled when an event arrives? What I can think of are:
- Split each ID in
idStream
into several synthetic ones and provide a bijective mapping between the real IDs and the synthetic ones. For instance in this case ID: 1 -> 100, 101, 102; ID: 2 -> 200, 201, 203 and then define a selector function inThrottle
like soFunc<int, Timespan>(i => /* switch(i)...*/)
and then whenSubscribe
will be called, map the ID back. See also the linked questions for further background. - Create a nested grouping in which the IDs are grouped and then further the ID groups will be copied/replicated/forked (I don't know the proper term) into groups according to throttling values. This approach, I think, is rather complicated and I'm not sure if it would be the best one to go with. I'd sure be interested to see such a query, nevertheless.
In a more general setting, I suspect, this is a situation where there are multiple handlers per some group, albeit I haven't managed to find anything related to this.
<edit:
As a (hopefully clarifying) an example idStream
pushes one ID: 1 upon which three different counters would be initiated, each of which awaiting for the next event to occur or alarming if no new ID 1 is detected in time. Counter 1 (C1) awaits for five seconds, counter 2 (C2) for seven seconds and counter 3 (C3) for ten seconds. If a new ID 1 will be received within interval [0, 5] seconds, all counters will be re-initialized with the aforementioned values and no alarm will be sent. If a new ID will be received within interval [0, 7) seconds, C1 alarms and C2 and C3 will be re-initialized. Similarly if a new ID will be received within an interval [0, 10) seconds C1 and C2 fire, but C3 gets just reinitialized.
That is, there would be multiple "absence alarms" or in general, actions taken, towards one ID given some conditions. I'm not sure what would be a good analogue... Perhaps stacking "alert lights" in a tower so that first is green, then yellow and lastly red. As absence of an ID continues longer and longer, a color after color will be lit (in this case red is the last one). Then when one ID is detected, all the lights will be turned off.
<edit 2:
Upon retrofitting James' code to the example as follows and leaving the rest as written I discovered the Subscribe
will be called straight upon the first event on both of the two alarm levels.
const int MaxLevels = 2;
var idAlarmStream = idStream
.Select(i => i)
.AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
.Subscribe(i =>
{
Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
});
Let's see what's happening here and if MaxLevels
could be provided dynamically...
<edit 3: James' code works. The problem was between the chair and the keyboard! Changing the time to something more sensible sure did help. In fact, I changed them to bigger figures, but it was .FromTicks
and it escaped me for a few minutes.
This works I think - I'll try to add fuller explanation later. Each alarm level has a defined threshold (per signal group). These are expected to be of increasing duration.
The basic idea is to have the signals of all previous levels feed into the current level. The first level is a "zero" level of the signals themselves that is filtered out before the alarm stream is returned. Note that TSignal keys need to support value identity.
I'm sure there's room for simplification!
Sample unit test: