Skip to content

Commit

Permalink
Use extended contract reader, set onramp correctly in cciptypes.Messa…
Browse files Browse the repository at this point in the history
…ge (#52)

* logs in exec

* add more logging

* fix panic

* deterministic outcome

* logging improvements

* logging changes, use extended CR

* clean up

* one more log

* more cleanup

* more log cleanup

* proofsCast -> proofs

Co-authored-by: Will Winder <[email protected]>

* sort prior to encoding and construction

---------

Co-authored-by: dimkouv <[email protected]>
Co-authored-by: Will Winder <[email protected]>
  • Loading branch information
3 people authored Aug 1, 2024
1 parent 0b4dcbf commit 456a6df
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 48 deletions.
71 changes: 61 additions & 10 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,19 @@ func (p *Plugin) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (ty
}

func getPendingExecutedReports(
ctx context.Context, ccipReader reader.CCIP, dest cciptypes.ChainSelector, ts time.Time,
ctx context.Context,
ccipReader reader.CCIP,
dest cciptypes.ChainSelector,
ts time.Time,
lggr logger.Logger,
) (plugintypes.ExecutePluginCommitObservations, time.Time, error) {
latestReportTS := time.Time{}
commitReports, err := ccipReader.CommitReportsGTETimestamp(ctx, dest, ts, 1000)
if err != nil {
return nil, time.Time{}, err
}
lggr.Debugw("commit reports", "commitReports", commitReports, "count", len(commitReports))

// TODO: this could be more efficient. commitReports is also traversed in 'groupByChainSelector'.
for _, report := range commitReports {
if report.Timestamp.After(latestReportTS) {
Expand All @@ -108,6 +114,8 @@ func getPendingExecutedReports(
}

groupedCommits := groupByChainSelector(commitReports)
lggr.Debugw("grouped commits before removing fully executed reports",
"groupedCommits", groupedCommits, "count", len(groupedCommits))

// Remove fully executed reports.
for selector, reports := range groupedCommits {
Expand Down Expand Up @@ -136,6 +144,9 @@ func getPendingExecutedReports(
}
}

lggr.Debugw("grouped commits after removing fully executed reports",
"groupedCommits", groupedCommits, "count", len(groupedCommits))

return groupedCommits, latestReportTS, nil
}

Expand All @@ -161,6 +172,8 @@ func (p *Plugin) Observation(
}
}

p.lggr.Infow("decoded previous outcome", "previousOutcome", previousOutcome)

// Phase 1: Gather commit reports from the destination chain and determine which messages are required to build a
// valid execution report.
var groupedCommits plugintypes.ExecutePluginCommitObservations
Expand All @@ -171,7 +184,7 @@ func (p *Plugin) Observation(
if supportsDest {
var latestReportTS time.Time
groupedCommits, latestReportTS, err =
getPendingExecutedReports(ctx, p.ccipReader, p.cfg.DestChain, time.UnixMilli(p.lastReportTS.Load()))
getPendingExecutedReports(ctx, p.ccipReader, p.cfg.DestChain, time.UnixMilli(p.lastReportTS.Load()), p.lggr)
if err != nil {
return types.Observation{}, err
}
Expand All @@ -187,7 +200,7 @@ func (p *Plugin) Observation(
// Phase 2: Gather messages from the source chains and build the execution report.
messages := make(plugintypes.ExecutePluginMessageObservations)
if len(previousOutcome.PendingCommitReports) == 0 {
fmt.Println("TODO: No reports to execute. This is expected after a cold start.")
p.lggr.Debug("TODO: No reports to execute. This is expected after a cold start.")
// No reports to execute.
// This is expected after a cold start.
} else {
Expand Down Expand Up @@ -311,12 +324,15 @@ func (p *Plugin) Outcome(
decodedObservations, err := decodeAttributedObservations(aos)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to decode observations: %w", err)

}
if len(decodedObservations) < p.reportingCfg.F {
return ocr3types.Outcome{}, fmt.Errorf("below F threshold")
}

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: decoded observations", p.reportingCfg.OracleID),
"decodedObservations", decodedObservations)

fChain, err := p.homeChain.GetFChain()
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to get FChain: %w", err)
Expand All @@ -327,11 +343,19 @@ func (p *Plugin) Outcome(
return ocr3types.Outcome{}, fmt.Errorf("unable to merge commit report observations: %w", err)
}

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: merged commit observations", p.reportingCfg.OracleID),
"mergedCommitObservations", mergedCommitObservations)

mergedMessageObservations, err := mergeMessageObservations(decodedObservations, fChain)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to merge message observations: %w", err)
}

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: merged message observations", p.reportingCfg.OracleID),
"mergedMessageObservations", mergedMessageObservations)

observation := plugintypes.NewExecutePluginObservation(
mergedCommitObservations,
mergedMessageObservations)
Expand All @@ -345,6 +369,10 @@ func (p *Plugin) Outcome(
return commitReports[i].Timestamp.Before(commitReports[j].Timestamp)
})

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: commit reports", p.reportingCfg.OracleID),
"commitReports", commitReports)

