Skip to content

Commit

Permalink
plugins/cmd/chainlink-example-relay: add example relay
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Aug 7, 2024
1 parent 69f7bd6 commit 6ce1dc4
Show file tree
Hide file tree
Showing 22 changed files with 161 additions and 396 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ jobs:
pushd $(go list -m -f "{{.Dir}}" github.com/smartcontractkit/chainlink-starknet/relayer)
go install ./pkg/chainlink/cmd/chainlink-starknet
popd
go install ./plugins/cmd/chainlink-example-relay
- name: Increase Race Timeout
if: ${{ github.event.schedule != '' && needs.filter.outputs.changes == 'true' }}
run: |
Expand Down
22 changes: 12 additions & 10 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) {
}

lggr := logger.Sugared(s.Logger.Named("RebroadcastTransactions"))
db, err := pg.OpenUnlockedDB(s.Config.AppID(), s.Config.Database())
db, err := pg.OpenUnlockedDB(ctx, s.Config.AppID(), s.Config.Database())
if err != nil {
return s.errorOut(errors.Wrap(err, "opening DB"))
}
Expand Down Expand Up @@ -950,7 +950,7 @@ func (s *Shell) RollbackDatabase(c *cli.Context) error {
version = null.IntFrom(numVersion)
}

db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -965,7 +965,7 @@ func (s *Shell) RollbackDatabase(c *cli.Context) error {
// VersionDatabase displays the current database version.
func (s *Shell) VersionDatabase(_ *cli.Context) error {
ctx := s.ctx()
db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -982,7 +982,7 @@ func (s *Shell) VersionDatabase(_ *cli.Context) error {
// StatusDatabase displays the database migration status
func (s *Shell) StatusDatabase(_ *cli.Context) error {
ctx := s.ctx()
db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -995,10 +995,11 @@ func (s *Shell) StatusDatabase(_ *cli.Context) error {

// CreateMigration displays the database migration status
func (s *Shell) CreateMigration(c *cli.Context) error {
ctx := s.ctx()
if !c.Args().Present() {
return s.errorOut(errors.New("You must specify a migration name"))
}
db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -1016,6 +1017,7 @@ func (s *Shell) CreateMigration(c *cli.Context) error {

// CleanupChainTables deletes database table rows based on chain type and chain id input.
func (s *Shell) CleanupChainTables(c *cli.Context) error {
ctx := s.ctx()
cfg := s.Config.Database()
parsed := cfg.URL()
if parsed.String() == "" {
Expand All @@ -1027,7 +1029,7 @@ func (s *Shell) CleanupChainTables(c *cli.Context) error {
return s.errorOut(fmt.Errorf("cannot reset database named `%s`. This command can only be run against databases with a name that ends in `_test`, to prevent accidental data loss. If you really want to delete chain specific data from this database, pass in the --danger option", dbname))
}

db, err := newConnection(cfg)
db, err := newConnection(ctx, cfg)
if err != nil {
return s.errorOut(errors.Wrap(err, "error connecting to the database"))
}
Expand Down Expand Up @@ -1079,12 +1081,12 @@ type dbConfig interface {
Dialect() dialects.DialectName
}

func newConnection(cfg dbConfig) (*sqlx.DB, error) {
func newConnection(ctx context.Context, cfg dbConfig) (*sqlx.DB, error) {
parsed := cfg.URL()
if parsed.String() == "" {
return nil, errDBURLMissing
}
return pg.NewConnection(parsed.String(), cfg.Dialect(), cfg)
return pg.NewConnection(ctx, parsed.String(), cfg.Dialect(), cfg)
}

func dropAndCreateDB(parsed url.URL, force bool) (err error) {
Expand Down Expand Up @@ -1132,7 +1134,7 @@ func dropAndCreatePristineDB(db *sqlx.DB, template string) (err error) {
}

func migrateDB(ctx context.Context, config dbConfig) error {
db, err := newConnection(config)
db, err := newConnection(ctx, config)
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -1144,7 +1146,7 @@ func migrateDB(ctx context.Context, config dbConfig) error {
}

func downAndUpDB(ctx context.Context, cfg dbConfig, baseVersionID int64) error {
db, err := newConnection(cfg)
db, err := newConnection(ctx, cfg)
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
}

url := cfg.Database().URL()
db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database())
db, err := pg.NewConnection(testutils.Context(t), url.String(), cfg.Database().Dialect(), cfg.Database())
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, db.Close()) })

Expand Down
2 changes: 1 addition & 1 deletion core/internal/cltest/heavyweight/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c Kind) PrepareDB(t testing.TB, overrideFn func(c *chainlink.Config, s *ch
require.NoError(t, os.MkdirAll(gcfg.RootDir(), 0700))
migrationTestDBURL, err := testdb.CreateOrReplace(gcfg.Database().URL(), generateName(), c != KindEmpty)
require.NoError(t, err)
db, err := pg.NewConnection(migrationTestDBURL, dialects.Postgres, gcfg.Database())
db, err := pg.NewConnection(testutils.Context(t), migrationTestDBURL, dialects.Postgres, gcfg.Database())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close()) // must close before dropping
Expand Down
6 changes: 3 additions & 3 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -58,7 +58,7 @@ require (
github.com/NethermindEth/juno v0.3.1 // indirect
github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/XSAM/otelsql v0.27.0 // indirect
github.com/XSAM/otelsql v0.29.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/avast/retry-go/v4 v4.6.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
Expand Down Expand Up @@ -207,7 +207,7 @@ require (
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.2 // indirect
github.com/jackc/pgx/v4 v4.18.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bw
github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs=
github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A=
github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc=
github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
Expand Down Expand Up @@ -822,8 +822,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU=
github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA=
github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
Expand Down Expand Up @@ -1184,8 +1184,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8=
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c h1:3apUsez/6Pkp1ckXzSwIhzPRuWjDGjzMjKapEKi0Fcw=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240805160614-501c4f40b98c/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a h1:HluVrtdN7p7fWT4wMuYawCBzo4JXkqysZRELBrcfQFk=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240807131504-e8487619ac5a/go.mod h1:4SaVAyy6CQQrMsCAe15TRkJgGOul5VwKpyUXzisuKNA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestAdapter_Integration(t *testing.T) {
logger := logger.TestLogger(t)
cfg := configtest.NewTestGeneralConfig(t)
url := cfg.Database().URL()
db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database())
db, err := pg.NewConnection(ctx, url.String(), cfg.Database().Dialect(), cfg.Database())
require.NoError(t, err)

keystore := keystore.NewInMemory(db, utils.FastScryptParams, logger)
Expand Down
89 changes: 15 additions & 74 deletions core/services/pg/connection.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,21 @@
package pg

import (
"database/sql"
"context"
"errors"
"fmt"
"log"
"os"
"time"

"github.com/XSAM/otelsql"
"github.com/google/uuid"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection
"github.com/jmoiron/sqlx"
"github.com/scylladb/go-reflectx"
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"golang.org/x/net/context"

commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
"github.com/smartcontractkit/chainlink/v2/core/store/dialects"
)

// NOTE: This is the default level in Postgres anyway, we just make it
// explicit here
const defaultIsolation = sql.LevelReadCommitted

var MinRequiredPGVersion = 110000

func init() {
Expand All @@ -51,77 +41,28 @@ type ConnectionConfig interface {
MaxIdleConns() int
}

func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (*sqlx.DB, error) {
opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
otelsql.WithTracerProvider(otel.GetTracerProvider()),
otelsql.WithSQLCommenter(true),
otelsql.WithSpanOptions(otelsql.SpanOptions{
OmitConnResetSession: true,
OmitConnPrepare: true,
OmitRows: true,
OmitConnectorConnect: true,
OmitConnQuery: false,
})}

// Set default connection options
lockTimeout := config.DefaultLockTimeout().Milliseconds()
idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds()
connParams := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`,
lockTimeout, idleInTxSessionTimeout, defaultIsolation.String())

var sqldb *sql.DB
if dialect == dialects.TransactionWrappedPostgres {
// Dbtx uses the uri as a unique identifier for each transaction. Each ORM
// should be encapsulated in it's own transaction, and thus needs its own
// unique id.
//
// We can happily throw away the original uri here because if we are using
// txdb it should have already been set at the point where we called
// txdb.Register
var err error
sqldb, err = otelsql.Open(string(dialect), uuid.New().String(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to open txdb: %w", err)
}
_, err = sqldb.Exec(connParams)
if err != nil {
return nil, fmt.Errorf("failed to set options: %w", err)
}
} else {
// Set sane defaults for every new database connection.
// Those can be overridden with Txn options or SET statements in individual connections.
// The default values are the same for Txns.
connConfig, err := pgx.ParseConfig(uri)
if err != nil {
return nil, fmt.Errorf("database: failed to parse config: %w", err)
}

connector := stdlib.GetConnector(*connConfig, stdlib.OptionAfterConnect(func(ctx context.Context, c *pgx.Conn) (err error) {
_, err = c.Exec(ctx, connParams)
return
}))

// Initialize sql/sqlx
sqldb = otelsql.OpenDB(connector, opts...)
func NewConnection(ctx context.Context, uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error) {
db, err = commonpg.ConnectionConfig{
DefaultIdleInTxSessionTimeout: config.DefaultIdleInTxSessionTimeout(),
DefaultLockTimeout: config.DefaultLockTimeout(),
MaxOpenConns: config.MaxOpenConns(),
MaxIdleConns: config.MaxIdleConns(),
}.NewDB(ctx, uri, dialect)
if err != nil {
return nil, err
}
db := sqlx.NewDb(sqldb, string(dialect))
db.MapperFunc(reflectx.CamelToSnakeASCII)

setMaxConns(db, config)
setMaxMercuryConns(db, config)

if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" {
if err := checkVersion(db, MinRequiredPGVersion); err != nil {
return nil, err
}
}

return db, disallowReplica(db)
return db, nil
}

func setMaxConns(db *sqlx.DB, config ConnectionConfig) {
db.SetMaxOpenConns(config.MaxOpenConns())
db.SetMaxIdleConns(config.MaxIdleConns())

func setMaxMercuryConns(db *sqlx.DB, config ConnectionConfig) {
// HACK: In the case of mercury jobs, one conn is needed per job for good
// performance. Most nops will forget to increase the defaults to account
// for this so we detect it here instead.
Expand Down
12 changes: 6 additions & 6 deletions core/services/pg/locked_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func NewLockedDB(appID uuid.UUID, cfg LockedDBConfig, lockCfg config.Lock, lggr
// OpenUnlockedDB just opens DB connection, without any DB locks.
// This should be used carefully, when we know we don't need any locks.
// Currently this is used by RebroadcastTransactions command only.
func OpenUnlockedDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
return openDB(appID, cfg)
func OpenUnlockedDB(ctx context.Context, appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
return openDB(ctx, appID, cfg)
}

// Open function connects to DB and acquires DB locks based on configuration.
Expand All @@ -68,7 +68,7 @@ func (l *lockedDb) Open(ctx context.Context) (err error) {
}

// Step 1: open DB connection
l.db, err = openDB(l.appID, l.cfg)
l.db, err = openDB(ctx, l.appID, l.cfg)
if err != nil {
// l.db will be nil in case of error
return errors.Wrap(err, "failed to open db")
Expand All @@ -82,7 +82,7 @@ func (l *lockedDb) Open(ctx context.Context) (err error) {

// Step 2: start the stat reporter
l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr)
l.statsReporter.Start(ctx)
l.statsReporter.Start()

// Step 3: acquire DB locks
lockingMode := l.lockCfg.LockingMode()
Expand Down Expand Up @@ -139,10 +139,10 @@ func (l lockedDb) DB() *sqlx.DB {
return l.db
}

func openDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
func openDB(ctx context.Context, appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
uri := cfg.URL()
static.SetConsumerName(&uri, "App", &appID)
dialect := cfg.Dialect()
db, err = NewConnection(uri.String(), dialect, cfg)
db, err = NewConnection(ctx, uri.String(), dialect, cfg)
return
}
5 changes: 3 additions & 2 deletions core/services/pg/locked_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func TestLockedDB_TwoInstances(t *testing.T) {

func TestOpenUnlockedDB(t *testing.T) {
testutils.SkipShortDB(t)
ctx := testutils.Context(t)
config := configtest.NewGeneralConfig(t, nil)

db1, err1 := pg.OpenUnlockedDB(config.AppID(), config.Database())
db1, err1 := pg.OpenUnlockedDB(ctx, config.AppID(), config.Database())
require.NoError(t, err1)
require.NotNil(t, db1)

// should not block the second connection
db2, err2 := pg.OpenUnlockedDB(config.AppID(), config.Database())
db2, err2 := pg.OpenUnlockedDB(ctx, config.AppID(), config.Database())
require.NoError(t, err2)
require.NotNil(t, db2)

Expand Down
Loading

0 comments on commit 6ce1dc4

Please sign in to comment.