From 38d21359f987cde0b63d1be701f3d90554ed2e60 Mon Sep 17 00:00:00 2001 From: Kevin Petremann Date: Sun, 25 Jun 2023 16:45:24 +0200 Subject: [PATCH] refacto: use qos 1 --- cmd/dht2mqtt/main.go | 2 -- internal/mqtt/mqtt.go | 26 ++++++++++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/cmd/dht2mqtt/main.go b/cmd/dht2mqtt/main.go index b39efa3..2e30267 100644 --- a/cmd/dht2mqtt/main.go +++ b/cmd/dht2mqtt/main.go @@ -42,12 +42,10 @@ func main() { log.Info().Msg("connected to sensor") // Connect to MQTT server - log.Info().Msgf("connecting to MQTT server") publisher := mqtt.NewPublisher(cfg.MQTTUrl, cfg.MQTTTopicRoot, cfg.MQTTUsername, cfg.MQTTPassword) if err := publisher.Connect(); err != nil { log.Fatal().Err(err).Msg("publisher init error") } - log.Info().Msgf("connected to MQTT server") // Watch for metrics ch := make(chan sensor.Payload, 10) diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go index b23ff60..69f687b 100644 --- a/internal/mqtt/mqtt.go +++ b/internal/mqtt/mqtt.go @@ -3,6 +3,7 @@ package mqtt import ( "encoding/json" "errors" + "fmt" "time" "github.com/rs/zerolog/log" @@ -16,7 +17,7 @@ const mqttTimeout = 5 type Publisher struct { client mqtt.Client - mqttUrl string // example: "tcp://10.2.0.166:1883" + mqttUrl string username string password string topicRoot string @@ -35,6 +36,14 @@ func NewPublisher(mqttUrl, topicRoot, username, password string) Publisher { } } +var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { + log.Info().Msg("connected to MQTT") +} + +var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { + log.Error().Err(err).Msg("connecttion to MQTT lost") +} + // Connect connects to the MQTT server; // // The Publisher must have been instantiated first using NewPublisher. @@ -47,10 +56,12 @@ func (p *Publisher) Connect() error { opts.SetPassword(p.password) } opts.SetAutoReconnect(true) + opts.SetOnConnectHandler(connectHandler) + opts.SetConnectionLostHandler(connectLostHandler) p.client = mqtt.NewClient(opts) - if t := p.client.Connect(); t.Wait() && t.Error() != nil { + if t := p.client.Connect(); t.WaitTimeout(10*time.Second) && t.Error() != nil { log.Fatal().Err(t.Error()).Send() } @@ -73,10 +84,17 @@ func (p *Publisher) Publish(sensorName string, payload interface{}) error { } log.Debug().Msgf("publishing new MQTT message: '%s %s'", p.topicRoot+sensorName, data) - t := p.client.Publish(p.topicRoot+sensorName, 0, false, data) - if ok := t.Wait(); !ok { + t := p.client.Publish(p.topicRoot+sensorName, 1, false, data) + ack := t.WaitTimeout(10 * time.Second) + + if !ack { return errors.New("MQTT server did not confirm receiving the message") } + if t.Error() != nil { + return fmt.Errorf("MQTT server publish error: %w", t.Error()) + } + + log.Debug().Msgf("published: '%s %s'", p.topicRoot+sensorName, data) return nil }