In Rx, how to group events by id and throttle each

Posted 2019-04-07 18:48

Question:

I got into an Rx spree, so to speak, and this question is related to my earlier questions here and here. Nevertheless, maybe these are of help to someone, as I see them as useful variations on the same theme.

Question: How could one group a random stream of ints (say, values in the interval [0, 10] produced at random intervals) by value, and provide for each group a variable number of absence-of-events alarms (for lack of a better term; for further background, see the linked posts)? More specifically, with code: how could one define multiple throttle settings per group in the following query?

var idAlarmStream = idStream
    .Select(i => i)
    // Group by the int value itself; a group closes after 1 s without a new value.
    .GroupByUntil(id => id, grp => grp.Throttle(TimeSpan.FromMilliseconds(1000)))
    .SelectMany(grp => grp.TakeLast(1))
    .Subscribe(i => Console.WriteLine(i));

Here the Subscribe callback is called for an ID once that ID has been absent for more than one second. What if one would like to define three different absence thresholds per ID (say, one second, five seconds and ten seconds), all of which are reset when a new event with that ID arrives? The options I can think of are:

  • Split each ID in idStream into several synthetic ones and keep a bijective mapping between the real IDs and the synthetic ones. In this case, for instance, ID 1 -> 100, 101, 102 and ID 2 -> 200, 201, 202. Then define a selector function for Throttle, like Func<int, TimeSpan>(i => /* switch(i)...*/), and map the synthetic ID back to the real one when Subscribe is called. See also the linked questions for further background.
  • Create a nested grouping in which the IDs are grouped and then each ID group is copied/replicated/forked (I don't know the proper term) into further groups according to the throttling values. I think this approach is rather complicated, and I'm not sure it would be the best one to go with. Nevertheless, I'd certainly be interested to see such a query.
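The mapping in the first option is plain arithmetic. A minimal sketch, assuming non-negative IDs and fewer than 100 alarm levels per ID; the names ToSynthetic, FromSynthetic and ThresholdFor are hypothetical, and the thresholds are the one, five and ten seconds mentioned above. In an Rx query, each incoming ID would first be fanned out into its synthetic IDs (for example with SelectMany), and each GroupByUntil group would then be throttled on ThresholdFor(grp.Key).

```csharp
using System;

// Hypothetical encoding: ID 1 -> 100, 101, 102; ID 2 -> 200, 201, 202.
// Assumes non-negative IDs and fewer than LevelsPerId levels per ID.
const int LevelsPerId = 100;

int ToSynthetic(int id, int level) => id * LevelsPerId + level;

(int Id, int Level) FromSynthetic(int synthetic) =>
    (synthetic / LevelsPerId, synthetic % LevelsPerId);

// Levels 0, 1, 2 -> one, five and ten seconds of absence.
TimeSpan[] thresholds =
{
    TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10)
};

// Threshold to throttle a synthetic-ID group on.
TimeSpan ThresholdFor(int synthetic) => thresholds[FromSynthetic(synthetic).Level];

Console.WriteLine(ToSynthetic(2, 1));  // 201
Console.WriteLine(FromSynthetic(102)); // (1, 2)
Console.WriteLine(ThresholdFor(201));  // 00:00:05
```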

More generally, I suspect this is a case of multiple handlers per group, although I haven't managed to find anything related to it.

<edit: As a (hopefully clarifying) example: idStream pushes ID 1, upon which three different counters are started, each of which waits for the next ID 1 and alarms if none is detected in time. Counter 1 (C1) waits for five seconds, counter 2 (C2) for seven seconds and counter 3 (C3) for ten seconds. If a new ID 1 arrives within five seconds, all counters are reset to the aforementioned values and no alarm is sent. If it arrives after five but within seven seconds, C1 alarms and C2 and C3 are reset. Similarly, if it arrives after seven but within ten seconds, C1 and C2 fire, but C3 is just reset.
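The three counters in this example reduce to one comparison each: a counter alarms exactly when the gap before the next ID 1 is longer than its limit. A minimal sketch of that rule; FiredCounters is a hypothetical helper, and an ID 1 arriving exactly at a counter's limit is assumed to count as arriving in time.

```csharp
using System;
using System.Linq;

// Limits of counters C1, C2 and C3 from the example.
TimeSpan[] counters = { TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(7), TimeSpan.FromSeconds(10) };

// Given the gap between one ID 1 and the next, return the 1-based numbers of
// the counters that alarm before the new ID 1 resets them.
int[] FiredCounters(TimeSpan gap) =>
    counters.Select((limit, index) => (limit, number: index + 1))
            .Where(c => gap > c.limit)   // arriving exactly at the limit counts as in time
            .Select(c => c.number)
            .ToArray();

Console.WriteLine(string.Join(",", FiredCounters(TimeSpan.FromSeconds(3)))); // empty line: no alarm
Console.WriteLine(string.Join(",", FiredCounters(TimeSpan.FromSeconds(6)))); // 1
Console.WriteLine(string.Join(",", FiredCounters(TimeSpan.FromSeconds(8)))); // 1,2
```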

That is, there would be multiple "absence alarms" (or, more generally, actions) for one ID, depending on some conditions. I'm not sure what a good analogy would be... Perhaps a tower of stacked alert lights, where the first is green, then yellow and lastly red. The longer an ID stays absent, the more lights are lit, one after another (in this case red is the last one). Then, when the ID is detected again, all the lights are turned off.

<edit 2: After retrofitting James' code to the example as follows, and leaving the rest as written, I found that Subscribe is called immediately after the first event, on both alarm levels.

const int MaxLevels = 2;
var idAlarmStream = idStream
    .Select(i => i)
    .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
    .Subscribe(i =>
    {
        Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
    });

Let's see what's happening here, and whether MaxLevels could be provided dynamically...
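One way to avoid hard-coding MaxLevels is to derive it, together with the threshold selector, from a single array. A sketch, assuming the selector shape from James' answer, Func<int, int, TimeSpan>, and the hypothetical five, seven and ten second thresholds from the first edit:

```csharp
using System;

// Hypothetical thresholds, ordered from level 1 (shortest) upwards.
TimeSpan[] levelThresholds =
{
    TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(7), TimeSpan.FromSeconds(10)
};

// The level count follows from the array instead of a MaxLevels constant.
int maxLevels = levelThresholds.Length;

// Same shape as thresholdSelector in the answer's tests: (key, level) => threshold,
// returning TimeSpan.MaxValue for levels outside 1..maxLevels.
Func<int, int, TimeSpan> thresholdSelector = (key, level) =>
    level >= 1 && level <= levelThresholds.Length
        ? levelThresholds[level - 1]
        : TimeSpan.MaxValue;

Console.WriteLine(maxLevels);                                    // 3
Console.WriteLine(thresholdSelector(1, 2));                      // 00:00:07
Console.WriteLine(thresholdSelector(1, 4) == TimeSpan.MaxValue); // True
```

These would then be passed as AlarmSystem(keySelector, thresholdSelector, maxLevels, TaskPoolScheduler.Default). Note that AlarmSystem reads the level count once, while building the pipeline, so levels cannot be added to a running query; the selector, by contrast, is invoked each time a new group is opened.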

<edit 3: James' code works. The problem was between the chair and the keyboard! Changing the thresholds to something more sensible certainly helped. In fact, I had already changed them to bigger figures, but they were still created with TimeSpan.FromTicks, and that escaped me for a few minutes.
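For reference, a TimeSpan tick is 100 nanoseconds. With TestScheduler, ticks are the unit of the virtual clock, so the thresholds in the unit tests below are meaningful there; on a real scheduler such as TaskPoolScheduler.Default, the same TimeSpan.FromTicks(300) is only 30 microseconds, which would explain why the alarms seemed to fire straight away. A small check of the arithmetic:

```csharp
using System;

// A TimeSpan tick is 100 nanoseconds, i.e. 10,000 ticks per millisecond.
TimeSpan fromTicks = TimeSpan.FromTicks(300);               // as in the unit tests
TimeSpan fromMilliseconds = TimeSpan.FromMilliseconds(300);

Console.WriteLine(TimeSpan.TicksPerMillisecond);             // 10000
Console.WriteLine(fromTicks);                                // 00:00:00.0000300 (30 microseconds)
Console.WriteLine(fromMilliseconds);                         // 00:00:00.3000000
Console.WriteLine(fromMilliseconds.Ticks / fromTicks.Ticks); // 10000
```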

Answer 1:

I think this works; I'll try to add a fuller explanation later. Each alarm level has its own threshold (per signal group), and the thresholds are expected to increase with the level.

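The basic idea is to thread one combined stream through all levels. It starts as a "zero" level containing the signals themselves; each level reads only those zero-level signals, groups them by key, throttles each group with that level's threshold, and merges its own alarms back into the stream passed on to the next level. The zero-level signals are filtered out before the alarm stream is returned. Note that the TKey keys need value equality (correct Equals and GetHashCode), since GroupByUntil groups on them.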

I'm sure there's room for simplification!
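To make the level cascade described above concrete, here is a plain C# model of what AlarmSystem computes under a virtual clock, with no Rx involved. It is a sketch under stated assumptions, not part of the answer's code: signals are sorted by time, each signal is its own key (as with keySelector = i => i in the tests), the source never completes (so the last signal of each key always alarms), and a signal arriving exactly at the threshold is treated as in time. Cascade is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Model of AlarmSystem: level 0 = raw signals; each level adds alarms computed
// from the level-0 items only, then everything is merged and level 0 dropped.
static List<(long Time, int Signal, int Level)> Cascade(
    (long Time, int Signal)[] signals, Func<int, int, long> threshold, int levels)
{
    var stream = signals.Select(s => (s.Time, s.Signal, Level: 0)).ToList();
    for (int level = 1; level <= levels; level++)
    {
        var alarms = new List<(long Time, int Signal, int Level)>();
        foreach (var group in stream.Where(x => x.Level == 0).GroupBy(x => x.Signal))
        {
            long gap = threshold(group.Key, level);
            var times = group.Select(x => x.Time).ToList();
            for (int k = 0; k < times.Count; k++)
            {
                // Alarm if no later signal of this key arrives within the threshold.
                bool quiet = k == times.Count - 1 || times[k + 1] - times[k] > gap;
                if (quiet) alarms.Add((times[k] + gap, group.Key, level));
            }
        }
        stream.AddRange(alarms); // corresponds to .Merge(alarmSignals)
    }
    // Drop the zero level, as AlarmSystem does, and order by time.
    return stream.Where(x => x.Level != 0).OrderBy(x => x.Time).ToList();
}

var input = new (long, int)[] { (200, 1), (200, 2), (400, 1), (420, 2), (800, 1), (1000, 1), (1200, 1) };
foreach (var a in Cascade(input, (key, level) => level == 1 ? 300 : 800, 2))
    Console.WriteLine($"{a.Time}: signal {a.Signal}, level {a.Level}");
```

Run on the input of MultipleKeyMultipleSignalMultipleLevelTest below, it yields the same five alarms that test expects: 700 (1, 1), 720 (2, 1), 1220 (2, 2), 1500 (1, 1) and 2000 (1, 2).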

Sample unit tests:

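// NuGet packages assumed: System.Reactive, Microsoft.Reactive.Testing and NUnit.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class AlarmTests : ReactiveTest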
{
    [Test]
    public void MultipleKeyMultipleSignalMultipleLevelTest()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(800);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(200, 2),
            OnNext(400, 1),
            OnNext(420, 2),
            OnNext(800, 1),
            OnNext(1000, 1),
            OnNext(1200, 1));

        Func<int, int> keySelector = i => i;
        Func<int, int, TimeSpan> thresholdSelector = (key, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(700, new Alarm<int>(1, 1)),
            OnNext(720, new Alarm<int>(2, 1)),
            OnNext(1220, new Alarm<int>(2, 2)),
            OnNext(1500, new Alarm<int>(1, 1)),
            OnNext(2000, new Alarm<int>(1, 2)));
    }

    [Test]
    public void CheckAlarmIsSuppressed()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(500);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(400, 1),
            OnNext(600, 1));

        Func<int, int> keySelector = i => i;

        Func<int, int, TimeSpan> thresholdSelector = (signal, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(900, new Alarm<int>(1, 1)),
            OnNext(1100, new Alarm<int>(1, 2)));
    }
}



public static class ObservableExtensions
{
    /// <summary>
    /// Create an alarm system that detects signal gaps of length
    /// determined by a signal key and signals alarms of increasing severity.
    /// </summary>
    /// <typeparam name="TSignal">Type of the signal</typeparam>
    /// <typeparam name="TKey">Type of the signal key used for grouping, must implement Equals and GetHashCode correctly</typeparam>
    /// <param name="signals">Input signal stream</param>
    /// <param name="keySelector">Function to select a key from a signal for grouping</param>
    /// <param name="thresholdSelector">Function to select a threshold for a given signal key and alarm level.
    /// Should return TimeSpan.MaxValue for levels above the highest level</param>
    /// <param name="levels">Number of alarm levels</param>
    /// <param name="scheduler">Scheduler used for throttling</param>
    /// <returns>A stream of alarms each of which contains the signal and alarm level</returns>
    public static IObservable<Alarm<TSignal>> AlarmSystem<TSignal, TKey>(
        this IObservable<TSignal> signals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int levels,
        IScheduler scheduler)
    {
        var alarmSignals = signals.Select(signal => new Alarm<TSignal>(signal, 0))
                                  .Publish()
                                  .RefCount();

        for (int i = 0; i < levels; i++)
        {
            alarmSignals = alarmSignals.CreateAlarmSystemLevel(
                keySelector, thresholdSelector, i + 1, scheduler);
        }

        return alarmSignals.Where(alarm => alarm.Level != 0);

    }

    private static IObservable<Alarm<TSignal>> CreateAlarmSystemLevel<TSignal, TKey>(
        this IObservable<Alarm<TSignal>> alarmSignals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int level,
        IScheduler scheduler)
    {
        return alarmSignals
            .Where(alarmSignal => alarmSignal.Level == 0)
            .Select(alarmSignal => alarmSignal.Signal)
            .GroupByUntil(
                keySelector,
                grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
            .SelectMany(grp => grp.TakeLast(1).Select(signal => new Alarm<TSignal>(signal, level)))
            .Merge(alarmSignals);
    }
}

public class Alarm<TSignal> : IEquatable<Alarm<TSignal>>
{
    public Alarm(TSignal signal, int level)
    {
        Signal = signal;
        Level = level;
    }

    public TSignal Signal { get; private set; }
    public int Level { get; private set; }

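    private static bool Equals(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        // Check reference equality first so that two nulls compare equal (null == null).
        if (ReferenceEquals(x, y))
            return true;
        if (ReferenceEquals(x, null) || ReferenceEquals(y, null))
            return false;

        // EqualityComparer handles a null Signal without throwing.
        return System.Collections.Generic.EqualityComparer<TSignal>.Default.Equals(x.Signal, y.Signal)
            && x.Level.Equals(y.Level);
    }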

    // Equality implementation added to help with testing.
    public override bool Equals(object other)
    {
        return Equals(this, other as Alarm<TSignal>);
    }

    public override string ToString()
    {
        return string.Format("Signal: {0} Level: {1}", Signal, Level);
    }

    public bool Equals(Alarm<TSignal> other)
    {
        return Equals(this, other);
    }

    public static bool operator ==(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return Equals(x, y);
    }

    public static bool operator !=(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return !Equals(x, y);
    }

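    public override int GetHashCode()
    {
        // EqualityComparer returns 0 for a null Signal instead of throwing.
        return (System.Collections.Generic.EqualityComparer<TSignal>.Default.GetHashCode(Signal) * 37)
            ^ (Level.GetHashCode() * 329);
    }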
}