Sorting buffered Observables

Posted 2019-05-06 10:18

I've got a stream of tokens that are produced very quickly and a processor that is relatively slow. The tokens are of three sub-types, and I would prefer them to be processed by their priority. So I'd like the tokens to be buffered after they've been produced, while they wait to be processed, and have that buffer sorted by priority.

Here are my classes:

public enum Priority
{
    High   = 3,
    Medium = 2,
    Low    = 1
}

public class Base : IComparable<Base>
{
    public int Id { get; set; }

    public int CompareTo(Base other)
    {
        return Id.CompareTo(other.Id);
    }
}

public class Foo : Base { }
public class Bar : Base { }
public class Baz : Base { }

public class Token : IComparable<Token>
{
    private readonly string _toString;

    public Foo Foo { get; }

    public Bar Bar { get; }

    public Baz Baz { get; }

    public Priority Priority =>
        Baz == null
            ? Bar == null
                ? Priority.High
                : Priority.Medium
            : Priority.Low;

    public int CompareTo(Token other)
    {
        if (Priority > other.Priority)
        {
            return -1;
        }

        if (Priority < other.Priority)
        {
            return 1;
        }

        switch (Priority)
        {
            case Priority.High:
                return Foo.CompareTo(other.Foo);
            case Priority.Medium:
                return Bar.CompareTo(other.Bar);
            case Priority.Low:
                return Baz.CompareTo(other.Baz);
            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    public override string ToString()
    {
        return _toString;
    }

    public Token(Foo foo)
    {
        _toString = $"{nameof(Foo)}:{foo.Id}";
        Foo = foo;
    }

    public Token(Foo foo, Bar bar) : this(foo)
    {
        _toString += $":{nameof(Bar)}:{bar.Id}";
        Bar = bar;
    }

    public Token(Foo foo, Baz baz) : this(foo)
    {
        _toString += $":{nameof(Baz)}:{baz.Id}";
        Baz = baz;
    }
}

And here is my producer code:

var random = new Random();
var bazId = 0;
var barId = 0;

var fooTokens = (from id in Observable.Interval(TimeSpan.FromSeconds(1))
                                      .Select(Convert.ToInt32)
                                      .Take(3)
                 select new Token(new Foo { Id = id }))
                .Publish();

var barTokens = (from fooToken in fooTokens
                 from id in Observable.Range(0, random.Next(5, 10))
                                      .Select(_ => Interlocked.Increment(ref barId))
                 select new Token(fooToken.Foo, new Bar { Id = id }))
                .Publish();

var bazTokens = (from barToken in barTokens
                 from id in Observable.Range(0, random.Next(1, 5))
                                      .Select(_ => Interlocked.Increment(ref bazId))
                 select new Token(barToken.Foo, new Baz { Id = id }))
                .Publish();

var tokens = bazTokens.Merge(barTokens)
                      .Merge(fooTokens)
                      .Do(dt =>
                      {
                          Console.ForegroundColor = ConsoleColor.Red;
                          Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
                      });

// Subscription

bazTokens.Connect();
barTokens.Connect();
fooTokens.Connect();

However, I'm a bit stuck as to how to buffer and sort the tokens. If I do this, the tokens appear to be produced and consumed at the same time, which suggests that there's some buffering going on behind the scenes, but I can't control it.

tokens.Subscribe(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

If I use a TPL Dataflow ActionBlock, I can see the tokens being produced and processed correctly, but I'm still not sure how to do the sorting.

var proc = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

tokens.Subscribe(dt => proc.Post(dt));

Any ideas or pointers where to go next would be appreciated!

Update:

I got something to work. I added a helper to clean up the code for displaying the test data:

private static void Display(Token dt, ConsoleColor col, int? wait = null)
{
    if (wait.HasValue)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(wait.Value));
    }
    Console.ForegroundColor = col;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}

I added a SortedSet:

var set = new SortedSet<Token>();

var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

tokens.Subscribe(dt => set.Add(dt));
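One thing worth knowing about this approach: SortedSet<T> uses the comparison both to order items and to decide equality, so any two tokens whose CompareTo returns 0 are treated as the same item and the second Add is silently ignored. In the producer above the Foo, Bar, and Baz ids are unique, so it shouldn't bite here. A minimal sketch, assuming a (Priority, Id) value tuple as a hypothetical stand-in for Token and a comparer that mirrors Token.CompareTo:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Token: (Priority, Id), with the question's values High = 3 ... Low = 1.
// Ordered like Token.CompareTo: higher priority first, then ascending Id.
var byPriorityThenId = Comparer<(int Priority, int Id)>.Create((a, b) =>
    a.Priority != b.Priority
        ? b.Priority.CompareTo(a.Priority)
        : a.Id.CompareTo(b.Id));

var set = new SortedSet<(int Priority, int Id)>(byPriorityThenId);
set.Add((1, 7));                        // Low
set.Add((3, 2));                        // High
set.Add((2, 5));                        // Medium
var duplicateAdded = set.Add((2, 5));   // compares equal (0) to an existing item

Console.WriteLine(set.Min);             // (3, 2): the highest priority is the "smallest" element
Console.WriteLine(duplicateAdded);      // False: the set keeps only one of the two
Console.WriteLine(set.Count);           // 3
```

Because the highest priority sorts first, a consumer can always take set.Min rather than enumerating the set.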

And I also added a consumer, although I'm not a fan of my implementation:

var source = new CancellationTokenSource();

Task.Run(() =>
{
    while (!source.IsCancellationRequested)
    {
        var dt = set.FirstOrDefault();
        if (dt == null)
        {
            continue;
        }

        if (set.Remove(dt))
        {
            Display(dt, ConsoleColor.Green, 250);
        }
    }
}, source.Token);

So now I'm getting exactly the results I'm looking for, but (a) I'm not happy with the polling while loop, and (b) if I want multiple consumers, I'm going to run into race conditions. So I'm still looking for better implementations if anyone has one!
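One BCL-only way to address both (a) and (b) is to keep the SortedSet but guard it with a single lock and pair it with a SemaphoreSlim whose count tracks the number of queued items: consumers block on the semaphore instead of spinning, and each read-and-remove happens under the lock. (As written above, the Rx subscription and the consumer task already touch the set from different threads without a lock.) This is a sketch under stated assumptions: Enqueue and TakeHighest are hypothetical helper names, int stands in for Token, and the Rx subscription would call Enqueue instead of set.Add.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helpers: a SortedSet guarded by one lock, plus a semaphore whose
// count always equals the number of items in the set.
var set = new SortedSet<int>();   // int stands in for Token; Min = highest priority
var gate = new object();
var available = new SemaphoreSlim(0);

void Enqueue(int item)
{
    lock (gate)
    {
        if (!set.Add(item))
        {
            return;               // an equal item is already queued; don't count it twice
        }
    }
    available.Release();          // wake one waiting consumer
}

int TakeHighest(CancellationToken ct)
{
    available.Wait(ct);           // blocks without spinning until an item exists
    lock (gate)
    {
        var first = set.Min;      // read and remove under the same lock, so
        set.Remove(first);        // two consumers can never take the same item
        return first;
    }
}

// Single consumer: items come out in sorted order regardless of arrival order.
foreach (var n in new[] { 5, 1, 3 })
{
    Enqueue(n);
}
var taken = new List<int>
{
    TakeHighest(CancellationToken.None),
    TakeHighest(CancellationToken.None),
    TakeHighest(CancellationToken.None),
};
Console.WriteLine(string.Join(",", taken));   // 1,3,5

// Several consumers: each item is handed to exactly one of them.
var results = new ConcurrentBag<int>();
var consumers = new Task[3];
for (var i = 0; i < consumers.Length; i++)
{
    consumers[i] = Task.Run(() =>
    {
        for (var k = 0; k < 10; k++)
        {
            results.Add(TakeHighest(CancellationToken.None));
        }
    });
}
for (var n = 100; n < 130; n++)
{
    Enqueue(n);
}
Task.WaitAll(consumers);
Console.WriteLine(results.Count);             // 30
```

Passing the CancellationTokenSource's token to TakeHighest lets a blocked consumer exit on shutdown (Wait throws OperationCanceledException).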

4 answers
我想做一个坏孩纸
Reply #2 · 2019-05-06 10:31

Using Dataflow, you can filter the tokens so that each priority level goes down a different path in your pipeline. The filtering is done with a predicate on each priority-specific link. Then it's up to you how you give preference based on priority.

Sorting:

var highPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var midPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var lowPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var proc = new BufferBlock<Token>();

proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);

tokens.Subscribe(dt => proc.Post(dt));

One way to give preference to higher-priority items would be to allow them more than the default sequential processing (MaxDegreeOfParallelism = 1). You can do that by setting MaxDegreeOfParallelism in the ExecutionDataflowBlockOptions passed to each priority block.

Giving Preference:

var highPriOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var highPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, highPriOptions);

var midPriOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };
var midPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, midPriOptions);

var lowPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var proc = new BufferBlock<Token>();

proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);

tokens.Subscribe(dt => proc.Post(dt));

These samples are by no means complete but should at least give you the idea.

来,给爷笑一个
Reply #3 · 2019-05-06 10:34

Okay, so I used a normal lock for accessing the SortedSet, then increased the number of consumers, and it seems to be working fine. Although I haven't been able to come up with a pure Rx or a split Rx/TPL Dataflow solution, this now does what I want, so I'll just show the changes I made in addition to the update in the original question and leave it there.

var set = new SortedSet<Token>();
var locker = new object();

var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

tokens.Subscribe(dt =>
{
    lock (locker)
    {
        set.Add(dt);
    }
});

for (var i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Run(() =>
    {
        while (!source.IsCancellationRequested)
        {
            Token dt = null;

            // Read and remove the first (highest-priority) token under one lock,
            // so two consumers can never pick up the same token.
            lock (locker)
            {
                if (set.Count > 0)
                {
                    dt = set.Min;
                    set.Remove(dt);
                }
            }

            if (dt == null)
            {
                continue;
            }

            Display(dt, ConsoleColor.Green, 750);
        }
    }, source.Token);
}

Thank you to the people who posted solutions, I appreciate the time you spent.

我命由我不由天
Reply #4 · 2019-05-06 10:35

I think the conundrum here is that what you seem to be really after is the results of a pull model, based on fast, hot, push sources. What you seem to want is the "highest" priority yet received, but the question is "received by what?" If you had multiple subscribers, operating at different paces, they could each have their own view of what "highest" was.

So the way I see it is that you want to merge the sources into a kind of reactive, prioritized (sorted) queue, from which you pull results when the observer is ready.

I approached that by using a signal back to the Buffer, saying "my one observer is now ready to see the state of the prioritized list". This is achieved by using the Buffer overload that takes in an observable closing signal. That buffer contains the new list of elements received, which I just merge into the last list, sans 'highest'.

The code is just a quick demo knocked up for the purposes of this question, so there are probably bugs:

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace RxTests
{
    class Program
    {
        static void Main(string[] args)
        {
            var p = new Program();
            p.TestPrioritisedBuffer();
            Console.ReadKey();
        }

        void TestPrioritisedBuffer()
        {
            // Fast source: 0, 1, 2, ... once a second.
            var source1 = Observable.Interval(TimeSpan.FromSeconds(1))
                .Do(v => Console.WriteLine("Source1:" + v));

            // Slow source: 0, 100, 200, ... every five seconds (the "high priority" values).
            var source2 = Observable.Interval(TimeSpan.FromSeconds(5))
                .Scan((x, y) => x + 100)
                .Do(v => Console.WriteLine("Source2:" + v));

            // The observer pushes to this subject when it is ready for the next item.
            // BehaviorSubject emits on subscription, so the first buffer closes immediately.
            var closingSelector = new BehaviorSubject<bool>(true);

            var m = Observable.Merge(source1, source2)
                .Buffer(closingSelector)   // everything received since the last "ready" signal
                .Select(s => new { list = s.ToList(), max = (long)0 })
                .Scan((x, y) =>
                {
                    // Merge the new batch into the leftovers; Concat (not Union) keeps duplicate values.
                    var list = x.list.Concat(y.list).OrderBy(k => k).ToList();
                    var max = list.LastOrDefault();   // 0 if nothing is waiting

                    return new
                    {
                        list = list.Take(list.Count - 1).ToList(),
                        max = max
                    };
                })
                .Do(sorted => Console.WriteLine("Sorted max:" + sorted.max + ".  Priority queue length:" + sorted.list.Count))
                .ObserveOn(Scheduler.Default);   // observe on another thread

            m.Subscribe(v =>
            {
                Console.WriteLine("Observed: " + v.max);
                Thread.Sleep(3000);              // simulate a slow consumer
                closingSelector.OnNext(true);    // signal: ready for the next item
            });
        }
    }
}
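The accumulator inside Scan is the heart of this answer, and it can be pulled out and checked without Rx. A sketch of the same idea, where Step is a hypothetical name: it merges the newly buffered batch into the leftovers, returns the highest value, and reports "nothing waiting" as null rather than the 0 that LastOrDefault produces for an empty list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One Scan step: merge the newly buffered batch into the leftovers, hand back the
// highest value, and keep everything else for the next step.
(List<long> Remaining, long? Highest) Step(List<long> leftovers, IList<long> batch)
{
    // Concat rather than Union, so equal values arriving twice are both kept.
    var merged = leftovers.Concat(batch).OrderBy(v => v).ToList();
    if (merged.Count == 0)
    {
        return (merged, null);   // nothing waiting
    }
    var highest = merged[merged.Count - 1];
    merged.RemoveAt(merged.Count - 1);
    return (merged, highest);
}

var state = Step(new List<long>(), new long[] { 0, 1, 2 });
Console.WriteLine(state.Highest);           // 2
state = Step(state.Remaining, new long[] { 100, 3 });
Console.WriteLine(state.Highest);           // 100
state = Step(state.Remaining, Array.Empty<long>());
Console.WriteLine(state.Highest);           // 3
Console.WriteLine(state.Remaining.Count);   // 2
```

Each call corresponds to one "ready" signal from the observer: whatever arrived in between is folded in, and only the single highest value is handed out.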
我想做一个坏孩纸
Reply #5 · 2019-05-06 10:39

The container you want is a priority queue. Unfortunately, there is no implementation in the .NET runtime (STL/CLR for C++ has a priority_queue, but it isn't made available to other languages).

There are existing third-party containers that fill this role; you'd need to search and compare them to pick one that meets your needs.
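For readers on newer runtimes: since .NET 6, System.Collections.Generic includes PriorityQueue<TElement, TPriority>, which did not exist when this answer was written. It isn't thread-safe, so it would still need a lock like the one in the earlier answer, and it doesn't guarantee first-in-first-out order among elements of equal priority. A minimal sketch, assuming strings in place of Token and the question's priority values (High = 3, Medium = 2, Low = 1):

```csharp
using System;
using System.Collections.Generic;

// Requires .NET 6+. PriorityQueue dequeues the *smallest* priority first, so a
// reversed comparer makes High = 3 come out before Low = 1.
var queue = new PriorityQueue<string, int>(Comparer<int>.Create((a, b) => b.CompareTo(a)));

queue.Enqueue("Foo:1:Baz:3", 1);   // Low
queue.Enqueue("Foo:1", 3);         // High
queue.Enqueue("Foo:1:Bar:2", 2);   // Medium

var order = new List<string>();
while (queue.TryDequeue(out var token, out _))
{
    order.Add(token);
}

Console.WriteLine(string.Join(" | ", order));   // Foo:1 | Foo:1:Bar:2 | Foo:1:Baz:3
```

If ties should be broken by arrival order, one option is to use a (priority, sequence number) tuple as TPriority with a matching comparer.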
