From e91e23ab6db531b896e2e77149eee458ba78a7e7 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 17 Jun 2024 11:10:21 -0700 Subject: [PATCH] Refactor Dedupe signature. --- clients/bigquery/bigquery.go | 4 ++-- clients/bigquery/bigquery_dedupe_test.go | 9 ++++----- clients/bigquery/dialect/dialect.go | 5 ++--- clients/mssql/dialect/dialect.go | 3 +-- clients/mssql/store.go | 2 +- clients/redshift/dialect/dialect.go | 5 ++--- clients/redshift/redshift.go | 4 ++-- clients/redshift/redshift_dedupe_test.go | 9 ++++----- clients/snowflake/dialect/dialect.go | 5 ++--- clients/snowflake/snowflake.go | 4 ++-- clients/snowflake/snowflake_dedupe_test.go | 9 ++++----- lib/destination/dwh.go | 2 +- lib/sql/dialect.go | 3 +-- 13 files changed, 28 insertions(+), 36 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 80bda585e..b16472452 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -180,10 +180,10 @@ func (s *Store) putTableViaStorageWriteAPI(ctx context.Context, bqTableID TableI return nil } -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) defer func() { _ = ddl.DropTemporaryTable(s, stagingTableID, false) }() diff --git a/clients/bigquery/bigquery_dedupe_test.go b/clients/bigquery/bigquery_dedupe_test.go index e43018740..b247eda67 100644 --- a/clients/bigquery/bigquery_dedupe_test.go +++ b/clients/bigquery/bigquery_dedupe_test.go @@ -11,7 +11,6 @@ import ( "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/stringutil" ) @@ -21,7 +20,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project12", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -39,7 +38,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project12", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) assert.Equal( t, @@ -57,7 +56,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project123", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -75,7 +74,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("project123", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.BigQueryDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) assert.Equal( t, diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index bc4c93d3c..6a3814b25 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "time" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -163,11 +162,11 @@ func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, pri ) } -func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (bd BigQueryDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, bd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, bd.QuoteIdentifier(constants.UpdateColumnMarker)) } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 083880539..adfbff906 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -173,7 +172,7 @@ func (MSSQLDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKe panic("not implemented") } -func (MSSQLDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (MSSQLDialect) BuildDedupeQueries(_, _ sql.TableIdentifier, _ []string, _ bool) []string { panic("not implemented") // We don't currently support deduping for MS SQL. } diff --git a/clients/mssql/store.go b/clients/mssql/store.go index a34a6e09c..db4254cfe 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -70,7 +70,7 @@ func (s *Store) Sweep() error { return shared.Sweep(s, tcs, queryFunc) } -func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ kafkalib.TopicConfig) error { +func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ bool) error { return nil // dedupe is not necessary for MS SQL } diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 757b90410..29ae4e00f 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -139,11 +138,11 @@ func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ [ return fmt.Sprintf(`( SELECT DISTINCT * FROM %s )`, tableID.FullyQualifiedName()) } -func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (rd RedshiftDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, rd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, rd.QuoteIdentifier(constants.UpdateColumnMarker)) } diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 4def473f3..9c840e6e4 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -110,9 +110,9 @@ WHERE return shared.Sweep(s, tcs, queryFunc) } -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/redshift/redshift_dedupe_test.go b/clients/redshift/redshift_dedupe_test.go index 9e1f02568..b0cfde510 100644 --- a/clients/redshift/redshift_dedupe_test.go +++ b/clients/redshift/redshift_dedupe_test.go @@ -6,7 +6,6 @@ import ( "github.com/artie-labs/transfer/clients/redshift/dialect" "github.com/artie-labs/transfer/clients/shared" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -17,7 +16,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -32,7 +31,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -47,7 +46,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), @@ -62,7 +61,7 @@ func (r *RedshiftTestSuite) Test_GenerateDedupeQueries() { tableID := NewTableIdentifier("public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.RedshiftDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(r.T(), parts, 3) assert.Equal( r.T(), diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index 57148a944..5edee8950 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" @@ -157,11 +156,11 @@ func (SnowflakeDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, prima panic("not implemented") } -func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string { +func (sd SnowflakeDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, sd) orderColsToIterate := primaryKeysEscaped - if topicConfig.IncludeArtieUpdatedAt { + if includeArtieUpdatedAt { orderColsToIterate = append(orderColsToIterate, sd.QuoteIdentifier(constants.UpdateColumnMarker)) } diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go index f8c6e8353..fb88d427d 100644 --- a/clients/snowflake/snowflake.go +++ b/clients/snowflake/snowflake.go @@ -130,9 +130,9 @@ func (s *Store) reestablishConnection() error { // Dedupe takes a table and will remove duplicates based on the primary key(s). // These queries are inspired and modified from: https://stackoverflow.com/a/71515946 -func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error { +func (s *Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, topicConfig) + dedupeQueries := s.Dialect().BuildDedupeQueries(tableID, stagingTableID, primaryKeys, includeArtieUpdatedAt) return destination.ExecStatements(s, dedupeQueries) } diff --git a/clients/snowflake/snowflake_dedupe_test.go b/clients/snowflake/snowflake_dedupe_test.go index 88f4d5036..2f37e71fa 100644 --- a/clients/snowflake/snowflake_dedupe_test.go +++ b/clients/snowflake/snowflake_dedupe_test.go @@ -7,7 +7,6 @@ import ( "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/clients/snowflake/dialect" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/stringutil" "github.com/stretchr/testify/assert" ) @@ -18,7 +17,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -33,7 +32,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "customers") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"id"}, true) assert.Len(t, parts, 3) assert.Equal( t, @@ -48,7 +47,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, false) assert.Len(t, parts, 3) assert.Equal( t, @@ -63,7 +62,7 @@ func TestGenerateDedupeQueries(t *testing.T) { tableID := NewTableIdentifier("db", "public", "user_settings") stagingTableID := shared.TempTableID(tableID, strings.ToLower(stringutil.Random(5))) - parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, kafkalib.TopicConfig{IncludeArtieUpdatedAt: true}) + parts := dialect.SnowflakeDialect{}.BuildDedupeQueries(tableID, stagingTableID, []string{"user_id", "settings"}, true) assert.Len(t, parts, 3) assert.Equal( t, diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 2cf5d43d3..2416761f9 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -15,7 +15,7 @@ type DataWarehouse interface { Dialect() sqllib.Dialect Merge(tableData *optimization.TableData) error Append(tableData *optimization.TableData) error - Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) error + Dedupe(tableID sqllib.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error Exec(query string, args ...any) (sql.Result, error) Query(query string, args ...any) (*sql.Rows, error) Begin() (*sql.Tx, error) diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index 4c5d257d6..bef2ce857 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -2,7 +2,6 @@ package sql import ( "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -25,7 +24,7 @@ type Dialect interface { BuildAlterColumnQuery(tableID TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string BuildDedupeTableQuery(tableID TableIdentifier, primaryKeys []string) string - BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string + BuildDedupeQueries(tableID, stagingTableID TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string BuildMergeQueries( tableID TableIdentifier, subQuery string,