From faf05f7509fec25e32b178bddbfcd79d3472d6db Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 29 Jan 2024 23:47:13 -0800 Subject: [PATCH] Rename postgres limit to batch size (#68) --- config/postgres.go | 4 ++-- constants/constants.go | 2 +- sources/postgres/snapshot.go | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/config/postgres.go b/config/postgres.go index 780daf78..6881e2ac 100644 --- a/config/postgres.go +++ b/config/postgres.go @@ -26,9 +26,9 @@ type PostgreSQLTable struct { OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd"` } -func (p *PostgreSQLTable) GetLimit() uint { +func (p *PostgreSQLTable) GetBatchSize() uint { if p.Limit == 0 { - return constants.DefaultLimit + return constants.DefaultBatchSize } return p.Limit diff --git a/constants/constants.go b/constants/constants.go index 5c610c58..e1af31a9 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -1,6 +1,6 @@ package constants const ( - DefaultLimit = 5_000 + DefaultBatchSize = 5_000 DefaultPublishSize = 2_500 ) diff --git a/sources/postgres/snapshot.go b/sources/postgres/snapshot.go index 529f60a1..f9392124 100644 --- a/sources/postgres/snapshot.go +++ b/sources/postgres/snapshot.go @@ -19,7 +19,7 @@ import ( const defaultErrorRetries = 10 func Run(ctx context.Context, cfg config.Settings, statsD *mtr.Client, kafkaWriter *kafka.Writer) error { - batchWriter := kafkalib.NewBatchWriter(ctx, *cfg.Kafka, kafkaWriter) + writer := kafkalib.NewBatchWriter(ctx, *cfg.Kafka, kafkaWriter) db, err := sql.Open("postgres", postgres.NewConnection(cfg.PostgreSQL).String()) if err != nil { @@ -46,12 +46,12 @@ func Run(ctx context.Context, cfg config.Settings, statsD *mtr.Client, kafkaWrit slog.String("schemaName", table.Schema), slog.String("topicSuffix", table.TopicSuffix()), slog.Any("primaryKeyColumns", table.PrimaryKeys.Keys()), - slog.Any("batchSize", tableCfg.GetLimit()), + slog.Any("batchSize", tableCfg.GetBatchSize()), ) - scanner := table.NewScanner(db, tableCfg.GetLimit(), defaultErrorRetries) + scanner := table.NewScanner(db, tableCfg.GetBatchSize(), defaultErrorRetries) messageBuilder := postgres.NewMessageBuilder(table, &scanner, statsD, cfg.Kafka.MaxRequestSize) - count, err := batchWriter.WriteIterator(messageBuilder) + count, err := writer.WriteIterator(messageBuilder) if err != nil { return fmt.Errorf("failed to snapshot, table: %s, err: %w", table.Name, err) }