From 65700b25d27d00ae48beaa900804b96660b7b105 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 2 Dec 2024 14:44:07 -0500 Subject: [PATCH] Support flexible schemas for LLO --- core/services/llo/codecs.go | 1 + core/services/llo/evm/fees.go | 3 +- core/services/llo/evm/fees_test.go | 23 +- .../report_codec_evm_abi_encode_unpacked.go | 276 +++++++ ...port_codec_evm_abi_encode_unpacked_test.go | 680 ++++++++++++++++++ core/services/llo/keyring.go | 4 +- .../services/llo/mercurytransmitter/server.go | 18 +- core/services/ocr2/delegate.go | 2 +- .../services/ocr2/plugins/llo/helpers_test.go | 52 +- .../ocr2/plugins/llo/integration_test.go | 518 ++++++++++++- core/services/streams/delegate.go | 6 +- go.mod | 4 + go.sum | 4 - 13 files changed, 1562 insertions(+), 29 deletions(-) create mode 100644 core/services/llo/evm/report_codec_evm_abi_encode_unpacked.go create mode 100644 core/services/llo/evm/report_codec_evm_abi_encode_unpacked_test.go diff --git a/core/services/llo/codecs.go b/core/services/llo/codecs.go index f9c5b7b3380..050a1945873 100644 --- a/core/services/llo/codecs.go +++ b/core/services/llo/codecs.go @@ -14,6 +14,7 @@ func NewReportCodecs(lggr logger.Logger, donID uint32) map[llotypes.ReportFormat codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{} codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr, donID) + codecs[llotypes.ReportFormatEVMABIEncodeUnpacked] = evm.NewReportCodecEVMABIEncodeUnpacked(lggr, donID) return codecs } diff --git a/core/services/llo/evm/fees.go b/core/services/llo/evm/fees.go index b74d68b08d2..a6ff7a31178 100644 --- a/core/services/llo/evm/fees.go +++ b/core/services/llo/evm/fees.go @@ -16,8 +16,9 @@ const Precision int32 = 18 // CalculateFee outputs a fee in wei according to the formula: baseUSDFee / tokenPriceInUSD func CalculateFee(tokenPriceInUSD decimal.Decimal, baseUSDFee decimal.Decimal) *big.Int { - if tokenPriceInUSD.IsZero() || baseUSDFee.IsZero() { + if baseUSDFee.IsZero() || baseUSDFee.IsNegative() || tokenPriceInUSD.IsZero() || tokenPriceInUSD.IsNegative() { // zero fee if token price or base fee is zero + // if either fee should somehow be negative, also, return zero return big.NewInt(0) } diff --git a/core/services/llo/evm/fees_test.go b/core/services/llo/evm/fees_test.go index 33888de14ec..4f3fedbaedc 100644 --- a/core/services/llo/evm/fees_test.go +++ b/core/services/llo/evm/fees_test.go @@ -38,8 +38,27 @@ func Test_Fees(t *testing.T) { t.Run("with base fee == 0", func(t *testing.T) { tokenPriceInUSD := decimal.NewFromInt32(123) - BaseUSDFee = decimal.NewFromInt32(0) - fee := CalculateFee(tokenPriceInUSD, BaseUSDFee) + baseUSDFee := decimal.NewFromInt32(0) + fee := CalculateFee(tokenPriceInUSD, baseUSDFee) + assert.Equal(t, big.NewInt(0), fee) + }) + + t.Run("negative fee rounds up to zero", func(t *testing.T) { + tokenPriceInUSD := decimal.NewFromInt32(-123) + baseUSDFee := decimal.NewFromInt32(1) + fee := CalculateFee(tokenPriceInUSD, baseUSDFee) + assert.Equal(t, big.NewInt(0), fee) + + tokenPriceInUSD = decimal.NewFromInt32(123) + baseUSDFee = decimal.NewFromInt32(-1) + fee = CalculateFee(tokenPriceInUSD, baseUSDFee) + assert.Equal(t, big.NewInt(0), fee) + + // Multiple negative values also return a zero fee since negative + // prices are always nonsensical + tokenPriceInUSD = decimal.NewFromInt32(-123) + baseUSDFee = decimal.NewFromInt32(-1) + fee = CalculateFee(tokenPriceInUSD, baseUSDFee) assert.Equal(t, big.NewInt(0), fee) }) diff --git a/core/services/llo/evm/report_codec_evm_abi_encode_unpacked.go b/core/services/llo/evm/report_codec_evm_abi_encode_unpacked.go new file mode 100644 index 00000000000..d9ef6791659 --- /dev/null +++ b/core/services/llo/evm/report_codec_evm_abi_encode_unpacked.go @@ -0,0 +1,276 @@ +package evm + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/shopspring/decimal" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/smartcontractkit/chainlink-data-streams/llo" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" +) + +var ( + _ llo.ReportCodec = ReportCodecEVMABIEncodeUnpacked{} + + zero = big.NewInt(0) +) + +type ReportCodecEVMABIEncodeUnpacked struct { + logger.Logger + donID uint32 +} + +func NewReportCodecEVMABIEncodeUnpacked(lggr logger.Logger, donID uint32) ReportCodecEVMABIEncodeUnpacked { + return ReportCodecEVMABIEncodeUnpacked{logger.Sugared(lggr).Named("ReportCodecEVMABIEncodeUnpacked"), donID} +} + +// Opts format remains unchanged +type ReportFormatEVMABIEncodeOpts struct { + // BaseUSDFee is the cost on-chain of verifying a report + BaseUSDFee decimal.Decimal `json:"baseUSDFee"` + // Expiration window is the length of time in seconds the report is valid + // for, from the observation timestamp + ExpirationWindow uint32 `json:"expirationWindow"` + // FeedID is for compatibility with existing on-chain verifiers + FeedID common.Hash `json:"feedID"` + // ABI defines the encoding of the payload. Each element maps to exactly + // one stream (although sub-arrays may be specified for streams that + // produce a composite data type). + // + // EXAMPLE + // + // [{"streamID":123,"multiplier":"10000","type":"uint192"}, ...] + // + // See definition of ABIEncoder struct for more details. + // + // The total number of streams must be 2+n, where n is the number of + // top-level elements in this ABI array (stream 0 is always the native + // token price and stream 1 is the link token price). + ABI []ABIEncoder `json:"abi"` +} + +func (r *ReportFormatEVMABIEncodeOpts) Decode(opts []byte) error { + return json.Unmarshal(opts, r) +} + +func (r *ReportFormatEVMABIEncodeOpts) Encode() ([]byte, error) { + return json.Marshal(r) +} + +type EVMBaseReportFields struct { + FeedID common.Hash + ValidFromTimestamp uint32 + Timestamp uint32 + NativeFee *big.Int + LinkFee *big.Int + ExpiresAt uint32 +} + +// TODO: Add VerifyOpts public function and add to interface for chainlink-data-streams? +// Or just Verify(channelDefinitions) ? to handle e.g. unique feed IDs + +func (r ReportCodecEVMABIEncodeUnpacked) Encode(ctx context.Context, report llo.Report, cd llotypes.ChannelDefinition) ([]byte, error) { + if report.Specimen { + return nil, errors.New("ReportCodecEVMABIEncodeUnpacked does not support encoding specimen reports") + } + if len(report.Values) < 2 { + return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpacked requires at least 2 values (NativePrice, LinkPrice, ...); got report.Values: %v", report.Values) + } + nativePrice, err := extractPrice(report.Values[0]) + if err != nil { + return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpacked failed to extract native price: %w", err) + } + linkPrice, err := extractPrice(report.Values[1]) + if err != nil { + return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpacked failed to extract link price: %w", err) + } + + // NOTE: It seems suboptimal to have to parse the opts on every encode but + // not sure how to avoid it. Should be negligible performance hit as long + // as Opts is small. + opts := ReportFormatEVMABIEncodeOpts{} + if err := (&opts).Decode(cd.Opts); err != nil { + return nil, fmt.Errorf("failed to decode opts; got: '%s'; %w", cd.Opts, err) + } + + rf := EVMBaseReportFields{ + FeedID: opts.FeedID, + ValidFromTimestamp: report.ValidAfterSeconds + 1, + Timestamp: report.ObservationTimestampSeconds, + NativeFee: CalculateFee(nativePrice, opts.BaseUSDFee), + LinkFee: CalculateFee(linkPrice, opts.BaseUSDFee), + ExpiresAt: report.ObservationTimestampSeconds + opts.ExpirationWindow, + } + + // TODO: Enable with verbose logging? + // r.Logger.Debugw("Encoding report", "report", report, "opts", opts, "nativePrice", nativePrice, "linkPrice", linkPrice, "quote", quote, "multiplier", multiplier, "rf", rf) + + header, err := r.buildHeader(ctx, rf) + if err != nil { + return nil, fmt.Errorf("failed to build base report; %w", err) + } + + payload, err := r.buildPayload(ctx, opts.ABI, report.Values[2:]) + if err != nil { + return nil, fmt.Errorf("failed to build payload; %w", err) + } + + return append(header, payload...), nil +} + +// BaseSchema represents the fixed base schema that remains unchanged for all +// EVMABIEncodeUnpacked reports. +// +// An arbitrary payload will be appended to this. +var BaseSchema = getBaseSchema() + +func getBaseSchema() abi.Arguments { + mustNewType := func(t string) abi.Type { + result, err := abi.NewType(t, "", []abi.ArgumentMarshaling{}) + if err != nil { + panic(fmt.Sprintf("Unexpected error during abi.NewType: %s", err)) + } + return result + } + return abi.Arguments([]abi.Argument{ + {Name: "feedId", Type: mustNewType("bytes32")}, + {Name: "validFromTimestamp", Type: mustNewType("uint32")}, + {Name: "observationsTimestamp", Type: mustNewType("uint32")}, + {Name: "nativeFee", Type: mustNewType("uint192")}, + {Name: "linkFee", Type: mustNewType("uint192")}, + {Name: "expiresAt", Type: mustNewType("uint32")}, + }) +} + +func (r ReportCodecEVMABIEncodeUnpacked) buildHeader(ctx context.Context, rf EVMBaseReportFields) ([]byte, error) { + var merr error + if rf.LinkFee == nil { + merr = errors.Join(merr, errors.New("linkFee may not be nil")) + } else if rf.LinkFee.Cmp(zero) < 0 { + merr = errors.Join(merr, fmt.Errorf("linkFee may not be negative (got: %s)", rf.LinkFee)) + } + if rf.NativeFee == nil { + merr = errors.Join(merr, errors.New("nativeFee may not be nil")) + } else if rf.NativeFee.Cmp(zero) < 0 { + merr = errors.Join(merr, fmt.Errorf("nativeFee may not be negative (got: %s)", rf.NativeFee)) + } + if merr != nil { + return nil, merr + } + b, err := BaseSchema.Pack(rf.FeedID, rf.ValidFromTimestamp, rf.Timestamp, rf.NativeFee, rf.LinkFee, rf.ExpiresAt) + if err != nil { + return nil, fmt.Errorf("failed to pack base report blob; %w", err) + } + return b, nil +} + +func (r ReportCodecEVMABIEncodeUnpacked) buildPayload(ctx context.Context, encoders []ABIEncoder, values []llo.StreamValue) (payload []byte, merr error) { + if len(encoders) != len(values) { + return nil, fmt.Errorf("ABI and values length mismatch; ABI: %d, Values: %d", len(encoders), len(values)) + } + + for i, encoder := range encoders { + b, err := encoder.Encode(values[i]) + if err != nil { + var vStr []byte + if values[i] == nil { + vStr = []byte("") + } else { + var marshalErr error + vStr, marshalErr = values[i].MarshalText() + if marshalErr != nil { + vStr = []byte(fmt.Sprintf("%v(failed to marshal: %s)", values[i], marshalErr)) + } + } + merr = errors.Join(merr, fmt.Errorf("failed to encode stream value %s at index %d with abi %q; %w", string(vStr), i, encoder.Type, err)) + continue + } + payload = append(payload, b...) + } + + return payload, merr +} + +// An ABIEncoder encodes exactly one stream value into a byte slice +type ABIEncoder struct { + // StreamID is the ID of the stream that this encoder is responsible for. + // MANDATORY + StreamID llotypes.StreamID `json:"streamID"` + // Type is the ABI type of the stream value. E.g. "uint192", "int256", "bool", "string" etc. + // MANDATORY + Type string `json:"type"` + // Multiplier, if provided, will be multiplied with the stream value before + // encoding. + // OPTIONAL + Multiplier *ubig.Big `json:"multiplier"` +} + +// getNormalizedMultiplier returns the multiplier as a decimal.Decimal, defaulting +// to 1 if the multiplier is nil. +// TODO: Verify its not negative +func (a ABIEncoder) getNormalizedMultiplier() (multiplier decimal.Decimal) { + if a.Multiplier == nil { + multiplier = decimal.NewFromInt(1) + } else { + multiplier = decimal.NewFromBigInt(a.Multiplier.ToInt(), 0) + } + return +} + +func (a ABIEncoder) applyMultiplier(d decimal.Decimal) *big.Int { + return d.Mul(a.getNormalizedMultiplier()).BigInt() +} + +func (a ABIEncoder) Encode(value llo.StreamValue) ([]byte, error) { + switch sv := value.(type) { + case *llo.Decimal: + if sv == nil { + return nil, fmt.Errorf("expected non-nil *Decimal; got: %v", sv) + } + return packBigInt(a.applyMultiplier(sv.Decimal()), a.Type) + default: + return nil, fmt.Errorf("unhandled type; supported types are: *llo.Decimal; got: %T", value) + } +} + +// TODO: How to handle overflow? +// TODO: Use ryan's generic evm encoding library + +// packBigInt encodes a *big.Int as a byte slice according to the given ABI type +func packBigInt(val *big.Int, t string) (b []byte, err error) { + abiType, err := abi.NewType(t, "", []abi.ArgumentMarshaling{}) + if err != nil { + return nil, fmt.Errorf("invalid ABI type %q; %w", abiType, err) + } + + // Pack the value using ABI type + arguments := abi.Arguments{ + { + Type: abiType, + }, + } + + switch t { + case "uint32": + // packing uint32 expects uint32 as argument + if val.BitLen() > 32 { + return nil, fmt.Errorf("value %v is too large for uint32", val) + } + b, err = arguments.Pack(uint32(val.Uint64())) + default: + b, err = arguments.Pack(val) + } + if err != nil { + return nil, fmt.Errorf("failed to pack value %v as %q: %w", val, t, err) + } + + return b, nil +} diff --git a/core/services/llo/evm/report_codec_evm_abi_encode_unpacked_test.go b/core/services/llo/evm/report_codec_evm_abi_encode_unpacked_test.go new file mode 100644 index 00000000000..368ea22a96e --- /dev/null +++ b/core/services/llo/evm/report_codec_evm_abi_encode_unpacked_test.go @@ -0,0 +1,680 @@ +package evm + +import ( + "fmt" + "math/big" + "math/rand/v2" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" + "github.com/leanovate/gopter/prop" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-data-streams/llo" +) + +// AllTrue returns false if any element in the array is false. +func AllTrue(arr []bool) bool { + for _, v := range arr { + if !v { + return false + } + } + return true +} + +func TestReportFormatEVMABIEncodeOpts_Decode_Encode_properties(t *testing.T) { + properties := gopter.NewProperties(nil) + + runTest := func(opts ReportFormatEVMABIEncodeOpts) bool { + encoded, err := opts.Encode() + require.NoError(t, err) + + decoded := ReportFormatEVMABIEncodeOpts{} + err = decoded.Decode(encoded) + require.NoError(t, err) + + return decoded.BaseUSDFee.Equal(opts.BaseUSDFee) && decoded.ExpirationWindow == opts.ExpirationWindow && decoded.FeedID == opts.FeedID && assert.Equal(t, opts.ABI, decoded.ABI) + } + properties.Property("Encodes values", prop.ForAll( + runTest, + gen.StrictStruct(reflect.TypeOf(&ReportFormatEVMABIEncodeOpts{}), map[string]gopter.Gen{ + "BaseUSDFee": genBaseUSDFee(), + "ExpirationWindow": genExpirationWindow(), + "FeedID": genFeedID(), + "ABI": genABI(), + }))) + + properties.TestingRun(t) +} + +func genABI() gopter.Gen { + return gen.SliceOf(genABIEncoder()) +} + +func genABIEncoder() gopter.Gen { + return gen.StrictStruct(reflect.TypeOf(&ABIEncoder{}), map[string]gopter.Gen{ + "StreamID": gen.UInt32().Map(func(i uint32) llotypes.StreamID { return llotypes.StreamID(i) }), + "Multiplier": genMultiplier(), + "Type": gen.AnyString(), + }) +} + +func TestReportCodecEVMABIEncodeUnpacked_Encode_properties(t *testing.T) { + ctx := tests.Context(t) + codec := ReportCodecEVMABIEncodeUnpacked{} + + properties := gopter.NewProperties(nil) + + linkQuoteStreamID := llotypes.StreamID(rand.Uint32()) + ethQuoteStreamID := llotypes.StreamID(rand.Uint32()) + dexBasedAssetDecimalStreamID := llotypes.StreamID(rand.Uint32()) + benchmarkPriceStreamID := llotypes.StreamID(rand.Uint32()) + baseMarketDepthStreamID := llotypes.StreamID(rand.Uint32()) + quoteMarketDepthStreamID := llotypes.StreamID(rand.Uint32()) + marketStatusStreamID := llotypes.StreamID(rand.Uint32()) + binanceFundingRateStreamID := llotypes.StreamID(rand.Uint32()) + binanceFundingTimeStreamID := llotypes.StreamID(rand.Uint32()) + binanceFundingIntervalHoursStreamID := llotypes.StreamID(rand.Uint32()) + deribitFundingRateStreamID := llotypes.StreamID(rand.Uint32()) + deribitFundingTimeStreamID := llotypes.StreamID(rand.Uint32()) + deribitFundingIntervalHoursStreamID := llotypes.StreamID(rand.Uint32()) + + t.Run("DEX-based asset schema example", func(t *testing.T) { + expectedDEXBasedAssetSchema := abi.Arguments([]abi.Argument{ + {Name: "feedId", Type: mustNewABIType("bytes32")}, + {Name: "validFromTimestamp", Type: mustNewABIType("uint32")}, + {Name: "observationsTimestamp", Type: mustNewABIType("uint32")}, + {Name: "nativeFee", Type: mustNewABIType("uint192")}, + {Name: "linkFee", Type: mustNewABIType("uint192")}, + {Name: "expiresAt", Type: mustNewABIType("uint32")}, + {Name: "price", Type: mustNewABIType("int192")}, + {Name: "baseMarketDepth", Type: mustNewABIType("int192")}, + {Name: "quoteMarketDepth", Type: mustNewABIType("int192")}, + }) + runTest := func(sampleFeedID common.Hash, sampleObservationsTimestamp, sampleValidAfterSeconds, sampleExpirationWindow uint32, priceMultiplier, marketDepthMultiplier *ubig.Big, sampleBaseUSDFee, sampleLinkBenchmarkPrice, sampleNativeBenchmarkPrice, sampleDexBasedAssetPrice, sampleBaseMarketDepth, sampleQuoteMarketDepth decimal.Decimal) bool { + report := llo.Report{ + ConfigDigest: types.ConfigDigest{0x01}, + SeqNr: 0x02, + ChannelID: llotypes.ChannelID(0x03), + ValidAfterSeconds: sampleValidAfterSeconds, + ObservationTimestampSeconds: sampleObservationsTimestamp, + Values: []llo.StreamValue{ + &llo.Quote{Bid: decimal.NewFromFloat(6.1), Benchmark: sampleLinkBenchmarkPrice, Ask: decimal.NewFromFloat(8.2332)}, // Link price + &llo.Quote{Bid: decimal.NewFromFloat(9.4), Benchmark: sampleNativeBenchmarkPrice, Ask: decimal.NewFromFloat(11.33)}, // Native price + llo.ToDecimal(sampleDexBasedAssetPrice), // DEX-based asset price + llo.ToDecimal(sampleBaseMarketDepth), // Base market depth + llo.ToDecimal(sampleQuoteMarketDepth), // Quote market depth + }, + Specimen: false, + } + + opts := ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: sampleBaseUSDFee, + ExpirationWindow: sampleExpirationWindow, + FeedID: sampleFeedID, + ABI: []ABIEncoder{ + // benchmark price + ABIEncoder{ + StreamID: dexBasedAssetDecimalStreamID, + Type: "int192", + Multiplier: priceMultiplier, // TODO: Default multiplier? + }, + // base market depth + ABIEncoder{ + StreamID: baseMarketDepthStreamID, + Type: "int192", + Multiplier: marketDepthMultiplier, + }, + // quote market depth + ABIEncoder{ + StreamID: quoteMarketDepthStreamID, + Type: "int192", + Multiplier: marketDepthMultiplier, + }, + }, + } + serializedOpts, err := opts.Encode() + require.NoError(t, err) + + cd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + Streams: []llotypes.Stream{ + { + StreamID: linkQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: ethQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: dexBasedAssetDecimalStreamID, + Aggregator: llotypes.AggregatorQuote, + }, + { + StreamID: baseMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: quoteMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + }, + Opts: serializedOpts, + } + + encoded, err := codec.Encode(ctx, report, cd) + require.NoError(t, err) + + values, err := expectedDEXBasedAssetSchema.Unpack(encoded) + require.NoError(t, err) + + require.Len(t, values, len(expectedDEXBasedAssetSchema)) + + expectedLinkFee := CalculateFee(sampleLinkBenchmarkPrice, sampleBaseUSDFee) + expectedNativeFee := CalculateFee(sampleNativeBenchmarkPrice, sampleBaseUSDFee) + + // doesn't crash if values are nil + for i := range report.Values { + report.Values[i] = nil + } + _, err = codec.Encode(ctx, report, cd) + require.Error(t, err) + + return AllTrue([]bool{ + assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), // feedId + assert.Equal(t, uint32(sampleValidAfterSeconds+1), values[1].(uint32)), // validFromTimestamp + assert.Equal(t, sampleObservationsTimestamp, values[2].(uint32)), // observationsTimestamp + assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee + assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee + assert.Equal(t, uint32(sampleObservationsTimestamp+sampleExpirationWindow), values[5].(uint32)), // expiresAt + assert.Equal(t, sampleDexBasedAssetPrice.Mul(decimal.NewFromBigInt(priceMultiplier.ToInt(), 0)).BigInt(), values[6].(*big.Int)), // price + assert.Equal(t, sampleBaseMarketDepth.Mul(decimal.NewFromBigInt(marketDepthMultiplier.ToInt(), 0)).BigInt(), values[7].(*big.Int)), // baseMarketDepth + assert.Equal(t, sampleQuoteMarketDepth.Mul(decimal.NewFromBigInt(marketDepthMultiplier.ToInt(), 0)).BigInt(), values[8].(*big.Int)), // quoteMarketDepth + }) + } + + properties.Property("Encodes values", prop.ForAll( + runTest, + genFeedID(), + genObservationsTimestamp(), + genValidAfterSeconds(), + genExpirationWindow(), + genPriceMultiplier(), + genMarketDepthMultiplier(), + genBaseUSDFee(), + genLinkBenchmarkPrice(), + genNativeBenchmarkPrice(), + genBenchmarkPrice(), + genBaseMarketDepth(), + genQuoteMarketDepth(), + )) + + properties.TestingRun(t) + }) + + t.Run("Market status schema", func(t *testing.T) { + expectedRWASchema := abi.Arguments([]abi.Argument{ + {Name: "feedId", Type: mustNewABIType("bytes32")}, + {Name: "validFromTimestamp", Type: mustNewABIType("uint32")}, + {Name: "observationsTimestamp", Type: mustNewABIType("uint32")}, + {Name: "nativeFee", Type: mustNewABIType("uint192")}, + {Name: "linkFee", Type: mustNewABIType("uint192")}, + {Name: "expiresAt", Type: mustNewABIType("uint32")}, + {Name: "marketStatus", Type: mustNewABIType("uint32")}, + }) + + runTest := func(sampleFeedID common.Hash, sampleObservationsTimestamp, sampleValidAfterSeconds, sampleExpirationWindow uint32, sampleBaseUSDFee, sampleLinkBenchmarkPrice, sampleNativeBenchmarkPrice, sampleMarketStatus decimal.Decimal) bool { + report := llo.Report{ + ConfigDigest: types.ConfigDigest{0x01}, + SeqNr: 0x02, + ChannelID: llotypes.ChannelID(0x03), + ValidAfterSeconds: sampleValidAfterSeconds, + ObservationTimestampSeconds: sampleObservationsTimestamp, + Values: []llo.StreamValue{ + &llo.Quote{Bid: decimal.NewFromFloat(6.1), Benchmark: sampleLinkBenchmarkPrice, Ask: decimal.NewFromFloat(8.2332)}, // Link price + &llo.Quote{Bid: decimal.NewFromFloat(9.4), Benchmark: sampleNativeBenchmarkPrice, Ask: decimal.NewFromFloat(11.33)}, // Native price + llo.ToDecimal(sampleMarketStatus), // DEX-based asset price + }, + Specimen: false, + } + + opts := ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: sampleBaseUSDFee, + ExpirationWindow: sampleExpirationWindow, + FeedID: sampleFeedID, + ABI: []ABIEncoder{ + // market status + ABIEncoder{ + StreamID: marketStatusStreamID, + Type: "uint32", + }, + }, + } + serializedOpts, err := opts.Encode() + require.NoError(t, err) + + cd := llotypes.ChannelDefinition{ + // ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + ReportFormat: llotypes.ReportFormat(4), // FIXME: When chainlink-common is fixed + Streams: []llotypes.Stream{ + { + StreamID: linkQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: ethQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: dexBasedAssetDecimalStreamID, + Aggregator: llotypes.AggregatorQuote, + }, + { + StreamID: baseMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: quoteMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + }, + Opts: serializedOpts, + } + + encoded, err := codec.Encode(ctx, report, cd) + require.NoError(t, err) + + values, err := expectedRWASchema.Unpack(encoded) + require.NoError(t, err) + + require.Len(t, values, len(expectedRWASchema)) + + expectedLinkFee := CalculateFee(sampleLinkBenchmarkPrice, sampleBaseUSDFee) + expectedNativeFee := CalculateFee(sampleNativeBenchmarkPrice, sampleBaseUSDFee) + + // doesn't crash if values are nil + for i := range report.Values { + report.Values[i] = nil + } + _, err = codec.Encode(ctx, report, cd) + require.Error(t, err) + + return AllTrue([]bool{ + assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), // feedId + assert.Equal(t, uint32(sampleValidAfterSeconds+1), values[1].(uint32)), // validFromTimestamp + assert.Equal(t, sampleObservationsTimestamp, values[2].(uint32)), // observationsTimestamp + assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee + assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee + assert.Equal(t, uint32(sampleObservationsTimestamp+sampleExpirationWindow), values[5].(uint32)), // expiresAt + assert.Equal(t, uint32(sampleMarketStatus.BigInt().Int64()), values[6].(uint32)), // market status + }) + } + + properties.Property("Encodes values", prop.ForAll( + runTest, + genFeedID(), + genObservationsTimestamp(), + genValidAfterSeconds(), + genExpirationWindow(), + genBaseUSDFee(), + genLinkBenchmarkPrice(), + genNativeBenchmarkPrice(), + genMarketStatus(), + )) + + properties.TestingRun(t) + }) + + t.Run("benchmark price schema example", func(t *testing.T) { + expectedDEXBasedAssetSchema := abi.Arguments([]abi.Argument{ + {Name: "feedId", Type: mustNewABIType("bytes32")}, + {Name: "validFromTimestamp", Type: mustNewABIType("uint32")}, + {Name: "observationsTimestamp", Type: mustNewABIType("uint32")}, + {Name: "nativeFee", Type: mustNewABIType("uint192")}, + {Name: "linkFee", Type: mustNewABIType("uint192")}, + {Name: "expiresAt", Type: mustNewABIType("uint32")}, + {Name: "price", Type: mustNewABIType("int192")}, + }) + runTest := func(sampleFeedID common.Hash, sampleObservationsTimestamp, sampleValidAfterSeconds, sampleExpirationWindow uint32, priceMultiplier, marketDepthMultiplier *ubig.Big, sampleBaseUSDFee, sampleLinkBenchmarkPrice, sampleNativeBenchmarkPrice, sampleBenchmarkPrice decimal.Decimal) bool { + report := llo.Report{ + ConfigDigest: types.ConfigDigest{0x01}, + SeqNr: 0x02, + ChannelID: llotypes.ChannelID(0x03), + ValidAfterSeconds: sampleValidAfterSeconds, + ObservationTimestampSeconds: sampleObservationsTimestamp, + Values: []llo.StreamValue{ + &llo.Quote{Bid: decimal.NewFromFloat(6.1), Benchmark: sampleLinkBenchmarkPrice, Ask: decimal.NewFromFloat(8.2332)}, // Link price + &llo.Quote{Bid: decimal.NewFromFloat(9.4), Benchmark: sampleNativeBenchmarkPrice, Ask: decimal.NewFromFloat(11.33)}, // Native price + llo.ToDecimal(sampleBenchmarkPrice), // price + }, + Specimen: false, + } + + opts := ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: sampleBaseUSDFee, + ExpirationWindow: sampleExpirationWindow, + FeedID: sampleFeedID, + ABI: []ABIEncoder{ + // benchmark price + ABIEncoder{ + StreamID: dexBasedAssetDecimalStreamID, + Type: "int192", + Multiplier: priceMultiplier, // TODO: Default multiplier? + }, + }, + } + serializedOpts, err := opts.Encode() + require.NoError(t, err) + + cd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + Streams: []llotypes.Stream{ + { + StreamID: linkQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: ethQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: benchmarkPriceStreamID, + Aggregator: llotypes.AggregatorQuote, + }, + }, + Opts: serializedOpts, + } + + encoded, err := codec.Encode(ctx, report, cd) + require.NoError(t, err) + + values, err := expectedDEXBasedAssetSchema.Unpack(encoded) + require.NoError(t, err) + + require.Len(t, values, len(expectedDEXBasedAssetSchema)) + + expectedLinkFee := CalculateFee(sampleLinkBenchmarkPrice, sampleBaseUSDFee) + expectedNativeFee := CalculateFee(sampleNativeBenchmarkPrice, sampleBaseUSDFee) + + // doesn't crash if values are nil + for i := range report.Values { + report.Values[i] = nil + } + _, err = codec.Encode(ctx, report, cd) + require.Error(t, err) + + return AllTrue([]bool{ + assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), // feedId + assert.Equal(t, uint32(sampleValidAfterSeconds+1), values[1].(uint32)), // validFromTimestamp + assert.Equal(t, sampleObservationsTimestamp, values[2].(uint32)), // observationsTimestamp + assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee + assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee + assert.Equal(t, uint32(sampleObservationsTimestamp+sampleExpirationWindow), values[5].(uint32)), // expiresAt + assert.Equal(t, sampleBenchmarkPrice.Mul(decimal.NewFromBigInt(priceMultiplier.ToInt(), 0)).BigInt(), values[6].(*big.Int)), // price + }) + } + + properties.Property("Encodes values", prop.ForAll( + runTest, + genFeedID(), + genObservationsTimestamp(), + genValidAfterSeconds(), + genExpirationWindow(), + genPriceMultiplier(), + genMarketDepthMultiplier(), + genBaseUSDFee(), + genLinkBenchmarkPrice(), + genNativeBenchmarkPrice(), + genBenchmarkPrice(), + )) + + properties.TestingRun(t) + }) + + t.Run("funding rate schema example", func(t *testing.T) { + expectedFundingRateSchema := abi.Arguments([]abi.Argument{ + {Name: "feedId", Type: mustNewABIType("bytes32")}, + {Name: "validFromTimestamp", Type: mustNewABIType("uint32")}, + {Name: "observationsTimestamp", Type: mustNewABIType("uint32")}, + {Name: "nativeFee", Type: mustNewABIType("uint192")}, + {Name: "linkFee", Type: mustNewABIType("uint192")}, + {Name: "expiresAt", Type: mustNewABIType("uint32")}, + {Name: "binanceFundingRate", Type: mustNewABIType("int192")}, + {Name: "binanceFundingTime", Type: mustNewABIType("uint32")}, + {Name: "binanceFundingIntervalHours", Type: mustNewABIType("uint32")}, + {Name: "deribitFundingRate", Type: mustNewABIType("int192")}, + {Name: "deribitFundingTime", Type: mustNewABIType("uint32")}, + {Name: "deribitFundingIntervalHours", Type: mustNewABIType("uint32")}, + }) + + runTest := func(sampleFeedID common.Hash, sampleObservationsTimestamp, sampleValidAfterSeconds, sampleExpirationWindow uint32, sampleBaseUSDFee, sampleLinkBenchmarkPrice, sampleNativeBenchmarkPrice, sampleBinanceFundingRate, sampleBinanceFundingTime, sampleBinanceFundingIntervalHours, sampleDeribitFundingRate, sampleDeribitFundingTime, sampleDeribitFundingIntervalHours decimal.Decimal) bool { + report := llo.Report{ + ConfigDigest: types.ConfigDigest{0x01}, + SeqNr: 0x02, + ChannelID: llotypes.ChannelID(0x03), + ValidAfterSeconds: sampleValidAfterSeconds, + ObservationTimestampSeconds: sampleObservationsTimestamp, + Values: []llo.StreamValue{ + &llo.Quote{Bid: decimal.NewFromFloat(6.1), Benchmark: sampleLinkBenchmarkPrice, Ask: decimal.NewFromFloat(8.2332)}, // Link price + &llo.Quote{Bid: decimal.NewFromFloat(9.4), Benchmark: sampleNativeBenchmarkPrice, Ask: decimal.NewFromFloat(11.33)}, // Native price + llo.ToDecimal(sampleBinanceFundingRate), // Binance funding rate + llo.ToDecimal(sampleBinanceFundingTime), // Binance funding time + llo.ToDecimal(sampleBinanceFundingIntervalHours), // Binance funding interval hours + llo.ToDecimal(sampleDeribitFundingRate), // Deribit funding rate + llo.ToDecimal(sampleDeribitFundingTime), // Deribit funding time + llo.ToDecimal(sampleDeribitFundingIntervalHours), // Deribit funding interval hours + }, + Specimen: false, + } + + opts := ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: sampleBaseUSDFee, + ExpirationWindow: sampleExpirationWindow, + FeedID: sampleFeedID, + ABI: []ABIEncoder{ + ABIEncoder{StreamID: binanceFundingRateStreamID, Type: "int192"}, + ABIEncoder{StreamID: binanceFundingTimeStreamID, Type: "uint32"}, + ABIEncoder{StreamID: binanceFundingIntervalHoursStreamID, Type: "uint32"}, + ABIEncoder{StreamID: deribitFundingRateStreamID, Type: "int192"}, + ABIEncoder{StreamID: deribitFundingTimeStreamID, Type: "uint32"}, + ABIEncoder{StreamID: deribitFundingIntervalHoursStreamID, Type: "uint32"}, + }, + } + serializedOpts, err := opts.Encode() + require.NoError(t, err) + + cd := llotypes.ChannelDefinition{ + // ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + ReportFormat: llotypes.ReportFormat(4), // FIXME: When chainlink-common is fixed + Streams: []llotypes.Stream{ + { + StreamID: linkQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: ethQuoteStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: dexBasedAssetDecimalStreamID, + Aggregator: llotypes.AggregatorQuote, + }, + { + StreamID: baseMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: quoteMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + }, + Opts: serializedOpts, + } + + encoded, err := codec.Encode(ctx, report, cd) + require.NoError(t, err) + + values, err := expectedFundingRateSchema.Unpack(encoded) + require.NoError(t, err) + + require.Len(t, values, len(expectedFundingRateSchema)) + + expectedLinkFee := CalculateFee(sampleLinkBenchmarkPrice, sampleBaseUSDFee) + expectedNativeFee := CalculateFee(sampleNativeBenchmarkPrice, sampleBaseUSDFee) + + // doesn't crash if values are nil + for i := range report.Values { + report.Values[i] = nil + } + _, err = codec.Encode(ctx, report, cd) + require.Error(t, err) + + return AllTrue([]bool{ + assert.Equal(t, sampleFeedID, (common.Hash)(values[0].([32]byte))), // feedId + assert.Equal(t, uint32(sampleValidAfterSeconds+1), values[1].(uint32)), // validFromTimestamp + assert.Equal(t, sampleObservationsTimestamp, values[2].(uint32)), // observationsTimestamp + assert.Equal(t, expectedLinkFee.String(), values[3].(*big.Int).String()), // linkFee + assert.Equal(t, expectedNativeFee.String(), values[4].(*big.Int).String()), // nativeFee + assert.Equal(t, uint32(sampleObservationsTimestamp+sampleExpirationWindow), values[5].(uint32)), // expiresAt + assert.Equal(t, sampleBinanceFundingRate.BigInt().String(), values[6].(*big.Int).String()), // binanceFundingRate + assert.Equal(t, uint32(sampleBinanceFundingTime.BigInt().Int64()), values[7].(uint32)), // binanceFundingTime + assert.Equal(t, uint32(sampleBinanceFundingIntervalHours.BigInt().Int64()), values[8].(uint32)), // binanceFundingIntervalHours + assert.Equal(t, sampleDeribitFundingRate.BigInt().String(), values[9].(*big.Int).String()), // deribitFundingRate + assert.Equal(t, uint32(sampleDeribitFundingTime.BigInt().Int64()), values[10].(uint32)), // deribitFundingTime + assert.Equal(t, uint32(sampleDeribitFundingIntervalHours.BigInt().Int64()), values[11].(uint32)), // deribitFundingIntervalHours + }) + } + + properties.Property("Encodes values", prop.ForAll( + runTest, + genFeedID(), + genObservationsTimestamp(), + genValidAfterSeconds(), + genExpirationWindow(), + genBaseUSDFee(), + genLinkBenchmarkPrice(), + genNativeBenchmarkPrice(), + genFundingRate(), + genFundingTime(), + genFundingIntervalHours(), + genFundingRate(), + genFundingTime(), + genFundingIntervalHours(), + )) + + properties.TestingRun(t) + }) +} + +func TestReportCodecEVMABIEncodeUnpacked_Encode(t *testing.T) { + t.Run("ABI and values length mismatch error", func(t *testing.T) { + // TODO + }) +} + +func genFeedID() gopter.Gen { + return func(p *gopter.GenParameters) *gopter.GenResult { + var feedID common.Hash + p.Rng.Read(feedID[:]) + return gopter.NewGenResult(feedID, gopter.NoShrinker) + } +} + +func genObservationsTimestamp() gopter.Gen { + return gen.UInt32() +} + +func genValidAfterSeconds() gopter.Gen { + return gen.UInt32() +} + +func genExpirationWindow() gopter.Gen { + return gen.UInt32() +} + +func genPriceMultiplier() gopter.Gen { + return genMultiplier() +} + +func genMarketDepthMultiplier() gopter.Gen { + return genMultiplier() +} + +func genMultiplier() gopter.Gen { + return gen.UInt32().Map(func(i uint32) *ubig.Big { + return ubig.NewI(int64(i)) + }) +} + +func genDecimal() gopter.Gen { + return gen.Float32Range(-2e32, 2e32).Map(decimal.NewFromFloat32) +} + +func genBaseUSDFee() gopter.Gen { + return genDecimal() +} + +func genLinkBenchmarkPrice() gopter.Gen { + return genDecimal() +} + +func genNativeBenchmarkPrice() gopter.Gen { + return genDecimal() +} + +func genBenchmarkPrice() gopter.Gen { + return genDecimal() +} + +func genBaseMarketDepth() gopter.Gen { + return genDecimal() +} + +func genQuoteMarketDepth() gopter.Gen { + return genDecimal() +} + +func genMarketStatus() gopter.Gen { + return gen.UInt32().Map(func(i uint32) decimal.Decimal { + return decimal.NewFromInt(int64(i)) + }) +} + +func genFundingRate() gopter.Gen { + return genDecimal() +} + +func genFundingTime() gopter.Gen { + // Unix epochs + return gen.UInt32Range(1500000000, 2000000000).Map(func(i uint32) decimal.Decimal { + return decimal.NewFromInt(int64(i)) + }) +} + +func genFundingIntervalHours() gopter.Gen { + return gen.UInt32().Map(func(i uint32) decimal.Decimal { + return decimal.NewFromInt(int64(i)) + }) +} + +func mustNewABIType(t string) abi.Type { + result, err := abi.NewType(t, "", []abi.ArgumentMarshaling{}) + if err != nil { + panic(fmt.Sprintf("Unexpected error during abi.NewType: %s", err)) + } + return result +} diff --git a/core/services/llo/keyring.go b/core/services/llo/keyring.go index d4bf615711c..443aa1968ba 100644 --- a/core/services/llo/keyring.go +++ b/core/services/llo/keyring.go @@ -80,7 +80,7 @@ func (okr *onchainKeyring) MaxSignatureLength() (n int) { func (okr *onchainKeyring) Sign(digest types.ConfigDigest, seqNr uint64, r ocr3types.ReportWithInfo[llotypes.ReportInfo]) (signature []byte, err error) { switch r.Info.ReportFormat { - case llotypes.ReportFormatEVMPremiumLegacy: + case llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatEVMABIEncodeUnpacked: rf := r.Info.ReportFormat if key, exists := okr.keys[rf]; exists { // NOTE: Must use legacy Sign method for compatibility with v0.3 report verification @@ -101,7 +101,7 @@ func (okr *onchainKeyring) Sign(digest types.ConfigDigest, seqNr uint64, r ocr3t func (okr *onchainKeyring) Verify(key types.OnchainPublicKey, digest types.ConfigDigest, seqNr uint64, r ocr3types.ReportWithInfo[llotypes.ReportInfo], signature []byte) bool { switch r.Info.ReportFormat { - case llotypes.ReportFormatEVMPremiumLegacy: + case llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatEVMABIEncodeUnpacked: rf := r.Info.ReportFormat if verifier, exists := okr.keys[rf]; exists { // NOTE: Must use legacy Verify method for compatibility with v0.3 report verification diff --git a/core/services/llo/mercurytransmitter/server.go b/core/services/llo/mercurytransmitter/server.go index 3ce2b0a4b4a..769b5e99cda 100644 --- a/core/services/llo/mercurytransmitter/server.go +++ b/core/services/llo/mercurytransmitter/server.go @@ -253,12 +253,18 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI defer cancelFn() return s.transmit(ctx, t) }(ctx) + + lggr := s.lggr.With("transmission", t, "response", res, "transmissionHash", fmt.Sprintf("%x", t.Hash())) + if req != nil { + lggr = s.lggr.With("req.Payload", req.Payload, "req.ReportFormat", req.ReportFormat) + } + if ctx.Err() != nil { // only canceled on transmitter close so we can exit return false } else if err != nil { s.transmitConnectionErrorCount.Inc() - s.lggr.Errorw("Transmit report failed", "err", err, "req.Payload", req.Payload, "req.ReportFormat", req.ReportFormat, "transmission", t) + lggr.Errorw("Transmit report failed", "err", err) if ok := s.q.Push(t); !ok { s.lggr.Error("Failed to push report to transmit queue; queue is closed") return false @@ -276,7 +282,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI b.Reset() if res.Error == "" { s.transmitSuccessCount.Inc() - s.lggr.Debugw("Transmit report success", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) + lggr.Debug("Transmit report success") } else { // We don't need to retry here because the mercury server // has confirmed it received the report. We only need to retry @@ -285,17 +291,17 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, donI case DuplicateReport: s.transmitSuccessCount.Inc() s.transmitDuplicateCount.Inc() - s.lggr.Debugw("Transmit report success; duplicate report", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) + lggr.Debug("Transmit report success; duplicate report") default: promTransmitServerErrorCount.WithLabelValues(donIDStr, s.url, strconv.FormatInt(int64(res.Code), 10)).Inc() - s.lggr.Errorw("Transmit report failed; mercury server returned error", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "response", res, "transmission", t, "err", res.Error, "code", res.Code) + lggr.Errorw("Transmit report failed; mercury server returned error", "err", res.Error, "code", res.Code) } } select { case s.deleteQueue <- t.Hash(): default: - s.lggr.Criticalw("Delete queue is full", "transmission", t, "transmissionHash", fmt.Sprintf("%x", t.Hash())) + lggr.Criticalw("Delete queue is full") } return true }() @@ -309,7 +315,7 @@ func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitReq switch t.Report.Info.ReportFormat { case llotypes.ReportFormatJSON: payload, err = s.jsonPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) - case llotypes.ReportFormatEVMPremiumLegacy: + case llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatEVMABIEncodeUnpacked: payload, err = s.evmPremiumLegacyPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) default: return nil, nil, fmt.Errorf("Transmit failed; don't know how to Pack unsupported report format: %q", t.Report.Info.ReportFormat) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index f6b08cf46a3..888cbfa5778 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1009,7 +1009,7 @@ func (d *Delegate) newServicesLLO( // Also re-use EVM keys for signing the retirement report. This isn't // required, just seems easiest since it's the only key type available for // now. - for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement} { + for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement, llotypes.ReportFormatEVMABIEncodeUnpacked} { if _, exists := kbm[rf]; !exists { // Use the first if unspecified kbs, err3 := d.ks.GetAllOfType("evm") diff --git a/core/services/ocr2/plugins/llo/helpers_test.go b/core/services/ocr2/plugins/llo/helpers_test.go index 9cd8742ffa8..74759a056ab 100644 --- a/core/services/ocr2/plugins/llo/helpers_test.go +++ b/core/services/ocr2/plugins/llo/helpers_test.go @@ -232,6 +232,29 @@ observationSource = """ )) } +func addStreamSpec( + t *testing.T, + node Node, + name string, + streamID *uint32, + observationSource string, +) (id int32) { + optionalStreamID := "" + if streamID != nil { + optionalStreamID = fmt.Sprintf("streamID = %d\n", *streamID) + } + specTOML := fmt.Sprintf(` +type = "stream" +schemaVersion = 1 +name = "%s" +%s +observationSource = """ +%s +""" +`, name, optionalStreamID, observationSource) + return node.AddStreamJob(t, specTOML) +} + func addQuoteStreamJob( t *testing.T, node Node, @@ -331,7 +354,7 @@ transmitterID = "%x" )) } -func createBridge(t *testing.T, name string, i int, p decimal.Decimal, borm bridges.ORM) (bridgeName string) { +func createSingleDecimalBridge(t *testing.T, name string, i int, p decimal.Decimal, borm bridges.ORM) (bridgeName string) { ctx := testutils.Context(t) bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { b, err := io.ReadAll(req.Body) @@ -355,6 +378,22 @@ func createBridge(t *testing.T, name string, i int, p decimal.Decimal, borm brid return bridgeName } +func createBridge(t *testing.T, bridgeName string, resultJSON string, borm bridges.ORM) { + ctx := testutils.Context(t) + bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(http.StatusOK) + resp := fmt.Sprintf(`{"result": %s}`, resultJSON) + _, err := res.Write([]byte(resp)) + require.NoError(t, err) + })) + t.Cleanup(bridge.Close) + u, _ := url.Parse(bridge.URL) + require.NoError(t, borm.CreateBridgeType(ctx, &bridges.BridgeType{ + Name: bridges.BridgeName(bridgeName), + URL: models.WebURL(*u), + })) +} + func addOCRJobsEVMPremiumLegacy( t *testing.T, streams []Stream, @@ -386,7 +425,7 @@ func addOCRJobsEVMPremiumLegacy( name = "linkprice" } name = fmt.Sprintf("%s-%d-%d", name, strm.id, j) - bmBridge := createBridge(t, name, i, strm.baseBenchmarkPrice, node.App.BridgeORM()) + bmBridge := createSingleDecimalBridge(t, name, i, strm.baseBenchmarkPrice, node.App.BridgeORM()) jobID := addSingleDecimalStreamJob( t, node, @@ -395,9 +434,9 @@ func addOCRJobsEVMPremiumLegacy( ) jobIDs[i][strm.id] = jobID } else { - bmBridge := createBridge(t, fmt.Sprintf("benchmarkprice-%d-%d", strm.id, j), i, strm.baseBenchmarkPrice, node.App.BridgeORM()) - bidBridge := createBridge(t, fmt.Sprintf("bid-%d-%d", strm.id, j), i, strm.baseBid, node.App.BridgeORM()) - askBridge := createBridge(t, fmt.Sprintf("ask-%d-%d", strm.id, j), i, strm.baseAsk, node.App.BridgeORM()) + bmBridge := createSingleDecimalBridge(t, fmt.Sprintf("benchmarkprice-%d-%d", strm.id, j), i, strm.baseBenchmarkPrice, node.App.BridgeORM()) + bidBridge := createSingleDecimalBridge(t, fmt.Sprintf("bid-%d-%d", strm.id, j), i, strm.baseBid, node.App.BridgeORM()) + askBridge := createSingleDecimalBridge(t, fmt.Sprintf("ask-%d-%d", strm.id, j), i, strm.baseAsk, node.App.BridgeORM()) jobID := addQuoteStreamJob( t, node, @@ -424,3 +463,6 @@ func addOCRJobsEVMPremiumLegacy( } return jobIDs } + +func addDexBasedAssetStreamSpec() { +} diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index 0491c29b39c..ba59b68be57 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" @@ -33,6 +34,8 @@ import ( llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/link_token_interface" @@ -336,7 +339,7 @@ func promoteStagingConfig(t *testing.T, donID uint32, steve *bind.TransactOpts, backend.Commit() } -func TestIntegration_LLO(t *testing.T) { +func TestIntegration_LLO_evm_premium_legacy(t *testing.T) { t.Parallel() testStartTimeStamp := time.Now() multiplier := decimal.New(1, 18) @@ -351,7 +354,7 @@ func TestIntegration_LLO(t *testing.T) { clientPubKeys[i] = key.PublicKey } - steve, backend, configurator, configuratorAddress, verifier, _, verifierProxy, _, configStore, configStoreAddress, legacyVerifier, legacyVerifierAddr, _, _ := setupBlockchain(t) + steve, backend, _, _, verifier, _, verifierProxy, _, configStore, configStoreAddress, legacyVerifier, legacyVerifierAddr, _, _ := setupBlockchain(t) fromBlock := 1 // Setup bootstrap @@ -369,7 +372,7 @@ func TestIntegration_LLO(t *testing.T) { serverURL := startMercuryServer(t, srv, clientPubKeys) donID := uint32(995544) - streams := []Stream{ethStream, linkStream, quoteStream1, quoteStream2} + streams := []Stream{ethStream, linkStream} streamMap := make(map[uint32]Stream) for _, strm := range streams { streamMap[strm.id] = strm @@ -548,6 +551,507 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi } }) }) +} + +func TestIntegration_LLO_evm_abi_encode_unpacked(t *testing.T) { + t.Parallel() + testStartTimeStamp := time.Now() + expirationWindow := uint32(3600) + + clientCSAKeys := make([]csakey.KeyV2, nNodes) + clientPubKeys := make([]ed25519.PublicKey, nNodes) + for i := 0; i < nNodes; i++ { + k := big.NewInt(int64(i)) + key := csakey.MustNewV2XXXTestingOnly(k) + clientCSAKeys[i] = key + clientPubKeys[i] = key.PublicKey + } + + steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t) + fromBlock := 1 + + // Setup bootstrap + bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(-1)) + bootstrapNodePort := freeport.GetOne(t) + appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey) + bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb} + + t.Run("generates reports using go ReportFormatEVMABIEncodeUnpacked format", func(t *testing.T) { + reqs := make(chan request, 100000) + serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(-2)) + serverPubKey := serverKey.PublicKey + srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), reqs) + + serverURL := startMercuryServer(t, srv, clientPubKeys) + + donID := uint32(888333) + streams := []Stream{ethStream, linkStream} + streamMap := make(map[uint32]Stream) + for _, strm := range streams { + streamMap[strm.id] = strm + } + + // Setup oracle nodes + oracles, nodes := setupNodes(t, nNodes, backend, clientCSAKeys, streams) + + chainID := testutils.SimulatedChainID + relayType := "evm" + relayConfig := fmt.Sprintf(` +chainID = "%s" +fromBlock = %d +lloDonID = %d +lloConfigMode = "bluegreen" +`, chainID, fromBlock, donID) + addBootstrapJob(t, bootstrapNode, configuratorAddress, "job-4", relayType, relayConfig) + + dexBasedAssetPriceStreamID := uint32(1) + marketStatusStreamID := uint32(2) + baseMarketDepthStreamID := uint32(3) + quoteMarketDepthStreamID := uint32(4) + benchmarkPriceStreamID := uint32(5) + binanceFundingRateStreamID := uint32(6) + binanceFundingTimeStreamID := uint32(7) + binanceFundingIntervalHoursStreamID := uint32(8) + deribitFundingRateStreamID := uint32(9) + deribitFundingTimeStreamID := uint32(10) + deribitFundingIntervalHoursStreamID := uint32(11) + + mustEncodeOpts := func(opts *lloevm.ReportFormatEVMABIEncodeOpts) []byte { + encoded, err := json.Marshal(opts) + require.NoError(t, err) + return encoded + } + + standardMultiplier := ubig.NewI(1e18) + + dexBasedAssetFeedID := utils.NewHash() + rwaFeedID := utils.NewHash() + benchmarkPriceFeedID := utils.NewHash() + fundingRateFeedID := utils.NewHash() + // Channel definitions + channelDefinitions := llotypes.ChannelDefinitions{ + // Sample DEX-based asset schema + 1: { + ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + Streams: []llotypes.Stream{ + { + StreamID: ethStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: linkStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: dexBasedAssetPriceStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: baseMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: quoteMarketDepthStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + }, + Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: decimal.NewFromFloat32(0.1), + ExpirationWindow: expirationWindow, + FeedID: dexBasedAssetFeedID, + ABI: []lloevm.ABIEncoder{ + lloevm.ABIEncoder{ + StreamID: dexBasedAssetPriceStreamID, + Type: "int192", + Multiplier: standardMultiplier, + }, + lloevm.ABIEncoder{ + StreamID: baseMarketDepthStreamID, + Type: "int192", + // TODO: Multiplier? + }, + lloevm.ABIEncoder{ + StreamID: quoteMarketDepthStreamID, + Type: "int192", + }, + }, + }), + }, + // Sample RWA schema + 2: { + ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + Streams: []llotypes.Stream{ + { + StreamID: ethStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: linkStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: marketStatusStreamID, + Aggregator: llotypes.AggregatorMode, + }, + }, + Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: decimal.NewFromFloat32(0.1), + ExpirationWindow: expirationWindow, + FeedID: rwaFeedID, + ABI: []lloevm.ABIEncoder{ + { + StreamID: marketStatusStreamID, + Type: "uint32", + }, + }, + }), + }, + // Sample Benchmark price schema + 3: { + ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + Streams: []llotypes.Stream{ + { + StreamID: ethStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: linkStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: benchmarkPriceStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + }, + Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: decimal.NewFromFloat32(0.1), + ExpirationWindow: expirationWindow, + FeedID: benchmarkPriceFeedID, + ABI: []lloevm.ABIEncoder{ + { + StreamID: benchmarkPriceStreamID, + Type: "int192", + Multiplier: standardMultiplier, + }, + }, + }), + }, + // Sample funding rate scheam + 4: { + ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + Streams: []llotypes.Stream{ + { + StreamID: ethStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: linkStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: binanceFundingRateStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: binanceFundingTimeStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: binanceFundingIntervalHoursStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: deribitFundingRateStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: deribitFundingTimeStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: deribitFundingIntervalHoursStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + }, + Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: decimal.NewFromFloat32(0.1), + ExpirationWindow: expirationWindow, + FeedID: fundingRateFeedID, + ABI: []lloevm.ABIEncoder{ + { + StreamID: binanceFundingRateStreamID, + Type: "int192", + }, + { + StreamID: binanceFundingTimeStreamID, + Type: "int192", + }, + { + StreamID: binanceFundingIntervalHoursStreamID, + Type: "int192", + }, + { + StreamID: deribitFundingRateStreamID, + Type: "int192", + }, + { + StreamID: deribitFundingTimeStreamID, + Type: "int192", + }, + { + StreamID: deribitFundingIntervalHoursStreamID, + Type: "int192", + }, + }, + }), + }, + } + url, sha := newChannelDefinitionsServer(t, channelDefinitions) + + // Set channel definitions + _, err := configStore.SetChannelDefinitions(steve, donID, url, sha) + require.NoError(t, err) + backend.Commit() + + pluginConfig := fmt.Sprintf(`servers = { "%s" = "%x" } +donID = %d +channelDefinitionsContractAddress = "0x%x" +channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, configStoreAddress, fromBlock) + + bridgeName := "superbridge" + + resultJSON := `{ + "benchmarkPrice": "2976.39", + "baseMarketDepth": "1000.1212", + "quoteMarketDepth": "998.5431", + "marketStatus": "1", + "binanceFundingRate": "1234.5678", + "binanceFundingTime": "1630000000", + "binanceFundingIntervalHours": "8", + "deribitFundingRate": "5432.2345", + "deribitFundingTime": "1630000000", + "deribitFundingIntervalHours": "8" +}` + + dexBasedAssetPipeline := fmt.Sprintf(` +dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"]; + +bp_parse [type=jsonparse path="result,benchmarkPrice"]; +base_market_depth_parse [type=jsonparse path="result,baseMarketDepth"]; +quote_market_depth_parse [type=jsonparse path="result,quoteMarketDepth"]; + +bp_decimal [type=multiply times=1 streamID=%d]; +base_market_depth_decimal [type=multiply times=1 streamID=%d]; +quote_market_depth_decimal [type=multiply times=1 streamID=%d]; + +dp -> bp_parse -> bp_decimal; +dp -> base_market_depth_parse -> base_market_depth_decimal; +dp -> quote_market_depth_parse -> quote_market_depth_decimal; +`, bridgeName, dexBasedAssetPriceStreamID, baseMarketDepthStreamID, quoteMarketDepthStreamID) + + rwaPipeline := fmt.Sprintf(` +dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"]; + +market_status_parse [type=jsonparse path="result,marketStatus"]; +market_status_decimal [type=multiply times=1 streamID=%d]; + +dp -> market_status_parse -> market_status_decimal; +`, bridgeName, marketStatusStreamID) + + benchmarkPricePipeline := fmt.Sprintf(` +dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"]; + +bp_parse [type=jsonparse path="result,benchmarkPrice"]; +bp_decimal [type=multiply times=1 streamID=%d]; + +dp -> bp_parse -> bp_decimal; +`, bridgeName, benchmarkPriceStreamID) + + fundingRatePipeline := fmt.Sprintf(` +dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"]; + +binance_funding_rate_parse [type=jsonparse path="result,binanceFundingRate"]; +binance_funding_rate_decimal [type=multiply times=1 streamID=%d]; + +binance_funding_time_parse [type=jsonparse path="result,binanceFundingTime"]; +binance_funding_time_decimal [type=multiply times=1 streamID=%d]; + +binance_funding_interval_hours_parse [type=jsonparse path="result,binanceFundingIntervalHours"]; +binance_funding_interval_hours_decimal [type=multiply times=1 streamID=%d]; + +deribit_funding_rate_parse [type=jsonparse path="result,deribitFundingRate"]; +deribit_funding_rate_decimal [type=multiply times=1 streamID=%d]; + +deribit_funding_time_parse [type=jsonparse path="result,deribitFundingTime"]; +deribit_funding_time_decimal [type=multiply times=1 streamID=%d]; + +deribit_funding_interval_hours_parse [type=jsonparse path="result,deribitFundingIntervalHours"]; +deribit_funding_interval_hours_decimal [type=multiply times=1 streamID=%d]; + +dp -> binance_funding_rate_parse -> binance_funding_rate_decimal; +dp -> binance_funding_time_parse -> binance_funding_time_decimal; +dp -> binance_funding_interval_hours_parse -> binance_funding_interval_hours_decimal; +dp -> deribit_funding_rate_parse -> deribit_funding_rate_decimal; +dp -> deribit_funding_time_parse -> deribit_funding_time_decimal; +dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_decimal; +`, bridgeName, binanceFundingRateStreamID, binanceFundingTimeStreamID, binanceFundingIntervalHoursStreamID, deribitFundingRateStreamID, deribitFundingTimeStreamID, deribitFundingIntervalHoursStreamID) + + for i, node := range nodes { + // superBridge returns a JSON with everything you want in it, + // stream specs can just pick the individual fields they need + createBridge(t, bridgeName, resultJSON, node.App.BridgeORM()) + addStreamSpec(t, node, "dexBasedAssetPipeline", nil, dexBasedAssetPipeline) + addStreamSpec(t, node, "rwaPipeline", nil, rwaPipeline) + addStreamSpec(t, node, "benchmarkPricePipeline", nil, benchmarkPricePipeline) + addStreamSpec(t, node, "fundingRatePipeline", nil, fundingRatePipeline) + addLLOJob( + t, + node, + configuratorAddress, + bootstrapPeerID, + bootstrapNodePort, + clientPubKeys[i], + "llo-evm-abi-encode-unpacked-test", + pluginConfig, + relayType, + relayConfig, + ) + } + + // Set config on configurator + digest := setProductionConfig( + t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, + ) + + // NOTE: Wait for one of each type of report + feedIDs := map[[32]byte]struct{}{ + dexBasedAssetFeedID: {}, + rwaFeedID: {}, + benchmarkPriceFeedID: {}, + fundingRateFeedID: {}, + } + + for req := range reqs { + v := make(map[string]interface{}) + err := mercury.PayloadTypes.UnpackIntoMap(v, req.req.Payload) + require.NoError(t, err) + report, exists := v["report"] + if !exists { + t.Fatalf("expected payload %#v to contain 'report'", v) + } + reportCtx, exists := v["reportContext"] + if !exists { + t.Fatalf("expected payload %#v to contain 'reportContext'", v) + } + + // Check the report context + assert.Equal(t, [32]byte(digest), reportCtx.([3][32]uint8)[0]) // config digest + assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash + + reportElems := make(map[string]interface{}) + err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte)) + require.NoError(t, err) + + feedID := reportElems["feedId"].([32]uint8) + delete(feedIDs, feedID) + + // Check headers + assert.GreaterOrEqual(t, reportElems["validFromTimestamp"].(uint32), uint32(testStartTimeStamp.Unix())) + assert.GreaterOrEqual(t, int(reportElems["observationsTimestamp"].(uint32)), int(testStartTimeStamp.Unix())) + // Zero fees since both eth/link stream specs are missing, don't + // care about billing for purposes of this test + assert.Equal(t, "0", reportElems["nativeFee"].(*big.Int).String()) + assert.Equal(t, "0", reportElems["linkFee"].(*big.Int).String()) + assert.Equal(t, reportElems["observationsTimestamp"].(uint32)+uint32(expirationWindow), reportElems["expiresAt"].(uint32)) + + // Check payload values + payload := report.([]byte)[192:] + switch fmt.Sprintf("%x", feedID) { + case fmt.Sprintf("%x", dexBasedAssetFeedID): + require.Len(t, payload, 96) + args := abi.Arguments([]abi.Argument{ + {Name: "benchmarkPrice", Type: mustNewType("int192")}, + {Name: "baseMarketDepth", Type: mustNewType("int192")}, + {Name: "quoteMarketDepth", Type: mustNewType("int192")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String()) + assert.Equal(t, "1000", v["baseMarketDepth"].(*big.Int).String()) + assert.Equal(t, "998", v["quoteMarketDepth"].(*big.Int).String()) + case fmt.Sprintf("%x", rwaFeedID): + require.Len(t, payload, 32) + args := abi.Arguments([]abi.Argument{ + {Name: "marketStatus", Type: mustNewType("uint32")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, uint32(1), v["marketStatus"].(uint32)) + case fmt.Sprintf("%x", benchmarkPriceFeedID): + require.Len(t, payload, 32) + args := abi.Arguments([]abi.Argument{ + {Name: "benchmarkPrice", Type: mustNewType("int192")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String()) + case fmt.Sprintf("%x", fundingRateFeedID): + require.Len(t, payload, 192) + args := abi.Arguments([]abi.Argument{ + {Name: "binanceFundingRate", Type: mustNewType("int192")}, + {Name: "binanceFundingTime", Type: mustNewType("int192")}, + {Name: "binanceFundingIntervalHours", Type: mustNewType("int192")}, + {Name: "deribitFundingRate", Type: mustNewType("int192")}, + {Name: "deribitFundingTime", Type: mustNewType("int192")}, + {Name: "deribitFundingIntervalHours", Type: mustNewType("int192")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, "1234", v["binanceFundingRate"].(*big.Int).String()) + assert.Equal(t, "1630000000", v["binanceFundingTime"].(*big.Int).String()) + assert.Equal(t, "8", v["binanceFundingIntervalHours"].(*big.Int).String()) + assert.Equal(t, "5432", v["deribitFundingRate"].(*big.Int).String()) + assert.Equal(t, "1630000000", v["deribitFundingTime"].(*big.Int).String()) + assert.Equal(t, "8", v["deribitFundingIntervalHours"].(*big.Int).String()) + default: + t.Fatalf("unexpected feedID: %x", feedID) + } + + if len(feedIDs) == 0 { + break + } + } + }) +} + +func TestIntegration_LLO_blue_green_lifecycle(t *testing.T) { + t.Parallel() + + clientCSAKeys := make([]csakey.KeyV2, nNodes) + clientPubKeys := make([]ed25519.PublicKey, nNodes) + for i := 0; i < nNodes; i++ { + k := big.NewInt(int64(i)) + key := csakey.MustNewV2XXXTestingOnly(k) + clientCSAKeys[i] = key + clientPubKeys[i] = key.PublicKey + } + + steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t) + fromBlock := 1 + + // Setup bootstrap + bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(-1)) + bootstrapNodePort := freeport.GetOne(t) + appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey) + bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb} t.Run("Blue/Green lifecycle (using JSON report format)", func(t *testing.T) { reqs := make(chan request, 100000) @@ -871,3 +1375,11 @@ func newChannelDefinitionsServer(t *testing.T, channelDefinitions llotypes.Chann t.Cleanup(channelDefinitionsServer.Close) return channelDefinitionsServer.URL, channelDefinitionsSHA } + +func mustNewType(t string) abi.Type { + result, err := abi.NewType(t, "", []abi.ArgumentMarshaling{}) + if err != nil { + panic(fmt.Sprintf("Unexpected error during abi.NewType: %s", err)) + } + return result +} diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go index 2f62a7bf1f4..ee5d050cacd 100644 --- a/core/services/streams/delegate.go +++ b/core/services/streams/delegate.go @@ -43,11 +43,7 @@ func (d *Delegate) BeforeJobDeleted(jb job.Job) {} func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil } func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []job.ServiceCtx, err error) { - if jb.StreamID == nil { - return nil, errors.New("streamID is required to be present for stream specs") - } - id := *jb.StreamID - lggr := d.lggr.Named(fmt.Sprintf("%d", id)).With("streamID", id) + lggr := d.lggr.Named(fmt.Sprintf("job-%d", jb.ID)).With("jobID", jb.ID) rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth()) services = append(services, rrs, &StreamService{ diff --git a/go.mod b/go.mod index ccedade99b3..afc940a0677 100644 --- a/go.mod +++ b/go.mod @@ -406,5 +406,9 @@ replace ( // replicating the replace directive on cosmos SDK github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 + github.com/smartcontractkit/chainlink-common => /Users/sam/code/smartcontractkit/chainlink-common + github.com/smartcontractkit/chainlink-data-streams => /Users/sam/code/smartcontractkit/chainlink-data-streams + github.com/sourcegraph/sourcegraph/lib => github.com/sourcegraph/sourcegraph-public-snapshot/lib v0.0.0-20240822153003-c864f15af264 + ) diff --git a/go.sum b/go.sum index 6e8c1b4b5f3..c6c6973de6a 100644 --- a/go.sum +++ b/go.sum @@ -1152,12 +1152,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20250109124515-ff9d86b874ba h1:gisAer1YxKKui6LhxDgfuZ3OyrHVjHm/oK/0idusFeI= github.com/smartcontractkit/chainlink-ccip v0.0.0-20250109124515-ff9d86b874ba/go.mod h1:ncjd6mPZSRlelEqH/2KeLE1pU3UlqzBSn8RYkEoECzY= -github.com/smartcontractkit/chainlink-common v0.4.1-0.20250108194320-2ebd63bbb16e h1:8BStiP1F4W8AvjBRga0TYtjvAtkwN8oHYnHJztAlSF4= -github.com/smartcontractkit/chainlink-common v0.4.1-0.20250108194320-2ebd63bbb16e/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 h1:aeiBdBHGY8QNftps+VqrIk6OnfeeOD5z4jrAabW4ZSc= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3/go.mod h1:AS6zY2BkcRwfiGzNabGbHhfrLSrXrcI/GmjnT4jQ5/s= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20241220173418-09e17ddbeb20 h1:p/gzWpEf8alodCXm2Gtx2kWI/O9ZLdWZOdNnv5ZGO6c=