From a00ba3729b5e268447fc1dc1638fa7107d031208 Mon Sep 17 00:00:00 2001 From: Sam Date: Fri, 18 Oct 2024 09:49:07 -0400 Subject: [PATCH] Blue/green deployments for LLO; code cleanup (#81) --- go.mod | 2 +- go.sum | 4 +- llo/channel_definitions_test.go | 2 +- llo/json_report_codec.go | 60 +++++ llo/json_report_codec_test.go | 7 + llo/llo_offchain_config.pb.go | 19 +- llo/llo_offchain_config.proto | 4 +- llo/must.go | 8 - llo/offchain_config.go | 28 +- llo/offchain_config_test.go | 26 +- llo/onchain_config_codec.go | 80 ++++++ llo/onchain_config_codec_test.go | 62 +++++ llo/plugin.go | 58 ++-- llo/plugin_codecs_test.go | 4 - llo/plugin_observation.go | 213 ++++++++------- llo/plugin_observation_test.go | 213 ++++++++++++++- llo/plugin_outcome.go | 165 ++++++------ llo/plugin_outcome_test.go | 251 +++++++++++++++++- llo/plugin_reports.go | 24 +- llo/plugin_reports_test.go | 40 ++- llo/plugin_test.go | 2 +- llo/predecessor_retirement_report_cache.go | 27 -- ...redecessor_retirement_report_cache_test.go | 7 - llo/retirement_report_codec.go | 22 ++ llo/retirement_report_codec_test.go | 7 + llo/should_retire_cache.go | 19 -- llo/should_retire_cache_test.go | 7 - 27 files changed, 984 insertions(+), 377 deletions(-) delete mode 100644 llo/must.go create mode 100644 llo/onchain_config_codec.go create mode 100644 llo/onchain_config_codec_test.go delete mode 100644 llo/predecessor_retirement_report_cache.go delete mode 100644 llo/predecessor_retirement_report_cache_test.go create mode 100644 llo/retirement_report_codec.go create mode 100644 llo/retirement_report_codec_test.go delete mode 100644 llo/should_retire_cache.go delete mode 100644 llo/should_retire_cache_test.go diff --git a/go.mod b/go.mod index 661b221..3af3cec 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.5 require ( github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99 github.com/shopspring/decimal v1.4.0 - github.com/smartcontractkit/chainlink-common v0.3.0 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241017144132-5d8c7abb6779 github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 diff --git a/go.sum b/go.sum index 6bdf393..e35bdbd 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6Ng github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= -github.com/smartcontractkit/chainlink-common v0.3.0 h1:mUXHBzzw2qPKyw6gPAC8JhO+ryT8maY+rBi9NFtqEy0= -github.com/smartcontractkit/chainlink-common v0.3.0/go.mod h1:tsGgeEJc5SUSlfVGSX0wR0EkRU3pM58D6SKF97V68ko= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241017144132-5d8c7abb6779 h1:NyCHs/8ub2XctsDrNBYK7LIyvbOsL7klP5iyeWuu3AE= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241017144132-5d8c7abb6779/go.mod h1:tsGgeEJc5SUSlfVGSX0wR0EkRU3pM58D6SKF97V68ko= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7/go.mod h1:FX7/bVdoep147QQhsOPkYsPEXhGZjeYx6lBSaSXtZOA= github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 h1:NzZGjaqez21I3DU7objl3xExTH4fxYvzTqar8DC6360= diff --git a/llo/channel_definitions_test.go b/llo/channel_definitions_test.go index 2398fe1..a959522 100644 --- a/llo/channel_definitions_test.go +++ b/llo/channel_definitions_test.go @@ -15,7 +15,7 @@ func Test_VerifyChannelDefinitions(t *testing.T) { channelDefs[i] = llotypes.ChannelDefinition{} } err := VerifyChannelDefinitions(channelDefs) - assert.EqualError(t, err, "too many channels, got: 10001/10000") + assert.EqualError(t, err, "too many channels, got: 2001/2000") }) t.Run("fails for channel with no streams", func(t *testing.T) { diff --git a/llo/json_report_codec.go b/llo/json_report_codec.go index ff621a3..67ba219 100644 --- a/llo/json_report_codec.go +++ b/llo/json_report_codec.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/smartcontractkit/libocr/offchainreporting2/types" + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" ) @@ -110,6 +111,10 @@ func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) { return r, fmt.Errorf("failed to decode StreamValue: %w", err) } } + if d.SeqNr == 0 { + // catch obviously bad inputs, since a valid report can never have SeqNr == 0 + return r, fmt.Errorf("missing SeqNr") + } return Report{ ConfigDigest: cd, @@ -121,3 +126,58 @@ func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) { Specimen: d.Specimen, }, err } + +// TODO: Needs tests, MERC-3524 +func (cdc JSONReportCodec) Pack(digest types.ConfigDigest, seqNr uint64, report ocr2types.Report, sigs []types.AttributedOnchainSignature) ([]byte, error) { + type packed struct { + ConfigDigest types.ConfigDigest `json:"configDigest"` + SeqNr uint64 `json:"seqNr"` + Report json.RawMessage `json:"report"` + Sigs []types.AttributedOnchainSignature `json:"sigs"` + } + p := packed{ + ConfigDigest: digest, + SeqNr: seqNr, + Report: json.RawMessage(report), // TODO: check if its valid JSON + Sigs: sigs, + } + return json.Marshal(p) +} + +// TODO: Needs tests, MERC-3524 +func (cdc JSONReportCodec) Unpack(b []byte) (digest types.ConfigDigest, seqNr uint64, report ocr2types.Report, sigs []types.AttributedOnchainSignature, err error) { + type packed struct { + ConfigDigest string `json:"configDigest"` + SeqNr uint64 `json:"seqNr"` + Report json.RawMessage `json:"report"` + Sigs []types.AttributedOnchainSignature `json:"sigs"` + } + p := packed{} + err = json.Unmarshal(b, &p) + if err != nil { + return digest, seqNr, report, sigs, fmt.Errorf("failed to unpack report: expected JSON (got: %s); %w", b, err) + } + cdBytes, err := hex.DecodeString(p.ConfigDigest) + if err != nil { + return digest, seqNr, report, sigs, fmt.Errorf("invalid ConfigDigest; %w", err) + } + cd, err := types.BytesToConfigDigest(cdBytes) + if err != nil { + return digest, seqNr, report, sigs, fmt.Errorf("invalid ConfigDigest; %w", err) + } + return cd, p.SeqNr, ocr2types.Report(p.Report), p.Sigs, nil +} + +// TODO: Needs tests, MERC-3524 +func (cdc JSONReportCodec) UnpackDecode(b []byte) (digest types.ConfigDigest, seqNr uint64, report Report, sigs []types.AttributedOnchainSignature, err error) { + var encodedReport []byte + digest, seqNr, encodedReport, sigs, err = cdc.Unpack(b) + if err != nil { + return digest, seqNr, report, sigs, err + } + r, err := cdc.Decode(encodedReport) + if err != nil { + return digest, seqNr, report, sigs, err + } + return digest, seqNr, r, sigs, nil +} diff --git a/llo/json_report_codec_test.go b/llo/json_report_codec_test.go index a4e06c7..2149bf9 100644 --- a/llo/json_report_codec_test.go +++ b/llo/json_report_codec_test.go @@ -41,4 +41,11 @@ func Test_JSONCodec(t *testing.T) { assert.Equal(t, r, decoded) }) + t.Run("invalid input fails decode", func(t *testing.T) { + cdc := JSONReportCodec{} + _, err := cdc.Decode([]byte(`{}`)) + assert.EqualError(t, err, "invalid ConfigDigest; cannot convert bytes to ConfigDigest. bytes have wrong length 0") + _, err = cdc.Decode([]byte(`{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000"}`)) + assert.EqualError(t, err, "missing SeqNr") + }) } diff --git a/llo/llo_offchain_config.pb.go b/llo/llo_offchain_config.pb.go index 9c7aab1..155582d 100644 --- a/llo/llo_offchain_config.pb.go +++ b/llo/llo_offchain_config.pb.go @@ -24,8 +24,6 @@ type LLOOffchainConfigProto struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - - PredecessorConfigDigest []byte `protobuf:"bytes,1,opt,name=predecessorConfigDigest,proto3" json:"predecessorConfigDigest,omitempty"` } func (x *LLOOffchainConfigProto) Reset() { @@ -60,25 +58,14 @@ func (*LLOOffchainConfigProto) Descriptor() ([]byte, []int) { return file_llo_offchain_config_proto_rawDescGZIP(), []int{0} } -func (x *LLOOffchainConfigProto) GetPredecessorConfigDigest() []byte { - if x != nil { - return x.PredecessorConfigDigest - } - return nil -} - var File_llo_offchain_config_proto protoreflect.FileDescriptor var file_llo_offchain_config_proto_rawDesc = []byte{ 0x0a, 0x19, 0x6c, 0x6c, 0x6f, 0x5f, 0x6f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x76, 0x31, 0x22, - 0x52, 0x0a, 0x16, 0x4c, 0x4c, 0x4f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x38, 0x0a, 0x17, 0x70, 0x72, 0x65, - 0x64, 0x65, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, - 0x67, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x17, 0x70, 0x72, 0x65, 0x64, - 0x65, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, - 0x65, 0x73, 0x74, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x6c, 0x6c, 0x6f, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x18, 0x0a, 0x16, 0x4c, 0x4c, 0x4f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x6c, + 0x6c, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/llo/llo_offchain_config.proto b/llo/llo_offchain_config.proto index 305b2ca..ce4b54b 100644 --- a/llo/llo_offchain_config.proto +++ b/llo/llo_offchain_config.proto @@ -3,6 +3,4 @@ syntax="proto3"; package v1; option go_package = ".;llo"; -message LLOOffchainConfigProto { - bytes predecessorConfigDigest = 1; -} +message LLOOffchainConfigProto {} diff --git a/llo/must.go b/llo/must.go deleted file mode 100644 index bf002ef..0000000 --- a/llo/must.go +++ /dev/null @@ -1,8 +0,0 @@ -package llo - -func must[T any](x T, err error) T { - if err != nil { - panic(err) - } - return x -} diff --git a/llo/offchain_config.go b/llo/offchain_config.go index bbd5554..1102d89 100644 --- a/llo/offchain_config.go +++ b/llo/offchain_config.go @@ -3,26 +3,11 @@ package llo import ( "fmt" - "github.com/smartcontractkit/libocr/offchainreporting2/types" "google.golang.org/protobuf/proto" ) type OffchainConfig struct { - // We use the offchainconfig of the plugin to tell the plugin the - // configdigest of its predecessor protocol instance. - // - // NOTE: Set here: - // https://github.com/smartcontractkit/mercury-v1-sketch/blob/f52c0f823788f86c1aeaa9ba1eee32a85b981535/onchain/src/ConfigurationStore.sol#L13 - // TODO: This needs to be implemented alongside staging/production - // switchover support: https://smartcontract-it.atlassian.net/browse/MERC-3386 - PredecessorConfigDigest *types.ConfigDigest - // TODO: Billing - // https://smartcontract-it.atlassian.net/browse/MERC-1189 - // QUESTION: Previously we stored ExpiryWindow and BaseUSDFeeCents in offchain - // config, but those might be channel specific so need to move to - // channel definition - // ExpirationWindow uint32 `json:"expirationWindow"` // Integer number of seconds - // BaseUSDFee decimal.Decimal `json:"baseUSDFee"` // Base USD fee + // NOTE: Currently OffchainConfig does not contain anything, and is not used } func DecodeOffchainConfig(b []byte) (o OffchainConfig, err error) { @@ -31,21 +16,10 @@ func DecodeOffchainConfig(b []byte) (o OffchainConfig, err error) { if err != nil { return o, fmt.Errorf("failed to decode offchain config: expected protobuf (got: 0x%x); %w", b, err) } - if len(pbuf.PredecessorConfigDigest) > 0 { - var predecessorConfigDigest types.ConfigDigest - predecessorConfigDigest, err = types.BytesToConfigDigest(pbuf.PredecessorConfigDigest) - if err != nil { - return o, err - } - o.PredecessorConfigDigest = &predecessorConfigDigest - } return } func (c OffchainConfig) Encode() ([]byte, error) { pbuf := LLOOffchainConfigProto{} - if c.PredecessorConfigDigest != nil { - pbuf.PredecessorConfigDigest = c.PredecessorConfigDigest[:] - } return proto.Marshal(&pbuf) } diff --git a/llo/offchain_config_test.go b/llo/offchain_config_test.go index aa1f555..96fe2de 100644 --- a/llo/offchain_config_test.go +++ b/llo/offchain_config_test.go @@ -3,37 +3,13 @@ package llo import ( "testing" - "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func Test_OffchainConfig(t *testing.T) { - t.Run("garbage bytes", func(t *testing.T) { - _, err := DecodeOffchainConfig([]byte{1}) - require.Error(t, err) - - assert.Contains(t, err.Error(), "failed to decode offchain config: expected protobuf (got: 0x01); proto:") - }) - - t.Run("zero length for PredecessorConfigDigest is ok", func(t *testing.T) { - decoded, err := DecodeOffchainConfig([]byte{}) - require.NoError(t, err) - assert.Equal(t, OffchainConfig{}, decoded) - }) - - t.Run("encoding nil PredecessorConfigDigest is ok", func(t *testing.T) { - cfg := OffchainConfig{nil} - - b, err := cfg.Encode() - require.NoError(t, err) - - assert.Len(t, b, 0) - }) - t.Run("encode and decode", func(t *testing.T) { - cd := types.ConfigDigest([32]byte{1, 2, 3}) - cfg := OffchainConfig{&cd} + cfg := OffchainConfig{} b, err := cfg.Encode() require.NoError(t, err) diff --git a/llo/onchain_config_codec.go b/llo/onchain_config_codec.go new file mode 100644 index 0000000..60e62a5 --- /dev/null +++ b/llo/onchain_config_codec.go @@ -0,0 +1,80 @@ +package llo + +import ( + "fmt" + "math/big" + + "github.com/smartcontractkit/libocr/bigbigendian" + "github.com/smartcontractkit/libocr/offchainreporting2/types" +) + +const onchainConfigVersion = 1 + +var onchainConfigVersionBig = big.NewInt(onchainConfigVersion) + +const onchainConfigEncodedLength = 2 * 32 // 2x 32bit evm word: version and predecessorConfigDigest + +type OnchainConfig struct { + Version uint8 + PredecessorConfigDigest *types.ConfigDigest +} + +type OnchainConfigCodec interface { + Decode(b []byte) (OnchainConfig, error) + Encode(OnchainConfig) ([]byte, error) +} + +var _ OnchainConfigCodec = EVMOnchainConfigCodec{} + +// EVMOnchainConfigCodec provides a llo-specific implementation of +// OnchainConfigCodec. +// +// An encoded onchain config is expected to be in the format +// +// where version is a uint8 and min and max are in the format +// returned by EncodeValueInt192. +type EVMOnchainConfigCodec struct{} + +// TODO: Needs fuzz testing - MERC-3524 +func (EVMOnchainConfigCodec) Decode(b []byte) (OnchainConfig, error) { + if len(b) != onchainConfigEncodedLength { + return OnchainConfig{}, fmt.Errorf("unexpected length of OnchainConfig, expected %v, got %v", onchainConfigEncodedLength, len(b)) + } + + v, err := bigbigendian.DeserializeSigned(32, b[:32]) + if err != nil { + return OnchainConfig{}, err + } + if v.Cmp(onchainConfigVersionBig) != 0 { + return OnchainConfig{}, fmt.Errorf("unexpected version of OnchainConfig, expected %v, got %v", onchainConfigVersion, v) + } + + o := OnchainConfig{ + Version: uint8(v.Uint64()), + } + + cd := types.ConfigDigest(b[32:64]) + if (cd != types.ConfigDigest{}) { + o.PredecessorConfigDigest = &cd + } + return o, nil +} + +// TODO: Needs fuzz testing - MERC-3524 +func (EVMOnchainConfigCodec) Encode(c OnchainConfig) ([]byte, error) { + if c.Version != onchainConfigVersion { + return nil, fmt.Errorf("unexpected version of OnchainConfig, expected %v, got %v", onchainConfigVersion, c.Version) + } + verBytes, err := bigbigendian.SerializeSigned(32, onchainConfigVersionBig) + if err != nil { + return nil, err + } + cdBytes := make([]byte, 32) + if c.PredecessorConfigDigest != nil { + copy(cdBytes, c.PredecessorConfigDigest[:]) + } + result := make([]byte, 0, onchainConfigEncodedLength) + result = append(result, verBytes...) + result = append(result, cdBytes...) + return result, nil +} diff --git a/llo/onchain_config_codec_test.go b/llo/onchain_config_codec_test.go new file mode 100644 index 0000000..babb6e7 --- /dev/null +++ b/llo/onchain_config_codec_test.go @@ -0,0 +1,62 @@ +package llo + +import ( + "testing" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_EVMOnchainConfigCodec(t *testing.T) { + c := EVMOnchainConfigCodec{} + + t.Run("invalid length", func(t *testing.T) { + _, err := c.Decode([]byte{1}) + require.Error(t, err) + + assert.Contains(t, err.Error(), "unexpected length of OnchainConfig, expected 64, got 1") + }) + + t.Run("encoding unsupported version fails", func(t *testing.T) { + cfg := OnchainConfig{Version: uint8(100)} + + _, err := c.Encode(cfg) + require.EqualError(t, err, "unexpected version of OnchainConfig, expected 1, got 100") + }) + t.Run("decoding unsupported version fails", func(t *testing.T) { + b := make([]byte, 64) + b[30] = 100 + + _, err := c.Decode(b) + require.EqualError(t, err, "unexpected version of OnchainConfig, expected 1, got 25600") + }) + + t.Run("encoding nil PredecessorConfigDigest is ok", func(t *testing.T) { + cfg := OnchainConfig{Version: uint8(1)} + + b, err := c.Encode(cfg) + require.NoError(t, err) + + assert.Len(t, b, 64) + assert.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, b) + + t.Run("decoding zero predecessor config digest results in nil", func(t *testing.T) { + decoded, err := c.Decode(b) + require.NoError(t, err) + assert.Equal(t, OnchainConfig{Version: 1}, decoded) + }) + }) + + t.Run("encode and decode", func(t *testing.T) { + cd := types.ConfigDigest([32]byte{1, 2, 3}) + cfg := OnchainConfig{Version: uint8(1), PredecessorConfigDigest: &cd} + + b, err := c.Encode(cfg) + require.NoError(t, err) + + cfgDecoded, err := c.Decode(b) + require.NoError(t, err) + assert.Equal(t, cfg, cfgDecoded) + }) +} diff --git a/llo/plugin.go b/llo/plugin.go index b38c590..913f0ad 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -14,8 +14,6 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" ) -// TODO: Split out this file and write unit tests: https://smartcontract-it.atlassian.net/browse/MERC-3524 - // Additional limits so we can more effectively bound the size of observations // NOTE: These are hardcoded because these exact values are relied upon as a // property of coming to consensus, it's too dangerous to make these @@ -23,6 +21,18 @@ import ( // OffchainConfig if they need to be changed dynamically and in a // backwards-compatible way. const ( + // OCR protocol limits + // NOTE: CAREFUL! If we ever accidentally exceed these e.g. + // through too many channels/streams, the protocol will halt. + // + // TODO: How many channels/streams can we support given these constraints? + // https://smartcontract-it.atlassian.net/browse/MERC-6468 + MaxReportCount = ocr3types.MaxMaxReportCount + MaxObservationLength = ocr3types.MaxMaxObservationLength + MaxOutcomeLength = ocr3types.MaxMaxOutcomeLength + MaxReportLength = ocr3types.MaxMaxReportLength + + // LLO-specific limits // Maximum amount of channels that can be added per round (if more than // this needs to be added, it will be added in batches until everything is // up-to-date) @@ -32,14 +42,10 @@ const ( // is up-to-date) MaxObservationUpdateChannelDefinitionsLength = 5 // Maximum number of streams that can be observed per round - // TODO: This needs to be implemented on the Observation side so we don't - // even generate an observation that fails this MaxObservationStreamValuesLength = 10_000 // MaxOutcomeChannelDefinitionsLength is the maximum number of channels that // can be supported - // TODO: This needs to be implemented on the Observation side so we don't - // even generate an observation that fails this - MaxOutcomeChannelDefinitionsLength = 10_000 + MaxOutcomeChannelDefinitionsLength = MaxReportCount ) type DSOpts interface { @@ -98,7 +104,7 @@ type ShouldRetireCache interface { // reads asynchronously from onchain Configur // Should the protocol instance retire according to the configuration // contract? // See: https://github.com/smartcontractkit/mercury-v1-sketch/blob/main/onchain/src/ConfigurationStore.sol#L18 - ShouldRetire() (bool, error) + ShouldRetire(digest ocr2types.ConfigDigest) (bool, error) } // The predecessor protocol instance stores its attested retirement report in @@ -111,7 +117,15 @@ type ShouldRetireCache interface { // reads asynchronously from onchain Configur // The sketch envisions it being implemented as a single object that is shared // between different protocol instances. type PredecessorRetirementReportCache interface { + // AttestedRetirementReport returns the attested retirement report for the + // given config digest from the local cache. + // + // This should return nil and not error in the case of a missing attested + // retirement report. AttestedRetirementReport(predecessorConfigDigest ocr2types.ConfigDigest) ([]byte, error) + // CheckAttestedRetirementReport verifies that an attested retirement + // report, which may have come from another node, is valid (signed) with + // signers corresponding to the given config digest CheckAttestedRetirementReport(predecessorConfigDigest ocr2types.ConfigDigest, attestedRetirementReport []byte) (RetirementReport, error) } @@ -174,9 +188,9 @@ type ChannelDefinitionCache interface { // A ReportingPlugin instance will only ever serve a single protocol instance. var _ ocr3types.ReportingPluginFactory[llotypes.ReportInfo] = &PluginFactory{} -func NewPluginFactory(cfg Config, prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[llotypes.ReportFormat]ReportCodec) *PluginFactory { +func NewPluginFactory(cfg Config, prrc PredecessorRetirementReportCache, src ShouldRetireCache, rcodec RetirementReportCodec, cdc ChannelDefinitionCache, ds DataSource, lggr logger.Logger, oncc OnchainConfigCodec, reportCodecs map[llotypes.ReportFormat]ReportCodec) *PluginFactory { return &PluginFactory{ - cfg, prrc, src, cdc, ds, lggr, codecs, + cfg, prrc, src, rcodec, cdc, ds, lggr, oncc, reportCodecs, } } @@ -190,21 +204,23 @@ type PluginFactory struct { Config Config PredecessorRetirementReportCache PredecessorRetirementReportCache ShouldRetireCache ShouldRetireCache + RetirementReportCodec RetirementReportCodec ChannelDefinitionCache ChannelDefinitionCache DataSource DataSource Logger logger.Logger - Codecs map[llotypes.ReportFormat]ReportCodec + OnchainConfigCodec OnchainConfigCodec + ReportCodecs map[llotypes.ReportFormat]ReportCodec } func (f *PluginFactory) NewReportingPlugin(ctx context.Context, cfg ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[llotypes.ReportInfo], ocr3types.ReportingPluginInfo, error) { - offchainCfg, err := DecodeOffchainConfig(cfg.OffchainConfig) + onchainConfig, err := f.OnchainConfigCodec.Decode(cfg.OnchainConfig) if err != nil { - return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("NewReportingPlugin failed to decode offchain config; got: 0x%x (len: %d); %w", cfg.OffchainConfig, len(cfg.OffchainConfig), err) + return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("NewReportingPlugin failed to decode onchain config; got: 0x%x (len: %d); %w", cfg.OnchainConfig, len(cfg.OnchainConfig), err) } return &Plugin{ f.Config, - offchainCfg.PredecessorConfigDigest, + onchainConfig.PredecessorConfigDigest, cfg.ConfigDigest, f.PredecessorRetirementReportCache, f.ShouldRetireCache, @@ -215,15 +231,16 @@ func (f *PluginFactory) NewReportingPlugin(ctx context.Context, cfg ocr3types.Re cfg.F, protoObservationCodec{}, protoOutcomeCodec{}, - f.Codecs, + f.RetirementReportCodec, + f.ReportCodecs, }, ocr3types.ReportingPluginInfo{ Name: "LLO", Limits: ocr3types.ReportingPluginLimits{ MaxQueryLength: 0, - MaxObservationLength: ocr3types.MaxMaxObservationLength, // TODO: use tighter bound MERC-3524 - MaxOutcomeLength: ocr3types.MaxMaxOutcomeLength, // TODO: use tighter bound MERC-3524 - MaxReportLength: ocr3types.MaxMaxReportLength, // TODO: use tighter bound MERC-3524 - MaxReportCount: ocr3types.MaxMaxReportCount, // TODO: use tighter bound MERC-3524 + MaxObservationLength: MaxObservationLength, + MaxOutcomeLength: MaxOutcomeLength, + MaxReportLength: MaxReportLength, + MaxReportCount: MaxReportCount, }, }, nil } @@ -250,7 +267,8 @@ type Plugin struct { F int ObservationCodec ObservationCodec OutcomeCodec OutcomeCodec - Codecs map[llotypes.ReportFormat]ReportCodec + RetirementReportCodec RetirementReportCodec + ReportCodecs map[llotypes.ReportFormat]ReportCodec } // Query creates a Query that is sent from the leader to all follower nodes diff --git a/llo/plugin_codecs_test.go b/llo/plugin_codecs_test.go index f0b214b..81fdb84 100644 --- a/llo/plugin_codecs_test.go +++ b/llo/plugin_codecs_test.go @@ -10,10 +10,6 @@ import ( llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" ) -// TODO: probably ought to have fuzz testing to detect crashes -// TODO: what about resource starvation attacks? maximum length? Does OCR -// protect us from this? - func Test_protoObservationCodec(t *testing.T) { t.Run("encode and decode empty struct", func(t *testing.T) { obs := Observation{} diff --git a/llo/plugin_observation.go b/llo/plugin_observation.go index 32843e0..d2ae6fc 100644 --- a/llo/plugin_observation.go +++ b/llo/plugin_observation.go @@ -24,141 +24,136 @@ func (p *Plugin) observation(ctx context.Context, outctx ocr3types.OutcomeContex // See case at the top of Outcome() return types.Observation{}, nil } - // Second round will have no channel definitions yet, but may vote to add - // them - - // QUESTION: is there a way to have this captured in EAs so we get something - // closer to the source? - nowNanoseconds := time.Now().UnixNano() + // SeqNr==2 will have no channel definitions yet, so will not make any + // observations, but it may vote to add new channel definitions previousOutcome, err := p.OutcomeCodec.Decode(outctx.PreviousOutcome) if err != nil { return nil, fmt.Errorf("error unmarshalling previous outcome: %w", err) } - var attestedRetirementReport []byte - // Only try to fetch this from the cache if this instance if configured - // with a predecessor and we're still in the staging stage. - if p.PredecessorConfigDigest != nil && previousOutcome.LifeCycleStage == LifeCycleStageStaging { - var err2 error - attestedRetirementReport, err2 = p.PredecessorRetirementReportCache.AttestedRetirementReport(*p.PredecessorConfigDigest) - if err2 != nil { - return nil, fmt.Errorf("error fetching attested retirement report from cache: %w", err2) - } + obs := Observation{ + // QUESTION: is there a way to have this captured in EAs so we get something + // closer to the source? + UnixTimestampNanoseconds: time.Now().UnixNano(), } - shouldRetire, err := p.ShouldRetireCache.ShouldRetire() - if err != nil { - return nil, fmt.Errorf("error fetching shouldRetire from cache: %w", err) - } + if previousOutcome.LifeCycleStage == LifeCycleStageRetired { + p.Logger.Debugw("Node is retired, will generate empty observation", "stage", "Observation", "seqNr", outctx.SeqNr) + } else { + if err = VerifyChannelDefinitions(previousOutcome.ChannelDefinitions); err != nil { + // This is not expected, unless the majority of nodes are using a + // different verification method than this one. + // + // If it does happen, it's an invariant violation and we cannot + // generate an observation. + return nil, fmt.Errorf("previousOutcome.Definitions is invalid: %w", err) + } - // vote to remove channel ids if they're in the previous outcome - // ChannelDefinitions or ValidAfterSeconds - removeChannelIDs := map[llotypes.ChannelID]struct{}{} - // vote to add channel definitions that aren't present in the previous - // outcome ChannelDefinitions - // FIXME: Why care about ValidAfterSeconds here? - var updateChannelDefinitions llotypes.ChannelDefinitions - { - // NOTE: Be careful using maps, since key ordering is randomized! All - // addition/removal lists must be built deterministically so that nodes - // can agree on the same set of changes. - // - // ChannelIDs should always be sorted the same way (channel ID ascending). - expectedChannelDefs := p.ChannelDefinitionCache.Definitions() - if err := VerifyChannelDefinitions(expectedChannelDefs); err != nil { - return nil, fmt.Errorf("ChannelDefinitionCache.Definitions is invalid: %w", err) + // Only try to fetch this from the cache if this instance if configured + // with a predecessor and we're still in the staging stage. + if p.PredecessorConfigDigest != nil && previousOutcome.LifeCycleStage == LifeCycleStageStaging { + var err2 error + obs.AttestedPredecessorRetirement, err2 = p.PredecessorRetirementReportCache.AttestedRetirementReport(*p.PredecessorConfigDigest) + if err2 != nil { + return nil, fmt.Errorf("error fetching attested retirement report from cache: %w", err2) + } } - removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MaxObservationRemoveChannelIDsLength) - for channelID := range removeChannelDefinitions { - removeChannelIDs[channelID] = struct{}{} + obs.ShouldRetire, err = p.ShouldRetireCache.ShouldRetire(p.ConfigDigest) + if err != nil { + return nil, fmt.Errorf("error fetching shouldRetire from cache: %w", err) + } + if obs.ShouldRetire && p.Config.VerboseLogging { + p.Logger.Debugw("Voting to retire", "seqNr", outctx.SeqNr, "stage", "Observation") } - // TODO: needs testing - validAfterSecondsChannelIDs := maps.Keys(previousOutcome.ValidAfterSeconds) - // Sort so we cut off deterministically - sortChannelIDs(validAfterSecondsChannelIDs) - for _, channelID := range validAfterSecondsChannelIDs { - if len(removeChannelIDs) >= MaxObservationRemoveChannelIDsLength { - break + // vote to remove channel ids if they're in the previous outcome + // ChannelDefinitions + obs.RemoveChannelIDs = map[llotypes.ChannelID]struct{}{} + // vote to add channel definitions that aren't present in the previous + // outcome ChannelDefinitions + { + // NOTE: Be careful using maps, since key ordering is randomized! All + // addition/removal lists must be built deterministically so that nodes + // can agree on the same set of changes. + // + // ChannelIDs should always be sorted the same way (channel ID ascending). + expectedChannelDefs := p.ChannelDefinitionCache.Definitions() + if err = VerifyChannelDefinitions(expectedChannelDefs); err != nil { + // If channel definitions is invalid, do not error out but instead + // don't vote on any new channels. + // + // This prevents protocol halts in the event of an invalid channel + // definitions file. + p.Logger.Errorw("ChannelDefinitionCache.Definitions is invalid", "err", err) + } else { + removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MaxObservationRemoveChannelIDsLength) + for channelID := range removeChannelDefinitions { + obs.RemoveChannelIDs[channelID] = struct{}{} + } + + // NOTE: This is slow because it deeply compares every value in the map. + // To improve performance, consider changing channel voting to happen + // every N rounds instead of every round. Or, alternatively perhaps the + // first e.g. 100 rounds could check every round to allow for fast feed + // spinup, then after that every 10 or 100 rounds. + obs.UpdateChannelDefinitions = make(llotypes.ChannelDefinitions) + expectedChannelIDs := maps.Keys(expectedChannelDefs) + // Sort so we cut off deterministically + sortChannelIDs(expectedChannelIDs) + for _, channelID := range expectedChannelIDs { + prev, exists := previousOutcome.ChannelDefinitions[channelID] + channelDefinition := expectedChannelDefs[channelID] + if exists && prev.Equals(channelDefinition) { + continue + } + // Add or replace channel + obs.UpdateChannelDefinitions[channelID] = channelDefinition + if len(obs.UpdateChannelDefinitions) >= MaxObservationUpdateChannelDefinitionsLength { + // Never add more than MaxObservationUpdateChannelDefinitionsLength + break + } + } } - if _, ok := expectedChannelDefs[channelID]; !ok { - removeChannelIDs[channelID] = struct{}{} - } - } - // NOTE: This is slow because it deeply compares every value in the map. - // To improve performance, consider changing channel voting to happen - // every N rounds instead of every round. Or, alternatively perhaps the - // first e.g. 100 rounds could check every round to allow for fast feed - // spinup, then after that every 10 or 100 rounds. - updateChannelDefinitions = make(llotypes.ChannelDefinitions) - expectedChannelIDs := maps.Keys(expectedChannelDefs) - // Sort so we cut off deterministically - sortChannelIDs(expectedChannelIDs) - for _, channelID := range expectedChannelIDs { - prev, exists := previousOutcome.ChannelDefinitions[channelID] - channelDefinition := expectedChannelDefs[channelID] - if exists && prev.Equals(channelDefinition) { - continue + if len(obs.UpdateChannelDefinitions) > 0 { + p.Logger.Debugw("Voting to update channel definitions", + "updateChannelDefinitions", obs.UpdateChannelDefinitions, + "seqNr", outctx.SeqNr, + "stage", "Observation") } - // Add or replace channel - updateChannelDefinitions[channelID] = channelDefinition - if len(updateChannelDefinitions) >= MaxObservationUpdateChannelDefinitionsLength { - // Never add more than MaxObservationUpdateChannelDefinitionsLength - break + if len(obs.RemoveChannelIDs) > 0 { + p.Logger.Debugw("Voting to remove channel definitions", + "removeChannelIDs", obs.RemoveChannelIDs, + "seqNr", outctx.SeqNr, + "stage", "Observation", + ) } } - if len(updateChannelDefinitions) > 0 { - p.Logger.Debugw("Voting to update channel definitions", - "updateChannelDefinitions", updateChannelDefinitions, - "seqNr", outctx.SeqNr, - "stage", "Observation") - } - if len(removeChannelIDs) > 0 { - p.Logger.Debugw("Voting to remove channel definitions", - "removeChannelIDs", removeChannelIDs, - "seqNr", outctx.SeqNr, - "stage", "Observation", - ) - } - } - - var streamValues StreamValues - if len(previousOutcome.ChannelDefinitions) == 0 { - p.Logger.Debugw("ChannelDefinitions is empty, will not generate any observations", "stage", "Observation", "seqNr", outctx.SeqNr) - } else { - streamValues = make(StreamValues) - for _, channelDefinition := range previousOutcome.ChannelDefinitions { - for _, strm := range channelDefinition.Streams { - streamValues[strm.StreamID] = nil + if len(previousOutcome.ChannelDefinitions) == 0 { + p.Logger.Debugw("ChannelDefinitions is empty, will not generate any observations", "stage", "Observation", "seqNr", outctx.SeqNr) + } else { + obs.StreamValues = make(StreamValues) + for _, channelDefinition := range previousOutcome.ChannelDefinitions { + for _, strm := range channelDefinition.Streams { + obs.StreamValues[strm.StreamID] = nil + } } - } - if err := p.DataSource.Observe(ctx, streamValues, dsOpts{p.Config.VerboseLogging, outctx, p.ConfigDigest}); err != nil { - return nil, fmt.Errorf("DataSource.Observe error: %w", err) + if err = p.DataSource.Observe(ctx, obs.StreamValues, dsOpts{p.Config.VerboseLogging, outctx, p.ConfigDigest}); err != nil { + return nil, fmt.Errorf("DataSource.Observe error: %w", err) + } } } - var rawObservation []byte - { - var err error - rawObservation, err = p.ObservationCodec.Encode(Observation{ - attestedRetirementReport, - shouldRetire, - nowNanoseconds, - removeChannelIDs, - updateChannelDefinitions, - streamValues, - }) - if err != nil { - return nil, fmt.Errorf("Observation encode error: %w", err) - } + serialized, err := p.ObservationCodec.Encode(obs) + if err != nil { + return nil, fmt.Errorf("Observation encode error: %w", err) } - return rawObservation, nil + return serialized, nil } type Observation struct { diff --git a/llo/plugin_observation_test.go b/llo/plugin_observation_test.go index cc29184..8002eea 100644 --- a/llo/plugin_observation_test.go +++ b/llo/plugin_observation_test.go @@ -2,6 +2,7 @@ package llo import ( "context" + "errors" "testing" "time" @@ -11,12 +12,27 @@ import ( "golang.org/x/exp/maps" "github.com/smartcontractkit/libocr/offchainreporting2/types" + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" ) +type mockPredecessorRetirementReportCache struct { + retirementReports map[ocr2types.ConfigDigest][]byte + err error +} + +var _ PredecessorRetirementReportCache = &mockPredecessorRetirementReportCache{} + +func (p *mockPredecessorRetirementReportCache) AttestedRetirementReport(predecessorConfigDigest ocr2types.ConfigDigest) ([]byte, error) { + return p.retirementReports[predecessorConfigDigest], p.err +} +func (p *mockPredecessorRetirementReportCache) CheckAttestedRetirementReport(predecessorConfigDigest ocr2types.ConfigDigest, attestedRetirementReport []byte) (RetirementReport, error) { + panic("not implemented") +} + func Test_Observation(t *testing.T) { smallDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ 1: { @@ -271,12 +287,62 @@ func Test_Observation(t *testing.T) { assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) assert.Equal(t, ds.s, decoded.StreamValues) }) + + t.Run("in case previous outcome channel definitions is invalid, returns error", func(t *testing.T) { + dfns := make(llotypes.ChannelDefinitions) + for i := 0; i < 2*MaxOutcomeChannelDefinitionsLength; i++ { + dfns[llotypes.ChannelID(i)] = llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: uint32(i), Aggregator: llotypes.AggregatorMedian}}, + } + } + + previousOutcome := Outcome{ + ChannelDefinitions: dfns, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + _, err = p.Observation(context.Background(), outctx, query) + assert.EqualError(t, err, "previousOutcome.Definitions is invalid: too many channels, got: 4000/2000") + }) + + t.Run("in case ChannelDefinitionsCache returns invalid definitions, does not vote to change anything", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: smallDefinitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + dfns := make(llotypes.ChannelDefinitions) + for i := 0; i < 2*MaxOutcomeChannelDefinitionsLength; i++ { + dfns[llotypes.ChannelID(i)] = llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: uint32(i), Aggregator: llotypes.AggregatorMedian}}, + } + } + cdc.definitions = dfns + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.UpdateChannelDefinitions, 0) + assert.Len(t, decoded.RemoveChannelIDs, 0) + }) }) cdc.definitions = smallDefinitions - // TODO: huge (greater than max allowed) - t.Run("votes to remove channel IDs", func(t *testing.T) { t.Run("first round of removals", func(t *testing.T) { testStartTS := time.Now() @@ -349,4 +415,147 @@ func Test_Observation(t *testing.T) { assert.Equal(t, ds.s, decoded.StreamValues) }) }) + + t.Run("sets shouldRetire if ShouldRetireCache.ShouldRetire() is true", func(t *testing.T) { + previousOutcome := Outcome{} + src := &mockShouldRetireCache{shouldRetire: true} + p.ShouldRetireCache = src + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.True(t, decoded.ShouldRetire) + }) + + t.Run("when predecessor config digest is set", func(t *testing.T) { + testStartTS := time.Now() + cd := types.ConfigDigest{2, 3, 4, 5, 6} + p.PredecessorConfigDigest = &cd + t.Run("in staging lifecycle stage, adds attestedRetirementReport to observation", func(t *testing.T) { + prrc := &mockPredecessorRetirementReportCache{ + retirementReports: map[ocr2types.ConfigDigest][]byte{ + {2, 3, 4, 5, 6}: []byte("foo"), + }, + } + p.PredecessorRetirementReportCache = prrc + previousOutcome := Outcome{ + LifeCycleStage: LifeCycleStageStaging, + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Equal(t, []byte("foo"), decoded.AttestedPredecessorRetirement) + }) + t.Run("if predecessor retirement report cache returns error, returns error", func(t *testing.T) { + prrc := &mockPredecessorRetirementReportCache{ + err: errors.New("retirement report not found error"), + } + p.PredecessorRetirementReportCache = prrc + previousOutcome := Outcome{ + LifeCycleStage: LifeCycleStageStaging, + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + _, err = p.Observation(context.Background(), outctx, query) + assert.EqualError(t, err, "error fetching attested retirement report from cache: retirement report not found error") + }) + t.Run("in production lifecycle stage, does not add attestedRetirementReport to observation", func(t *testing.T) { + prrc := &mockPredecessorRetirementReportCache{ + retirementReports: map[ocr2types.ConfigDigest][]byte{ + {2, 3, 4, 5, 6}: []byte("foo"), + }, + err: nil, + } + p.PredecessorRetirementReportCache = prrc + previousOutcome := Outcome{ + LifeCycleStage: LifeCycleStageProduction, + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Equal(t, []byte(nil), decoded.AttestedPredecessorRetirement) + }) + }) + t.Run("if previous outcome is retired, returns observation with only timestamp", func(t *testing.T) { + testStartTS := time.Now() + previousOutcome := Outcome{ + LifeCycleStage: LifeCycleStageRetired, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Zero(t, decoded.AttestedPredecessorRetirement) + assert.False(t, decoded.ShouldRetire) + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Len(t, decoded.UpdateChannelDefinitions, 0) + assert.Len(t, decoded.RemoveChannelIDs, 0) + assert.Len(t, decoded.StreamValues, 0) + }) + + invalidDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + // no streams means invalid + Streams: []llotypes.Stream{}, + }, + } + t.Run("if channel definitions file is invalid, does not vote to add or remove any channels and only submits observations", func(t *testing.T) { + testStartTS := time.Now() + previousOutcome := Outcome{ + LifeCycleStage: LifeCycleStageStaging, + ChannelDefinitions: smallDefinitions, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + cdc.definitions = invalidDefinitions + + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.UpdateChannelDefinitions, 0) + assert.Len(t, decoded.RemoveChannelIDs, 0) + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) } diff --git a/llo/plugin_outcome.go b/llo/plugin_outcome.go index 8646783..c81afbc 100644 --- a/llo/plugin_outcome.go +++ b/llo/plugin_outcome.go @@ -19,10 +19,12 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos return nil, fmt.Errorf("invariant violation: expected at least 2f+1 attributed observations, got %d (f: %d)", len(aos), p.F) } - // Initial outcome is kind of a "keystone" with minimum extra information + // Initial outcome is kind of a "cornerstone" with minimum extra information if outctx.SeqNr <= 1 { // Initial Outcome var lifeCycleStage llotypes.LifeCycleStage + // NOTE: Staging instances **require** a predecessor config digest. + // This is enforced by the contract. if p.PredecessorConfigDigest == nil { // Start straight in production if we have no predecessor lifeCycleStage = LifeCycleStageProduction @@ -50,74 +52,7 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos ///////////////////////////////// // Decode observations ///////////////////////////////// - - // a single valid retirement report is enough - var validPredecessorRetirementReport *RetirementReport - - shouldRetireVotes := 0 - - timestampsNanoseconds := []int64{} - - removeChannelVotesByID := map[llotypes.ChannelID]int{} - - // for each channelId count number of votes that mention it and count number of votes that include it. - updateChannelVotesByHash := map[ChannelHash]int{} - updateChannelDefinitionsByHash := map[ChannelHash]ChannelDefinitionWithID{} - - streamObservations := make(map[llotypes.StreamID][]StreamValue) - - for _, ao := range aos { - // TODO: Put in a function - // MERC-3524 - observation, err2 := p.ObservationCodec.Decode(ao.Observation) - if err2 != nil { - p.Logger.Warnw("ignoring invalid observation", "oracleID", ao.Observer, "error", err2) - continue - } - - if len(observation.AttestedPredecessorRetirement) != 0 && validPredecessorRetirementReport == nil { - pcd := *p.PredecessorConfigDigest - retirementReport, err3 := p.PredecessorRetirementReportCache.CheckAttestedRetirementReport(pcd, observation.AttestedPredecessorRetirement) - if err3 != nil { - p.Logger.Warnw("ignoring observation with invalid attested predecessor retirement", "oracleID", ao.Observer, "error", err3, "predecessorConfigDigest", pcd) - continue - } - validPredecessorRetirementReport = &retirementReport - } - - if observation.ShouldRetire { - shouldRetireVotes++ - } - - timestampsNanoseconds = append(timestampsNanoseconds, observation.UnixTimestampNanoseconds) - - for channelID := range observation.RemoveChannelIDs { - removeChannelVotesByID[channelID]++ - } - - for channelID, channelDefinition := range observation.UpdateChannelDefinitions { - defWithID := ChannelDefinitionWithID{channelDefinition, channelID} - channelHash := MakeChannelHash(defWithID) - updateChannelVotesByHash[channelHash]++ - updateChannelDefinitionsByHash[channelHash] = defWithID - } - - var missingObservations []llotypes.StreamID - for id, sv := range observation.StreamValues { - if sv != nil { // FIXME: nil checks don't work here. Test this and figure out what to do (also, are there other cases?) - streamObservations[id] = append(streamObservations[id], sv) - } else { - missingObservations = append(missingObservations, id) - } - } - if p.Config.VerboseLogging { - if len(missingObservations) > 0 { - sort.Slice(missingObservations, func(i, j int) bool { return missingObservations[i] < missingObservations[j] }) - p.Logger.Debugw("Peer was missing observations", "streamIDs", missingObservations, "oracleID", ao.Observer, "stage", "Outcome", "seqNr", outctx.SeqNr) - } - p.Logger.Debugw("Got observations from peer", "stage", "Outcome", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr) - } - } + timestampsNanoseconds, validPredecessorRetirementReport, shouldRetireVotes, removeChannelVotesByID, updateChannelDefinitionsByHash, updateChannelVotesByHash, streamObservations := p.decodeObservations(aos, outctx) if len(timestampsNanoseconds) == 0 { return nil, errors.New("no valid observations") @@ -125,11 +60,17 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos var outcome Outcome + ///////////////////////////////// + // outcome.ObservationsTimestampNanoseconds + ///////////////////////////////// + outcome.ObservationsTimestampNanoseconds = medianTimestamp(timestampsNanoseconds) + ///////////////////////////////// // outcome.LifeCycleStage ///////////////////////////////// if previousOutcome.LifeCycleStage == LifeCycleStageStaging && validPredecessorRetirementReport != nil { // Promote this protocol instance to the production stage! 🚀 + p.Logger.Infow("Promoting protocol instance from staging to production 🎖️", "seqNr", outctx.SeqNr, "stage", "Outcome", "validAfterSeconds", validPredecessorRetirementReport.ValidAfterSeconds) // override ValidAfterSeconds with the value from the retirement report // so that we have no gaps in the validity time range. @@ -140,16 +81,10 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos } if outcome.LifeCycleStage == LifeCycleStageProduction && shouldRetireVotes > p.F { + p.Logger.Infow("Retiring production protocol instance ⚰️", "seqNr", outctx.SeqNr, "stage", "Outcome") outcome.LifeCycleStage = LifeCycleStageRetired } - ///////////////////////////////// - // outcome.ObservationsTimestampNanoseconds - // TODO: Refactor this into an aggregate function - // MERC-3524 - sort.Slice(timestampsNanoseconds, func(i, j int) bool { return timestampsNanoseconds[i] < timestampsNanoseconds[j] }) - outcome.ObservationsTimestampNanoseconds = timestampsNanoseconds[len(timestampsNanoseconds)/2] - ///////////////////////////////// // outcome.ChannelDefinitions ///////////////////////////////// @@ -237,8 +172,10 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos if p.Config.VerboseLogging { p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err3, "stage", "Outcome", "seqNr", outctx.SeqNr) } + // previous outcome did not report; keep the same validAfterSeconds outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds } else { + // previous outcome reported; update validAfterSeconds to the previousObservationsTimestamp outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds } } @@ -311,6 +248,68 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos return p.OutcomeCodec.Encode(outcome) } +func (p *Plugin) decodeObservations(aos []types.AttributedObservation, outctx ocr3types.OutcomeContext) (timestampsNanoseconds []int64, validPredecessorRetirementReport *RetirementReport, shouldRetireVotes int, removeChannelVotesByID map[llotypes.ChannelID]int, updateChannelDefinitionsByHash map[ChannelHash]ChannelDefinitionWithID, updateChannelVotesByHash map[ChannelHash]int, streamObservations map[llotypes.StreamID][]StreamValue) { + removeChannelVotesByID = make(map[llotypes.ChannelID]int) + updateChannelDefinitionsByHash = make(map[ChannelHash]ChannelDefinitionWithID) + updateChannelVotesByHash = make(map[ChannelHash]int) + streamObservations = make(map[llotypes.StreamID][]StreamValue) + + for _, ao := range aos { + observation, err2 := p.ObservationCodec.Decode(ao.Observation) + if err2 != nil { + p.Logger.Warnw("ignoring invalid observation", "oracleID", ao.Observer, "error", err2) + continue + } + + if len(observation.AttestedPredecessorRetirement) != 0 && validPredecessorRetirementReport == nil { + // a single valid retirement report is enough + pcd := *p.PredecessorConfigDigest + retirementReport, err3 := p.PredecessorRetirementReportCache.CheckAttestedRetirementReport(pcd, observation.AttestedPredecessorRetirement) + if err3 != nil { + p.Logger.Warnw("ignoring observation with invalid attested predecessor retirement", "oracleID", ao.Observer, "error", err3, "predecessorConfigDigest", pcd) + continue + } + validPredecessorRetirementReport = &retirementReport + } + + if observation.ShouldRetire { + shouldRetireVotes++ + } + + timestampsNanoseconds = append(timestampsNanoseconds, observation.UnixTimestampNanoseconds) + + for channelID := range observation.RemoveChannelIDs { + removeChannelVotesByID[channelID]++ + } + + // for each channelId count number of votes that mention it and count number of votes that include it. + for channelID, channelDefinition := range observation.UpdateChannelDefinitions { + defWithID := ChannelDefinitionWithID{channelDefinition, channelID} + channelHash := MakeChannelHash(defWithID) + updateChannelVotesByHash[channelHash]++ + updateChannelDefinitionsByHash[channelHash] = defWithID + } + + var missingObservations []llotypes.StreamID + for id, sv := range observation.StreamValues { + if sv != nil { // FIXME: nil checks don't work here. Test this and figure out what to do (also, are there other cases?) + streamObservations[id] = append(streamObservations[id], sv) + } else { + missingObservations = append(missingObservations, id) + } + } + if p.Config.VerboseLogging { + if len(missingObservations) > 0 { + sort.Slice(missingObservations, func(i, j int) bool { return missingObservations[i] < missingObservations[j] }) + p.Logger.Debugw("Peer was missing observations", "streamIDs", missingObservations, "oracleID", ao.Observer, "stage", "Outcome", "seqNr", outctx.SeqNr) + } + p.Logger.Debugw("Got observations from peer", "stage", "Outcome", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr) + } + } + + return +} + type Outcome struct { // LifeCycleStage the protocol is in LifeCycleStage llotypes.LifeCycleStage @@ -339,12 +338,17 @@ func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) { return uint32(result), nil } +func (out *Outcome) GenRetirementReport() RetirementReport { + return RetirementReport{ + ValidAfterSeconds: out.ValidAfterSeconds, + } +} + // Indicates whether a report can be generated for the given channel. // Returns nil if channel is reportable // NOTE: A channel is still reportable even if missing some or all stream // values. The report codec is expected to handle nils and act accordingly // (e.g. some values may be optional). -// TODO: Test this function func (out *Outcome) IsReportable(channelID llotypes.ChannelID) *ErrUnreportableChannel { if out.LifeCycleStage == LifeCycleStageRetired { return &ErrUnreportableChannel{nil, "IsReportable=false; retired channel", channelID} @@ -364,7 +368,6 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID) *ErrUnreportableC // No validAfterSeconds entry yet, this must be a new channel. // validAfterSeconds will be populated in Outcome() so the channel // becomes reportable in later protocol rounds. - // TODO: Test this case, haven't seen it in prod logs even though it would be expected return &ErrUnreportableChannel{nil, "IsReportable=false; no validAfterSeconds entry yet, this must be a new channel", channelID} } @@ -377,7 +380,6 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID) *ErrUnreportableC // List of reportable channels (according to IsReportable), sorted according // to a canonical ordering -// TODO: test this func (out *Outcome) ReportableChannels() (reportable []llotypes.ChannelID, unreportable []*ErrUnreportableChannel) { for channelID := range out.ChannelDefinitions { if err := out.IsReportable(channelID); err != nil { @@ -437,3 +439,8 @@ func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash { h.Sum(result[:0]) return result } + +func medianTimestamp(timestampsNanoseconds []int64) int64 { + sort.Slice(timestampsNanoseconds, func(i, j int) bool { return timestampsNanoseconds[i] < timestampsNanoseconds[j] }) + return timestampsNanoseconds[len(timestampsNanoseconds)/2] +} diff --git a/llo/plugin_outcome_test.go b/llo/plugin_outcome_test.go index 35d0b15..36cc0ef 100644 --- a/llo/plugin_outcome_test.go +++ b/llo/plugin_outcome_test.go @@ -2,6 +2,7 @@ package llo import ( "fmt" + "math" "testing" "time" @@ -19,6 +20,7 @@ import ( ) func Test_Outcome(t *testing.T) { + ctx := tests.Context(t) p := &Plugin{ Config: Config{true}, OutcomeCodec: protoOutcomeCodec{}, @@ -27,7 +29,6 @@ func Test_Outcome(t *testing.T) { } t.Run("if number of observers < 2f+1, errors", func(t *testing.T) { - ctx := tests.Context(t) _, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{}) assert.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 0)") p.F = 1 @@ -36,7 +37,6 @@ func Test_Outcome(t *testing.T) { }) t.Run("if seqnr == 1, and has enough observers, emits initial outcome with 'production' LifeCycleStage", func(t *testing.T) { - ctx := tests.Context(t) outcome, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{ { Observation: []byte{}, @@ -67,7 +67,6 @@ func Test_Outcome(t *testing.T) { t.Run("channel definitions", func(t *testing.T) { t.Run("adds a new channel definition if there are enough votes", func(t *testing.T) { - ctx := tests.Context(t) newCd := llotypes.ChannelDefinition{ ReportFormat: llotypes.ReportFormat(2), Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, @@ -96,7 +95,6 @@ func Test_Outcome(t *testing.T) { }) t.Run("replaces an existing channel definition if there are enough votes", func(t *testing.T) { - ctx := tests.Context(t) newCd := llotypes.ChannelDefinition{ ReportFormat: llotypes.ReportFormat(2), Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorQuote}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, @@ -136,7 +134,6 @@ func Test_Outcome(t *testing.T) { }) t.Run("does not add channels beyond MaxOutcomeChannelDefinitionsLength", func(t *testing.T) { - ctx := tests.Context(t) newCd := llotypes.ChannelDefinition{ ReportFormat: llotypes.ReportFormat(2), Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, @@ -186,7 +183,6 @@ func Test_Outcome(t *testing.T) { cdc := &mockChannelDefinitionCache{definitions: smallDefinitions} t.Run("aggregates values when all stream values are present from all observers", func(t *testing.T) { - ctx := tests.Context(t) previousOutcome := Outcome{ LifeCycleStage: llotypes.LifeCycleStage("test"), ObservationsTimestampNanoseconds: testStartTS.UnixNano(), @@ -244,6 +240,200 @@ func Test_Outcome(t *testing.T) { }, }, decoded) }) + t.Run("unreportable channels from the previous outcome re-use the same previous ValidAfterSeconds", func(t *testing.T) { + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: int64(102030410 * time.Second), + ChannelDefinitions: nil, // nil channel definitions makes all channels unreportable + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: uint32(102030405), + 2: uint32(102030400), + }, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(120)), + }, + 2: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(220)), + }, + 3: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorQuote: &Quote{Bid: decimal.NewFromInt(320), Benchmark: decimal.NewFromInt(330), Ask: decimal.NewFromInt(340)}, + }, + }, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + obs := Observation{ + UnixTimestampNanoseconds: int64(102030415 * time.Second), + StreamValues: map[llotypes.StreamID]StreamValue{ + 1: ToDecimal(decimal.NewFromInt(int64(120))), + 2: ToDecimal(decimal.NewFromInt(int64(220))), + 3: &Quote{Bid: decimal.NewFromInt(int64(320)), Benchmark: decimal.NewFromInt(int64(330)), Ask: decimal.NewFromInt(int64(340))}, + }, + } + encoded, err2 := p.ObservationCodec.Encode(obs) + require.NoError(t, err2) + aos = append(aos, + types.AttributedObservation{ + Observation: encoded, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, int64(102030415*time.Second), decoded.ObservationsTimestampNanoseconds) + require.Len(t, decoded.ValidAfterSeconds, 2) + assert.Equal(t, int64(102030405), int64(decoded.ValidAfterSeconds[1])) + assert.Equal(t, int64(102030400), int64(decoded.ValidAfterSeconds[2])) + }) + t.Run("ValidAfterSeconds is set based on the previous observation timestamp such that reports never overlap", func(t *testing.T) { + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: int64(102030410 * time.Second), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: uint32(102030405), + 2: uint32(102030400), + }, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(120)), + }, + 2: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(220)), + }, + 3: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorQuote: &Quote{Bid: decimal.NewFromInt(320), Benchmark: decimal.NewFromInt(330), Ask: decimal.NewFromInt(340)}, + }, + }, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + obs := Observation{ + UnixTimestampNanoseconds: int64(102030415 * time.Second), + StreamValues: map[llotypes.StreamID]StreamValue{ + 1: ToDecimal(decimal.NewFromInt(int64(120))), + 2: ToDecimal(decimal.NewFromInt(int64(220))), + 3: &Quote{Bid: decimal.NewFromInt(int64(320)), Benchmark: decimal.NewFromInt(int64(330)), Ask: decimal.NewFromInt(int64(340))}, + }, + } + encoded, err2 := p.ObservationCodec.Encode(obs) + require.NoError(t, err2) + aos = append(aos, + types.AttributedObservation{ + Observation: encoded, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, int64(102030415*time.Second), decoded.ObservationsTimestampNanoseconds) + require.Len(t, decoded.ValidAfterSeconds, 2) + assert.Equal(t, int64(102030410), int64(decoded.ValidAfterSeconds[1])) + assert.Equal(t, int64(102030410), int64(decoded.ValidAfterSeconds[2])) + }) + t.Run("does generate outcome for reports that would overlap on a seconds-basis (allows duplicate reports)", func(t *testing.T) { + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: int64(102030410 * time.Second), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: uint32(102030409), + 2: uint32(102030409), + }, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(120)), + }, + 2: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(220)), + }, + 3: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorQuote: &Quote{Bid: decimal.NewFromInt(320), Benchmark: decimal.NewFromInt(330), Ask: decimal.NewFromInt(340)}, + }, + }, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + obs := Observation{ + UnixTimestampNanoseconds: int64((102030410 * time.Second) + 100*time.Millisecond), // 100ms after previous outcome + StreamValues: map[llotypes.StreamID]StreamValue{ + 1: ToDecimal(decimal.NewFromInt(int64(120))), + 2: ToDecimal(decimal.NewFromInt(int64(220))), + 3: &Quote{Bid: decimal.NewFromInt(int64(320)), Benchmark: decimal.NewFromInt(int64(330)), Ask: decimal.NewFromInt(int64(340))}, + }, + } + encoded, err2 := p.ObservationCodec.Encode(obs) + require.NoError(t, err2) + aos = append(aos, + types.AttributedObservation{ + Observation: encoded, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, int64(102030410*time.Second+100*time.Millisecond), decoded.ObservationsTimestampNanoseconds) + require.Len(t, decoded.ValidAfterSeconds, 2) + assert.Equal(t, int64(102030410), int64(decoded.ValidAfterSeconds[1])) + assert.Equal(t, int64(102030410), int64(decoded.ValidAfterSeconds[2])) + }) + }) + t.Run("if previousOutcome is retired, returns outcome as normal", func(t *testing.T) { + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("retired"), + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: uint32(102030409), + 2: uint32(102030409), + }, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + obs := Observation{ + UnixTimestampNanoseconds: int64(102030415 * time.Second), + } + encoded, err2 := p.ObservationCodec.Encode(obs) + require.NoError(t, err2) + aos = append(aos, + types.AttributedObservation{ + Observation: encoded, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, int64(102030415000000000), decoded.ObservationsTimestampNanoseconds) + require.Len(t, decoded.ValidAfterSeconds, 2) + assert.Equal(t, int64(102030409), int64(decoded.ValidAfterSeconds[1])) + assert.Equal(t, int64(102030409), int64(decoded.ValidAfterSeconds[2])) }) } @@ -329,3 +519,52 @@ func Test_MakeChannelHash(t *testing.T) { assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) }) } + +func Test_Outcome_Methods(t *testing.T) { + t.Run("IsReportable", func(t *testing.T) { + outcome := Outcome{} + cid := llotypes.ChannelID(1) + + // Not reportable if retired + outcome.LifeCycleStage = LifeCycleStageRetired + assert.EqualError(t, outcome.IsReportable(cid), "ChannelID: 1; Reason: IsReportable=false; retired channel") + + // Timestamp overflow + outcome.LifeCycleStage = LifeCycleStageProduction + outcome.ObservationsTimestampNanoseconds = time.Unix(math.MaxInt64, 0).UnixNano() + outcome.ChannelDefinitions = map[llotypes.ChannelID]llotypes.ChannelDefinition{} + assert.EqualError(t, outcome.IsReportable(cid), "ChannelID: 1; Reason: IsReportable=false; invalid observations timestamp; Err: timestamp doesn't fit into uint32: -1") + + // No channel definition with ID + outcome.LifeCycleStage = LifeCycleStageProduction + outcome.ObservationsTimestampNanoseconds = time.Unix(1726670490, 0).UnixNano() + outcome.ChannelDefinitions = map[llotypes.ChannelID]llotypes.ChannelDefinition{} + assert.EqualError(t, outcome.IsReportable(cid), "ChannelID: 1; Reason: IsReportable=false; no channel definition with this ID") + + // No ValidAfterSeconds yet + outcome.ChannelDefinitions[cid] = llotypes.ChannelDefinition{} + assert.EqualError(t, outcome.IsReportable(cid), "ChannelID: 1; Reason: IsReportable=false; no validAfterSeconds entry yet, this must be a new channel") + + // ValidAfterSeconds is in the future + outcome.ValidAfterSeconds = map[llotypes.ChannelID]uint32{cid: uint32(1726670491)} + assert.EqualError(t, outcome.IsReportable(cid), "ChannelID: 1; Reason: IsReportable=false; not valid yet (observationsTimestampSeconds=1726670490 < validAfterSeconds=1726670491)") + }) + t.Run("ReportableChannels", func(t *testing.T) { + outcome := Outcome{ + ObservationsTimestampNanoseconds: time.Unix(1726670490, 0).UnixNano(), + ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: {}, + 2: {}, + 3: {}, + }, + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: 1726670489, + 3: 1726670489, + }, + } + reportable, unreportable := outcome.ReportableChannels() + assert.Equal(t, []llotypes.ChannelID{1, 3}, reportable) + require.Len(t, unreportable, 1) + assert.Equal(t, "ChannelID: 2; Reason: IsReportable=false; no validAfterSeconds entry yet, this must be a new channel", unreportable[0].Error()) + }) +} diff --git a/llo/plugin_reports.go b/llo/plugin_reports.go index bdc7e5f..98e09dc 100644 --- a/llo/plugin_reports.go +++ b/llo/plugin_reports.go @@ -2,7 +2,6 @@ package llo import ( "context" - "encoding/json" "fmt" "github.com/smartcontractkit/libocr/offchainreporting2/types" @@ -33,17 +32,20 @@ func (p *Plugin) reports(ctx context.Context, seqNr uint64, rawOutcome ocr3types // if we're retired, emit special retirement report to transfer // ValidAfterSeconds part of state to the new protocol instance for a // "gapless" handover - retirementReport := RetirementReport{ - outcome.ValidAfterSeconds, + retirementReport := outcome.GenRetirementReport() + p.Logger.Infow("Emitting retirement report", "lifeCycleStage", outcome.LifeCycleStage, "retirementReport", retirementReport, "stage", "Report", "seqNr", seqNr) + + encoded, err := p.RetirementReportCodec.Encode(retirementReport) + if err != nil { + return nil, fmt.Errorf("error encoding retirement report: %w", err) } rwis = append(rwis, ocr3types.ReportPlus[llotypes.ReportInfo]{ ReportWithInfo: ocr3types.ReportWithInfo[llotypes.ReportInfo]{ - // TODO: Needs retirement report codec - Report: must(json.Marshal(retirementReport)), + Report: encoded, Info: llotypes.ReportInfo{ - LifeCycleStage: outcome.LifeCycleStage, - ReportFormat: llotypes.ReportFormatJSON, + LifeCycleStage: LifeCycleStageRetired, + ReportFormat: llotypes.ReportFormatRetirement, }, }, }) @@ -51,7 +53,7 @@ func (p *Plugin) reports(ctx context.Context, seqNr uint64, rawOutcome ocr3types reportableChannels, unreportableChannels := outcome.ReportableChannels() if p.Config.VerboseLogging { - p.Logger.Debugw("Reportable channels", "reportableChannels", reportableChannels, "unreportableChannels", unreportableChannels, "stage", "Report", "seqNr", seqNr) + p.Logger.Debugw("Reportable channels", "lifeCycleStage", outcome.LifeCycleStage, "reportableChannels", reportableChannels, "unreportableChannels", unreportableChannels, "stage", "Report", "seqNr", seqNr) } for _, cid := range reportableChannels { @@ -76,7 +78,7 @@ func (p *Plugin) reports(ctx context.Context, seqNr uint64, rawOutcome ocr3types if ctx.Err() != nil { return nil, context.Cause(ctx) } - p.Logger.Warnw("Error encoding report", "reportFormat", cd.ReportFormat, "err", err, "channelID", cid, "stage", "Report", "seqNr", seqNr) + p.Logger.Warnw("Error encoding report", "lifeCycleStage", outcome.LifeCycleStage, "reportFormat", cd.ReportFormat, "err", err, "channelID", cid, "stage", "Report", "seqNr", seqNr) continue } rwis = append(rwis, ocr3types.ReportPlus[llotypes.ReportInfo]{ @@ -91,14 +93,14 @@ func (p *Plugin) reports(ctx context.Context, seqNr uint64, rawOutcome ocr3types } if p.Config.VerboseLogging && len(rwis) == 0 { - p.Logger.Debugw("No reports, will not transmit anything", "reportableChannels", reportableChannels, "stage", "Report", "seqNr", seqNr) + p.Logger.Debugw("No reports, will not transmit anything", "lifeCycleStage", outcome.LifeCycleStage, "reportableChannels", reportableChannels, "stage", "Report", "seqNr", seqNr) } return rwis, nil } func (p *Plugin) encodeReport(ctx context.Context, r Report, cd llotypes.ChannelDefinition) (types.Report, error) { - codec, exists := p.Codecs[cd.ReportFormat] + codec, exists := p.ReportCodecs[cd.ReportFormat] if !exists { return nil, fmt.Errorf("codec missing for ReportFormat=%q", cd.ReportFormat) } diff --git a/llo/plugin_reports_test.go b/llo/plugin_reports_test.go index 280a9d0..591ba73 100644 --- a/llo/plugin_reports_test.go +++ b/llo/plugin_reports_test.go @@ -21,9 +21,10 @@ func Test_Reports(t *testing.T) { Config: Config{true}, OutcomeCodec: protoOutcomeCodec{}, Logger: logger.Test(t), - Codecs: map[llotypes.ReportFormat]ReportCodec{ + ReportCodecs: map[llotypes.ReportFormat]ReportCodec{ llotypes.ReportFormatJSON: JSONReportCodec{}, }, + RetirementReportCodec: StandardRetirementReportCodec{}, } t.Run("ignores seqnr=0", func(t *testing.T) { @@ -59,7 +60,7 @@ func Test_Reports(t *testing.T) { rwis, err := p.Reports(ctx, 2, encoded) require.NoError(t, err) require.Len(t, rwis, 1) - assert.Equal(t, llo.ReportInfo{LifeCycleStage: LifeCycleStageRetired, ReportFormat: llotypes.ReportFormatJSON}, rwis[0].ReportWithInfo.Info) + assert.Equal(t, llo.ReportInfo{LifeCycleStage: LifeCycleStageRetired, ReportFormat: llotypes.ReportFormatRetirement}, rwis[0].ReportWithInfo.Info) assert.Equal(t, "{\"ValidAfterSeconds\":null}", string(rwis[0].ReportWithInfo.Report)) }) }) @@ -234,4 +235,39 @@ func Test_Reports(t *testing.T) { assert.Equal(t, `{"ConfigDigest":"0000000000000000000000000000000000000000000000000000000000000000","SeqNr":2,"ChannelID":2,"ValidAfterSeconds":100,"ObservationTimestampSeconds":200,"Values":[{"Type":0,"Value":"1.1"},{"Type":0,"Value":"2.2"},{"Type":1,"Value":"Q{Bid: 8.8, Benchmark: 7.7, Ask: 6.6}"}],"Specimen":false}`, string(rwis[1].ReportWithInfo.Report)) assert.Equal(t, llo.ReportInfo{LifeCycleStage: "production", ReportFormat: llotypes.ReportFormatJSON}, rwis[1].ReportWithInfo.Info) }) + t.Run("does not produce reports with overlapping timestamps (where IsReportable returns false)", func(t *testing.T) { + ctx := tests.Context(t) + outcome := Outcome{ + LifeCycleStage: LifeCycleStageProduction, + ObservationsTimestampNanoseconds: int64(200 * time.Second), + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: 200, + 2: 100, + }, + ChannelDefinitions: smallDefinitions, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)), + }, + 2: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)), + }, + 3: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)}, + }, + 4: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(6.6), Benchmark: decimal.NewFromFloat(7.7), Bid: decimal.NewFromFloat(8.8)}, + }, + }, + } + encoded, err := p.OutcomeCodec.Encode(outcome) + require.NoError(t, err) + rwis, err := p.Reports(ctx, 2, encoded) + require.NoError(t, err) + + // Only second channel is reported because first channel is not valid yet + require.Len(t, rwis, 1) + assert.Equal(t, `{"ConfigDigest":"0000000000000000000000000000000000000000000000000000000000000000","SeqNr":2,"ChannelID":2,"ValidAfterSeconds":100,"ObservationTimestampSeconds":200,"Values":[{"Type":0,"Value":"1.1"},{"Type":0,"Value":"2.2"},{"Type":1,"Value":"Q{Bid: 8.8, Benchmark: 7.7, Ask: 6.6}"}],"Specimen":false}`, string(rwis[0].ReportWithInfo.Report)) + assert.Equal(t, llo.ReportInfo{LifeCycleStage: "production", ReportFormat: llotypes.ReportFormatJSON}, rwis[0].ReportWithInfo.Info) + }) } diff --git a/llo/plugin_test.go b/llo/plugin_test.go index efe552b..a87929b 100644 --- a/llo/plugin_test.go +++ b/llo/plugin_test.go @@ -18,7 +18,7 @@ type mockShouldRetireCache struct { err error } -func (m *mockShouldRetireCache) ShouldRetire() (bool, error) { +func (m *mockShouldRetireCache) ShouldRetire(types.ConfigDigest) (bool, error) { return m.shouldRetire, m.err } diff --git a/llo/predecessor_retirement_report_cache.go b/llo/predecessor_retirement_report_cache.go deleted file mode 100644 index ee305fe..0000000 --- a/llo/predecessor_retirement_report_cache.go +++ /dev/null @@ -1,27 +0,0 @@ -package llo - -import ( - "github.com/smartcontractkit/libocr/offchainreporting2/types" -) - -var _ PredecessorRetirementReportCache = &predecessorRetirementReportCache{} - -type predecessorRetirementReportCache struct{} - -// TODO: This ought to be DB-persisted -// https://smartcontract-it.atlassian.net/browse/MERC-3386 -func NewPredecessorRetirementReportCache() PredecessorRetirementReportCache { - return newPredecessorRetirementReportCache() -} - -func newPredecessorRetirementReportCache() *predecessorRetirementReportCache { - return &predecessorRetirementReportCache{} -} - -func (c *predecessorRetirementReportCache) AttestedRetirementReport(predecessorConfigDigest types.ConfigDigest) ([]byte, error) { - panic("TODO") -} - -func (c *predecessorRetirementReportCache) CheckAttestedRetirementReport(predecessorConfigDigest types.ConfigDigest, attestedRetirementReport []byte) (RetirementReport, error) { - panic("TODO") -} diff --git a/llo/predecessor_retirement_report_cache_test.go b/llo/predecessor_retirement_report_cache_test.go deleted file mode 100644 index c02f84b..0000000 --- a/llo/predecessor_retirement_report_cache_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package llo - -import "testing" - -func Test_PredecessorRetirementReportCache(t *testing.T) { - t.Skip("https://smartcontract-it.atlassian.net/browse/MERC-4414") -} diff --git a/llo/retirement_report_codec.go b/llo/retirement_report_codec.go new file mode 100644 index 0000000..0875ea1 --- /dev/null +++ b/llo/retirement_report_codec.go @@ -0,0 +1,22 @@ +package llo + +import "encoding/json" + +type RetirementReportCodec interface { + Encode(RetirementReport) ([]byte, error) + Decode([]byte) (RetirementReport, error) +} + +var _ RetirementReportCodec = StandardRetirementReportCodec{} + +type StandardRetirementReportCodec struct{} + +func (r StandardRetirementReportCodec) Encode(report RetirementReport) ([]byte, error) { + return json.Marshal(report) +} + +func (r StandardRetirementReportCodec) Decode(data []byte) (RetirementReport, error) { + var report RetirementReport + err := json.Unmarshal(data, &report) + return report, err +} diff --git a/llo/retirement_report_codec_test.go b/llo/retirement_report_codec_test.go new file mode 100644 index 0000000..d7cb187 --- /dev/null +++ b/llo/retirement_report_codec_test.go @@ -0,0 +1,7 @@ +package llo + +import "testing" + +func Test_RetirementReportCodec(t *testing.T) { + t.Skip("TODO - MERC-3524") +} diff --git a/llo/should_retire_cache.go b/llo/should_retire_cache.go deleted file mode 100644 index 76ccd67..0000000 --- a/llo/should_retire_cache.go +++ /dev/null @@ -1,19 +0,0 @@ -package llo - -var _ ShouldRetireCache = &shouldRetireCache{} - -type shouldRetireCache struct{} - -// TODO: https://smartcontract-it.atlassian.net/browse/MERC-3386 -func NewShouldRetireCache() ShouldRetireCache { - return newShouldRetireCache() -} - -func newShouldRetireCache() *shouldRetireCache { - return &shouldRetireCache{} -} - -func (c *shouldRetireCache) ShouldRetire() (bool, error) { - // TODO - return false, nil -} diff --git a/llo/should_retire_cache_test.go b/llo/should_retire_cache_test.go deleted file mode 100644 index eafb4d1..0000000 --- a/llo/should_retire_cache_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package llo - -import "testing" - -func Test_ShouldRetireCache(t *testing.T) { - t.Skip("https://smartcontract-it.atlassian.net/browse/MERC-4414") -}