Skip to content

Commit

Permalink
Pass nonces to report selection.
Browse files Browse the repository at this point in the history
  • Loading branch information
winder committed Aug 19, 2024
1 parent c521e9d commit 6f5b5d6
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 46 deletions.
57 changes: 30 additions & 27 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func selectReport(
encoder cciptypes.ExecutePluginCodec,
tokenDataReader exectypes.TokenDataReader,
estimateProvider gas.EstimateProvider,
nonces map[cciptypes.ChainSelector]map[string]uint64,
commitReports []exectypes.CommitData,
maxReportSizeBytes int,
maxGas uint64,
Expand All @@ -332,6 +333,7 @@ func selectReport(
tokenDataReader,
encoder,
estimateProvider,
nonces,
uint64(maxReportSizeBytes),
maxGas)
var stillPendingReports []exectypes.CommitData
Expand Down Expand Up @@ -419,29 +421,27 @@ func (p *Plugin) Outcome(
mergedMessageObservations,
nil)

//////////////////////////
// common preprocessing //
//////////////////////////

// flatten commit reports and sort by timestamp.
var commitReports []exectypes.CommitData
for _, report := range observation.CommitReports {
commitReports = append(commitReports, report...)
}
sort.Slice(commitReports, func(i, j int) bool {
return commitReports[i].Timestamp.Before(commitReports[j].Timestamp)
})

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: commit reports", p.reportingCfg.OracleID),
"commitReports", commitReports)

state := previousOutcome.State.Next()
switch state {
case exectypes.GetCommitReports:
// flatten commit reports and sort by timestamp.
var commitReports []exectypes.CommitData
for _, report := range observation.CommitReports {
commitReports = append(commitReports, report...)
}
sort.Slice(commitReports, func(i, j int) bool {
return commitReports[i].Timestamp.Before(commitReports[j].Timestamp)
})

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: commit reports", p.reportingCfg.OracleID),
"commitReports", commitReports)

outcome := exectypes.NewOutcome(state, commitReports, cciptypes.ExecutePluginReport{})
return outcome.Encode()
case exectypes.GetMessages:
commitReports := previousOutcome.PendingCommitReports

