From 05e1bcd69307b61759f140d444f658837b7b7117 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Tue, 14 Nov 2023 15:11:03 +0100 Subject: [PATCH] Scanning commit roots in steps to limit number of logs fetched to memory. --- .../ccip/execution_reporting_plugin.go | 256 ++++++++++-------- 1 file changed, 143 insertions(+), 113 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index a90617b4ad0..4deb8bc461d 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -37,7 +37,8 @@ const ( MaxExecutionReportLength = 250_000 // MaxDataLenPerBatch limits the total length of msg data that can be in a batch. - MaxDataLenPerBatch = 60_000 + MaxDataLenPerBatch = 60_000 + MessagesIterationStep = 800 ) var ( @@ -211,7 +212,7 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty } func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ObservedMessage, error) { - unexpiredReports, err := getUnexpiredCommitReports( + allUnexpiredReports, err := getUnexpiredCommitReports( ctx, r.config.commitStoreReader, r.onchainConfig.PermissionLessExecutionThresholdSeconds, @@ -219,135 +220,149 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context if err != nil { return nil, err } - lggr.Infow("Unexpired roots", "n", len(unexpiredReports)) - if len(unexpiredReports) == 0 { + lggr.Infow("Unexpired roots", "n", len(allUnexpiredReports)) + if len(allUnexpiredReports) == 0 { return []ObservedMessage{}, nil } - // This could result in slightly different values on each call as - // the function returns the allowed amount at the time of the last block. - // Since this will only increase over time, the highest observed value will - // always be the lower bound of what would be available on chain - // since we already account for inflight txs. - getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) { - return r.config.offRampReader.CurrentRateLimiterState(ctx) - }) - sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx) - if err != nil { - return nil, err - } - getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { - sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx) - if err1 != nil { - return nil, err1 - } - return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken}) - }) - getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { - dstTokens, err1 := r.cachedDestTokens.Get(ctx) - if err1 != nil { - return nil, err1 + allUnexpiredNotSnoozedReports := func() []ccipdata.CommitStoreReport { + index := 0 + for _, rep := range allUnexpiredReports { + if r.snoozedRoots.IsSnoozed(rep.MerkleRoot) { + lggr.Debug("Skipping snoozed root", "minSeqNr", rep.Interval.Min, "maxSeqNr", rep.Interval.Max) + } else { + allUnexpiredReports[index] = rep + index++ + } } - return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative)) - }) - getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) { - return r.gasPriceEstimator.GetGasPrice(ctx) - }) - - lggr.Infow("Processing unexpired reports", "n", len(unexpiredReports)) - measureNumberOfReportsProcessed(timestamp, len(unexpiredReports)) - reportIterationStart := time.Now() - defer func() { - measureReportsIterationDuration(timestamp, time.Since(reportIterationStart)) + return allUnexpiredReports[:index] }() - unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReports) - if err != nil { - return nil, err - } - - getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { - return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens) - }) - - for _, rep := range unexpiredReportsWithSendReqs { - if ctx.Err() != nil { - lggr.Warn("Processing of roots killed by context") - break + lggr.Infow("Unexpired roots", "all", len(allUnexpiredReports), "notSnoozed", len(allUnexpiredNotSnoozedReports)) + + for j := 0; j < len(allUnexpiredReports); { + unexpiredReports, step := selectReportsToFillBatch(allUnexpiredNotSnoozedReports[j:], MessagesIterationStep) + j += step + + // This could result in slightly different values on each call as + // the function returns the allowed amount at the time of the last block. + // Since this will only increase over time, the highest observed value will + // always be the lower bound of what would be available on chain + // since we already account for inflight txs. + getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) { + return r.config.offRampReader.CurrentRateLimiterState(ctx) + }) + sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx) + if err != nil { + return nil, err + } + getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { + sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx) + if err1 != nil { + return nil, err1 + } + return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken}) + }) + getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { + dstTokens, err1 := r.cachedDestTokens.Get(ctx) + if err1 != nil { + return nil, err1 + } + return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative)) + }) + getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) { + return r.gasPriceEstimator.GetGasPrice(ctx) + }) + + measureNumberOfReportsProcessed(timestamp, len(unexpiredReports)) + reportIterationStart := time.Now() + defer func() { + measureReportsIterationDuration(timestamp, time.Since(reportIterationStart)) + }() + + unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReports) + if err != nil { + return nil, err } - merkleRoot := rep.commitReport.MerkleRoot + getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { + return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens) + }) - rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]), - "minSeqNr", rep.commitReport.Interval.Min, - "maxSeqNr", rep.commitReport.Interval.Max, - ) + for _, rep := range unexpiredReportsWithSendReqs { + if ctx.Err() != nil { + lggr.Warn("Processing of roots killed by context") + break + } - if r.snoozedRoots.IsSnoozed(merkleRoot) { - rootLggr.Debug("Skipping snoozed root") - continue - } + merkleRoot := rep.commitReport.MerkleRoot - if err := rep.validate(); err != nil { - rootLggr.Errorw("Skipping invalid report", "err", err) - continue - } + rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]), + "minSeqNr", rep.commitReport.Interval.Min, + "maxSeqNr", rep.commitReport.Interval.Max, + ) - // If all messages are already executed and finalized, snooze the root for - // config.PermissionLessExecutionThresholdSeconds so it will never be considered again. - if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized { - rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:])) - r.snoozedRoots.MarkAsExecuted(merkleRoot) - incSkippedRequests(reasonAllExecuted) - continue - } + if err := rep.validate(); err != nil { + rootLggr.Errorw("Skipping invalid report", "err", err) + continue + } - blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot) - if err != nil { - return nil, err - } - if !blessed { - rootLggr.Infow("Report is accepted but not blessed") - incSkippedRequests(reasonNotBlessed) - continue - } + // If all messages are already executed and finalized, snooze the root for + // config.PermissionLessExecutionThresholdSeconds so it will never be considered again. + if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized { + rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:])) + r.snoozedRoots.MarkAsExecuted(merkleRoot) + incSkippedRequests(reasonAllExecuted) + continue + } - allowedTokenAmountValue, err := getAllowedTokenAmount() - if err != nil { - return nil, err - } - sourceTokensPricesValue, err := getSourceTokensPrices() - if err != nil { - return nil, fmt.Errorf("get source token prices: %w", err) - } + blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot) + if err != nil { + return nil, err + } + if !blessed { + rootLggr.Infow("Report is accepted but not blessed") + incSkippedRequests(reasonNotBlessed) + continue + } - destTokensPricesValue, err := getDestTokensPrices() - if err != nil { - return nil, fmt.Errorf("get dest token prices: %w", err) - } + allowedTokenAmountValue, err := getAllowedTokenAmount() + if err != nil { + return nil, err + } + sourceTokensPricesValue, err := getSourceTokensPrices() + if err != nil { + return nil, fmt.Errorf("get source token prices: %w", err) + } - destPoolRateLimits, err := getDestPoolRateLimits() - if err != nil { - return nil, fmt.Errorf("get dest pool rate limits: %w", err) - } + destTokensPricesValue, err := getDestTokensPrices() + if err != nil { + return nil, fmt.Errorf("get dest token prices: %w", err) + } - buildBatchDuration := time.Now() - batch := r.buildBatch( - ctx, - rootLggr, - rep, - inflight, - allowedTokenAmountValue.Tokens, - sourceTokensPricesValue, - destTokensPricesValue, - getDestGasPrice, - sourceToDestTokens, - destPoolRateLimits) - measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration)) - if len(batch) != 0 { - return batch, nil + destPoolRateLimits, err := getDestPoolRateLimits() + if err != nil { + return nil, fmt.Errorf("get dest pool rate limits: %w", err) + } + + buildBatchDuration := time.Now() + batch := r.buildBatch( + ctx, + rootLggr, + rep, + inflight, + allowedTokenAmountValue.Tokens, + sourceTokensPricesValue, + destTokensPricesValue, + getDestGasPrice, + sourceToDestTokens, + destPoolRateLimits) + measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration)) + if len(batch) != 0 { + return batch, nil + } + r.snoozedRoots.Snooze(merkleRoot) } - r.snoozedRoots.Snooze(merkleRoot) } return []ObservedMessage{}, nil } @@ -1162,3 +1177,18 @@ func getUnexpiredCommitReports( } return reports, nil } + +func selectReportsToFillBatch(unexpiredReports []ccipdata.CommitStoreReport, messagesLimit int) ([]ccipdata.CommitStoreReport, int) { + currentNumberOfMessages := 0 + var index int + + for index = 0; index < len(unexpiredReports); index++ { + currentNumberOfMessages += int(unexpiredReports[index].Interval.Max - unexpiredReports[index].Interval.Min + 1) + if currentNumberOfMessages >= messagesLimit { + break + } + } + + index = min(index, len(unexpiredReports)-1) + return unexpiredReports[:index], index +}