From 4a5ff3f73126fda4c9845c47f93982252a231238 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 6 Oct 2024 21:51:18 -0700 Subject: [PATCH] Basic scaffold. --- clients/redshift/cast.go | 6 +- clients/redshift/cast_test.go | 156 +++++++++++++++++----------------- clients/redshift/staging.go | 2 +- lib/config/types.go | 3 + 4 files changed, 87 insertions(+), 80 deletions(-) diff --git a/clients/redshift/cast.go b/clients/redshift/cast.go index 52af74d7f..8d2751de8 100644 --- a/clients/redshift/cast.go +++ b/clients/redshift/cast.go @@ -10,7 +10,7 @@ import ( const maxRedshiftLength int32 = 65535 -func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string { +func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) string { if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind { maxLength := maxRedshiftLength // If the customer has specified the maximum string precision, let's use that as the max length. @@ -34,7 +34,7 @@ func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateEx return colVal } -func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) { +func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) (string, error) { if colVal == nil { if colKind == typing.Struct { // Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null @@ -51,5 +51,5 @@ func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededV } // Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done. - return replaceExceededValues(colValString, colKind, truncateExceededValue), nil + return replaceExceededValues(colValString, colKind, truncateExceededValue, expandStringPrecision), nil } diff --git a/clients/redshift/cast_test.go b/clients/redshift/cast_test.go index 7867e164f..268c771ba 100644 --- a/clients/redshift/cast_test.go +++ b/clients/redshift/cast_test.go @@ -3,114 +3,118 @@ package redshift import ( "fmt" - "github.com/artie-labs/transfer/lib/stringutil" - "github.com/artie-labs/transfer/lib/config/constants" - + "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" ) func (r *RedshiftTestSuite) TestReplaceExceededValues() { { - // Irrelevant data type - { - // Integer - assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false)) - } + // expandStringPrecision = false { - // Returns the full value since it's not a struct or string - // This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings. - value := stringutil.Random(int(maxRedshiftLength + 1)) - assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false)) - } - } - { - // Exceeded - { - // String - { - // TruncateExceededValue = false - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)) - } + // Irrelevant data type { - // TruncateExceededValue = false, string precision specified - stringKd := typing.KindDetails{ - Kind: typing.String.Kind, - OptionalStringPrecision: typing.ToPtr(int32(3)), - } - - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false)) + // Integer + assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false, false)) } { - // TruncateExceededValue = true - superLongString := stringutil.Random(int(maxRedshiftLength) + 1) - assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true)) + // Returns the full value since it's not a struct or string + // This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings. + value := stringutil.Random(int(maxRedshiftLength + 1)) + assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false, false)) } + } + { + // Exceeded { - // TruncateExceededValue = true, string precision specified - stringKd := typing.KindDetails{ - Kind: typing.String.Kind, - OptionalStringPrecision: typing.ToPtr(int32(3)), + // String + { + // TruncateExceededValue = false + assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false)) } + { + // TruncateExceededValue = false, string precision specified + stringKd := typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(int32(3)), + } - assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true)) + assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false, false)) + } + { + // TruncateExceededValue = true + superLongString := stringutil.Random(int(maxRedshiftLength) + 1) + assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true, false)) + } + { + // TruncateExceededValue = true, string precision specified + stringKd := typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(int32(3)), + } + + assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true, false)) + } + } + { + // Struct and masked + assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false)) } } { - // Struct and masked - assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)) - } - } - { - // Valid - { - // Not masked - assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false)) - assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false)) + // Valid + { + // Not masked + assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false)) + assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false, false)) + } } } } func (r *RedshiftTestSuite) TestCastColValStaging() { { - // Exceeded { - // String + // expandStringPrecision = false + // Exceeded { - // TruncateExceededValue = false - value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), constants.ExceededValueMarker, value) + // String + { + // TruncateExceededValue = false + value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false) + assert.NoError(r.T(), err) + assert.Equal(r.T(), constants.ExceededValueMarker, value) + } + { + // TruncateExceededValue = true + value := stringutil.Random(int(maxRedshiftLength) + 1) + value, err := castColValStaging(value, typing.String, true, false) + assert.NoError(r.T(), err) + assert.Equal(r.T(), value[:maxRedshiftLength], value) + } } { - // TruncateExceededValue = true - value := stringutil.Random(int(maxRedshiftLength) + 1) - value, err := castColValStaging(value, typing.String, true) + // Masked struct + value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false) assert.NoError(r.T(), err) - assert.Equal(r.T(), value[:maxRedshiftLength], value) + assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value) } } { - // Masked struct - value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value) - } - } - { - // Not exceeded - { - // Valid string - value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value) - } - { - // Valid struct - value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), `{"foo": "bar"}`, value) + // Not exceeded + { + // Valid string + value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false, false) + assert.NoError(r.T(), err) + assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value) + } + { + // Valid struct + value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false, false) + assert.NoError(r.T(), err) + assert.Equal(r.T(), `{"foo": "bar"}`, value) + } } } } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 85de9e39b..3d7b881e0 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -94,7 +94,7 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID for _, value := range tableData.Rows() { var row []string for _, col := range columns { - castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues) + castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues, s.config.SharedDestinationSettings.ExpandStringPrecision) if castErr != nil { return "", castErr } diff --git a/lib/config/types.go b/lib/config/types.go index 8727f7070..665c55ccb 100644 --- a/lib/config/types.go +++ b/lib/config/types.go @@ -39,6 +39,9 @@ type Kafka struct { type SharedDestinationSettings struct { // TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value` TruncateExceededValues bool `yaml:"truncateExceededValues"` + // TODO: Update the yaml annotation once it's supported. + // ExpandStringPrecision - This will expand the string precision based on the values that come in, if the destination supports it. + ExpandStringPrecision bool `yaml:"_expandStringPrecision"` } type Reporting struct {