How to implement asynchronous code that looks sync

Posted 2019-08-07 11:38

Question:

In other words, I want to rely on epoll (or similar) to write asynchronous network code that looks like regular code, that is, without relying on callbacks.

The code must look synchronous, but instead of blocking while it waits for network I/O, it must suspend the current coroutine and resume it when the file descriptor is ready.

Answer 1:

My initial idea was to rely on generators and yield. That was a mistake, partly misguided by the fact that Python used to abuse yield from.

Anyway, Guile Fibers was a great inspiration, and I adapted it to Chez Scheme.

Here is an example server code:

(define (handler request port)
  (values 200 #f (http-get "https://httpbin.davecheney.com/ip")))

(untangle (lambda ()
            (run-server "127.0.0.1" 8888 handler)))

The handler returns the server's IP address as reported by the httpbin service. The code looks synchronous thanks to call/cc, or more precisely call/1cc.

untangle starts the event loop with the lambda passed as its argument.

Here is the definition of run-server:

(define (run-server ip port handler)
  (log 'info "HTTP server running at ~a:~a" ip port)
  (let* ((sock (socket 'inet 'stream 'ipv4)))
    (socket:setsockopt sock 1 2 1) ;; re-use address
    (socket:bind sock (make-address ip port))
    (socket:listen sock 1024)
    (let loop ()
      (let ((client (accept sock)))
        (let ((port (fd->port client)))
          (spawn (lambda () (run-once handler port)))
          (loop))))))

As you can see, there is no callback. The only thing that differs from a simple synchronous web server is the spawn procedure, which handles each request in its own coroutine. In particular, accept is asynchronous.

run-once just passes the Scheme request to handler and uses the three values it returns to build the response. Not very interesting. The part that looks synchronous but is actually asynchronous is http-get above.

I will only explain how accept works, because explaining http-get would require introducing custom binary ports. Suffice it to say that it behaves the same way.

(define (accept fd)
  (let ((out (socket:%accept fd 0 0)))
    (if (= out -1)
        (let ((code (socket:errno)))
          (if (= code EWOULDBLOCK)
              (begin
                (abort-to-prompt fd 'read)
                (accept fd))
              (error 'accept (socket:strerror code))))
        out)))

As you can see, it calls abort-to-prompt, a procedure that could simply be called pause: it "stops" the coroutine and calls the prompt handler.
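
The retry pattern used by accept (try the non-blocking call, pause if it would block, try again once woken) can be isolated in a small sketch. The names call-until-ready, make-flaky-source and the would-block marker are hypothetical: they stand in for socket:%accept returning -1 with EWOULDBLOCK, and pause! stands in for (abort-to-prompt fd 'read).

```scheme
;; try:    thunk performing one non-blocking attempt; by a convention
;;         made up for this sketch it returns the symbol would-block
;;         when nothing is ready yet
;; pause!: thunk that suspends the caller until a retry is worthwhile
(define (call-until-ready try pause!)
  (let loop ()
    (let ((out (try)))
      (if (eq? out 'would-block)
          (begin
            (pause!)   ; in the answer's code: (abort-to-prompt fd 'read)
            (loop))    ; woken up, so attempt the call again
          out))))

;; A fake source that is not ready for its first n attempts.
(define (make-flaky-source n value)
  (lambda ()
    (if (> n 0)
        (begin (set! n (- n 1)) 'would-block)
        value)))

(define pauses 0)
(define result
  (call-until-ready (make-flaky-source 2 'client-fd)
                    (lambda () (set! pauses (+ pauses 1)))))
;; result is client-fd and pauses is 2
```

The caller of call-until-ready never sees the retries, which is why accept can be used as if it were a blocking call.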

abort-to-prompt works in cooperation with call-with-prompt.

Since Chez Scheme doesn't have prompts, I emulate them using two one-shot continuations created with call/1cc:

(define %prompt #f)
(define %abort (list 'abort))

(define (call-with-prompt thunk handler)
  (call-with-values (lambda ()
                      (call/1cc
                       (lambda (k)
                         (set! %prompt k)
                         (thunk))))
    (lambda out
      (cond
       ((and (pair? out) (eq? (car out) %abort))
        (apply handler (cdr out)))
       (else (apply values out))))))

(define (abort-to-prompt . args)
  (call/1cc
   (lambda (k)
     (let ((prompt %prompt))
       (set! %prompt #f)
       (apply prompt (cons %abort (cons k args)))))))

call-with-prompt captures a continuation and stores it with set! in a global called %prompt, which means there is a single prompt for THUNK. If the continuation arguments OUT, received by the second lambda of call-with-values, start with the unique object %abort, the continuation was reached via abort-to-prompt. In that case HANDLER is called with the abort-to-prompt continuation followed by any arguments passed to abort-to-prompt, that is (apply handler (cdr out)). Otherwise THUNK returned normally and its values are passed through.

abort-to-prompt captures a new continuation, so that execution can come back to the paused code later, and then invokes the prompt's continuation stored in %prompt.
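
To see the pause and resume cycle in isolation, here is a minimal sketch that can be loaded on its own. It makes two assumptions that differ from the code above, both marked in the comments. First, it uses call/cc instead of call/1cc so that it also runs outside Chez Scheme. Second, when the thunk finishes, its results are handed to whichever continuation is currently stored in %prompt instead of being returned implicitly, so a coroutine resumed from a later call-with-prompt exits through that later call. The demo procedure and its local names are hypothetical.

```scheme
(define %prompt #f)
(define %abort (list 'abort))   ; unique marker, compared with eq?

(define (call-with-prompt thunk handler)
  (call-with-values
    (lambda ()
      (call/cc                  ; ASSUMPTION: call/cc, not call/1cc
        (lambda (k)
          (set! %prompt k)
          ;; ASSUMPTION: leave through the prompt that is current when
          ;; the thunk finishes, rather than returning implicitly.
          (call-with-values thunk
            (lambda results (apply %prompt results))))))
    (lambda out
      (if (and (pair? out) (eq? (car out) %abort))
          (apply handler (cdr out))   ; paused: handler gets k and args
          (apply values out)))))      ; thunk finished normally

(define (abort-to-prompt . args)
  (call/cc
    (lambda (k)                       ; k resumes the paused code
      (let ((prompt %prompt))
        (set! %prompt #f)
        (apply prompt %abort k args)))))

(define (demo)
  (let* ((trace '())
         (note! (lambda (x) (set! trace (cons x trace))))
         (saved #f)
         (task (lambda ()
                 (note! 'start)
                 (let ((v (abort-to-prompt 7 'read)))  ; pause here
                   (note! (list 'resumed v))
                   'done)))
         ;; first run: the task pauses and the handler keeps k
         (r1 (call-with-prompt task
               (lambda (k fd mode)
                 (set! saved k)
                 (list 'paused fd mode))))
         ;; second run: resume the task under a fresh prompt
         (r2 (call-with-prompt (lambda () (saved 'ready))
               (lambda (k fd mode) (list 'paused-again fd mode)))))
    (list r1 r2 (reverse trace))))
```

Calling (demo) should return ((paused 7 read) done (start (resumed ready))): the first call-with-prompt ends in the handler, and the second one delivers the value the task returns after being resumed.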

call-with-prompt is at the heart of the event loop. Here it is, in two pieces:

(define (exec epoll thunk waiting)
  (call-with-prompt
   thunk
   (lambda (k fd mode) ;; k is abort-to-prompt continuation that
                       ;; will allow to restart the coroutine

     ;; add fd to the correct epoll set
     (case mode
       ((write) (epoll-wait-write epoll fd))
       ((read) (epoll-wait-read epoll fd))
       (else (error 'untangle "mode not supported" mode)))
     (scheme:hash-table-set! waiting fd (make-event k mode)))))

(define (event-loop-run-once epoll waiting)
  ;; execute every callback waiting in queue, 
  ;; call the above exec procedure 
  (let loop ()
    (unless (null? %queue)
      ;; XXX: This is done like that because, exec might spawn
      ;; new coroutine, so we need to cut %queue right now. 
      (let ((head (car %queue))
            (tail (cdr %queue)))
        (set! %queue tail)
        (exec epoll head waiting)
        (loop))))

  ;; wait for ONE event
  (let ((fd (epoll-wait-one epoll (inf))))
    (let ((event (scheme:hash-table-ref waiting fd)))
      ;; the event is / will be processed, no need to keep it around
      (scheme:hash-table-delete! waiting fd)
      ;; 2 is EPOLL_CTL_DEL on Linux: stop watching fd
      (case (event-mode event)
        ((write) (epoll-ctl epoll 2 fd (make-epoll-event-out fd)))
        ((read) (epoll-ctl epoll 2 fd (make-epoll-event-in fd))))
      ;; schedule the event continuation, that is the abort-to-prompt
      ;; continuation; it will be executed by the next call to
      ;; event-loop-run-once
      (spawn (event-continuation event)))))
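
The scheduling part of the loop can be tried without epoll. The following toy sketch keeps the run queue and the prompt handler but replaces "wait for a file descriptor" with "put the coroutine back at the end of the queue". The definitions of spawn and %queue are not shown above, so the ones here are assumptions, and run-toy-loop, worker and %trace are hypothetical names. As a further assumption, it uses call/cc and an explicit exit through the current %prompt, so that it runs outside Chez Scheme and a resumed coroutine always leaves through the exec that resumed it.

```scheme
(define %prompt #f)
(define %abort (list 'abort))

(define (call-with-prompt thunk handler)
  (call-with-values
    (lambda ()
      (call/cc                        ; ASSUMPTION: call/cc, not call/1cc
        (lambda (k)
          (set! %prompt k)
          ;; ASSUMPTION: finish through the current prompt
          (call-with-values thunk
            (lambda results (apply %prompt results))))))
    (lambda out
      (if (and (pair? out) (eq? (car out) %abort))
          (apply handler (cdr out))
          (apply values out)))))

(define (abort-to-prompt . args)
  (call/cc
    (lambda (k)
      (let ((prompt %prompt))
        (set! %prompt #f)
        (apply prompt %abort k args)))))

(define %queue '())                   ; FIFO of thunks ready to run

(define (spawn thunk)
  (set! %queue (append %queue (list thunk))))

(define (exec thunk)
  (call-with-prompt thunk
    (lambda (k tag)                   ; reached when the coroutine paused
      ;; a real loop would register a file descriptor with epoll here;
      ;; the toy just makes the coroutine runnable again
      (spawn (lambda () (k #f))))))

(define (run-toy-loop)
  (let loop ()
    (unless (null? %queue)
      (let ((head (car %queue)))
        (set! %queue (cdr %queue))    ; cut the queue before running head
        (exec head)
        (loop)))))

(define %trace '())

(define (worker name steps)
  (lambda ()
    (let loop ((i 1))
      (when (<= i steps)
        (set! %trace (cons (list name i) %trace))
        (abort-to-prompt 'yield)      ; pause and let the others run
        (loop (+ i 1))))))

(spawn (worker 'a 2))
(spawn (worker 'b 2))
(run-toy-loop)
;; (reverse %trace) is ((a 1) (b 1) (a 2) (b 2))
```

The two workers are written as plain loops, yet their steps interleave, which is the same effect the real loop achieves when accept or http-get pauses on a file descriptor.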

I think that is all.



Answer 2:

If you are using Chez Scheme, there is chez-a-sync. It uses POSIX poll rather than epoll (epoll is Linux-specific). guile-a-sync2 is also available for Guile 2.2/3.0.