From 81395632aa2ccc65fe3502abbc9d84cd15f4d474 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Wed, 20 Sep 2023 08:35:32 -0500 Subject: [PATCH] BCF-2657: core/services: track spawned job services via the health checker (#10695) * core/services: track spawned job services via the health checker * docs: changelog note --- core/services/chainlink/application.go | 5 ++- core/services/directrequest/delegate.go | 6 +++ core/services/functions/listener.go | 6 +++ core/services/gateway/connectionmanager.go | 17 +++++++- core/services/gateway/connector/connector.go | 16 +++++++- core/services/gateway/network/wsconnection.go | 18 +++++++-- .../gateway/network/wsconnection_test.go | 11 +++-- core/services/job/spawner.go | 40 +++++++++++++------ core/services/job/spawner_test.go | 26 +++++++++--- core/services/ocr/contract_tracker.go | 6 +++ core/services/ocrcommon/run_saver.go | 8 +++- core/services/vrf/v2/listener_v2.go | 6 +++ docs/CHANGELOG.md | 1 + 13 files changed, 135 insertions(+), 31 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index ba8e242ea8e..7b4a4eac774 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -404,11 +404,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) { globalLogger.Debug("Off-chain reporting v2 disabled") } + healthChecker := services.NewChecker() + var lbs []utils.DependentAwaiter for _, c := range legacyEVMChains.Slice() { lbs = append(lbs, c.LogBroadcaster()) } - jobSpawner := job.NewSpawner(jobORM, cfg.Database(), delegates, db, globalLogger, lbs) + jobSpawner := job.NewSpawner(jobORM, cfg.Database(), healthChecker, delegates, db, globalLogger, lbs) srvcs = append(srvcs, jobSpawner, pipelineRunner) // We start the log poller after the job spawner @@ -445,7 +447,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { } } - healthChecker := services.NewChecker() for _, s := range srvcs { if err := healthChecker.Register(s); err != nil { return nil, err diff --git a/core/services/directrequest/delegate.go b/core/services/directrequest/delegate.go index 56020f77566..f0ba5276ce7 100644 --- a/core/services/directrequest/delegate.go +++ b/core/services/directrequest/delegate.go @@ -138,6 +138,12 @@ type listener struct { utils.StartStopOnce } +func (l *listener) HealthReport() map[string]error { + return map[string]error{l.Name(): l.Healthy()} +} + +func (l *listener) Name() string { return l.logger.Name() } + // Start complies with job.Service func (l *listener) Start(context.Context) error { return l.StartOnce("DirectRequestListener", func() error { diff --git a/core/services/functions/listener.go b/core/services/functions/listener.go index afdc1b7963f..d9272aca3aa 100644 --- a/core/services/functions/listener.go +++ b/core/services/functions/listener.go @@ -142,6 +142,12 @@ type FunctionsListener struct { logPollerWrapper evmrelayTypes.LogPollerWrapper } +func (l *FunctionsListener) HealthReport() map[string]error { + return map[string]error{l.Name(): l.Healthy()} +} + +func (l *FunctionsListener) Name() string { return l.logger.Name() } + func formatRequestId(requestId [32]byte) string { return fmt.Sprintf("0x%x", requestId) } diff --git a/core/services/gateway/connectionmanager.go b/core/services/gateway/connectionmanager.go index 4d89ec67267..f225b66fe18 100644 --- a/core/services/gateway/connectionmanager.go +++ b/core/services/gateway/connectionmanager.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/multierr" + "golang.org/x/exp/maps" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" @@ -52,6 +53,18 @@ type connectionManager struct { lggr logger.Logger } +func (m *connectionManager) HealthReport() map[string]error { + hr := map[string]error{m.Name(): m.Healthy()} + for _, d := range m.dons { + for _, n := range d.nodes { + maps.Copy(hr, n.conn.HealthReport()) + } + } + return hr +} + +func (m *connectionManager) Name() string { return m.lggr.Name() } + type donConnectionManager struct { donConfig *config.DONConfig nodes map[string]*nodeState @@ -93,7 +106,7 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock utils.Clock, lgg if ok { return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId) } - nodes[nodeAddress] = &nodeState{conn: network.NewWSConnectionWrapper()} + nodes[nodeAddress] = &nodeState{conn: network.NewWSConnectionWrapper(lggr)} if nodes[nodeAddress].conn == nil { return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress) } @@ -128,7 +141,7 @@ func (m *connectionManager) Start(ctx context.Context) error { for _, donConnMgr := range m.dons { donConnMgr.closeWait.Add(len(donConnMgr.nodes)) for nodeAddress, nodeState := range donConnMgr.nodes { - if err := nodeState.conn.Start(); err != nil { + if err := nodeState.conn.Start(ctx); err != nil { return err } go donConnMgr.readLoop(nodeAddress, nodeState) diff --git a/core/services/gateway/connector/connector.go b/core/services/gateway/connector/connector.go index ca0a45e98c8..be544a6d94b 100644 --- a/core/services/gateway/connector/connector.go +++ b/core/services/gateway/connector/connector.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "github.com/gorilla/websocket" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -59,6 +61,16 @@ type gatewayConnector struct { lggr logger.Logger } +func (c *gatewayConnector) HealthReport() map[string]error { + m := map[string]error{c.Name(): c.Healthy()} + for _, g := range c.gateways { + maps.Copy(m, g.conn.HealthReport()) + } + return m +} + +func (c *gatewayConnector) Name() string { return c.lggr.Name() } + type gatewayState struct { conn network.WSConnectionWrapper config ConnectorGatewayConfig @@ -102,7 +114,7 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, handler Gateway return nil, err } gateway := &gatewayState{ - conn: network.NewWSConnectionWrapper(), + conn: network.NewWSConnectionWrapper(lggr), config: gw, url: parsedURL, wsClient: network.NewWebSocketClient(config.WsClientConfig, connector, lggr), @@ -189,7 +201,7 @@ func (c *gatewayConnector) Start(ctx context.Context) error { } for _, gatewayState := range c.gateways { gatewayState := gatewayState - if err := gatewayState.conn.Start(); err != nil { + if err := gatewayState.conn.Start(ctx); err != nil { return err } c.closeWait.Add(2) diff --git a/core/services/gateway/network/wsconnection.go b/core/services/gateway/network/wsconnection.go index 27835c363bc..b2aaf445d20 100644 --- a/core/services/gateway/network/wsconnection.go +++ b/core/services/gateway/network/wsconnection.go @@ -5,8 +5,11 @@ import ( "errors" "sync/atomic" + "github.com/smartcontractkit/chainlink/v2/core/services" + "github.com/gorilla/websocket" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -26,7 +29,8 @@ import ( // The concept of "pumps" is borrowed from https://github.com/smartcontractkit/wsrpc // All methods are thread-safe. type WSConnectionWrapper interface { - job.Service + job.ServiceCtx + services.Checkable // Update underlying connection object. Return a channel that gets an error on connection close. // Cannot be called after Close(). @@ -39,6 +43,7 @@ type WSConnectionWrapper interface { type wsConnectionWrapper struct { utils.StartStopOnce + lggr logger.Logger conn atomic.Pointer[websocket.Conn] @@ -47,6 +52,12 @@ type wsConnectionWrapper struct { shutdownCh chan struct{} } +func (c *wsConnectionWrapper) HealthReport() map[string]error { + return map[string]error{c.Name(): c.Healthy()} +} + +func (c *wsConnectionWrapper) Name() string { return c.lggr.Name() } + type ReadItem struct { MsgType int Data []byte @@ -65,8 +76,9 @@ var ( ErrWrapperShutdown = errors.New("wrapper shutting down") ) -func NewWSConnectionWrapper() WSConnectionWrapper { +func NewWSConnectionWrapper(lggr logger.Logger) WSConnectionWrapper { cw := &wsConnectionWrapper{ + lggr: lggr.Named("WSConnectionWrapper"), writeCh: make(chan writeItem), readCh: make(chan ReadItem), shutdownCh: make(chan struct{}), @@ -74,7 +86,7 @@ func NewWSConnectionWrapper() WSConnectionWrapper { return cw } -func (c *wsConnectionWrapper) Start() error { +func (c *wsConnectionWrapper) Start(_ context.Context) error { return c.StartOnce("WSConnectionWrapper", func() error { // write pump runs until Shutdown() is called go c.writePump() diff --git a/core/services/gateway/network/wsconnection_test.go b/core/services/gateway/network/wsconnection_test.go index 23f7a856cd4..5fd8aa50e33 100644 --- a/core/services/gateway/network/wsconnection_test.go +++ b/core/services/gateway/network/wsconnection_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" ) @@ -29,16 +30,18 @@ func (ssl *serverSideLogic) wsHandler(w http.ResponseWriter, r *http.Request) { } func TestWSConnectionWrapper_ClientReconnect(t *testing.T) { + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) // server - ssl := &serverSideLogic{connWrapper: network.NewWSConnectionWrapper()} - require.NoError(t, ssl.connWrapper.Start()) + ssl := &serverSideLogic{connWrapper: network.NewWSConnectionWrapper(lggr)} + require.NoError(t, ssl.connWrapper.Start(ctx)) s := httptest.NewServer(http.HandlerFunc(ssl.wsHandler)) serverURL := "ws" + strings.TrimPrefix(s.URL, "http") defer s.Close() // client - clientConnWrapper := network.NewWSConnectionWrapper() - require.NoError(t, clientConnWrapper.Start()) + clientConnWrapper := network.NewWSConnectionWrapper(lggr) + require.NoError(t, clientConnWrapper.Start(ctx)) // connect, write a message, disconnect conn, _, err := websocket.DefaultDialer.Dial(serverURL, nil) diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go index d5c716ab5fd..d766faa092a 100644 --- a/core/services/job/spawner.go +++ b/core/services/job/spawner.go @@ -42,6 +42,7 @@ type ( spawner struct { orm ORM config Config + checker services.Checker jobTypeDelegates map[Type]Delegate activeJobs map[int32]activeJob activeJobsMu sync.RWMutex @@ -82,11 +83,12 @@ type ( var _ Spawner = (*spawner)(nil) -func NewSpawner(orm ORM, config Config, jobTypeDelegates map[Type]Delegate, db *sqlx.DB, lggr logger.Logger, lbDependentAwaiters []utils.DependentAwaiter) *spawner { +func NewSpawner(orm ORM, config Config, checker services.Checker, jobTypeDelegates map[Type]Delegate, db *sqlx.DB, lggr logger.Logger, lbDependentAwaiters []utils.DependentAwaiter) *spawner { namedLogger := lggr.Named("JobSpawner") s := &spawner{ orm: orm, config: config, + checker: checker, jobTypeDelegates: jobTypeDelegates, q: pg.NewQ(db, namedLogger, config), lggr: namedLogger, @@ -155,7 +157,8 @@ func (js *spawner) stopAllServices() { // stopService removes the job from memory and stop the services. // It will always delete the job from memory even if closing the services fail. func (js *spawner) stopService(jobID int32) { - js.lggr.Debugw("Stopping services for job", "jobID", jobID) + lggr := js.lggr.With("jobID", jobID) + lggr.Debug("Stopping services for job") js.activeJobsMu.Lock() defer js.activeJobsMu.Unlock() @@ -163,26 +166,32 @@ func (js *spawner) stopService(jobID int32) { for i := len(aj.services) - 1; i >= 0; i-- { service := aj.services[i] - err := service.Close() - if err != nil { - js.lggr.Criticalw("Error stopping job service", "jobID", jobID, "err", err, "subservice", i, "serviceType", reflect.TypeOf(service)) + sLggr := lggr.With("subservice", i, "serviceType", reflect.TypeOf(service)) + if c, ok := service.(services.Checkable); ok { + if err := js.checker.Unregister(c.Name()); err != nil { + sLggr.Warnw("Failed to unregister service from health checker", "err", err) + } + } + if err := service.Close(); err != nil { + sLggr.Criticalw("Error stopping job service", "err", err) js.SvcErrBuffer.Append(pkgerrors.Wrap(err, "error stopping job service")) } else { - js.lggr.Debugw("Stopped job service", "jobID", jobID, "subservice", i, "serviceType", fmt.Sprintf("%T", service)) + sLggr.Debug("Stopped job service") } } - js.lggr.Debugw("Stopped all services for job", "jobID", jobID) + lggr.Debug("Stopped all services for job") delete(js.activeJobs, jobID) } func (js *spawner) StartService(ctx context.Context, jb Job, qopts ...pg.QOpt) error { + lggr := js.lggr.With("jobID", jb.ID) js.activeJobsMu.Lock() defer js.activeJobsMu.Unlock() delegate, exists := js.jobTypeDelegates[jb.Type] if !exists { - js.lggr.Errorw("Job type has not been registered with job.Spawner", "type", jb.Type, "jobID", jb.ID) + lggr.Errorw("Job type has not been registered with job.Spawner", "type", jb.Type) return pkgerrors.Errorf("unregistered type %q for job: %d", jb.Type, jb.ID) } // We always add the active job in the activeJob map, even in the case @@ -201,7 +210,7 @@ func (js *spawner) StartService(ctx context.Context, jb Job, qopts ...pg.QOpt) e srvs, err := delegate.ServicesForSpec(jb, qopts...) if err != nil { - js.lggr.Errorw("Error creating services for job", "jobID", jb.ID, "err", err) + lggr.Errorw("Error creating services for job", "err", err) cctx, cancel := js.chStop.NewCtx() defer cancel() js.orm.TryRecordError(jb.ID, err.Error(), pg.WithParentCtx(cctx)) @@ -209,18 +218,25 @@ func (js *spawner) StartService(ctx context.Context, jb Job, qopts ...pg.QOpt) e return pkgerrors.Wrapf(err, "failed to create services for job: %d", jb.ID) } - js.lggr.Debugw("JobSpawner: Starting services for job", "jobID", jb.ID, "count", len(srvs)) + lggr.Debugw("JobSpawner: Starting services for job", "count", len(srvs)) var ms services.MultiStart for _, srv := range srvs { err = ms.Start(ctx, srv) if err != nil { - js.lggr.Critical("Error starting service for job", "jobID", jb.ID, "err", err) + lggr.Criticalw("Error starting service for job", "err", err) return err } + if c, ok := srv.(services.Checkable); ok { + err = js.checker.Register(c) + if err != nil { + lggr.Errorw("Error registering service with health checker", "err", err) + return err + } + } aj.services = append(aj.services, srv) } - js.lggr.Debugw("JobSpawner: Finished starting services for job", "jobID", jb.ID, "count", len(srvs)) + lggr.Debugw("JobSpawner: Finished starting services for job", "count", len(srvs)) js.activeJobs[jb.ID] = aj return nil } diff --git a/core/services/job/spawner_test.go b/core/services/job/spawner_test.go index efa252e4c4d..631f7ebfee9 100644 --- a/core/services/job/spawner_test.go +++ b/core/services/job/spawner_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink/v2/core/services" + "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -100,7 +102,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { orm := NewTestORM(t, db, legacyChains, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database()) a := utils.NewDependentAwaiter() a.AddDependents(1) - spawner := job.NewSpawner(orm, config.Database(), map[job.Type]job.Delegate{}, db, lggr, []utils.DependentAwaiter{a}) + spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{}, db, lggr, []utils.DependentAwaiter{a}) // Starting the spawner should signal to the dependents result := make(chan bool) go func() { @@ -139,7 +141,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { dB := ocr.NewDelegate(nil, orm, nil, nil, nil, monitoringEndpoint, legacyChains, logger.TestLogger(t), config.Database(), mailMon) delegateB := &delegate{jobB.Type, []job.ServiceCtx{serviceB1, serviceB2}, 0, make(chan struct{}), dB} - spawner := job.NewSpawner(orm, config.Database(), map[job.Type]job.Delegate{ + spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ jobA.Type: delegateA, jobB.Type: delegateB, }, db, lggr, nil) @@ -189,7 +191,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { mailMon := srvctest.Start(t, utils.NewMailboxMonitor(t.Name())) d := ocr.NewDelegate(nil, orm, nil, nil, nil, monitoringEndpoint, legacyChains, logger.TestLogger(t), config.Database(), mailMon) delegateA := &delegate{jobA.Type, []job.ServiceCtx{serviceA1, serviceA2}, 0, nil, d} - spawner := job.NewSpawner(orm, config.Database(), map[job.Type]job.Delegate{ + spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ jobA.Type: delegateA, }, db, lggr, nil) @@ -223,7 +225,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { mailMon := srvctest.Start(t, utils.NewMailboxMonitor(t.Name())) d := ocr.NewDelegate(nil, orm, nil, nil, nil, monitoringEndpoint, legacyChains, logger.TestLogger(t), config.Database(), mailMon) delegateA := &delegate{jobA.Type, []job.ServiceCtx{serviceA1, serviceA2}, 0, nil, d} - spawner := job.NewSpawner(orm, config.Database(), map[job.Type]job.Delegate{ + spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ jobA.Type: delegateA, }, db, lggr, nil) @@ -300,7 +302,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, nil) delegateOCR2 := &delegate{jobOCR2VRF.Type, []job.ServiceCtx{}, 0, nil, d} - spawner := job.NewSpawner(orm, config.Database(), map[job.Type]job.Delegate{ + spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ jobOCR2VRF.Type: delegateOCR2, }, db, lggr, nil) @@ -322,3 +324,17 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { spawner.Close() }) } + +type noopChecker struct{} + +func (n noopChecker) Register(service services.Checkable) error { return nil } + +func (n noopChecker) Unregister(name string) error { return nil } + +func (n noopChecker) IsReady() (ready bool, errors map[string]error) { return true, nil } + +func (n noopChecker) IsHealthy() (healthy bool, errors map[string]error) { return true, nil } + +func (n noopChecker) Start() error { return nil } + +func (n noopChecker) Close() error { return nil } diff --git a/core/services/ocr/contract_tracker.go b/core/services/ocr/contract_tracker.go index 99057937db6..0c7e288bd43 100644 --- a/core/services/ocr/contract_tracker.go +++ b/core/services/ocr/contract_tracker.go @@ -94,6 +94,12 @@ type ( } ) +func (t *OCRContractTracker) HealthReport() map[string]error { + return map[string]error{t.Name(): t.Healthy()} +} + +func (t *OCRContractTracker) Name() string { return t.logger.Name() } + // NewOCRContractTracker makes a new OCRContractTracker func NewOCRContractTracker( contract *offchain_aggregator_wrapper.OffchainAggregator, diff --git a/core/services/ocrcommon/run_saver.go b/core/services/ocrcommon/run_saver.go index bc7d67c2f9d..7a7ea0c9d0a 100644 --- a/core/services/ocrcommon/run_saver.go +++ b/core/services/ocrcommon/run_saver.go @@ -18,6 +18,12 @@ type RunResultSaver struct { logger logger.Logger } +func (r *RunResultSaver) HealthReport() map[string]error { + return map[string]error{r.Name(): r.Healthy()} +} + +func (r *RunResultSaver) Name() string { return r.logger.Name() } + func NewResultRunSaver(runResults <-chan pipeline.Run, pipelineRunner pipeline.Runner, done chan struct{}, logger logger.Logger, maxSuccessfulRuns uint64, ) *RunResultSaver { @@ -26,7 +32,7 @@ func NewResultRunSaver(runResults <-chan pipeline.Run, pipelineRunner pipeline.R runResults: runResults, pipelineRunner: pipelineRunner, done: done, - logger: logger, + logger: logger.Named("RunResultSaver"), } } diff --git a/core/services/vrf/v2/listener_v2.go b/core/services/vrf/v2/listener_v2.go index 13214d7f2c7..047a1e6e29a 100644 --- a/core/services/vrf/v2/listener_v2.go +++ b/core/services/vrf/v2/listener_v2.go @@ -241,6 +241,12 @@ type listenerV2 struct { deduper *vrfcommon.LogDeduper } +func (lsn *listenerV2) HealthReport() map[string]error { + return map[string]error{lsn.Name(): lsn.Healthy()} +} + +func (lsn *listenerV2) Name() string { return lsn.l.Name() } + // Start starts listenerV2. func (lsn *listenerV2) Start(ctx context.Context) error { return lsn.StartOnce("VRFListenerV2", func() error { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 65701c9de60..b95cc614fb4 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `chainlink txs evm create` returns a transaction hash for the attempted transaction in the CLI. Previously only the sender, recipient and `unstarted` state were returned. - Fixed a bug where `evmChainId` is requested instead of `id` or `evm-chain-id` in CLI error verbatim - Fixed a bug that would cause the node to shut down while performing backup +- Fixed health checker to include more services in the prometheus `health` metric and HTTP `/health` endpoint ## 2.5.0 - UNRELEASED =======