Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'stream' to allowed job type list; fix issue adding large numbers of channels #13692

Merged
merged 6 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/cuddly-toys-warn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"chainlink": patch
---

Add "VerboseLogging" option to mercury

Off by default, can be enabled like so:

```toml
[Mercury]
VerboseLogging = true
```

#updated
5 changes: 5 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,11 @@ env = 'test' # Example

[Mercury]

# VerboseLogging enables detailed logging of mercury/LLO operations. These logs
# can be expensive since they may serialize large structs, so they are disabled
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# can be expensive since they may serialize large structs, so they are disabled
# are expensive since they may serialize large structs, so they are disabled

# by default.
VerboseLogging = false # Default

# Mercury.Cache controls settings for the price retrieval cache querying a mercury server
[Mercury.Cache]
# LatestReportTTL controls how "stale" we will allow a price to be e.g. if
Expand Down
1 change: 1 addition & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type Mercury interface {
Cache() MercuryCache
TLS() MercuryTLS
Transmitter() MercuryTransmitter
VerboseLogging() bool
}
10 changes: 7 additions & 3 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,15 +1324,19 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
}

type Mercury struct {
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
VerboseLogging *bool `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
m.Cache.setFrom(&f.Cache)
m.TLS.setFrom(&f.TLS)
m.Transmitter.setFrom(&f.Transmitter)
if v := f.VerboseLogging; v != nil {
m.VerboseLogging = v
}
}

func (m *Mercury) ValidateConfig() (err error) {
Expand Down
2 changes: 2 additions & 0 deletions core/internal/testutils/configtest/general_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func overrides(c *chainlink.Config, s *chainlink.Secrets) {
c.JobPipeline.ReaperInterval = commonconfig.MustNewDuration(0)
c.JobPipeline.VerboseLogging = ptr(true)

c.Mercury.VerboseLogging = ptr(true)

c.P2P.V2.Enabled = ptr(false)

c.WebServer.SessionTimeout = commonconfig.MustNewDuration(2 * time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ require (
github.com/shirou/gopsutil/v3 v3.24.3 // indirect
github.com/smartcontractkit/chain-selectors v1.0.10 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702144807-761f63e7b527 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240708130426-294b81e4afe7 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240625074951-06ab5e670dba // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1211,8 +1211,8 @@ github.com/smartcontractkit/chainlink-common v0.1.7-0.20240703234618-dc1fbe45acc
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240703234618-dc1fbe45acc0/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 h1:TMOoYaeSDkkI3jkCH7lKHOZaLkeDuxFTNC+XblD6M0M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702144807-761f63e7b527 h1:Vs6myS+bpPwb8chUY7XxveJyhvejknhOmhDTddgsK5I=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702144807-761f63e7b527/go.mod h1:KRK7KlAEpmORi+nJgT0vxQVWvlLEBQ6zgzXziZuKvUM=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 h1:dsTmitRaVizHxoYFoGz4+y/zVa8XnvKUiTaZdx+6t9M=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1/go.mod h1:6DgCnHMGdBaIh0bLs1dK0MtdeMZfeNhc/nvBUN6KIUg=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917 h1:MD80ZRCTvxxJ8PBmhtrKoTnky8cVNYrCrIBLVRbrOM0=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917/go.mod h1:jwVxhctE6BgLOSSsVq9wbREpZ8Ev34H+UBxeUhESZRs=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240708130426-294b81e4afe7 h1:tBJo/Rn0Ur7XtzqTEgIhLYDDWdcXBf985AF0PegsDk8=
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (m *mercuryConfig) TLS() config.MercuryTLS {
func (m *mercuryConfig) Transmitter() config.MercuryTransmitter {
return &mercuryTransmitterConfig{c: m.c.Transmitter}
}

func (m *mercuryConfig) VerboseLogging() bool {
return *m.c.VerboseLogging
}
3 changes: 3 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ func TestConfig_Marshal(t *testing.T) {
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
},
VerboseLogging: ptr(true),
}

for _, tt := range []struct {
Expand Down Expand Up @@ -1186,6 +1187,8 @@ URL = 'http://stark.node'
APIKey = 'key'
`},
{"Mercury", Config{Core: toml.Core{Mercury: full.Mercury}}, `[Mercury]
VerboseLogging = true

[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ Mode = 'tls'
TLSCertPath = ''

[Mercury]
VerboseLogging = false

[Mercury.Cache]
LatestReportTTL = '1s'
MaxStaleAge = '1h0m0s'
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ env = 'dev'
test = 'load'

[Mercury]
VerboseLogging = true

[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ Mode = 'tls'
TLSCertPath = ''

[Mercury]
VerboseLogging = false

[Mercury.Cache]
LatestReportTTL = '1s'
MaxStaleAge = '1h0m0s'
Expand Down
22 changes: 21 additions & 1 deletion core/services/llo/bm/dummy_transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink-common/pkg/services"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

Expand Down Expand Up @@ -57,8 +59,26 @@ func (t *transmitter) Transmit(
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
sigs []types.AttributedOnchainSignature,
) error {
lggr := t.lggr
switch report.Info.ReportFormat {
case llotypes.ReportFormatJSON:
r, err := (llo.JSONReportCodec{}).Decode(report.Report)
if err != nil {
lggr.Debugw("Failed to decode JSON report", "err", err)
}
lggr = lggr.With(
"report.Report.ConfigDigest", r.ConfigDigest,
"report.Report.ChainSelector", r.ChainSelector,
"report.Report.SeqNr", r.SeqNr,
"report.Report.ChannelID", r.ChannelID,
"report.Report.ValidAfterSeconds", r.ValidAfterSeconds,
"report.Report.ValidUntilSeconds", r.ValidUntilSeconds,
"report.Report.Values", r.Values,
"report.Report.Specimen", r.Specimen,
)
}
transmitSuccessCount.Inc()
t.lggr.Debugw("Transmit", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs)
lggr.Infow("Transmit (dummy)", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs)
return nil
}

Expand Down
134 changes: 94 additions & 40 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"context"
"fmt"
"math/big"
"sort"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)

Expand All @@ -31,16 +34,35 @@ var (
)
)

type ErrMissingStream struct {
id string
}

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
}

func (e ErrMissingStream) Error() string {
return fmt.Sprintf("missing stream definition for: %q", e.id)
type ErrObservationFailed struct {
inner error
reason string
streamID streams.StreamID
run *pipeline.Run
}

func (e *ErrObservationFailed) Error() string {
s := fmt.Sprintf("StreamID: %d; Reason: %s", e.streamID, e.reason)
if e.inner != nil {
s += fmt.Sprintf("; Err: %v", e.inner)
}
if e.run != nil {
// NOTE: Could log more info about the run here if necessary
s += fmt.Sprintf("; RunID: %d; RunErrors: %v", e.run.ID, e.run.AllErrors)
}
return s
}

func (e *ErrObservationFailed) String() string {
return e.Error()
}

func (e *ErrObservationFailed) Unwrap() error {
return e.inner
}

var _ llo.DataSource = &dataSource{}
Expand All @@ -54,54 +76,86 @@ func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource {
return &dataSource{lggr.Named("DataSource"), registry}
}

// Observe looks up all streams in the registry and returns a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamIDs map[llotypes.StreamID]struct{}) (llo.StreamValues, error) {
// Observe looks up all streams in the registry and populates a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
var wg sync.WaitGroup
wg.Add(len(streamIDs))
sv := make(llo.StreamValues)
var mu sync.Mutex

d.lggr.Debugw("Observing streams", "streamIDs", streamIDs)
wg.Add(len(streamValues))
var svmu sync.Mutex
var errors []ErrObservationFailed
var errmu sync.Mutex

if opts.VerboseLogging() {
streamIDs := make([]streams.StreamID, 0, len(streamValues))
for streamID := range streamValues {
streamIDs = append(streamIDs, streamID)
}
sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] })
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "seqNr", opts.SeqNr())
}

for streamID := range streamIDs {
for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var res llo.ObsResult[*big.Int]
var val *big.Int

stream, exists := d.registry.Get(streamID)
if exists {
run, trrs, err := stream.Run(ctx)
if err != nil {
var runID int64
if run != nil {
runID = run.ID
}
d.lggr.Debugw("Observation failed for stream", "err", err, "streamID", streamID, "runID", runID)
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
} else {
// TODO: support types other than *big.Int
// https://smartcontract-it.atlassian.net/browse/MERC-3525
val, err := streams.ExtractBigInt(trrs)
if err == nil {
res.Val = val
res.Valid = true
}
}
} else {
d.lggr.Errorw(fmt.Sprintf("Missing stream: %q", streamID), "streamID", streamID)
if !exists {
errmu.Lock()
errors = append(errors, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
errmu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
errmu.Lock()
errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
errmu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
// TODO: support types other than *big.Int
// https://smartcontract-it.atlassian.net/browse/MERC-3525
val, err = streams.ExtractBigInt(trrs)
if err != nil {
errmu.Lock()
errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errmu.Unlock()
return
}

mu.Lock()
defer mu.Unlock()
sv[streamID] = res
if val != nil {
svmu.Lock()
defer svmu.Unlock()
streamValues[streamID] = val
}
}(streamID)
}

wg.Wait()

d.lggr.Debugw("Observed streams", "streamIDs", streamIDs, "values", sv)
// Failed observations are always logged at warn level
var failedStreamIDs []streams.StreamID
if len(errors) > 0 {
sort.Slice(errors, func(i, j int) bool { return errors[i].streamID < errors[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errors))
for i, e := range errors {
failedStreamIDs[i] = e.streamID
}
d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errors", errors, "seqNr", opts.SeqNr())
}

if opts.VerboseLogging() {
successes := make([]streams.StreamID, 0, len(streamValues))
for strmID, res := range streamValues {
if res != nil {
successes = append(successes, strmID)
}
}
sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] })
d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "values", streamValues, "seqNr", opts.SeqNr())
}

return sv, nil
return nil
}
Loading
Loading