diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index 7c05ffbe52a..3380b4f1bc5 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -35,9 +35,9 @@ type delegate struct { cfg DelegateConfig reportCodecs map[llotypes.ReportFormat]datastreamsllo.ReportCodec - src datastreamsllo.ShouldRetireCache - ds datastreamsllo.DataSource - t services.Service + src datastreamsllo.ShouldRetireCache + ds datastreamsllo.DataSource + telem services.Service oracles []Closer } @@ -109,7 +109,13 @@ func (d *delegate) Start(ctx context.Context) error { if !(len(d.cfg.ContractConfigTrackers) == 1 || len(d.cfg.ContractConfigTrackers) == 2) { return fmt.Errorf("expected either 1 or 2 ContractConfigTrackers, got: %d", len(d.cfg.ContractConfigTrackers)) } + + d.cfg.Logger.Debugw("Starting LLO job", "instances", len(d.cfg.ContractConfigTrackers), "jobName", d.cfg.JobName.ValueOrZero(), "captureEATelemetry", d.cfg.CaptureEATelemetry) + var merr error + + merr = errors.Join(merr, d.telem.Start(ctx)) + psrrc := NewPluginScopedRetirementReportCache(d.cfg.RetirementReportCache, d.cfg.OnchainKeyring, d.cfg.RetirementReportCodec) for i, configTracker := range d.cfg.ContractConfigTrackers { lggr := logger.Named(d.cfg.Logger, fmt.Sprintf("%d", i)) @@ -156,10 +162,11 @@ func (d *delegate) Start(ctx context.Context) error { } func (d *delegate) Close() error { - return d.StopOnce("LLODelegate", func() (err error) { + return d.StopOnce("LLODelegate", func() (merr error) { for _, oracle := range d.oracles { - err = errors.Join(err, oracle.Close()) + merr = errors.Join(merr, oracle.Close()) } - return err + merr = errors.Join(merr, d.telem.Close()) + return merr }) } diff --git a/core/services/llo/telemetry.go b/core/services/llo/telemetry.go index 62b586f5cc8..d5c113c61ef 100644 --- a/core/services/llo/telemetry.go +++ b/core/services/llo/telemetry.go @@ -61,6 +61,12 @@ type telemeter struct { } func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { + if t.Service.Ready() != nil { + // This should never happen, telemeter should always be started BEFORE + // the oracle and closed AFTER it + t.eng.SugaredLogger.Errorw("Telemeter not ready, dropping observation", "run", run, "streamID", streamID, "opts", opts, "val", val, "err", err) + return + } var adapterError *eautils.AdapterError var dpInvariantViolationDetected bool if errors.As(err, &adapterError) && adapterError.Name == adapterLWBAErrorName {