Skip to content

Commit

Permalink
OCR3 execute report generation.
Browse files Browse the repository at this point in the history
  • Loading branch information
winder committed Jul 1, 2024
1 parent 18f92cd commit ed5091c
Show file tree
Hide file tree
Showing 7 changed files with 882 additions and 39 deletions.
4 changes: 3 additions & 1 deletion execute/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ func (p PluginFactory) NewReportingPlugin(
config ocr3types.ReportingPluginConfig,
) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
return NewPlugin(
context.Background(),
config,
cciptypes.ExecutePluginConfig{},
nil,
nil,
nil,
nil,
), ocr3types.ReportingPluginInfo{}, nil
}

Expand Down
315 changes: 299 additions & 16 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,59 @@ package execute

import (
"context"
"errors"
"fmt"
"slices"
"sort"
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/slicelib"
)

// maxReportSizeBytes that should be returned as an execution report payload.
const maxReportSizeBytes = 250_000

// Plugin implements the main ocr3 plugin logic.
type Plugin struct {
reportingCfg ocr3types.ReportingPluginConfig
cfg cciptypes.ExecutePluginConfig
ccipReader cciptypes.CCIPReader
reportingCfg ocr3types.ReportingPluginConfig
cfg cciptypes.ExecutePluginConfig
ccipReader cciptypes.CCIPReader
reportCodec cciptypes.ExecutePluginCodec
msgHasher cciptypes.MessageHasher
tokenDataReader TokenDataReader

//commitRootsCache cache.CommitsRootsCache
lastReportTS *atomic.Int64

lggr logger.Logger
}

func NewPlugin(
_ context.Context,
reportingCfg ocr3types.ReportingPluginConfig,
cfg cciptypes.ExecutePluginConfig,
ccipReader cciptypes.CCIPReader,
reportCodec cciptypes.ExecutePluginCodec,
msgHasher cciptypes.MessageHasher,
lggr logger.Logger,
) *Plugin {
lastReportTS := &atomic.Int64{}
lastReportTS.Store(time.Now().Add(-cfg.MessageVisibilityInterval).UnixMilli())

// TODO: initialize tokenDataReader.

return &Plugin{
reportingCfg: reportingCfg,
cfg: cfg,
ccipReader: ccipReader,
reportCodec: reportCodec,
msgHasher: msgHasher,
lastReportTS: lastReportTS,
lggr: lggr,
}
}

Expand Down Expand Up @@ -199,6 +216,248 @@ func (p *Plugin) ObservationQuorum(outctx ocr3types.OutcomeContext, query types.
return ocr3types.QuorumFPlusOne, nil
}

// TokenDataReader is an interface for reading extra token data from an async process.
// TODO: Build a token data reading process.
type TokenDataReader interface {
ReadTokenData(ctx context.Context, srcChain cciptypes.ChainSelector, num cciptypes.SeqNum) ([][]byte, error)
}

// buildSingleChainReportMaxSize generates the largest report which fits into maxSizeBytes.
// See buildSingleChainReport for more details about how a report is built.
func buildSingleChainReportMaxSize(
ctx context.Context,
lggr logger.Logger,
hasher cciptypes.MessageHasher,
tokenDataReader TokenDataReader,
encoder cciptypes.ExecutePluginCodec,
report cciptypes.ExecutePluginCommitDataWithMessages,
maxSizeBytes int,
) (cciptypes.ExecutePluginReportSingleChain, int, cciptypes.ExecutePluginCommitDataWithMessages, error) {
finalReport, encodedSize, err :=
buildSingleChainReport(ctx, lggr, hasher, tokenDataReader, encoder, report, 0)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
0,
cciptypes.ExecutePluginCommitDataWithMessages{},
fmt.Errorf("unable to build a single chain report (max): %w", err)
}

// return fully executed report
if encodedSize <= maxSizeBytes {
report = markNewMessagesExecuted(finalReport, report)
return finalReport, encodedSize, report, nil
}

var searchErr error
idx := sort.Search(len(report.Messages), func(mid int) bool {
if searchErr != nil {
return false
}
finalReport2, encodedSize2, err :=
buildSingleChainReport(ctx, lggr, hasher, tokenDataReader, encoder, report, mid)
if searchErr != nil {
searchErr = fmt.Errorf("unable to build a single chain report (messages %d): %w", mid, err)
}

if (encodedSize2) <= maxSizeBytes {
// mid is a valid report size, try something bigger next iteration.
finalReport = finalReport2
encodedSize = encodedSize2
return false // not full
}
return true // full
})
if searchErr != nil {
return cciptypes.ExecutePluginReportSingleChain{}, 0, cciptypes.ExecutePluginCommitDataWithMessages{}, searchErr
}

