Skip to content

Commit

Permalink
Automation Telemetry: Send BlockNumber and Node Version (#9927)
Browse files Browse the repository at this point in the history
* rebase on toml feature flag

* fixes + send node version

rebase on feature flag PR

* fixes

* comments

* minor changes

* lint

* constant added

* logger

* toggle by feature flag

* minor fixes

* marshal error exits start()

* marshal error fix

* add automation custom telem to job

* rename toml tag

* test

* removed print statement

* merge

* configDigest go routine getter

* configDigest var

* refactor1

* send NodeVersion msg every new ConfigDigest

* lint

* refactor

* move registry creation outside delegate.go

* plugin config bool flag

* custom telem for 2.1

* block subscriber + thread controller

* ContractConfigTracker

* reset forge-std

* use toml config flag

* set toml flag default to true

* plugin config bool flag

* make generate

* refactor

* hourly node version msg

* fix tests

* lint fix

* goimports fixed

* goimport fix 2

---------

Co-authored-by: FelixFan1992 <[email protected]>
  • Loading branch information
JonWong203 and FelixFan1992 authored Nov 17, 2023
1 parent 5f09e55 commit b2c1a17
Show file tree
Hide file tree
Showing 23 changed files with 431 additions and 21 deletions.
2 changes: 1 addition & 1 deletion core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ KeyBundleID = '7a5f66bbe6594259325bf2b4f5b1a9c900000000000000000000000000000000'
# CaptureEATelemetry toggles collecting extra information from External Adapters
CaptureEATelemetry = false # Default
# CaptureAutomationCustomTelemetry toggles collecting automation specific telemetry
CaptureAutomationCustomTelemetry = false # Default
CaptureAutomationCustomTelemetry = true # Default
# DefaultTransactionQueueDepth controls the queue size for `DropOldestStrategy` in OCR2. Set to 0 to use `SendEvery` strategy instead.
DefaultTransactionQueueDepth = 1 # Default
# SimulateTransactions enables transaction simulation for OCR2.
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/config_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestOCR2Config(t *testing.T) {
require.Equal(t, false, ocr2Cfg.TraceLogging())
require.Equal(t, uint32(1), ocr2Cfg.DefaultTransactionQueueDepth())
require.Equal(t, false, ocr2Cfg.CaptureEATelemetry())
require.Equal(t, false, ocr2Cfg.CaptureAutomationCustomTelemetry())
require.Equal(t, true, ocr2Cfg.CaptureAutomationCustomTelemetry())

keyBundleID, err := ocr2Cfg.KeyBundleID()
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func TestConfig_Marshal(t *testing.T) {
DatabaseTimeout: models.MustNewDuration(8 * time.Second),
KeyBundleID: ptr(models.MustSha256HashFromHex("7a5f66bbe6594259325bf2b4f5b1a9c9")),
CaptureEATelemetry: ptr(false),
CaptureAutomationCustomTelemetry: ptr(false),
CaptureAutomationCustomTelemetry: ptr(true),
DefaultTransactionQueueDepth: ptr[uint32](1),
SimulateTransactions: ptr(false),
TraceLogging: ptr(false),
Expand Down Expand Up @@ -848,7 +848,7 @@ ContractTransmitterTransmitTimeout = '1m0s'
DatabaseTimeout = '8s'
KeyBundleID = '7a5f66bbe6594259325bf2b4f5b1a9c900000000000000000000000000000000'
CaptureEATelemetry = false
CaptureAutomationCustomTelemetry = false
CaptureAutomationCustomTelemetry = true
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ ContractTransmitterTransmitTimeout = '10s'
DatabaseTimeout = '10s'
KeyBundleID = '0000000000000000000000000000000000000000000000000000000000000000'
CaptureEATelemetry = false
CaptureAutomationCustomTelemetry = false
CaptureAutomationCustomTelemetry = true
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ ContractTransmitterTransmitTimeout = '1m0s'
DatabaseTimeout = '8s'
KeyBundleID = '7a5f66bbe6594259325bf2b4f5b1a9c900000000000000000000000000000000'
CaptureEATelemetry = false
CaptureAutomationCustomTelemetry = false
CaptureAutomationCustomTelemetry = true
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ ContractTransmitterTransmitTimeout = '10s'
DatabaseTimeout = '20s'
KeyBundleID = '0000000000000000000000000000000000000000000000000000000000000000'
CaptureEATelemetry = false
CaptureAutomationCustomTelemetry = false
CaptureAutomationCustomTelemetry = true
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
Expand Down
23 changes: 21 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/autotelemetry21"
ocr2keeper21core "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core"
ocr2vrfconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2vrf/config"
ocr2coordinator "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2vrf/coordinator"
Expand Down Expand Up @@ -174,6 +175,7 @@ type ocr2Config interface {
DatabaseTimeout() time.Duration
KeyBundleID() (string, error)
TraceLogging() bool
CaptureAutomationCustomTelemetry() bool
}

type insecureConfig interface {
Expand Down Expand Up @@ -1161,7 +1163,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
d.cfg.JobPipeline().MaxSuccessfulRuns(),
)

return []job.ServiceCtx{
automationServices := []job.ServiceCtx{
runResultSaver,
keeperProvider,
services.Registry(),
Expand All @@ -1171,7 +1173,24 @@ func (d *Delegate) newServicesOCR2Keepers21(
services.UpkeepStateStore(),
services.TransmitEventProvider(),
pluginService,
}, nil
}

if cfg.CaptureAutomationCustomTelemetry != nil && *cfg.CaptureAutomationCustomTelemetry ||
cfg.CaptureAutomationCustomTelemetry == nil && d.cfg.OCR2().CaptureAutomationCustomTelemetry() {
endpoint := d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.AutomationCustom)
customTelemService, custErr := autotelemetry21.NewAutomationCustomTelemetryService(
endpoint,
lggr,
services.BlockSubscriber(),
keeperProvider.ContractConfigTracker(),
)
if custErr != nil {
return nil, errors.Wrap(custErr, "Error when creating AutomationCustomTelemetryService")
}
automationServices = append(automationServices, customTelemService)
}

return automationServices, nil
}

func (d *Delegate) newServicesOCR2Keepers20(
Expand Down
2 changes: 2 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type PluginConfig struct {
ServiceQueueLength int `json:"serviceQueueLength"`
// ContractVersion is the contract version
ContractVersion string `json:"contractVersion"`
// CaptureAutomationCustomTelemetry is a bool flag to toggle Custom Telemetry Service
CaptureAutomationCustomTelemetry *bool `json:"captureAutomationCustomTelemetry,omitempty"`
}

func ValidatePluginConfig(cfg PluginConfig) error {
Expand Down
156 changes: 156 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/custom_telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package ocr2keeper

import (
"context"
"encoding/hex"
"time"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"google.golang.org/protobuf/proto"

ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
"github.com/smartcontractkit/chainlink/v2/core/static"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// AutomationCustomTelemetryService publishes automation-specific telemetry
// (NodeVersion and BlockNumber messages) to a monitoring endpoint.
type AutomationCustomTelemetryService struct {
	utils.StartStopOnce

	// lggr is the named logger used for all service output.
	lggr logger.Logger
	// monitoringEndpoint receives the marshalled telemetry payloads via SendLog.
	monitoringEndpoint commontypes.MonitoringEndpoint

	// contractConfigTracker is queried for the latest config digest.
	contractConfigTracker types.ContractConfigTracker
	// configDigest is the most recently observed config digest; it is attached
	// to every outgoing telemetry message.
	configDigest [32]byte

	// blockSubscriber is the source of block history updates.
	blockSubscriber *evm21.BlockSubscriber
	// blockSubChanID is the id returned by blockSubscriber.Subscribe and is
	// handed back to Unsubscribe on Close.
	blockSubChanID int

	// threadCtrl runs the background loops started in Start and is closed in Close.
	threadCtrl utils.ThreadControl
}

// NewAutomationCustomTelemetryService builds a telemetry service that reports
// new block numbers and the node version to the monitoring endpoint me.
// The logger is scoped with the "AutomationCustomTelem" name. The returned
// error is always nil in the current implementation.
func NewAutomationCustomTelemetryService(me commontypes.MonitoringEndpoint,
	lggr logger.Logger, blocksub *evm21.BlockSubscriber, configTracker types.ContractConfigTracker) (*AutomationCustomTelemetryService, error) {
	svc := &AutomationCustomTelemetryService{
		monitoringEndpoint:    me,
		threadCtrl:            utils.NewThreadControl(),
		lggr:                  lggr.Named("AutomationCustomTelem"),
		contractConfigTracker: configTracker,
		blockSubscriber:       blocksub,
	}
	return svc, nil
}

// Start starts the Custom Telemetry Service.
//
// On start it fetches the latest config digest and, if that succeeds, sends a
// single NodeVersion message. It then launches two background loops through
// threadCtrl:
//
//   - a loop that re-reads the config digest once per minute and sends a new
//     NodeVersion message whenever the digest changes;
//   - a loop that sends a BlockNumber message for every block history update
//     received from the block subscriber.
//
// Failures while fetching the digest or subscribing to blocks are logged
// rather than returned, so telemetry problems do not prevent the service from
// starting.
func (e *AutomationCustomTelemetryService) Start(ctx context.Context) error {
	return e.StartOnce("AutomationCustomTelemetryService", func() error {
		e.lggr.Infof("Starting: Custom Telemetry Service")
		_, configDetails, err := e.contractConfigTracker.LatestConfigDetails(ctx)
		if err != nil {
			e.lggr.Errorf("Error occurred while getting newestConfigDetails for initialization %s", err)
		} else {
			e.configDigest = configDetails
			e.sendNodeVersionMsg()
		}

		// NOTE(review): configDigest is written by the loop below and read by
		// sendBlockNumberMsg from the block loop without synchronization;
		// presumably threadCtrl.Go runs each func on its own goroutine, which
		// would make this a data race — confirm and guard if so.
		e.threadCtrl.Go(func(ctx context.Context) {
			ticker := time.NewTicker(1 * time.Minute)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					_, newConfigDigest, err := e.contractConfigTracker.LatestConfigDetails(ctx)
					if err != nil {
						e.lggr.Errorf("Error occurred while getting newestConfigDetails in configDigest loop %s", err)
						// On error newConfigDigest is not meaningful; skip this
						// tick so a valid digest is not overwritten and no
						// spurious NodeVersion message is sent.
						continue
					}
					if newConfigDigest != e.configDigest {
						e.configDigest = newConfigDigest
						e.sendNodeVersionMsg()
					}
				case <-ctx.Done():
					return
				}
			}
		})

		chanID, blockSubscriberChan, blockSubErr := e.blockSubscriber.Subscribe()
		if blockSubErr != nil {
			e.lggr.Errorf("Block Subscriber Error: Subscribe(): %s", blockSubErr)
			return nil
		}

		// Remember the subscription id so Close can unsubscribe.
		e.blockSubChanID = chanID
		e.threadCtrl.Go(func(ctx context.Context) {
			e.lggr.Infof("Started: Sending BlockNumber Messages")
			for {
				select {
				case blockHistory := <-blockSubscriberChan:
					latestBlockKey, err := blockHistory.Latest()
					if err != nil {
						e.lggr.Errorf("BlockSubscriber BlockHistory.Latest() failed: %s", err)
						continue
					}
					e.sendBlockNumberMsg(latestBlockKey)
				case <-ctx.Done():
					return
				}
			}
		})
		return nil
	})
}

// Close shuts the service down: it closes the thread controller that owns the
// background loops and then unsubscribes from the block subscriber using the
// id recorded in Start. An Unsubscribe error is returned to the caller.
func (e *AutomationCustomTelemetryService) Close() error {
	shutdown := func() error {
		e.lggr.Infof("Stopping: custom telemetry service")
		e.threadCtrl.Close()
		if unsubErr := e.blockSubscriber.Unsubscribe(e.blockSubChanID); unsubErr != nil {
			return unsubErr
		}
		e.lggr.Infof("Stopped: Custom telemetry service")
		return nil
	}
	return e.StopOnce("AutomationCustomTelemetryService", shutdown)
}

// sendNodeVersionMsg wraps the node version (static.Version), the current
// config digest and a millisecond UTC timestamp in an AutomationTelemWrapper,
// marshals it and sends it to the monitoring endpoint. A marshalling failure
// is logged and nothing is sent.
func (e *AutomationCustomTelemetryService) sendNodeVersionMsg() {
	versionMsg := &telem.NodeVersion{
		Timestamp:    uint64(time.Now().UTC().UnixMilli()),
		NodeVersion:  static.Version,
		ConfigDigest: e.configDigest[:],
	}
	wrapper := &telem.AutomationTelemWrapper{
		Msg: &telem.AutomationTelemWrapper_NodeVersion{NodeVersion: versionMsg},
	}

	payload, marshalErr := proto.Marshal(wrapper)
	if marshalErr != nil {
		e.lggr.Errorf("Error occurred while marshalling the Node Version Message %s: %v", wrapper.String(), marshalErr)
		return
	}

	e.monitoringEndpoint.SendLog(payload)
	e.lggr.Infof("NodeVersion Message Sent to Endpoint: %d", versionMsg.Timestamp)
}

// sendBlockNumberMsg reports blockKey to the monitoring endpoint. The message
// carries the block number, the hex-encoded block hash, the current config
// digest and a millisecond UTC timestamp, wrapped in an
// AutomationTelemWrapper. A marshalling failure is logged and nothing is sent.
func (e *AutomationCustomTelemetryService) sendBlockNumberMsg(blockKey ocr2keepers.BlockKey) {
	blockMsg := &telem.BlockNumber{
		Timestamp:    uint64(time.Now().UTC().UnixMilli()),
		BlockNumber:  uint64(blockKey.Number),
		BlockHash:    hex.EncodeToString(blockKey.Hash[:]),
		ConfigDigest: e.configDigest[:],
	}
	wrapper := &telem.AutomationTelemWrapper{
		Msg: &telem.AutomationTelemWrapper_BlockNumber{BlockNumber: blockMsg},
	}

	payload, marshalErr := proto.Marshal(wrapper)
	if marshalErr != nil {
		e.lggr.Errorf("Error occurred while marshalling the Block Num Message %s: %v", wrapper.String(), marshalErr)
		return
	}

	e.monitoringEndpoint.SendLog(payload)
	e.lggr.Infof("BlockNumber Message Sent to Endpoint: %d", blockMsg.Timestamp)
}
17 changes: 17 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/custom_telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ocr2keeper

import (
"testing"
)

// TestNewAutomationCustomTelemetryService is currently a placeholder: every
// statement in its body is commented out, so it passes without calling
// NewAutomationCustomTelemetryService.
// TODO: provide mocks for the monitoring endpoint, logger, block subscriber
// and contract config tracker, then re-enable the assertions sketched below.
func TestNewAutomationCustomTelemetryService(t *testing.T) {
// me := &MockMonitoringEndpoint{}
// lggr := &MockLogger{}
// blocksub := &MockBlockSubscriber{}
// configTracker := &MockContractConfigTracker{}

// service, err := NewAutomationCustomTelemetryService(me, lggr, blocksub, configTracker)
// if err != nil {
// t.Errorf("Expected no error, but got: %v", err)
// }
}
Loading

0 comments on commit b2c1a17

Please sign in to comment.