Skip to content

Commit

Permalink
Standing up guardrails for Invalid Cols (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Apr 29, 2024
1 parent 5b9d6c7 commit 81a45f0
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 141 deletions.
14 changes: 7 additions & 7 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package shared

import (
"log/slog"
"fmt"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
Expand All @@ -20,7 +20,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableData)
if err != nil {
return err
return fmt.Errorf("failed to get table config: %w", err)
}

// We don't care about srcKeysMissing because we don't drop columns when we append.
Expand All @@ -40,13 +40,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
}

// Keys that exist in CDC stream, but not in DWH
err = createAlterTableArgs.AlterTable(targetKeysMissing...)
if err != nil {
slog.Warn("Failed to apply alter table", slog.Any("err", err))
return err
if err = createAlterTableArgs.AlterTable(targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}

tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...)
if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

additionalSettings := types.AdditionalSettings{
AdditionalCopyClause: opts.AdditionalCopyClause,
Expand Down
5 changes: 4 additions & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
}

tableConfig.AuditColumnsToDelete(srcKeysMissing)
tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...)
if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

temporaryTableID := TempTableID(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix())
temporaryTableName := temporaryTableID.FullyQualifiedName()
if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil {
Expand Down
261 changes: 141 additions & 120 deletions lib/optimization/event_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,139 +11,160 @@ import (
"github.com/stretchr/testify/assert"
)

const strCol = "string"

func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) {
const strCol = "string"
{
tableDataCols := &columns.Columns{}
tableData := &TableData{
inMemoryColumns: tableDataCols,
}

tableDataCols := &columns.Columns{}
tableDataCols.AddColumn(columns.NewColumn("name", typing.String))
tableDataCols.AddColumn(columns.NewColumn("bool_backfill", typing.Boolean))
tableDataCols.AddColumn(columns.NewColumn("prev_invalid", typing.Invalid))
tableDataCols.AddColumn(columns.NewColumn("numeric_test", typing.EDecimal))
tableData.AddInMemoryCol(columns.NewColumn("foo", typing.String))
invalidCol := columns.NewColumn("foo", typing.Invalid)
assert.ErrorContains(t, tableData.MergeColumnsFromDestination(invalidCol), `column "foo" is invalid`)
}
{
tableDataCols := &columns.Columns{}
tableData := &TableData{
inMemoryColumns: tableDataCols,
}

// Casting these as STRING so tableColumn via this f(x) will set it correctly.
tableDataCols.AddColumn(columns.NewColumn("ext_date", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_time", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_datetime", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_dec", typing.String))
tableDataCols.AddColumn(columns.NewColumn("name", typing.String))
tableDataCols.AddColumn(columns.NewColumn("bool_backfill", typing.Boolean))
tableDataCols.AddColumn(columns.NewColumn("prev_invalid", typing.Invalid))
tableDataCols.AddColumn(columns.NewColumn("numeric_test", typing.EDecimal))

extDecimalType := typing.EDecimal
extDecimalType.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(22), 2, nil)
tableDataCols.AddColumn(columns.NewColumn("ext_dec_filled", extDecimalType))
// Casting these as STRING so tableColumn via this f(x) will set it correctly.
tableDataCols.AddColumn(columns.NewColumn("ext_date", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_time", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_datetime", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_dec", typing.String))

tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String))
extDecimalType := typing.EDecimal
extDecimalType.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(22), 2, nil)
tableDataCols.AddColumn(columns.NewColumn("ext_dec_filled", extDecimalType))

tableData := &TableData{
inMemoryColumns: tableDataCols,
}
tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String))

nonExistentTableCols := []string{"dusty", "the", "mini", "aussie"}
var nonExistentCols []columns.Column
for _, nonExistentTableCol := range nonExistentTableCols {
nonExistentCols = append(nonExistentCols, columns.NewColumn(nonExistentTableCol, typing.String))
}
nonExistentTableCols := []string{"dusty", "the", "mini", "aussie"}
var nonExistentCols []columns.Column
for _, nonExistentTableCol := range nonExistentTableCols {
nonExistentCols = append(nonExistentCols, columns.NewColumn(nonExistentTableCol, typing.String))
}

