Skip to content

Commit

Permalink
Move report building into Filter state.
Browse files Browse the repository at this point in the history
Add nonces reader and observation.

Add 1.21 to build matrix.

Fix tests.

Pass nonces to report selection.

Add new tests for nonce handling.

Fix tests.

Fix lint warning.

go mod tidy

Revert "go mod tidy"

This reverts commit c3ca278.

Fix bogus import.

Update chainlink-common.

Disable nonce checking.

Re-enable nonce checking with temporary feature flag.

Add additional log message.
  • Loading branch information
winder committed Aug 20, 2024
1 parent 1b2d9ea commit 7cfb675
Show file tree
Hide file tree
Showing 14 changed files with 458 additions and 116 deletions.
12 changes: 10 additions & 2 deletions execute/exectypes/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type CommitObservations map[cciptypes.ChainSelector][]CommitData
type MessageObservations map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message
type NonceObservations map[cciptypes.ChainSelector]map[string]uint64

// Observation is the observation of the ExecutePlugin.
// TODO: revisit observation types. The maps used here are more space efficient and easier to work
Expand All @@ -16,16 +17,20 @@ type Observation struct {
// CommitReports are determined during the first phase of execute.
// It contains the commit reports we would like to execute in the following round.
CommitReports CommitObservations `json:"commitReports"`

// Messages are determined during the second phase of execute.
// Ideally, it contains all the messages identified by the previous outcome's
// NextCommits. With the previous outcome, and these messsages, we can build the
// execute report.
Messages MessageObservations `json:"messages"`
// TODO: some of the nodes configuration may need to be included here.

// Nonces are determined during the third phase of execute.
// It contains the nonces of senders who are being considered for the final report.
Nonces NonceObservations `json:"nonces"`
}

