Skip to content

Commit

Permalink
More efficient Snowflake transfer (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 24, 2023
1 parent 2185957 commit 3351c36
Show file tree
Hide file tree
Showing 26 changed files with 1,251 additions and 200 deletions.
1 change: 1 addition & 0 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func CastColVal(colVal interface{}, colKind typing.Column) (interface{}, error)
colVal = extTime.String(typing.StreamingTimeFormat)
}
// All the other types do not need string wrapping.
// TODO - what does typing.String.Kind do?
case typing.String.Kind, typing.Struct.Kind:
if colKind.KindDetails == typing.Struct {
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
Expand Down
66 changes: 66 additions & 0 deletions clients/snowflake/cast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package snowflake

import (
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
)

// CastColValStaging - takes `colVal` interface{} and `colKind` typing.Column and converts the value into a string value
// This is necessary because CSV writers require values to in `string`.
func CastColValStaging(colVal interface{}, colKind typing.Column) (string, error) {
if colVal == nil {
return "", fmt.Errorf("colVal is nil")
}

colValString := fmt.Sprint(colVal)
switch colKind.KindDetails.Kind {
// All the other types do not need string wrapping.
case typing.ETime.Kind:
extTime, err := ext.ParseFromInterface(colVal)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
}

switch extTime.NestedKind.Type {
case ext.TimeKindType:
colValString = extTime.String(ext.PostgresTimeFormatNoTZ)
default:
colValString = extTime.String("")
}

case typing.Struct.Kind:
if colKind.KindDetails == typing.Struct {
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
colVal = map[string]interface{}{
"key": constants.ToastUnavailableValuePlaceholder,
}
}

if reflect.TypeOf(colVal).Kind() != reflect.String {
colValBytes, err := json.Marshal(colVal)
if err != nil {
return "", err
}

colValString = string(colValBytes)
}
}
case typing.Array.Kind:
colValBytes, err := json.Marshal(colVal)
if err != nil {
return "", err
}

colValString = string(colValBytes)
}

return colValString, nil

}
214 changes: 214 additions & 0 deletions clients/snowflake/cast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package snowflake

import (
"fmt"
"testing"
"time"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
)

type _testCase struct {
name string
colVal interface{}
colKind typing.Column

expectedString string
expectErr bool
}

func evaluateTestCase(t *testing.T, testCase _testCase) {
actualString, actualErr := CastColValStaging(testCase.colVal, testCase.colKind)
if testCase.expectErr {
assert.Error(t, actualErr, testCase.name)
} else {
assert.NoError(t, actualErr, testCase.name)
}

assert.Equal(t, testCase.expectedString, actualString, testCase.name)
}

func (s *SnowflakeTestSuite) TestCastColValStaging_Basic() {
testCases := []_testCase{
{
name: "string",
colVal: "foo",
colKind: typing.Column{
KindDetails: typing.String,
},

expectedString: "foo",
},
{
name: "integer",
colVal: 7,
colKind: typing.Column{
KindDetails: typing.Integer,
},
expectedString: "7",
},
{
name: "boolean",
colVal: true,
colKind: typing.Column{
KindDetails: typing.Boolean,
},
expectedString: "true",
},
{
name: "array",
colVal: []string{"hello", "there"},
colKind: typing.Column{
KindDetails: typing.Array,
},
expectedString: `["hello","there"]`,
},
{
name: "JSON string",
colVal: `{"hello": "world"}`,
colKind: typing.Column{
KindDetails: typing.Struct,
},
expectedString: `{"hello": "world"}`,
},
{
name: "JSON struct",
colVal: map[string]string{"hello": "world"},
colKind: typing.Column{
KindDetails: typing.Struct,
},
expectedString: `{"hello":"world"}`,
},
}

for _, testCase := range testCases {
evaluateTestCase(s.T(), testCase)
}
}

