RabbitMQ work queue is blocking consumers

2019-08-27 21:35发布

问题:

I'm following the example here

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

only in Erlang with amqp_client

The full code is here

https://github.com/jhw/rabbit_worker

As I understand it, to get worker- queue style behaviour you need to define your RabbitMQ topology as follows -

  • 'direct' exchange
  • single queue to which multiple consumers (workers) bind
  • routing key equal to queue name
  • workers to ack' each message
  • pre- fetch count of 1

Now I have this working fine, and can see the exchange passing messages round- robin fashion to the workers

The only problem is that there's no parallelism; only one worker is called at a time; it's as if a request to one worker is blocking the exchange from sending messages to any of the others

So I think I probably have my RabbitMQ topology set up incorrectly; but the problem is I don't know where.

Any thoughts ?

TIA.

Core code below -

-module(pool_router).

-behaviour(gen_server).

-include_lib("amqp_client/include/amqp_client.hrl").

%% API.
-export([start_link/0]).
-export([subscribe/1]).
-export([unsubscribe/1]).
-export([publish/2]).
-export([acknowledge/1]).

%% gen_server.
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).

-record(state, {rabbit_conn, rabbit_chan, queues}).

-define(RABBIT_USERNAME, <<"guest">>).
-define(RABBIT_PASSWORD, <<"Hufton123">>).

-define(EXCHANGE_NAME, <<"worker_exchange">>).
-define(EXCHANGE_TYPE, <<"direct">>).

-define(QUEUE_NAMES, [<<"worker_queue">>]).

%% API.

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% [stak_worker_sup:spawn_worker() || _ <- lists:seq(1, 3)].
%% [pool_router:publish(<<"worker_queue">>, {<<"fibonacci">>, <<"fibonacci">>, [40]}) || _ <- lists:seq(1, 9)].

publish(QueueName, MFA) ->
    gen_server:call(?MODULE, {publish, {QueueName, MFA}}).

acknowledge(Tag) ->
    gen_server:call(?MODULE, {acknowledge, Tag}).

subscribe(QueueName) ->
    gen_server:call(?MODULE, {subscribe, QueueName}).

unsubscribe(Tag) ->
    gen_server:call(?MODULE, {unsubscribe, Tag}).


%% gen_server.

init([]) ->
    RabbitParams=#amqp_params_network{username=?RABBIT_USERNAME,
                      password=?RABBIT_PASSWORD},    
    {ok, RabbitConn}=amqp_connection:start(RabbitParams),
    {amqp_channel, {ok, RabbitChan}}={amqp_channel, 
                      amqp_connection:open_channel(RabbitConn)},
    Exchange=#'exchange.declare'{exchange=?EXCHANGE_NAME,
                 type=?EXCHANGE_TYPE,
                 auto_delete=false}, 
    #'exchange.declare_ok'{}=amqp_channel:call(RabbitChan, Exchange),
    InitQueueFn=fun(QueueName) ->
            Queue=#'queue.declare'{queue=QueueName},
            #'queue.declare_ok'{}=amqp_channel:call(RabbitChan, Queue), 
            Binding=#'queue.bind'{queue=Queue#'queue.declare'.queue,
                          exchange=?EXCHANGE_NAME,
                          routing_key=QueueName}, 
            #'queue.bind_ok'{}=amqp_channel:call(RabbitChan, Binding),
            Queue
        end,
    Queues=[{QueueName, InitQueueFn(QueueName)} || QueueName <- ?QUEUE_NAMES],
    #'basic.qos_ok'{}=amqp_channel:call(RabbitChan, #'basic.qos'{prefetch_count=1}),
    io:format("router started~n"),     
    {ok, #state{rabbit_conn=RabbitConn,
        rabbit_chan=RabbitChan,
        queues=Queues}}.

