Skip to content

Commit

Permalink
Merge pull request #5 from emqx/mqtt
Browse files Browse the repository at this point in the history
feat: change app from http listener to mqtt client
  • Loading branch information
thalesmg authored Sep 24, 2024
2 parents 24a1493 + 6f36cea commit 7c64964
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 580 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
name: docker

on: [push]
on:
push:
tags:
- "**"
workflow_dispatch: {}

jobs:
docker:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pom.xml.asc
/profile.json
/.lsp/
/config.edn
.rebel_readline_history
19 changes: 13 additions & 6 deletions config.example.edn
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@
:private-key "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----\n"
:port 443
:host "ORG_ID-ACCOUNT_ID.snowflakecomputing.com"
:scheme "https"
:role "TESTROLE"}
:scheme "https"}
:channels [{:chan-name "my_channel"
:database "TESTDATABASE"
:schema "PUBLIC"
:table "TESTTABLE"
:on-error :continue}
:on-error :continue
:mqtt {:host "emqx1.emqx.net"
:port 1883
:clientid "chan1"
:topic "t/sf/1"
:qos 1}}
{:chan-name "my_channel2"
:database "TESTDATABASE"
:schema "PUBLIC"
:table "TESTTABLE2"
:on-error :abort}]
:server {:host "0.0.0.0"
:port 9099}}
:on-error :abort
:mqtt {:host "emqx1.emqx.net"
:port 1883
:clientid "chan2"
:topic "t/sf/2"
:qos 1}}]}
17 changes: 7 additions & 10 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
{:paths ["src" "resources"]
:deps {
;; https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk
net.snowflake/snowflake-ingest-sdk {:mvn/version "2.0.1"}
net.snowflake/snowflake-ingest-sdk {:mvn/version "2.2.2"
:exclusions [org.slf4j/slf4j-api]}

;; https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3
;; org.eclipse.paho/org.eclipse.paho.client.mqttv3 {:mvn/version "1.2.5"}
clojurewerkz/machine_head {:mvn/version "1.0.0"}

io.pedestal/pedestal.jetty {:mvn/version "0.6.0"}
io.pedestal/pedestal.route {:mvn/version "0.6.0"}
io.pedestal/pedestal.service {:mvn/version "0.6.0"}
;; https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
;; org.slf4j/slf4j-simple {:mvn/version "2.0.7"}
com.fzakaria/slf4j-timbre {:mvn/version "0.4.0"}
;; https://mvnrepository.com/artifact/org.slf4j/slf4j-api
org.slf4j/slf4j-api {:mvn/version "2.0.16"}
com.taoensso/telemere {:mvn/version "1.0.0-beta22"}
com.taoensso/telemere-slf4j {:mvn/version "1.0.0-beta22"}

prismatic/schema {:mvn/version "1.4.1"}
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
com.taoensso/timbre {:mvn/version "6.2.1"}
org.clojure/tools.cli {:mvn/version "1.0.219"}
cheshire/cheshire {:mvn/version "5.11.0"}
funcool/cats {:mvn/version "2.4.2"}
Expand Down
12 changes: 7 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# prerequisites

1. Have an account on Snowflake
2. Clojure 1.11
2. Clojure 1.11+
3. Have an user with a role that has the sufficient privileges on all relevant objects.
- Such role must have:
- `USAGE` on the database.
Expand Down Expand Up @@ -38,10 +38,13 @@ clj
```
```clojure
(require '[emqx.http :as server])
(require '[emqx.config :as config])
(require '[emqx.channel :as chan])
(chan/start-client)
(server/start)
(let [{:keys [:app-name :client :channels]} (config/get-config!)]
(chan/start-client client)
(doseq [chan-params channels]
(chan/ensure-streaming-agent app-name chan-params)
(mqtt/start-client chan-params)))
```
## uberjar
Expand All @@ -50,7 +53,6 @@ clj
clj -T:build uber
java -jar target/emqx-snowflake-proxy-0.0.0-standalone.jar
java -jar target/emqx-snowflake-proxy-0.0.0-standalone.jar -D taoensso.timbre.config.edn='{:min-level :info}'
```
## docker
Expand Down
1 change: 0 additions & 1 deletion resources/taoensso.timbre.config.edn

This file was deleted.

64 changes: 20 additions & 44 deletions src/emqx/adapter.clj
Original file line number Diff line number Diff line change
@@ -1,49 +1,25 @@
(ns emqx.adapter
(:require
[cats.monad.either :refer [left right]]
[schema.core :as s]))
[cheshire.core :as json]))

(s/defschema UpsertChannelWire
{:chan-name s/Str
:database s/Str
:schema s/Str
:table s/Str
:on-error (s/enum :continue :abort)})
(defn mqtt-client-config
[{:keys [:host :port :clientid :topic :qos :username :password]}]
{:uri (str "tcp://" host ":" port)
:clientid clientid
:topic topic
:qos qos
:opts (cond-> {:auto-reconnect true}
username (assoc :username username)
password (assoc :password password))})

(defn upsert-channel-in
[{:keys [:json-params :path-params]}]
(let [{:keys [:chan-name]} path-params
{:keys [:database :schema :table :on_error]} json-params
on-error (keyword on_error)
chan-params {:chan-name chan-name
:database database
:schema schema
:table table
:on-error on-error}
errors (s/check UpsertChannelWire chan-params)]
(if errors
(left errors)
(right chan-params))))

(s/defschema JsonVal
(s/maybe
(s/conditional
string? s/Str
number? s/Num
boolean? s/Bool
map? {s/Str (s/recursive #'JsonVal)}
vector? [(s/recursive #'JsonVal)])))

(s/defschema InsertRowsWire
;; TODO: should use more lax value types???
{:rows [(s/conditional
(every-pred map? not-empty) {s/Str JsonVal})]})

(defn insert-rows-in
[{:keys [:json-params]}]
(let [rows (get json-params "rows")
insert-rows-params {:rows rows}
errors (s/check InsertRowsWire insert-rows-params)]
(if errors
(left errors)
(right insert-rows-params))))
(defn channel-data-in
"Parses the incoming payload and expects it to be a JSON encoded object"
[^bytes payload]
(try
(let [decoded (json/parse-string (String. payload "UTF-8"))]
(if (map? decoded)
(right decoded)
(left "bad input: should be a single JSON object")))
(catch Exception e
(left (str "bad input: failure parsing json: " (.getMessage e))))))
54 changes: 32 additions & 22 deletions src/emqx/channel.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns emqx.channel
(:require
[camel-snake-kebab.core :as csk]
[taoensso.timbre :as log])
[emqx.log :as log])
(:import
[java.util Properties]
[net.snowflake.ingest.streaming
Expand Down Expand Up @@ -91,15 +91,17 @@
(try
(.getLatestCommittedOffsetToken chan)
(catch SFException e
(log/error :msg ::get-latest-commited-offset-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e))
(log/error!
::get-latest-commited-offset-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e))
(.printStackTrace e)
nil)
(catch Exception e
(log/error :msg ::get-latest-commited-offset-exception
:exception (.getMessage e))
(log/error!
::get-latest-commited-offset-exception
:exception (.getMessage e))
nil)))

;; TODO: return partial success count???
Expand All @@ -113,29 +115,36 @@
(map (fn [e]
{:row-index (.getRowIndex e)
:message (.getMessage e)})))]
(log/error :msg ::insert-row-errors :errors errors :offset-token offset-token)
(log/error!
::insert-row-errors
:errors errors
:offset-token offset-token)
{:errors errors})
(do
(log/debug :msg ::insert-rows-success :offset-token offset-token)
(log/debug!
::insert-rows-success
:offset-token offset-token)
{:errors nil})))
(catch SFException e
(log/error :msg ::insert-row-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e)
:offset-token offset-token)
(log/error!
::insert-row-exception
:exception (.getMessage e)
:cause (.getCause e)
:vendor-code (.getVendorCode e)
:offset-token offset-token)
(.printStackTrace e)
{:errors [(.getMessage e)]
:vendor-code (.getVendorCode e)})
(catch Exception e
(log/error :msg ::insert-row-exception
:exception (.getMessage e)
:offset-token offset-token)
(log/error!
::insert-row-exception
:exception (.getMessage e)
:offset-token offset-token)
{:errors [(.getMessage e)]})))

