diff --git a/clients/redshift/cast.go b/clients/redshift/cast.go index 52af74d7f..3a4ed32b7 100644 --- a/clients/redshift/cast.go +++ b/clients/redshift/cast.go @@ -8,9 +8,16 @@ import ( "github.com/artie-labs/transfer/lib/typing/values" ) +type Result struct { + Value string + // NewLength - If the value exceeded the maximum length, this will be the new length of the value. + // This is only applicable if [expandStringPrecision] is enabled. + NewLength int32 +} + const maxRedshiftLength int32 = 65535 -func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string { +func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) Result { if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind { maxLength := maxRedshiftLength // If the customer has specified the maximum string precision, let's use that as the max length. @@ -18,38 +25,44 @@ func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateEx maxLength = *colKind.OptionalStringPrecision } - if shouldReplace := int32(len(colVal)) > maxLength; shouldReplace { + colValLength := int32(len(colVal)) + // If [expandStringPrecision] is enabled and the value is greater than the maximum length, and lte Redshift's max length. + if expandStringPrecision && colValLength > maxLength && colValLength <= maxRedshiftLength { + return Result{Value: colVal, NewLength: colValLength} + } + + if shouldReplace := colValLength > maxLength; shouldReplace { if colKind.Kind == typing.Struct.Kind { - return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker) + return Result{Value: fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)} } if truncateExceededValue { - return colVal[:maxLength] + return Result{Value: colVal[:maxLength]} } else { - return constants.ExceededValueMarker + return Result{Value: constants.ExceededValueMarker} } } } - return colVal + return Result{Value: colVal} } -func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) { +func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) (Result, error) { if colVal == nil { if colKind == typing.Struct { // Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null - return "", nil + return Result{}, nil } // This matches the COPY clause for NULL terminator. - return `\N`, nil + return Result{Value: `\N`}, nil } colValString, err := values.ToString(colVal, colKind) if err != nil { - return "", err + return Result{}, err } // Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done. - return replaceExceededValues(colValString, colKind, truncateExceededValue), nil + return replaceExceededValues(colValString, colKind, truncateExceededValue, expandStringPrecision), nil } diff --git a/clients/redshift/cast_test.go b/clients/redshift/cast_test.go index 7867e164f..650d3dd5e 100644 --- a/clients/redshift/cast_test.go +++ b/clients/redshift/cast_test.go @@ -3,114 +3,159 @@ package redshift import ( "fmt" - "github.com/artie-labs/transfer/lib/stringutil" - "github.com/artie-labs/transfer/lib/config/constants" - + "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" ) func (r *RedshiftTestSuite) TestReplaceExceededValues() { { - // Irrelevant data type + // expandStringPrecision = false { - // Integer - assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false)) + // Irrelevant data type + { + // Integer + result := replaceExceededValues("123", typing.Integer, false, false) + assert.Equal(r.T(), "123", result.Value) + assert.Zero(r.T(), result.NewLength) + } + { + // Returns the full value since it's not a struct or string + // This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings. + value := stringutil.Random(int(maxRedshiftLength + 1)) + result := replaceExceededValues(value, typing.Integer, false, false) + assert.Equal(r.T(), value, result.Value) + assert.Zero(r.T(), result.NewLength) + } + } + { + // Exceeded + { + // String + { + // TruncateExceededValue = false + result := replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false) + assert.Equal(r.T(), constants.ExceededValueMarker, result.Value) + assert.Zero(r.T(), result.NewLength) + } + { + // TruncateExceededValue = false, string precision specified + stringKd := typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(int32(3)), + } + + result := replaceExceededValues("hello", stringKd, false, false) + assert.Equal(r.T(), constants.ExceededValueMarker, result.Value) + assert.Zero(r.T(), result.NewLength) + } + { + // TruncateExceededValue = true + superLongString := stringutil.Random(int(maxRedshiftLength) + 1) + result := replaceExceededValues(superLongString, typing.String, true, false) + assert.Equal(r.T(), superLongString[:maxRedshiftLength], result.Value) + assert.Zero(r.T(), result.NewLength) + } + { + // TruncateExceededValue = true, string precision specified + stringKd := typing.KindDetails{ + Kind: typing.String.Kind, + OptionalStringPrecision: typing.ToPtr(int32(3)), + } + + result := replaceExceededValues("hello", stringKd, true, false) + assert.Equal(r.T(), "hel", result.Value) + assert.Zero(r.T(), result.NewLength) + } + } + { + // Struct and masked + result := replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false) + assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), result.Value) + assert.Zero(r.T(), result.NewLength) + } } { - // Returns the full value since it's not a struct or string - // This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings. - value := stringutil.Random(int(maxRedshiftLength + 1)) - assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false)) + // Valid + { + // Not masked + { + result := replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false) + assert.Equal(r.T(), `{"foo": "bar"}`, result.Value) + assert.Zero(r.T(), result.NewLength) + } + { + result := replaceExceededValues("hello world", typing.String, false, false) + assert.Equal(r.T(), "hello world", result.Value) + assert.Zero(r.T(), result.NewLength) + } + } } } { - // Exceeded + // expandStringPrecision = true { - // String + // Irrelevant data type + { + // Integer + result := replaceExceededValues("123", typing.Integer, false, true) + assert.Equal(r.T(), "123", result.Value) + assert.Zero(r.T(), result.NewLength) + } { - // TruncateExceededValue = false - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)) + // Returns the full value since it's not a struct or string + // This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings. + value := stringutil.Random(int(maxRedshiftLength + 1)) + result := replaceExceededValues(value, typing.Integer, false, true) + assert.Equal(r.T(), value, result.Value) + assert.Zero(r.T(), result.NewLength) } + } + { + // Exceeded the column string precision but not Redshift's max length { - // TruncateExceededValue = false, string precision specified stringKd := typing.KindDetails{ Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(3)), } - assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false)) - } - { - // TruncateExceededValue = true - superLongString := stringutil.Random(int(maxRedshiftLength) + 1) - assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true)) + result := replaceExceededValues("hello", stringKd, true, true) + assert.Equal(r.T(), "hello", result.Value) + assert.Equal(r.T(), int32(5), result.NewLength) } + } + { + // Exceeded both column and Redshift precision, so the value got replaced with an exceeded placeholder. { - // TruncateExceededValue = true, string precision specified stringKd := typing.KindDetails{ Kind: typing.String.Kind, - OptionalStringPrecision: typing.ToPtr(int32(3)), + OptionalStringPrecision: typing.ToPtr(maxRedshiftLength), } - assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true)) + superLongString := stringutil.Random(int(maxRedshiftLength) + 1) + result := replaceExceededValues(superLongString, stringKd, false, true) + assert.Equal(r.T(), constants.ExceededValueMarker, result.Value) + assert.Zero(r.T(), result.NewLength) } } - { - // Struct and masked - assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)) - } - } - { - // Valid - { - // Not masked - assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false)) - assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false)) - } } } func (r *RedshiftTestSuite) TestCastColValStaging() { { - // Exceeded - { - // String - { - // TruncateExceededValue = false - value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), constants.ExceededValueMarker, value) - } - { - // TruncateExceededValue = true - value := stringutil.Random(int(maxRedshiftLength) + 1) - value, err := castColValStaging(value, typing.String, true) - assert.NoError(r.T(), err) - assert.Equal(r.T(), value[:maxRedshiftLength], value) - } - } - { - // Masked struct - value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false) - assert.NoError(r.T(), err) - assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value) - } - } - { - // Not exceeded + // nil { - // Valid string - value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false) + // Struct + result, err := castColValStaging(nil, typing.Struct, false, false) assert.NoError(r.T(), err) - assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value) + assert.Empty(r.T(), result.Value) } { - // Valid struct - value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false) + // Not struct + result, err := castColValStaging(nil, typing.String, false, false) assert.NoError(r.T(), err) - assert.Equal(r.T(), `{"foo": "bar"}`, value) + assert.Equal(r.T(), `\N`, result.Value) } } } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 85de9e39b..777690a70 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -94,12 +94,19 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID for _, value := range tableData.Rows() { var row []string for _, col := range columns { - castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues) - if castErr != nil { - return "", castErr + result, err := castColValStaging( + value[col.Name()], + col.KindDetails, + s.config.SharedDestinationSettings.TruncateExceededValues, + s.config.SharedDestinationSettings.ExpandStringPrecision, + ) + + if err != nil { + return "", err } - row = append(row, castedValue) + // TODO: Do something about result.NewLength + row = append(row, result.Value) } if err = writer.Write(row); err != nil { diff --git a/lib/config/types.go b/lib/config/types.go index 8727f7070..665c55ccb 100644 --- a/lib/config/types.go +++ b/lib/config/types.go @@ -39,6 +39,9 @@ type Kafka struct { type SharedDestinationSettings struct { // TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value` TruncateExceededValues bool `yaml:"truncateExceededValues"` + // TODO: Update the yaml annotation once it's supported. + // ExpandStringPrecision - This will expand the string precision based on the values that come in, if the destination supports it. + ExpandStringPrecision bool `yaml:"_expandStringPrecision"` } type Reporting struct {