Skip to content

Commit

Permalink
support device data receive (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
huangzhiran authored Nov 4, 2024
1 parent c2ccd7a commit cc4b6fb
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 41 deletions.
246 changes: 240 additions & 6 deletions api/http.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
package api

import (
"bytes"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"log/slog"
"math/big"
"net/http"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
goproto "google.golang.org/protobuf/proto"

"github.com/iotexproject/pebble-server/contract/ioid"
"github.com/iotexproject/pebble-server/contract/ioidregistry"
"github.com/iotexproject/pebble-server/db"
"github.com/iotexproject/pebble-server/proto"
)

type errResp struct {
Expand All @@ -35,9 +49,17 @@ type queryResp struct {
Version string `json:"version,omitempty"`
}

type receiveReq struct {
DeviceID string `json:"deviceID" binding:"required"`
Payload string `json:"payload" binding:"required"`
Signature string `json:"signature,omitempty" binding:"required"`
}

type httpServer struct {
engine *gin.Engine
db *db.DB
engine *gin.Engine
db *db.DB
ioidInstance *ioid.Ioid
ioidRegistryInstance *ioidregistry.Ioidregistry
}

func (s *httpServer) query(c *gin.Context) {
Expand Down Expand Up @@ -121,18 +143,230 @@ func (s *httpServer) query(c *gin.Context) {
}

func (s *httpServer) receive(c *gin.Context) {
req := &receiveReq{}
if err := c.ShouldBindJSON(req); err != nil {
slog.Error("failed to bind request", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid request payload")))
return
}

sigStr := req.Signature
req.Signature = ""

reqJson, err := json.Marshal(req)
if err != nil {
slog.Error("failed to marshal request into json format", "error", err)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to process request data")))
return
}

sig, err := hexutil.Decode(sigStr)
if err != nil {
slog.Error("failed to decode signature from hex format", "signature", sigStr, "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid signature format")))
return
}

h := crypto.Keccak256Hash(reqJson)
sigpk, err := crypto.SigToPub(h.Bytes(), sig)
if err != nil {
slog.Error("failed to recover public key from signature", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid signature; could not recover public key")))
return
}

owner := crypto.PubkeyToAddress(*sigpk)

d, err := s.db.Device(req.DeviceID)
if err != nil {
slog.Error("failed to query device", "error", err, "device_id", req.DeviceID)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to query device")))
return
}
if d != nil && d.Owner != owner.String() {
slog.Error("failed to check device permission in db", "device_id", req.DeviceID)
c.JSON(http.StatusForbidden, newErrResp(errors.New("no permission to access the device")))
return
}
if d == nil {
deviceAddr := common.HexToAddress(strings.TrimPrefix(req.DeviceID, "did:io:"))
tokenID, err := s.ioidRegistryInstance.DeviceTokenId(nil, deviceAddr)
if err != nil {
slog.Error("failed to query device token id", "error", err, "device_id", req.DeviceID)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to query device token id")))
return
}
deviceOwner, err := s.ioidInstance.OwnerOf(nil, tokenID)
if err != nil {
slog.Error("failed to query device owner", "error", err, "device_id", req.DeviceID, "token_id", tokenID.Uint64())
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to query device owner")))
return
}

if !bytes.Equal(deviceOwner.Bytes(), owner.Bytes()) {
slog.Error("failed to check device permission in contract", "device_id", req.DeviceID, "device_owner", deviceOwner.String(), "signature_owner", owner.String())
c.JSON(http.StatusForbidden, newErrResp(errors.New("no permission to access the device")))
return
}

dev := &db.Device{
ID: req.DeviceID,
Owner: owner.String(),
Address: deviceAddr.String(),
Status: db.CONFIRM,
Proposer: owner.String(),
OperationTimes: db.NewOperationTimes(),
}
if err := s.db.UpsertDevice(dev); err != nil {
slog.Error("failed to upsert device", "error", err, "device_id", req.DeviceID)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to upsert device")))
return
}
d = dev
}

payload, err := base64.RawURLEncoding.DecodeString(req.Payload)
if err != nil {
slog.Error("failed to decode base64 data", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to decode base64 data")))
return
}
binPkg, data, err := s.unmarshalPayload(payload)
if err != nil {
slog.Error("failed to unmarshal payload", "error", err)
c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to unmarshal payload")))
return
}
if err := s.handle(binPkg, data, d); err != nil {
slog.Error("failed to handle payload data", "error", err)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to handle payload data")))
return
}
c.Status(http.StatusOK)
}

func (s *httpServer) unmarshalPayload(payload []byte) (*proto.BinPackage, goproto.Message, error) {
pkg := &proto.BinPackage{}
if err := goproto.Unmarshal(payload, pkg); err != nil {
return nil, nil, errors.Wrap(err, "failed to unmarshal proto")
}

var d goproto.Message
switch t := pkg.GetType(); t {
case proto.BinPackage_CONFIG:
d = &proto.SensorConfig{}
case proto.BinPackage_STATE:
d = &proto.SensorState{}
case proto.BinPackage_DATA:
d = &proto.SensorData{}
default:
return nil, nil, errors.Errorf("unexpected senser package type: %d", t)
}

err := goproto.Unmarshal(pkg.GetData(), d)
return pkg, d, errors.Wrapf(err, "failed to unmarshal senser package")
}

func (s *httpServer) handle(binpkg *proto.BinPackage, data goproto.Message, d *db.Device) (err error) {
switch pkg := data.(type) {
case *proto.SensorConfig:
err = s.handleConfig(d, pkg)
case *proto.SensorState:
err = s.handleState(d, pkg)
case *proto.SensorData:
err = s.handleSensor(binpkg, d, pkg)
}
return errors.Wrapf(err, "failed to handle %T", data)
}

func (s *httpServer) handleConfig(dev *db.Device, pkg *proto.SensorConfig) error {
err := s.db.UpdateByID(dev.ID, map[string]any{
"bulk_upload": int32(pkg.GetBulkUpload()),
"data_channel": int32(pkg.GetDataChannel()),
"upload_period": int32(pkg.GetUploadPeriod()),
"bulk_upload_sampling_cnt": int32(pkg.GetBulkUploadSamplingCnt()),
"bulk_upload_sampling_freq": int32(pkg.GetBulkUploadSamplingFreq()),
"beep": int32(pkg.GetBeep()),
"real_firmware": pkg.GetFirmware(),
"configurable": pkg.GetDeviceConfigurable(),
"updated_at": time.Now(),
})
return errors.Wrapf(err, "failed to update device config: %s", dev.ID)
}

func (s *httpServer) handleState(dev *db.Device, pkg *proto.SensorState) error {
err := s.db.UpdateByID(dev.ID, map[string]any{
"state": int32(pkg.GetState()),
"updated_at": time.Now(),
})
return errors.Wrapf(err, "failed to update device state: %s %d", dev.ID, int32(pkg.GetState()))
}

func (s *httpServer) handleSensor(binpkg *proto.BinPackage, dev *db.Device, pkg *proto.SensorData) error {
snr := float64(pkg.GetSnr())
if snr > 2700 {
snr = 100
} else if snr < 700 {
snr = 25
} else {
snr, _ = big.NewFloat((snr-700)*0.0375 + 25).Float64()
}

vbat := (float64(pkg.GetVbat()) - 320) / 90
if vbat > 1 {
vbat = 100
} else if vbat < 0.1 {
vbat = 0.1
} else {
vbat *= 100
}

gyroscope, _ := json.Marshal(pkg.GetGyroscope())
accelerometer, _ := json.Marshal(pkg.GetAccelerometer())

dr := &db.DeviceRecord{
ID: dev.ID + "-" + fmt.Sprintf("%d", binpkg.GetTimestamp()),
Imei: dev.ID,
Timestamp: int64(binpkg.GetTimestamp()),
Signature: hex.EncodeToString(append(binpkg.GetSignature(), 0)),
Operator: "",
Snr: strconv.FormatFloat(snr, 'f', 1, 64),
Vbat: strconv.FormatFloat(vbat, 'f', 1, 64),
Latitude: decimal.NewFromInt32(pkg.GetLatitude()).Div(decimal.NewFromInt32(10000000)).StringFixed(7),
Longitude: decimal.NewFromInt32(pkg.GetLongitude()).Div(decimal.NewFromInt32(10000000)).StringFixed(7),
GasResistance: decimal.NewFromInt32(int32(pkg.GetGasResistance())).Div(decimal.NewFromInt32(100)).StringFixed(2),
Temperature: decimal.NewFromInt32(pkg.GetTemperature()).Div(decimal.NewFromInt32(100)).StringFixed(2),
Temperature2: decimal.NewFromInt32(int32(pkg.GetTemperature2())).Div(decimal.NewFromInt32(100)).StringFixed(2),
Pressure: decimal.NewFromInt32(int32(pkg.GetPressure())).Div(decimal.NewFromInt32(100)).StringFixed(2),
Humidity: decimal.NewFromInt32(int32(pkg.GetHumidity())).Div(decimal.NewFromInt32(100)).StringFixed(2),
Light: decimal.NewFromInt32(int32(pkg.GetLight())).Div(decimal.NewFromInt32(100)).StringFixed(2),
Gyroscope: string(gyroscope),
Accelerometer: string(accelerometer),
OperationTimes: db.NewOperationTimes(),
}
err := s.db.CreateDeviceRecord(dr)
return errors.Wrapf(err, "failed to create senser data: %s", dev.ID)
}

func Run(db *db.DB, address string) error {
func Run(db *db.DB, address string, client *ethclient.Client, ioidAddr, ioidRegistryAddr common.Address) error {
ioidInstance, err := ioid.NewIoid(ioidAddr, client)
if err != nil {
return errors.Wrap(err, "failed to new ioid contract instance")
}
ioidRegistryInstance, err := ioidregistry.NewIoidregistry(ioidRegistryAddr, client)
if err != nil {
return errors.Wrap(err, "failed to new ioid registry contract instance")
}
s := &httpServer{
engine: gin.Default(),
db: db,
engine: gin.Default(),
db: db,
ioidInstance: ioidInstance,
ioidRegistryInstance: ioidRegistryInstance,
}

s.engine.GET("/device", s.query)
s.engine.POST("/device", s.receive)

err := s.engine.Run(address)
err = s.engine.Run(address)
return errors.Wrap(err, "failed to start http server")
}
14 changes: 9 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"syscall"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors"

"github.com/iotexproject/pebble-server/api"
Expand All @@ -29,23 +30,26 @@ func main() {
log.Fatal(errors.Wrap(err, "failed to new db"))
}

client, err := ethclient.Dial(cfg.ChainEndpoint)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to dial chain endpoint"))
}

if err := monitor.Run(
&monitor.Handler{
ScannedBlockNumber: db.ScannedBlockNumber,
UpsertScannedBlockNumber: db.UpsertScannedBlockNumber,
UpsertProjectMetadata: db.UpsertApp,
},
&monitor.ContractAddr{
Project: common.HexToAddress(cfg.ProjectContractAddr),
},
common.HexToAddress(cfg.ProjectContractAddr),
cfg.BeginningBlockNumber,
cfg.ChainEndpoint,
client,
); err != nil {
log.Fatal(errors.Wrap(err, "failed to run contract monitor"))
}

go func() {
if err := api.Run(db, cfg.ServiceEndpoint); err != nil {
if err := api.Run(db, cfg.ServiceEndpoint, client, common.HexToAddress(cfg.IoIDContractAddr), common.HexToAddress(cfg.IoIDRegistryContractAddr)); err != nil {
log.Fatal(err)
}
}()
Expand Down
14 changes: 14 additions & 0 deletions db/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package db
import (
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

const (
Expand Down Expand Up @@ -48,3 +49,16 @@ func (d *DB) Device(id string) (*Device, error) {
}
return &t, nil
}

func (d *DB) UpsertDevice(t *Device) error {
err := d.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"owner", "address", "status", "proposer", "updated_at"}),
}).Create(t).Error
return errors.Wrap(err, "failed to upsert device")
}

func (d *DB) UpdateByID(id string, values map[string]any) error {
err := d.db.Model(&Device{}).Where("id = ?", id).Updates(values).Error
return errors.Wrap(err, "failed to update device")
}
9 changes: 9 additions & 0 deletions db/device_record.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package db

import (
"github.com/pkg/errors"
)

type DeviceRecord struct {
ID string `gorm:"primary_key"`
Imei string `gorm:"index:device_record_imei;not null"`
Expand All @@ -23,3 +27,8 @@ type DeviceRecord struct {
}

func (*DeviceRecord) TableName() string { return "device_record" }

func (d *DB) CreateDeviceRecord(t *DeviceRecord) error {
err := d.db.Create(t).Error
return errors.Wrap(err, "failed to create device record")
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ require (
github.com/fatih/color v1.17.0
github.com/gin-gonic/gin v1.10.0
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.4.0
github.com/spf13/viper v1.19.0
google.golang.org/protobuf v1.34.2
gorm.io/driver/postgres v1.5.9
gorm.io/gorm v1.25.10
)
Expand Down Expand Up @@ -78,7 +80,6 @@ require (
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU=
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
Expand Down
Loading

0 comments on commit cc4b6fb

Please sign in to comment.