diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8e4c21faf..e8a32e5ec 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -47,6 +47,16 @@ jobs: uses: n8maninger/action-golang-test@v1 with: args: "-race;-short" + - name: Test Stores - MySQL + if: matrix.os == 'ubuntu-latest' + uses: n8maninger/action-golang-test@v1 + env: + RENTERD_DB_URI: 127.0.0.1:3800 + RENTERD_DB_USER: root + RENTERD_DB_PASSWORD: test + with: + package: "./stores" + args: "-race;-short" - name: Test Integration uses: n8maninger/action-golang-test@v1 with: diff --git a/alerts/alerts.go b/alerts/alerts.go index 424196d4f..6b009360d 100644 --- a/alerts/alerts.go +++ b/alerts/alerts.go @@ -35,9 +35,9 @@ const ( type ( Alerter interface { + Alerts(_ context.Context, opts AlertsOpts) (resp AlertsResponse, err error) RegisterAlert(_ context.Context, a Alert) error DismissAlerts(_ context.Context, ids ...types.Hash256) error - DismissAllAlerts(_ context.Context) error } // Severity indicates the severity of an alert. @@ -66,11 +66,27 @@ type ( } AlertsOpts struct { - Offset int - Limit int + Offset int + Limit int + Severity Severity + } + + AlertsResponse struct { + Alerts []Alert `json:"alerts"` + HasMore bool `json:"hasMore"` + Totals struct { + Info int `json:"info"` + Warning int `json:"warning"` + Error int `json:"error"` + Critical int `json:"critical"` + } `json:"totals"` } ) +func (ar AlertsResponse) Total() int { + return ar.Totals.Info + ar.Totals.Warning + ar.Totals.Error + ar.Totals.Critical +} + // String implements the fmt.Stringer interface. func (s Severity) String() string { switch s { @@ -87,15 +103,8 @@ func (s Severity) String() string { } } -// MarshalJSON implements the json.Marshaler interface. -func (s Severity) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`%q`, s.String())), nil -} - -// UnmarshalJSON implements the json.Unmarshaler interface. -func (s *Severity) UnmarshalJSON(b []byte) error { - status := strings.Trim(string(b), `"`) - switch status { +func (s *Severity) LoadString(str string) error { + switch str { case severityInfoStr: *s = SeverityInfo case severityWarningStr: @@ -105,11 +114,21 @@ func (s *Severity) UnmarshalJSON(b []byte) error { case severityCriticalStr: *s = SeverityCritical default: - return fmt.Errorf("unrecognized severity: %v", status) + return fmt.Errorf("unrecognized severity: %v", str) } return nil } +// MarshalJSON implements the json.Marshaler interface. +func (s Severity) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`%q`, s.String())), nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (s *Severity) UnmarshalJSON(b []byte) error { + return s.LoadString(strings.Trim(string(b), `"`)) +} + // RegisterAlert implements the Alerter interface. func (m *Manager) RegisterAlert(ctx context.Context, alert Alert) error { if alert.ID == (types.Hash256{}) { @@ -136,17 +155,6 @@ func (m *Manager) RegisterAlert(ctx context.Context, alert Alert) error { }) } -// DismissAllAlerts implements the Alerter interface. -func (m *Manager) DismissAllAlerts(ctx context.Context) error { - m.mu.Lock() - toDismiss := make([]types.Hash256, 0, len(m.alerts)) - for alertID := range m.alerts { - toDismiss = append(toDismiss, alertID) - } - m.mu.Unlock() - return m.DismissAlerts(ctx, toDismiss...) -} - // DismissAlerts implements the Alerter interface. 
func (m *Manager) DismissAlerts(ctx context.Context, ids ...types.Hash256) error { var dismissed []types.Hash256 @@ -175,19 +183,34 @@ func (m *Manager) DismissAlerts(ctx context.Context, ids ...types.Hash256) error }) } -// Active returns the host's active alerts. -func (m *Manager) Active(offset, limit int) []Alert { +// Alerts returns the host's active alerts. +func (m *Manager) Alerts(_ context.Context, opts AlertsOpts) (AlertsResponse, error) { m.mu.Lock() defer m.mu.Unlock() + offset, limit := opts.Offset, opts.Limit + resp := AlertsResponse{} + if offset >= len(m.alerts) { - return nil + return resp, nil } else if limit == -1 { limit = len(m.alerts) } alerts := make([]Alert, 0, len(m.alerts)) for _, a := range m.alerts { + if a.Severity == SeverityInfo { + resp.Totals.Info++ + } else if a.Severity == SeverityWarning { + resp.Totals.Warning++ + } else if a.Severity == SeverityError { + resp.Totals.Error++ + } else if a.Severity == SeverityCritical { + resp.Totals.Critical++ + } + if opts.Severity != 0 && a.Severity != opts.Severity { + continue // filter by severity + } alerts = append(alerts, a) } sort.Slice(alerts, func(i, j int) bool { @@ -196,8 +219,10 @@ func (m *Manager) Active(offset, limit int) []Alert { alerts = alerts[offset:] if limit < len(alerts) { alerts = alerts[:limit] + resp.HasMore = true } - return alerts + resp.Alerts = alerts + return resp, nil } func (m *Manager) RegisterWebhookBroadcaster(b webhooks.Broadcaster) { @@ -231,6 +256,11 @@ func WithOrigin(alerter Alerter, origin string) Alerter { } } +// Alerts implements the Alerter interface. +func (a *originAlerter) Alerts(ctx context.Context, opts AlertsOpts) (resp AlertsResponse, err error) { + return a.alerter.Alerts(ctx, opts) +} + // RegisterAlert implements the Alerter interface. func (a *originAlerter) RegisterAlert(ctx context.Context, alert Alert) error { if alert.Data == nil { @@ -240,11 +270,6 @@ func (a *originAlerter) RegisterAlert(ctx context.Context, alert Alert) error { return a.alerter.RegisterAlert(ctx, alert) } -// DismissAllAlerts implements the Alerter interface. -func (a *originAlerter) DismissAllAlerts(ctx context.Context) error { - return a.alerter.DismissAllAlerts(ctx) -} - // DismissAlerts implements the Alerter interface. func (a *originAlerter) DismissAlerts(ctx context.Context, ids ...types.Hash256) error { return a.alerter.DismissAlerts(ctx, ids...) diff --git a/api/object.go b/api/object.go index 506f0eed1..35df9b636 100644 --- a/api/object.go +++ b/api/object.go @@ -54,7 +54,7 @@ type ( Object struct { Metadata ObjectUserMetadata `json:"metadata,omitempty"` ObjectMetadata - object.Object + *object.Object `json:"omitempty"` } // ObjectMetadata contains various metadata about an object. 
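Note on the `api.Object` hunk above: switching the embedded `object.Object` value to a `*object.Object` pointer is what lets a metadata-only response omit the object body entirely, because the promoted fields of a nil embedded pointer are simply dropped by `encoding/json`. A minimal, self-contained sketch of that mechanic (the type names below are illustrative, not from this diff):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// objectBody stands in for object.Object; illustrative only.
type objectBody struct {
	Slabs []string `json:"slabs"`
}

// apiObject mirrors the shape of api.Object after this change: the
// body is an embedded pointer instead of an embedded value.
type apiObject struct {
	Name string `json:"name"`
	Size int64  `json:"size"`
	*objectBody
}

func main() {
	// Metadata-only response: the body pointer stays nil, so its
	// promoted fields are absent from the marshaled JSON.
	head, _ := json.Marshal(apiObject{Name: "foo", Size: 42})
	fmt.Println(string(head)) // {"name":"foo","size":42}

	// Full response: the body is attached and its fields are inlined.
	full, _ := json.Marshal(apiObject{
		Name:       "foo",
		Size:       42,
		objectBody: &objectBody{Slabs: []string{"slab-1"}},
	})
	fmt.Println(string(full)) // {"name":"foo","size":42,"slabs":["slab-1"]}
}
```

The `onlymetadata` form value introduced in the hunks below is what leaves that pointer nil on the bus side.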
@@ -212,13 +212,14 @@ type ( } GetObjectOptions struct { - Prefix string - Offset int - Limit int - IgnoreDelim bool - Marker string - SortBy string - SortDir string + Prefix string + Offset int + Limit int + IgnoreDelim bool + Marker string + OnlyMetadata bool + SortBy string + SortDir string } ListObjectOptions struct { @@ -316,6 +317,9 @@ func (opts GetObjectOptions) Apply(values url.Values) { if opts.Marker != "" { values.Set("marker", opts.Marker) } + if opts.OnlyMetadata { + values.Set("onlymetadata", "true") + } if opts.SortBy != "" { values.Set("sortBy", opts.SortBy) } diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 292670dc5..f4762c4d4 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -14,12 +14,13 @@ import ( ) var ( - alertAccountRefillID = frand.Entropy256() // constant until restarted - alertLostSectorsID = frand.Entropy256() // constant until restarted - alertLowBalanceID = frand.Entropy256() // constant until restarted - alertMigrationID = frand.Entropy256() // constant until restarted - alertPruningID = frand.Entropy256() // constant until restarted - alertRenewalFailedID = frand.Entropy256() // constant until restarted + alertAccountRefillID = randomAlertID() // constant until restarted + alertChurnID = randomAlertID() // constant until restarted + alertLostSectorsID = randomAlertID() // constant until restarted + alertLowBalanceID = randomAlertID() // constant until restarted + alertMigrationID = randomAlertID() // constant until restarted + alertPruningID = randomAlertID() // constant until restarted + alertRenewalFailedID = randomAlertID() // constant until restarted ) func alertIDForAccount(alertID [32]byte, id rhpv3.Account) types.Hash256 { @@ -54,6 +55,20 @@ func (ap *Autopilot) DismissAlert(ctx context.Context, ids ...types.Hash256) { } } +func (ap *Autopilot) HasAlert(ctx context.Context, id types.Hash256) bool { + ar, err := ap.alerts.Alerts(ctx, alerts.AlertsOpts{Offset: 0, Limit: -1}) + if err != nil { + ap.logger.Errorf("failed to fetch alerts: %v", err) + return false + } + for _, alert := range ar.Alerts { + if alert.ID == id { + return true + } + } + return false +} + func newAccountLowBalanceAlert(address types.Address, balance, allowance types.Currency, bh, renewWindow, endHeight uint64) alerts.Alert { severity := alerts.SeverityInfo if bh+renewWindow/2 >= endHeight { @@ -137,37 +152,6 @@ func newContractPruningFailedAlert(hk types.PublicKey, version string, fcid type } } -func newContractSetChangeAlert(name string, additions map[types.FileContractID]contractSetAddition, removals map[types.FileContractID]contractSetRemoval) alerts.Alert { - var hint string - if len(removals) > 0 { - hint = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set." 
- } - - removedReasons := make(map[string]string, len(removals)) - for k, v := range removals { - removedReasons[k.String()] = v.Reason - } - - return alerts.Alert{ - ID: randomAlertID(), - Severity: alerts.SeverityInfo, - Message: "Contract set changed", - Data: map[string]any{ - "name": name, - "set_additions": additions, - "set_removals": removals, - "hint": hint, - - // TODO: these fields can be removed on the next major release, they - // contain redundant information - "added": len(additions), - "removed": len(removals), - "removals": removedReasons, - }, - Timestamp: time.Now(), - } -} - func newLostSectorsAlert(hk types.PublicKey, lostSectors uint64) alerts.Alert { return alerts.Alert{ ID: alertIDForHost(alertLostSectorsID, hk), diff --git a/autopilot/churn.go b/autopilot/churn.go new file mode 100644 index 000000000..fdc1a0f54 --- /dev/null +++ b/autopilot/churn.go @@ -0,0 +1,68 @@ +package autopilot + +import ( + "time" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/alerts" +) + +type ( + accumulatedChurn struct { + additions map[types.FileContractID]contractSetAdditions + removals map[types.FileContractID]contractSetRemovals + } +) + +func newAccumulatedChurn() *accumulatedChurn { + return &accumulatedChurn{ + additions: make(map[types.FileContractID]contractSetAdditions), + removals: make(map[types.FileContractID]contractSetRemovals), + } +} + +func (c *accumulatedChurn) Alert(name string) alerts.Alert { + var hint string + if len(c.removals) > 0 { + hint = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set." + } + + return alerts.Alert{ + ID: alertChurnID, + Severity: alerts.SeverityInfo, + Message: "Contract set changed", + Data: map[string]any{ + "name": name, + "set_additions": c.additions, + "set_removals": c.removals, + "hint": hint, + }, + Timestamp: time.Now(), + } +} + +func (c *accumulatedChurn) Apply(additions map[types.FileContractID]contractSetAdditions, removals map[types.FileContractID]contractSetRemovals) { + for fcid, a := range additions { + if _, exists := c.additions[fcid]; !exists { + c.additions[fcid] = a + } else { + additions := c.additions[fcid] + additions.Additions = append(additions.Additions, a.Additions...) + c.additions[fcid] = additions + } + } + for fcid, r := range removals { + if _, exists := c.removals[fcid]; !exists { + c.removals[fcid] = r + } else { + removals := c.removals[fcid] + removals.Removals = append(removals.Removals, r.Removals...) 
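+			// write the merged entry back since map values are copies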
+ c.removals[fcid] = removals + } + } +} + +func (c *accumulatedChurn) Reset() { + c.additions = make(map[types.FileContractID]contractSetAdditions) + c.removals = make(map[types.FileContractID]contractSetRemovals) +} diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 092f2a831..9e2b52cca 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -85,6 +85,7 @@ const ( type ( contractor struct { ap *Autopilot + churn *accumulatedChurn resolver *ipResolver logger *zap.SugaredLogger @@ -122,15 +123,25 @@ type ( recoverable bool } + contractSetAdditions struct { + HostKey types.PublicKey `json:"hostKey"` + Additions []contractSetAddition `json:"additions"` + } + contractSetAddition struct { - Size uint64 `json:"size"` - HostKey types.PublicKey `json:"hostKey"` + Size uint64 `json:"size"` + Time api.TimeRFC3339 `json:"time"` + } + + contractSetRemovals struct { + HostKey types.PublicKey `json:"hostKey"` + Removals []contractSetRemoval `json:"removals"` } contractSetRemoval struct { - Size uint64 `json:"size"` - HostKey types.PublicKey `json:"hostKey"` - Reason string `json:"reason"` + Size uint64 `json:"size"` + Reason string `json:"reasons"` + Time api.TimeRFC3339 `json:"time"` } renewal struct { @@ -143,6 +154,7 @@ type ( func newContractor(ap *Autopilot, revisionSubmissionBuffer uint64, revisionBroadcastInterval time.Duration) *contractor { return &contractor{ ap: ap, + churn: newAccumulatedChurn(), logger: ap.logger.Named("contractor"), revisionBroadcastInterval: revisionBroadcastInterval, @@ -453,8 +465,9 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string, } // log added and removed contracts - setAdditions := make(map[types.FileContractID]contractSetAddition) - setRemovals := make(map[types.FileContractID]contractSetRemoval) + setAdditions := make(map[types.FileContractID]contractSetAdditions) + setRemovals := make(map[types.FileContractID]contractSetRemovals) + now := api.TimeNow() for _, contract := range oldSet { _, exists := inNewSet[contract.ID] _, renewed := inNewSet[renewalsFromTo[contract.ID]] @@ -464,11 +477,18 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string, reason = "unknown" } - setRemovals[contract.ID] = contractSetRemoval{ - Size: contractData[contract.ID], - HostKey: contract.HostKey, - Reason: reason, + if _, exists := setRemovals[contract.ID]; !exists { + setRemovals[contract.ID] = contractSetRemovals{ + HostKey: contract.HostKey, + } } + removals := setRemovals[contract.ID] + removals.Removals = append(removals.Removals, contractSetRemoval{ + Size: contractData[contract.ID], + Reason: reason, + Time: now, + }) + setRemovals[contract.ID] = removals c.logger.Debugf("contract %v was removed from the contract set, size: %v, reason: %v", contract.ID, contractData[contract.ID], reason) } } @@ -476,10 +496,17 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string, _, existed := inOldSet[contract.ID] _, renewed := renewalsToFrom[contract.ID] if !existed && !renewed { - setAdditions[contract.ID] = contractSetAddition{ - Size: contractData[contract.ID], - HostKey: contract.HostKey, + if _, exists := setAdditions[contract.ID]; !exists { + setAdditions[contract.ID] = contractSetAdditions{ + HostKey: contract.HostKey, + } } + additions := setAdditions[contract.ID] + additions.Additions = append(additions.Additions, contractSetAddition{ + Size: contractData[contract.ID], + Time: now, + }) + setAdditions[contract.ID] = additions c.logger.Debugf("contract %v was 
added to the contract set, size: %v", contract.ID, contractData[contract.ID]) } } @@ -499,7 +526,6 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string, } // record churn metrics - now := api.TimeNow() var metrics []api.ContractSetChurnMetric for fcid := range setAdditions { metrics = append(metrics, api.ContractSetChurnMetric{ @@ -514,7 +540,7 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string, Name: c.ap.state.cfg.Contracts.Set, ContractID: fcid, Direction: api.ChurnDirRemoved, - Reason: removal.Reason, + Reason: removal.Removals[0].Reason, Timestamp: now, }) } @@ -536,7 +562,11 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string, ) hasChanged := len(setAdditions)+len(setRemovals) > 0 if hasChanged { - c.ap.RegisterAlert(ctx, newContractSetChangeAlert(name, setAdditions, setRemovals)) + if !c.ap.HasAlert(ctx, alertChurnID) { + c.churn.Reset() + } + c.churn.Apply(setAdditions, setRemovals) + c.ap.RegisterAlert(ctx, c.churn.Alert(name)) } return hasChanged } diff --git a/bus/bus.go b/bus/bus.go index eb5c88b31..cf1417771 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -126,7 +126,7 @@ type ( ContractSizes(ctx context.Context) (map[types.FileContractID]api.ContractSize, error) ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) - DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error + DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) Bucket(_ context.Context, bucketName string) (api.Bucket, error) CreateBucket(_ context.Context, bucketName string, policy api.BucketPolicy) error @@ -137,6 +137,7 @@ type ( CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) ListObjects(ctx context.Context, bucketName, prefix, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) Object(ctx context.Context, bucketName, path string) (api.Object, error) + ObjectMetadata(ctx context.Context, bucketName, path string) (api.Object, error) ObjectEntries(ctx context.Context, bucketName, path, prefix, sortBy, sortDir, marker string, offset, limit int) ([]api.ObjectMetadata, bool, error) ObjectsBySlabKey(ctx context.Context, bucketName string, slabKey object.EncryptionKey) ([]api.ObjectMetadata, error) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) @@ -1226,13 +1227,22 @@ func (b *bus) objectsHandlerGET(jc jape.Context) { if jc.DecodeForm("bucket", &bucket) != nil { return } + var onlymetadata bool + if jc.DecodeForm("onlymetadata", &onlymetadata) != nil { + return + } - o, err := b.ms.Object(jc.Request.Context(), bucket, path) + var o api.Object + var err error + if onlymetadata { + o, err = b.ms.ObjectMetadata(jc.Request.Context(), bucket, path) + } else { + o, err = b.ms.Object(jc.Request.Context(), bucket, path) + } if errors.Is(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) return - } - if jc.Check("couldn't load object", err) != nil { + } else if jc.Check("couldn't load object", err) != nil { return } jc.Encode(api.ObjectsResponse{Object: &o}) @@ -1434,9 +1444,11 @@ func (b *bus) sectorsHostRootHandlerDELETE(jc jape.Context) { } else if jc.DecodeParam("root", &root) != nil { return } - err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root) + n, err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root) if jc.Check("failed to mark 
sector as lost", err) != nil { return + } else if n > 0 { + b.logger.Infow("successfully marked sector as lost", "hk", hk, "root", root) } } @@ -1750,8 +1762,21 @@ func (b *bus) gougingParams(ctx context.Context) (api.GougingParams, error) { }, nil } +func (b *bus) handleGETAlertsDeprecated(jc jape.Context) { + ar, err := b.alertMgr.Alerts(jc.Request.Context(), alerts.AlertsOpts{Offset: 0, Limit: -1}) + if jc.Check("failed to fetch alerts", err) != nil { + return + } + jc.Encode(ar.Alerts) +} + func (b *bus) handleGETAlerts(jc jape.Context) { + if jc.Request.FormValue("offset") == "" && jc.Request.FormValue("limit") == "" { + b.handleGETAlertsDeprecated(jc) + return + } offset, limit := 0, -1 + var severity alerts.Severity if jc.DecodeForm("offset", &offset) != nil { return } else if jc.DecodeForm("limit", &limit) != nil { @@ -1759,18 +1784,21 @@ func (b *bus) handleGETAlerts(jc jape.Context) { } else if offset < 0 { jc.Error(errors.New("offset must be non-negative"), http.StatusBadRequest) return + } else if jc.DecodeForm("severity", &severity) != nil { + return + } + ar, err := b.alertMgr.Alerts(jc.Request.Context(), alerts.AlertsOpts{ + Offset: offset, + Limit: limit, + Severity: severity, + }) + if jc.Check("failed to fetch alerts", err) != nil { + return } - jc.Encode(b.alertMgr.Active(offset, limit)) + jc.Encode(ar) } func (b *bus) handlePOSTAlertsDismiss(jc jape.Context) { - var all bool - if jc.DecodeForm("all", &all) != nil { - return - } else if all { - jc.Check("failed to dismiss all alerts", b.alertMgr.DismissAllAlerts(jc.Request.Context())) - return - } var ids []types.Hash256 if jc.Decode(&ids) != nil { return diff --git a/bus/client/alerts.go b/bus/client/alerts.go index bff3c13a5..28c3b9a84 100644 --- a/bus/client/alerts.go +++ b/bus/client/alerts.go @@ -10,15 +10,16 @@ import ( ) // Alerts fetches the active alerts from the bus. -func (c *Client) Alerts(opts alerts.AlertsOpts) (alerts []alerts.Alert, err error) { +func (c *Client) Alerts(ctx context.Context, opts alerts.AlertsOpts) (resp alerts.AlertsResponse, err error) { values := url.Values{} - if opts.Offset != 0 { - values.Set("offset", fmt.Sprint(opts.Offset)) - } + values.Set("offset", fmt.Sprint(opts.Offset)) if opts.Limit != 0 { values.Set("limit", fmt.Sprint(opts.Limit)) } - err = c.c.GET("/alerts?"+values.Encode(), &alerts) + if opts.Severity != 0 { + values.Set("severity", opts.Severity.String()) + } + err = c.c.WithContext(ctx).GET("/alerts?"+values.Encode(), &resp) return } @@ -27,11 +28,6 @@ func (c *Client) DismissAlerts(ctx context.Context, ids ...types.Hash256) error return c.dismissAlerts(ctx, false, ids...) } -// DismissAllAlerts dimisses all registered alerts. 
-func (c *Client) DismissAllAlerts(ctx context.Context) error {
-	return c.dismissAlerts(ctx, true)
-}
-
 func (c *Client) dismissAlerts(ctx context.Context, all bool, ids ...types.Hash256) error {
 	values := url.Values{}
 	if all {
diff --git a/go.mod b/go.mod
index 10f7f9015..8475ecd54 100644
--- a/go.mod
+++ b/go.mod
@@ -14,14 +14,14 @@ require (
 	github.com/montanaflynn/stats v0.7.1
 	gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
 	go.sia.tech/core v0.2.1
-	go.sia.tech/coreutils v0.0.2
+	go.sia.tech/coreutils v0.0.3
 	go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2
 	go.sia.tech/hostd v1.0.2
 	go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640
 	go.sia.tech/mux v1.2.0
 	go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca
-	go.sia.tech/web/renterd v0.45.0
-	go.uber.org/zap v1.26.0
+	go.sia.tech/web/renterd v0.46.0
+	go.uber.org/zap v1.27.0
 	golang.org/x/crypto v0.19.0
 	golang.org/x/term v0.17.0
 	gopkg.in/yaml.v3 v3.0.1
diff --git a/go.sum b/go.sum
index 8783df6f7..6f05af353 100644
--- a/go.sum
+++ b/go.sum
@@ -243,6 +243,8 @@ go.sia.tech/core v0.2.1 h1:CqmMd+T5rAhC+Py3NxfvGtvsj/GgwIqQHHVrdts/LqY=
 go.sia.tech/core v0.2.1/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
 go.sia.tech/coreutils v0.0.2 h1:vDqMDM6dW6b/R3sO1ycr8fAnJXUiAvrzxehEIq/AsKA=
 go.sia.tech/coreutils v0.0.2/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0=
+go.sia.tech/coreutils v0.0.3 h1:ZxuzovRpQMvfy/pCOV4om1cPF6sE15GyJyK36kIrF1Y=
+go.sia.tech/coreutils v0.0.3/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0=
 go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2 h1:ulzfJNjxN5DjXHClkW2pTiDk+eJ+0NQhX87lFDZ03t0=
 go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg=
 go.sia.tech/hostd v1.0.2 h1:GjzNIAlwg3/dViF6258Xn5DI3+otQLRqmkoPDugP+9Y=
 go.sia.tech/web/renterd v0.45.0/go.mod h1:FgXrdmAnu591a3h96RB/15pMZ74xO9457g902uE06BM=
+go.sia.tech/web/renterd v0.46.0 h1:BMVg4i7LxSlc8wZ4T0EG1k3EK4JxVIzCfD3/cjmwH0k=
+go.sia.tech/web/renterd v0.46.0/go.mod h1:FgXrdmAnu591a3h96RB/15pMZ74xO9457g902uE06BM=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
 go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
-go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
-go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
+go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
+go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
diff --git 
a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index bc4457543..7be8371a7 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -1316,7 +1316,7 @@ func TestUploadDownloadSameHost(t *testing.T) { // build a frankenstein object constructed with all sectors on the same host res.Object.Slabs[0].Shards = shards[res.Object.Slabs[0].Shards[0].LatestHost] - tt.OK(b.AddObject(context.Background(), api.DefaultBucketName, "frankenstein", testContractSet, res.Object.Object, api.AddObjectOptions{})) + tt.OK(b.AddObject(context.Background(), api.DefaultBucketName, "frankenstein", testContractSet, *res.Object.Object, api.AddObjectOptions{})) // assert we can download this object tt.OK(w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, "frankenstein", api.DownloadObjectOptions{})) @@ -1925,9 +1925,9 @@ func TestAlerts(t *testing.T) { tt.OK(b.RegisterAlert(context.Background(), alert)) findAlert := func(id types.Hash256) *alerts.Alert { t.Helper() - alerts, err := b.Alerts(alerts.AlertsOpts{}) + ar, err := b.Alerts(context.Background(), alerts.AlertsOpts{}) tt.OK(err) - for _, alert := range alerts { + for _, alert := range ar.Alerts { if alert.ID == id { return &alert } @@ -1962,25 +1962,57 @@ func TestAlerts(t *testing.T) { } // try to find with offset = 1 - foundAlerts, err := b.Alerts(alerts.AlertsOpts{Offset: 1}) + ar, err := b.Alerts(context.Background(), alerts.AlertsOpts{Offset: 1}) + foundAlerts := ar.Alerts tt.OK(err) if len(foundAlerts) != 1 || foundAlerts[0].ID != alert.ID { t.Fatal("wrong alert") } // try to find with limit = 1 - foundAlerts, err = b.Alerts(alerts.AlertsOpts{Limit: 1}) + ar, err = b.Alerts(context.Background(), alerts.AlertsOpts{Limit: 1}) + foundAlerts = ar.Alerts tt.OK(err) if len(foundAlerts) != 1 || foundAlerts[0].ID != alert2.ID { t.Fatal("wrong alert") } - // dismiss all - tt.OK(b.DismissAllAlerts(context.Background())) - foundAlerts, err = b.Alerts(alerts.AlertsOpts{}) - tt.OK(err) - if len(foundAlerts) != 0 { - t.Fatal("expected 0 alerts", len(foundAlerts)) + // register more alerts + for severity := alerts.SeverityInfo; severity <= alerts.SeverityCritical; severity++ { + for j := 0; j < 3*int(severity); j++ { + tt.OK(b.RegisterAlert(context.Background(), alerts.Alert{ + ID: frand.Entropy256(), + Severity: severity, + Message: "test", + Data: map[string]interface{}{ + "origin": "test", + }, + Timestamp: time.Now(), + })) + } + } + for severity := alerts.SeverityInfo; severity <= alerts.SeverityCritical; severity++ { + ar, err = b.Alerts(context.Background(), alerts.AlertsOpts{Severity: severity}) + tt.OK(err) + if ar.Total() != 32 { + t.Fatal("expected 32 alerts", ar.Total()) + } else if ar.Totals.Info != 3 { + t.Fatal("expected 3 info alerts", ar.Totals.Info) + } else if ar.Totals.Warning != 6 { + t.Fatal("expected 6 warning alerts", ar.Totals.Warning) + } else if ar.Totals.Error != 9 { + t.Fatal("expected 9 error alerts", ar.Totals.Error) + } else if ar.Totals.Critical != 14 { + t.Fatal("expected 14 critical alerts", ar.Totals.Critical) + } else if severity == alerts.SeverityInfo && len(ar.Alerts) != ar.Totals.Info { + t.Fatalf("expected %v info alerts, got %v", ar.Totals.Info, len(ar.Alerts)) + } else if severity == alerts.SeverityWarning && len(ar.Alerts) != ar.Totals.Warning { + t.Fatalf("expected %v warning alerts, got %v", ar.Totals.Warning, len(ar.Alerts)) + } else if severity == alerts.SeverityError && len(ar.Alerts) != ar.Totals.Error { + t.Fatalf("expected %v error alerts, got 
%v", ar.Totals.Error, len(ar.Alerts)) + } else if severity == alerts.SeverityCritical && len(ar.Alerts) != ar.Totals.Critical { + t.Fatalf("expected %v critical alerts, got %v", ar.Totals.Critical, len(ar.Alerts)) + } } } diff --git a/object/object.go b/object/object.go index 49375f3b4..2331f6251 100644 --- a/object/object.go +++ b/object/object.go @@ -3,6 +3,7 @@ package object import ( "bytes" "crypto/cipher" + "crypto/md5" "encoding/binary" "encoding/hex" "fmt" @@ -43,6 +44,9 @@ func (k *EncryptionKey) UnmarshalBinary(b []byte) error { // String implements fmt.Stringer. func (k EncryptionKey) String() string { + if k.entropy == nil { + return "" + } return "key:" + hex.EncodeToString(k.entropy[:]) } @@ -139,6 +143,22 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{ return usedContracts } +func (o *Object) ComputeETag() string { + // calculate the eTag using the precomputed sector roots to avoid having to + // hash the entire object again. + h := md5.New() + b := make([]byte, 8) + for _, slab := range o.Slabs { + binary.LittleEndian.PutUint32(b[:4], slab.Offset) + binary.LittleEndian.PutUint32(b[4:], slab.Length) + h.Write(b) + for _, shard := range slab.Shards { + h.Write(shard.Root[:]) + } + } + return string(hex.EncodeToString(h.Sum(nil))) +} + // TotalSize returns the total size of the object. func (o Object) TotalSize() int64 { var n int64 diff --git a/object/slab.go b/object/slab.go index 9c3afa608..f2762abf3 100644 --- a/object/slab.go +++ b/object/slab.go @@ -3,6 +3,7 @@ package object import ( "bytes" "io" + "sync" "github.com/klauspost/reedsolomon" rhpv2 "go.sia.tech/core/rhp/v2" @@ -79,11 +80,17 @@ func (s Slab) Length() int { // Encrypt xors shards with the keystream derived from s.Key, using a // different nonce for each shard. func (s Slab) Encrypt(shards [][]byte) { - for i, shard := range shards { - nonce := [24]byte{1: byte(i)} - c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:]) - c.XORKeyStream(shard, shard) + var wg sync.WaitGroup + for i := range shards { + wg.Add(1) + go func(i int) { + nonce := [24]byte{1: byte(i)} + c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:]) + c.XORKeyStream(shards[i], shards[i]) + wg.Done() + }(i) } + wg.Wait() } // Encode encodes slab data into sector-sized shards. The supplied shards should @@ -151,12 +158,18 @@ func (ss SlabSlice) SectorRegion() (offset, length uint32) { // slice offset), using a different nonce for each shard. func (ss SlabSlice) Decrypt(shards [][]byte) { offset := ss.Offset / (rhpv2.LeafSize * uint32(ss.MinShards)) - for i, shard := range shards { - nonce := [24]byte{1: byte(i)} - c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:]) - c.SetCounter(offset) - c.XORKeyStream(shard, shard) + var wg sync.WaitGroup + for i := range shards { + wg.Add(1) + go func(i int) { + nonce := [24]byte{1: byte(i)} + c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:]) + c.SetCounter(offset) + c.XORKeyStream(shards[i], shards[i]) + wg.Done() + }(i) } + wg.Wait() } // Recover recovers a slice of slab data from the supplied shards. diff --git a/s3/backend.go b/s3/backend.go index de72bb005..c05a3ec98 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -287,7 +287,10 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range // HeadObject should return a NotFound() error if the object does not // exist. 
func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*gofakes3.Object, error) { - res, err := s.b.Object(ctx, bucketName, objectName, api.GetObjectOptions{IgnoreDelim: true}) + res, err := s.b.Object(ctx, bucketName, objectName, api.GetObjectOptions{ + IgnoreDelim: true, + OnlyMetadata: true, + }) if err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) { return nil, gofakes3.KeyNotFound(objectName) } else if err != nil { diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index 0815e137d..cbbc4428a 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -56,15 +56,7 @@ func TestSQLHostDB(t *testing.T) { // Insert an announcement for the host and another one for an unknown // host. - a := announcement{ - blockHeight: 42, - blockID: types.BlockID{1, 2, 3}, - hk: hk, - timestamp: time.Now().UTC().Round(time.Second), - HostAnnouncement: chain.HostAnnouncement{ - NetAddress: "address", - }, - } + a := newTestAnnouncement(hk, "address") err = ss.insertTestAnnouncement(a) if err != nil { t.Fatal(err) @@ -119,7 +111,7 @@ func TestSQLHostDB(t *testing.T) { if err != nil { t.Fatal(err) } - if h3.NetAddress != a.NetAddress { + if h3.NetAddress != unknownKeyAnn.NetAddress { t.Fatal("wrong net address") } if h3.KnownSince.IsZero() { @@ -505,24 +497,10 @@ func TestInsertAnnouncements(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() - // Create announcements for 2 hosts. - ann1 := announcement{ - timestamp: time.Now(), - blockHeight: 1, - blockID: types.BlockID{1}, - hk: types.GeneratePrivateKey().PublicKey(), - HostAnnouncement: chain.HostAnnouncement{ - NetAddress: "foo.bar:1000", - }, - } - ann2 := announcement{ - hk: types.GeneratePrivateKey().PublicKey(), - HostAnnouncement: chain.HostAnnouncement{}, - } - ann3 := announcement{ - hk: types.GeneratePrivateKey().PublicKey(), - HostAnnouncement: chain.HostAnnouncement{}, - } + // Create announcements for 3 hosts. + ann1 := newTestAnnouncement(types.GeneratePrivateKey().PublicKey(), "foo.bar:1000") + ann2 := newTestAnnouncement(types.GeneratePrivateKey().PublicKey(), "") + ann3 := newTestAnnouncement(types.GeneratePrivateKey().PublicKey(), "") // Insert the first one and check that all fields are set. if err := insertAnnouncements(ss.db, []announcement{ann1}); err != nil { @@ -535,12 +513,12 @@ func TestInsertAnnouncements(t *testing.T) { ann.Model = Model{} // ignore expectedAnn := dbAnnouncement{ HostKey: publicKey(ann1.hk), - BlockHeight: 1, - BlockID: types.BlockID{1}.String(), + BlockHeight: ann1.blockHeight, + BlockID: ann1.blockID.String(), NetAddress: "foo.bar:1000", } if ann != expectedAnn { - t.Fatal("mismatch") + t.Fatal("mismatch", cmp.Diff(ann, expectedAnn)) } // Insert the first and second one. if err := insertAnnouncements(ss.db, []announcement{ann1, ann2}); err != nil { @@ -1095,12 +1073,7 @@ func (s *SQLStore) addTestHost(hk types.PublicKey) error { // addCustomTestHost ensures a host with given hostkey and net address exists. func (s *SQLStore) addCustomTestHost(hk types.PublicKey, na string) error { s.unappliedHostKeys[hk] = struct{}{} - s.unappliedAnnouncements = append(s.unappliedAnnouncements, []announcement{{ - hk: hk, - HostAnnouncement: chain.HostAnnouncement{ - NetAddress: na, - }, - }}...) 
+ s.unappliedAnnouncements = append(s.unappliedAnnouncements, newTestAnnouncement(hk, na)) s.lastSave = time.Now().Add(s.persistInterval * -2) return s.applyUpdates(false) } @@ -1139,6 +1112,18 @@ func newTestPK() (types.PublicKey, types.PrivateKey) { return pk, sk } +func newTestAnnouncement(hk types.PublicKey, na string) announcement { + return announcement{ + blockHeight: 42, + blockID: types.BlockID{1, 2, 3}, + hk: hk, + timestamp: time.Now().UTC().Round(time.Second), + HostAnnouncement: chain.HostAnnouncement{ + NetAddress: na, + }, + } +} + func newTestHostAnnouncement(na string) (chain.HostAnnouncement, types.PrivateKey) { _, sk := newTestPK() a := chain.HostAnnouncement{ diff --git a/stores/metadata.go b/stores/metadata.go index d257bf681..c281c9800 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -410,14 +410,14 @@ func (s dbSlab) convert() (slab object.Slab, err error) { } func (raw rawObjectMetadata) convert() api.ObjectMetadata { - return api.ObjectMetadata{ - ETag: raw.ETag, - Health: raw.Health, - MimeType: raw.MimeType, - ModTime: api.TimeRFC3339(time.Time(raw.ModTime).UTC()), - Name: raw.Name, - Size: raw.Size, - } + return newObjectMetadata( + raw.Name, + raw.ETag, + raw.MimeType, + raw.Health, + time.Time(raw.ModTime), + raw.Size, + ) } func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) { @@ -1499,6 +1499,10 @@ func (s *SQLStore) RenameObjects(ctx context.Context, bucket, prefixOld, prefixN gorm.Expr(sqlConcat(tx, "?", "SUBSTR(object_id, ?)")), prefixNew, utf8.RuneCountInString(prefixOld)+1, prefixOld+"%", utf8.RuneCountInString(prefixOld), prefixOld, sqlWhereBucket("objects", bucket)) + + if !isSQLite(tx) { + inner = tx.Raw("SELECT * FROM (?) as i", inner) + } resp := tx.Model(&dbObject{}). Where("object_id IN (?)", inner). Delete(&dbObject{}) @@ -1548,13 +1552,14 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath // No copying is happening. We just update the metadata on the src // object. srcObj.MimeType = mimeType - om = api.ObjectMetadata{ - Health: srcObj.Health, - MimeType: srcObj.MimeType, - ModTime: api.TimeRFC3339(srcObj.CreatedAt.UTC()), - Name: srcObj.ObjectID, - Size: srcObj.Size, - } + om = newObjectMetadata( + srcObj.ObjectID, + srcObj.Etag, + srcObj.MimeType, + srcObj.Health, + srcObj.CreatedAt, + srcObj.Size, + ) if err := s.updateUserMetadata(tx, srcObj.ID, metadata); err != nil { return fmt.Errorf("failed to update user metadata: %w", err) } @@ -1602,21 +1607,22 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath return fmt.Errorf("failed to create object metadata: %w", err) } - om = api.ObjectMetadata{ - MimeType: dstObj.MimeType, - ETag: dstObj.Etag, - Health: srcObj.Health, - ModTime: api.TimeRFC3339(dstObj.CreatedAt.UTC()), - Name: dstObj.ObjectID, - Size: dstObj.Size, - } + om = newObjectMetadata( + dstObj.ObjectID, + dstObj.Etag, + dstObj.MimeType, + dstObj.Health, + dstObj.CreatedAt, + dstObj.Size, + ) return nil }) return } -func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error { - return s.retryTransaction(func(tx *gorm.DB) error { +func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) { + var deletedSectors int + err := s.retryTransaction(func(tx *gorm.DB) error { // Fetch contract_sectors to delete. 
var sectors []dbContractSector err := tx.Raw(` @@ -1655,6 +1661,7 @@ func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, roo } else if res.RowsAffected != int64(len(sectors)) { return fmt.Errorf("expected %v affected rows but got %v", len(sectors), res.RowsAffected) } + deletedSectors = len(sectors) // Increment the host's lostSectors by the number of lost sectors. if err := tx.Exec("UPDATE hosts SET lost_sectors = lost_sectors + ? WHERE public_key = ?", len(sectors), publicKey(hk)).Error; err != nil { @@ -1682,6 +1689,7 @@ func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, roo } return nil }) + return deletedSectors, err } func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error { @@ -2312,21 +2320,57 @@ func (s *SQLStore) objectHydrate(ctx context.Context, tx *gorm.DB, bucket, path // return object return api.Object{ Metadata: metadata, - ObjectMetadata: api.ObjectMetadata{ - ETag: obj[0].ObjectETag, - Health: obj[0].ObjectHealth, - MimeType: obj[0].ObjectMimeType, - ModTime: api.TimeRFC3339(obj[0].ObjectModTime.UTC()), - Name: obj[0].ObjectName, - Size: obj[0].ObjectSize, - }, - Object: object.Object{ + ObjectMetadata: newObjectMetadata( + obj[0].ObjectName, + obj[0].ObjectETag, + obj[0].ObjectMimeType, + obj[0].ObjectHealth, + obj[0].ObjectModTime, + obj[0].ObjectSize, + ), + Object: &object.Object{ Key: key, Slabs: slabs, }, }, nil } +// ObjectMetadata returns an object's metadata +func (s *SQLStore) ObjectMetadata(ctx context.Context, bucket, path string) (api.Object, error) { + var resp api.Object + err := s.db.Transaction(func(tx *gorm.DB) error { + var obj dbObject + err := tx.Model(&dbObject{}). + Joins("INNER JOIN buckets b ON objects.db_bucket_id = b.id"). + Where("b.name", bucket). + Where("object_id", path). + Take(&obj). + Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return api.ErrObjectNotFound + } else if err != nil { + return err + } + oum, err := s.objectMetadata(ctx, tx, bucket, path) + if err != nil { + return err + } + resp = api.Object{ + ObjectMetadata: newObjectMetadata( + obj.ObjectID, + obj.Etag, + obj.MimeType, + obj.Health, + obj.CreatedAt, + obj.Size, + ), + Metadata: oum, + } + return nil + }) + return resp, err +} + func (s *SQLStore) objectMetadata(ctx context.Context, tx *gorm.DB, bucket, path string) (api.ObjectUserMetadata, error) { var rows []dbObjectUserMetadata err := tx. 
@@ -2347,6 +2391,17 @@ func (s *SQLStore) objectMetadata(ctx context.Context, tx *gorm.DB, bucket, path return metadata, nil } +func newObjectMetadata(name, etag, mimeType string, health float64, modTime time.Time, size int64) api.ObjectMetadata { + return api.ObjectMetadata{ + ETag: etag, + Health: health, + ModTime: api.TimeRFC3339(modTime.UTC()), + Name: name, + Size: size, + MimeType: mimeType, + } +} + func (s *SQLStore) objectRaw(ctx context.Context, txn *gorm.DB, bucket string, path string) (rows rawObject, err error) { // NOTE: we LEFT JOIN here because empty objects are valid and need to be // included in the result set, when we convert the rawObject before @@ -2677,20 +2732,32 @@ func archiveContracts(ctx context.Context, tx *gorm.DB, contracts []dbContract, return nil } +func pruneSlabs(tx *gorm.DB) error { + // delete slabs without any associated slices or buffers + return tx.Exec(` +DELETE +FROM slabs +WHERE NOT EXISTS (SELECT 1 FROM slices WHERE slices.db_slab_id = slabs.id) +AND slabs.db_buffered_slab_id IS NULL +`).Error +} + // deleteObject deletes an object from the store and prunes all slabs which are // without an obect after the deletion. That means in case of packed uploads, // the slab is only deleted when no more objects point to it. -func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (numDeleted int64, _ error) { +func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, error) { tx = tx.Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)). Delete(&dbObject{}) if tx.Error != nil { return 0, tx.Error } - numDeleted = tx.RowsAffected + numDeleted := tx.RowsAffected if numDeleted == 0 { return 0, nil // nothing to prune if no object was deleted + } else if err := pruneSlabs(tx); err != nil { + return numDeleted, err } - return + return numDeleted, nil } // deleteObjects deletes a batch of objects from the database. 
The order of @@ -2719,8 +2786,12 @@ func (s *SQLStore) deleteObjects(bucket string, path string) (numDeleted int64, if err := res.Error; err != nil { return res.Error } - duration = time.Since(start) + // prune slabs if we deleted an object rowsAffected = res.RowsAffected + if rowsAffected > 0 { + return pruneSlabs(tx) + } + duration = time.Since(start) return nil }); err != nil { return 0, fmt.Errorf("failed to delete objects: %w", err) diff --git a/stores/metadata_test.go b/stores/metadata_test.go index bd5413135..205812287 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "errors" "fmt" - "math" "os" "reflect" "sort" @@ -17,7 +16,6 @@ import ( "github.com/google/go-cmp/cmp" rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" - "go.sia.tech/coreutils/chain" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" "gorm.io/gorm" @@ -89,7 +87,7 @@ func TestObjectBasic(t *testing.T) { if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(got.Object, want) { + if !reflect.DeepEqual(*got.Object, want) { t.Fatal("object mismatch", cmp.Diff(got.Object, want)) } @@ -120,7 +118,7 @@ func TestObjectBasic(t *testing.T) { if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(got2.Object, want2) { + if !reflect.DeepEqual(*got2.Object, want2) { t.Fatal("object mismatch", cmp.Diff(got2.Object, want2)) } } @@ -177,7 +175,7 @@ func TestObjectMetadata(t *testing.T) { } // assert it matches - if !reflect.DeepEqual(got.Object, want) { + if !reflect.DeepEqual(*got.Object, want) { t.Log(got.Object) t.Log(want) t.Fatal("object mismatch", cmp.Diff(got.Object, want, cmp.AllowUnexported(object.EncryptionKey{}))) @@ -220,12 +218,7 @@ func TestSQLContractStore(t *testing.T) { } // Add an announcement. - err = ss.insertTestAnnouncement(announcement{ - hk: hk, - HostAnnouncement: chain.HostAnnouncement{ - NetAddress: "address", - }, - }) + err = ss.insertTestAnnouncement(newTestAnnouncement(hk, "address")) if err != nil { t.Fatal(err) } @@ -516,21 +509,11 @@ func TestRenewedContract(t *testing.T) { hk, hk2 := hks[0], hks[1] // Add announcements. 
- err = ss.insertTestAnnouncement(announcement{ - hk: hk, - HostAnnouncement: chain.HostAnnouncement{ - NetAddress: "address", - }, - }) + err = ss.insertTestAnnouncement(newTestAnnouncement(hk, "address")) if err != nil { t.Fatal(err) } - err = ss.insertTestAnnouncement(announcement{ - hk: hk2, - HostAnnouncement: chain.HostAnnouncement{ - NetAddress: "address2", - }, - }) + err = ss.insertTestAnnouncement(newTestAnnouncement(hk2, "address2")) if err != nil { t.Fatal(err) } @@ -1023,7 +1006,7 @@ func TestSQLMetadataStore(t *testing.T) { one := uint(1) expectedObj := dbObject{ - DBBucketID: 1, + DBBucketID: ss.DefaultBucketID(), Health: 1, ObjectID: objID, Key: obj1Key, @@ -1086,7 +1069,7 @@ func TestSQLMetadataStore(t *testing.T) { if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(fullObj.Object, obj1) { + if !reflect.DeepEqual(*fullObj.Object, obj1) { t.Fatal("object mismatch", cmp.Diff(fullObj, obj1)) } @@ -1184,6 +1167,7 @@ func TestSQLMetadataStore(t *testing.T) { slabs[i].Shards[0].Model = Model{} slabs[i].Shards[0].Contracts[0].Model = Model{} slabs[i].Shards[0].Contracts[0].Host.Model = Model{} + slabs[i].Shards[0].Contracts[0].Host.LastAnnouncement = time.Time{} slabs[i].HealthValidUntil = 0 } if !reflect.DeepEqual(slab1, expectedObjSlab1) { @@ -1199,7 +1183,7 @@ func TestSQLMetadataStore(t *testing.T) { if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(fullObj.Object, obj1) { + if !reflect.DeepEqual(*fullObj.Object, obj1) { t.Fatal("object mismatch") } @@ -2228,10 +2212,9 @@ func TestUpdateSlab(t *testing.T) { t.Fatal(err) } var s dbSlab - if err := ss.db.Model(&dbSlab{}). + if err := ss.db.Where(&dbSlab{Key: key}). Joins("DBContractSet"). Preload("Shards"). - Where("key = ?", key). Take(&s). Error; err != nil { t.Fatal(err) @@ -2280,12 +2263,7 @@ func TestRecordContractSpending(t *testing.T) { } // Add an announcement. - err = ss.insertTestAnnouncement(announcement{ - hk: hk, - HostAnnouncement: chain.HostAnnouncement{ - NetAddress: "address", - }, - }) + err = ss.insertTestAnnouncement(newTestAnnouncement(hk, "address")) if err != nil { t.Fatal(err) } @@ -2684,7 +2662,7 @@ func TestPartialSlab(t *testing.T) { if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(obj, fetched.Object) { + if !reflect.DeepEqual(obj, *fetched.Object) { t.Fatal("mismatch", cmp.Diff(obj, fetched.Object, cmp.AllowUnexported(object.EncryptionKey{}))) } @@ -2720,7 +2698,7 @@ func TestPartialSlab(t *testing.T) { if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(obj2, fetched.Object) { + if !reflect.DeepEqual(obj2, *fetched.Object) { t.Fatal("mismatch", cmp.Diff(obj2, fetched.Object)) } @@ -2768,7 +2746,7 @@ func TestPartialSlab(t *testing.T) { if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(obj3, fetched.Object) { + if !reflect.DeepEqual(obj3, *fetched.Object) { t.Fatal("mismatch", cmp.Diff(obj3, fetched.Object, cmp.AllowUnexported(object.EncryptionKey{}))) } @@ -3604,8 +3582,10 @@ func TestDeleteHostSector(t *testing.T) { } // Prune the sector from hk1. - if err := ss.DeleteHostSector(context.Background(), hk1, root); err != nil { + if n, err := ss.DeleteHostSector(context.Background(), hk1, root); err != nil { t.Fatal(err) + } else if n != 2 { + t.Fatal("no sectors were pruned", n) } // Make sure 2 contractSector entries exist. 
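The assertion above pins down the semantics of the new return value: `DeleteHostSector` reports deleted `contract_sectors` rows, so a root stored under two of the host's contracts yields 2, not 1. A hedged sketch of how a caller can consume that count (only the method signature comes from this diff; the `markLost` helper and its logging are illustrative):

```go
package example

import (
	"context"
	"log"

	"go.sia.tech/core/types"
)

// sectorDeleter captures the updated store method from this diff:
// DeleteHostSector now also reports how many contract-sector links
// it removed for the given host/root pair.
type sectorDeleter interface {
	DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error)
}

// markLost is an illustrative caller: like the bus handler earlier in
// this diff, it only logs when rows were actually deleted.
func markLost(ctx context.Context, s sectorDeleter, hk types.PublicKey, root types.Hash256) error {
	n, err := s.DeleteHostSector(ctx, hk, root)
	if err != nil {
		return err
	}
	if n > 0 {
		log.Printf("marked %d contract sector(s) on host %v as lost", n, hk)
	}
	return nil
}
```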
@@ -3920,7 +3900,7 @@ func TestSlabCleanupTrigger(t *testing.T) { // create objects obj1 := dbObject{ ObjectID: "1", - DBBucketID: 1, + DBBucketID: ss.DefaultBucketID(), Health: 1, } if err := ss.db.Create(&obj1).Error; err != nil { @@ -3928,7 +3908,7 @@ func TestSlabCleanupTrigger(t *testing.T) { } obj2 := dbObject{ ObjectID: "2", - DBBucketID: 1, + DBBucketID: ss.DefaultBucketID(), Health: 1, } if err := ss.db.Create(&obj2).Error; err != nil { @@ -3964,7 +3944,8 @@ func TestSlabCleanupTrigger(t *testing.T) { } // delete the object - if err := ss.db.Delete(&obj1).Error; err != nil { + err := ss.RemoveObject(context.Background(), api.DefaultBucketName, obj1.ObjectID) + if err != nil { t.Fatal(err) } @@ -3977,7 +3958,8 @@ func TestSlabCleanupTrigger(t *testing.T) { } // delete second object - if err := ss.db.Delete(&obj2).Error; err != nil { + err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj2.ObjectID) + if err != nil { t.Fatal(err) } @@ -4001,7 +3983,7 @@ func TestSlabCleanupTrigger(t *testing.T) { } obj3 := dbObject{ ObjectID: "3", - DBBucketID: 1, + DBBucketID: ss.DefaultBucketID(), Health: 1, } if err := ss.db.Create(&obj3).Error; err != nil { @@ -4021,7 +4003,8 @@ func TestSlabCleanupTrigger(t *testing.T) { } // delete third object - if err := ss.db.Delete(&obj3).Error; err != nil { + err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj3.ObjectID) + if err != nil { t.Fatal(err) } if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil { @@ -4140,11 +4123,11 @@ func TestUpdateObjectReuseSlab(t *testing.T) { // fetch the object var dbObj dbObject - if err := ss.db.Where("db_bucket_id", 1).Take(&dbObj).Error; err != nil { + if err := ss.db.Where("db_bucket_id", ss.DefaultBucketID()).Take(&dbObj).Error; err != nil { t.Fatal(err) } else if dbObj.ID != 1 { t.Fatal("unexpected id", dbObj.ID) - } else if dbObj.DBBucketID != 1 { + } else if dbObj.DBBucketID != ss.DefaultBucketID() { t.Fatal("bucket id mismatch", dbObj.DBBucketID) } else if dbObj.ObjectID != "1" { t.Fatal("object id mismatch", dbObj.ObjectID) @@ -4246,7 +4229,7 @@ func TestUpdateObjectReuseSlab(t *testing.T) { // fetch the object var dbObj2 dbObject - if err := ss.db.Where("db_bucket_id", 1). + if err := ss.db.Where("db_bucket_id", ss.DefaultBucketID()). Where("object_id", "2"). 
Take(&dbObj2).Error; err != nil { t.Fatal(err) @@ -4330,63 +4313,94 @@ func TestTypeCurrency(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() + // prepare the table + if isSQLite(ss.db) { + if err := ss.db.Exec("CREATE TABLE currencies (id INTEGER PRIMARY KEY AUTOINCREMENT,c BLOB);").Error; err != nil { + t.Fatal(err) + } + } else { + if err := ss.db.Exec("CREATE TABLE currencies (id INT AUTO_INCREMENT PRIMARY KEY, c BLOB);").Error; err != nil { + t.Fatal(err) + } + } + + // insert currencies in random order + if err := ss.db.Exec("INSERT INTO currencies (c) VALUES (?),(?),(?);", bCurrency(types.MaxCurrency), bCurrency(types.NewCurrency64(1)), bCurrency(types.ZeroCurrency)).Error; err != nil { + t.Fatal(err) + } + + // fetch currencies and assert they're sorted + var currencies []bCurrency + if err := ss.db.Raw(`SELECT c FROM currencies ORDER BY c ASC`).Scan(¤cies).Error; err != nil { + t.Fatal(err) + } else if !sort.SliceIsSorted(currencies, func(i, j int) bool { + return types.Currency(currencies[i]).Cmp(types.Currency(currencies[j])) < 0 + }) { + t.Fatal("currencies not sorted", currencies) + } + + // convenience variables + c0 := currencies[0] + c1 := currencies[1] + cM := currencies[2] + tests := []struct { - a types.Currency - b types.Currency + a bCurrency + b bCurrency cmp string }{ { - a: types.ZeroCurrency, - b: types.NewCurrency64(1), + a: c0, + b: c1, cmp: "<", }, { - a: types.NewCurrency64(1), - b: types.NewCurrency64(1), + a: c1, + b: c0, + cmp: ">", + }, + { + a: c0, + b: c1, + cmp: "!=", + }, + { + a: c1, + b: c1, cmp: "=", }, { - a: types.NewCurrency(math.MaxUint64, 0), - b: types.NewCurrency(0, math.MaxUint64), + a: c0, + b: cM, cmp: "<", }, { - a: types.NewCurrency(0, math.MaxUint64), - b: types.NewCurrency(math.MaxUint64, 0), + a: cM, + b: c0, cmp: ">", }, + { + a: cM, + b: cM, + cmp: "=", + }, } - for _, test := range tests { + for i, test := range tests { var result bool - err := ss.db.Raw("SELECT ? "+test.cmp+" ?", bCurrency(test.a), bCurrency(test.b)).Scan(&result).Error - if err != nil { + query := fmt.Sprintf("SELECT ? 
%s ?", test.cmp) + if !isSQLite(ss.db) { + query = strings.Replace(query, "?", "HEX(?)", -1) + } + if err := ss.db.Raw(query, test.a, test.b).Scan(&result).Error; err != nil { t.Fatal(err) } else if !result { - t.Fatalf("unexpected result %v for %v %v %v", result, test.a, test.cmp, test.b) - } else if test.cmp == "<" && test.a.Cmp(test.b) >= 0 { + t.Errorf("unexpected result in case %d/%d: expected %v %s %v to be true", i+1, len(tests), types.Currency(test.a).String(), test.cmp, types.Currency(test.b).String()) + } else if test.cmp == "<" && types.Currency(test.a).Cmp(types.Currency(test.b)) >= 0 { t.Fatal("invalid result") - } else if test.cmp == ">" && test.a.Cmp(test.b) <= 0 { + } else if test.cmp == ">" && types.Currency(test.a).Cmp(types.Currency(test.b)) <= 0 { t.Fatal("invalid result") - } else if test.cmp == "=" && test.a.Cmp(test.b) != 0 { + } else if test.cmp == "=" && types.Currency(test.a).Cmp(types.Currency(test.b)) != 0 { t.Fatal("invalid result") } } - - c := func(c uint64) bCurrency { - return bCurrency(types.NewCurrency64(c)) - } - - var currencies []bCurrency - err := ss.db.Raw(` -WITH input(col) as -(values (?),(?),(?)) -SELECT * FROM input ORDER BY col ASC -`, c(3), c(1), c(2)).Scan(¤cies).Error - if err != nil { - t.Fatal(err) - } else if !sort.SliceIsSorted(currencies, func(i, j int) bool { - return types.Currency(currencies[i]).Cmp(types.Currency(currencies[j])) < 0 - }) { - t.Fatal("currencies not sorted", currencies) - } } diff --git a/stores/metrics.go b/stores/metrics.go index 203ed3b71..8816d1729 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -605,9 +605,7 @@ func (s *SQLStore) findPeriods(table string, dst interface{}, start time.Time, n WHERE ? GROUP BY p.period_start - ORDER BY - p.period_start ASC - ) i ON %s.id = i.id + ) i ON %s.id = i.id ORDER BY Period ASC `, table, table, table, table), unixTimeMS(start), interval.Milliseconds(), diff --git a/stores/metrics_test.go b/stores/metrics_test.go index 2b2f572a7..f71d985bd 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -517,7 +517,7 @@ func TestWalletMetrics(t *testing.T) { } else if !sort.SliceIsSorted(metrics, func(i, j int) bool { return time.Time(metrics[i].Timestamp).Before(time.Time(metrics[j].Timestamp)) }) { - t.Fatal("expected metrics to be sorted by time") + t.Fatalf("expected metrics to be sorted by time, %+v", metrics) } // Prune metrics diff --git a/stores/migrations.go b/stores/migrations.go index 4d6fd2970..4e547b9e3 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -45,9 +45,15 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { }, }, { - ID: "00004_peer_store", + ID: "00004_prune_slabs_cascade", Migrate: func(tx *gorm.DB) error { - return performMigration(tx, dbIdentifier, "00004_peer_store", logger) + return performMigration(tx, dbIdentifier, "00004_prune_slabs_cascade", logger) + }, + }, + { + ID: "00005_peer_store", + Migrate: func(tx *gorm.DB) error { + return performMigration(tx, dbIdentifier, "00005_peer_store", logger) }, }, } diff --git a/stores/migrations/mysql/main/migration_00004_prune_slabs_cascade.sql b/stores/migrations/mysql/main/migration_00004_prune_slabs_cascade.sql new file mode 100644 index 000000000..c2efe3467 --- /dev/null +++ b/stores/migrations/mysql/main/migration_00004_prune_slabs_cascade.sql @@ -0,0 +1,16 @@ +-- drop triggers +DROP TRIGGER IF EXISTS before_delete_on_objects_delete_slices; +DROP TRIGGER IF EXISTS before_delete_on_multipart_uploads_delete_multipart_parts; +DROP TRIGGER IF EXISTS 
before_delete_on_multipart_parts_delete_slices; +DROP TRIGGER IF EXISTS after_delete_on_slices_delete_slabs; + +-- add ON DELETE CASCADE to slices +ALTER TABLE slices DROP FOREIGN KEY fk_objects_slabs; +ALTER TABLE slices ADD CONSTRAINT fk_objects_slabs FOREIGN KEY (db_object_id) REFERENCES objects (id) ON DELETE CASCADE; + +ALTER TABLE slices DROP FOREIGN KEY fk_multipart_parts_slabs; +ALTER TABLE slices ADD CONSTRAINT fk_multipart_parts_slabs FOREIGN KEY (db_multipart_part_id) REFERENCES multipart_parts (id) ON DELETE CASCADE; + +-- add ON DELETE CASCADE to multipart_parts +ALTER TABLE multipart_parts DROP FOREIGN KEY fk_multipart_uploads_parts; +ALTER TABLE multipart_parts ADD CONSTRAINT fk_multipart_uploads_parts FOREIGN KEY (db_multipart_upload_id) REFERENCES multipart_uploads (id) ON DELETE CASCADE; \ No newline at end of file diff --git a/stores/migrations/mysql/main/schema.sql b/stores/migrations/mysql/main/schema.sql index 5e18cd2c1..7a1f959a4 100644 --- a/stores/migrations/mysql/main/schema.sql +++ b/stores/migrations/mysql/main/schema.sql @@ -310,7 +310,7 @@ CREATE TABLE `multipart_parts` ( KEY `idx_multipart_parts_etag` (`etag`), KEY `idx_multipart_parts_part_number` (`part_number`), KEY `idx_multipart_parts_db_multipart_upload_id` (`db_multipart_upload_id`), - CONSTRAINT `fk_multipart_uploads_parts` FOREIGN KEY (`db_multipart_upload_id`) REFERENCES `multipart_uploads` (`id`) + CONSTRAINT `fk_multipart_uploads_parts` FOREIGN KEY (`db_multipart_upload_id`) REFERENCES `multipart_uploads` (`id`) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- dbObject @@ -374,8 +374,8 @@ CREATE TABLE `slices` ( KEY `idx_slices_object_index` (`object_index`), KEY `idx_slices_db_multipart_part_id` (`db_multipart_part_id`), KEY `idx_slices_db_slab_id` (`db_slab_id`), - CONSTRAINT `fk_multipart_parts_slabs` FOREIGN KEY (`db_multipart_part_id`) REFERENCES `multipart_parts` (`id`), - CONSTRAINT `fk_objects_slabs` FOREIGN KEY (`db_object_id`) REFERENCES `objects` (`id`), + CONSTRAINT `fk_multipart_parts_slabs` FOREIGN KEY (`db_multipart_part_id`) REFERENCES `multipart_parts` (`id`) ON DELETE CASCADE, + CONSTRAINT `fk_objects_slabs` FOREIGN KEY (`db_object_id`) REFERENCES `objects` (`id`) ON DELETE CASCADE, CONSTRAINT `fk_slabs_slices` FOREIGN KEY (`db_slab_id`) REFERENCES `slabs` (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; diff --git a/stores/migrations/sqlite/main/migration_00004_prune_slabs_cascade.sql b/stores/migrations/sqlite/main/migration_00004_prune_slabs_cascade.sql new file mode 100644 index 000000000..03f006acd --- /dev/null +++ b/stores/migrations/sqlite/main/migration_00004_prune_slabs_cascade.sql @@ -0,0 +1,30 @@ +-- drop triggers +DROP TRIGGER IF EXISTS before_delete_on_objects_delete_slices; +DROP TRIGGER IF EXISTS before_delete_on_multipart_uploads_delete_multipart_parts; +DROP TRIGGER IF EXISTS before_delete_on_multipart_parts_delete_slices; +DROP TRIGGER IF EXISTS after_delete_on_slices_delete_slabs; + +PRAGMA foreign_keys=off; +-- update constraints on slices +DROP TABLE IF EXISTS slices_temp; +CREATE TABLE `slices_temp` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`db_object_id` integer,`object_index` integer,`db_multipart_part_id` integer,`db_slab_id` integer,`offset` integer,`length` integer,CONSTRAINT `fk_objects_slabs` FOREIGN KEY (`db_object_id`) REFERENCES `objects`(`id`) ON DELETE CASCADE,CONSTRAINT `fk_multipart_parts_slabs` FOREIGN KEY (`db_multipart_part_id`) REFERENCES 
`multipart_parts`(`id`) ON DELETE CASCADE,CONSTRAINT `fk_slabs_slices` FOREIGN KEY (`db_slab_id`) REFERENCES `slabs`(`id`)); +INSERT INTO slices_temp SELECT `id`, `created_at`, `db_object_id`, `object_index`, `db_multipart_part_id`, `db_slab_id`, `offset`, `length` FROM slices; +DROP TABLE slices; +ALTER TABLE slices_temp RENAME TO slices; + +CREATE INDEX `idx_slices_object_index` ON `slices`(`object_index`); +CREATE INDEX `idx_slices_db_object_id` ON `slices`(`db_object_id`); +CREATE INDEX `idx_slices_db_slab_id` ON `slices`(`db_slab_id`); +CREATE INDEX `idx_slices_db_multipart_part_id` ON `slices`(`db_multipart_part_id`); + +-- update constraints multipart_parts +DROP TABLE IF EXISTS multipart_parts_temp; +CREATE TABLE `multipart_parts_temp` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`etag` text,`part_number` integer,`size` integer,`db_multipart_upload_id` integer NOT NULL,CONSTRAINT `fk_multipart_uploads_parts` FOREIGN KEY (`db_multipart_upload_id`) REFERENCES `multipart_uploads`(`id`) ON DELETE CASCADE); +INSERT INTO multipart_parts_temp SELECT * FROM multipart_parts; +DROP TABLE multipart_parts; +ALTER TABLE multipart_parts_temp RENAME TO multipart_parts; + +CREATE INDEX `idx_multipart_parts_db_multipart_upload_id` ON `multipart_parts`(`db_multipart_upload_id`); +CREATE INDEX `idx_multipart_parts_part_number` ON `multipart_parts`(`part_number`); +CREATE INDEX `idx_multipart_parts_etag` ON `multipart_parts`(`etag`); +PRAGMA foreign_keys=on; diff --git a/stores/migrations/sqlite/main/schema.sql b/stores/migrations/sqlite/main/schema.sql index 97cfc3d15..2e696954c 100644 --- a/stores/migrations/sqlite/main/schema.sql +++ b/stores/migrations/sqlite/main/schema.sql @@ -85,13 +85,13 @@ CREATE INDEX `idx_contract_sectors_db_contract_id` ON `contract_sectors`(`db_con CREATE INDEX `idx_contract_sectors_db_sector_id` ON `contract_sectors`(`db_sector_id`); -- dbMultipartPart -CREATE TABLE `multipart_parts` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`etag` text,`part_number` integer,`size` integer,`db_multipart_upload_id` integer NOT NULL,CONSTRAINT `fk_multipart_uploads_parts` FOREIGN KEY (`db_multipart_upload_id`) REFERENCES `multipart_uploads`(`id`)); +CREATE TABLE `multipart_parts` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`etag` text,`part_number` integer,`size` integer,`db_multipart_upload_id` integer NOT NULL,CONSTRAINT `fk_multipart_uploads_parts` FOREIGN KEY (`db_multipart_upload_id`) REFERENCES `multipart_uploads`(`id`) ON DELETE CASCADE); CREATE INDEX `idx_multipart_parts_db_multipart_upload_id` ON `multipart_parts`(`db_multipart_upload_id`); CREATE INDEX `idx_multipart_parts_part_number` ON `multipart_parts`(`part_number`); CREATE INDEX `idx_multipart_parts_etag` ON `multipart_parts`(`etag`); -- dbSlice -CREATE TABLE `slices` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`db_object_id` integer,`object_index` integer,`db_multipart_part_id` integer,`db_slab_id` integer,`offset` integer,`length` integer,CONSTRAINT `fk_objects_slabs` FOREIGN KEY (`db_object_id`) REFERENCES `objects`(`id`),CONSTRAINT `fk_multipart_parts_slabs` FOREIGN KEY (`db_multipart_part_id`) REFERENCES `multipart_parts`(`id`),CONSTRAINT `fk_slabs_slices` FOREIGN KEY (`db_slab_id`) REFERENCES `slabs`(`id`)); +CREATE TABLE `slices` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`db_object_id` integer,`object_index` integer,`db_multipart_part_id` integer,`db_slab_id` integer,`offset` integer,`length` integer,CONSTRAINT 
`fk_objects_slabs` FOREIGN KEY (`db_object_id`) REFERENCES `objects`(`id`) ON DELETE CASCADE,CONSTRAINT `fk_multipart_parts_slabs` FOREIGN KEY (`db_multipart_part_id`) REFERENCES `multipart_parts`(`id`) ON DELETE CASCADE,CONSTRAINT `fk_slabs_slices` FOREIGN KEY (`db_slab_id`) REFERENCES `slabs`(`id`)); CREATE INDEX `idx_slices_object_index` ON `slices`(`object_index`); CREATE INDEX `idx_slices_db_object_id` ON `slices`(`db_object_id`); CREATE INDEX `idx_slices_db_slab_id` ON `slices`(`db_slab_id`); diff --git a/stores/multipart.go b/stores/multipart.go index 18706ed0c..3a5bcd54a 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -295,6 +295,10 @@ func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string if err != nil { return fmt.Errorf("failed to delete multipart upload: %w", err) } + // Prune the slabs. + if err := pruneSlabs(tx); err != nil { + return fmt.Errorf("failed to prune slabs: %w", err) + } return nil }) } @@ -435,6 +439,11 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str if err := tx.Delete(&mu).Error; err != nil { return fmt.Errorf("failed to delete multipart upload: %w", err) } + + // Prune the slabs. + if err := pruneSlabs(tx); err != nil { + return fmt.Errorf("failed to prune slabs: %w", err) + } return nil }) if err != nil { diff --git a/stores/sql_test.go b/stores/sql_test.go index 3a51161ae..776e3e10e 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -48,6 +48,9 @@ type testSQLStore struct { } type testSQLStoreConfig struct { + dbURI string + dbUser string + dbPassword string dbName string dbMetricsName string dir string @@ -65,9 +68,26 @@ func newTestSQLStore(t *testing.T, cfg testSQLStoreConfig) *testSQLStore { if dir == "" { dir = t.TempDir() } - dbName := cfg.dbName + + dbURI, dbUser, dbPassword, dbName := DBConfigFromEnv() + if dbURI == "" { + dbURI = cfg.dbURI + } + if cfg.persistent && dbURI != "" { + t.Fatal("invalid store config, can't use both persistent and dbURI") + } + if dbUser == "" { + dbUser = cfg.dbUser + } + if dbPassword == "" { + dbPassword = cfg.dbPassword + } if dbName == "" { - dbName = hex.EncodeToString(frand.Bytes(32)) // random name for db + if cfg.dbName != "" { + dbName = cfg.dbName + } else { + dbName = hex.EncodeToString(frand.Bytes(32)) // random name for db + } } dbMetricsName := cfg.dbMetricsName if dbMetricsName == "" { @@ -75,7 +95,18 @@ func newTestSQLStore(t *testing.T, cfg testSQLStoreConfig) *testSQLStore { } var conn, connMetrics gorm.Dialector - if cfg.persistent { + if dbURI != "" { + if tmpDB, err := gorm.Open(NewMySQLConnection(dbUser, dbPassword, dbURI, "")); err != nil { + t.Fatal(err) + } else if err := tmpDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbName)).Error; err != nil { + t.Fatal(err) + } else if err := tmpDB.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbMetricsName)).Error; err != nil { + t.Fatal(err) + } + + conn = NewMySQLConnection(dbUser, dbPassword, dbURI, dbName) + connMetrics = NewMySQLConnection(dbUser, dbPassword, dbURI, dbMetricsName) + } else if cfg.persistent { conn = NewSQLiteConnection(filepath.Join(cfg.dir, "db.sqlite")) connMetrics = NewSQLiteConnection(filepath.Join(cfg.dir, "metrics.sqlite")) } else { @@ -125,6 +156,18 @@ func (s *testSQLStore) Close() error { return nil } +func (s *testSQLStore) DefaultBucketID() uint { + var b dbBucket + if err := s.db. + Model(&dbBucket{}). + Where("name = ?", api.DefaultBucketName). + Take(&b). 
+ Error; err != nil { + s.t.Fatal(err) + } + return b.ID +} + func (s *testSQLStore) Reopen() *testSQLStore { s.t.Helper() cfg := defaultTestSQLStoreConfig @@ -217,11 +260,13 @@ func (s *SQLStore) contractsCount() (cnt int64, err error) { func (s *SQLStore) overrideSlabHealth(objectID string, health float64) (err error) { err = s.db.Exec(fmt.Sprintf(` UPDATE slabs SET health = %v WHERE id IN ( - SELECT sla.id - FROM objects o - INNER JOIN slices sli ON o.id = sli.db_object_id - INNER JOIN slabs sla ON sli.db_slab_id = sla.id - WHERE o.object_id = "%s" + SELECT * FROM ( + SELECT sla.id + FROM objects o + INNER JOIN slices sli ON o.id = sli.db_object_id + INNER JOIN slabs sla ON sli.db_slab_id = sla.id + WHERE o.object_id = "%s" + ) AS sub )`, health, objectID)).Error return } @@ -283,11 +328,24 @@ func TestConsensusReset(t *testing.T) { } } -type queryPlanExplain struct { - ID int `json:"id"` - Parent int `json:"parent"` - NotUsed bool `json:"notused"` - Detail string `json:"detail"` +type sqliteQueryPlan struct { + Detail string `json:"detail"` +} + +func (p sqliteQueryPlan) usesIndex() bool { + d := strings.ToLower(p.Detail) + return strings.Contains(d, "using index") || strings.Contains(d, "using covering index") +} + +//nolint:tagliatelle +type mysqlQueryPlan struct { + Extra string `json:"Extra"` + PossibleKeys string `json:"possible_keys"` +} + +func (p mysqlQueryPlan) usesIndex() bool { + d := strings.ToLower(p.Extra) + return strings.Contains(d, "using index") || strings.Contains(p.PossibleKeys, "idx_") } func TestQueryPlan(t *testing.T) { @@ -323,14 +381,20 @@ func TestQueryPlan(t *testing.T) { } for _, query := range queries { - var explain queryPlanExplain - err := ss.db.Raw(fmt.Sprintf("EXPLAIN QUERY PLAN %s;", query)).Scan(&explain).Error - if err != nil { - t.Fatal(err) - } - if !(strings.Contains(explain.Detail, "USING INDEX") || - strings.Contains(explain.Detail, "USING COVERING INDEX")) { - t.Fatalf("query '%s' should use an index, instead the plan was '%s'", query, explain.Detail) + if isSQLite(ss.db) { + var explain sqliteQueryPlan + if err := ss.db.Raw(fmt.Sprintf("EXPLAIN QUERY PLAN %s;", query)).Scan(&explain).Error; err != nil { + t.Fatal(err) + } else if !explain.usesIndex() { + t.Fatalf("query '%s' should use an index, instead the plan was %+v", query, explain) + } + } else { + var explain mysqlQueryPlan + if err := ss.db.Raw(fmt.Sprintf("EXPLAIN %s;", query)).Scan(&explain).Error; err != nil { + t.Fatal(err) + } else if !explain.usesIndex() { + t.Fatalf("query '%s' should use an index, instead the plan was %+v", query, explain) + } } } } diff --git a/worker/bench_test.go b/worker/bench_test.go new file mode 100644 index 000000000..552eca17c --- /dev/null +++ b/worker/bench_test.go @@ -0,0 +1,101 @@ +package worker + +import ( + "bytes" + "context" + "io" + "testing" + + rhpv2 "go.sia.tech/core/rhp/v2" + "go.sia.tech/renterd/api" + "lukechampine.com/frand" +) + +// zeroReader is a reader that leaves the buffer unchanged and returns no error. +// It's useful for benchmarks that need to produce data for uploading and should +// be used together with an io.LimitReader. +type zeroReader struct{} + +func (z *zeroReader) Read(p []byte) (n int, err error) { + return len(p), nil +} + +// BenchmarkDownloaderSingleObject benchmarks downloading a single, slab-sized +// object.
+// 1036.74 MB/s | M2 Pro | c9dc1b6 +func BenchmarkDownloaderSingleObject(b *testing.B) { + w := newMockWorker() + + up := testParameters(b.TempDir()) + up.rs.MinShards = 10 + up.rs.TotalShards = 30 + up.packing = false + w.addHosts(up.rs.TotalShards) + + data := bytes.NewReader(frand.Bytes(int(up.rs.SlabSizeNoRedundancy()))) + _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) + if err != nil { + b.Fatal(err) + } + o, err := w.os.Object(context.Background(), testBucket, up.path, api.GetObjectOptions{}) + if err != nil { + b.Fatal(err) + } + + b.SetBytes(o.Object.Size) + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = w.dl.DownloadObject(context.Background(), io.Discard, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkUploaderSingleObject benchmarks uploading a single object. +// +// Speed | CPU | Commit +// 433.86 MB/s | M2 Pro | bae6e77 +func BenchmarkUploaderSingleObject(b *testing.B) { + w := newMockWorker() + + up := testParameters(b.TempDir()) + up.rs.MinShards = 10 + up.rs.TotalShards = 30 + up.packing = false + w.addHosts(up.rs.TotalShards) + + data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*up.rs.MinShards)) + b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards)) + b.ResetTimer() + + _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) + if err != nil { + b.Fatal(err) + } +} + +// BenchmarkUploaderMultiObject benchmarks uploading one object per slab. +// +// Speed | CPU | Commit +// 282.47 MB/s | M2 Pro | bae6e77 +func BenchmarkUploaderMultiObject(b *testing.B) { + w := newMockWorker() + + up := testParameters(b.TempDir()) + up.rs.MinShards = 10 + up.rs.TotalShards = 30 + up.packing = false + w.addHosts(up.rs.TotalShards) + + b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards)) + _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/worker/download.go b/worker/download.go index f23c8e640..d5c4b01b3 100644 --- a/worker/download.go +++ b/worker/download.go @@ -26,6 +26,7 @@ const ( var ( errDownloadNotEnoughHosts = errors.New("not enough hosts available to download the slab") + errDownloadCancelled = errors.New("download was cancelled") ) type ( @@ -194,12 +195,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o hosts[c.HostKey] = struct{}{} } - // buffer the writer - bw := bufio.NewWriter(w) - defer bw.Flush() - // create the cipher writer - cw := o.Key.Decrypt(bw, offset) + cw := o.Key.Decrypt(w, offset) + + // buffer the writer we recover to, making sure that we don't hammer the + // response writer with tiny writes + bw := bufio.NewWriter(cw) + defer bw.Flush() // create response chan and ensure it's closed properly var wg sync.WaitGroup @@ -290,7 +292,7 @@ outer: case <-mgr.shutdownCtx.Done(): return ErrShuttingDown case <-ctx.Done(): - return errors.New("download timed out") + return errDownloadCancelled case resp = <-responseChan: } @@ -313,7 +315,7 @@ outer: s := slabs[respIndex] if s.PartialSlab { // Partial slab. - _, err = cw.Write(s.Data) + _, err = bw.Write(s.Data) if err != nil { mgr.logger.Errorf("failed to send partial slab", respIndex, err) return err @@ -321,7 +323,7 @@ } else { // Regular slab. 
slabs[respIndex].Decrypt(next.shards) - err := slabs[respIndex].Recover(cw, next.shards) + err := slabs[respIndex].Recover(bw, next.shards) if err != nil { mgr.logger.Errorf("failed to recover slab %v: %v", respIndex, err) return err @@ -760,8 +762,6 @@ loop: if isSectorNotFound(resp.err) { if err := s.mgr.os.DeleteHostSector(ctx, resp.req.host.PublicKey(), resp.req.root); err != nil { s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root, zap.Error(err)) - } else { - s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root) } } else if isPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay { resp.req.overpay = true // ensures we don't retry the same request over and over again diff --git a/worker/host.go b/worker/host.go index 86e92ce27..e5642efdd 100644 --- a/worker/host.go +++ b/worker/host.go @@ -21,7 +21,7 @@ type ( PublicKey() types.PublicKey DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error - UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) + UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error) @@ -52,7 +52,6 @@ type ( acc *account bus Bus contractSpendingRecorder ContractSpendingRecorder - interactionRecorder HostInteractionRecorder logger *zap.SugaredLogger transportPool *transportPoolV3 priceTables *priceTables @@ -70,7 +69,6 @@ func (w *worker) Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr acc: w.accounts.ForHost(hk), bus: w.bus, contractSpendingRecorder: w.contractSpendingRecorder, - interactionRecorder: w.hostInteractionRecorder, logger: w.logger.Named(hk.String()[:4]), fcid: fcid, siamuxAddr: siamuxAddr, @@ -123,11 +121,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 }) } -func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) { +func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) { // fetch price table pt, err := h.priceTable(ctx, nil) if err != nil { - return types.Hash256{}, err + return err } // prepare payment @@ -136,28 +134,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, // insufficient balance error expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd) if err != nil { - return types.Hash256{}, err + return err } if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID) + return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID) } payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey) if !ok { - return types.Hash256{}, errors.New("failed to create payment") + return errors.New("failed to create payment") } var cost types.Currency err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector) + cost, err = 
RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector) return err }) if err != nil { - return types.Hash256{}, err + return err } // record spending h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost}) - return root, nil + return nil } func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) { @@ -198,11 +196,13 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis fetchPT := func(paymentFn PriceTablePaymentFunc) (hpt hostdb.HostPriceTable, err error) { err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { hpt, err = RPCPriceTable(ctx, t, paymentFn) - h.interactionRecorder.RecordPriceTableUpdate(hostdb.PriceTableUpdate{ - HostKey: h.hk, - Success: isSuccessfulInteraction(err), - Timestamp: time.Now(), - PriceTable: hpt, + h.bus.RecordPriceTables(ctx, []hostdb.PriceTableUpdate{ + { + HostKey: h.hk, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + PriceTable: hpt, + }, }) return }) diff --git a/worker/host_test.go b/worker/host_test.go index 87d35fb36..78ce6b74e 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -16,11 +16,9 @@ func TestHost(t *testing.T) { sector, root := newMockSector() // upload the sector - uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{}) + err := h.UploadSector(context.Background(), rhpv2.SectorRoot(sector), sector, types.FileContractRevision{}) if err != nil { t.Fatal(err) - } else if uploaded != root { - t.Fatal("root mismatch") } // download entire sector diff --git a/worker/interactions.go b/worker/interactions.go index dfe8c4017..3e0d6ac62 100644 --- a/worker/interactions.go +++ b/worker/interactions.go @@ -2,7 +2,6 @@ package worker import ( "context" - "fmt" "sync" "time" @@ -10,10 +9,6 @@ import ( "go.uber.org/zap" ) -const ( - keyInteractionRecorder contextKey = "InteractionRecorder" -) - type ( HostInteractionRecorder interface { RecordHostScan(...hostdb.HostScan) @@ -36,100 +31,6 @@ type ( } ) -var ( - _ HostInteractionRecorder = (*hostInteractionRecorder)(nil) -) - -func (w *worker) initHostInteractionRecorder(flushInterval time.Duration) { - if w.hostInteractionRecorder != nil { - panic("HostInteractionRecorder already initialized") // developer error - } - w.hostInteractionRecorder = &hostInteractionRecorder{ - bus: w.bus, - logger: w.logger, - - flushCtx: w.shutdownCtx, - flushInterval: flushInterval, - - hostScans: make([]hostdb.HostScan, 0), - priceTableUpdates: make([]hostdb.PriceTableUpdate, 0), - } -} - -func (r *hostInteractionRecorder) RecordHostScan(scans ...hostdb.HostScan) { - r.mu.Lock() - defer r.mu.Unlock() - r.hostScans = append(r.hostScans, scans...) - r.tryFlushInteractionsBuffer() -} - -func (r *hostInteractionRecorder) RecordPriceTableUpdate(ptUpdates ...hostdb.PriceTableUpdate) { - r.mu.Lock() - defer r.mu.Unlock() - r.priceTableUpdates = append(r.priceTableUpdates, ptUpdates...) 
- r.tryFlushInteractionsBuffer() -} - -func (r *hostInteractionRecorder) Stop(ctx context.Context) { - // stop the flush timer - r.mu.Lock() - if r.flushTimer != nil { - r.flushTimer.Stop() - } - r.flushCtx = ctx - r.mu.Unlock() - - // flush all interactions - r.flush() - - // log if we weren't able to flush them - r.mu.Lock() - if len(r.hostScans) > 0 { - r.logger.Errorw(fmt.Sprintf("failed to record %d host scans on worker shutdown", len(r.hostScans))) - } - if len(r.priceTableUpdates) > 0 { - r.logger.Errorw(fmt.Sprintf("failed to record %d price table updates on worker shutdown", len(r.priceTableUpdates))) - } - r.mu.Unlock() -} - -func (r *hostInteractionRecorder) flush() { - r.mu.Lock() - defer r.mu.Unlock() - - // NOTE: don't bother flushing if the context is cancelled, we can safely - // ignore the buffered scans and price tables since we'll flush on shutdown - // and log in case we weren't able to flush all interactions to the bus - select { - case <-r.flushCtx.Done(): - r.flushTimer = nil - return - default: - } - - if len(r.hostScans) > 0 { - if err := r.bus.RecordHostScans(r.flushCtx, r.hostScans); err != nil { - r.logger.Errorw(fmt.Sprintf("failed to record scans: %v", err)) - } else if err == nil { - r.hostScans = nil - } - } - if len(r.priceTableUpdates) > 0 { - if err := r.bus.RecordPriceTables(r.flushCtx, r.priceTableUpdates); err != nil { - r.logger.Errorw(fmt.Sprintf("failed to record price table updates: %v", err)) - } else if err == nil { - r.priceTableUpdates = nil - } - } - r.flushTimer = nil -} - -func (r *hostInteractionRecorder) tryFlushInteractionsBuffer() { - if r.flushTimer == nil { - r.flushTimer = time.AfterFunc(r.flushInterval, r.flush) - } -} - func isSuccessfulInteraction(err error) bool { // No error always means success. 
if err == nil { diff --git a/worker/mocks_test.go b/worker/mocks_test.go index e6fd62d8e..a28e9256c 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -274,7 +274,7 @@ func (os *mockObjectStore) Object(ctx context.Context, bucket, path string, opts return api.ObjectsResponse{Object: &api.Object{ ObjectMetadata: api.ObjectMetadata{Name: path, Size: o.TotalSize()}, - Object: o, + Object: &o, }}, nil } @@ -396,8 +396,9 @@ func (h *mockHost) DownloadSector(ctx context.Context, w io.Writer, root types.H return err } -func (h *mockHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) { - return h.contract().addSector(sector), nil +func (h *mockHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error { + h.contract().addSector(sectorRoot, sector) + return nil } func (h *mockHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) { @@ -448,12 +449,10 @@ func newMockContract(hk types.PublicKey, fcid types.FileContractID) *mockContrac } } -func (c *mockContract) addSector(sector *[rhpv2.SectorSize]byte) (root types.Hash256) { - root = rhpv2.SectorRoot(sector) +func (c *mockContract) addSector(sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) { c.mu.Lock() - c.sectors[root] = sector + c.sectors[sectorRoot] = sector c.mu.Unlock() - return } func (c *mockContract) sector(root types.Hash256) (sector *[rhpv2.SectorSize]byte, found bool) { diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 03f67c6f6..45a3610e2 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -337,19 +337,17 @@ type ( // accounts stores the balance and other metrics of accounts that the // worker maintains with a host. accounts struct { - as AccountStore - key types.PrivateKey - shutdownCtx context.Context + as AccountStore + key types.PrivateKey } // account contains information regarding a specific account of the // worker. account struct { - as AccountStore - id rhpv3.Account - key types.PrivateKey - host types.PublicKey - shutdownCtx context.Context + as AccountStore + id rhpv3.Account + key types.PrivateKey + host types.PublicKey } ) @@ -358,9 +356,8 @@ func (w *worker) initAccounts(as AccountStore) { panic("accounts already initialized") // developer error } w.accounts = &accounts{ - as: as, - key: w.deriveSubKey("accountkey"), - shutdownCtx: w.shutdownCtx, + as: as, + key: w.deriveSubKey("accountkey"), } } @@ -376,117 +373,95 @@ func (w *worker) initTransportPool() { func (a *accounts) ForHost(hk types.PublicKey) *account { accountID := rhpv3.Account(a.deriveAccountKey(hk).PublicKey()) return &account{ - as: a.as, - id: accountID, - key: a.key, - host: hk, - shutdownCtx: a.shutdownCtx, + as: a.as, + id: accountID, + key: a.key, + host: hk, } } -// WithDeposit increases the balance of an account by the amount returned by -// amtFn if amtFn doesn't return an error. 
-func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error { - _, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) +func withAccountLock(ctx context.Context, as AccountStore, id rhpv3.Account, hk types.PublicKey, exclusive bool, fn func(a api.Account) error) error { + acc, lockID, err := as.LockAccount(ctx, id, hk, exclusive, accountLockingDuration) if err != nil { return err } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() + err = fn(acc) - amt, err := amtFn() - if err != nil { - return err - } - return a.as.AddBalance(ctx, a.id, a.host, amt.Big()) + // unlock account + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + _ = as.UnlockAccount(ctx, acc.ID, lockID) // ignore error + cancel() + + return err } -func (a *account) Balance(ctx context.Context) (types.Currency, error) { - account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) - if err != nil { - return types.Currency{}, err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() + +// Balance returns the account balance. +func (a *account) Balance(ctx context.Context) (balance types.Currency, err error) { + err = withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error { + balance = types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()) + return nil + }) + return +} - return types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()), nil +// WithDeposit increases the balance of an account by the amount returned by +// amtFn if amtFn doesn't return an error. +func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error { + return withAccountLock(ctx, a.as, a.id, a.host, false, func(_ api.Account) error { + amt, err := amtFn() + if err != nil { + return err + } + return a.as.AddBalance(ctx, a.id, a.host, amt.Big()) + }) +} + +// WithSync syncs an account's balance with the bus. To do so, the account is +// locked while the balance is fetched through balanceFn. +func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error { + return withAccountLock(ctx, a.as, a.id, a.host, true, func(_ api.Account) error { + balance, err := balanceFn() + if err != nil { + return err + } + return a.as.SetBalance(ctx, a.id, a.host, balance.Big()) + }) } // WithWithdrawal decreases the balance of an account by the amount returned by // amtFn. The amount is still withdrawn if amtFn returns an error since some // costs are non-refundable.
func (a *account) WithWithdrawal(ctx context.Context, amtFn func() (types.Currency, error)) error { - account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) - if err != nil { - return err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() + return withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error { + // return early if the account needs to sync + if account.RequiresSync { + return fmt.Errorf("%w; account requires resync", errBalanceInsufficient) + } - // return early if the account needs to sync - if account.RequiresSync { - return fmt.Errorf("%w; account requires resync", errBalanceInsufficient) - } + // return early if our account is not funded + if account.Balance.Cmp(big.NewInt(0)) <= 0 { + return errBalanceInsufficient + } - // return early if our account is not funded - if account.Balance.Cmp(big.NewInt(0)) <= 0 { - return errBalanceInsufficient - } + // execute amtFn + amt, err := amtFn() - // execute amtFn - amt, err := amtFn() - if isBalanceInsufficient(err) { // in case of an insufficient balance, we schedule a sync - scheduleCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - defer cancel() - err2 := a.as.ScheduleSync(scheduleCtx, a.id, a.host) - if err2 != nil { - err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, err2) + if isBalanceInsufficient(err) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + err = errors.Join(err, a.as.ScheduleSync(ctx, a.id, a.host)) + cancel() } - } - - // if the amount is zero, we are done - if amt.IsZero() { - return err - } - // if an amount was returned, we withdraw it. - addCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - defer cancel() - errAdd := a.as.AddBalance(addCtx, a.id, a.host, new(big.Int).Neg(amt.Big())) - if errAdd != nil { - err = fmt.Errorf("%w; failed to add balance to account, error: %v", err, errAdd) - } - return err -} - -// WithSync syncs an accounts balance with the bus. To do so, the account is -// locked while the balance is fetched through balanceFn. -func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error { - _, lockID, err := a.as.LockAccount(ctx, a.id, a.host, true, accountLockingDuration) - if err != nil { - return err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() + // if an amount was returned, we withdraw it + if !amt.IsZero() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + err = errors.Join(err, a.as.AddBalance(ctx, a.id, a.host, new(big.Int).Neg(amt.Big()))) + cancel() + } - balance, err := balanceFn() - if err != nil { return err - } - return a.as.SetBalance(ctx, a.id, a.host, balance.Big()) + }) } // deriveAccountKey derives an account plus key for a given host and worker. 
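The account methods above all funnel through the new withAccountLock helper: the lock is taken on the caller's context, the callback runs, and the unlock happens on a fresh, time-boxed context so that an expired request context cannot leave the account locked until accountLockingDuration lapses on its own. A minimal, self-contained Go sketch of that shape follows; the store type and its LockAccount/UnlockAccount methods are simplified stand-ins, not the renterd AccountStore API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// store is a hypothetical stand-in for the AccountStore dependency; only the
// locking calls relevant to the pattern are modeled.
type store struct{ locked bool }

func (s *store) LockAccount(ctx context.Context) (lockID uint64, _ error) {
	if s.locked {
		return 0, errors.New("account already locked")
	}
	s.locked = true
	return 42, nil
}

func (s *store) UnlockAccount(ctx context.Context, lockID uint64) error {
	s.locked = false
	return nil
}

// withLock mirrors the shape of withAccountLock: acquire the lock, run the
// callback, then unlock on a detached context and return the callback error.
func withLock(ctx context.Context, s *store, fn func() error) error {
	lockID, err := s.LockAccount(ctx)
	if err != nil {
		return err
	}
	err = fn()

	// unlock on a fresh, time-boxed context; the unlock error is deliberately
	// ignored because a missed unlock only delays the lock's natural expiry
	unlockCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
	_ = s.UnlockAccount(unlockCtx, lockID)
	cancel()

	return err
}

func main() {
	s := &store{}
	err := withLock(context.Background(), s, func() error {
		fmt.Println("balance inspected while holding the lock")
		return nil
	})
	fmt.Println("err:", err)
}

One consequence of this shape, visible in WithWithdrawal above, is that follow-up bookkeeping such as ScheduleSync and AddBalance also runs on detached contexts and accumulates failures with errors.Join instead of aborting early.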
@@ -789,17 +764,17 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho return } -func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sector *[rhpv2.SectorSize]byte) (sectorRoot types.Hash256, cost types.Currency, err error) { +func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) { defer wrapErr(&err, "AppendSector") // sanity check revision first if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, types.ZeroCurrency, errMaxRevisionReached + return types.ZeroCurrency, errMaxRevisionReached } s, err := t.DialStream(ctx) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } defer s.Close() @@ -829,7 +804,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat // compute expected collateral and refund expectedCost, expectedCollateral, expectedRefund, err := uploadSectorCost(pt, rev.WindowEnd) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } // apply leeways. @@ -840,13 +815,13 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat // check if the cost, collateral and refund match our expectation. if executeResp.TotalCost.Cmp(expectedCost) > 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String()) + return types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String()) } if executeResp.FailureRefund.Cmp(expectedRefund) < 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String()) + return types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String()) } if executeResp.AdditionalCollateral.Cmp(expectedCollateral) < 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String()) + return types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String()) } // set the cost and refund @@ -870,18 +845,17 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat collateral := executeResp.AdditionalCollateral.Add(executeResp.FailureRefund) // check proof - sectorRoot = rhpv2.SectorRoot(sector) if rev.Filesize == 0 { // For the first upload to a contract we don't get a proof. So we just // assert that the new contract root matches the root of the sector. if rev.Filesize == 0 && executeResp.NewMerkleRoot != sectorRoot { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot) + return types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot) } } else { // Otherwise we make sure the proof was transmitted and verify it. 
actions := []rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}} // TODO: change once rhpv3 support is available if !rhpv2.VerifyDiffProof(actions, rev.Filesize/rhpv2.SectorSize, executeResp.Proof, []types.Hash256{}, rev.FileMerkleRoot, executeResp.NewMerkleRoot, []types.Hash256{sectorRoot}) { - return types.Hash256{}, types.ZeroCurrency, errors.New("proof verification failed") + return types.ZeroCurrency, errors.New("proof verification failed") } } @@ -889,7 +863,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat newRevision := *rev newValid, newMissed, err := updateRevisionOutputs(&newRevision, types.ZeroCurrency, collateral) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } newRevision.Filesize += rhpv2.SectorSize newRevision.RevisionNumber++ diff --git a/worker/upload.go b/worker/upload.go index 72c65bf07..8bd87f881 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -137,9 +137,8 @@ type ( } sectorUploadResp struct { - req *sectorUploadReq - root types.Hash256 - err error + req *sectorUploadReq + err error } ) @@ -400,11 +399,8 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a // create the object o := object.NewObject(up.ec) - // create the hash reader - hr := newHashReader(r) - // create the cipher reader - cr, err := o.Encrypt(hr, up.encryptionOffset) + cr, err := o.Encrypt(r, up.encryptionOffset) if err != nil { return false, "", err } @@ -532,8 +528,8 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a o.Slabs = append(o.Slabs, resp.slab) } - // calculate the eTag - eTag = hr.Hash() + // compute etag + eTag = o.ComputeETag() // add partial slabs if len(partialSlab) > 0 { @@ -765,20 +761,32 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [ responseChan := make(chan sectorUploadResp) // prepare sectors + var wg sync.WaitGroup sectors := make([]*sectorUpload, len(shards)) - for sI, shard := range shards { - // create the ctx - sCtx, sCancel := context.WithCancel(ctx) - - // create the sector - sectors[sI] = &sectorUpload{ - data: (*[rhpv2.SectorSize]byte)(shard), - index: sI, - root: rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(shard)), - ctx: sCtx, - cancel: sCancel, - } + for sI := range shards { + wg.Add(1) + go func(idx int) { + // create the ctx + sCtx, sCancel := context.WithCancel(ctx) + + // create the sector + // NOTE: we compute the sector root here and pass it all the + // way down to the RPC to avoid having to recompute it for the proof + // verification. This is necessary because we need it ahead of time + // for the call to AddUploadingSector in uploader.go + // Once we upload to temp storage we don't need AddUploadingSector + // anymore and can move it back to the RPC. 
+ sectors[idx] = &sectorUpload{ + data: (*[rhpv2.SectorSize]byte)(shards[idx]), + index: idx, + root: rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(shards[idx])), + ctx: sCtx, + cancel: sCancel, + } + wg.Done() + }(sI) } + wg.Wait() // prepare candidates candidates := make([]*candidate, len(uploaders)) @@ -833,8 +841,6 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data } func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) { - start := time.Now() - // ensure inflight uploads get cancelled ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -871,6 +877,10 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates [ // create a request buffer var buffer []*sectorUploadReq + // start the timer after the upload has started + // newSlabUpload is quite slow due to computing the sector roots + start := time.Now() + // collect responses var used bool var done bool @@ -930,6 +940,9 @@ loop: // calculate the upload speed bytes := slab.numUploaded * rhpv2.SectorSize ms := time.Since(start).Milliseconds() + if ms == 0 { + ms = 1 + } uploadSpeed = int64(bytes) / ms // calculate overdrive pct @@ -1057,12 +1070,6 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { return false, false } - // sanity check we receive the expected root - if resp.root != req.sector.root { - s.errs[req.hk] = fmt.Errorf("root mismatch, %v != %v", resp.root, req.sector.root) - return false, false - } - // redundant sectors can't complete the upload if sector.uploaded.Root != (types.Hash256{}) { return false, false @@ -1072,7 +1079,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { sector.finish(object.Sector{ Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}}, LatestHost: req.hk, - Root: resp.root, + Root: req.sector.root, }) // update uploaded sectors @@ -1119,7 +1126,7 @@ func (req *sectorUploadReq) done() bool { } } -func (req *sectorUploadReq) fail(err error) { +func (req *sectorUploadReq) finish(err error) { select { case <-req.sector.ctx.Done(): case req.responseChan <- sectorUploadResp{ @@ -1128,13 +1135,3 @@ func (req *sectorUploadReq) fail(err error) { }: } } - -func (req *sectorUploadReq) succeed(root types.Hash256) { - select { - case <-req.sector.ctx.Done(): - case req.responseChan <- sectorUploadResp{ - req: req, - root: root, - }: - } -} diff --git a/worker/upload_test.go b/worker/upload_test.go index 8d32455bd..0b9f6b28b 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -64,7 +64,7 @@ func TestUpload(t *testing.T) { // download the data and assert it matches var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -90,7 +90,7 @@ func TestUpload(t *testing.T) { // download the data again and assert it matches buf.Reset() - err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), filtered) + err = dl.DownloadObject(context.Background(), &buf, *o.Object.Object, 0, uint64(o.Object.Size), filtered) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -107,7 +107,7 @@ func TestUpload(t 
*testing.T) { // download the data again and assert it fails buf.Reset() - err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), filtered) + err = dl.DownloadObject(context.Background(), &buf, *o.Object.Object, 0, uint64(o.Object.Size), filtered) if !errors.Is(err, errDownloadNotEnoughHosts) { t.Fatal("expected not enough hosts error", err) } @@ -170,7 +170,7 @@ func TestUploadPackedSlab(t *testing.T) { // download the data and assert it matches var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -206,7 +206,7 @@ func TestUploadPackedSlab(t *testing.T) { // download the data again and assert it matches buf.Reset() - err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -325,7 +325,7 @@ func TestUploadShards(t *testing.T) { // download the data and assert it matches var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), contracts) + err = dl.DownloadObject(context.Background(), &buf, *o.Object.Object, 0, uint64(o.Object.Size), contracts) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { @@ -485,7 +485,7 @@ func TestUploadRegression(t *testing.T) { // download data for good measure var buf bytes.Buffer - err = dl.DownloadObject(context.Background(), &buf, o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) + err = dl.DownloadObject(context.Background(), &buf, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) if err != nil { t.Fatal(err) } else if !bytes.Equal(data, buf.Bytes()) { diff --git a/worker/upload_utils.go b/worker/upload_utils.go index 4b5241b4d..306e1774f 100644 --- a/worker/upload_utils.go +++ b/worker/upload_utils.go @@ -2,11 +2,9 @@ package worker import ( "bytes" - "encoding/hex" "io" "github.com/gabriel-vasile/mimetype" - "go.sia.tech/core/types" "go.sia.tech/renterd/object" ) @@ -28,28 +26,3 @@ func newMimeReader(r io.Reader) (mimeType string, recycled io.Reader, err error) recycled = io.MultiReader(buf, r) return mtype.String(), recycled, err } - -type hashReader struct { - r io.Reader - h *types.Hasher -} - -func newHashReader(r io.Reader) *hashReader { - return &hashReader{ - r: r, - h: types.NewHasher(), - } -} - -func (e *hashReader) Read(p []byte) (int, error) { - n, err := e.r.Read(p) - if _, wErr := e.h.E.Write(p[:n]); wErr != nil { - return 0, wErr - } - return n, err -} - -func (e *hashReader) Hash() string { - sum := e.h.Sum() - return hex.EncodeToString(sum[:]) -} diff --git a/worker/uploader.go b/worker/uploader.go index dcff27eaf..fa1d04651 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -114,7 +114,7 @@ outer: } // execute it - root, elapsed, err := u.execute(req) + elapsed, err := u.execute(req) // the uploader's contract got renewed, requeue the request if errors.Is(err, errMaxRevisionReached) { @@ -125,10 +125,12 @@ outer: } // send the response - if err != nil { - req.fail(err) - } else { - req.succeed(root) + select { + case <-req.sector.ctx.Done(): + case req.responseChan <- sectorUploadResp{ + 
req: req, + err: err, + }: } // track the error, ignore gracefully closed streams and canceled overdrives @@ -151,7 +153,7 @@ func (u *uploader) Stop(err error) { break } if !upload.done() { - upload.fail(err) + upload.finish(err) } } } @@ -161,7 +163,7 @@ func (u *uploader) enqueue(req *sectorUploadReq) { // check for stopped if u.stopped { u.mu.Unlock() - go req.fail(errUploaderStopped) // don't block the caller + go req.finish(errUploaderStopped) // don't block the caller return } @@ -192,7 +194,7 @@ func (u *uploader) estimate() float64 { return numSectors * estimateP90 } -func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, error) { +func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) { // grab fields u.mu.Lock() host := u.host @@ -202,7 +204,7 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, // acquire contract lock lockID, err := u.cs.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration) if err != nil { - return types.Hash256{}, 0, err + return 0, err } // defer the release @@ -220,26 +222,26 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, // fetch the revision rev, err := host.FetchRevision(ctx, defaultRevisionFetchTimeout) if err != nil { - return types.Hash256{}, 0, err + return 0, err } else if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, 0, errMaxRevisionReached + return 0, errMaxRevisionReached } // update the bus if err := u.os.AddUploadingSector(ctx, req.uploadID, fcid, req.sector.root); err != nil { - return types.Hash256{}, 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err) } // upload the sector start := time.Now() - root, err := host.UploadSector(ctx, req.sector.sectorData(), rev) + err = host.UploadSector(ctx, req.sector.root, req.sector.sectorData(), rev) if err != nil { - return types.Hash256{}, 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) } // calculate elapsed time elapsed := time.Since(start) - return root, elapsed, nil + return elapsed, nil } func (u *uploader) pop() *sectorUploadReq { diff --git a/worker/worker.go b/worker/worker.go index 0e41bd00c..52a59559d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -199,7 +199,6 @@ type worker struct { uploadsMu sync.Mutex uploadingPackedSlabs map[string]bool - hostInteractionRecorder HostInteractionRecorder contractSpendingRecorder ContractSpendingRecorder contractLockingDuration time.Duration @@ -341,11 +340,13 @@ func (w *worker) rhpPriceTableHandler(jc jape.Context) { var err error var hpt hostdb.HostPriceTable defer func() { - w.hostInteractionRecorder.RecordPriceTableUpdate(hostdb.PriceTableUpdate{ - HostKey: rptr.HostKey, - Success: isSuccessfulInteraction(err), - Timestamp: time.Now(), - PriceTable: hpt, + w.bus.RecordPriceTables(ctx, []hostdb.PriceTableUpdate{ + { + HostKey: rptr.HostKey, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + PriceTable: hpt, + }, }) }() @@ -918,10 +919,12 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { // create a download function downloadFn := func(wr io.Writer, offset, length int64) (err error) { ctx = WithGougingChecker(ctx, w.bus, gp) - err = w.downloadManager.DownloadObject(ctx, wr, res.Object.Object, uint64(offset), uint64(length), 
contracts) + err = w.downloadManager.DownloadObject(ctx, wr, *res.Object.Object, uint64(offset), uint64(length), contracts) if err != nil { w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) { + if !errors.Is(err, ErrShuttingDown) && + !errors.Is(err, errDownloadCancelled) && + !errors.Is(err, io.ErrClosedPipe) { w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err)) } } @@ -1290,6 +1293,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush return nil, errors.New("uploadMaxMemory cannot be 0") } + ctx, cancel := context.WithCancel(context.Background()) w := &worker{ alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)), allowPrivateIPs: allowPrivateIPs, @@ -1300,13 +1304,10 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush logger: l.Sugar().Named("worker").Named(id), startTime: time.Now(), uploadingPackedSlabs: make(map[string]bool), + shutdownCtx: ctx, + shutdownCtxCancel: cancel, } - ctx, cancel := context.WithCancel(context.Background()) - ctx = context.WithValue(ctx, keyInteractionRecorder, w) - w.shutdownCtx = ctx - w.shutdownCtxCancel = cancel - w.initAccounts(b) w.initPriceTables() w.initTransportPool() @@ -1315,7 +1316,6 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush w.initUploadManager(uploadMaxMemory, uploadMaxOverdrive, uploadOverdriveTimeout, l.Sugar().Named("uploadmanager")) w.initContractSpendingRecorder(busFlushInterval) - w.initHostInteractionRecorder(busFlushInterval) return w, nil } @@ -1362,7 +1362,6 @@ func (w *worker) Shutdown(ctx context.Context) error { w.uploadManager.Stop() // stop recorders - w.hostInteractionRecorder.Stop(ctx) w.contractSpendingRecorder.Stop(ctx) return nil } @@ -1439,14 +1438,23 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s default: } - // record host scan - w.hostInteractionRecorder.RecordHostScan(hostdb.HostScan{ - HostKey: hostKey, - Success: isSuccessfulInteraction(err), - Timestamp: time.Now(), - Settings: settings, - PriceTable: pt, + // record host scan - make sure this isn't interrupted by the same context + // used to time out the scan itself because otherwise we won't be able to + // record scans that timed out. + recordCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + scanErr := w.bus.RecordHostScans(recordCtx, []hostdb.HostScan{ + { + HostKey: hostKey, + Success: isSuccessfulInteraction(err), + Timestamp: time.Now(), + Settings: settings, + PriceTable: pt, + }, }) + if scanErr != nil { + w.logger.Errorf("failed to record host scan: %v", scanErr) + } return settings, pt, duration, err }
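The scanHost change above deliberately records the scan on a context that is independent of the scan's own context, so a scan that timed out can still be reported to the bus. A minimal sketch of that pattern follows; doScan and record are hypothetical stand-ins for the RHP scan and bus.RecordHostScans, not renterd APIs.

package main

import (
	"context"
	"fmt"
	"time"
)

// doScan stands in for the host scan; it fails as soon as its context is
// done, which is exactly the case the recording step must survive.
func doScan(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(time.Second):
		return nil
	}
}

// record stands in for bus.RecordHostScans; it only needs its own context to
// still be alive.
func record(ctx context.Context, success bool) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Println("recorded scan, success:", success)
	return nil
}

func main() {
	// simulate a scan whose context was already cancelled, e.g. by a timeout
	scanCtx, cancel := context.WithCancel(context.Background())
	cancel()
	err := doScan(scanCtx)

	// record on a fresh, independently time-boxed context so the scan timeout
	// doesn't also swallow the record of the failed scan
	recordCtx, cancelRecord := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelRecord()
	if rErr := record(recordCtx, err == nil); rErr != nil {
		fmt.Println("failed to record scan:", rErr)
	}
}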