Rx how to group by a key a complex object and late

Posted 2020-07-17 15:52

Question:

This is related to my other question here. James World presented a solution as follows:

// idStream is an IObservable<int> of the input stream of IDs
// alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID
var idAlarmStream = idStream
.GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key)))
.SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));

<edit 2:

Question: How do I start the timers immediately, without waiting for the first events to arrive? That's the root problem in my question, I guess. To that end, I planned on sending off dummy objects with the IDs I know should be there. But as I describe below, I ended up with some other problems. Nevertheless, I think solving that too would be interesting.

On to the other interesting parts, then! Now, suppose I'd like to project each ID into a complex object and group by its key as follows (this won't compile):

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
    .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));

then I get into trouble. I'm unable to modify the SelectMany, Concat and Observable.Return part so that the query works as before. For instance, if I write the query as

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
    .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key.First())))
    .Subscribe(i => Console.WriteLine(i.Id + "-" + i.IsTest));

then two events are needed before any output can be observed in the Subscribe. It's the effect of the call to First, I gather. Furthermore, I would like to use the complex object's attributes in the call to alarmInterval too.

Can someone explain what's going on, and perhaps even offer a solution? The problem with using the unmodified solution is that the grouping doesn't look at the Ids alone for the key value, but also at the IsTest field.

<edit: As a note, the problem could probably be solved by first creating an explicit class or struct that implements a custom IEquatable, and then using James' code as-is, so that grouping would happen by IDs alone. It feels like a hack, though.

Answer 1:

Also, if you want to count the number of times you've seen an item before the alarm goes off, you can do it like this, taking advantage of the overload of Select that also passes each element's index.

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
    // The indexed Select overload passes the element first and its zero-based index second.
    .SelectMany(grp => grp.Select((alarm, count) => new { count, alarm }).TakeLast(1));

Note that the count will be 0 for the first (seed) item, which is probably what you want anyway.
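One way to produce such a seed item is to prepend the known IDs to the stream. The following is only a minimal sketch under stated assumptions, not a tested solution: it assumes the System.Reactive package, and the Subject<int> source, the knownIds array and the alarmInterval values are all made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        // Hypothetical stand-ins for the real ID stream and interval lookup.
        var idStream = new Subject<int>();
        Func<int, TimeSpan> alarmInterval = id => TimeSpan.FromMilliseconds(200 * id);
        int[] knownIds = { 1, 2 }; // IDs assumed to be known up front

        using var subscription = idStream
            .StartWith(knownIds) // seed items open a group (and start its timer) on subscribe
            .Select(i => new { Id = i, IsTest = true })
            .GroupByUntil(x => x.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
            // Indexed Select: element first, zero-based index second.
            // For a seeded group the seed has index 0, so the last index
            // equals the number of real events seen before the alarm.
            .SelectMany(grp => grp.Select((alarm, count) => new { count, alarm }).TakeLast(1))
            .Subscribe(x => Console.WriteLine(
                "Alarm for id " + x.alarm.Id + ", real events seen: " + x.count));

        idStream.OnNext(1);  // a real event for id 1 resets that group's timer
        Thread.Sleep(1000);  // keep the process alive long enough for the timers to fire
    }
}
```

Once a group's alarm has fired the group ends, so a later group for the same ID is not seeded and again starts only when a real event arrives.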



Answer 2:

You are creating an anonymous type in your Select; let's call it A1. I will assume your idStream is an IObservable<int>. Since the Id is the Key in the GroupByUntil, you do not need to worry about key comparison: int equality is fine.

The GroupByUntil returns an IObservable<IGroupedObservable<int, A1>>.

The SelectMany as written is trying to be an IObservable<A1>. You need just Concat(Observable.Return(grp.Key)) here, but the type of the Key and the type of the group elements must match or the SelectMany won't work. So the key would have to be an A1 too. Anonymous types use structural equality, and the return type would be a stream of A1, but you can't declare that as a public return type.

If you just want the Id, project the group elements with a .Select(x => x.Id) before the IgnoreElements, so that the element type matches the type of the Key:

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
    .SelectMany(grp => grp.Select(x => x.Id)
                          .IgnoreElements()
                          .Concat(Observable.Return(grp.Key)));

If you want A1 instead, you'll need to create a concrete type that implements equality.
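A minimal sketch of such a concrete type follows. The class name IdEvent is hypothetical, and comparing on Id alone (ignoring IsTest) is an assumption taken from the question's wish to group by IDs only.

```csharp
using System;

// Hypothetical concrete replacement for the anonymous type { Id, IsTest }.
// Equality and hashing deliberately look at Id only, so two instances with
// the same Id but different IsTest values count as the same key.
public sealed class IdEvent : IEquatable<IdEvent>
{
    public IdEvent(int id, bool isTest)
    {
        Id = id;
        IsTest = isTest;
    }

    public int Id { get; }
    public bool IsTest { get; }

    public bool Equals(IdEvent other) => other != null && other.Id == Id;

    public override bool Equals(object obj) => Equals(obj as IdEvent);

    public override int GetHashCode() => Id.GetHashCode();
}
```

With a type like this, the key selector could presumably be key => key again, with grp.Key.Id passed to alarmInterval, since the default equality comparer picks up the IEquatable<IdEvent> implementation.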

EDIT

I've not tested it, but you could also flatten it more simply like this, which I think is easier. It outputs A1, though, so you'll have to deal with that if you need to return the stream somewhere.

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
    .SelectMany(grp => grp.TakeLast(1));