Non-blocking TCP server using OTP principles

2019-03-16 04:53发布

问题:

I'm starting to learn Erlang, so I'm trying to write the "hello, world!" of concurrent programming, an IRC bot.

I've already written one using Erlang without any OTP niceties (supervisor, application, etc. behaviours). I'm looking to rewrite it using OTP principles but unfortunately I can't figure out the "right" way to do socket programming with OTP.

It seems the only reasonable way is to create another process manually and link it to the supervisor, but surely someone, somewhere, has done this before.

回答1:

I think this is what you're looking for: http://www.trapexit.org/Building_a_Non-blocking_TCP_server_using_OTP_principles — it's a full tutorial about how to build a non-blocking TCP server using OTP (and, of course, it is fully documented and explained).



回答2:

Great that you've begun learning Erlang/OTP!

The following resources are very useful:

  • The OTP Design Principles. Read this carefully, if you haven't already. Note the common misconception that OTP is object-oriented (OO): it's not! Forget everything about "inheritance". It's not possible to build complete systems merely by "extending" standard modules.
  • The Messaging System:

    These functions must be used to implement the use of system messages for a process

  • The Special Processes. A special process is an OTP-compliant process that can integrate well with supervisors.

This is some code I have in my project. I am an Erlang learner too, so don't trust the code too much, please.

