From 1584d00f1e316b454e11ea6c5fa2c821a9674cf8 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 18 Mar 2024 14:33:31 -0700 Subject: [PATCH 1/3] [rdbms] Remove error return from `BuildQuery` --- lib/mysql/scanner/scanner.go | 4 ++-- lib/mysql/scanner/scanner_test.go | 6 ++---- lib/postgres/scanner.go | 4 ++-- lib/postgres/scanner_test.go | 6 ++---- lib/rdbms/scan/scan.go | 7 ++----- lib/rdbms/scan/scan_test.go | 2 +- 6 files changed, 11 insertions(+), 18 deletions(-) diff --git a/lib/mysql/scanner/scanner.go b/lib/mysql/scanner/scanner.go index 5c7d223d..4851ea8f 100644 --- a/lib/mysql/scanner/scanner.go +++ b/lib/mysql/scanner/scanner.go @@ -132,7 +132,7 @@ func queryPlaceholders(count int) []string { return result } -func (s scanAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any, error) { +func (s scanAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any) { colNames := make([]string, len(s.columns)) for idx, col := range s.columns { colNames[idx] = schema.QuoteIdentifier(col.Name) @@ -168,7 +168,7 @@ func (s scanAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool strings.Join(quotedKeyNames, ","), // LIMIT batchSize, - ), slices.Concat(startingValues, endingValues), nil + ), slices.Concat(startingValues, endingValues) } func (s scanAdapter) ParseRow(values []any) error { diff --git a/lib/mysql/scanner/scanner_test.go b/lib/mysql/scanner/scanner_test.go index 81879ca5..3bacba97 100644 --- a/lib/mysql/scanner/scanner_test.go +++ b/lib/mysql/scanner/scanner_test.go @@ -249,15 +249,13 @@ func TestScanAdapter_BuildQuery(t *testing.T) { } { // exclusive lower bound - query, parameters, err := adapter.BuildQuery(keys, false, 12) - assert.NoError(t, err) + query, parameters := adapter.BuildQuery(keys, false, 12) assert.Equal(t, "SELECT `foo`,`bar` FROM `table` WHERE (`foo`) > (?) AND (`foo`) <= (?) ORDER BY `foo` LIMIT 12", query) assert.Equal(t, []any{"a", "b"}, parameters) } { // inclusive upper and lower bounds - query, parameters, err := adapter.BuildQuery(keys, true, 12) - assert.NoError(t, err) + query, parameters := adapter.BuildQuery(keys, true, 12) assert.Equal(t, "SELECT `foo`,`bar` FROM `table` WHERE (`foo`) >= (?) AND (`foo`) <= (?) ORDER BY `foo` LIMIT 12", query) assert.Equal(t, []any{"a", "b"}, parameters) } diff --git a/lib/postgres/scanner.go b/lib/postgres/scanner.go index f733ac73..9a0c1c85 100644 --- a/lib/postgres/scanner.go +++ b/lib/postgres/scanner.go @@ -160,7 +160,7 @@ func queryPlaceholders(offset, count int) []string { return result } -func (s scanAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any, error) { +func (s scanAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any) { castedColumns := make([]string, len(s.columns)) for i, col := range s.columns { castedColumns[i] = castColumn(col) @@ -196,7 +196,7 @@ func (s scanAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool strings.Join(quotedKeyNames, ","), // LIMIT batchSize, - ), slices.Concat(startingValues, endingValues), nil + ), slices.Concat(startingValues, endingValues) } func (s scanAdapter) ParseRow(values []any) error { diff --git a/lib/postgres/scanner_test.go b/lib/postgres/scanner_test.go index dc681172..915e8791 100644 --- a/lib/postgres/scanner_test.go +++ b/lib/postgres/scanner_test.go @@ -65,15 +65,13 @@ func TestScanAdapter_BuildQuery(t *testing.T) { { // inclusive lower bound - query, parameters, err := adapter.BuildQuery(primaryKeys, true, 1) - assert.NoError(t, err) + query, parameters := adapter.BuildQuery(primaryKeys, true, 1) assert.Equal(t, `SELECT "a","b","c","e","f",ARRAY_TO_JSON("g")::TEXT as "g" FROM "schema"."table" WHERE row("a","b","c") >= row($1,$2,$3) AND row("a","b","c") <= row($4,$5,$6) ORDER BY "a","b","c" LIMIT 1`, query) assert.Equal(t, []any{int64(1), int64(2), "3", int64(4), int64(5), "6"}, parameters) } { // exclusive lower bound - query, parameters, err := adapter.BuildQuery(primaryKeys, false, 2) - assert.NoError(t, err) + query, parameters := adapter.BuildQuery(primaryKeys, false, 2) assert.Equal(t, `SELECT "a","b","c","e","f",ARRAY_TO_JSON("g")::TEXT as "g" FROM "schema"."table" WHERE row("a","b","c") > row($1,$2,$3) AND row("a","b","c") <= row($4,$5,$6) ORDER BY "a","b","c" LIMIT 2`, query) assert.Equal(t, []any{int64(1), int64(2), "3", int64(4), int64(5), "6"}, parameters) } diff --git a/lib/rdbms/scan/scan.go b/lib/rdbms/scan/scan.go index bbcfcbc6..528152c1 100644 --- a/lib/rdbms/scan/scan.go +++ b/lib/rdbms/scan/scan.go @@ -23,7 +23,7 @@ type ScannerConfig struct { type ScanAdapter interface { ParsePrimaryKeyValue(columnName string, value string) (any, error) - BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any, error) + BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any) ParseRow(row []any) error } @@ -105,10 +105,7 @@ func (s *Scanner) Next() ([]map[string]any, error) { } func (s *Scanner) scan() ([]map[string]any, error) { - query, parameters, err := s.adapter.BuildQuery(s.primaryKeys.Keys(), s.isFirstBatch, s.batchSize) - if err != nil { - return nil, fmt.Errorf("failed to build scan query: %w", err) - } + query, parameters := s.adapter.BuildQuery(s.primaryKeys.Keys(), s.isFirstBatch, s.batchSize) logger := slog.With(slog.String("query", query)) if len(parameters) > 0 { diff --git a/lib/rdbms/scan/scan_test.go b/lib/rdbms/scan/scan_test.go index d170ef19..41e0655f 100644 --- a/lib/rdbms/scan/scan_test.go +++ b/lib/rdbms/scan/scan_test.go @@ -20,7 +20,7 @@ func (m mockAdapter) ParsePrimaryKeyValue(columnName string, value string) (any, } } -func (mockAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any, error) { +func (mockAdapter) BuildQuery(primaryKeys []primary_key.Key, isFirstBatch bool, batchSize uint) (string, []any) { panic("not implemented") } From 6b7e021152b64444dcd8d4a3949ba59726f3f585 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 18 Mar 2024 14:35:51 -0700 Subject: [PATCH 2/3] Change logger --- lib/rdbms/scan/scan.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/rdbms/scan/scan.go b/lib/rdbms/scan/scan.go index 528152c1..5be611c5 100644 --- a/lib/rdbms/scan/scan.go +++ b/lib/rdbms/scan/scan.go @@ -7,6 +7,7 @@ import ( "github.com/artie-labs/reader/lib/rdbms/primary_key" "github.com/artie-labs/transfer/lib/retry" + "gorm.io/gorm/logger" ) const ( @@ -106,12 +107,7 @@ func (s *Scanner) Next() ([]map[string]any, error) { func (s *Scanner) scan() ([]map[string]any, error) { query, parameters := s.adapter.BuildQuery(s.primaryKeys.Keys(), s.isFirstBatch, s.batchSize) - - logger := slog.With(slog.String("query", query)) - if len(parameters) > 0 { - logger = logger.With(slog.Any("parameters", parameters)) - } - logger.Info("Scan query") + logger.Info("Scan query", slog.String("query", query), slog.Any("parameters", parameters)) rows, err := retry.WithRetriesAndResult(s.retryCfg, func(_ int, _ error) (*sql.Rows, error) { return s.db.Query(query, parameters...) From 60c7f65633e2bebc108fb370ffb892d944c1be42 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 18 Mar 2024 14:39:54 -0700 Subject: [PATCH 3/3] Import --- lib/rdbms/scan/scan.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/rdbms/scan/scan.go b/lib/rdbms/scan/scan.go index 5be611c5..5b161f34 100644 --- a/lib/rdbms/scan/scan.go +++ b/lib/rdbms/scan/scan.go @@ -7,7 +7,6 @@ import ( "github.com/artie-labs/reader/lib/rdbms/primary_key" "github.com/artie-labs/transfer/lib/retry" - "gorm.io/gorm/logger" ) const ( @@ -107,7 +106,7 @@ func (s *Scanner) Next() ([]map[string]any, error) { func (s *Scanner) scan() ([]map[string]any, error) { query, parameters := s.adapter.BuildQuery(s.primaryKeys.Keys(), s.isFirstBatch, s.batchSize) - logger.Info("Scan query", slog.String("query", query), slog.Any("parameters", parameters)) + slog.Info("Scan query", slog.String("query", query), slog.Any("parameters", parameters)) rows, err := retry.WithRetriesAndResult(s.retryCfg, func(_ int, _ error) (*sql.Rows, error) { return s.db.Query(query, parameters...)