Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Redshift] Support truncating values #928

Merged
merged 13 commits into from
Sep 26, 2024
15 changes: 9 additions & 6 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (

const maxRedshiftLength int32 = 65535

func replaceExceededValues(colVal string, colKind typing.KindDetails) string {
structOrString := colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind
if structOrString {
func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string {
if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind {
maxLength := maxRedshiftLength
// If the customer has specified the maximum string precision, let's use that as the max length.
if colKind.OptionalStringPrecision != nil {
Expand All @@ -24,14 +23,18 @@ func replaceExceededValues(colVal string, colKind typing.KindDetails) string {
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)
}

return constants.ExceededValueMarker
if truncateExceededValue {
return colVal[:maxLength]
} else {
return constants.ExceededValueMarker
}
}
}

return colVal
}

func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) {
if colVal == nil {
if colKind == typing.Struct {
// Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null
Expand All @@ -48,5 +51,5 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
}

// Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done.
return replaceExceededValues(colValString, colKind), nil
return replaceExceededValues(colValString, colKind, truncateExceededValue), nil
}
122 changes: 87 additions & 35 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,104 @@ import (

func (r *RedshiftTestSuite) TestReplaceExceededValues() {
{
// Masked, reached the DDL limit
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String))
}
{
// Masked, reached the string precision limit
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
// Irrelevant data type
{
// Integer
assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false))
}
{
// Returns the full value since it's not a struct or string
// This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings.
value := stringutil.Random(int(maxRedshiftLength + 1))
assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false))
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd))
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct))
// Exceeded
{
// String
{
// TruncateExceededValue = false
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false))
}
{
// TruncateExceededValue = false, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false))
}
{
// TruncateExceededValue = true
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true))
}
{
// TruncateExceededValue = true, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true))
}
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test case where a struct value gets truncated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test case where a struct value gets truncated?

They don't get truncated right now because it's unclear how we would actually truncate the struct (e.g. which key do we truncate and still maintain valid JSON?)

}
}
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String))
// Valid
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false))
}
}
}

func (r *RedshiftTestSuite) TestCastColValStaging() {
{
// Masked
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
// Valid
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
// Exceeded
{
// String
{
// TruncateExceededValue = false
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
// TruncateExceededValue = true
value := stringutil.Random(int(maxRedshiftLength) + 1)
value, err := castColValStaging(value, typing.String, true)
assert.NoError(r.T(), err)
assert.Equal(r.T(), value[:maxRedshiftLength], value)
}
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
}
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
// Not exceeded
{
// Valid string
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
}
}
}
2 changes: 1 addition & 1 deletion clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails)
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues)
if castErr != nil {
return "", castErr
}
Expand Down
7 changes: 7 additions & 0 deletions lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type Kafka struct {
DisableTLS bool `yaml:"disableTLS,omitempty"`
}

type SharedDestinationSettings struct {
// TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value`
TruncateExceededValues bool `yaml:"truncateExceededValues"`
}

type Config struct {
Mode Mode `yaml:"mode"`
Output constants.DestinationKind `yaml:"outputSource"`
Expand All @@ -57,6 +62,8 @@ type Config struct {
Redshift *Redshift `yaml:"redshift,omitempty"`
S3 *S3Settings `yaml:"s3,omitempty"`

SharedDestinationSettings SharedDestinationSettings `yaml:"sharedDestinationSettings"`

Reporting struct {
Sentry *Sentry `yaml:"sentry"`
}
Expand Down