Skip to content

Commit

Permalink
Escape primary key comparison for keywords (#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 11, 2023
1 parent 0f4ac4b commit 3e61acc
Show file tree
Hide file tree
Showing 41 changed files with 595 additions and 340 deletions.
4 changes: 3 additions & 1 deletion clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/array"

"github.com/artie-labs/transfer/lib/config/constants"
Expand All @@ -12,7 +14,7 @@ import (
"github.com/artie-labs/transfer/lib/typing"
)

func CastColVal(colVal interface{}, colKind typing.Column) (interface{}, error) {
func CastColVal(colVal interface{}, colKind columns.Column) (interface{}, error) {
if colVal != nil {
switch colKind.KindDetails.Kind {
case typing.ETime.Kind:
Expand Down
20 changes: 11 additions & 9 deletions clients/bigquery/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/typing/ext"
Expand All @@ -18,7 +20,7 @@ func TestCastColVal(t *testing.T) {
type _testCase struct {
name string
colVal interface{}
colKind typing.Column
colKind columns.Column

expectedErr error
expectedValue interface{}
Expand Down Expand Up @@ -46,49 +48,49 @@ func TestCastColVal(t *testing.T) {
{
name: "escaping string",
colVal: "foo",
colKind: typing.Column{KindDetails: typing.String},
colKind: columns.Column{KindDetails: typing.String},
expectedValue: "foo",
},
{
name: "123 as int",
colVal: 123,
colKind: typing.Column{KindDetails: typing.Integer},
colKind: columns.Column{KindDetails: typing.Integer},
expectedValue: "123",
},
{
name: "struct",
colVal: `{"hello": "world"}`,
colKind: typing.Column{KindDetails: typing.Struct},
colKind: columns.Column{KindDetails: typing.Struct},
expectedValue: `{"hello": "world"}`,
},
{
name: "struct w/ toast",
colVal: constants.ToastUnavailableValuePlaceholder,
colKind: typing.Column{KindDetails: typing.Struct},
colKind: columns.Column{KindDetails: typing.Struct},
expectedValue: fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder),
},
{
name: "array",
colVal: []int{1, 2, 3, 4, 5},
colKind: typing.Column{KindDetails: typing.Array},
colKind: columns.Column{KindDetails: typing.Array},
expectedValue: []string{"1", "2", "3", "4", "5"},
},
{
name: "timestamp",
colVal: birthdayTSExt,
colKind: typing.Column{KindDetails: tsKind},
colKind: columns.Column{KindDetails: tsKind},
expectedValue: "2022-09-06 03:19:24.942",
},
{
name: "date",
colVal: birthdayDateExt,
colKind: typing.Column{KindDetails: dateKind},
colKind: columns.Column{KindDetails: dateKind},
expectedValue: "2022-09-06",
},
{
name: "time",
colVal: birthdayTimeExt,
colKind: typing.Column{KindDetails: timeKind},
colKind: columns.Column{KindDetails: timeKind},
expectedValue: "03:19:24",
},
}
Expand Down
8 changes: 5 additions & 3 deletions clients/bigquery/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/dwh/types"
"github.com/artie-labs/transfer/lib/optimization"
Expand Down Expand Up @@ -51,7 +53,7 @@ func (s *Store) getTableConfig(ctx context.Context, tableData *optimization.Tabl
// parseSchemaQuery is to parse out the results from this query: https://cloud.google.com/bigquery/docs/information-schema-tables#example_1
func parseSchemaQuery(row string, createTable, dropDeletedColumns bool) (*types.DwhTableConfig, error) {
if createTable {
return types.NewDwhTableConfig(&typing.Columns{}, nil, createTable, dropDeletedColumns), nil
return types.NewDwhTableConfig(&columns.Columns{}, nil, createTable, dropDeletedColumns), nil
}

// TrimSpace only does the L + R side.
Expand Down Expand Up @@ -87,7 +89,7 @@ func parseSchemaQuery(row string, createTable, dropDeletedColumns bool) (*types.
return nil, fmt.Errorf("malformed DDL string: missing (, %s", ddlString)
}

var bigQueryColumns typing.Columns
var bigQueryColumns columns.Columns
ddlString = ddlString[:endOfStatement]
columnsToTypes := strings.Split(ddlString, ",")
for _, colType := range columnsToTypes {
Expand All @@ -104,7 +106,7 @@ func parseSchemaQuery(row string, createTable, dropDeletedColumns bool) (*types.
return nil, fmt.Errorf("unexpected colType, colType: %s, parts: %v", colType, parts)
}

bigQueryColumns.AddColumn(typing.NewColumn(typing.UnescapeColumnName(parts[0], constants.BigQuery), typing.BigQueryTypeToKind(strings.Join(parts[1:], " "))))
bigQueryColumns.AddColumn(columns.NewColumn(columns.UnescapeColumnName(parts[0], constants.BigQuery), typing.BigQueryTypeToKind(strings.Join(parts[1:], " "))))
}

return types.NewDwhTableConfig(&bigQueryColumns, nil, createTable, dropDeletedColumns), nil
Expand Down
12 changes: 8 additions & 4 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/artie-labs/transfer/lib/typing/columns"

"cloud.google.com/go/bigquery"

"github.com/artie-labs/transfer/lib/dwh/dml"
Expand All @@ -12,7 +14,6 @@ import (
"github.com/artie-labs/transfer/lib/dwh/ddl"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/typing"
)

type Row struct {
Expand Down Expand Up @@ -65,7 +66,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

log := logger.FromContext(ctx)
// Check if all the columns exist in BigQuery
srcKeysMissing, targetKeysMissing := typing.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(), tableData.TopicConfig.SoftDelete)
srcKeysMissing, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(), tableData.TopicConfig.SoftDelete)
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
Expand Down Expand Up @@ -151,8 +152,11 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
FqTableName: tableData.ToFqName(ctx, constants.BigQuery),
SubQuery: tempAlterTableArgs.FqTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys,
Columns: tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(&typing.NameArgs{
PrimaryKeys: tableData.PrimaryKeys(&columns.NameArgs{
Escape: true,
DestKind: s.Label(),
}),
Columns: tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(&columns.NameArgs{
Escape: true,
DestKind: s.Label(),
}),
Expand Down
4 changes: 3 additions & 1 deletion clients/snowflake/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"reflect"
"strings"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/stringutil"

"github.com/artie-labs/transfer/lib/config/constants"
Expand All @@ -16,7 +18,7 @@ import (

// CastColValStaging - takes `colVal` interface{} and `colKind` columns.Column and converts the value into a string value
// This is necessary because CSV writers require values to be in `string`.
func CastColValStaging(colVal interface{}, colKind typing.Column) (string, error) {
func CastColValStaging(colVal interface{}, colKind columns.Column) (string, error) {
if colVal == nil {
// \\N needs to match NULL_IF(...) from ddl.go
return `\\N`, nil
Expand Down
34 changes: 18 additions & 16 deletions clients/snowflake/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/typing/ext"
Expand All @@ -16,7 +18,7 @@ import (
type _testCase struct {
name string
colVal interface{}
colKind typing.Column
colKind columns.Column

expectedString string
expectErr bool
Expand All @@ -38,7 +40,7 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_Basic() {
{
name: "empty string",
colVal: "",
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.String,
},

Expand All @@ -47,7 +49,7 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_Basic() {
{
name: "null value (string, not that it matters)",
colVal: nil,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.String,
},

Expand All @@ -56,7 +58,7 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_Basic() {
{
name: "string",
colVal: "foo",
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.String,
},

Expand All @@ -65,39 +67,39 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_Basic() {
{
name: "integer",
colVal: 7,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Integer,
},
expectedString: "7",
},
{
name: "boolean",
colVal: true,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Boolean,
},
expectedString: "true",
},
{
name: "array",
colVal: []string{"hello", "there"},
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Array,
},
expectedString: `["hello","there"]`,
},
{
name: "JSON string",
colVal: `{"hello": "world"}`,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Struct,
},
expectedString: `{"hello": "world"}`,
},
{
name: "JSON struct",
colVal: map[string]string{"hello": "world"},
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Struct,
},
expectedString: `{"hello":"world"}`,
Expand All @@ -114,14 +116,14 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_Array() {
{
name: "array w/ numbers",
colVal: []int{1, 2, 3, 4, 5},
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Array,
},
expectedString: `[1,2,3,4,5]`,
},
{
name: "array w/ nested objects (JSON)",
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Array,
},
colVal: []map[string]interface{}{
Expand All @@ -139,7 +141,7 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_Array() {
},
{
name: "array w/ bools",
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Array,
},
colVal: []bool{
Expand Down Expand Up @@ -184,23 +186,23 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_Time() {
{
name: "date",
colVal: birthdate,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: dateKind,
},
expectedString: "2022-09-06",
},
{
name: "time",
colVal: birthTime,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: timeKind,
},
expectedString: "03:19:24.942",
},
{
name: "datetime",
colVal: birthDateTime,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: dateTimeKind,
},
expectedString: "2022-09-06T03:19:24.942Z",
Expand All @@ -219,7 +221,7 @@ func (s *SnowflakeTestSuite) TestCastColValStaging_TOAST() {
{
name: "struct with TOAST value",
colVal: constants.ToastUnavailableValuePlaceholder,
colKind: typing.Column{
colKind: columns.Column{
KindDetails: typing.Struct,
},
expectedString: fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder),
Expand Down
12 changes: 7 additions & 5 deletions clients/snowflake/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/dwh/types"
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/typing"
Expand Down Expand Up @@ -40,20 +42,20 @@ func (s *Store) getTableConfig(ctx context.Context, fqName string, dropDeletedCo
}
}

var snowflakeColumns typing.Columns
var snowflakeColumns columns.Columns
for rows != nil && rows.Next() {
// figure out what columns were returned
// the column names will be the JSON object field keys
columns, err := rows.ColumnTypes()
cols, err := rows.ColumnTypes()
if err != nil {
return nil, err
}

var columnNameList []string
// Scan needs an array of pointers to the values it is setting
// This creates the object and sets the values correctly
values := make([]interface{}, len(columns))
for idx, column := range columns {
values := make([]interface{}, len(cols))
for idx, column := range cols {
values[idx] = new(interface{})
columnNameList = append(columnNameList, strings.ToLower(column.Name()))
}
Expand All @@ -73,7 +75,7 @@ func (s *Store) getTableConfig(ctx context.Context, fqName string, dropDeletedCo
row[columnNameList[idx]] = strings.ToLower(fmt.Sprint(*interfaceVal))
}

snowflakeColumns.AddColumn(typing.NewColumn(row[describeNameCol], typing.SnowflakeTypeToKind(row[describeTypeCol])))
snowflakeColumns.AddColumn(columns.NewColumn(row[describeNameCol], typing.SnowflakeTypeToKind(row[describeTypeCol])))
}

sflkTableConfig := types.NewDwhTableConfig(&snowflakeColumns, nil, tableMissing, dropDeletedColumns)
Expand Down
Loading

0 comments on commit 3e61acc

Please sign in to comment.