diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index dc59a16..e61d953 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -1,6 +1,10 @@ name: docker -on: [push] +on: + push: + tags: + - "**" + workflow_dispatch: {} jobs: docker: diff --git a/.gitignore b/.gitignore index dd1b34e..5f50563 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ pom.xml.asc /profile.json /.lsp/ /config.edn +.rebel_readline_history diff --git a/config.example.edn b/config.example.edn index fe72d8f..9679fff 100644 --- a/config.example.edn +++ b/config.example.edn @@ -4,17 +4,24 @@ :private-key "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n" :port 443 :host "ORG_ID-ACCOUNT_ID.snowflakecomputing.com" - :scheme "https" - :role "TESTROLE"} + :scheme "https"} :channels [{:chan-name "my_channel" :database "TESTDATABASE" :schema "PUBLIC" :table "TESTTABLE" - :on-error :continue} + :on-error :continue + :mqtt {:host "emqx1.emqx.net" + :port 1883 + :clientid "chan1" + :topic "t/sf/1" + :qos 1}} {:chan-name "my_channel2" :database "TESTDATABASE" :schema "PUBLIC" :table "TESTTABLE2" - :on-error :abort}] - :server {:host "0.0.0.0" - :port 9099}} + :on-error :abort + :mqtt {:host "emqx1.emqx.net" + :port 1883 + :clientid "chan2" + :topic "t/sf/2" + :qos 1}}]} diff --git a/deps.edn b/deps.edn index f69ef6f..0452fca 100644 --- a/deps.edn +++ b/deps.edn @@ -1,21 +1,18 @@ {:paths ["src" "resources"] :deps { ;; https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk - net.snowflake/snowflake-ingest-sdk {:mvn/version "2.0.1"} + net.snowflake/snowflake-ingest-sdk {:mvn/version "2.2.2" + :exclusions [org.slf4j/slf4j-api]} - ;; https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 - ;; org.eclipse.paho/org.eclipse.paho.client.mqttv3 {:mvn/version "1.2.5"} + clojurewerkz/machine_head {:mvn/version "1.0.0"} - io.pedestal/pedestal.jetty {:mvn/version "0.6.0"} - io.pedestal/pedestal.route {:mvn/version "0.6.0"} - io.pedestal/pedestal.service {:mvn/version "0.6.0"} - ;; https://mvnrepository.com/artifact/org.slf4j/slf4j-simple - ;; org.slf4j/slf4j-simple {:mvn/version "2.0.7"} - com.fzakaria/slf4j-timbre {:mvn/version "0.4.0"} + ;; https://mvnrepository.com/artifact/org.slf4j/slf4j-api + org.slf4j/slf4j-api {:mvn/version "2.0.16"} + com.taoensso/telemere {:mvn/version "1.0.0-beta22"} + com.taoensso/telemere-slf4j {:mvn/version "1.0.0-beta22"} prismatic/schema {:mvn/version "1.4.1"} camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"} - com.taoensso/timbre {:mvn/version "6.2.1"} org.clojure/tools.cli {:mvn/version "1.0.219"} cheshire/cheshire {:mvn/version "5.11.0"} funcool/cats {:mvn/version "2.4.2"} diff --git a/readme.md b/readme.md index 0450f21..aa9ee67 100644 --- a/readme.md +++ b/readme.md @@ -3,7 +3,7 @@ # prerequisites 1. Have an account on Snowflake -2. Clojure 1.11 +2. Clojure 1.11+ 3. Have an user with a role that has the sufficient privileges on all relevant objects. - Such role must have: - `USAGE` on the database. @@ -38,10 +38,13 @@ clj ``` ```clojure -(require '[emqx.http :as server]) +(require '[emqx.config :as config]) (require '[emqx.channel :as chan]) -(chan/start-client) -(server/start) +(let [{:keys [:app-name :client :channels]} (config/get-config!)] + (chan/start-client client) + (doseq [chan-params channels] + (chan/ensure-streaming-agent app-name chan-params) + (mqtt/start-client chan-params))) ``` ## uberjar @@ -50,7 +53,6 @@ clj clj -T:build uber java -jar target/emqx-snowflake-proxy-0.0.0-standalone.jar -java -jar target/emqx-snowflake-proxy-0.0.0-standalone.jar -D taoensso.timbre.config.edn='{:min-level :info}' ``` ## docker diff --git a/resources/taoensso.timbre.config.edn b/resources/taoensso.timbre.config.edn deleted file mode 100644 index a72f525..0000000 --- a/resources/taoensso.timbre.config.edn +++ /dev/null @@ -1 +0,0 @@ -{:min-level :debug} diff --git a/src/emqx/adapter.clj b/src/emqx/adapter.clj index 885a747..abe1d4a 100644 --- a/src/emqx/adapter.clj +++ b/src/emqx/adapter.clj @@ -1,49 +1,25 @@ (ns emqx.adapter (:require [cats.monad.either :refer [left right]] - [schema.core :as s])) + [cheshire.core :as json])) -(s/defschema UpsertChannelWire - {:chan-name s/Str - :database s/Str - :schema s/Str - :table s/Str - :on-error (s/enum :continue :abort)}) +(defn mqtt-client-config + [{:keys [:host :port :clientid :topic :qos :username :password]}] + {:uri (str "tcp://" host ":" port) + :clientid clientid + :topic topic + :qos qos + :opts (cond-> {:auto-reconnect true} + username (assoc :username username) + password (assoc :password password))}) -(defn upsert-channel-in - [{:keys [:json-params :path-params]}] - (let [{:keys [:chan-name]} path-params - {:keys [:database :schema :table :on_error]} json-params - on-error (keyword on_error) - chan-params {:chan-name chan-name - :database database - :schema schema - :table table - :on-error on-error} - errors (s/check UpsertChannelWire chan-params)] - (if errors - (left errors) - (right chan-params)))) - -(s/defschema JsonVal - (s/maybe - (s/conditional - string? s/Str - number? s/Num - boolean? s/Bool - map? {s/Str (s/recursive #'JsonVal)} - vector? [(s/recursive #'JsonVal)]))) - -(s/defschema InsertRowsWire - ;; TODO: should use more lax value types??? - {:rows [(s/conditional - (every-pred map? not-empty) {s/Str JsonVal})]}) - -(defn insert-rows-in - [{:keys [:json-params]}] - (let [rows (get json-params "rows") - insert-rows-params {:rows rows} - errors (s/check InsertRowsWire insert-rows-params)] - (if errors - (left errors) - (right insert-rows-params)))) +(defn channel-data-in + "Parses the incoming payload and expects it to be a JSON encoded object" + [^bytes payload] + (try + (let [decoded (json/parse-string (String. payload "UTF-8"))] + (if (map? decoded) + (right decoded) + (left "bad input: should be a single JSON object"))) + (catch Exception e + (left (str "bad input: failure parsing json: " (.getMessage e)))))) diff --git a/src/emqx/channel.clj b/src/emqx/channel.clj index f641907..39162fb 100644 --- a/src/emqx/channel.clj +++ b/src/emqx/channel.clj @@ -1,7 +1,7 @@ (ns emqx.channel (:require [camel-snake-kebab.core :as csk] - [taoensso.timbre :as log]) + [emqx.log :as log]) (:import [java.util Properties] [net.snowflake.ingest.streaming @@ -91,15 +91,17 @@ (try (.getLatestCommittedOffsetToken chan) (catch SFException e - (log/error :msg ::get-latest-commited-offset-exception - :exception (.getMessage e) - :cause (.getCause e) - :vendor-code (.getVendorCode e)) + (log/error! + ::get-latest-commited-offset-exception + :exception (.getMessage e) + :cause (.getCause e) + :vendor-code (.getVendorCode e)) (.printStackTrace e) nil) (catch Exception e - (log/error :msg ::get-latest-commited-offset-exception - :exception (.getMessage e)) + (log/error! + ::get-latest-commited-offset-exception + :exception (.getMessage e)) nil))) ;; TODO: return partial success count??? @@ -113,29 +115,36 @@ (map (fn [e] {:row-index (.getRowIndex e) :message (.getMessage e)})))] - (log/error :msg ::insert-row-errors :errors errors :offset-token offset-token) + (log/error! + ::insert-row-errors + :errors errors + :offset-token offset-token) {:errors errors}) (do - (log/debug :msg ::insert-rows-success :offset-token offset-token) + (log/debug! + ::insert-rows-success + :offset-token offset-token) {:errors nil}))) (catch SFException e - (log/error :msg ::insert-row-exception - :exception (.getMessage e) - :cause (.getCause e) - :vendor-code (.getVendorCode e) - :offset-token offset-token) + (log/error! + ::insert-row-exception + :exception (.getMessage e) + :cause (.getCause e) + :vendor-code (.getVendorCode e) + :offset-token offset-token) (.printStackTrace e) {:errors [(.getMessage e)] :vendor-code (.getVendorCode e)}) (catch Exception e - (log/error :msg ::insert-row-exception - :exception (.getMessage e) - :offset-token offset-token) + (log/error! + ::insert-row-exception + :exception (.getMessage e) + :offset-token offset-token) {:errors [(.getMessage e)]}))) (defn- insert-rows [{:keys [:chan :n] :as state} rows reply-promise] - (log/debug :msg ::insert-rows-enter :rows rows) + (log/debug! ::insert-rows-enter :rows rows) (let [row-count (count rows) offset-token (str (+ n row-count)) response (do-insert-rows chan rows offset-token)] @@ -160,10 +169,11 @@ (if-let [channel-agent (@channels chan-name)] (let [{:keys [:chan]} @channel-agent] (.close chan) - (log/info :msg ::channel-closed - :chan-name chan-name - :closed? (.isClosed chan) - :valid? (.isValid chan)) + (log/info! + ::channel-closed + :chan-name chan-name + :closed? (.isClosed chan) + :valid? (.isValid chan)) (swap! channels dissoc chan-name) (deliver reply-promise true)) (deliver reply-promise true)) diff --git a/src/emqx/config.clj b/src/emqx/config.clj index b02430a..29ecd48 100644 --- a/src/emqx/config.clj +++ b/src/emqx/config.clj @@ -12,24 +12,53 @@ :private-key s/Str :port s/Num :host s/Str - :scheme s/Str - :role s/Str}) + :scheme s/Str}) + +(s/defschema MQTTConfig + {:host s/Str + :port s/Num + :clientid s/Str + :topic s/Str + :qos (s/enum 0 1 2) + (s/optional-key :username) s/Str + (s/optional-key :password) s/Str}) (s/defschema ChannelConfig {:chan-name s/Str :database s/Str :schema s/Str :table s/Str - :on-error (s/enum :continue :abort)}) - -(s/defschema ServerConfig - {:host s/Str - :port s/Num}) + :on-error (s/enum :continue :abort) + :mqtt MQTTConfig}) (s/defschema EMQXConfig {:client ClientConfig - :channels [ChannelConfig] - :server ServerConfig}) + :channels [ChannelConfig]}) + +(defn- duplicates + [coll] + (->> coll + frequencies + (filter (fn [[_ n]] (> n 1))) + (map (fn [[x _]] x)))) + +(defn- validate-unique! + [{:keys [:channels]} ks what] + (let [all-things (map (fn [chan-config] + (get-in chan-config ks)) + channels) + duplicate-things (duplicates all-things)] + (when (seq duplicate-things) + (throw (RuntimeException. + (format "Duplicate %s found: %s" what (pr-str duplicate-things))))))) + +(defn- validate-unique-clientids! + [channels] + (validate-unique! channels [:mqtt :clientid] "clientids")) + +(defn- validate-unique-chan-names! + [channels] + (validate-unique! channels [:chan-name] "channel names")) (defn get-config! "Get the configurations for startup. @@ -40,6 +69,8 @@ (edn/read (java.io.PushbackReader. r))) coercer (coerce/coercer! EMQXConfig coerce/keyword-enum-matcher) config (coercer params) + _ (validate-unique-clientids! config) + _ (validate-unique-chan-names! config) app-name (or (System/getenv "APP_NAME") (str (java.util.UUID/randomUUID)))] (assoc config :app-name app-name))) diff --git a/src/emqx/core.clj b/src/emqx/core.clj index becc595..156c572 100644 --- a/src/emqx/core.clj +++ b/src/emqx/core.clj @@ -2,33 +2,44 @@ (:require [emqx.channel :as chan] [emqx.config :as config] - [emqx.http :as http-server] - [taoensso.timbre :as log]) + [emqx.mqtt :as mqtt] + [emqx.log :as log]) (:gen-class)) +(defn- block-forever + [] + (while true + (Thread/sleep 60000))) + +(defn- cleanup-hook + [] + (log/info! ::closing-channels) + (doseq [[chan-name _] @chan/channels] + (log/info! ::closing-mqtt-client :channel-name chan-name) + (mqtt/stop-client chan-name) + (log/info! ::closing-channel :channel-name chan-name) + (chan/ensure-chan-deleted chan-name)) + (log/info! ::stopping-client) + (chan/stop-client) + (log/info! ::shutting-down-agents) + (shutdown-agents)) + (defn -main [& args] - (log/info :msg ::init-args :args args) + (log/info! ::init-args :args args) (.addShutdownHook (Runtime/getRuntime) - (Thread. ^Runnable - (fn [] - (log/info :msg ::closing-channels) - (doseq [[chan-name _] @chan/channels] - (log/info :msg ::closing-channel :channel-name chan-name) - (chan/ensure-chan-deleted chan-name)) - (log/info :msg ::stopping-client) - (chan/stop-client) - (log/info :msg ::shutting-down-agents) - (shutdown-agents)))) - (let [{:keys [:app-name :client :channels :server]} (config/get-config!)] + (Thread. ^Runnable cleanup-hook)) + (let [{:keys [:app-name :client :channels]} (config/get-config!)] ;; Should the client name be unique? (chan/start-client client) (doseq [chan-params channels] - (chan/ensure-streaming-agent app-name chan-params)) + (chan/ensure-streaming-agent app-name chan-params) + (mqtt/start-client chan-params)) (try - (http-server/start (assoc server :join? true)) + (block-forever) (catch Exception e + (log/error! ::main-fn-exception :reason e) (println (.getMessage e)) (.printStackTrace e) (System/exit 1))))) diff --git a/src/emqx/http.clj b/src/emqx/http.clj deleted file mode 100644 index fb40a47..0000000 --- a/src/emqx/http.clj +++ /dev/null @@ -1,193 +0,0 @@ -(ns emqx.http - (:require - [camel-snake-kebab.core :as csk] - [cats.monad.either :as e] - [clojure.string :as str] - [emqx.adapter :as adapter] - [emqx.channel :as chan] - [io.pedestal.http :as http] - [io.pedestal.http.body-params :as body-params] - [io.pedestal.http.content-negotiation :as conneg] - [io.pedestal.http.route :as route] - [io.pedestal.interceptor :refer [interceptor]] - [io.pedestal.interceptor.chain :as chain] - [io.pedestal.log :as log])) - -;; FIXME: terminate interceptor chain if content type is not json... -(def content-neg-interceptor - (let [supported-types ["application/json"]] - (conneg/negotiate-content supported-types))) - -(def chan-interceptor - {:name ::channel-interceptor - :enter - (fn [context] - (let [chan-name (get-in context [:request :path-params :chan-name]) - chan-agent (get @chan/channels chan-name)] - (if chan-agent - (assoc-in context [:request :chan] (get @chan/channels chan-name)) - (-> context - (assoc :response {:status 404 :body {:error "channel not found"}}) - chain/terminate))))}) - -;; TODO: transform recursively? -(def json-key-interceptor - {:name ::json-key-interceptor - :leave - (fn [context] - (if (map? (get-in context [:response :body])) - (update-in context [:response :body] update-keys csk/->snake_case) - context))}) - -;; TODO: make the whole interceptor chain use m/->= -(defn adapter-interceptor - [adapter-fn] - {:name ::adapter-interceptor - :enter - (fn [context] - (let [request (:request context) - res (adapter-fn request)] - (e/branch res - (fn [errors] - (-> context - (assoc :response {:status 400 :body {:error "bad input" - ;; TODO: better formatting... - :details (str errors)}}) - chain/terminate)) - (fn [parsed-params] - (assoc-in context [:request :input-params] parsed-params)))))}) - -(def log-request - "Logs all http requests with response time." - {:name ::log-request - :enter (fn [context] - (assoc-in context [:request :start-time] (System/currentTimeMillis))) - :leave (fn [context] - (let [{:keys [uri start-time request-method]} (:request context) - finish (System/currentTimeMillis) - total (- finish start-time)] - (log/info :msg "request completed" - :method (str/upper-case (name request-method)) - :uri uri - :status (:status (:response context)) - :response-time total) - context))}) - -#_(def upsert-channel - {:name ::upsert-channel - :enter - (fn [context] - (let [{:keys [:request]} context - {:keys [:json-params :path-params]} request - {:keys [:chan-name]} path-params - {:keys [:database :schema :table :on_error]} json-params - on-error (keyword on_error) - existing-chan-info (chan/chan-info chan-name) - chan-params {:chan-name chan-name - :database database - :schema schema - :table table - :on-error on-error} - chan-info (if (and existing-chan-info - (chan/equal-params? existing-chan-info chan-params)) - existing-chan-info - (do - (chan/ensure-chan-deleted chan-name) - (chan/ensure-streaming-agent chan-params) - (chan/chan-info chan-name)))] - (assoc context :response {:status 200 :body chan-info})))}) - -(def get-chan - {:name ::get-channel - :enter - (fn [context] - (let [chan-name (get-in context [:request :path-params :chan-name]) - chan-info (chan/chan-info chan-name)] - (if chan-info - (assoc context :response {:status 200 :body chan-info}) - (assoc context :response {:status 404}))))}) - -#_(def delete-chan - {:name ::delete-channel - :enter - (fn [context] - (let [chan-name (get-in context [:request :path-params :chan-name])] - (chan/ensure-chan-deleted chan-name) - (assoc context :response {:status 204})))}) - -;; TODO: validate input before this interceptor; pedestal-api??? -(def insert-rows - {:name ::insert-rows - :enter - (fn [context] - (let [{:keys [:json-params :chan]} (:request context) - rows (get json-params "rows" []) - result (chan/insert-rows-sync chan rows) - success? (-> result :errors empty?)] - (if success? - (assoc context :response {:status 204}) - (let [status (case (:vendor-code result) - "0013" 500 - 400)] - (assoc context :response {:status status :body result})))))}) - -(def routes - (route/expand-routes - #{["/channels/:chan-name/insert" - :post [content-neg-interceptor - (body-params/body-params - (body-params/default-parser-map - :json-options {:key-fn str})) - http/json-body - chan-interceptor - json-key-interceptor - (adapter-interceptor adapter/insert-rows-in) - insert-rows]] - - #_["/channels/:chan-name" - :post [content-neg-interceptor - (body-params/body-params) - http/json-body - json-key-interceptor - (adapter-interceptor adapter/upsert-channel-in) - upsert-channel]] - - #_["/channels/:chan-name" - :delete [content-neg-interceptor - (body-params/body-params) - http/json-body - json-key-interceptor - delete-chan]] - - ["/channels/:chan-name" - :get [content-neg-interceptor - (body-params/body-params) - http/json-body - chan-interceptor - json-key-interceptor - get-chan]]})) - -(defn create-server - [params] - (http/create-server - {::http/routes routes - ::http/request-logger (interceptor log-request) - ::http/type :jetty - ::http/join? (get params :join? false) - ::http/host (get params :host "0.0.0.0") - ::http/port (get params :port 9099)})) - -(defn start - [params] - (http/start (create-server params))) - -(defonce server (atom nil)) - -(defn start-dev - [params] - (reset! server (http/start (create-server params)))) - -(defn restart-dev - [params] - (swap! server http/stop) - (reset! server (start-dev params))) diff --git a/src/emqx/log.clj b/src/emqx/log.clj new file mode 100644 index 0000000..f44054b --- /dev/null +++ b/src/emqx/log.clj @@ -0,0 +1,26 @@ +(ns emqx.log + (:require + [taoensso.telemere :as t])) + +(defmacro log! + [level msg & data] + (let [data (apply hash-map data) + opts (cond-> {:level level} + (seq data) (assoc :data data))] + `(t/log! ~opts ~msg))) + +(defmacro debug! + [msg & data] + `(log! :debug ~msg ~@data)) + +(defmacro info! + [msg & data] + `(log! :info ~msg ~@data)) + +(defmacro warn! + [msg & data] + `(log! :warn ~msg ~@data)) + +(defmacro error! + [msg & data] + `(log! :error ~msg ~@data)) diff --git a/src/emqx/mqtt.clj b/src/emqx/mqtt.clj new file mode 100644 index 0000000..0177fbd --- /dev/null +++ b/src/emqx/mqtt.clj @@ -0,0 +1,65 @@ +(ns emqx.mqtt + (:require + [cats.monad.either :as either] + [clojurewerkz.machine-head.client :as mh] + [emqx.adapter :as adapter] + [emqx.channel :as chan] + [emqx.log :as log])) + +(defonce clients (atom {})) + +(defn subscribe + [client chan-name topic qos] + (let [chan-agent (get @chan/channels chan-name)] + (mh/subscribe + client + {topic qos} + (fn [^String _topic _metadata ^bytes payload] + (either/branch + (adapter/channel-data-in payload) + (fn [error] + (log/warn! ::bad-input :reason error)) + (fn [decoded-row] + (chan/insert-rows-sync chan-agent [decoded-row]))))))) + +(defn try-start-client + [chan-name {:keys [:uri :clientid :topic :qos :opts]}] + (try + (either/right + (mh/connect + uri + {:client-id clientid + :opts opts + :on-connect-complete (fn [client reconnect? server-uri] + (log/info! ::connection-established + :reconnect? reconnect? :server-uri server-uri) + (subscribe client chan-name topic qos)) + :on-connection-lost (fn [reason] + (log/warn! ::conection-lost :reason reason))})) + (catch Exception e + (either/left e)))) + +(defn start-client + [{:keys [:chan-name :mqtt]}] + (let [mqtt-config (adapter/mqtt-client-config mqtt)] + (either/branch + (try-start-client chan-name mqtt-config) + (fn [e] + (log/error! + ::failed-to-start-client + :chan-name chan-name + :reason e) + (throw e)) + (fn [conn] + (swap! clients assoc chan-name conn) + conn)))) + +(defn -stop-client + [client] + (mh/disconnect-and-close client)) + +(defn stop-client + [chan-name] + (when-let [client (get @clients chan-name)] + (-stop-client client) + (swap! clients dissoc chan-name))) diff --git a/test/emqx/adapter_test.clj b/test/emqx/adapter_test.clj index 0bfc7b7..1c4aa74 100644 --- a/test/emqx/adapter_test.clj +++ b/test/emqx/adapter_test.clj @@ -1,71 +1,51 @@ (ns emqx.adapter-test (:require [cats.monad.either :as e :refer [right]] - [clojure.test :refer [deftest is testing]] + [clojure.string :as str] + [clojure.test :refer [deftest are is testing]] [emqx.adapter :as adapter] [matcher-combinators.test])) -(deftest upsert-channel-in-test - (let [chan-name "my_channel" - valid-chan-params {:database "TESTDATABASE" - :schema "PUBLIC" - :table "TESTTABLE" - :on_error "continue"}] - (testing "valid params for upsert" - (let [res (adapter/upsert-channel-in {:path-params {:chan-name chan-name} - :json-params valid-chan-params})] - (is (= (right {:chan-name chan-name - :database (:database valid-chan-params) - :schema (:schema valid-chan-params) - :table (:table valid-chan-params) - :on-error :continue}) - res))) - (let [params (assoc valid-chan-params :on_error "abort") - res (adapter/upsert-channel-in {:path-params {:chan-name chan-name} - :json-params params})] - (is (= (right {:chan-name chan-name - :database (:database valid-chan-params) - :schema (:schema valid-chan-params) - :table (:table valid-chan-params) - :on-error :abort}) - res)))) - (testing "missing params for upsert" - (doseq [k (keys valid-chan-params)] - (let [invalid-chan-params (dissoc valid-chan-params k) - res (adapter/upsert-channel-in {:path-params {:chan-name chan-name} - :json-params invalid-chan-params})] - (is (e/left? res))))) - (testing "bad param types for upsert" - (doseq [k (keys valid-chan-params)] - (let [invalid-chan-params (assoc valid-chan-params k nil) - res (adapter/upsert-channel-in {:path-params {:chan-name chan-name} - :json-params invalid-chan-params})] - (is (e/left? res)))) - (let [invalid-chan-params (assoc valid-chan-params :on_error "dunno") - res (adapter/upsert-channel-in {:path-params {:chan-name chan-name} - :json-params invalid-chan-params})] - (is (e/left? res)))))) +(deftest channel-data-in + (testing "invalid json" + (let [res (adapter/channel-data-in (.getBytes "{"))] + (is (e/left? res)) + (e/branch-left + res + (fn [res] + (is (str/starts-with? res "bad input: failure parsing json")))))) + (testing "input is not a map" + (are [input] (let [res (adapter/channel-data-in (.getBytes input))] + (is (e/left? res)) + (e/branch-left + res + (fn [res] + (is (str/starts-with? res "bad input: should be a single JSON object"))))) + "[]" + "[{}]" + "1" + "\"string\"" + "null" + "1.23")) + (testing "valid input" + (are [x y] (= (right x) (adapter/channel-data-in (.getBytes y))) + {} "{}" + {"a" 10} "{\"a\":10}"))) -(deftest insert-rows-in-test - (testing "valid inputs" - (let [valid-cases {:empty-vec [] - :one-map-int [{"c1" 123}] - :one-map-float [{"c1" 1.23}] - :one-map-str [{"c1" "str"}] - :one-map-bool [{"c1" true}] - :one-map-null [{"c1" nil}] - :one-map-array [{"c1" ["a" 123 1.23 true nil [] {}]}] - :one-map-nested [{"c1" {"a" 123}}] - :two-maps [{"c1" 123 "c2" 1.23} {"c1" 321}]}] - (doseq [[name rows] valid-cases] - (is (= (right {:rows rows}) - (adapter/insert-rows-in {:json-params {"rows" rows}})) - {:case name})))) - (testing "invalid inputs" - (let [invalid-cases {:one-empty-map [{}] - :non-str-key [{:c1 123}] - :weird-val1 [{:c1 (fn [] true)}] - :mixed1 [{"c1" 123} {} {"c2" 321}] - :mixed2 [{"c1" 123} {:c1 123} {"c2" 321}]}] - (doseq [[name rows] invalid-cases] - (is (e/left? (adapter/insert-rows-in {:json-params {"rows" rows}})) {:case name}))))) +(deftest mqtt-client-config + (testing "optional username and password" + (is (= {} (-> (adapter/mqtt-client-config {}) + :opts + (select-keys [:username :password])))) + (is (= {:username "foo"} (-> (adapter/mqtt-client-config {:username "foo"}) + :opts + (select-keys [:username :password])))) + ;; not very useful, but... + (is (= {:password "bar"} (-> (adapter/mqtt-client-config {:password "bar"}) + :opts + (select-keys [:username :password])))) + (is (= {:username "foo" + :password "bar"} (-> (adapter/mqtt-client-config {:username "foo" + :password "bar"}) + :opts + (select-keys [:username :password])))))) diff --git a/test/emqx/http_test.clj b/test/emqx/http_test.clj deleted file mode 100644 index 8eb0df5..0000000 --- a/test/emqx/http_test.clj +++ /dev/null @@ -1,209 +0,0 @@ -(ns emqx.http-test - {:clj-kondo/config - '{:linters - {:unresolved-symbol {:exclude [(clojure.test/is [match?])]}}}} - (:require - [cheshire.core :as json] - [clojure.test :refer [deftest is testing use-fixtures]] - [emqx.config :as config] - [emqx.channel :as chan] - [emqx.http :as http-server] - [io.pedestal.http :as http] - [io.pedestal.http.route :as route] - [io.pedestal.test :refer [response-for]] - [matcher-combinators.test])) - -(defn make-service - [] - (::http/service-fn (http/create-servlet {::http/routes http-server/routes}))) - -(def url-for (route/url-for-routes http-server/routes)) - -(def ^:dynamic *service* nil) - -(defn client-fixture - [f] - (let [{:keys [:client]} (config/get-config!)] - (chan/start-client client)) - (f) - (chan/stop-client)) - -(defn server-fixture - [f] - (binding [*service* (make-service)] - (f)) - (doseq [chan-name (keys @chan/channels)] - (chan/ensure-chan-deleted chan-name))) - -(use-fixtures :once client-fixture) -(use-fixtures :each server-fixture) - -(defn get-channel - [chan-name] - (response-for *service* - :get (url-for ::http-server/get-channel - :path-params {:chan-name chan-name}))) - -#_(defn upsert-channel - [chan-name chan-params] - (response-for *service* - :post (url-for ::http-server/upsert-channel - :path-params {:chan-name chan-name}) - :headers {"Content-Type" "application/json"} - :body (json/generate-string chan-params))) - -#_(defn delete-channel - [chan-name] - (response-for *service* - :delete (url-for ::http-server/delete-channel - :path-params {:chan-name chan-name}))) - -#_(deftest ^:integration channels-test - (testing "create and delete channel" - (let [chan-name "my_channel"] - (is (= 404 (:status (get-channel chan-name)))) - (let [chan-params {"database" "TESTDATABASE" - "schema" "PUBLIC" - "table" "TESTTABLE" - "on_error" "continue"} - resp (upsert-channel chan-name chan-params)] - (is (= 200 (:status resp))) - (is (match? - (assoc chan-params - "chan_name" chan-name - "last_offset" any?) - (-> resp - :body - json/parse-string))) - (let [get-resp (get-channel chan-name)] - (is (= 200 (:status get-resp))) - (is (match? - (assoc chan-params - "chan_name" chan-name - "last_offset" any?) - (-> get-resp - :body - json/parse-string))) - (testing "upserting the channel with same params doesn't recreate it" - (let [change-triggered (atom false) - _ (add-watch chan/channels :test-upsert (fn [_k _r _o _n] - (reset! change-triggered true))) - resp (upsert-channel chan-name chan-params)] - (is (= 200 (:status resp))) - (is (not @change-triggered)) - (is (match? - (assoc chan-params - "chan_name" chan-name - "last_offset" any?) - (-> get-resp - :body - json/parse-string))) - (remove-watch chan/channels :test-upsert))) - (testing "upserting the channel with different params recreate it" - (let [change-triggered (atom false) - new-chan-params (assoc chan-params "on_error" "abort") - _ (add-watch chan/channels :test-upsert (fn [_k _r _o _n] - (reset! change-triggered true))) - recreate-resp (upsert-channel chan-name new-chan-params)] - (is (= 200 (:status recreate-resp))) - (is @change-triggered) - (is (match? - (assoc new-chan-params - "chan_name" chan-name - "last_offset" any?) - (-> recreate-resp - :body - json/parse-string))) - (remove-watch chan/channels :test-upsert))))) - (let [resp (delete-channel chan-name)] - (is (= 204 (:status resp))) - (is (= 404 (:status (get-channel chan-name))))) - (testing "deleting again is idempotent" - (let [resp (delete-channel chan-name)] - (is (= 204 (:status resp)))))))) - -#_(deftest ^:integration channels-adapter-integration-test - (testing "missing params for upsert" - (let [chan-name "my_channel" - valid-chan-params {"database" "TESTDATABASE" - "schema" "PUBLIC" - "table" "TESTTABLE" - "on_error" "continue"}] - (doseq [k (keys valid-chan-params)] - (let [invalid-chan-params (dissoc valid-chan-params k) - resp (upsert-channel chan-name invalid-chan-params)] - (is (= 400 (:status resp))) - (is (match? {"error" "bad input" - "details" string?} - (-> resp - :body - json/parse-string))))))) - (testing "bad param types for upsert" - (let [chan-name "my_channel" - valid-chan-params {"database" "TESTDATABASE" - "schema" "PUBLIC" - "table" "TESTTABLE" - "on_error" "continue"}] - (doseq [k (keys valid-chan-params)] - (let [invalid-chan-params (assoc valid-chan-params k nil) - resp (upsert-channel chan-name invalid-chan-params)] - (is (= 400 (:status resp))) - (is (match? {"error" "bad input" - "details" string?} - (-> resp - :body - json/parse-string))))) - (let [invalid-chan-params (assoc valid-chan-params "on_error" "dunno") - resp (upsert-channel chan-name invalid-chan-params)] - (is (= 400 (:status resp))) - (is (match? {"error" "bad input" - "details" string?} - (-> resp - :body - json/parse-string))))))) - -(defn insert-rows - [chan-name body-params] - (response-for *service* - :post (url-for ::http-server/insert-rows - :path-params {:chan-name chan-name}) - :headers {"Content-Type" "application/json"} - :body (json/generate-string body-params))) - -(deftest ^:integration insert-rows-test - (let [chan-name "my_channel" - valid-rows {"rows" [{"c1" 123} - {"c1" 234}]}] - (testing "channel not created" - (let [resp (insert-rows chan-name valid-rows)] - (is (= 404 (:status resp))))) - - (let [{:keys [:app-name :channels]} (config/get-config!)] - (doseq [chan-params channels] - (chan/ensure-streaming-agent app-name chan-params))) - - (testing "inserting valid rows" - (let [resp (insert-rows chan-name valid-rows)] - (is (= 204 (:status resp)))) - (is (= 204 (:status (insert-rows chan-name {"rows" []}))))) - (testing "inserting invalid rows" - (let [resp (insert-rows chan-name {"rows" [{"nonexistent" 123}]})] - (is (= 400 (:status resp))) - (is (match? - {"errors" [{"row-index" 0 - "message" #"The given row cannot be converted to the internal format: Extra columns"}]} - (json/parse-string (:body resp))))) - (let [resp (insert-rows chan-name {"rows" [{"c1" "wrong type"}]})] - (is (= 400 (:status resp))) - (is (match? - {"errors" [{"row-index" 0 - "message" #"The given row cannot be converted to the internal format due to invalid value"}]} - (json/parse-string (:body resp))))) - (let [resp (insert-rows chan-name {"rows" [{"c1" 123} - {"c1" "wrong type"} - {"c1" 234}]})] - (is (= 400 (:status resp))) - (is (match? - {"errors" [{"row-index" 1 - "message" #"The given row cannot be converted to the internal format due to invalid value"}]} - (json/parse-string (:body resp))))))))