Skip to content

Commit

Permalink
Redshift (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 24, 2023
1 parent f2bf773 commit 259c0f1
Show file tree
Hide file tree
Showing 38 changed files with 1,983 additions and 193 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
- [Destinations](https://docs.artie.so/real-time-destinations/overview):
- Snowflake
- BigQuery
- Redshift

- [Sources](https://docs.artie.so/real-time-sources/overview):
- MongoDB
Expand Down
2 changes: 1 addition & 1 deletion clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *Store) backfillColumn(ctx context.Context, column columns.Column, fqTab

defaultVal, err := column.DefaultValue(&columns.DefaultValueArgs{
Escape: true,
BigQuery: true,
DestKind: s.Label(),
})

if err != nil {
Expand Down
88 changes: 88 additions & 0 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package redshift

import (
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/artie-labs/transfer/lib/array"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
)

// CastColValStaging converts colVal into the string form that is written to the staging file,
// using colKind.KindDetails.Kind to pick the conversion. This is necessary because CSV writers
// require values to be of type `string`.
//
// Conversions by kind:
//   - nil (any kind): returned as `\N`.
//   - typing.ETime: parsed with ext.ParseFromInterface, then formatted with
//     ext.PostgresTimeFormatNoTZ for ext.TimeKindType values and with the empty layout otherwise.
//   - typing.String: values accepted by array.InterfaceToArrayString are joined with commas inside
//     `[` `]`; everything else goes through stringutil.Wrap.
//   - typing.Struct: non-string values are JSON encoded; a value containing the TOAST placeholder
//     is replaced by a small JSON object first.
//   - typing.Array: JSON encoded.
//   - typing.EDecimal: colVal must be a *decimal.Decimal and its String() form is returned.
//   - anything else: fmt.Sprint(colVal).
//
// An error is returned when the time value cannot be parsed, when JSON encoding fails, or when
// an EDecimal column holds something other than *decimal.Decimal.
func CastColValStaging(colVal interface{}, colKind columns.Column) (string, error) {
	if colVal == nil {
		// This matches the COPY clause for NULL terminator.
		return `\N`, nil
	}

	// Default representation; the cases below override it where the kind needs special handling.
	colValString := fmt.Sprint(colVal)
	switch colKind.KindDetails.Kind {
	// All the other types do not need string wrapping.
	case typing.ETime.Kind:
		extTime, err := ext.ParseFromInterface(colVal)
		if err != nil {
			// %w keeps the underlying error in the chain for errors.Is / errors.As.
			return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err)
		}

		switch extTime.NestedKind.Type {
		case ext.TimeKindType:
			colValString = extTime.String(ext.PostgresTimeFormatNoTZ)
		default:
			colValString = extTime.String("")
		}

	case typing.String.Kind:
		// TODO: Worth writing a benchmark whether we should check for prefix and suffix of `[ ]`
		// Check if it's an array.
		list, err := array.InterfaceToArrayString(colVal)
		if err == nil {
			colValString = "[" + strings.Join(list, ",") + "]"
		} else {
			colValString = stringutil.Wrap(colVal, true)
		}

	case typing.Struct.Kind:
		if colKind.KindDetails == typing.Struct {
			// colValString already holds fmt.Sprint(colVal), so there is no need to format it again.
			if strings.Contains(colValString, constants.ToastUnavailableValuePlaceholder) {
				// Swap the placeholder for a map so that it is JSON encoded as an object below.
				colVal = map[string]interface{}{
					"key": constants.ToastUnavailableValuePlaceholder,
				}
			}

			// String values are passed through untouched; everything else is JSON encoded.
			if reflect.TypeOf(colVal).Kind() != reflect.String {
				colValBytes, err := json.Marshal(colVal)
				if err != nil {
					return "", err
				}

				colValString = string(colValBytes)
			}
		}

	case typing.Array.Kind:
		colValBytes, err := json.Marshal(colVal)
		if err != nil {
			return "", err
		}

		colValString = string(colValBytes)

	case typing.EDecimal.Kind:
		val, isOk := colVal.(*decimal.Decimal)
		if !isOk {
			return "", fmt.Errorf("colVal is not *decimal.Decimal type")
		}

		return val.String(), nil
	}

	return colValString, nil
}
263 changes: 263 additions & 0 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package redshift

import (
"fmt"
"math/big"
"testing"
"time"

"github.com/artie-labs/transfer/lib/ptr"

"github.com/artie-labs/transfer/lib/typing/decimal"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
)

// _testCase describes one table-driven scenario that evaluateTestCase runs against CastColValStaging.
type _testCase struct {
// name labels the scenario; evaluateTestCase passes it as the message of every assertion.
name string
// colVal and colKind are the two arguments handed to CastColValStaging.
colVal interface{}
colKind columns.Column

// expectedString is the string CastColValStaging is expected to return.
expectedString string
// expectErr selects whether an error (true) or no error (false) is asserted.
expectErr bool
}

// evaluateTestCase feeds a single test case through CastColValStaging and asserts on both
// results: the presence or absence of an error, and the returned string. The case name is
// attached to every assertion as the failure message.
func evaluateTestCase(t *testing.T, testCase _testCase) {
	got, gotErr := CastColValStaging(testCase.colVal, testCase.colKind)

	switch {
	case testCase.expectErr:
		assert.Error(t, gotErr, testCase.name)
	default:
		assert.NoError(t, gotErr, testCase.name)
	}

	assert.Equal(t, testCase.expectedString, got, testCase.name)
}

// TestCastColValStaging_Basic exercises CastColValStaging with nil, strings, primitives,
// arrays, JSON values and decimal values.
func (r *RedshiftTestSuite) TestCastColValStaging_Basic() {
	// Column definitions that are shared by more than one scenario.
	stringCol := columns.Column{KindDetails: typing.String}
	structCol := columns.Column{KindDetails: typing.Struct}

	scenarios := []_testCase{
		{
			name:           "empty string",
			colVal:         "",
			colKind:        stringCol,
			expectedString: "",
		},
		{
			name:           "null value (string, not that it matters)",
			colVal:         nil,
			colKind:        stringCol,
			expectedString: `\N`,
		},
		{
			name:           "string",
			colVal:         "foo",
			colKind:        stringCol,
			expectedString: "foo",
		},
		{
			name:           "integer",
			colVal:         7,
			colKind:        columns.Column{KindDetails: typing.Integer},
			expectedString: "7",
		},
		{
			name:           "boolean",
			colVal:         true,
			colKind:        columns.Column{KindDetails: typing.Boolean},
			expectedString: "true",
		},
		{
			name:           "array",
			colVal:         []string{"hello", "there"},
			colKind:        columns.Column{KindDetails: typing.Array},
			expectedString: `["hello","there"]`,
		},
		{
			name:           "array (string with interface type)",
			colVal:         []interface{}{"hello", "there", "world"},
			colKind:        stringCol,
			expectedString: `["hello","there","world"]`,
		},
		{
			name:           "JSON string",
			colVal:         `{"hello": "world"}`,
			colKind:        structCol,
			expectedString: `{"hello": "world"}`,
		},
		{
			name:           "JSON struct",
			colVal:         map[string]string{"hello": "world"},
			colKind:        structCol,
			expectedString: `{"hello":"world"}`,
		},
		{
			name:           "numeric data types (backwards compatibility)",
			colVal:         decimal.NewDecimal(2, ptr.ToInt(5), big.NewFloat(55.22)),
			colKind:        columns.Column{KindDetails: typing.Float},
			expectedString: "55.22",
		},
		{
			name:           "numeric data types",
			colVal:         decimal.NewDecimal(2, ptr.ToInt(38), big.NewFloat(585692791691858.25)),
			colKind:        columns.Column{KindDetails: typing.EDecimal},
			expectedString: "585692791691858.25",
		},
	}

	t := r.T()
	for idx := range scenarios {
		evaluateTestCase(t, scenarios[idx])
	}
}

// TestCastColValStaging_Array checks that array columns holding numbers, nested objects and
// booleans are rendered as JSON arrays.
func (r *RedshiftTestSuite) TestCastColValStaging_Array() {
	// Every scenario in this test targets the same array column definition.
	arrayCol := columns.Column{KindDetails: typing.Array}

	scenarios := []_testCase{
		{
			name:           "array w/ numbers",
			colVal:         []int{1, 2, 3, 4, 5},
			colKind:        arrayCol,
			expectedString: `[1,2,3,4,5]`,
		},
		{
			name:    "array w/ nested objects (JSON)",
			colKind: arrayCol,
			colVal: []map[string]interface{}{
				{"dusty": "the mini aussie"},
				{"robin": "tang"},
				{"foo": "bar"},
			},
			expectedString: `[{"dusty":"the mini aussie"},{"robin":"tang"},{"foo":"bar"}]`,
		},
		{
			name:           "array w/ bools",
			colKind:        arrayCol,
			colVal:         []bool{true, true, false, false, true},
			expectedString: `[true,true,false,false,true]`,
		},
	}

	t := r.T()
	for idx := range scenarios {
		evaluateTestCase(t, scenarios[idx])
	}
}

// TestCastColValStaging_Time covers the three extended time variants: date, time and date time.
func (r *RedshiftTestSuite) TestCastColValStaging_Time() {
	t := r.T()
	moment := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC)

	// Each kind starts from typing.ETime and then has its extended details pointed at one variant.
	dateKind, timeKind, dateTimeKind := typing.ETime, typing.ETime, typing.ETime
	dateKind.ExtendedTimeDetails = &ext.Date
	timeKind.ExtendedTimeDetails = &ext.Time
	dateTimeKind.ExtendedTimeDetails = &ext.DateTime

	// Wrap the same instant once per variant.
	dateVal, err := ext.NewExtendedTime(moment, dateKind.ExtendedTimeDetails.Type, "")
	assert.NoError(t, err)

	timeVal, err := ext.NewExtendedTime(moment, timeKind.ExtendedTimeDetails.Type, "")
	assert.NoError(t, err)

	dateTimeVal, err := ext.NewExtendedTime(moment, dateTimeKind.ExtendedTimeDetails.Type, "")
	assert.NoError(t, err)

	scenarios := []_testCase{
		{
			name:           "date",
			colVal:         dateVal,
			colKind:        columns.Column{KindDetails: dateKind},
			expectedString: "2022-09-06",
		},
		{
			name:           "time",
			colVal:         timeVal,
			colKind:        columns.Column{KindDetails: timeKind},
			expectedString: "03:19:24.942",
		},
		{
			name:           "datetime",
			colVal:         dateTimeVal,
			colKind:        columns.Column{KindDetails: dateTimeKind},
			expectedString: "2022-09-06T03:19:24.942Z",
		},
	}

	for idx := range scenarios {
		evaluateTestCase(t, scenarios[idx])
	}
}

// TestCastColValStaging_TOAST verifies the handling of the TOAST "unavailable value" placeholder.
// Toast only really matters for JSON blobs, since the placeholder arrives as a STRING value that
// is not a JSON object. The expectation is that it gets cast into a JSON object so that it can
// compile.
func (r *RedshiftTestSuite) TestCastColValStaging_TOAST() {
	placeholder := constants.ToastUnavailableValuePlaceholder

	toastScenario := _testCase{
		name:           "struct with TOAST value",
		colVal:         placeholder,
		colKind:        columns.Column{KindDetails: typing.Struct},
		expectedString: fmt.Sprintf(`{"key":"%s"}`, placeholder),
	}

	evaluateTestCase(r.T(), toastScenario)
}
Loading

0 comments on commit 259c0f1

Please sign in to comment.