Skip to content

Commit

Permalink
Escaping column names for DWHs (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 9, 2023
1 parent 193d3bd commit 591d726
Show file tree
Hide file tree
Showing 41 changed files with 756 additions and 698 deletions.
8 changes: 2 additions & 6 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ func CastColVal(colVal interface{}, colKind typing.Column) (interface{}, error)
case ext.TimeKindType:
colVal = extTime.String(typing.StreamingTimeFormat)
}
// All the other types do not need string wrapping.
// TODO - what does typing.String.Kind do?
case typing.String.Kind, typing.Struct.Kind:
case typing.Struct.Kind:
if colKind.KindDetails == typing.Struct {
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
colVal = map[string]interface{}{
"key": constants.ToastUnavailableValuePlaceholder,
}
colVal = fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder)
}
}
case typing.Array.Kind:
Expand Down
9 changes: 9 additions & 0 deletions clients/bigquery/cast_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package bigquery

import (
"fmt"
"testing"
"time"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -58,6 +61,12 @@ func TestCastColVal(t *testing.T) {
colKind: typing.Column{KindDetails: typing.Struct},
expectedValue: `{"hello": "world"}`,
},
{
name: "struct w/ toast",
colVal: constants.ToastUnavailableValuePlaceholder,
colKind: typing.Column{KindDetails: typing.Struct},
expectedValue: fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder),
},
{
name: "array",
colVal: []int{1, 2, 3, 4, 5},
Expand Down
5 changes: 1 addition & 4 deletions clients/bigquery/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ func parseSchemaQuery(row string, createTable, dropDeletedColumns bool) (*types.
return nil, fmt.Errorf("unexpected colType, colType: %s, parts: %v", colType, parts)
}

bigQueryColumns.AddColumn(typing.Column{
Name: parts[0],
KindDetails: typing.BigQueryTypeToKind(strings.Join(parts[1:], " ")),
})
bigQueryColumns.AddColumn(typing.NewColumn(typing.UnescapeColumnName(parts[0], constants.BigQuery), typing.BigQueryTypeToKind(strings.Join(parts[1:], " "))))
}

return types.NewDwhTableConfig(&bigQueryColumns, nil, createTable, dropDeletedColumns), nil
Expand Down
11 changes: 7 additions & 4 deletions clients/bigquery/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bigquery

import (
"fmt"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
"github.com/stretchr/testify/assert"
Expand All @@ -22,19 +23,20 @@ func (b *BigQueryTestSuite) TestParseSchemaQuery() {
assert.Equal(b.T(), true, tableConfig.DropDeletedColumns())
assert.Equal(b.T(), len(tableConfig.Columns().GetColumns()), 2, tableConfig.Columns)
for _, col := range tableConfig.Columns().GetColumns() {
assert.Equal(b.T(), col.KindDetails, typing.String, fmt.Sprintf("col: %s, kind: %v incorrect", col.Name, col.KindDetails))
assert.Equal(b.T(), col.KindDetails, typing.String, fmt.Sprintf("col: %s, kind: %v incorrect", col.Name(nil), col.KindDetails))
}
}
}

