Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RHP rate limiting #502

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/remove_rhp_session_endpoints.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
default: major
---

# Remove RHP Session endpoints
14 changes: 0 additions & 14 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/settings/pin"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/webhooks"
"go.sia.tech/jape"
"go.uber.org/zap"
Expand Down Expand Up @@ -152,14 +151,6 @@ type (
BroadcastToWebhook(id int64, event, scope string, data interface{}) error
}

// A RHPSessionReporter reports on RHP session lifecycle events
RHPSessionReporter interface {
Subscribe(rhp.SessionSubscriber)
Unsubscribe(rhp.SessionSubscriber)

Active() []rhp.Session
}

// An api provides an HTTP API for the host
api struct {
hostKey types.PublicKey
Expand All @@ -168,7 +159,6 @@ type (
log *zap.Logger
alerts Alerts
webhooks Webhooks
sessions RHPSessionReporter

sqlite3Store SQLite3Store

Expand Down Expand Up @@ -216,7 +206,6 @@ func NewServer(name string, hostKey types.PublicKey, cm ChainManager, s Syncer,
hostKey: hostKey,
name: name,

sessions: noopSessionReporter{},
alerts: noopAlerts{},
webhooks: noopWebhooks{},
log: zap.NewNop(),
Expand Down Expand Up @@ -291,9 +280,6 @@ func NewServer(name string, hostKey types.PublicKey, cm ChainManager, s Syncer,
"DELETE /volumes/:id": a.handleDeleteVolume,
"DELETE /volumes/:id/cancel": a.handleDELETEVolumeCancelOp,
"PUT /volumes/:id/resize": a.handlePUTVolumeResize,
// session endpoints
"GET /sessions": a.handleGETSessions,
"GET /sessions/subscribe": a.handleGETSessionsSubscribe,
// tpool endpoints
"GET /tpool/fee": a.handleGETTPoolFee,
// wallet endpoints
Expand Down
14 changes: 0 additions & 14 deletions api/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/hostd/alerts"
"go.sia.tech/hostd/explorer"
"go.sia.tech/hostd/rhp"
"go.sia.tech/hostd/webhooks"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -51,13 +50,6 @@ func WithExplorer(explorer *explorer.Explorer) ServerOption {
}
}

// WithRHPSessionReporter sets the RHP session reporter for the API server.
func WithRHPSessionReporter(rsr RHPSessionReporter) ServerOption {
return func(a *api) {
a.sessions = rsr
}
}

// WithLogger sets the logger for the API server.
func WithLogger(log *zap.Logger) ServerOption {
return func(a *api) {
Expand All @@ -81,9 +73,3 @@ type noopAlerts struct{}

func (noopAlerts) Active() []alerts.Alert { return nil }
func (noopAlerts) Dismiss(...types.Hash256) {}

type noopSessionReporter struct{}

func (noopSessionReporter) Subscribe(rhp.SessionSubscriber) {}
func (noopSessionReporter) Unsubscribe(rhp.SessionSubscriber) {}
func (noopSessionReporter) Active() []rhp.Session { return nil }
21 changes: 0 additions & 21 deletions api/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,24 +462,3 @@ func (w WalletPendingResp) PrometheusMetric() (metrics []prometheus.Metric) {
}
return
}

// PrometheusMetric returns Prometheus samples for the hosts sessions
func (s SessionResp) PrometheusMetric() (metrics []prometheus.Metric) {
for _, session := range s {
metrics = append(metrics, prometheus.Metric{
Name: "hostd_session_ingress",
Labels: map[string]any{
"peer": session.PeerAddress,
},
Value: float64(session.Ingress),
})
metrics = append(metrics, prometheus.Metric{
Name: "hostd_session_egress",
Labels: map[string]any{
"peer": session.PeerAddress,
},
Value: float64(session.Egress),
})
}
return
}
45 changes: 0 additions & 45 deletions api/rhpsessions.go

This file was deleted.

4 changes: 0 additions & 4 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.sia.tech/hostd/host/metrics"
"go.sia.tech/hostd/host/settings"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/rhp"
)

// JSON keys for host setting fields
Expand Down Expand Up @@ -193,9 +192,6 @@ type (

// WalletPendingResp is the response body for the [GET] /wallet/pending endpoint
WalletPendingResp []wallet.Event

// SessionResp is the response body for the [GET] /sessions endpoint
SessionResp []rhp.Session
)

// MarshalJSON implements json.Marshaler
Expand Down
86 changes: 57 additions & 29 deletions cmd/hostd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,37 @@ func startLocalhostListener(listenAddr string, log *zap.Logger) (l net.Listener,
return
}

func getRandomOpenPort() (uint16, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer l.Close()
_, portStr, err := net.SplitHostPort(l.Addr().String())
if err != nil {
return 0, fmt.Errorf("failed to split port: %w", err)
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return 0, fmt.Errorf("failed to parse port: %w", err)
}
return uint16(port), nil
}

func normalizeAddress(addr string) (string, error) {
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
host, port, err := net.SplitHostPort(addr)
if err != nil {
return "", err
} else if port == "" || port == "0" {
randPort, err := getRandomOpenPort()
if err != nil {
return "", fmt.Errorf("failed to get open port: %w", err)
}
return net.JoinHostPort(host, strconv.FormatUint(uint64(randPort), 10)), nil
}
return addr, nil
}

func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateKey, log *zap.Logger) error {
if err := deleteSiadData(cfg.Directory); err != nil {
return fmt.Errorf("failed to migrate v1 consensus database: %w", err)
Expand Down Expand Up @@ -186,27 +217,6 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
}
defer syncerListener.Close()

rhp2Listener, err := net.Listen("tcp", cfg.RHP2.Address)
if err != nil {
return fmt.Errorf("failed to listen on rhp2 addr: %w", err)
}
defer rhp2Listener.Close()

rhp3Listener, err := net.Listen("tcp", cfg.RHP3.TCPAddress)
if err != nil {
return fmt.Errorf("failed to listen on rhp3 addr: %w", err)
}
defer rhp3Listener.Close()

_, rhp3PortStr, err := net.SplitHostPort(rhp3Listener.Addr().String())
if err != nil {
return fmt.Errorf("failed to parse rhp3 port: %w", err)
}
rhp3Port, err := strconv.ParseUint(rhp3PortStr, 10, 16)
if err != nil {
return fmt.Errorf("failed to parse rhp3 port: %w", err)
}

syncerAddr := syncerListener.Addr().String()
if cfg.Syncer.EnableUPnP {
_, portStr, _ := net.SplitHostPort(cfg.Syncer.Address)
Expand Down Expand Up @@ -258,10 +268,23 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
return fmt.Errorf("failed to create webhook reporter: %w", err)
}
defer wr.Close()
sr := rhp.NewSessionReporter()

am := alerts.NewManager(alerts.WithEventReporter(wr), alerts.WithLog(log.Named("alerts")))

rhp3Addr, err := normalizeAddress(cfg.RHP3.TCPAddress)
if err != nil {
return fmt.Errorf("failed to normalize RHP3 address: %w", err)
}

_, rhp3PortStr, err := net.SplitHostPort(rhp3Addr)
if err != nil {
return fmt.Errorf("failed to parse rhp3 port: %w", err)
}
rhp3Port, err := strconv.ParseUint(rhp3PortStr, 10, 16)
if err != nil {
return fmt.Errorf("failed to parse rhp3 port: %w", err)
}

vm, err := storage.NewVolumeManager(store, storage.WithLogger(log.Named("volumes")), storage.WithAlerter(am))
if err != nil {
return fmt.Errorf("failed to create storage manager: %w", err)
Expand All @@ -287,27 +310,32 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
defer index.Close()

dr := rhp.NewDataRecorder(store, log.Named("data"))
rl, wl := sm.RHPBandwidthLimiters()
rhp2Listener, err := rhp.Listen("tcp", cfg.RHP2.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp2 addr: %w", err)
}
defer rhp2Listener.Close()

rhp2, err := rhp2.NewSessionHandler(rhp2Listener, hostKey, cm, s, wm, contractManager, sm, vm, rhp2.WithDataMonitor(dr), rhp2.WithLog(log.Named("rhp2")))
rhp3Listener, err := rhp.Listen("tcp", rhp3Addr, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to create rhp2 session handler: %w", err)
return fmt.Errorf("failed to listen on rhp3 addr: %w", err)
}
defer rhp3Listener.Close()

rhp2 := rhp2.NewSessionHandler(rhp2Listener, hostKey, cm, s, wm, contractManager, sm, vm, log.Named("rhp2"))
go rhp2.Serve()
defer rhp2.Close()

registry := registry.NewManager(hostKey, store, log.Named("registry"))
accounts := accounts.NewManager(store, sm)
rhp3, err := rhp3.NewSessionHandler(rhp3Listener, hostKey, cm, s, wm, accounts, contractManager, registry, vm, sm, rhp3.WithDataMonitor(dr), rhp3.WithSessionReporter(sr), rhp3.WithLog(log.Named("rhp3")))
if err != nil {
return fmt.Errorf("failed to create rhp3 session handler: %w", err)
}
rhp3 := rhp3.NewSessionHandler(rhp3Listener, hostKey, cm, s, wm, accounts, contractManager, registry, vm, sm, log.Named("rhp3"))
go rhp3.Serve()
defer rhp3.Close()

apiOpts := []api.ServerOption{
api.WithAlerts(am),
api.WithLogger(log.Named("api")),
api.WithRHPSessionReporter(sr),
api.WithWebhooks(wr),
api.WithSQLite3Store(store),
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
lukechampine.com/flagg v1.1.1
lukechampine.com/frand v1.5.1
lukechampine.com/upnp v0.3.0
nhooyr.io/websocket v1.8.17
)

require (
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,3 @@ lukechampine.com/frand v1.5.1 h1:fg0eRtdmGFIxhP5zQJzM1lFDbD6CUfu/f+7WgAZd5/w=
lukechampine.com/frand v1.5.1/go.mod h1:4VstaWc2plN4Mjr10chUD46RAVGWhpkZ5Nja8+Azp0Q=
lukechampine.com/upnp v0.3.0 h1:UVCD6eD6fmJmwak6DVE3vGN+L46Fk8edTcC6XYCb6C4=
lukechampine.com/upnp v0.3.0/go.mod h1:sOuF+fGSDKjpUm6QI0mfb82ScRrhj8bsqsD78O5nK1k=
nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y=
nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
4 changes: 2 additions & 2 deletions host/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func (m *ConfigManager) Settings() Settings {
return m.settings
}

// BandwidthLimiters returns the rate limiters for all traffic
func (m *ConfigManager) BandwidthLimiters() (ingress, egress *rate.Limiter) {
// RHPBandwidthLimiters returns the rate limiters for all RHP traffic
func (m *ConfigManager) RHPBandwidthLimiters() (ingress, egress *rate.Limiter) {
return m.ingressLimit, m.egressLimit
}

Expand Down
Loading
Loading