Skip to content

Commit

Permalink
[Snowflake] Distinguishing NULL and empty strings (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 1, 2023
1 parent a23376c commit f845242
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 21 deletions.
3 changes: 2 additions & 1 deletion clients/snowflake/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
// This is necessary because CSV writers require values to in `string`.
func CastColValStaging(colVal interface{}, colKind typing.Column) (string, error) {
if colVal == nil {
return "", fmt.Errorf("colVal is nil")
// \\N needs to match NULL_IF(...) from ddl.go
return `\\N`, nil
}

colValString := fmt.Sprint(colVal)
Expand Down
18 changes: 18 additions & 0 deletions clients/snowflake/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ func evaluateTestCase(t *testing.T, testCase _testCase) {

func (s *SnowflakeTestSuite) TestCastColValStaging_Basic() {
testCases := []_testCase{
{
name: "empty string",
colVal: "",
colKind: typing.Column{
KindDetails: typing.String,
},

expectedString: "",
},
{
name: "null value (string, not that it matters)",
colVal: nil,
colKind: typing.Column{
KindDetails: typing.String,
},

expectedString: `\\N`,
},
{
name: "string",
colVal: "foo",
Expand Down
16 changes: 6 additions & 10 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,13 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableNa
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal := value[col]
if colVal != nil {
// Check
castedValue, castErr := CastColValStaging(colVal, colKind)
if castErr != nil {
return "", castErr
}

row = append(row, castedValue)
} else {
row = append(row, "")
// Check
castedValue, castErr := CastColValStaging(colVal, colKind)
if castErr != nil {
return "", castErr
}

row = append(row, castedValue)
}

if err = writer.Write(row); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/staging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *SnowflakeTestSuite) TestPrepareTempTable() {
createQuery, _ := s.fakeStageStore.ExecArgsForCall(0)

prefixQuery := fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s (user_id string,first_name string,last_name string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"') COMMENT=`, tempTableName)
`CREATE TABLE IF NOT EXISTS %s (user_id string,first_name string,last_name string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) COMMENT=`, tempTableName)
containsPrefix := strings.HasPrefix(createQuery, prefixQuery)
assert.True(s.T(), containsPrefix, fmt.Sprintf("createQuery:%v, prefixQuery:%s", createQuery, prefixQuery))
resourceName := addPrefixToTableName(tempTableName, "%")
Expand Down
1 change: 0 additions & 1 deletion clients/snowflake/sweep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (s *SnowflakeTestSuite) TestSweep() {
})

assert.NoError(s.T(), s.stageStore.Sweep(s.ctx))
fmt.Println(s.fakeStageStore.QueryCallCount())
query, _ := s.fakeStageStore.QueryArgsForCall(0)
assert.Equal(s.T(), `SELECT table_name, comment FROM db.information_schema.tables where table_name ILIKE '%__artie%' AND table_schema = UPPER('schema')`, query)
}
2 changes: 1 addition & 1 deletion lib/dwh/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func AlterTable(_ context.Context, args AlterTableArgs, cols ...typing.Column) e
// TEMPORARY Table syntax - https://docs.snowflake.com/en/sql-reference/sql/create-table
// PURGE syntax - https://docs.snowflake.com/en/sql-reference/sql/copy-into-table#purging-files-after-loading
// FIELD_OPTIONALLY_ENCLOSED_BY - is needed because CSV will try to escape any values that have `"`
sqlQuery = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"') COMMENT='%s'`,
sqlQuery = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) COMMENT='%s'`,
args.FqTableName, strings.Join(colSQLParts, ","),
// Comment on the table
fmt.Sprintf("%s%s", constants.SnowflakeExpireCommentPrefix, expiryString))
Expand Down
3 changes: 2 additions & 1 deletion lib/dwh/ddl/ddl_temp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func (d *DDLTestSuite) TestCreateTemporaryTable() {
assert.NoError(d.T(), err)
assert.Equal(d.T(), 1, d.fakeSnowflakeStagesStore.ExecCallCount())
query, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(0)

assert.Contains(d.T(),
query,
`CREATE TABLE IF NOT EXISTS db.schema.tempTableName (foo string,bar float) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"') COMMENT=`,
`CREATE TABLE IF NOT EXISTS db.schema.tempTableName (foo string,bar float) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) COMMENT=`,
query)

// BigQuery
Expand Down
9 changes: 4 additions & 5 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ type Event struct {
Table string
PrimaryKeyMap map[string]interface{}
Data map[string]interface{} // json serialized column data
OptiomalSchema map[string]typing.KindDetails
OptionalSchema map[string]typing.KindDetails
Columns *typing.Columns
ExecutionTime time.Time // When the SQL command was executed

}

func ToMemoryEvent(ctx context.Context, event cdc.Event, pkMap map[string]interface{}, tc *kafkalib.TopicConfig) Event {
return Event{
Table: tc.TableName,
PrimaryKeyMap: pkMap,
ExecutionTime: event.GetExecutionTime(),
OptiomalSchema: event.GetOptionalSchema(ctx),
OptionalSchema: event.GetOptionalSchema(ctx),
Columns: event.GetColumns(),
Data: event.GetData(ctx, pkMap, tc),
}
Expand Down Expand Up @@ -147,15 +146,15 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes
// This would only happen if the columns did not get passed in initially.
inMemoryColumns.AddColumn(typing.Column{
Name: newColName,
KindDetails: typing.ParseValue(_col, e.OptiomalSchema, val),
KindDetails: typing.ParseValue(_col, e.OptionalSchema, val),
})
} else {
if retrievedColumn.KindDetails == typing.Invalid {
// If colType is Invalid, let's see if we can update it to a better type
// If everything is nil, we don't need to add a column
// However, it's important to create a column even if it's nil.
// This is because we don't want to think that it's okay to drop a column in DWH
if kindDetails := typing.ParseValue(_col, e.OptiomalSchema, val); kindDetails.Kind != typing.Invalid.Kind {
if kindDetails := typing.ParseValue(_col, e.OptionalSchema, val); kindDetails.Kind != typing.Invalid.Kind {
retrievedColumn.KindDetails = kindDetails
inMemoryColumns.UpdateColumn(retrievedColumn)
}
Expand Down
2 changes: 1 addition & 1 deletion models/event/event_save_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *EventsTestSuite) TestEventSaveOptionalSchema() {
"json_object_string": `{"foo": "bar"}`,
"json_object_no_schema": `{"foo": "bar"}`,
},
OptiomalSchema: map[string]typing.KindDetails{
OptionalSchema: map[string]typing.KindDetails{
// Explicitly casting this as a string.
"created_at_date_string": typing.String,
"json_object_string": typing.String,
Expand Down

0 comments on commit f845242

Please sign in to comment.