Skip to content

Commit

Permalink
[BigQuery] Create numeric data type for variable numerics (#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 19, 2024
1 parent d9f8720 commit e0d068c
Show file tree
Hide file tree
Showing 29 changed files with 140 additions and 91 deletions.
13 changes: 8 additions & 5 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ type Store struct {

func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, useTempTable bool) error {
if !useTempTable {
return shared.Append(ctx, s, tableData, types.AdditionalSettings{})
return shared.Append(ctx, s, tableData, types.AdditionalSettings{
ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings,
})
}

// We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data
Expand All @@ -55,8 +57,9 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u
defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }()

err := shared.Append(ctx, s, tableData, types.AdditionalSettings{
UseTempTable: true,
TempTableID: temporaryTableID,
ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings,
UseTempTable: true,
TempTableID: temporaryTableID,
})

if err != nil {
Expand All @@ -77,9 +80,9 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u
return nil
}

func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil {
if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil {
return err
}
}
Expand Down
5 changes: 3 additions & 2 deletions clients/bigquery/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string {
func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, settings config.SharedDestinationColumnSettings) string {
// Doesn't look like we need to do any special type mapping.
switch kindDetails.Kind {
case typing.Float.Kind:
Expand All @@ -33,7 +34,7 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
// We should be using TIMESTAMP since it's an absolute point in time.
return "timestamp"
case typing.EDecimal.Kind:
return kindDetails.ExtendedDecimalDetails.BigQueryKind()
return kindDetails.ExtendedDecimalDetails.BigQueryKind(settings.BigQueryNumericForVariableNumeric)
}

return kindDetails.Kind
Expand Down
14 changes: 8 additions & 6 deletions clients/bigquery/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package dialect
import (
"testing"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/typing"
)

func TestBigQueryDialect_DataTypeForKind(t *testing.T) {
{
// String
{
assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.String, false))
assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.String, false, config.SharedDestinationColumnSettings{}))
}
{
assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, true))
assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, true, config.SharedDestinationColumnSettings{}))
}
}
}
Expand All @@ -30,7 +32,7 @@ func TestBigQueryDialect_KindForDataType_NoDataLoss(t *testing.T) {
}

