Combining latest from an observable of observables

2019-05-18 21:27发布

问题:

Suppose I have a set of URIs that I am monitoring for availability. Each URI is either "up" or "down", and new URIs to monitor may be added to the system at any time:

public enum ConnectionStatus
{
    Up,
    Down
}

public class WebsiteStatus
{
    public string Uri
    {
        get;
        set;
    }

    public ConnectionStatus Status
    {
        get;
        set;
    }
}

public class Program
{
    static void Main(string[] args)
    {
        var statusStream = new Subject<WebsiteStatus>();
        Test(statusStream);

        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static void Test(IObservable<WebsiteStatus> statusStream)
    {
    }
}

Now suppose in Test() I want to reactively ascertain:

  • whether all URIs are down (as a bool)
  • which URIs are down (as IEnumerable<string>)

So Test would end up creating an observable like IObservable<Tuple<bool, IEnumerable<string>>> where the bool indicates whether all URIs are down and the IEnumerable<string> contains those URIs that are.

How do I go about this? My initial thinking is that I would need to group by the URI, then combine the latest from each group into a list that I could then perform a Select against. However, this did not work out due to the way CombineLatest works.

EDIT: Thanks to Matthew's answer I looked into rxx and found that it implemented a CombineLatest overload in exactly the fashion I would have expected in rx out of the box, except that I needed to change it so that it publishes even when there is only a single source stream being combined (by default it was waiting for a minimum of two source streams). Also, I can't justify pulling in an extra 2MB of binaries for the sake of one method, so I have copy/pasted it into my project. Doing so, I was able to solve as follows:

private static void Test(IObservable<WebsiteStatus> statusStream)
{
    statusStream
        .GroupBy(x => x.Uri)
        .CombineLatest()
        .Select(
            x =>
            {
                var down = x.Where(y => y.Status == ConnectionStatus.Down);
                var downCount = down.Count();
                var downUris = down.Select(y => y.Uri).ToList();

                return new
                {
                    AllDown = x.Count == downCount,
                    DownUris = downUris
                };
            })
        .Subscribe(x =>
        {
            Console.WriteLine("    Sources down ({0}): {1}", x.AllDown ? "that's all of them" : "some are still up", x.DownUris.Aggregate("", (y, z) => y += (z + " | ")));
        });
}

回答1:

The neatest way is to use the Rxx extension in this answer. An alternative is below, it just keeps a list of sites that are down/up.

var downStream = statusStream
    .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (down, newStatus) =>
    {
        if (newStatus.IsUp)
            return down.Where(uri => uri != newStatus.Uri);
        else if (!down.Contains(newStatus.Uri))
            return down.Concat(new string[] { newStatus.Uri });
        else
            return down;
    });

var upStream = statusStream
    .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (up, newStatus) =>
    {
        if (!newStatus.IsUp)
            return up.Where(uri => uri != newStatus.Uri);
        else if (!up.Contains(newStatus.Uri))
            return down.Concat(new string[] { newStatus.Uri });
        else
            return up;
    });

var allDown = upStream.Select(up => !up.Any());