func (b *BigQueryTestSuite) TestParseSchemaQueryComplex() {
// This test will test every single data type.
tableConfig, err := parseSchemaQuery("CREATE TABLE `artie-labs.mock.customers`(string_field_0 STRING,string_field_1 STRING,field2 INT64,field3 ARRAY<INT64>,field4 FLOAT64,field5 NUMERIC,field6 BIGNUMERIC,field7 BOOL,field8 TIMESTAMP,field9 DATE,field10 TIME,field11 DATETIME,field12 STRUCT<foo STRING>,field13 JSON, field14 TIME)OPTIONS(expiration_timestamp=TIMESTAMP 2023-03-26T20:03:44.504Z);",
false, false)
reservedKeywordCol := "`select`"
unparsedQuery := fmt.Sprintf("CREATE TABLE `artie-labs.mock.customers`(string_field_0 STRING,string_field_1 STRING,field2 INT64,field3 ARRAY<INT64>,field4 FLOAT64,field5 NUMERIC,field6 BIGNUMERIC,field7 BOOL,field8 TIMESTAMP,field9 DATE,field10 TIME,field11 DATETIME,field12 STRUCT<foo STRING>,field13 JSON, field14 TIME, %s STRING)OPTIONS(expiration_timestamp=TIMESTAMP 2023-03-26T20:03:44.504Z);", reservedKeywordCol)
tableConfig, err := parseSchemaQuery(unparsedQuery, false, false)

assert.NoError(b.T(), err, err)
assert.Equal(b.T(), false, tableConfig.DropDeletedColumns())
assert.Equal(b.T(), len(tableConfig.Columns().GetColumns()), 15, tableConfig.Columns)
assert.Equal(b.T(), len(tableConfig.Columns().GetColumns()), 16, tableConfig.Columns)

anticipatedColumns := map[string]typing.KindDetails{
"string_field_0": typing.String,
Expand All @@ -52,6 +54,7 @@ func (b *BigQueryTestSuite) TestParseSchemaQueryComplex() {
"field12": typing.Struct,
"field13": typing.Struct,
"field14": typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType),
"select": typing.String,
}

for anticipatedCol, anticipatedKind := range anticipatedColumns {
Expand Down
27 changes: 16 additions & 11 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func merge(tableData *optimization.TableData) ([]*Row, error) {
var rows []*Row
for _, value := range tableData.RowsData() {
data := make(map[string]bigquery.Value)
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() {
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(nil) {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal, err := CastColVal(value[col], colKind)
if err != nil {
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

log := logger.FromContext(ctx)
// Check if all the columns exist in Snowflake
// Check if all the columns exist in BigQuery
srcKeysMissing, targetKeysMissing := typing.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(), tableData.TopicConfig.SoftDelete)
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Expand All @@ -75,14 +75,14 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
CdcTime: tableData.LatestCDCTs,
}

// Keys that exist in CDC stream, but not in Snowflake
// Keys that exist in CDC stream, but not in BigQuery
err = ddl.AlterTable(ctx, createAlterTableArgs, targetKeysMissing...)
if err != nil {
log.WithError(err).Warn("failed to apply alter table")
return err
}

// Keys that exist in Snowflake, but don't exist in our CDC stream.
// Keys that exist in BigQuery, but don't exist in our CDC stream.
// createTable is set to false because table creation requires a column to be added
// Which means, we'll only do it upon Add columns.
deleteAlterTableArgs := ddl.AlterTableArgs{
Expand All @@ -105,7 +105,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
for colToDelete := range tableConfig.ReadOnlyColumnsToDelete() {
var found bool
for _, col := range srcKeysMissing {
if found = col.Name == colToDelete; found {
if found = col.Name(nil) == colToDelete; found {
// Found it.
break
}
Expand All @@ -117,6 +117,9 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}
}

// Infer the right data types from BigQuery before temp table creation.
tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)

// Start temporary table creation
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Expand All @@ -132,7 +135,6 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}
// End temporary table creation

tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)
rows, err := merge(tableData)
if err != nil {
log.WithError(err).Warn("failed to generate the merge query")
Expand All @@ -146,11 +148,14 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

mergeQuery, err := dml.MergeStatement(dml.MergeArgument{
FqTableName: tableData.ToFqName(ctx, constants.BigQuery),
SubQuery: tempAlterTableArgs.FqTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Columns: tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(),
FqTableName: tableData.ToFqName(ctx, constants.BigQuery),
SubQuery: tempAlterTableArgs.FqTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Columns: tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(&typing.NameArgs{
Escape: true,
DestKind: s.Label(),
}),
ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.TopicConfig.SoftDelete,
BigQuery: true,
Expand Down
1 change: 0 additions & 1 deletion clients/bigquery/merge_test.go

This file was deleted.

5 changes: 1 addition & 4 deletions clients/snowflake/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ func (s *Store) getTableConfig(ctx context.Context, fqName string, dropDeletedCo
row[columnNameList[idx]] = strings.ToLower(fmt.Sprint(*interfaceVal))
}

snowflakeColumns.AddColumn(typing.Column{
Name: row[describeNameCol],
KindDetails: typing.SnowflakeTypeToKind(row[describeTypeCol]),
})
snowflakeColumns.AddColumn(typing.NewColumn(row[describeNameCol], typing.SnowflakeTypeToKind(row[describeTypeCol])))
}

sflkTableConfig := types.NewDwhTableConfig(&snowflakeColumns, nil, tableMissing, dropDeletedColumns)
Expand Down
37 changes: 10 additions & 27 deletions clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,17 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
"name": typing.String,
"created_at": typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType),
} {
cols.AddColumn(typing.Column{
Name: colName,
KindDetails: kindDetails,
})
cols.AddColumn(typing.NewColumn(colName, kindDetails))
}

config := types.NewDwhTableConfig(&cols, nil, false, true)

s.store.configMap.AddTableToConfig(fqName, config)

nameCol := typing.Column{
Name: "name",
KindDetails: typing.String,
}

nameCol := typing.NewColumn("name", typing.String)
tc := s.store.configMap.TableConfig(fqName)

val := tc.ShouldDeleteColumn(nameCol.Name, time.Now().Add(-1*6*time.Hour))
val := tc.ShouldDeleteColumn(nameCol.Name(nil), time.Now().Add(-1*6*time.Hour))
assert.False(s.T(), val, "should not try to delete this column")
assert.Equal(s.T(), len(s.store.configMap.TableConfig(fqName).ReadOnlyColumnsToDelete()), 1)

Expand All @@ -61,36 +54,29 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
"name": typing.String,
"created_at": typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType),
} {
cols.AddColumn(typing.Column{
Name: colName,
KindDetails: kindDetails,
})
cols.AddColumn(typing.NewColumn(colName, kindDetails))
}

config := types.NewDwhTableConfig(&cols, nil, false, true)
s.store.configMap.AddTableToConfig(fqName, config)

nameCol := typing.Column{
Name: "name",
KindDetails: typing.String,
}

nameCol := typing.NewColumn("name", typing.String)
// Let's try to delete name.
allowed := s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name, time.Now().Add(-1*(6*time.Hour)))
allowed := s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name(nil), time.Now().Add(-1*(6*time.Hour)))

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process tried to delete, but it's lagged.
allowed = s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name, time.Now().Add(-1*(6*time.Hour)))
allowed = s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name(nil), time.Now().Add(-1*(6*time.Hour)))

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process now caught up, and is asking if we can delete, should still be no.
allowed = s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name, time.Now())
allowed = s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name(nil), time.Now())
assert.Equal(s.T(), allowed, false, "should not be allowed to delete still")

