Skip to content

Commit

Permalink
feat: add proxy support
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Nov 25, 2024
1 parent c59226b commit 7d0fa87
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
9 changes: 8 additions & 1 deletion config.example.edn
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
:private-key "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n"
:port 443
:host "ORG_ID-ACCOUNT_ID.snowflakecomputing.com"
:scheme "https"}
:scheme "https"
;; uncomment the `:proxy` field if you need to use an HTTP proxy
;; to uncomment, simply remove the `_#` prefixes
#_:proxy #_{:host "my.proxy.host"
:port 1234
:user "proxy-user"
:password "secret-proxy-pass"}
}
:channels [{:chan-name "my_channel"
:database "TESTDATABASE"
:schema "PUBLIC"
Expand Down
29 changes: 28 additions & 1 deletion src/emqx/adapter.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,34 @@
(ns emqx.adapter
(:require
[camel-snake-kebab.core :as csk]
[cats.monad.either :refer [left right]]
[cheshire.core :as json]))
[cheshire.core :as json])
(:import
[java.util Properties]
[net.snowflake.ingest.utils HttpUtil]))

(defn channel-client-proxy-config->props
[{:keys [:host :port :user :password]}]
(let [props (Properties.)]
(doto props
(.setProperty HttpUtil/USE_PROXY "true")
(.setProperty HttpUtil/PROXY_HOST host)
(.setProperty HttpUtil/PROXY_PORT (str port)))
(when user
(.setProperty props HttpUtil/HTTP_PROXY_USER user))
(when password
(.setProperty props HttpUtil/HTTP_PROXY_PASSWORD password))
props))

(defn channel-client-config->props
[params]
(let [keys [:user :url :private-key :port :host :scheme]
props (Properties.)
_ (doseq [k keys]
(.setProperty props (csk/->snake_case_string k) (str (get params k))))
proxy-props (when-some [proxy-cfg (:proxy params)]
(channel-client-proxy-config->props proxy-cfg))]
[props proxy-props]))

(defn mqtt-client-config
[{:keys [:host :port :clientid :topic :qos :clean-start :username :password]}]
Expand Down
11 changes: 5 additions & 6 deletions src/emqx/channel.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
(ns emqx.channel
(:require
[camel-snake-kebab.core :as csk]
[emqx.adapter :as adapter]
[emqx.log :as log])
(:import
[java.util Properties]
[net.snowflake.ingest.streaming
SnowflakeStreamingIngestClientFactory
OpenChannelRequest
Expand All @@ -19,10 +18,10 @@

(defn start-client
[{:keys [:client-name] :as params}]
(let [props (Properties.)
_ (doseq [[k v] params
:when (not= :client-name k)]
(.setProperty props (csk/->snake_case_string k) (str v)))
(let [[props proxy-props] (adapter/channel-client-config->props params)
_ (when proxy-props
(doseq [[k v] proxy-props]
(System/setProperty k v)))
c (.. (SnowflakeStreamingIngestClientFactory/builder client-name)
(setProperties props)
(build))]
Expand Down
9 changes: 8 additions & 1 deletion src/emqx/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
[schema.coerce :as coerce]
[schema.core :as s]))

(s/defschema ClientProxyConfig
{:host s/Str
:port s/Num
(s/optional-key :user) s/Str
(s/optional-key :password) s/Str})

(s/defschema ClientConfig
{:client-name s/Str
:user s/Str
:url s/Str
:private-key s/Str
:port s/Num
:host s/Str
:scheme s/Str})
:scheme s/Str
(s/optional-key :proxy) ClientProxyConfig})

(s/defschema MQTTConfig
{:host s/Str
Expand Down
50 changes: 50 additions & 0 deletions test/emqx/adapter_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,53 @@
:password "bar"})
:opts
(select-keys [:username :password]))))))

(deftest channel-client-config->props
(let [base-proxy-params {:host "proxy.host.com" :port 1234}
base-params {:client-name "client-name"
:user "client-user"
:url "snowflake.url.com"
:private-key "secret-private-key"
:port 443
:host "snowflake.host"
:scheme "https"}]
(testing "no proxy config"
(let [props (adapter/channel-client-config->props base-params)]
(is (= [{"scheme" "https"
"port" "443"
"host" "snowflake.host"
"private_key" "secret-private-key"
"user" "client-user"
"url" "snowflake.url.com"}
nil]
props))))
(testing "proxy config, no username and password"
(let [props (adapter/channel-client-config->props (assoc base-params :proxy base-proxy-params))]
(is (= [{"scheme" "https"
"port" "443"
"host" "snowflake.host"
"private_key" "secret-private-key"
"user" "client-user"
"url" "snowflake.url.com"}
{"http.useProxy" "true"
"http.proxyHost" "proxy.host.com"
"http.proxyPort" "1234"}]
props))))
(testing "proxy config, with username and password"
(let [props (-> base-params
(assoc :proxy base-proxy-params)
(assoc-in [:proxy :user] "proxy-user")
(assoc-in [:proxy :password] "proxy-password")
adapter/channel-client-config->props)]
(is (= [{"scheme" "https"
"port" "443"
"host" "snowflake.host"
"private_key" "secret-private-key"
"user" "client-user"
"url" "snowflake.url.com"}
{"http.useProxy" "true"
"http.proxyHost" "proxy.host.com"
"http.proxyPort" "1234"
"http.proxyUser" "proxy-user"
"http.proxyPassword" "proxy-password"}]
props))))))

0 comments on commit 7d0fa87

Please sign in to comment.