handle_call({publish, {QueueName, {Mod, Fn, Args}=MFA}}, {_From, _}, #state{rabbit_chan=RabbitChan}=State) ->
    io:format("Publishing ~p~n", [MFA]),
    Payload=jsx:encode([{<<"M">>, Mod},
            {<<"F">>, Fn},
            {<<"A">>, Args}]),
    Publish=#'basic.publish'{exchange=?EXCHANGE_NAME,
                 routing_key=QueueName},
    ok=amqp_channel:cast(RabbitChan, Publish, #amqp_msg{payload=Payload}),
    {reply, ok, State};
handle_call({acknowledge, Tag}, {From, _}, #state{rabbit_chan=RabbitChan}=State) ->
    ok=amqp_channel:cast(RabbitChan, #'basic.ack'{delivery_tag=Tag}),
    io:format("~p [~p] acknowledged~n", [From, Tag]),
    {reply, ok, State};
handle_call({subscribe, QueueName}, {From, _}, #state{queues=Queues, rabbit_chan=RabbitChan}=State) ->
    io:format("~p subscribed~n", [From]),
    {_, Queue}=proplists:lookup(QueueName, Queues), %% NB no error checking
    #'basic.consume_ok'{consumer_tag=Tag}=amqp_channel:subscribe(RabbitChan, #'basic.consume'{queue=Queue#'queue.declare'.queue}, From),
    {reply, {ok, Tag}, State};
handle_call({unsubscribe, Tag}, {From, _}, #state{rabbit_chan=RabbitChan}=State) ->
    io:format("~p [~p] unsubscribed~n", [From, Tag]),
    #'basic.cancel_ok'{}=amqp_channel:call(RabbitChan, #'basic.cancel'{consumer_tag=Tag}),
    {reply, ok, State};
handle_call(_Request, _From, State) ->
    {reply, ignored, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    ok=amqp_channel:close(State#state.rabbit_chan),
    ok=amqp_connection:close(State#state.rabbit_conn),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% internal functions

-module(stak_worker).

-behaviour(gen_server).

-include_lib("amqp_client/include/amqp_client.hrl").

%% API.

-export([start_link/0]).
-export([stop/1]).

%% gen_server.

-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).

-record(state, {rabbit_tag}).

-define(QUEUE_NAME, <<"worker_queue">>).

%% API.

%% {ok, Pid}=stak_worker:start_link().
%% stak_worker:stop(Pid).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

stop(Pid) ->
    gen_server:cast(Pid, stop).

%% gen_server.

%% don't run request automatically
%% workers should subscribe to router on startup and unsubscribe on termination
%% router then routes messages to workers

init([]) ->
    io:format("starting worker ~p~n", [self()]),    
    {ok, #state{}, 0}.

handle_call(_Request, _From, State) ->
    {reply, ignored, State}.

handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({#'basic.deliver'{delivery_tag=Tag}, #amqp_msg{payload=Payload}=_Content}, State) ->
    %% io:format("~p received ~p~n", [self(), Payload]),
    Struct=jsx:decode(Payload),
    {_, ModBin}=proplists:lookup(<<"M">>, Struct),
    {_, FnBin}=proplists:lookup(<<"F">>, Struct),
    {_, Args}=proplists:lookup(<<"A">>, Struct),
    Mod=list_to_atom(binary_to_list(ModBin)),
    Fn=list_to_atom(binary_to_list(FnBin)),
    Mod:Fn(Args),
    ok=pool_router:acknowledge(Tag),
    {noreply, State};
handle_info(timeout, State) ->
    %% io:format("~p subscribing~n", [self()]),
    {ok, RabbitTag}=pool_router:subscribe(?QUEUE_NAME),
    {noreply, State#state{rabbit_tag=RabbitTag}};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    io:format("~p shutting down~n", [self()]),
    ok=pool_router:unsubscribe(State#state.rabbit_tag),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

回答1:

Avoid use of prefetch_count; it limits the number of un- ack'ed messages on a single channel to one; so if have a worker pool, thus will cause it to operate sequentially rather than in parallel (because every request sent to a worker is only ack'ed once the worker has completed; ie you have multiple un- ack'ed messages at a given point in time)