diff --git a/.github/workflows/config.yml b/.github/workflows/config.yml index df00028..b21edbe 100644 --- a/.github/workflows/config.yml +++ b/.github/workflows/config.yml @@ -11,19 +11,19 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3.4.0 + uses: actions/checkout@v4.1.7 - name: Setup Java - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v4.2.1 with: java-version: '11' distribution: 'corretto' - name: Setup Clojure - uses: DeLaGuardo/setup-clojure@10.2 + uses: DeLaGuardo/setup-clojure@12.5 with: - cli: 1.11.1.1165 + cli: 1.11.3.1463 - name: Cache clojure dependencies - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: | ~/.m2/repository diff --git a/CHANGELOG.md b/CHANGELOG.md index fed3ed7..a5c291c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +## 1.3 -- UNRELEASED + +*BREAKING CHANGES* + +Function `com.walmartlabs.lacinia.pedestal.subscriptions/listener-fn-factory` +was renamed to `subscription-websocket-endpoint`, and its behavior and options +have changed. Fortunately, most Lacinia applications only use this function +indirectly, via `com.walmartlabs.lacinia.pedestal2/enable-subscriptions`, which +hasn't changed. + +Other changes: + +* Bumped many dependencies to latest +* Bumped Pedestal dependency to 0.7.0 + ## 1.2 -- 23 Jun 2023 Logging from the `com.walmartlabs.lacinia.pedestal.subscriptions` namespace diff --git a/deps.edn b/deps.edn index 6e342b3..c9a180d 100644 --- a/deps.edn +++ b/deps.edn @@ -1,22 +1,26 @@ -{:deps {org.clojure/clojure {:mvn/version "1.11.1"} - com.walmartlabs/lacinia {:mvn/version "1.2.1"} - com.fasterxml.jackson.core/jackson-core {:mvn/version "2.15.2"} - io.pedestal/pedestal.service {:mvn/version "0.6.0"} - io.pedestal/pedestal.jetty {:mvn/version "0.6.0"} - com.rpl/proxy-plus {:mvn/version "0.0.9"}} +{:deps {org.clojure/clojure {:mvn/version "1.11.3"} + com.walmartlabs/lacinia {:mvn/version "1.2.2"} + com.fasterxml.jackson.core/jackson-core {:mvn/version "2.17.2"} + io.pedestal/pedestal.service {:mvn/version "0.7.0"} + io.pedestal/pedestal.jetty {:mvn/version "0.7.0"} + com.rpl/proxy-plus {:mvn/version "0.0.11"}} + :mvn/repos {"redplanetlabs" {:url "https://nexus.redplanetlabs.com/repository/maven-public-releases"}} :paths ["src" "resources"] :deps/prep-lib {:alias :build :fn prep :ensure "target/classes"} :aliases {:dev - {:extra-deps {clj-http/clj-http {:mvn/version "3.12.3"} + {:extra-deps {clj-http/clj-http {:mvn/version "3.13.0"} com.walmartlabs/test-reporting {:mvn/version "1.2"} com.stuartsierra/component {:mvn/version "1.1.0"} expound/expound {:mvn/version "0.9.0"} - hato/hato {:mvn/version "0.9.0"} - io.github.hlship/trace {:mvn/version "v1.0"} - io.aviso/logging {:mvn/version "1.0"}} + hato/hato {:mvn/version "1.0.0"} + nubank/matcher-combinators {:mvn/version "3.9.1"} + io.github.hlship/trace {:mvn/version "1.3"} + org.clj-commons/pretty {:mvn/version "3.0.0"} + io.github.tonsky/clj-reload {:mvn/version "0.7.1"} + ch.qos.logback/logback-classic {:mvn/version "1.5.6"}} :jvm-opts ["-Xmx500m"] :extra-paths ["test" "dev-resources" @@ -36,7 +40,7 @@ :build {:deps - {io.github.hlship/build-tools {:git/tag "0.9" :git/sha "4efa3c9"}} + {io.github.hlship/build-tools {:git/tag "0.10.2" :git/sha "3c446e4"}} :ns-default build}} :net.lewisship.build/scm diff --git a/dev-resources/data_readers.clj b/dev-resources/data_readers.clj new file mode 100644 index 0000000..79cebe6 --- /dev/null +++ b/dev-resources/data_readers.clj @@ -0,0 +1 @@ +{trace/result net.lewisship.trace/trace-result-reader} diff --git a/dev-resources/logback.xml b/dev-resources/logback.xml index 10d264c..869292c 100644 --- a/dev-resources/logback.xml +++ b/dev-resources/logback.xml @@ -17,8 +17,9 @@ - + + diff --git a/dev-resources/user.clj b/dev-resources/user.clj index 615c867..235e3fc 100644 --- a/dev-resources/user.clj +++ b/dev-resources/user.clj @@ -3,10 +3,18 @@ com.walmartlabs.lacinia.expound [clojure.spec.alpha :as s] [net.lewisship.trace :as trace] + [clj-commons.pretty.repl :as repl] + matcher-combinators.test [expound.alpha :as expound])) (s/check-asserts true) (trace/setup-default) +(repl/install-pretty-exceptions) + (alter-var-root #'s/*explain-out* (constantly expound/printer)) + +(comment + (trace/set-enable-trace! false) + ) diff --git a/src/com/walmartlabs/lacinia/pedestal/internal.clj b/src/com/walmartlabs/lacinia/pedestal/internal.clj index 4de5779..0ec073e 100644 --- a/src/com/walmartlabs/lacinia/pedestal/internal.clj +++ b/src/com/walmartlabs/lacinia/pedestal/internal.clj @@ -28,7 +28,6 @@ [io.pedestal.log :as log] [clojure.java.io :as io] [ring.util.response :as response] - [io.pedestal.http.jetty.websockets :as ws] [io.pedestal.http :as http] [io.pedestal.interceptor.chain :as chain] [com.walmartlabs.lacinia.pedestal.subscriptions :as subscriptions] @@ -265,12 +264,5 @@ (defn add-subscriptions-support [service-map compiled-schema subscriptions-path subscription-options] - (assoc-in service-map - [::http/container-options :context-configurator] - ;; The listener-fn is responsible for creating the listener; it is passed - ;; the request, response, and the ws-map. In sample code, the ws-map - ;; has callbacks such as :on-connect and :on-text, but in our scenario - ;; the callbacks are created by the listener-fn, so the value is nil. - #(ws/add-ws-endpoints % {subscriptions-path nil} - {:listener-fn - (subscriptions/listener-fn-factory compiled-schema subscription-options)}))) + (assoc-in service-map [::http/websockets subscriptions-path] + (subscriptions/subscription-websocket-endpoint compiled-schema subscription-options))) diff --git a/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj b/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj index 7d592a8..f013e37 100644 --- a/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj +++ b/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj @@ -25,7 +25,7 @@ [cheshire.core :as cheshire] [io.pedestal.interceptor :refer [interceptor]] [io.pedestal.interceptor.chain :as chain] - [io.pedestal.http.jetty.websockets :as ws] + [io.pedestal.websocket :as ws] [io.pedestal.log :as log] [com.walmartlabs.lacinia.parser :as parser] [com.walmartlabs.lacinia.validator :as validator] @@ -34,11 +34,9 @@ [com.walmartlabs.lacinia.resolve :as resolve] [clojure.string :as str] [clojure.spec.alpha :as s] - [com.walmartlabs.lacinia.selection :as selection] [com.walmartlabs.lacinia.pedestal.spec :as spec] [com.walmartlabs.lacinia.pedestal.interceptors :as interceptors]) - (:import - (org.eclipse.jetty.websocket.api UpgradeResponse))) + (:import (jakarta.websocket EndpointConfig Session))) (when (-> *clojure-version* :minor (< 9)) (require '[clojure.future :refer [pos-int?]])) @@ -63,13 +61,13 @@ which is passed along to the output-ch. Parse errors are converted into connection_error messages sent to the response-ch." - [input-ch output-ch response-data-ch] + [session-id input-ch output-ch response-data-ch] (go-loop [] (when-some [text (! response-data-ch {:type :connection_error :payload (util/as-error-map t)})))] @@ -83,9 +81,9 @@ (let [shutdown-ch (chan) response-spy-ch (chan 1) request (assoc payload - :id id - :shutdown-ch shutdown-ch - :response-data-ch response-spy-ch)] + :id id + :shutdown-ch shutdown-ch + :response-data-ch response-spy-ch)] ;; When the spy channel is closed, we write the id ;; to the cleanup-ch; the containing CSP then removes the ;; shutdown-ch from its subs map. @@ -105,7 +103,7 @@ (defn ^:private connection-loop "A loop started for each connection." - [keep-alive-ms ws-data-ch response-data-ch context] + [session-id keep-alive-ms ws-data-ch response-data-ch context] (let [cleanup-ch (chan 1)] ;; Keep track of subscriptions by (client-supplied) unique id. ;; The value is a shutdown channel that, when closed, triggers @@ -114,25 +112,25 @@ (alt! cleanup-ch ([id] - (log/trace :event ::cleanup-ch :id id) + (log/trace :event ::cleanup-ch :session-id session-id :id id) (recur (update connection-state :subs dissoc id))) ;; TODO: Maybe only after connection_init? (async/timeout keep-alive-ms) (do - (log/trace :event ::timeout) + (log/trace :event ::timeout :session-id session-id) (>! response-data-ch {:type :ka}) (recur connection-state)) ws-data-ch ([data] (if (nil? data) - ;; When the client closes the connection, any running subscriptions need to - ;; shutdown and cleanup. + ;; When the client closes the connection, any running subscriptions need to + ;; shutdown and cleanup. (do - (log/trace :event ::client-close) + (log/trace :event ::client-close :session-id session-id) (run! close! (-> connection-state :subs vals))) - ;; Otherwise it's a message from the client to be acted upon. + ;; Otherwise it's a message from the client to be acted upon. (let [{:keys [id payload type]} data] (case type "connection_init" @@ -147,7 +145,7 @@ (log/trace :event ::ignoring-duplicate :id id) (recur connection-state)) (do - (log/trace :event ::start :id id) + (log/trace :event ::start :session-id session-id :id id) (let [merged-context (assoc context :connection-params (:connection-params connection-state)) sub-shutdown-ch (execute-query-interceptors id payload response-data-ch cleanup-ch merged-context)] (recur (assoc-in connection-state [:subs id] sub-shutdown-ch))))) @@ -163,15 +161,15 @@ (do (log/trace :event ::terminate :id id) (run! close! (-> connection-state :subs vals)) - ;; This shuts down the connection entirely. + ;; This shuts down the connection entirely. (close! response-data-ch)) ;; Not recognized! (let [response (cond-> {:type :error :payload {:message "Unrecognized message type." :type type}} - id (assoc :id id))] - (log/trace :event ::unknown-type :type type :id id) + id (assoc :id id))] + (log/trace :event ::unknown-type :type type :session-id session-id :id id) (>! response-data-ch response) (recur connection-state)))))))))) @@ -222,19 +220,19 @@ (keep fix-up-message) (str/join "; ")) ".")} - locations (assoc :locations locations)) + locations (assoc :locations locations)) ;; Apollo spec only has room for one error, so just use the first (seq errors) (cond-> (first errors) - locations (assoc :locations locations)) + locations (assoc :locations locations)) :else ;; Strip off the exception added by Pedestal and convert ;; the message into an error map (cond-> {:message (to-message t)} - locations (assoc :locations locations)))) + locations (assoc :locations locations)))) (def exception-handler-interceptor "An interceptor that implements the :error callback, to send an \"error\" message to the client." @@ -333,15 +331,13 @@ (let [{:keys [::values-chan-fn request]} context source-stream-ch (values-chan-fn) {:keys [id shutdown-ch response-data-ch]} request - ;; Subscriptions only have a single selection. - field-name (-> parsed-query :selections first selection/field selection/field-name) source-stream (fn accept-value [value] (cond (nil? value) (close! source-stream-ch) - (resolve/is-resolver-result? value) - (resolve/on-deliver! value accept-value) + (resolve/is-resolver-result? value) + (resolve/on-deliver! value accept-value) :else (put! source-stream-ch value))) @@ -355,7 +351,7 @@ cleanup-fn (executor/invoke-streamer app-context source-stream) ;; Track how many streamed values are currently executing queries *execution-count (atom 0) - ;; Track when the streamer has passed a nil to shutdown the subscription cleanly + ;; Track when the streamer has passed a nil to shut down the subscription cleanly *shutting-down? (atom false) ;; Closed when shutting down and execution count drops to 0 streamer-shutdown-ch (chan)] @@ -489,8 +485,8 @@ execute-operation-interceptor]) -(defn listener-fn-factory - "A factory for the function used to create a WS listener. +(defn subscription-websocket-endpoint + "A factory for the websocket endpoint map. This function is invoked for each new client connecting to the service. @@ -510,6 +506,10 @@ Options: + :idle-timeout-ms Sets the idle timeout on the websocket connection; if omitted, the container default is used. + + :session-initializer Passed the jakarta.websocket.Session to perform any additional initialization. + :keep-alive-ms (default: 25000) : The interval at which keep alive messages are sent to the client. Note that configuring this timeout to be at or above 30s conflicts with a default Jetty timeout @@ -522,14 +522,6 @@ : A seq of interceptors for processing queries. The default is derived from [[default-subscription-interceptors]]. - :init-context - : A function returning the base context for the subscription-interceptors to operate on. - The function takes the following arguments: - - the minimal viable context for operation - - the ServletUpgradeRequest that initiated this connection - - the ServletUpgradeResponse to the upgrade request - Defaults to returning the context unchanged. - :response-chan-fn : A function that returns a new channel. Responses to be written to client are put into this channel. The default is a non-lossy channel with a buffer size of 10. @@ -543,43 +535,55 @@ : Used to create the channel of text responses sent to the client. The default is 10 (a non-lossy channel)." [compiled-schema options] - (let [{:keys [keep-alive-ms app-context init-context send-buffer-or-n response-chan-fn values-chan-fn] + (let [{:keys [keep-alive-ms app-context send-buffer-or-n response-chan-fn values-chan-fn session-initializer] :or {keep-alive-ms 25000 send-buffer-or-n 10 response-chan-fn #(chan 10) - values-chan-fn #(chan 1) - init-context (fn [ctx & _args] ctx)}} options + values-chan-fn #(chan 1)}} options interceptors (or (:subscription-interceptors options) (default-subscription-interceptors compiled-schema app-context)) - base-context (chain/enqueue {::chain/terminators [:response] - ::values-chan-fn values-chan-fn} - interceptors)] - (log/trace :event ::configuring :keep-alive-ms keep-alive-ms) - (fn [req resp _ws-map] - (.setAcceptedSubProtocol ^UpgradeResponse resp "graphql-ws") - (log/trace :event ::upgrade-requested) - (let [response-data-ch (response-chan-fn) ; server data -> client - ws-text-ch (chan 1) ; client text -> server - ws-data-ch (chan 10) ; client text -> client data - on-close (fn [_ _] - (log/trace :event ::closed) - (close! response-data-ch) - (close! ws-data-ch)) - base-context' (init-context base-context req resp) - on-connect (fn [_session send-ch] - (log/trace :event ::connected) - (response-encode-loop response-data-ch send-ch) - (ws-parse-loop ws-text-ch ws-data-ch response-data-ch) - (connection-loop keep-alive-ms ws-data-ch response-data-ch base-context'))] - (ws/make-ws-listener - {:on-connect (ws/start-ws-connection on-connect send-buffer-or-n) - :on-text #(put! ws-text-ch %) - :on-error #(log/error :event ::error :exception %) - :on-close on-close}))))) - -(s/fdef listener-fn-factory - :args (s/cat :compiled-schema ::spec/compiled-schema - :options (s/nilable ::listener-fn-factory-options))) + base-context (-> {::values-chan-fn values-chan-fn} + (chain/terminate-when :response) + (chain/enqueue interceptors)) + on-open (fn [^Session session ^EndpointConfig config] + (let [session-id (.getId session) + _ (do + (log/trace :event ::connected :id session-id) + (when session-initializer + (session-initializer session config))) + ; server data -> client + response-data-ch (response-chan-fn) + send-ch (ws/start-ws-connection session {:send-buffer-or-n send-buffer-or-n}) + ; client text -> server + ws-text-ch (chan 1) + ; client text -> client data + ws-data-ch (chan 10)] + (response-encode-loop response-data-ch send-ch) + (ws-parse-loop session-id ws-text-ch ws-data-ch response-data-ch) + (connection-loop session-id keep-alive-ms ws-data-ch response-data-ch base-context) + {:response-data-ch response-data-ch + :ws-text-ch ws-text-ch + :ws-data-ch ws-data-ch + :session-id session-id})) + on-text (fn [{:keys [ws-text-ch]} s] + (put! ws-text-ch s)) + on-close (fn [{:keys [response-data-ch ws-data-ch session-id]} _session reason] + (log/trace :event ::closed :reason reason :session-id session-id) + (close! response-data-ch) + (close! ws-data-ch)) + on-error (fn [{:keys [session-id]} _ t] + (log/error :event ::error :session-id session-id :exception t))] + (-> options + (select-keys [:idle-timeout-ms]) + (assoc :subprotocols ["graphql-ws"] + :on-open on-open + :on-close on-close + :on-text on-text + :on-error on-error)))) + +(s/fdef subscription-websocket-endpoint + :args (s/cat :compiled-schema ::spec/compiled-schema + :options (s/nilable ::listener-fn-factory-options))) (s/def ::listener-fn-factory-options (s/keys :opt-un [::keep-alive-ms ::spec/app-context diff --git a/test/com/walmartlabs/lacinia/pedestal/subscription_interceptors_test.clj b/test/com/walmartlabs/lacinia/pedestal/subscription_interceptors_test.clj index 7cda856..a7d7bb8 100644 --- a/test/com/walmartlabs/lacinia/pedestal/subscription_interceptors_test.clj +++ b/test/com/walmartlabs/lacinia/pedestal/subscription_interceptors_test.clj @@ -17,13 +17,9 @@ [clojure.test :refer [deftest is use-fixtures]] [com.walmartlabs.lacinia.pedestal :refer [inject]] [com.walmartlabs.lacinia.test-utils - :refer [test-server-fixture *ping-subscribes *ping-cleanups - ws-uri - subscriptions-fixture - send-data send-init (s/default-subscription-interceptors schema nil) - (inject invoke-count-interceptor :before ::s/execute-operation) - (inject user-agent-interceptor :before ::s/execute-operation)) + (inject invoke-count-interceptor :before ::s/execute-operation)) - :init-context - (fn [ctx ^ServletUpgradeRequest req resp] - (reset! *invoke-count 0) - (reset! *user-agent nil) - (assoc-in ctx [:request :user-agent] (.getHeader (.getHttpServletRequest req) "User-Agent")))}) + :session-initializer + (fn [_ _] + (reset! *invoke-count 0))}) (use-fixtures :once (test-server-fixture {:subscriptions true :keep-alive-ms 200} @@ -92,7 +75,4 @@ (expect-message {:id 987 :payload {:data {:ping {:message "short #1"}}} - :type "data"}) - - (is (not-empty @*user-agent) - "The user agent was set by the init-context function.")) + :type "data"}))