Skip to content

Commit

Permalink
Fix sending EA telemetry for LLO (actually start the service) (#15124)
Browse files Browse the repository at this point in the history
* Add LLO EA telemetry debug log

* Actually start the telemeter service

* Log errors if telemeter not started

* Comment out un-needed logging

* Remove commented logging

---------

Co-authored-by: Sam Davies <[email protected]>
  • Loading branch information
mjk90 and samsondav authored Nov 6, 2024
1 parent 99241ec commit 2ba5726
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
19 changes: 13 additions & 6 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ type delegate struct {
cfg DelegateConfig
reportCodecs map[llotypes.ReportFormat]datastreamsllo.ReportCodec

src datastreamsllo.ShouldRetireCache
ds datastreamsllo.DataSource
t services.Service
src datastreamsllo.ShouldRetireCache
ds datastreamsllo.DataSource
telem services.Service

oracles []Closer
}
Expand Down Expand Up @@ -109,7 +109,13 @@ func (d *delegate) Start(ctx context.Context) error {
if !(len(d.cfg.ContractConfigTrackers) == 1 || len(d.cfg.ContractConfigTrackers) == 2) {
return fmt.Errorf("expected either 1 or 2 ContractConfigTrackers, got: %d", len(d.cfg.ContractConfigTrackers))
}

d.cfg.Logger.Debugw("Starting LLO job", "instances", len(d.cfg.ContractConfigTrackers), "jobName", d.cfg.JobName.ValueOrZero(), "captureEATelemetry", d.cfg.CaptureEATelemetry)

var merr error

merr = errors.Join(merr, d.telem.Start(ctx))

psrrc := NewPluginScopedRetirementReportCache(d.cfg.RetirementReportCache, d.cfg.OnchainKeyring, d.cfg.RetirementReportCodec)
for i, configTracker := range d.cfg.ContractConfigTrackers {
lggr := logger.Named(d.cfg.Logger, fmt.Sprintf("%d", i))
Expand Down Expand Up @@ -156,10 +162,11 @@ func (d *delegate) Start(ctx context.Context) error {
}

func (d *delegate) Close() error {
return d.StopOnce("LLODelegate", func() (err error) {
return d.StopOnce("LLODelegate", func() (merr error) {
for _, oracle := range d.oracles {
err = errors.Join(err, oracle.Close())
merr = errors.Join(merr, oracle.Close())
}
return err
merr = errors.Join(merr, d.telem.Close())
return merr
})
}
6 changes: 6 additions & 0 deletions core/services/llo/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ type telemeter struct {
}

func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) {
if t.Service.Ready() != nil {
// This should never happen, telemeter should always be started BEFORE
// the oracle and closed AFTER it
t.eng.SugaredLogger.Errorw("Telemeter not ready, dropping observation", "run", run, "streamID", streamID, "opts", opts, "val", val, "err", err)
return
}
var adapterError *eautils.AdapterError
var dpInvariantViolationDetected bool
if errors.As(err, &adapterError) && adapterError.Name == adapterLWBAErrorName {
Expand Down

0 comments on commit 2ba5726

Please sign in to comment.