Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 25, 2024
1 parent 4617d61 commit 30f94ba
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 43 deletions.
14 changes: 10 additions & 4 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

const maxRedshiftLength int32 = 65535

func replaceExceededValues(colVal string, colKind typing.KindDetails) string {
func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string {
structOrString := colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind
if structOrString {
maxLength := maxRedshiftLength
Expand All @@ -24,14 +24,18 @@ func replaceExceededValues(colVal string, colKind typing.KindDetails) string {
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)
}

return constants.ExceededValueMarker
if truncateExceededValue {
return colVal[:maxLength]
} else {
return constants.ExceededValueMarker
}
}
}

return colVal
}

func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) {
if colVal == nil {
if colKind == typing.Struct {
// Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null
Expand All @@ -47,6 +51,8 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) {
return "", err
}

value := replaceExceededValues(colValString, colKind, truncateExceededValue)

// The check for DDL overflow needs to be done at the end, in case any conversions need to be applied first.
return replaceExceededValues(colValString, colKind), nil
return value, nil
}
113 changes: 76 additions & 37 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,91 @@ import (

func (r *RedshiftTestSuite) TestReplaceExceededValues() {
{
// Masked, reached the DDL limit
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String))
}
{
// Masked, reached the string precision limit
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}
// Exceeded
{
// String
{
// TruncateExceededValue = false
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false))
}
{
// TruncateExceededValue = false, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd))
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct))
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false))
}
{
// TruncateExceededValue = true
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true))
}
{
// TruncateExceededValue = true, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true))
}
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false))
}
}
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String))
// Valid
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false))
}
}
}

func (r *RedshiftTestSuite) TestCastColValStaging() {
{
// Masked
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
// Valid
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
// Exceeded
{
// String
{
// TruncateExceededValue = false
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
// TruncateExceededValue = true
value := stringutil.Random(int(maxRedshiftLength) + 1)
value, err := castColValStaging(value, typing.String, true)
assert.NoError(r.T(), err)
assert.Equal(r.T(), value[:maxRedshiftLength], value)
}
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
}
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
// Not exceeded
{
// Valid string
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
}
}
}
2 changes: 1 addition & 1 deletion clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails)
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues)
if castErr != nil {
return "", castErr
}
Expand Down
2 changes: 1 addition & 1 deletion lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Kafka struct {

type SharedDestinationSettings struct {
// TruncateExceededValues - This will truncate exceeded values instead of replacing them with `__artie_exceeded_value`
TruncateExceededValues bool `json:"truncateExceededValues"`
TruncateExceededValues bool `yaml:"truncateExceededValues"`
}

type Config struct {
Expand Down

0 comments on commit 30f94ba

Please sign in to comment.