// Testing to make sure we don't copy over non-existent columns
tableData.MergeColumnsFromDestination(nonExistentCols...)
for _, nonExistentTableCol := range nonExistentTableCols {
_, isOk := tableData.inMemoryColumns.GetColumn(nonExistentTableCol)
assert.False(t, isOk, nonExistentTableCol)
}
// Testing to make sure we don't copy over non-existent columns
assert.NoError(t, tableData.MergeColumnsFromDestination(nonExistentCols...))
for _, nonExistentTableCol := range nonExistentTableCols {
_, isOk := tableData.inMemoryColumns.GetColumn(nonExistentTableCol)
assert.False(t, isOk, nonExistentTableCol)
}

// Making sure it's still numeric
tableData.MergeColumnsFromDestination(columns.NewColumn("numeric_test", typing.Integer))
numericCol, isOk := tableData.inMemoryColumns.GetColumn("numeric_test")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, numericCol.KindDetails.Kind, "numeric_test")

// Testing to make sure we're copying the kindDetails over.
tableData.MergeColumnsFromDestination(columns.NewColumn("prev_invalid", typing.String))
prevInvalidCol, isOk := tableData.inMemoryColumns.GetColumn("prev_invalid")
assert.True(t, isOk)
assert.Equal(t, typing.String, prevInvalidCol.KindDetails)

// Testing backfill
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
}
backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean)
backfilledCol.SetBackfilled(true)
tableData.MergeColumnsFromDestination(backfilledCol)
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
if inMemoryCol.RawName() == backfilledCol.RawName() {
assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
} else {
// Making sure it's still numeric
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("numeric_test", typing.Integer)))
numericCol, isOk := tableData.inMemoryColumns.GetColumn("numeric_test")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, numericCol.KindDetails.Kind, "numeric_test")

// Testing to make sure we're copying the kindDetails over.
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("prev_invalid", typing.String)))
prevInvalidCol, isOk := tableData.inMemoryColumns.GetColumn("prev_invalid")
assert.True(t, isOk)
assert.Equal(t, typing.String, prevInvalidCol.KindDetails)

// Testing backfill
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
}
}
backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean)
backfilledCol.SetBackfilled(true)
assert.NoError(t, tableData.MergeColumnsFromDestination(backfilledCol))
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
if inMemoryCol.RawName() == backfilledCol.RawName() {
assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
} else {
assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
}
}

// Testing extTimeDetails
for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "ext_datetime"} {
col, isOk := tableData.inMemoryColumns.GetColumn(extTimeDetailsCol)
assert.True(t, isOk, extTimeDetailsCol)
assert.Equal(t, typing.String, col.KindDetails, extTimeDetailsCol)
assert.Nil(t, col.KindDetails.ExtendedTimeDetails, extTimeDetailsCol)
}

// Testing extTimeDetails
for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "ext_datetime"} {
col, isOk := tableData.inMemoryColumns.GetColumn(extTimeDetailsCol)
assert.True(t, isOk, extTimeDetailsCol)
assert.Equal(t, typing.String, col.KindDetails, extTimeDetailsCol)
assert.Nil(t, col.KindDetails.ExtendedTimeDetails, extTimeDetailsCol)
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType))))
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType))))
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType))))

dateCol, isOk := tableData.inMemoryColumns.GetColumn("ext_date")
assert.True(t, isOk)
assert.NotNil(t, dateCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateKindType, dateCol.KindDetails.ExtendedTimeDetails.Type)

timeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_time")
assert.True(t, isOk)
assert.NotNil(t, timeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.TimeKindType, timeCol.KindDetails.ExtendedTimeDetails.Type)

dateTimeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_datetime")
assert.True(t, isOk)
assert.NotNil(t, dateTimeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateTimeKindType, dateTimeCol.KindDetails.ExtendedTimeDetails.Type)

// Testing extDecimalDetails
// Confirm that before you update, it's invalid.
extDecCol, isOk := tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.String, extDecCol.KindDetails)

extDecimal := typing.EDecimal
extDecimal.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(30), 2, nil)
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec", extDecimal)))
// Now it should be ext decimal type
extDecCol, isOk = tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecCol.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 30, *extDecCol.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecCol.KindDetails.ExtendedDecimalDetails.Scale())

// Testing ext_dec_filled since it's already filled out
extDecColFilled, isOk := tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())

assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec_filled", extDecimal)))
extDecColFilled, isOk = tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())
}
{
tableDataCols := &columns.Columns{}
tableData := &TableData{
inMemoryColumns: tableDataCols,
}

tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String))

// Testing string precision
stringKindWithPrecision := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: ptr.ToInt(123),
}

