I have a MassTransit consumer service set up to work with RabbitMQ, and I can't figure out how to increase the consumer's throughput: it seems to hard-cap at 10 messages received per second.
I have tried the steps listed here: https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ, with no success - setting the prefetch and the concurrent consumers to 25 does nothing other than increasing the acknowledged messages, but it doesn't increase the rate at which the messages are downloaded.
My config is as follows:
ServiceBusFactory.ConfigureDefaultSettings(x =>
{
    x.SetConcurrentReceiverLimit(25);
    x.SetConcurrentConsumerLimit(25);
});
_bus = ServiceBusFactory.New(
    sbc =>
    {
        sbc.UseRabbitMq(x =>
            x.ConfigureHost(
                "rabbitmq://localhost/Dev/consume?prefetch=25",
                y =>
                {
                    y.SetUsername(config.Username);
                    y.SetPassword(config.Password);
                }));
        sbc.UseLog4Net();
        sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25");
        sbc.Subscribe(x => RegisterConsumers(x, container));
        sbc.UseJsonSerializer();
        sbc.SetConcurrentConsumerLimit(25);
    });
I'm setting the concurrent consumer limit in two places because I'm not sure whether it belongs in the default settings or in the bus configuration. The consumers are registered via Unity; I have omitted the consumer subscription code, as all subscribers are receiving.
I'm a little confused as to whether there's anything else I need to set or if I need to change the order in which I'm setting the configs.
Any help greatly appreciated.
What amount of work is being performed by your consumer? If it runs fast enough, it's likely that the .NET runtime need not create additional threads to handle the inbound message rate.
We have many systems in production that use specified counts where we match the consumer limit with the prefetch count, and in all of those cases under load, the unacknowledged message count shown by RabbitMQ is equal to those settings. We typically see nearly the same number of threads processing messages. Initially the .NET runtime is conservative in the allocated threads used, but it quickly ramps up to the full thread count when consumers are simply waiting on a remote operation such as an HTTP request or SQL command.
If there is an area of the consumer that is single-threaded, it might be limiting thread scaling based on that bottleneck, so verify that your threading model is properly configured as well.
After spending a romantic evening with the problem and trying out different things suggested by Chris, I've found out that there is yet another thing you have to do to make it work like it should.
Specifically, yes, you need to set the prefetch on the consumer queue address:
But this is still not enough.
The key is available in the code of the mtstress tool Chris mentions in the comment below his answer. It turns out the tool calls:
Adding this to my code resolves the issue. I wonder, though, why this is not required with the MSMQ transport...
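A minimal sketch of raising the thread pool minimum by hand before the bus is created, assuming this is the kind of adjustment meant here. It uses only the standard ThreadPool.GetMinThreads and ThreadPool.SetMinThreads calls; the class name, the method name and the value 25 (taken from the consumer limit in the question) are illustrative and are not copied from mtstress.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of MassTransit or mtstress.
public static class ThreadPoolWarmup
{
    // Raises the worker-thread minimum to at least desiredWorkers and
    // leaves the I/O completion port minimum unchanged.
    // Returns false if the runtime rejects the new value
    // (for example, when it exceeds the pool maximum).
    public static bool EnsureMinWorkerThreads(int desiredWorkers)
    {
        int minWorkers, minIo;
        ThreadPool.GetMinThreads(out minWorkers, out minIo);

        if (minWorkers >= desiredWorkers)
        {
            return true; // already high enough, nothing to change
        }

        return ThreadPool.SetMinThreads(desiredWorkers, minIo);
    }
}
```

Calling ThreadPoolWarmup.EnsureMinWorkerThreads(25) once at startup, before ServiceBusFactory.New, would make the pool start that many workers without its usual ramp-up delay. Only the worker minimum is raised, so the I/O completion port setting stays at whatever the runtime chose.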
Update #1
After further investigation I've found a possible culprit. It's in ServiceBusBuilderImpl. There is a method to raise the limit, ConfigureThreadPool. The problem here is that it calls CalculateRequiredThreads, which should return the number of required threads. Unfortunately the latter returns a negative value on both my client Windows 7 and my Windows Server. Thus, ConfigureThreadPool effectively does nothing, as the negative value is then ignored when calling ThreadPool.SetMin/MaxThreads.
What about this negative value? It seems CalculateRequiredThreads calls ThreadPool.GetMinThreads and ThreadPool.GetAvailableThreads and uses a formula to come up with the number of required threads:
The problem here is that on my machines this effectively does:
which of course returns
The conclusion is: the above code from the MassTransit internals seems to be wrong. When I manually raise the limit in advance, ConfigureMinThreads respects it (as it sets the limit only if it is higher than the read value). Without setting the limit manually in advance, the limit fails to be set, and thus the code uses only as many threads as the default thread pool limit (which seems to be 8 on my machine).
Apparently someone assumed this formula will yield
in a default scenario. Why GetMinThreads and GetAvailableThreads return such unrelated values is yet to be determined...
Update #2
Changing
to
resolves the issue. Both return 1023 here and the output of the formula is a correct number of expected threads.
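To make the arithmetic concrete, here is a rough sketch of how a formula of this kind can go negative. The formula shape (consumer limit plus the difference between a pool baseline and the available threads) is an assumption for illustration, not the actual MassTransit source, and so is the idea that the fix reads the pool maximum instead of the minimum. The values 8 and 1023 are the ones reported above; 25 is the consumer limit from the question.

```csharp
using System;
using System.Threading;

// Hypothetical illustration, not the MassTransit implementation.
public static class RequiredThreadsSketch
{
    // Assumed formula: the consumer limit plus the threads already in use,
    // where "in use" is estimated as baseline minus available.
    public static int Calculate(int consumerLimit, int baseline, int available)
    {
        return consumerLimit + (baseline - available);
    }

    // Prints both variants using the live thread pool values of this process.
    public static void PrintLive(int consumerLimit)
    {
        int minWorkers, maxWorkers, availableWorkers, io;
        ThreadPool.GetMinThreads(out minWorkers, out io);
        ThreadPool.GetMaxThreads(out maxWorkers, out io);
        ThreadPool.GetAvailableThreads(out availableWorkers, out io);

        Console.WriteLine("min-based: " + Calculate(consumerLimit, minWorkers, availableWorkers));
        Console.WriteLine("max-based: " + Calculate(consumerLimit, maxWorkers, availableWorkers));
    }
}
```

With the numbers from this answer, Calculate(25, 8, 1023) gives -990, while Calculate(25, 1023, 1023) gives 25. The available count is measured against the pool maximum, so subtracting it from the minimum compares two unrelated quantities, which would explain the negative result.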