diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 3f9a4b48fbe..bae60b1e281 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -3,6 +3,7 @@ package llo import ( "context" "errors" + "fmt" "math/big" "sync" "testing" @@ -10,13 +11,20 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" ) @@ -65,9 +73,11 @@ func makeStreamValues() llo.StreamValues { } } -type mockOpts struct{} +type mockOpts struct { + verboseLogging bool +} -func (m *mockOpts) VerboseLogging() bool { return true } +func (m *mockOpts) VerboseLogging() bool { return m.verboseLogging } func (m *mockOpts) SeqNr() uint64 { return 1042 } func (m *mockOpts) OutCtx() ocr3types.OutcomeContext { return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))} @@ -209,3 +219,75 @@ func Test_DataSource(t *testing.T) { }) }) } + +func BenchmarkObserve(b *testing.B) { + lggr := logger.TestLogger(b) + ctx := testutils.Context(b) + // can enable/disable verbose logging to test performance here + opts := &mockOpts{verboseLogging: true} + + db := pgtest.NewSqlxDB(b) + bridgesORM := bridges.NewORM(db) + + n := uint32(b.N) + + createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0) + createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0) + + c := clhttptest.NewTestLocalOnlyHTTPClient() + runner := pipeline.NewRunner( + nil, + bridgesORM, + &mockPipelineConfig{}, + &mockBridgeConfig{}, + nil, + nil, + nil, + lggr, + c, + c, + ) + + r := streams.NewRegistry(lggr, runner) + for i := uint32(0); i < n; i++ { + jobStreamID := streams.StreamID(i) + + jb := job.Job{ + ID: int32(i), + Name: null.StringFrom(fmt.Sprintf("job-%d", i)), + Type: job.Stream, + StreamID: &jobStreamID, + PipelineSpec: &pipeline.Spec{ + ID: int32(i * 100), + DotDagSource: fmt.Sprintf(` +// Benchmark Price +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=%d index=0]; // force conversion to decimal + +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=%d index=1]; + +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion to decimal + +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`, i+n, i+2*n, i+3*n), + }, + } + err := r.Register(jb, nil) + require.NoError(b, err) + } + + ds := newDataSource(lggr, r, NullTelemeter) + vals := make(map[llotypes.StreamID]llo.StreamValue) + for i := uint32(0); i < 4*n; i++ { + vals[llotypes.StreamID(i)] = nil + } + + b.ResetTimer() + err := ds.Observe(ctx, vals, opts) + require.NoError(b, err) +} diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 882661b79af..0b8561f357b 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -408,6 +408,7 @@ result3 -> result3_parse -> multiply3; opts := llo.DSOpts(nil) // concurrency stress test + t.ResetTimer() g, ctx := errgroup.WithContext(ctx) for i := uint32(0); i < n; i++ { for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} {