Skip to content

Commit

Permalink
* enabled messages paho persistence
Browse files Browse the repository at this point in the history
* enabled persistent session to avoid paho to discard pending message  if service restarted
* updated readme

Signed-off-by: PricelessRabbit <[email protected]>
  • Loading branch information
pricelessrabbit committed Nov 8, 2020
1 parent 32ca230 commit 917a81b
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build/*
38 changes: 21 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ By default `Export` service looks for config file at [`../configs/config.toml`][
password = "<thing_password>"
ca = "ca.crt"
cert = "thing.crt"
mtls = "false"
mtls = false
priv_key = "thing.key"
retain = "false"
retain = false
persist = false
persist_dir = "../mqtt_persist"
skip_tls_ver = "false"
url = "tcp://mainflux.com:1883"

Expand Down Expand Up @@ -110,21 +112,23 @@ Edit Mainflux [docker-compose.yml][docker-compose]. NATS section must look like
Service will look for `config.toml` first and if not found it will be configured with env variables and new config file specified with `MF_EXPORT_CONFIG_FILE` will be saved with values populated from env vars.
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.

| Variable | Description | Default |
|-------------------------------|---------------------------------------------------------------|-----------------------|
| MF_NATS_URL | Nats url | localhost:4222 |
| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 |
| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | |
| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | |
| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | |
| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true |
| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false |
| MF_EXPORT_MQTT_CA | CA for tls | ca.crt |
| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt |
| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key |
| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 |
| MF_EXPORT_MQTT_RETAIN | MQTT retain | false |
| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml |
| Variable | Description | Default |
|-------------------------------|-----------------------------------------------------------------------------------------|-----------------------|
| MF_NATS_URL | Nats url | localhost:4222 |
| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 |
| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | |
| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | |
| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | |
| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true |
| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false |
| MF_EXPORT_MQTT_CA | CA for tls | ca.crt |
| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt |
| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key |
| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 |
| MF_EXPORT_MQTT_RETAIN | MQTT retain | false |
| MF_EXPORT_MQTT_PERSIST | persist MQTT QOS 2 pending messages in filesystem, to avoid data loss | false |
| MF_EXPORT_MQTT_PERSIST_DIR | directory in which pending messages will be saved if persist is enabled | false |
| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml |

for values in environment variables to take effect make sure that there is no `MF_EXPORT_CONF` file.

Expand Down
14 changes: 12 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ const (
defMqttMTLS = "false"
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = false
defMqttRetain = "false"
defMqttPersist = "false"
defMqttPersistDir = "../mqtt_persist"
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "../configs/config.toml"
Expand All @@ -63,6 +65,8 @@ const (
envMqttCA = "MF_EXPORT_MQTT_CA"
envMqttQoS = "MF_EXPORT_MQTT_QOS"
envMqttRetain = "MF_EXPORT_MQTT_RETAIN"
envMqttPersist = "MF_MQTT_PERSIST"
envMqttPersistDir = "MF_MQTT_PERSIST_FILE"
envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK"
envConfigFile = "MF_EXPORT_CONFIG_FILE"
Expand Down Expand Up @@ -141,10 +145,14 @@ func loadConfigs() (exp.Config, error) {
if err != nil {
mqttMTLS = false
}
mqttRetain, err := strconv.ParseBool(mainflux.Env(envMqttMTLS, defMqttMTLS))
mqttRetain, err := strconv.ParseBool(mainflux.Env(envMqttRetain, defMqttRetain))
if err != nil {
mqttRetain = false
}
mqttPersist, err := strconv.ParseBool(mainflux.Env(envMqttPersist, defMqttPersist))
if err != nil {
mqttPersist = false
}

q, err := strconv.ParseInt(mainflux.Env(envMqttQoS, defMqttQoS), 10, 64)
if err != nil {
Expand All @@ -167,6 +175,8 @@ func loadConfigs() (exp.Config, error) {
Username: mainflux.Env(envMqttUsername, defMqttUsername),

Retain: mqttRetain,
Persist: mqttPersist,
PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir),
QoS: QoS,
MTLS: mqttMTLS,
SkipTLSVer: mqttSkipTLSVer,
Expand Down
2 changes: 2 additions & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ File = "/configs/export/config.toml"
password = ""
qos = 0
retain = false
persist = false
persist_dir = "../mqtt_persist"
skip_tls_ver = true
username = ""

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type MQTT struct {
MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"`
SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"`
Retain bool `json:"retain" toml:"retain" mapstructure:"retain"`
Persist bool `json:"persist" toml:"persist" mapstructure:"persist"`
PersistDir string `json:"persist_dir" toml:"persist_dir" mapstructure:"persist_dir"`
QoS int `json:"qos" toml:"qos" mapstructure:"qos"`
CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"`
ClientCertPath string `json:"client_cert_path" toml:"client_cert_path" mapstructure:"client_cert_path"`
Expand Down
7 changes: 6 additions & 1 deletion pkg/export/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,16 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C
opts := mqtt.NewClientOptions().
AddBroker(conf.MQTT.Host).
SetClientID(e.id).
SetCleanSession(true).
SetCleanSession(false).
SetAutoReconnect(true).
SetOnConnectHandler(e.conn).
SetConnectionLostHandler(e.lost)

if conf.MQTT.Persist {
store := mqtt.NewFileStore(conf.MQTT.PersistDir)
opts.SetStore(store)
}

if conf.MQTT.Username != "" && conf.MQTT.Password != "" {
opts.SetUsername(conf.MQTT.Username)
opts.SetPassword(conf.MQTT.Password)
Expand Down

0 comments on commit 917a81b

Please sign in to comment.