From 10e546df6c40d6d7aaa7ad8f32cf3b975cbb4e6d Mon Sep 17 00:00:00 2001 From: Matt Kelly Date: Tue, 12 Nov 2024 16:35:13 +0800 Subject: [PATCH] Add donID to mercury EA telemetry --- core/services/llo/delegate.go | 3 +- core/services/llo/telemetry.go | 9 +++-- core/services/llo/telemetry_test.go | 8 +++-- core/services/ocr2/delegate.go | 1 + .../telem/telem_enhanced_ea_mercury.pb.go | 33 ++++++++++++------- .../telem/telem_enhanced_ea_mercury.proto | 1 + 6 files changed, 36 insertions(+), 19 deletions(-) diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index 3380b4f1bc5..b6c49e1373f 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -57,6 +57,7 @@ type DelegateConfig struct { RetirementReportCodec datastreamsllo.RetirementReportCodec ShouldRetireCache datastreamsllo.ShouldRetireCache EAMonitoringEndpoint ocrcommontypes.MonitoringEndpoint + DonID uint32 // OCR3 TraceLogging bool @@ -94,7 +95,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { var t TelemeterService if cfg.CaptureEATelemetry { - t = NewTelemeterService(lggr, cfg.EAMonitoringEndpoint) + t = NewTelemeterService(lggr, cfg.EAMonitoringEndpoint, cfg.DonID) } else { t = NullTelemeter } diff --git a/core/services/llo/telemetry.go b/core/services/llo/telemetry.go index d5c113c61ef..888ee9d5d36 100644 --- a/core/services/llo/telemetry.go +++ b/core/services/llo/telemetry.go @@ -31,18 +31,19 @@ type TelemeterService interface { services.Service } -func NewTelemeterService(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint) TelemeterService { +func NewTelemeterService(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint, donID uint32) TelemeterService { if monitoringEndpoint == nil { return NullTelemeter } - return newTelemeter(lggr, monitoringEndpoint) + return newTelemeter(lggr, monitoringEndpoint, donID) } -func newTelemeter(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint) *telemeter { +func newTelemeter(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringEndpoint, donID uint32) *telemeter { chTelemetryObservation := make(chan TelemetryObservation, 100) t := &telemeter{ chTelemetryObservation: chTelemetryObservation, monitoringEndpoint: monitoringEndpoint, + donID: donID, } t.Service, t.eng = services.Config{ Name: "LLOTelemeterService", @@ -58,6 +59,7 @@ type telemeter struct { monitoringEndpoint commontypes.MonitoringEndpoint chTelemetryObservation chan TelemetryObservation + donID uint32 } func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { @@ -140,6 +142,7 @@ func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) { Epoch: int64(epoch), AssetSymbol: eaTelem.AssetSymbol, Version: uint32(1000 + mercuryutils.REPORT_V3), // add 1000 to distinguish between legacy feeds, this can be changed if necessary + DonId: t.donID, } bytes, err := proto.Marshal(tea) diff --git a/core/services/llo/telemetry_test.go b/core/services/llo/telemetry_test.go index ec77e959d24..6987997cb3f 100644 --- a/core/services/llo/telemetry_test.go +++ b/core/services/llo/telemetry_test.go @@ -112,10 +112,11 @@ func Test_Telemeter(t *testing.T) { run := &pipeline.Run{ID: 42} streamID := uint32(135) + donId := uint32(1) opts := &mockOpts{} t.Run("with error", func(t *testing.T) { - tm := newTelemeter(lggr, m) + tm := newTelemeter(lggr, m, donId) servicetest.Run(t, tm) t.Run("if error is some random failure returns immediately", func(t *testing.T) { @@ -142,7 +143,7 @@ func Test_Telemeter(t *testing.T) { }) }) t.Run("with decimal value, sets all values correctly", func(t *testing.T) { - tm := newTelemeter(lggr, m) + tm := newTelemeter(lggr, m, donId) val := llo.ToDecimal(decimal.NewFromFloat32(102.12)) servicetest.Run(t, tm) tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) @@ -184,6 +185,7 @@ func Test_Telemeter(t *testing.T) { assert.Equal(t, int64(18), decoded.Round) assert.Equal(t, int64(4), decoded.Epoch) assert.Equal(t, "eth/usd", decoded.AssetSymbol) + assert.Equal(t, uint32(1), decoded.DonId) if i == 2 { return } @@ -191,7 +193,7 @@ func Test_Telemeter(t *testing.T) { } }) t.Run("with quote value", func(t *testing.T) { - tm := newTelemeter(lggr, m) + tm := newTelemeter(lggr, m, donId) val := &llo.Quote{Bid: decimal.NewFromFloat32(102.12), Benchmark: decimal.NewFromFloat32(103.32), Ask: decimal.NewFromFloat32(104.25)} servicetest.Run(t, tm) tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 371eccdbe89..acee4168a5a 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1050,6 +1050,7 @@ func (d *Delegate) newServicesLLO( ShouldRetireCache: provider.ShouldRetireCache(), RetirementReportCodec: datastreamsllo.StandardRetirementReportCodec{}, EAMonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, telemetryContractID, synchronization.EnhancedEAMercury), + DonID: pluginCfg.DonID, TraceLogging: d.cfg.OCR2().TraceLogging(), BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go index 09eed12ee8a..34f4b3e349b 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.28.0 +// protoc v5.28.3 // source: core/services/synchronization/telem/telem_enhanced_ea_mercury.proto package telem @@ -115,6 +115,7 @@ type EnhancedEAMercury struct { Round int64 `protobuf:"varint,19,opt,name=round,proto3" json:"round,omitempty"` Epoch int64 `protobuf:"varint,20,opt,name=epoch,proto3" json:"epoch,omitempty"` AssetSymbol string `protobuf:"bytes,21,opt,name=asset_symbol,json=assetSymbol,proto3" json:"asset_symbol,omitempty"` + DonId uint32 `protobuf:"varint,36,opt,name=don_id,json=donId,proto3" json:"don_id,omitempty"` } func (x *EnhancedEAMercury) Reset() { @@ -394,6 +395,13 @@ func (x *EnhancedEAMercury) GetAssetSymbol() string { return "" } +func (x *EnhancedEAMercury) GetDonId() uint32 { + if x != nil { + return x.DonId + } + return 0 +} + var File_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto protoreflect.FileDescriptor var file_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto_rawDesc = []byte{ @@ -401,7 +409,7 @@ var file_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto_raw 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x65, 0x6e, 0x68, 0x61, 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x65, 0x61, 0x5f, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x22, 0xaa, 0x0d, 0x0a, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x22, 0xc1, 0x0d, 0x0a, 0x11, 0x45, 0x6e, 0x68, 0x61, 0x6e, 0x63, 0x65, 0x64, 0x45, 0x41, 0x4d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x20, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, @@ -508,16 +516,17 @@ var file_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto_raw 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, 0x14, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x73, 0x73, 0x65, 0x74, 0x5f, 0x73, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x18, 0x15, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x73, - 0x73, 0x65, 0x74, 0x53, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x2a, 0x31, 0x0a, 0x0c, 0x4d, 0x61, 0x72, - 0x6b, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, - 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4c, 0x4f, 0x53, 0x45, 0x44, - 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x4f, 0x50, 0x45, 0x4e, 0x10, 0x02, 0x42, 0x4e, 0x5a, 0x4c, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, - 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, - 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, - 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x65, 0x74, 0x53, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x6f, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x24, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x64, 0x6f, 0x6e, 0x49, 0x64, + 0x2a, 0x31, 0x0a, 0x0c, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0a, 0x0a, + 0x06, 0x43, 0x4c, 0x4f, 0x53, 0x45, 0x44, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x4f, 0x50, 0x45, + 0x4e, 0x10, 0x02, 0x42, 0x4e, 0x5a, 0x4c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, + 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, + 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x79, + 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, + 0x6c, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto index d57b7ca836a..cfb8dbac0c9 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto @@ -59,4 +59,5 @@ message EnhancedEAMercury { int64 round=19; int64 epoch=20; string asset_symbol=21; + uint32 don_id=36; }