Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 19, 2024
1 parent bc26531 commit d0a1e1d
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 24 deletions.
10 changes: 6 additions & 4 deletions clients/snowflake/dialect/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package dialect
import (
"testing"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/typing"
)

func TestSnowflakeDialect_DataTypeForKind(t *testing.T) {
{
// String
{
assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.String, false))
assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.String, false, config.SharedDestinationColumnSettings{}))
}
{
assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false))
assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false, config.SharedDestinationColumnSettings{}))
}
}
}
Expand Down Expand Up @@ -193,7 +195,7 @@ func TestSnowflakeDialect_KindForDataType_NoDataLoss(t *testing.T) {
}

for _, kindDetail := range kindDetails {
kd, err := SnowflakeDialect{}.KindForDataType(SnowflakeDialect{}.DataTypeForKind(kindDetail, false), "")
kd, err := SnowflakeDialect{}.KindForDataType(SnowflakeDialect{}.DataTypeForKind(kindDetail, false, config.SharedDestinationColumnSettings{}), "")
assert.NoError(t, err)
assert.Equal(t, kindDetail, kd)
}
Expand Down
8 changes: 4 additions & 4 deletions lib/destination/ddl/ddl_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
for name, kind := range newCols {
col := columns.NewColumn(name, kind)
assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{col}))
assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, config.SharedDestinationColumnSettings{}, tableID, []columns.Column{col}))

_, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx)
assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(col.Name()),
d.bigQueryStore.Dialect().DataTypeForKind(kind, false)), query)
d.bigQueryStore.Dialect().DataTypeForKind(kind, false, config.SharedDestinationColumnSettings{})), query)
callIdx += 1
}

Expand Down Expand Up @@ -156,10 +156,10 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
// BQ returning the same error because the column already exists.
d.fakeBigQueryStore.ExecContextReturnsOnCall(0, sqlResult, errors.New("Column already exists: _string at [1:39]"))

assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column}))
assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, config.SharedDestinationColumnSettings{}, tableID, []columns.Column{column}))
_, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx)
assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name()),
d.bigQueryStore.Dialect().DataTypeForKind(column.KindDetails, false)), query)
d.bigQueryStore.Dialect().DataTypeForKind(column.KindDetails, false, config.SharedDestinationColumnSettings{})), query)
callIdx += 1
}

Expand Down
12 changes: 7 additions & 5 deletions lib/destination/ddl/ddl_sflk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"slices"
"time"

"github.com/artie-labs/transfer/lib/config"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/clients/shared"
Expand All @@ -28,12 +30,12 @@ func (d *DDLTestSuite) TestAlterComplexObjects() {
tableID := dialect.NewTableIdentifier("shop", "public", "complex_columns")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols))
assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols))
for i := 0; i < len(cols); i++ {
_, execQuery, _ := d.fakeSnowflakeStagesStore.ExecContextArgsForCall(i)
assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", `shop.public."COMPLEX_COLUMNS"`,
d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()),
d.snowflakeStagesStore.Dialect().DataTypeForKind(cols[i].KindDetails, false)), execQuery)
d.snowflakeStagesStore.Dialect().DataTypeForKind(cols[i].KindDetails, false, config.SharedDestinationColumnSettings{})), execQuery)
}

assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols")
Expand All @@ -53,11 +55,11 @@ func (d *DDLTestSuite) TestAlterIdempotency() {

d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists"))

assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols))
assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols))
assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols")

d.fakeSnowflakeStagesStore.ExecContextReturns(nil, errors.New("table does not exist"))
assert.ErrorContains(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols), `failed to alter table: table does not exist`)
assert.ErrorContains(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols), `failed to alter table: table does not exist`)
}

func (d *DDLTestSuite) TestAlterTableAdd() {
Expand All @@ -73,7 +75,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() {
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)

assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols))
assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, config.SharedDestinationColumnSettings{}, tableID, cols))
assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols")

