diff --git a/.github/workflows/ci-core.yml b/.github/workflows/ci-core.yml index aac8e578d13..05d40108942 100644 --- a/.github/workflows/ci-core.yml +++ b/.github/workflows/ci-core.yml @@ -160,6 +160,7 @@ jobs: pushd $(go list -m -f "{{.Dir}}" github.com/smartcontractkit/chainlink-starknet/relayer) go install ./pkg/chainlink/cmd/chainlink-starknet popd + go install ./plugins/cmd/chainlink-example-relay - name: Increase Race Timeout if: ${{ github.event.schedule != '' && needs.filter.outputs.changes == 'true' }} run: | diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index e19cc485d8b..a59da56d24e 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -612,7 +612,7 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) { } lggr := logger.Sugared(s.Logger.Named("RebroadcastTransactions")) - db, err := pg.OpenUnlockedDB(s.Config.AppID(), s.Config.Database()) + db, err := pg.OpenUnlockedDB(ctx, s.Config.AppID(), s.Config.Database()) if err != nil { return s.errorOut(errors.Wrap(err, "opening DB")) } @@ -950,7 +950,7 @@ func (s *Shell) RollbackDatabase(c *cli.Context) error { version = null.IntFrom(numVersion) } - db, err := newConnection(s.Config.Database()) + db, err := newConnection(ctx, s.Config.Database()) if err != nil { return fmt.Errorf("failed to initialize orm: %v", err) } @@ -965,7 +965,7 @@ func (s *Shell) RollbackDatabase(c *cli.Context) error { // VersionDatabase displays the current database version. func (s *Shell) VersionDatabase(_ *cli.Context) error { ctx := s.ctx() - db, err := newConnection(s.Config.Database()) + db, err := newConnection(ctx, s.Config.Database()) if err != nil { return fmt.Errorf("failed to initialize orm: %v", err) } @@ -982,7 +982,7 @@ func (s *Shell) VersionDatabase(_ *cli.Context) error { // StatusDatabase displays the database migration status func (s *Shell) StatusDatabase(_ *cli.Context) error { ctx := s.ctx() - db, err := newConnection(s.Config.Database()) + db, err := newConnection(ctx, s.Config.Database()) if err != nil { return fmt.Errorf("failed to initialize orm: %v", err) } @@ -995,10 +995,11 @@ func (s *Shell) StatusDatabase(_ *cli.Context) error { // CreateMigration displays the database migration status func (s *Shell) CreateMigration(c *cli.Context) error { + ctx := s.ctx() if !c.Args().Present() { return s.errorOut(errors.New("You must specify a migration name")) } - db, err := newConnection(s.Config.Database()) + db, err := newConnection(ctx, s.Config.Database()) if err != nil { return fmt.Errorf("failed to initialize orm: %v", err) } @@ -1016,6 +1017,7 @@ func (s *Shell) CreateMigration(c *cli.Context) error { // CleanupChainTables deletes database table rows based on chain type and chain id input. func (s *Shell) CleanupChainTables(c *cli.Context) error { + ctx := s.ctx() cfg := s.Config.Database() parsed := cfg.URL() if parsed.String() == "" { @@ -1027,7 +1029,7 @@ func (s *Shell) CleanupChainTables(c *cli.Context) error { return s.errorOut(fmt.Errorf("cannot reset database named `%s`. This command can only be run against databases with a name that ends in `_test`, to prevent accidental data loss. If you really want to delete chain specific data from this database, pass in the --danger option", dbname)) } - db, err := newConnection(cfg) + db, err := newConnection(ctx, cfg) if err != nil { return s.errorOut(errors.Wrap(err, "error connecting to the database")) } @@ -1079,12 +1081,12 @@ type dbConfig interface { Dialect() dialects.DialectName } -func newConnection(cfg dbConfig) (*sqlx.DB, error) { +func newConnection(ctx context.Context, cfg dbConfig) (*sqlx.DB, error) { parsed := cfg.URL() if parsed.String() == "" { return nil, errDBURLMissing } - return pg.NewConnection(parsed.String(), cfg.Dialect(), cfg) + return pg.NewConnection(ctx, parsed.String(), cfg.Dialect(), cfg) } func dropAndCreateDB(parsed url.URL, force bool) (err error) { @@ -1132,7 +1134,7 @@ func dropAndCreatePristineDB(db *sqlx.DB, template string) (err error) { } func migrateDB(ctx context.Context, config dbConfig) error { - db, err := newConnection(config) + db, err := newConnection(ctx, config) if err != nil { return fmt.Errorf("failed to initialize orm: %v", err) } @@ -1144,7 +1146,7 @@ func migrateDB(ctx context.Context, config dbConfig) error { } func downAndUpDB(ctx context.Context, cfg dbConfig, baseVersionID int64) error { - db, err := newConnection(cfg) + db, err := newConnection(ctx, cfg) if err != nil { return fmt.Errorf("failed to initialize orm: %v", err) } diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 12491300bf7..aa195768300 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -346,7 +346,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn } url := cfg.Database().URL() - db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database()) + db, err := pg.NewConnection(testutils.Context(t), url.String(), cfg.Database().Dialect(), cfg.Database()) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, db.Close()) }) diff --git a/core/internal/cltest/heavyweight/orm.go b/core/internal/cltest/heavyweight/orm.go index 4e824b1ab0f..992fefbaa0e 100644 --- a/core/internal/cltest/heavyweight/orm.go +++ b/core/internal/cltest/heavyweight/orm.go @@ -64,7 +64,7 @@ func (c Kind) PrepareDB(t testing.TB, overrideFn func(c *chainlink.Config, s *ch require.NoError(t, os.MkdirAll(gcfg.RootDir(), 0700)) migrationTestDBURL, err := testdb.CreateOrReplace(gcfg.Database().URL(), generateName(), c != KindEmpty) require.NoError(t, err) - db, err := pg.NewConnection(migrationTestDBURL, dialects.Postgres, gcfg.Database()) + db, err := pg.NewConnection(testutils.Context(t), migrationTestDBURL, dialects.Postgres, gcfg.Database()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, db.Close()) // must close before dropping diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 45b5ee59059..e49bde5dea9 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -22,7 +22,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 github.com/spf13/cobra v1.8.0 @@ -58,7 +58,7 @@ require ( github.com/NethermindEth/juno v0.3.1 // indirect github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect - github.com/XSAM/otelsql v0.27.0 // indirect + github.com/XSAM/otelsql v0.29.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/avast/retry-go/v4 v4.6.0 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect @@ -207,7 +207,7 @@ require ( github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.14.0 // indirect - github.com/jackc/pgx/v4 v4.18.2 // indirect + github.com/jackc/pgx/v4 v4.18.3 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jmhodges/levigo v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index dff6f3f356a..d678c4481b3 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -121,8 +121,8 @@ github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bw github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= -github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= -github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= +github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= +github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= @@ -822,8 +822,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08 github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= -github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= -github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= @@ -1184,8 +1184,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c h1:3apUsez/6Pkp1ckXzSwIhzPRuWjDGjzMjKapEKi0Fcw= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a h1:HluVrtdN7p7fWT4wMuYawCBzo4JXkqysZRELBrcfQFk= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a/go.mod h1:4SaVAyy6CQQrMsCAe15TRkJgGOul5VwKpyUXzisuKNA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM= diff --git a/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go b/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go index f9c51cfb660..9df4b73883f 100644 --- a/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go +++ b/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go @@ -41,7 +41,7 @@ func TestAdapter_Integration(t *testing.T) { logger := logger.TestLogger(t) cfg := configtest.NewTestGeneralConfig(t) url := cfg.Database().URL() - db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database()) + db, err := pg.NewConnection(ctx, url.String(), cfg.Database().Dialect(), cfg.Database()) require.NoError(t, err) keystore := keystore.NewInMemory(db, utils.FastScryptParams, logger) diff --git a/core/services/pg/connection.go b/core/services/pg/connection.go index c1e6248b13f..feb007ea3bc 100644 --- a/core/services/pg/connection.go +++ b/core/services/pg/connection.go @@ -1,31 +1,21 @@ package pg import ( - "database/sql" + "context" "errors" "fmt" "log" "os" "time" - "github.com/XSAM/otelsql" - "github.com/google/uuid" "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/stdlib" + _ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection "github.com/jmoiron/sqlx" - "github.com/scylladb/go-reflectx" - "go.opentelemetry.io/otel" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - "golang.org/x/net/context" + commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" "github.com/smartcontractkit/chainlink/v2/core/store/dialects" ) -// NOTE: This is the default level in Postgres anyway, we just make it -// explicit here -const defaultIsolation = sql.LevelReadCommitted - var MinRequiredPGVersion = 110000 func init() { @@ -51,63 +41,17 @@ type ConnectionConfig interface { MaxIdleConns() int } -func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (*sqlx.DB, error) { - opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL), - otelsql.WithTracerProvider(otel.GetTracerProvider()), - otelsql.WithSQLCommenter(true), - otelsql.WithSpanOptions(otelsql.SpanOptions{ - OmitConnResetSession: true, - OmitConnPrepare: true, - OmitRows: true, - OmitConnectorConnect: true, - OmitConnQuery: false, - })} - - // Set default connection options - lockTimeout := config.DefaultLockTimeout().Milliseconds() - idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds() - connParams := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`, - lockTimeout, idleInTxSessionTimeout, defaultIsolation.String()) - - var sqldb *sql.DB - if dialect == dialects.TransactionWrappedPostgres { - // Dbtx uses the uri as a unique identifier for each transaction. Each ORM - // should be encapsulated in it's own transaction, and thus needs its own - // unique id. - // - // We can happily throw away the original uri here because if we are using - // txdb it should have already been set at the point where we called - // txdb.Register - var err error - sqldb, err = otelsql.Open(string(dialect), uuid.New().String(), opts...) - if err != nil { - return nil, fmt.Errorf("failed to open txdb: %w", err) - } - _, err = sqldb.Exec(connParams) - if err != nil { - return nil, fmt.Errorf("failed to set options: %w", err) - } - } else { - // Set sane defaults for every new database connection. - // Those can be overridden with Txn options or SET statements in individual connections. - // The default values are the same for Txns. - connConfig, err := pgx.ParseConfig(uri) - if err != nil { - return nil, fmt.Errorf("database: failed to parse config: %w", err) - } - - connector := stdlib.GetConnector(*connConfig, stdlib.OptionAfterConnect(func(ctx context.Context, c *pgx.Conn) (err error) { - _, err = c.Exec(ctx, connParams) - return - })) - - // Initialize sql/sqlx - sqldb = otelsql.OpenDB(connector, opts...) +func NewConnection(ctx context.Context, uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error) { + db, err = commonpg.ConnectionConfig{ + DefaultIdleInTxSessionTimeout: config.DefaultIdleInTxSessionTimeout(), + DefaultLockTimeout: config.DefaultLockTimeout(), + MaxOpenConns: config.MaxOpenConns(), + MaxIdleConns: config.MaxIdleConns(), + }.NewDB(ctx, uri, dialect) + if err != nil { + return nil, err } - db := sqlx.NewDb(sqldb, string(dialect)) - db.MapperFunc(reflectx.CamelToSnakeASCII) - - setMaxConns(db, config) + setMaxMercuryConns(db, config) if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" { if err := checkVersion(db, MinRequiredPGVersion); err != nil { @@ -115,13 +59,10 @@ func NewConnection(uri string, dialect dialects.DialectName, config ConnectionCo } } - return db, disallowReplica(db) + return db, nil } -func setMaxConns(db *sqlx.DB, config ConnectionConfig) { - db.SetMaxOpenConns(config.MaxOpenConns()) - db.SetMaxIdleConns(config.MaxIdleConns()) - +func setMaxMercuryConns(db *sqlx.DB, config ConnectionConfig) { // HACK: In the case of mercury jobs, one conn is needed per job for good // performance. Most nops will forget to increase the defaults to account // for this so we detect it here instead. diff --git a/core/services/pg/locked_db.go b/core/services/pg/locked_db.go index a9157fe1ae1..163aadf7242 100644 --- a/core/services/pg/locked_db.go +++ b/core/services/pg/locked_db.go @@ -53,8 +53,8 @@ func NewLockedDB(appID uuid.UUID, cfg LockedDBConfig, lockCfg config.Lock, lggr // OpenUnlockedDB just opens DB connection, without any DB locks. // This should be used carefully, when we know we don't need any locks. // Currently this is used by RebroadcastTransactions command only. -func OpenUnlockedDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) { - return openDB(appID, cfg) +func OpenUnlockedDB(ctx context.Context, appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) { + return openDB(ctx, appID, cfg) } // Open function connects to DB and acquires DB locks based on configuration. @@ -68,7 +68,7 @@ func (l *lockedDb) Open(ctx context.Context) (err error) { } // Step 1: open DB connection - l.db, err = openDB(l.appID, l.cfg) + l.db, err = openDB(ctx, l.appID, l.cfg) if err != nil { // l.db will be nil in case of error return errors.Wrap(err, "failed to open db") @@ -82,7 +82,7 @@ func (l *lockedDb) Open(ctx context.Context) (err error) { // Step 2: start the stat reporter l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr) - l.statsReporter.Start(ctx) + l.statsReporter.Start() // Step 3: acquire DB locks lockingMode := l.lockCfg.LockingMode() @@ -139,10 +139,10 @@ func (l lockedDb) DB() *sqlx.DB { return l.db } -func openDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) { +func openDB(ctx context.Context, appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) { uri := cfg.URL() static.SetConsumerName(&uri, "App", &appID) dialect := cfg.Dialect() - db, err = NewConnection(uri.String(), dialect, cfg) + db, err = NewConnection(ctx, uri.String(), dialect, cfg) return } diff --git a/core/services/pg/locked_db_test.go b/core/services/pg/locked_db_test.go index ed0935c1411..af97726dae9 100644 --- a/core/services/pg/locked_db_test.go +++ b/core/services/pg/locked_db_test.go @@ -88,14 +88,15 @@ func TestLockedDB_TwoInstances(t *testing.T) { func TestOpenUnlockedDB(t *testing.T) { testutils.SkipShortDB(t) + ctx := testutils.Context(t) config := configtest.NewGeneralConfig(t, nil) - db1, err1 := pg.OpenUnlockedDB(config.AppID(), config.Database()) + db1, err1 := pg.OpenUnlockedDB(ctx, config.AppID(), config.Database()) require.NoError(t, err1) require.NotNil(t, db1) // should not block the second connection - db2, err2 := pg.OpenUnlockedDB(config.AppID(), config.Database()) + db2, err2 := pg.OpenUnlockedDB(ctx, config.AppID(), config.Database()) require.NoError(t, err2) require.NotNil(t, db2) diff --git a/core/services/pg/stats.go b/core/services/pg/stats.go index b8b1ed68401..3aa40580a90 100644 --- a/core/services/pg/stats.go +++ b/core/services/pg/stats.go @@ -1,132 +1,17 @@ package pg import ( - "context" - "database/sql" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - + commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" "github.com/smartcontractkit/chainlink/v2/core/logger" ) -const dbStatsInternal = 10 * time.Second - -var ( - promDBConnsMax = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_conns_max", - Help: "Maximum number of open connections to the database.", - }) - promDBConnsOpen = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_conns_open", - Help: "The number of established connections both in use and idle.", - }) - promDBConnsInUse = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_conns_used", - Help: "The number of connections currently in use.", - }) - promDBWaitCount = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_wait_count", - Help: "The total number of connections waited for.", - }) - promDBWaitDuration = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "db_wait_time_seconds", - Help: "The total time blocked waiting for a new connection.", - }) -) - -func publishStats(stats sql.DBStats) { - promDBConnsMax.Set(float64(stats.MaxOpenConnections)) - promDBConnsOpen.Set(float64(stats.OpenConnections)) - promDBConnsInUse.Set(float64(stats.InUse)) - - promDBWaitCount.Set(float64(stats.WaitCount)) - promDBWaitDuration.Set(stats.WaitDuration.Seconds()) -} - -type StatsReporterOpt func(*StatsReporter) - -func StatsInterval(d time.Duration) StatsReporterOpt { - return func(r *StatsReporter) { - r.interval = d - } -} - -func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt { - return func(r *StatsReporter) { - r.reportFn = fn - } -} - type ( - StatFn func() sql.DBStats - ReportFn func(sql.DBStats) + StatFn = commonpg.StatFn + ReportFn = commonpg.ReportFn ) -type StatsReporter struct { - statFn StatFn - reportFn ReportFn - interval time.Duration - cancel context.CancelFunc - lggr logger.Logger - once sync.Once - wg sync.WaitGroup -} - -func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter { - r := &StatsReporter{ - statFn: fn, - reportFn: publishStats, - interval: dbStatsInternal, - lggr: lggr.Named("StatsReporter"), - } - - for _, opt := range opts { - opt(r) - } - - return r -} - -func (r *StatsReporter) Start(ctx context.Context) { - startOnce := func() { - r.wg.Add(1) - r.lggr.Debug("Starting DB stat reporter") - rctx, cancelFunc := context.WithCancel(ctx) - r.cancel = cancelFunc - go r.loop(rctx) - } - - r.once.Do(startOnce) -} - -// Stop stops all resources owned by the reporter and waits -// for all of them to be done -func (r *StatsReporter) Stop() { - if r.cancel != nil { - r.lggr.Debug("Stopping DB stat reporter") - r.cancel() - r.cancel = nil - r.wg.Wait() - } -} - -func (r *StatsReporter) loop(ctx context.Context) { - defer r.wg.Done() - - ticker := time.NewTicker(r.interval) - defer ticker.Stop() +type StatsReporter = commonpg.StatsReporter - r.reportFn(r.statFn()) - for { - select { - case <-ticker.C: - r.reportFn(r.statFn()) - case <-ctx.Done(): - r.lggr.Debug("stat reporter loop received done. stopping...") - return - } - } +func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...commonpg.StatsReporterOpt) *StatsReporter { + return commonpg.NewStatsReporter(fn, lggr, opts...) } diff --git a/core/services/pg/stats_test.go b/core/services/pg/stats_test.go deleted file mode 100644 index 76a8b426fd8..00000000000 --- a/core/services/pg/stats_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package pg - -import ( - "context" - "database/sql" - "strings" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/stretchr/testify/mock" - - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -// testDbStater implements mocks for the function signatures -// needed by the stat reporte wrapper for statFn -type testDbStater struct { - mock.Mock - t *testing.T - name string - testGauge prometheus.Gauge -} - -func newtestDbStater(t *testing.T, name string) *testDbStater { - return &testDbStater{ - t: t, - name: name, - testGauge: promauto.NewGauge(prometheus.GaugeOpts{ - Name: strings.ReplaceAll(name, " ", "_"), - }), - } -} - -func (s *testDbStater) Stats() sql.DBStats { - s.Called() - return sql.DBStats{} -} - -func (s *testDbStater) Report(stats sql.DBStats) { - s.Called() - s.testGauge.Set(float64(stats.MaxOpenConnections)) -} - -type statScenario struct { - name string - testFn func(*testing.T, *StatsReporter, time.Duration, int) -} - -func TestStatReporter(t *testing.T) { - interval := 2 * time.Millisecond - expectedIntervals := 4 - - lggr := logger.TestLogger(t) - - for _, scenario := range []statScenario{ - {name: "parent_ctx_canceled", testFn: testParentContextCanceled}, - {name: "normal_collect_and_stop", testFn: testCollectAndStop}, - {name: "mutli_start", testFn: testMultiStart}, - {name: "multi_stop", testFn: testMultiStop}, - } { - t.Run(scenario.name, func(t *testing.T) { - d := newtestDbStater(t, scenario.name) - d.Mock.On("Stats").Return(sql.DBStats{}) - d.Mock.On("Report").Return() - reporter := NewStatsReporter(d.Stats, - lggr, - StatsInterval(interval), - StatsCustomReporterFn(d.Report), - ) - - scenario.testFn( - t, - reporter, - interval, - expectedIntervals, - ) - - d.AssertCalled(t, "Stats") - d.AssertCalled(t, "Report") - }) - } -} - -// test appropriate handling of context cancellation -func testParentContextCanceled(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - tctx, cancel := context.WithTimeout(ctx, time.Duration(n)*interval) - - r.Start(tctx) - defer r.Stop() - // wait for parent cancelation - <-tctx.Done() - // call cancel to statisy linter - cancel() -} - -// test normal stop -func testCollectAndStop(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - - r.Start(ctx) - time.Sleep(time.Duration(n) * interval) - r.Stop() -} - -// test multiple start calls are idempotent -func testMultiStart(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - - ticker := time.NewTicker(time.Duration(n) * interval) - defer ticker.Stop() - - r.Start(ctx) - r.Start(ctx) - <-ticker.C - r.Stop() -} - -// test multiple stop calls are idempotent -func testMultiStop(t *testing.T, r *StatsReporter, interval time.Duration, n int) { - ctx := testutils.Context(t) - - ticker := time.NewTicker(time.Duration(n) * interval) - defer ticker.Stop() - - r.Start(ctx) - <-ticker.C - r.Stop() - r.Stop() -} diff --git a/core/static/static.go b/core/static/static.go index f840331bc99..a8a47899802 100644 --- a/core/static/static.go +++ b/core/static/static.go @@ -6,6 +6,8 @@ import ( "time" "github.com/google/uuid" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" ) // Version and Sha are set at compile time via build arguments. @@ -40,17 +42,11 @@ func buildPrettyVersion() string { // SetConsumerName sets a nicely formatted application_name on the // database uri func SetConsumerName(uri *url.URL, name string, id *uuid.UUID) { - q := uri.Query() - applicationName := fmt.Sprintf("Chainlink%s|%s", buildPrettyVersion(), name) if id != nil { applicationName += fmt.Sprintf("|%s", id.String()) } - if len(applicationName) > 63 { - applicationName = applicationName[:63] - } - q.Set("application_name", applicationName) - uri.RawQuery = q.Encode() + pg.SetApplicationName(uri, applicationName) } // Short returns a 7-character sha prefix and version, or Unset if blank. diff --git a/core/store/dialects/dialects.go b/core/store/dialects/dialects.go index d250fa1b99b..5f35e055cb3 100644 --- a/core/store/dialects/dialects.go +++ b/core/store/dialects/dialects.go @@ -3,16 +3,18 @@ package dialects import ( // need to make sure pgx driver is registered before opening connection _ "github.com/jackc/pgx/v4/stdlib" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" ) // DialectName is a compiler enforced type used that maps to database dialect names -type DialectName string +type DialectName = pg.Driver const ( // Postgres represents the postgres dialect. - Postgres DialectName = "pgx" + Postgres DialectName = pg.DriverPostgres // TransactionWrappedPostgres is useful for tests. // When the connection is opened, it starts a transaction and all // operations performed on the DB will be within that transaction. - TransactionWrappedPostgres DialectName = "txdb" + TransactionWrappedPostgres DialectName = pg.DriverTxWrappedPostgres ) diff --git a/go.mod b/go.mod index 78ec7d29ee1..907f8568bb2 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/NethermindEth/juno v0.3.1 github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb - github.com/XSAM/otelsql v0.27.0 github.com/avast/retry-go/v4 v4.6.0 github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/cometbft/cometbft v0.37.2 @@ -44,7 +43,7 @@ require ( github.com/hdevalence/ed25519consensus v0.1.0 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgtype v1.14.0 - github.com/jackc/pgx/v4 v4.18.2 + github.com/jackc/pgx/v4 v4.18.3 github.com/jmoiron/sqlx v1.4.0 github.com/jonboulle/clockwork v0.4.0 github.com/jpillora/backoff v1.0.0 @@ -74,7 +73,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827 @@ -104,7 +103,6 @@ require ( golang.org/x/crypto v0.25.0 golang.org/x/exp v0.0.0-20240716175740-e3f259677ff7 golang.org/x/mod v0.19.0 - golang.org/x/net v0.27.0 golang.org/x/sync v0.7.0 golang.org/x/term v0.22.0 golang.org/x/text v0.16.0 @@ -141,6 +139,7 @@ require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect + github.com/XSAM/otelsql v0.29.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/benbjohnson/clock v1.3.5 // indirect @@ -334,6 +333,7 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/ratelimit v0.3.0 // indirect golang.org/x/arch v0.8.0 // indirect + golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.22.0 // indirect google.golang.org/api v0.188.0 // indirect diff --git a/go.sum b/go.sum index f5ef0f91e70..46696d4c335 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,8 @@ github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bw github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= -github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= -github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= +github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= +github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/participle/v2 v2.0.0-alpha7 h1:cK4vjj0VSgb3lN1nuKA5F7dw+1s1pWBe5bx7nNCnN+c= @@ -790,8 +790,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08 github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= -github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= -github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= @@ -1139,8 +1139,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c h1:3apUsez/6Pkp1ckXzSwIhzPRuWjDGjzMjKapEKi0Fcw= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a h1:HluVrtdN7p7fWT4wMuYawCBzo4JXkqysZRELBrcfQFk= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a/go.mod h1:4SaVAyy6CQQrMsCAe15TRkJgGOul5VwKpyUXzisuKNA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index a648e46e9f0..dd58394d3cc 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -28,7 +28,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a github.com/smartcontractkit/chainlink-testing-framework v1.34.2 github.com/smartcontractkit/chainlink-testing-framework/grafana v0.0.0-20240405215812-5a72bc9af239 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 @@ -85,7 +85,7 @@ require ( github.com/NethermindEth/juno v0.3.1 // indirect github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect - github.com/XSAM/otelsql v0.27.0 // indirect + github.com/XSAM/otelsql v0.29.0 // indirect github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect @@ -285,7 +285,7 @@ require ( github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.14.0 // indirect - github.com/jackc/pgx/v4 v4.18.2 // indirect + github.com/jackc/pgx/v4 v4.18.3 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 03e4a9082ff..fb6d81684c1 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -170,8 +170,8 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/Workiva/go-datastructures v1.1.0 h1:hu20UpgZneBhQ3ZvwiOGlqJSKIosin2Rd5wAKUHEO/k= github.com/Workiva/go-datastructures v1.1.0/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= -github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= -github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= +github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= +github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/kingpin/v2 v2.3.1/go.mod h1:oYL5vtsvEHZGHxU7DMp32Dvx+qL+ptGn6lWaot2vCNE= @@ -1052,8 +1052,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08 github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= -github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= -github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= @@ -1488,8 +1488,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c h1:3apUsez/6Pkp1ckXzSwIhzPRuWjDGjzMjKapEKi0Fcw= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a h1:HluVrtdN7p7fWT4wMuYawCBzo4JXkqysZRELBrcfQFk= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a/go.mod h1:4SaVAyy6CQQrMsCAe15TRkJgGOul5VwKpyUXzisuKNA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 1aa754f8cfa..6bf54ffa6ed 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.31.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a github.com/smartcontractkit/chainlink-testing-framework v1.34.2 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 @@ -72,7 +72,7 @@ require ( github.com/NethermindEth/juno v0.3.1 // indirect github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect - github.com/XSAM/otelsql v0.27.0 // indirect + github.com/XSAM/otelsql v0.29.0 // indirect github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect @@ -274,7 +274,7 @@ require ( github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.14.0 // indirect - github.com/jackc/pgx/v4 v4.18.2 // indirect + github.com/jackc/pgx/v4 v4.18.3 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 698623c50f1..47fbbb45ef4 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -170,8 +170,8 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/Workiva/go-datastructures v1.1.0 h1:hu20UpgZneBhQ3ZvwiOGlqJSKIosin2Rd5wAKUHEO/k= github.com/Workiva/go-datastructures v1.1.0/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= -github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= -github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= +github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= +github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/kingpin/v2 v2.3.1/go.mod h1:oYL5vtsvEHZGHxU7DMp32Dvx+qL+ptGn6lWaot2vCNE= @@ -1040,8 +1040,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08 github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= -github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= -github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= @@ -1470,8 +1470,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c h1:3apUsez/6Pkp1ckXzSwIhzPRuWjDGjzMjKapEKi0Fcw= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a h1:HluVrtdN7p7fWT4wMuYawCBzo4JXkqysZRELBrcfQFk= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a/go.mod h1:4SaVAyy6CQQrMsCAe15TRkJgGOul5VwKpyUXzisuKNA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM= diff --git a/plugins/cmd/chainlink-example-relay/main.go b/plugins/cmd/chainlink-example-relay/main.go new file mode 100644 index 00000000000..62e757f181a --- /dev/null +++ b/plugins/cmd/chainlink-example-relay/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "errors" + + "github.com/hashicorp/go-plugin" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" +) + +const ( + loggerName = "PluginExample" +) + +func main() { + s := loop.MustNewStartedServer(loggerName) + defer s.Stop() + + p := &pluginRelayer{lggr: s.Logger, ds: s.DataSource} + defer s.Logger.ErrorIfFn(p.Close, "Failed to close") + + s.MustRegister(p) + + stopCh := make(chan struct{}) + defer close(stopCh) + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: loop.PluginRelayerHandshakeConfig(), + Plugins: map[string]plugin.Plugin{ + loop.PluginRelayerName: &loop.GRPCPluginRelayer{ + PluginServer: p, + BrokerConfig: loop.BrokerConfig{ + StopCh: stopCh, + Logger: s.Logger, + GRPCOpts: s.GRPCOpts, + }, + }, + }, + GRPCServer: s.GRPCOpts.NewServer, + }) +} + +type pluginRelayer struct { + lggr logger.Logger + ds sqlutil.DataSource +} + +func (p *pluginRelayer) Ready() error { return nil } + +func (p *pluginRelayer) HealthReport() map[string]error { return map[string]error{p.Name(): nil} } + +func (p *pluginRelayer) Name() string { return p.lggr.Name() } + +func (p *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore core.Keystore, cr core.CapabilitiesRegistry) (loop.Relayer, error) { + var names []string + err := p.ds.SelectContext(ctx, names, "SELECT table_name FROM information_schema.tables WHERE table_schema='public'") + if err != nil { + return nil, err + } + p.lggr.Info("Queried table names", "names", names) + return nil, errors.New("example relayer unimplemented") //TODO sentinel error/message +} + +func (p *pluginRelayer) Close() error { return nil } diff --git a/plugins/loop_registry.go b/plugins/loop_registry.go index b796ddf87ee..f20778ca1db 100644 --- a/plugins/loop_registry.go +++ b/plugins/loop_registry.go @@ -65,6 +65,8 @@ func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) { envCfg.TracingAttributes = m.cfgTracing.Attributes() } + //TODO set DB config? Always? sometimes? + m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg} m.lggr.Debugf("Registered loopp %q with config %v, port %d", id, envCfg, envCfg.PrometheusPort) return m.registry[id], nil