Skip to content

Commit

Permalink
CCIP-1320 Upper bound limit for the GetSendRequestsGteSeqNum (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara authored Nov 21, 2023
1 parent a2e508a commit bb2e5e3
Show file tree
Hide file tree
Showing 11 changed files with 24 additions and 88 deletions.
5 changes: 4 additions & 1 deletion core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const (
// and restart from the chain's minSeqNum. Want to set it high to allow for large throughput,
// but low enough to minimize wasted revert cost.
MaxInflightSeqNumGap = 500
// OnRampMessagesScanLimit is used to limit number of onramp messages scanned in each Observation.
// Single CommitRoot can contain up to merklemulti.MaxNumberTreeLeaves, so we scan twice that to be safe and still don't hurt DB performance.
OnRampMessagesScanLimit = merklemulti.MaxNumberTreeLeaves * 2
)

var (
Expand Down Expand Up @@ -236,7 +239,7 @@ func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Conte
return 0, 0, err
}

msgRequests, err := r.onRampReader.GetSendRequestsGteSeqNum(ctx, nextInflightMin)
msgRequests, err := r.onRampReader.GetSendRequestsBetweenSeqNums(ctx, nextInflightMin, nextInflightMin+OnRampMessagesScanLimit)
if err != nil {
return 0, 0, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCommitReportingPlugin_Observation(t *testing.T) {

onRampReader := ccipdatamocks.NewOnRampReader(t)
if len(tc.sendReqs) > 0 {
onRampReader.On("GetSendRequestsGteSeqNum", ctx, tc.commitStoreSeqNum).
onRampReader.On("GetSendRequestsBetweenSeqNums", ctx, tc.commitStoreSeqNum, tc.commitStoreSeqNum+OnRampMessagesScanLimit).
Return(tc.sendReqs, nil)
}

Expand Down Expand Up @@ -1319,7 +1319,7 @@ func TestCommitReportingPlugin_calculateMinMaxSequenceNumbers(t *testing.T) {
},
})
}
onRampReader.On("GetSendRequestsGteSeqNum", ctx, tc.expQueryMin).Return(sendReqs, nil)
onRampReader.On("GetSendRequestsBetweenSeqNums", ctx, tc.expQueryMin, tc.expQueryMin+OnRampMessagesScanLimit).Return(sendReqs, nil)
p.onRampReader = onRampReader