// Process is finally ahead, has permission to delete now.
allowed = s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name,
allowed = s.store.configMap.TableConfig(fqName).ShouldDeleteColumn(nameCol.Name(nil),
time.Now().Add(2*constants.DeletionConfidencePadding))

assert.Equal(s.T(), allowed, true, "should now be allowed to delete")
Expand All @@ -105,10 +91,7 @@ func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() {
"name": typing.String,
"created_at": typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType),
} {
cols.AddColumn(typing.Column{
Name: colName,
KindDetails: kindDetails,
})
cols.AddColumn(typing.NewColumn(colName, kindDetails))
}

tc := types.NewDwhTableConfig(&cols, map[string]time.Time{
Expand Down
16 changes: 11 additions & 5 deletions clients/snowflake/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,35 @@ func escapeCols(cols []typing.Column) (colsToUpdate []string, colsToUpdateEscape
continue
}

escapedCol := column.Name
nameArgs := &typing.NameArgs{
Escape: true,
DestKind: constants.SnowflakeStages,
}

escapedCol := column.Name(nameArgs)
switch column.KindDetails.Kind {
case typing.Struct.Kind, typing.Array.Kind:
if column.ToastColumn {
escapedCol = fmt.Sprintf("CASE WHEN %s = '%s' THEN {'key': '%s'} ELSE PARSE_JSON(%s) END %s",
// Comparing the column against placeholder
column.Name, constants.ToastUnavailableValuePlaceholder,
column.Name(nameArgs), constants.ToastUnavailableValuePlaceholder,
// Casting placeholder as a JSON object
constants.ToastUnavailableValuePlaceholder,
// Regular parsing.
column.Name, column.Name)
column.Name(nameArgs), column.Name(nameArgs))
} else {
escapedCol = fmt.Sprintf("PARSE_JSON(%s) %s", column.Name, column.Name)
escapedCol = fmt.Sprintf("PARSE_JSON(%s) %s", column.Name(nameArgs), column.Name(nameArgs))
}
}

colsToUpdate = append(colsToUpdate, column.Name)
colsToUpdate = append(colsToUpdate, column.Name(nil))
colsToUpdateEscaped = append(colsToUpdateEscaped, escapedCol)
}

return
}

// TODO - this needs to be patched to support keyword substitution.
func getMergeStatement(ctx context.Context, tableData *optimization.TableData) (string, error) {
var tableValues []string
colsToUpdate, colsToUpdateEscaped := escapeCols(tableData.ReadOnlyInMemoryCols().GetColumns())
Expand Down
Loading

0 comments on commit 591d726

Please sign in to comment.