Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multistream specs #15603

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rotten-books-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Support multiple streamIDs in stream specs #added
3 changes: 2 additions & 1 deletion core/internal/testutils/httptest/httptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
// NewTestHTTPClient returns a real HTTP client that may only make requests to
// localhost
func NewTestLocalOnlyHTTPClient() *http.Client {
tr := http.DefaultTransport.(*http.Transport).Clone()
// Don't use the default transport, we want zero limits and zero timeouts
tr := &http.Transport{}
tr.DialContext = testDialContext
tr.DisableCompression = true
return &http.Client{Transport: tr}
Expand Down
95 changes: 13 additions & 82 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package llo

import (
"context"
"errors"
"fmt"
"slices"
"sort"
"strconv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/shopspring/decimal"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -19,7 +20,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
Expand All @@ -42,7 +42,7 @@ var (
)

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
Get(streamID streams.StreamID) (p streams.Pipeline, exists bool)
}

type ErrObservationFailed struct {
Expand Down Expand Up @@ -109,43 +109,25 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues))
var errs []ErrObservationFailed

// oc only lives for the duration of this Observe call
oc := NewObservationContext(d.registry, d.t)

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var val llo.StreamValue

stream, exists := d.registry.Get(streamID)
if !exists {
mu.Lock()
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
mu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
mu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, err)
return
}
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
val, err := oc.Observe(ctx, streamID, opts)
if err != nil {
strmIDStr := strconv.FormatUint(uint64(streamID), 10)
if errors.As(err, &MissingStreamError{}) {
promMissingStreamCount.WithLabelValues(strmIDStr).Inc()
}
promObservationErrorCount.WithLabelValues(strmIDStr).Inc()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
mu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

mu.Lock()
defer mu.Unlock()

Expand Down Expand Up @@ -186,54 +168,3 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

return nil
}

// ExtractStreamValue extracts a StreamValue from a TaskRunResults
func ExtractStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) {
// pipeline.TaskRunResults comes ordered asc by index, this is guaranteed
// by the pipeline executor
finaltrrs := trrs.Terminals()

// HACK: Right now we rely on the number of outputs to determine whether
// its a Decimal or a Quote.
// This isn't very robust or future-proof but is sufficient to support v0.3
// compat.
// There are a number of different possible ways to solve this in future.
// See: https://smartcontract-it.atlassian.net/browse/MERC-5934
switch len(finaltrrs) {
case 1:
res := finaltrrs[0].Result
if res.Error != nil {
return nil, res.Error
}
val, err := toDecimal(res.Value)
if err != nil {
return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err)
}
return llo.ToDecimal(val), nil
case 3:
// Expect ordering of Benchmark, Bid, Ask
results := make([]decimal.Decimal, 3)
for i, trr := range finaltrrs {
res := trr.Result
if res.Error != nil {
return nil, fmt.Errorf("failed to parse stream output into Quote (task index: %d): %w", i, res.Error)
}
val, err := toDecimal(res.Value)
if err != nil {
return nil, fmt.Errorf("failed to parse decimal: %w", err)
}
results[i] = val
}
return &llo.Quote{
Benchmark: results[0],
Bid: results[1],
Ask: results[2],
}, nil
default:
return nil, fmt.Errorf("invalid number of results, expected: 1 or 3, got: %d", len(finaltrrs))
}
}

func toDecimal(val interface{}) (decimal.Decimal, error) {
return utils.ToDecimal(val)
}
Loading
Loading