From 81a45f0c2e55e7cb4bd81bb324848ba8ad4d9157 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 29 Apr 2024 13:40:32 -0700 Subject: [PATCH] Standing up guardrails for Invalid Cols (#512) --- clients/shared/append.go | 14 +- clients/shared/merge.go | 5 +- lib/optimization/event_update_test.go | 261 ++++++++++++++------------ lib/optimization/table_data.go | 10 +- lib/typing/bigquery_test.go | 8 +- lib/typing/redshift.go | 3 +- lib/typing/snowflake_test.go | 6 +- lib/typing/typing.go | 17 +- 8 files changed, 183 insertions(+), 141 deletions(-) diff --git a/clients/shared/append.go b/clients/shared/append.go index 21c755194..06f1e2172 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -1,7 +1,7 @@ package shared import ( - "log/slog" + "fmt" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" @@ -20,7 +20,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) tableConfig, err := dwh.GetTableConfig(tableData) if err != nil { - return err + return fmt.Errorf("failed to get table config: %w", err) } // We don't care about srcKeysMissing because we don't drop columns when we append. @@ -40,13 +40,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op } // Keys that exist in CDC stream, but not in DWH - err = createAlterTableArgs.AlterTable(targetKeysMissing...) - if err != nil { - slog.Warn("Failed to apply alter table", slog.Any("err", err)) - return err + if err = createAlterTableArgs.AlterTable(targetKeysMissing...); err != nil { + return fmt.Errorf("failed to alter table: %w", err) } - tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...) + if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil { + return fmt.Errorf("failed to merge columns from destination: %w", err) + } additionalSettings := types.AdditionalSettings{ AdditionalCopyClause: opts.AdditionalCopyClause, diff --git a/clients/shared/merge.go b/clients/shared/merge.go index b9247faa6..47ce3c6d3 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -69,7 +69,10 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg } tableConfig.AuditColumnsToDelete(srcKeysMissing) - tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...) + if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil { + return fmt.Errorf("failed to merge columns from destination: %w", err) + } + temporaryTableID := TempTableID(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix()) temporaryTableName := temporaryTableID.FullyQualifiedName() if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil { diff --git a/lib/optimization/event_update_test.go b/lib/optimization/event_update_test.go index af4a2e1eb..2d5fa5e78 100644 --- a/lib/optimization/event_update_test.go +++ b/lib/optimization/event_update_test.go @@ -11,139 +11,160 @@ import ( "github.com/stretchr/testify/assert" ) +const strCol = "string" + func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { - const strCol = "string" + { + tableDataCols := &columns.Columns{} + tableData := &TableData{ + inMemoryColumns: tableDataCols, + } - tableDataCols := &columns.Columns{} - tableDataCols.AddColumn(columns.NewColumn("name", typing.String)) - tableDataCols.AddColumn(columns.NewColumn("bool_backfill", typing.Boolean)) - tableDataCols.AddColumn(columns.NewColumn("prev_invalid", typing.Invalid)) - tableDataCols.AddColumn(columns.NewColumn("numeric_test", typing.EDecimal)) + tableData.AddInMemoryCol(columns.NewColumn("foo", typing.String)) + invalidCol := columns.NewColumn("foo", typing.Invalid) + assert.ErrorContains(t, tableData.MergeColumnsFromDestination(invalidCol), `column "foo" is invalid`) + } + { + tableDataCols := &columns.Columns{} + tableData := &TableData{ + inMemoryColumns: tableDataCols, + } - // Casting these as STRING so tableColumn via this f(x) will set it correctly. - tableDataCols.AddColumn(columns.NewColumn("ext_date", typing.String)) - tableDataCols.AddColumn(columns.NewColumn("ext_time", typing.String)) - tableDataCols.AddColumn(columns.NewColumn("ext_datetime", typing.String)) - tableDataCols.AddColumn(columns.NewColumn("ext_dec", typing.String)) + tableDataCols.AddColumn(columns.NewColumn("name", typing.String)) + tableDataCols.AddColumn(columns.NewColumn("bool_backfill", typing.Boolean)) + tableDataCols.AddColumn(columns.NewColumn("prev_invalid", typing.Invalid)) + tableDataCols.AddColumn(columns.NewColumn("numeric_test", typing.EDecimal)) - extDecimalType := typing.EDecimal - extDecimalType.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(22), 2, nil) - tableDataCols.AddColumn(columns.NewColumn("ext_dec_filled", extDecimalType)) + // Casting these as STRING so tableColumn via this f(x) will set it correctly. + tableDataCols.AddColumn(columns.NewColumn("ext_date", typing.String)) + tableDataCols.AddColumn(columns.NewColumn("ext_time", typing.String)) + tableDataCols.AddColumn(columns.NewColumn("ext_datetime", typing.String)) + tableDataCols.AddColumn(columns.NewColumn("ext_dec", typing.String)) - tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String)) + extDecimalType := typing.EDecimal + extDecimalType.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(22), 2, nil) + tableDataCols.AddColumn(columns.NewColumn("ext_dec_filled", extDecimalType)) - tableData := &TableData{ - inMemoryColumns: tableDataCols, - } + tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String)) - nonExistentTableCols := []string{"dusty", "the", "mini", "aussie"} - var nonExistentCols []columns.Column - for _, nonExistentTableCol := range nonExistentTableCols { - nonExistentCols = append(nonExistentCols, columns.NewColumn(nonExistentTableCol, typing.String)) - } + nonExistentTableCols := []string{"dusty", "the", "mini", "aussie"} + var nonExistentCols []columns.Column + for _, nonExistentTableCol := range nonExistentTableCols { + nonExistentCols = append(nonExistentCols, columns.NewColumn(nonExistentTableCol, typing.String)) + } - // Testing to make sure we don't copy over non-existent columns - tableData.MergeColumnsFromDestination(nonExistentCols...) - for _, nonExistentTableCol := range nonExistentTableCols { - _, isOk := tableData.inMemoryColumns.GetColumn(nonExistentTableCol) - assert.False(t, isOk, nonExistentTableCol) - } + // Testing to make sure we don't copy over non-existent columns + assert.NoError(t, tableData.MergeColumnsFromDestination(nonExistentCols...)) + for _, nonExistentTableCol := range nonExistentTableCols { + _, isOk := tableData.inMemoryColumns.GetColumn(nonExistentTableCol) + assert.False(t, isOk, nonExistentTableCol) + } - // Making sure it's still numeric - tableData.MergeColumnsFromDestination(columns.NewColumn("numeric_test", typing.Integer)) - numericCol, isOk := tableData.inMemoryColumns.GetColumn("numeric_test") - assert.True(t, isOk) - assert.Equal(t, typing.EDecimal.Kind, numericCol.KindDetails.Kind, "numeric_test") - - // Testing to make sure we're copying the kindDetails over. - tableData.MergeColumnsFromDestination(columns.NewColumn("prev_invalid", typing.String)) - prevInvalidCol, isOk := tableData.inMemoryColumns.GetColumn("prev_invalid") - assert.True(t, isOk) - assert.Equal(t, typing.String, prevInvalidCol.KindDetails) - - // Testing backfill - for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() { - assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) - } - backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean) - backfilledCol.SetBackfilled(true) - tableData.MergeColumnsFromDestination(backfilledCol) - for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() { - if inMemoryCol.RawName() == backfilledCol.RawName() { - assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) - } else { + // Making sure it's still numeric + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("numeric_test", typing.Integer))) + numericCol, isOk := tableData.inMemoryColumns.GetColumn("numeric_test") + assert.True(t, isOk) + assert.Equal(t, typing.EDecimal.Kind, numericCol.KindDetails.Kind, "numeric_test") + + // Testing to make sure we're copying the kindDetails over. + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("prev_invalid", typing.String))) + prevInvalidCol, isOk := tableData.inMemoryColumns.GetColumn("prev_invalid") + assert.True(t, isOk) + assert.Equal(t, typing.String, prevInvalidCol.KindDetails) + + // Testing backfill + for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() { assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) } - } + backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean) + backfilledCol.SetBackfilled(true) + assert.NoError(t, tableData.MergeColumnsFromDestination(backfilledCol)) + for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() { + if inMemoryCol.RawName() == backfilledCol.RawName() { + assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) + } else { + assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName()) + } + } + + // Testing extTimeDetails + for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "ext_datetime"} { + col, isOk := tableData.inMemoryColumns.GetColumn(extTimeDetailsCol) + assert.True(t, isOk, extTimeDetailsCol) + assert.Equal(t, typing.String, col.KindDetails, extTimeDetailsCol) + assert.Nil(t, col.KindDetails.ExtendedTimeDetails, extTimeDetailsCol) + } - // Testing extTimeDetails - for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "ext_datetime"} { - col, isOk := tableData.inMemoryColumns.GetColumn(extTimeDetailsCol) - assert.True(t, isOk, extTimeDetailsCol) - assert.Equal(t, typing.String, col.KindDetails, extTimeDetailsCol) - assert.Nil(t, col.KindDetails.ExtendedTimeDetails, extTimeDetailsCol) + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType)))) + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType)))) + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)))) + + dateCol, isOk := tableData.inMemoryColumns.GetColumn("ext_date") + assert.True(t, isOk) + assert.NotNil(t, dateCol.KindDetails.ExtendedTimeDetails) + assert.Equal(t, ext.DateKindType, dateCol.KindDetails.ExtendedTimeDetails.Type) + + timeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_time") + assert.True(t, isOk) + assert.NotNil(t, timeCol.KindDetails.ExtendedTimeDetails) + assert.Equal(t, ext.TimeKindType, timeCol.KindDetails.ExtendedTimeDetails.Type) + + dateTimeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_datetime") + assert.True(t, isOk) + assert.NotNil(t, dateTimeCol.KindDetails.ExtendedTimeDetails) + assert.Equal(t, ext.DateTimeKindType, dateTimeCol.KindDetails.ExtendedTimeDetails.Type) + + // Testing extDecimalDetails + // Confirm that before you update, it's invalid. + extDecCol, isOk := tableData.inMemoryColumns.GetColumn("ext_dec") + assert.True(t, isOk) + assert.Equal(t, typing.String, extDecCol.KindDetails) + + extDecimal := typing.EDecimal + extDecimal.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(30), 2, nil) + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec", extDecimal))) + // Now it should be ext decimal type + extDecCol, isOk = tableData.inMemoryColumns.GetColumn("ext_dec") + assert.True(t, isOk) + assert.Equal(t, typing.EDecimal.Kind, extDecCol.KindDetails.Kind) + // Check precision and scale too. + assert.Equal(t, 30, *extDecCol.KindDetails.ExtendedDecimalDetails.Precision()) + assert.Equal(t, 2, extDecCol.KindDetails.ExtendedDecimalDetails.Scale()) + + // Testing ext_dec_filled since it's already filled out + extDecColFilled, isOk := tableData.inMemoryColumns.GetColumn("ext_dec_filled") + assert.True(t, isOk) + assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind) + // Check precision and scale too. + assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision()) + assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale()) + + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec_filled", extDecimal))) + extDecColFilled, isOk = tableData.inMemoryColumns.GetColumn("ext_dec_filled") + assert.True(t, isOk) + assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind) + // Check precision and scale too. + assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision()) + assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale()) } + { + tableDataCols := &columns.Columns{} + tableData := &TableData{ + inMemoryColumns: tableDataCols, + } + + tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String)) + + // Testing string precision + stringKindWithPrecision := typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: ptr.ToInt(123), + } - tableData.MergeColumnsFromDestination(columns.NewColumn("ext_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType))) - tableData.MergeColumnsFromDestination(columns.NewColumn("ext_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType))) - tableData.MergeColumnsFromDestination(columns.NewColumn("ext_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType))) - - dateCol, isOk := tableData.inMemoryColumns.GetColumn("ext_date") - assert.True(t, isOk) - assert.NotNil(t, dateCol.KindDetails.ExtendedTimeDetails) - assert.Equal(t, ext.DateKindType, dateCol.KindDetails.ExtendedTimeDetails.Type) - - timeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_time") - assert.True(t, isOk) - assert.NotNil(t, timeCol.KindDetails.ExtendedTimeDetails) - assert.Equal(t, ext.TimeKindType, timeCol.KindDetails.ExtendedTimeDetails.Type) - - dateTimeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_datetime") - assert.True(t, isOk) - assert.NotNil(t, dateTimeCol.KindDetails.ExtendedTimeDetails) - assert.Equal(t, ext.DateTimeKindType, dateTimeCol.KindDetails.ExtendedTimeDetails.Type) - - // Testing extDecimalDetails - // Confirm that before you update, it's invalid. - extDecCol, isOk := tableData.inMemoryColumns.GetColumn("ext_dec") - assert.True(t, isOk) - assert.Equal(t, typing.String, extDecCol.KindDetails) - - extDecimal := typing.EDecimal - extDecimal.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(30), 2, nil) - tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec", extDecimal)) - // Now it should be ext decimal type - extDecCol, isOk = tableData.inMemoryColumns.GetColumn("ext_dec") - assert.True(t, isOk) - assert.Equal(t, typing.EDecimal.Kind, extDecCol.KindDetails.Kind) - // Check precision and scale too. - assert.Equal(t, 30, *extDecCol.KindDetails.ExtendedDecimalDetails.Precision()) - assert.Equal(t, 2, extDecCol.KindDetails.ExtendedDecimalDetails.Scale()) - - // Testing ext_dec_filled since it's already filled out - extDecColFilled, isOk := tableData.inMemoryColumns.GetColumn("ext_dec_filled") - assert.True(t, isOk) - assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind) - // Check precision and scale too. - assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision()) - assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale()) - - tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec_filled", extDecimal)) - extDecColFilled, isOk = tableData.inMemoryColumns.GetColumn("ext_dec_filled") - assert.True(t, isOk) - assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind) - // Check precision and scale too. - assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision()) - assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale()) - - // Testing string precision - stringKindWithPrecision := typing.KindDetails{ - Kind: typing.String.Kind, - OptionalStringPrecision: ptr.ToInt(123), + assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn(strCol, stringKindWithPrecision))) + foundStrCol, isOk := tableData.inMemoryColumns.GetColumn(strCol) + assert.True(t, isOk) + assert.Equal(t, typing.String.Kind, foundStrCol.KindDetails.Kind) + assert.Equal(t, 123, *foundStrCol.KindDetails.OptionalStringPrecision) } - tableData.MergeColumnsFromDestination(columns.NewColumn(strCol, stringKindWithPrecision)) - foundStrCol, isOk := tableData.inMemoryColumns.GetColumn(strCol) - assert.True(t, isOk) - assert.Equal(t, typing.String.Kind, foundStrCol.KindDetails.Kind) - assert.Equal(t, 123, *foundStrCol.KindDetails.OptionalStringPrecision) } diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index 89b3eb264..e47839528 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -248,9 +248,9 @@ func (t *TableData) ShouldFlush(cfg config.Config) (bool, string) { // Prior to merging, we will need to treat `tableConfig` as the source-of-truth and whenever there's discrepancies // We will prioritize using the values coming from (2) TableConfig. We also cannot simply do a replacement, as we have in-memory columns // That carry metadata for Artie Transfer. They are prefixed with __artie. -func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) { +func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) error { if t == nil || len(destCols) == 0 { - return + return nil } for _, inMemoryCol := range t.inMemoryColumns.GetColumns() { @@ -258,6 +258,10 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) { var found bool for _, destCol := range destCols { if destCol.RawName() == strings.ToLower(inMemoryCol.RawName()) { + if destCol.KindDetails.Kind == typing.Invalid.Kind { + return fmt.Errorf("column %q is invalid", destCol.RawName()) + } + foundColumn = destCol found = true break @@ -297,4 +301,6 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) { t.inMemoryColumns.UpdateColumn(inMemoryCol) } } + + return nil } diff --git a/lib/typing/bigquery_test.go b/lib/typing/bigquery_test.go index 64337909e..11c126d20 100644 --- a/lib/typing/bigquery_test.go +++ b/lib/typing/bigquery_test.go @@ -1,6 +1,7 @@ package typing import ( + "fmt" "testing" "time" @@ -52,7 +53,12 @@ func TestBigQueryTypeToKind(t *testing.T) { for bqCol, expectedKind := range bqColToExpectedKind { kd, err := DwhTypeToKind(constants.BigQuery, bqCol, "") - assert.NoError(t, err) + if expectedKind.Kind == Invalid.Kind { + assert.ErrorContains(t, err, fmt.Sprintf("unable to map type: %q to dwh type", bqCol)) + } else { + assert.NoError(t, err) + } + assert.Equal(t, expectedKind.Kind, kd.Kind, bqCol) } } diff --git a/lib/typing/redshift.go b/lib/typing/redshift.go index 18c96d794..91f5f5daf 100644 --- a/lib/typing/redshift.go +++ b/lib/typing/redshift.go @@ -9,6 +9,7 @@ import ( ) func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails { + // TODO: Check if there are any missing Redshift data types. if strings.HasPrefix(rawType, "numeric") { return ParseNumeric(defaultPrefix, rawType) } @@ -29,7 +30,7 @@ func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails { switch rawType { case "super": return Struct - case "integer", "bigint": + case "smallint", "integer", "bigint": return Integer case "double precision": return Float diff --git a/lib/typing/snowflake_test.go b/lib/typing/snowflake_test.go index 571dcdbd1..7a5eb1f2b 100644 --- a/lib/typing/snowflake_test.go +++ b/lib/typing/snowflake_test.go @@ -41,7 +41,7 @@ func TestSnowflakeTypeToKindFloats(t *testing.T) { { // Invalid because precision nor scale is included. kd, err := DwhTypeToKind(constants.Snowflake, "NUMERIC", "") - assert.NoError(t, err) + assert.ErrorContains(t, err, `unable to map type: "numeric" to dwh type`) assert.Equal(t, Invalid, kd) } { @@ -105,12 +105,12 @@ func TestSnowflakeTypeToKindComplex(t *testing.T) { func TestSnowflakeTypeToKindErrors(t *testing.T) { { kd, err := DwhTypeToKind(constants.Snowflake, "", "") - assert.NoError(t, err) + assert.ErrorContains(t, err, `unable to map type: "" to dwh type`) assert.Equal(t, Invalid, kd) } { kd, err := DwhTypeToKind(constants.Snowflake, "abc123", "") - assert.NoError(t, err) + assert.ErrorContains(t, err, `unable to map type: "abc123" to dwh type`) assert.Equal(t, Invalid, kd) } } diff --git a/lib/typing/typing.go b/lib/typing/typing.go index 16b635b0d..ac3b1f72c 100644 --- a/lib/typing/typing.go +++ b/lib/typing/typing.go @@ -194,17 +194,22 @@ func KindToDWHType(kd KindDetails, dwh constants.DestinationKind, isPk bool) str func DwhTypeToKind(dwh constants.DestinationKind, dwhType, stringPrecision string) (KindDetails, error) { dwhType = strings.ToLower(dwhType) - + var kd KindDetails switch dwh { case constants.Snowflake: - return snowflakeTypeToKind(dwhType), nil + kd = snowflakeTypeToKind(dwhType) case constants.BigQuery: - return bigQueryTypeToKind(dwhType), nil + kd = bigQueryTypeToKind(dwhType) case constants.Redshift: - return redshiftTypeToKind(dwhType, stringPrecision), nil + kd = redshiftTypeToKind(dwhType, stringPrecision) case constants.MSSQL: - return mssqlTypeToKind(dwhType, stringPrecision), nil + kd = mssqlTypeToKind(dwhType, stringPrecision) + default: + return Invalid, fmt.Errorf("unexpected dwh kind, label: %v", dwh) } - return Invalid, fmt.Errorf("unexpected dwh kind, label: %v", dwh) + if kd.Kind == Invalid.Kind { + return Invalid, fmt.Errorf("unable to map type: %q to dwh type", dwhType) + } + return kd, nil }