I'm currently working on a project where I had exactly the same requirement. I used the Pedestal service in combination with core.async to implement SSE, and it's working really well.
Unfortunately, I can't open-source this work right now, but basically I did something like the snippets below, only more complicated because of authentication. (It's not particularly easy to do authentication in SSE from the browser, because you can't pass any custom headers in your new EventSource(SOME_URI); call.)
So, the snippets:

(ns chat-service.service
  (:require [clojure.set :as set]
            [clojure.core.async :as async :refer [<!! >!! <! >!]]
            [cheshire.core :as json]
            [io.pedestal.service.http :as bootstrap]
            [io.pedestal.service.log :as log]
            [io.pedestal.service.http.route :as route]
            [io.pedestal.service.http.sse :as sse]
            [io.pedestal.service.http.route.definition :refer [defroutes]]))

(def ^{:private true :doc "Formatting opts"}
  json-opts
  {:date-format "MMM dd, yyyy HH:mm:ss Z"})

(def ^{:private true :doc "Users to notification channels"}
  subscribers->notifications
  (atom {}))

;; private helper functions

(def ^:private generate-id #(.toString (java.util.UUID/randomUUID)))

(defn- sse-msg [event msg-data]
  {:event event :msg msg-data})

;; service functions

(defn- remove-subscriber
  "Removes transport channel from atom subscribers->notifications and tears down
  SSE connection."
  [transport-channel context]
  (let [subscriber (get (set/map-invert @subscribers->notifications) transport-channel)]
    (log/info :msg (str "Removing SSE connection for subscriber with ID : " subscriber))
    (swap! subscribers->notifications dissoc subscriber)
    (sse/end-event-stream context)))

(defn send-event
  "Sends an update via the SSE connection. Also takes the transport channel so
  that it can be closed if sending throws an exception."
  [transport-channel context {:keys [event msg]}]
  (try
    (log/info :msg "calling event sending fn")
    (sse/send-event context event (json/generate-string msg json-opts))
    (catch java.io.IOException ioe
      ;; The client is gone: closing the channel ends the go-loop in
      ;; create-transport-channel, which then removes the subscriber.
      (async/close! transport-channel))))

(defn create-transport-channel
  "Creates a transport channel whose receiving end pushes updates to the SSE
  connection. Associates this transport channel in the atom
  subscribers->notifications under a randomly generated UUID."
  [context]
  (let [temporary-id (generate-id)
        channel (async/chan)]
    (swap! subscribers->notifications assoc temporary-id channel)
    ;; Forward payloads to the SSE connection until the channel is closed,
    ;; then clean up. if-let keeps recur in tail position.
    (async/go-loop []
      (if-let [payload (<! channel)]
        (do (send-event channel context payload)
            (recur))
        (remove-subscriber channel context)))
    ;; First message: hand the temporary ID to the client as a handshake token.
    (async/put! channel (sse-msg "eventsourceVerification"
                                 {:handshakeToken temporary-id}))))

(defn subscribe
  "Subscribes anonymous user to SSE connection. Transport channel with timeout set up
  will be created for pushing any new data to this connection."
  [context]
  (create-transport-channel context))

(defroutes routes
  [[["/notifications/chat"
     {:get [::subscribe (sse/start-event-stream subscribe)]}]]])

(def service {:env :prod
              ::bootstrap/routes routes
              ::bootstrap/resource-path "/public"
              ::bootstrap/type :jetty
              ::bootstrap/port 8081})
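
The snippets stop at sending the handshake token. As a rough sketch of how the authentication step could build on it (this is an assumption, not the code from my project): the browser receives the eventsourceVerification event, then sends the token back in an ordinary authenticated request, and the server re-keys the channel from the temporary ID to the real user ID. The names subscribers, claim-subscription and claim-subscription! are hypothetical, and subscribers is a stand-in for subscribers->notifications so the example runs on its own with clojure.core only.

```clojure
;; Hypothetical stand-in for subscribers->notifications:
;; maps a subscriber key (temporary token or user ID) to its transport channel.
(def subscribers (atom {}))

(defn claim-subscription
  "Pure function: returns m with the entry stored under token re-keyed to
  user-id. Returns m unchanged when the token is unknown."
  [m token user-id]
  (if-let [channel (get m token)]
    (-> m
        (dissoc token)
        (assoc user-id channel))
    m))

(defn claim-subscription!
  "Atomically re-keys the channel registered under token to user-id.
  Returns true when the token was known, false otherwise.
  Needs Clojure 1.9+ for swap-vals!."
  [token user-id]
  (let [[old-state _] (swap-vals! subscribers claim-subscription token user-id)]
    (contains? old-state token)))
```

A handler for the authenticated request would call something like (claim-subscription! token user-id) and reject the request when it returns false. Since remove-subscriber looks the key up by channel via map-invert, it should keep working after the entry has been re-keyed.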

One "problem" I encountered is the default way Pedestal handles dropped SSE connections.
Because of the scheduled heartbeat job, it logs an exception whenever a connection is dropped and you haven't called (sse/end-event-stream context).
I wish there were a way to disable or tweak this behavior, or at least to provide my own tear-down function that would be called whenever the heartbeat job fails with an EofException.