Skip to content

Commit

Permalink
exec: use checkMessage for isExecuted and preparing token data. (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Aug 5, 2024
1 parent 480934b commit f23843c
Show file tree
Hide file tree
Showing 4 changed files with 506 additions and 135 deletions.
1 change: 1 addition & 0 deletions execute/report/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ package report
import "errors"

var ErrEmptyReport = errors.New("no messages can fit in the report")
var ErrNotReady = errors.New("token data not ready")
162 changes: 100 additions & 62 deletions execute/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,46 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"slices"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/execute/types"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/slicelib"
"github.com/smartcontractkit/chainlink-ccip/plugintypes"
)

// buildSingleChainReportHelper converts the on-chain event data stored in
// cciptypes.ExecutePluginCommitDataWithMessages into the final on-chain report format.
// buildSingleChainReportHelper converts the on-chain event data stored in cciptypes.ExecutePluginCommitData into the
// final on-chain report format. Messages in the report are selected based on the readyMessages argument. If
// readyMessages is empty all messages will be included. This allows the caller to create smaller reports if needed.
//
// The hasher and encoding codec are provided as arguments to allow for chain-specific formats to be used.
// Before calling this function all messages should have been checked and processed by the checkMessage function.
//
// The messages argument indicates which messages should be included in the report. If messages is empty
// all messages will be included. This allows the caller to create smaller reports if needed. Executed messages
// are skipped automatically.
// The hasher and encoding codec are provided as arguments to allow for chain-specific formats to be used.
func buildSingleChainReportHelper(
ctx context.Context,
lggr logger.Logger,
hasher cciptypes.MessageHasher,
tokenDataReader types.TokenDataReader,
report plugintypes.ExecutePluginCommitData,
messages map[int]struct{},
readyMessages map[int]struct{},
) (cciptypes.ExecutePluginReportSingleChain, error) {
if len(messages) == 0 {
if messages == nil {
messages = make(map[int]struct{})
if len(readyMessages) == 0 {
if readyMessages == nil {
readyMessages = make(map[int]struct{})
}
for i := 0; i < len(report.Messages); i++ {
messages[i] = struct{}{}
readyMessages[i] = struct{}{}
}
}

numMsg := len(report.Messages)
if len(report.TokenData) != numMsg {
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("token data length mismatch: got %d, expected %d", len(report.TokenData), numMsg)
}

lggr.Infow(
"constructing merkle tree",
"sourceChain", report.SourceChain,
Expand Down Expand Up @@ -71,36 +74,9 @@ func buildSingleChainReportHelper(
var toExecute []int
var offchainTokenData [][][]byte
var msgInRoot []cciptypes.Message
executedIdx := 0
for i, msg := range report.Messages {
seqNum := report.SequenceNumberRange.Start() + cciptypes.SeqNum(i)
// Skip messages which are already executed
if executedIdx < len(report.ExecutedMessages) && report.ExecutedMessages[executedIdx] == seqNum {
executedIdx++
} else if _, ok := messages[i]; ok {
var tokenData [][]byte
var err error
if tokenDataReader != nil {
tokenData, err = tokenDataReader.ReadTokenData(context.Background(), report.SourceChain, msg.Header.SequenceNumber)
if err != nil {
// TODO: skip message instead of failing the whole thing.
// that might mean moving the token data reading out of the loop.
lggr.Infow(
"unable to read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"error", err)
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("unable to read token data for message %d: %w", msg.Header.SequenceNumber, err)
}

lggr.Infow(
"read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"data", tokenData)
}
offchainTokenData = append(offchainTokenData, tokenData)
if _, ok := readyMessages[i]; ok {
offchainTokenData = append(offchainTokenData, report.TokenData[i])
toExecute = append(toExecute, i)
msgInRoot = append(msgInRoot, msg)
}
Expand Down Expand Up @@ -142,8 +118,10 @@ func buildSingleChainReportHelper(
type messageStatus string

const (
ReadyToExecute messageStatus = "ready_to_execute"
AlreadyExecuted messageStatus = "already_executed"
ReadyToExecute messageStatus = "ready_to_execute"
AlreadyExecuted messageStatus = "already_executed"
TokenDataNotReady messageStatus = "token_data_not_ready" //nolint:gosec // this is not a password
TokenDataFetchError messageStatus = "token_data_fetch_error"
/*
SenderAlreadySkipped messageStatus = "sender_already_skipped"
MessageMaxGasCalcError messageStatus = "message_max_gas_calc_error"
Expand All @@ -153,8 +131,6 @@ const (
InvalidNonce messageStatus = "invalid_nonce"
AggregateTokenValueComputeError messageStatus = "aggregate_token_value_compute_error"
AggregateTokenLimitExceeded messageStatus = "aggregate_token_limit_exceeded"
TokenDataNotReady messageStatus = "token_data_not_ready"
TokenDataFetchError messageStatus = "token_data_fetch_error"
TokenNotInDestTokenPrices messageStatus = "token_not_in_dest_token_prices"
TokenNotInSrcTokenPrices messageStatus = "token_not_in_src_token_prices"
InsufficientRemainingFee messageStatus = "insufficient_remaining_fee"
Expand All @@ -163,14 +139,67 @@ const (
)

func (b *execReportBuilder) checkMessage(
_ context.Context, idx int, execReport plugintypes.ExecutePluginCommitData,
ctx context.Context, idx int, execReport plugintypes.ExecutePluginCommitData,
// TODO: get rid of the nolint when the error is used
) (messageStatus, error) { // nolint this will use the error eventually
if slices.Contains(execReport.ExecutedMessages, execReport.Messages[idx].Header.SequenceNumber) {
return AlreadyExecuted, nil
) (plugintypes.ExecutePluginCommitData, messageStatus, error) { // nolint this will use the error eventually
if idx >= len(execReport.Messages) {
b.lggr.Errorw("message index out of range", "index", idx, "numMessages", len(execReport.Messages))
return execReport, TokenDataFetchError, fmt.Errorf("message index out of range")
}

msg := execReport.Messages[idx]

// Check if the message has already been executed.
if slices.Contains(execReport.ExecutedMessages, msg.Header.SequenceNumber) {
b.lggr.Infow(
"message already executed",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
"seqNum", msg.Header.SequenceNumber)
return execReport, AlreadyExecuted, nil
}

return ReadyToExecute, nil
// Check if token data is ready.
if b.tokenDataReader != nil {
tokenData, err := b.tokenDataReader.ReadTokenData(ctx, execReport.SourceChain, msg.Header.SequenceNumber)
if err != nil {
if errors.Is(err, ErrNotReady) {
b.lggr.Infow(
"unable to read token data - token data not ready",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"error", err)
return execReport, TokenDataNotReady, nil
}
b.lggr.Infow(
"unable to read token data - unknown error",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"error", err)
return execReport, TokenDataFetchError, nil
}

// pad token data if needed
for len(execReport.TokenData) <= idx {
execReport.TokenData = append(execReport.TokenData, nil)
}

execReport.TokenData[idx] = tokenData
b.lggr.Infow(
"read token data",
"messageID", msg.Header.MessageID,
"sourceChain", execReport.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"data", tokenData)
}

// TODO: Check for valid nonce
// TODO: Check for max gas limit
// TODO: Check for fee boost

return execReport, ReadyToExecute, nil
}

func (b *execReportBuilder) verifyReport(
Expand Down Expand Up @@ -216,9 +245,25 @@ func (b *execReportBuilder) buildSingleChainReport(
commitReport = markNewMessagesExecuted(execReport, commitReport)
return execReport, commitReport, nil
}

// Check which messages are ready to execute, and update report with any additional metadata needed for execution.
readyMessages := make(map[int]struct{})
for i := 0; i < len(report.Messages); i++ {
updatedReport, status, err := b.checkMessage(ctx, i, report)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
plugintypes.ExecutePluginCommitData{},
fmt.Errorf("unable to check message: %w", err)
}
report = updatedReport
if status == ReadyToExecute {
readyMessages[i] = struct{}{}
}
}

// Attempt to include all messages in the report.
finalReport, err :=
buildSingleChainReportHelper(b.ctx, b.lggr, b.hasher, b.tokenDataReader, report, nil)
buildSingleChainReportHelper(b.ctx, b.lggr, b.hasher, report, readyMessages)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
plugintypes.ExecutePluginCommitData{},
Expand All @@ -237,20 +282,13 @@ func (b *execReportBuilder) buildSingleChainReport(
finalReport = cciptypes.ExecutePluginReportSingleChain{}
msgs := make(map[int]struct{})
for i := range report.Messages {
status, err := b.checkMessage(ctx, i, report)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
plugintypes.ExecutePluginCommitData{},
fmt.Errorf("unable to check message: %w", err)
}
if status != ReadyToExecute {
if _, ok := readyMessages[i]; !ok {
continue
}

msgs[i] = struct{}{}

finalReport2, err :=
buildSingleChainReportHelper(b.ctx, b.lggr, b.hasher, b.tokenDataReader, report, msgs)
finalReport2, err := buildSingleChainReportHelper(b.ctx, b.lggr, b.hasher, report, msgs)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
plugintypes.ExecutePluginCommitData{},
Expand Down
Loading

0 comments on commit f23843c

Please sign in to comment.