for _, kindDetail := range kindDetails {
kd, err := BigQueryDialect{}.KindForDataType(BigQueryDialect{}.DataTypeForKind(kindDetail, false), "")
kd, err := BigQueryDialect{}.KindForDataType(BigQueryDialect{}.DataTypeForKind(kindDetail, false, config.SharedDestinationColumnSettings{}), "")
assert.NoError(t, err)
assert.Equal(t, kindDetail, kd)
}
Expand Down Expand Up @@ -76,7 +78,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) {
assert.Equal(t, typing.EDecimal.Kind, kd.Kind)
assert.Equal(t, int32(5), kd.ExtendedDecimalDetails.Precision())
assert.Equal(t, int32(0), kd.ExtendedDecimalDetails.Scale())
assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind())
assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind(false))

}
{
Expand All @@ -87,7 +89,7 @@ func TestBigQueryDialect_KindForDataType(t *testing.T) {
assert.Equal(t, typing.EDecimal.Kind, kd.Kind)
assert.Equal(t, int32(5), kd.ExtendedDecimalDetails.Precision())
assert.Equal(t, int32(0), kd.ExtendedDecimalDetails.Scale())
assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind())
assert.Equal(t, "NUMERIC(5, 0)", kd.ExtendedDecimalDetails.BigQueryKind(false))
}
{
// Numeric(5, 2)
Expand Down
1 change: 1 addition & 0 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

return shared.Merge(ctx, s, tableData, types.MergeOpts{
AdditionalEqualityStrings: additionalEqualityStrings,
ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings,
// BigQuery has DDL quotas.
RetryColBackfill: true,
// We are using BigQuery's streaming API which doesn't guarantee exactly once semantics
Expand Down
3 changes: 2 additions & 1 deletion clients/databricks/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string {
func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string {
switch kindDetails.Kind {
case typing.Float.Kind:
return "DOUBLE"
Expand Down
28 changes: 15 additions & 13 deletions clients/databricks/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,66 @@ package dialect
import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/stretchr/testify/assert"
)

func TestDatabricksDialect_DataTypeForKind(t *testing.T) {
{
// Float
assert.Equal(t, "DOUBLE", DatabricksDialect{}.DataTypeForKind(typing.Float, false))
assert.Equal(t, "DOUBLE", DatabricksDialect{}.DataTypeForKind(typing.Float, false, config.SharedDestinationColumnSettings{}))
}
{
// Integer
assert.Equal(t, "BIGINT", DatabricksDialect{}.DataTypeForKind(typing.Integer, false))
assert.Equal(t, "BIGINT", DatabricksDialect{}.DataTypeForKind(typing.Integer, false, config.SharedDestinationColumnSettings{}))
}
{
// Variant
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Struct.Kind}, false))
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Struct.Kind}, false, config.SharedDestinationColumnSettings{}))
}
{
// Array
assert.Equal(t, "ARRAY<string>", DatabricksDialect{}.DataTypeForKind(typing.Array, false))
assert.Equal(t, "ARRAY<string>", DatabricksDialect{}.DataTypeForKind(typing.Array, false, config.SharedDestinationColumnSettings{}))
}
{
// String
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.String, false))
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.String, false, config.SharedDestinationColumnSettings{}))
}
{
// Boolean
assert.Equal(t, "BOOLEAN", DatabricksDialect{}.DataTypeForKind(typing.Boolean, false))
assert.Equal(t, "BOOLEAN", DatabricksDialect{}.DataTypeForKind(typing.Boolean, false, config.SharedDestinationColumnSettings{}))
}
{
// Times
{
// Date
assert.Equal(t, "DATE", DatabricksDialect{}.DataTypeForKind(typing.Date, false))
assert.Equal(t, "DATE", DatabricksDialect{}.DataTypeForKind(typing.Date, false, config.SharedDestinationColumnSettings{}))
}
{
// Timestamp
assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.TimestampTZ, false))
assert.Equal(t, "TIMESTAMP", DatabricksDialect{}.DataTypeForKind(typing.TimestampTZ, false, config.SharedDestinationColumnSettings{}))
}
{
// Timestamp (w/o timezone)
assert.Equal(t, "TIMESTAMP_NTZ", DatabricksDialect{}.DataTypeForKind(typing.TimestampNTZ, false))
assert.Equal(t, "TIMESTAMP_NTZ", DatabricksDialect{}.DataTypeForKind(typing.TimestampNTZ, false, config.SharedDestinationColumnSettings{}))
}
{
// Time
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.Time, false))
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.Time, false, config.SharedDestinationColumnSettings{}))
}
}
{
// Decimals
{
// Below 38 precision
assert.Equal(t, "DECIMAL(10, 2)", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(10, 2))}, false))
assert.Equal(t, "DECIMAL(10, 2)", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(10, 2))}, false, config.SharedDestinationColumnSettings{}))
}
{
// Above 38 precision
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(40, 2))}, false))
assert.Equal(t, "STRING", DatabricksDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.EDecimal.Kind, ExtendedDecimalDetails: typing.ToPtr(decimal.NewDetails(40, 2))}, false, config.SharedDestinationColumnSettings{}))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTabl
}.GetTableConfig()
}

