Skip to content

Commit

Permalink
plugins/cmd/chainlink-example-relay: add example relay
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 17, 2024
1 parent f50f2dd commit 895551f
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 367 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ jobs:
pushd $(go list -m -f "{{.Dir}}" github.com/smartcontractkit/chainlink-starknet/relayer)
go install ./pkg/chainlink/cmd/chainlink-starknet
popd
go install ./plugins/cmd/chainlink-example-relay
- name: Increase Race Timeout
if: ${{ github.event.schedule != '' && needs.filter.outputs.changes == 'true' }}
run: |
Expand Down
8 changes: 4 additions & 4 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.3
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240417140413-132458dd738c
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down Expand Up @@ -55,7 +55,7 @@ require (
github.com/NethermindEth/juno v0.3.1 // indirect
github.com/NethermindEth/starknet.go v0.6.1-0.20231218140327-915109ab5bc1 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/XSAM/otelsql v0.27.0 // indirect
github.com/XSAM/otelsql v0.29.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/avast/retry-go/v4 v4.5.1 // indirect
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
Expand Down Expand Up @@ -197,7 +197,7 @@ require (
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.2 // indirect
github.com/jackc/pgx/v4 v4.18.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
Expand Down Expand Up @@ -303,7 +303,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bw
github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs=
github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A=
github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc=
github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
Expand Down Expand Up @@ -821,8 +821,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU=
github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA=
github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
Expand Down Expand Up @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs=
github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb h1:yLDt5cQWRwcFM5VEdSTbc3vDrYrxYqBjSvyTMU/o8s4=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240417140413-132458dd738c h1:N/ktA12ZCVsi4S+7qxrxXQYOP1qF/gXqb3nnNxHTOVw=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240417140413-132458dd738c/go.mod h1:MvSUAodwVsCtNMSazqsPhWf/f0uaVLZUBjmok8wA4XI=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down Expand Up @@ -1405,10 +1405,10 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqhe
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0=
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8=
go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
Expand Down
57 changes: 10 additions & 47 deletions core/services/pg/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@ import (
"os"
"time"

"github.com/google/uuid"
_ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection
"github.com/jmoiron/sqlx"
"github.com/scylladb/go-reflectx"
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
"github.com/smartcontractkit/chainlink/v2/core/store/dialects"

"github.com/XSAM/otelsql"
)

var MinRequiredPGVersion = 110000
Expand Down Expand Up @@ -44,59 +39,27 @@ type ConnectionConfig interface {
}

func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error) {
if dialect == dialects.TransactionWrappedPostgres {
// Dbtx uses the uri as a unique identifier for each transaction. Each ORM
// should be encapsulated in it's own transaction, and thus needs its own
// unique id.
//
// We can happily throw away the original uri here because if we are using
// txdb it should have already been set at the point where we called
// txdb.Register
uri = uuid.New().String()
}

// Initialize sql/sqlx
sqldb, err := otelsql.Open(string(dialect), uri,
otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
otelsql.WithTracerProvider(otel.GetTracerProvider()),
otelsql.WithSQLCommenter(true),
otelsql.WithSpanOptions(otelsql.SpanOptions{
OmitConnResetSession: true,
OmitConnPrepare: true,
OmitRows: true,
OmitConnectorConnect: true,
OmitConnQuery: false,
}),
)
db, err = commonpg.ConnectionConfig{
DefaultIdleInTxSessionTimeout: config.DefaultIdleInTxSessionTimeout(),
DefaultLockTimeout: config.DefaultLockTimeout(),
MaxOpenConns: config.MaxOpenConns(),
MaxIdleConns: config.MaxIdleConns(),
}.NewDB(uri, commonpg.DialectName(dialect))
if err != nil {
return nil, err
}
db = sqlx.NewDb(sqldb, string(dialect))
db.MapperFunc(reflectx.CamelToSnakeASCII)

// Set default connection options
lockTimeout := config.DefaultLockTimeout().Milliseconds()
idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds()
stmt := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`,
lockTimeout, idleInTxSessionTimeout, defaultIsolation.String())
if _, err = db.Exec(stmt); err != nil {
return nil, err
}
setMaxConns(db, config)
setMaxMercuryConns(db, config)

if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" {
if err := checkVersion(db, MinRequiredPGVersion); err != nil {
return nil, err
}
}

return db, disallowReplica(db)
return db, nil
}

func setMaxConns(db *sqlx.DB, config ConnectionConfig) {
db.SetMaxOpenConns(config.MaxOpenConns())
db.SetMaxIdleConns(config.MaxIdleConns())

func setMaxMercuryConns(db *sqlx.DB, config ConnectionConfig) {
// HACK: In the case of mercury jobs, one conn is needed per job for good
// performance. Most nops will forget to increase the defaults to account
// for this so we detect it here instead.
Expand Down
2 changes: 1 addition & 1 deletion core/services/pg/locked_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (l *lockedDb) Open(ctx context.Context) (err error) {

// Step 2: start the stat reporter
l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr)
l.statsReporter.Start(ctx)
l.statsReporter.Start()

// Step 3: acquire DB locks
lockingMode := l.lockCfg.LockingMode()
Expand Down
128 changes: 6 additions & 122 deletions core/services/pg/stats.go
Original file line number Diff line number Diff line change
@@ -1,133 +1,17 @@
package pg

import (
"context"
"database/sql"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

commonpg "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const dbStatsInternal = 10 * time.Second

var (
promDBConnsMax = promauto.NewGauge(prometheus.GaugeOpts{
Name: "db_conns_max",
Help: "Maximum number of open connections to the database.",
})
promDBConnsOpen = promauto.NewGauge(prometheus.GaugeOpts{
Name: "db_conns_open",
Help: "The number of established connections both in use and idle.",
})
promDBConnsInUse = promauto.NewGauge(prometheus.GaugeOpts{
Name: "db_conns_used",
Help: "The number of connections currently in use.",
})
promDBWaitCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "db_wait_count",
Help: "The total number of connections waited for.",
})
promDBWaitDuration = promauto.NewGauge(prometheus.GaugeOpts{
Name: "db_wait_time_seconds",
Help: "The total time blocked waiting for a new connection.",
})
)

func publishStats(stats sql.DBStats) {
promDBConnsMax.Set(float64(stats.MaxOpenConnections))
promDBConnsOpen.Set(float64(stats.OpenConnections))
promDBConnsInUse.Set(float64(stats.InUse))

promDBWaitCount.Set(float64(stats.WaitCount))
promDBWaitDuration.Set(stats.WaitDuration.Seconds())
}

type StatsReporterOpt func(*StatsReporter)

func StatsInterval(d time.Duration) StatsReporterOpt {
return func(r *StatsReporter) {
r.interval = d
}
}

func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt {
return func(r *StatsReporter) {
r.reportFn = fn
}
}

type (
StatFn func() sql.DBStats
ReportFn func(sql.DBStats)
StatFn = commonpg.StatFn
ReportFn = commonpg.ReportFn
)

type StatsReporter struct {
statFn StatFn
reportFn ReportFn
interval time.Duration
cancel context.CancelFunc
lggr logger.Logger
once sync.Once
wg sync.WaitGroup
}

func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter {
r := &StatsReporter{
statFn: fn,
reportFn: publishStats,
interval: dbStatsInternal,
lggr: lggr.Named("StatsReporter"),
}

for _, opt := range opts {
opt(r)
}

return r
}

func (r *StatsReporter) Start(ctx context.Context) {

startOnce := func() {
r.wg.Add(1)
r.lggr.Debug("Starting DB stat reporter")
rctx, cancelFunc := context.WithCancel(ctx)
r.cancel = cancelFunc
go r.loop(rctx)
}

r.once.Do(startOnce)
}

// Stop stops all resources owned by the reporter and waits
// for all of them to be done
func (r *StatsReporter) Stop() {
if r.cancel != nil {
r.lggr.Debug("Stopping DB stat reporter")
r.cancel()
r.cancel = nil
r.wg.Wait()
}
}

func (r *StatsReporter) loop(ctx context.Context) {
defer r.wg.Done()

ticker := time.NewTicker(r.interval)
defer ticker.Stop()
type StatsReporter = commonpg.StatsReporter

r.reportFn(r.statFn())
for {
select {
case <-ticker.C:
r.reportFn(r.statFn())
case <-ctx.Done():
r.lggr.Debug("stat reporter loop received done. stopping...")
return
}
}
func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...commonpg.StatsReporterOpt) *StatsReporter {
return commonpg.NewStatsReporter(fn, lggr, opts...)
}
Loading

0 comments on commit 895551f

Please sign in to comment.