Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standing up guardrails for Invalid Cols #512

Merged
merged 7 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package shared

import (
"log/slog"
"fmt"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
Expand All @@ -20,7 +20,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableData)
if err != nil {
return err
return fmt.Errorf("failed to get table config: %w", err)
}

// We don't care about srcKeysMissing because we don't drop columns when we append.
Expand All @@ -40,13 +40,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
}

// Keys that exist in CDC stream, but not in DWH
err = createAlterTableArgs.AlterTable(targetKeysMissing...)
if err != nil {
slog.Warn("Failed to apply alter table", slog.Any("err", err))
return err
if err = createAlterTableArgs.AlterTable(targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}

tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...)
if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

additionalSettings := types.AdditionalSettings{
AdditionalCopyClause: opts.AdditionalCopyClause,
Expand Down
5 changes: 4 additions & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
}

tableConfig.AuditColumnsToDelete(srcKeysMissing)
tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...)
if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

temporaryTableID := TempTableID(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix())
temporaryTableName := temporaryTableID.FullyQualifiedName()
if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil {
Expand Down
261 changes: 141 additions & 120 deletions lib/optimization/event_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,139 +11,160 @@ import (
"github.com/stretchr/testify/assert"
)

const strCol = "string"

func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) {
const strCol = "string"
{
tableDataCols := &columns.Columns{}
tableData := &TableData{
inMemoryColumns: tableDataCols,
}

tableDataCols := &columns.Columns{}
tableDataCols.AddColumn(columns.NewColumn("name", typing.String))
tableDataCols.AddColumn(columns.NewColumn("bool_backfill", typing.Boolean))
tableDataCols.AddColumn(columns.NewColumn("prev_invalid", typing.Invalid))
tableDataCols.AddColumn(columns.NewColumn("numeric_test", typing.EDecimal))
tableData.AddInMemoryCol(columns.NewColumn("foo", typing.String))
invalidCol := columns.NewColumn("foo", typing.Invalid)
assert.ErrorContains(t, tableData.MergeColumnsFromDestination(invalidCol), `column "foo" is invalid`)
}
{
tableDataCols := &columns.Columns{}
tableData := &TableData{
inMemoryColumns: tableDataCols,
}

// Casting these as STRING so tableColumn via this f(x) will set it correctly.
tableDataCols.AddColumn(columns.NewColumn("ext_date", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_time", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_datetime", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_dec", typing.String))
tableDataCols.AddColumn(columns.NewColumn("name", typing.String))
tableDataCols.AddColumn(columns.NewColumn("bool_backfill", typing.Boolean))
tableDataCols.AddColumn(columns.NewColumn("prev_invalid", typing.Invalid))
tableDataCols.AddColumn(columns.NewColumn("numeric_test", typing.EDecimal))

extDecimalType := typing.EDecimal
extDecimalType.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(22), 2, nil)
tableDataCols.AddColumn(columns.NewColumn("ext_dec_filled", extDecimalType))
// Casting these as STRING so tableColumn via this f(x) will set it correctly.
tableDataCols.AddColumn(columns.NewColumn("ext_date", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_time", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_datetime", typing.String))
tableDataCols.AddColumn(columns.NewColumn("ext_dec", typing.String))

tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String))
extDecimalType := typing.EDecimal
extDecimalType.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(22), 2, nil)
tableDataCols.AddColumn(columns.NewColumn("ext_dec_filled", extDecimalType))

tableData := &TableData{
inMemoryColumns: tableDataCols,
}
tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String))

nonExistentTableCols := []string{"dusty", "the", "mini", "aussie"}
var nonExistentCols []columns.Column
for _, nonExistentTableCol := range nonExistentTableCols {
nonExistentCols = append(nonExistentCols, columns.NewColumn(nonExistentTableCol, typing.String))
}
nonExistentTableCols := []string{"dusty", "the", "mini", "aussie"}
var nonExistentCols []columns.Column
for _, nonExistentTableCol := range nonExistentTableCols {
nonExistentCols = append(nonExistentCols, columns.NewColumn(nonExistentTableCol, typing.String))
}

// Testing to make sure we don't copy over non-existent columns
tableData.MergeColumnsFromDestination(nonExistentCols...)
for _, nonExistentTableCol := range nonExistentTableCols {
_, isOk := tableData.inMemoryColumns.GetColumn(nonExistentTableCol)
assert.False(t, isOk, nonExistentTableCol)
}
// Testing to make sure we don't copy over non-existent columns
assert.NoError(t, tableData.MergeColumnsFromDestination(nonExistentCols...))
for _, nonExistentTableCol := range nonExistentTableCols {
_, isOk := tableData.inMemoryColumns.GetColumn(nonExistentTableCol)
assert.False(t, isOk, nonExistentTableCol)
}