func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil {
if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion clients/mssql/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"strconv"
"strings"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) string {
func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool, _ config.SharedDestinationColumnSettings) string {
// Primary keys cannot exceed 900 chars in length.
// https://learn.microsoft.com/en-us/sql/relational-databases/tables/primary-and-foreign-key-constraints?view=sql-server-ver16#PKeys
const maxVarCharLengthForPrimaryKey = 900
Expand Down
8 changes: 5 additions & 3 deletions clients/mssql/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package dialect
import (
"testing"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/typing"
)

func TestMSSQLDialect_DataTypeForKind(t *testing.T) {
Expand All @@ -30,8 +32,8 @@ func TestMSSQLDialect_DataTypeForKind(t *testing.T) {
}

for idx, tc := range tcs {
assert.Equal(t, tc.expected, MSSQLDialect{}.DataTypeForKind(tc.kd, false), idx)
assert.Equal(t, tc.expectedIsPk, MSSQLDialect{}.DataTypeForKind(tc.kd, true), idx)
assert.Equal(t, tc.expected, MSSQLDialect{}.DataTypeForKind(tc.kd, false, config.SharedDestinationColumnSettings{}), idx)
assert.Equal(t, tc.expectedIsPk, MSSQLDialect{}.DataTypeForKind(tc.kd, true, config.SharedDestinationColumnSettings{}), idx)
}
}

Expand Down
4 changes: 2 additions & 2 deletions clients/mssql/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil {
if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion clients/redshift/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"strconv"
"strings"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string {
switch kd.Kind {
case typing.Integer.Kind:
if kd.OptionalIntegerKind != nil {
Expand Down
22 changes: 12 additions & 10 deletions clients/redshift/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,56 @@ package dialect
import (
"testing"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/typing"
)

func TestRedshiftDialect_DataTypeForKind(t *testing.T) {
{
// String
{
assert.Equal(t, "VARCHAR(MAX)", RedshiftDialect{}.DataTypeForKind(typing.String, true))
assert.Equal(t, "VARCHAR(MAX)", RedshiftDialect{}.DataTypeForKind(typing.String, true, config.SharedDestinationColumnSettings{}))
}
{
assert.Equal(t, "VARCHAR(12345)", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false))
assert.Equal(t, "VARCHAR(12345)", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false, config.SharedDestinationColumnSettings{}))
}
}
{
// Integers
{
// Small int
assert.Equal(t, "INT2", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.SmallIntegerKind)}, false))
assert.Equal(t, "INT2", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.SmallIntegerKind)}, false, config.SharedDestinationColumnSettings{}))
}
{
// Integer
assert.Equal(t, "INT4", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.IntegerKind)}, false))
assert.Equal(t, "INT4", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.IntegerKind)}, false, config.SharedDestinationColumnSettings{}))
}
{
// Big integer
assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.BigIntegerKind)}, false))
assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.BigIntegerKind)}, false, config.SharedDestinationColumnSettings{}))
}
{
// Not specified
{
// Literal
assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.NotSpecifiedKind)}, false))
assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.Integer.Kind, OptionalIntegerKind: typing.ToPtr(typing.NotSpecifiedKind)}, false, config.SharedDestinationColumnSettings{}))
}
{
assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.Integer, false))
assert.Equal(t, "INT8", RedshiftDialect{}.DataTypeForKind(typing.Integer, false, config.SharedDestinationColumnSettings{}))
}
}
}
{
// Timestamps
{
// With timezone
assert.Equal(t, "TIMESTAMP WITH TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampTZ, false))
assert.Equal(t, "TIMESTAMP WITH TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampTZ, false, config.SharedDestinationColumnSettings{}))
}
{
// Without timezone
assert.Equal(t, "TIMESTAMP WITHOUT TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampNTZ, false))
assert.Equal(t, "TIMESTAMP WITHOUT TIME ZONE", RedshiftDialect{}.DataTypeForKind(typing.TimestampNTZ, false, config.SharedDestinationColumnSettings{}))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error {
fp, colToNewLengthMap, err := s.loadTemporaryTable(tableData, tempTableID)
if err != nil {
return fmt.Errorf("failed to load temporary table: %w", err)
Expand All @@ -40,7 +40,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
}

if createTempTable {
if err = shared.CreateTable(ctx, s, tableData, tableConfig, tempTableID, true); err != nil {
if err = shared.CreateTable(ctx, s, tableData, tableConfig, opts.ColumnSettings, tempTableID, true); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil {
if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
} else {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, opts.ColumnSettings, tableID, targetKeysMissing); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil {
if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
} else {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, opts.ColumnSettings, tableID, targetKeysMissing); err != nil {
return fmt.Errorf("failed to add columns for table %q: %w", tableID.Table(), err)
}
}
Expand Down
Loading

0 comments on commit e0d068c

Please sign in to comment.