I'm writing an application server in Clojure that will use ClojureScript on the client.
I'd like to find an efficient, idiomatic way to push data from the server to the client as realtime events, ideally using some combination of:
(But I'm open to other possibilities)
Can anyone provide a good example / approach to doing this?
I prefer Aleph (see its wiki); you can simply use the wrap-ring-handler function to wrap your existing handlers.
For the 'push' part, the most useful piece is Aleph's async handler. It is built on top of Netty rather than a one-thread-per-connection model, so the server side does not need to worry about the number of open TCP connections.
Some implementation details:
- The server side uses an async handler and holds all the client connections (channels).
- If there is no new data within, for example, 60 seconds, the server sends an empty response.
- If the server has a response, it sends it.
- The client simply sends a normal HTTP request to the server.
- When the client gets the response, it handles the response body and then sends another HTTP request.
- Check that the client and all proxy servers in between have appropriate timeout values set.
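The hold-then-respond behaviour described in the list above can be sketched in plain Clojure, assuming each waiting request is represented by a promise. The names `waiting`, `await-data` and `publish!` are hypothetical and not part of Aleph. Note that `deref` blocks the calling thread, so this only illustrates the protocol; an Aleph async handler would hold the channel instead of a thread.

```clojure
;; Hypothetical registry of pending long-poll requests: a set of promises.
(def waiting (atom #{}))

(defn await-data
  "Parks the caller until data is published or timeout-ms elapses.
  Returns a Ring-style response map; the body is empty on timeout."
  [timeout-ms]
  (let [p (promise)]
    (swap! waiting conj p)
    (try
      (let [data (deref p timeout-ms ::timeout)]
        (if (= data ::timeout)
          {:status 200 :body ""}
          {:status 200 :body data}))
      (finally
        ;; Always unregister, whether we timed out or received data.
        (swap! waiting disj p)))))

(defn publish!
  "Delivers data to every request that is currently waiting."
  [data]
  (doseq [p @waiting]
    (deliver p data)))
```

A request handler would call something like `(await-data 60000)`, and whatever produces new data would call `(publish! ...)`. The client re-issues its request as soon as either kind of response arrives.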
Other approaches are listed here: http://en.wikipedia.org/wiki/Push_technology
I've been trying out the library Chord recently, and I really like it.
It provides a small core.async wrapper around the WebSocket support in http-kit.
From the GitHub page:
On the Server
(:require [chord.http-kit :refer [with-channel]]
          [clojure.core.async :refer [<! >! put! close! go]])

(defn your-handler [req]
  (with-channel req ws-ch
    (go
      (let [{:keys [message]} (<! ws-ch)]
        (println "Message received:" message)
        (>! ws-ch "Hello client from server!")
        (close! ws-ch)))))
On the Client
(:require [chord.client :refer [ws-ch]]
          [cljs.core.async :refer [<! >! put! close!]])
(:require-macros [cljs.core.async.macros :refer [go]])

(go
  (let [ws (<! (ws-ch "ws://localhost:3000/ws"))]
    (>! ws "Hello server from client!")))
I think it's still in early stages though - it doesn't handle disconnections yet.
I'm developing a project right now where I had exactly the same requirement. I used Pedestal service in combination with core.async to implement SSE, and it's working really well.
Unfortunately, I can't open-source this work yet, but basically I did something like the snippets below. The real code is more complicated because of authentication, which is not particularly easy with SSE from the browser, since you can't pass any custom headers in your 'new EventSource(SOME_URI);' call.
So the snippets:
(ns chat-service.service
  (:require [clojure.set :as set]
            [clojure.core.async :as async :refer [<!! >!! <! >!]]
            [cheshire.core :as json]
            [io.pedestal.service.http :as bootstrap]
            [io.pedestal.service.log :as log]
            [io.pedestal.service.http.route :as route]
            [io.pedestal.service.http.sse :as sse]
            [io.pedestal.service.http.route.definition :refer [defroutes]]))

(def ^{:private true :doc "Formatting opts"} json-opts
  {:date-format "MMM dd, yyyy HH:mm:ss Z"})

(def ^{:private true :doc "Users to notification channels"} subscribers->notifications
  (atom {}))

;; private helper functions
(def ^:private generate-id #(.toString (java.util.UUID/randomUUID)))

(defn- sse-msg [event msg-data]
  {:event event :msg msg-data})

;; service functions
(defn- remove-subscriber
  "Removes transport channel from atom subscribers->notifications and tears down
  SSE connection."
  [transport-channel context]
  (let [subscriber (get (set/map-invert @subscribers->notifications) transport-channel)]
    (log/info :msg (str "Removing SSE connection for subscriber with ID : " subscriber))
    (swap! subscribers->notifications dissoc subscriber)
    (sse/end-event-stream context)))

(defn send-event
  "Sends updates via SSE connection, takes also transport channel to close it
  in case of the exception."
  [transport-channel context {:keys [event msg]}]
  (try
    (log/info :msg "calling event sending fn")
    (sse/send-event context event (json/generate-string msg json-opts))
    (catch java.io.IOException ioe
      (async/close! transport-channel))))

(defn create-transport-channel
  "Creates transport channel with receiving end pushing updates to SSE connection.
  Associates this transport channel in atom subscribers->notifications under random
  generated UUID."
  [context]
  (let [temporary-id (generate-id)
        channel (async/chan)]
    (swap! subscribers->notifications assoc temporary-id channel)
    (async/go-loop []
      ;; if-let keeps recur in tail position; a nil payload means the
      ;; channel was closed, so the subscriber is removed.
      (if-let [payload (<! channel)]
        (do
          (send-event channel context payload)
          (recur))
        (remove-subscriber channel context)))
    (async/put! channel (sse-msg "eventsourceVerification"
                                 {:handshakeToken temporary-id}))))

(defn subscribe
  "Subscribes anonymous user to SSE connection. Transport channel with timeout set up
  will be created for pushing any new data to this connection."
  [context]
  (create-transport-channel context))

(defroutes routes
  [[["/notifications/chat"
     {:get [::subscribe (sse/start-event-stream subscribe)]}]]])

(def service {:env :prod
              ::bootstrap/routes routes
              ::bootstrap/resource-path "/public"
              ::bootstrap/type :jetty
              ::bootstrap/port 8081})
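
With every subscriber's channel stored in the `subscribers->notifications` atom, pushing an event to all connected clients could look like the sketch below. `broadcast!` is a hypothetical helper, not part of Pedestal or of the original project, and the `"chatMessage"` event name in the usage note is made up. The atom and `sse-msg` are redefined here only so the snippet runs on its own; it assumes core.async is on the classpath.

```clojure
(require '[clojure.core.async :as async])

;; Stand-in for the private atom in the service namespace:
;; subscriber id -> core.async channel.
(def subscribers->notifications (atom {}))

(defn sse-msg [event msg-data]
  {:event event :msg msg-data})

(defn broadcast!
  "Hypothetical helper: queues one event for every registered subscriber.
  Returns the number of channels the event was offered to."
  [event msg-data]
  (let [channels (vals @subscribers->notifications)]
    (doseq [ch channels]
      ;; put! is asynchronous, so a slow client does not block the caller.
      (async/put! ch (sse-msg event msg-data)))
    (count channels)))
```

Calling `(broadcast! "chatMessage" {:text "hi"})` would then hand the message to each subscriber's go-loop, which forwards it over that subscriber's SSE connection.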
One "problem" I encountered is the way Pedestal handles dropped SSE connections by default.
Because of the scheduled heartbeat job, it logs an exception whenever a connection is dropped and you haven't called end-event-stream on the context.
I wish there were a way to disable or tweak this behavior, or at least to provide my own tear-down function that would be called whenever the heartbeat job fails with an EofException.