diff --git a/clients/redshift/cast.go b/clients/redshift/cast.go index 8d2751de8..397a17b2f 100644 --- a/clients/redshift/cast.go +++ b/clients/redshift/cast.go @@ -8,9 +8,16 @@ import ( "github.com/artie-labs/transfer/lib/typing/values" ) +type Result struct { + Value string + // NewLength - If the value exceeded the maximum length, this will be the new length of the value. + // This is only applicable if [expandStringPrecision] is enabled. + NewLength int32 +} + const maxRedshiftLength int32 = 65535 -func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) string { +func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) Result { if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind { maxLength := maxRedshiftLength // If the customer has specified the maximum string precision, let's use that as the max length. @@ -18,36 +25,42 @@ func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateEx maxLength = *colKind.OptionalStringPrecision } - if shouldReplace := int32(len(colVal)) > maxLength; shouldReplace { + colValLength := int32(len(colVal)) + // If [expandStringPrecision] is enabled and the value is greater than the maximum length, but less than the maximum Redshift length. + if expandStringPrecision && colValLength > maxLength && colValLength < maxRedshiftLength { + return Result{Value: colVal, NewLength: colValLength} + } + + if shouldReplace := colValLength > maxLength; shouldReplace { if colKind.Kind == typing.Struct.Kind { - return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker) + return Result{Value: fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)} } if truncateExceededValue { - return colVal[:maxLength] + return Result{Value: colVal[:maxLength]} } else { - return constants.ExceededValueMarker + return Result{Value: constants.ExceededValueMarker} } } } - return colVal + return Result{Value: colVal} } -func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) (string, error) { +func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) (Result, error) { if colVal == nil { if colKind == typing.Struct { // Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null - return "", nil + return Result{}, nil } // This matches the COPY clause for NULL terminator. - return `\N`, nil + return Result{Value: `\N`}, nil } colValString, err := values.ToString(colVal, colKind) if err != nil { - return "", err + return Result{}, err } // Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done. diff --git a/clients/redshift/cast_test.go b/clients/redshift/cast_test.go index 268c771ba..b90499ea5 100644 --- a/clients/redshift/cast_test.go +++ b/clients/redshift/cast_test.go @@ -16,13 +16,17 @@ func (r *RedshiftTestSuite) TestReplaceExceededValues() { // Irrelevant data type { // Integer - assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false, false)) + result := replaceExceededValues("123", typing.Integer, false, false) + assert.Equal(r.T(), "123", result.Value) + assert.Zero(r.T(), result.NewLength) } { // Returns the full value since it's not a struct or string // This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings. value := stringutil.Random(int(maxRedshiftLength + 1)) - assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false, false)) + result := replaceExceededValues(value, typing.Integer, false, false) + assert.Equal(r.T(), value, result.Value) + assert.Zero(r.T(), result.NewLength) } } { @@ -31,7 +35,9 @@ func (r *RedshiftTestSuite) TestReplaceExceededValues() { // String { // TruncateExceededValue = false - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false)) + result := replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false) + assert.Equal(r.T(), constants.ExceededValueMarker, result.Value) + assert.Zero(r.T(), result.NewLength) } { // TruncateExceededValue = false, string precision specified @@ -40,12 +46,16 @@ func (r *RedshiftTestSuite) TestReplaceExceededValues() { OptionalStringPrecision: typing.ToPtr(int32(3)), } - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false, false)) + result := replaceExceededValues("hello", stringKd, false, false) + assert.Equal(r.T(), constants.ExceededValueMarker, result.Value) + assert.Zero(r.T(), result.NewLength) } { // TruncateExceededValue = true superLongString := stringutil.Random(int(maxRedshiftLength) + 1) - assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true, false)) + result := replaceExceededValues(superLongString, typing.String, true, false) + assert.Equal(r.T(), superLongString[:maxRedshiftLength], result.Value) + assert.Zero(r.T(), result.NewLength) } { // TruncateExceededValue = true, string precision specified @@ -54,20 +64,32 @@ func (r *RedshiftTestSuite) TestReplaceExceededValues() { OptionalStringPrecision: typing.ToPtr(int32(3)), } - assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true, false)) + result := replaceExceededValues("hello", stringKd, true, false) + assert.Equal(r.T(), "hel", result.Value) + assert.Zero(r.T(), result.NewLength) } } { // Struct and masked - assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false)) + result := replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false) + assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), result.Value) + assert.Zero(r.T(), result.NewLength) } } { // Valid { // Not masked - assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false)) - assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false, false)) + { + result := replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false) + assert.Equal(r.T(), `{"foo": "bar"}`, result.Value) + assert.Zero(r.T(), result.NewLength) + } + { + result := replaceExceededValues("hello world", typing.String, false, false) + assert.Equal(r.T(), "hello world", result.Value) + assert.Zero(r.T(), result.NewLength) + } } } } @@ -75,46 +97,18 @@ func (r *RedshiftTestSuite) TestReplaceExceededValues() { func (r *RedshiftTestSuite) TestCastColValStaging() { { + // nil { - // expandStringPrecision = false - // Exceeded - { - // String - { - // TruncateExceededValue = false - value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), constants.ExceededValueMarker, value) - } - { - // TruncateExceededValue = true - value := stringutil.Random(int(maxRedshiftLength) + 1) - value, err := castColValStaging(value, typing.String, true, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), value[:maxRedshiftLength], value) - } - } - { - // Masked struct - value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value) - } + // Struct + result, err := castColValStaging(nil, typing.Struct, false, false) + assert.NoError(r.T(), err) + assert.Empty(r.T(), result.Value) } { - // Not exceeded - { - // Valid string - value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value) - } - { - // Valid struct - value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), `{"foo": "bar"}`, value) - } + // Not struct + result, err := castColValStaging(nil, typing.String, false, false) + assert.NoError(r.T(), err) + assert.Equal(r.T(), `\N`, result.Value) } } } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 3d7b881e0..a2ffbc102 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -94,12 +94,12 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID for _, value := range tableData.Rows() { var row []string for _, col := range columns { - castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues, s.config.SharedDestinationSettings.ExpandStringPrecision) - if castErr != nil { - return "", castErr + result, err := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues, s.config.SharedDestinationSettings.ExpandStringPrecision) + if err != nil { + return "", err } - row = append(row, castedValue) + row = append(row, result.Value) } if err = writer.Write(row); err != nil {