// Making sure it's still numeric
tableData.MergeColumnsFromDestination(columns.NewColumn("numeric_test", typing.Integer))
numericCol, isOk := tableData.inMemoryColumns.GetColumn("numeric_test")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, numericCol.KindDetails.Kind, "numeric_test")

// Testing to make sure we're copying the kindDetails over.
tableData.MergeColumnsFromDestination(columns.NewColumn("prev_invalid", typing.String))
prevInvalidCol, isOk := tableData.inMemoryColumns.GetColumn("prev_invalid")
assert.True(t, isOk)
assert.Equal(t, typing.String, prevInvalidCol.KindDetails)

// Testing backfill
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
}
backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean)
backfilledCol.SetBackfilled(true)
tableData.MergeColumnsFromDestination(backfilledCol)
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
if inMemoryCol.RawName() == backfilledCol.RawName() {
assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
} else {
// Making sure it's still numeric
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("numeric_test", typing.Integer)))
numericCol, isOk := tableData.inMemoryColumns.GetColumn("numeric_test")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, numericCol.KindDetails.Kind, "numeric_test")

// Testing to make sure we're copying the kindDetails over.
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("prev_invalid", typing.String)))
prevInvalidCol, isOk := tableData.inMemoryColumns.GetColumn("prev_invalid")
assert.True(t, isOk)
assert.Equal(t, typing.String, prevInvalidCol.KindDetails)

// Testing backfill
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
}
}
backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean)
backfilledCol.SetBackfilled(true)
assert.NoError(t, tableData.MergeColumnsFromDestination(backfilledCol))
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
if inMemoryCol.RawName() == backfilledCol.RawName() {
assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
} else {
assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
}
}

// Testing extTimeDetails
for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "ext_datetime"} {
col, isOk := tableData.inMemoryColumns.GetColumn(extTimeDetailsCol)
assert.True(t, isOk, extTimeDetailsCol)
assert.Equal(t, typing.String, col.KindDetails, extTimeDetailsCol)
assert.Nil(t, col.KindDetails.ExtendedTimeDetails, extTimeDetailsCol)
}

// Testing extTimeDetails
for _, extTimeDetailsCol := range []string{"ext_date", "ext_time", "ext_datetime"} {
col, isOk := tableData.inMemoryColumns.GetColumn(extTimeDetailsCol)
assert.True(t, isOk, extTimeDetailsCol)
assert.Equal(t, typing.String, col.KindDetails, extTimeDetailsCol)
assert.Nil(t, col.KindDetails.ExtendedTimeDetails, extTimeDetailsCol)
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType))))
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType))))
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType))))

dateCol, isOk := tableData.inMemoryColumns.GetColumn("ext_date")
assert.True(t, isOk)
assert.NotNil(t, dateCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateKindType, dateCol.KindDetails.ExtendedTimeDetails.Type)

timeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_time")
assert.True(t, isOk)
assert.NotNil(t, timeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.TimeKindType, timeCol.KindDetails.ExtendedTimeDetails.Type)

dateTimeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_datetime")
assert.True(t, isOk)
assert.NotNil(t, dateTimeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateTimeKindType, dateTimeCol.KindDetails.ExtendedTimeDetails.Type)

// Testing extDecimalDetails
// Confirm that before you update, it's still a plain string.
extDecCol, isOk := tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.String, extDecCol.KindDetails)

extDecimal := typing.EDecimal
extDecimal.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(30), 2, nil)
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec", extDecimal)))
// Now it should be ext decimal type
extDecCol, isOk = tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecCol.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 30, *extDecCol.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecCol.KindDetails.ExtendedDecimalDetails.Scale())

// Testing ext_dec_filled since it's already filled out
extDecColFilled, isOk := tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())

assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec_filled", extDecimal)))
extDecColFilled, isOk = tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())
}
{
tableDataCols := &columns.Columns{}
tableData := &TableData{
inMemoryColumns: tableDataCols,
}

tableDataCols.AddColumn(columns.NewColumn(strCol, typing.String))

// Testing string precision
stringKindWithPrecision := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: ptr.ToInt(123),
}

tableData.MergeColumnsFromDestination(columns.NewColumn("ext_time", typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType)))
tableData.MergeColumnsFromDestination(columns.NewColumn("ext_date", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateKindType)))
tableData.MergeColumnsFromDestination(columns.NewColumn("ext_datetime", typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType)))

dateCol, isOk := tableData.inMemoryColumns.GetColumn("ext_date")
assert.True(t, isOk)
assert.NotNil(t, dateCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateKindType, dateCol.KindDetails.ExtendedTimeDetails.Type)

timeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_time")
assert.True(t, isOk)
assert.NotNil(t, timeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.TimeKindType, timeCol.KindDetails.ExtendedTimeDetails.Type)

dateTimeCol, isOk := tableData.inMemoryColumns.GetColumn("ext_datetime")
assert.True(t, isOk)
assert.NotNil(t, dateTimeCol.KindDetails.ExtendedTimeDetails)
assert.Equal(t, ext.DateTimeKindType, dateTimeCol.KindDetails.ExtendedTimeDetails.Type)

// Testing extDecimalDetails
// Confirm that before you update, it's still a plain string.
extDecCol, isOk := tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.String, extDecCol.KindDetails)

extDecimal := typing.EDecimal
extDecimal.ExtendedDecimalDetails = decimal.NewDecimal(ptr.ToInt(30), 2, nil)
tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec", extDecimal))
// Now it should be ext decimal type
extDecCol, isOk = tableData.inMemoryColumns.GetColumn("ext_dec")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecCol.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 30, *extDecCol.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecCol.KindDetails.ExtendedDecimalDetails.Scale())

// Testing ext_dec_filled since it's already filled out
extDecColFilled, isOk := tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())

tableData.MergeColumnsFromDestination(columns.NewColumn("ext_dec_filled", extDecimal))
extDecColFilled, isOk = tableData.inMemoryColumns.GetColumn("ext_dec_filled")
assert.True(t, isOk)
assert.Equal(t, typing.EDecimal.Kind, extDecColFilled.KindDetails.Kind)
// Check precision and scale too.
assert.Equal(t, 22, *extDecColFilled.KindDetails.ExtendedDecimalDetails.Precision())
assert.Equal(t, 2, extDecColFilled.KindDetails.ExtendedDecimalDetails.Scale())

// Testing string precision
stringKindWithPrecision := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: ptr.ToInt(123),
assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn(strCol, stringKindWithPrecision)))
foundStrCol, isOk := tableData.inMemoryColumns.GetColumn(strCol)
assert.True(t, isOk)
assert.Equal(t, typing.String.Kind, foundStrCol.KindDetails.Kind)
assert.Equal(t, 123, *foundStrCol.KindDetails.OptionalStringPrecision)
}
tableData.MergeColumnsFromDestination(columns.NewColumn(strCol, stringKindWithPrecision))
foundStrCol, isOk := tableData.inMemoryColumns.GetColumn(strCol)
assert.True(t, isOk)
assert.Equal(t, typing.String.Kind, foundStrCol.KindDetails.Kind)
assert.Equal(t, 123, *foundStrCol.KindDetails.OptionalStringPrecision)
}
10 changes: 8 additions & 2 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,20 @@ func (t *TableData) ShouldFlush(cfg config.Config) (bool, string) {
// Prior to merging, we will need to treat `tableConfig` as the source of truth, and whenever there are discrepancies
// we will prioritize using the values coming from (2) TableConfig. We also cannot simply do a replacement, as we have in-memory columns
// that carry metadata for Artie Transfer. They are prefixed with __artie.
func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) {
func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) error {
if t == nil || len(destCols) == 0 {
return
return nil
}

for _, inMemoryCol := range t.inMemoryColumns.GetColumns() {
var foundColumn columns.Column
var found bool
for _, destCol := range destCols {
if destCol.RawName() == strings.ToLower(inMemoryCol.RawName()) {
if destCol.KindDetails.Kind == typing.Invalid.Kind {
return fmt.Errorf("column %q is invalid", destCol.RawName())
}

foundColumn = destCol
found = true
break
Expand Down Expand Up @@ -297,4 +301,6 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) {
t.inMemoryColumns.UpdateColumn(inMemoryCol)
}
}

return nil
}
8 changes: 7 additions & 1 deletion lib/typing/bigquery_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package typing

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -52,7 +53,12 @@ func TestBigQueryTypeToKind(t *testing.T) {

for bqCol, expectedKind := range bqColToExpectedKind {
kd, err := DwhTypeToKind(constants.BigQuery, bqCol, "")
assert.NoError(t, err)
if expectedKind.Kind == Invalid.Kind {
assert.ErrorContains(t, err, fmt.Sprintf("unable to map type: %q to dwh type", bqCol))
} else {
assert.NoError(t, err)
}

assert.Equal(t, expectedKind.Kind, kd.Kind, bqCol)
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/typing/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails {
// TODO: Check if there are any missing Redshift data types.
if strings.HasPrefix(rawType, "numeric") {
return ParseNumeric(defaultPrefix, rawType)
}
Expand All @@ -29,7 +30,7 @@ func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails {
switch rawType {
case "super":
return Struct
case "integer", "bigint":
case "smallint", "integer", "bigint":
return Integer
case "double precision":
return Float
Expand Down
Loading