How to combine GroupedObservables in rx.net?

2019-08-21 04:19发布

问题:

I have one observable that I use GroupBy on to get a number of streams. I actually want a Scan result over each sub-stream. Let's say the observable is over product prices and the scan result is average price per product type.

I have another stream of events pertaining to those 'products' (let's say "show product price" events), and I want to combine each event with the latest average price produced by the previous stream. In other words, the Scan output of each group needs to be combined with each element of the event stream, so that every event is paired with the latest average price for that event's product.

For some reason I cannot get the right syntax and I have been bashing away at this all day. Can someone please help?


Update

I am adding the code below to illustrate the approximate intent.

 /// <summary>
/// Collects the integer values recorded for one node. The first value is
/// supplied at construction, so the collection is never empty.
/// </summary>
public class Node
{
    // Recorded values in arrival order; element 0 is the constructor argument.
    private readonly List<int> _readings;

    /// <summary>Creates a node whose first recorded value is <paramref name="x"/>.</summary>
    public Node(int x)
    {
        _readings = new List<int> { x };
    }

    /// <summary>
    /// Grouping key: the first recorded value modulo 10
    /// (kept small to simplify grouping and debugging).
    /// </summary>
    public int Index
    {
        get { return _readings[0] % 10; }
    }

    /// <summary>The most recently recorded value.</summary>
    public int Latest
    {
        get { return _readings[_readings.Count - 1]; }
    }

    /// <summary>Appends <paramref name="x"/> to the recorded values.</summary>
    public void AddInfo(int x)
    {
        _readings.Add(x);
    }
}

/// <summary>A message whose payload is a random integer chosen at construction.</summary>
public class Message
{
    // One generator shared by all Message instances.
    private static readonly Random _random = new Random();

    /// <summary>Creates a message and assigns it the next value from the shared generator.</summary>
    public Message()
    {
        MessageNodeInfo = _random.Next();
    }

    /// <summary>The random value assigned in the constructor; only settable inside this class.</summary>
    public int MessageNodeInfo { get; private set; }
}


/// <summary>
/// Sample harness from the question: builds a timed stream of random
/// <see cref="Message"/> objects and tries to pair every message with the
/// <see cref="Node"/> accumulated so far for the same key (value % 10).
/// </summary>
public class AccumulatingInfoTest
{


    // NOTE(review): this field is not referenced anywhere in the class as shown.
    private static Random _random=new Random();

    /// <summary>
    /// Builds the input stream: a generator whose condition always returns true,
    /// whose result selector creates a new <see cref="Message"/>, and whose time
    /// selector is a constant 0.5 seconds. The result is wrapped in
    /// Publish().RefCount() before being returned.
    /// </summary>
    private IObservable<Message> MessageStream()
    {
        TimeSpan timeSpan = TimeSpan.FromSeconds(0.5);


        // State is a dummy int that stays 0; only the result and time selectors matter.
        var ret= Observable.Generate(0,
            _ => { return true; }, 
            _ => { return 0; }, 
            _ => { return new Message(); },
            _=> timeSpan)
            .Publish()
            .RefCount();



        return ret;

    }


    /// <summary>
    /// Common envelope used so that node updates and messages can flow through
    /// the same stream: one of <see cref="M"/> / <see cref="D"/> is set by each
    /// producer in <see cref="Start"/>, the other is left null.
    /// </summary>
    public class ArbitraryCommonClass
    {
        public int K { get; set; }
        public Message M { get; set; }
        public Node D { get; set; }

        /// <summary>
        /// Returns a new instance that keeps this instance's K and takes M and D
        /// from this instance when non-null, otherwise from <paramref name="a"/>.
        /// </summary>
        public ArbitraryCommonClass Combine(ArbitraryCommonClass a)
        {
            return new ArbitraryCommonClass()
            {
                K = this.K,
                M = this.M ?? a.M,
                D = this.D ?? a.D
            };
        }
    }

    /// <summary>
    /// Wires up the pipeline and subscribes to it; results are written to the console.
    /// </summary>
    public void Start()
    {

        var inputStream = MessageStream();

        // Log every raw message: its key (value % 10) and the value itself.
        // NOTE(review): the IDisposable returned by Subscribe is discarded here and below.
        inputStream.Subscribe(y => Console.WriteLine("Input: K " + y.MessageNodeInfo % 10 + " V " + y.MessageNodeInfo));


        // Stream of { Key, Observable } descriptors, one per Node.Index group.
        // Each inner observable is a seedless Scan: the lambda appends every later
        // node's Latest value to the accumulator node (mutating it) and returns the
        // same accumulator, which is then wrapped with M = null.
        var nodeInfoStream = inputStream
            .Select(nodeInfo => new Node(nodeInfo.MessageNodeInfo))
            .GroupBy(node => node.Index)
            .Select(groupedObservable => new
                        {
                            Key = groupedObservable.Key,
                            Observable = groupedObservable
                                .Scan(

                                    (nodeAcc, node) => { nodeAcc.AddInfo(node.Latest); return nodeAcc; }

                                    )
                                .Select(a => new ArbitraryCommonClass() { K = a.Index, M = (Message)null, D = a })

                        }
                    );

        // Same shape of descriptor, grouped by the same key (computed via a temporary
        // Node), but each inner element carries the Message and D = null.
        var groupedMessageStream =
            inputStream
            .GroupBy(
                    m => new Node(m.MessageNodeInfo).Index
                    )
            .Select(a => new
                        {
                            Key =a.Key,
                            Observable = a.Select(b => new ArbitraryCommonClass() { K = a.Key, M = b, D = null })

                        });



        // Merge both descriptor streams, regroup by key and, per key, fold the
        // descriptors with a seedless Scan: each step replaces the running Observable
        // with CombineLatest(running, next) whose results are merged via Combine.
        // The per-key results are merged and flattened into one stream of envelopes.
        // NOTE(review): which descriptor arrives first for a key, and whether early
        // elements are missed by the later CombineLatest subscription, depends on
        // subscription order — verify against actual output.
        var combinedStreams = nodeInfoStream
            .Merge(groupedMessageStream)
            .GroupBy(s => s.Key)
            .Select(grp => grp
                .Scan(

                    (state, next) => new { Key = state.Key, Observable = Observable.CombineLatest(state.Observable, next.Observable, (x, y) => { return x.Combine(y); }) }
                )



            )
            .Merge()
            .SelectMany(x => x.Observable.Select(a=>a));

        // Print only envelopes that carry a message.
        // NOTE(review): x.D is dereferenced without a null check; this assumes every
        // envelope with M set also has D set — TODO confirm.
        combinedStreams.Where(x=>x.M!=null).Subscribe(x => Console.WriteLine(x.K + " " + x.M.MessageNodeInfo + " " + x.D.Latest));














    }
}

回答1:

Assuming the following class:

/// <summary>A priced product belonging to a named product type.</summary>
public class Product
{
    // Products that do not specify a type fall into the "Default" type.
    private string _type = "Default";

    /// <summary>Price of this product; zero unless assigned.</summary>
    public decimal Price { get; set; }

    /// <summary>Product type used as the grouping key; starts as "Default".</summary>
    public string Type
    {
        get { return _type; }
        set { _type = value; }
    }
}

Here's a use of GroupBy with Scan that shows the average product price grouped by type. The trick is to Select over the grouped observable to get at the individual groups, apply whatever per-group operators you need (here, a Scan), and then merge the results back together. You could collapse the Select and the Merge into a single SelectMany, but it can be easier to read when they are separated:

// Inputs: products arrive on productSubject; each printSignal tick prints the current averages.
var productSubject = new Subject<Product>();
var printSignal = new Subject<Unit>();

// Running average per product type, folded into a single dictionary keyed by type.
var latestAverages = productSubject
    .GroupBy(product => product.Type)
    .Select(group => group
        // Per group, carry the item count and the running total so the average can be derived on every item.
        .Scan((Count: 0, Total: 0.0m), (acc, product) => (acc.Count + 1, acc.Total + product.Price))
        .Select(acc => (Key: group.Key, Average: acc.Total / acc.Count)))
    .Merge()
    // Cache the most recent average of every group seen so far.
    .Scan(
        ImmutableDictionary<string, decimal>.Empty,
        (cache, entry) => cache.SetItem(entry.Key, entry.Average));

// On every print signal, take the latest dictionary and write one line per product type.
printSignal
    .WithLatestFrom(latestAverages, (_, averages) => averages)
    .Subscribe(averages =>
    {
        foreach (var pair in averages)
        {
            Console.WriteLine($"ProductType: {pair.Key}. Average: {pair.Value}");
        }

        Console.WriteLine();
    });

// Seed data: three products of the default type and three of type "Alternate".
var productsList = new List<Product>
{
    new Product { Price = 1.00m },
    new Product { Price = 2.00m },
    new Product { Price = 3.00m },

    new Product { Price = 2.00m, Type = "Alternate" },
    new Product { Price = 4.00m, Type = "Alternate" },
    new Product { Price = 6.00m, Type = "Alternate" },
};

foreach (var seedProduct in productsList)
{
    productSubject.OnNext(seedProduct);
}

// Print, push one more product per type, and print after each push.
printSignal.OnNext(Unit.Default);
productSubject.OnNext(new Product { Price = 4.0m });
printSignal.OnNext(Unit.Default);
productSubject.OnNext(new Product { Price = 8.0m, Type = "Alternate" });
printSignal.OnNext(Unit.Default);

This uses the NuGet package System.Collections.Immutable (for ImmutableDictionary).