// Check the table config
Expand Down
4 changes: 2 additions & 2 deletions lib/destination/ddl/ddl_temp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() {
{
// Snowflake Stage
tableID := dialect.NewTableIdentifier("db", "schema", "tempTableName")
query, err := ddl.BuildCreateTableSQL(d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)})
query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)})
assert.NoError(d.T(), err)
assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`)
}
{
// BigQuery
tableID := bigQueryDialect.NewTableIdentifier("db", "schema", "tempTableName")
query, err := ddl.BuildCreateTableSQL(d.bigQueryStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)})
query, err := ddl.BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, d.bigQueryStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)})
assert.NoError(d.T(), err)
// Cutting off the expiration_timestamp since it's time based.
assert.Contains(d.T(), query, "CREATE TABLE IF NOT EXISTS `db`.`schema`.`tempTableName` (`foo` string,`bar` float64,`select` string) OPTIONS (expiration_timestamp =", query)
Expand Down
18 changes: 9 additions & 9 deletions lib/destination/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestShouldCreatePrimaryKey(t *testing.T) {
func TestBuildCreateTableSQL(t *testing.T) {
{
// No columns provided
_, err := BuildCreateTableSQL(nil, nil, false, config.Replication, []columns.Column{})
_, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, nil, nil, false, config.Replication, []columns.Column{})
assert.ErrorContains(t, err, "no columns provided")
}
{
Expand All @@ -54,7 +54,7 @@ func TestBuildCreateTableSQL(t *testing.T) {
// Redshift
{
// No primary key
sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{
sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{
columns.NewColumn("foo", typing.String),
columns.NewColumn("bar", typing.String),
})
Expand All @@ -65,7 +65,7 @@ func TestBuildCreateTableSQL(t *testing.T) {
// With primary key
pk := columns.NewColumn("pk", typing.String)
pk.SetPrimaryKeyForTest(true)
sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{
sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{
pk,
columns.NewColumn("bar", typing.String),
})
Expand All @@ -79,7 +79,7 @@ func TestBuildCreateTableSQL(t *testing.T) {
pk2 := columns.NewColumn("pk2", typing.String)
pk2.SetPrimaryKeyForTest(true)

sql, err := BuildCreateTableSQL(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{
sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), false, config.Replication, []columns.Column{
pk1,
pk2,
columns.NewColumn("bar", typing.String),
Expand All @@ -94,7 +94,7 @@ func TestBuildCreateTableSQL(t *testing.T) {
// With primary key
pk := columns.NewColumn("pk", typing.String)
pk.SetPrimaryKeyForTest(true)
sql, err := BuildCreateTableSQL(bqDialect.BigQueryDialect{}, bqDialect.NewTableIdentifier("projectID", "dataset", "table"), false, config.Replication, []columns.Column{
sql, err := BuildCreateTableSQL(config.SharedDestinationColumnSettings{}, bqDialect.BigQueryDialect{}, bqDialect.NewTableIdentifier("projectID", "dataset", "table"), false, config.Replication, []columns.Column{
pk,
columns.NewColumn("bar", typing.String),
})
Expand All @@ -108,22 +108,22 @@ func TestBuildCreateTableSQL(t *testing.T) {
func TestBuildAlterTableAddColumns(t *testing.T) {
{
// No columns
sqlParts, err := BuildAlterTableAddColumns(nil, nil, []columns.Column{})
sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, nil, nil, []columns.Column{})
assert.NoError(t, err)
assert.Empty(t, sqlParts)
}
{
// One column to add
col := columns.NewColumn("dusty", typing.String)
sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col})
sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col})
assert.NoError(t, err)
assert.Len(t, sqlParts, 1)
assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "dusty" VARCHAR(MAX)`, sqlParts[0])
}
{
// Two columns, one invalid, it will error.
col := columns.NewColumn("dusty", typing.String)
_, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"),
_, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"),
[]columns.Column{
col,
columns.NewColumn("invalid", typing.Invalid),
Expand All @@ -138,7 +138,7 @@ func TestBuildAlterTableAddColumns(t *testing.T) {
col2 := columns.NewColumn("doge", typing.String)
col3 := columns.NewColumn("age", typing.Integer)

sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col1, col2, col3})
sqlParts, err := BuildAlterTableAddColumns(config.SharedDestinationColumnSettings{}, dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col1, col2, col3})
assert.NoError(t, err)
assert.Len(t, sqlParts, 3)
assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "aussie" VARCHAR(MAX)`, sqlParts[0])
Expand Down

0 comments on commit d0a1e1d

Please sign in to comment.