Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Comp (LLO) cleanup and minor optimizations #15368

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/services/llo/channel_definition_cache_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ type channelDefinitionCacheFactory struct {
mu sync.Mutex
}

// TODO: Test this
// MERC-3653
func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) {
if cfg.ChannelDefinitions != "" {
return NewStaticChannelDefinitionCache(f.lggr, cfg.ChannelDefinitions)
Expand Down
58 changes: 58 additions & 0 deletions core/services/llo/channel_definition_cache_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package llo

import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/logger"
lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config"
)

func Test_ChannelDefinitionCacheFactory(t *testing.T) {
lggr := logger.TestLogger(t)
cdcFactory := NewChannelDefinitionCacheFactory(lggr, nil, nil, nil)

t.Run("NewCache", func(t *testing.T) {
t.Run("when ChannelDefinitions is present, returns static cache", func(t *testing.T) {
_, err := cdcFactory.NewCache(lloconfig.PluginConfig{ChannelDefinitions: "..."})
require.EqualError(t, err, "failed to unmarshal static channel definitions: invalid character '.' looking for beginning of value")

cdc, err := cdcFactory.NewCache(lloconfig.PluginConfig{ChannelDefinitions: "{}"})
require.NoError(t, err)
require.IsType(t, &staticCDC{}, cdc)
})
t.Run("when ChannelDefinitions is not present, returns dynamic cache", func(t *testing.T) {
cdc, err := cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
DonID: 1,
})
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)

// returns error if you try to do it again with the same addr/donID
_, err = cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
DonID: 1,
})
require.EqualError(t, err, "cache already exists for contract address 0xaAaAaAaaAaAaAaaAaAAAAAAAAaaaAaAaAaaAaaAa and don ID 1")

// is fine if you do it again with different addr
cdc, err = cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
DonID: 1,
})
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)

// is fine if you do it again with different don ID
cdc, err = cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
DonID: 2,
})
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)
})
})
}
4 changes: 2 additions & 2 deletions core/services/llo/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
)

// NOTE: All supported codecs must be specified here
func NewReportCodecs(lggr logger.Logger) map[llotypes.ReportFormat]llo.ReportCodec {
func NewReportCodecs(lggr logger.Logger, donID uint32) map[llotypes.ReportFormat]llo.ReportCodec {
codecs := make(map[llotypes.ReportFormat]llo.ReportCodec)

codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{}
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr)
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr, donID)

return codecs
}
2 changes: 1 addition & 1 deletion core/services/llo/codecs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func Test_NewReportCodecs(t *testing.T) {
c := NewReportCodecs(logger.TestLogger(t))
c := NewReportCodecs(logger.TestLogger(t), 1)

assert.Contains(t, c, llotypes.ReportFormatJSON, "expected JSON to be supported")
assert.Contains(t, c, llotypes.ReportFormatEVMPremiumLegacy, "expected EVMPremiumLegacy to be supported")
Expand Down
3 changes: 0 additions & 3 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ func ExtractStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) {
// by the pipeline executor
finaltrrs := trrs.Terminals()

// TODO: Special handling for missing native/link streams?
// https://smartcontract-it.atlassian.net/browse/MERC-5949

// HACK: Right now we rely on the number of outputs to determine whether
// its a Decimal or a Quote.
// This isn't very robust or future-proof but is sufficient to support v0.3
Expand Down
7 changes: 4 additions & 3 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
} else {
codecLggr = corelogger.NullLogger
}
reportCodecs := NewReportCodecs(codecLggr)
reportCodecs := NewReportCodecs(codecLggr, cfg.DonID)

