Skip to content

Commit

Permalink
Rename postgres limit to batch size (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Jan 30, 2024
1 parent 22c9e1a commit faf05f7
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
4 changes: 2 additions & 2 deletions config/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type PostgreSQLTable struct {
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd"`
}

func (p *PostgreSQLTable) GetLimit() uint {
func (p *PostgreSQLTable) GetBatchSize() uint {
if p.Limit == 0 {
return constants.DefaultLimit
return constants.DefaultBatchSize
}

return p.Limit
Expand Down
2 changes: 1 addition & 1 deletion constants/constants.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package constants

const (
DefaultLimit = 5_000
DefaultBatchSize = 5_000
DefaultPublishSize = 2_500
)
8 changes: 4 additions & 4 deletions sources/postgres/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
const defaultErrorRetries = 10

func Run(ctx context.Context, cfg config.Settings, statsD *mtr.Client, kafkaWriter *kafka.Writer) error {
batchWriter := kafkalib.NewBatchWriter(ctx, *cfg.Kafka, kafkaWriter)
writer := kafkalib.NewBatchWriter(ctx, *cfg.Kafka, kafkaWriter)

db, err := sql.Open("postgres", postgres.NewConnection(cfg.PostgreSQL).String())
if err != nil {
Expand All @@ -46,12 +46,12 @@ func Run(ctx context.Context, cfg config.Settings, statsD *mtr.Client, kafkaWrit
slog.String("schemaName", table.Schema),
slog.String("topicSuffix", table.TopicSuffix()),
slog.Any("primaryKeyColumns", table.PrimaryKeys.Keys()),
slog.Any("batchSize", tableCfg.GetLimit()),
slog.Any("batchSize", tableCfg.GetBatchSize()),
)

scanner := table.NewScanner(db, tableCfg.GetLimit(), defaultErrorRetries)
scanner := table.NewScanner(db, tableCfg.GetBatchSize(), defaultErrorRetries)
messageBuilder := postgres.NewMessageBuilder(table, &scanner, statsD, cfg.Kafka.MaxRequestSize)
count, err := batchWriter.WriteIterator(messageBuilder)
count, err := writer.WriteIterator(messageBuilder)
if err != nil {
return fmt.Errorf("failed to snapshot, table: %s, err: %w", table.Name, err)
}
Expand Down

0 comments on commit faf05f7

Please sign in to comment.