(defn- insert-rows
[{:keys [:chan :n] :as state} rows reply-promise]
(log/debug :msg ::insert-rows-enter :rows rows)
(log/debug! ::insert-rows-enter :rows rows)
(let [row-count (count rows)
offset-token (str (+ n row-count))
response (do-insert-rows chan rows offset-token)]
Expand All @@ -160,10 +169,11 @@
(if-let [channel-agent (@channels chan-name)]
(let [{:keys [:chan]} @channel-agent]
(.close chan)
(log/info :msg ::channel-closed
:chan-name chan-name
:closed? (.isClosed chan)
:valid? (.isValid chan))
(log/info!
::channel-closed
:chan-name chan-name
:closed? (.isClosed chan)
:valid? (.isValid chan))
(swap! channels dissoc chan-name)
(deliver reply-promise true))
(deliver reply-promise true))
Expand Down
49 changes: 40 additions & 9 deletions src/emqx/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,53 @@
:private-key s/Str
:port s/Num
:host s/Str
:scheme s/Str
:role s/Str})
:scheme s/Str})

(s/defschema MQTTConfig
{:host s/Str
:port s/Num
:clientid s/Str
:topic s/Str
:qos (s/enum 0 1 2)
(s/optional-key :username) s/Str
(s/optional-key :password) s/Str})

(s/defschema ChannelConfig
{:chan-name s/Str
:database s/Str
:schema s/Str
:table s/Str
:on-error (s/enum :continue :abort)})

(s/defschema ServerConfig
{:host s/Str
:port s/Num})
:on-error (s/enum :continue :abort)
:mqtt MQTTConfig})

(s/defschema EMQXConfig
{:client ClientConfig
:channels [ChannelConfig]
:server ServerConfig})
:channels [ChannelConfig]})

(defn- duplicates
[coll]
(->> coll
frequencies
(filter (fn [[_ n]] (> n 1)))
(map (fn [[x _]] x))))

(defn- validate-unique!
[{:keys [:channels]} ks what]
(let [all-things (map (fn [chan-config]
(get-in chan-config ks))
channels)
duplicate-things (duplicates all-things)]
(when (seq duplicate-things)
(throw (RuntimeException.
(format "Duplicate %s found: %s" what (pr-str duplicate-things)))))))

(defn- validate-unique-clientids!
[channels]
(validate-unique! channels [:mqtt :clientid] "clientids"))

(defn- validate-unique-chan-names!
[channels]
(validate-unique! channels [:chan-name] "channel names"))

(defn get-config!
"Get the configurations for startup.
Expand All @@ -40,6 +69,8 @@
(edn/read (java.io.PushbackReader. r)))
coercer (coerce/coercer! EMQXConfig coerce/keyword-enum-matcher)
config (coercer params)
_ (validate-unique-clientids! config)
_ (validate-unique-chan-names! config)
app-name (or (System/getenv "APP_NAME")
(str (java.util.UUID/randomUUID)))]
(assoc config :app-name app-name)))
Loading

0 comments on commit 7c64964

Please sign in to comment.