I've got a stream of tokens that are produced very quickly and a processor that is relatively slow. The tokens are of three sub-types, and I would prefer them to be processed in priority order. So, I would like the tokens to be buffered after they've been produced, while they wait to be processed, and I'd like that buffer to be sorted by priority.
Here are my classes:
/// <summary>
/// Processing priority of a token. A larger numeric value means a higher priority.
/// </summary>
public enum Priority
{
    /// <summary>Lowest priority (numeric value 1).</summary>
    Low = 1,

    /// <summary>Middle priority (numeric value 2).</summary>
    Medium = 2,

    /// <summary>Highest priority (numeric value 3).</summary>
    High = 3
}
/// <summary>
/// Base type for identifiable items; instances are ordered by <see cref="Id"/>.
/// </summary>
public class Base : IComparable<Base>
{
    /// <summary>Identifier used as the sole sort key.</summary>
    public int Id { get; set; }

    /// <summary>
    /// Compares this instance with <paramref name="other"/> by <see cref="Id"/>.
    /// </summary>
    /// <param name="other">The instance to compare with; may be null.</param>
    /// <returns>
    /// A negative value if this instance sorts first, zero if the ids are equal,
    /// a positive value if it sorts last or <paramref name="other"/> is null.
    /// </returns>
    public int CompareTo(Base other)
    {
        // IComparable<T> convention: any instance is greater than null.
        if (ReferenceEquals(other, null))
        {
            return 1;
        }

        return Id.CompareTo(other.Id);
    }
}
/// <summary>A <see cref="Base"/> subtype that declares no members of its own.</summary>
public class Foo : Base
{
}

/// <summary>A <see cref="Base"/> subtype that declares no members of its own.</summary>
public class Bar : Base
{
}

/// <summary>A <see cref="Base"/> subtype that declares no members of its own.</summary>
public class Baz : Base
{
}
/// <summary>
/// A unit of work that always carries a <see cref="Foo"/> and optionally a
/// <see cref="Bar"/> or a <see cref="Baz"/>. Tokens sort by descending
/// <see cref="Priority"/>, then by the id of the part that determines that priority.
/// </summary>
public class Token : IComparable<Token>
{
    // Display text, built up by the constructors.
    private readonly string _toString;

    /// <summary>The Foo part; set by every constructor.</summary>
    public Foo Foo { get; }

    /// <summary>The Bar part, or null when the token was not built with one.</summary>
    public Bar Bar { get; }

    /// <summary>The Baz part, or null when the token was not built with one.</summary>
    public Baz Baz { get; }

    /// <summary>
    /// Low when <see cref="Baz"/> is set, otherwise Medium when <see cref="Bar"/> is set,
    /// otherwise High.
    /// </summary>
    public Priority Priority
    {
        get
        {
            if (Baz != null)
            {
                return Priority.Low;
            }

            if (Bar != null)
            {
                return Priority.Medium;
            }

            return Priority.High;
        }
    }

    /// <summary>
    /// Orders tokens so that a higher priority sorts first; tokens of equal priority
    /// are ordered by the id of the Foo (High), Bar (Medium) or Baz (Low) part.
    /// </summary>
    /// <param name="other">The token to compare with; may be null.</param>
    /// <returns>
    /// Negative if this token sorts first, zero if equivalent, positive if it sorts
    /// last or <paramref name="other"/> is null.
    /// </returns>
    public int CompareTo(Token other)
    {
        // IComparable<T> convention: any instance is greater than null.
        if (ReferenceEquals(other, null))
        {
            return 1;
        }

        if (ReferenceEquals(this, other))
        {
            return 0;
        }

        var mine = Priority;
        var theirs = other.Priority;
        if (mine != theirs)
        {
            // Descending by priority: the higher priority comes first.
            return mine > theirs ? -1 : 1;
        }

        switch (mine)
        {
            case Priority.High:
                return Foo.CompareTo(other.Foo);
            case Priority.Medium:
                return Bar.CompareTo(other.Bar);
            case Priority.Low:
                return Baz.CompareTo(other.Baz);
            default:
                // Not reachable with the three values Priority can produce.
                throw new ArgumentOutOfRangeException();
        }
    }

    /// <summary>Returns the display text assembled at construction time.</summary>
    public override string ToString()
    {
        return _toString;
    }

    /// <summary>Creates a High-priority token from a Foo.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="foo"/> is null.</exception>
    public Token(Foo foo)
    {
        if (foo == null)
        {
            throw new ArgumentNullException(nameof(foo));
        }

        _toString = $"{nameof(Foo)}:{foo.Id}";
        Foo = foo;
    }

    /// <summary>Creates a Medium-priority token from a Foo and a Bar.</summary>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public Token(Foo foo, Bar bar) : this(foo)
    {
        if (bar == null)
        {
            throw new ArgumentNullException(nameof(bar));
        }

        _toString += $":{nameof(Bar)}:{bar.Id}";
        Bar = bar;
    }

    /// <summary>Creates a Low-priority token from a Foo and a Baz.</summary>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public Token(Foo foo, Baz baz) : this(foo)
    {
        if (baz == null)
        {
            throw new ArgumentNullException(nameof(baz));
        }

        _toString += $":{nameof(Baz)}:{baz.Id}";
        Baz = baz;
    }
}
And here is my producer code:
// Shared state for the demo producer: a random fan-out size and running id counters.
var random = new Random();
var barId = 0;
var bazId = 0;

