diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index a8f019e8d..5b3332506 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -43,7 +43,9 @@ type Store struct { func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, useTempTable bool) error { if !useTempTable { - return shared.Append(ctx, s, tableData, types.AdditionalSettings{}) + return shared.Append(ctx, s, tableData, types.AdditionalSettings{ + ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings, + }) } // We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data @@ -55,8 +57,9 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }() err := shared.Append(ctx, s, tableData, types.AdditionalSettings{ - UseTempTable: true, - TempTableID: temporaryTableID, + ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings, + UseTempTable: true, + TempTableID: temporaryTableID, }) if err != nil { @@ -77,9 +80,9 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u return nil } -func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil { return err } } diff --git a/clients/bigquery/dialect/typing.go b/clients/bigquery/dialect/typing.go index 4542e27bc..e4c087492 100644 --- a/clients/bigquery/dialect/typing.go +++ b/clients/bigquery/dialect/typing.go @@ -4,11 +4,12 @@ import ( "fmt" "strings" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" ) -func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string { +func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, settings config.SharedDestinationColumnSettings) string { // Doesn't look like we need to do any special type mapping. switch kindDetails.Kind { case typing.Float.Kind: @@ -33,7 +34,7 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s // We should be using TIMESTAMP since it's an absolute point in time. return "timestamp" case typing.EDecimal.Kind: - return kindDetails.ExtendedDecimalDetails.BigQueryKind() + return kindDetails.ExtendedDecimalDetails.BigQueryKind(settings.BigQueryNumericForVariableNumeric) } return kindDetails.Kind diff --git a/clients/bigquery/dialect/typing_test.go b/clients/bigquery/dialect/typing_test.go index 8b42381a8..f5cc44a8e 100644 --- a/clients/bigquery/dialect/typing_test.go +++ b/clients/bigquery/dialect/typing_test.go @@ -3,18 +3,20 @@ package dialect import ( "testing" - "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/typing" ) func TestBigQueryDialect_DataTypeForKind(t *testing.T) { { // String { - assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.String, false)) + assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.String, false, config.SharedDestinationColumnSettings{})) } { - assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, true)) + assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, true, config.SharedDestinationColumnSettings{})) } } } @@ -30,7 +32,7 @@ func TestBigQueryDialect_KindForDataType_NoDataLoss(t *testing.T) { } for _, kindDetail := range kindDetails { - kd, err := BigQueryDialect{}.KindForDataType(BigQueryDialect{}.DataTypeForKind(kindDetail, false), "") + kd, err := BigQueryDialect{}.KindForDataType(BigQueryDialect{}.DataTypeForKind(kindDetail, false, config.SharedDestinationColumnSettings{}), "") assert.NoError(t, err) assert.Equal(t, kindDetail, kd) } @@ -76,7 +78,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) { assert.Equal(t, typing.EDecimal.Kind, kd.Kind) assert.Equal(t, int32(5), kd.ExtendedDecimalDetails.Precision()) assert.Equal(t, int32(0), kd.ExtendedDecimalDetails.Scale()) - assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind()) + assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind(false)) } { @@ -87,7 +89,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) { assert.Equal(t, typing.EDecimal.Kind, kd.Kind) assert.Equal(t, int32(5), kd.ExtendedDecimalDetails.Precision()) assert.Equal(t, int32(0), kd.ExtendedDecimalDetails.Scale()) - assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind()) + assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind(false)) } { // Numeric(5, 2) diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index 5d9197ee0..f17e2c843 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -33,6 +33,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er return shared.Merge(ctx, s, tableData, types.MergeOpts{ AdditionalEqualityStrings: additionalEqualityStrings, + ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings, // BigQuery has DDL quotas. RetryColBackfill: true, // We are using BigQuery's streaming API which doesn't guarantee exactly once semantics diff --git a/clients/databricks/dialect/typing.go b/clients/databricks/dialect/typing.go index ad8faa9bc..dafdf4dfa 100644 --- a/clients/databricks/dialect/typing.go +++ b/clients/databricks/dialect/typing.go @@ -4,11 +4,12 @@ import ( "fmt" "strings" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" ) -func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string { +func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string { switch kindDetails.Kind { case typing.Float.Kind: return "DOUBLE" diff --git a/clients/databricks/dialect/typing_test.go b/clients/databricks/dialect/typing_test.go index c7776684c..d91fade66 100644 --- a/clients/databricks/dialect/typing_test.go +++ b/clients/databricks/dialect/typing_test.go @@ -3,64 +3,66 @@ package dialect import ( "testing" + "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/decimal" - "github.com/stretchr/testify/assert" ) func TestDatabricksDialect_DataTypeForKind(t *testing.T) { { // Float - assert.Equal(t, "DOUBLE", DatabricksDialect{}.DataTypeForKind(typing.Float, false)) + assert.Equal(t, "DOUBLE", DatabricksDialect{}.DataTypeForKind(typing.Float, false, config.SharedDestinationColumnSettings{})) } { // Integer - assert.Equal(t, "BIGINT", DatabricksDialect{}.DataTypeForKind(typing.Integer, false)) + assert.Equal(t, "BIGINT", DatabricksDialect{}.DataTypeForKind(typing.Integer, false, config.SharedDestinationColumnSettings{})) } { // Variant - assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Struct.Kind}, false)) + assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Struct.Kind}, false, config.SharedDestinationColumnSettings{})) } { // Array - assert.Equal(t, "ARRAY", DatabricksDialect{}.DataTypeForKind(typing.Array, false)) + assert.Equal(t, "ARRAY", DatabricksDialect{}.DataTypeForKind(typing.Array, false, config.SharedDestinationColumnSettings{})) } { // String - assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.String, false)) + assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.String, false, config.SharedDestinationColumnSettings{})) } { // Boolean - assert.Equal(t, "BOOLEAN", DatabricksDialect{}.DataTypeForKind(typing.Boolean, false)) + assert.Equal(t, "BOOLEAN", DatabricksDialect{}.DataTypeForKind(typing.Boolean, false, config.SharedDestinationColumnSettings{})) } { // Times { // Date - assert.Equal(t, "DATE", DatabricksDialect{}.DataTypeForKind(typing.Date, false)) + assert.Equal(t, "DATE", DatabricksDialect{}.DataTypeForKind(typing.Date, false, config.SharedDestinationColumnSettings{})) } { // Timestamp - assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.TimestampTZ, false)) + assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.TimestampTZ, false, config.SharedDestinationColumnSettings{})) } { // Timestamp (w/o timezone) - assert.Equal(t, "TIMESTAMP_NTZ", DatabricksDialect{}.DataTypeForKind(typing.TimestampNTZ, false)) + assert.Equal(t, "TIMESTAMP_NTZ", DatabricksDialect{}.DataTypeForKind(typing.TimestampNTZ, false, config.SharedDestinationColumnSettings{})) } { // Time - assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.Time, false)) + assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.Time, false, config.SharedDestinationColumnSettings{})) } } { // Decimals { // Below 38 precision - assert.Equal(t, "DECIMAL(10, 2)", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(10, 2))}, false)) + assert.Equal(t, "DECIMAL(10, 2)", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(10, 2))}, false, config.SharedDestinationColumnSettings{})) } { // Above 38 precision - assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(40, 2))}, false)) + assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(40, 2))}, false, config.SharedDestinationColumnSettings{})) } } } diff --git a/clients/databricks/store.go b/clients/databricks/store.go index c03795ade..6713468d1 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -80,9 +80,9 @@ func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTabl }.GetTableConfig() } -func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil { return err } } diff --git a/clients/mssql/dialect/typing.go b/clients/mssql/dialect/typing.go index 62f500da5..d4645900c 100644 --- a/clients/mssql/dialect/typing.go +++ b/clients/mssql/dialect/typing.go @@ -5,11 +5,12 @@ import ( "strconv" "strings" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" ) -func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) string { +func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool, _ config.SharedDestinationColumnSettings) string { // Primary keys cannot exceed 900 chars in length. // https://learn.microsoft.com/en-us/sql/relational-databases/tables/primary-and-foreign-key-constraints?view=sql-server-ver16#PKeys const maxVarCharLengthForPrimaryKey = 900 diff --git a/clients/mssql/dialect/typing_test.go b/clients/mssql/dialect/typing_test.go index 46d3eb6e4..a4f0fdb97 100644 --- a/clients/mssql/dialect/typing_test.go +++ b/clients/mssql/dialect/typing_test.go @@ -3,8 +3,10 @@ package dialect import ( "testing" - "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/typing" ) func TestMSSQLDialect_DataTypeForKind(t *testing.T) { @@ -30,8 +32,8 @@ func TestMSSQLDialect_DataTypeForKind(t *testing.T) { } for idx, tc := range tcs { - assert.Equal(t, tc.expected, MSSQLDialect{}.DataTypeForKind(tc.kd, false), idx) - assert.Equal(t, tc.expectedIsPk, MSSQLDialect{}.DataTypeForKind(tc.kd, true), idx) + assert.Equal(t, tc.expected, MSSQLDialect{}.DataTypeForKind(tc.kd, false, config.SharedDestinationColumnSettings{}), idx) + assert.Equal(t, tc.expectedIsPk, MSSQLDialect{}.DataTypeForKind(tc.kd, true, config.SharedDestinationColumnSettings{}), idx) } } diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index 835f3a4cd..5bb2ebe2e 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -13,9 +13,9 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil { return err } } diff --git a/clients/redshift/dialect/typing.go b/clients/redshift/dialect/typing.go index aecccf26d..6caa1cba9 100644 --- a/clients/redshift/dialect/typing.go +++ b/clients/redshift/dialect/typing.go @@ -5,11 +5,12 @@ import ( "strconv" "strings" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" ) -func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string { +func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string { switch kd.Kind { case typing.Integer.Kind: if kd.OptionalIntegerKind != nil { diff --git a/clients/redshift/dialect/typing_test.go b/clients/redshift/dialect/typing_test.go index 40ac8cb7c..09ad56e4a 100644 --- a/clients/redshift/dialect/typing_test.go +++ b/clients/redshift/dialect/typing_test.go @@ -3,42 +3,44 @@ package dialect import ( "testing" - "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/typing" ) func TestRedshiftDialect_DataTypeForKind(t *testing.T) { { // String { - assert.Equal(t, "VARCHAR(MAX)", RedshiftDialect{}.DataTypeForKind(typing.String, true)) + assert.Equal(t, "VARCHAR(MAX)", RedshiftDialect{}.DataTypeForKind(typing.String, true, config.SharedDestinationColumnSettings{})) } { - assert.Equal(t, "VARCHAR(12345)", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false)) + assert.Equal(t, "VARCHAR(12345)", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false, config.SharedDestinationColumnSettings{})) } } { // Integers { // Small int - assert.Equal(t, "INT2", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.SmallIntegerKind)}, false)) + assert.Equal(t, "INT2", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.SmallIntegerKind)}, false, config.SharedDestinationColumnSettings{})) } { // Integer - assert.Equal(t, "INT4", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.IntegerKind)}, false)) + assert.Equal(t, "INT4", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.IntegerKind)}, false, config.SharedDestinationColumnSettings{})) } { // Big integer - assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.BigIntegerKind)}, false)) + assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.BigIntegerKind)}, false, config.SharedDestinationColumnSettings{})) } { // Not specified { // Literal - assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.NotSpecifiedKind)}, false)) + assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.NotSpecifiedKind)}, false, config.SharedDestinationColumnSettings{})) } { - assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.Integer, false)) + assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.Integer, false, config.SharedDestinationColumnSettings{})) } } } @@ -46,11 +48,11 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) { // Timestamps { // With timezone - assert.Equal(t, "TIMESTAMP WITH TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampTZ, false)) + assert.Equal(t, "TIMESTAMP WITH TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampTZ, false, config.SharedDestinationColumnSettings{})) } { // Without timezone - assert.Equal(t, "TIMESTAMP WITHOUT TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampNTZ, false)) + assert.Equal(t, "TIMESTAMP WITHOUT TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampNTZ, false, config.SharedDestinationColumnSettings{})) } } } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 4e233286d..99d465c77 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -18,7 +18,7 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { +func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { fp, colToNewLengthMap, err := s.loadTemporaryTable(tableData, tempTableID) if err != nil { return fmt.Errorf("failed to load temporary table: %w", err) @@ -40,7 +40,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if createTempTable { - if err = shared.CreateTable(ctx, s, tableData, tableConfig, tempTableID, true); err != nil { + if err = shared.CreateTable(ctx, s, tableData, tableConfig, opts.ColumnSettings, tempTableID, true); err != nil { return err } } diff --git a/clients/shared/append.go b/clients/shared/append.go index 88cdf9275..ec762064c 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -32,11 +32,11 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) if tableConfig.CreateTable() { - if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil { + if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false); err != nil { return fmt.Errorf("failed to create table: %w", err) } } else { - if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil { + if err = AlterTableAddColumns(ctx, dwh, tableConfig, opts.ColumnSettings, tableID, targetKeysMissing); err != nil { return fmt.Errorf("failed to alter table: %w", err) } } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 83de84557..41ce87001 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -38,11 +38,11 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) if tableConfig.CreateTable() { - if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil { + if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false); err != nil { return fmt.Errorf("failed to create table: %w", err) } } else { - if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil { + if err = AlterTableAddColumns(ctx, dwh, tableConfig, opts.ColumnSettings, tableID, targetKeysMissing); err != nil { return fmt.Errorf("failed to add columns for table %q: %w", tableID.Table(), err) } } diff --git a/clients/shared/table.go b/clients/shared/table.go index e92ab6739..aab6809a4 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -6,6 +6,7 @@ import ( "log/slog" "time" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" "github.com/artie-labs/transfer/lib/destination/ddl" @@ -15,8 +16,8 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, tableID sql.TableIdentifier, tempTable bool) error { - query, err := ddl.BuildCreateTableSQL(dwh.Dialect(), tableID, tempTable, tableData.Mode(), tableData.ReadOnlyInMemoryCols().GetColumns()) +func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool) error { + query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), tableData.ReadOnlyInMemoryCols().GetColumns()) if err != nil { return fmt.Errorf("failed to build create table sql: %w", err) } @@ -31,7 +32,7 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData * return nil } -func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, tableID sql.TableIdentifier, cols []columns.Column) error { +func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, cols []columns.Column) error { if len(cols) == 0 { return nil } @@ -45,7 +46,7 @@ func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc colsToAdd = append(colsToAdd, col) } - sqlParts, err := ddl.BuildAlterTableAddColumns(dwh.Dialect(), tableID, colsToAdd) + sqlParts, err := ddl.BuildAlterTableAddColumns(settings, dwh.Dialect(), tableID, colsToAdd) if err != nil { return fmt.Errorf("failed to build alter table add columns: %w", err) } diff --git a/clients/snowflake/dialect/typing.go b/clients/snowflake/dialect/typing.go index ce43f7096..f70e9fef5 100644 --- a/clients/snowflake/dialect/typing.go +++ b/clients/snowflake/dialect/typing.go @@ -5,11 +5,12 @@ import ( "strconv" "strings" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" ) -func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string { +func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string { switch kindDetails.Kind { case typing.Struct.Kind: // Snowflake doesn't recognize struct. diff --git a/clients/snowflake/dialect/typing_test.go b/clients/snowflake/dialect/typing_test.go index 2c64f1aa0..aacdc2703 100644 --- a/clients/snowflake/dialect/typing_test.go +++ b/clients/snowflake/dialect/typing_test.go @@ -3,18 +3,20 @@ package dialect import ( "testing" - "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/typing" ) func TestSnowflakeDialect_DataTypeForKind(t *testing.T) { { // String { - assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.String, false)) + assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.String, false, config.SharedDestinationColumnSettings{})) } { - assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false)) + assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false, config.SharedDestinationColumnSettings{})) } } } @@ -193,7 +195,7 @@ func TestSnowflakeDialect_KindForDataType_NoDataLoss(t *testing.T) { } for _, kindDetail := range kindDetails { - kd, err := SnowflakeDialect{}.KindForDataType(SnowflakeDialect{}.DataTypeForKind(kindDetail, false), "") + kd, err := SnowflakeDialect{}.KindForDataType(SnowflakeDialect{}.DataTypeForKind(kindDetail, false, config.SharedDestinationColumnSettings{}), "") assert.NoError(t, err) assert.Equal(t, kindDetail, kd) } diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index be6db8f0b..4a09b6a12 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -52,7 +52,7 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, additionalSettings.ColumnSettings, tempTableID, true); err != nil { return err } } diff --git a/lib/config/types.go b/lib/config/types.go index 7ba7c4e8c..af370c666 100644 --- a/lib/config/types.go +++ b/lib/config/types.go @@ -21,6 +21,7 @@ type Pubsub struct { TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"` PathToCredentials string `yaml:"pathToCredentials"` } + type Kafka struct { // Comma-separated Kafka servers to port. // e.g. host1:port1,host2:port2,... @@ -36,12 +37,18 @@ type Kafka struct { DisableTLS bool `yaml:"disableTLS,omitempty"` } +type SharedDestinationColumnSettings struct { + // BigQueryNumericForVariableNumeric - If enabled, we will use BigQuery's NUMERIC type for variable numeric types. + BigQueryNumericForVariableNumeric bool `yaml:"bigQueryNumericForVariableNumeric"` +} + type SharedDestinationSettings struct { // TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value` TruncateExceededValues bool `yaml:"truncateExceededValues"` // ExpandStringPrecision - This will expand the string precision if the incoming data has a higher precision than the destination table. // This is only supported by Redshift at the moment. - ExpandStringPrecision bool `yaml:"expandStringPrecision"` + ExpandStringPrecision bool `yaml:"expandStringPrecision"` + ColumnSettings SharedDestinationColumnSettings `yaml:"columnSettings"` } type Reporting struct { diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 4888fa17f..3d49a8ad2 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -17,7 +17,7 @@ func shouldCreatePrimaryKey(col columns.Column, mode config.Mode, createTable bo return col.PrimaryKey() && mode == config.Replication && createTable } -func BuildCreateTableSQL(dialect sql.Dialect, tableIdentifier sql.TableIdentifier, temporaryTable bool, mode config.Mode, columns []columns.Column) (string, error) { +func BuildCreateTableSQL(settings config.SharedDestinationColumnSettings, dialect sql.Dialect, tableIdentifier sql.TableIdentifier, temporaryTable bool, mode config.Mode, columns []columns.Column) (string, error) { if len(columns) == 0 { return "", fmt.Errorf("no columns provided") } @@ -34,7 +34,7 @@ func BuildCreateTableSQL(dialect sql.Dialect, tableIdentifier sql.TableIdentifie primaryKeys = append(primaryKeys, colName) } - parts = append(parts, fmt.Sprintf("%s %s", colName, dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey()))) + parts = append(parts, fmt.Sprintf("%s %s", colName, dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey(), settings))) } if len(primaryKeys) > 0 { @@ -71,14 +71,14 @@ func DropTemporaryTable(dwh destination.DataWarehouse, tableIdentifier sql.Table return nil } -func BuildAlterTableAddColumns(dialect sql.Dialect, tableID sql.TableIdentifier, cols []columns.Column) ([]string, error) { +func BuildAlterTableAddColumns(settings config.SharedDestinationColumnSettings, dialect sql.Dialect, tableID sql.TableIdentifier, cols []columns.Column) ([]string, error) { var parts []string for _, col := range cols { if col.ShouldSkip() { return nil, fmt.Errorf("received an invalid column %q", col.Name()) } - sqlPart := fmt.Sprintf("%s %s", dialect.QuoteIdentifier(col.Name()), dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey())) + sqlPart := fmt.Sprintf("%s %s", dialect.QuoteIdentifier(col.Name()), dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey(), settings)) parts = append(parts, dialect.BuildAddColumnQuery(tableID, sqlPart)) } diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index 9c68d73eb..a2e5ad98b 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -106,11 +106,11 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) for name, kind := range newCols { col := columns.NewColumn(name, kind) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{col})) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, config.SharedDestinationColumnSettings{}, tableID, []columns.Column{col})) _, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(col.Name()), - d.bigQueryStore.Dialect().DataTypeForKind(kind, false)), query) + d.bigQueryStore.Dialect().DataTypeForKind(kind, false, config.SharedDestinationColumnSettings{})), query) callIdx += 1 } @@ -156,10 +156,10 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { // BQ returning the same error because the column already exists. d.fakeBigQueryStore.ExecContextReturnsOnCall(0, sqlResult, errors.New("Column already exists: _string at [1:39]")) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column})) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, config.SharedDestinationColumnSettings{}, tableID, []columns.Column{column})) _, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name()), - d.bigQueryStore.Dialect().DataTypeForKind(column.KindDetails, false)), query) + d.bigQueryStore.Dialect().DataTypeForKind(column.KindDetails, false, config.SharedDestinationColumnSettings{})), query) callIdx += 1 } diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index feae06910..59a373373 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -11,6 +11,7 @@ import ( "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/clients/snowflake/dialect" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/typing" @@ -28,12 +29,12 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { tableID := dialect.NewTableIdentifier("shop", "public", "complex_columns") d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols)) for i := 0; i < len(cols); i++ { _, execQuery, _ := d.fakeSnowflakeStagesStore.ExecContextArgsForCall(i) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", `shop.public."COMPLEX_COLUMNS"`, d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()), - d.snowflakeStagesStore.Dialect().DataTypeForKind(cols[i].KindDetails, false)), execQuery) + d.snowflakeStagesStore.Dialect().DataTypeForKind(cols[i].KindDetails, false, config.SharedDestinationColumnSettings{})), execQuery) } assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") @@ -53,11 +54,11 @@ func (d *DDLTestSuite) TestAlterIdempotency() { d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists")) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols)) assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") d.fakeSnowflakeStagesStore.ExecContextReturns(nil, errors.New("table does not exist")) - assert.ErrorContains(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols), `failed to alter table: table does not exist`) + assert.ErrorContains(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols), `failed to alter table: table does not exist`) } func (d *DDLTestSuite) TestAlterTableAdd() { @@ -73,7 +74,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols)) assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") // Check the table config diff --git a/lib/destination/ddl/ddl_temp_test.go b/lib/destination/ddl/ddl_temp_test.go index f357b0ddd..bd2e2f2bf 100644 --- a/lib/destination/ddl/ddl_temp_test.go +++ b/lib/destination/ddl/ddl_temp_test.go @@ -21,14 +21,14 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() { { // Snowflake Stage tableID := dialect.NewTableIdentifier("db", "schema", "tempTableName") - query, err := ddl.BuildCreateTableSQL(d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)}) + query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)}) assert.NoError(d.T(), err) assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`) } { // BigQuery tableID := bigQueryDialect.NewTableIdentifier("db", "schema", "tempTableName") - query, err := ddl.BuildCreateTableSQL(d.bigQueryStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)}) + query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.bigQueryStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)}) assert.NoError(d.T(), err) // Cutting off the expiration_timestamp since it's time based. assert.Contains(d.T(), query, "CREATE TABLE IF NOT EXISTS `db`.`schema`.`tempTableName` (`foo` string,`bar` float64,`select` string) OPTIONS (expiration_timestamp =", query) diff --git a/lib/destination/ddl/ddl_test.go b/lib/destination/ddl/ddl_test.go index 19eca3094..edc0af532 100644 --- a/lib/destination/ddl/ddl_test.go +++ b/lib/destination/ddl/ddl_test.go @@ -45,7 +45,7 @@ func TestShouldCreatePrimaryKey(t *testing.T) { func TestBuildCreateTableSQL(t *testing.T) { { // No columns provided - _, err := BuildCreateTableSQL(nil, nil, false, config.Replication, []columns.Column{}) + _, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, nil, nil, false, config.Replication, []columns.Column{}) assert.ErrorContains(t, err, "no columns provided") } { @@ -54,7 +54,7 @@ func TestBuildCreateTableSQL(t *testing.T) { // Redshift { // No primary key - sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.String), }) @@ -65,7 +65,7 @@ func TestBuildCreateTableSQL(t *testing.T) { // With primary key pk := columns.NewColumn("pk", typing.String) pk.SetPrimaryKeyForTest(true) - sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ pk, columns.NewColumn("bar", typing.String), }) @@ -79,7 +79,7 @@ func TestBuildCreateTableSQL(t *testing.T) { pk2 := columns.NewColumn("pk2", typing.String) pk2.SetPrimaryKeyForTest(true) - sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{ pk1, pk2, columns.NewColumn("bar", typing.String), @@ -94,7 +94,7 @@ func TestBuildCreateTableSQL(t *testing.T) { // With primary key pk := columns.NewColumn("pk", typing.String) pk.SetPrimaryKeyForTest(true) - sql, err := BuildCreateTableSQL(bqDialect.BigQueryDialect{}, bqDialect.NewTableIdentifier("projectID", "dataset", "table"), false, config.Replication, []columns.Column{ + sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, bqDialect.BigQueryDialect{}, bqDialect.NewTableIdentifier("projectID", "dataset", "table"), false, config.Replication, []columns.Column{ pk, columns.NewColumn("bar", typing.String), }) @@ -108,14 +108,14 @@ func TestBuildCreateTableSQL(t *testing.T) { func TestBuildAlterTableAddColumns(t *testing.T) { { // No columns - sqlParts, err := BuildAlterTableAddColumns(nil, nil, []columns.Column{}) + sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, nil, nil, []columns.Column{}) assert.NoError(t, err) assert.Empty(t, sqlParts) } { // One column to add col := columns.NewColumn("dusty", typing.String) - sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col}) + sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col}) assert.NoError(t, err) assert.Len(t, sqlParts, 1) assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "dusty" VARCHAR(MAX)`, sqlParts[0]) @@ -123,7 +123,7 @@ func TestBuildAlterTableAddColumns(t *testing.T) { { // Two columns, one invalid, it will error. col := columns.NewColumn("dusty", typing.String) - _, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), + _, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{ col, columns.NewColumn("invalid", typing.Invalid), @@ -138,7 +138,7 @@ func TestBuildAlterTableAddColumns(t *testing.T) { col2 := columns.NewColumn("doge", typing.String) col3 := columns.NewColumn("age", typing.Integer) - sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col1, col2, col3}) + sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col1, col2, col3}) assert.NoError(t, err) assert.Len(t, sqlParts, 3) assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "aussie" VARCHAR(MAX)`, sqlParts[0]) diff --git a/lib/destination/types/types.go b/lib/destination/types/types.go index 01cc6a06a..ed50472db 100644 --- a/lib/destination/types/types.go +++ b/lib/destination/types/types.go @@ -3,6 +3,7 @@ package types import ( "sync" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/sql" ) @@ -35,13 +36,15 @@ func (d *DwhToTablesConfigMap) AddTableToConfig(tableID sql.TableIdentifier, con } type MergeOpts struct { - SubQueryDedupe bool AdditionalEqualityStrings []string + ColumnSettings config.SharedDestinationColumnSettings RetryColBackfill bool + SubQueryDedupe bool } type AdditionalSettings struct { AdditionalCopyClause string + ColumnSettings config.SharedDestinationColumnSettings // These settings are used for the `Append` method. UseTempTable bool diff --git a/lib/sql/dialect.go b/lib/sql/dialect.go index a128ffe2d..bbe624561 100644 --- a/lib/sql/dialect.go +++ b/lib/sql/dialect.go @@ -1,6 +1,7 @@ package sql import ( + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -25,7 +26,7 @@ type TableIdentifier interface { type Dialect interface { QuoteIdentifier(identifier string) string EscapeStruct(value string) string - DataTypeForKind(kd typing.KindDetails, isPk bool) string + DataTypeForKind(kd typing.KindDetails, isPk bool, settings config.SharedDestinationColumnSettings) string KindForDataType(_type string, stringPrecision string) (typing.KindDetails, error) IsColumnAlreadyExistsErr(err error) bool IsTableDoesNotExistErr(err error) bool diff --git a/lib/typing/decimal/details.go b/lib/typing/decimal/details.go index dd5018b9b..c0b6355ef 100644 --- a/lib/typing/decimal/details.go +++ b/lib/typing/decimal/details.go @@ -70,7 +70,11 @@ func (d Details) RedshiftKind() string { } // BigQueryKind - is inferring logic from: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types -func (d Details) BigQueryKind() string { +func (d Details) BigQueryKind(numericTypeForVariableNumeric bool) string { + if numericTypeForVariableNumeric && d.precision == PrecisionNotSpecified { + return "NUMERIC" + } + if d.isNumeric() { return fmt.Sprintf("NUMERIC(%v, %v)", d.precision, d.scale) } else if d.isBigNumeric() { diff --git a/lib/typing/decimal/details_test.go b/lib/typing/decimal/details_test.go index bacaa2862..966be36da 100644 --- a/lib/typing/decimal/details_test.go +++ b/lib/typing/decimal/details_test.go @@ -6,6 +6,19 @@ import ( "github.com/stretchr/testify/assert" ) +func TestDetails_BigQueryKind(t *testing.T) { + // Variable numeric + details := NewDetails(PrecisionNotSpecified, DefaultScale) + { + // numericTypeForVariableNumeric = false + assert.Equal(t, "STRING", details.BigQueryKind(false)) + } + { + // numericTypeForVariableNumeric = true + assert.Equal(t, "NUMERIC", details.BigQueryKind(true)) + } +} + func TestDecimalDetailsKind(t *testing.T) { type _testCase struct { Name string @@ -71,6 +84,6 @@ func TestDecimalDetailsKind(t *testing.T) { d := NewDetails(testCase.Precision, testCase.Scale) assert.Equal(t, testCase.ExpectedSnowflakeKind, d.SnowflakeKind(), testCase.Name) assert.Equal(t, testCase.ExpectedRedshiftKind, d.RedshiftKind(), testCase.Name) - assert.Equal(t, testCase.ExpectedBigQueryKind, d.BigQueryKind(), testCase.Name) + assert.Equal(t, testCase.ExpectedBigQueryKind, d.BigQueryKind(false), testCase.Name) } }