diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cabf5dfb..37bd0335 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: os: [ ubuntu-latest , macos-latest, windows-latest ] - go-version: [ '1.20', '1.21' ] + go-version: [ '1.21', '1.22' ] steps: - name: Configure git run: git config --global core.autocrlf false # required on Windows diff --git a/api/client.go b/api/client.go index bbadb49c..9a1ba1a5 100644 --- a/api/client.go +++ b/api/client.go @@ -44,12 +44,6 @@ func (c *Client) TxpoolFee() (resp types.Currency, err error) { return } -// SyncerPeers returns the current peers of the syncer. -func (c *Client) SyncerPeers() (resp []GatewayPeer, err error) { - err = c.c.GET("/syncer/peers", &resp) - return -} - // SyncerConnect adds the address as a peer of the syncer. func (c *Client) SyncerConnect(addr string) (err error) { err = c.c.POST("/syncer/connect", addr, nil) diff --git a/api/server.go b/api/server.go index deb3691a..263bc1bb 100644 --- a/api/server.go +++ b/api/server.go @@ -10,8 +10,8 @@ import ( "go.sia.tech/core/consensus" "go.sia.tech/core/gateway" "go.sia.tech/core/types" + "go.sia.tech/coreutils/syncer" "go.sia.tech/explored/explorer" - "go.sia.tech/explored/syncer" ) type ( @@ -22,20 +22,19 @@ type ( RecommendedFee() types.Currency PoolTransactions() []types.Transaction V2PoolTransactions() []types.V2Transaction - AddPoolTransactions(txns []types.Transaction) error - AddV2PoolTransactions(txns []types.V2Transaction) error + AddPoolTransactions(txns []types.Transaction) (bool, error) + AddV2PoolTransactions(index types.ChainIndex, txns []types.V2Transaction) (bool, error) UnconfirmedParents(txn types.Transaction) []types.Transaction } // A Syncer can connect to other peers and synchronize the blockchain. 
Syncer interface { Addr() string - Peers() []*gateway.Peer - PeerInfo(peer string) (syncer.PeerInfo, bool) - Connect(addr string) (*gateway.Peer, error) + Peers() []*syncer.Peer + Connect(addr string) (*syncer.Peer, error) BroadcastHeader(bh gateway.BlockHeader) BroadcastTransactionSet(txns []types.Transaction) - BroadcastV2TransactionSet(txns []types.V2Transaction) + BroadcastV2TransactionSet(index types.ChainIndex, txns []types.V2Transaction) BroadcastV2BlockOutline(bo gateway.V2BlockOutline) } @@ -57,27 +56,6 @@ type server struct { s Syncer } -func (s *server) syncerPeersHandler(jc jape.Context) { - var peers []GatewayPeer - for _, p := range s.s.Peers() { - info, ok := s.s.PeerInfo(p.Addr) - if !ok { - continue - } - peers = append(peers, GatewayPeer{ - Addr: p.Addr, - Inbound: p.Inbound, - Version: p.Version, - - FirstSeen: info.FirstSeen, - ConnectedSince: info.LastConnect, - SyncedBlocks: info.SyncedBlocks, - SyncDuration: info.SyncDuration, - }) - } - jc.Encode(peers) -} - func (s *server) syncerConnectHandler(jc jape.Context) { var addr string if jc.Decode(&addr) != nil { @@ -122,17 +100,24 @@ func (s *server) txpoolBroadcastHandler(jc jape.Context) { if jc.Decode(&tbr) != nil { return } + + tip, err := s.e.Tip() + if jc.Check("failed to get tip", err) != nil { + return + } if len(tbr.Transactions) != 0 { - if jc.Check("invalid transaction set", s.cm.AddPoolTransactions(tbr.Transactions)) != nil { + _, err := s.cm.AddPoolTransactions(tbr.Transactions) + if jc.Check("invalid transaction set", err) != nil { return } s.s.BroadcastTransactionSet(tbr.Transactions) } if len(tbr.V2Transactions) != 0 { - if jc.Check("invalid v2 transaction set", s.cm.AddV2PoolTransactions(tbr.V2Transactions)) != nil { + _, err := s.cm.AddV2PoolTransactions(tip, tbr.V2Transactions) + if jc.Check("invalid v2 transaction set", err) != nil { return } - s.s.BroadcastV2TransactionSet(tbr.V2Transactions) + s.s.BroadcastV2TransactionSet(tip, tbr.V2Transactions) } } @@ -258,7 +243,6 @@ func NewServer(e Explorer, cm ChainManager, s Syncer) http.Handler { s: s, } return jape.Mux(map[string]jape.Handler{ - "GET /syncer/peers": srv.syncerPeersHandler, "POST /syncer/connect": srv.syncerConnectHandler, "POST /syncer/broadcast/block": srv.syncerBroadcastBlockHandler, diff --git a/cmd/explored/main.go b/cmd/explored/main.go index 312d5c35..271da842 100644 --- a/cmd/explored/main.go +++ b/cmd/explored/main.go @@ -75,7 +75,7 @@ func main() { consoleEncoder := zapcore.NewConsoleEncoder(consoleCfg) // only log info messages to console unless stdout logging is enabled - consoleCore := zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), zap.NewAtomicLevelAt(zap.InfoLevel)) + consoleCore := zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), zap.NewAtomicLevelAt(zap.DebugLevel)) log := zap.New(consoleCore, zap.AddCaller()) defer log.Sync() // redirect stdlib log to zap diff --git a/cmd/explored/node.go b/cmd/explored/node.go index 81ca9eb1..80d61936 100644 --- a/cmd/explored/node.go +++ b/cmd/explored/node.go @@ -3,21 +3,22 @@ package main import ( "context" "errors" - "log" + "io/fs" "net" + "os" "path/filepath" "strconv" "time" bolt "go.etcd.io/bbolt" - "go.sia.tech/core/chain" "go.sia.tech/core/consensus" "go.sia.tech/core/gateway" "go.sia.tech/core/types" + "go.sia.tech/coreutils/chain" + "go.sia.tech/coreutils/syncer" "go.sia.tech/explored/explorer" "go.sia.tech/explored/internal/syncerutil" "go.sia.tech/explored/persist/sqlite" - "go.sia.tech/explored/syncer" "go.uber.org/zap" "lukechampine.com/upnp" ) @@ -160,7 
+161,7 @@ func newNode(addr, dir string, chainNetwork string, useUPNP bool, logger *zap.Lo if err != nil { return nil, err } - e := explorer.NewExplorer(store) + tip, err := store.Tip() if errors.Is(err, sqlite.ErrNoTip) { tip = types.ChainIndex{ @@ -172,6 +173,13 @@ func newNode(addr, dir string, chainNetwork string, useUPNP bool, logger *zap.Lo } cm.AddSubscriber(store, tip) + hashPath := filepath.Join(dir, "./hash") + if err := os.MkdirAll(hashPath, fs.ModePerm); err != nil { + return nil, err + } + + e := explorer.NewExplorer(store) + l, err := net.Listen("tcp", addr) if err != nil { return nil, err @@ -218,7 +226,7 @@ func newNode(addr, dir string, chainNetwork string, useUPNP bool, logger *zap.Lo UniqueID: gateway.GenerateUniqueID(), NetAddress: syncerAddr, } - s := syncer.New(l, cm, ps, header, syncer.WithLogger(log.Default())) + s := syncer.New(l, cm, ps, header, syncer.WithLogger(logger.Named("syncer"))) return &node{ cm: cm, diff --git a/explorer/explorer.go b/explorer/explorer.go index 6c977574..601c696a 100644 --- a/explorer/explorer.go +++ b/explorer/explorer.go @@ -1,8 +1,8 @@ package explorer import ( - "go.sia.tech/core/chain" "go.sia.tech/core/types" + "go.sia.tech/coreutils/chain" ) // A Store is a database that stores information about elements, contracts, @@ -17,6 +17,8 @@ type Store interface { UnspentSiacoinOutputs(address types.Address, limit, offset uint64) ([]SiacoinOutput, error) UnspentSiafundOutputs(address types.Address, limit, offset uint64) ([]SiafundOutput, error) Balance(address types.Address) (sc types.Currency, sf uint64, err error) + + MerkleProof(leafIndex uint64) ([]types.Hash256, error) } // Explorer implements a Sia explorer. @@ -29,6 +31,11 @@ func NewExplorer(s Store) *Explorer { return &Explorer{s: s} } +// MerkleProof gets the merkle proof with the given leaf index. +func (e *Explorer) MerkleProof(leafIndex uint64) ([]types.Hash256, error) { + return e.s.MerkleProof(leafIndex) +} + // Tip returns the tip of the best known valid chain. func (e *Explorer) Tip() (types.ChainIndex, error) { return e.s.Tip() diff --git a/explorer/types.go b/explorer/types.go index 89667d6f..f2e67fa6 100644 --- a/explorer/types.go +++ b/explorer/types.go @@ -34,24 +34,15 @@ func (d Source) MarshalJSON() ([]byte, error) { } } -// A SiacoinOutput is a types.SiacoinOutput with added fields for output ID, -// source, and maturity height. +// A SiacoinOutput is a types.SiacoinElement with an added field for the +// source. type SiacoinOutput struct { - OutputID types.SiacoinOutputID `json:"outputID"` - Source Source `json:"source"` - MaturityHeight int `json:"maturityHeight"` - - types.SiacoinOutput + Source Source `json:"source"` + types.SiacoinElement } -// A SiafundOutput contains a types.SiafundOutput with added fields for output -// ID and claim start. -type SiafundOutput struct { - OutputID types.SiafundOutputID `json:"outputID"` - ClaimStart types.Currency `json:"claimStart"` - - types.SiafundOutput -} +// A SiafundOutput is a types.SiafundElement. +type SiafundOutput types.SiafundElement // A Transaction is a transaction that uses the wrapped types above. 
type Transaction struct { diff --git a/go.mod b/go.mod index 01cca0dc..6de2e631 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,12 @@ module go.sia.tech/explored -go 1.18 +go 1.21.6 require ( github.com/mattn/go-sqlite3 v1.14.19 - go.etcd.io/bbolt v1.3.7 - go.sia.tech/core v0.1.12-0.20231021194448-f1e65eb9f0d0 + go.etcd.io/bbolt v1.3.8 + go.sia.tech/core v0.2.1 + go.sia.tech/coreutils v0.0.2 go.sia.tech/jape v0.11.1 go.uber.org/zap v1.26.0 golang.org/x/term v0.6.0 diff --git a/go.sum b/go.sum index 5ef5bcbb..49a6bdbd 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,27 @@ github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY= github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSiWQsof+nXEI9bUVUyV6F53Fp89EuCh2EAA= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI= github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= -go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= -go.sia.tech/core v0.1.12-0.20231021194448-f1e65eb9f0d0 h1:2nKOKa99g9h9m3hL5UortAbmnwuwXhDcTHIhzmqBae8= -go.sia.tech/core v0.1.12-0.20231021194448-f1e65eb9f0d0/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= -go.sia.tech/jape v0.9.0 h1:kWgMFqALYhLMJYOwWBgJda5ko/fi4iZzRxHRP7pp8NY= -go.sia.tech/jape v0.9.0/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA= +go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= +go.sia.tech/core v0.2.1 h1:CqmMd+T5rAhC+Py3NxfvGtvsj/GgwIqQHHVrdts/LqY= +go.sia.tech/core v0.2.1/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= +go.sia.tech/coreutils v0.0.2 h1:vDqMDM6dW6b/R3sO1ycr8fAnJXUiAvrzxehEIq/AsKA= +go.sia.tech/coreutils v0.0.2/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0= go.sia.tech/jape v0.11.1 h1:M7IP+byXL7xOqzxcHUQuXW+q3sYMkYzmMlMw+q8ZZw0= go.sia.tech/jape v0.11.1/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4= go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU= go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= @@ -25,6 +29,7 @@ go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 
h1:NvGWuYG8dkDHFSKksI1P9faiVJ9rayE6l0+ouWVIDs8= golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= +golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -33,6 +38,7 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= lukechampine.com/frand v1.4.2 h1:RzFIpOvkMXuPMBb9maa4ND4wjBn71E1Jpf8BzJHMaVw= lukechampine.com/frand v1.4.2/go.mod h1:4S/TM2ZgrKejMcKMbeLjISpJMO+/eZ1zu3vYX9dtj3s= lukechampine.com/upnp v0.3.0 h1:UVCD6eD6fmJmwak6DVE3vGN+L46Fk8edTcC6XYCb6C4= diff --git a/internal/syncerutil/store.go b/internal/syncerutil/store.go index 5ca9f4db..16872894 100644 --- a/internal/syncerutil/store.go +++ b/internal/syncerutil/store.go @@ -2,12 +2,13 @@ package syncerutil import ( "encoding/json" + "errors" "net" "os" "sync" "time" - "go.sia.tech/explored/syncer" + "go.sia.tech/coreutils/syncer" ) type peerBan struct { @@ -28,11 +29,11 @@ func (eps *EphemeralPeerStore) banned(peer string) bool { return false // shouldn't happen } for _, s := range []string{ - peer, // 1.2.3.4:5678 - syncer.Subnet(host + "/32"), // 1.2.3.4:* - syncer.Subnet(host + "/24"), // 1.2.3.* - syncer.Subnet(host + "/16"), // 1.2.* - syncer.Subnet(host + "/8"), // 1.* + peer, // 1.2.3.4:5678 + syncer.Subnet(host, "/32"), // 1.2.3.4:* + syncer.Subnet(host, "/24"), // 1.2.3.* + syncer.Subnet(host, "/16"), // 1.2.* + syncer.Subnet(host, "/8"), // 1.* } { if b, ok := eps.bans[s]; ok { if time.Until(b.Expiry) <= 0 { @@ -46,37 +47,39 @@ func (eps *EphemeralPeerStore) banned(peer string) bool { } // AddPeer implements PeerStore. -func (eps *EphemeralPeerStore) AddPeer(peer string) { +func (eps *EphemeralPeerStore) AddPeer(peer string) error { eps.mu.Lock() defer eps.mu.Unlock() if _, ok := eps.peers[peer]; !ok { - eps.peers[peer] = syncer.PeerInfo{FirstSeen: time.Now()} + eps.peers[peer] = syncer.PeerInfo{Address: peer, FirstSeen: time.Now()} } + return nil } // Peers implements PeerStore. -func (eps *EphemeralPeerStore) Peers() []string { +func (eps *EphemeralPeerStore) Peers() ([]syncer.PeerInfo, error) { eps.mu.Lock() defer eps.mu.Unlock() - var peers []string - for p := range eps.peers { - if !eps.banned(p) { + var peers []syncer.PeerInfo + for addr, p := range eps.peers { + if !eps.banned(addr) { peers = append(peers, p) } } - return peers + return peers, nil } // UpdatePeerInfo implements PeerStore. -func (eps *EphemeralPeerStore) UpdatePeerInfo(peer string, fn func(*syncer.PeerInfo)) { +func (eps *EphemeralPeerStore) UpdatePeerInfo(peer string, fn func(*syncer.PeerInfo)) error { eps.mu.Lock() defer eps.mu.Unlock() info, ok := eps.peers[peer] if !ok { - return + return errors.New("no such peer") } fn(&info) eps.peers[peer] = info + return nil } // PeerInfo implements PeerStore. 
@@ -88,7 +91,7 @@ func (eps *EphemeralPeerStore) PeerInfo(peer string) (syncer.PeerInfo, bool) { } // Ban implements PeerStore. -func (eps *EphemeralPeerStore) Ban(peer string, duration time.Duration, reason string) { +func (eps *EphemeralPeerStore) Ban(peer string, duration time.Duration, reason string) error { eps.mu.Lock() defer eps.mu.Unlock() // canonicalize @@ -96,13 +99,14 @@ func (eps *EphemeralPeerStore) Ban(peer string, duration time.Duration, reason s peer = ipnet.String() } eps.bans[peer] = peerBan{Expiry: time.Now().Add(duration), Reason: reason} + return nil } // Banned implements PeerStore. -func (eps *EphemeralPeerStore) Banned(peer string) bool { +func (eps *EphemeralPeerStore) Banned(peer string) (bool, error) { eps.mu.Lock() defer eps.mu.Unlock() - return eps.banned(peer) + return eps.banned(peer), nil } // NewEphemeralPeerStore initializes an EphemeralPeerStore. @@ -175,21 +179,21 @@ func (jps *JSONPeerStore) save() error { } // AddPeer implements PeerStore. -func (jps *JSONPeerStore) AddPeer(peer string) { +func (jps *JSONPeerStore) AddPeer(peer string) error { jps.EphemeralPeerStore.AddPeer(peer) - jps.save() + return jps.save() } // UpdatePeerInfo implements PeerStore. -func (jps *JSONPeerStore) UpdatePeerInfo(peer string, fn func(*syncer.PeerInfo)) { +func (jps *JSONPeerStore) UpdatePeerInfo(peer string, fn func(*syncer.PeerInfo)) error { jps.EphemeralPeerStore.UpdatePeerInfo(peer, fn) - jps.save() + return jps.save() } // Ban implements PeerStore. -func (jps *JSONPeerStore) Ban(peer string, duration time.Duration, reason string) { +func (jps *JSONPeerStore) Ban(peer string, duration time.Duration, reason string) error { jps.EphemeralPeerStore.Ban(peer, duration, reason) - jps.save() + return jps.save() } // NewJSONPeerStore returns a JSONPeerStore backed by the specified file. diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 148a2e45..d15f76b5 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -5,8 +5,8 @@ import ( "errors" "fmt" - "go.sia.tech/core/chain" "go.sia.tech/core/types" + "go.sia.tech/coreutils/chain" "go.sia.tech/explored/explorer" ) @@ -163,6 +163,7 @@ func (s *Store) addTransactions(dbTxn txn, bid types.BlockID, txns []types.Trans type consensusUpdate interface { ForEachSiacoinElement(fn func(sce types.SiacoinElement, spent bool)) ForEachSiafundElement(fn func(sfe types.SiafundElement, spent bool)) + ForEachFileContractElement(fn func(fce types.FileContractElement, rev *types.FileContractElement, resolved, valid bool)) } func (s *Store) updateBalances(dbTxn txn, update consensusUpdate) error { @@ -186,7 +187,7 @@ func (s *Store) updateBalances(dbTxn txn, update consensusUpdate) error { rows, err := dbTxn.Query(`SELECT address, siacoin_balance, siafund_balance FROM address_balance - WHERE address IN (`+queryPlaceHolders(len(addressList))+`)`, queryArgs(addressList)...) + WHERE address IN (`+queryPlaceHolders(len(addressList))+`)`, addressList...) if err != nil { return fmt.Errorf("updateBalances: failed to query address_balance: %w", err) } @@ -278,12 +279,12 @@ func (s *Store) addSCOutputs(dbTxn txn, bid types.BlockID, update consensusUpdat } } - stmt, err := dbTxn.Prepare(`INSERT INTO siacoin_outputs(output_id, block_id, spent, source, maturity_height, address, value) - VALUES (?, ?, ?, ?, ?, ?, ?) + stmt, err := dbTxn.Prepare(`INSERT INTO siacoin_elements(output_id, block_id, leaf_index, merkle_proof, spent, source, maturity_height, address, value) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(output_id) DO UPDATE SET spent = ?`) if err != nil { - return nil, fmt.Errorf("addSCOutputs: failed to prepare siacoin_outputs statement: %w", err) + return nil, fmt.Errorf("addSCOutputs: failed to prepare siacoin_elements statement: %w", err) } defer stmt.Close() @@ -294,9 +295,9 @@ func (s *Store) addSCOutputs(dbTxn txn, bid types.BlockID, update consensusUpdat return } - result, err := stmt.Exec(dbEncode(sce.StateElement.ID), dbEncode(bid), spent, int(sources[types.SiacoinOutputID(sce.StateElement.ID)]), sce.MaturityHeight, dbEncode(sce.SiacoinOutput.Address), dbEncode(sce.SiacoinOutput.Value), spent) + result, err := stmt.Exec(dbEncode(sce.StateElement.ID), dbEncode(bid), dbEncode(sce.StateElement.LeafIndex), dbEncode(sce.StateElement.MerkleProof), spent, int(sources[types.SiacoinOutputID(sce.StateElement.ID)]), sce.MaturityHeight, dbEncode(sce.SiacoinOutput.Address), dbEncode(sce.SiacoinOutput.Value), spent) if err != nil { - updateErr = fmt.Errorf("addSCOutputs: failed to execute siacoin_outputs statement: %w", err) + updateErr = fmt.Errorf("addSCOutputs: failed to execute siacoin_elements statement: %w", err) return } @@ -312,12 +313,12 @@ func (s *Store) addSCOutputs(dbTxn txn, bid types.BlockID, update consensusUpdat } func (s *Store) addSFOutputs(dbTxn txn, bid types.BlockID, update consensusUpdate) (map[types.SiafundOutputID]int64, error) { - stmt, err := dbTxn.Prepare(`INSERT INTO siafund_outputs(output_id, block_id, spent, claim_start, address, value) - VALUES (?, ?, ?, ?, ?, ?) + stmt, err := dbTxn.Prepare(`INSERT INTO siafund_elements(output_id, block_id, leaf_index, merkle_proof, spent, claim_start, address, value) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(output_id) DO UPDATE SET spent = ?`) if err != nil { - return nil, fmt.Errorf("addSFOutputs: failed to prepare siafund_outputs statement: %w", err) + return nil, fmt.Errorf("addSFOutputs: failed to prepare siafund_elements statement: %w", err) } defer stmt.Close() @@ -328,9 +329,9 @@ func (s *Store) addSFOutputs(dbTxn txn, bid types.BlockID, update consensusUpdat return } - result, err := stmt.Exec(dbEncode(sfe.StateElement.ID), dbEncode(bid), spent, dbEncode(sfe.ClaimStart), dbEncode(sfe.SiafundOutput.Address), dbEncode(sfe.SiafundOutput.Value), spent) + result, err := stmt.Exec(dbEncode(sfe.StateElement.ID), dbEncode(bid), dbEncode(sfe.StateElement.LeafIndex), dbEncode(sfe.StateElement.MerkleProof), spent, dbEncode(sfe.ClaimStart), dbEncode(sfe.SiafundOutput.Address), dbEncode(sfe.SiafundOutput.Value), spent) if err != nil { - updateErr = fmt.Errorf("addSFOutputs: failed to execute siafund_outputs statement: %w", err) + updateErr = fmt.Errorf("addSFOutputs: failed to execute siafund_elements statement: %w", err) return } @@ -372,6 +373,10 @@ func (s *Store) applyUpdates() error { } else if err := s.addTransactions(dbTxn, update.Block.ID(), update.Block.Transactions, scDBIds, sfDBIds); err != nil { return fmt.Errorf("applyUpdates: failed to add transactions: addTransactions: %w", err) } + + if err := s.updateLeaves(dbTxn, update); err != nil { + return err + } } s.pendingUpdates = s.pendingUpdates[:0] return nil @@ -389,7 +394,8 @@ func (s *Store) revertUpdate(cru *chain.RevertUpdate) error { } else if err := s.updateBalances(dbTxn, cru); err != nil { return fmt.Errorf("revertUpdate: failed to update balances: %w", err) } - return nil + + return s.updateLeaves(dbTxn, cru) }) } diff --git a/persist/sqlite/encoding.go b/persist/sqlite/encoding.go index bfb505d7..90b42f7b 100644 --- 
a/persist/sqlite/encoding.go +++ b/persist/sqlite/encoding.go @@ -19,9 +19,23 @@ func dbEncode(obj any) any { obj.EncodeTo(e) e.Flush() return buf.Bytes() + case []types.Hash256: + var buf bytes.Buffer + e := types.NewEncoder(&buf) + e.WritePrefix(len(obj)) + for _, o := range obj { + o.EncodeTo(e) + } + e.Flush() + return buf.Bytes() + case types.Currency: + buf := make([]byte, 16) + binary.BigEndian.PutUint64(buf[:8], obj.Hi) + binary.BigEndian.PutUint64(buf[8:], obj.Lo) + return buf case uint64: b := make([]byte, 8) - binary.LittleEndian.PutUint64(b, obj) + binary.BigEndian.PutUint64(b, obj) return b case time.Time: return obj.Unix() @@ -47,8 +61,17 @@ func (d *decodable) Scan(src any) error { dec := types.NewBufDecoder(src) v.DecodeFrom(dec) return dec.Err() + case *[]types.Hash256: + dec := types.NewBufDecoder(src) + *v = make([]types.Hash256, dec.ReadPrefix()) + for i := range *v { + (*v)[i].DecodeFrom(dec) + } + case *types.Currency: + v.Hi = binary.BigEndian.Uint64(src[:8]) + v.Lo = binary.BigEndian.Uint64(src[8:]) case *uint64: - *v = binary.LittleEndian.Uint64(src) + *v = binary.BigEndian.Uint64(src) default: return fmt.Errorf("cannot scan %T to %T", src, d.v) } diff --git a/persist/sqlite/init.go b/persist/sqlite/init.go index b4701734..93fb9f96 100644 --- a/persist/sqlite/init.go +++ b/persist/sqlite/init.go @@ -69,6 +69,9 @@ func (s *Store) init() error { return fmt.Errorf("failed to disable foreign key constraints: %w", err) } + // error is ignored -- the database may not have been initialized yet. + s.db.QueryRow("SELECT COUNT(*) FROM merkle_proofs WHERE i = 0").Scan(&s.numLeaves) + version := getDBVersion(s.db) switch { case version == 0: diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql index f153f21a..126090f5 100644 --- a/persist/sqlite/init.sql +++ b/persist/sqlite/init.sql @@ -19,10 +19,12 @@ CREATE TABLE address_balance ( siafund_balance BLOB NOT NULL ); -CREATE TABLE siacoin_outputs ( +CREATE TABLE siacoin_elements ( id INTEGER PRIMARY KEY, block_id BLOB REFERENCES blocks(id) ON DELETE CASCADE NOT NULL, output_id BLOB UNIQUE NOT NULL, + leaf_index BLOB UNIQUE NOT NULL, + merkle_proof BLOB UNIQUE NOT NULL, spent INTEGER NOT NULL, source INTEGER NOT NULL, maturity_height INTEGER NOT NULL, @@ -30,26 +32,28 @@ CREATE TABLE siacoin_outputs ( value BLOB NOT NULL ); -CREATE INDEX siacoin_outputs_output_id_index ON siacoin_outputs(output_id); -CREATE INDEX siacoin_outputs_address_spent_index ON siacoin_outputs(address, spent); +CREATE INDEX siacoin_elements_output_id_index ON siacoin_elements(output_id); +CREATE INDEX siacoin_elements_address_spent_index ON siacoin_elements(address, spent); -CREATE TABLE siafund_outputs ( +CREATE TABLE siafund_elements ( id INTEGER PRIMARY KEY, block_id BLOB REFERENCES blocks(id) ON DELETE CASCADE NOT NULL, output_id BLOB UNIQUE NOT NULL, + leaf_index BLOB UNIQUE NOT NULL, + merkle_proof BLOB UNIQUE NOT NULL, spent INTEGER NOT NULL, claim_start BLOB NOT NULL, address BLOB NOT NULL, value BLOB NOT NULL ); -CREATE INDEX siafund_outputs_output_id_index ON siafund_outputs(output_id); -CREATE INDEX siafund_outputs_address_spent_index ON siafund_outputs(address, spent); +CREATE INDEX siafund_elements_output_id_index ON siafund_elements(output_id); +CREATE INDEX siafund_elements_address_spent_index ON siafund_elements(address, spent); CREATE TABLE miner_payouts ( block_id BLOB REFERENCES blocks(id) ON DELETE CASCADE NOT NULL, block_order INTEGER NOT NULL, - output_id INTEGER REFERENCES siacoin_outputs(id) ON DELETE CASCADE NOT NULL, + 
output_id INTEGER REFERENCES siacoin_elements(id) ON DELETE CASCADE NOT NULL, UNIQUE(block_id, block_order) ); @@ -93,7 +97,7 @@ CREATE INDEX transaction_siacoin_inputs_transaction_id_index ON transaction_siac CREATE TABLE transaction_siacoin_outputs ( transaction_id INTEGER REFERENCES transactions(id) ON DELETE CASCADE NOT NULL, transaction_order INTEGER NOT NULL, - output_id INTEGER REFERENCES siacoin_outputs(id) ON DELETE CASCADE NOT NULL, + output_id INTEGER REFERENCES siacoin_elements(id) ON DELETE CASCADE NOT NULL, UNIQUE(transaction_id, transaction_order) ); @@ -113,11 +117,18 @@ CREATE INDEX transaction_siafund_inputs_transaction_id_index ON transaction_siaf CREATE TABLE transaction_siafund_outputs ( transaction_id INTEGER REFERENCES transactions(id) ON DELETE CASCADE NOT NULL, transaction_order INTEGER NOT NULL, - output_id INTEGER REFERENCES siafund_outputs(id) ON DELETE CASCADE NOT NULL, + output_id INTEGER REFERENCES siafund_elements(id) ON DELETE CASCADE NOT NULL, UNIQUE(transaction_id, transaction_order) ); CREATE INDEX transaction_siafund_outputs_transaction_id_index ON transaction_siafund_outputs(transaction_id); +CREATE TABLE merkle_proofs ( + i INTEGER NOT NULL, + j INTEGER NOT NULL, + hash BLOB NOT NULL, + PRIMARY KEY(i ,j) +); + -- initialize the global settings table INSERT INTO global_settings (id, db_version) VALUES (0, 0); -- should not be changed diff --git a/persist/sqlite/merkle.go b/persist/sqlite/merkle.go new file mode 100644 index 00000000..a6c40dd1 --- /dev/null +++ b/persist/sqlite/merkle.go @@ -0,0 +1,96 @@ +package sqlite + +import ( + "math/bits" + + "go.sia.tech/core/types" +) + +func (s *Store) updateLeaves(dbTxn txn, update consensusUpdate) error { + modifyLeaf := func(stmt *loggedStmt, elem types.StateElement) error { + pos := elem.LeafIndex + for i, h := range elem.MerkleProof { + subtreeSize := uint64(1 << i) + if elem.LeafIndex&(1< s.numLeaves { + s.numLeaves = elem.LeafIndex + 1 + } + return nil + } + + stmt, err := dbTxn.Prepare(`INSERT INTO merkle_proofs(i, j, hash) VALUES (?, ?, ?) ON CONFLICT (i, j) DO UPDATE SET hash = ?;`) + if err != nil { + return err + } + + update.ForEachSiacoinElement(func(sce types.SiacoinElement, spent bool) { + if err != nil { + return + } + err = modifyLeaf(stmt, sce.StateElement) + return + }) + if err != nil { + return err + } + + update.ForEachSiafundElement(func(sce types.SiafundElement, spent bool) { + if err != nil { + return + } + err = modifyLeaf(stmt, sce.StateElement) + return + }) + if err != nil { + return err + } + + update.ForEachFileContractElement(func(sce types.FileContractElement, rev *types.FileContractElement, resolved, valid bool) { + if err != nil { + return + } + err = modifyLeaf(stmt, sce.StateElement) + return + }) + if err != nil { + return err + } + + return nil +} + +// MerkleProof implements explorer.Store. +func (s *Store) MerkleProof(leafIndex uint64) ([]types.Hash256, error) { + proof := make([]types.Hash256, bits.Len64(leafIndex^s.numLeaves)-1) + err := s.transaction(func(tx txn) error { + pos := leafIndex + stmt, err := tx.Prepare("SELECT hash FROM merkle_proofs WHERE i = ? 
AND j = ?") + if err != nil { + return err + } + for i := range proof { + subtreeSize := uint64(1 << i) + if leafIndex&(1< 10 { - frand.Shuffle(len(peers), reflect.Swapper(peers)) - peers = peers[:10] - } - return peers -} - -func (h *rpcHandler) Block(id types.BlockID) (types.Block, error) { - b, ok := h.s.cm.Block(id) - if !ok { - return types.Block{}, errors.New("block not found") - } - return b, nil -} - -func (h *rpcHandler) BlocksForHistory(history []types.BlockID, max uint64) ([]types.Block, uint64, error) { - return h.s.cm.BlocksForHistory(history, max) -} - -func (h *rpcHandler) Transactions(index types.ChainIndex, txnHashes []types.Hash256) (txns []types.Transaction, v2txns []types.V2Transaction, _ error) { - if b, ok := h.s.cm.Block(index.ID); ok { - // get txns from block - want := make(map[types.Hash256]bool) - for _, h := range txnHashes { - want[h] = true - } - for _, txn := range b.Transactions { - if want[txn.FullHash()] { - txns = append(txns, txn) - } - } - for _, txn := range b.V2Transactions() { - if want[txn.FullHash()] { - v2txns = append(v2txns, txn) - } - } - return - } - txns, v2txns = h.s.cm.TransactionsForPartialBlock(txnHashes) - return -} - -func (h *rpcHandler) Checkpoint(index types.ChainIndex) (types.Block, consensus.State, error) { - b, cs, ok := h.s.cm.SyncCheckpoint(index) - if !ok { - return types.Block{}, consensus.State{}, errors.New("checkpoint not found") - } - return b, cs, nil -} - -func (h *rpcHandler) RelayHeader(bh gateway.BlockHeader, origin *gateway.Peer) { - if _, ok := h.s.cm.Block(bh.ID()); ok { - return // already seen - } else if _, ok := h.s.cm.Block(bh.ParentID); !ok { - h.s.log.Printf("peer %v relayed a header with unknown parent (%v); triggering a resync", origin, bh.ParentID) - h.s.mu.Lock() - h.s.synced[origin.Addr] = false - h.s.mu.Unlock() - return - } else if cs := h.s.cm.TipState(); bh.ParentID != cs.Index.ID { - // block extends a sidechain, which peer (if honest) believes to be the - // heaviest chain - h.s.log.Printf("peer %v relayed a header that does not attach to our tip; triggering a resync", origin) - h.s.mu.Lock() - h.s.synced[origin.Addr] = false - h.s.mu.Unlock() - return - } else if bh.ID().CmpWork(cs.ChildTarget) < 0 { - h.s.ban(origin, errors.New("peer sent header with insufficient work")) - return - } - - // header is valid and attaches to our tip; request + validate full block - if b, err := origin.SendBlock(bh.ID(), h.s.config.SendBlockTimeout); err != nil { - // log-worthy, but not ban-worthy - h.s.log.Printf("couldn't retrieve new block %v after header relay from %v: %v", bh.ID(), origin, err) - return - } else if err := h.s.cm.AddBlocks([]types.Block{b}); err != nil { - h.s.ban(origin, err) - return - } - - h.s.relayHeader(bh, origin) // non-blocking -} - -func (h *rpcHandler) RelayTransactionSet(txns []types.Transaction, origin *gateway.Peer) { - // if we've already seen these transactions, don't relay them again - for _, txn := range txns { - if _, ok := h.s.cm.PoolTransaction(txn.ID()); !ok { - goto add - } - } - return - -add: - if err := h.s.cm.AddPoolTransactions(txns); err != nil { - // too risky to ban here (txns are probably just outdated), but at least - // log it if we think we're synced - if b, ok := h.s.cm.Block(h.s.cm.Tip().ID); ok && time.Since(b.Timestamp) < 2*h.s.cm.TipState().BlockInterval() { - h.s.log.Printf("received an invalid transaction set from %v: %v", origin, err) - } - return - } - h.s.relayTransactionSet(txns, origin) // non-blocking -} - -func (h *rpcHandler) RelayV2Header(bh 
gateway.V2BlockHeader, origin *gateway.Peer) { - if _, ok := h.s.cm.Block(bh.Parent.ID); !ok { - h.s.log.Printf("peer %v relayed a v2 header with unknown parent (%v); triggering a resync", origin, bh.Parent.ID) - h.s.mu.Lock() - h.s.synced[origin.Addr] = false - h.s.mu.Unlock() - return - } - cs := h.s.cm.TipState() - bid := bh.ID(cs) - if _, ok := h.s.cm.Block(bid); ok { - // already seen - return - } else if bh.Parent.ID != cs.Index.ID { - // block extends a sidechain, which peer (if honest) believes to be the - // heaviest chain - h.s.log.Printf("peer %v relayed a header that does not attach to our tip; triggering a resync", origin) - h.s.mu.Lock() - h.s.synced[origin.Addr] = false - h.s.mu.Unlock() - return - } else if bid.CmpWork(cs.ChildTarget) < 0 { - h.s.ban(origin, errors.New("peer sent header with insufficient work")) - return - } - - // header is sufficiently valid; relay it - // - // NOTE: The purpose of header announcements is to inform the network as - // quickly as possible that a new block has been found. A proper - // BlockOutline should follow soon after, allowing peers to obtain the - // actual block. As such, we take no action here other than relaying. - h.s.relayV2Header(bh, origin) // non-blocking -} - -func (h *rpcHandler) RelayV2BlockOutline(bo gateway.V2BlockOutline, origin *gateway.Peer) { - if _, ok := h.s.cm.Block(bo.ParentID); !ok { - h.s.log.Printf("peer %v relayed a header with unknown parent (%v); triggering a resync", origin, bo.ParentID) - h.s.mu.Lock() - h.s.synced[origin.Addr] = false - h.s.mu.Unlock() - return - } - cs := h.s.cm.TipState() - bid := bo.ID(cs) - if _, ok := h.s.cm.Block(bid); ok { - // already seen - return - } else if bo.ParentID != cs.Index.ID { - // block extends a sidechain, which peer (if honest) believes to be the - // heaviest chain - h.s.log.Printf("peer %v relayed a header that does not attach to our tip; triggering a resync", origin) - h.s.mu.Lock() - h.s.synced[origin.Addr] = false - h.s.mu.Unlock() - return - } else if bid.CmpWork(cs.ChildTarget) < 0 { - h.s.ban(origin, errors.New("peer sent header with insufficient work")) - return - } - - // block has sufficient work and attaches to our tip, but may be missing - // transactions; first, check for them in our txpool; then, if block is - // still incomplete, request remaining transactions from the peer - txns, v2txns := h.s.cm.TransactionsForPartialBlock(bo.Missing()) - b, missing := bo.Complete(cs, txns, v2txns) - if len(missing) > 0 { - index := types.ChainIndex{ID: bid, Height: cs.Index.Height + 1} - txns, v2txns, err := origin.SendTransactions(index, missing, h.s.config.SendTransactionsTimeout) - if err != nil { - // log-worthy, but not ban-worthy - h.s.log.Printf("couldn't retrieve missing transactions of %v after relay from %v: %v", bid, origin, err) - return - } - b, missing = bo.Complete(cs, txns, v2txns) - if len(missing) > 0 { - // inexcusable - h.s.ban(origin, errors.New("peer sent wrong missing transactions for a block it relayed")) - return - } - } - if err := h.s.cm.AddBlocks([]types.Block{b}); err != nil { - h.s.ban(origin, err) - return - } - - // when we forward the block, exclude any txns that were in our txpool, - // since they're probably present in our peers' txpools as well - // - // NOTE: crucially, we do NOT exclude any txns we had to request from the - // sending peer, since other peers probably don't have them either - bo.RemoveTransactions(txns, v2txns) - - h.s.relayV2BlockOutline(bo, origin) // non-blocking -} - -func (h *rpcHandler) 
RelayV2TransactionSet(txns []types.V2Transaction, origin *gateway.Peer) { - // if we've already seen these transactions, don't relay them again - for _, txn := range txns { - if _, ok := h.s.cm.V2PoolTransaction(txn.ID()); !ok { - goto add - } - } - return - -add: - if err := h.s.cm.AddV2PoolTransactions(txns); err != nil { - // too risky to ban here (txns are probably just outdated), but at least - // log it if we think we're synced - if b, ok := h.s.cm.Block(h.s.cm.Tip().ID); ok && time.Since(b.Timestamp) < 2*h.s.cm.TipState().BlockInterval() { - h.s.log.Printf("received an invalid transaction set from %v: %v", origin, err) - } - return - } - h.s.relayV2TransactionSet(txns, origin) // non-blocking -} - -func (s *Syncer) ban(p *gateway.Peer, err error) { - p.SetErr(errors.New("banned")) - s.pm.Ban(p.ConnAddr, 24*time.Hour, err.Error()) - - host, _, err := net.SplitHostPort(p.ConnAddr) - if err != nil { - return // shouldn't happen - } - // add a strike to each subnet - for subnet, maxStrikes := range map[string]int{ - Subnet(host + "/32"): 2, // 1.2.3.4:* - Subnet(host + "/24"): 8, // 1.2.3.* - Subnet(host + "/16"): 64, // 1.2.* - Subnet(host + "/8"): 512, // 1.* - } { - s.mu.Lock() - ban := (s.strikes[subnet] + 1) >= maxStrikes - if ban { - delete(s.strikes, subnet) - } else { - s.strikes[subnet]++ - } - s.mu.Unlock() - if ban { - s.pm.Ban(subnet, 24*time.Hour, "too many strikes") - } - } -} - -func (s *Syncer) runPeer(p *gateway.Peer) { - s.pm.AddPeer(p.Addr) - s.pm.UpdatePeerInfo(p.Addr, func(info *PeerInfo) { - info.LastConnect = time.Now() - }) - s.mu.Lock() - s.peers[p.Addr] = p - s.mu.Unlock() - defer func() { - s.mu.Lock() - delete(s.peers, p.Addr) - s.mu.Unlock() - }() - - h := &rpcHandler{s: s} - inflight := make(chan struct{}, s.config.MaxInflightRPCs) - for { - if p.Err() != nil { - return - } - id, stream, err := p.AcceptRPC() - if err != nil { - p.SetErr(err) - return - } - inflight <- struct{}{} - go func() { - defer stream.Close() - // NOTE: we do not set any deadlines on the stream. If a peer is - // slow, fine; we don't need to worry about resource exhaustion - // unless we have tons of peers. 
- if err := p.HandleRPC(id, stream, h); err != nil { - s.log.Printf("incoming RPC %v from peer %v failed: %v", id, p, err) - } - <-inflight - }() - } -} - -func (s *Syncer) relayHeader(h gateway.BlockHeader, origin *gateway.Peer) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if p == origin { - continue - } - go p.RelayHeader(h, s.config.RelayHeaderTimeout) - } -} - -func (s *Syncer) relayTransactionSet(txns []types.Transaction, origin *gateway.Peer) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if p == origin { - continue - } - go p.RelayTransactionSet(txns, s.config.RelayTransactionSetTimeout) - } -} - -func (s *Syncer) relayV2Header(bh gateway.V2BlockHeader, origin *gateway.Peer) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if p == origin || !p.SupportsV2() { - continue - } - go p.RelayV2Header(bh, s.config.RelayHeaderTimeout) - } -} - -func (s *Syncer) relayV2BlockOutline(pb gateway.V2BlockOutline, origin *gateway.Peer) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if p == origin || !p.SupportsV2() { - continue - } - go p.RelayV2BlockOutline(pb, s.config.RelayBlockOutlineTimeout) - } -} - -func (s *Syncer) relayV2TransactionSet(txns []types.V2Transaction, origin *gateway.Peer) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if p == origin || !p.SupportsV2() { - continue - } - go p.RelayV2TransactionSet(txns, s.config.RelayTransactionSetTimeout) - } -} - -func (s *Syncer) allowConnect(peer string, inbound bool) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.l == nil { - return errors.New("syncer is shutting down") - } - if s.pm.Banned(peer) { - return errors.New("banned") - } - var in, out int - for _, p := range s.peers { - if p.Inbound { - in++ - } else { - out++ - } - } - // TODO: subnet-based limits - if inbound && in >= s.config.MaxInboundPeers { - return errors.New("too many inbound peers") - } else if !inbound && out >= s.config.MaxOutboundPeers { - return errors.New("too many outbound peers") - } - return nil -} - -func (s *Syncer) alreadyConnected(peer *gateway.Peer) bool { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if p.UniqueID == peer.UniqueID { - return true - } - } - return false -} - -func (s *Syncer) acceptLoop() error { - for { - conn, err := s.l.Accept() - if err != nil { - return err - } - go func() { - defer conn.Close() - if err := s.allowConnect(conn.RemoteAddr().String(), true); err != nil { - s.log.Printf("rejected inbound connection from %v: %v", conn.RemoteAddr(), err) - } else if p, err := gateway.Accept(conn, s.header); err != nil { - s.log.Printf("failed to accept inbound connection from %v: %v", conn.RemoteAddr(), err) - } else if s.alreadyConnected(p) { - s.log.Printf("rejected inbound connection from %v: already connected", conn.RemoteAddr()) - } else { - s.runPeer(p) - } - }() - } -} - -func (s *Syncer) peerLoop(closeChan <-chan struct{}) error { - numOutbound := func() (n int) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if !p.Inbound { - n++ - } - } - return - } - - lastTried := make(map[string]time.Time) - peersForConnect := func() (peers []string) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.pm.Peers() { - // TODO: don't include port in comparison - if _, ok := s.peers[p]; !ok && time.Since(lastTried[p]) > 5*time.Minute { - peers = append(peers, p) - } - } - // TODO: weighted random selection? 
- frand.Shuffle(len(peers), reflect.Swapper(peers)) - return peers - } - discoverPeers := func() { - // try up to three randomly-chosen peers - var peers []*gateway.Peer - s.mu.Lock() - for _, p := range s.peers { - if peers = append(peers, p); len(peers) >= 3 { - break - } - } - s.mu.Unlock() - for _, p := range peers { - nodes, err := p.ShareNodes(s.config.ShareNodesTimeout) - if err != nil { - continue - } - for _, n := range nodes { - s.pm.AddPeer(n) - } - } - } - - ticker := time.NewTicker(s.config.PeerDiscoveryInterval) - defer ticker.Stop() - sleep := func() bool { - select { - case <-ticker.C: - return true - case <-closeChan: - return false - } - } - closing := func() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.l == nil - } - for fst := true; fst || sleep(); fst = false { - if numOutbound() >= s.config.MaxOutboundPeers { - continue - } - candidates := peersForConnect() - if len(candidates) == 0 { - discoverPeers() - continue - } - for _, p := range candidates { - if numOutbound() >= s.config.MaxOutboundPeers || closing() { - break - } - if _, err := s.Connect(p); err == nil { - s.log.Printf("formed outbound connection to %v", p) - } else { - s.log.Printf("failed to form outbound connection to %v: %v", p, err) - } - lastTried[p] = time.Now() - } - } - return nil -} - -func (s *Syncer) syncLoop(closeChan <-chan struct{}) error { - peersForSync := func() (peers []*gateway.Peer) { - s.mu.Lock() - defer s.mu.Unlock() - for _, p := range s.peers { - if s.synced[p.Addr] { - continue - } - if peers = append(peers, p); len(peers) >= 3 { - break - } - } - return - } - - ticker := time.NewTicker(s.config.SyncInterval) - defer ticker.Stop() - sleep := func() bool { - select { - case <-ticker.C: - return true - case <-closeChan: - return false - } - } - for fst := true; fst || sleep(); fst = false { - for _, p := range peersForSync() { - history, err := s.cm.History() - if err != nil { - return err // generally fatal - } - s.mu.Lock() - s.synced[p.Addr] = true - s.mu.Unlock() - s.log.Printf("starting sync with %v", p) - oldTip := s.cm.Tip() - oldTime := time.Now() - lastPrint := time.Now() - startTime, startHeight := oldTime, oldTip.Height - addBlocks := func(blocks []types.Block) error { - if err := s.cm.AddBlocks(blocks); err != nil { - return err - } - endTime, endHeight := time.Now(), s.cm.Tip().Height - s.pm.UpdatePeerInfo(p.Addr, func(info *PeerInfo) { - info.SyncedBlocks += endHeight - startHeight - info.SyncDuration += endTime.Sub(startTime) - }) - startTime, startHeight = endTime, endHeight - if time.Since(lastPrint) > 30*time.Second { - s.log.Printf("syncing with %v, tip now %v (avg %.2f blocks/s)", p, s.cm.Tip(), float64(s.cm.Tip().Height-oldTip.Height)/endTime.Sub(oldTime).Seconds()) - lastPrint = time.Now() - } - return nil - } - if p.SupportsV2() { - history := history[:] - err = func() error { - for { - blocks, rem, err := p.SendV2Blocks(history, s.config.MaxSendBlocks, s.config.SendBlocksTimeout) - if err != nil { - return err - } else if addBlocks(blocks); err != nil { - return err - } else if rem == 0 { - return nil - } - history = []types.BlockID{blocks[len(blocks)-1].ID()} - } - }() - } else { - err = p.SendBlocks(history, s.config.SendBlocksTimeout, addBlocks) - } - totalBlocks := s.cm.Tip().Height - oldTip.Height - if err != nil { - s.log.Printf("syncing with %v failed after %v blocks: %v", p, totalBlocks, err) - } else if newTip := s.cm.Tip(); newTip != oldTip { - s.log.Printf("finished syncing %v blocks with %v, tip now %v", totalBlocks, p, newTip) - } else { 
- s.log.Printf("finished syncing with %v, tip unchanged", p) - } - } - } - return nil -} - -// Run spawns goroutines for accepting inbound connections, forming outbound -// connections, and syncing the blockchain from active peers. It blocks until an -// error occurs, upon which all connections are closed and goroutines are -// terminated. To gracefully shutdown a Syncer, close its net.Listener. -func (s *Syncer) Run() error { - errChan := make(chan error) - closeChan := make(chan struct{}) - go func() { errChan <- s.acceptLoop() }() - go func() { errChan <- s.peerLoop(closeChan) }() - go func() { errChan <- s.syncLoop(closeChan) }() - err := <-errChan - - // when one goroutine exits, shutdown and wait for the others - close(closeChan) - s.l.Close() - s.mu.Lock() - s.l = nil - for addr, p := range s.peers { - p.Close() - delete(s.peers, addr) - } - s.mu.Unlock() - <-errChan - <-errChan - if errors.Is(err, net.ErrClosed) { - return nil // graceful shutdown - } - return err -} - -// Connect forms an outbound connection to a peer. -func (s *Syncer) Connect(addr string) (*gateway.Peer, error) { - if err := s.allowConnect(addr, false); err != nil { - return nil, err - } - ctx, cancel := context.WithTimeout(context.Background(), s.config.ConnectTimeout) - defer cancel() - // slightly gross polling hack so that we shutdown quickly - go func() { - for { - select { - case <-ctx.Done(): - return - case <-time.After(100 * time.Millisecond): - s.mu.Lock() - if s.l == nil { - cancel() - } - s.mu.Unlock() - } - } - }() - conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr) - if err != nil { - return nil, err - } - conn.SetDeadline(time.Now().Add(s.config.ConnectTimeout)) - defer conn.SetDeadline(time.Time{}) - p, err := gateway.Dial(conn, s.header) - if err != nil { - conn.Close() - return nil, err - } else if s.alreadyConnected(p) { - conn.Close() - return nil, errors.New("already connected") - } - go s.runPeer(p) - - // runPeer does this too, but doing it outside the goroutine prevents a race - s.mu.Lock() - s.peers[p.Addr] = p - s.mu.Unlock() - return p, nil -} - -// BroadcastHeader broadcasts a header to all peers. -func (s *Syncer) BroadcastHeader(h gateway.BlockHeader) { s.relayHeader(h, nil) } - -// BroadcastV2Header broadcasts a v2 header to all peers. -func (s *Syncer) BroadcastV2Header(h gateway.V2BlockHeader) { s.relayV2Header(h, nil) } - -// BroadcastV2BlockOutline broadcasts a v2 block outline to all peers. -func (s *Syncer) BroadcastV2BlockOutline(b gateway.V2BlockOutline) { s.relayV2BlockOutline(b, nil) } - -// BroadcastTransactionSet broadcasts a transaction set to all peers. -func (s *Syncer) BroadcastTransactionSet(txns []types.Transaction) { s.relayTransactionSet(txns, nil) } - -// BroadcastV2TransactionSet broadcasts a v2 transaction set to all peers. -func (s *Syncer) BroadcastV2TransactionSet(txns []types.V2Transaction) { - s.relayV2TransactionSet(txns, nil) -} - -// Peers returns the set of currently-connected peers. -func (s *Syncer) Peers() []*gateway.Peer { - s.mu.Lock() - defer s.mu.Unlock() - var peers []*gateway.Peer - for _, p := range s.peers { - peers = append(peers, p) - } - return peers -} - -// PeerInfo returns metadata about the specified peer. -func (s *Syncer) PeerInfo(peer string) (PeerInfo, bool) { - s.mu.Lock() - defer s.mu.Unlock() - info, ok := s.pm.PeerInfo(peer) - return info, ok -} - -// Addr returns the address of the Syncer. -func (s *Syncer) Addr() string { - return s.l.Addr().String() -} - -// New returns a new Syncer. 
-func New(l net.Listener, cm ChainManager, pm PeerStore, header gateway.Header, opts ...Option) *Syncer { - config := config{ - MaxInboundPeers: 8, - MaxOutboundPeers: 8, - MaxInflightRPCs: 3, - ConnectTimeout: 5 * time.Second, - ShareNodesTimeout: 5 * time.Second, - SendBlockTimeout: 60 * time.Second, - SendTransactionsTimeout: 60 * time.Second, - RelayHeaderTimeout: 5 * time.Second, - RelayBlockOutlineTimeout: 60 * time.Second, - RelayTransactionSetTimeout: 60 * time.Second, - SendBlocksTimeout: 120 * time.Second, - MaxSendBlocks: 10, - PeerDiscoveryInterval: 5 * time.Second, - SyncInterval: 5 * time.Second, - Logger: log.New(io.Discard, "", 0), - } - for _, opt := range opts { - opt(&config) - } - return &Syncer{ - l: l, - cm: cm, - pm: pm, - header: header, - config: config, - log: config.Logger, - peers: make(map[string]*gateway.Peer), - synced: make(map[string]bool), - } -}
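
A note on the txpool handler change in api/server.go: with core v0.2 / coreutils, AddV2PoolTransactions and BroadcastV2TransactionSet take the chain index the v2 set was validated against, which is why the handler now fetches the explorer tip first. The sketch below mirrors that flow against the ChainManager/Syncer/Explorer interfaces declared in api/server.go; the helper name and its placement in the api package are illustrative, not part of the patch.

```go
package api

import (
	"fmt"

	"go.sia.tech/core/types"
)

// broadcastV2 mirrors the flow txpoolBroadcastHandler now follows: fetch the
// current tip, validate the v2 set against it, then relay it with the same
// index. Hypothetical helper; the jape plumbing is omitted.
func broadcastV2(e Explorer, cm ChainManager, s Syncer, txns []types.V2Transaction) error {
	tip, err := e.Tip()
	if err != nil {
		return fmt.Errorf("failed to get tip: %w", err)
	}
	// the extra boolean return is ignored here, just as the handler above
	// ignores it and only checks the error.
	if _, err := cm.AddV2PoolTransactions(tip, txns); err != nil {
		return fmt.Errorf("invalid v2 transaction set: %w", err)
	}
	s.BroadcastV2TransactionSet(tip, txns)
	return nil
}
```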
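On explorer/types.go: SiacoinOutput now embeds types.SiacoinElement, which already carries the output ID, leaf index, Merkle proof, and maturity height, so only Source remains as an extra field, and SiafundOutput becomes a plain named type over types.SiafundElement. A minimal sketch of how the wrapped values are assembled from elements; the package and helper names are hypothetical.

```go
package example

import (
	"go.sia.tech/core/types"
	"go.sia.tech/explored/explorer"
)

// wrapSiacoinElement attaches a Source to a SiacoinElement; everything else
// (ID, LeafIndex, MerkleProof, MaturityHeight) travels inside the element.
func wrapSiacoinElement(sce types.SiacoinElement, source explorer.Source) explorer.SiacoinOutput {
	return explorer.SiacoinOutput{
		Source:         source,
		SiacoinElement: sce,
	}
}

// wrapSiafundElement is just a type conversion, since explorer.SiafundOutput
// is defined directly on types.SiafundElement.
func wrapSiafundElement(sfe types.SiafundElement) explorer.SiafundOutput {
	return explorer.SiafundOutput(sfe)
}
```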
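On internal/syncerutil/store.go: coreutils' syncer.Subnet takes the host and the mask as separate arguments, and the PeerStore methods are now fallible, which is why EphemeralPeerStore and JSONPeerStore grow error returns. The sketch below only illustrates those new signatures by walking the same subnet ladder that banned() uses internally; it is a hypothetical helper over any coreutils syncer.PeerStore, not something the patch adds (EphemeralPeerStore already performs this check itself).

```go
package syncerutil

import (
	"net"

	"go.sia.tech/coreutils/syncer"
)

// bannedAnywhere checks the exact address first, then the /32, /24, /16 and
// /8 subnets via the two-argument syncer.Subnet.
func bannedAnywhere(ps syncer.PeerStore, peer string) (bool, error) {
	host, _, err := net.SplitHostPort(peer)
	if err != nil {
		return false, err
	}
	for _, s := range []string{
		peer,
		syncer.Subnet(host, "/32"),
		syncer.Subnet(host, "/24"),
		syncer.Subnet(host, "/16"),
		syncer.Subnet(host, "/8"),
	} {
		if banned, err := ps.Banned(s); err != nil || banned {
			return banned, err
		}
	}
	return false, nil
}
```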
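On persist/sqlite/encoding.go: dbEncode now writes types.Currency as 16 bytes, Hi word first, and switches uint64 from little-endian to big-endian. A plausible motivation (my inference, not stated in the patch) is that big-endian layouts make byte-wise comparison of the stored BLOBs agree with numeric comparison. The small program below demonstrates that property with an encoder that mirrors the new dbEncode case.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"go.sia.tech/core/types"
)

// encodeCurrency mirrors the Currency case added to dbEncode: 16 bytes, Hi
// word first, both words big-endian.
func encodeCurrency(c types.Currency) []byte {
	buf := make([]byte, 16)
	binary.BigEndian.PutUint64(buf[:8], c.Hi)
	binary.BigEndian.PutUint64(buf[8:], c.Lo)
	return buf
}

func main() {
	a, b := types.Siacoins(2), types.Siacoins(10)
	fmt.Println(a.Cmp(b) < 0)                                            // true
	fmt.Println(bytes.Compare(encodeCurrency(a), encodeCurrency(b)) < 0) // true: same ordering
}
```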
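On persist/sqlite/merkle.go and the new merkle_proofs table: each stored hash is keyed by (i, j), where i is the proof height and j identifies which height-i subtree the hash is the root of. The pos/subtreeSize arithmetic in updateLeaves and MerkleProof derives those coordinates from an element's leaf index; the sketch below is the bit-twiddling equivalent of that arithmetic (j = (leafIndex >> i) ^ 1), shown for illustration rather than as the exact code in the patch. As MerkleProof computes, the expected proof length for a leaf in an accumulator with numLeaves leaves is bits.Len64(leafIndex^numLeaves)-1.

```go
package sqlite

// proofCoordinates lists, for each height i of a Merkle proof of the leaf at
// leafIndex, the (i, j) pair under which the sibling subtree root would be
// stored in merkle_proofs: j is the index of that sibling among all height-i
// subtrees. Illustrative helper, equivalent to the pos/subtreeSize updates.
func proofCoordinates(leafIndex uint64, proofLen int) [][2]uint64 {
	coords := make([][2]uint64, proofLen)
	for i := range coords {
		coords[i] = [2]uint64{uint64(i), (leafIndex >> i) ^ 1}
	}
	return coords
}
```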
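Finally, with the in-repo syncer package deleted, cmd/explored/node.go now builds the syncer from go.sia.tech/coreutils/syncer and hands it a named zap logger instead of the stdlib logger. The sketch below shows that construction plus a run loop; it assumes coreutils v0.0.2's Run keeps the blocking, error-returning behavior of the implementation removed above, and the package and helper names are illustrative.

```go
package nodeutil

import (
	"net"

	"go.sia.tech/core/gateway"
	"go.sia.tech/core/types"
	"go.sia.tech/coreutils/chain"
	"go.sia.tech/coreutils/syncer"
	"go.uber.org/zap"
)

// newSyncer constructs the coreutils syncer the way node.go now does and
// starts it in the background. Run is assumed to block until the listener is
// closed, as the removed in-repo implementation did.
func newSyncer(l net.Listener, cm *chain.Manager, ps syncer.PeerStore, genesisID types.BlockID, netAddr string, logger *zap.Logger) *syncer.Syncer {
	header := gateway.Header{
		GenesisID:  genesisID,
		UniqueID:   gateway.GenerateUniqueID(),
		NetAddress: netAddr,
	}
	s := syncer.New(l, cm, ps, header, syncer.WithLogger(logger.Named("syncer")))
	go func() {
		if err := s.Run(); err != nil {
			logger.Warn("syncer stopped", zap.Error(err))
		}
	}()
	return s
}
```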