diff --git a/config.example.edn b/config.example.edn index ec2c855..85962c6 100644 --- a/config.example.edn +++ b/config.example.edn @@ -4,7 +4,14 @@ :private-key "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n" :port 443 :host "ORG_ID-ACCOUNT_ID.snowflakecomputing.com" - :scheme "https"} + :scheme "https" + ;; uncomment the `:proxy` field if you need to use an HTTP proxy + ;; to uncomment, simply remove the `_#` prefixes + #_:proxy #_{:host "my.proxy.host" + :port 1234 + :user "proxy-user" + :password "secret-proxy-pass"} + } :channels [{:chan-name "my_channel" :database "TESTDATABASE" :schema "PUBLIC" diff --git a/src/emqx/adapter.clj b/src/emqx/adapter.clj index b1821cf..d5e8d00 100644 --- a/src/emqx/adapter.clj +++ b/src/emqx/adapter.clj @@ -1,7 +1,34 @@ (ns emqx.adapter (:require + [camel-snake-kebab.core :as csk] [cats.monad.either :refer [left right]] - [cheshire.core :as json])) + [cheshire.core :as json]) + (:import + [java.util Properties] + [net.snowflake.ingest.utils HttpUtil])) + +(defn channel-client-proxy-config->props + [{:keys [:host :port :user :password]}] + (let [props (Properties.)] + (doto props + (.setProperty HttpUtil/USE_PROXY "true") + (.setProperty HttpUtil/PROXY_HOST host) + (.setProperty HttpUtil/PROXY_PORT (str port))) + (when user + (.setProperty props HttpUtil/HTTP_PROXY_USER user)) + (when password + (.setProperty props HttpUtil/HTTP_PROXY_PASSWORD password)) + props)) + +(defn channel-client-config->props + [params] + (let [keys [:user :url :private-key :port :host :scheme] + props (Properties.) + _ (doseq [k keys] + (.setProperty props (csk/->snake_case_string k) (str (get params k)))) + proxy-props (when-some [proxy-cfg (:proxy params)] + (channel-client-proxy-config->props proxy-cfg))] + [props proxy-props])) (defn mqtt-client-config [{:keys [:host :port :clientid :topic :qos :clean-start :username :password]}] diff --git a/src/emqx/channel.clj b/src/emqx/channel.clj index 39162fb..4b28ea3 100644 --- a/src/emqx/channel.clj +++ b/src/emqx/channel.clj @@ -1,9 +1,8 @@ (ns emqx.channel (:require - [camel-snake-kebab.core :as csk] + [emqx.adapter :as adapter] [emqx.log :as log]) (:import - [java.util Properties] [net.snowflake.ingest.streaming SnowflakeStreamingIngestClientFactory OpenChannelRequest @@ -19,10 +18,10 @@ (defn start-client [{:keys [:client-name] :as params}] - (let [props (Properties.) - _ (doseq [[k v] params - :when (not= :client-name k)] - (.setProperty props (csk/->snake_case_string k) (str v))) + (let [[props proxy-props] (adapter/channel-client-config->props params) + _ (when proxy-props + (doseq [[k v] proxy-props] + (System/setProperty k v))) c (.. (SnowflakeStreamingIngestClientFactory/builder client-name) (setProperties props) (build))] diff --git a/src/emqx/config.clj b/src/emqx/config.clj index 670d83c..7c9c376 100644 --- a/src/emqx/config.clj +++ b/src/emqx/config.clj @@ -5,6 +5,12 @@ [schema.coerce :as coerce] [schema.core :as s])) +(s/defschema ClientProxyConfig + {:host s/Str + :port s/Num + (s/optional-key :user) s/Str + (s/optional-key :password) s/Str}) + (s/defschema ClientConfig {:client-name s/Str :user s/Str @@ -12,7 +18,8 @@ :private-key s/Str :port s/Num :host s/Str - :scheme s/Str}) + :scheme s/Str + (s/optional-key :proxy) ClientProxyConfig}) (s/defschema MQTTConfig {:host s/Str diff --git a/test/emqx/adapter_test.clj b/test/emqx/adapter_test.clj index 1c4aa74..6b079f0 100644 --- a/test/emqx/adapter_test.clj +++ b/test/emqx/adapter_test.clj @@ -49,3 +49,53 @@ :password "bar"}) :opts (select-keys [:username :password])))))) + +(deftest channel-client-config->props + (let [base-proxy-params {:host "proxy.host.com" :port 1234} + base-params {:client-name "client-name" + :user "client-user" + :url "snowflake.url.com" + :private-key "secret-private-key" + :port 443 + :host "snowflake.host" + :scheme "https"}] + (testing "no proxy config" + (let [props (adapter/channel-client-config->props base-params)] + (is (= [{"scheme" "https" + "port" "443" + "host" "snowflake.host" + "private_key" "secret-private-key" + "user" "client-user" + "url" "snowflake.url.com"} + nil] + props)))) + (testing "proxy config, no username and password" + (let [props (adapter/channel-client-config->props (assoc base-params :proxy base-proxy-params))] + (is (= [{"scheme" "https" + "port" "443" + "host" "snowflake.host" + "private_key" "secret-private-key" + "user" "client-user" + "url" "snowflake.url.com"} + {"http.useProxy" "true" + "http.proxyHost" "proxy.host.com" + "http.proxyPort" "1234"}] + props)))) + (testing "proxy config, with username and password" + (let [props (-> base-params + (assoc :proxy base-proxy-params) + (assoc-in [:proxy :user] "proxy-user") + (assoc-in [:proxy :password] "proxy-password") + adapter/channel-client-config->props)] + (is (= [{"scheme" "https" + "port" "443" + "host" "snowflake.host" + "private_key" "secret-private-key" + "user" "client-user" + "url" "snowflake.url.com"} + {"http.useProxy" "true" + "http.proxyHost" "proxy.host.com" + "http.proxyPort" "1234" + "http.proxyUser" "proxy-user" + "http.proxyPassword" "proxy-password"}] + props))))))