Skip to content

Commit

Permalink
Changes to support graphql-ws spec https://github.com/enisdenjo/graph…
Browse files Browse the repository at this point in the history
  • Loading branch information
gersak committed Nov 21, 2024
1 parent e13ee07 commit 5bf0eb1
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions src/com/walmartlabs/lacinia/pedestal/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,16 @@
;; Otherwise it's a message from the client to be acted upon.
(let [{:keys [id payload type]} data]
(case type
"ping"
(when (>! response-data-ch {:type :pong})
(recur connection-state))
"connection_init"
(when (>! response-data-ch {:type :connection_ack})
(recur (assoc connection-state :connection-params payload)))

;; TODO: Track state, don't allow start, etc. until after connection_init

"start"
("start" "subscribe")
(if (contains? (:subs connection-state) id)
(do
(log/trace :event ::ignoring-duplicate :id id)
Expand All @@ -150,7 +153,7 @@
sub-shutdown-ch (execute-query-interceptors id payload response-data-ch cleanup-ch merged-context)]
(recur (assoc-in connection-state [:subs id] sub-shutdown-ch)))))

"stop"
("stop" "complete")
(do
(log/trace :event ::stop :id id)
(when-some [sub-shutdown-ch (get-in connection-state [:subs id])]
Expand Down Expand Up @@ -247,6 +250,12 @@
:payload payload})
(close! response-data-ch)))}))

(defn ^:private protocol-response-type
[context]
(case (::subprotocol context)
("graphql-transport-ws") :next
:data))

(def send-operation-response-interceptor
"Interceptor responsible for the :response key of the context (set when a request
is either a query or mutation, but not a subscription). The :response data
Expand All @@ -257,7 +266,7 @@
:leave (fn [context]
(when-let [response (:response context)]
(let [{:keys [id response-data-ch]} (:request context)]
(put! response-data-ch {:type :data
(put! response-data-ch {:type (protocol-response-type context)
:id id
:payload response})
(put! response-data-ch {:type :complete
Expand Down Expand Up @@ -326,6 +335,7 @@
thread)
ch))


(defn ^:private execute-subscription
[context parsed-query]
(let [{:keys [::values-chan-fn request]} context
Expand Down Expand Up @@ -382,7 +392,7 @@
(resolve/on-deliver! (fn [response]
(log/trace :response response :id id)
(put! response-data-ch
{:type :data
{:type (protocol-response-type context)
:id id
:payload response})
(let [new-count (swap! *execution-count dec)]
Expand Down Expand Up @@ -535,14 +545,15 @@
: Used to create the channel of text responses sent to the client. The default is 10 (a non-lossy
channel)."
[compiled-schema options]
(let [{:keys [keep-alive-ms app-context send-buffer-or-n response-chan-fn values-chan-fn session-initializer]
(let [{:keys [keep-alive-ms app-context send-buffer-or-n response-chan-fn values-chan-fn session-initializer context-initializer]
:or {keep-alive-ms 25000
send-buffer-or-n 10
response-chan-fn #(chan 10)
values-chan-fn #(chan 1)}} options
interceptors (or (:subscription-interceptors options)
(default-subscription-interceptors compiled-schema app-context))
base-context (-> {::values-chan-fn values-chan-fn}
base-context (-> {::values-chan-fn values-chan-fn
}
(chain/terminate-when :response)
(chain/enqueue interceptors))
on-open (fn [^Session session ^EndpointConfig config]
Expand All @@ -557,7 +568,11 @@
; client text -> server
ws-text-ch (chan 1)
; client text -> client data
ws-data-ch (chan 10)]
ws-data-ch (chan 10)
base-context (as-> base-context context
(if-not (fn? context-initializer) context
(context-initializer context session))
(assoc context ::subprotocol (.getNegotiatedSubprotocol session)))]
(response-encode-loop response-data-ch send-ch)
(ws-parse-loop session-id ws-text-ch ws-data-ch response-data-ch)
(connection-loop session-id keep-alive-ms ws-data-ch response-data-ch base-context)
Expand All @@ -575,7 +590,7 @@
(log/error :event ::error :session-id session-id :exception t))]
(-> options
(select-keys [:idle-timeout-ms])
(assoc :subprotocols ["graphql-ws"]
(assoc :subprotocols ["graphql-ws" "graphql-transport-ws"]
:on-open on-open
:on-close on-close
:on-text on-text
Expand Down

0 comments on commit 5bf0eb1

Please sign in to comment.