%% gen_tcpserver: a hand-written TCP acceptor process. It is started through
%% proc_lib and answers `{system, From, Msg}' requests via sys, using the
%% system_* callbacks exported below.
-module(gen_tcpserver).

%% Public API
-export([start_link/2]).

%% start_link reference
%% (init/2 is exported only because proc_lib:start_link/3 calls it by name.)
-export([init/2]).

%% System internal API
%% (callbacks invoked by sys:handle_system_msg/6 with ?MODULE as the module)
-export([system_continue/3, system_terminate/4, system_code_change/4]).

%% Timeout passed to gen_tcp:accept/2 in loop/3. Keeping it short lets the
%% loop return regularly so flush/3 can look for pending system messages.
-define(ACCEPT_TIMEOUT, 250).

%% Loop state:
%%   socket - listen socket; undefined until init/2 has opened it
%%   args   - the ListenArgs given to start_link/2, read as {Port, Options}
%%   func   - the AcceptFun given to start_link/2, called with each accepted socket
-record(server_state, {socket=undefined,
                       args,
                       func}).

%% Starts the acceptor process, linked to the caller, via proc_lib.
%%
%% ListenArgs - the {Port, Options} pair that init/2 passes on to
%%              gen_tcp:listen/2.
%% AcceptFun  - called as AcceptFun(Socket) for every accepted connection; it
%%              must return ok, and the accept loop is blocked until it does.
%%
%% Returns the result of proc_lib:start_link/3.
start_link(ListenArgs, AcceptFun) ->
    proc_lib:start_link(
        ?MODULE,
        init,
        [self(), #server_state{func = AcceptFun, args = ListenArgs}]
    ).

%% Body of the process spawned by start_link/2.
%%
%% Opens the listen socket described by the `args' field of State, tells
%% Parent that start-up succeeded, and then enters the accept loop with an
%% empty set of debug options. A failed listen crashes on the match, before
%% any acknowledgement has been sent.
init(Parent, State) ->
    {Port, Options} = State#server_state.args,
    {ok, LSock} = gen_tcp:listen(Port, Options),
    proc_lib:init_ack(Parent, {ok, self()}),
    loop(Parent, sys:debug_options([]), State#server_state{socket = LSock}).

%% One iteration of the accept loop.
%%
%% Waits up to ?ACCEPT_TIMEOUT for a connection on the listen socket:
%%   * {ok, Socket}     - reports an {accepted, Socket} debug event, then runs
%%                        the accept fun, which must return ok.
%%   * {error, timeout} - nothing to do this round.
%%   * {error, closed}  - reports a {closed} debug event and exits normally.
%% Any other accept error is unmatched and crashes the process.
%%
%% Control then passes to flush/3 so pending system messages are served.
%%
%% sys:handle_debug/4 is called unconditionally: with an empty debug list it
%% does nothing, so no guard on Debug is needed. Its return value is carried
%% forward because debug options such as `log' keep their state in it.
loop(Parent, Debug, State) ->
    NewDebug =
        case gen_tcp:accept(State#server_state.socket, ?ACCEPT_TIMEOUT) of
            {ok, Socket} ->
                Debug1 = sys:handle_debug(
                    Debug, fun print_event/3, undefined, {accepted, Socket}
                ),
                ok = (State#server_state.func)(Socket),
                Debug1;
            {error, timeout} ->
                Debug;
            {error, closed} ->
                %% Report the event while debugging is enabled; the process
                %% is about to exit, so the updated debug state is not kept.
                sys:handle_debug(Debug, fun print_event/3, undefined, {closed}),
                exit(normal)
        end,
    flush(Parent, NewDebug, State).

%% Polls the mailbox, without waiting, for one system request.
%%
%% A `{system, From, Msg}' message is handed to sys:handle_system_msg/6, which
%% takes over from here and comes back through the system_* callbacks of this
%% module. If no such message is queued, the accept loop resumes immediately.
%% Messages of any other shape are not matched here and stay in the mailbox.
flush(Parent, Debug, State) ->
    receive
        {system, Caller, Request} ->
            sys:handle_system_msg(Request, Caller, Parent, ?MODULE, Debug, State)
    after 0 ->
        loop(Parent, Debug, State)
    end.

%% Formatting fun passed to sys:handle_debug/4 by loop/3.
%% Writes the event term on one line to the given I/O device; the third
%% argument is ignored.
print_event(IoDevice, TcpEvent, _) ->
    io:format(IoDevice, "*DBG* TCP event = ~p~n", [TcpEvent]).

%% sys callback: a system request has been served, so go back to accepting
%% connections with the (possibly updated) debug options.
system_continue(Parent, DebugOpts, LoopState) ->
    loop(Parent, DebugOpts, LoopState).

%% sys callback: the process has been asked to terminate.
%% Closes the listen socket held in the state, then exits with Reason.
system_terminate(Reason, _Parent, _Debug, State) ->
    ListenSocket = State#server_state.socket,
    gen_tcp:close(ListenSocket),
    exit(Reason).

%% sys callback for code upgrades. The loop state needs no conversion, so it
%% is handed back unchanged.
system_code_change(LoopState, _Mod, _FromVsn, _Extra) ->
    {ok, LoopState}.

Note that this is a compliant OTP process (it can be managed by a supervisor). Because AcceptFun blocks the accept loop, you should use it to spawn a new worker child and return right away (= faster). I have not yet tested it thoroughly, though.

1> {ok, A} = gen_tcpserver:start_link({8080,[]},fun(Socket)->gen_tcp:close(Socket) end).
{ok,<0.93.0>}
2> sys:trace(A, true).
ok
*DBG* TCP event = {accepted,#Port<0.2102>}
*DBG* TCP event = {accepted,#Port<0.2103>}
3> 

(After 2>'s ok I pointed my Google Chrome browser to port 8080: a great test for TCP!)



回答3:

Another way to implement an asynchronous TCP listener is by using supervisor_bridge.

Here is some code that I wrote to show this (not tested):

%% connection_bridge: a supervisor_bridge callback module that opens a TCP
%% listen socket and runs a separate accept process on it.
-module(connection_bridge).

-behaviour(supervisor_bridge).

% supervisor_bridge export
-export([init/1, terminate/2]).

% internal proc_lib:start_link
% (accept_init/3 is exported only so proc_lib:start_link/4 can call it by name)
-export([accept_init/3]).

%% supervisor_bridge callback: opens the listen socket and starts the
%% accept process.
%%
%% Argument is {Port, Options, ConnectionHandler}:
%%   Port, Options     - passed unchanged to gen_tcp:listen/2.
%%   ConnectionHandler - {Module, Function, Arguments} or a fun of arity 0;
%%                       either must return the pid that will receive the
%%                       TCP messages of a newly accepted connection.
%%
%% Returns {ok, AcceptorPid, ListenSocket} on success, with the listen socket
%% kept as the bridge state. Any other result of gen_tcp:listen/2 is returned
%% as is.
init({Port, Options, ConnectionHandler}) ->
    start_acceptor(gen_tcp:listen(Port, Options), ConnectionHandler).

%% Starts the linked accept process once listening has succeeded; passes a
%% failed listen result straight through. The acceptor has one second to
%% acknowledge its start-up.
start_acceptor({ok, LSock}, ConnectionHandler) ->
    AcceptorArgs = [self(), LSock, ConnectionHandler],
    {ok, AcceptorPid} = proc_lib:start_link(?MODULE, accept_init, AcceptorArgs, 1000),
    {ok, AcceptorPid, LSock};
start_acceptor(ListenResult, _ConnectionHandler) ->
    ListenResult.

%% supervisor_bridge callback: the bridge state is the listen socket opened
%% in init/1, so shutting down means closing it. The reason is ignored.
terminate(_Why, LSock) ->
    gen_tcp:close(LSock).

%% Body of the accept process started from init/1.
%% Acknowledges start-up to the process that called proc_lib:start_link/4,
%% then accepts connections on the listen socket until it fails.
accept_init(Starter, LSock, Handler) ->
    proc_lib:init_ack(Starter, {ok, self()}),
    accept_loop(LSock, Handler).

%% Accepts connections on ListenSocket forever.
%%
%% For each accepted connection the handler is invoked to obtain a pid:
%% either apply(Module, Function, Arguments) or a call to the arity-0 fun.
%% That pid is then made the controlling process of the client socket, so it
%% receives the socket's messages.
%%
%% Termination:
%%   * {error, closed}  - exits with {shutdown, tcp_closed}.
%%   * {error, Reason}  - raises error(Reason).
%%   * a failed hand-over of the client socket crashes on the `ok =' match.
accept_loop(ListenSocket, ConnectionHandler) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, ClientSocket} ->
            Pid =
                case ConnectionHandler of
                    {Module, Function, Arguments} ->
                        apply(Module, Function, Arguments);
                    Function when is_function(Function, 0) ->
                        Function()
                end,
            ok = gen_tcp:controlling_process(ClientSocket, Pid),
            accept_loop(ListenSocket, ConnectionHandler);
        {error, closed} ->
            %% Must be exit/1, not error/1: error/1 would wrap the term with a
            %% stack trace, so the exit reason would no longer have the
            %% {shutdown, _} shape and a closed listen socket would be
            %% reported as a crash.
            exit({shutdown, tcp_closed});
        {error, Reason} ->
            error(Reason)
    end.

A lot easier to understand than my other answer. The connection_bridge can be extended to support UDP and SCTP too.



标签: tcp erlang otp