Skip to content

Commit

Permalink
Support median aggregation over Quote type (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav authored Nov 12, 2024
1 parent 7c212aa commit 74f19f4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
2 changes: 2 additions & 0 deletions llo/aggregators.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func MedianAggregator(values []StreamValue, f int) (StreamValue, error) {
switch v := value.(type) {
case *Decimal:
observations = append(observations, v.Decimal())
case *Quote:
observations = append(observations, v.Benchmark)
default:
// Unexpected type, skip
continue
Expand Down
18 changes: 17 additions & 1 deletion llo/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,29 @@ func Test_MedianAggregator(t *testing.T) {
assert.Equal(t, "3.3", sv.(*Decimal).String())
})

t.Run("for stream values of type *Quote, uses the Benchmark value", func(t *testing.T) {
mixedValues := []StreamValue{
&Quote{Benchmark: decimal.NewFromFloat(1.1)},
&Quote{Benchmark: decimal.NewFromFloat(4.4)},
&Quote{Benchmark: decimal.NewFromFloat(2.2)},
&Quote{Benchmark: decimal.NewFromFloat(3.3)},
ToDecimal(decimal.NewFromFloat(6.6)),
ToDecimal(decimal.NewFromFloat(5.5)),
}

sv, err := MedianAggregator(mixedValues, f)
require.NoError(t, err)
assert.IsType(t, &Decimal{}, sv)
assert.Equal(t, "4.4", sv.(*Decimal).String())
})

t.Run("fails with fewer than f+1 values", func(t *testing.T) {
_, err := MedianAggregator(values[:2], 3)
assert.EqualError(t, err, "not enough observations to calculate median, expected at least f+1, got 2")
})

t.Run("fails with unsupported StreamValue type", func(t *testing.T) {
_, err := MedianAggregator([]StreamValue{&Quote{}, &Quote{}, &Quote{}}, 1)
_, err := MedianAggregator([]StreamValue{nil, nil, nil}, 1)
assert.EqualError(t, err, "not enough observations to calculate median, expected at least f+1, got 0")
})
}
Expand Down
13 changes: 2 additions & 11 deletions llo/plugin_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,8 @@ func (p *Plugin) observation(ctx context.Context, outctx ocr3types.OutcomeContex

// NOTE: Timeouts/context cancelations are likely to be rather
// common here, since Observe may have to query 100s of streams,
// any one of which could be slow. libocr will log a warning if
// Observation takes longer than MaxDurationObservation, so we
// limit the call to Observe to 25ms less than that, to allow some
// headroom for serialization and other operations.
maxDurationObserve := p.MaxDurationObservation - 25*time.Millisecond
if maxDurationObserve < 10*time.Millisecond {
// Don't ever allow LESS than 10ms for Observe even if it would
// log a warning
maxDurationObserve = 10 * time.Millisecond
}
observationCtx, cancel := context.WithTimeout(ctx, maxDurationObserve)
// any one of which could be slow.
observationCtx, cancel := context.WithTimeout(ctx, p.MaxDurationObservation)
defer cancel()
if err = p.DataSource.Observe(observationCtx, obs.StreamValues, dsOpts{p.Config.VerboseLogging, outctx, p.ConfigDigest}); err != nil {
return nil, fmt.Errorf("DataSource.Observe error: %w", err)
Expand Down

0 comments on commit 74f19f4

Please sign in to comment.