Skip to content

Commit

Permalink
Benchmark on datasource.Observe
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 18, 2024
1 parent b47f956 commit 446fb9c
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
86 changes: 84 additions & 2 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ package llo
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"testing"

"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink-data-streams/llo"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)
Expand Down Expand Up @@ -65,9 +73,11 @@ func makeStreamValues() llo.StreamValues {
}
}

type mockOpts struct{}
type mockOpts struct {
verboseLogging bool
}

func (m *mockOpts) VerboseLogging() bool { return true }
func (m *mockOpts) VerboseLogging() bool { return m.verboseLogging }
func (m *mockOpts) SeqNr() uint64 { return 1042 }
func (m *mockOpts) OutCtx() ocr3types.OutcomeContext {
return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))}
Expand Down Expand Up @@ -209,3 +219,75 @@ func Test_DataSource(t *testing.T) {
})
})
}

func BenchmarkObserve(b *testing.B) {
lggr := logger.TestLogger(b)
ctx := testutils.Context(b)
// can enable/disable verbose logging to test performance here
opts := &mockOpts{verboseLogging: true}

db := pgtest.NewSqlxDB(b)
bridgesORM := bridges.NewORM(db)

n := uint32(b.N)

createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0)
createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0)

c := clhttptest.NewTestLocalOnlyHTTPClient()
runner := pipeline.NewRunner(
nil,
bridgesORM,
&mockPipelineConfig{},
&mockBridgeConfig{},
nil,
nil,
nil,
lggr,
c,
c,
)

r := streams.NewRegistry(lggr, runner)
for i := uint32(0); i < n; i++ {
jobStreamID := streams.StreamID(i)

jb := job.Job{
ID: int32(i),
Name: null.StringFrom(fmt.Sprintf("job-%d", i)),
Type: job.Stream,
StreamID: &jobStreamID,
PipelineSpec: &pipeline.Spec{
ID: int32(i * 100),
DotDagSource: fmt.Sprintf(`
// Benchmark Price
result1 [type=memo value="900.0022"];
multiply2 [type=multiply times=1 streamID=%d index=0]; // force conversion to decimal
result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
result2_parse [type=jsonparse path="result" streamID=%d index=1];
result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"];
result3_parse [type=jsonparse path="result"];
multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion to decimal
result1 -> multiply2;
result2 -> result2_parse;
result3 -> result3_parse -> multiply3;
`, i+n, i+2*n, i+3*n),
},
}
err := r.Register(jb, nil)
require.NoError(b, err)
}

ds := newDataSource(lggr, r, NullTelemeter)
vals := make(map[llotypes.StreamID]llo.StreamValue)
for i := uint32(0); i < 4*n; i++ {
vals[llotypes.StreamID(i)] = nil
}

b.ResetTimer()
err := ds.Observe(ctx, vals, opts)
require.NoError(b, err)
}
1 change: 1 addition & 0 deletions core/services/llo/observation_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ result3 -> result3_parse -> multiply3;
opts := llo.DSOpts(nil)

// concurrency stress test
t.ResetTimer()
g, ctx := errgroup.WithContext(ctx)
for i := uint32(0); i < n; i++ {
for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} {
Expand Down

0 comments on commit 446fb9c

Please sign in to comment.