From 7c212aa79ab5ee6552271bc83ade17c2fe0faa63 Mon Sep 17 00:00:00 2001 From: Sam Date: Mon, 11 Nov 2024 13:02:16 -0500 Subject: [PATCH] Deadline on calls to Observe (#85) --- llo/plugin.go | 4 ++++ llo/plugin_observation.go | 16 +++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/llo/plugin.go b/llo/plugin.go index 913f0ad..3e29423 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -3,6 +3,7 @@ package llo import ( "context" "fmt" + "time" "github.com/smartcontractkit/libocr/quorumhelper" @@ -233,6 +234,7 @@ func (f *PluginFactory) NewReportingPlugin(ctx context.Context, cfg ocr3types.Re protoOutcomeCodec{}, f.RetirementReportCodec, f.ReportCodecs, + cfg.MaxDurationObservation, }, ocr3types.ReportingPluginInfo{ Name: "LLO", Limits: ocr3types.ReportingPluginLimits{ @@ -269,6 +271,8 @@ type Plugin struct { OutcomeCodec OutcomeCodec RetirementReportCodec RetirementReportCodec ReportCodecs map[llotypes.ReportFormat]ReportCodec + + MaxDurationObservation time.Duration } // Query creates a Query that is sent from the leader to all follower nodes diff --git a/llo/plugin_observation.go b/llo/plugin_observation.go index d2ae6fc..4f6a7ff 100644 --- a/llo/plugin_observation.go +++ b/llo/plugin_observation.go @@ -142,7 +142,21 @@ func (p *Plugin) observation(ctx context.Context, outctx ocr3types.OutcomeContex } } - if err = p.DataSource.Observe(ctx, obs.StreamValues, dsOpts{p.Config.VerboseLogging, outctx, p.ConfigDigest}); err != nil { + // NOTE: Timeouts/context cancelations are likely to be rather + // common here, since Observe may have to query 100s of streams, + // any one of which could be slow. libocr will log a warning if + // Observation takes longer than MaxDurationObservation, so we + // limit the call to Observe to 25ms less than that, to allow some + // headroom for serialization and other operations. + maxDurationObserve := p.MaxDurationObservation - 25*time.Millisecond + if maxDurationObserve < 10*time.Millisecond { + // Don't ever allow LESS than 10ms for Observe even if it would + // log a warning + maxDurationObserve = 10 * time.Millisecond + } + observationCtx, cancel := context.WithTimeout(ctx, maxDurationObserve) + defer cancel() + if err = p.DataSource.Observe(observationCtx, obs.StreamValues, dsOpts{p.Config.VerboseLogging, outctx, p.ConfigDigest}); err != nil { return nil, fmt.Errorf("DataSource.Observe error: %w", err) } }