Skip to content

Commit

Permalink
Merge pull request #674 from SiaFoundation/pj/host-announce-window
Browse files Browse the repository at this point in the history
Host Announce Window
  • Loading branch information
ChrisSchinnerl authored Oct 17, 2023
2 parents fe8b497 + 7716bea commit 7131fa7
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 42 deletions.
7 changes: 4 additions & 3 deletions bus/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func newTestClient(dir string) (*client.Client, func() error, func(context.Conte
client := client.New("http://"+l.Addr().String(), "test")
b, cleanup, err := node.NewBus(node.BusConfig{
Bus: config.Bus{
Bootstrap: false,
GatewayAddr: "127.0.0.1:0",
UsedUTXOExpiry: time.Minute,
AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year
Bootstrap: false,
GatewayAddr: "127.0.0.1:0",
UsedUTXOExpiry: time.Minute,
},
Miner: node.NewMiner(client),
}, filepath.Join(dir, "bus"), types.GeneratePrivateKey(), zap.New(zapcore.NewNopCore()))
Expand Down
2 changes: 2 additions & 0 deletions cmd/renterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (
Level: "warn",
},
Bus: config.Bus{
AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year
Bootstrap: true,
GatewayAddr: build.DefaultGatewayAddress,
PersistInterval: time.Minute,
Expand Down Expand Up @@ -276,6 +277,7 @@ func main() {
flag.DurationVar(&cfg.Database.Log.SlowThreshold, "db.logger.slowThreshold", cfg.Database.Log.SlowThreshold, "slow threshold for logger - can be overwritten using RENTERD_DB_LOGGER_SLOW_THRESHOLD environment variable")

// bus
flag.Uint64Var(&cfg.Bus.AnnouncementMaxAgeHours, "bus.announcementMaxAgeHours", cfg.Bus.AnnouncementMaxAgeHours, "announcements older than this are ignored")
flag.BoolVar(&cfg.Bus.Bootstrap, "bus.bootstrap", cfg.Bus.Bootstrap, "bootstrap the gateway and consensus modules")
flag.StringVar(&cfg.Bus.GatewayAddr, "bus.gatewayAddr", cfg.Bus.GatewayAddr, "address to listen on for Sia peer connections - can be overwritten using RENTERD_BUS_GATEWAY_ADDR environment variable")
flag.DurationVar(&cfg.Bus.PersistInterval, "bus.persistInterval", cfg.Bus.PersistInterval, "interval at which to persist the consensus updates")
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (

// Bus contains the configuration for a bus.
Bus struct {
AnnouncementMaxAgeHours uint64 `yaml:"announcementMaxAgeHours"`
Bootstrap bool `yaml:"bootstrap"`
GatewayAddr string `yaml:"gatewayAddr"`
RemoteAddr string `yaml:"remoteAddr"`
Expand Down
4 changes: 3 additions & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"go.sia.tech/core/consensus"
"go.sia.tech/core/types"
Expand Down Expand Up @@ -97,7 +98,8 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht
sqlLogger := stores.NewSQLLogger(l.Named("db"), cfg.DBLoggerConfig)
walletAddr := wallet.StandardAddress(seed.PublicKey())
sqlStoreDir := filepath.Join(dir, "partial_slabs")
sqlStore, ccid, err := stores.NewSQLStore(dbConn, alerts.WithOrigin(alertsMgr, "bus"), sqlStoreDir, true, cfg.PersistInterval, walletAddr, cfg.SlabBufferCompletionThreshold, l.Sugar(), sqlLogger)
announcementMaxAge := time.Duration(cfg.AnnouncementMaxAgeHours) * time.Hour
sqlStore, ccid, err := stores.NewSQLStore(dbConn, alerts.WithOrigin(alertsMgr, "bus"), sqlStoreDir, true, announcementMaxAge, cfg.PersistInterval, walletAddr, cfg.SlabBufferCompletionThreshold, l.Sugar(), sqlLogger)
if err != nil {
return nil, nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions internal/testing/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,10 +926,11 @@ func testNetwork() *consensus.Network {
func testBusCfg() node.BusConfig {
return node.BusConfig{
Bus: config.Bus{
Bootstrap: false,
GatewayAddr: "127.0.0.1:0",
PersistInterval: testPersistInterval,
UsedUTXOExpiry: time.Minute,
AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year
Bootstrap: false,
GatewayAddr: "127.0.0.1:0",
PersistInterval: testPersistInterval,
UsedUTXOExpiry: time.Minute,
},
Network: testNetwork(),
}
Expand Down
21 changes: 0 additions & 21 deletions stores/autopilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,3 @@ func TestAutopilotStore(t *testing.T) {
t.Fatal("expected amount to be 99")
}
}

// testAutopilotConfig is the autopilot used for testing unless a different
// one is explicitly set.
var testAutopilotConfig = api.AutopilotConfig{
Contracts: api.ContractsConfig{
Allowance: types.Siacoins(1).Mul64(1e3),
Amount: 3,
Period: 144,
RenewWindow: 72,

Download: rhpv2.SectorSize * 500,
Upload: rhpv2.SectorSize * 500,
Storage: rhpv2.SectorSize * 5e3,

Set: testContractSet,
},
Hosts: api.HostsConfig{
MaxDowntimeHours: 10,
AllowRedundantIPs: true, // allow for integration tests by default
},
}
22 changes: 11 additions & 11 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ const (
// database per batch. Empirically tested to verify that this is a value
// that performs reasonably well.
hostRetrievalBatchSize = 10000

// interactionInsertionBatchSize is the number of interactions we insert at
// once.
interactionInsertionBatchSize = 100
)

var (
Expand Down Expand Up @@ -919,16 +915,20 @@ func (ss *SQLStore) processConsensusChangeHostDB(cc modules.ConsensusChange) {

var newAnnouncements []announcement
for _, sb := range cc.AppliedBlocks {
// Fetch announcements and add them to the queue.
var b types.Block
convertToCore(sb, (*types.V1Block)(&b))
hostdb.ForEachAnnouncement(types.Block(b), height, func(hostKey types.PublicKey, ha hostdb.Announcement) {
newAnnouncements = append(newAnnouncements, announcement{
hostKey: publicKey(hostKey),
announcement: ha,

// Process announcements, but only if they are not too old.
if b.Timestamp.After(time.Now().Add(-ss.announcementMaxAge)) {
hostdb.ForEachAnnouncement(types.Block(b), height, func(hostKey types.PublicKey, ha hostdb.Announcement) {
newAnnouncements = append(newAnnouncements, announcement{
hostKey: publicKey(hostKey),
announcement: ha,
})
ss.unappliedHostKeys[hostKey] = struct{}{}
})
ss.unappliedHostKeys[hostKey] = struct{}{}
})
}

// Update RevisionHeight and RevisionNumber for our contracts.
for _, txn := range sb.Transactions {
for _, rev := range txn.FileContractRevisions {
Expand Down
61 changes: 61 additions & 0 deletions stores/hostdb_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stores

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -9,10 +10,12 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"gitlab.com/NebulousLabs/encoding"
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/siad/crypto"
"go.sia.tech/siad/modules"
stypes "go.sia.tech/siad/types"
"gorm.io/gorm"
Expand Down Expand Up @@ -1039,6 +1042,39 @@ func TestSQLHostBlocklistBasic(t *testing.T) {
}
}

// TestAnnouncementMaxAge verifies old announcements are ignored.
func TestAnnouncementMaxAge(t *testing.T) {
db := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer db.Close()

if len(db.unappliedAnnouncements) != 0 {
t.Fatal("expected 0 announcements")
}

db.processConsensusChangeHostDB(
modules.ConsensusChange{
ID: modules.ConsensusChangeID{1},
BlockHeight: 1,
AppliedBlocks: []stypes.Block{
{
Timestamp: stypes.Timestamp(time.Now().Add(-time.Hour).Add(-time.Minute).Unix()),
Transactions: []stypes.Transaction{newTestTransaction(newTestHostAnnouncement("foo.com:1000"))},
},
{
Timestamp: stypes.Timestamp(time.Now().Add(-time.Hour).Add(time.Minute).Unix()),
Transactions: []stypes.Transaction{newTestTransaction(newTestHostAnnouncement("foo.com:1001"))},
},
},
},
)

if len(db.unappliedAnnouncements) != 1 {
t.Fatal("expected 1 announcement")
} else if db.unappliedAnnouncements[0].announcement.NetAddress != "foo.com:1001" {
t.Fatal("unexpected announcement")
}
}

// addTestHosts adds 'n' hosts to the db and returns their keys.
func (s *SQLStore) addTestHosts(n int) (keys []types.PublicKey, err error) {
cnt, err := s.contractsCount()
Expand Down Expand Up @@ -1098,3 +1134,28 @@ func newTestScan(hk types.PublicKey, scanTime time.Time, settings rhpv2.HostSett
Settings: settings,
}
}

func newTestPK() (stypes.SiaPublicKey, types.PrivateKey) {
sk := types.GeneratePrivateKey()
pk := sk.PublicKey()
return stypes.SiaPublicKey{
Algorithm: stypes.SignatureEd25519,
Key: pk[:],
}, sk
}

func newTestHostAnnouncement(na modules.NetAddress) (modules.HostAnnouncement, types.PrivateKey) {
spk, sk := newTestPK()
return modules.HostAnnouncement{
Specifier: modules.PrefixHostAnnouncement,
NetAddress: na,
PublicKey: spk,
}, sk
}

func newTestTransaction(ha modules.HostAnnouncement, sk types.PrivateKey) stypes.Transaction {
var buf bytes.Buffer
buf.Write(encoding.Marshal(ha))
buf.Write(encoding.Marshal(sk.SignHash(types.Hash256(crypto.HashObject(ha)))))
return stypes.Transaction{ArbitraryData: [][]byte{buf.Bytes()}}
}
12 changes: 11 additions & 1 deletion stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type (
unappliedOutputChanges []outputChange
unappliedTxnChanges []txnChange

// HostDB related fields
announcementMaxAge time.Duration

// SettingsDB related fields.
settingsMu sync.Mutex
settings map[string]string
Expand Down Expand Up @@ -127,7 +130,12 @@ func DBConfigFromEnv() (uri, user, password, dbName string) {
// NewSQLStore uses a given Dialector to connect to a SQL database. NOTE: Only
// pass migrate=true for the first instance of SQLHostDB if you connect via the
// same Dialector multiple times.
func NewSQLStore(conn gorm.Dialector, alerts alerts.Alerter, partialSlabDir string, migrate bool, persistInterval time.Duration, walletAddress types.Address, slabBufferCompletionThreshold int64, logger *zap.SugaredLogger, gormLogger glogger.Interface) (*SQLStore, modules.ConsensusChangeID, error) {
func NewSQLStore(conn gorm.Dialector, alerts alerts.Alerter, partialSlabDir string, migrate bool, announcementMaxAge, persistInterval time.Duration, walletAddress types.Address, slabBufferCompletionThreshold int64, logger *zap.SugaredLogger, gormLogger glogger.Interface) (*SQLStore, modules.ConsensusChangeID, error) {
// Sanity check announcement max age.
if announcementMaxAge == 0 {
return nil, modules.ConsensusChangeID{}, errors.New("announcementMaxAge must be non-zero")
}

if err := os.MkdirAll(partialSlabDir, 0700); err != nil {
return nil, modules.ConsensusChangeID{}, fmt.Errorf("failed to create partial slab dir: %v", err)
}
Expand Down Expand Up @@ -202,6 +210,8 @@ func NewSQLStore(conn gorm.Dialector, alerts alerts.Alerter, partialSlabDir stri
unappliedRevisions: make(map[types.FileContractID]revisionUpdate),
unappliedProofs: make(map[types.FileContractID]uint64),

announcementMaxAge: announcementMaxAge,

walletAddress: walletAddress,
chainIndex: types.ChainIndex{
Height: ci.Height,
Expand Down
2 changes: 1 addition & 1 deletion stores/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newTestSQLStore(t *testing.T, cfg testSQLStoreConfig) *testSQLStore {
conn := NewEphemeralSQLiteConnection(dbName)
walletAddrs := types.Address(frand.Entropy256())
alerts := alerts.WithOrigin(alerts.NewManager(), "test")
sqlStore, ccid, err := NewSQLStore(conn, alerts, dir, !cfg.skipMigrate, time.Second, walletAddrs, 0, zap.NewNop().Sugar(), newTestLogger())
sqlStore, ccid, err := NewSQLStore(conn, alerts, dir, !cfg.skipMigrate, time.Hour, time.Second, walletAddrs, 0, zap.NewNop().Sugar(), newTestLogger())
if err != nil {
t.Fatal("failed to create SQLStore", err)
}
Expand Down

0 comments on commit 7131fa7

Please sign in to comment.