Skip to content

Commit

Permalink
feat: upgrade confx to make mqtt reconnecting when lost conn
Browse files Browse the repository at this point in the history
  • Loading branch information
saitofun committed Jul 18, 2024
1 parent b366646 commit 1cbe1ed
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ cmd/sequencer/config/local.yml
cmd/sequencer/project.json
cmd/replication/config/local.yml
cmd/replication/replication-monitor
cmd/simulator/config/local.yml
cmd/simulator/simulator

37 changes: 37 additions & 0 deletions cmd/simulator/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#NAME=$(shell basename $$PWD)
NAME=simulator
FEATURE=$(shell git rev-parse --abbrev-ref HEAD)
VERSION=$(shell git describe --tags --always)
COMMITID=$(shell git rev-parse --short --show HEAD)
DATE=$(shell TZ=Asia/Shanghai date +%Y%m%d%H%M%S)

VERSION_PATH=main

LDFLAGS="-s -w \
-X ${VERSION_PATH}.Name=${NAME} \
-X ${VERSION_PATH}.Feature=${FEATURE} \
-X ${VERSION_PATH}.Version=${VERSION} \
-X ${VERSION_PATH}.CommitID=${COMMITID} \
-X ${VERSION_PATH}.Date=${DATE} "


STATIC_LDFLAGS="-linkmode 'external' -extldflags '-static' -s -w \
-X ${VERSION_PATH}.Name=${NAME} \
-X ${VERSION_PATH}.Feature=${FEATURE} \
-X ${VERSION_PATH}.Version=${VERSION} \
-X ${VERSION_PATH}.CommitID=${COMMITID} \
-X ${VERSION_PATH}.Date=${DATE} "


build: clean
CGO_ENABLE=0 go build -ldflags ${LDFLAGS} -o ${NAME}

build_static: clean
CGO_ENABLE=0 go build -ldflags ${STATIC_LDFLAGS} -o ${NAME}

run: build
./${NAME}

clean:
@rm -rf ${NAME}

11 changes: 11 additions & 0 deletions cmd/simulator/config/default.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
SIMULATOR__Logger_Level: DEBUG
SIMULATOR__MqttBroker_Cert_CA: ""
SIMULATOR__MqttBroker_Cert_Crt: ""
SIMULATOR__MqttBroker_Cert_Key: ""
SIMULATOR__MqttBroker_Keepalive: "3000000000"
SIMULATOR__MqttBroker_QoS: ONCE
SIMULATOR__MqttBroker_RetainPublish: "false"
SIMULATOR__MqttBroker_Retry_Interval: "3000000000"
SIMULATOR__MqttBroker_Retry_Repeats: "3"
SIMULATOR__MqttBroker_Server: tcp://127.0.0.1:1883
SIMULATOR__MqttBroker_Timeout: "10000000000"
141 changes: 141 additions & 0 deletions cmd/simulator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package main

import (
"encoding/json"
"log/slog"
"os"
"os/signal"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
"github.com/xoctopus/confx/confapp"
"github.com/xoctopus/confx/confmws/confmqtt"
"github.com/xoctopus/x/misc/must"

"github.com/machinefi/sprout-pebble-sequencer/pkg/middlewares/logger"
)

var (
Name = "simulator"
Feature string
Version string
CommitID string
Date string

app *confapp.AppCtx
config = &struct {
MqttBroker *confmqtt.Broker
Logger *logger.Logger
Devices []string
}{
MqttBroker: &confmqtt.Broker{},
Logger: &logger.Logger{Level: slog.LevelDebug},
}
)

func init() {
meta := confapp.Meta{
Name: Name,
Feature: Feature,
Version: Version,
CommitID: CommitID,
Date: Date,
}
app = confapp.NewAppContext(
confapp.WithBuildMeta(meta),
confapp.WithMainRoot("."),
confapp.WithMainExecutor(Main),
)

app.Conf(config)
}