// No messages fit into the report.
if idx <= 0 {
return cciptypes.ExecutePluginReportSingleChain{},
0,
cciptypes.ExecutePluginCommitDataWithMessages{},
errNothingExecuted

Check failure on line 279 in execute/plugin.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.21)

undefined: errNothingExecuted
}

report = markNewMessagesExecuted(finalReport, report)
return finalReport, encodedSize, report, nil
}

// buildSingleChainReport converts the on-chain event data stored in cciptypes.ExecutePluginCommitDataWithMessages into
// the final on-chain report format.
//
// The hasher and encoding codec are provided as arguments to allow for chain-specific formats to be used.
//
// The maxMessages argument is used to limit the number of messages that are included in the report. If maxMessages is
// set to 0, all messages will be included. This allows the caller to create smaller reports if needed.
func buildSingleChainReport(
ctx context.Context,
lggr logger.Logger,
hasher cciptypes.MessageHasher,
tokenDataReader TokenDataReader,
encoder cciptypes.ExecutePluginCodec,
report cciptypes.ExecutePluginCommitDataWithMessages,
maxMessages int,
) (cciptypes.ExecutePluginReportSingleChain, int, error) {
// TODO: maxMessages selects messages in FIFO order which may not yield the optimal message size. One message with a
// maximum data size could push the report over a size limit even if several smaller messages could have fit.
if maxMessages == 0 {
maxMessages = len(report.Messages)
}

tree, err := constructMerkleTree(ctx, hasher, report)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{}, 0,
fmt.Errorf("unable to construct merkle tree from messages: %w", err)
}
lggr.Debugw(
"constructing merkle tree",
"sourceChain", report.SourceChain,
"treeLeaves", len(report.Messages))
numMsgs := len(report.Messages)

// Iterate sequence range and executed messages to select messages to execute.
var toExecute []int
var offchainTokenData [][][]byte
var msgInRoot []cciptypes.CCIPMsg
executedIdx := 0
for i := 0; i < numMsgs && len(toExecute) <= maxMessages; i++ {
seqNum := report.SequenceNumberRange.Start() + cciptypes.SeqNum(i)
// Skip messages which are already executed
if executedIdx < len(report.ExecutedMessages) && report.ExecutedMessages[executedIdx] == seqNum {
executedIdx++
} else {
msg := report.Messages[i]
tokenData, err := tokenDataReader.ReadTokenData(context.Background(), report.SourceChain, msg.SeqNum)
if err != nil {
// TODO: skip message instead of failing the whole thing.
// that might mean moving the token data reading out of the loop.
lggr.Infow(
"unable to read token data",
"source-chain", report.SourceChain,
"seq-num", msg.SeqNum,
"error", err)
return cciptypes.ExecutePluginReportSingleChain{}, 0, fmt.Errorf(
"unable to read token data for message %d: %w", msg.SeqNum, err)
}

lggr.Debugw(
"read token data",
"source-chain", report.SourceChain,
"seq-num", msg.SeqNum,
"data", tokenData)
offchainTokenData = append(offchainTokenData, tokenData)
toExecute = append(toExecute, i)
msgInRoot = append(msgInRoot, msg)
}
}

lggr.Infow(
"selected messages from commit report for execution",
"sourceChain", report.SourceChain,
"commitRoot", report.MerkleRoot.String(),
"numMessages", numMsgs,
"toExecute", len(toExecute))
proof, err := tree.Prove(toExecute)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{}, 0,
fmt.Errorf("unable to prove messages for report %s: %w", report.MerkleRoot.String(), err)
}

var proofsCast []cciptypes.Bytes32
for _, p := range proof.Hashes {
proofsCast = append(proofsCast, p)
}

finalReport := cciptypes.ExecutePluginReportSingleChain{
SourceChainSelector: report.SourceChain,
Messages: msgInRoot,
OffchainTokenData: offchainTokenData,
Proofs: proofsCast,
ProofFlagBits: cciptypes.BigInt{Int: slicelib.BoolsToBitFlags(proof.SourceFlags)},

Check failure on line 377 in execute/plugin.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.21)

undefined: slicelib.BoolsToBitFlags
}

// Note: ExecutePluginReport is a strict array of data, so wrapping the final report
// does not add any additional overhead to the size being computed here.

// Compute the size of the encoded report.
encoded, err := encoder.Encode(
ctx,
cciptypes.ExecutePluginReport{
ChainReports: []cciptypes.ExecutePluginReportSingleChain{finalReport},
},
)
if err != nil {
lggr.Errorw("unable to encode report", "err", err, "report", finalReport)
return cciptypes.ExecutePluginReportSingleChain{}, 0, fmt.Errorf("unable to encode report: %w", err)
}

return finalReport, len(encoded), nil
}

