Skip to content

Commit

Permalink
CCIP-1716 Adding retention to filters used by LogPoller (#530)
Browse files Browse the repository at this point in the history
## Motivation

The goal of this PR is to reduce the number of logs and blocks we keep
in the database by utilizing a built-in retention mechanism in
LogPoller.
Requires paging for smooth deployment
smartcontractkit/chainlink#12060

## Solution

This PR enables retention for all the LogPoller's filters registered by
CCIP. Additionally, to avoid pushing too much pressure during deletion
(especially the first run will have a lot of logs to remove) we've
updated `LogPrunePageSize` to 10k. Please see the original PR in the
chainlink repo to learn more about paging and its impact on the
database. `LogPrunePageSize` is altered in the `fallback.toml` to avoid
the necessity of setting this value for every chain that CCIP is
deployed on.
  • Loading branch information
mateusz-sekara authored Mar 20, 2024
1 parent 73b92d7 commit a88c392
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 4 deletions.
19 changes: 19 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ccipdata

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/core/types"

Expand All @@ -18,6 +19,24 @@ const (
V1_5_0 = "1.5.0-dev"
)

const (
// CommitExecLogsRetention defines the duration for which logs critical for Commit/Exec plugins processing are retained.
// Although Exec relies on permissionlessExecThreshold which is lower than 24hours for picking eligible CommitRoots,
// Commit still can reach to older logs because it filters them by sequence numbers. For instance, in case of RMN curse on chain,
// we might have logs waiting in OnRamp to be committed first. When outage takes days we still would
// be able to bring back processing without replaying any logs from chain. You can read that param as
// "how long CCIP can be down and still be able to process all the messages after getting back to life".
// Breaching this threshold would require replaying chain using LogPoller from the beginning of the outage.
CommitExecLogsRetention = 30 * 24 * time.Hour // 30 days
// CacheEvictionLogsRetention defines the duration for which logs used for caching on-chain data are kept.
// Restarting node clears the cache entirely and rebuilds it from scratch by fetching data from chain,
// so we don't need to keep these logs for very long. All events relying on cache.NewLogpollerEventsBased should use this retention.
CacheEvictionLogsRetention = 7 * 24 * time.Hour // 7 days
// PriceUpdatesLogsRetention defines the duration for which logs with price updates are kept.
// These logs are emitted whenever the token price or gas price is updated and Commit scans very small time windows (e.g. 2 hours)
PriceUpdatesLogsRetention = 1 * 24 * time.Hour // 1 day
)

type Event[T any] struct {
Data T
cciptypes.TxMeta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func NewUSDCReader(lggr logger.Logger, jobID string, transmitter common.Address,
Name: logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, jobID, transmitter.Hex()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{transmitter},
Retention: CommitExecLogsRetention,
},
transmitterAddress: transmitter,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func NewCommitStore(lggr logger.Logger, addr common.Address, ec client.Client, l
Name: logpoller.FilterName(EXEC_REPORT_ACCEPTS, addr.String()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{addr},
Retention: ccipdata.CommitExecLogsRetention,
},
}
return &CommitStore{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,6 @@ func (o *OffRamp) DecodeExecutionReport(report []byte) (cciptypes.ExecReport, er
return DecodeExecReport(o.ExecutionReportArgs, report)
}

func (o *OffRamp) TokenEvents() []common.Hash {
return offRamp_poolAddedPoolRemovedEvents
}

func (o *OffRamp) RegisterFilters(qopts ...pg.QOpt) error {
return logpollerutil.RegisterLpFilters(o.lp, o.filters, qopts...)
}
Expand All @@ -624,16 +620,19 @@ func NewOffRamp(lggr logger.Logger, addr common.Address, ec client.Client, lp lo
Name: logpoller.FilterName(EXEC_EXECUTION_STATE_CHANGES, addr.String()),
EventSigs: []common.Hash{ExecutionStateChangedEvent},
Addresses: []common.Address{addr},
Retention: ccipdata.CommitExecLogsRetention,
},
{
Name: logpoller.FilterName(EXEC_TOKEN_POOL_ADDED, addr.String()),
EventSigs: []common.Hash{PoolAddedEvent},
Addresses: []common.Address{addr},
Retention: ccipdata.CacheEvictionLogsRetention,
},
{
Name: logpoller.FilterName(EXEC_TOKEN_POOL_REMOVED, addr.String()),
EventSigs: []common.Hash{PoolRemovedEvent},
Addresses: []common.Address{addr},
Retention: ccipdata.CacheEvictionLogsRetention,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ func NewOnRamp(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAd
Name: logpoller.FilterName(ccipdata.COMMIT_CCIP_SENDS, onRampAddress),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CommitExecLogsRetention,
},
{
Name: logpoller.FilterName(ccipdata.CONFIG_CHANGED, onRampAddress),
EventSigs: []common.Hash{configSetEventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CacheEvictionLogsRetention,
},
}
cachedStaticConfig := cache.OnceCtxFunction[evm_2_evm_onramp_1_0_0.EVM2EVMOnRampStaticConfig](func(ctx context.Context) (evm_2_evm_onramp_1_0_0.EVM2EVMOnRampStaticConfig, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func NewPriceRegistry(lggr logger.Logger, priceRegistryAddr common.Address, lp l
Name: logpoller.FilterName(ccipdata.COMMIT_PRICE_UPDATES, priceRegistryAddr.String()),
EventSigs: []common.Hash{UsdPerUnitGasUpdated, usdPerTokenUpdated},
Addresses: []common.Address{priceRegistryAddr},
Retention: ccipdata.PriceUpdatesLogsRetention,
},
{
Name: logpoller.FilterName(ccipdata.FEE_TOKEN_ADDED, priceRegistryAddr.String()),
EventSigs: []common.Hash{feeTokenAdded},
Addresses: []common.Address{priceRegistryAddr},
Retention: ccipdata.CacheEvictionLogsRetention,
},
{
Name: logpoller.FilterName(ccipdata.FEE_TOKEN_REMOVED, priceRegistryAddr.String()),
EventSigs: []common.Hash{feeTokenRemoved},
Addresses: []common.Address{priceRegistryAddr},
Retention: ccipdata.CacheEvictionLogsRetention,
}}
if registerFilters {
err = logpollerutil.RegisterLpFilters(lp, filters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func NewCommitStore(lggr logger.Logger, addr common.Address, ec client.Client, l
Name: logpoller.FilterName(v1_0_0.EXEC_REPORT_ACCEPTS, addr.String()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{addr},
Retention: ccipdata.CommitExecLogsRetention,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ func NewOnRamp(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAd
Name: logpoller.FilterName(ccipdata.COMMIT_CCIP_SENDS, onRampAddress),
EventSigs: []common.Hash{CCIPSendRequestEventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CommitExecLogsRetention,
},
{
Name: logpoller.FilterName(ccipdata.CONFIG_CHANGED, onRampAddress),
EventSigs: []common.Hash{ConfigSetEventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CacheEvictionLogsRetention,
},
}
cachedStaticConfig := cache.OnceCtxFunction[evm_2_evm_onramp_1_2_0.EVM2EVMOnRampStaticConfig](func(ctx context.Context) (evm_2_evm_onramp_1_2_0.EVM2EVMOnRampStaticConfig, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ func NewOnRamp(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAd
Name: logpoller.FilterName(ccipdata.COMMIT_CCIP_SENDS, onRampAddress),
EventSigs: []common.Hash{CCIPSendRequestEventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CommitExecLogsRetention,
},
{
Name: logpoller.FilterName(ccipdata.CONFIG_CHANGED, onRampAddress),
EventSigs: []common.Hash{ConfigSetEventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CacheEvictionLogsRetention,
},
}
cachedStaticConfig := cache.OnceCtxFunction[evm_2_evm_onramp.EVM2EVMOnRampStaticConfig](func(ctx context.Context) (evm_2_evm_onramp.EVM2EVMOnRampStaticConfig, error) {
Expand Down

0 comments on commit a88c392

Please sign in to comment.