// One Foo token per second, three in total.
var fooTokens = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(Convert.ToInt32)
    .Take(3)
    .Select(id => new Token(new Foo { Id = id }))
    .Publish();

// For every Foo token, 5 to 9 Bar tokens with ids taken from barId.
var barTokens = fooTokens
    .SelectMany(
        fooToken => Observable.Range(0, random.Next(5, 10))
            .Select(_ => Interlocked.Increment(ref barId)),
        (fooToken, id) => new Token(fooToken.Foo, new Bar { Id = id }))
    .Publish();

// For every Bar token, 1 to 4 Baz tokens with ids taken from bazId.
var bazTokens = barTokens
    .SelectMany(
        barToken => Observable.Range(0, random.Next(1, 5))
            .Select(_ => Interlocked.Increment(ref bazId)),
        (barToken, id) => new Token(barToken.Foo, new Baz { Id = id }))
    .Publish();

// Single merged stream; each produced token is echoed in red with a timestamp.
var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(produced =>
    {
        Console.ForegroundColor = ConsoleColor.Red;
        var line = $"{DateTime.Now:mm:ss.fff}:{produced}";
        Console.WriteLine(line);
    });

// Subscription

// Connect the derived streams before their source: baz, then bar, then foo.
bazTokens.Connect();
barTokens.Connect();
fooTokens.Connect();
However I'm a bit stuck as to how to buffer and sort the tokens. If I do this, the tokens appear to be produced and consumed at the same time, which suggests that there's some buffering going on behind the scenes, but I can't control it.
// Slow consumer: pause 250 ms for each token, then echo it in green with a timestamp.
tokens.Subscribe(token =>
{
    var delay = TimeSpan.FromMilliseconds(250);
    Thread.Sleep(delay);

    Console.ForegroundColor = ConsoleColor.Green;
    var line = $"{DateTime.Now:mm:ss.fff}:{token}";
    Console.WriteLine(line);
});
If I use a TPL Dataflow ActionBlock, I can see the tokens being produced correctly and processed correctly, but I'm still not sure how to do the sorting.
// Dataflow block acting as the slow consumer: 250 ms per token, echoed in green.
var proc = new ActionBlock<Token>(token =>
{
    var delay = TimeSpan.FromMilliseconds(250);
    Thread.Sleep(delay);

    Console.ForegroundColor = ConsoleColor.Green;
    var line = $"{DateTime.Now:mm:ss.fff}:{token}";
    Console.WriteLine(line);
});

// Hand every produced token to the block; the result of Post is not inspected.
tokens.Subscribe(token =>
{
    proc.Post(token);
});
Any ideas or pointers where to go next would be appreciated!
Update:
I got something to work. I added a helper to clean up the code for displaying the test data:
/// <summary>
/// Writes a timestamped line for <paramref name="dt"/> to the console in the given colour,
/// optionally sleeping first.
/// </summary>
/// <param name="dt">The token to print.</param>
/// <param name="col">Console foreground colour to switch to before writing.</param>
/// <param name="wait">Milliseconds to sleep before writing; no sleep when null.</param>
private static void Display(Token dt, ConsoleColor col, int? wait = null)
{
    if (wait != null)
    {
        var pause = TimeSpan.FromMilliseconds(wait.Value);
        Thread.Sleep(pause);
    }

    Console.ForegroundColor = col;
    var line = $"{DateTime.Now:mm:ss.fff}:{dt}";
    Console.WriteLine(line);
}
I added a SortedSet:
// Priority buffer shared between the producer subscription and the consumer task.
// SortedSet<T> is not thread-safe, so every access goes through `gate`; the same
// monitor is used to wake the consumer instead of having it spin on an empty set.
var gate = new object();
var set = new SortedSet<Token>();
var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));
tokens.Subscribe(dt =>
{
    lock (gate)
    {
        // A token that compares equal to one already buffered is not added again.
        set.Add(dt);
        // Wake one waiting consumer.
        Monitor.Pulse(gate);
    }
});
// And I also added a consumer, although I'm not a fan of my implementation:
var source = new CancellationTokenSource();
Task.Run(() =>
{
    while (!source.IsCancellationRequested)
    {
        Token next;
        lock (gate)
        {
            // Block until a token arrives; the timeout lets cancellation be noticed.
            while (set.Count == 0 && !source.IsCancellationRequested)
            {
                Monitor.Wait(gate, TimeSpan.FromMilliseconds(100));
            }

            if (set.Count == 0)
            {
                // Cancelled while idle; the outer loop condition ends the task.
                continue;
            }

            // Min is the first element in sort order, i.e. the highest-priority token.
            next = set.Min;
            set.Remove(next);
        }

        // Process outside the lock so the producer is never blocked by the slow work.
        Display(next, ConsoleColor.Green, 250);
    }
}, source.Token);
So, now I'm getting exactly the results I'm looking for, but a) I'm not happy with the while-loop polling, and b) if I want multiple consumers, I'm going to run into race conditions. So, I'm still looking for a better implementation if anyone has one!