diff --git a/clients/snowflake/dialect/typing_test.go b/clients/snowflake/dialect/typing_test.go index 2c64f1aa0..aacdc2703 100644 --- a/clients/snowflake/dialect/typing_test.go +++ b/clients/snowflake/dialect/typing_test.go @@ -3,18 +3,20 @@ package dialect import ( "testing" - "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/typing" ) func TestSnowflakeDialect_DataTypeForKind(t *testing.T) { { // String { - assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.String, false)) + assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.String, false, config.SharedDestinationColumnSettings{})) } { - assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false)) + assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false, config.SharedDestinationColumnSettings{})) } } } @@ -193,7 +195,7 @@ func TestSnowflakeDialect_KindForDataType_NoDataLoss(t *testing.T) { } for _, kindDetail := range kindDetails { - kd, err := SnowflakeDialect{}.KindForDataType(SnowflakeDialect{}.DataTypeForKind(kindDetail, false), "") + kd, err := SnowflakeDialect{}.KindForDataType(SnowflakeDialect{}.DataTypeForKind(kindDetail, false, config.SharedDestinationColumnSettings{}), "") assert.NoError(t, err) assert.Equal(t, kindDetail, kd) } diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index 9c68d73eb..a2e5ad98b 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -106,11 +106,11 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) for name, kind := range newCols { col := columns.NewColumn(name, kind) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{col})) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, config.SharedDestinationColumnSettings{}, tableID, []columns.Column{col})) _, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(col.Name()), - d.bigQueryStore.Dialect().DataTypeForKind(kind, false)), query) + d.bigQueryStore.Dialect().DataTypeForKind(kind, false, config.SharedDestinationColumnSettings{})), query) callIdx += 1 } @@ -156,10 +156,10 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { // BQ returning the same error because the column already exists. d.fakeBigQueryStore.ExecContextReturnsOnCall(0, sqlResult, errors.New("Column already exists: _string at [1:39]")) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column})) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, config.SharedDestinationColumnSettings{}, tableID, []columns.Column{column})) _, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name()), - d.bigQueryStore.Dialect().DataTypeForKind(column.KindDetails, false)), query) + d.bigQueryStore.Dialect().DataTypeForKind(column.KindDetails, false, config.SharedDestinationColumnSettings{})), query) callIdx += 1 } diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index feae06910..d0910b5e7 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -7,6 +7,8 @@ import ( "slices" "time" + "github.com/artie-labs/transfer/lib/config" + "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/clients/shared" @@ -28,12 +30,12 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { tableID := dialect.NewTableIdentifier("shop", "public", "complex_columns") d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols)) for i := 0; i < len(cols); i++ { _, execQuery, _ := d.fakeSnowflakeStagesStore.ExecContextArgsForCall(i) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", `shop.public."COMPLEX_COLUMNS"`, d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()), - d.snowflakeStagesStore.Dialect().DataTypeForKind(cols[i].KindDetails, false)), execQuery) + d.snowflakeStagesStore.Dialect().DataTypeForKind(cols[i].KindDetails, false, config.SharedDestinationColumnSettings{})), execQuery) } assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") @@ -53,11 +55,11 @@ func (d *DDLTestSuite) TestAlterIdempotency() { d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists")) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols)) assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") d.fakeSnowflakeStagesStore.ExecContextReturns(nil, errors.New("table does not exist")) - assert.ErrorContains(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols), `failed to alter table: table does not exist`) + assert.ErrorContains(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols), `failed to alter table: table does not exist`) } func (d *DDLTestSuite) TestAlterTableAdd() { @@ -73,7 +75,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols)) assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") // Check the table config diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index f357b0ddd..bd2e2f2bf 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -21,14 +21,14 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { { // Snowflake Stage tableID := dialect.NewTableIdentifier("db", "schema", "tempTableName") - query, err := ddl.BuildCreateTableSQL(d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)}) + query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)}) assert.NoError(d.T(), err) assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`) } { // BigQuery tableID := bigQueryDialect.NewTableIdentifier("db", "schema", "tempTableName") - query, err := ddl.BuildCreateTableSQL(d.bigQueryStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)}) + query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.bigQueryStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)}) assert.NoError(d.T(), err) // Cutting off the expiration_timestamp since it's time based. assert.Contains(d.T(), query, "CREATE TABLE IF NOT EXISTS `db`.`schema`.`tempTableName` (`foo` string,`bar` float64,`select` string) OPTIONS (expiration_timestamp =", query) diff --git a/lib/destination/ddl/ddl_test.go b/lib/destination/ddl/ddl_test.go index 19eca3094..edc0af532 100644 --- a/lib/destination/ddl/ddl_test.go +++ b/lib/destination/ddl/ddl_test.go @@ -45,7 +45,7 @@ func TestShouldCreatePrimaryKey(t *testing.T) { func TestBuildCreateTableSQL(t *testing.T) { { // No columns provided - _, err := BuildCreateTableSQL(nil, nil, false, config.Replication, []columns.Column{}) + _, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, nil, nil, false, config.Replication, []columns.Column{}) assert.ErrorContains(t, err, "no columns provided") } { @@ -54,7 +54,7 @@ func TestBuildCreateTableSQL(t *testing.T) { // Redshift { // No primary key - sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.String), }) @@ -65,7 +65,7 @@ func TestBuildCreateTableSQL(t *testing.T) { // With primary key pk := columns.NewColumn("pk", typing.String) pk.SetPrimaryKeyForTest(true) - sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ pk, columns.NewColumn("bar", typing.String), }) @@ -79,7 +79,7 @@ func TestBuildCreateTableSQL(t *testing.T) { pk2 := columns.NewColumn("pk2", typing.String) pk2.SetPrimaryKeyForTest(true) - sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ pk1, pk2, columns.NewColumn("bar", typing.String), @@ -94,7 +94,7 @@ func TestBuildCreateTableSQL(t *testing.T) { // With primary key pk := columns.NewColumn("pk", typing.String) pk.SetPrimaryKeyForTest(true) - sql, err := BuildCreateTableSQL(bqDialect.BigQueryDialect{}, bqDialect.NewTableIdentifier("projectID", "dataset", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, bqDialect.BigQueryDialect{}, bqDialect.NewTableIdentifier("projectID", "dataset", "table"), false, config.Replication, []columns.Column{ pk, columns.NewColumn("bar", typing.String), }) @@ -108,14 +108,14 @@ func TestBuildCreateTableSQL(t *testing.T) { func TestBuildAlterTableAddColumns(t *testing.T) { { // No columns - sqlParts, err := BuildAlterTableAddColumns(nil, nil, []columns.Column{}) + sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, nil, nil, []columns.Column{}) assert.NoError(t, err) assert.Empty(t, sqlParts) } { // One column to add col := columns.NewColumn("dusty", typing.String) - sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col}) + sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col}) assert.NoError(t, err) assert.Len(t, sqlParts, 1) assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "dusty" VARCHAR(MAX)`, sqlParts[0]) @@ -123,7 +123,7 @@ func TestBuildAlterTableAddColumns(t *testing.T) { { // Two columns, one invalid, it will error. col := columns.NewColumn("dusty", typing.String) - _, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), + _, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{ col, columns.NewColumn("invalid", typing.Invalid), @@ -138,7 +138,7 @@ func TestBuildAlterTableAddColumns(t *testing.T) { col2 := columns.NewColumn("doge", typing.String) col3 := columns.NewColumn("age", typing.Integer) - sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col1, col2, col3}) + sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col1, col2, col3}) assert.NoError(t, err) assert.Len(t, sqlParts, 3) assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "aussie" VARCHAR(MAX)`, sqlParts[0])