Skip to content

Commit

Permalink
pkg/sqlutil/pg: create package; expand env config
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 4, 2024
1 parent 2fb437e commit 5742e7a
Show file tree
Hide file tree
Showing 9 changed files with 672 additions and 36 deletions.
29 changes: 19 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-common
go 1.21

require (
github.com/XSAM/otelsql v0.29.0
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/ethereum/go-ethereum v1.13.8
github.com/fxamacker/cbor/v2 v2.5.0
Expand All @@ -12,25 +13,27 @@ require (
github.com/hashicorp/consul/sdk v0.16.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.6.0
github.com/jackc/pgx/v4 v4.18.3
github.com/jmoiron/sqlx v1.3.5
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.2.0
github.com/lib/pq v1.10.2
github.com/linkedin/goavro/v2 v2.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9
github.com/pelletier/go-toml/v2 v2.1.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/riferrei/srclient v0.5.4
github.com/scylladb/go-reflectx v1.0.1
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.18.0
go.opentelemetry.io/otel/sdk v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/goleak v1.2.1
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
Expand All @@ -46,15 +49,21 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand All @@ -69,13 +78,13 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
Expand Down
157 changes: 140 additions & 17 deletions go.sum

Large diffs are not rendered by default.

78 changes: 74 additions & 4 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@ import (
"os"
"strconv"
"strings"
"time"
)

const (
envDatabaseURL = "CL_DATABASE_URL"
envDatabaseURL = "CL_DATABASE_URL"
envDatabaseDefaultIdleInTxSessionTimeout = "CL_DATABASE_DEFAULT_IDLE_IN_TX_SESSION_TIMEOUT"
envDatabaseDefaultLockTimeout = "CL_DATABASE_DEFAULT_LOCK_TIMEOUT"
envDatabaseDefaultQueryTimeout = "CL_DATABASE_DEFAULT_QUERY_TIMEOUT"
envDatabaseLogSQL = "CL_DATABASE_LOG_SQL"
envDatabaseMaxOpenConns = "CL_DATABASE_MAX_OPEN_CONNS"
envDatabaseMaxIdleConns = "CL_DATABASE_MAX_IDLE_CONNS"

envPromPort = "CL_PROMETHEUS_PORT"
envTracingEnabled = "CL_TRACING_ENABLED"
envTracingCollectorTarget = "CL_TRACING_COLLECTOR_TARGET"
Expand All @@ -21,7 +29,13 @@ const (
// EnvConfig is the configuration between the application and the LOOP executable. The values
// are fully resolved and static and passed via the environment.
type EnvConfig struct {
DatabaseURL *url.URL
DatabaseURL *url.URL
DatabaseDefaultIdleInTxSessionTimeout time.Duration
DatabaseDefaultLockTimeout time.Duration
DatabaseDefaultQueryTimeout time.Duration
DatabaseLogSQL bool
DatabaseMaxOpenConns int
DatabaseMaxIdleConns int

PrometheusPort int

Expand All @@ -45,6 +59,12 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
// DatabaseURL is optional
if e.DatabaseURL != nil {
injectEnv[envDatabaseURL] = e.DatabaseURL.String()
injectEnv[envDatabaseDefaultIdleInTxSessionTimeout] = e.DatabaseDefaultIdleInTxSessionTimeout.String()
injectEnv[envDatabaseDefaultLockTimeout] = e.DatabaseDefaultLockTimeout.String()
injectEnv[envDatabaseDefaultQueryTimeout] = e.DatabaseDefaultQueryTimeout.String()
injectEnv[envDatabaseLogSQL] = strconv.FormatBool(e.DatabaseLogSQL)
injectEnv[envDatabaseMaxOpenConns] = strconv.Itoa(e.DatabaseMaxOpenConns)
injectEnv[envDatabaseMaxIdleConns] = strconv.Itoa(e.DatabaseMaxIdleConns)
}

for k, v := range e.TracingAttributes {
Expand All @@ -59,13 +79,39 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {

// parse deserializes environment variables
func (e *EnvConfig) parse() error {
promPortStr := os.Getenv(envPromPort)
var err error
e.DatabaseURL, err = getDatabaseURL()
if err != nil {
return fmt.Errorf("failed to parse %s: %q", envDatabaseURL, err)
return err
}
if e.DatabaseURL != nil {
e.DatabaseDefaultIdleInTxSessionTimeout, err = getDatabaseIdleInTxSessionTimeout()
if err != nil {
return err
}
e.DatabaseDefaultLockTimeout, err = getDatabaseDefaultLockTimeout()
if err != nil {
return err
}
e.DatabaseDefaultQueryTimeout, err = getDatabaseDefaultQueryTimeout()
if err != nil {
return err
}
e.DatabaseLogSQL, err = getDatabaseLogSQL()
if err != nil {
return err
}
e.DatabaseMaxOpenConns, err = getDatabaseMaxOpenConns()
if err != nil {
return err
}
e.DatabaseMaxIdleConns, err = getDatabaseMaxIdleConns()
if err != nil {
return err
}
}

promPortStr := os.Getenv(envPromPort)
e.PrometheusPort, err = strconv.Atoi(promPortStr)
if err != nil {
return fmt.Errorf("failed to parse %s = %q: %w", envPromPort, promPortStr, err)
Expand Down Expand Up @@ -150,3 +196,27 @@ func getDatabaseURL() (*url.URL, error) {
}
return u, nil
}

func getDatabaseIdleInTxSessionTimeout() (time.Duration, error) {
return time.ParseDuration(os.Getenv(envDatabaseDefaultIdleInTxSessionTimeout))
}

func getDatabaseDefaultLockTimeout() (time.Duration, error) {
return time.ParseDuration(os.Getenv(envDatabaseDefaultLockTimeout))
}

func getDatabaseDefaultQueryTimeout() (time.Duration, error) {
return time.ParseDuration(os.Getenv(envDatabaseDefaultQueryTimeout))
}

func getDatabaseLogSQL() (bool, error) {
return strconv.ParseBool(os.Getenv(envDatabaseLogSQL))
}

func getDatabaseMaxOpenConns() (int, error) {
return strconv.Atoi(os.Getenv(envDatabaseMaxOpenConns))
}

func getDatabaseMaxIdleConns() (int, error) {
return strconv.Atoi(os.Getenv(envDatabaseMaxIdleConns))
}
3 changes: 2 additions & 1 deletion pkg/loop/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestEnvConfig_parse(t *testing.T) {
{
name: "All variables set correctly",
envVars: map[string]string{
envDatabaseURL: "postgres://user:password@localhost:5432/db",
envDatabaseURL: "postgres://user:password@localhost:5432/db",
//TODO more DB
envPromPort: "8080",
envTracingEnabled: "true",
envTracingCollectorTarget: "some:target",
Expand Down
44 changes: 40 additions & 4 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package loop

import (
"context"
"fmt"
"os"
"time"

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
)

// NewStartedServer returns a started Server.
Expand Down Expand Up @@ -41,10 +47,13 @@ func MustNewStartedServer(loggerName string) *Server {

// Server holds common plugin server fields.
type Server struct {
GRPCOpts GRPCOpts
Logger logger.SugaredLogger
promServer *PromServer
checker *services.HealthChecker
GRPCOpts GRPCOpts
Logger logger.SugaredLogger
db *sqlx.DB
dbStatsReporter *pg.StatsReporter
DataSource sqlutil.DataSource
promServer *PromServer
checker *services.HealthChecker
}

func newServer(loggerName string) (*Server, error) {
Expand Down Expand Up @@ -90,6 +99,27 @@ func (s *Server) start() error {
return fmt.Errorf("error starting health checker: %w", err)
}

if envCfg.DatabaseURL != nil {
//TODO set application_name on url?
dbURL := envCfg.DatabaseURL.String()
var err error
s.db, err = pg.ConnectionConfig{
DefaultIdleInTxSessionTimeout: envCfg.DatabaseDefaultIdleInTxSessionTimeout,
DefaultLockTimeout: envCfg.DatabaseDefaultLockTimeout,
MaxOpenConns: envCfg.DatabaseMaxOpenConns,
MaxIdleConns: envCfg.DatabaseMaxIdleConns,
}.NewDB(dbURL, pg.DialectPostgres)
if err != nil {
return fmt.Errorf("error connecting to DataBase at %s: %w", dbURL, err)
}
s.DataSource = sqlutil.WrapDataSource(s.db, s.Logger,
sqlutil.TimeoutHook(func() time.Duration { return envCfg.DatabaseDefaultQueryTimeout }),
sqlutil.MonitorHook(func() bool { return envCfg.DatabaseLogSQL }))

s.dbStatsReporter = pg.NewStatsReporter(s.db.Stats, s.Logger)
s.dbStatsReporter.Start(context.Background()) //TODO why pass ctx at all?

Check failure on line 120 in pkg/loop/server.go

View workflow job for this annotation

GitHub Actions / build-test

too many arguments in call to s.dbStatsReporter.Start
}

return nil
}

Expand All @@ -104,6 +134,12 @@ func (s *Server) Register(c services.HealthReporter) error { return s.checker.Re

// Stop closes resources and flushes logs.
func (s *Server) Stop() {
if s.dbStatsReporter != nil {
s.dbStatsReporter.Stop()
}
if s.db != nil {
s.Logger.ErrorIfFn(s.db.Close, "Failed to close database connection")
}
s.Logger.ErrorIfFn(s.checker.Close, "Failed to close health checker")
s.Logger.ErrorIfFn(s.promServer.Close, "Failed to close prometheus server")
if err := s.Logger.Sync(); err != nil {
Expand Down
99 changes: 99 additions & 0 deletions pkg/sqlutil/pg/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package pg

import (
"database/sql"
"fmt"
"time"

"github.com/XSAM/otelsql"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/scylladb/go-reflectx"
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

// need to make sure pgx driver is registered before opening connection
_ "github.com/jackc/pgx/v4/stdlib"
)

// NOTE: This is the default level in Postgres anyway, we just make it
// explicit here
const defaultIsolation = sql.LevelReadCommitted

// DialectName is a compiler enforced type used that maps to database dialect names
type DialectName string

const (
// DialectPostgres represents the postgres dialect.
DialectPostgres DialectName = "pgx"
// DialectTransactionWrappedPostgres is useful for tests.
// When the connection is opened, it starts a transaction and all
// operations performed on the DB will be within that transaction.
DialectTransactionWrappedPostgres DialectName = "txdb"
)

type ConnectionConfig struct {
DefaultIdleInTxSessionTimeout time.Duration
DefaultLockTimeout time.Duration
MaxOpenConns int
MaxIdleConns int
}

func (config ConnectionConfig) NewDB(uri string, dialect DialectName) (db *sqlx.DB, err error) {
if dialect == DialectTransactionWrappedPostgres {
// Dbtx uses the uri as a unique identifier for each transaction. Each ORM
// should be encapsulated in it's own transaction, and thus needs its own
// unique id.
//
// We can happily throw away the original uri here because if we are using
// txdb it should have already been set at the point where we called
// txdb.Register
uri = uuid.New().String()
}

// Initialize sql/sqlx
sqldb, err := otelsql.Open(string(dialect), uri,
otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
otelsql.WithTracerProvider(otel.GetTracerProvider()),
otelsql.WithSQLCommenter(true),
otelsql.WithSpanOptions(otelsql.SpanOptions{
OmitConnResetSession: true,
OmitConnPrepare: true,
OmitRows: true,
OmitConnectorConnect: true,
OmitConnQuery: false,
}),
)
if err != nil {
return nil, err
}
db = sqlx.NewDb(sqldb, string(dialect))
db.MapperFunc(reflectx.CamelToSnakeASCII)

// Set default connection options
lockTimeout := config.DefaultLockTimeout.Milliseconds()
idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout.Milliseconds()
stmt := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`,
lockTimeout, idleInTxSessionTimeout, defaultIsolation.String())
if _, err = db.Exec(stmt); err != nil {
return nil, err
}
db.SetMaxOpenConns(config.MaxOpenConns)
db.SetMaxIdleConns(config.MaxIdleConns)

return db, disallowReplica(db)
}

func disallowReplica(db *sqlx.DB) error {
var val string
err := db.Get(&val, "SHOW session_replication_role")
if err != nil {
return err
}

if val == "replica" {
return fmt.Errorf("invalid `session_replication_role`: %s. Refusing to connect to replica database. Writing to a replica will corrupt the database", val)
}

return nil
}
Loading

0 comments on commit 5742e7a

Please sign in to comment.