Skip to content

Commit

Permalink
wrestling with too many connections
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Jun 21, 2024
1 parent 9ee4543 commit 76fc0b8
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 53 deletions.
12 changes: 9 additions & 3 deletions core/chains/evm/forwarders/forwarder_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

var GetAuthorisedSendersABI = evmtypes.MustGetABI(authorized_receiver.AuthorizedReceiverABI).Methods["getAuthorizedSenders"]
Expand Down Expand Up @@ -113,10 +112,15 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {

func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
lggr := logger.Test(t)
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
cfg := configtest.NewTestGeneralConfig(t)
evmcfg := evmtest.NewChainScopedConfig(t, cfg)
// db := pgtest.NewSqlxDB(t)
testChainId := ubig.New(evmcfg.EVM().ChainID())
// db := pgtest.NewSqlxDB(t)

db := evmtestdb.NewDB(t, evm.Cfg{Schema: "evm_" + testChainId.String(), ChainID: testChainId})

owner := testutils.MustNewSimTransactor(t)
ec := backends.NewSimulatedBackend(map[common.Address]core.GenesisAccount{
owner.From: {
Expand Down Expand Up @@ -163,10 +167,12 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {

func TestFwdMgr_InvalidForwarderForOCR2FeedsStates(t *testing.T) {
lggr := logger.Test(t)
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
cfg := configtest.NewTestGeneralConfig(t)
evmcfg := evmtest.NewChainScopedConfig(t, cfg)
testChainId := ubig.New(evmcfg.EVM().ChainID())
// db := pgtest.NewSqlxDB(t)
db := evmtestdb.NewDB(t, evm.Cfg{Schema: "evm_" + testChainId.String(), ChainID: testChainId})
owner := testutils.MustNewSimTransactor(t)
ec := backends.NewSimulatedBackend(map[common.Address]core.GenesisAccount{
owner.From: {
Expand Down
8 changes: 7 additions & 1 deletion core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
evmdb "github.com/smartcontractkit/chainlink/v2/core/store/migrate/plugins/relayer/evm"
evmtestdb "github.com/smartcontractkit/chainlink/v2/core/store/migrate/plugins/relayer/evm/testutils"
)

func makeTestEvmTxm(
Expand Down Expand Up @@ -469,12 +471,16 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
}

func TestTxm_Lifecycle(t *testing.T) {
db := pgtest.NewSqlxDB(t)
//db := pgtest.NewSqlxDB(t)

ethClient := testutils.NewEthClientMockWithDefaultChain(t)
kst := ksmocks.NewEth(t)

config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
testChainId := ubig.New(&cltest.FixtureChainID)
// db := pgtest.NewSqlxDB(t)
db := evmtestdb.NewDB(t, evmdb.Cfg{Schema: "evm_" + testChainId.String(), ChainID: testChainId})

config.SetFinalityDepth(uint32(42))
config.RpcDefaultBatchSize = uint32(4)

Expand Down
1 change: 0 additions & 1 deletion core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryTransmitter: cfg.Mercury().Transmitter(),
DB: db.DB, // hack
}
// evm always enabled for backward compatibility
// TODO BCF-2510 this needs to change in order to clear the path for EVM extraction
Expand Down
7 changes: 3 additions & 4 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package chainlink

import (
"context"
"database/sql"
"errors"
"fmt"

Expand Down Expand Up @@ -43,8 +42,6 @@ type EVMFactoryConfig struct {
legacyevm.ChainOpts
evmrelay.CSAETHKeystore
coreconfig.MercuryTransmitter
// hack to allow for the factory to be used in the context of the relayer
DB *sql.DB
}

func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LoopRelayAdapter, error) {
Expand All @@ -66,6 +63,8 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
return nil, err
}
legacyChains := evmrelay.NewLegacyChainsFromRelayerExtenders(evmRelayExtenders)
dburl := config.AppConfig.Database().URL()

for _, ext := range evmRelayExtenders.Slice() {
relayID := types.RelayID{Network: types.NetworkEVM, ChainID: ext.Chain().ID().String()}
chain, err2 := legacyChains.Get(relayID.ChainID)
Expand All @@ -75,11 +74,11 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m

relayerOpts := evmrelay.RelayerOpts{
DS: ccOpts.DS,
DBURL: &dburl,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
TransmitterConfig: config.MercuryTransmitter,
CapabilitiesRegistry: r.CapabilitiesRegistry,
DB: config.DB, //hack
}
relayer, err2 := evmrelay.NewRelayer(lggr.Named(relayID.ChainID), chain, relayerOpts)
if err2 != nil {
Expand Down
29 changes: 18 additions & 11 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package evm

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"net/url"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -106,14 +105,15 @@ type CSAETHKeystore interface {

type RelayerOpts struct {
DS sqlutil.DataSource
DB *sql.DB // hack for migrations. need a way to get this from the DS, or other move the migration logic out of here
// we need the db url to run migrations
DBURL *url.URL
CSAETHKeystore
MercuryPool wsrpc.Pool
TransmitterConfig mercury.TransmitterConfig
CapabilitiesRegistry coretypes.CapabilitiesRegistry
}

func (c RelayerOpts) Validate() error {
func (c *RelayerOpts) Validate() error {
var err error
if c.DS == nil {
err = errors.Join(err, errors.New("nil DataSource"))
Expand All @@ -124,6 +124,19 @@ func (c RelayerOpts) Validate() error {
if c.CapabilitiesRegistry == nil {
err = errors.Join(err, errors.New("nil CapabilitiesRegistry"))
}
// compatibility check with existing code
if c.DBURL == nil {
dburl := string(env.DatabaseURL.Get())
if dburl == "" {
err = errors.Join(err, fmt.Errorf("no DBURL provided and CL_DATABASE_URL unset"))
} else {
var perr error
c.DBURL, perr = url.Parse(dburl)
if perr != nil {
err = errors.Join(err, fmt.Errorf("failed to parse CL_DATABASE_URL %s: %w", dburl, perr))
}
}
}
if err != nil {
err = fmt.Errorf("invalid RelayerOpts: %w", err)
}
Expand All @@ -137,13 +150,7 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R
}
lggr = lggr.Named("Relayer")
// run the migrations for the relayer
// TODO: need a dburlVar in options. this is a hack. migration require a sql.DB because that's what goose uses
dburl := os.Getenv(string(env.DatabaseURL))
if dburl == "" {
return nil, errors.New("missing DatabaseURL")
}

db, err := sqlx.Open(string(dialects.Postgres), dburl)
db, err := sqlx.Open(string(dialects.Postgres), opts.DBURL.String())
if err != nil {
return nil, fmt.Errorf("failed to open db: %w", err)
}
Expand Down
30 changes: 17 additions & 13 deletions core/services/relay/evm/evm_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package evm_test

import (
"net/url"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
)

Expand All @@ -15,52 +18,53 @@ func TestRelayerOpts_Validate(t *testing.T) {
DS sqlutil.DataSource
CSAETHKeystore evm.CSAETHKeystore
CapabilitiesRegistry coretypes.CapabilitiesRegistry
DBURL *url.URL
}
tests := []struct {
name string
fields fields
wantErrContains string
beforeFn func(t *testing.T)
}{
{
name: "all invalid",
fields: fields{
DS: nil,
CSAETHKeystore: nil,
CapabilitiesRegistry: nil,
DBURL: nil,
},
beforeFn: func(t *testing.T) { t.Setenv(string(env.DatabaseURL), "") },
wantErrContains: `nil DataSource
nil Keystore`,
},
{
name: "missing ds, keystore",
fields: fields{
DS: nil,
},
wantErrContains: `nil DataSource
nil Keystore`,
nil Keystore
nil CapabilitiesRegistry
no DBURL provided and CL_DATABASE_URL unset`,
},
{
name: "missing ds, keystore, capabilitiesRegistry",
fields: fields{
DS: nil,
DS: nil,
DBURL: nil,
},
beforeFn: func(t *testing.T) { t.Setenv(string(env.DatabaseURL), ":/#unparseable") },
wantErrContains: `nil DataSource
nil Keystore
nil CapabilitiesRegistry`,
nil CapabilitiesRegistry
failed to parse CL_DATABASE_URL`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.beforeFn(t)
c := evm.RelayerOpts{
DS: tt.fields.DS,
CSAETHKeystore: tt.fields.CSAETHKeystore,
CapabilitiesRegistry: tt.fields.CapabilitiesRegistry,
}
err := c.Validate()
require.Equal(t, tt.wantErrContains != "", err != nil)
if tt.wantErrContains != "" {
assert.Contains(t, err.Error(), tt.wantErrContains)
} else {
assert.NoError(t, err)
}
})
}
Expand Down
2 changes: 0 additions & 2 deletions core/store/migrate/plugins/relayer/evm/0002_initial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
_ "embed"
"testing"

_ "github.com/mattn/go-sqlite3"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"

"github.com/stretchr/testify/assert"
Expand Down
62 changes: 46 additions & 16 deletions core/store/migrate/plugins/relayer/evm/testutils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
_ "embed"
"fmt"
"net/url"
"sync"
"testing"

Expand All @@ -13,22 +14,57 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/store/dialects"
"github.com/smartcontractkit/chainlink/v2/core/store/migrate/plugins/relayer/evm"
"github.com/smartcontractkit/chainlink/v2/internal/testdb"
)

var evmMu sync.RWMutex
var migratedDBs = map[string]*sqlx.DB{}
var evmHeavyDB *sqlx.DB
var testDBURL string

var initTestDB = sync.OnceFunc(newTestDB)

// func newTestDB() {
func newTestDB() {
// hack to get the migrations to run
eurl := string(env.DatabaseURL.Get())
if eurl == "" {
panic("you must provide a CL_DATABASE_URL environment variable")
}

uuid := uuid.NewString()
durl, err := url.Parse(eurl)
if err != nil {
panic(fmt.Sprintf("failed to parse database URL '%s': %v", eurl, err))
}
testDBURL, err = testdb.CreateOrReplace(*durl, "evm_hacking_test_db_"+uuid[:8], false)
if err != nil {
panic(fmt.Sprintf("failed to create or replace database for EVM tests: %v", err))
}
evmHeavyDB = sqlx.MustOpen(string(dialects.Postgres), testDBURL)
if evmHeavyDB == nil {
panic("failed to open EVM heavy database")
}
_, err = evmHeavyDB.DB.Exec(evmInitialState)
if err != nil {
panic(fmt.Sprintf("failed to exec SQL for the initial state of the EVM database: %v", err))
}

}

Check failure on line 58 in core/store/migrate/plugins/relayer/evm/testutils/db.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

//go:embed evm_initial_state.sql
var evmInitialState string

// NewEVMDB creates a new EVM database with the given schema and chainId
func NewDB(t testing.TB, cfg evm.Cfg) *sqlx.DB {
require.NotEmpty(t, evmInitialState, "evm initial state must not be empty")
testutils.SkipShortDB(t)
initTestDB()

// fetch the db for the schema
id := fmt.Sprintf("%s_%s", cfg.Schema, cfg.ChainID.String())
evmMu.RLock()
Expand All @@ -40,28 +76,22 @@ func NewDB(t testing.TB, cfg evm.Cfg) *sqlx.DB {
// need to check again in case another goroutine has already migrated the db
// while we were waiting for the write lock, which is more expensive than the optimistic read lock
_, exists := migratedDBs[id]
//var
if !exists {
c, evmHeavyDB := heavyweight.FullTestDBEmptyV2(t, nil)
// run migrations to mutate the db
// we have to setup the minimal tables for the migrations to work
// must load the initial state, derivied from the core migrations at v244
// because the evm migrations try to move try from the core schema to the evm schema
// b, err := os.ReadFile("../testdata/evm_initial_state.sql")
// require.NoError(t, err, "failed to read initial state for the evm migrations")
_, err := evmHeavyDB.DB.Exec(evmInitialState)
require.NoError(t, err, "failed to exec SQL for the initial state of the EVM database")
// now we can run the migrations
err = evm.Migrate(testutils.Context(t), evmHeavyDB, cfg)
err := evm.Migrate(testutils.Context(t), evmHeavyDB, cfg)
require.NoError(t, err, "failed to migrate EVM database for cfg %v", cfg)
migratedDBs[id] = evmHeavyDB
url := c.Database().URL()
sql.Register(id, pgtest.NewTxDriver(url.String()))

sql.Register(id, pgtest.NewTxDriver(testDBURL))
sqlx.BindDriver(id, sqlx.DOLLAR)
}
}
db, err := sqlx.Open(id, uuid.NewString())
require.NoError(t, err, "failed to open EVM database for cfg %v with driver id %s", cfg, id)
t.Cleanup(func() { assert.NoError(t, db.Close()) })
t.Cleanup(func() {
assert.NoError(t, db.Close())
assert.NoError(t, evmHeavyDB.Close())
})
db.MapperFunc(reflectx.CamelToSnakeASCII)

return db
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ require (
github.com/leanovate/gopter v0.2.10-0.20210127095200-9abe2343507a
github.com/lib/pq v1.10.9
github.com/manyminds/api2go v0.0.0-20171030193247-e7b693844a6f
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mr-tron/base58 v1.2.0
Expand Down
2 changes: 1 addition & 1 deletion internal/testdb/testdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func CreateOrReplace(parsed url.URL, suffix string, withTemplate bool) (string,

_, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", dbname))
if err != nil {
return "", fmt.Errorf("unable to drop postgres migrations test database: %v", err)
return "", fmt.Errorf("unable to drop postgres migrations test database %s url %s: %v", (&parsed).String(), dbname, err)
}
if withTemplate {
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s WITH TEMPLATE %s", dbname, PristineDBName))
Expand Down

0 comments on commit 76fc0b8

Please sign in to comment.