Skip to content

Commit

Permalink
[Redshift] Support truncating values (#928)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 26, 2024
1 parent 12da05a commit fb58dbc
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 42 deletions.
15 changes: 9 additions & 6 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (

const maxRedshiftLength int32 = 65535

func replaceExceededValues(colVal string, colKind typing.KindDetails) string {
structOrString := colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind
if structOrString {
func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string {
if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind {
maxLength := maxRedshiftLength
// If the customer has specified the maximum string precision, let's use that as the max length.
if colKind.OptionalStringPrecision != nil {
Expand All @@ -24,14 +23,18 @@ func replaceExceededValues(colVal string, colKind typing.KindDetails) string {
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)
}

return constants.ExceededValueMarker
if truncateExceededValue {
return colVal[:maxLength]
} else {
return constants.ExceededValueMarker
}
}
}

return colVal
}

func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) {
if colVal == nil {
if colKind == typing.Struct {
// Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null
Expand All @@ -48,5 +51,5 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
}

// The DDL overflow check needs to be done at the end, in case there are any conversions that need to be done first.
return replaceExceededValues(colValString, colKind), nil
return replaceExceededValues(colValString, colKind, truncateExceededValue), nil
}
122 changes: 87 additions & 35 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,104 @@ import (

func (r *RedshiftTestSuite) TestReplaceExceededValues() {
{
// Masked, reached the DDL limit
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String))
}
{
// Masked, reached the string precision limit
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
// Irrelevant data type
{
// Integer
assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false))
}
{
// Returns the full value since it's not a struct or string
// This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings.
value := stringutil.Random(int(maxRedshiftLength + 1))
assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false))
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd))
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct))
// Exceeded
{
// String
{
// TruncateExceededValue = false
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false))
}
{
// TruncateExceededValue = false, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false))
}
{
// TruncateExceededValue = true
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true))
}
{
// TruncateExceededValue = true, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true))
}
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false))
}
}
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String))
// Valid
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false))
}
}
}

func (r *RedshiftTestSuite) TestCastColValStaging() {
{
// Masked
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
// Valid
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
// Exceeded
{
// String
{
// TruncateExceededValue = false
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
	// TruncateExceededValue = true
	// Keep the untruncated input in its own variable. Reusing `value` for both the
	// input and the result would make the assertion compare the result against a
	// prefix of itself, which passes regardless of what castColValStaging returns.
	original := stringutil.Random(int(maxRedshiftLength) + 1)
	value, err := castColValStaging(original, typing.String, true)
	assert.NoError(r.T(), err)
	assert.Equal(r.T(), original[:maxRedshiftLength], value)
}
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
}
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
// Not exceeded
{
// Valid string
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
}
}
}
2 changes: 1 addition & 1 deletion clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails)
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues)
if castErr != nil {
return "", castErr
}
Expand Down
7 changes: 7 additions & 0 deletions lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type Kafka struct {
DisableTLS bool `yaml:"disableTLS,omitempty"`
}

// SharedDestinationSettings groups destination-related settings that are exposed on Config
// under the `sharedDestinationSettings` YAML key.
type SharedDestinationSettings struct {
// TruncateExceededValues - When true, exceeded values are truncated instead of being replaced with `__artie_exceeded_value`.
// The zero value (false) keeps the replacement behavior.
// NOTE(review): in this change only the Redshift staging path reads this flag - confirm before relying on it for other destinations.
TruncateExceededValues bool `yaml:"truncateExceededValues"`
}

type Config struct {
Mode Mode `yaml:"mode"`
Output constants.DestinationKind `yaml:"outputSource"`
Expand All @@ -57,6 +62,8 @@ type Config struct {
Redshift *Redshift `yaml:"redshift,omitempty"`
S3 *S3Settings `yaml:"s3,omitempty"`

SharedDestinationSettings SharedDestinationSettings `yaml:"sharedDestinationSettings"`

Reporting struct {
Sentry *Sentry `yaml:"sentry"`
}
Expand Down

0 comments on commit fb58dbc

Please sign in to comment.