diff --git a/db/db.go b/db/db.go index e29cf5f..9b5cc5d 100644 --- a/db/db.go +++ b/db/db.go @@ -85,7 +85,6 @@ func (db *OperationDB) HandleOperations(ctx context.Context, blockNumber uint64, if err != nil { return 0, fmt.Errorf("deleting LIB undo operations: %w", err) } - undoOperations, fetchDuration, err := db.GenerateUndoOperations(ctx, kvOps.Operations) prevValueFetchDuration = fetchDuration if err != nil { diff --git a/sinker/sinker.go b/sinker/sinker.go index dd3fd08..61a2267 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -102,10 +102,12 @@ func (s *KVSinker) handleBlockScopedData(ctx context.Context, data *pbsubstreams return fmt.Errorf("unmarshal database changes: %w", err) } + startHandleOperations := time.Now() prevValueFetchDuration, err := s.operationDB.HandleOperations(ctx, data.Clock.Number, data.FinalBlockHeight, cursor.Step, kvOps) if err != nil { return fmt.Errorf("handling operation: %w", err) } + s.stats.RecordHandleOperationsDuration(time.Since(startHandleOperations)) s.stats.RecordDurationFetchPrevValueFetch(prevValueFetchDuration) BlockCount.Inc() diff --git a/sinker/stats.go b/sinker/stats.go index b86b32c..7097ad5 100644 --- a/sinker/stats.go +++ b/sinker/stats.go @@ -22,6 +22,7 @@ type Stats struct { blockScopedDataProcessDuration *dmetrics.AvgDurationCounter durationBetweenBlock *dmetrics.AvgDurationCounter fetchPrevValuesDuration *dmetrics.AvgDurationCounter + handleOperationsDuration *dmetrics.AvgDurationCounter finalBlockHeight uint64 } @@ -36,6 +37,7 @@ func NewStats(logger *zap.Logger) *Stats { fetchPrevValuesDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "fetch prev values duration"), blockScopedDataProcessDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "process duration"), durationBetweenBlock: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "duration between block"), + handleOperationsDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "handle operations duration"), lastBlock: unsetBlockRef{}, logger: logger, } @@ -57,6 +59,9 @@ func (s *Stats) RecordProcessDuration(duration time.Duration) { func (s *Stats) RecordDuractionBetweenBlock(duration time.Duration) { s.durationBetweenBlock.AddDuration(duration) } +func (s *Stats) RecordHandleOperationsDuration(duration time.Duration) { + s.handleOperationsDuration.AddDuration(duration) +} func (s *Stats) RecordDurationFetchPrevValueFetch(duration time.Duration) { s.fetchPrevValuesDuration.AddDuration(duration) @@ -95,6 +100,7 @@ func (s *Stats) LogNow() { zap.String("process_duration", s.blockScopedDataProcessDuration.String()), zap.String("duration_between_block", s.durationBetweenBlock.String()), zap.String("fetch_prev_values_duration", s.fetchPrevValuesDuration.String()), + zap.String("handle_operations_duration", s.handleOperationsDuration.String()), zap.Stringer("block_rate", s.blockRate), zap.Uint64("flushed_entries", s.flushedEntries.ValueUint()), zap.Stringer("last_block", s.lastBlock),