// selectReport takes a list of reports in execution order and selects the first reports that fit within the
// maxReportSizeBytes. Individual messages in a commit report may be skipped for various reasons, for example if an
// out-of-order execution is detected or the message requires additional off-chain metadata which is not yet available.
// If there is not enough space in the final report, it may be partially executed by searching for a subset of messages
// which can fit in the final report.
func selectReport(
ctx context.Context,
lggr logger.Logger,
hasher cciptypes.MessageHasher,
encoder cciptypes.ExecutePluginCodec,
tokenDataReader TokenDataReader,
reports []cciptypes.ExecutePluginCommitDataWithMessages,
maxReportSizeBytes int,
) ([]cciptypes.ExecutePluginReportSingleChain, []cciptypes.ExecutePluginCommitDataWithMessages, error) {
// TODO: It may be desirable for this entire function to be an interface so that
// different selection algorithms can be used.

// count number of fully executed reports so that they can be removed after iterating the reports.
fullyExecuted := 0
accumulatedSize := 0
var finalReports []cciptypes.ExecutePluginReportSingleChain
for reportIdx, report := range reports {
execReport, encodedSize, updatedReport, err :=
buildSingleChainReportMaxSize(ctx, lggr, hasher, tokenDataReader, encoder,
report, maxReportSizeBytes-accumulatedSize)
// No messages fit into the report, stop adding more reports.
if errors.Is(err, errNothingExecuted) {

Check failure on line 424 in execute/plugin.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.21)

undefined: errNothingExecuted
break
}
if err != nil {
return nil, nil, fmt.Errorf("unable to build single chain report: %w", err)
}
reports[reportIdx] = updatedReport
accumulatedSize += encodedSize
finalReports = append(finalReports, execReport)

// partially executed report detected, stop adding more reports.
// TODO: do not break if messages were intentionally skipped.
if len(updatedReport.Messages) != len(updatedReport.ExecutedMessages) {
break
}
fullyExecuted++
}

// Remove reports that are about to be executed.
if fullyExecuted == len(reports) {
reports = nil
} else {
reports = reports[fullyExecuted:]
}

lggr.Infow(
"selected commit reports for execution report",
"numReports", len(finalReports),
"size", accumulatedSize,
"incompleteReports", len(reports),
"maxSize", maxReportSizeBytes)

return finalReports, reports, nil
}

// Outcome collects the reports from the two phases and constructs the final outcome. Part of the outcome is a fully
// formed report that will be encoded for final transmission in the reporting phase.
func (p *Plugin) Outcome(
outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation,
) (ocr3types.Outcome, error) {
Expand Down Expand Up @@ -226,16 +485,16 @@ func (p *Plugin) Outcome(
mergedMessageObservations)

// flatten commit reports and sort by timestamp.
var reports []cciptypes.ExecutePluginCommitDataWithMessages
var commitReports []cciptypes.ExecutePluginCommitDataWithMessages
for _, report := range observation.CommitReports {
reports = append(reports, report...)
commitReports = append(commitReports, report...)
}
sort.Slice(reports, func(i, j int) bool {
return reports[i].Timestamp.Before(reports[j].Timestamp)
sort.Slice(commitReports, func(i, j int) bool {
return commitReports[i].Timestamp.Before(commitReports[j].Timestamp)
})

// add messages to their reports.
for _, report := range reports {
// add messages to their commitReports.
for _, report := range commitReports {
report.Messages = nil
for i := report.SequenceNumberRange.Start(); i <= report.SequenceNumberRange.End(); i++ {
if msg, ok := observation.Messages[report.SourceChain][i]; ok {
Expand All @@ -244,15 +503,39 @@ func (p *Plugin) Outcome(
}
}

// TODO: select reports and messages for the final exec report.
// TODO: may only need the proofs for the final exec report rather than the report and the messages.
// TODO: this function should be pure, a context should not be needed.
outcomeReports, commitReports, err :=
selectReport(context.Background(), p.lggr, p.msgHasher, p.reportCodec, p.tokenDataReader,
commitReports, maxReportSizeBytes)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to extract proofs: %w", err)
}

execReport := cciptypes.ExecutePluginReport{
ChainReports: outcomeReports,
}

return cciptypes.NewExecutePluginOutcome(reports).Encode()
return cciptypes.NewExecutePluginOutcome(commitReports, execReport).Encode()
}

func (p *Plugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
decodedOutcome, err := cciptypes.DecodeExecutePluginOutcome(outcome)
if err != nil {
return nil, err
}

panic("implement me")
// TODO: this function should be pure, a context should not be needed.
encoded, err := p.reportCodec.Encode(context.Background(), decodedOutcome.Report)
if err != nil {
return nil, err
}

report := []ocr3types.ReportWithInfo[[]byte]{{
Report: encoded,
Info: nil,
}}

return report, nil
}

func (p *Plugin) ShouldAcceptAttestedReport(
Expand Down
Loading

0 comments on commit ed5091c

Please sign in to comment.