Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use extended contract reader, set onramp correctly in cciptypes.Message #52

Merged
merged 13 commits into from
Aug 1, 2024
71 changes: 61 additions & 10 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,19 @@ func (p *Plugin) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (ty
}

func getPendingExecutedReports(
ctx context.Context, ccipReader reader.CCIP, dest cciptypes.ChainSelector, ts time.Time,
ctx context.Context,
ccipReader reader.CCIP,
dest cciptypes.ChainSelector,
ts time.Time,
lggr logger.Logger,
) (plugintypes.ExecutePluginCommitObservations, time.Time, error) {
latestReportTS := time.Time{}
commitReports, err := ccipReader.CommitReportsGTETimestamp(ctx, dest, ts, 1000)
if err != nil {
return nil, time.Time{}, err
}
lggr.Debugw("commit reports", "commitReports", commitReports, "count", len(commitReports))

// TODO: this could be more efficient. commitReports is also traversed in 'groupByChainSelector'.
for _, report := range commitReports {
if report.Timestamp.After(latestReportTS) {
Expand All @@ -108,6 +114,8 @@ func getPendingExecutedReports(
}

groupedCommits := groupByChainSelector(commitReports)
lggr.Debugw("grouped commits before removing fully executed reports",
"groupedCommits", groupedCommits, "count", len(groupedCommits))

// Remove fully executed reports.
for selector, reports := range groupedCommits {
Expand Down Expand Up @@ -136,6 +144,9 @@ func getPendingExecutedReports(
}
}

lggr.Debugw("grouped commits after removing fully executed reports",
"groupedCommits", groupedCommits, "count", len(groupedCommits))

return groupedCommits, latestReportTS, nil
}

Expand All @@ -161,6 +172,8 @@ func (p *Plugin) Observation(
}
}

p.lggr.Infow("decoded previous outcome", "previousOutcome", previousOutcome)

// Phase 1: Gather commit reports from the destination chain and determine which messages are required to build a
// valid execution report.
var groupedCommits plugintypes.ExecutePluginCommitObservations
Expand All @@ -171,7 +184,7 @@ func (p *Plugin) Observation(
if supportsDest {
var latestReportTS time.Time
groupedCommits, latestReportTS, err =
getPendingExecutedReports(ctx, p.ccipReader, p.cfg.DestChain, time.UnixMilli(p.lastReportTS.Load()))
getPendingExecutedReports(ctx, p.ccipReader, p.cfg.DestChain, time.UnixMilli(p.lastReportTS.Load()), p.lggr)
if err != nil {
return types.Observation{}, err
}
Expand All @@ -187,7 +200,7 @@ func (p *Plugin) Observation(
// Phase 2: Gather messages from the source chains and build the execution report.
messages := make(plugintypes.ExecutePluginMessageObservations)
if len(previousOutcome.PendingCommitReports) == 0 {
fmt.Println("TODO: No reports to execute. This is expected after a cold start.")
p.lggr.Debug("TODO: No reports to execute. This is expected after a cold start.")
// No reports to execute.
// This is expected after a cold start.
} else {
Expand Down Expand Up @@ -311,12 +324,15 @@ func (p *Plugin) Outcome(
decodedObservations, err := decodeAttributedObservations(aos)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to decode observations: %w", err)

}
if len(decodedObservations) < p.reportingCfg.F {
return ocr3types.Outcome{}, fmt.Errorf("below F threshold")
}

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: decoded observations", p.reportingCfg.OracleID),
"decodedObservations", decodedObservations)

fChain, err := p.homeChain.GetFChain()
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to get FChain: %w", err)
Expand All @@ -327,11 +343,19 @@ func (p *Plugin) Outcome(
return ocr3types.Outcome{}, fmt.Errorf("unable to merge commit report observations: %w", err)
}

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: merged commit observations", p.reportingCfg.OracleID),
"mergedCommitObservations", mergedCommitObservations)

mergedMessageObservations, err := mergeMessageObservations(decodedObservations, fChain)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to merge message observations: %w", err)
}

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: merged message observations", p.reportingCfg.OracleID),
"mergedMessageObservations", mergedMessageObservations)

observation := plugintypes.NewExecutePluginObservation(
mergedCommitObservations,
mergedMessageObservations)
Expand All @@ -345,6 +369,10 @@ func (p *Plugin) Outcome(
return commitReports[i].Timestamp.Before(commitReports[j].Timestamp)
})

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: commit reports", p.reportingCfg.OracleID),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use sprintf instead of a key/value?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the oracle ID in the log message is much quicker to parse than having it in the fields; so I want a formatted string as the log message but still want to JSON encode the log fields.