minSeqNum, maxSeqNum, err := p.calculateMinMaxSequenceNumbers(ctx, lggr)
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (r *ExecutionReportingPlugin) buildBatch(
} else {
// Nothing inflight take from chain.
// Chain holds existing nonce.
nonce, err := r.config.offRampReader.GetSenderNonce(nil, msg.Sender)
nonce, err := r.config.offRampReader.GetSenderNonce(ctx, msg.Sender)
if err != nil {
lggr.Errorw("unable to get sender nonce", "err", err, "seqNr", msg.SequenceNumber)
continue
Expand Down Expand Up @@ -998,7 +998,7 @@ func (r *ExecutionReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Conte
lggr = lggr.With("messageIDs", contractutil.GetMessageIDsAsHexString(execReport.Messages))

// If the first message is executed already, this execution report is stale, and we do not accept it.
stale, err := r.isStaleReport(execReport.Messages)
stale, err := r.isStaleReport(ctx, execReport.Messages)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -1026,7 +1026,7 @@ func (r *ExecutionReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Cont
// If report is not stale we transmit.
// When the executeTransmitter enqueues the tx for tx manager,
// we mark it as execution_sent, removing it from the set of inflight messages.
stale, err := r.isStaleReport(execReport.Messages)
stale, err := r.isStaleReport(ctx, execReport.Messages)
if err != nil {
return false, err
}
Expand All @@ -1039,15 +1039,15 @@ func (r *ExecutionReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Cont
return true, err
}

func (r *ExecutionReportingPlugin) isStaleReport(messages []internal.EVM2EVMMessage) (bool, error) {
func (r *ExecutionReportingPlugin) isStaleReport(ctx context.Context, messages []internal.EVM2EVMMessage) (bool, error) {
if len(messages) == 0 {
return true, fmt.Errorf("messages are empty")
}

// If the first message is executed already, this execution report is stale.
// Note the default execution state, including for arbitrary seq number not yet committed
// is ExecutionStateUntouched.
msgState, err := r.config.offRampReader.GetExecutionState(nil, messages[0].SequenceNumber)
msgState, err := r.config.offRampReader.GetExecutionState(ctx, messages[0].SequenceNumber)
if err != nil {
return true, err
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type OnRampDynamicConfig struct {
//go:generate mockery --quiet --name OnRampReader --filename onramp_reader_mock.go --case=underscore
type OnRampReader interface {
Closer
// GetSendRequestsGteSeqNum returns all the finalized message send requests with sequence number greater than or equal to the provided.
GetSendRequestsGteSeqNum(ctx context.Context, seqNum uint64) ([]Event[internal.EVM2EVMMessage], error)
// GetSendRequestsBetweenSeqNums returns all the finalized message send requests in the provided sequence numbers range (inclusive).
GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64) ([]Event[internal.EVM2EVMMessage], error)
// Get router configured in the onRamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,7 @@ func testOnRampReader(t *testing.T, th onRampReaderTH, expectedRouterAddress com
require.NoError(t, err)
require.Equal(t, expectedRouterAddress, res)

msg, err := th.reader.GetSendRequestsGteSeqNum(ctx, 0)
require.NoError(t, err)
require.NotNil(t, msg)
require.Equal(t, []ccipdata.Event[internal.EVM2EVMMessage]{}, msg)

msg, err = th.reader.GetSendRequestsBetweenSeqNums(ctx, 0, 10)
msg, err := th.reader.GetSendRequestsBetweenSeqNums(ctx, 0, 10)
require.NoError(t, err)
require.NotNil(t, msg)
require.Equal(t, []ccipdata.Event[internal.EVM2EVMMessage]{}, msg)
Expand Down
27 changes: 6 additions & 21 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,17 @@ func (o *OnRampV1_0_0) logToMessage(log types.Log) (*internal.EVM2EVMMessage, er
}, nil
}

func (o *OnRampV1_0_0) GetSendRequestsGteSeqNum(ctx context.Context, seqNum uint64) ([]Event[internal.EVM2EVMMessage], error) {
logs, err := o.lp.LogsDataWordGreaterThan(
func (o *OnRampV1_0_0) GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64) ([]Event[internal.EVM2EVMMessage], error) {
logs, err := o.lp.LogsDataWordRange(
o.sendRequestedEventSig,
o.address,
o.sendRequestedSeqNumberWord,
abihelpers.EvmWord(seqNum),
logpoller.EvmWord(seqNumMin),
logpoller.EvmWord(seqNumMax),
logpoller.Finalized,
pg.WithParentCtx(ctx),
)
pg.WithParentCtx(ctx))
if err != nil {
return nil, fmt.Errorf("logs data word greater than: %w", err)
return nil, err
}
return parseLogs[internal.EVM2EVMMessage](logs, o.lggr, o.logToMessage)
}
Expand All @@ -225,21 +225,6 @@ func (o *OnRampV1_0_0) RouterAddress() (common.Address, error) {
return config.Router, nil
}

func (o *OnRampV1_0_0) GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64) ([]Event[internal.EVM2EVMMessage], error) {
logs, err := o.lp.LogsDataWordRange(
o.sendRequestedEventSig,
o.address,
o.sendRequestedSeqNumberWord,
logpoller.EvmWord(seqNumMin),
logpoller.EvmWord(seqNumMax),
logpoller.Finalized,
pg.WithParentCtx(ctx))
if err != nil {
return nil, err
}
return parseLogs[internal.EVM2EVMMessage](logs, o.lggr, o.logToMessage)
}

func (o *OnRampV1_0_0) Close(qopts ...pg.QOpt) error {
return o.lp.UnregisterFilter(o.filterName, qopts...)
}
15 changes: 0 additions & 15 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_2_0.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,21 +269,6 @@ func (o *OnRampV1_2_0) logToMessage(log types.Log) (*internal.EVM2EVMMessage, er
}, nil
}

func (o *OnRampV1_2_0) GetSendRequestsGteSeqNum(ctx context.Context, seqNum uint64) ([]Event[internal.EVM2EVMMessage], error) {
logs, err := o.lp.LogsDataWordGreaterThan(
o.sendRequestedEventSig,
o.address,
o.sendRequestedSeqNumberWord,
abihelpers.EvmWord(seqNum),
logpoller.Finalized,
pg.WithParentCtx(ctx),
)
if err != nil {
return nil, fmt.Errorf("logs data word greater than: %w", err)
}
return parseLogs[internal.EVM2EVMMessage](logs, o.lggr, o.logToMessage)
}

func (o *OnRampV1_2_0) GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64) ([]Event[internal.EVM2EVMMessage], error) {
logs, err := o.lp.LogsDataWordRange(
o.sendRequestedEventSig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,27 @@ func TestHasherV1_2_0(t *testing.T) {
require.Equal(t, "4362a13a42e52ff5ce4324e7184dc7aa41704c3146bc842d35d95b94b32a78b6", hex.EncodeToString(hash[:]))
}

func TestLogPollerClient_GetSendRequestsGteSeqNum(t *testing.T) {
func TestLogPollerClient_GetSendRequestsBetweenSeqNums(t *testing.T) {
onRampAddr := utils.RandomAddress()
seqNum := uint64(100)
limit := uint64(10)
lggr := logger.TestLogger(t)

lp := mocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
onRampV2, err := NewOnRampV1_2_0(lggr, 1, 1, onRampAddr, lp, nil)
require.NoError(t, err)
lp.On("LogsDataWordGreaterThan",
lp.On("LogsDataWordRange",
onRampV2.sendRequestedEventSig,
onRampAddr,
onRampV2.sendRequestedSeqNumberWord,
abihelpers.EvmWord(seqNum),
abihelpers.EvmWord(seqNum+limit),
logpoller.Finalized,
mock.Anything,
).Return([]logpoller.Log{}, nil)

events, err := onRampV2.GetSendRequestsGteSeqNum(context.Background(), seqNum)
events, err := onRampV2.GetSendRequestsBetweenSeqNums(context.Background(), seqNum, seqNum+limit)
assert.NoError(t, err)
assert.Empty(t, events)
lp.AssertExpectations(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (o *ObservedOffRampReader) DecodeExecutionReport(report []byte) (ccipdata.E
})
}

func (o *ObservedOffRampReader) withObservedInteractionAndResults(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]ccipdata.Event[ccipdata.ExecutionStateChanged], error) {
func (o *ObservedOffRampReader) GetExecutionStateChangesBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64, confs int) ([]ccipdata.Event[ccipdata.ExecutionStateChanged], error) {
return withObservedInteraction(o.metric, "GetExecutionStateChangesBetweenSeqNums", func() ([]ccipdata.Event[ccipdata.ExecutionStateChanged], error) {
return o.OffRampReader.GetExecutionStateChangesBetweenSeqNums(ctx, seqNumMin, seqNumMax, confs)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ func NewObservedOnRampReader(origin ccipdata.OnRampReader, chainID int64, plugin
}
}

func (o ObservedOnRampReader) GetSendRequestsGteSeqNum(ctx context.Context, seqNum uint64) ([]ccipdata.Event[internal.EVM2EVMMessage], error) {
return withObservedInteractionAndResults(o.metric, "GetSendRequestsGteSeqNum", func() ([]ccipdata.Event[internal.EVM2EVMMessage], error) {
return o.OnRampReader.GetSendRequestsGteSeqNum(ctx, seqNum)
})
}

func (o ObservedOnRampReader) GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64) ([]ccipdata.Event[internal.EVM2EVMMessage], error) {
return withObservedInteractionAndResults(o.metric, "GetSendRequestsBetweenSeqNums", func() ([]ccipdata.Event[internal.EVM2EVMMessage], error) {
return o.OnRampReader.GetSendRequestsBetweenSeqNums(ctx, seqNumMin, seqNumMax)
Expand Down

0 comments on commit bb2e5e3

Please sign in to comment.