I've created a new class called Actor which processes messages passed to it. The problem I am running into is figuring out what is the most elegant way to pass related but different messages to the Actor. My first idea is to use inheritance but it seems so bloated but it is strongly types which is a definite requirement.
Have any ideas?
Example
private abstract class QueueMessage { }
private class ClearMessage : QueueMessage
{
public static readonly ClearMessage Instance = new ClearMessage();
private ClearMessage() { }
}
private class TryDequeueMessage : QueueMessage
{
public static readonly TryDequeueMessage Instance = new TryDequeueMessage();
private TryDequeueMessage() { }
}
private class EnqueueMessage : QueueMessage
{
public TValue Item { get; private set; }
private EnqueueMessage(TValue item)
{
Item = item;
}
}
Actor Class
/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);
/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
/// <summary>The default total number of threads to process messages.</summary>
private const Int32 DefaultThreadCount = 1;
/// <summary>Used to serialize access to the message queue.</summary>
private readonly Locker Locker;
/// <summary>Stores the messages until they can be processed.</summary>
private readonly System.Collections.Generic.Queue<Message> MessageQueue;
/// <summary>Signals the actor thread to process a new message.</summary>
private readonly ManualResetEvent PostEvent;
/// <summary>This tells the actor thread to stop reading from the queue.</summary>
private readonly ManualResetEvent DisposeEvent;
/// <summary>Processes the messages posted to the actor.</summary>
private readonly List<Thread> ActorThreads;
/// <summary>Initializes a new instance of the Genex.Concurrency<TRequest, TResponse> class.</summary>
public Actor() : this(DefaultThreadCount) { }
/// <summary>Initializes a new instance of the Genex.Concurrency<TRequest, TResponse> class.</summary>
/// <param name="thread_count"></param>
public Actor(Int32 thread_count)
{
if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");
Locker = new Locker();
MessageQueue = new System.Collections.Generic.Queue<Message>();
EnqueueEvent = new ManualResetEvent(true);
PostEvent = new ManualResetEvent(false);
DisposeEvent = new ManualResetEvent(true);
ActorThreads = new List<Thread>();
for (Int32 i = 0; i < thread_count; i++)
{
var thread = new Thread(ProcessMessages);
thread.IsBackground = true;
thread.Start();
ActorThreads.Add(thread);
}
}
/// <summary>Posts a message and waits for the reply.</summary>
/// <param name="value">The message to post to the actor.</param>
/// <returns>The reply from the actor.</returns>
public TReply PostWithReply(TMessage message)
{
using (var wrapper = new Message(message))
{
lock (Locker) MessageQueue.Enqueue(wrapper);
PostEvent.Set();
wrapper.Channel.CompleteEvent.WaitOne();
return wrapper.Channel.Value;
}
}
/// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
/// <param name="value">The message to post to the actor.</param>
/// <param name="callback">The callback that will be invoked once the replay is received.</param>
public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
{
if (callback == null) throw new ArgumentNullException("callback");
ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
if (DisposeEvent.WaitOne(10))
{
DisposeEvent.Reset();
PostEvent.Set();
foreach (var thread in ActorThreads)
{
thread.Join();
}
((IDisposable)PostEvent).Dispose();
((IDisposable)DisposeEvent).Dispose();
}
}
/// <summary>Processes a message posted to the actor.</summary>
/// <param name="message">The message to be processed.</param>
protected abstract void ProcessMessage(Message message);
/// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
private void ProcessMessages()
{
while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
{
var message = (Message)null;
while (true)
{
lock (Locker)
{
message = MessageQueue.Count > 0 ?
MessageQueue.Dequeue() :
null;
if (message == null)
{
PostEvent.Reset();
break;
}
}
try
{
ProcessMessage(message);
}
catch
{
}
}
}
}
/// <summary>Represents a message that is passed to an actor.</summary>
protected class Message : IDisposable
{
/// <summary>The actual value of this message.</summary>
public TMessage Value { get; private set; }
/// <summary>The channel used to give a reply to this message.</summary>
public Channel Channel { get; private set; }
/// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
/// <param name="value">The actual value of the message.</param>
public Message(TMessage value)
{
Value = value;
Channel = new Channel();
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
Channel.Dispose();
}
}
/// <summary>Represents a channel used by an actor to reply to a message.</summary>
protected class Channel : IDisposable
{
/// <summary>The value of the reply.</summary>
public TReply Value { get; private set; }
/// <summary>Signifies that the message has been replied to.</summary>
public ManualResetEvent CompleteEvent { get; private set; }
/// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
public Channel()
{
CompleteEvent = new ManualResetEvent(false);
}
/// <summary>Reply to the message received.</summary>
/// <param name="value">The value of the reply.</param>
public void Reply(TReply value)
{
Value = value;
CompleteEvent.Set();
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
((IDisposable)CompleteEvent).Dispose();
}
}
}
Steve Gilham summarized how the compiler actually handles discriminated unions. For your own code, you could consider a simplified version of that. Given the following F#:
Here's one way to emulate it in C#:
Now, in code that is given a
QueueMessage
, you can switch on theMessageType
property in lieu of pattern matching, and make sure that you access theItem
property only onEnqueueMessage
s.EDIT
Here's another alternative, based on Juliet's code. I've tried to streamline things so that it's got a more usable interface from C#, though. This is preferable to the previous version in that you can't get a
MethodNotImplemented
exception.You'd use this code like this:
In your example code, you implement
PostWithAsyncReply
in terms ofPostWithReply
. That isn't ideal, because it means that when you callPostWithAsyncReply
and the actor takes a while to handle it, there are actually two threads tied up: the one executing the actor and the one waiting for it to finish. It would be better to have the one thread executing the actor and then calling the callback in the asynchronous case. (Obviously in the synchronous case there's no avoiding the tying up of two threads).Update:
More on the above: you construct an actor with an argument telling it how many threads to run. For simplicity suppose every actor runs with one thread (actually quite a good situation because actors can then have internal state with no locking on it, as only one thread accesses it directly).
Actor A calls actor B, waiting for a response. In order to handle the request, actor B needs to call actor C. So now A and B's only threads are waiting, and C's is the only one actually giving the CPU any work to do. So much for multi-threading! But this is what you get if you wait for answers all the time.
Okay, you could increase the number of threads you start in each actor. But you'd be starting them so they could sit around doing nothing. A stack uses up a lot of memory, and context switching can be expensive.
So it's better to send messages asynchronously, with a callback mechanism so you can pick up the finished result. The problem with your implementation is that you grab another thread from the thread pool, purely to sit around and wait. So you basically apply the workaround of increasing the number of threads. You allocate a thread to the task of never running.
It would be better to implement
PostWithReply
in terms ofPostWithAsyncReply
, i.e. the opposite way round. The asynchronous version is low-level. Building on my delegate-based example (because it involves less typing of code!):So the private implementation returns a bool. The public asynchronous method accepts an action that will receive the return value; both the private implementation and the callback action are executed on the same thread.
Hopefully the need to wait synchronously is going to be the minority case. But when you need it, it could be supplied by a helper method, totally general purpose and not tied to any specific actor or message type:
So now in some other actor we can say:
The type argument to
Wait
may be unnecessary, haven't tried compiling any of this. ButWait
basically magics-up a callback for you, so you can pass it to some asynchronous method, and on the outside you just get back whatever was passed to the callback as your return value. Note that the lambda you pass toWait
still actually executes on the same thread that calledWait
.We now return you to our regular program...
As for the actual problem you asked about, you send a message to an actor to get it to do something. Delegates are helpful here. They let you effectively get the compiler to generate you a class with some data, a constructor that you don't even have to call explicitly and also a method. If you're having to write a bunch of little classes, switch to delegates.
Now a specific derived actor follows this pattern:
So to send a message to the actor, you just call the public methods. The private
Impl
method does the real work. No need to write a bunch of message classes by hand.Obviously I've left out the stuff about replying, but that can all be done with more parameters. (See update above).
If you have this
in F#, then the C# equivalent of the CLR generated for class
Either<'a, 'b>
has inner types likeeach with a tag, a getter and a factory method
plus a discriminator
and a raft of methods to implement interfaces
IStructuralEquatable, IComparable, IStructuralComparable
There is a compile-time checked discriminated union type at Discriminated union in C#
Using the discriminated union could be done as follows:
A long shot, but anyway..
I am assuming that discriminated-union is F# for ADT (Abstract Data Type). Which means the type could be one of several things.
In case there are two, you could try and put it in a simple generic class with two type parameters:
To make it work for 3 or more, we need to replicate this class a number of times. Any one has a solution for function overloading depending on the runtime type?
Union types and pattern matching map pretty directly to the visitor pattern, I've posted about this a few times before:
So if you want to pass messages with lots of different types, you're stuck implementing the visitor pattern.
(Warning, untested code ahead, but should give you an idea of how its done)
Let's say we have something like this:
You end up with this bulky C# code:
Now whenever you want to do something with the message, you need to implement a visitor:
Then finally you can use it like this:
Yes, its as obtuse as it seems, but that's how you pass multiple messages a class in a type-safe manner, and that's also why we don't implement unions in C# ;)