tableData.MergeColumnsFromDestination(columns.NewColumn("ext_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType)))
tableData.MergeColumnsFromDestination(columns.NewColumn("ext_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType)))
tableData.MergeColumnsFromDestination(columns.NewColumn("ext_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)))

dateCol, isOk := tableData.inMemoryColumns.GetColumn("ext_date")
assert.True(t, isOk)
assert.NotNil(t, dateCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateKindType, dateCol.KindDetails.ExtendedTimeDetails.Type)

timeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_time")
assert.True(t, isOk)
assert.NotNil(t, timeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.TimeKindType, timeCol.KindDetails.ExtendedTimeDetails.Type)

dateTimeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_datetime")
assert.True(t, isOk)
assert.NotNil(t, dateTimeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateTimeKindType, dateTimeCol.KindDetails.ExtendedTimeDetails.Type)

// Testing extDecimalDetails
// Confirm that before you update, it's invalid.
extDecCol, isOk := tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.String, extDecCol.KindDetails)

extDecimal := typing.EDecimal
extDecimal.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(30), 2, nil)
tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec", extDecimal))
// Now it should be ext decimal type
extDecCol, isOk = tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecCol.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 30, *extDecCol.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecCol.KindDetails.ExtendedDecimalDetails.Scale())

// Testing ext_dec_filled since it's already filled out
extDecColFilled, isOk := tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())

tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec_filled", extDecimal))
extDecColFilled, isOk = tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())

// Testing string precision
stringKindWithPrecision := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: ptr.ToInt(123),
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn(strCol, stringKindWithPrecision)))
foundStrCol, isOk := tableData.inMemoryColumns.GetColumn(strCol)
assert.True(t, isOk)
assert.Equal(t, typing.String.Kind, foundStrCol.KindDetails.Kind)
assert.Equal(t, 123, *foundStrCol.KindDetails.OptionalStringPrecision)
}
tableData.MergeColumnsFromDestination(columns.NewColumn(strCol, stringKindWithPrecision))
foundStrCol, isOk := tableData.inMemoryColumns.GetColumn(strCol)
assert.True(t, isOk)
assert.Equal(t, typing.String.Kind, foundStrCol.KindDetails.Kind)
assert.Equal(t, 123, *foundStrCol.KindDetails.OptionalStringPrecision)
}
10 changes: 8 additions & 2 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,20 @@ func (t *TableData) ShouldFlush(cfg config.Config) (bool, string) {
// Prior to merging, we will need to treat `tableConfig` as the source-of-truth and whenever there's discrepancies
// We will prioritize using the values coming from (2) TableConfig. We also cannot simply do a replacement, as we have in-memory columns
// That carry metadata for Artie Transfer. They are prefixed with __artie.
func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) {
func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) error {
if t == nil || len(destCols) == 0 {
return
return nil
}

for _, inMemoryCol := range t.inMemoryColumns.GetColumns() {
var foundColumn columns.Column
var found bool
for _, destCol := range destCols {
if destCol.RawName() == strings.ToLower(inMemoryCol.RawName()) {
if destCol.KindDetails.Kind == typing.Invalid.Kind {
return fmt.Errorf("column %q is invalid", destCol.RawName())
}

foundColumn = destCol
found = true
break
Expand Down Expand Up @@ -297,4 +301,6 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) {
t.inMemoryColumns.UpdateColumn(inMemoryCol)
}
}

return nil
}
8 changes: 7 additions & 1 deletion lib/typing/bigquery_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package typing

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -52,7 +53,12 @@ func TestBigQueryTypeToKind(t *testing.T) {

for bqCol, expectedKind := range bqColToExpectedKind {
kd, err := DwhTypeToKind(constants.BigQuery, bqCol, "")
assert.NoError(t, err)
if expectedKind.Kind == Invalid.Kind {
assert.ErrorContains(t, err, fmt.Sprintf("unable to map type: %q to dwh type", bqCol))
} else {
assert.NoError(t, err)
}

assert.Equal(t, expectedKind.Kind, kd.Kind, bqCol)
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/typing/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails {
// TODO: Check if there are any missing Redshift data types.
if strings.HasPrefix(rawType, "numeric") {
return ParseNumeric(defaultPrefix, rawType)
}
Expand All @@ -29,7 +30,7 @@ func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails {
switch rawType {
case "super":
return Struct
case "integer", "bigint":
case "smallint", "integer", "bigint":
return Integer
case "double precision":
return Float
Expand Down
Loading

0 comments on commit 81a45f0

Please sign in to comment.