func Main() error {
if len(config.Devices) == 0 {
return nil
}

clients := make([]string, 0, len(config.Devices)*2)

for _, imei := range config.Devices {
go func(imei string) {
clients = append(clients, PubSubQuery(imei)...)
}(imei)
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
_ = <-sig

for _, clientID := range clients {
config.MqttBroker.CloseByClientID(clientID)
}

return nil
}

func main() {
if err := app.Command.Execute(); err != nil {
app.PrintErrln(err)
}
os.Exit(-1)
}

func PubSubQuery(imei string) []string {
broker := config.MqttBroker
logger := config.Logger
clients := make([]string, 2)

{
topic := "backend/" + imei + "/status"
client, err := broker.NewClient(uuid.NewString(), topic)
must.NoErrorWrap(err, "failed to new sub mqtt client: [topic %s]", topic)
clients[1] = client.ID()
sequence := 0
err = client.Subscribe(func(_ mqtt.Client, message mqtt.Message) {
rsp := &struct {
Status int32 `json:"status"`
Proposer string `json:"proposer,omitempty"`
Firmware string `json:"firmware,omitempty"`
URI string `json:"uri,omitempty"`
Version string `json:"version,omitempty"`
ServerMeta string `json:"server_meta"`
}{}
pl := message.Payload()
if err = json.Unmarshal(pl, rsp); err != nil {
logger.Error(err, "failed to unmarshal response", "seq", sequence, "topic", topic, "response", string(pl))
} else {
logger.Info("sub", "seq", sequence, "data", rsp, "topic", topic)
}
sequence++
})
if err != nil {
logger.Error(err, "failed to subscribing", "topic", topic)
panic(err)
}
logger.Info("subscribing started", "topic", topic)
}

go func() {
topic := "device/" + imei + "/query"
client, err := broker.NewClient(uuid.NewString(), topic)
must.NoErrorWrap(err, "failed to new pub mqtt client: [topic %s]", topic)

clients[0] = client.ID()
logger.Info("publishing started", "topic", topic)
sequence := 0
for {
err := client.Publish([]byte{})
if err != nil {
logger.Error(err, "failed to publish", "seq", sequence, "topic", topic)
} else {
logger.Info("pub", "seq", sequence, "topic", topic)
sequence++
}
time.Sleep(time.Second * 5)
}
}()

return clients
}
37 changes: 26 additions & 11 deletions doc/migration_reocrd.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
| | | 351358813281992 | |
| | | 351358813065718 | |
| | | 351358813357594 | |
| 2024-07-17 | 2024-07-17 07:00:00(planed) | 350916066753800 | missing blockchain event |
| 2024-07-17 | 2024-07-17 07:30:00(migrated) | 350916066753800 | missing blockchain event |
| | | 350916066755219 | |
| | | 350916067051147 | |
| | | 350916067066178 | |
Expand All @@ -32,7 +32,7 @@
| | | 351358813281182 | |
| | | 351358813374102 | |
| | | 351358815441396 | |
| | | %0 | |
| | | %0 | last imei is '0' |

migrate steps:

Expand Down Expand Up @@ -97,28 +97,43 @@ psql -h $OLD_PEBBLE_DB -U $OLD_PEBBLE_DB_USER -d $OLD_PEBBLE_DB_NAME -c \
```shell
PGPASSWORD=$OLD_PEBBLE_DB_PASSWORD psql -h $OLD_PEBBLE_DB -U $OLD_PEBBLE_DB_USER -d $OLD_PEBBLE_DB_NAME -c "\
COPY ( \
SELECT (
SELECT
id,name,owner,address,status,avatar,config,real_firmware,total_gas,bulk_upload,data_channel,upload_period,bulk_upload_sampling_cnt,bulk_upload_sampling_freq,beep,state,type,configurable \
) \
FROM \
device \
WHERE
id not in('103381234567407','351358810263647','351358813375182','350916067079072','350916067070162','350916067099906','350916067070345','351358810283462','351358813281760','351358813282131','350916067066608','351358813281992','351358813065718','351358813357594')
WHERE \
id NOT IN ( \
'103381234567407','351358810263647','351358813375182','351358810263514','350916067079072', \
'350916067070162','350916067099906','350916067070345','351358810283462','351358813281760', \
'351358813282131','350916067066608','351358813281992','351358813065718','351358813357594', \
'350916066753800','350916066755219','350916067051147','350916067066178','350916067066269', \
'350916067066673','350916067070824','350916067094295','351358810263407','351358813083174', \
'351358813094361','351358813280705','351358813281182','351358813374102','351358815441396' \
)
AND \
id NOT LIKE '%0' \
ORDER BY \
id \
) TO STDOUT WITH CSV HEADER" >device_old.csv

PGPASSWORD=$NEW_PEBBLE_DB_PASSWORD psql -h $NEW_PEBBLE_DB -U $NEW_PEBBLE_DB_USER -d $NEW_PEBBLE_DB_NAME -c "\
COPY ( \
SELECT (
SELECT
id,name,owner,address,status,avatar,config,real_firmware,total_gas,bulk_upload,data_channel,upload_period,bulk_upload_sampling_cnt,bulk_upload_sampling_freq,beep,state,type,configurable \
) \
FROM \
device \
WHERE
id not in('103381234567407','351358810263647','351358813375182','350916067079072','350916067070162','350916067099906','350916067070345','351358810283462','351358813281760','351358813282131','350916067066608','351358813281992','351358813065718','351358813357594')
WHERE \
id NOT IN ( \
'103381234567407','351358810263647','351358813375182','351358810263514','350916067079072', \
'350916067070162','350916067099906','350916067070345','351358810283462','351358813281760', \
'351358813282131','350916067066608','351358813281992','351358813065718','351358813357594', \
'350916066753800','350916066755219','350916067051147','350916067066178','350916067066269', \
'350916067066673','350916067070824','350916067094295','351358810263407','351358813083174', \
'351358813094361','351358813280705','351358813281182','351358813374102','351358815441396' \
)
AND \
id NOT LIKE '%0' \
ORDER BY \
id \
) TO STDOUT WITH CSV HEADER" >device_new.csv

```
22 changes: 9 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ require (
github.com/shopspring/decimal v1.4.0
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
github.com/xhd2015/xgo/runtime v1.0.43
github.com/xoctopus/confx v0.1.1
github.com/xoctopus/datatypex v0.0.6
github.com/xoctopus/x v0.0.18
golang.org/x/crypto v0.24.0
github.com/xhd2015/xgo/runtime v1.0.45
github.com/xoctopus/confx v0.1.2
github.com/xoctopus/datatypex v0.0.7
github.com/xoctopus/x v0.0.20
golang.org/x/crypto v0.25.0
google.golang.org/protobuf v1.34.2
gorm.io/datatypes v1.2.1
gorm.io/driver/postgres v1.5.9
Expand Down Expand Up @@ -63,8 +63,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand All @@ -74,7 +73,6 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand All @@ -86,7 +84,6 @@ require (
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/modood/table v0.0.0-20220527013332-8d47e76dad33 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
Expand All @@ -95,18 +92,17 @@ require (
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/smarty/assertions v1.16.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/supranational/blst v0.3.11 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/exp v0.0.0-20240707233637-46b078467d37 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/mysql v1.5.6 // indirect
Expand Down
Loading

0 comments on commit 1cbe1ed

Please sign in to comment.