Skip to content

Commit

Permalink
BCF-2657: core/services: track spawned job services via the health ch…
Browse files Browse the repository at this point in the history
…ecker (#10695)

* core/services: track spawned job services via the health checker

* docs: changelog note
  • Loading branch information
jmank88 authored Sep 20, 2023
1 parent b378676 commit 8139563
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 31 deletions.
5 changes: 3 additions & 2 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
globalLogger.Debug("Off-chain reporting v2 disabled")
}

healthChecker := services.NewChecker()

var lbs []utils.DependentAwaiter
for _, c := range legacyEVMChains.Slice() {
lbs = append(lbs, c.LogBroadcaster())
}
jobSpawner := job.NewSpawner(jobORM, cfg.Database(), delegates, db, globalLogger, lbs)
jobSpawner := job.NewSpawner(jobORM, cfg.Database(), healthChecker, delegates, db, globalLogger, lbs)
srvcs = append(srvcs, jobSpawner, pipelineRunner)

// We start the log poller after the job spawner
Expand Down Expand Up @@ -445,7 +447,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
}
}

healthChecker := services.NewChecker()
for _, s := range srvcs {
if err := healthChecker.Register(s); err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions core/services/directrequest/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ type listener struct {
utils.StartStopOnce
}

func (l *listener) HealthReport() map[string]error {
return map[string]error{l.Name(): l.Healthy()}
}

func (l *listener) Name() string { return l.logger.Name() }

// Start complies with job.Service
func (l *listener) Start(context.Context) error {
return l.StartOnce("DirectRequestListener", func() error {
Expand Down
6 changes: 6 additions & 0 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ type FunctionsListener struct {
logPollerWrapper evmrelayTypes.LogPollerWrapper
}

func (l *FunctionsListener) HealthReport() map[string]error {
return map[string]error{l.Name(): l.Healthy()}
}

func (l *FunctionsListener) Name() string { return l.logger.Name() }

func formatRequestId(requestId [32]byte) string {
return fmt.Sprintf("0x%x", requestId)
}
Expand Down
17 changes: 15 additions & 2 deletions core/services/gateway/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
Expand Down Expand Up @@ -52,6 +53,18 @@ type connectionManager struct {
lggr logger.Logger
}

func (m *connectionManager) HealthReport() map[string]error {
hr := map[string]error{m.Name(): m.Healthy()}
for _, d := range m.dons {
for _, n := range d.nodes {
maps.Copy(hr, n.conn.HealthReport())
}
}
return hr
}

func (m *connectionManager) Name() string { return m.lggr.Name() }

type donConnectionManager struct {
donConfig *config.DONConfig
nodes map[string]*nodeState
Expand Down Expand Up @@ -93,7 +106,7 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock utils.Clock, lgg
if ok {
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId)
}
nodes[nodeAddress] = &nodeState{conn: network.NewWSConnectionWrapper()}
nodes[nodeAddress] = &nodeState{conn: network.NewWSConnectionWrapper(lggr)}
if nodes[nodeAddress].conn == nil {
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
}
Expand Down Expand Up @@ -128,7 +141,7 @@ func (m *connectionManager) Start(ctx context.Context) error {
for _, donConnMgr := range m.dons {
donConnMgr.closeWait.Add(len(donConnMgr.nodes))
for nodeAddress, nodeState := range donConnMgr.nodes {
if err := nodeState.conn.Start(); err != nil {
if err := nodeState.conn.Start(ctx); err != nil {
return err
}
go donConnMgr.readLoop(nodeAddress, nodeState)
Expand Down
16 changes: 14 additions & 2 deletions core/services/gateway/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"

"github.com/gorilla/websocket"

"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -59,6 +61,16 @@ type gatewayConnector struct {
lggr logger.Logger
}

func (c *gatewayConnector) HealthReport() map[string]error {
m := map[string]error{c.Name(): c.Healthy()}
for _, g := range c.gateways {
maps.Copy(m, g.conn.HealthReport())
}
return m
}

func (c *gatewayConnector) Name() string { return c.lggr.Name() }

type gatewayState struct {
conn network.WSConnectionWrapper
config ConnectorGatewayConfig
Expand Down Expand Up @@ -102,7 +114,7 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, handler Gateway
return nil, err
}
gateway := &gatewayState{
conn: network.NewWSConnectionWrapper(),
conn: network.NewWSConnectionWrapper(lggr),
config: gw,
url: parsedURL,
wsClient: network.NewWebSocketClient(config.WsClientConfig, connector, lggr),
Expand Down Expand Up @@ -189,7 +201,7 @@ func (c *gatewayConnector) Start(ctx context.Context) error {
}
for _, gatewayState := range c.gateways {
gatewayState := gatewayState
if err := gatewayState.conn.Start(); err != nil {
if err := gatewayState.conn.Start(ctx); err != nil {
return err
}
c.closeWait.Add(2)
Expand Down
18 changes: 15 additions & 3 deletions core/services/gateway/network/wsconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"errors"
"sync/atomic"

"github.com/smartcontractkit/chainlink/v2/core/services"

"github.com/gorilla/websocket"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand All @@ -26,7 +29,8 @@ import (
// The concept of "pumps" is borrowed from https://github.com/smartcontractkit/wsrpc
// All methods are thread-safe.
type WSConnectionWrapper interface {
job.Service
job.ServiceCtx
services.Checkable

// Update underlying connection object. Return a channel that gets an error on connection close.
// Cannot be called after Close().
Expand All @@ -39,6 +43,7 @@ type WSConnectionWrapper interface {

type wsConnectionWrapper struct {
utils.StartStopOnce
lggr logger.Logger

conn atomic.Pointer[websocket.Conn]

Expand All @@ -47,6 +52,12 @@ type wsConnectionWrapper struct {
shutdownCh chan struct{}
}

func (c *wsConnectionWrapper) HealthReport() map[string]error {
return map[string]error{c.Name(): c.Healthy()}
}

func (c *wsConnectionWrapper) Name() string { return c.lggr.Name() }

type ReadItem struct {
MsgType int
Data []byte
Expand All @@ -65,16 +76,17 @@ var (
ErrWrapperShutdown = errors.New("wrapper shutting down")
)

func NewWSConnectionWrapper() WSConnectionWrapper {
func NewWSConnectionWrapper(lggr logger.Logger) WSConnectionWrapper {
cw := &wsConnectionWrapper{
lggr: lggr.Named("WSConnectionWrapper"),
writeCh: make(chan writeItem),
readCh: make(chan ReadItem),
shutdownCh: make(chan struct{}),
}
return cw
}

func (c *wsConnectionWrapper) Start() error {
func (c *wsConnectionWrapper) Start(_ context.Context) error {
return c.StartOnce("WSConnectionWrapper", func() error {
// write pump runs until Shutdown() is called
go c.writePump()
Expand Down
11 changes: 7 additions & 4 deletions core/services/gateway/network/wsconnection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
)

Expand All @@ -29,16 +30,18 @@ func (ssl *serverSideLogic) wsHandler(w http.ResponseWriter, r *http.Request) {
}

func TestWSConnectionWrapper_ClientReconnect(t *testing.T) {
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
// server
ssl := &serverSideLogic{connWrapper: network.NewWSConnectionWrapper()}
require.NoError(t, ssl.connWrapper.Start())
ssl := &serverSideLogic{connWrapper: network.NewWSConnectionWrapper(lggr)}
require.NoError(t, ssl.connWrapper.Start(ctx))
s := httptest.NewServer(http.HandlerFunc(ssl.wsHandler))
serverURL := "ws" + strings.TrimPrefix(s.URL, "http")
defer s.Close()

// client
clientConnWrapper := network.NewWSConnectionWrapper()
require.NoError(t, clientConnWrapper.Start())
clientConnWrapper := network.NewWSConnectionWrapper(lggr)
require.NoError(t, clientConnWrapper.Start(ctx))

// connect, write a message, disconnect
conn, _, err := websocket.DefaultDialer.Dial(serverURL, nil)
Expand Down
40 changes: 28 additions & 12 deletions core/services/job/spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type (
spawner struct {
orm ORM
config Config
checker services.Checker
jobTypeDelegates map[Type]Delegate
activeJobs map[int32]activeJob
activeJobsMu sync.RWMutex
Expand Down Expand Up @@ -82,11 +83,12 @@ type (

var _ Spawner = (*spawner)(nil)

func NewSpawner(orm ORM, config Config, jobTypeDelegates map[Type]Delegate, db *sqlx.DB, lggr logger.Logger, lbDependentAwaiters []utils.DependentAwaiter) *spawner {
func NewSpawner(orm ORM, config Config, checker services.Checker, jobTypeDelegates map[Type]Delegate, db *sqlx.DB, lggr logger.Logger, lbDependentAwaiters []utils.DependentAwaiter) *spawner {
namedLogger := lggr.Named("JobSpawner")
s := &spawner{
orm: orm,
config: config,
checker: checker,
jobTypeDelegates: jobTypeDelegates,
q: pg.NewQ(db, namedLogger, config),
lggr: namedLogger,
Expand Down Expand Up @@ -155,34 +157,41 @@ func (js *spawner) stopAllServices() {
// stopService removes the job from memory and stop the services.
// It will always delete the job from memory even if closing the services fail.
func (js *spawner) stopService(jobID int32) {
js.lggr.Debugw("Stopping services for job", "jobID", jobID)
lggr := js.lggr.With("jobID", jobID)
lggr.Debug("Stopping services for job")
js.activeJobsMu.Lock()
defer js.activeJobsMu.Unlock()

aj := js.activeJobs[jobID]

for i := len(aj.services) - 1; i >= 0; i-- {
service := aj.services[i]
err := service.Close()
if err != nil {
js.lggr.Criticalw("Error stopping job service", "jobID", jobID, "err", err, "subservice", i, "serviceType", reflect.TypeOf(service))
sLggr := lggr.With("subservice", i, "serviceType", reflect.TypeOf(service))
if c, ok := service.(services.Checkable); ok {
if err := js.checker.Unregister(c.Name()); err != nil {
sLggr.Warnw("Failed to unregister service from health checker", "err", err)
}
}
if err := service.Close(); err != nil {
sLggr.Criticalw("Error stopping job service", "err", err)
js.SvcErrBuffer.Append(pkgerrors.Wrap(err, "error stopping job service"))
} else {
js.lggr.Debugw("Stopped job service", "jobID", jobID, "subservice", i, "serviceType", fmt.Sprintf("%T", service))
sLggr.Debug("Stopped job service")
}
}
js.lggr.Debugw("Stopped all services for job", "jobID", jobID)
lggr.Debug("Stopped all services for job")

delete(js.activeJobs, jobID)
}

func (js *spawner) StartService(ctx context.Context, jb Job, qopts ...pg.QOpt) error {
lggr := js.lggr.With("jobID", jb.ID)
js.activeJobsMu.Lock()
defer js.activeJobsMu.Unlock()

delegate, exists := js.jobTypeDelegates[jb.Type]
if !exists {
js.lggr.Errorw("Job type has not been registered with job.Spawner", "type", jb.Type, "jobID", jb.ID)
lggr.Errorw("Job type has not been registered with job.Spawner", "type", jb.Type)
return pkgerrors.Errorf("unregistered type %q for job: %d", jb.Type, jb.ID)
}
// We always add the active job in the activeJob map, even in the case
Expand All @@ -201,26 +210,33 @@ func (js *spawner) StartService(ctx context.Context, jb Job, qopts ...pg.QOpt) e

srvs, err := delegate.ServicesForSpec(jb, qopts...)
if err != nil {
js.lggr.Errorw("Error creating services for job", "jobID", jb.ID, "err", err)
lggr.Errorw("Error creating services for job", "err", err)
cctx, cancel := js.chStop.NewCtx()
defer cancel()
js.orm.TryRecordError(jb.ID, err.Error(), pg.WithParentCtx(cctx))
js.activeJobs[jb.ID] = aj
return pkgerrors.Wrapf(err, "failed to create services for job: %d", jb.ID)
}

js.lggr.Debugw("JobSpawner: Starting services for job", "jobID", jb.ID, "count", len(srvs))
lggr.Debugw("JobSpawner: Starting services for job", "count", len(srvs))

var ms services.MultiStart
for _, srv := range srvs {
err = ms.Start(ctx, srv)
if err != nil {
js.lggr.Critical("Error starting service for job", "jobID", jb.ID, "err", err)
lggr.Criticalw("Error starting service for job", "err", err)
return err
}
if c, ok := srv.(services.Checkable); ok {
err = js.checker.Register(c)
if err != nil {
lggr.Errorw("Error registering service with health checker", "err", err)
return err
}
}
aj.services = append(aj.services, srv)
}
js.lggr.Debugw("JobSpawner: Finished starting services for job", "jobID", jb.ID, "count", len(srvs))
lggr.Debugw("JobSpawner: Finished starting services for job", "count", len(srvs))
js.activeJobs[jb.ID] = aj
return nil
}
Expand Down
Loading

0 comments on commit 8139563

Please sign in to comment.