func NewObservation(
commitReports CommitObservations, messages MessageObservations) Observation {
commitReports CommitObservations, messages MessageObservations, nonces NonceObservations) Observation {
return Observation{
CommitReports: commitReports,
Messages: messages,
Expand All @@ -37,6 +42,9 @@ func (obs Observation) Encode() ([]byte, error) {
}

func DecodeObservation(b []byte) (Observation, error) {
if len(b) == 0 {
return Observation{}, nil
}
obs := Observation{}
err := json.Unmarshal(b, &obs)
return obs, err
Expand Down
3 changes: 1 addition & 2 deletions execute/exectypes/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func (p PluginState) Next() PluginState {
return GetMessages

case GetMessages:
// TODO: go to Filter after GetMessages
return GetCommitReports
return Filter

case Unknown:
fallthrough
Expand Down
7 changes: 6 additions & 1 deletion execute/exectypes/outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ func TestPluginState_Next(t *testing.T) {
want: GetMessages,
},
{
name: "Phase 2 to 1",
name: "Phase 2 to 3",
p: GetMessages,
want: Filter,
},
{
name: "Phase 3 to 1",
p: Filter,
want: GetCommitReports,
},
{
Expand Down
119 changes: 80 additions & 39 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

mapset "github.com/deckarep/golang-set/v2"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
Expand Down Expand Up @@ -168,6 +169,7 @@ func (p *Plugin) Observation(
}

state := previousOutcome.State.Next()
p.lggr.Debugw("Execute plugin performing observation.", "state", state)
switch state {
case exectypes.GetCommitReports:
fetchFrom := time.Now().Add(-p.cfg.OffchainConfig.MessageVisibilityInterval.Duration()).UTC()
Expand All @@ -185,7 +187,7 @@ func (p *Plugin) Observation(
}

// TODO: truncate grouped to a maximum observation size?
return exectypes.NewObservation(groupedCommits, nil).Encode()
return exectypes.NewObservation(groupedCommits, nil, nil).Encode()
}

// No observation for non-dest readers.
Expand All @@ -203,7 +205,7 @@ func (p *Plugin) Observation(
commitReportCache[report.SourceChain] = append(commitReportCache[report.SourceChain], report)
}

for selector, reports := range commitReportCache {
for srcChain, reports := range commitReportCache {
if len(reports) == 0 {
continue
}
Expand All @@ -215,15 +217,16 @@ func (p *Plugin) Observation(

// Read messages for each range.
for _, seqRange := range ranges {
msgs, err := p.ccipReader.MsgsBetweenSeqNums(ctx, selector, seqRange)
// TODO: check if srcChain is supported.
msgs, err := p.ccipReader.MsgsBetweenSeqNums(ctx, srcChain, seqRange)
if err != nil {
return nil, err
}
for _, msg := range msgs {
if _, ok := messages[selector]; !ok {
messages[selector] = make(map[cciptypes.SeqNum]cciptypes.Message)
if _, ok := messages[srcChain]; !ok {
messages[srcChain] = make(map[cciptypes.SeqNum]cciptypes.Message)
}
messages[selector][msg.Header.SequenceNumber] = msg
messages[srcChain][msg.Header.SequenceNumber] = msg
}
}
}
Expand All @@ -240,11 +243,35 @@ func (p *Plugin) Observation(
}

// TODO: Fire off messages for an attestation check service.
return exectypes.NewObservation(groupedCommits, messages).Encode()
return exectypes.NewObservation(groupedCommits, messages, nil).Encode()

case exectypes.Filter:
// TODO: pass the previous two through, add in the nonces.
return types.Observation{}, fmt.Errorf("unknown state")
// TODO: add in nonces, other data comes from previous outcome.
nonceRequestArgs := make(map[cciptypes.ChainSelector]map[string]struct{})

// Collect unique senders.
for _, commitReport := range previousOutcome.Report.ChainReports {
if _, ok := nonceRequestArgs[commitReport.SourceChainSelector]; !ok {
nonceRequestArgs[commitReport.SourceChainSelector] = make(map[string]struct{})
}
for _, msg := range commitReport.Messages {
nonceRequestArgs[commitReport.SourceChainSelector][msg.Sender.String()] = struct{}{}
}
}

// Read args from chain.
nonceObservations := make(exectypes.NonceObservations)
for srcChain, addrSet := range nonceRequestArgs {
// TODO: check if srcSelector is supported.
addrs := maps.Keys(addrSet)
nonces, err := p.ccipReader.Nonces(ctx, srcChain, p.cfg.DestChain, addrs)
if err != nil {
return types.Observation{}, fmt.Errorf("unable to get nonces: %w", err)
}
nonceObservations[srcChain] = nonces
}

return exectypes.NewObservation(nil, nil, nonceObservations).Encode()
default:
return types.Observation{}, fmt.Errorf("unknown state")
}
Expand Down Expand Up @@ -292,6 +319,7 @@ func selectReport(
encoder cciptypes.ExecutePluginCodec,
tokenDataReader exectypes.TokenDataReader,
estimateProvider gas.EstimateProvider,
nonces map[cciptypes.ChainSelector]map[string]uint64,
commitReports []exectypes.CommitData,
maxReportSizeBytes int,
maxGas uint64,
Expand All @@ -306,6 +334,7 @@ func selectReport(
tokenDataReader,
encoder,
estimateProvider,
nonces,
uint64(maxReportSizeBytes),
maxGas)
var stillPendingReports []exectypes.CommitData
Expand Down Expand Up @@ -390,31 +419,30 @@ func (p *Plugin) Outcome(

observation := exectypes.NewObservation(
mergedCommitObservations,
mergedMessageObservations)

//////////////////////////
// common preprocessing //
//////////////////////////

// flatten commit reports and sort by timestamp.
var commitReports []exectypes.CommitData
for _, report := range observation.CommitReports {
commitReports = append(commitReports, report...)
}
sort.Slice(commitReports, func(i, j int) bool {
return commitReports[i].Timestamp.Before(commitReports[j].Timestamp)
})

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: commit reports", p.reportingCfg.OracleID),
"commitReports", commitReports)
mergedMessageObservations,
nil)

state := previousOutcome.State.Next()
switch state {
case exectypes.GetCommitReports:
// flatten commit reports and sort by timestamp.
var commitReports []exectypes.CommitData
for _, report := range observation.CommitReports {
commitReports = append(commitReports, report...)
}
sort.Slice(commitReports, func(i, j int) bool {
return commitReports[i].Timestamp.Before(commitReports[j].Timestamp)
})

p.lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: commit reports", p.reportingCfg.OracleID),
"commitReports", commitReports)

outcome := exectypes.NewOutcome(state, commitReports, cciptypes.ExecutePluginReport{})
return outcome.Encode()
case exectypes.GetMessages:
commitReports := previousOutcome.PendingCommitReports

// add messages to their commitReports.
for i, report := range commitReports {
report.Messages = nil
Expand All @@ -426,17 +454,31 @@ func (p *Plugin) Outcome(
commitReports[i].Messages = report.Messages
}

outcome := exectypes.NewOutcome(state, commitReports, cciptypes.ExecutePluginReport{})
if outcome.IsEmpty() {
return nil, nil
}

p.lggr.Infow(
fmt.Sprintf("[oracle %d] exec outcome: generated outcome", p.reportingCfg.OracleID),
"outcome", outcome)

return outcome.Encode()
case exectypes.Filter:
commitReports := previousOutcome.PendingCommitReports

// TODO: this function should be pure, a context should not be needed.
outcomeReports, commitReports, err :=
selectReport(
context.Background(),
p.lggr, p.msgHasher,
p.reportCodec,
p.tokenDataReader,
p.estimateProvider,
commitReports,
maxReportSizeBytes,
p.cfg.OffchainConfig.BatchGasLimit)
outcomeReports, commitReports, err := selectReport(
context.Background(),
p.lggr,
p.msgHasher,
p.reportCodec,
p.tokenDataReader,
p.estimateProvider,
observation.Nonces,
commitReports,
maxReportSizeBytes,
p.cfg.OffchainConfig.BatchGasLimit)
if err != nil {
return ocr3types.Outcome{}, fmt.Errorf("unable to extract proofs: %w", err)
}
Expand All @@ -455,8 +497,7 @@ func (p *Plugin) Outcome(
"outcome", outcome)

return outcome.Encode()
case exectypes.Filter:
panic("not implemented")

default:
panic("unknown state")
}
Expand Down
19 changes: 14 additions & 5 deletions execute/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func TestPlugin(t *testing.T) {

runner := testhelpers.NewOCR3Runner(nodes, nodeIDs, nil)

// In the first round there is a pending commit report only.
// Round 1.
// One pending commit report only.
// Two of the messages are executed which should be indicated in the Outcome.
res, err := runner.RunRound(ctx)
require.NoError(t, err)
Expand All @@ -60,18 +61,26 @@ func TestPlugin(t *testing.T) {
require.Len(t, outcome.PendingCommitReports, 1)
require.ElementsMatch(t, outcome.PendingCommitReports[0].ExecutedMessages, []cciptypes.SeqNum{100, 101})

// In the second round there is an exec report and the pending commit report is removed.
// The exec report should indicate the following messages are executed: 102, 103, 104, 105.
// Round 2.
// Messages now attached to the pending commit.
res, err = runner.RunRound(ctx)
require.NoError(t, err)
outcome, err = exectypes.DecodeOutcome(res.Outcome)
require.NoError(t, err)
require.Len(t, outcome.Report.ChainReports, 0)
require.Len(t, outcome.PendingCommitReports, 1)

// Round 3.
// An execute report with the following messages executed: 102, 103, 104, 105.
res, err = runner.RunRound(ctx)
require.NoError(t, err)
outcome, err = exectypes.DecodeOutcome(res.Outcome)
require.NoError(t, err)
require.Len(t, outcome.Report.ChainReports, 1)
require.Len(t, outcome.PendingCommitReports, 0)
sequenceNumbers := slicelib.Map(outcome.Report.ChainReports[0].Messages, func(m cciptypes.Message) cciptypes.SeqNum {
return m.Header.SequenceNumber
})
require.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{102, 103, 104, 105})

}

type nodeSetup struct {
Expand Down
8 changes: 4 additions & 4 deletions execute/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestPlugin_ValidateObservation_IneligibleObserver(t *testing.T) {
},
},
},
})
}, nil)
encoded, err := observation.Encode()
require.NoError(t, err)
err = p.ValidateObservation(ocr3types.OutcomeContext{}, types.Query{}, types.AttributedObservation{
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestPlugin_ValidateObservation_ValidateObservedSeqNum_Error(t *testing.T) {
{MerkleRoot: root},
},
}
observation := exectypes.NewObservation(commitReports, nil)
observation := exectypes.NewObservation(commitReports, nil, nil)
encoded, err := observation.Encode()
require.NoError(t, err)
err = p.ValidateObservation(ocr3types.OutcomeContext{}, types.Query{}, types.AttributedObservation{
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestPlugin_Outcome_CommitReportsMergeError(t *testing.T) {
commitReports := map[cciptypes.ChainSelector][]exectypes.CommitData{
1: {},
}
observation, err := exectypes.NewObservation(commitReports, nil).Encode()
observation, err := exectypes.NewObservation(commitReports, nil, nil).Encode()
require.NoError(t, err)
_, err = p.Outcome(ocr3types.OutcomeContext{}, nil, []types.AttributedObservation{
{
Expand Down Expand Up @@ -383,7 +383,7 @@ func TestPlugin_Outcome_MessagesMergeError(t *testing.T) {
},
},
}
observation, err := exectypes.NewObservation(nil, messages).Encode()
observation, err := exectypes.NewObservation(nil, messages, nil).Encode()
require.NoError(t, err)
_, err = p.Outcome(ocr3types.OutcomeContext{}, nil, []types.AttributedObservation{
{
Expand Down
9 changes: 9 additions & 0 deletions execute/report/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewBuilder(
tokenDataReader exectypes.TokenDataReader,
encoder cciptypes.ExecutePluginCodec,
estimateProvider gas.EstimateProvider,
nonces map[cciptypes.ChainSelector]map[string]uint64,
maxReportSizeBytes uint64,
maxGas uint64,
) ExecReportBuilder {
Expand All @@ -37,6 +38,8 @@ func NewBuilder(
encoder: encoder,
hasher: hasher,
estimateProvider: estimateProvider,
sendersNonce: nonces,
expectedNonce: make(map[cciptypes.ChainSelector]map[string]uint64),

maxReportSizeBytes: maxReportSizeBytes,
maxGas: maxGas,
Expand Down Expand Up @@ -65,16 +68,22 @@ type execReportBuilder struct {
encoder cciptypes.ExecutePluginCodec
hasher cciptypes.MessageHasher
estimateProvider gas.EstimateProvider
sendersNonce map[cciptypes.ChainSelector]map[string]uint64

// Config
maxReportSizeBytes uint64
maxGas uint64

// State
accumulated validationMetadata
// expectedNonce is used to track nonces for multiple messages from the same sender.
expectedNonce map[cciptypes.ChainSelector]map[string]uint64

// Result
execReports []cciptypes.ExecutePluginReportSingleChain

// TODO: remove temporary feature flagging
nonceCheckingEnabled bool // defaults to disabled for backwards compatibility.
}

func (b *execReportBuilder) Add(
Expand Down
Loading

0 comments on commit 7cfb675

Please sign in to comment.