// add messages to their commitReports.
for i, report := range commitReports {
report.Messages = nil
Expand All @@ -464,17 +464,20 @@ func (p *Plugin) Outcome(

return outcome.Encode()
case exectypes.Filter:
commitReports := previousOutcome.PendingCommitReports

// TODO: this function should be pure, a context should not be needed.
outcomeReports, commitReports, err :=
selectReport(
context.Background(),
p.lggr, p.msgHasher,
p.reportCodec,
p.tokenDataReader,
p.estimateProvider,
previousOutcome.PendingCommitReports,
maxReportSizeBytes,
p.cfg.OffchainConfig.BatchGasLimit)
outcomeReports, commitReports, err := selectReport(
context.Background(),
p.lggr,
p.msgHasher,
p.reportCodec,
p.tokenDataReader,
p.estimateProvider,
observation.Nonces,
previousOutcome.PendingCommitReports,
maxReportSizeBytes,
p.cfg.OffchainConfig.BatchGasLimit)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to extract proofs: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions execute/report/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewBuilder(
tokenDataReader exectypes.TokenDataReader,
encoder cciptypes.ExecutePluginCodec,
estimateProvider gas.EstimateProvider,
nonces map[cciptypes.ChainSelector]map[string]uint64,
maxReportSizeBytes uint64,
maxGas uint64,
) ExecReportBuilder {
Expand All @@ -37,6 +38,8 @@ func NewBuilder(
encoder: encoder,
hasher: hasher,
estimateProvider: estimateProvider,
sendersNonce: nonces,
expectedNonce: make(map[cciptypes.ChainSelector]map[string]uint64),

maxReportSizeBytes: maxReportSizeBytes,
maxGas: maxGas,
Expand Down Expand Up @@ -65,13 +68,16 @@ type execReportBuilder struct {
encoder cciptypes.ExecutePluginCodec
hasher cciptypes.MessageHasher
estimateProvider gas.EstimateProvider
sendersNonce map[cciptypes.ChainSelector]map[string]uint64

// Config
maxReportSizeBytes uint64
maxGas uint64

// State
accumulated validationMetadata
// expectedNonce is used to track nonces for multiple messages from the same sender.
expectedNonce map[cciptypes.ChainSelector]map[string]uint64

// Result
execReports []cciptypes.ExecutePluginReportSingleChain
Expand Down
68 changes: 60 additions & 8 deletions execute/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func buildSingleChainReportHelper(

numMsg := len(report.Messages)
if len(report.TokenData) != numMsg {
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("token data length mismatch: got %d, expected %d", len(report.TokenData), numMsg)
lggr.Infow("no messages ready for execution",
"sourceChain", report.SourceChain)
return cciptypes.ExecutePluginReportSingleChain{}, nil
}

lggr.Infow(
Expand Down Expand Up @@ -124,12 +125,13 @@ const (
TokenDataNotReady messageStatus = "token_data_not_ready" //nolint:gosec // this is not a password
TokenDataFetchError messageStatus = "token_data_fetch_error"
InsufficientRemainingBatchGas messageStatus = "insufficient_remaining_batch_gas"
MissingNoncesForChain messageStatus = "missing_nonces_for_chain"
MissingNonce messageStatus = "missing_nonce"
InvalidNonce messageStatus = "invalid_nonce"
/*
SenderAlreadySkipped messageStatus = "sender_already_skipped"
MessageMaxGasCalcError messageStatus = "message_max_gas_calc_error"
InsufficientRemainingBatchDataLength messageStatus = "insufficient_remaining_batch_data_length"
MissingNonce messageStatus = "missing_nonce"
InvalidNonce messageStatus = "invalid_nonce"
AggregateTokenValueComputeError messageStatus = "aggregate_token_value_compute_error"
AggregateTokenLimitExceeded messageStatus = "aggregate_token_limit_exceeded"
TokenNotInDestTokenPrices messageStatus = "token_not_in_dest_token_prices"
Expand Down Expand Up @@ -157,7 +159,7 @@ func (b *execReportBuilder) checkMessage(

msg := execReport.Messages[idx]

// Check if the message has already been executed.
// 1. Check if the message has already been executed.
if slices.Contains(execReport.ExecutedMessages, msg.Header.SequenceNumber) {
b.lggr.Infow(
"message already executed",
Expand All @@ -167,7 +169,7 @@ func (b *execReportBuilder) checkMessage(
return execReport, AlreadyExecuted, nil
}

// Check if token data is ready.
// 2. Check if token data is ready.
if b.tokenDataReader == nil {
return execReport, Unknown, fmt.Errorf("token data reader must be initialized")
}
Expand All @@ -182,7 +184,7 @@ func (b *execReportBuilder) checkMessage(
"error", err)
return execReport, TokenDataNotReady, nil
}
b.lggr.Infow(
b.lggr.Errorw(
"unable to read token data - unknown error",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
Expand All @@ -200,7 +202,53 @@ func (b *execReportBuilder) checkMessage(
"seqNum", msg.Header.SequenceNumber,
"data", tokenData)

// TODO: Check for valid nonce
// 3. Check if the message has a valid nonce.
if _, ok := b.sendersNonce[execReport.SourceChain]; !ok {
b.lggr.Errorw("Skipping message - nonces not available for chain",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
"seqNum", msg.Header.SequenceNumber,
)
return execReport, MissingNoncesForChain, nil
}

chainNonces := b.sendersNonce[execReport.SourceChain]
sender := msg.Sender.String()
if _, ok := chainNonces[sender]; !ok {
b.lggr.Errorw("Skipping message - missing nonce",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
"seqNum", msg.Header.SequenceNumber,
)
return execReport, MissingNonce, nil
}

if b.expectedNonce == nil {
// initialize expected nonce if needed.
b.expectedNonce = make(map[cciptypes.ChainSelector]map[string]uint64)
}
if _, ok := b.expectedNonce[execReport.SourceChain]; !ok {
// initialize expected nonce if needed.
b.expectedNonce[execReport.SourceChain] = make(map[string]uint64)
}
if _, ok := b.expectedNonce[execReport.SourceChain][sender]; !ok {
b.expectedNonce[execReport.SourceChain][sender] = chainNonces[sender] + 1
}

// Check expected nonce is valid for sequenced messages.
// Sequenced messages have non-zero nonces.
if msg.Header.Nonce > 0 && msg.Header.Nonce != b.expectedNonce[execReport.SourceChain][sender] {
b.lggr.Warnw("Skipping message - invalid nonce",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"have", msg.Header.Nonce,
"want", b.expectedNonce[execReport.SourceChain][sender],
)
return execReport, InvalidNonce, nil
}
b.expectedNonce[execReport.SourceChain][sender] = b.expectedNonce[execReport.SourceChain][sender] + 1

// TODO: Check for fee boost

return execReport, ReadyToExecute, nil
Expand Down Expand Up @@ -284,6 +332,10 @@ func (b *execReportBuilder) buildSingleChainReport(
}
}

if len(readyMessages) == 0 {
return cciptypes.ExecutePluginReportSingleChain{}, report, ErrEmptyReport
}

// Attempt to include all messages in the report.
finalReport, err :=
buildSingleChainReportHelper(b.ctx, b.lggr, b.hasher, report, readyMessages)
Expand Down
Loading

0 comments on commit 6f5b5d6

Please sign in to comment.