Skip to content

Commit

Permalink
Add support for rabbitmq broker (#56)
Browse files Browse the repository at this point in the history
* rename nats url

Signed-off-by: SammyOina <[email protected]>

* use pubsub interface

Signed-off-by: SammyOina <[email protected]>

* add id

Signed-off-by: SammyOina <[email protected]>

* remove debug statement

Signed-off-by: SammyOina <[email protected]>

* update make

Signed-off-by: SammyOina <[email protected]>

* update deps

Signed-off-by: SammyOina <[email protected]>

* remove subject from msg

Signed-off-by: SammyOina <[email protected]>

* add example file and change topic

Signed-off-by: SammyOina <[email protected]>

* update documentation

Signed-off-by: SammyOina <[email protected]>

* minor fixes

Signed-off-by: SammyOina <[email protected]>

---------

Signed-off-by: SammyOina <[email protected]>
  • Loading branch information
SammyOina authored Jul 28, 2023
1 parent 0cd3f4c commit a6e6d4f
Show file tree
Hide file tree
Showing 51 changed files with 10,413 additions and 84 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ VERSION ?= $(shell git describe --abbrev=0 --tags)
COMMIT ?= $(shell git rev-parse HEAD)
TIME ?= $(shell date +%F_%T)

ifneq ($(MF_BROKER_TYPE),)
MF_BROKER_TYPE := $(MF_BROKER_TYPE)
else
MF_BROKER_TYPE=nats
endif

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
go build -mod=vendor -ldflags "-s -w \
go build -mod=vendor -tags $(MF_BROKER_TYPE) -ldflags "-s -w \
-X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \
-X 'github.com/mainflux/mainflux.Version=$(VERSION)' \
-X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \
Expand Down
42 changes: 33 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Mainflux IoT Agent

![](https://github.com/mainflux/agent/workflows/Go/badge.svg)
![badge](https://github.com/mainflux/agent/workflows/Go/badge.svg)
![ci][ci]
![release][release]
[![go report card][grc-badge]][grc-url]
Expand All @@ -14,6 +14,7 @@
Mainflux IoT Agent is a communication, execution and SW management agent for Mainflux system.

## Install

Get the code:

```bash
Expand All @@ -22,11 +23,13 @@ cd $GOPATH/github.com/mainflux/agent
```

Make:

```bash
make
```

## Usage

Get Nats server and start it, by default it starts on port `4222`

```bash
Expand All @@ -44,6 +47,7 @@ MF_AGENT_BOOTSTRAP_KEY=<bootstrap_key> \
MF_AGENT_BOOTSTRAP_URL=http://localhost:9013/things/bootstrap \
build/mainflux-agent
```

or,if [Mainflux UI](https://github.com/mainflux/ui) is used,

```bash
Expand All @@ -54,9 +58,11 @@ build/mainflux-agent
```

### Config

Agent configuration is kept in `config.toml` if not otherwise specified with env var.

Example configuration:

```toml
[Agent]

Expand All @@ -83,7 +89,7 @@ Example configuration:
username = ""

[Agent.server]
nats_url = "localhost:4222"
broker_url = "localhost:4222"
port = "9999"

```
Expand All @@ -105,7 +111,7 @@ Environment:
| MF_AGENT_CONTROL_CHANNEL | Channel for sending controls, commands | |
| MF_AGENT_DATA_CHANNEL | Channel for data sending | |
| MF_AGENT_ENCRYPTION | Encryption | false |
| MF_AGENT_NATS_URL | Nats url | nats://localhost:4222 |
| MF_AGENT_BROKER_URL | Broker url | nats://localhost:4222 |
| MF_AGENT_MQTT_USERNAME | MQTT username, Mainflux thing id | |
| MF_AGENT_MQTT_PASSWORD | MQTT password, Mainflux thing key | |
| MF_AGENT_MQTT_SKIP_TLS | Skip TLS verification for MQTT | true |
Expand All @@ -122,12 +128,15 @@ Here `thing` is a Mainflux thing, and control channel from `channels` is used wi
(i.e. app needs to PUB/SUB on `/channels/<control_channel_id>/messages/req` and `/channels/<control_channel_id>/messages/res`).

## Sending commands to other services
You can send commands to other services that are subscribed on the same Nats server as Agent.
Commands are being sent via MQTT to topic:
* `channels/<control_channel_id>/messages/services/<service_name>/<subtopic>`

when messages is received Agent forwards them to Nats on subject:
* `commands.<service_name>.<subtopic>`.
You can send commands to other services that are subscribed on the same Broker as Agent.
Commands are being sent via MQTT to topic:

* `channels/<control_channel_id>/messages/services/<service_name>/<subtopic>`

when messages is received Agent forwards them to Broker on subject:

* `commands.<service_name>.<subtopic>`.

Payload is up to the application and service itself.

Expand All @@ -138,16 +147,26 @@ mosquitto_pub -u <thing_id> -P <thing_key> -t channels/<control_channel_id>/mess
```

## Heartbeat service

Services running on the same host can publish to `heartbeat.<service-name>.<service-type>` a heartbeat message.
Agent will keep a record on those service and update their `live` status.
If heartbeat is not received in 10 sec it marks it `offline`.
Upon next heartbeat service will be marked `online` again.

To test heartbeat run:

```bash
go run -tags <broker_name> ./examples/publish/main.go -s <broker_url> heartbeat.<service-name>.<service-type> "";
```

Broker names include: nats and rabbitmq.

To check services that are currently registered to agent you can:

```bash
curl -s -S X GET http://localhost:9999/services
```

```json
[
{
Expand Down Expand Up @@ -196,23 +215,28 @@ Check the output in terminal where you subscribed for results. You should see so


## How to save config via agent

Agent can be used to send configuration file for the [Export][export] service from cloud to gateway via MQTT.
Here is the example command:

```bash
mosquitto_pub -u <thing_id> -P <thing_key> -t channels/<control_channel_id>/messages/req -h localhost -p 1883 -m "[{\"bn\":\"1:\", \"n\":\"config\", \"vs\":\"<config_file_path>, <file_content_base64>\"}]"

```

* `<config_file_path>` - file path where to save contents
* `<file_content_base64>` - file content, base64 encoded marshaled toml.

Here is an example how to make payload for the command:

```go
b,_ := toml.Marshal(export.Config)
payload := base64.StdEncoding.EncodeToString(b)
```

Example payload:
```

```text
RmlsZSA9ICIuLi9jb25maWdzL2NvbmZpZy50b21sIgoKW2V4cF0KICBsb2dfbGV2ZWwgPSAiZGVidWciCiAgbmF0cyA9ICJuYXRzOi8vMTI3LjAuMC4xOjQyMjIiCiAgcG9ydCA9ICI4MTcwIgoKW21xdHRdCiAgY2FfcGF0aCA9ICJjYS5jcnQiCiAgY2VydF9wYXRoID0gInRoaW5nLmNydCIKICBjaGFubmVsID0gIiIKICBob3N0ID0gInRjcDovL2xvY2FsaG9zdDoxODgzIgogIG10bHMgPSBmYWxzZQogIHBhc3N3b3JkID0gImFjNmI1N2UwLTliNzAtNDVkNi05NGM4LWU2N2FjOTA4NjE2NSIKICBwcml2X2tleV9wYXRoID0gInRoaW5nLmtleSIKICBxb3MgPSAwCiAgcmV0YWluID0gZmFsc2UKICBza2lwX3Rsc192ZXIgPSBmYWxzZQogIHVzZXJuYW1lID0gIjRhNDM3ZjQ2LWRhN2ItNDQ2OS05NmI3LWJlNzU0YjVlOGQzNiIKCltbcm91dGVzXV0KICBtcXR0X3RvcGljID0gIjRjNjZhNzg1LTE5MDAtNDg0NC04Y2FhLTU2ZmI4Y2ZkNjFlYiIKICBuYXRzX3RvcGljID0gIioiCg==
```

Expand Down
23 changes: 12 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
nats "github.com/nats-io/nats.go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -92,6 +93,9 @@ var (
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

cfg, err := loadEnvConfig()
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err))
Expand All @@ -107,12 +111,11 @@ func main() {
logger.Error(fmt.Sprintf("Failed to load config: %s", err))
}

nc, err := nats.Connect(cfg.Server.NatsURL)
pubsub, err := brokers.NewPubSub(cfg.Server.BrokerURL, "", logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Server.NatsURL))
os.Exit(1)
logger.Fatal(fmt.Sprintf("Failed to connect to Broker: %s %s", err, cfg.Server.BrokerURL))
}
defer nc.Close()
defer pubsub.Close()

mqttClient, err := connectToMQTTBroker(cfg.MQTT, logger)
if err != nil {
Expand All @@ -121,7 +124,7 @@ func main() {
}
edgexClient := edgex.NewClient(cfg.Edgex.URL, logger)

svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
svc, err := agent.New(ctx, mqttClient, &cfg, edgexClient, pubsub, logger)
if err != nil {
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
return
Expand All @@ -143,17 +146,15 @@ func main() {
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
b := conn.NewBroker(svc, mqttClient, cfg.Channels.Control, nc, logger)
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
b := conn.NewBroker(svc, mqttClient, cfg.Channels.Control, pubsub, logger)

srv := &http.Server{
Addr: fmt.Sprintf(":%s", cfg.Server.Port),
Handler: api.MakeHandler(svc),
}

g.Go(func() error {
return b.Subscribe()
return b.Subscribe(ctx)
})

g.Go(func() error {
Expand All @@ -172,8 +173,8 @@ func main() {

func loadEnvConfig() (agent.Config, error) {
sc := agent.ServerConfig{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envHTTPPort, defHTTPPort),
BrokerURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envHTTPPort, defHTTPPort),
}
cc := agent.ChanConfig{
Control: mainflux.Env(envCtrlChan, defCtrlChan),
Expand Down
2 changes: 1 addition & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ File = "config.toml"
username = "77d74527-7457-4dc2-9b36-01f01ce62726"

[server]
nats_url = "localhost:4222"
broker_url = "localhost:4222"
port = "9999"

[terminal]
Expand Down
64 changes: 64 additions & 0 deletions examples/publish/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"flag"
"log"
"os"

mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
"github.com/nats-io/nats.go"
)

func main() {
var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
var showHelp = flag.Bool("h", false, "Show help message")

log.SetFlags(0)
flag.Usage = usage
flag.Parse()

if *showHelp {
showUsageAndExit(0)
}

args := flag.Args()
if len(args) != 2 {
showUsageAndExit(1)
}

subj, msg := args[0], []byte(args[1])

logger, err := mflog.New(os.Stdout, "info")
if err != nil {
log.Fatalf("failed to init logger: %s", err)
}

ps, err := brokers.NewPublisher(*urls)
if err != nil {
logger.Error(err.Error())
return
}
defer ps.Close()

if err := ps.Publish(context.Background(), subj, &messaging.Message{
Channel: subj,
Payload: msg,
}); err != nil {
logger.Error(err.Error())
return
}
logger.Info("Message published")
}

func usage() {
log.Printf("Usage: publish [-s server] <channel> <msg>\n")
flag.PrintDefaults()
}

func showUsageAndExit(exitcode int) {
usage()
os.Exit(exitcode)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
Expand Down Expand Up @@ -160,6 +163,7 @@ github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
Expand Down Expand Up @@ -197,6 +201,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/go-kit/kit/otelkit v0.42.
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -279,6 +285,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
Loading

0 comments on commit a6e6d4f

Please sign in to comment.