From b13b6234500a316032a46f50270a1675c8c7fde5 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 7 Oct 2024 19:07:32 -0700 Subject: [PATCH 1/5] WIP. --- clients/bigquery/bigquery.go | 10 +++++----- clients/bigquery/merge.go | 5 +++-- clients/databricks/store.go | 13 ++++++------- clients/mssql/staging.go | 3 ++- clients/mssql/store.go | 9 +++++---- clients/redshift/redshift.go | 9 +++++---- clients/redshift/staging.go | 4 ++-- clients/s3/s3.go | 8 ++++---- clients/shared/append.go | 4 +++- clients/shared/merge.go | 5 +++-- clients/snowflake/staging.go | 3 ++- clients/snowflake/writes.go | 10 ++++++---- lib/destination/dwh.go | 7 ++++--- processes/consumer/flush.go | 4 ++-- 14 files changed, 52 insertions(+), 42 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 1e9944aa1..d67fedcaf 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -42,9 +42,9 @@ type Store struct { db.Store } -func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) error { +func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, useTempTable bool) error { if !useTempTable { - return shared.Append(s, tableData, types.AdditionalSettings{}) + return shared.Append(ctx, s, tableData, types.AdditionalSettings{}) } // We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data @@ -55,7 +55,7 @@ func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) err defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }() - err := shared.Append(s, tableData, types.AdditionalSettings{ + err := shared.Append(ctx, s, tableData, types.AdditionalSettings{ UseTempTable: true, TempTableID: temporaryTableID, }) @@ -78,7 +78,7 @@ func (s *Store) Append(tableData *optimization.TableData, useTempTable bool) err return nil } -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), @@ -100,7 +100,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo return err } - return s.putTable(context.Background(), bqTempTableID, tableData) + return s.putTable(ctx, bqTempTableID, tableData) } func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier { diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index dad5e7595..dbe53764c 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -1,6 +1,7 @@ package bigquery import ( + "context" "fmt" "strings" @@ -14,7 +15,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) Merge(tableData *optimization.TableData) error { +func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error { var additionalEqualityStrings []string if tableData.TopicConfig().BigQueryPartitionSettings != nil { distinctDates, err := tableData.DistinctDates(tableData.TopicConfig().BigQueryPartitionSettings.PartitionField) @@ -30,7 +31,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { additionalEqualityStrings = []string{mergeString} } - return shared.Merge(s, tableData, types.MergeOpts{ + return shared.Merge(ctx, s, tableData, types.MergeOpts{ AdditionalEqualityStrings: additionalEqualityStrings, // BigQuery has DDL quotas. RetryColBackfill: true, diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 817b561c0..daa39f7b5 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -36,12 +36,12 @@ func describeTableQuery(tableID TableIdentifier) (string, []any) { return fmt.Sprintf("DESCRIBE TABLE %s", tableID.FullyQualifiedName()), nil } -func (s Store) Merge(tableData *optimization.TableData) error { - return shared.Merge(s, tableData, types.MergeOpts{}) +func (s Store) Merge(ctx context.Context, tableData *optimization.TableData) error { + return shared.Merge(ctx, s, tableData, types.MergeOpts{}) } -func (s Store) Append(tableData *optimization.TableData, useTempTable bool) error { - return shared.Append(s, tableData, types.AdditionalSettings{UseTempTable: useTempTable}) +func (s Store) Append(ctx context.Context, tableData *optimization.TableData, useTempTable bool) error { + return shared.Append(ctx, s, tableData, types.AdditionalSettings{UseTempTable: useTempTable}) } func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier { @@ -89,8 +89,7 @@ func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTabl }.GetTableConfig() } -func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { - // TODO: Update PrepareTemporaryTable interface to include context +func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), @@ -120,7 +119,7 @@ func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCon }() // Upload the local file to DBFS - ctx := driverctx.NewContextWithStagingInfo(context.Background(), []string{"/var"}) + ctx = driverctx.NewContextWithStagingInfo(context.Background(), []string{"/var"}) castedTempTableID, isOk := tempTableID.(TableIdentifier) if !isOk { diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index 89a2026ec..484c293d5 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -1,6 +1,7 @@ package mssql import ( + "context" "fmt" mssql "github.com/microsoft/go-mssqldb" @@ -13,7 +14,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(_ context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), diff --git a/clients/mssql/store.go b/clients/mssql/store.go index 7694d5b33..f9d50034d 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -1,6 +1,7 @@ package mssql import ( + "context" "strings" _ "github.com/microsoft/go-mssqldb" @@ -38,12 +39,12 @@ func (s *Store) dialect() dialect.MSSQLDialect { return dialect.MSSQLDialect{} } -func (s *Store) Merge(tableData *optimization.TableData) error { - return shared.Merge(s, tableData, types.MergeOpts{}) +func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error { + return shared.Merge(ctx, s, tableData, types.MergeOpts{}) } -func (s *Store) Append(tableData *optimization.TableData, _ bool) error { - return shared.Append(s, tableData, types.AdditionalSettings{}) +func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, _ bool) error { + return shared.Append(ctx, s, tableData, types.AdditionalSettings{}) } // specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name. diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go index 7a09c4311..05947eac8 100644 --- a/clients/redshift/redshift.go +++ b/clients/redshift/redshift.go @@ -1,6 +1,7 @@ package redshift import ( + "context" "fmt" _ "github.com/jackc/pgx/v5/stdlib" @@ -26,12 +27,12 @@ type Store struct { db.Store } -func (s *Store) Append(tableData *optimization.TableData, _ bool) error { - return shared.Append(s, tableData, types.AdditionalSettings{}) +func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, _ bool) error { + return shared.Append(ctx, s, tableData, types.AdditionalSettings{}) } -func (s *Store) Merge(tableData *optimization.TableData) error { - return shared.Merge(s, tableData, types.MergeOpts{ +func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error { + return shared.Merge(ctx, s, tableData, types.MergeOpts{ // We are adding SELECT DISTINCT here for the temporary table as an extra guardrail. // Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise. SubQueryDedupe: true, diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 777690a70..6be9007d5 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -17,7 +17,7 @@ import ( "github.com/artie-labs/transfer/lib/sql" ) -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), @@ -47,7 +47,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo }() // Load fp into s3, get S3 URI and pass it down. - s3Uri, err := s3lib.UploadLocalFileToS3(context.Background(), s3lib.UploadArgs{ + s3Uri, err := s3lib.UploadLocalFileToS3(ctx, s3lib.UploadArgs{ OptionalS3Prefix: s.optionalS3Prefix, Bucket: s.bucket, FilePath: fp, diff --git a/clients/s3/s3.go b/clients/s3/s3.go index 5a05c5539..7cb0d8444 100644 --- a/clients/s3/s3.go +++ b/clients/s3/s3.go @@ -58,9 +58,9 @@ func (s *Store) ObjectPrefix(tableData *optimization.TableData) string { return strings.Join([]string{fqTableName, yyyyMMDDFormat}, "/") } -func (s *Store) Append(tableData *optimization.TableData, _ bool) error { +func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, _ bool) error { // There's no difference in appending or merging for S3. - return s.Merge(tableData) + return s.Merge(ctx, tableData) } // Merge - will take tableData, write it into a particular file in the specified format, in these steps: @@ -68,7 +68,7 @@ func (s *Store) Append(tableData *optimization.TableData, _ bool) error { // 2. Load the temporary file, under this format: s3://bucket/folderName/fullyQualifiedTableName/YYYY-MM-DD/{{unix_timestamp}}.parquet.gz // 3. It will then upload this to S3 // 4. Delete the temporary file -func (s *Store) Merge(tableData *optimization.TableData) error { +func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error { if tableData.ShouldSkipUpdate() { return nil } @@ -127,7 +127,7 @@ func (s *Store) Merge(tableData *optimization.TableData) error { } }() - if _, err = s3lib.UploadLocalFileToS3(context.Background(), s3lib.UploadArgs{ + if _, err = s3lib.UploadLocalFileToS3(ctx, s3lib.UploadArgs{ Bucket: s.config.S3.Bucket, OptionalS3Prefix: s.ObjectPrefix(tableData), FilePath: fp, diff --git a/clients/shared/append.go b/clients/shared/append.go index dd12d568b..2f213ba70 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -1,6 +1,7 @@ package shared import ( + "context" "fmt" "github.com/artie-labs/transfer/lib/config/constants" @@ -11,7 +12,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AdditionalSettings) error { +func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AdditionalSettings) error { if tableData.ShouldSkipUpdate() { return nil } @@ -58,6 +59,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op } return dwh.PrepareTemporaryTable( + ctx, tableData, tableConfig, tempTableID, diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 1ce39e74a..493e0c1a1 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -1,6 +1,7 @@ package shared import ( + "context" "fmt" "log/slog" "time" @@ -17,7 +18,7 @@ import ( const backfillMaxRetries = 1000 -func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.MergeOpts) error { +func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.MergeOpts) error { if tableData.ShouldSkipUpdate() { return nil } @@ -76,7 +77,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt } }() - if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, tableID, types.AdditionalSettings{}, true); err != nil { + if err = dwh.PrepareTemporaryTable(ctx, tableData, tableConfig, temporaryTableID, tableID, types.AdditionalSettings{}, true); err != nil { return fmt.Errorf("failed to prepare temporary table: %w", err) } diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 6a279c7b0..b71a912d3 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -1,6 +1,7 @@ package snowflake import ( + "context" "encoding/csv" "fmt" "log/slog" @@ -54,7 +55,7 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { return replaceExceededValues(value, colKind), nil } -func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(_ context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index f99c6ad01..cff8d4b2d 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -1,6 +1,8 @@ package snowflake import ( + "context" + "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" @@ -10,9 +12,9 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) Append(tableData *optimization.TableData, _ bool) error { +func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, _ bool) error { // TODO: For history mode - in the future, we could also have a separate stage name for history mode so we can enable parallel processing. - return shared.Append(s, tableData, types.AdditionalSettings{ + return shared.Append(ctx, s, tableData, types.AdditionalSettings{ AdditionalCopyClause: `FILE_FORMAT = (TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) PURGE = TRUE`, }) } @@ -25,8 +27,8 @@ func (s *Store) additionalEqualityStrings(tableData *optimization.TableData) []s return sql.BuildColumnComparisons(cols, constants.TargetAlias, constants.StagingAlias, sql.Equal, s.Dialect()) } -func (s *Store) Merge(tableData *optimization.TableData) error { - return shared.Merge(s, tableData, types.MergeOpts{ +func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error { + return shared.Merge(ctx, s, tableData, types.MergeOpts{ AdditionalEqualityStrings: s.additionalEqualityStrings(tableData), }) } diff --git a/lib/destination/dwh.go b/lib/destination/dwh.go index 9c1430f41..31e65c754 100644 --- a/lib/destination/dwh.go +++ b/lib/destination/dwh.go @@ -1,6 +1,7 @@ package destination import ( + "context" "database/sql" "fmt" "log/slog" @@ -24,12 +25,12 @@ type DataWarehouse interface { // Helper functions for merge GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) - PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sqllib.TableIdentifier, parentTableID sqllib.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error + PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sqllib.TableIdentifier, parentTableID sqllib.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error } type Baseline interface { - Merge(tableData *optimization.TableData) error - Append(tableData *optimization.TableData, useTempTable bool) error + Merge(ctx context.Context, tableData *optimization.TableData) error + Append(ctx context.Context, tableData *optimization.TableData, useTempTable bool) error IsRetryableError(err error) bool IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sqllib.TableIdentifier } diff --git a/processes/consumer/flush.go b/processes/consumer/flush.go index ca40b7068..cb854c77c 100644 --- a/processes/consumer/flush.go +++ b/processes/consumer/flush.go @@ -115,9 +115,9 @@ func flush(ctx context.Context, dest destination.Baseline, _tableData *models.Ta // Merge or Append depending on the mode. var err error if _tableData.Mode() == config.History { - err = dest.Append(_tableData.TableData, false) + err = dest.Append(ctx, _tableData.TableData, false) } else { - err = dest.Merge(_tableData.TableData) + err = dest.Merge(ctx, _tableData.TableData) } if err != nil { From aa7ce50ae3aff5e6c5438b93fe7724bdc6f743b5 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 7 Oct 2024 19:10:08 -0700 Subject: [PATCH 2/5] Clean up. --- clients/snowflake/snowflake_test.go | 27 +++++++++++---------------- clients/snowflake/staging_test.go | 11 +++++------ 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 520bcd818..da70de708 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -1,6 +1,7 @@ package snowflake import ( + "context" "fmt" "strconv" "strings" @@ -9,19 +10,16 @@ import ( "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/kafkalib/partition" - "github.com/artie-labs/transfer/lib/sql" - - "github.com/artie-labs/transfer/lib/typing/columns" - - "github.com/stretchr/testify/assert" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/artie-labs/transfer/lib/kafkalib/partition" "github.com/artie-labs/transfer/lib/optimization" + "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" + "github.com/stretchr/testify/assert" ) func (s *SnowflakeTestSuite) identifierFor(tableData *optimization.TableData) sql.TableIdentifier { @@ -80,7 +78,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() { s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, nil, false, true)) - err := s.stageStore.Merge(tableData) + err := s.stageStore.Merge(context.Background(), tableData) _col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name") assert.True(s.T(), isOk) assert.Equal(s.T(), _col.KindDetails, typing.String) @@ -126,7 +124,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() { s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, nil, false, true)) - assert.NoError(s.T(), s.stageStore.Merge(tableData)) + assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount()) } @@ -174,7 +172,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() { tableID := s.identifierFor(tableData) fqName := tableID.FullyQualifiedName() s.stageStore.configMap.AddTableToConfig(tableID, types.NewDwhTableConfig(&cols, nil, false, true)) - err := s.stageStore.Merge(tableData) + err := s.stageStore.Merge(context.Background(), tableData) assert.Nil(s.T(), err) s.fakeStageStore.ExecReturns(nil, nil) // CREATE TABLE IF NOT EXISTS customer.public.orders___artie_Mwv9YADmRy (id int,name string,__artie_delete boolean,created_at timestamp_tz) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) COMMENT='expires:2023-06-27 11:54:03 UTC' @@ -258,8 +256,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { _config := types.NewDwhTableConfig(&sflkCols, nil, false, true) s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), _config) - err := s.stageStore.Merge(tableData) - assert.Nil(s.T(), err) + assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) s.fakeStageStore.ExecReturns(nil, nil) assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 5, "called merge") @@ -282,8 +279,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { break } - err = s.stageStore.Merge(tableData) - assert.NoError(s.T(), err) + assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) s.fakeStageStore.ExecReturns(nil, nil) assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 10, "called merge again") @@ -294,8 +290,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { func (s *SnowflakeTestSuite) TestExecuteMergeExitEarly() { tableData := optimization.NewTableData(nil, config.Replication, nil, kafkalib.TopicConfig{}, "foo") - err := s.stageStore.Merge(tableData) - assert.Nil(s.T(), err) + assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) } func (s *SnowflakeTestSuite) TestStore_AdditionalEqualityStrings() { diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go index 8202074d6..c6332217d 100644 --- a/clients/snowflake/staging_test.go +++ b/clients/snowflake/staging_test.go @@ -1,6 +1,7 @@ package snowflake import ( + "context" "encoding/csv" "fmt" "io" @@ -9,15 +10,13 @@ import ( "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/typing/columns" - - "github.com/artie-labs/transfer/lib/destination/types" - "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" "github.com/stretchr/testify/assert" ) @@ -157,7 +156,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() { sflkTc := s.stageStore.GetConfigMap().TableConfigCache(tempTableID) { - assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, true)) + assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(context.Background(), tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, true)) assert.Equal(s.T(), 3, s.fakeStageStore.ExecCallCount()) // First call is to create the temp table @@ -179,7 +178,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() { } { // Don't create the temporary table. - assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, false)) + assert.NoError(s.T(), s.stageStore.PrepareTemporaryTable(context.Background(), tableData, sflkTc, tempTableID, tempTableID, types.AdditionalSettings{}, false)) assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount()) } From 0e3d506dd4c46113223b304a391176a335c8e574 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 7 Oct 2024 19:11:38 -0700 Subject: [PATCH 3/5] No more. --- clients/databricks/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/databricks/store.go b/clients/databricks/store.go index daa39f7b5..ac07b5863 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -119,7 +119,7 @@ func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizatio }() // Upload the local file to DBFS - ctx = driverctx.NewContextWithStagingInfo(context.Background(), []string{"/var"}) + ctx = driverctx.NewContextWithStagingInfo(ctx, []string{"/var"}) castedTempTableID, isOk := tempTableID.(TableIdentifier) if !isOk { From 9c07f66e1415b3d8bd56b95355b7af30ebc246e9 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 8 Oct 2024 09:29:30 -0700 Subject: [PATCH 4/5] PR Feedback. --- clients/snowflake/snowflake_test.go | 3 ++- clients/snowflake/staging_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index da70de708..6bf041239 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" @@ -19,7 +21,6 @@ import ( "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" - "github.com/stretchr/testify/assert" ) func (s *SnowflakeTestSuite) identifierFor(tableData *optimization.TableData) sql.TableIdentifier { diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go index c6332217d..37502ddfd 100644 --- a/clients/snowflake/staging_test.go +++ b/clients/snowflake/staging_test.go @@ -8,6 +8,8 @@ import ( "os" "strings" + "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" @@ -17,7 +19,6 @@ import ( "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/stretchr/testify/assert" ) func (s *SnowflakeTestSuite) TestReplaceExceededValues() { From 8b5110edd741eb47fa6cb2f23ff0d90eac7c81ac Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 8 Oct 2024 09:30:07 -0700 Subject: [PATCH 5/5] No error. --- clients/snowflake/snowflake_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index 6bf041239..fdb98c6ec 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -79,11 +79,10 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() { s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&anotherCols, nil, false, true)) - err := s.stageStore.Merge(context.Background(), tableData) + assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) _col, isOk := tableData.ReadOnlyInMemoryCols().GetColumn("first_name") assert.True(s.T(), isOk) assert.Equal(s.T(), _col.KindDetails, typing.String) - assert.NoError(s.T(), err) } func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() { @@ -124,7 +123,6 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() { } s.stageStore.configMap.AddTableToConfig(s.identifierFor(tableData), types.NewDwhTableConfig(&cols, nil, false, true)) - assert.NoError(s.T(), s.stageStore.Merge(context.Background(), tableData)) assert.Equal(s.T(), 5, s.fakeStageStore.ExecCallCount()) }