diff --git a/api/http.go b/api/http.go index a750d11..dbf04ac 100644 --- a/api/http.go +++ b/api/http.go @@ -1,17 +1,31 @@ package api import ( + "bytes" + "encoding/base64" + "encoding/hex" "encoding/json" + "fmt" "log/slog" + "math/big" "net/http" + "strconv" "strings" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" "github.com/gin-gonic/gin" "github.com/pkg/errors" + "github.com/shopspring/decimal" + goproto "google.golang.org/protobuf/proto" + "github.com/iotexproject/pebble-server/contract/ioid" + "github.com/iotexproject/pebble-server/contract/ioidregistry" "github.com/iotexproject/pebble-server/db" + "github.com/iotexproject/pebble-server/proto" ) type errResp struct { @@ -35,9 +49,17 @@ type queryResp struct { Version string `json:"version,omitempty"` } +type receiveReq struct { + DeviceID string `json:"deviceID" binding:"required"` + Payload string `json:"payload" binding:"required"` + Signature string `json:"signature,omitempty" binding:"required"` +} + type httpServer struct { - engine *gin.Engine - db *db.DB + engine *gin.Engine + db *db.DB + ioidInstance *ioid.Ioid + ioidRegistryInstance *ioidregistry.Ioidregistry } func (s *httpServer) query(c *gin.Context) { @@ -121,18 +143,230 @@ func (s *httpServer) query(c *gin.Context) { } func (s *httpServer) receive(c *gin.Context) { + req := &receiveReq{} + if err := c.ShouldBindJSON(req); err != nil { + slog.Error("failed to bind request", "error", err) + c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid request payload"))) + return + } + + sigStr := req.Signature + req.Signature = "" + + reqJson, err := json.Marshal(req) + if err != nil { + slog.Error("failed to marshal request into json format", "error", err) + c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to process request data"))) + return + } + + sig, err := hexutil.Decode(sigStr) + if err != nil { + slog.Error("failed to decode signature from hex format", "signature", sigStr, "error", err) + c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid signature format"))) + return + } + + h := crypto.Keccak256Hash(reqJson) + sigpk, err := crypto.SigToPub(h.Bytes(), sig) + if err != nil { + slog.Error("failed to recover public key from signature", "error", err) + c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "invalid signature; could not recover public key"))) + return + } + + owner := crypto.PubkeyToAddress(*sigpk) + + d, err := s.db.Device(req.DeviceID) + if err != nil { + slog.Error("failed to query device", "error", err, "device_id", req.DeviceID) + c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to query device"))) + return + } + if d != nil && d.Owner != owner.String() { + slog.Error("failed to check device permission in db", "device_id", req.DeviceID) + c.JSON(http.StatusForbidden, newErrResp(errors.New("no permission to access the device"))) + return + } + if d == nil { + deviceAddr := common.HexToAddress(strings.TrimPrefix(req.DeviceID, "did:io:")) + tokenID, err := s.ioidRegistryInstance.DeviceTokenId(nil, deviceAddr) + if err != nil { + slog.Error("failed to query device token id", "error", err, "device_id", req.DeviceID) + c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to query device token id"))) + return + } + deviceOwner, err := s.ioidInstance.OwnerOf(nil, tokenID) + if err != nil { + slog.Error("failed to query device owner", "error", err, "device_id", req.DeviceID, "token_id", tokenID.Uint64()) + c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to query device owner"))) + return + } + + if !bytes.Equal(deviceOwner.Bytes(), owner.Bytes()) { + slog.Error("failed to check device permission in contract", "device_id", req.DeviceID, "device_owner", deviceOwner.String(), "signature_owner", owner.String()) + c.JSON(http.StatusForbidden, newErrResp(errors.New("no permission to access the device"))) + return + } + + dev := &db.Device{ + ID: req.DeviceID, + Owner: owner.String(), + Address: deviceAddr.String(), + Status: db.CONFIRM, + Proposer: owner.String(), + OperationTimes: db.NewOperationTimes(), + } + if err := s.db.UpsertDevice(dev); err != nil { + slog.Error("failed to upsert device", "error", err, "device_id", req.DeviceID) + c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to upsert device"))) + return + } + d = dev + } + + payload, err := base64.RawURLEncoding.DecodeString(req.Payload) + if err != nil { + slog.Error("failed to decode base64 data", "error", err) + c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to decode base64 data"))) + return + } + binPkg, data, err := s.unmarshalPayload(payload) + if err != nil { + slog.Error("failed to unmarshal payload", "error", err) + c.JSON(http.StatusBadRequest, newErrResp(errors.Wrap(err, "failed to unmarshal payload"))) + return + } + if err := s.handle(binPkg, data, d); err != nil { + slog.Error("failed to handle payload data", "error", err) + c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to handle payload data"))) + return + } + c.Status(http.StatusOK) +} + +func (s *httpServer) unmarshalPayload(payload []byte) (*proto.BinPackage, goproto.Message, error) { + pkg := &proto.BinPackage{} + if err := goproto.Unmarshal(payload, pkg); err != nil { + return nil, nil, errors.Wrap(err, "failed to unmarshal proto") + } + + var d goproto.Message + switch t := pkg.GetType(); t { + case proto.BinPackage_CONFIG: + d = &proto.SensorConfig{} + case proto.BinPackage_STATE: + d = &proto.SensorState{} + case proto.BinPackage_DATA: + d = &proto.SensorData{} + default: + return nil, nil, errors.Errorf("unexpected senser package type: %d", t) + } + + err := goproto.Unmarshal(pkg.GetData(), d) + return pkg, d, errors.Wrapf(err, "failed to unmarshal senser package") +} + +func (s *httpServer) handle(binpkg *proto.BinPackage, data goproto.Message, d *db.Device) (err error) { + switch pkg := data.(type) { + case *proto.SensorConfig: + err = s.handleConfig(d, pkg) + case *proto.SensorState: + err = s.handleState(d, pkg) + case *proto.SensorData: + err = s.handleSensor(binpkg, d, pkg) + } + return errors.Wrapf(err, "failed to handle %T", data) +} + +func (s *httpServer) handleConfig(dev *db.Device, pkg *proto.SensorConfig) error { + err := s.db.UpdateByID(dev.ID, map[string]any{ + "bulk_upload": int32(pkg.GetBulkUpload()), + "data_channel": int32(pkg.GetDataChannel()), + "upload_period": int32(pkg.GetUploadPeriod()), + "bulk_upload_sampling_cnt": int32(pkg.GetBulkUploadSamplingCnt()), + "bulk_upload_sampling_freq": int32(pkg.GetBulkUploadSamplingFreq()), + "beep": int32(pkg.GetBeep()), + "real_firmware": pkg.GetFirmware(), + "configurable": pkg.GetDeviceConfigurable(), + "updated_at": time.Now(), + }) + return errors.Wrapf(err, "failed to update device config: %s", dev.ID) +} + +func (s *httpServer) handleState(dev *db.Device, pkg *proto.SensorState) error { + err := s.db.UpdateByID(dev.ID, map[string]any{ + "state": int32(pkg.GetState()), + "updated_at": time.Now(), + }) + return errors.Wrapf(err, "failed to update device state: %s %d", dev.ID, int32(pkg.GetState())) +} + +func (s *httpServer) handleSensor(binpkg *proto.BinPackage, dev *db.Device, pkg *proto.SensorData) error { + snr := float64(pkg.GetSnr()) + if snr > 2700 { + snr = 100 + } else if snr < 700 { + snr = 25 + } else { + snr, _ = big.NewFloat((snr-700)*0.0375 + 25).Float64() + } + + vbat := (float64(pkg.GetVbat()) - 320) / 90 + if vbat > 1 { + vbat = 100 + } else if vbat < 0.1 { + vbat = 0.1 + } else { + vbat *= 100 + } + gyroscope, _ := json.Marshal(pkg.GetGyroscope()) + accelerometer, _ := json.Marshal(pkg.GetAccelerometer()) + + dr := &db.DeviceRecord{ + ID: dev.ID + "-" + fmt.Sprintf("%d", binpkg.GetTimestamp()), + Imei: dev.ID, + Timestamp: int64(binpkg.GetTimestamp()), + Signature: hex.EncodeToString(append(binpkg.GetSignature(), 0)), + Operator: "", + Snr: strconv.FormatFloat(snr, 'f', 1, 64), + Vbat: strconv.FormatFloat(vbat, 'f', 1, 64), + Latitude: decimal.NewFromInt32(pkg.GetLatitude()).Div(decimal.NewFromInt32(10000000)).StringFixed(7), + Longitude: decimal.NewFromInt32(pkg.GetLongitude()).Div(decimal.NewFromInt32(10000000)).StringFixed(7), + GasResistance: decimal.NewFromInt32(int32(pkg.GetGasResistance())).Div(decimal.NewFromInt32(100)).StringFixed(2), + Temperature: decimal.NewFromInt32(pkg.GetTemperature()).Div(decimal.NewFromInt32(100)).StringFixed(2), + Temperature2: decimal.NewFromInt32(int32(pkg.GetTemperature2())).Div(decimal.NewFromInt32(100)).StringFixed(2), + Pressure: decimal.NewFromInt32(int32(pkg.GetPressure())).Div(decimal.NewFromInt32(100)).StringFixed(2), + Humidity: decimal.NewFromInt32(int32(pkg.GetHumidity())).Div(decimal.NewFromInt32(100)).StringFixed(2), + Light: decimal.NewFromInt32(int32(pkg.GetLight())).Div(decimal.NewFromInt32(100)).StringFixed(2), + Gyroscope: string(gyroscope), + Accelerometer: string(accelerometer), + OperationTimes: db.NewOperationTimes(), + } + err := s.db.CreateDeviceRecord(dr) + return errors.Wrapf(err, "failed to create senser data: %s", dev.ID) } -func Run(db *db.DB, address string) error { +func Run(db *db.DB, address string, client *ethclient.Client, ioidAddr, ioidRegistryAddr common.Address) error { + ioidInstance, err := ioid.NewIoid(ioidAddr, client) + if err != nil { + return errors.Wrap(err, "failed to new ioid contract instance") + } + ioidRegistryInstance, err := ioidregistry.NewIoidregistry(ioidRegistryAddr, client) + if err != nil { + return errors.Wrap(err, "failed to new ioid registry contract instance") + } s := &httpServer{ - engine: gin.Default(), - db: db, + engine: gin.Default(), + db: db, + ioidInstance: ioidInstance, + ioidRegistryInstance: ioidRegistryInstance, } s.engine.GET("/device", s.query) s.engine.POST("/device", s.receive) - err := s.engine.Run(address) + err = s.engine.Run(address) return errors.Wrap(err, "failed to start http server") } diff --git a/cmd/server/main.go b/cmd/server/main.go index 1d2fae1..fd106f5 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -8,6 +8,7 @@ import ( "syscall" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" "github.com/iotexproject/pebble-server/api" @@ -29,23 +30,26 @@ func main() { log.Fatal(errors.Wrap(err, "failed to new db")) } + client, err := ethclient.Dial(cfg.ChainEndpoint) + if err != nil { + log.Fatal(errors.Wrap(err, "failed to dial chain endpoint")) + } + if err := monitor.Run( &monitor.Handler{ ScannedBlockNumber: db.ScannedBlockNumber, UpsertScannedBlockNumber: db.UpsertScannedBlockNumber, UpsertProjectMetadata: db.UpsertApp, }, - &monitor.ContractAddr{ - Project: common.HexToAddress(cfg.ProjectContractAddr), - }, + common.HexToAddress(cfg.ProjectContractAddr), cfg.BeginningBlockNumber, - cfg.ChainEndpoint, + client, ); err != nil { log.Fatal(errors.Wrap(err, "failed to run contract monitor")) } go func() { - if err := api.Run(db, cfg.ServiceEndpoint); err != nil { + if err := api.Run(db, cfg.ServiceEndpoint, client, common.HexToAddress(cfg.IoIDContractAddr), common.HexToAddress(cfg.IoIDRegistryContractAddr)); err != nil { log.Fatal(err) } }() diff --git a/db/device.go b/db/device.go index 1760c20..a0f5283 100644 --- a/db/device.go +++ b/db/device.go @@ -3,6 +3,7 @@ package db import ( "github.com/pkg/errors" "gorm.io/gorm" + "gorm.io/gorm/clause" ) const ( @@ -48,3 +49,16 @@ func (d *DB) Device(id string) (*Device, error) { } return &t, nil } + +func (d *DB) UpsertDevice(t *Device) error { + err := d.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.AssignmentColumns([]string{"owner", "address", "status", "proposer", "updated_at"}), + }).Create(t).Error + return errors.Wrap(err, "failed to upsert device") +} + +func (d *DB) UpdateByID(id string, values map[string]any) error { + err := d.db.Model(&Device{}).Where("id = ?", id).Updates(values).Error + return errors.Wrap(err, "failed to update device") +} diff --git a/db/device_record.go b/db/device_record.go index c63b8bb..513c089 100644 --- a/db/device_record.go +++ b/db/device_record.go @@ -1,5 +1,9 @@ package db +import ( + "github.com/pkg/errors" +) + type DeviceRecord struct { ID string `gorm:"primary_key"` Imei string `gorm:"index:device_record_imei;not null"` @@ -23,3 +27,8 @@ type DeviceRecord struct { } func (*DeviceRecord) TableName() string { return "device_record" } + +func (d *DB) CreateDeviceRecord(t *DeviceRecord) error { + err := d.db.Create(t).Error + return errors.Wrap(err, "failed to create device record") +} diff --git a/go.mod b/go.mod index 710a224..da9fb42 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,9 @@ require ( github.com/fatih/color v1.17.0 github.com/gin-gonic/gin v1.10.0 github.com/pkg/errors v0.9.1 + github.com/shopspring/decimal v1.4.0 github.com/spf13/viper v1.19.0 + google.golang.org/protobuf v1.34.2 gorm.io/driver/postgres v1.5.9 gorm.io/gorm v1.25.10 ) @@ -78,7 +80,6 @@ require ( golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect - google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect diff --git a/go.sum b/go.sum index ba3040a..fe9b81d 100644 --- a/go.sum +++ b/go.sum @@ -213,6 +213,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= diff --git a/monitor/monitor.go b/monitor/monitor.go index 702a715..3d1a7de 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -15,8 +15,6 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" - "github.com/iotexproject/pebble-server/contract/ioid" - "github.com/iotexproject/pebble-server/contract/ioidregistry" "github.com/iotexproject/pebble-server/contract/project" ) @@ -40,13 +38,11 @@ type ContractAddr struct { type contract struct { h *Handler - addr *ContractAddr + projectAddr common.Address beginningBlockNumber uint64 listStepSize uint64 watchInterval time.Duration client *ethclient.Client - ioidInstance *ioid.Ioid - ioidRegistryInstance *ioidregistry.Ioidregistry projectInstance *project.Project } @@ -58,10 +54,6 @@ var allTopic = []common.Hash{ projectAddMetadataTopic, } -func (a *ContractAddr) all() []common.Address { - return []common.Address{a.IOID, a.IOIDRegistry} -} - func (c *contract) processLogs(logs []types.Log) error { sort.Slice(logs, func(i, j int) bool { if logs[i].BlockNumber != logs[j].BlockNumber { @@ -94,7 +86,7 @@ func (c *contract) list() (uint64, error) { head = max(head, h) query := ethereum.FilterQuery{ - Addresses: c.addr.all(), + Addresses: []common.Address{c.projectAddr}, Topics: [][]common.Hash{allTopic}, } ctx := context.Background() @@ -135,7 +127,7 @@ func (c *contract) list() (uint64, error) { func (c *contract) watch(listedBlockNumber uint64) { scannedBlockNumber := listedBlockNumber query := ethereum.FilterQuery{ - Addresses: c.addr.all(), + Addresses: []common.Address{c.projectAddr}, Topics: [][]common.Hash{allTopic}, } ticker := time.NewTicker(c.watchInterval) @@ -167,34 +159,19 @@ func (c *contract) watch(listedBlockNumber uint64) { }() } -func Run(h *Handler, addr *ContractAddr, beginningBlockNumber uint64, chainEndpoint string) error { - client, err := ethclient.Dial(chainEndpoint) - if err != nil { - return errors.Wrap(err, "failed to dial chain endpoint") - } - - ioidInstance, err := ioid.NewIoid(addr.IOID, client) - if err != nil { - return errors.Wrap(err, "failed to new ioid contract instance") - } - ioidRegistryInstance, err := ioidregistry.NewIoidregistry(addr.IOIDRegistry, client) - if err != nil { - return errors.Wrap(err, "failed to new ioid registry contract instance") - } - projectInstance, err := project.NewProject(addr.Project, client) +func Run(h *Handler, projectAddr common.Address, beginningBlockNumber uint64, client *ethclient.Client) error { + projectInstance, err := project.NewProject(projectAddr, client) if err != nil { return errors.Wrap(err, "failed to new project contract instance") } c := &contract{ h: h, - addr: addr, + projectAddr: projectAddr, beginningBlockNumber: beginningBlockNumber, listStepSize: 500, watchInterval: 1 * time.Second, client: client, - ioidInstance: ioidInstance, - ioidRegistryInstance: ioidRegistryInstance, projectInstance: projectInstance, }