From 3609784718913fb08e4069d44294576e7d93d67a Mon Sep 17 00:00:00 2001 From: Nate Date: Wed, 11 Dec 2024 20:34:21 -0800 Subject: [PATCH] feat(cmd,settings,sqlite): Add support for RHP4 announcements --- ...d_support_for_announcing_rhp4_addresses.md | 5 ++ cmd/hostd/run.go | 58 +++++++++++++------ host/settings/announce.go | 32 +++++----- host/settings/announce_test.go | 41 +++++++------ host/settings/options.go | 18 ++++++ host/settings/settings.go | 6 +- host/settings/settings_test.go | 2 +- host/settings/update.go | 14 ++--- persist/sqlite/migrations.go | 22 +++++++ persist/sqlite/store_test.go | 2 +- 10 files changed, 137 insertions(+), 63 deletions(-) create mode 100644 .changeset/added_support_for_announcing_rhp4_addresses.md diff --git a/.changeset/added_support_for_announcing_rhp4_addresses.md b/.changeset/added_support_for_announcing_rhp4_addresses.md new file mode 100644 index 00000000..5dab2acb --- /dev/null +++ b/.changeset/added_support_for_announcing_rhp4_addresses.md @@ -0,0 +1,5 @@ +--- +default: minor +--- + +# Support announcing RHP4 addresses diff --git a/cmd/hostd/run.go b/cmd/hostd/run.go index 578b31f3..6b1ddb00 100644 --- a/cmd/hostd/run.go +++ b/cmd/hostd/run.go @@ -144,18 +144,19 @@ func getRandomOpenPort() (uint16, error) { return uint16(port), nil } -func normalizeAddress(addr string) (string, error) { - host, port, err := net.SplitHostPort(addr) +func normalizeAddress(addr string) (string, uint16, error) { + host, portStr, err := net.SplitHostPort(addr) if err != nil { - return "", err - } else if port == "" || port == "0" { + return "", 0, err + } else if portStr == "" || portStr == "0" { randPort, err := getRandomOpenPort() if err != nil { - return "", fmt.Errorf("failed to get open port: %w", err) + return "", 0, fmt.Errorf("failed to get open port: %w", err) } - return net.JoinHostPort(host, strconv.FormatUint(uint64(randPort), 10)), nil + return net.JoinHostPort(host, strconv.FormatUint(uint64(randPort), 10)), randPort, nil } - return addr, nil + port, err := strconv.ParseUint(portStr, 10, 16) + return addr, uint16(port), err } func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateKey, log *zap.Logger) error { @@ -272,18 +273,14 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK am := alerts.NewManager(alerts.WithEventReporter(wr), alerts.WithLog(log.Named("alerts"))) - rhp3Addr, err := normalizeAddress(cfg.RHP3.TCPAddress) + rhp2Addr, rhp2Port, err := normalizeAddress(cfg.RHP2.Address) if err != nil { - return fmt.Errorf("failed to normalize RHP3 address: %w", err) + return fmt.Errorf("failed to normalize RHP2 address: %w", err) } - _, rhp3PortStr, err := net.SplitHostPort(rhp3Addr) - if err != nil { - return fmt.Errorf("failed to parse rhp3 port: %w", err) - } - rhp3Port, err := strconv.ParseUint(rhp3PortStr, 10, 16) + rhp3Addr, rhp3Port, err := normalizeAddress(cfg.RHP3.TCPAddress) if err != nil { - return fmt.Errorf("failed to parse rhp3 port: %w", err) + return fmt.Errorf("failed to normalize RHP3 address: %w", err) } vm, err := storage.NewVolumeManager(store, storage.WithLogger(log.Named("volumes")), storage.WithAlerter(am)) @@ -292,7 +289,33 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK } defer vm.Close() - sm, err := settings.NewConfigManager(hostKey, store, cm, s, vm, wm, settings.WithAlertManager(am), settings.WithRHP3Port(uint16(rhp3Port)), settings.WithLog(log.Named("settings"))) + var rhp4PortStr string + for _, addr := range cfg.RHP4.ListenAddresses { + _, portStr, err := net.SplitHostPort(addr.Address) + if err != nil { + return fmt.Errorf("failed to parse RHP4 address: %w", err) + } else if rhp4PortStr == "" { + rhp4PortStr = portStr + } else if rhp4PortStr != portStr { + return errors.New("RHP4 listen addresses must all have the same port") + } + } + _, rhp4Port, err := normalizeAddress(net.JoinHostPort("", rhp4PortStr)) + if err != nil { + return fmt.Errorf("failed to normalize RHP4 address: %w", err) + } + // update the listen addresses with the normalized port + for i := range cfg.RHP4.ListenAddresses { + host, _, _ := net.SplitHostPort(cfg.RHP4.ListenAddresses[i].Address) + cfg.RHP4.ListenAddresses[i].Address = net.JoinHostPort(host, strconv.FormatUint(uint64(rhp4Port), 10)) + } + + sm, err := settings.NewConfigManager(hostKey, store, cm, s, vm, wm, + settings.WithAlertManager(am), + settings.WithRHP2Port(uint16(rhp2Port)), + settings.WithRHP3Port(uint16(rhp3Port)), + settings.WithRHP4Port(uint16(rhp4Port)), + settings.WithLog(log.Named("settings"))) if err != nil { return fmt.Errorf("failed to create settings manager: %w", err) } @@ -312,7 +335,7 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK dr := rhp.NewDataRecorder(store, log.Named("data")) rl, wl := sm.RHPBandwidthLimiters() - rhp2Listener, err := rhp.Listen("tcp", cfg.RHP2.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl)) + rhp2Listener, err := rhp.Listen("tcp", rhp2Addr, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl)) if err != nil { return fmt.Errorf("failed to listen on rhp2 addr: %w", err) } @@ -351,6 +374,7 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK if err != nil { return fmt.Errorf("failed to listen on rhp4 addr: %w", err) } + log.Debug("started RHP4 listener", zap.String("address", l.Addr().String())) stopListenerFuncs = append(stopListenerFuncs, l.Close) go rhp.ServeRHP4SiaMux(l, rhp4, log.Named("rhp4")) default: diff --git a/host/settings/announce.go b/host/settings/announce.go index 5d1a7cfb..589ce58e 100644 --- a/host/settings/announce.go +++ b/host/settings/announce.go @@ -20,13 +20,21 @@ type ( } ) +func (m *ConfigManager) rhp2NetAddress() string { + return net.JoinHostPort(m.Settings().NetAddress, strconv.Itoa(int(m.rhp2Port))) +} + +func (m *ConfigManager) rhp4NetAddress() string { + return net.JoinHostPort(m.Settings().NetAddress, strconv.Itoa(int(m.rhp4Port))) +} + // Announce announces the host to the network func (m *ConfigManager) Announce() error { // get the current settings settings := m.Settings() if m.validateNetAddress { - if err := validateNetAddress(settings.NetAddress); err != nil { + if err := validateHostname(settings.NetAddress); err != nil { return fmt.Errorf("failed to validate net address %q: %w", settings.NetAddress, err) } } @@ -40,7 +48,7 @@ func (m *ConfigManager) Announce() error { ArbitraryData: [][]byte{ chain.HostAnnouncement{ PublicKey: m.hostKey.PublicKey(), - NetAddress: settings.NetAddress, + NetAddress: m.rhp2NetAddress(), }.ToArbitraryData(m.hostKey), }, MinerFees: []types.Currency{minerFee}, @@ -64,7 +72,10 @@ func (m *ConfigManager) Announce() error { txn := types.V2Transaction{ Attestations: []types.Attestation{ chain.V2HostAnnouncement{ - {Protocol: rhp4.ProtocolTCPSiaMux, Address: settings.NetAddress}, // TODO: this isn't correct + { + Protocol: rhp4.ProtocolTCPSiaMux, + Address: m.rhp4NetAddress(), + }, }.ToAttestation(cs, m.hostKey), }, MinerFee: minerFee, @@ -88,12 +99,7 @@ func (m *ConfigManager) Announce() error { return nil } -func validateNetAddress(netaddress string) error { - host, port, err := net.SplitHostPort(netaddress) - if err != nil { - return fmt.Errorf("failed to split net address: %w", err) - } - +func validateHostname(host string) error { // Check that the host is not empty or localhost. if host == "" { return errors.New("empty net address") @@ -101,14 +107,6 @@ func validateNetAddress(netaddress string) error { return errors.New("net address cannot be localhost") } - // Check that the port is a valid number. - n, err := strconv.Atoi(port) - if err != nil { - return fmt.Errorf("failed to parse port: %w", err) - } else if n < 1 || n > 65535 { - return errors.New("port must be between 1 and 65535") - } - // If the host is an IP address, check that it is a public IP address. ip := net.ParseIP(host) if ip != nil { diff --git a/host/settings/announce_test.go b/host/settings/announce_test.go index 115c4aa6..9194900c 100644 --- a/host/settings/announce_test.go +++ b/host/settings/announce_test.go @@ -1,6 +1,7 @@ package settings_test import ( + "net" "testing" "go.sia.tech/core/types" @@ -61,10 +62,10 @@ func TestAutoAnnounce(t *testing.T) { defer idx.Close() settings := settings.DefaultSettings - settings.NetAddress = "foo.bar:1234" + settings.NetAddress = "foo.bar" sm.UpdateSettings(settings) - assertAnnouncement := func(t *testing.T, expectedAddr string, height uint64) { + assertAnnouncement := func(t *testing.T, expectedHost string, height uint64) { t.Helper() index, ok := node.Chain.BestIndex(height) @@ -72,17 +73,18 @@ func TestAutoAnnounce(t *testing.T) { t.Fatalf("failed to get index at height %v (%s)", height, node.Chain.Tip()) } + netaddress := net.JoinHostPort(expectedHost, "9982") ann, err := sm.LastAnnouncement() if err != nil { t.Fatal(err) - } else if ann.Address != expectedAddr { - t.Fatalf("expected address %q, got %q", expectedAddr, ann.Address) + } else if ann.Address != netaddress { + t.Fatalf("expected address %q, got %q", netaddress, ann.Address) } else if ann.Index != index { t.Fatalf("expected index %q, got %q", index, ann.Index) } } - assertV2Announcement := func(t *testing.T, expectedAddr string, height uint64) { + assertV2Announcement := func(t *testing.T, expectedHost string, height uint64) { t.Helper() index, ok := node.Chain.BestIndex(height) @@ -95,8 +97,9 @@ func TestAutoAnnounce(t *testing.T) { t.Fatal(err) } + netaddress := net.JoinHostPort(expectedHost, "9984") h := types.NewHasher() - types.EncodeSlice(h.E, chain.V2HostAnnouncement{{Protocol: rhp4.ProtocolTCPSiaMux, Address: expectedAddr}}) + types.EncodeSlice(h.E, chain.V2HostAnnouncement{{Protocol: rhp4.ProtocolTCPSiaMux, Address: netaddress}}) if err := h.E.Flush(); err != nil { t.Fatal(err) } @@ -125,30 +128,30 @@ func TestAutoAnnounce(t *testing.T) { // fund the wallet and trigger the first auto-announce mineAndSync(t, network.MaturityDelay+1+1) - assertAnnouncement(t, "foo.bar:1234", network.MaturityDelay+1+1) // first maturity height + funds available + confirmation + assertAnnouncement(t, "foo.bar", network.MaturityDelay+1+1) // first maturity height + funds available + confirmation // mine until the next announcement and confirm it lastHeight := node.Chain.Tip().Height mineAndSync(t, 51) - assertAnnouncement(t, "foo.bar:1234", lastHeight+50+1) // first confirm + interval + confirmation + assertAnnouncement(t, "foo.bar", lastHeight+50+1) // first confirm + interval + confirmation // change the address - settings.NetAddress = "baz.qux:5678" + settings.NetAddress = "baz.qux" sm.UpdateSettings(settings) // trigger and confirm the new announcement lastHeight = node.Chain.Tip().Height mineAndSync(t, 2) - assertAnnouncement(t, "baz.qux:5678", lastHeight+2) + assertAnnouncement(t, "baz.qux", lastHeight+2) // mine until the v2 hardfork activates. The host should re-announce with a // v2 attestation. n := node.Chain.TipState().Network mineAndSync(t, n.HardforkV2.AllowHeight-node.Chain.Tip().Height+1) - assertV2Announcement(t, "baz.qux:5678", n.HardforkV2.AllowHeight+1) + assertV2Announcement(t, "baz.qux", n.HardforkV2.AllowHeight+1) // mine a few more blocks to ensure the host doesn't re-announce mineAndSync(t, 10) - assertV2Announcement(t, "baz.qux:5678", n.HardforkV2.AllowHeight+1) + assertV2Announcement(t, "baz.qux", n.HardforkV2.AllowHeight+1) } func TestAutoAnnounceV2(t *testing.T) { @@ -212,7 +215,7 @@ func TestAutoAnnounceV2(t *testing.T) { } } - assertV2Announcement := func(t *testing.T, expectedAddr string, height uint64) { + assertV2Announcement := func(t *testing.T, expectedHost string, height uint64) { t.Helper() index, ok := node.Chain.BestIndex(height) @@ -226,7 +229,7 @@ func TestAutoAnnounceV2(t *testing.T) { } h := types.NewHasher() - types.EncodeSlice(h.E, chain.V2HostAnnouncement{{Protocol: rhp4.ProtocolTCPSiaMux, Address: expectedAddr}}) + types.EncodeSlice(h.E, chain.V2HostAnnouncement{{Protocol: rhp4.ProtocolTCPSiaMux, Address: net.JoinHostPort(expectedHost, "9984")}}) if err := h.E.Flush(); err != nil { t.Fatal(err) } @@ -240,23 +243,23 @@ func TestAutoAnnounceV2(t *testing.T) { } settings := settings.DefaultSettings - settings.NetAddress = "foo.bar:1234" + settings.NetAddress = "foo.bar" sm.UpdateSettings(settings) // fund the wallet and trigger the first auto-announce mineAndSync(t, network.MaturityDelay+1+1) - assertV2Announcement(t, "foo.bar:1234", network.MaturityDelay+1+1) // first maturity height + funds available + confirmation + assertV2Announcement(t, "foo.bar", network.MaturityDelay+1+1) // first maturity height + funds available + confirmation // mine until the next announcement and confirm it lastHeight := node.Chain.Tip().Height mineAndSync(t, 51) - assertV2Announcement(t, "foo.bar:1234", lastHeight+50+1) // first confirm + interval + confirmation + assertV2Announcement(t, "foo.bar", lastHeight+50+1) // first confirm + interval + confirmation // change the address - settings.NetAddress = "baz.qux:5678" + settings.NetAddress = "baz.qux" sm.UpdateSettings(settings) // trigger and confirm the new announcement lastHeight = node.Chain.Tip().Height mineAndSync(t, 2) - assertV2Announcement(t, "baz.qux:5678", lastHeight+2) + assertV2Announcement(t, "baz.qux", lastHeight+2) } diff --git a/host/settings/options.go b/host/settings/options.go index 129bd6fa..702fd5d1 100644 --- a/host/settings/options.go +++ b/host/settings/options.go @@ -47,6 +47,15 @@ func WithInitialSettings(settings Settings) Option { } } +// WithRHP2Port sets the port that the host is listening for +// RHP2 connections on. This is appended to the host's net address +// and announced on the blockchain. +func WithRHP2Port(port uint16) Option { + return func(c *ConfigManager) { + c.rhp2Port = port + } +} + // WithRHP3Port sets the port that the host is listening for // RHP3 connections on. This is part of the RHP2 settings. func WithRHP3Port(port uint16) Option { @@ -54,3 +63,12 @@ func WithRHP3Port(port uint16) Option { c.rhp3Port = port } } + +// WithRHP4Port sets the port that the host is listening for +// RHP4 connections on. This is appended to the host's net address +// and announced on the blockchain. +func WithRHP4Port(port uint16) Option { + return func(c *ConfigManager) { + c.rhp4Port = port + } +} diff --git a/host/settings/settings.go b/host/settings/settings.go index 3c24f5ba..732090c6 100644 --- a/host/settings/settings.go +++ b/host/settings/settings.go @@ -162,7 +162,9 @@ type ( lastIPv4 net.IP lastIPv6 net.IP + rhp2Port uint16 rhp3Port uint16 + rhp4Port uint16 tg *threadgroup.ThreadGroup } @@ -245,7 +247,7 @@ func (m *ConfigManager) UpdateSettings(s Settings) error { // if a netaddress is set, validate it if strings.TrimSpace(s.NetAddress) != "" && m.validateNetAddress { - if err := validateNetAddress(s.NetAddress); err != nil { + if err := validateHostname(s.NetAddress); err != nil { return fmt.Errorf("failed to validate net address: %w", err) } } @@ -444,7 +446,9 @@ func NewConfigManager(hostKey types.PrivateKey, store Store, cm ChainManager, s ingressLimit: rate.NewLimiter(rate.Inf, defaultBurstSize), egressLimit: rate.NewLimiter(rate.Inf, defaultBurstSize), + rhp2Port: 9982, rhp3Port: 9983, + rhp4Port: 9984, } for _, opt := range opts { diff --git a/host/settings/settings_test.go b/host/settings/settings_test.go index c2e79b9b..37235836 100644 --- a/host/settings/settings_test.go +++ b/host/settings/settings_test.go @@ -59,7 +59,7 @@ func TestSettings(t *testing.T) { updated := sm.Settings() updated.WindowSize = 100 - updated.NetAddress = "localhost:10082" + updated.NetAddress = "localhost" updated.BaseRPCPrice = types.Siacoins(1) if err := sm.UpdateSettings(updated); err != nil { diff --git a/host/settings/update.go b/host/settings/update.go index 923ce7d7..27906f25 100644 --- a/host/settings/update.go +++ b/host/settings/update.go @@ -134,12 +134,7 @@ func (m *ConfigManager) ProcessActions(index types.ChainIndex) error { } nextHeight := announcement.Index.Height + m.announceInterval - netaddress := m.Settings().NetAddress - if err := validateNetAddress(netaddress); err != nil && m.validateNetAddress { - m.log.Debug("failed to validate net address", zap.Error(err)) - return nil - } - shouldAnnounce = index.Height >= nextHeight || announcement.Address != netaddress + shouldAnnounce = index.Height >= nextHeight || announcement.Address != m.rhp2NetAddress() } else { announceHash, announceIndex, err := m.store.LastV2AnnouncementHash() if err != nil { @@ -148,7 +143,12 @@ func (m *ConfigManager) ProcessActions(index types.ChainIndex) error { nextHeight := announceIndex.Height + m.announceInterval h := types.NewHasher() - types.EncodeSlice(h.E, chain.V2HostAnnouncement{{Protocol: rhp4.ProtocolTCPSiaMux, Address: m.Settings().NetAddress}}) + types.EncodeSlice(h.E, chain.V2HostAnnouncement{ + { + Protocol: rhp4.ProtocolTCPSiaMux, + Address: m.rhp4NetAddress(), + }, + }) if err := h.E.Flush(); err != nil { return fmt.Errorf("failed to hash v2 announcement: %w", err) } diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index e38d215e..b5cbb83e 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -2,7 +2,9 @@ package sqlite import ( "database/sql" + "errors" "fmt" + "net" "time" "go.sia.tech/core/types" @@ -10,6 +12,25 @@ import ( "go.uber.org/zap" ) +// migrateVersion35 trims the port from the net_address in the host settings +// table. +func migrateVersion35(tx *txn, _ *zap.Logger) error { + var netaddress string + err := tx.QueryRow(`SELECT net_address FROM host_settings`).Scan(&netaddress) + if errors.Is(err, sql.ErrNoRows) { + return nil + } else if err != nil { + return fmt.Errorf("failed to query host settings: %w", err) + } + // if parsing fails, return nil + host, _, err := net.SplitHostPort(netaddress) + if err != nil { + return nil + } + _, err = tx.Exec(`UPDATE host_settings SET net_address = ?`, host) + return err +} + // migrateVersion34 removes the lock tables func migrateVersion34(tx *txn, log *zap.Logger) error { _, err := tx.Exec(`DROP TABLE locked_sectors; @@ -987,4 +1008,5 @@ var migrations = []func(tx *txn, log *zap.Logger) error{ migrateVersion32, migrateVersion33, migrateVersion34, + migrateVersion35, } diff --git a/persist/sqlite/store_test.go b/persist/sqlite/store_test.go index ece67051..b05b39eb 100644 --- a/persist/sqlite/store_test.go +++ b/persist/sqlite/store_test.go @@ -141,7 +141,7 @@ func TestBackup(t *testing.T) { newSettings := settings.Settings{ MaxContractDuration: 100, - NetAddress: "foo.bar.baz:9981", + NetAddress: "foo.bar.baz", AcceptingContracts: true, } if err := db.UpdateSettings(newSettings); err != nil {