func (s *SnowflakeTestSuite) TestCastColValStaging_Array() {
testCases := []_testCase{
{
name: "array w/ numbers",
colVal: []int{1, 2, 3, 4, 5},
colKind: typing.Column{
KindDetails: typing.Array,
},
expectedString: `[1,2,3,4,5]`,
},
{
name: "array w/ nested objects (JSON)",
colKind: typing.Column{
KindDetails: typing.Array,
},
colVal: []map[string]interface{}{
{
"dusty": "the mini aussie",
},
{
"robin": "tang",
},
{
"foo": "bar",
},
},
expectedString: `[{"dusty":"the mini aussie"},{"robin":"tang"},{"foo":"bar"}]`,
},
{
name: "array w/ bools",
colKind: typing.Column{
KindDetails: typing.Array,
},
colVal: []bool{
true,
true,
false,
false,
true,
},
expectedString: `[true,true,false,false,true]`,
},
}

for _, testCase := range testCases {
evaluateTestCase(s.T(), testCase)
}
}

// TestCastColValStaging_Time - will test all the variants of date, time and date time.
func (s *SnowflakeTestSuite) TestCastColValStaging_Time() {
birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC)
// date
dateKind := typing.ETime
dateKind.ExtendedTimeDetails = &ext.Date
// time
timeKind := typing.ETime
timeKind.ExtendedTimeDetails = &ext.Time
// date time
dateTimeKind := typing.ETime
dateTimeKind.ExtendedTimeDetails = &ext.DateTime

birthdate, err := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "")
assert.NoError(s.T(), err)

birthTime, err := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "")
assert.NoError(s.T(), err)

birthDateTime, err := ext.NewExtendedTime(birthday, dateTimeKind.ExtendedTimeDetails.Type, "")
assert.NoError(s.T(), err)

testCases := []_testCase{
{
name: "date",
colVal: birthdate,
colKind: typing.Column{
KindDetails: dateKind,
},
expectedString: "2022-09-06",
},
{
name: "time",
colVal: birthTime,
colKind: typing.Column{
KindDetails: timeKind,
},
expectedString: "03:19:24.942",
},
{
name: "datetime",
colVal: birthDateTime,
colKind: typing.Column{
KindDetails: dateTimeKind,
},
expectedString: "2022-09-06T03:19:24.942Z",
},
}

for _, testCase := range testCases {
evaluateTestCase(s.T(), testCase)
}
}

func (s *SnowflakeTestSuite) TestCastColValStaging_TOAST() {
// Toast only really matters for JSON blobs since it'll return a STRING value that's not a JSON object.
// We're testing that we're casting the unavailable value correctly into a JSON object so that it can compile.
testCases := []_testCase{
{
name: "struct with TOAST value",
colVal: constants.ToastUnavailableValuePlaceholder,
colKind: typing.Column{
KindDetails: typing.Struct,
},
expectedString: fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder),
},
}

for _, testCase := range testCases {
evaluateTestCase(s.T(), testCase)
}
}
13 changes: 7 additions & 6 deletions clients/snowflake/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/dwh/dml"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
)

// escapeCols will return the following arguments:
Expand Down Expand Up @@ -65,13 +66,13 @@ func getMergeStatement(tableData *optimization.TableData) (string, error) {

switch extTime.NestedKind.Type {
case ext.TimeKindType:
colVal = stringutil.Wrap(extTime.String(ext.PostgresTimeFormatNoTZ))
colVal = stringutil.Wrap(extTime.String(ext.PostgresTimeFormatNoTZ), false)
default:
colVal = stringutil.Wrap(extTime.String(""))
colVal = stringutil.Wrap(extTime.String(""), false)
}

case typing.String.Kind, typing.Struct.Kind:
colVal = stringutil.Wrap(colVal)
colVal = stringutil.Wrap(colVal, false)
case typing.Array.Kind:
// We need to marshall, so we can escape the strings.
// https://go.dev/play/p/BcCwUSCeTmT
Expand All @@ -80,7 +81,7 @@ func getMergeStatement(tableData *optimization.TableData) (string, error) {
return "", err
}

colVal = stringutil.Wrap(string(colValBytes))
colVal = stringutil.Wrap(string(colValBytes), false)
}
} else {
colVal = "null"
Expand Down
4 changes: 2 additions & 2 deletions clients/snowflake/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/typing"
)

Expand Down
Loading

0 comments on commit 3351c36

Please sign in to comment.