var t TelemeterService
if cfg.CaptureEATelemetry {
Expand Down Expand Up @@ -134,8 +134,9 @@ func (d *delegate) Start(ctx context.Context) error {
lggr = logger.With(lggr, "instanceType", "Green")
}
ocrLogger := logger.NewOCRWrapper(NewSuppressedLogger(lggr, d.cfg.ReportingPluginConfig.VerboseLogging), d.cfg.TraceLogging, func(msg string) {
// TODO: do we actually need to DB-persist errors?
// MERC-3524
// NOTE: Some OCR loggers include a DB-persist here
// We do not DB persist errors in LLO, since they could be quite voluminous and ought to be present in logs anyway.
// This is a performance optimization
})

oracle, err := ocr2plus.NewOracle(ocr2plus.OCR3OracleArgs[llotypes.ReportInfo]{
Expand Down
54 changes: 0 additions & 54 deletions core/services/llo/evm/report_codec.go

This file was deleted.

34 changes: 26 additions & 8 deletions core/services/llo/evm/report_codec_premium_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/shopspring/decimal"
Expand All @@ -30,10 +31,11 @@ var (

type ReportCodecPremiumLegacy struct {
logger.Logger
donID uint32
}

func NewReportCodecPremiumLegacy(lggr logger.Logger) ReportCodecPremiumLegacy {
return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy")}
func NewReportCodecPremiumLegacy(lggr logger.Logger, donID uint32) ReportCodecPremiumLegacy {
return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy"), donID}
}

type ReportFormatEVMPremiumLegacyOpts struct {
Expand Down Expand Up @@ -119,7 +121,7 @@ func (r ReportCodecPremiumLegacy) Pack(digest types.ConfigDigest, seqNr uint64,
ss = append(ss, s)
vs[i] = v
}
reportCtx := LegacyReportContext(digest, seqNr)
reportCtx := LegacyReportContext(digest, seqNr, r.donID)
rawReportCtx := evmutil.RawReportContext(reportCtx)

payload, err := mercury.PayloadTypes.Pack(rawReportCtx, []byte(report), rs, ss, vs)
Expand Down Expand Up @@ -181,9 +183,25 @@ func extractPrice(price llo.StreamValue) (decimal.Decimal, error) {
}
}

// TODO: Consider embedding the DON ID here?
// MERC-3524
var LLOExtraHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")
const PluginVersion uint32 = 1 // the legacy mercury plugin is 0

// Uniquely identifies this as LLO plugin, rather than the legacy plugin (which
// uses all zeroes).
//
// This is a quite a hack but serves the purpose of uniquely identifying
samsondav marked this conversation as resolved.
Show resolved Hide resolved
// dons/plugin versions to the mercury server without having to modify any
// existing tooling or breaking backwards compatibility. It should be safe
// since the DonID is encoded into the config digest anyway so report context
// is already dependent on it, and all LLO jobs in the same don are expected to
// have the same don ID set.
//
// Packs donID+pluginVersion as (uint32, uint32), for example donID=2,
// PluginVersion=1 Yields:
// 0x0000000000000000000000000000000000000000000000000000000200000001
func LLOExtraHash(donID uint32) common.Hash {
combined := uint64(donID)<<32 | uint64(PluginVersion)
return common.BigToHash(new(big.Int).SetUint64(combined))
}

func SeqNrToEpochAndRound(seqNr uint64) (epoch uint32, round uint8) {
// Simulate 256 rounds/epoch
Expand All @@ -192,14 +210,14 @@ func SeqNrToEpochAndRound(seqNr uint64) (epoch uint32, round uint8) {
return
}

func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64) ocr2types.ReportContext {
func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64, donID uint32) ocr2types.ReportContext {
epoch, round := SeqNrToEpochAndRound(seqNr)
return ocr2types.ReportContext{
ReportTimestamp: ocr2types.ReportTimestamp{
ConfigDigest: cd,
Epoch: uint32(epoch),
Round: uint8(round),
},
ExtraHash: LLOExtraHash, // ExtraHash is always zero for mercury, we use LLOExtraHash here to differentiate from the legacy plugin
ExtraHash: LLOExtraHash(donID), // ExtraHash is always zero for mercury, we use LLOExtraHash here to differentiate from the legacy plugin
}
}
8 changes: 7 additions & 1 deletion core/services/llo/evm/report_codec_premium_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newValidPremiumLegacyReport() llo.Report {
}

func Test_ReportCodecPremiumLegacy(t *testing.T) {
rc := ReportCodecPremiumLegacy{logger.TestLogger(t)}
rc := ReportCodecPremiumLegacy{logger.TestLogger(t), 2}

feedID := [32]uint8{0x1, 0x2, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
cd := llotypes.ChannelDefinition{Opts: llotypes.ChannelOpts(fmt.Sprintf(`{"baseUSDFee":"10.50","expirationWindow":60,"feedId":"0x%x","multiplier":10}`, feedID))}
Expand Down Expand Up @@ -225,3 +225,9 @@ func Test_ExtractReportValues(t *testing.T) {
assert.Equal(t, &llo.Quote{Bid: decimal.NewFromInt(37), Benchmark: decimal.NewFromInt(38), Ask: decimal.NewFromInt(39)}, quote)
})
}

func Test_LLOExtraHash(t *testing.T) {
donID := uint32(8)
extraHash := LLOExtraHash(donID)
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000800000001", extraHash.String())
}
13 changes: 7 additions & 6 deletions core/services/llo/keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ type Key interface {
}

type onchainKeyring struct {
lggr logger.Logger
keys map[llotypes.ReportFormat]Key
lggr logger.Logger
keys map[llotypes.ReportFormat]Key
donID uint32
}

func NewOnchainKeyring(lggr logger.Logger, keys map[llotypes.ReportFormat]Key) LLOOnchainKeyring {
func NewOnchainKeyring(lggr logger.Logger, keys map[llotypes.ReportFormat]Key, donID uint32) LLOOnchainKeyring {
return &onchainKeyring{
lggr.Named("OnchainKeyring"), keys,
lggr.Named("OnchainKeyring"), keys, donID,
}
}

Expand Down Expand Up @@ -83,7 +84,7 @@ func (okr *onchainKeyring) Sign(digest types.ConfigDigest, seqNr uint64, r ocr3t
rf := r.Info.ReportFormat
if key, exists := okr.keys[rf]; exists {
// NOTE: Must use legacy Sign method for compatibility with v0.3 report verification
rc := evm.LegacyReportContext(digest, seqNr)
rc := evm.LegacyReportContext(digest, seqNr, okr.donID)
return key.Sign(rc, r.Report)
}
default:
Expand All @@ -101,7 +102,7 @@ func (okr *onchainKeyring) Verify(key types.OnchainPublicKey, digest types.Confi
rf := r.Info.ReportFormat
if verifier, exists := okr.keys[rf]; exists {
// NOTE: Must use legacy Verify method for compatibility with v0.3 report verification
rc := evm.LegacyReportContext(digest, seqNr)
rc := evm.LegacyReportContext(digest, seqNr, okr.donID)
return verifier.Verify(key, rc, r.Report, signature)
}
default:
Expand Down
2 changes: 1 addition & 1 deletion core/services/llo/keyring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func Test_Keyring(t *testing.T) {
llotypes.ReportFormatJSON: &mockKey{format: llotypes.ReportFormatJSON, maxSignatureLen: 2, sig: []byte("sig-2")},
}

kr := NewOnchainKeyring(lggr, ks)
kr := NewOnchainKeyring(lggr, ks, 2)

cases := []struct {
format llotypes.ReportFormat
Expand Down
2 changes: 1 addition & 1 deletion core/services/llo/mercurytransmitter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
NewTransmitQueue(lggr, serverURL, int(cfg.TransmitQueueMaxSize()), pm),
make(chan [32]byte, int(cfg.TransmitQueueMaxSize())),
serverURL,
evm.NewReportCodecPremiumLegacy(codecLggr),
evm.NewReportCodecPremiumLegacy(codecLggr, pm.DonID()),
llo.JSONReportCodec{},
promTransmitSuccessCount.WithLabelValues(donIDStr, serverURL),
promTransmitDuplicateCount.WithLabelValues(donIDStr, serverURL),
Expand Down
Loading
Loading