Skip to content

Commit

Permalink
plugins/cmd/chainlink-example-relay: add example relay
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Dec 17, 2024
1 parent 8555745 commit 586cd57
Show file tree
Hide file tree
Showing 27 changed files with 127 additions and 421 deletions.
2 changes: 1 addition & 1 deletion core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G

mailMon := mailbox.NewMonitor(cfg.AppID().String(), appLggr.Named("Mailbox"))

loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Database(), cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex)

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down
2 changes: 1 addition & 1 deletion core/cmd/shell_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
func genTestEVMRelayers(t *testing.T, opts legacyevm.ChainRelayOpts, ks evmrelayer.CSAETHKeystore) *chainlink.CoreRelayerChainInteroperators {
f := chainlink.RelayerFactory{
Logger: opts.Logger,
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Tracing(), opts.AppConfig.Telemetry(), nil, ""),
LoopRegistry: plugins.NewLoopRegistry(opts.Logger, opts.AppConfig.Database(), opts.AppConfig.Tracing(), opts.AppConfig.Telemetry(), nil, ""),
CapabilitiesRegistry: capabilities.NewRegistry(opts.Logger),
}

Expand Down
4 changes: 2 additions & 2 deletions core/cmd/shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func TestNewUserCache(t *testing.T) {

func TestSetupSolanaRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, "")
reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, nil, "")
ks := mocks.NewSolana(t)
ds := sqltest.NewNoOpDataSource()

Expand Down Expand Up @@ -483,7 +483,7 @@ func TestSetupSolanaRelayer(t *testing.T) {

func TestSetupStarkNetRelayer(t *testing.T) {
lggr := logger.TestLogger(t)
reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, "")
reg := plugins.NewLoopRegistry(lggr, nil, nil, nil, nil, "")
ks := mocks.NewStarkNet(t)
// config 3 chains but only enable 2 => should only be 2 relayer
nEnabledChains := 2
Expand Down
4 changes: 2 additions & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
keyStore := keystore.NewInMemory(ds, utils.FastScryptParams, lggr)

mailMon := mailbox.NewMonitor(cfg.AppID().String(), lggr.Named("Mailbox"))
loopRegistry := plugins.NewLoopRegistry(lggr, nil, nil, nil, "")
loopRegistry := plugins.NewLoopRegistry(lggr, nil, nil, nil, nil, "")

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
Expand Down Expand Up @@ -489,7 +489,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
RestrictedHTTPClient: c,
UnrestrictedHTTPClient: c,
SecretGenerator: MockSecretGenerator{},
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, ""),
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, nil, ""),
MercuryPool: mercuryPool,
CapabilitiesRegistry: capabilitiesRegistry,
CapabilitiesDispatcher: dispatcher,
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805
github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217144508-28130f7e9584

Check failure on line 36 in core/scripts/go.mod

View workflow job for this annotation

GitHub Actions / Validate go.mod dependencies

