Skip to content

Commit

Permalink
Add donID to mercury EA telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
mjk90 committed Nov 12, 2024
1 parent 66e3488 commit 10e546d
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 19 deletions.
3 changes: 2 additions & 1 deletion core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type DelegateConfig struct {
RetirementReportCodec datastreamsllo.RetirementReportCodec
ShouldRetireCache datastreamsllo.ShouldRetireCache
EAMonitoringEndpoint ocrcommontypes.MonitoringEndpoint
DonID uint32

// OCR3
TraceLogging bool
Expand Down Expand Up @@ -94,7 +95,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {

var t TelemeterService
if cfg.CaptureEATelemetry {
t = NewTelemeterService(lggr, cfg.EAMonitoringEndpoint)
t = NewTelemeterService(lggr, cfg.EAMonitoringEndpoint, cfg.DonID)
} else {
t = NullTelemeter
}
Expand Down
9 changes: 6 additions & 3 deletions core/services/llo/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,19 @@ type TelemeterService interface {
services.Service
}

func NewTelemeterService(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint) TelemeterService {
func NewTelemeterService(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint, donID uint32) TelemeterService {
if monitoringEndpoint == nil {
return NullTelemeter
}
return newTelemeter(lggr, monitoringEndpoint)
return newTelemeter(lggr, monitoringEndpoint, donID)
}

func newTelemeter(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint) *telemeter {
func newTelemeter(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint, donID uint32) *telemeter {
chTelemetryObservation := make(chan TelemetryObservation, 100)
t := &telemeter{
chTelemetryObservation: chTelemetryObservation,
monitoringEndpoint: monitoringEndpoint,
donID: donID,
}
t.Service, t.eng = services.Config{
Name: "LLOTelemeterService",
Expand All @@ -58,6 +59,7 @@ type telemeter struct {

monitoringEndpoint commontypes.MonitoringEndpoint
chTelemetryObservation chan TelemetryObservation
donID uint32
}

func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) {
Expand Down Expand Up @@ -140,6 +142,7 @@ func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) {
Epoch: int64(epoch),
AssetSymbol: eaTelem.AssetSymbol,
Version: uint32(1000 + mercuryutils.REPORT_V3), // add 1000 to distinguish between legacy feeds, this can be changed if necessary
DonId: t.donID,
}

bytes, err := proto.Marshal(tea)
Expand Down
8 changes: 5 additions & 3 deletions core/services/llo/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func Test_Telemeter(t *testing.T) {

run := &pipeline.Run{ID: 42}
streamID := uint32(135)
donId := uint32(1)
opts := &mockOpts{}

t.Run("with error", func(t *testing.T) {
tm := newTelemeter(lggr, m)
tm := newTelemeter(lggr, m, donId)
servicetest.Run(t, tm)

t.Run("if error is some random failure returns immediately", func(t *testing.T) {
Expand All @@ -142,7 +143,7 @@ func Test_Telemeter(t *testing.T) {
})
})
t.Run("with decimal value, sets all values correctly", func(t *testing.T) {
tm := newTelemeter(lggr, m)
tm := newTelemeter(lggr, m, donId)
val := llo.ToDecimal(decimal.NewFromFloat32(102.12))
servicetest.Run(t, tm)
tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)
Expand Down Expand Up @@ -184,14 +185,15 @@ func Test_Telemeter(t *testing.T) {
assert.Equal(t, int64(18), decoded.Round)
assert.Equal(t, int64(4), decoded.Epoch)
assert.Equal(t, "eth/usd", decoded.AssetSymbol)
assert.Equal(t, uint32(1), decoded.DonId)
if i == 2 {
return
}
i++
}
})
t.Run("with quote value", func(t *testing.T) {
tm := newTelemeter(lggr, m)
tm := newTelemeter(lggr, m, donId)
val := &llo.Quote{Bid: decimal.NewFromFloat32(102.12), Benchmark: decimal.NewFromFloat32(103.32), Ask: decimal.NewFromFloat32(104.25)}
servicetest.Run(t, tm)
tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@ func (d *Delegate) newServicesLLO(
ShouldRetireCache: provider.ShouldRetireCache(),
RetirementReportCodec: datastreamsllo.StandardRetirementReportCodec{},
EAMonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, telemetryContractID, synchronization.EnhancedEAMercury),
DonID: pluginCfg.DonID,

TraceLogging: d.cfg.OCR2().TraceLogging(),
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
Expand Down
33 changes: 21 additions & 12 deletions core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ message EnhancedEAMercury {
int64 round=19;
int64 epoch=20;
string asset_symbol=21;
uint32 don_id=36;
}

0 comments on commit 10e546d

Please sign in to comment.