Skip to content

Commit

Permalink
Add concurrency stress tests
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 17, 2024
1 parent f9e6c10 commit ec1033b
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 9 deletions.
8 changes: 4 additions & 4 deletions core/services/llo/observation_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ func (e MissingStreamError) Error() string {
}

func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID) (*pipeline.Run, pipeline.TaskRunResults, error) {
strm, exists := oc.r.Get(streamID)
p, exists := oc.r.Get(streamID)
if !exists {
return nil, nil, MissingStreamError{StreamID: streamID}
}

// In case of multiple streamIDs per pipeline then the
// first call executes and the others wait for result
oc.executionsMu.Lock()
ex, isExecuting := oc.executions[strm]
ex, isExecuting := oc.executions[p]
if isExecuting {
oc.executionsMu.Unlock()
// wait for it to finish
Expand All @@ -180,10 +180,10 @@ func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID
// execute here
ch := make(chan struct{})
ex = &execution{done: ch}
oc.executions[strm] = ex
oc.executions[p] = ex
oc.executionsMu.Unlock()

run, trrs, err := strm.Run(ctx)
run, trrs, err := p.Run(ctx)
ex.run = run
ex.trrs = trrs
ex.err = err
Expand Down
107 changes: 102 additions & 5 deletions core/services/llo/observation_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -119,6 +120,31 @@ func TestObservationContext_Observe(t *testing.T) {
})
}

func TestObservationContext_Observe_concurrencyStressTest(t *testing.T) {
ctx := tests.Context(t)
r := &mockRegistry{}
telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
opts := llo.DSOpts(nil)

streamID := streams.StreamID(1)
val := decimal.NewFromFloat(123.456)

// observes the same pipeline 1000 times to try and detect races etc
r.pipelines = make(map[streams.StreamID]*mockPipeline)
r.pipelines[streamID] = makePipelineWithSingleResult[decimal.Decimal](0, val, nil)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < 1000; i++ {
g.Go(func() error {
_, err := oc.Observe(ctx, streamID, opts)
return err
})
}
if err := g.Wait(); err != nil {
t.Fatalf("Observation failed: %v", err)
}
}

type mockPipelineConfig struct{}

func (m *mockPipelineConfig) DefaultHTTPLimit() int64 { return 10000 }
Expand All @@ -139,12 +165,12 @@ func (m *mockBridgeConfig) BridgeCacheTTL() time.Duration {
return 0
}

func createBridge(t *testing.T, name string, val string, borm bridges.ORM) {
func createBridge(t *testing.T, name string, val string, borm bridges.ORM, maxCalls int) {
callcount := 0
bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
callcount++
if callcount > 1 {
t.Fatal("expected only one call to the bridge")
if callcount > maxCalls {
panic(fmt.Sprintf("too many calls to bridge %s", name))
}
_, herr := io.ReadAll(req.Body)
if herr != nil {
Expand Down Expand Up @@ -172,8 +198,8 @@ func TestObservationContext_Observe_integrationRealPipeline(t *testing.T) {
db := pgtest.NewSqlxDB(t)
bridgesORM := bridges.NewORM(db)

createBridge(t, "foo-bridge", `123.456`, bridgesORM)
createBridge(t, "bar-bridge", `"124.456"`, bridgesORM)
createBridge(t, "foo-bridge", `123.456`, bridgesORM, 1)
createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 1)

c := clhttptest.NewTestLocalOnlyHTTPClient()
runner := pipeline.NewRunner(
Expand Down Expand Up @@ -242,3 +268,74 @@ result3 -> result3_parse -> multiply3;
}, val.(*llo.Quote))
})
}

func TestObservationContext_Observe_integrationRealPipeline_concurrencyStressTest(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.TestLogger(t)
db := pgtest.NewSqlxDB(t)
bridgesORM := bridges.NewORM(db)

createBridge(t, "foo-bridge", `123.456`, bridgesORM, 1)
createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 1)

c := clhttptest.NewTestLocalOnlyHTTPClient()
runner := pipeline.NewRunner(
nil,
bridgesORM,
&mockPipelineConfig{},
&mockBridgeConfig{},
nil,
nil,
nil,
lggr,
c,
c,
)

r := streams.NewRegistry(lggr, runner)

jobStreamID := streams.StreamID(5)

jb := job.Job{
Type: job.Stream,
StreamID: &jobStreamID,
PipelineSpec: &pipeline.Spec{
DotDagSource: `
// Benchmark Price
result1 [type=memo value="900.0022"];
multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal
result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
result2_parse [type=jsonparse path="result" streamID=2 index=1];
result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"];
result3_parse [type=jsonparse path="result"];
multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal
result1 -> multiply2;
result2 -> result2_parse;
result3 -> result3_parse -> multiply3;
`,
},
}
err := r.Register(jb, nil)
require.NoError(t, err)

telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
opts := llo.DSOpts(nil)

// concurrency stress test
oc = newObservationContext(r, telem)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < 1000; i++ {
strmID := streams.StreamID(1 + i%3)
g.Go(func() error {
_, err := oc.Observe(ctx, strmID, opts)
return err
})
}
if err := g.Wait(); err != nil {
t.Fatalf("Observation failed: %v", err)
}
}

0 comments on commit ec1033b

Please sign in to comment.