// add messages to their commitReports.
for i, report := range commitReports {
report.Messages = nil
Expand All @@ -368,10 +396,24 @@ func (p *Plugin) Outcome(
ChainReports: outcomeReports,
}

return plugintypes.NewExecutePluginOutcome(commitReports, execReport).Encode()
outcome := plugintypes.NewExecutePluginOutcome(commitReports, execReport)
if outcome.IsEmpty() {
return nil, nil
}

p.lggr.Infow(
fmt.Sprintf("[oracle %d] exec outcome: generated outcome", p.reportingCfg.OracleID),
"outcome", outcome)

return outcome.Encode()
}

func (p *Plugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
if outcome == nil {
p.lggr.Warn("no outcome, skipping report generation")
return nil, nil
}

decodedOutcome, err := plugintypes.DecodeExecutePluginOutcome(outcome)
if err != nil {
return nil, fmt.Errorf("unable to decode outcome: %w", err)
Expand All @@ -394,15 +436,24 @@ func (p *Plugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.R
func (p *Plugin) ShouldAcceptAttestedReport(
ctx context.Context, u uint64, r ocr3types.ReportWithInfo[[]byte],
) (bool, error) {
// Just a safety check, should never happen.
if r.Report == nil {
p.lggr.Warn("skipping nil report")
return false, nil
}

decodedReport, err := p.reportCodec.Decode(ctx, r.Report)
if err != nil {
return false, fmt.Errorf("decode commit plugin report: %w", err)
}

p.lggr.Infow("Checking if ShouldAcceptAttestedReport", "chainReports", decodedReport.ChainReports)
if len(decodedReport.ChainReports) == 0 {
p.lggr.Infow("skipping empty report")
p.lggr.Info("skipping empty report")
return false, nil
}

p.lggr.Info("ShouldAcceptAttestedReport returns true, report accepted")
return true, nil
}

Expand All @@ -414,7 +465,7 @@ func (p *Plugin) ShouldTransmitAcceptedReport(
return false, fmt.Errorf("unable to determine if the destination chain is supported: %w", err)
}
if !isWriter {
p.lggr.Debugw("not a destination writer, skipping report transmission")
p.lggr.Debug("not a destination writer, skipping report transmission")
return false, nil
}

Expand All @@ -425,8 +476,8 @@ func (p *Plugin) ShouldTransmitAcceptedReport(

// TODO: Final validation?

p.lggr.Debugw("transmitting report",
"reports", len(decodedReport.ChainReports),
p.lggr.Infow("transmitting report",
"reports", decodedReport.ChainReports,
)
return true, nil
}
Expand All @@ -437,7 +488,7 @@ func (p *Plugin) Close() error {
defer cf()

if err := p.readerSyncer.Close(); err != nil {
p.lggr.Errorw("error closing reader syncer", "err", err)
p.lggr.Warnw("error closing reader syncer", "err", err)
}

if err := p.ccipReader.Close(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion execute/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func setupSimpleTest(
Messages: mapped,
}

tree, err := report.ConstructMerkleTree(context.Background(), msgHasher, reportData)
tree, err := report.ConstructMerkleTree(context.Background(), msgHasher, reportData, logger.Test(t))
require.NoError(t, err, "failed to construct merkle tree")

// Initialize reader with some data
Expand Down
23 changes: 19 additions & 4 deletions execute/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ func Test_getPendingExecutedReports(t *testing.T) {
// CommitReportsGTETimestamp(ctx, dest, ts, 1000) -> ([]cciptypes.CommitPluginReportWithMeta, error)
// for each chain selector:
// ExecutedMessageRanges(ctx, selector, dest, seqRange) -> ([]cciptypes.SeqNumRange, error)
got, got1, err := getPendingExecutedReports(context.Background(), mockReader, 123, time.Now())
got, got1, err := getPendingExecutedReports(
context.Background(),
mockReader,
123,
time.Now(),
logger.Test(t))
if !tt.wantErr(t, err, "getPendingExecutedReports(...)") {
return
}
Expand Down Expand Up @@ -286,6 +291,7 @@ func TestPlugin_Observation_EligibilityCheckFailure(t *testing.T) {
p := &Plugin{
homeChain: setupHomeChainPoller(lggr, []reader.ChainConfigInfo{}),
oracleIDToP2pID: map[commontypes.OracleID]libocrtypes.PeerID{},
lggr: lggr,
}

_, err := p.Observation(context.Background(), ocr3types.OutcomeContext{}, nil)
Expand Down Expand Up @@ -326,6 +332,7 @@ func TestPlugin_Outcome_HomeChainError(t *testing.T) {

p := &Plugin{
homeChain: homeChain,
lggr: logger.Test(t),
}
_, err := p.Outcome(ocr3types.OutcomeContext{}, nil, []types.AttributedObservation{})
require.Error(t, err)
Expand Down Expand Up @@ -367,6 +374,7 @@ func TestPlugin_Outcome_MessagesMergeError(t *testing.T) {

p := &Plugin{
homeChain: homeChain,
lggr: logger.Test(t),
}

// map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message
Expand Down Expand Up @@ -416,8 +424,11 @@ func TestPlugin_ShouldAcceptAttestedReport_DoesNotDecode(t *testing.T) {
Return(cciptypes.ExecutePluginReport{}, fmt.Errorf("test error"))
p := &Plugin{
reportCodec: codec,
lggr: logger.Test(t),
}
_, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{})
_, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{
Report: []byte("will not decode"), // faked out, see mock above
})
require.Error(t, err)
assert.Contains(t, err.Error(), "decode commit plugin report: test error")
}
Expand All @@ -430,7 +441,9 @@ func TestPlugin_ShouldAcceptAttestedReport_NoReports(t *testing.T) {
lggr: logger.Test(t),
reportCodec: codec,
}
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{})
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{
Report: []byte("empty report"), // faked out, see mock above
})
require.NoError(t, err)
require.False(t, result)
}
Expand All @@ -447,7 +460,9 @@ func TestPlugin_ShouldAcceptAttestedReport_ShouldAccept(t *testing.T) {
lggr: logger.Test(t),
reportCodec: codec,
}
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{})
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{
Report: []byte("report"), // faked out, see mock above
})
require.NoError(t, err)
require.True(t, result)
}
Expand Down
50 changes: 32 additions & 18 deletions execute/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func buildSingleChainReportHelper(
}
}

lggr.Debugw(
lggr.Infow(
"constructing merkle tree",
"sourceChain", report.SourceChain,
"expectedRoot", report.MerkleRoot.String(),
"treeLeaves", len(report.Messages))

tree, err := ConstructMerkleTree(ctx, hasher, report)
tree, err := ConstructMerkleTree(ctx, hasher, report, lggr)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("unable to construct merkle tree from messages for report (%s): %w", report.MerkleRoot.String(), err)
Expand All @@ -61,6 +61,11 @@ func buildSingleChainReportHelper(
fmt.Errorf("merkle root mismatch: expected %s, got %s", report.MerkleRoot.String(), actualStr)
}

lggr.Debugw("merkle root verified",
"sourceChain", report.SourceChain,
"commitRoot", report.MerkleRoot.String(),
"computedRoot", cciptypes.Bytes32(hash))

// Iterate sequence range and executed messages to select messages to execute.
numMsgs := len(report.Messages)
var toExecute []int
Expand All @@ -73,36 +78,40 @@ func buildSingleChainReportHelper(
if executedIdx < len(report.ExecutedMessages) && report.ExecutedMessages[executedIdx] == seqNum {
executedIdx++
} else if _, ok := messages[i]; ok {
tokenData, err := tokenDataReader.ReadTokenData(context.Background(), report.SourceChain, msg.Header.SequenceNumber)
if err != nil {
// TODO: skip message instead of failing the whole thing.
// that might mean moving the token data reading out of the loop.
var tokenData [][]byte
var err error
if tokenDataReader != nil {
tokenData, err = tokenDataReader.ReadTokenData(context.Background(), report.SourceChain, msg.Header.SequenceNumber)
if err != nil {
// TODO: skip message instead of failing the whole thing.
// that might mean moving the token data reading out of the loop.
lggr.Infow(
"unable to read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"error", err)
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("unable to read token data for message %d: %w", msg.Header.SequenceNumber, err)
}

lggr.Infow(
"unable to read token data",
"read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"error", err)
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("unable to read token data for message %d: %w", msg.Header.SequenceNumber, err)
"data", tokenData)
}

lggr.Infow(
"read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"data", tokenData)
offchainTokenData = append(offchainTokenData, tokenData)
toExecute = append(toExecute, i)
msgInRoot = append(msgInRoot, msg)
}
}

lggr.Infow(
"selected messages from commit report for execution",
"selected messages from commit report for execution, generating merkle proofs",
"sourceChain", report.SourceChain,
"commitRoot", report.MerkleRoot.String(),
"numMessages", numMsgs,
"toExecute", len(toExecute))
"toExecute", toExecute)
proof, err := tree.Prove(toExecute)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
Expand All @@ -114,6 +123,9 @@ func buildSingleChainReportHelper(
proofsCast = append(proofsCast, p)
}

lggr.Debugw("generated proofs", "sourceChain", report.SourceChain,
"proofs", proofsCast, "proof", proof)

finalReport := cciptypes.ExecutePluginReportSingleChain{
SourceChainSelector: report.SourceChain,
Messages: msgInRoot,
Expand All @@ -122,6 +134,8 @@ func buildSingleChainReportHelper(
ProofFlagBits: cciptypes.BigInt{Int: slicelib.BoolsToBitFlags(proof.SourceFlags)},
}

lggr.Debugw("final report", "sourceChain", report.SourceChain, "report", finalReport)

return finalReport, nil
}

Expand Down
2 changes: 1 addition & 1 deletion execute/report/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func makeTestCommitReport(
// calculate merkle root
root := rootOverride
if root.IsEmpty() {
tree, err := ConstructMerkleTree(context.Background(), hasher, commitReport)
tree, err := ConstructMerkleTree(context.Background(), hasher, commitReport, logger.Nop())
if err != nil {
panic(fmt.Sprintf("unable to construct merkle tree: %s", err))
}
Expand Down
Loading

0 comments on commit 456a6df

Please sign in to comment.