From 71898e2e415c7d1fbba6ca5561acc166d6b8c678 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 9 Dec 2024 11:58:00 -0500 Subject: [PATCH 01/11] Multistream specs --- .changeset/rotten-books-cross.md | 5 + core/services/llo/data_source.go | 94 +------ core/services/llo/data_source_test.go | 75 ++++-- core/services/llo/observation_context.go | 193 ++++++++++++++ core/services/llo/observation_context_test.go | 240 ++++++++++++++++++ core/services/pipeline/common.go | 1 + core/services/pipeline/task.base.go | 9 + core/services/streams/delegate.go | 37 ++- core/services/streams/pipeline.go | 108 ++++++++ core/services/streams/stream.go | 94 ------- core/services/streams/stream_registry.go | 60 +++-- 11 files changed, 695 insertions(+), 221 deletions(-) create mode 100644 .changeset/rotten-books-cross.md create mode 100644 core/services/llo/observation_context.go create mode 100644 core/services/llo/observation_context_test.go create mode 100644 core/services/streams/pipeline.go delete mode 100644 core/services/streams/stream.go diff --git a/.changeset/rotten-books-cross.md b/.changeset/rotten-books-cross.md new file mode 100644 index 00000000000..95231ec47f2 --- /dev/null +++ b/.changeset/rotten-books-cross.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Support multiple streamIDs in stream specs #added diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index 2afe9e090a3..f673e1caf1d 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -2,15 +2,16 @@ package llo import ( "context" + "errors" "fmt" "slices" "sort" + "strconv" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/shopspring/decimal" "golang.org/x/exp/maps" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -19,7 +20,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var ( @@ -42,7 +42,7 @@ var ( ) type Registry interface { - Get(streamID streams.StreamID) (strm streams.Stream, exists bool) + Get(streamID streams.StreamID) (p streams.Pipeline, exists bool) } type ErrObservationFailed struct { @@ -109,43 +109,24 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues)) var errs []ErrObservationFailed + // oc only lives for the duration of this Observe call + oc := NewObservationContext(d.registry, d.t) + for _, streamID := range maps.Keys(streamValues) { go func(streamID llotypes.StreamID) { defer wg.Done() - - var val llo.StreamValue - - stream, exists := d.registry.Get(streamID) - if !exists { - mu.Lock() - errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)}) - mu.Unlock() - promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() - return - } - run, trrs, err := stream.Run(ctx) - if err != nil { - mu.Lock() - errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"}) - mu.Unlock() - promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() - // TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet - // https://smartcontract-it.atlassian.net/browse/MERC-6290 - d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, err) - return - } - // TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet - // https://smartcontract-it.atlassian.net/browse/MERC-6290 - val, err = ExtractStreamValue(trrs) + val, err := oc.Observe(ctx, streamID, opts) if err != nil { + if errors.As(err, &ErrMissingStream{}) { + promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() + } + promObservationErrorCount.WithLabelValues(strconv.FormatUint(uint64(streamID), 10)).Inc() mu.Lock() - errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"}) + errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"}) mu.Unlock() return } - d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) - mu.Lock() defer mu.Unlock() @@ -186,54 +167,3 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, return nil } - -// ExtractStreamValue extracts a StreamValue from a TaskRunResults -func ExtractStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) { - // pipeline.TaskRunResults comes ordered asc by index, this is guaranteed - // by the pipeline executor - finaltrrs := trrs.Terminals() - - // HACK: Right now we rely on the number of outputs to determine whether - // its a Decimal or a Quote. - // This isn't very robust or future-proof but is sufficient to support v0.3 - // compat. - // There are a number of different possible ways to solve this in future. - // See: https://smartcontract-it.atlassian.net/browse/MERC-5934 - switch len(finaltrrs) { - case 1: - res := finaltrrs[0].Result - if res.Error != nil { - return nil, res.Error - } - val, err := toDecimal(res.Value) - if err != nil { - return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err) - } - return llo.ToDecimal(val), nil - case 3: - // Expect ordering of Benchmark, Bid, Ask - results := make([]decimal.Decimal, 3) - for i, trr := range finaltrrs { - res := trr.Result - if res.Error != nil { - return nil, fmt.Errorf("failed to parse stream output into Quote (task index: %d): %w", i, res.Error) - } - val, err := toDecimal(res.Value) - if err != nil { - return nil, fmt.Errorf("failed to parse decimal: %w", err) - } - results[i] = val - } - return &llo.Quote{ - Benchmark: results[0], - Bid: results[1], - Ask: results[2], - }, nil - default: - return nil, fmt.Errorf("invalid number of results, expected: 1 or 3, got: %d", len(finaltrrs)) - } -} - -func toDecimal(val interface{}) (decimal.Decimal, error) { - return utils.ToDecimal(val) -} diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 932c4c0c73a..87370d4442a 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -21,27 +21,36 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/streams" ) -type mockStream struct { +type mockPipeline struct { run *pipeline.Run trrs pipeline.TaskRunResults err error + + streamIDs []streams.StreamID + + runCount int } -func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { +func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { + m.runCount++ return m.run, m.trrs, m.err } +func (m *mockPipeline) StreamIDs() []streams.StreamID { + return m.streamIDs +} + type mockRegistry struct { - streams map[streams.StreamID]*mockStream + pipelines map[streams.StreamID]*mockPipeline } -func (m *mockRegistry) Get(streamID streams.StreamID) (strm streams.Stream, exists bool) { - strm, exists = m.streams[streamID] +func (m *mockRegistry) Get(streamID streams.StreamID) (p streams.Pipeline, exists bool) { + p, exists = m.pipelines[streamID] return } -func makeStreamWithSingleResult[T any](runID int64, res T, err error) *mockStream { - return &mockStream{ +func makePipelineWithSingleResult[T any](runID int64, res T, err error) *mockPipeline { + return &mockPipeline{ run: &pipeline.Run{ID: runID}, trrs: []pipeline.TaskRunResult{pipeline.TaskRunResult{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}}, err: err, @@ -91,7 +100,7 @@ func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline. func Test_DataSource(t *testing.T) { lggr := logger.TestLogger(t) - reg := &mockRegistry{make(map[streams.StreamID]*mockStream)} + reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} ds := newDataSource(lggr, reg, NullTelemeter) ctx := testutils.Context(t) opts := &mockOpts{} @@ -105,9 +114,9 @@ func Test_DataSource(t *testing.T) { assert.Equal(t, makeStreamValues(), vals) }) t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) { - reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), nil) - reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil) - reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, big.NewInt(15), nil) + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) + reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) + reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, big.NewInt(15), nil) vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) @@ -120,9 +129,9 @@ func Test_DataSource(t *testing.T) { }, vals) }) t.Run("observes each stream and returns success/errors", func(t *testing.T) { - reg.streams[1] = makeStreamWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded")) - reg.streams[2] = makeStreamWithSingleResult[*big.Int](2, big.NewInt(40602), nil) - reg.streams[3] = makeStreamWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2")) + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded")) + reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) + reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2")) vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) @@ -139,9 +148,9 @@ func Test_DataSource(t *testing.T) { tm := &mockTelemeter{} ds.t = tm - reg.streams[1] = makeStreamWithSingleResult[*big.Int](100, big.NewInt(2181), nil) - reg.streams[2] = makeStreamWithSingleResult[*big.Int](101, big.NewInt(40602), nil) - reg.streams[3] = makeStreamWithSingleResult[*big.Int](102, big.NewInt(15), nil) + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil) + reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) + reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil) vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) @@ -166,5 +175,37 @@ func Test_DataSource(t *testing.T) { assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String()) assert.Nil(t, pkt.err) }) + + t.Run("records telemetry for errors", func(t *testing.T) { + tm := &mockTelemeter{} + ds.t = tm + + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded")) + reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) + reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2")) + + vals := makeStreamValues() + err := ds.Observe(ctx, vals, opts) + assert.NoError(t, err) + + assert.Equal(t, llo.StreamValues{ + 2: llo.ToDecimal(decimal.NewFromInt(40602)), + 1: nil, + 3: nil, + }, vals) + + require.Len(t, tm.v3PremiumLegacyPackets, 3) + m := make(map[int]v3PremiumLegacyPacket) + for _, pkt := range tm.v3PremiumLegacyPackets { + m[int(pkt.run.ID)] = pkt + } + pkt := m[100] + assert.Equal(t, 100, int(pkt.run.ID)) + assert.Len(t, pkt.trrs, 1) + assert.Equal(t, 1, int(pkt.streamID)) + assert.Equal(t, opts, pkt.opts) + assert.Nil(t, pkt.val) + assert.NotNil(t, pkt.err) + }) }) } diff --git a/core/services/llo/observation_context.go b/core/services/llo/observation_context.go new file mode 100644 index 00000000000..4ae4485d9e7 --- /dev/null +++ b/core/services/llo/observation_context.go @@ -0,0 +1,193 @@ +package llo + +import ( + "context" + "fmt" + "sync" + + "github.com/shopspring/decimal" + + "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/streams" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +// ObservationContext ensures that each pipeline is only executed once. It is +// intended to be instantiated and used then discarded as part of one +// Observation cycle. Subsequent calls to Observe will return the same cached +// values. + +var _ ObservationContext = (*observationContext)(nil) + +type ObservationContext interface { + Observe(ctx context.Context, streamID streams.StreamID, opts llo.DSOpts) (val llo.StreamValue, err error) +} + +type execution struct { + done <-chan struct{} + + run *pipeline.Run + trrs pipeline.TaskRunResults + err error +} + +type observationContext struct { + r Registry + t Telemeter + + executionsMu sync.Mutex + // only execute each pipeline once + executions map[streams.Pipeline]*execution +} + +func NewObservationContext(r Registry, t Telemeter) ObservationContext { + return newObservationContext(r, t) +} + +func newObservationContext(r Registry, t Telemeter) *observationContext { + return &observationContext{r, t, sync.Mutex{}, make(map[streams.Pipeline]*execution)} +} + +func (oc *observationContext) Observe(ctx context.Context, streamID streams.StreamID, opts llo.DSOpts) (val llo.StreamValue, err error) { + run, trrs, err := oc.run(ctx, streamID) + if err != nil { + // FIXME: This is a hack specific for V3 telemetry, future schemas should + // use a generic stream value telemetry instead + // https://smartcontract-it.atlassian.net/browse/MERC-6290 + oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err) + return nil, err + } + // Extract stream value based on streamID attribute + for _, trr := range trrs { + if trr.Task.TaskStreamID() != nil && *trr.Task.TaskStreamID() == streamID { + val, err := resultToStreamValue(trr.Result.Value) + if err != nil { + return nil, fmt.Errorf("failed to convert result to StreamValue for streamID %d: %w", streamID, err) + } + return val, nil + } + } + // If no streamID attribute is found in the task results, then assume the + // final output is the stream ID and return that. This is safe to do since + // the registry will never return a spec that doesn't match either by tag + // or by spec streamID. + + val, err = extractFinalResultAsStreamValue(trrs) + // FIXME: This is a hack specific for V3 telemetry, future schemas should + // use a generic stream value telemetry instead + // https://smartcontract-it.atlassian.net/browse/MERC-6290 + oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err) + return +} + +func resultToStreamValue(val interface{}) (llo.StreamValue, error) { + switch v := val.(type) { + case decimal.Decimal: + return llo.ToDecimal(v), nil + case float64: + return llo.ToDecimal(decimal.NewFromFloat(v)), nil + case pipeline.ObjectParam: + switch v.Type { + case pipeline.DecimalType: + return llo.ToDecimal(decimal.Decimal(v.DecimalValue)), nil + default: + return nil, fmt.Errorf("don't know how to convert pipeline.ObjectParam with type %d to llo.StreamValue", v.Type) + } + default: + return nil, fmt.Errorf("don't know how to convert pipeline output result of type %T to llo.StreamValue (got: %v)", val, val) + } +} + +// extractFinalResultAsStreamValue extracts a final StreamValue from a TaskRunResults +func extractFinalResultAsStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) { + // pipeline.TaskRunResults comes ordered asc by index, this is guaranteed + // by the pipeline executor + finaltrrs := trrs.Terminals() + + // HACK: Right now we rely on the number of outputs to determine whether + // its a Decimal or a Quote. + // This is a hack to support the legacy "Quote" case. + // Future stream specs should use streamID tags instead. + switch len(finaltrrs) { + case 1: + res := finaltrrs[0].Result + if res.Error != nil { + return nil, res.Error + } + val, err := toDecimal(res.Value) + if err != nil { + return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err) + } + return llo.ToDecimal(val), nil + case 3: + // Expect ordering of Benchmark, Bid, Ask + results := make([]decimal.Decimal, 3) + for i, trr := range finaltrrs { + res := trr.Result + if res.Error != nil { + return nil, fmt.Errorf("failed to parse stream output into Quote (task index: %d): %w", i, res.Error) + } + val, err := toDecimal(res.Value) + if err != nil { + return nil, fmt.Errorf("failed to parse decimal: %w", err) + } + results[i] = val + } + return &llo.Quote{ + Benchmark: results[0], + Bid: results[1], + Ask: results[2], + }, nil + default: + return nil, fmt.Errorf("invalid number of results, expected: 1 or 3, got: %d", len(finaltrrs)) + } +} + +func toDecimal(val interface{}) (decimal.Decimal, error) { + return utils.ToDecimal(val) +} + +type ErrMissingStream struct { + StreamID streams.StreamID +} + +func (e ErrMissingStream) Error() string { + return fmt.Sprintf("no pipeline for stream: %d", e.StreamID) +} + +func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID) (*pipeline.Run, pipeline.TaskRunResults, error) { + strm, exists := oc.r.Get(streamID) + if !exists { + return nil, nil, ErrMissingStream{StreamID: streamID} + } + + // In case of multiple streamIDs per pipeline then the + // first call executes and the others wait for result + oc.executionsMu.Lock() + ex, isExecuting := oc.executions[strm] + if isExecuting { + oc.executionsMu.Unlock() + // wait for it to finish + select { + case <-ex.done: + return ex.run, ex.trrs, ex.err + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } + + // execute here + ch := make(chan struct{}) + ex = &execution{done: ch} + oc.executions[strm] = ex + oc.executionsMu.Unlock() + + run, trrs, err := strm.Run(ctx) + ex.run = run + ex.trrs = trrs + ex.err = err + close(ch) + + return run, trrs, err +} diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go new file mode 100644 index 00000000000..6b00766c87f --- /dev/null +++ b/core/services/llo/observation_context_test.go @@ -0,0 +1,240 @@ +package llo + +import ( + "errors" + "fmt" + "io" + "math/rand/v2" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/bridges" + clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/null" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/streams" + "github.com/smartcontractkit/chainlink/v2/core/store/models" +) + +func makeErroringPipeline() *mockPipeline { + return &mockPipeline{ + err: errors.New("pipeline error"), + } +} + +func makePipelineWithMultipleStreamResults(streamIDs []streams.StreamID, results []interface{}) *mockPipeline { + if len(streamIDs) != len(results) { + panic("streamIDs and results must have the same length") + } + trrs := make([]pipeline.TaskRunResult, len(streamIDs)) + for i, res := range results { + trrs[i] = pipeline.TaskRunResult{Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: null.Uint32From(streamIDs[i])}}, Result: pipeline.Result{Value: res}} + } + return &mockPipeline{ + run: &pipeline.Run{}, + trrs: trrs, + err: nil, + streamIDs: streamIDs, + } +} + +func TestObservationContext_Observe(t *testing.T) { + ctx := tests.Context(t) + r := &mockRegistry{} + telem := &mockTelemeter{} + oc := newObservationContext(r, telem) + opts := llo.DSOpts(nil) + + missingStreamID := streams.StreamID(0) + streamID1 := streams.StreamID(1) + streamID2 := streams.StreamID(2) + streamID3 := streams.StreamID(3) + streamID4 := streams.StreamID(4) + streamID5 := streams.StreamID(5) + streamID6 := streams.StreamID(6) + + multiPipelineDecimal := makePipelineWithMultipleStreamResults([]streams.StreamID{streamID4, streamID5, streamID6}, []interface{}{decimal.NewFromFloat(12.34), decimal.NewFromFloat(56.78), decimal.NewFromFloat(90.12)}) + + r.pipelines = map[streams.StreamID]*mockPipeline{ + streamID1: &mockPipeline{}, + streamID2: makePipelineWithSingleResult[decimal.Decimal](rand.Int64(), decimal.NewFromFloat(12.34), nil), + streamID3: makeErroringPipeline(), + streamID4: multiPipelineDecimal, + streamID5: multiPipelineDecimal, + streamID6: multiPipelineDecimal, + } + + t.Run("returns error in case of missing pipeline", func(t *testing.T) { + _, err := oc.Observe(ctx, missingStreamID, opts) + require.EqualError(t, err, "no pipeline for stream: 0") + }) + t.Run("returns error in case of zero results", func(t *testing.T) { + _, err := oc.Observe(ctx, streamID1, opts) + require.EqualError(t, err, "invalid number of results, expected: 1 or 3, got: 0") + }) + t.Run("returns composite value from legacy job with single top-level streamID", func(t *testing.T) { + val, err := oc.Observe(ctx, streamID2, opts) + require.NoError(t, err) + + assert.Equal(t, "12.34", val.(*llo.Decimal).String()) + }) + t.Run("returns error in case of erroring pipeline", func(t *testing.T) { + _, err := oc.Observe(ctx, streamID3, opts) + require.EqualError(t, err, "pipeline error") + }) + t.Run("returns values for multiple stream IDs within the same job based on streamID tag with a single pipeline execution", func(t *testing.T) { + val, err := oc.Observe(ctx, streamID4, opts) + require.NoError(t, err) + assert.Equal(t, "12.34", val.(*llo.Decimal).String()) + + val, err = oc.Observe(ctx, streamID5, opts) + require.NoError(t, err) + assert.Equal(t, "56.78", val.(*llo.Decimal).String()) + + val, err = oc.Observe(ctx, streamID6, opts) + require.NoError(t, err) + assert.Equal(t, "90.12", val.(*llo.Decimal).String()) + + assert.Equal(t, 1, multiPipelineDecimal.runCount) + + // returns cached values on subsequent calls + val, err = oc.Observe(ctx, streamID6, opts) + require.NoError(t, err) + assert.Equal(t, "90.12", val.(*llo.Decimal).String()) + + assert.Equal(t, 1, multiPipelineDecimal.runCount) + }) +} + +type mockPipelineConfig struct{} + +func (m *mockPipelineConfig) DefaultHTTPLimit() int64 { return 10000 } +func (m *mockPipelineConfig) DefaultHTTPTimeout() commonconfig.Duration { + return *commonconfig.MustNewDuration(time.Duration(1 * time.Hour)) +} +func (m *mockPipelineConfig) MaxRunDuration() time.Duration { return 1 * time.Hour } +func (m *mockPipelineConfig) ReaperInterval() time.Duration { return 0 } +func (m *mockPipelineConfig) ReaperThreshold() time.Duration { return 0 } +func (m *mockPipelineConfig) VerboseLogging() bool { return true } + +type mockBridgeConfig struct{} + +func (m *mockBridgeConfig) BridgeResponseURL() *url.URL { + return nil +} +func (m *mockBridgeConfig) BridgeCacheTTL() time.Duration { + return 0 +} + +func createBridge(t *testing.T, name string, val string, borm bridges.ORM) { + callcount := 0 + bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + callcount++ + if callcount > 1 { + t.Fatal("expected only one call to the bridge") + } + _, herr := io.ReadAll(req.Body) + require.NoError(t, herr) + + res.WriteHeader(http.StatusOK) + resp := fmt.Sprintf(`{"result": %s}`, val) + _, herr = res.Write([]byte(resp)) + require.NoError(t, herr) + })) + t.Cleanup(bridge.Close) + u, _ := url.Parse(bridge.URL) + require.NoError(t, borm.CreateBridgeType(tests.Context(t), &bridges.BridgeType{ + Name: bridges.BridgeName(name), + URL: models.WebURL(*u), + })) +} + +func TestObservationContext_Observe_integrationRealPipeline(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + db := pgtest.NewSqlxDB(t) + bridgesORM := bridges.NewORM(db) + + createBridge(t, "foo-bridge", `123.456`, bridgesORM) + createBridge(t, "bar-bridge", `"124.456"`, bridgesORM) + + c := clhttptest.NewTestLocalOnlyHTTPClient() + runner := pipeline.NewRunner( + nil, + bridgesORM, + &mockPipelineConfig{}, + &mockBridgeConfig{}, + nil, + nil, + nil, + lggr, + c, + c, + ) + + r := streams.NewRegistry(lggr, runner) + + jobStreamID := streams.StreamID(5) + + t.Run("using only streamID attributes", func(t *testing.T) { + jb := job.Job{ + Type: job.Stream, + StreamID: &jobStreamID, + PipelineSpec: &pipeline.Spec{ + DotDagSource: ` +// Benchmark Price +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal + +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=2 index=1]; + +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal + +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`, + }, + } + err := r.Register(jb, nil) + require.NoError(t, err) + + telem := &mockTelemeter{} + oc := newObservationContext(r, telem) + opts := llo.DSOpts(nil) + + val, err := oc.Observe(ctx, streams.StreamID(1), opts) + require.NoError(t, err) + assert.Equal(t, "900.0022", val.(*llo.Decimal).String()) + val, err = oc.Observe(ctx, streams.StreamID(2), opts) + require.NoError(t, err) + assert.Equal(t, "123.456", val.(*llo.Decimal).String()) + val, err = oc.Observe(ctx, streams.StreamID(3), opts) + require.NoError(t, err) + assert.Equal(t, "124.456", val.(*llo.Decimal).String()) + + val, err = oc.Observe(ctx, jobStreamID, opts) + require.NoError(t, err) + assert.Equal(t, &llo.Quote{ + Bid: decimal.NewFromFloat32(123.456), + Benchmark: decimal.NewFromFloat32(900.0022), + Ask: decimal.NewFromFloat32(124.456), + }, val.(*llo.Quote)) + }) +} diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index 50611ee32a4..56af199078a 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -61,6 +61,7 @@ type ( TaskMinBackoff() time.Duration TaskMaxBackoff() time.Duration TaskTags() string + TaskStreamID() *uint32 GetDescendantTasks() []Task } diff --git a/core/services/pipeline/task.base.go b/core/services/pipeline/task.base.go index 3e1db5fcdb5..fdedb69193e 100644 --- a/core/services/pipeline/task.base.go +++ b/core/services/pipeline/task.base.go @@ -24,6 +24,8 @@ type BaseTask struct { Tags string `mapstructure:"tags" json:"-"` + StreamID null.Uint32 `mapstructure:"streamID"` + uuid uuid.UUID } @@ -84,6 +86,13 @@ func (t BaseTask) TaskTags() string { return t.Tags } +func (t BaseTask) TaskStreamID() *uint32 { + if t.StreamID.Valid { + return &t.StreamID.Uint32 + } + return nil +} + // GetDescendantTasks retrieves all descendant tasks of a given task func (t BaseTask) GetDescendantTasks() []Task { if len(t.outputs) == 0 { diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go index bf492d4bd15..2f62a7bf1f4 100644 --- a/core/services/streams/delegate.go +++ b/core/services/streams/delegate.go @@ -52,8 +52,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services [] rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth()) services = append(services, rrs, &StreamService{ d.registry, - id, - jb.PipelineSpec, + jb, lggr, rrs, }) @@ -66,23 +65,22 @@ type ResultRunSaver interface { type StreamService struct { registry Registry - id StreamID - spec *pipeline.Spec + jb job.Job lggr logger.Logger rrs ResultRunSaver } func (s *StreamService) Start(_ context.Context) error { - if s.spec == nil { - return fmt.Errorf("pipeline spec unexpectedly missing for stream %q", s.id) + if s.jb.PipelineSpec == nil { + return errors.New("pipeline spec unexpectedly missing for stream") } - s.lggr.Debugf("Starting stream %d", s.id) - return s.registry.Register(s.id, *s.spec, s.rrs) + s.lggr.Debugw("Registering stream", "jobID", s.jb.ID) + return s.registry.Register(s.jb, s.rrs) } func (s *StreamService) Close() error { - s.lggr.Debugf("Stopping stream %d", s.id) - s.registry.Unregister(s.id) + s.lggr.Debugw("Unregistering stream", "jobID", s.jb.ID) + s.registry.Unregister(s.jb.ID) return nil } @@ -101,8 +99,23 @@ func ValidatedStreamSpec(tomlString string) (job.Job, error) { return jb, errors.Errorf("unsupported type: %q", jb.Type) } - if jb.StreamID == nil { - return jb, errors.New("jobs of type 'stream' require streamID to be specified") + // The spec stream ID is optional, but if provided represents the final output of the pipeline run. + // nodes in the DAG may also contain streamID tags. + // Every spec must have at least one streamID. + var streamIDs []StreamID + + if jb.StreamID != nil { + streamIDs = append(streamIDs, *jb.StreamID) + } + + for _, t := range jb.Pipeline.Tasks { + if streamID := t.TaskStreamID(); streamID != nil { + streamIDs = append(streamIDs, *streamID) + } + } + + if len(streamIDs) == 0 { + return jb, errors.New("no streamID found in spec (must be either specified as top-level key 'streamID' or at least one streamID tag must be provided in the pipeline)") } return jb, nil diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go new file mode 100644 index 00000000000..bead12d96b8 --- /dev/null +++ b/core/services/streams/pipeline.go @@ -0,0 +1,108 @@ +package streams + +import ( + "context" + "fmt" + "sync" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" +) + +type Runner interface { + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) + InitializePipeline(spec pipeline.Spec) (*pipeline.Pipeline, error) +} + +type RunResultSaver interface { + Save(run *pipeline.Run) +} + +type Pipeline interface { + Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) + StreamIDs() []StreamID +} + +type multiStreamPipeline struct { + sync.RWMutex + lggr logger.Logger + spec pipeline.Spec + runner Runner + rrs RunResultSaver + streamIDs []StreamID + vars pipeline.Vars +} + +func NewMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs RunResultSaver) (Pipeline, error) { + return newMultiStreamPipeline(lggr, jb, runner, rrs) +} + +func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs RunResultSaver) (*multiStreamPipeline, error) { + if jb.PipelineSpec == nil { + // should never happen + return nil, fmt.Errorf("job has no pipeline spec") + } + spec := *jb.PipelineSpec + if spec.Pipeline == nil { + pipeline, err := spec.ParsePipeline() + if err != nil { + return nil, fmt.Errorf("unparseable pipeline: %w", err) + } + + spec.Pipeline = pipeline + // initialize it for the given runner + if _, err := runner.InitializePipeline(spec); err != nil { + return nil, fmt.Errorf("error while initializing pipeline: %w", err) + } + } + var streamIDs []StreamID + for _, t := range spec.Pipeline.Tasks { + if t.TaskStreamID() != nil { + streamIDs = append(streamIDs, *t.TaskStreamID()) + } + } + if jb.StreamID != nil { + streamIDs = append(streamIDs, *jb.StreamID) + } + vars := pipeline.NewVarsFrom(map[string]interface{}{ + "pipelineSpec": map[string]interface{}{ + "id": jb.PipelineSpecID, + }, + "jb": map[string]interface{}{ + "databaseID": jb.ID, + "externalJobID": jb.ExternalJobID, + "name": jb.Name.ValueOrZero(), + }, + }) + + return &multiStreamPipeline{sync.RWMutex{}, lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType), spec, runner, rrs, streamIDs, vars}, nil +} + +func (s *multiStreamPipeline) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { + run, trrs, err = s.executeRun(ctx) + + if err != nil { + return nil, nil, fmt.Errorf("Run failed: %w", err) + } + if s.rrs != nil { + s.rrs.Save(run) + } + + return +} + +func (s *multiStreamPipeline) StreamIDs() []StreamID { + return s.streamIDs +} + +// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). +// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. +func (s *multiStreamPipeline) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { + run, trrs, err := s.runner.ExecuteRun(ctx, s.spec, s.vars) + if err != nil { + return nil, nil, fmt.Errorf("error executing run for spec ID %v: %w", s.spec.ID, err) + } + + return run, trrs, err +} diff --git a/core/services/streams/stream.go b/core/services/streams/stream.go deleted file mode 100644 index b65c6dc12f6..00000000000 --- a/core/services/streams/stream.go +++ /dev/null @@ -1,94 +0,0 @@ -package streams - -import ( - "context" - "fmt" - "sync" - - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" -) - -type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) - InitializePipeline(spec pipeline.Spec) (*pipeline.Pipeline, error) -} - -type RunResultSaver interface { - Save(run *pipeline.Run) -} - -type Stream interface { - Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) -} - -type stream struct { - sync.RWMutex - id StreamID - lggr logger.Logger - spec *pipeline.Spec - runner Runner - rrs RunResultSaver -} - -func NewStream(lggr logger.Logger, id StreamID, spec pipeline.Spec, runner Runner, rrs RunResultSaver) Stream { - return newStream(lggr, id, spec, runner, rrs) -} - -func newStream(lggr logger.Logger, id StreamID, spec pipeline.Spec, runner Runner, rrs RunResultSaver) *stream { - return &stream{sync.RWMutex{}, id, lggr.Named("Stream").With("streamID", id), &spec, runner, rrs} -} - -func (s *stream) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { - run, trrs, err = s.executeRun(ctx) - - if err != nil { - return nil, nil, fmt.Errorf("Run failed: %w", err) - } - if s.rrs != nil { - s.rrs.Save(run) - } - - return -} - -// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). -// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. -func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { - // the hot path here is to avoid parsing and use the pre-parsed, cached, pipeline - s.RLock() - initialize := s.spec.Pipeline == nil - s.RUnlock() - if initialize { - pipeline, err := s.spec.ParsePipeline() - if err != nil { - return nil, nil, fmt.Errorf("Run failed due to unparseable pipeline: %w", err) - } - - s.Lock() - if s.spec.Pipeline == nil { - s.spec.Pipeline = pipeline - // initialize it for the given runner - if _, err := s.runner.InitializePipeline(*s.spec); err != nil { - return nil, nil, fmt.Errorf("Run failed due to error while initializing pipeline: %w", err) - } - } - s.Unlock() - } - - vars := pipeline.NewVarsFrom(map[string]interface{}{ - "pipelineSpec": map[string]interface{}{ - "id": s.spec.ID, - }, - "stream": map[string]interface{}{ - "id": s.id, - }, - }) - - run, trrs, err := s.runner.ExecuteRun(ctx, *s.spec, vars) - if err != nil { - return nil, nil, fmt.Errorf("error executing run for spec ID %v: %w", s.spec.ID, err) - } - - return run, trrs, err -} diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 9ab2df11d33..3fadd0bac12 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -7,27 +7,32 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/job" ) +// TODO: Rename, this is actually a PipelineRegistry (? is it ?) + // alias for easier refactoring type StreamID = llo.StreamID type Registry interface { Getter - Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error - Unregister(streamID StreamID) + Register(jb job.Job, rrs ResultRunSaver) error + Unregister(jobID int32) } type Getter interface { - Get(streamID StreamID) (strm Stream, exists bool) + Get(streamID StreamID) (p Pipeline, exists bool) } type streamRegistry struct { sync.RWMutex - lggr logger.Logger - runner Runner - streams map[StreamID]Stream + lggr logger.Logger + runner Runner + // keyed by stream ID + pipelines map[StreamID]Pipeline + // keyed by job ID + pipelinesByJobID map[int32]Pipeline } func NewRegistry(lggr logger.Logger, runner Runner) Registry { @@ -39,29 +44,52 @@ func newRegistry(lggr logger.Logger, runner Runner) *streamRegistry { sync.RWMutex{}, lggr.Named("Registry"), runner, - make(map[StreamID]Stream), + make(map[StreamID]Pipeline), + make(map[int32]Pipeline), } } -func (s *streamRegistry) Get(streamID StreamID) (strm Stream, exists bool) { +func (s *streamRegistry) Get(streamID StreamID) (p Pipeline, exists bool) { s.RLock() defer s.RUnlock() - strm, exists = s.streams[streamID] + p, exists = s.pipelines[streamID] return } -func (s *streamRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error { +func (s *streamRegistry) Register(jb job.Job, rrs ResultRunSaver) error { + if jb.Type != job.Stream { + return fmt.Errorf("cannot register job type %s; only Stream jobs are supported", jb.Type) + } s.Lock() defer s.Unlock() - if _, exists := s.streams[streamID]; exists { - return fmt.Errorf("stream already registered for id: %d", streamID) + if _, exists := s.pipelinesByJobID[jb.ID]; exists { + return fmt.Errorf("cannot register job with ID: %d; it is already registered", jb.ID) + } + p, err := NewMultiStreamPipeline(s.lggr, jb, s.runner, rrs) + if err != nil { + return fmt.Errorf("cannot register job with ID: %d; %w", jb.ID, err) + } + s.pipelinesByJobID[jb.ID] = p + // FIXME: Naming is so awkward, call it a Multistream or something instead? Or combistream? + streamIDs := p.StreamIDs() + for _, strmID := range streamIDs { + if _, exists := s.pipelines[strmID]; exists { + return fmt.Errorf("cannot register job with ID: %d; stream id %d is already registered", jb.ID, strmID) + } + s.pipelines[strmID] = p } - s.streams[streamID] = NewStream(s.lggr, streamID, spec, s.runner, rrs) return nil } -func (s *streamRegistry) Unregister(streamID StreamID) { +func (s *streamRegistry) Unregister(jobID int32) { s.Lock() defer s.Unlock() - delete(s.streams, streamID) + p, exists := s.pipelinesByJobID[jobID] + if !exists { + return + } + streamIDs := p.StreamIDs() + for _, id := range streamIDs { + delete(s.pipelines, id) + } } From 97cf5891b81e1cc32da5047d595e4ba03d7fbbad Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Fri, 13 Dec 2024 13:47:39 -0500 Subject: [PATCH 02/11] . --- .../relay/evm/mercury/mocks/pipeline.go | 1 + core/services/streams/delegate_test.go | 9 +-- core/services/streams/pipeline.go | 10 ++- core/services/streams/stream_registry.go | 2 - core/services/streams/stream_registry_test.go | 79 ++++++++++++------- 5 files changed, 62 insertions(+), 39 deletions(-) diff --git a/core/services/relay/evm/mercury/mocks/pipeline.go b/core/services/relay/evm/mercury/mocks/pipeline.go index a7183c9a037..1bc14b62c1a 100644 --- a/core/services/relay/evm/mercury/mocks/pipeline.go +++ b/core/services/relay/evm/mercury/mocks/pipeline.go @@ -41,3 +41,4 @@ func (m *MockTask) TaskTimeout() (time.Duration, bool) { return 0, false } func (m *MockTask) TaskRetries() uint32 { return 0 } func (m *MockTask) TaskMinBackoff() time.Duration { return 0 } func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 } +func (m *MockTask) TaskStreamID() *int32 { return nil } diff --git a/core/services/streams/delegate_test.go b/core/services/streams/delegate_test.go index d177c977e1b..6489a339aa2 100644 --- a/core/services/streams/delegate_test.go +++ b/core/services/streams/delegate_test.go @@ -15,11 +15,11 @@ import ( type mockRegistry struct{} -func (m *mockRegistry) Get(streamID StreamID) (strm Stream, exists bool) { return } -func (m *mockRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error { +func (m *mockRegistry) Get(streamID StreamID) (p Pipeline, exists bool) { return } +func (m *mockRegistry) Register(jb job.Job, rrs ResultRunSaver) error { return nil } -func (m *mockRegistry) Unregister(streamID StreamID) {} +func (m *mockRegistry) Unregister(int32) {} type mockDelegateConfig struct{} @@ -49,8 +49,7 @@ func Test_Delegate(t *testing.T) { strmSrv := srvs[1].(*StreamService) assert.Equal(t, registry, strmSrv.registry) - assert.Equal(t, StreamID(42), strmSrv.id) - assert.Equal(t, jb.PipelineSpec, strmSrv.spec) + assert.Equal(t, jb, strmSrv.jb) assert.NotNil(t, strmSrv.lggr) assert.Equal(t, srvs[0], strmSrv.rrs) }) diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index bead12d96b8..7ea5eb531f3 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -3,7 +3,6 @@ package streams import ( "context" "fmt" - "sync" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" @@ -25,7 +24,6 @@ type Pipeline interface { } type multiStreamPipeline struct { - sync.RWMutex lggr logger.Logger spec pipeline.Spec runner Runner @@ -76,7 +74,13 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R }, }) - return &multiStreamPipeline{sync.RWMutex{}, lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType), spec, runner, rrs, streamIDs, vars}, nil + return &multiStreamPipeline{ + lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType), + spec, + runner, + rrs, + streamIDs, + vars}, nil } func (s *multiStreamPipeline) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 3fadd0bac12..6c76fe101e9 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -10,8 +10,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" ) -// TODO: Rename, this is actually a PipelineRegistry (? is it ?) - // alias for easier refactoring type StreamID = llo.StreamID diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index 738b68f5d4d..fa9486728f8 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -5,22 +5,29 @@ import ( "testing" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -type mockStream struct { +var _ Pipeline = &mockPipeline{} + +type mockPipeline struct { run *pipeline.Run trrs pipeline.TaskRunResults err error } -func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { +func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { return m.run, m.trrs, m.err } +func (m *mockPipeline) StreamIDs() []StreamID { + return nil +} + func Test_Registry(t *testing.T) { lggr := logger.TestLogger(t) runner := &mockRunner{} @@ -28,21 +35,21 @@ func Test_Registry(t *testing.T) { t.Run("Get", func(t *testing.T) { sr := newRegistry(lggr, runner) - sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}} - sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}} - sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}} + sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}} + sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}} + sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}} v, exists := sr.Get(1) assert.True(t, exists) - assert.Equal(t, sr.streams[1], v) + assert.Equal(t, sr.pipelines[1], v) v, exists = sr.Get(2) assert.True(t, exists) - assert.Equal(t, sr.streams[2], v) + assert.Equal(t, sr.pipelines[2], v) v, exists = sr.Get(3) assert.True(t, exists) - assert.Equal(t, sr.streams[3], v) + assert.Equal(t, sr.pipelines[3], v) v, exists = sr.Get(4) assert.Nil(t, v) @@ -51,56 +58,70 @@ func Test_Registry(t *testing.T) { t.Run("Register", func(t *testing.T) { sr := newRegistry(lggr, runner) - t.Run("registers new stream", func(t *testing.T) { - assert.Len(t, sr.streams, 0) - err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) + t.Run("registers new pipeline with multiple stream IDs", func(t *testing.T) { + assert.Len(t, sr.pipelines, 0) + // err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil) + // TODO: what if the dag is unparseable? + // err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) + err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: ` +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=2 index=1]; +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`}}, nil) require.NoError(t, err) - assert.Len(t, sr.streams, 1) + assert.Len(t, sr.pipelines, 1) v, exists := sr.Get(1) require.True(t, exists) - strm := v.(*stream) - assert.Equal(t, StreamID(1), strm.id) - assert.Equal(t, int32(32), strm.spec.ID) + msp := v.(*multiStreamPipeline) + assert.Equal(t, "foo", msp.StreamIDs()) + assert.Equal(t, int32(32), msp.spec.ID) }) t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) { - assert.Len(t, sr.streams, 1) + assert.Len(t, sr.pipelines, 1) err := sr.Register(1, pipeline.Spec{ID: 33, DotDagSource: "source"}, nil) require.Error(t, err) - assert.Len(t, sr.streams, 1) + assert.Len(t, sr.pipelines, 1) assert.EqualError(t, err, "stream already registered for id: 1") v, exists := sr.Get(1) require.True(t, exists) - strm := v.(*stream) - assert.Equal(t, StreamID(1), strm.id) - assert.Equal(t, int32(32), strm.spec.ID) + msp := v.(*multiStreamPipeline) + assert.Equal(t, StreamID(1), msp.id) + assert.Equal(t, int32(32), msp.spec.ID) }) }) t.Run("Unregister", func(t *testing.T) { sr := newRegistry(lggr, runner) - sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}} - sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}} - sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}} + sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}} + sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}} + sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}} t.Run("unregisters a stream", func(t *testing.T) { - assert.Len(t, sr.streams, 3) + assert.Len(t, sr.pipelines, 3) sr.Unregister(1) - assert.Len(t, sr.streams, 2) - _, exists := sr.streams[1] + assert.Len(t, sr.pipelines, 2) + _, exists := sr.pipelines[1] assert.False(t, exists) }) t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) { - assert.Len(t, sr.streams, 2) + assert.Len(t, sr.pipelines, 2) sr.Unregister(1) - assert.Len(t, sr.streams, 2) - _, exists := sr.streams[1] + assert.Len(t, sr.pipelines, 2) + _, exists := sr.pipelines[1] assert.False(t, exists) }) }) From a1b15d7f626998c3f04579758c5497c6aad12162 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 16 Dec 2024 13:27:29 -0500 Subject: [PATCH 03/11] w --- .../relay/evm/mercury/mocks/pipeline.go | 2 +- core/services/streams/delegate_test.go | 2 +- core/services/streams/pipeline.go | 14 ++ core/services/streams/stream_registry.go | 15 +- core/services/streams/stream_registry_test.go | 161 ++++++++++++++---- core/services/streams/stream_test.go | 26 +-- 6 files changed, 167 insertions(+), 53 deletions(-) diff --git a/core/services/relay/evm/mercury/mocks/pipeline.go b/core/services/relay/evm/mercury/mocks/pipeline.go index 1bc14b62c1a..429eba66674 100644 --- a/core/services/relay/evm/mercury/mocks/pipeline.go +++ b/core/services/relay/evm/mercury/mocks/pipeline.go @@ -41,4 +41,4 @@ func (m *MockTask) TaskTimeout() (time.Duration, bool) { return 0, false } func (m *MockTask) TaskRetries() uint32 { return 0 } func (m *MockTask) TaskMinBackoff() time.Duration { return 0 } func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 } -func (m *MockTask) TaskStreamID() *int32 { return nil } +func (m *MockTask) TaskStreamID() *uint32 { return nil } diff --git a/core/services/streams/delegate_test.go b/core/services/streams/delegate_test.go index 6489a339aa2..dfd3da8ca07 100644 --- a/core/services/streams/delegate_test.go +++ b/core/services/streams/delegate_test.go @@ -167,7 +167,7 @@ answer1 [type=median index=0]; """ `, assertion: func(t *testing.T, jb job.Job, err error) { - assert.EqualError(t, err, "jobs of type 'stream' require streamID to be specified") + assert.EqualError(t, err, "no streamID found in spec (must be either specified as top-level key 'streamID' or at least one streamID tag must be provided in the pipeline)") }, }, } diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index 7ea5eb531f3..2dc031a0338 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -63,6 +63,9 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R if jb.StreamID != nil { streamIDs = append(streamIDs, *jb.StreamID) } + if err := validateStreamIDs(streamIDs); err != nil { + return nil, fmt.Errorf("invalid stream IDs: %w", err) + } vars := pipeline.NewVarsFrom(map[string]interface{}{ "pipelineSpec": map[string]interface{}{ "id": jb.PipelineSpecID, @@ -83,6 +86,17 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R vars}, nil } +func validateStreamIDs(streamIDs []StreamID) error { + seen := make(map[StreamID]struct{}) + for _, id := range streamIDs { + if _, ok := seen[id]; ok { + return fmt.Errorf("duplicate stream ID: %v", id) + } + seen[id] = struct{}{} + } + return nil +} + func (s *multiStreamPipeline) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { run, trrs, err = s.executeRun(ctx) diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 6c76fe101e9..cf5a59944b7 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -58,22 +58,23 @@ func (s *streamRegistry) Register(jb job.Job, rrs ResultRunSaver) error { if jb.Type != job.Stream { return fmt.Errorf("cannot register job type %s; only Stream jobs are supported", jb.Type) } + p, err := NewMultiStreamPipeline(s.lggr, jb, s.runner, rrs) + if err != nil { + return fmt.Errorf("cannot register job with ID: %d; %w", jb.ID, err) + } s.Lock() defer s.Unlock() if _, exists := s.pipelinesByJobID[jb.ID]; exists { return fmt.Errorf("cannot register job with ID: %d; it is already registered", jb.ID) } - p, err := NewMultiStreamPipeline(s.lggr, jb, s.runner, rrs) - if err != nil { - return fmt.Errorf("cannot register job with ID: %d; %w", jb.ID, err) + for _, strmID := range p.StreamIDs() { + if _, exists := s.pipelines[strmID]; exists { + return fmt.Errorf("cannot register job with ID: %d; stream id %d is already registered", jb.ID, strmID) + } } s.pipelinesByJobID[jb.ID] = p - // FIXME: Naming is so awkward, call it a Multistream or something instead? Or combistream? streamIDs := p.StreamIDs() for _, strmID := range streamIDs { - if _, exists := s.pipelines[strmID]; exists { - return fmt.Errorf("cannot register job with ID: %d; stream id %d is already registered", jb.ID, strmID) - } s.pipelines[strmID] = p } return nil diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index fa9486728f8..c0affc5b230 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -18,6 +18,8 @@ type mockPipeline struct { run *pipeline.Run trrs pipeline.TaskRunResults err error + + streamIDs []StreamID } func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { @@ -25,7 +27,7 @@ func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRun } func (m *mockPipeline) StreamIDs() []StreamID { - return nil + return m.streamIDs } func Test_Registry(t *testing.T) { @@ -58,12 +60,12 @@ func Test_Registry(t *testing.T) { t.Run("Register", func(t *testing.T) { sr := newRegistry(lggr, runner) - t.Run("registers new pipeline with multiple stream IDs", func(t *testing.T) { - assert.Len(t, sr.pipelines, 0) - // err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil) - // TODO: what if the dag is unparseable? - // err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) - err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: ` + // registers new pipeline with multiple stream IDs + assert.Len(t, sr.pipelines, 0) + // err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil) + // TODO: what if the dag is unparseable? + // err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) + err := sr.Register(job.Job{ID: 100, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: ` result1 [type=memo value="900.0022"]; multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; @@ -75,45 +77,136 @@ result1 -> multiply2; result2 -> result2_parse; result3 -> result3_parse -> multiply3; `}}, nil) - require.NoError(t, err) - assert.Len(t, sr.pipelines, 1) - - v, exists := sr.Get(1) - require.True(t, exists) - msp := v.(*multiStreamPipeline) - assert.Equal(t, "foo", msp.StreamIDs()) - assert.Equal(t, int32(32), msp.spec.ID) - }) + require.NoError(t, err) + assert.Len(t, sr.pipelines, 3) // three streams, one pipeline + assert.Contains(t, sr.pipelines, StreamID(1)) + assert.Contains(t, sr.pipelines, StreamID(2)) + assert.Contains(t, sr.pipelines, StreamID(3)) + p := sr.pipelines[1] + assert.Equal(t, p, sr.pipelines[2]) + assert.Equal(t, p, sr.pipelines[3]) - t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) { - assert.Len(t, sr.pipelines, 1) - err := sr.Register(1, pipeline.Spec{ID: 33, DotDagSource: "source"}, nil) - require.Error(t, err) - assert.Len(t, sr.pipelines, 1) - assert.EqualError(t, err, "stream already registered for id: 1") - - v, exists := sr.Get(1) - require.True(t, exists) - msp := v.(*multiStreamPipeline) - assert.Equal(t, StreamID(1), msp.id) - assert.Equal(t, int32(32), msp.spec.ID) - }) + v, exists := sr.Get(1) + require.True(t, exists) + msp := v.(*multiStreamPipeline) + assert.Equal(t, []StreamID{1, 2, 3}, msp.StreamIDs()) + assert.Equal(t, int32(32), msp.spec.ID) + + // errors when attempt to re-register a stream with an existing job ID + err = sr.Register(job.Job{ID: 100, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022"]; + `}}, nil) + require.Error(t, err) + assert.EqualError(t, err, "cannot register job with ID: 100; it is already registered") + + // errors when attempt to register a new job with duplicates stream IDs within ig + err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(100)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022" streamID=100]; + `}}, nil) + require.Error(t, err) + assert.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100") + + // errors with unparseable pipeline + err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: "source"}}, nil) + require.Error(t, err) + assert.EqualError(t, err, "cannot register job with ID: 101; unparseable pipeline: UnmarshalTaskFromMap: unknown task type: \"\"") + + // errors when attempt to re-register a stream with an existing streamID at top-level + err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(3)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=4 index=0]; // force conversion to decimal +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=5 index=1]; +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=6 index=2]; // force conversion to decimal +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`}}, nil) + require.Error(t, err) + assert.EqualError(t, err, "cannot register job with ID: 101; stream id 3 is already registered") + + // errors when attempt to re-register a stream with an existing streamID in DAG + err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(4)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=5 index=1]; +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=6 index=2]; // force conversion to decimal +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`}}, nil) + require.Error(t, err) + assert.EqualError(t, err, "cannot register job with ID: 101; stream id 1 is already registered") + + // registers new job with all new stream IDs + err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(4)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=5 index=0]; // force conversion to decimal +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=6 index=1]; +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=7 index=2]; // force conversion to decimal +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`}}, nil) + require.NoError(t, err) + + // did not overwrite existing stream + assert.Len(t, sr.pipelines, 7) + assert.Equal(t, p, sr.pipelines[1]) + assert.Equal(t, p, sr.pipelines[2]) + assert.Equal(t, p, sr.pipelines[3]) + p2 := sr.pipelines[4] + assert.NotEqual(t, p, p2) + assert.Equal(t, p2, sr.pipelines[5]) + assert.Equal(t, p2, sr.pipelines[6]) + assert.Equal(t, p2, sr.pipelines[7]) + + v, exists = sr.Get(1) + require.True(t, exists) + msp = v.(*multiStreamPipeline) + assert.ElementsMatch(t, []StreamID{1, 2, 3}, msp.StreamIDs()) + assert.Equal(t, int32(32), msp.spec.ID) + + v, exists = sr.Get(4) + require.True(t, exists) + msp = v.(*multiStreamPipeline) + assert.ElementsMatch(t, []StreamID{4, 5, 6, 7}, msp.StreamIDs()) + assert.Equal(t, int32(33), msp.spec.ID) }) t.Run("Unregister", func(t *testing.T) { sr := newRegistry(lggr, runner) - sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}} - sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}} - sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}} + err := sr.Register(job.Job{ID: 100, StreamID: ptr(StreamID(1)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022" streamID=2]; + `}}, nil) + require.NoError(t, err) + err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022" streamID=3]; + `}}, nil) + require.NoError(t, err) + err = sr.Register(job.Job{ID: 102, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` +result1 [type=memo value="900.0022" streamID=4]; + `}}, nil) + require.NoError(t, err) t.Run("unregisters a stream", func(t *testing.T) { - assert.Len(t, sr.pipelines, 3) + assert.Len(t, sr.pipelines, 4) - sr.Unregister(1) + sr.Unregister(100) assert.Len(t, sr.pipelines, 2) _, exists := sr.pipelines[1] assert.False(t, exists) + _, exists = sr.pipelines[2] + assert.False(t, exists) }) t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) { assert.Len(t, sr.pipelines, 2) diff --git a/core/services/streams/stream_test.go b/core/services/streams/stream_test.go index 78174138121..0c248d91ae1 100644 --- a/core/services/streams/stream_test.go +++ b/core/services/streams/stream_test.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) @@ -57,24 +58,29 @@ func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 } func Test_Stream(t *testing.T) { lggr := logger.TestLogger(t) runner := &mockRunner{} - spec := pipeline.Spec{} - id := StreamID(123) ctx := testutils.Context(t) - t.Run("Run", func(t *testing.T) { - strm := newStream(lggr, id, spec, runner, nil) + t.Run("errors with empty pipeline", func(t *testing.T) { + jbInvalid := job.Job{StreamID: ptr(StreamID(123)), PipelineSpec: &pipeline.Spec{DotDagSource: ``}} + _, err := newMultiStreamPipeline(lggr, jbInvalid, runner, nil) + require.EqualError(t, err, "unparseable pipeline: empty pipeline") + }) - t.Run("errors with empty pipeline", func(t *testing.T) { - _, _, err := strm.Run(ctx) - assert.EqualError(t, err, "Run failed: Run failed due to unparseable pipeline: empty pipeline") - }) + jb := job.Job{StreamID: ptr(StreamID(123)), PipelineSpec: &pipeline.Spec{DotDagSource: ` +result1 [type=memo value="900.0022" streamID=124]; + `}} + + t.Run("Run", func(t *testing.T) { + strm, err := newMultiStreamPipeline(lggr, jb, runner, nil) + require.NoError(t, err) - spec.DotDagSource = ` + jb.PipelineSpec.DotDagSource = ` succeed [type=memo value=42] succeed; ` - strm = newStream(lggr, id, spec, runner, nil) + strm, err = newMultiStreamPipeline(lggr, jb, runner, nil) + require.NoError(t, err) t.Run("executes the pipeline (success)", func(t *testing.T) { runner.run = &pipeline.Run{ID: 42} From 58cfa7359ccec654b6cb77a85db9556de1a4ad1d Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 16 Dec 2024 16:30:39 -0500 Subject: [PATCH 04/11] Fix lint --- core/services/llo/data_source.go | 7 ++++--- core/services/llo/data_source_test.go | 4 ++-- core/services/llo/observation_context.go | 8 ++++---- core/services/llo/observation_context_test.go | 10 +++++++--- core/services/streams/pipeline.go | 4 +++- core/services/streams/stream_registry_test.go | 8 +++----- core/services/streams/stream_test.go | 11 ++--------- 7 files changed, 25 insertions(+), 27 deletions(-) diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index f673e1caf1d..855ac7d9940 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -117,10 +117,11 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, defer wg.Done() val, err := oc.Observe(ctx, streamID, opts) if err != nil { - if errors.As(err, &ErrMissingStream{}) { - promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc() + strmIDStr := strconv.FormatUint(uint64(streamID), 10) + if errors.As(err, &MissingStreamError{}) { + promMissingStreamCount.WithLabelValues(strmIDStr).Inc() } - promObservationErrorCount.WithLabelValues(strconv.FormatUint(uint64(streamID), 10)).Inc() + promObservationErrorCount.WithLabelValues(strmIDStr).Inc() mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"}) mu.Unlock() diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 87370d4442a..3f9a4b48fbe 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -186,7 +186,7 @@ func Test_DataSource(t *testing.T) { vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, llo.StreamValues{ 2: llo.ToDecimal(decimal.NewFromInt(40602)), @@ -205,7 +205,7 @@ func Test_DataSource(t *testing.T) { assert.Equal(t, 1, int(pkt.streamID)) assert.Equal(t, opts, pkt.opts) assert.Nil(t, pkt.val) - assert.NotNil(t, pkt.err) + assert.Error(t, pkt.err) }) }) } diff --git a/core/services/llo/observation_context.go b/core/services/llo/observation_context.go index 4ae4485d9e7..ab022452629 100644 --- a/core/services/llo/observation_context.go +++ b/core/services/llo/observation_context.go @@ -61,7 +61,7 @@ func (oc *observationContext) Observe(ctx context.Context, streamID streams.Stre // Extract stream value based on streamID attribute for _, trr := range trrs { if trr.Task.TaskStreamID() != nil && *trr.Task.TaskStreamID() == streamID { - val, err := resultToStreamValue(trr.Result.Value) + val, err = resultToStreamValue(trr.Result.Value) if err != nil { return nil, fmt.Errorf("failed to convert result to StreamValue for streamID %d: %w", streamID, err) } @@ -148,18 +148,18 @@ func toDecimal(val interface{}) (decimal.Decimal, error) { return utils.ToDecimal(val) } -type ErrMissingStream struct { +type MissingStreamError struct { StreamID streams.StreamID } -func (e ErrMissingStream) Error() string { +func (e MissingStreamError) Error() string { return fmt.Sprintf("no pipeline for stream: %d", e.StreamID) } func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID) (*pipeline.Run, pipeline.TaskRunResults, error) { strm, exists := oc.r.Get(streamID) if !exists { - return nil, nil, ErrMissingStream{StreamID: streamID} + return nil, nil, MissingStreamError{StreamID: streamID} } // In case of multiple streamIDs per pipeline then the diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 6b00766c87f..67af24c2a7b 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -123,7 +123,7 @@ type mockPipelineConfig struct{} func (m *mockPipelineConfig) DefaultHTTPLimit() int64 { return 10000 } func (m *mockPipelineConfig) DefaultHTTPTimeout() commonconfig.Duration { - return *commonconfig.MustNewDuration(time.Duration(1 * time.Hour)) + return *commonconfig.MustNewDuration(1 * time.Hour) } func (m *mockPipelineConfig) MaxRunDuration() time.Duration { return 1 * time.Hour } func (m *mockPipelineConfig) ReaperInterval() time.Duration { return 0 } @@ -147,12 +147,16 @@ func createBridge(t *testing.T, name string, val string, borm bridges.ORM) { t.Fatal("expected only one call to the bridge") } _, herr := io.ReadAll(req.Body) - require.NoError(t, herr) + if herr != nil { + t.Fatal(herr) + } res.WriteHeader(http.StatusOK) resp := fmt.Sprintf(`{"result": %s}`, val) _, herr = res.Write([]byte(resp)) - require.NoError(t, herr) + if herr != nil { + t.Fatal(herr) + } })) t.Cleanup(bridge.Close) u, _ := url.Parse(bridge.URL) diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index 2dc031a0338..9fa6c25d36d 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -39,7 +41,7 @@ func NewMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs RunResultSaver) (*multiStreamPipeline, error) { if jb.PipelineSpec == nil { // should never happen - return nil, fmt.Errorf("job has no pipeline spec") + return nil, errors.New("job has no pipeline spec") } spec := *jb.PipelineSpec if spec.Pipeline == nil { diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index c0affc5b230..8a8abfc89b5 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -61,7 +61,7 @@ func Test_Registry(t *testing.T) { sr := newRegistry(lggr, runner) // registers new pipeline with multiple stream IDs - assert.Len(t, sr.pipelines, 0) + assert.Empty(t, sr.pipelines) // err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil) // TODO: what if the dag is unparseable? // err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) @@ -96,15 +96,13 @@ result3 -> result3_parse -> multiply3; err = sr.Register(job.Job{ID: 100, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` result1 [type=memo value="900.0022"]; `}}, nil) - require.Error(t, err) - assert.EqualError(t, err, "cannot register job with ID: 100; it is already registered") + require.EqualError(t, err, "cannot register job with ID: 100; it is already registered") // errors when attempt to register a new job with duplicates stream IDs within ig err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(100)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` result1 [type=memo value="900.0022" streamID=100]; `}}, nil) - require.Error(t, err) - assert.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100") + require.EqualError(t, err, "cannot register job with ID: 101; invalid stream IDs: duplicate stream ID: 100") // errors with unparseable pipeline err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: "source"}}, nil) diff --git a/core/services/streams/stream_test.go b/core/services/streams/stream_test.go index 0c248d91ae1..905d421bddf 100644 --- a/core/services/streams/stream_test.go +++ b/core/services/streams/stream_test.go @@ -67,21 +67,14 @@ func Test_Stream(t *testing.T) { }) jb := job.Job{StreamID: ptr(StreamID(123)), PipelineSpec: &pipeline.Spec{DotDagSource: ` -result1 [type=memo value="900.0022" streamID=124]; +succeed [type=memo value=42 streamID=124]; +succeed; `}} t.Run("Run", func(t *testing.T) { strm, err := newMultiStreamPipeline(lggr, jb, runner, nil) require.NoError(t, err) - jb.PipelineSpec.DotDagSource = ` -succeed [type=memo value=42] -succeed; -` - - strm, err = newMultiStreamPipeline(lggr, jb, runner, nil) - require.NoError(t, err) - t.Run("executes the pipeline (success)", func(t *testing.T) { runner.run = &pipeline.Run{ID: 42} runner.trrs = []pipeline.TaskRunResult{pipeline.TaskRunResult{ID: UUID}} From 65d142280ad0906fed38c3ddd268a3e1e62fb3d8 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Tue, 17 Dec 2024 09:01:27 -0500 Subject: [PATCH 05/11] Lint --- core/services/streams/stream_registry_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index 8a8abfc89b5..e20b29e9262 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -107,7 +107,7 @@ result1 [type=memo value="900.0022" streamID=100]; // errors with unparseable pipeline err = sr.Register(job.Job{ID: 101, Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: "source"}}, nil) require.Error(t, err) - assert.EqualError(t, err, "cannot register job with ID: 101; unparseable pipeline: UnmarshalTaskFromMap: unknown task type: \"\"") + require.EqualError(t, err, "cannot register job with ID: 101; unparseable pipeline: UnmarshalTaskFromMap: unknown task type: \"\"") // errors when attempt to re-register a stream with an existing streamID at top-level err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(3)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` @@ -123,7 +123,7 @@ result2 -> result2_parse; result3 -> result3_parse -> multiply3; `}}, nil) require.Error(t, err) - assert.EqualError(t, err, "cannot register job with ID: 101; stream id 3 is already registered") + require.EqualError(t, err, "cannot register job with ID: 101; stream id 3 is already registered") // errors when attempt to re-register a stream with an existing streamID in DAG err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(4)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` @@ -139,7 +139,7 @@ result2 -> result2_parse; result3 -> result3_parse -> multiply3; `}}, nil) require.Error(t, err) - assert.EqualError(t, err, "cannot register job with ID: 101; stream id 1 is already registered") + require.EqualError(t, err, "cannot register job with ID: 101; stream id 1 is already registered") // registers new job with all new stream IDs err = sr.Register(job.Job{ID: 101, StreamID: ptr(StreamID(4)), Type: job.Stream, PipelineSpec: &pipeline.Spec{ID: 33, DotDagSource: ` From 7e26fcb058912aab60727dd734e32ba6f13c3fc4 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Tue, 17 Dec 2024 10:15:04 -0500 Subject: [PATCH 06/11] Add concurrency stress tests --- core/services/llo/observation_context.go | 8 +- core/services/llo/observation_context_test.go | 107 +++++++++++++++++- 2 files changed, 106 insertions(+), 9 deletions(-) diff --git a/core/services/llo/observation_context.go b/core/services/llo/observation_context.go index ab022452629..5bf82fa5a79 100644 --- a/core/services/llo/observation_context.go +++ b/core/services/llo/observation_context.go @@ -157,7 +157,7 @@ func (e MissingStreamError) Error() string { } func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID) (*pipeline.Run, pipeline.TaskRunResults, error) { - strm, exists := oc.r.Get(streamID) + p, exists := oc.r.Get(streamID) if !exists { return nil, nil, MissingStreamError{StreamID: streamID} } @@ -165,7 +165,7 @@ func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID // In case of multiple streamIDs per pipeline then the // first call executes and the others wait for result oc.executionsMu.Lock() - ex, isExecuting := oc.executions[strm] + ex, isExecuting := oc.executions[p] if isExecuting { oc.executionsMu.Unlock() // wait for it to finish @@ -180,10 +180,10 @@ func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID // execute here ch := make(chan struct{}) ex = &execution{done: ch} - oc.executions[strm] = ex + oc.executions[p] = ex oc.executionsMu.Unlock() - run, trrs, err := strm.Run(ctx) + run, trrs, err := p.Run(ctx) ex.run = run ex.trrs = trrs ex.err = err diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 67af24c2a7b..1efe3ec7ee9 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -14,6 +14,7 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" @@ -119,6 +120,31 @@ func TestObservationContext_Observe(t *testing.T) { }) } +func TestObservationContext_Observe_concurrencyStressTest(t *testing.T) { + ctx := tests.Context(t) + r := &mockRegistry{} + telem := &mockTelemeter{} + oc := newObservationContext(r, telem) + opts := llo.DSOpts(nil) + + streamID := streams.StreamID(1) + val := decimal.NewFromFloat(123.456) + + // observes the same pipeline 1000 times to try and detect races etc + r.pipelines = make(map[streams.StreamID]*mockPipeline) + r.pipelines[streamID] = makePipelineWithSingleResult[decimal.Decimal](0, val, nil) + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < 1000; i++ { + g.Go(func() error { + _, err := oc.Observe(ctx, streamID, opts) + return err + }) + } + if err := g.Wait(); err != nil { + t.Fatalf("Observation failed: %v", err) + } +} + type mockPipelineConfig struct{} func (m *mockPipelineConfig) DefaultHTTPLimit() int64 { return 10000 } @@ -139,12 +165,12 @@ func (m *mockBridgeConfig) BridgeCacheTTL() time.Duration { return 0 } -func createBridge(t *testing.T, name string, val string, borm bridges.ORM) { +func createBridge(t *testing.T, name string, val string, borm bridges.ORM, maxCalls int) { callcount := 0 bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { callcount++ - if callcount > 1 { - t.Fatal("expected only one call to the bridge") + if callcount > maxCalls { + panic(fmt.Sprintf("too many calls to bridge %s", name)) } _, herr := io.ReadAll(req.Body) if herr != nil { @@ -172,8 +198,8 @@ func TestObservationContext_Observe_integrationRealPipeline(t *testing.T) { db := pgtest.NewSqlxDB(t) bridgesORM := bridges.NewORM(db) - createBridge(t, "foo-bridge", `123.456`, bridgesORM) - createBridge(t, "bar-bridge", `"124.456"`, bridgesORM) + createBridge(t, "foo-bridge", `123.456`, bridgesORM, 1) + createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 1) c := clhttptest.NewTestLocalOnlyHTTPClient() runner := pipeline.NewRunner( @@ -242,3 +268,74 @@ result3 -> result3_parse -> multiply3; }, val.(*llo.Quote)) }) } + +func TestObservationContext_Observe_integrationRealPipeline_concurrencyStressTest(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + db := pgtest.NewSqlxDB(t) + bridgesORM := bridges.NewORM(db) + + createBridge(t, "foo-bridge", `123.456`, bridgesORM, 1) + createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 1) + + c := clhttptest.NewTestLocalOnlyHTTPClient() + runner := pipeline.NewRunner( + nil, + bridgesORM, + &mockPipelineConfig{}, + &mockBridgeConfig{}, + nil, + nil, + nil, + lggr, + c, + c, + ) + + r := streams.NewRegistry(lggr, runner) + + jobStreamID := streams.StreamID(5) + + jb := job.Job{ + Type: job.Stream, + StreamID: &jobStreamID, + PipelineSpec: &pipeline.Spec{ + DotDagSource: ` +// Benchmark Price +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal + +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=2 index=1]; + +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal + +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`, + }, + } + err := r.Register(jb, nil) + require.NoError(t, err) + + telem := &mockTelemeter{} + oc := newObservationContext(r, telem) + opts := llo.DSOpts(nil) + + // concurrency stress test + oc = newObservationContext(r, telem) + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < 1000; i++ { + strmID := streams.StreamID(1 + i%3) + g.Go(func() error { + _, err := oc.Observe(ctx, strmID, opts) + return err + }) + } + if err := g.Wait(); err != nil { + t.Fatalf("Observation failed: %v", err) + } +} From 4141159852d231f1db336e11b9b7c1b10d1b4b60 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Tue, 17 Dec 2024 10:29:02 -0500 Subject: [PATCH 07/11] Fix lint --- core/services/llo/observation_context_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 1efe3ec7ee9..201931a5164 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -170,7 +170,7 @@ func createBridge(t *testing.T, name string, val string, borm bridges.ORM, maxCa bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { callcount++ if callcount > maxCalls { - panic(fmt.Sprintf("too many calls to bridge %s", name)) + panic("too many calls to bridge" + name) } _, herr := io.ReadAll(req.Body) if herr != nil { @@ -326,9 +326,8 @@ result3 -> result3_parse -> multiply3; opts := llo.DSOpts(nil) // concurrency stress test - oc = newObservationContext(r, telem) g, ctx := errgroup.WithContext(ctx) - for i := 0; i < 1000; i++ { + for i := uint32(0); i < 1000; i++ { strmID := streams.StreamID(1 + i%3) g.Go(func() error { _, err := oc.Observe(ctx, strmID, opts) From 97fce69f41851cabbed5cfb9965e1d6917485779 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Tue, 17 Dec 2024 10:37:42 -0500 Subject: [PATCH 08/11] Lint --- core/services/llo/observation_context_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 201931a5164..b96383a5796 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -328,7 +328,7 @@ result3 -> result3_parse -> multiply3; // concurrency stress test g, ctx := errgroup.WithContext(ctx) for i := uint32(0); i < 1000; i++ { - strmID := streams.StreamID(1 + i%3) + strmID := 1 + i%3 g.Go(func() error { _, err := oc.Observe(ctx, strmID, opts) return err From b47f956f4d65beeaf00a676b6faed2421303741a Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 18 Dec 2024 10:05:14 -0500 Subject: [PATCH 09/11] Add benchmark --- core/services/llo/observation_context_test.go | 102 ++++++++++++++++-- core/services/pipeline/runner.go | 4 +- core/services/streams/pipeline.go | 3 + go.mod | 1 + 4 files changed, 100 insertions(+), 10 deletions(-) diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index b96383a5796..882661b79af 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -14,8 +14,11 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" + "gopkg.in/guregu/null.v4" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-data-streams/llo" @@ -23,7 +26,7 @@ import ( clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/null" + clnull "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" @@ -42,7 +45,7 @@ func makePipelineWithMultipleStreamResults(streamIDs []streams.StreamID, results } trrs := make([]pipeline.TaskRunResult, len(streamIDs)) for i, res := range results { - trrs[i] = pipeline.TaskRunResult{Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: null.Uint32From(streamIDs[i])}}, Result: pipeline.Result{Value: res}} + trrs[i] = pipeline.TaskRunResult{Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: clnull.Uint32From(streamIDs[i])}}, Result: pipeline.Result{Value: res}} } return &mockPipeline{ run: &pipeline.Run{}, @@ -154,7 +157,9 @@ func (m *mockPipelineConfig) DefaultHTTPTimeout() commonconfig.Duration { func (m *mockPipelineConfig) MaxRunDuration() time.Duration { return 1 * time.Hour } func (m *mockPipelineConfig) ReaperInterval() time.Duration { return 0 } func (m *mockPipelineConfig) ReaperThreshold() time.Duration { return 0 } -func (m *mockPipelineConfig) VerboseLogging() bool { return true } + +// func (m *mockPipelineConfig) VerboseLogging() bool { return true } +func (m *mockPipelineConfig) VerboseLogging() bool { return false } type mockBridgeConfig struct{} @@ -165,23 +170,23 @@ func (m *mockBridgeConfig) BridgeCacheTTL() time.Duration { return 0 } -func createBridge(t *testing.T, name string, val string, borm bridges.ORM, maxCalls int) { - callcount := 0 +func createBridge(t testing.TB, name string, val string, borm bridges.ORM, maxCalls int64) { + callcount := atomic.NewInt64(0) bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - callcount++ - if callcount > maxCalls { + n := callcount.Inc() + if maxCalls > 0 && n > maxCalls { panic("too many calls to bridge" + name) } _, herr := io.ReadAll(req.Body) if herr != nil { - t.Fatal(herr) + panic(herr) } res.WriteHeader(http.StatusOK) resp := fmt.Sprintf(`{"result": %s}`, val) _, herr = res.Write([]byte(resp)) if herr != nil { - t.Fatal(herr) + panic(herr) } })) t.Cleanup(bridge.Close) @@ -338,3 +343,82 @@ result3 -> result3_parse -> multiply3; t.Fatalf("Observation failed: %v", err) } } + +func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(t *testing.B) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + db := pgtest.NewSqlxDB(t) + bridgesORM := bridges.NewORM(db) + + n := uint32(t.N) + + createBridge(t, "foo-bridge", `123.456`, bridgesORM, 0) + createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 0) + + c := clhttptest.NewTestLocalOnlyHTTPClient() + runner := pipeline.NewRunner( + nil, + bridgesORM, + &mockPipelineConfig{}, + &mockBridgeConfig{}, + nil, + nil, + nil, + lggr, + c, + c, + ) + + r := streams.NewRegistry(lggr, runner) + + for i := uint32(0); i < n; i++ { + jobStreamID := streams.StreamID(i) + + jb := job.Job{ + ID: int32(i), + Name: null.StringFrom(fmt.Sprintf("job-%d", i)), + Type: job.Stream, + StreamID: &jobStreamID, + PipelineSpec: &pipeline.Spec{ + ID: int32(i * 100), + DotDagSource: fmt.Sprintf(` +// Benchmark Price +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=%d index=0]; // force conversion to decimal + +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=%d index=1]; + +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion to decimal + +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`, i+n, i+2*n, i+3*n), + }, + } + err := r.Register(jb, nil) + require.NoError(t, err) + } + + telem := &mockTelemeter{} + oc := newObservationContext(r, telem) + opts := llo.DSOpts(nil) + + // concurrency stress test + g, ctx := errgroup.WithContext(ctx) + for i := uint32(0); i < n; i++ { + for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} { + g.Go(func() error { + // ignore errors, only care about races + oc.Observe(ctx, strmID, opts) + return nil + }) + } + } + if err := g.Wait(); err != nil { + t.Fatalf("Observation failed: %v", err) + } +} diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 2194cb8be46..30c7842914d 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -377,7 +377,9 @@ func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) { func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Vars) TaskRunResults { l := r.lggr.With("run.ID", run.ID, "executionID", uuid.New(), "specID", run.PipelineSpecID, "jobID", run.PipelineSpec.JobID, "jobName", run.PipelineSpec.JobName) - l.Debug("Initiating tasks for pipeline run of spec") + if r.config.VerboseLogging() { + l.Debug("Initiating tasks for pipeline run of spec") + } scheduler := newScheduler(pipeline, run, vars, l) go scheduler.Run() diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index 9fa6c25d36d..de52fed26e5 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -44,6 +44,9 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R return nil, errors.New("job has no pipeline spec") } spec := *jb.PipelineSpec + spec.JobID = jb.ID + spec.JobName = jb.Name.ValueOrZero() + spec.JobType = string(jb.Type) if spec.Pipeline == nil { pipeline, err := spec.ParsePipeline() if err != nil { diff --git a/go.mod b/go.mod index 1767da64153..94621156009 100644 --- a/go.mod +++ b/go.mod @@ -108,6 +108,7 @@ require ( go.opentelemetry.io/otel/metric v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 + go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.31.0 From 9120cd96ae4b6c9f46706df732836ac91f9d92d8 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 18 Dec 2024 10:16:14 -0500 Subject: [PATCH 10/11] Benchmark on datasource.Observe --- core/internal/testutils/httptest/httptest.go | 3 +- core/services/llo/data_source_test.go | 86 ++++++++++++++++++- core/services/llo/observation_context_test.go | 71 +-------------- 3 files changed, 87 insertions(+), 73 deletions(-) diff --git a/core/internal/testutils/httptest/httptest.go b/core/internal/testutils/httptest/httptest.go index a1bd941fb02..69549ae3e45 100644 --- a/core/internal/testutils/httptest/httptest.go +++ b/core/internal/testutils/httptest/httptest.go @@ -12,7 +12,8 @@ import ( // NewTestHTTPClient returns a real HTTP client that may only make requests to // localhost func NewTestLocalOnlyHTTPClient() *http.Client { - tr := http.DefaultTransport.(*http.Transport).Clone() + // Don't use the default transport, we want zero limits and zero timeouts + tr := &http.Transport{} tr.DialContext = testDialContext tr.DisableCompression = true return &http.Client{Transport: tr} diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 3f9a4b48fbe..bae60b1e281 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -3,6 +3,7 @@ package llo import ( "context" "errors" + "fmt" "math/big" "sync" "testing" @@ -10,13 +11,20 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" ) @@ -65,9 +73,11 @@ func makeStreamValues() llo.StreamValues { } } -type mockOpts struct{} +type mockOpts struct { + verboseLogging bool +} -func (m *mockOpts) VerboseLogging() bool { return true } +func (m *mockOpts) VerboseLogging() bool { return m.verboseLogging } func (m *mockOpts) SeqNr() uint64 { return 1042 } func (m *mockOpts) OutCtx() ocr3types.OutcomeContext { return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))} @@ -209,3 +219,75 @@ func Test_DataSource(t *testing.T) { }) }) } + +func BenchmarkObserve(b *testing.B) { + lggr := logger.TestLogger(b) + ctx := testutils.Context(b) + // can enable/disable verbose logging to test performance here + opts := &mockOpts{verboseLogging: true} + + db := pgtest.NewSqlxDB(b) + bridgesORM := bridges.NewORM(db) + + n := uint32(b.N) + + createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0) + createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0) + + c := clhttptest.NewTestLocalOnlyHTTPClient() + runner := pipeline.NewRunner( + nil, + bridgesORM, + &mockPipelineConfig{}, + &mockBridgeConfig{}, + nil, + nil, + nil, + lggr, + c, + c, + ) + + r := streams.NewRegistry(lggr, runner) + for i := uint32(0); i < n; i++ { + jobStreamID := streams.StreamID(i) + + jb := job.Job{ + ID: int32(i), + Name: null.StringFrom(fmt.Sprintf("job-%d", i)), + Type: job.Stream, + StreamID: &jobStreamID, + PipelineSpec: &pipeline.Spec{ + ID: int32(i * 100), + DotDagSource: fmt.Sprintf(` +// Benchmark Price +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=%d index=0]; // force conversion to decimal + +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=%d index=1]; + +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion to decimal + +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`, i+n, i+2*n, i+3*n), + }, + } + err := r.Register(jb, nil) + require.NoError(b, err) + } + + ds := newDataSource(lggr, r, NullTelemeter) + vals := make(map[llotypes.StreamID]llo.StreamValue) + for i := uint32(0); i < 4*n; i++ { + vals[llotypes.StreamID(i)] = nil + } + + b.ResetTimer() + err := ds.Observe(ctx, vals, opts) + require.NoError(b, err) +} diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 882661b79af..7599b8cc740 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -274,76 +274,6 @@ result3 -> result3_parse -> multiply3; }) } -func TestObservationContext_Observe_integrationRealPipeline_concurrencyStressTest(t *testing.T) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - db := pgtest.NewSqlxDB(t) - bridgesORM := bridges.NewORM(db) - - createBridge(t, "foo-bridge", `123.456`, bridgesORM, 1) - createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 1) - - c := clhttptest.NewTestLocalOnlyHTTPClient() - runner := pipeline.NewRunner( - nil, - bridgesORM, - &mockPipelineConfig{}, - &mockBridgeConfig{}, - nil, - nil, - nil, - lggr, - c, - c, - ) - - r := streams.NewRegistry(lggr, runner) - - jobStreamID := streams.StreamID(5) - - jb := job.Job{ - Type: job.Stream, - StreamID: &jobStreamID, - PipelineSpec: &pipeline.Spec{ - DotDagSource: ` -// Benchmark Price -result1 [type=memo value="900.0022"]; -multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal - -result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; -result2_parse [type=jsonparse path="result" streamID=2 index=1]; - -result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; -result3_parse [type=jsonparse path="result"]; -multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal - -result1 -> multiply2; -result2 -> result2_parse; -result3 -> result3_parse -> multiply3; -`, - }, - } - err := r.Register(jb, nil) - require.NoError(t, err) - - telem := &mockTelemeter{} - oc := newObservationContext(r, telem) - opts := llo.DSOpts(nil) - - // concurrency stress test - g, ctx := errgroup.WithContext(ctx) - for i := uint32(0); i < 1000; i++ { - strmID := 1 + i%3 - g.Go(func() error { - _, err := oc.Observe(ctx, strmID, opts) - return err - }) - } - if err := g.Wait(); err != nil { - t.Fatalf("Observation failed: %v", err) - } -} - func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(t *testing.B) { ctx := tests.Context(t) lggr := logger.TestLogger(t) @@ -408,6 +338,7 @@ result3 -> result3_parse -> multiply3; opts := llo.DSOpts(nil) // concurrency stress test + t.ResetTimer() g, ctx := errgroup.WithContext(ctx) for i := uint32(0); i < n; i++ { for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} { From 504102e2782d633021efcb2fd15e1f1f3c0ab0c2 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 18 Dec 2024 11:59:58 -0500 Subject: [PATCH 11/11] Fix lint --- core/services/llo/data_source_test.go | 18 ++++++---- core/services/llo/observation_context_test.go | 35 ++++++++++--------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index bae60b1e281..349ec70007d 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "math/big" "sync" "testing" @@ -229,7 +230,11 @@ func BenchmarkObserve(b *testing.B) { db := pgtest.NewSqlxDB(b) bridgesORM := bridges.NewORM(db) - n := uint32(b.N) + if b.N > math.MaxInt32 { + b.Fatalf("N is too large: %d", b.N) + } + + n := uint32(b.N) //nolint:gosec // G115 // overflow impossible createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0) createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0) @@ -250,15 +255,14 @@ func BenchmarkObserve(b *testing.B) { r := streams.NewRegistry(lggr, runner) for i := uint32(0); i < n; i++ { - jobStreamID := streams.StreamID(i) - + i := i jb := job.Job{ - ID: int32(i), + ID: int32(i), //nolint:gosec // G115 // overflow impossible Name: null.StringFrom(fmt.Sprintf("job-%d", i)), Type: job.Stream, - StreamID: &jobStreamID, + StreamID: &i, PipelineSpec: &pipeline.Spec{ - ID: int32(i * 100), + ID: int32(i * 100), //nolint:gosec // G115 // overflow impossible DotDagSource: fmt.Sprintf(` // Benchmark Price result1 [type=memo value="900.0022"]; @@ -284,7 +288,7 @@ result3 -> result3_parse -> multiply3; ds := newDataSource(lggr, r, NullTelemeter) vals := make(map[llotypes.StreamID]llo.StreamValue) for i := uint32(0); i < 4*n; i++ { - vals[llotypes.StreamID(i)] = nil + vals[i] = nil } b.ResetTimer() diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index 7599b8cc740..fe626815603 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "math" "math/rand/v2" "net/http" "net/http/httptest" @@ -274,16 +275,19 @@ result3 -> result3_parse -> multiply3; }) } -func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(t *testing.B) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - db := pgtest.NewSqlxDB(t) +func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(b *testing.B) { + ctx := tests.Context(b) + lggr := logger.TestLogger(b) + db := pgtest.NewSqlxDB(b) bridgesORM := bridges.NewORM(db) - n := uint32(t.N) + if b.N > math.MaxInt32 { + b.Fatalf("N is too large: %d", b.N) + } + n := uint32(b.N) //nolint:gosec // G115 // overflow impossible - createBridge(t, "foo-bridge", `123.456`, bridgesORM, 0) - createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 0) + createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0) + createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0) c := clhttptest.NewTestLocalOnlyHTTPClient() runner := pipeline.NewRunner( @@ -302,15 +306,14 @@ func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStre r := streams.NewRegistry(lggr, runner) for i := uint32(0); i < n; i++ { - jobStreamID := streams.StreamID(i) - + i := i jb := job.Job{ - ID: int32(i), + ID: int32(i), //nolint:gosec // G115 // overflow impossible Name: null.StringFrom(fmt.Sprintf("job-%d", i)), Type: job.Stream, - StreamID: &jobStreamID, + StreamID: &i, PipelineSpec: &pipeline.Spec{ - ID: int32(i * 100), + ID: int32(i * 100), //nolint:gosec // G115 // overflow impossible DotDagSource: fmt.Sprintf(` // Benchmark Price result1 [type=memo value="900.0022"]; @@ -330,7 +333,7 @@ result3 -> result3_parse -> multiply3; }, } err := r.Register(jb, nil) - require.NoError(t, err) + require.NoError(b, err) } telem := &mockTelemeter{} @@ -338,18 +341,18 @@ result3 -> result3_parse -> multiply3; opts := llo.DSOpts(nil) // concurrency stress test - t.ResetTimer() + b.ResetTimer() g, ctx := errgroup.WithContext(ctx) for i := uint32(0); i < n; i++ { for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} { g.Go(func() error { // ignore errors, only care about races - oc.Observe(ctx, strmID, opts) + oc.Observe(ctx, strmID, opts) //nolint:errcheck // ignore error return nil }) } } if err := g.Wait(); err != nil { - t.Fatalf("Observation failed: %v", err) + b.Fatalf("Observation failed: %v", err) } }