[./core/scripts/go.mod] dependency github.com/smartcontractkit/[email protected] not on default branch (main). Version(commit): 28130f7e9584 Tree: https://github.com/smartcontractkit/chainlink-common/tree/28130f7e9584 Commit: https://github.com/smartcontractkit/chainlink-common/commit/28130f7e9584
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand Down Expand Up @@ -68,7 +68,7 @@ require (
github.com/NethermindEth/juno v0.3.1 // indirect
github.com/NethermindEth/starknet.go v0.7.1-0.20240401080518-34a506f3cfdb // indirect
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
github.com/XSAM/otelsql v0.27.0 // indirect
github.com/XSAM/otelsql v0.29.0 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c // indirect
Expand Down
8 changes: 4 additions & 4 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs=
github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A=
github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc=
github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -1150,8 +1150,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241213122413-5e8f65dd6b1b h1:iSQJ6ng4FhEswf8SXunGkaJlVP3E3JlgLB8Oo2f3Ud4=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241213122413-5e8f65dd6b1b/go.mod h1:F8xQAIW0ymb2BZhqn89sWZLXreJhM5KDVF6Qb4y44N0=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 h1:Pz8jB/6qe10xT10h2S3LFYJrnebNpG5rJ/w16HZGwPQ=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ=
github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217144508-28130f7e9584 h1:tGINh0h0tqLRb20JhpruqHpJYSwHbKpcLetJN9HsnzM=
github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217144508-28130f7e9584/go.mod h1:V3BHfvLnQNBUoZ4bGjD29ZPhyzPE++DkYkhvPb9tcRs=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 h1:aeiBdBHGY8QNftps+VqrIk6OnfeeOD5z4jrAabW4ZSc=
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
if err != nil {
return nil, fmt.Errorf("could not build Beholder auth: %w", err)
}
loopRegistry = plugins.NewLoopRegistry(globalLogger, opts.Config.Tracing(), opts.Config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
loopRegistry = plugins.NewLoopRegistry(globalLogger, opts.Config.Database(), opts.Config.Tracing(), opts.Config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
}

// If the audit logger is enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestCoreRelayerChainInteroperators(t *testing.T) {

factory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, ""),
LoopRegistry: plugins.NewLoopRegistry(lggr, nil, nil, nil, nil, ""),
GRPCOpts: loop.GRPCOpts{},
CapabilitiesRegistry: capabilities.NewRegistry(lggr),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func setupNodeCCIP(
beholderAuthHeaders, csaPubKeyHex, err := keystore.BuildBeholderAuth(keyStore)
require.NoError(t, err)

loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
loopRegistry := plugins.NewLoopRegistry(lggr.Named("LoopRegistry"), config.Database(), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex)
relayerFactory := chainlink.RelayerFactory{
Logger: lggr,
LoopRegistry: loopRegistry,
Expand Down Expand Up @@ -493,7 +493,7 @@ func setupNodeCCIP(
RestrictedHTTPClient: &http.Client{},
AuditLogger: audit.NoopLogger,
MailMon: mailMon,
LoopRegistry: plugins.NewLoopRegistry(lggr, config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex),
LoopRegistry: plugins.NewLoopRegistry(lggr, config.Database(), config.Tracing(), config.Telemetry(), beholderAuthHeaders, csaPubKeyHex),
})
require.NoError(t, err)
require.NoError(t, app.GetKeyStore().Unlock(ctx, "password"))
Expand Down
107 changes: 15 additions & 92 deletions core/services/pg/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,19 @@ package pg

import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"os"
"time"

"github.com/XSAM/otelsql"
"github.com/google/uuid"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection
"github.com/jmoiron/sqlx"
"github.com/scylladb/go-reflectx"
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

pgcommon "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
)

// NOTE: This is the default level in Postgres anyway, we just make it
// explicit here
const defaultIsolation = sql.LevelReadCommitted

var MinRequiredPGVersion = 110000

func init() {
Expand All @@ -51,81 +40,29 @@ type ConnectionConfig interface {
MaxIdleConns() int
}

func NewConnection(ctx context.Context, uri string, dialect pgcommon.DialectName, config ConnectionConfig) (*sqlx.DB, error) {
opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
otelsql.WithTracerProvider(otel.GetTracerProvider()),
otelsql.WithSQLCommenter(true),
otelsql.WithSpanOptions(otelsql.SpanOptions{
OmitConnResetSession: true,
OmitConnPrepare: true,
OmitRows: true,
OmitConnectorConnect: true,
OmitConnQuery: false,
})}

// Set default connection options
lockTimeout := config.DefaultLockTimeout().Milliseconds()
idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds()
connParams := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`,
lockTimeout, idleInTxSessionTimeout, defaultIsolation.String())

var sqldb *sql.DB
if dialect == pgcommon.TransactionWrappedPostgres {
// Dbtx uses the uri as a unique identifier for each transaction. Each ORM
// should be encapsulated in it's own transaction, and thus needs its own
// unique id.
//
// We can happily throw away the original uri here because if we are using
// txdb it should have already been set at the point where we called
// txdb.Register

err := pgcommon.RegisterTxDb(uri)
if err != nil {
return nil, fmt.Errorf("failed to register txdb: %w", err)
}
sqldb, err = otelsql.Open(string(dialect), uuid.New().String(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to open txdb: %w", err)
}
_, err = sqldb.ExecContext(ctx, connParams)
if err != nil {
return nil, fmt.Errorf("failed to set options: %w", err)
}
} else {
// Set sane defaults for every new database connection.
// Those can be overridden with Txn options or SET statements in individual connections.
// The default values are the same for Txns.
connConfig, err := pgx.ParseConfig(uri)
if err != nil {
return nil, fmt.Errorf("database: failed to parse config: %w", err)
}

connector := stdlib.GetConnector(*connConfig, stdlib.OptionAfterConnect(func(ctx context.Context, c *pgx.Conn) (err error) {
_, err = c.Exec(ctx, connParams)
return
}))

// Initialize sql/sqlx
sqldb = otelsql.OpenDB(connector, opts...)
// TODO double check common side for changes (otel? registertxdb?)
func NewConnection(ctx context.Context, uri string, driverName string, config ConnectionConfig) (db *sqlx.DB, err error) {
db, err = commonpg.DBConfig{
IdleInTxSessionTimeout: config.DefaultIdleInTxSessionTimeout(),
LockTimeout: config.DefaultLockTimeout(),
MaxOpenConns: config.MaxOpenConns(),
MaxIdleConns: config.MaxIdleConns(),
}.New(ctx, uri, driverName)
if err != nil {
return nil, err
}
db := sqlx.NewDb(sqldb, string(dialect))
db.MapperFunc(reflectx.CamelToSnakeASCII)

setMaxConns(db, config)
setMaxMercuryConns(db, config)

if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" {
if err := checkVersion(db, MinRequiredPGVersion); err != nil {
return nil, err
}
}

return db, disallowReplica(db)
return db, nil
}

func setMaxConns(db *sqlx.DB, config ConnectionConfig) {
db.SetMaxOpenConns(config.MaxOpenConns())
db.SetMaxIdleConns(config.MaxIdleConns())

func setMaxMercuryConns(db *sqlx.DB, config ConnectionConfig) {
// HACK: In the case of mercury jobs, one conn is needed per job for good
// performance. Most nops will forget to increase the defaults to account
// for this so we detect it here instead.
Expand Down Expand Up @@ -176,17 +113,3 @@ func checkVersion(db Getter, minVersion int) error {
}
return nil
}

func disallowReplica(db *sqlx.DB) error {
var val string
err := db.Get(&val, "SHOW session_replication_role")
if err != nil {
return err
}

if val == "replica" {
return fmt.Errorf("invalid `session_replication_role`: %s. Refusing to connect to replica database. Writing to a replica will corrupt the database", val)
}

return nil
}
21 changes: 0 additions & 21 deletions core/services/pg/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

var _ Getter = &mockGetter{}
Expand Down Expand Up @@ -62,21 +59,3 @@ func Test_checkVersion(t *testing.T) {
require.NoError(t, err)
})
}

func Test_disallowReplica(t *testing.T) {
testutils.SkipShortDB(t)
db := pgtest.NewSqlxDB(t)

_, err := db.Exec("SET session_replication_role= 'origin'")
require.NoError(t, err)
err = disallowReplica(db)
require.NoError(t, err)

_, err = db.Exec("SET session_replication_role= 'replica'")
require.NoError(t, err)
err = disallowReplica(db)
require.Error(t, err, "replica role should be disallowed")

_, err = db.Exec("SET session_replication_role= 'not_valid_role'")
require.Error(t, err)
}
Loading

0 comments on commit 586cd57

Please sign in to comment.