From 81d97dd25dc23ca37b4b922c19098d2c5bd81bc3 Mon Sep 17 00:00:00 2001 From: nicolasgere Date: Mon, 29 Apr 2024 12:59:12 -0700 Subject: [PATCH] [ENH]: use pgpool for log services (#2079) ## Description of changes pg.Connect is returning only one connection, which is not thread safe and cause an error. When running in grpc, each request run on a different go routine, and in parallel. pgpool will create and manage a set of connection. *Summarize the changes made by this PR.* - Improvements & Bug fixes - Use pg pool to use threadsafe connection. --- go/pkg/log/repository/log.go | 5 +++-- go/pkg/log/server/property_test.go | 4 ++-- go/shared/libs/db_utils.go | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/go/pkg/log/repository/log.go b/go/pkg/log/repository/log.go index ffba86fc747..a267e589276 100644 --- a/go/pkg/log/repository/log.go +++ b/go/pkg/log/repository/log.go @@ -5,11 +5,12 @@ import ( "errors" log "github.com/chroma-core/chroma/go/database/log/db" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "time" ) type LogRepository struct { - conn *pgx.Conn + conn *pgxpool.Pool queries *log.Queries } @@ -95,7 +96,7 @@ func (r *LogRepository) PurgeRecords(ctx context.Context) (err error) { return } -func NewLogRepository(conn *pgx.Conn) *LogRepository { +func NewLogRepository(conn *pgxpool.Pool) *LogRepository { return &LogRepository{ conn: conn, queries: log.New(conn), diff --git a/go/pkg/log/server/property_test.go b/go/pkg/log/server/property_test.go index 5e90e92683a..ace7c4dba6b 100644 --- a/go/pkg/log/server/property_test.go +++ b/go/pkg/log/server/property_test.go @@ -9,7 +9,7 @@ import ( "github.com/chroma-core/chroma/go/pkg/proto/logservicepb" "github.com/chroma-core/chroma/go/pkg/types" libs2 "github.com/chroma-core/chroma/go/shared/libs" - "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "pgregory.net/rapid" @@ -37,7 +37,7 @@ func (suite *LogServerTestSuite) SetupSuite() { connectionString, err := libs2.StartPgContainer(ctx) config.DATABASE_URL = connectionString assert.NoError(suite.t, err, "Failed to start pg container") - var conn *pgx.Conn + var conn *pgxpool.Pool conn, err = libs2.NewPgConnection(ctx, config) assert.NoError(suite.t, err, "Failed to create new pg connection") err = libs2.RunMigration(ctx, connectionString) diff --git a/go/shared/libs/db_utils.go b/go/shared/libs/db_utils.go index c56cc20a5cb..3c60e50344c 100644 --- a/go/shared/libs/db_utils.go +++ b/go/shared/libs/db_utils.go @@ -3,10 +3,10 @@ package libs import ( "context" "github.com/chroma-core/chroma/go/pkg/log/configuration" - "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) -func NewPgConnection(ctx context.Context, config *configuration.LogServiceConfiguration) (conn *pgx.Conn, err error) { - conn, err = pgx.Connect(ctx, config.DATABASE_URL) +func NewPgConnection(ctx context.Context, config *configuration.LogServiceConfiguration) (conn *pgxpool.Pool, err error) { + conn, err = pgxpool.New(ctx, config.DATABASE_URL) return }