"commitReports", commitReports)

// add messages to their commitReports.
for i, report := range commitReports {
report.Messages = nil
Expand All @@ -368,10 +396,24 @@ func (p *Plugin) Outcome(
ChainReports: outcomeReports,
}

return plugintypes.NewExecutePluginOutcome(commitReports, execReport).Encode()
outcome := plugintypes.NewExecutePluginOutcome(commitReports, execReport)
if outcome.IsEmpty() {
return nil, nil
}

p.lggr.Infow(
fmt.Sprintf("[oracle %d] exec outcome: generated outcome", p.reportingCfg.OracleID),
"outcome", outcome)

return outcome.Encode()
}

func (p *Plugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) {
if outcome == nil {
p.lggr.Warn("no outcome, skipping report generation")
return nil, nil
}

decodedOutcome, err := plugintypes.DecodeExecutePluginOutcome(outcome)
if err != nil {
return nil, fmt.Errorf("unable to decode outcome: %w", err)
Expand All @@ -394,15 +436,24 @@ func (p *Plugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.R
func (p *Plugin) ShouldAcceptAttestedReport(
ctx context.Context, u uint64, r ocr3types.ReportWithInfo[[]byte],
) (bool, error) {
// Just a safety check, should never happen.
if r.Report == nil {
p.lggr.Warn("skipping nil report")
return false, nil
}

decodedReport, err := p.reportCodec.Decode(ctx, r.Report)
if err != nil {
return false, fmt.Errorf("decode commit plugin report: %w", err)
}

p.lggr.Infow("Checking if ShouldAcceptAttestedReport", "chainReports", decodedReport.ChainReports)
if len(decodedReport.ChainReports) == 0 {
p.lggr.Infow("skipping empty report")
p.lggr.Info("skipping empty report")
return false, nil
}

p.lggr.Info("ShouldAcceptAttestedReport returns true, report accepted")
return true, nil
}

Expand All @@ -414,7 +465,7 @@ func (p *Plugin) ShouldTransmitAcceptedReport(
return false, fmt.Errorf("unable to determine if the destination chain is supported: %w", err)
}
if !isWriter {
p.lggr.Debugw("not a destination writer, skipping report transmission")
p.lggr.Debug("not a destination writer, skipping report transmission")
return false, nil
}

Expand All @@ -425,8 +476,8 @@ func (p *Plugin) ShouldTransmitAcceptedReport(

// TODO: Final validation?

p.lggr.Debugw("transmitting report",
"reports", len(decodedReport.ChainReports),
p.lggr.Infow("transmitting report",
"reports", decodedReport.ChainReports,
)
return true, nil
}
Expand All @@ -437,7 +488,7 @@ func (p *Plugin) Close() error {
defer cf()

if err := p.readerSyncer.Close(); err != nil {
p.lggr.Errorw("error closing reader syncer", "err", err)
p.lggr.Warnw("error closing reader syncer", "err", err)
}

if err := p.ccipReader.Close(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion execute/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func setupSimpleTest(
Messages: mapped,
}

tree, err := report.ConstructMerkleTree(context.Background(), msgHasher, reportData)
tree, err := report.ConstructMerkleTree(context.Background(), msgHasher, reportData, logger.Test(t))
require.NoError(t, err, "failed to construct merkle tree")

// Initialize reader with some data
Expand Down
23 changes: 19 additions & 4 deletions execute/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ func Test_getPendingExecutedReports(t *testing.T) {
// CommitReportsGTETimestamp(ctx, dest, ts, 1000) -> ([]cciptypes.CommitPluginReportWithMeta, error)
// for each chain selector:
// ExecutedMessageRanges(ctx, selector, dest, seqRange) -> ([]cciptypes.SeqNumRange, error)
got, got1, err := getPendingExecutedReports(context.Background(), mockReader, 123, time.Now())
got, got1, err := getPendingExecutedReports(
context.Background(),
mockReader,
123,
time.Now(),
logger.Test(t))
if !tt.wantErr(t, err, "getPendingExecutedReports(...)") {
return
}
Expand Down Expand Up @@ -286,6 +291,7 @@ func TestPlugin_Observation_EligibilityCheckFailure(t *testing.T) {
p := &Plugin{
homeChain: setupHomeChainPoller(lggr, []reader.ChainConfigInfo{}),
oracleIDToP2pID: map[commontypes.OracleID]libocrtypes.PeerID{},
lggr: lggr,
}

_, err := p.Observation(context.Background(), ocr3types.OutcomeContext{}, nil)
Expand Down Expand Up @@ -326,6 +332,7 @@ func TestPlugin_Outcome_HomeChainError(t *testing.T) {

p := &Plugin{
homeChain: homeChain,
lggr: logger.Test(t),
}
_, err := p.Outcome(ocr3types.OutcomeContext{}, nil, []types.AttributedObservation{})
require.Error(t, err)
Expand Down Expand Up @@ -367,6 +374,7 @@ func TestPlugin_Outcome_MessagesMergeError(t *testing.T) {

p := &Plugin{
homeChain: homeChain,
lggr: logger.Test(t),
}

// map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message
Expand Down Expand Up @@ -416,8 +424,11 @@ func TestPlugin_ShouldAcceptAttestedReport_DoesNotDecode(t *testing.T) {
Return(cciptypes.ExecutePluginReport{}, fmt.Errorf("test error"))
p := &Plugin{
reportCodec: codec,
lggr: logger.Test(t),
}
_, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{})
_, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{
Report: []byte("will not decode"), // faked out, see mock above
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this parameter needed or informational? I thought the mocked function would be enough.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its needed since I updated the Reports function to return if there is no report, since that will cause JSON decoding to error out.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it gives a more informative log in that scenario

})
require.Error(t, err)
assert.Contains(t, err.Error(), "decode commit plugin report: test error")
}
Expand All @@ -430,7 +441,9 @@ func TestPlugin_ShouldAcceptAttestedReport_NoReports(t *testing.T) {
lggr: logger.Test(t),
reportCodec: codec,
}
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{})
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{
Report: []byte("empty report"), // faked out, see mock above
})
require.NoError(t, err)
require.False(t, result)
}
Expand All @@ -447,7 +460,9 @@ func TestPlugin_ShouldAcceptAttestedReport_ShouldAccept(t *testing.T) {
lggr: logger.Test(t),
reportCodec: codec,
}
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{})
result, err := p.ShouldAcceptAttestedReport(context.Background(), 0, ocr3types.ReportWithInfo[[]byte]{
Report: []byte("report"), // faked out, see mock above
})
require.NoError(t, err)
require.True(t, result)
}
Expand Down
50 changes: 32 additions & 18 deletions execute/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func buildSingleChainReportHelper(
}
}

lggr.Debugw(
lggr.Infow(
"constructing merkle tree",
"sourceChain", report.SourceChain,
"expectedRoot", report.MerkleRoot.String(),
"treeLeaves", len(report.Messages))

tree, err := ConstructMerkleTree(ctx, hasher, report)
tree, err := ConstructMerkleTree(ctx, hasher, report, lggr)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("unable to construct merkle tree from messages for report (%s): %w", report.MerkleRoot.String(), err)
Expand All @@ -61,6 +61,11 @@ func buildSingleChainReportHelper(
fmt.Errorf("merkle root mismatch: expected %s, got %s", report.MerkleRoot.String(), actualStr)
}

lggr.Debugw("merkle root verified",
"sourceChain", report.SourceChain,
"commitRoot", report.MerkleRoot.String(),
"computedRoot", cciptypes.Bytes32(hash))

// Iterate sequence range and executed messages to select messages to execute.
numMsgs := len(report.Messages)
var toExecute []int
Expand All @@ -73,36 +78,40 @@ func buildSingleChainReportHelper(
if executedIdx < len(report.ExecutedMessages) && report.ExecutedMessages[executedIdx] == seqNum {
executedIdx++
} else if _, ok := messages[i]; ok {
tokenData, err := tokenDataReader.ReadTokenData(context.Background(), report.SourceChain, msg.Header.SequenceNumber)
if err != nil {
// TODO: skip message instead of failing the whole thing.
// that might mean moving the token data reading out of the loop.
var tokenData [][]byte
var err error
if tokenDataReader != nil {
tokenData, err = tokenDataReader.ReadTokenData(context.Background(), report.SourceChain, msg.Header.SequenceNumber)
if err != nil {
// TODO: skip message instead of failing the whole thing.
// that might mean moving the token data reading out of the loop.
lggr.Infow(
"unable to read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"error", err)
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("unable to read token data for message %d: %w", msg.Header.SequenceNumber, err)
}

lggr.Infow(
"unable to read token data",
"read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"error", err)
return cciptypes.ExecutePluginReportSingleChain{},
fmt.Errorf("unable to read token data for message %d: %w", msg.Header.SequenceNumber, err)
"data", tokenData)
}

lggr.Infow(
"read token data",
"sourceChain", report.SourceChain,
"seqNum", msg.Header.SequenceNumber,
"data", tokenData)
offchainTokenData = append(offchainTokenData, tokenData)
toExecute = append(toExecute, i)
msgInRoot = append(msgInRoot, msg)
}
}

lggr.Infow(
"selected messages from commit report for execution",
"selected messages from commit report for execution, generating merkle proofs",
"sourceChain", report.SourceChain,
"commitRoot", report.MerkleRoot.String(),
"numMessages", numMsgs,
"toExecute", len(toExecute))
"toExecute", toExecute)
proof, err := tree.Prove(toExecute)
if err != nil {
return cciptypes.ExecutePluginReportSingleChain{},
Expand All @@ -114,6 +123,9 @@ func buildSingleChainReportHelper(
proofsCast = append(proofsCast, p)
}

lggr.Debugw("generated proofs", "sourceChain", report.SourceChain,
"proofsCast", proofsCast, "proof", proof)
makramkd marked this conversation as resolved.
Show resolved Hide resolved

finalReport := cciptypes.ExecutePluginReportSingleChain{
SourceChainSelector: report.SourceChain,
Messages: msgInRoot,
Expand All @@ -122,6 +134,8 @@ func buildSingleChainReportHelper(
ProofFlagBits: cciptypes.BigInt{Int: slicelib.BoolsToBitFlags(proof.SourceFlags)},
}

lggr.Debugw("final report", "sourceChain", report.SourceChain, "report", finalReport)

return finalReport, nil
}

Expand Down
2 changes: 1 addition & 1 deletion execute/report/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func makeTestCommitReport(
// calculate merkle root
root := rootOverride
if root.IsEmpty() {
tree, err := ConstructMerkleTree(context.Background(), hasher, commitReport)
tree, err := ConstructMerkleTree(context.Background(), hasher, commitReport, logger.Nop())
if err != nil {
panic(fmt.Sprintf("unable to construct merkle tree: %s", err))
}
Expand Down
Loading
Loading