From 470253bde52c730bea6a224774a16858d64e03c1 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 11 Nov 2024 11:52:36 -0500 Subject: [PATCH] Deadline on calls to Observe --- llo/plugin.go | 4 ++++ llo/plugin_observation.go | 14 +++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/llo/plugin.go b/llo/plugin.go index 913f0ad..3e29423 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -3,6 +3,7 @@ package llo import ( "context" "fmt" + "time" "github.com/smartcontractkit/libocr/quorumhelper" @@ -233,6 +234,7 @@ func (f *PluginFactory) NewReportingPlugin(ctx context.Context, cfg ocr3types.Re protoOutcomeCodec{}, f.RetirementReportCodec, f.ReportCodecs, + cfg.MaxDurationObservation, }, ocr3types.ReportingPluginInfo{ Name: "LLO", Limits: ocr3types.ReportingPluginLimits{ @@ -269,6 +271,8 @@ type Plugin struct { OutcomeCodec OutcomeCodec RetirementReportCodec RetirementReportCodec ReportCodecs map[llotypes.ReportFormat]ReportCodec + + MaxDurationObservation time.Duration } // Query creates a Query that is sent from the leader to all follower nodes diff --git a/llo/plugin_observation.go b/llo/plugin_observation.go index d2ae6fc..e25a126 100644 --- a/llo/plugin_observation.go +++ b/llo/plugin_observation.go @@ -142,7 +142,19 @@ func (p *Plugin) observation(ctx context.Context, outctx ocr3types.OutcomeContex } } - if err = p.DataSource.Observe(ctx, obs.StreamValues, dsOpts{p.Config.VerboseLogging, outctx, p.ConfigDigest}); err != nil { + // NOTE: Timeouts/context cancelations are likely to be rather + // common here, since Observe may have to query 100s of streams, + // any one of which could be slow. libocr will log a warning if + // Observation takes longer than MaxDurationObservation, so we + // limit the call to Observe to 25ms less than that, to allow some + // headroom for serialization and other operations. + maxDurationObserve := p.MaxDurationObservation - 25*time.Millisecond + if maxDurationObserve < 0 { + maxDurationObserve = 0 + } + observationCtx, cancel := context.WithTimeout(ctx, maxDurationObserve) + defer cancel() + if err = p.DataSource.Observe(observationCtx, obs.StreamValues, dsOpts{p.Config.VerboseLogging, outctx, p.ConfigDigest}); err != nil { return nil, fmt.Errorf("DataSource.Observe error: %w", err) } }