From f582bce20aa1c71cfbe02957f86327c2e78fe766 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 18 Dec 2024 10:16:14 -0500 Subject: [PATCH] Benchmark on datasource.Observe --- core/services/llo/data_source_test.go | 86 ++++++++++++++++++- core/services/llo/observation_context_test.go | 71 +-------------- 2 files changed, 85 insertions(+), 72 deletions(-) diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 3f9a4b48fbe..bae60b1e281 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -3,6 +3,7 @@ package llo import ( "context" "errors" + "fmt" "math/big" "sync" "testing" @@ -10,13 +11,20 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" ) @@ -65,9 +73,11 @@ func makeStreamValues() llo.StreamValues { } } -type mockOpts struct{} +type mockOpts struct { + verboseLogging bool +} -func (m *mockOpts) VerboseLogging() bool { return true } +func (m *mockOpts) VerboseLogging() bool { return m.verboseLogging } func (m *mockOpts) SeqNr() uint64 { return 1042 } func (m *mockOpts) OutCtx() ocr3types.OutcomeContext { return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))} @@ -209,3 +219,75 @@ func Test_DataSource(t *testing.T) { }) }) } + +func BenchmarkObserve(b *testing.B) { + lggr := logger.TestLogger(b) + ctx := testutils.Context(b) + // can enable/disable verbose logging to test performance here + opts := &mockOpts{verboseLogging: true} + + db := pgtest.NewSqlxDB(b) + bridgesORM := bridges.NewORM(db) + + n := uint32(b.N) + + createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0) + createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0) + + c := clhttptest.NewTestLocalOnlyHTTPClient() + runner := pipeline.NewRunner( + nil, + bridgesORM, + &mockPipelineConfig{}, + &mockBridgeConfig{}, + nil, + nil, + nil, + lggr, + c, + c, + ) + + r := streams.NewRegistry(lggr, runner) + for i := uint32(0); i < n; i++ { + jobStreamID := streams.StreamID(i) + + jb := job.Job{ + ID: int32(i), + Name: null.StringFrom(fmt.Sprintf("job-%d", i)), + Type: job.Stream, + StreamID: &jobStreamID, + PipelineSpec: &pipeline.Spec{ + ID: int32(i * 100), + DotDagSource: fmt.Sprintf(` +// Benchmark Price +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=%d index=0]; // force conversion to decimal + +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=%d index=1]; + +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion to decimal + +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`, i+n, i+2*n, i+3*n), + }, + } + err := r.Register(jb, nil) + require.NoError(b, err) + } + + ds := newDataSource(lggr, r, NullTelemeter) + vals := make(map[llotypes.StreamID]llo.StreamValue) + for i := uint32(0); i < 4*n; i++ { + vals[llotypes.StreamID(i)] = nil + } + + b.ResetTimer() + err := ds.Observe(ctx, vals, opts) + require.NoError(b, err) +} diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 882661b79af..7599b8cc740 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -274,76 +274,6 @@ result3 -> result3_parse -> multiply3; }) } -func TestObservationContext_Observe_integrationRealPipeline_concurrencyStressTest(t *testing.T) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - db := pgtest.NewSqlxDB(t) - bridgesORM := bridges.NewORM(db) - - createBridge(t, "foo-bridge", `123.456`, bridgesORM, 1) - createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 1) - - c := clhttptest.NewTestLocalOnlyHTTPClient() - runner := pipeline.NewRunner( - nil, - bridgesORM, - &mockPipelineConfig{}, - &mockBridgeConfig{}, - nil, - nil, - nil, - lggr, - c, - c, - ) - - r := streams.NewRegistry(lggr, runner) - - jobStreamID := streams.StreamID(5) - - jb := job.Job{ - Type: job.Stream, - StreamID: &jobStreamID, - PipelineSpec: &pipeline.Spec{ - DotDagSource: ` -// Benchmark Price -result1 [type=memo value="900.0022"]; -multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal - -result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; -result2_parse [type=jsonparse path="result" streamID=2 index=1]; - -result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; -result3_parse [type=jsonparse path="result"]; -multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal - -result1 -> multiply2; -result2 -> result2_parse; -result3 -> result3_parse -> multiply3; -`, - }, - } - err := r.Register(jb, nil) - require.NoError(t, err) - - telem := &mockTelemeter{} - oc := newObservationContext(r, telem) - opts := llo.DSOpts(nil) - - // concurrency stress test - g, ctx := errgroup.WithContext(ctx) - for i := uint32(0); i < 1000; i++ { - strmID := 1 + i%3 - g.Go(func() error { - _, err := oc.Observe(ctx, strmID, opts) - return err - }) - } - if err := g.Wait(); err != nil { - t.Fatalf("Observation failed: %v", err) - } -} - func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(t *testing.B) { ctx := tests.Context(t) lggr := logger.TestLogger(t) @@ -408,6 +338,7 @@ result3 -> result3_parse -> multiply3; opts := llo.DSOpts(nil) // concurrency stress test + t.ResetTimer() g, ctx := errgroup.WithContext(ctx) for i := uint32(0); i < n; i++ { for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} {