Skip to content

Commit

Permalink
stores: add logging; worker: add debug logging for pruning; cmd: add …
Browse files Browse the repository at this point in the history
…name to sql logger; contractor; break revision errors up into multiple log lines
  • Loading branch information
ChrisSchinnerl committed Mar 21, 2024
1 parent c148336 commit d483b00
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 9 deletions.
5 changes: 4 additions & 1 deletion api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ type (
// ContractsResponse is the response type for the /rhp/contracts endpoint.
ContractsResponse struct {
Contracts []Contract `json:"contracts"`
Error string `json:"error,omitempty"`
Errors map[types.PublicKey]string

// deprecated
Error string `json:"error,omitempty"`
}

MemoryResponse struct {
Expand Down
6 changes: 4 additions & 2 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,10 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) (
if err != nil {
return false, err
}
if resp.Error != "" {
c.logger.Error(resp.Error)
if resp.Errors != nil {
for pk, err := range resp.Errors {
c.logger.With("hostKey", pk).With("error", err).Warn("failed to fetch revision")
}
}
contracts := resp.Contracts
c.logger.Infof("fetched %d contracts from the worker, took %v", len(resp.Contracts), time.Since(start))
Expand Down
2 changes: 1 addition & 1 deletion cmd/renterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func main() {
dbLogCfg = cfg.Database.Log
}
busCfg.DBLogger = zapgorm2.Logger{
ZapLogger: logger,
ZapLogger: logger.Named("SQL"),
LogLevel: level,
SlowThreshold: dbLogCfg.SlowThreshold,
SkipCallerLookup: false,
Expand Down
6 changes: 6 additions & 0 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2226,6 +2226,12 @@ func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractS
DBSectorID: sectorID,
DBContractID: contracts[fcid].ID,
})
} else {
s.logger.Warn("missing contract for shard",
"contract", fcid,
"root", shard.Root,
"latest_host", shard.LatestHost,
)
}
}
}
Expand Down
34 changes: 29 additions & 5 deletions worker/rhpv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ func (w *worker) PruneContract(ctx context.Context, hostIP string, hostKey types
err = w.withContractLock(ctx, fcid, lockingPriorityPruning, func() error {
return w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error {
return w.withRevisionV2(defaultLockTimeout, t, hostKey, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) {
logger := w.logger.
With("hostKey", hostKey).
With("hostVersion", settings.Version).
With("fcid", fcid).
With("revisionNumber", rev.Revision.RevisionNumber).
With("lastKnownRevisionNumber", lastKnownRevisionNumber).
Named("pruneContract")

// perform gouging checks
gc, err := GougingCheckerFromContext(ctx, false)
if err != nil {
Expand Down Expand Up @@ -316,6 +324,7 @@ func (w *worker) PruneContract(ctx context.Context, hostIP string, hostKey types
delete(keep, root) // prevent duplicates
continue
}
logger.With("index", i).With("root", root).Debug("collected root for pruning")
indices = append(indices, uint64(i))
}
if len(indices) == 0 {
Expand All @@ -339,7 +348,14 @@ func (w *worker) PruneContract(ctx context.Context, hostIP string, hostKey types
}

func (w *worker) deleteContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings, indices []uint64) (deleted uint64, err error) {
w.logger.Infow(fmt.Sprintf("deleting %d contract roots (%v)", len(indices), humanReadableSize(len(indices)*rhpv2.SectorSize)), "hk", rev.HostKey(), "fcid", rev.ID())
logger := w.logger.
With("hostKey", t.HostKey()).
With("hostVersion", settings.Version).
With("fcid", rev.Revision.ParentID).
With("revisionNumber", rev.Revision.RevisionNumber).
Named("deleteContractRoots")

logger.Infow(fmt.Sprintf("deleting %d contract roots (%v)", len(indices), humanReadableSize(len(indices)*rhpv2.SectorSize)), "hk", rev.HostKey(), "fcid", rev.ID())

// return early
if len(indices) == 0 {
Expand Down Expand Up @@ -380,9 +396,9 @@ func (w *worker) deleteContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevi
if err = func() error {
var cost types.Currency
start := time.Now()
w.logger.Infow(fmt.Sprintf("starting batch %d/%d of size %d", i+1, len(batches), len(batch)))
logger.Infow(fmt.Sprintf("starting batch %d/%d of size %d", i+1, len(batches), len(batch)))
defer func() {
w.logger.Infow(fmt.Sprintf("processing batch %d/%d of size %d took %v", i+1, len(batches), len(batch), time.Since(start)), "cost", cost)
logger.Infow(fmt.Sprintf("processing batch %d/%d of size %d took %v", i+1, len(batches), len(batch), time.Since(start)), "cost", cost)
}()

numSectors := rev.NumSectors()
Expand Down Expand Up @@ -462,7 +478,7 @@ func (w *worker) deleteContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevi
return err
} else if err := t.ReadResponse(&merkleResp, minMessageSize+responseSize); err != nil {
err := fmt.Errorf("couldn't read Merkle proof response, err: %v", err)
w.logger.Infow(fmt.Sprintf("processing batch %d/%d failed, err %v", i+1, len(batches), err))
logger.Infow(fmt.Sprintf("processing batch %d/%d failed, err %v", i+1, len(batches), err))
return err
}

Expand All @@ -472,7 +488,7 @@ func (w *worker) deleteContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevi
oldRoot, newRoot := types.Hash256(rev.Revision.FileMerkleRoot), merkleResp.NewMerkleRoot
if rev.Revision.Filesize > 0 && !rhpv2.VerifyDiffProof(actions, numSectors, proofHashes, leafHashes, oldRoot, newRoot, nil) {
err := fmt.Errorf("couldn't verify delete proof, host %v, version %v; %w", rev.HostKey(), settings.Version, ErrInvalidMerkleProof)
w.logger.Infow(fmt.Sprintf("processing batch %d/%d failed, err %v", i+1, len(batches), err))
logger.Infow(fmt.Sprintf("processing batch %d/%d failed, err %v", i+1, len(batches), err))
t.WriteResponseErr(err)
return err
}
Expand Down Expand Up @@ -506,6 +522,14 @@ func (w *worker) deleteContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevi

// record spending
w.contractSpendingRecorder.Record(rev.Revision, api.ContractSpending{Deletions: cost})

for _, action := range actions {
if action.Type == rhpv2.RPCWriteActionSwap {
logger.With("index", action.B).Debug("successfully swapped sector")
} else if action.Type == rhpv2.RPCWriteActionTrim {
logger.With("n", action.A).Debug("successfully trimmed sectors")
}
}
return nil
}(); err != nil {
return
Expand Down
4 changes: 4 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,10 @@ func (w *worker) rhpContractsHandlerGET(jc jape.Context) {
resp := api.ContractsResponse{Contracts: contracts}
if errs != nil {
resp.Error = errs.Error()
resp.Errors = make(map[types.PublicKey]string)
for pk, err := range errs {
resp.Errors[pk] = err.Error()
}
}
jc.Encode(resp)
}
Expand Down

0 comments on commit d483b00

Please sign in to comment.