Skip to content

Commit

Permalink
Merge pull request #57 from SiaFoundation:host-scanning
Browse files Browse the repository at this point in the history
Host scanning
  • Loading branch information
n8maninger authored Sep 6, 2024
2 parents dc6db55 + 9b0b6cc commit fc13382
Show file tree
Hide file tree
Showing 18 changed files with 2,713 additions and 30 deletions.
12 changes: 12 additions & 0 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func (c *Client) ContractsKey(key types.PublicKey) (resp []explorer.FileContract
return
}

// Host returns information about the host with a given ed25519 key.
func (c *Client) Host(key types.PublicKey) (resp explorer.Host, err error) {
err = c.c.GET(fmt.Sprintf("/pubkey/%s/host", key), &resp)
return
}

// BlockMetrics returns the most recent metrics about the Sia blockchain.
func (c *Client) BlockMetrics() (resp explorer.Metrics, err error) {
err = c.c.GET("/metrics/block", &resp)
Expand All @@ -190,6 +196,12 @@ func (c *Client) BlockMetricsID(id types.BlockID) (resp explorer.Metrics, err er
return
}

// HostMetrics returns various metrics about currently available hosts.
func (c *Client) HostMetrics() (resp explorer.HostMetrics, err error) {
err = c.c.GET("/metrics/host", &resp)
return
}

// Search returns what type of object an ID is.
func (c *Client) Search(id types.Hash256) (resp explorer.SearchType, err error) {
err = c.c.GET(fmt.Sprintf("/search/%s", id), &resp)
Expand Down
30 changes: 30 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
Block(id types.BlockID) (explorer.Block, error)
BestTip(height uint64) (types.ChainIndex, error)
Metrics(id types.BlockID) (explorer.Metrics, error)
HostMetrics() (explorer.HostMetrics, error)
Transactions(ids []types.TransactionID) ([]explorer.Transaction, error)
Balance(address types.Address) (sc types.Currency, immatureSC types.Currency, sf uint64, err error)
SiacoinElements(ids []types.SiacoinOutputID) (result []explorer.SiacoinOutput, err error)
Expand All @@ -61,6 +62,8 @@ type (
Contracts(ids []types.FileContractID) (result []explorer.FileContract, err error)
ContractsKey(key types.PublicKey) (result []explorer.FileContract, err error)
Search(id types.Hash256) (explorer.SearchType, error)

Hosts(pks []types.PublicKey) ([]explorer.Host, error)
}
)

Expand Down Expand Up @@ -222,6 +225,14 @@ func (s *server) blocksMetricsIDHandler(jc jape.Context) {
jc.Encode(metrics)
}

func (s *server) hostMetricsHandler(jc jape.Context) {
metrics, err := s.e.HostMetrics()
if jc.Check("failed to get host metrics", err) != nil {
return
}
jc.Encode(metrics)
}

func (s *server) blocksIDHandler(jc jape.Context) {
var id types.BlockID
if jc.DecodeParam("id", &id) != nil {
Expand Down Expand Up @@ -430,6 +441,23 @@ func (s *server) pubkeyContractsHandler(jc jape.Context) {
jc.Encode(fcs)
}

func (s *server) pubkeyHostHandler(jc jape.Context) {
errNotFound := errors.New("host not found")

var key types.PublicKey
if jc.DecodeParam("key", &key) != nil {
return
}
hosts, err := s.e.Hosts([]types.PublicKey{key})
if jc.Check("failed to get host", err) != nil {
return
} else if len(hosts) == 0 {
jc.Error(errNotFound, http.StatusNotFound)
return
}
jc.Encode(hosts[0])
}

func (s *server) searchIDHandler(jc jape.Context) {
errNotFound := errors.New("no contract found")
const maxLen = len(types.Hash256{})
Expand Down Expand Up @@ -494,9 +522,11 @@ func NewServer(e Explorer, cm ChainManager, s Syncer) http.Handler {
"POST /contracts": srv.contractsBatchHandler,

"GET /pubkey/:key/contracts": srv.pubkeyContractsHandler,
"GET /pubkey/:key/host": srv.pubkeyHostHandler,

"GET /metrics/block": srv.blocksMetricsHandler,
"GET /metrics/block/:id": srv.blocksMetricsIDHandler,
"GET /metrics/host": srv.hostMetricsHandler,

"GET /search/:id": srv.searchIDHandler,
})
Expand Down
11 changes: 10 additions & 1 deletion cmd/explored/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ var cfg = config.Config{
Bootstrap: true,
EnableUPNP: false,
},
Scanner: config.Scanner{
Threads: 10,
Timeout: 30 * time.Second,
MaxLastScan: 3 * time.Hour,
MinLastAnnouncement: 90 * 24 * time.Hour,
},
Consensus: config.Consensus{
Network: "mainnet",
},
Expand Down Expand Up @@ -344,11 +350,14 @@ func main() {
defer s.Close()
go s.Run(ctx)

e, err := explorer.NewExplorer(cm, store, cfg.Index.BatchSize, log.Named("explorer"))
e, err := explorer.NewExplorer(cm, store, cfg.Index.BatchSize, cfg.Scanner, log.Named("explorer"))
if err != nil {
log.Error("failed to create explorer", zap.Error(err))
return
}
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer timeoutCancel()
defer e.Shutdown(timeoutCtx)

api := api.NewServer(e, cm, s)
server := &http.Server{
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package config

import "time"

type (
// HTTP contains the configuration for the HTTP server.
HTTP struct {
Expand All @@ -14,6 +16,14 @@ type (
Peers []string `yaml:"peers,omitempty"`
}

// Scanner contains the configuration for the host scanner.
Scanner struct {
Threads int `yaml:"threads,omitempty"`
Timeout time.Duration `yaml:"timeout,omitempty"`
MaxLastScan time.Duration `yaml:"maxLastScan,omitempty"`
MinLastAnnouncement time.Duration `yaml:"minLastAnnouncement,omitempty"`
}

// Consensus contains the configuration for the consensus set.
Consensus struct {
Network string `yaml:"network,omitempty"`
Expand Down Expand Up @@ -56,6 +66,7 @@ type (
HTTP HTTP `yaml:"http,omitempty"`
Consensus Consensus `yaml:"consensus,omitempty"`
Syncer Syncer `yaml:"syncer,omitempty"`
Scanner Scanner `yaml:"scanner,omitempty"`
Log Log `yaml:"log,omitempty"`
Index Index `yaml:"index,omitempty"`
}
Expand Down
87 changes: 71 additions & 16 deletions explorer/explorer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package explorer

import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"

"go.sia.tech/core/consensus"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/explored/config"
"go.uber.org/zap"
)

Expand All @@ -21,6 +25,7 @@ var (
// A ChainManager manages the consensus state
type ChainManager interface {
Tip() types.ChainIndex
TipState() consensus.State
BestIndex(height uint64) (types.ChainIndex, bool)

OnReorg(fn func(types.ChainIndex)) (cancel func())
Expand All @@ -31,12 +36,14 @@ type ChainManager interface {
// and blocks.
type Store interface {
UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error
AddHostScans(scans []HostScan) error

Tip() (types.ChainIndex, error)
Block(id types.BlockID) (Block, error)
BestTip(height uint64) (types.ChainIndex, error)
MerkleProof(leafIndex uint64) ([]types.Hash256, error)
Metrics(id types.BlockID) (Metrics, error)
HostMetrics() (HostMetrics, error)
Transactions(ids []types.TransactionID) ([]Transaction, error)
UnspentSiacoinOutputs(address types.Address, offset, limit uint64) ([]SiacoinOutput, error)
UnspentSiafundOutputs(address types.Address, offset, limit uint64) ([]SiafundOutput, error)
Expand All @@ -46,24 +53,35 @@ type Store interface {
ContractsKey(key types.PublicKey) (result []FileContract, err error)
SiacoinElements(ids []types.SiacoinOutputID) (result []SiacoinOutput, err error)
SiafundElements(ids []types.SiafundOutputID) (result []SiafundOutput, err error)

Hosts(pks []types.PublicKey) ([]Host, error)
HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) ([]HostAnnouncement, error)
}

// Explorer implements a Sia explorer.
type Explorer struct {
s Store
mu sync.Mutex
cm ChainManager

scanCfg config.Scanner

log *zap.Logger

wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc

unsubscribe func()
}

func syncStore(store Store, cm ChainManager, index types.ChainIndex, batchSize int) error {
for index != cm.Tip() {
crus, caus, err := cm.UpdatesSince(index, batchSize)
func (e *Explorer) syncStore(index types.ChainIndex, batchSize int) error {
for index != e.cm.Tip() {
crus, caus, err := e.cm.UpdatesSince(index, batchSize)
if err != nil {
return fmt.Errorf("failed to subscribe to chain manager: %w", err)
}

if err := store.UpdateChainState(crus, caus); err != nil {
if err := e.s.UpdateChainState(crus, caus); err != nil {
return fmt.Errorf("failed to process updates: %w", err)
}
if len(crus) > 0 {
Expand All @@ -77,37 +95,45 @@ func syncStore(store Store, cm ChainManager, index types.ChainIndex, batchSize i
}

// NewExplorer returns a Sia explorer.
func NewExplorer(cm ChainManager, store Store, batchSize int, log *zap.Logger) (*Explorer, error) {
e := &Explorer{s: store}
func NewExplorer(cm ChainManager, store Store, batchSize int, scanCfg config.Scanner, log *zap.Logger) (*Explorer, error) {
ctx, ctxCancel := context.WithCancel(context.Background())
e := &Explorer{
s: store,
cm: cm,
scanCfg: scanCfg,
ctx: ctx,
ctxCancel: ctxCancel,
log: log,
}

tip, err := store.Tip()
tip, err := e.s.Tip()
if errors.Is(err, ErrNoTip) {
tip = types.ChainIndex{}
} else if err != nil {
return nil, fmt.Errorf("failed to get tip: %w", err)
}
if err := syncStore(store, cm, tip, batchSize); err != nil {
if err := e.syncStore(tip, batchSize); err != nil {
return nil, fmt.Errorf("failed to subscribe to chain manager: %w", err)
}

reorgChan := make(chan types.ChainIndex, 1)
go func() {
for range reorgChan {
e.mu.Lock()
lastTip, err := store.Tip()
lastTip, err := e.s.Tip()
if errors.Is(err, ErrNoTip) {
lastTip = types.ChainIndex{}
} else if err != nil {
log.Error("failed to get tip", zap.Error(err))
e.log.Error("failed to get tip", zap.Error(err))
}
if err := syncStore(store, cm, lastTip, batchSize); err != nil {
log.Error("failed to sync store", zap.Error(err))
if err := e.syncStore(lastTip, batchSize); err != nil {
e.log.Error("failed to sync store", zap.Error(err))
}
e.mu.Unlock()
}
}()

e.unsubscribe = cm.OnReorg(func(index types.ChainIndex) {
go e.scanHosts()

e.unsubscribe = e.cm.OnReorg(func(index types.ChainIndex) {
select {
case reorgChan <- index:
default:
Expand All @@ -116,6 +142,25 @@ func NewExplorer(cm ChainManager, store Store, batchSize int, log *zap.Logger) (
return e, nil
}

// Shutdown tries to close the scanning goroutines in the explorer.
func (e *Explorer) Shutdown(ctx context.Context) error {
e.ctxCancel()

done := make(chan struct{})
go func() {
e.wg.Wait()
close(done)
}()

// Wait for the WaitGroup to finish or the context to be cancelled
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// Tip returns the tip of the best known valid chain.
func (e *Explorer) Tip() (types.ChainIndex, error) {
return e.s.Tip()
Expand All @@ -141,6 +186,11 @@ func (e *Explorer) Metrics(id types.BlockID) (Metrics, error) {
return e.s.Metrics(id)
}

// HostMetrics returns various metrics about currently available hosts.
func (e *Explorer) HostMetrics() (HostMetrics, error) {
return e.s.HostMetrics()
}

// Transactions returns the transactions with the specified IDs.
func (e *Explorer) Transactions(ids []types.TransactionID) ([]Transaction, error) {
return e.s.Transactions(ids)
Expand Down Expand Up @@ -188,6 +238,11 @@ func (e *Explorer) SiafundElements(ids []types.SiafundOutputID) (result []Siafun
return e.s.SiafundElements(ids)
}

// Hosts returns the hosts with the specified public keys.
func (e *Explorer) Hosts(pks []types.PublicKey) ([]Host, error) {
return e.s.Hosts(pks)
}

// Search returns the element type (address, block, transaction, contract ID)
// for a given ID.
func (e *Explorer) Search(id types.Hash256) (SearchType, error) {
Expand Down
Loading

0 comments on commit fc13382

Please sign in to comment.