Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add proxy support #10

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion config.example.edn
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
:private-key "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n"
:port 443
:host "ORG_ID-ACCOUNT_ID.snowflakecomputing.com"
:scheme "https"}
:scheme "https"
;; uncomment the `:proxy` field if you need to use an HTTP proxy
;; to uncomment, simply remove the `_#` prefixes
#_:proxy #_{:host "my.proxy.host"
:port 1234
:user "proxy-user"
:password "secret-proxy-pass"}
}
:channels [{:chan-name "my_channel"
:database "TESTDATABASE"
:schema "PUBLIC"
Expand Down
29 changes: 28 additions & 1 deletion src/emqx/adapter.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,34 @@
(ns emqx.adapter
(:require
[camel-snake-kebab.core :as csk]
[cats.monad.either :refer [left right]]
[cheshire.core :as json]))
[cheshire.core :as json])
(:import
[java.util Properties]
[net.snowflake.ingest.utils HttpUtil]))

(defn channel-client-proxy-config->props
[{:keys [:host :port :user :password]}]
(let [props (Properties.)]
(doto props
(.setProperty HttpUtil/USE_PROXY "true")
(.setProperty HttpUtil/PROXY_HOST host)
(.setProperty HttpUtil/PROXY_PORT (str port)))
(when user
(.setProperty props HttpUtil/HTTP_PROXY_USER user))
(when password
(.setProperty props HttpUtil/HTTP_PROXY_PASSWORD password))
props))

(defn channel-client-config->props
[params]
(let [keys [:user :url :private-key :port :host :scheme]
props (Properties.)
_ (doseq [k keys]
(.setProperty props (csk/->snake_case_string k) (str (get params k))))
proxy-props (when-some [proxy-cfg (:proxy params)]
(channel-client-proxy-config->props proxy-cfg))]
[props proxy-props]))

(defn mqtt-client-config
[{:keys [:host :port :clientid :topic :qos :clean-start :username :password]}]
Expand Down
11 changes: 5 additions & 6 deletions src/emqx/channel.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
(ns emqx.channel
(:require
[camel-snake-kebab.core :as csk]
[emqx.adapter :as adapter]
[emqx.log :as log])
(:import
[java.util Properties]
[net.snowflake.ingest.streaming
SnowflakeStreamingIngestClientFactory
OpenChannelRequest
Expand All @@ -19,10 +18,10 @@

(defn start-client
[{:keys [:client-name] :as params}]
(let [props (Properties.)
_ (doseq [[k v] params
:when (not= :client-name k)]
(.setProperty props (csk/->snake_case_string k) (str v)))
(let [[props proxy-props] (adapter/channel-client-config->props params)
_ (when proxy-props
(doseq [[k v] proxy-props]
(System/setProperty k v)))
c (.. (SnowflakeStreamingIngestClientFactory/builder client-name)
(setProperties props)
(build))]
Expand Down
9 changes: 8 additions & 1 deletion src/emqx/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
[schema.coerce :as coerce]
[schema.core :as s]))

(s/defschema ClientProxyConfig
{:host s/Str
:port s/Num
(s/optional-key :user) s/Str
(s/optional-key :password) s/Str})

(s/defschema ClientConfig
{:client-name s/Str
:user s/Str
:url s/Str
:private-key s/Str
:port s/Num
:host s/Str
:scheme s/Str})
:scheme s/Str
(s/optional-key :proxy) ClientProxyConfig})

(s/defschema MQTTConfig
{:host s/Str
Expand Down
50 changes: 50 additions & 0 deletions test/emqx/adapter_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,53 @@
:password "bar"})
:opts
(select-keys [:username :password]))))))

(deftest channel-client-config->props
(let [base-proxy-params {:host "proxy.host.com" :port 1234}
base-params {:client-name "client-name"
:user "client-user"
:url "snowflake.url.com"
:private-key "secret-private-key"
:port 443
:host "snowflake.host"
:scheme "https"}]
(testing "no proxy config"
(let [props (adapter/channel-client-config->props base-params)]
(is (= [{"scheme" "https"
"port" "443"
"host" "snowflake.host"
"private_key" "secret-private-key"
"user" "client-user"
"url" "snowflake.url.com"}
nil]
props))))
(testing "proxy config, no username and password"
(let [props (adapter/channel-client-config->props (assoc base-params :proxy base-proxy-params))]
(is (= [{"scheme" "https"
"port" "443"
"host" "snowflake.host"
"private_key" "secret-private-key"
"user" "client-user"
"url" "snowflake.url.com"}
{"http.useProxy" "true"
"http.proxyHost" "proxy.host.com"
"http.proxyPort" "1234"}]
props))))
(testing "proxy config, with username and password"
(let [props (-> base-params
(assoc :proxy base-proxy-params)
(assoc-in [:proxy :user] "proxy-user")
(assoc-in [:proxy :password] "proxy-password")
adapter/channel-client-config->props)]
(is (= [{"scheme" "https"
"port" "443"
"host" "snowflake.host"
"private_key" "secret-private-key"
"user" "client-user"
"url" "snowflake.url.com"}
{"http.useProxy" "true"
"http.proxyHost" "proxy.host.com"
"http.proxyPort" "1234"
"http.proxyUser" "proxy-user"
"http.proxyPassword" "proxy-password"}]
props))))))