Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Apr 29, 2024
1 parent a8f99c0 commit 81aa497
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 15 deletions.
14 changes: 7 additions & 7 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package shared

import (
"log/slog"
"fmt"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
Expand All @@ -20,7 +20,7 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableData)
if err != nil {
return err
return fmt.Errorf("failed to get table config: %w", err)
}

// We don't care about srcKeysMissing because we don't drop columns when we append.
Expand All @@ -40,13 +40,13 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
}

// Keys that exist in CDC stream, but not in DWH
err = createAlterTableArgs.AlterTable(targetKeysMissing...)
if err != nil {
slog.Warn("Failed to apply alter table", slog.Any("err", err))
return err
if err = createAlterTableArgs.AlterTable(targetKeysMissing...); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}

tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...)
if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

additionalSettings := types.AdditionalSettings{
AdditionalCopyClause: opts.AdditionalCopyClause,
Expand Down
6 changes: 5 additions & 1 deletion clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
}

tableConfig.AuditColumnsToDelete(srcKeysMissing)
tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...)

if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

temporaryTableID := TempTableID(dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()), tableData.TempTableSuffix())
temporaryTableName := temporaryTableID.FullyQualifiedName()
if err = dwh.PrepareTemporaryTable(tableData, tableConfig, temporaryTableID, types.AdditionalSettings{}, true); err != nil {
Expand Down
10 changes: 8 additions & 2 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,20 @@ func (t *TableData) ShouldFlush(cfg config.Config) (bool, string) {
// Prior to merging, we will need to treat `tableConfig` as the source-of-truth and whenever there's discrepancies
// We will prioritize using the values coming from (2) TableConfig. We also cannot simply do a replacement, as we have in-memory columns
// That carry metadata for Artie Transfer. They are prefixed with __artie.
func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) {
func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) error {
if t == nil || len(destCols) == 0 {
return
return nil
}

for _, inMemoryCol := range t.inMemoryColumns.GetColumns() {
var foundColumn columns.Column
var found bool
for _, destCol := range destCols {
if destCol.RawName() == strings.ToLower(inMemoryCol.RawName()) {
if destCol.KindDetails.Kind == typing.Invalid.Kind {
return fmt.Errorf("column %q is invalid", destCol.RawName())
}

foundColumn = destCol
found = true
break
Expand Down Expand Up @@ -297,4 +301,6 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) {
t.inMemoryColumns.UpdateColumn(inMemoryCol)
}
}

return nil
}
3 changes: 2 additions & 1 deletion lib/typing/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails {
// TODO: Check if there are any missing Redshift data types.
if strings.HasPrefix(rawType, "numeric") {
return ParseNumeric(defaultPrefix, rawType)
}
Expand All @@ -29,7 +30,7 @@ func redshiftTypeToKind(rawType string, stringPrecision string) KindDetails {
switch rawType {
case "super":
return Struct
case "integer", "bigint":
case "smallint", "integer", "bigint":
return Integer
case "double precision":
return Float
Expand Down
13 changes: 9 additions & 4 deletions lib/typing/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,20 @@ func KindToDWHType(kd KindDetails, dwh constants.DestinationKind, isPk bool) str
func DwhTypeToKind(dwh constants.DestinationKind, dwhType, stringPrecision string) (KindDetails, error) {
dwhType = strings.ToLower(dwhType)

var kd KindDetails
switch dwh {
case constants.Snowflake:
return snowflakeTypeToKind(dwhType), nil
kd = snowflakeTypeToKind(dwhType)
case constants.BigQuery:
return bigQueryTypeToKind(dwhType), nil
kd = bigQueryTypeToKind(dwhType)
case constants.Redshift:
return redshiftTypeToKind(dwhType, stringPrecision), nil
kd = redshiftTypeToKind(dwhType, stringPrecision)
case constants.MSSQL:
return mssqlTypeToKind(dwhType, stringPrecision), nil
kd = mssqlTypeToKind(dwhType, stringPrecision)
}

if kd.Kind == Invalid.Kind {
return Invalid, fmt.Errorf("unable to map type: %q to dwh type", dwhType)
}

return Invalid, fmt.Errorf("unexpected dwh kind, label: %v", dwh)
Expand Down

0 comments on commit 81aa497

Please sign in to comment.