From 529d2cf8af1de371ba900b5f371496942ed52dc5 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Thu, 4 Jan 2024 17:05:25 +0100 Subject: [PATCH 1/2] [FUN-877] Persist subscriptions fetched from contracts (#11573) * feat: db schema to store data fetched from contracts * feat: build orm layer for subscriptions * feat: implement subscription cache layer * feat: load cached subscriptions concurrently * fix: lint issues * fix: protect NewORM from invalid parameters, NoopORM for gateway script * fix: address race condition on update subscription * chore: make db, cfg and lggr part of the handlerfactory * chore: removing allowlist migration from this pr * fix: update cache on new subscriptions, modify balance to be a text at the db layer * feat: store the router_contract_address * fix: set router contract address and subscription id as composite primary key * chore: make router contract address as part of orm properties * feat: update to db only in case of difference with current state * feat: filter by router_contract_address when GetSubscriptions * chore: have balance fields as bigint, add tests covering deleted subscriptions * fix: GetSubscriptions ASC * chore: improve redability --- core/scripts/gateway/run_gateway.go | 2 +- core/services/chainlink/application.go | 2 + core/services/gateway/delegate.go | 15 +- core/services/gateway/gateway_test.go | 12 +- core/services/gateway/handler_factory.go | 11 +- .../handlers/functions/handler.functions.go | 12 +- .../functions/handler.functions_test.go | 4 +- .../gateway/handlers/functions/mocks/orm.go | 91 +++++++ .../gateway/handlers/functions/orm.go | 132 ++++++++++ .../gateway/handlers/functions/orm_test.go | 246 ++++++++++++++++++ .../handlers/functions/subscriptions.go | 53 +++- .../handlers/functions/subscriptions_test.go | 67 ++++- .../handlers/functions/user_subscriptions.go | 44 +++- .../functions/user_subscriptions_test.go | 88 ++++++- .../gateway_integration_test.go | 2 +- .../services/ocr2/plugins/functions/plugin.go | 6 +- .../0215_functions_subscriptions.sql | 19 ++ 17 files changed, 762 insertions(+), 44 deletions(-) create mode 100644 core/services/gateway/handlers/functions/mocks/orm.go create mode 100644 core/services/gateway/handlers/functions/orm.go create mode 100644 core/services/gateway/handlers/functions/orm_test.go create mode 100644 core/store/migrate/migrations/0215_functions_subscriptions.sql diff --git a/core/scripts/gateway/run_gateway.go b/core/scripts/gateway/run_gateway.go index e30c43bb8af..5dbcd02bf56 100644 --- a/core/scripts/gateway/run_gateway.go +++ b/core/scripts/gateway/run_gateway.go @@ -48,7 +48,7 @@ func main() { lggr, _ := logger.NewLogger() - handlerFactory := gateway.NewHandlerFactory(nil, lggr) + handlerFactory := gateway.NewHandlerFactory(nil, nil, nil, lggr) gw, err := gateway.NewGatewayFromConfig(&cfg, handlerFactory, lggr) if err != nil { fmt.Println("error creating Gateway object:", err) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index ee109db2119..e94f51abdf5 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -344,6 +344,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { job.Gateway: gateway.NewDelegate( legacyEVMChains, keyStore.Eth(), + db, + cfg.Database(), globalLogger), } webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() diff --git a/core/services/gateway/delegate.go b/core/services/gateway/delegate.go index d4180184aca..8a97f68d1ea 100644 --- a/core/services/gateway/delegate.go +++ b/core/services/gateway/delegate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/google/uuid" + "github.com/jmoiron/sqlx" "github.com/pelletier/go-toml" "github.com/pkg/errors" @@ -18,13 +19,21 @@ import ( type Delegate struct { legacyChains legacyevm.LegacyChainContainer ks keystore.Eth + db *sqlx.DB + cfg pg.QConfig lggr logger.Logger } var _ job.Delegate = (*Delegate)(nil) -func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, lggr logger.Logger) *Delegate { - return &Delegate{legacyChains: legacyChains, ks: ks, lggr: lggr} +func NewDelegate(legacyChains legacyevm.LegacyChainContainer, ks keystore.Eth, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) *Delegate { + return &Delegate{ + legacyChains: legacyChains, + ks: ks, + db: db, + cfg: cfg, + lggr: lggr, + } } func (d *Delegate) JobType() job.Type { @@ -47,7 +56,7 @@ func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.ServiceCtx, err if err2 != nil { return nil, errors.Wrap(err2, "unmarshal gateway config") } - handlerFactory := NewHandlerFactory(d.legacyChains, d.lggr) + handlerFactory := NewHandlerFactory(d.legacyChains, d.db, d.cfg, d.lggr) gateway, err := NewGatewayFromConfig(&gatewayConfig, handlerFactory, d.lggr) if err != nil { return nil, err diff --git a/core/services/gateway/gateway_test.go b/core/services/gateway/gateway_test.go index 74d689fffe1..1c31a643c80 100644 --- a/core/services/gateway/gateway_test.go +++ b/core/services/gateway/gateway_test.go @@ -57,7 +57,7 @@ Address = "0x0001020304050607080900010203040506070809" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) } @@ -75,7 +75,7 @@ HandlerName = "dummy" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -89,7 +89,7 @@ HandlerName = "no_such_handler" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -103,7 +103,7 @@ SomeOtherField = "abcd" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -121,7 +121,7 @@ Address = "0xnot_an_address" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -129,7 +129,7 @@ func TestGateway_CleanStartAndClose(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) - gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, lggr), lggr) + gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) } diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index 368bc64c872..8ccae8c7c4b 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -4,11 +4,14 @@ import ( "encoding/json" "fmt" + "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) const ( @@ -18,19 +21,21 @@ const ( type handlerFactory struct { legacyChains legacyevm.LegacyChainContainer + db *sqlx.DB + cfg pg.QConfig lggr logger.Logger } var _ HandlerFactory = (*handlerFactory)(nil) -func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) HandlerFactory { - return &handlerFactory{legacyChains, lggr} +func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) HandlerFactory { + return &handlerFactory{legacyChains, db, cfg, lggr} } func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON) (handlers.Handler, error) { switch handlerType { case FunctionsHandlerType: - return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.lggr) + return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.db, hf.cfg, hf.lggr) case DummyHandlerType: return handlers.NewDummyHandler(donConfig, don, hf.lggr) default: diff --git a/core/services/gateway/handlers/functions/handler.functions.go b/core/services/gateway/handlers/functions/handler.functions.go index 5277f9789d6..2872c72f761 100644 --- a/core/services/gateway/handlers/functions/handler.functions.go +++ b/core/services/gateway/handlers/functions/handler.functions.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/multierr" @@ -22,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" hc "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) var ( @@ -96,7 +98,7 @@ type PendingRequest struct { var _ handlers.Handler = (*functionsHandler)(nil) -func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger) (handlers.Handler, error) { +func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, legacyChains legacyevm.LegacyChainContainer, db *sqlx.DB, qcfg pg.QConfig, lggr logger.Logger) (handlers.Handler, error) { var cfg FunctionsHandlerConfig err := json.Unmarshal(handlerConfig, &cfg) if err != nil { @@ -133,7 +135,13 @@ func NewFunctionsHandlerFromConfig(handlerConfig json.RawMessage, donConfig *con if err2 != nil { return nil, err2 } - subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, lggr) + + orm, err2 := NewORM(db, lggr, qcfg, cfg.OnchainSubscriptions.ContractAddress) + if err2 != nil { + return nil, err2 + } + + subscriptions, err2 = NewOnchainSubscriptions(chain.Client(), *cfg.OnchainSubscriptions, orm, lggr) if err2 != nil { return nil, err2 } diff --git a/core/services/gateway/handlers/functions/handler.functions_test.go b/core/services/gateway/handlers/functions/handler.functions_test.go index 1d6dd109625..7e055815c88 100644 --- a/core/services/gateway/handlers/functions/handler.functions_test.go +++ b/core/services/gateway/handlers/functions/handler.functions_test.go @@ -83,7 +83,7 @@ func sendNodeReponses(t *testing.T, handler handlers.Handler, userRequestMsg api func TestFunctionsHandler_Minimal(t *testing.T) { t.Parallel() - handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t)) + handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t)) require.NoError(t, err) // empty message should always error out @@ -95,7 +95,7 @@ func TestFunctionsHandler_Minimal(t *testing.T) { func TestFunctionsHandler_CleanStartAndClose(t *testing.T) { t.Parallel() - handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, logger.TestLogger(t)) + handler, err := functions.NewFunctionsHandlerFromConfig(json.RawMessage("{}"), &config.DONConfig{}, nil, nil, nil, nil, logger.TestLogger(t)) require.NoError(t, err) servicetest.Run(t, handler) diff --git a/core/services/gateway/handlers/functions/mocks/orm.go b/core/services/gateway/handlers/functions/mocks/orm.go new file mode 100644 index 00000000000..110675c8b55 --- /dev/null +++ b/core/services/gateway/handlers/functions/mocks/orm.go @@ -0,0 +1,91 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + functions "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + mock "github.com/stretchr/testify/mock" + + pg "github.com/smartcontractkit/chainlink/v2/core/services/pg" +) + +// ORM is an autogenerated mock type for the ORM type +type ORM struct { + mock.Mock +} + +// GetSubscriptions provides a mock function with given fields: offset, limit, qopts +func (_m *ORM) GetSubscriptions(offset uint, limit uint, qopts ...pg.QOpt) ([]functions.CachedSubscription, error) { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, offset, limit) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetSubscriptions") + } + + var r0 []functions.CachedSubscription + var r1 error + if rf, ok := ret.Get(0).(func(uint, uint, ...pg.QOpt) ([]functions.CachedSubscription, error)); ok { + return rf(offset, limit, qopts...) + } + if rf, ok := ret.Get(0).(func(uint, uint, ...pg.QOpt) []functions.CachedSubscription); ok { + r0 = rf(offset, limit, qopts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]functions.CachedSubscription) + } + } + + if rf, ok := ret.Get(1).(func(uint, uint, ...pg.QOpt) error); ok { + r1 = rf(offset, limit, qopts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UpsertSubscription provides a mock function with given fields: subscription, qopts +func (_m *ORM) UpsertSubscription(subscription functions.CachedSubscription, qopts ...pg.QOpt) error { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, subscription) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for UpsertSubscription") + } + + var r0 error + if rf, ok := ret.Get(0).(func(functions.CachedSubscription, ...pg.QOpt) error); ok { + r0 = rf(subscription, qopts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewORM(t interface { + mock.TestingT + Cleanup(func()) +}) *ORM { + mock := &ORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/gateway/handlers/functions/orm.go b/core/services/gateway/handlers/functions/orm.go new file mode 100644 index 00000000000..b7ec8d865d1 --- /dev/null +++ b/core/services/gateway/handlers/functions/orm.go @@ -0,0 +1,132 @@ +package functions + +import ( + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/lib/pq" + "github.com/pkg/errors" + + "github.com/jmoiron/sqlx" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" +) + +//go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore + +type ORM interface { + GetSubscriptions(offset, limit uint, qopts ...pg.QOpt) ([]CachedSubscription, error) + UpsertSubscription(subscription CachedSubscription, qopts ...pg.QOpt) error +} + +type orm struct { + q pg.Q + routerContractAddress common.Address +} + +var _ ORM = (*orm)(nil) +var ( + ErrInvalidParameters = errors.New("invalid parameters provided to create a subscription cache ORM") +) + +const ( + tableName = "functions_subscriptions" +) + +type cachedSubscriptionRow struct { + SubscriptionID uint64 + Owner common.Address + Balance int64 + BlockedBalance int64 + ProposedOwner common.Address + Consumers pq.ByteaArray + Flags []uint8 + RouterContractAddress common.Address +} + +func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, routerContractAddress common.Address) (ORM, error) { + if db == nil || cfg == nil || lggr == nil || routerContractAddress == (common.Address{}) { + return nil, ErrInvalidParameters + } + + return &orm{ + q: pg.NewQ(db, lggr, cfg), + routerContractAddress: routerContractAddress, + }, nil +} + +func (o *orm) GetSubscriptions(offset, limit uint, qopts ...pg.QOpt) ([]CachedSubscription, error) { + var cacheSubscriptions []CachedSubscription + var cacheSubscriptionRows []cachedSubscriptionRow + stmt := fmt.Sprintf(` + SELECT subscription_id, owner, balance, blocked_balance, proposed_owner, consumers, flags, router_contract_address + FROM %s + WHERE router_contract_address = $1 + ORDER BY subscription_id ASC + OFFSET $2 + LIMIT $3; + `, tableName) + err := o.q.WithOpts(qopts...).Select(&cacheSubscriptionRows, stmt, o.routerContractAddress, offset, limit) + if err != nil { + return cacheSubscriptions, err + } + + for _, cs := range cacheSubscriptionRows { + cacheSubscriptions = append(cacheSubscriptions, cs.encode()) + } + + return cacheSubscriptions, nil +} + +// UpsertSubscription will update if a subscription exists or create if it does not. +// In case a subscription gets deleted we will update it with an owner address equal to 0x0. +func (o *orm) UpsertSubscription(subscription CachedSubscription, qopts ...pg.QOpt) error { + stmt := fmt.Sprintf(` + INSERT INTO %s (subscription_id, owner, balance, blocked_balance, proposed_owner, consumers, flags, router_contract_address) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (subscription_id, router_contract_address) DO UPDATE + SET owner=$2, balance=$3, blocked_balance=$4, proposed_owner=$5, consumers=$6, flags=$7, router_contract_address=$8;`, tableName) + + if subscription.Balance == nil { + subscription.Balance = big.NewInt(0) + } + + if subscription.BlockedBalance == nil { + subscription.BlockedBalance = big.NewInt(0) + } + + _, err := o.q.WithOpts(qopts...).Exec( + stmt, + subscription.SubscriptionID, + subscription.Owner, + subscription.Balance.Int64(), + subscription.BlockedBalance.Int64(), + subscription.ProposedOwner, + subscription.Consumers, + subscription.Flags[:], + o.routerContractAddress, + ) + + return err +} + +func (cs *cachedSubscriptionRow) encode() CachedSubscription { + consumers := make([]common.Address, 0) + for _, csc := range cs.Consumers { + consumers = append(consumers, common.BytesToAddress(csc)) + } + + return CachedSubscription{ + SubscriptionID: cs.SubscriptionID, + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(cs.Balance), + Owner: cs.Owner, + BlockedBalance: big.NewInt(cs.BlockedBalance), + ProposedOwner: cs.ProposedOwner, + Consumers: consumers, + Flags: [32]byte(cs.Flags), + }, + } +} diff --git a/core/services/gateway/handlers/functions/orm_test.go b/core/services/gateway/handlers/functions/orm_test.go new file mode 100644 index 00000000000..37ec24ea0b1 --- /dev/null +++ b/core/services/gateway/handlers/functions/orm_test.go @@ -0,0 +1,246 @@ +package functions_test + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" +) + +var ( + defaultFlags = [32]byte{0x1, 0x2, 0x3} +) + +func setupORM(t *testing.T) (functions.ORM, error) { + t.Helper() + + var ( + db = pgtest.NewSqlxDB(t) + lggr = logger.TestLogger(t) + ) + + return functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) +} + +func createSubscriptions(t *testing.T, orm functions.ORM, amount int) []functions.CachedSubscription { + cachedSubscriptions := make([]functions.CachedSubscription, 0) + for i := amount; i > 0; i-- { + cs := functions.CachedSubscription{ + SubscriptionID: uint64(i), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + cachedSubscriptions = append(cachedSubscriptions, cs) + err := orm.UpsertSubscription(cs) + require.NoError(t, err) + } + return cachedSubscriptions +} + +func TestORM_GetSubscriptions(t *testing.T) { + t.Parallel() + t.Run("fetch first page", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + cachedSubscriptions := createSubscriptions(t, orm, 2) + results, err := orm.GetSubscriptions(0, 1) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, cachedSubscriptions[1], results[0]) + }) + + t.Run("fetch second page", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + cachedSubscriptions := createSubscriptions(t, orm, 2) + results, err := orm.GetSubscriptions(1, 5) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, cachedSubscriptions[0], results[0]) + }) +} + +func TestORM_UpsertSubscription(t *testing.T) { + t.Parallel() + + t.Run("create a subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + expected := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expected) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 1) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, expected, results[0]) + }) + + t.Run("update a subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + + expectedUpdated := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expectedUpdated) + require.NoError(t, err) + + expectedNotUpdated := functions.CachedSubscription{ + SubscriptionID: uint64(2), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(expectedNotUpdated) + require.NoError(t, err) + + // update the balance value + expectedUpdated.Balance = big.NewInt(20) + err = orm.UpsertSubscription(expectedUpdated) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 5) + require.NoError(t, err) + require.Equal(t, 2, len(results), "incorrect results length") + require.Equal(t, expectedNotUpdated, results[1]) + require.Equal(t, expectedUpdated, results[0]) + }) + + t.Run("update a deleted subscription", func(t *testing.T) { + orm, err := setupORM(t) + require.NoError(t, err) + + subscription := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(10), + Owner: testutils.NewAddress(), + BlockedBalance: big.NewInt(20), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + err = orm.UpsertSubscription(subscription) + require.NoError(t, err) + + // empty subscription + subscription.IFunctionsSubscriptionsSubscription = functions_router.IFunctionsSubscriptionsSubscription{ + Balance: big.NewInt(0), + Owner: common.Address{}, + BlockedBalance: big.NewInt(0), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: [32]byte{}, + } + + err = orm.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err := orm.GetSubscriptions(0, 5) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + require.Equal(t, subscription, results[0]) + }) + + t.Run("create a subscription with same id but different router address", func(t *testing.T) { + var ( + db = pgtest.NewSqlxDB(t) + lggr = logger.TestLogger(t) + ) + + orm1, err := functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + orm2, err := functions.NewORM(db, lggr, pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + + subscription := functions.CachedSubscription{ + SubscriptionID: uint64(1), + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: assets.Ether(10).ToInt(), + Owner: testutils.NewAddress(), + BlockedBalance: assets.Ether(20).ToInt(), + ProposedOwner: common.Address{}, + Consumers: []common.Address{}, + Flags: defaultFlags, + }, + } + + err = orm1.UpsertSubscription(subscription) + require.NoError(t, err) + + // should update the existing subscription + subscription.Balance = assets.Ether(12).ToInt() + err = orm1.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err := orm1.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + + // should create a new subscription because it comes from a different router contract + err = orm2.UpsertSubscription(subscription) + require.NoError(t, err) + + results, err = orm1.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + + results, err = orm2.GetSubscriptions(0, 10) + require.NoError(t, err) + require.Equal(t, 1, len(results), "incorrect results length") + }) +} + +func Test_NewORM(t *testing.T) { + t.Run("OK-create_ORM", func(t *testing.T) { + _, err := functions.NewORM(pgtest.NewSqlxDB(t), logger.TestLogger(t), pgtest.NewQConfig(true), testutils.NewAddress()) + require.NoError(t, err) + }) + t.Run("NOK-create_ORM_with_nil_fields", func(t *testing.T) { + _, err := functions.NewORM(nil, nil, nil, common.Address{}) + require.Error(t, err) + }) + t.Run("NOK-create_ORM_with_empty_address", func(t *testing.T) { + _, err := functions.NewORM(pgtest.NewSqlxDB(t), logger.TestLogger(t), pgtest.NewQConfig(true), common.Address{}) + require.Error(t, err) + }) +} diff --git a/core/services/gateway/handlers/functions/subscriptions.go b/core/services/gateway/handlers/functions/subscriptions.go index ebffbbdd206..8bc9cf09e7b 100644 --- a/core/services/gateway/handlers/functions/subscriptions.go +++ b/core/services/gateway/handlers/functions/subscriptions.go @@ -19,12 +19,15 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) +const defaultCacheBatchSize = 100 + type OnchainSubscriptionsConfig struct { ContractAddress common.Address `json:"contractAddress"` BlockConfirmations uint `json:"blockConfirmations"` UpdateFrequencySec uint `json:"updateFrequencySec"` UpdateTimeoutSec uint `json:"updateTimeoutSec"` UpdateRangeSize uint `json:"updateRangeSize"` + CacheBatchSize uint `json:"cacheBatchSize"` } // OnchainSubscriptions maintains a mirror of all subscriptions fetched from the blockchain (EVM-only). @@ -43,6 +46,7 @@ type onchainSubscriptions struct { config OnchainSubscriptionsConfig subscriptions UserSubscriptions + orm ORM client evmclient.Client router *functions_router.FunctionsRouter blockConfirmations *big.Int @@ -52,7 +56,7 @@ type onchainSubscriptions struct { stopCh services.StopChan } -func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, lggr logger.Logger) (OnchainSubscriptions, error) { +func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscriptionsConfig, orm ORM, lggr logger.Logger) (OnchainSubscriptions, error) { if client == nil { return nil, errors.New("client is nil") } @@ -63,9 +67,17 @@ func NewOnchainSubscriptions(client evmclient.Client, config OnchainSubscription if err != nil { return nil, fmt.Errorf("unexpected error during functions_router.NewFunctionsRouter: %s", err) } + + // if CacheBatchSize is not specified use the default value + if config.CacheBatchSize == 0 { + lggr.Info("CacheBatchSize not specified, using default size: ", defaultCacheBatchSize) + config.CacheBatchSize = defaultCacheBatchSize + } + return &onchainSubscriptions{ config: config, subscriptions: NewUserSubscriptions(), + orm: orm, client: client, router: router, blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), @@ -87,6 +99,8 @@ func (s *onchainSubscriptions) Start(ctx context.Context) error { return errors.New("OnchainSubscriptionsConfig.UpdateRangeSize must be greater than 0") } + s.loadCachedSubscriptions() + s.closeWait.Add(1) go s.queryLoop() @@ -190,7 +204,15 @@ func (s *onchainSubscriptions) querySubscriptionsRange(ctx context.Context, bloc for i, subscription := range subscriptions { subscriptionId := start + uint64(i) subscription := subscription - s.subscriptions.UpdateSubscription(subscriptionId, &subscription) + updated := s.subscriptions.UpdateSubscription(subscriptionId, &subscription) + if updated { + if err = s.orm.UpsertSubscription(CachedSubscription{ + SubscriptionID: subscriptionId, + IFunctionsSubscriptionsSubscription: subscription, + }); err != nil { + s.lggr.Errorf("unexpected error updating subscription in the cache: %w", err) + } + } } return nil @@ -203,3 +225,30 @@ func (s *onchainSubscriptions) getSubscriptionsCount(ctx context.Context, blockN Context: ctx, }) } + +func (s *onchainSubscriptions) loadCachedSubscriptions() { + offset := uint(0) + for { + csBatch, err := s.orm.GetSubscriptions(offset, s.config.CacheBatchSize) + if err != nil { + break + } + + for _, cs := range csBatch { + _ = s.subscriptions.UpdateSubscription(cs.SubscriptionID, &functions_router.IFunctionsSubscriptionsSubscription{ + Balance: cs.Balance, + Owner: cs.Owner, + BlockedBalance: cs.BlockedBalance, + ProposedOwner: cs.ProposedOwner, + Consumers: cs.Consumers, + Flags: cs.Flags, + }) + } + s.lggr.Debugw("Loading cached subscriptions", "offset", offset, "batch_length", len(csBatch)) + + if len(csBatch) != int(s.config.CacheBatchSize) { + break + } + offset += s.config.CacheBatchSize + } +} diff --git a/core/services/gateway/handlers/functions/subscriptions_test.go b/core/services/gateway/handlers/functions/subscriptions_test.go index adbf637ad73..782dcc2332d 100644 --- a/core/services/gateway/handlers/functions/subscriptions_test.go +++ b/core/services/gateway/handlers/functions/subscriptions_test.go @@ -15,14 +15,17 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_router" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + fmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions/mocks" ) const ( validUser = "0x9ED925d8206a4f88a2f643b28B3035B315753Cd6" invalidUser = "0x6E2dc0F9DB014aE19888F539E59285D2Ea04244C" + cachedUser = "0x3E2dc0F9DB014aE19888F539E59285D2Ea04233G" ) func TestSubscriptions_OnePass(t *testing.T) { @@ -47,7 +50,10 @@ func TestSubscriptions_OnePass(t *testing.T) { UpdateTimeoutSec: 1, UpdateRangeSize: 3, } - subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(100)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) require.NoError(t, err) err = subscriptions.Start(ctx) @@ -95,7 +101,10 @@ func TestSubscriptions_MultiPass(t *testing.T) { UpdateTimeoutSec: 1, UpdateRangeSize: 3, } - subscriptions, err := functions.NewOnchainSubscriptions(client, config, logger.TestLogger(t)) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(100)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) require.NoError(t, err) err = subscriptions.Start(ctx) @@ -108,3 +117,57 @@ func TestSubscriptions_MultiPass(t *testing.T) { return currentCycle.Load() == ncycles }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) } + +func TestSubscriptions_Cached(t *testing.T) { + getSubscriptionCount := hexutil.MustDecode("0x0000000000000000000000000000000000000000000000000000000000000003") + getSubscriptionsInRange := hexutil.MustDecode("0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000003000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000240000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000000109e6e1b12098cc8f3a1e9719a817ec53ab9b35c000000000000000000000000000000000000000000000000000034e23f515cb0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000f5340f0968ee8b7dfd97e3327a6139273cc2c4fa000000000000000000000000000000000000000000000001158e460913d000000000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001bc14b92364c75e20000000000000000000000009ed925d8206a4f88a2f643b28b3035b315753cd60000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000005439e5881a529f3ccbffc0e82d49f9db3950aefe") + + ctx := testutils.Context(t) + client := mocks.NewClient(t) + client.On("LatestBlockHeight", mock.Anything).Return(big.NewInt(42), nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getSubscriptionCount + To: &common.Address{}, + Data: hexutil.MustDecode("0x66419970"), + }, mock.Anything).Return(getSubscriptionCount, nil) + client.On("CallContract", mock.Anything, ethereum.CallMsg{ // GetSubscriptionsInRange + To: &common.Address{}, + Data: hexutil.MustDecode("0xec2454e500000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000003"), + }, mock.Anything).Return(getSubscriptionsInRange, nil) + config := functions.OnchainSubscriptionsConfig{ + ContractAddress: common.Address{}, + BlockConfirmations: 1, + UpdateFrequencySec: 1, + UpdateTimeoutSec: 1, + UpdateRangeSize: 3, + CacheBatchSize: 1, + } + + expectedBalance := big.NewInt(5) + orm := fmocks.NewORM(t) + orm.On("GetSubscriptions", uint(0), uint(1)).Return([]functions.CachedSubscription{ + { + SubscriptionID: 1, + IFunctionsSubscriptionsSubscription: functions_router.IFunctionsSubscriptionsSubscription{ + Balance: expectedBalance, + Owner: common.HexToAddress(cachedUser), + BlockedBalance: big.NewInt(10), + }, + }, + }, nil) + orm.On("GetSubscriptions", uint(1), uint(1)).Return([]functions.CachedSubscription{}, nil) + orm.On("UpsertSubscription", mock.Anything).Return(nil) + + subscriptions, err := functions.NewOnchainSubscriptions(client, config, orm, logger.TestLogger(t)) + require.NoError(t, err) + + err = subscriptions.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, subscriptions.Close()) + }) + + gomega.NewGomegaWithT(t).Eventually(func() bool { + actualBalance, err := subscriptions.GetMaxUserBalance(common.HexToAddress(cachedUser)) + return err == nil && assert.Equal(t, expectedBalance, actualBalance) + }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) +} diff --git a/core/services/gateway/handlers/functions/user_subscriptions.go b/core/services/gateway/handlers/functions/user_subscriptions.go index ff3dd753995..7e63720da71 100644 --- a/core/services/gateway/handlers/functions/user_subscriptions.go +++ b/core/services/gateway/handlers/functions/user_subscriptions.go @@ -12,8 +12,10 @@ import ( // Methods are NOT thread-safe. +var ErrUserHasNoSubscription = errors.New("user has no subscriptions") + type UserSubscriptions interface { - UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) + UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) bool GetMaxUserBalance(user common.Address) (*big.Int, error) } @@ -29,29 +31,45 @@ func NewUserSubscriptions() UserSubscriptions { } } -func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) { +// CachedSubscription is used to populate the user subscription maps from a persistent layer like postgres. +type CachedSubscription struct { + SubscriptionID uint64 + functions_router.IFunctionsSubscriptionsSubscription +} + +// UpdateSubscription updates a subscription returning false in case there was no variation to the current state. +func (us *userSubscriptions) UpdateSubscription(subscriptionId uint64, subscription *functions_router.IFunctionsSubscriptionsSubscription) bool { if subscription == nil || subscription.Owner == utils.ZeroAddress { user, ok := us.subscriptionIdsMap[subscriptionId] - if ok { - delete(us.userSubscriptionsMap[user], subscriptionId) - if len(us.userSubscriptionsMap[user]) == 0 { - delete(us.userSubscriptionsMap, user) - } + if !ok { + return false } + + delete(us.userSubscriptionsMap[user], subscriptionId) delete(us.subscriptionIdsMap, subscriptionId) - } else { - us.subscriptionIdsMap[subscriptionId] = subscription.Owner - if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { - us.userSubscriptionsMap[subscription.Owner] = make(map[uint64]*functions_router.IFunctionsSubscriptionsSubscription) + if len(us.userSubscriptionsMap[user]) == 0 { + delete(us.userSubscriptionsMap, user) } - us.userSubscriptionsMap[subscription.Owner][subscriptionId] = subscription + return true + } + + // there is no change to the subscription + if us.userSubscriptionsMap[subscription.Owner][subscriptionId] == subscription { + return false + } + + us.subscriptionIdsMap[subscriptionId] = subscription.Owner + if _, ok := us.userSubscriptionsMap[subscription.Owner]; !ok { + us.userSubscriptionsMap[subscription.Owner] = make(map[uint64]*functions_router.IFunctionsSubscriptionsSubscription) } + us.userSubscriptionsMap[subscription.Owner][subscriptionId] = subscription + return true } func (us *userSubscriptions) GetMaxUserBalance(user common.Address) (*big.Int, error) { subs, exists := us.userSubscriptionsMap[user] if !exists { - return nil, errors.New("user has no subscriptions") + return nil, ErrUserHasNoSubscription } maxBalance := big.NewInt(0) diff --git a/core/services/gateway/handlers/functions/user_subscriptions_test.go b/core/services/gateway/handlers/functions/user_subscriptions_test.go index e86399eb609..53827e07e1b 100644 --- a/core/services/gateway/handlers/functions/user_subscriptions_test.go +++ b/core/services/gateway/handlers/functions/user_subscriptions_test.go @@ -28,18 +28,23 @@ func TestUserSubscriptions(t *testing.T) { user2Balance1 := big.NewInt(50) user2Balance2 := big.NewInt(70) - us.UpdateSubscription(5, &functions_router.IFunctionsSubscriptionsSubscription{ + updated := us.UpdateSubscription(5, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user1, Balance: user1Balance, }) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance1, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance2, }) + assert.True(t, updated) balance, err := us.GetMaxUserBalance(user1) assert.NoError(t, err) @@ -49,29 +54,96 @@ func TestUserSubscriptions(t *testing.T) { assert.NoError(t, err) assert.Zero(t, balance.Cmp(user2Balance2)) }) +} - t.Run("UpdateSubscription to remove subscriptions", func(t *testing.T) { +func TestUserSubscriptions_UpdateSubscription(t *testing.T) { + t.Parallel() + + t.Run("update balance", func(t *testing.T) { + us := functions.NewUserSubscriptions() + owner := utils.RandomAddress() + + updated := us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + }) + assert.True(t, updated) + + updated = us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(100), + }) + assert.True(t, updated) + }) + + t.Run("updated proposed owner", func(t *testing.T) { + us := functions.NewUserSubscriptions() + owner := utils.RandomAddress() + + updated := us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + }) + assert.True(t, updated) + + updated = us.UpdateSubscription(1, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: owner, + Balance: big.NewInt(10), + ProposedOwner: utils.RandomAddress(), + }) + assert.True(t, updated) + }) + t.Run("remove subscriptions", func(t *testing.T) { + us := functions.NewUserSubscriptions() user2 := utils.RandomAddress() user2Balance1 := big.NewInt(50) user2Balance2 := big.NewInt(70) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + updated := us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance1, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: user2, Balance: user2Balance2, }) + assert.True(t, updated) - us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + updated = us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: utils.ZeroAddress, }) - us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ + assert.True(t, updated) + + updated = us.UpdateSubscription(10, &functions_router.IFunctionsSubscriptionsSubscription{ Owner: utils.ZeroAddress, }) + assert.True(t, updated) _, err := us.GetMaxUserBalance(user2) assert.Error(t, err) }) + + t.Run("remove a non existing subscription", func(t *testing.T) { + us := functions.NewUserSubscriptions() + updated := us.UpdateSubscription(3, &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: utils.ZeroAddress, + }) + assert.False(t, updated) + }) + + t.Run("no actual changes", func(t *testing.T) { + us := functions.NewUserSubscriptions() + subscription := &functions_router.IFunctionsSubscriptionsSubscription{ + Owner: utils.RandomAddress(), + Balance: big.NewInt(25), + BlockedBalance: big.NewInt(25), + } + updated := us.UpdateSubscription(5, subscription) + assert.True(t, updated) + + updated = us.UpdateSubscription(5, subscription) + assert.False(t, updated) + }) } diff --git a/core/services/gateway/integration_tests/gateway_integration_test.go b/core/services/gateway/integration_tests/gateway_integration_test.go index 415a8f67cf8..9e4900efeee 100644 --- a/core/services/gateway/integration_tests/gateway_integration_test.go +++ b/core/services/gateway/integration_tests/gateway_integration_test.go @@ -118,7 +118,7 @@ func TestIntegration_Gateway_NoFullNodes_BasicConnectionAndMessage(t *testing.T) // Launch Gateway lggr := logger.TestLogger(t) gatewayConfig := fmt.Sprintf(gatewayConfigTemplate, nodeKeys.Address) - gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, lggr), lggr) + gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) userPort, nodePort := gateway.GetUserPort(), gateway.GetNodePort() diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 7e2b15bdccf..c6cfa946aba 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -143,7 +143,11 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs if err2 != nil { return nil, errors.Wrap(err, "failed to create a RateLimiter") } - subscriptions, err2 := gwFunctions.NewOnchainSubscriptions(conf.Chain.Client(), *pluginConfig.OnchainSubscriptions, conf.Logger) + gwFunctionsORM, err := gwFunctions.NewORM(conf.DB, conf.Logger, conf.QConfig, pluginConfig.OnchainSubscriptions.ContractAddress) + if err != nil { + return nil, errors.Wrap(err, "failed to create functions ORM") + } + subscriptions, err2 := gwFunctions.NewOnchainSubscriptions(conf.Chain.Client(), *pluginConfig.OnchainSubscriptions, gwFunctionsORM, conf.Logger) if err2 != nil { return nil, errors.Wrap(err, "failed to create a OnchainSubscriptions") } diff --git a/core/store/migrate/migrations/0215_functions_subscriptions.sql b/core/store/migrate/migrations/0215_functions_subscriptions.sql new file mode 100644 index 00000000000..c3859d42f63 --- /dev/null +++ b/core/store/migrate/migrations/0215_functions_subscriptions.sql @@ -0,0 +1,19 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE functions_subscriptions( + router_contract_address bytea, + subscription_id bigint, + owner bytea CHECK (octet_length(owner) = 20) NOT NULL, + balance bigint, + blocked_balance bigint, + proposed_owner bytea, + consumers bytea[], + flags bytea, + PRIMARY KEY(router_contract_address, subscription_id) +); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS functions_subscriptions; +-- +goose StatementEnd From 2f10153ff81916a561f3035dee98f2530e6a17a6 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Thu, 4 Jan 2024 11:35:26 -0600 Subject: [PATCH 2/2] add chainlink health command; make DB avialable for testscript client/server tests (#11591) --- core/cmd/app.go | 11 + core/cmd/shell_local.go | 14 +- core/cmd/shell_remote.go | 18 ++ core/internal/cltest/heavyweight/orm.go | 71 ++--- core/web/health_controller.go | 2 +- core/web/health_controller_test.go | 4 +- docs/CHANGELOG.md | 4 + internal/testdb/testdb.go | 56 ++++ main_test.go | 94 ++++++- testdata/scripts/health/default.txtar | 129 +++++++++ testdata/scripts/health/help.txtar | 13 + testdata/scripts/health/multi-chain.txtar | 309 ++++++++++++++++++++++ testdata/scripts/help.txtar | 1 + testdata/scripts/node/db/migrate/db.txtar | 6 + 14 files changed, 658 insertions(+), 74 deletions(-) create mode 100644 internal/testdb/testdb.go create mode 100644 testdata/scripts/health/default.txtar create mode 100644 testdata/scripts/health/help.txtar create mode 100644 testdata/scripts/health/multi-chain.txtar create mode 100644 testdata/scripts/node/db/migrate/db.txtar diff --git a/core/cmd/app.go b/core/cmd/app.go index 17a4da85002..8e61380156c 100644 --- a/core/cmd/app.go +++ b/core/cmd/app.go @@ -162,6 +162,17 @@ func NewApp(s *Shell) *cli.App { Usage: "Commands for the node's configuration", Subcommands: initRemoteConfigSubCmds(s), }, + { + Name: "health", + Usage: "Prints a health report", + Action: s.Health, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "json, j", + Usage: "json output", + }, + }, + }, { Name: "jobs", Usage: "Commands for managing Jobs", diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index d4fb796c3e2..69b7373ed70 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -51,6 +51,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/core/web" webPresenters "github.com/smartcontractkit/chainlink/v2/core/web/presenters" + "github.com/smartcontractkit/chainlink/v2/internal/testdb" ) var ErrProfileTooLong = errors.New("requested profile duration too large") @@ -258,13 +259,6 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command { // ownerPermsMask are the file permission bits reserved for owner. const ownerPermsMask = os.FileMode(0o700) -// PristineDBName is a clean copy of test DB with migrations. -// Used by heavyweight.FullTestDB* functions. -const ( - PristineDBName = "chainlink_test_pristine" - TestDBNamePrefix = "chainlink_test_" -) - // RunNode starts the Chainlink core. func (s *Shell) RunNode(c *cli.Context) error { if err := s.runNode(c); err != nil { @@ -815,7 +809,7 @@ func dropDanglingTestDBs(lggr logger.Logger, db *sqlx.DB) (err error) { }() } for _, dbname := range dbs { - if strings.HasPrefix(dbname, TestDBNamePrefix) && !strings.HasSuffix(dbname, "_pristine") { + if strings.HasPrefix(dbname, testdb.TestDBNamePrefix) && !strings.HasSuffix(dbname, "_pristine") { ch <- dbname } } @@ -1085,11 +1079,11 @@ func dropAndCreateDB(parsed url.URL) (err error) { } func dropAndCreatePristineDB(db *sqlx.DB, template string) (err error) { - _, err = db.Exec(fmt.Sprintf(`DROP DATABASE IF EXISTS "%s"`, PristineDBName)) + _, err = db.Exec(fmt.Sprintf(`DROP DATABASE IF EXISTS "%s"`, testdb.PristineDBName)) if err != nil { return fmt.Errorf("unable to drop postgres database: %v", err) } - _, err = db.Exec(fmt.Sprintf(`CREATE DATABASE "%s" WITH TEMPLATE "%s"`, PristineDBName, template)) + _, err = db.Exec(fmt.Sprintf(`CREATE DATABASE "%s" WITH TEMPLATE "%s"`, testdb.PristineDBName, template)) if err != nil { return fmt.Errorf("unable to create postgres database: %v", err) } diff --git a/core/cmd/shell_remote.go b/core/cmd/shell_remote.go index fa72d21ee6f..bc4620d0732 100644 --- a/core/cmd/shell_remote.go +++ b/core/cmd/shell_remote.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" + "github.com/gin-gonic/gin" "github.com/manyminds/api2go/jsonapi" "github.com/mitchellh/go-homedir" "github.com/pelletier/go-toml" @@ -511,6 +512,23 @@ func (s *Shell) checkRemoteBuildCompatibility(lggr logger.Logger, onlyWarn bool, return nil } +func (s *Shell) Health(c *cli.Context) error { + mime := gin.MIMEPlain + if c.Bool("json") { + mime = gin.MIMEJSON + } + resp, err := s.HTTP.Get("/health", map[string]string{"Accept": mime}) + if err != nil { + return s.errorOut(err) + } + b, err := parseResponse(resp) + if err != nil { + return s.errorOut(err) + } + fmt.Println(string(b)) + return nil +} + // ErrIncompatible is returned when the cli and remote versions are not compatible. type ErrIncompatible struct { CLIVersion, CLISha string diff --git a/core/internal/cltest/heavyweight/orm.go b/core/internal/cltest/heavyweight/orm.go index 5df28a49778..f49a94be05b 100644 --- a/core/internal/cltest/heavyweight/orm.go +++ b/core/internal/cltest/heavyweight/orm.go @@ -1,13 +1,8 @@ -package heavyweight - -// The heavyweight package contains cltest items that are costly and you should +// Package heavyweight contains test helpers that are costly and you should // think **real carefully** before using in your tests. +package heavyweight import ( - "database/sql" - "errors" - "fmt" - "net/url" "os" "path" "runtime" @@ -20,41 +15,45 @@ import ( "github.com/jmoiron/sqlx" - "github.com/smartcontractkit/chainlink/v2/core/cmd" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/store/dialects" "github.com/smartcontractkit/chainlink/v2/core/store/models" + "github.com/smartcontractkit/chainlink/v2/internal/testdb" ) // FullTestDBV2 creates a pristine DB which runs in a separate database than the normal // unit tests, so you can do things like use other Postgres connection types with it. func FullTestDBV2(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - return prepareFullTestDBV2(t, false, true, overrideFn) + return KindFixtures.PrepareDB(t, overrideFn) } // FullTestDBNoFixturesV2 is the same as FullTestDB, but it does not load fixtures. func FullTestDBNoFixturesV2(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - return prepareFullTestDBV2(t, false, false, overrideFn) + return KindTemplate.PrepareDB(t, overrideFn) } // FullTestDBEmptyV2 creates an empty DB (without migrations). func FullTestDBEmptyV2(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - return prepareFullTestDBV2(t, true, false, overrideFn) + return KindEmpty.PrepareDB(t, overrideFn) } func generateName() string { return strings.ReplaceAll(uuid.New().String(), "-", "") } -func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { - testutils.SkipShort(t, "FullTestDB") +type Kind int - if empty && loadFixtures { - t.Fatal("could not load fixtures into an empty DB") - } +const ( + KindEmpty Kind = iota + KindTemplate + KindFixtures +) + +func (c Kind) PrepareDB(t testing.TB, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { + testutils.SkipShort(t, "FullTestDB") gcfg := configtest.NewGeneralConfigSimulated(t, func(c *chainlink.Config, s *chainlink.Secrets) { c.Database.Dialect = dialects.Postgres @@ -64,7 +63,7 @@ func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn }) require.NoError(t, os.MkdirAll(gcfg.RootDir(), 0700)) - migrationTestDBURL, err := dropAndCreateThrowawayTestDB(gcfg.Database().URL(), generateName(), empty) + migrationTestDBURL, err := testdb.CreateOrReplace(gcfg.Database().URL(), generateName(), c != KindEmpty) require.NoError(t, err) db, err := pg.NewConnection(migrationTestDBURL, dialects.Postgres, gcfg.Database()) require.NoError(t, err) @@ -81,7 +80,7 @@ func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn } }) - if loadFixtures { + if c == KindFixtures { _, filename, _, ok := runtime.Caller(1) if !ok { t.Fatal("could not get runtime.Caller(1)") @@ -95,39 +94,3 @@ func prepareFullTestDBV2(t testing.TB, empty bool, loadFixtures bool, overrideFn return gcfg, db } - -func dropAndCreateThrowawayTestDB(parsed url.URL, postfix string, empty bool) (string, error) { - if parsed.Path == "" { - return "", errors.New("path missing from database URL") - } - - // Match the naming schema that our dangling DB cleanup methods expect - dbname := cmd.TestDBNamePrefix + postfix - if l := len(dbname); l > 63 { - return "", fmt.Errorf("dbname %v too long (%d), max is 63 bytes. Try a shorter postfix", dbname, l) - } - // Cannot drop test database if we are connected to it, so we must connect - // to a different one. 'postgres' should be present on all postgres installations - parsed.Path = "/postgres" - db, err := sql.Open(string(dialects.Postgres), parsed.String()) - if err != nil { - return "", fmt.Errorf("In order to drop the test database, we need to connect to a separate database"+ - " called 'postgres'. But we are unable to open 'postgres' database: %+v\n", err) - } - defer db.Close() - - _, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", dbname)) - if err != nil { - return "", fmt.Errorf("unable to drop postgres migrations test database: %v", err) - } - if empty { - _, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s", dbname)) - } else { - _, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s WITH TEMPLATE %s", dbname, cmd.PristineDBName)) - } - if err != nil { - return "", fmt.Errorf("unable to create postgres test database with name '%s': %v", dbname, err) - } - parsed.Path = fmt.Sprintf("/%s", dbname) - return parsed.String(), nil -} diff --git a/core/web/health_controller.go b/core/web/health_controller.go index c8489fd6325..7ab07291b58 100644 --- a/core/web/health_controller.go +++ b/core/web/health_controller.go @@ -76,7 +76,7 @@ func (hc *HealthController) Health(c *gin.Context) { healthy, errors := checker.IsHealthy() if !healthy { - status = http.StatusServiceUnavailable + status = http.StatusMultiStatus } c.Status(status) diff --git a/core/web/health_controller_test.go b/core/web/health_controller_test.go index ae40a66bca9..547186e33fb 100644 --- a/core/web/health_controller_test.go +++ b/core/web/health_controller_test.go @@ -62,7 +62,7 @@ func TestHealthController_Health_status(t *testing.T) { { name: "not ready", ready: false, - status: http.StatusServiceUnavailable, + status: http.StatusMultiStatus, }, { name: "ready", @@ -118,7 +118,7 @@ func TestHealthController_Health_body(t *testing.T) { client := app.NewHTTPClient(nil) resp, cleanup := client.Get(tc.path, tc.headers) t.Cleanup(cleanup) - assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + assert.Equal(t, http.StatusMultiStatus, resp.StatusCode) body, err := io.ReadAll(resp.Body) require.NoError(t, err) if tc.expBody == bodyJSON { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f18fe5369af..36ee68412ca 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [dev] +### Added + +- `chainlink health` CLI command and HTML `/health` endpoint, to provide human-readable views of the underlying JSON health data. + ### Fixed - Fixed the encoding used for transactions when resending in batches diff --git a/internal/testdb/testdb.go b/internal/testdb/testdb.go new file mode 100644 index 00000000000..9b531166113 --- /dev/null +++ b/internal/testdb/testdb.go @@ -0,0 +1,56 @@ +package testdb + +import ( + "database/sql" + "errors" + "fmt" + "net/url" + + "github.com/smartcontractkit/chainlink/v2/core/store/dialects" +) + +const ( + // PristineDBName is a clean copy of test DB with migrations. + PristineDBName = "chainlink_test_pristine" + // TestDBNamePrefix is a common prefix that will be auto-removed by the dangling DB cleanup process. + TestDBNamePrefix = "chainlink_test_" +) + +// CreateOrReplace creates a database named with a common prefix and the given suffix, and returns the URL. +// If the database already exists, it will be dropped and re-created. +// If withTemplate is true, the pristine DB will be used as a template. +func CreateOrReplace(parsed url.URL, suffix string, withTemplate bool) (string, error) { + if parsed.Path == "" { + return "", errors.New("path missing from database URL") + } + + // Match the naming schema that our dangling DB cleanup methods expect + dbname := TestDBNamePrefix + suffix + if l := len(dbname); l > 63 { + return "", fmt.Errorf("dbname %v too long (%d), max is 63 bytes. Try a shorter suffix", dbname, l) + } + // Cannot drop test database if we are connected to it, so we must connect + // to a different one. 'postgres' should be present on all postgres installations + parsed.Path = "/postgres" + db, err := sql.Open(string(dialects.Postgres), parsed.String()) + if err != nil { + return "", fmt.Errorf("in order to drop the test database, we need to connect to a separate database"+ + " called 'postgres'. But we are unable to open 'postgres' database: %+v\n", err) + } + defer db.Close() + + _, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", dbname)) + if err != nil { + return "", fmt.Errorf("unable to drop postgres migrations test database: %v", err) + } + if withTemplate { + _, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s WITH TEMPLATE %s", dbname, PristineDBName)) + } else { + _, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s", dbname)) + } + if err != nil { + return "", fmt.Errorf("unable to create postgres test database with name '%s': %v", dbname, err) + } + parsed.Path = fmt.Sprintf("/%s", dbname) + return parsed.String(), nil +} diff --git a/main_test.go b/main_test.go index 032ec4718a2..15b17e32654 100644 --- a/main_test.go +++ b/main_test.go @@ -1,17 +1,41 @@ package main import ( + "fmt" + "net/url" "os" + "path/filepath" + "strconv" + "strings" "testing" + "github.com/google/uuid" + "github.com/hashicorp/consul/sdk/freeport" "github.com/rogpeppe/go-internal/testscript" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core" + "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/static" + "github.com/smartcontractkit/chainlink/v2/internal/testdb" "github.com/smartcontractkit/chainlink/v2/tools/txtar" ) +// special files can be included to allocate additional test resources +const ( + // testDBName triggers initializing of a test database. + // The URL will be set as the value of an env var named by the file. + // + // -- testdb.txt -- + // CL_DATABASE_URL + testDBName = "testdb.txt" + // testPortName triggers injection of a free port as the value of an env var named by the file. + // + // -- testport.txt -- + // PORT + testPortName = "testport.txt" +) + func TestMain(m *testing.M) { os.Exit(testscript.RunMain(m, map[string]func() int{ "chainlink": core.Main, @@ -22,11 +46,14 @@ func TestScripts(t *testing.T) { t.Parallel() visitor := txtar.NewDirVisitor("testdata/scripts", txtar.Recurse, func(path string) error { - t.Run(path, func(t *testing.T) { + t.Run(strings.TrimPrefix(path, "testdata/scripts/"), func(t *testing.T) { t.Parallel() + testscript.Run(t, testscript.Params{ - Dir: path, - Setup: commonEnv, + Dir: path, + Setup: commonEnv, + ContinueOnError: true, + //UpdateScripts: true, // uncomment to update golden files }) }) return nil @@ -35,9 +62,62 @@ func TestScripts(t *testing.T) { require.NoError(t, visitor.Walk()) } -func commonEnv(env *testscript.Env) error { - env.Setenv("HOME", "$WORK/home") - env.Setenv("VERSION", static.Version) - env.Setenv("COMMIT_SHA", static.Sha) +func commonEnv(te *testscript.Env) error { + te.Setenv("HOME", "$WORK/home") + te.Setenv("VERSION", static.Version) + te.Setenv("COMMIT_SHA", static.Sha) + + b, err := os.ReadFile(filepath.Join(te.WorkDir, testPortName)) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read file %s: %w", testPortName, err) + } else if err == nil { + envVarName := strings.TrimSpace(string(b)) + te.T().Log("test port requested:", envVarName) + + port, ret, err2 := takeFreePort() + if err2 != nil { + return err2 + } + te.Defer(ret) + + te.Setenv(envVarName, strconv.Itoa(port)) + } + + b, err = os.ReadFile(filepath.Join(te.WorkDir, testDBName)) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read file %s: %w", testDBName, err) + } else if err == nil { + envVarName := strings.TrimSpace(string(b)) + te.T().Log("test database requested:", envVarName) + + u2, err2 := initDB() + if err2 != nil { + return err2 + } + + te.Setenv(envVarName, u2) + } return nil } + +func takeFreePort() (int, func(), error) { + ports, err := freeport.Take(1) + if err != nil { + return 0, nil, fmt.Errorf("failed to get free port: %w", err) + } + return ports[0], func() { freeport.Return(ports) }, nil +} + +func initDB() (string, error) { + u, err := url.Parse(string(env.DatabaseURL.Get())) + if err != nil { + return "", fmt.Errorf("failed to parse url: %w", err) + } + + name := strings.ReplaceAll(uuid.NewString(), "-", "_") + "_test" + u2, err := testdb.CreateOrReplace(*u, name, true) + if err != nil { + return "", fmt.Errorf("failed to create DB: %w", err) + } + return u2, nil +} diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar new file mode 100644 index 00000000000..c80faadfd13 --- /dev/null +++ b/testdata/scripts/health/default.txtar @@ -0,0 +1,129 @@ +# start node +exec sh -c 'eval "echo \"$(cat config.toml.tmpl)\" > config.toml"' +exec chainlink node -c config.toml start -p password -a creds & + +# initialize client +env NODEURL=http://localhost:$PORT +exec curl --retry 10 --retry-max-time 60 --retry-connrefused $NODEURL +exec chainlink --remote-node-url $NODEURL admin login -file creds --bypass-version-check + +exec chainlink --remote-node-url $NODEURL health +cmp stdout out.txt + +exec chainlink --remote-node-url $NODEURL health -json +cp stdout compact.json +exec jq . compact.json +cmp stdout out.json + +-- testdb.txt -- +CL_DATABASE_URL +-- testport.txt -- +PORT + +-- password -- +T.tLHkcmwePT/p,]sYuntjwHKAsrhm#4eRs4LuKHwvHejWYAC2JP4M8HimwgmbaZ +-- creds -- +notreal@fakeemail.ch +fj293fbBnlQ!f9vNs + +-- config.toml.tmpl -- +[Webserver] +HTTPPort = $PORT + +-- out.txt -- +-EventBroadcaster +-JobSpawner +-Mailbox.Monitor +-Mercury.WSRPCPool +-Mercury.WSRPCPool.CacheSet +-PipelineORM +-PipelineRunner +-PromReporter +-TelemetryManager + +-- out.json -- +{ + "data": [ + { + "type": "checks", + "id": "EventBroadcaster", + "attributes": { + "name": "EventBroadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "JobSpawner", + "attributes": { + "name": "JobSpawner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mailbox.Monitor", + "attributes": { + "name": "Mailbox.Monitor", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool", + "attributes": { + "name": "Mercury.WSRPCPool", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool.CacheSet", + "attributes": { + "name": "Mercury.WSRPCPool.CacheSet", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineORM", + "attributes": { + "name": "PipelineORM", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineRunner", + "attributes": { + "name": "PipelineRunner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PromReporter", + "attributes": { + "name": "PromReporter", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "TelemetryManager", + "attributes": { + "name": "TelemetryManager", + "status": "passing", + "output": "" + } + } + ] +} diff --git a/testdata/scripts/health/help.txtar b/testdata/scripts/health/help.txtar new file mode 100644 index 00000000000..07eb0509e73 --- /dev/null +++ b/testdata/scripts/health/help.txtar @@ -0,0 +1,13 @@ +exec chainlink health --help +cmp stdout out.txt + +-- out.txt -- +NAME: + chainlink health - Prints a health report + +USAGE: + chainlink health [command options] [arguments...] + +OPTIONS: + --json, -j json output + diff --git a/testdata/scripts/health/multi-chain.txtar b/testdata/scripts/health/multi-chain.txtar new file mode 100644 index 00000000000..72c5fd8e3f6 --- /dev/null +++ b/testdata/scripts/health/multi-chain.txtar @@ -0,0 +1,309 @@ +# start node +exec sh -c 'eval "echo \"$(cat config.toml.tmpl)\" > config.toml"' +exec chainlink node -c config.toml start -p password -a creds & + +# initialize client +env NODEURL=http://localhost:$PORT +exec curl --retry 10 --retry-max-time 60 --retry-connrefused $NODEURL +exec chainlink --remote-node-url $NODEURL admin login -file creds --bypass-version-check + +exec chainlink --remote-node-url $NODEURL health +cmp stdout out.txt + +exec chainlink --remote-node-url $NODEURL health -json +cp stdout compact.json +exec jq . compact.json +cmp stdout out.json + +-- testdb.txt -- +CL_DATABASE_URL +-- testport.txt -- +PORT + +-- password -- +T.tLHkcmwePT/p,]sYuntjwHKAsrhm#4eRs4LuKHwvHejWYAC2JP4M8HimwgmbaZ +-- creds -- +notreal@fakeemail.ch +fj293fbBnlQ!f9vNs + +-- config.toml.tmpl -- +[Webserver] +HTTPPort = $PORT + +[[Cosmos]] +ChainID = 'Foo' + +[[Cosmos.Nodes]] +Name = 'primary' +TendermintURL = 'http://tender.mint' + +[[EVM]] +ChainID = '1' + +[[EVM.Nodes]] +Name = 'fake' +WSURL = 'wss://foo.bar/ws' +HTTPURL = 'https://foo.bar' + +[[Solana]] +ChainID = 'Bar' + +[[Solana.Nodes]] +Name = 'primary' +URL = 'http://solana.web' + +[[Starknet]] +ChainID = 'Baz' + +[[Starknet.Nodes]] +Name = 'primary' +URL = 'http://stark.node' + +-- out.txt -- +-Cosmos.Foo.Chain +-Cosmos.Foo.Txm +-EVM.1 +-EVM.1.BalanceMonitor +-EVM.1.HeadBroadcaster +-EVM.1.HeadTracker +!EVM.1.HeadTracker.HeadListener + Listener is not connected +-EVM.1.LogBroadcaster +-EVM.1.Txm +-EVM.1.Txm.BlockHistoryEstimator +-EVM.1.Txm.Broadcaster +-EVM.1.Txm.Confirmer +-EVM.1.Txm.WrappedEvmEstimator +-EventBroadcaster +-JobSpawner +-Mailbox.Monitor +-Mercury.WSRPCPool +-Mercury.WSRPCPool.CacheSet +-PipelineORM +-PipelineRunner +-PromReporter +-Solana.Bar +-StarkNet.Baz +-TelemetryManager + +-- out.json -- +{ + "data": [ + { + "type": "checks", + "id": "Cosmos.Foo.Chain", + "attributes": { + "name": "Cosmos.Foo.Chain", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Cosmos.Foo.Txm", + "attributes": { + "name": "Cosmos.Foo.Txm", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1", + "attributes": { + "name": "EVM.1", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.BalanceMonitor", + "attributes": { + "name": "EVM.1.BalanceMonitor", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.HeadBroadcaster", + "attributes": { + "name": "EVM.1.HeadBroadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.HeadTracker", + "attributes": { + "name": "EVM.1.HeadTracker", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.HeadTracker.HeadListener", + "attributes": { + "name": "EVM.1.HeadTracker.HeadListener", + "status": "failing", + "output": "Listener is not connected" + } + }, + { + "type": "checks", + "id": "EVM.1.LogBroadcaster", + "attributes": { + "name": "EVM.1.LogBroadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm", + "attributes": { + "name": "EVM.1.Txm", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.BlockHistoryEstimator", + "attributes": { + "name": "EVM.1.Txm.BlockHistoryEstimator", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.Broadcaster", + "attributes": { + "name": "EVM.1.Txm.Broadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.Confirmer", + "attributes": { + "name": "EVM.1.Txm.Confirmer", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EVM.1.Txm.WrappedEvmEstimator", + "attributes": { + "name": "EVM.1.Txm.WrappedEvmEstimator", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "EventBroadcaster", + "attributes": { + "name": "EventBroadcaster", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "JobSpawner", + "attributes": { + "name": "JobSpawner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mailbox.Monitor", + "attributes": { + "name": "Mailbox.Monitor", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool", + "attributes": { + "name": "Mercury.WSRPCPool", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Mercury.WSRPCPool.CacheSet", + "attributes": { + "name": "Mercury.WSRPCPool.CacheSet", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineORM", + "attributes": { + "name": "PipelineORM", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PipelineRunner", + "attributes": { + "name": "PipelineRunner", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "PromReporter", + "attributes": { + "name": "PromReporter", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "Solana.Bar", + "attributes": { + "name": "Solana.Bar", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "StarkNet.Baz", + "attributes": { + "name": "StarkNet.Baz", + "status": "passing", + "output": "" + } + }, + { + "type": "checks", + "id": "TelemetryManager", + "attributes": { + "name": "TelemetryManager", + "status": "passing", + "output": "" + } + } + ] +} diff --git a/testdata/scripts/help.txtar b/testdata/scripts/help.txtar index 1484aceb5df..e4c19f3987d 100644 --- a/testdata/scripts/help.txtar +++ b/testdata/scripts/help.txtar @@ -17,6 +17,7 @@ COMMANDS: blocks Commands for managing blocks bridges Commands for Bridges communicating with External Adapters config Commands for the node's configuration + health Prints a health report jobs Commands for managing Jobs keys Commands for managing various types of keys used by the Chainlink node node, local Commands for admin actions that must be run locally diff --git a/testdata/scripts/node/db/migrate/db.txtar b/testdata/scripts/node/db/migrate/db.txtar new file mode 100644 index 00000000000..f040a937fd0 --- /dev/null +++ b/testdata/scripts/node/db/migrate/db.txtar @@ -0,0 +1,6 @@ +exec chainlink node db migrate +! stdout . +stderr 'goose: no migrations to run. current version:' + +-- testdb.txt -- +CL_DATABASE_URL