Skip to content

Commit

Permalink
Scanning commit roots in steps to limit number of logs fetched to mem…
Browse files Browse the repository at this point in the history
…ory.
  • Loading branch information
mateusz-sekara committed Nov 14, 2023
1 parent 9488826 commit 05e1bcd
Showing 1 changed file with 143 additions and 113 deletions.
256 changes: 143 additions & 113 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const (
MaxExecutionReportLength = 250_000

// MaxDataLenPerBatch limits the total length of msg data that can be in a batch.
MaxDataLenPerBatch = 60_000
MaxDataLenPerBatch = 60_000
MessagesIterationStep = 800
)

var (
Expand Down Expand Up @@ -211,143 +212,157 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
}

func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ObservedMessage, error) {
unexpiredReports, err := getUnexpiredCommitReports(
allUnexpiredReports, err := getUnexpiredCommitReports(
ctx,
r.config.commitStoreReader,
r.onchainConfig.PermissionLessExecutionThresholdSeconds,
)
if err != nil {
return nil, err
}
lggr.Infow("Unexpired roots", "n", len(unexpiredReports))
if len(unexpiredReports) == 0 {
lggr.Infow("Unexpired roots", "n", len(allUnexpiredReports))
if len(allUnexpiredReports) == 0 {
return []ObservedMessage{}, nil
}

// This could result in slightly different values on each call as
// the function returns the allowed amount at the time of the last block.
// Since this will only increase over time, the highest observed value will
// always be the lower bound of what would be available on chain
// since we already account for inflight txs.
getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) {
return r.config.offRampReader.CurrentRateLimiterState(ctx)
})
sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx)
if err != nil {
return nil, err
}
getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx)
if err1 != nil {
return nil, err1
}
return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken})
})
getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
dstTokens, err1 := r.cachedDestTokens.Get(ctx)
if err1 != nil {
return nil, err1
allUnexpiredNotSnoozedReports := func() []ccipdata.CommitStoreReport {
index := 0
for _, rep := range allUnexpiredReports {
if r.snoozedRoots.IsSnoozed(rep.MerkleRoot) {
lggr.Debug("Skipping snoozed root", "minSeqNr", rep.Interval.Min, "maxSeqNr", rep.Interval.Max)
} else {
allUnexpiredReports[index] = rep
index++
}
}
return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative))
})
getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) {
return r.gasPriceEstimator.GetGasPrice(ctx)
})

lggr.Infow("Processing unexpired reports", "n", len(unexpiredReports))
measureNumberOfReportsProcessed(timestamp, len(unexpiredReports))
reportIterationStart := time.Now()
defer func() {
measureReportsIterationDuration(timestamp, time.Since(reportIterationStart))
return allUnexpiredReports[:index]
}()

unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReports)
if err != nil {
return nil, err
}

getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens)
})

for _, rep := range unexpiredReportsWithSendReqs {
if ctx.Err() != nil {
lggr.Warn("Processing of roots killed by context")
break
lggr.Infow("Unexpired roots", "all", len(allUnexpiredReports), "notSnoozed", len(allUnexpiredNotSnoozedReports))

for j := 0; j < len(allUnexpiredReports); {
unexpiredReports, step := selectReportsToFillBatch(allUnexpiredNotSnoozedReports[j:], MessagesIterationStep)
j += step

// This could result in slightly different values on each call as
// the function returns the allowed amount at the time of the last block.
// Since this will only increase over time, the highest observed value will
// always be the lower bound of what would be available on chain
// since we already account for inflight txs.
getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) {
return r.config.offRampReader.CurrentRateLimiterState(ctx)
})
sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx)
if err != nil {
return nil, err
}
getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx)
if err1 != nil {
return nil, err1
}
return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken})
})
getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
dstTokens, err1 := r.cachedDestTokens.Get(ctx)
if err1 != nil {
return nil, err1
}
return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative))
})
getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) {
return r.gasPriceEstimator.GetGasPrice(ctx)
})

measureNumberOfReportsProcessed(timestamp, len(unexpiredReports))
reportIterationStart := time.Now()
defer func() {
measureReportsIterationDuration(timestamp, time.Since(reportIterationStart))
}()

unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReports)
if err != nil {
return nil, err
}

merkleRoot := rep.commitReport.MerkleRoot
getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens)
})

rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]),
"minSeqNr", rep.commitReport.Interval.Min,
"maxSeqNr", rep.commitReport.Interval.Max,
)
for _, rep := range unexpiredReportsWithSendReqs {
if ctx.Err() != nil {
lggr.Warn("Processing of roots killed by context")
break
}

if r.snoozedRoots.IsSnoozed(merkleRoot) {
rootLggr.Debug("Skipping snoozed root")
continue
}
merkleRoot := rep.commitReport.MerkleRoot

if err := rep.validate(); err != nil {
rootLggr.Errorw("Skipping invalid report", "err", err)
continue
}
rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]),
"minSeqNr", rep.commitReport.Interval.Min,
"maxSeqNr", rep.commitReport.Interval.Max,
)

// If all messages are already executed and finalized, snooze the root for
// config.PermissionLessExecutionThresholdSeconds so it will never be considered again.
if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized {
rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:]))
r.snoozedRoots.MarkAsExecuted(merkleRoot)
incSkippedRequests(reasonAllExecuted)
continue
}
if err := rep.validate(); err != nil {
rootLggr.Errorw("Skipping invalid report", "err", err)
continue
}

blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot)
if err != nil {
return nil, err
}
if !blessed {
rootLggr.Infow("Report is accepted but not blessed")
incSkippedRequests(reasonNotBlessed)
continue
}
// If all messages are already executed and finalized, snooze the root for
// config.PermissionLessExecutionThresholdSeconds so it will never be considered again.
if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized {
rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:]))
r.snoozedRoots.MarkAsExecuted(merkleRoot)
incSkippedRequests(reasonAllExecuted)
continue
}

allowedTokenAmountValue, err := getAllowedTokenAmount()
if err != nil {
return nil, err
}
sourceTokensPricesValue, err := getSourceTokensPrices()
if err != nil {
return nil, fmt.Errorf("get source token prices: %w", err)
}
blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot)
if err != nil {
return nil, err
}
if !blessed {
rootLggr.Infow("Report is accepted but not blessed")
incSkippedRequests(reasonNotBlessed)
continue
}

destTokensPricesValue, err := getDestTokensPrices()
if err != nil {
return nil, fmt.Errorf("get dest token prices: %w", err)
}
allowedTokenAmountValue, err := getAllowedTokenAmount()
if err != nil {
return nil, err
}
sourceTokensPricesValue, err := getSourceTokensPrices()
if err != nil {
return nil, fmt.Errorf("get source token prices: %w", err)
}

destPoolRateLimits, err := getDestPoolRateLimits()
if err != nil {
return nil, fmt.Errorf("get dest pool rate limits: %w", err)
}
destTokensPricesValue, err := getDestTokensPrices()
if err != nil {
return nil, fmt.Errorf("get dest token prices: %w", err)
}

buildBatchDuration := time.Now()
batch := r.buildBatch(
ctx,
rootLggr,
rep,
inflight,
allowedTokenAmountValue.Tokens,
sourceTokensPricesValue,
destTokensPricesValue,
getDestGasPrice,
sourceToDestTokens,
destPoolRateLimits)
measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration))
if len(batch) != 0 {
return batch, nil
destPoolRateLimits, err := getDestPoolRateLimits()
if err != nil {
return nil, fmt.Errorf("get dest pool rate limits: %w", err)
}

buildBatchDuration := time.Now()
batch := r.buildBatch(
ctx,
rootLggr,
rep,
inflight,
allowedTokenAmountValue.Tokens,
sourceTokensPricesValue,
destTokensPricesValue,
getDestGasPrice,
sourceToDestTokens,
destPoolRateLimits)
measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration))
if len(batch) != 0 {
return batch, nil
}
r.snoozedRoots.Snooze(merkleRoot)
}
r.snoozedRoots.Snooze(merkleRoot)
}
return []ObservedMessage{}, nil
}
Expand Down Expand Up @@ -1162,3 +1177,18 @@ func getUnexpiredCommitReports(
}
return reports, nil
}

func selectReportsToFillBatch(unexpiredReports []ccipdata.CommitStoreReport, messagesLimit int) ([]ccipdata.CommitStoreReport, int) {
currentNumberOfMessages := 0
var index int

for index = 0; index < len(unexpiredReports); index++ {
currentNumberOfMessages += int(unexpiredReports[index].Interval.Max - unexpiredReports[index].Interval.Min + 1)
if currentNumberOfMessages >= messagesLimit {
break
}
}

index = min(index, len(unexpiredReports)-1)
return unexpiredReports[:index], index
}

0 comments on commit 05e1bcd

Please sign in to comment.