I have a mass transit consumer service set up to work with RabbitMQ and I can't figure out how to increase the speed of the consumer - it seems to hard cap at 10 messages received per second.
I have tried the steps listed here: https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ, with no success - setting the prefetch and the concurrent consumers to 25 does nothing other than increasing the acknowledged messages, but it doesn't increase the rate at which the messages are downloaded.
My config is as follows:
ServiceBusFactory.ConfigureDefaultSettings(x =>
{
x.SetConcurrentReceiverLimit(25);
x.SetConcurrentConsumerLimit(25);
});
_bus = ServiceBusFactory.New(
sbc =>
{
sbc.UseRabbitMq(x =>
x.ConfigureHost(
"rabbitmq://localhost/Dev/consume?prefetch=25",
y =>
{
y.SetUsername(config.Username);
y.SetPassword(config.Password);
}));
sbc.UseLog4Net();
sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25");
sbc.Subscribe(x => RegisterConsumers(x, container));
sbc.UseJsonSerializer();
sbc.SetConcurrentConsumerLimit(25);
});
I'm setting the concurrent consumer limit in two places as I'm not sure whether I need to set it on the default or in the bus configuration, and the consumers are registered via unity - I have omitted the consumer subscription as all subscribers are receiving.
I'm a little confused as to whether there's anything else I need to set or if I need to change the order in which I'm setting the configs.
Any help greatly appreciated.
After spending a romantic evening with the problem and trying out different things suggested by Chris, I've found out that there is yet another thing you have to do to make it work like it should.
Specifically, yes, you need to set the prefetch on the consumer queue address:
sbc.UseRabbitMq(
f =>
f.ConfigureHost(
new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
c =>
{
} )
);
int pf = 20; // prefetch
// set consumer prefetch (required!)
sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );
But this is still not enough.
The key is available in the code of the mtstress
tool Chris mention in his comment below his answer. It turns out the tool calls:
int _t, _ct;
ThreadPool.GetMinThreads( out _t, out _ct );
ThreadPool.SetMinThreads( pf, _ct );
Adding this to my code resolves the issue. I wonder though why this is not required with MSMQ transport, though...
Update #1
After further investigation I've found a possible culprit. It's in the ServiceBusBuilderImpl
.
There is a method to raise the limit, the ConfigureThreadPool
.
The problem here is that it calls CalculateRequiredThreads
which should return the number of required threads. Unfortunately the latter returns a negative value on both my client Windows 7 and my Windows Server. Thus, the ConfigureThreadPool
effectively does nothing as the negative value is then ignored when calling ThreadPool.SetMin/MaxThreads
.
What about this negative value? It seems the CalculateRequiredThreads
calls ThreadPool.GetMinThreads
and ThreadPool.GetAvailableThreads
and uses a formula to came up with the number of required threads:
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);
The problem here is that on my machines this effectively does:
40 (my limit) + 8 (workerThreads) - 1023 (availableThreads)
which of course returns
-975
The conclusion is: the above code from the Mass Transit internals seems to be wrong. When I manually raise the limit in advance, the ConfigureMinThreads
respects it (as it sets the limit only if it is higher than the read value).
Without setting the limit manually in advance, the limit fails to be set and thus the code does as much threads as the default thread pool limit (which seems to be 8 on my machine).
Apparently someone assumed this formula will yield
40 + 8 - 8
in a default scenario. Why GetMinThreads
and GetAvailableThreads
return such unrelated values is yet to be determined...
Update #2
Changing
static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}
to
static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}
resolves the issue. Both return 1023 here and the output of the formula is a correct number of expected threads.
What amount of work is being performed by your consumer? If it runs fast enough, it's likely that the .NET runtime need not create additional threads to handle the inbound message rate.
We have many systems in production that use specified counts where we match the consumer limit with the prefetch count, and in all of those cases under load, the unacknowledged message count shown by RabbitMQ is equal to those settings. We typically see nearly the same number of threads processing messages. Initially the .NET runtime is conservative in the allocated threads used, but it quickly ramps up to the full thread count when consumers are simply waiting on a remote operation such as an HTTP request or SQL command.
If there is an area of the consumer that is single threaded, it might be limiting thread scaling based on that bottleneck, so verify that your threading model is properly configured as well.