Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-28 - enabled messages paho persistence #27

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build/*
39 changes: 22 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ By default `Export` service looks for config file at [`../configs/config.toml`][
password = "<thing_password>"
ca = "ca.crt"
cert = "thing.crt"
mtls = "false"
mtls = false
priv_key = "thing.key"
retain = "false"
retain = false
persist = false
persist_dir = "../mqtt_persist"
skip_tls_ver = "false"
url = "tcp://mainflux.com:1883"

Expand Down Expand Up @@ -110,21 +112,24 @@ Edit Mainflux [docker-compose.yml][docker-compose]. NATS section must look like
Service will look for `config.toml` first and if not found it will be configured with env variables and new config file specified with `MF_EXPORT_CONFIG_FILE` will be saved with values populated from env vars.
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.

| Variable | Description | Default |
|-------------------------------|---------------------------------------------------------------|-----------------------|
| MF_NATS_URL | Nats url | localhost:4222 |
| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 |
| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | |
| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | |
| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | |
| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true |
| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false |
| MF_EXPORT_MQTT_CA | CA for tls | ca.crt |
| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt |
| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key |
| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 |
| MF_EXPORT_MQTT_RETAIN | MQTT retain | false |
| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml |
| Variable | Description | Default |
|-------------------------------|-----------------------------------------------------------------------------------------|-----------------------|
| MF_NATS_URL | Nats url | localhost:4222 |
| MF_EXPORT_MQTT_HOST | Mqtt url where to export | tcp://localhost:1883 |
| MF_EXPORT_MQTT_USERNAME | MQTT username, thing id in case of mainflux | |
| MF_EXPORT_MQTT_PASSWORD | MQTT password, thing key in case of mainflux | |
| MF_EXPORT_MQTT_CHANNEL | MQTT channel where to publish | |
| MF_EXPORT_MQTT_SKIP_TLS | Skip tls verification | true |
| MF_EXPORT_MQTT_MTLS | Use MTLS for authentication | false |
| MF_EXPORT_MQTT_CA | CA for tls | ca.crt |
| MF_EXPORT_MQTT_CLIENT_CERT | Client cert for authentication in case when MTLS = true | thing.crt |
| MF_EXPORT_MQTT_CLIENT_PK | Client key for authentication in case when MTLS = true | thing.key |
| MF_EXPORT_MQTT_QOS | MQTT QOS | 0 |
| MF_EXPORT_MQTT_RETAIN | MQTT retain | false |
| MF_EXPORT_MQTT_CLEAN_SESSION | MQTT clean session | false |
| MF_EXPORT_MQTT_PERSIST | persist MQTT QOS 2 pending messages in filesystem, to avoid data loss | false |
| MF_EXPORT_MQTT_PERSIST_DIR | directory in which pending messages will be saved if persist is enabled | false |
| MF_EXPORT_CONFIG_FILE | Configuration file | config.toml |

for values in environment variables to take effect make sure that there is no `MF_EXPORT_CONF` file.

Expand Down
81 changes: 49 additions & 32 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,25 @@ import (
)

const (
svcName = "export"
defNatsURL = nats.DefaultURL
defLogLevel = "debug"
defPort = "8170"
defMqttHost = "tcp://localhost:1883"
defMqttUsername = ""
defMqttPassword = ""
defMqttChannel = ""
defMqttSkipTLSVer = "true"
defMqttMTLS = "false"
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = "false"
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "../configs/config.toml"
svcName = "export"
defNatsURL = nats.DefaultURL
defLogLevel = "debug"
defPort = "8170"
defMqttHost = "tcp://localhost:1883"
defMqttUsername = ""
defMqttPassword = ""
defMqttChannel = ""
defMqttSkipTLSVer = "true"
defMqttMTLS = "false"
defMqttCA = "ca.crt"
defMqttQoS = "0"
defMqttRetain = "false"
defMqttCleanSession = "true"
defMqttPersist = "false"
defMqttPersistDir = "../mqtt_persist"
defMqttCert = "thing.cert"
defMqttPrivKey = "thing.key"
defConfigFile = "../configs/config.toml"

defCacheURL = "localhost:6379"
defCachePass = ""
Expand All @@ -54,18 +57,21 @@ const (
envLogLevel = "MF_EXPORT_LOG_LEVEL"
envPort = "MF_EXPORT_PORT"

envMqttHost = "MF_EXPORT_MQTT_HOST"
envMqttUsername = "MF_EXPORT_MQTT_USERNAME"
envMqttPassword = "MF_EXPORT_MQTT_PASSWORD"
envMqttChannel = "MF_EXPORT_MQTT_CHANNEL"
envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_EXPORT_MQTT_MTLS"
envMqttCA = "MF_EXPORT_MQTT_CA"
envMqttQoS = "MF_EXPORT_MQTT_QOS"
envMqttRetain = "MF_EXPORT_MQTT_RETAIN"
envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK"
envConfigFile = "MF_EXPORT_CONFIG_FILE"
envMqttHost = "MF_EXPORT_MQTT_HOST"
envMqttUsername = "MF_EXPORT_MQTT_USERNAME"
envMqttPassword = "MF_EXPORT_MQTT_PASSWORD"
envMqttChannel = "MF_EXPORT_MQTT_CHANNEL"
envMqttSkipTLSVer = "MF_EXPORT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_EXPORT_MQTT_MTLS"
envMqttCA = "MF_EXPORT_MQTT_CA"
envMqttQoS = "MF_EXPORT_MQTT_QOS"
envMqttRetain = "MF_EXPORT_MQTT_RETAIN"
envMqttCleanSession = "MF_EXPORT_MQTT_CLEAN_SESSION"
envMqttPersist = "MF_MQTT_PERSIST"
envMqttPersistDir = "MF_MQTT_PERSIST_FILE"
envMqttCert = "MF_EXPORT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_EXPORT_MQTT_CLIENT_PK"
envConfigFile = "MF_EXPORT_CONFIG_FILE"

envCacheURL = "MF_EXPORT_CACHE_URL"
envCachePass = "MF_EXPORT_CACHE_PASS"
Expand Down Expand Up @@ -145,6 +151,14 @@ func loadConfigs() (exp.Config, error) {
if err != nil {
mqttRetain = false
}
mqttCleanSession, err := strconv.ParseBool(mainflux.Env(envMqttCleanSession, defMqttCleanSession))
if err != nil {
mqttRetain = false
}
mqttPersist, err := strconv.ParseBool(mainflux.Env(envMqttPersist, defMqttPersist))
if err != nil {
mqttPersist = false
}

q, err := strconv.ParseInt(mainflux.Env(envMqttQoS, defMqttQoS), 10, 64)
if err != nil {
Expand All @@ -166,10 +180,13 @@ func loadConfigs() (exp.Config, error) {
Password: mainflux.Env(envMqttPassword, defMqttPassword),
Username: mainflux.Env(envMqttUsername, defMqttUsername),

Retain: mqttRetain,
QoS: QoS,
MTLS: mqttMTLS,
SkipTLSVer: mqttSkipTLSVer,
Retain: mqttRetain,
CleanSession: mqttCleanSession,
Persist: mqttPersist,
PersistDir: mainflux.Env(envMqttPersistDir, defMqttPersistDir),
QoS: QoS,
MTLS: mqttMTLS,
SkipTLSVer: mqttSkipTLSVer,

CAPath: mainflux.Env(envMqttCA, defMqttCA),
ClientCertPath: mainflux.Env(envMqttCert, defMqttCert),
Expand Down
3 changes: 3 additions & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ File = "/configs/export/config.toml"
password = ""
qos = 0
retain = false
clean_session = true
persist = false
persist_dir = "../mqtt_persist"
skip_tls_ver = true
username = ""

Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type MQTT struct {
MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"`
SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"`
Retain bool `json:"retain" toml:"retain" mapstructure:"retain"`
CleanSession bool `json:"clean_session" toml:"clean_session" mapstructure:"clean_session"`
Persist bool `json:"persist" toml:"persist" mapstructure:"persist"`
PersistDir string `json:"persist_dir" toml:"persist_dir" mapstructure:"persist_dir"`
QoS int `json:"qos" toml:"qos" mapstructure:"qos"`
CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"`
ClientCertPath string `json:"client_cert_path" toml:"client_cert_path" mapstructure:"client_cert_path"`
Expand Down
9 changes: 8 additions & 1 deletion pkg/export/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,18 @@ func (e *exporter) mqttConnect(conf config.Config, logger logger.Logger) (mqtt.C
opts := mqtt.NewClientOptions().
AddBroker(conf.MQTT.Host).
SetClientID(e.id).
SetCleanSession(true).
SetCleanSession(conf.MQTT.CleanSession).
SetAutoReconnect(true).
SetOnConnectHandler(e.conn).
SetConnectionLostHandler(e.lost)

if conf.MQTT.Persist {
store := mqtt.NewFileStore(conf.MQTT.PersistDir)
opts.SetStore(store)
//disable clean session because paho deletes stored messages when restarts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments start with the capital letter. Also, add one space after //.

opts.SetCleanSession(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to make this configurable?

Copy link
Contributor Author

@pricelessrabbit pricelessrabbit Nov 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if user set persistent = true but clean session = true, when service restarts messages are lost.
so i make it configurable (clean_session) in the config, but if user sets persist = true, the clean session is set accordingly.

I can change implementation and make the settings completely independent, but in that case user have to manually set clean_session = false when he wants to persist messages in fs

}

if conf.MQTT.Username != "" && conf.MQTT.Password != "" {
opts.SetUsername(conf.MQTT.Username)
opts.SetPassword(conf.MQTT.Password)
Expand Down