Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 26, 2024
1 parent c767dca commit 40a1a72
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 34 deletions.
35 changes: 22 additions & 13 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,63 @@ import (

// maxRedshiftLength is the default upper bound, measured with len (bytes), applied to string and struct values
// when the column does not carry an explicit string precision. It is also the ceiling a string column's
// precision can be raised to.
// NOTE(review): presumably this mirrors Redshift's maximum VARCHAR size - confirm against the Redshift docs.
const maxRedshiftLength int32 = 65535

func canIncreasePrecision(colKind typing.KindDetails) bool {
func canIncreasePrecision(colKind typing.KindDetails, valueLength int32) bool {
if colKind.Kind == typing.String.Kind && colKind.OptionalStringPrecision != nil {
return maxRedshiftLength > *colKind.OptionalStringPrecision
return maxRedshiftLength > *colKind.OptionalStringPrecision && valueLength <= maxRedshiftLength
}

return false
}

func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string {
// replaceExceededValues replaces the value with a marker if it exceeds the maximum length
// Returns the value and boolean indicating whether the column should be increased or not.
func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, increaseStringPrecision bool) (string, bool) {
if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind {
maxLength := maxRedshiftLength
// If the customer has specified the maximum string precision, let's use that as the max length.
if colKind.OptionalStringPrecision != nil {
maxLength = *colKind.OptionalStringPrecision
}

if shouldReplace := int32(len(colVal)) > maxLength; shouldReplace {
colValLength := int32(len(colVal))
if shouldReplace := colValLength > maxLength; shouldReplace {
if colKind.Kind == typing.Struct.Kind {
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), false
}

if increaseStringPrecision && canIncreasePrecision(colKind, colValLength) {
return colVal, true
}

if truncateExceededValue {
return colVal[:maxLength]
return colVal[:maxLength], false
} else {
return constants.ExceededValueMarker
return constants.ExceededValueMarker, false
}
}
}

return colVal
return colVal, false
}

func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) {
func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, increaseStringPrecision bool) (string, bool, error) {
if colVal == nil {
if colKind == typing.Struct {
// Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null
return "", nil
return "", false, nil
}

// This matches the COPY clause for NULL terminator.
return `\N`, nil
return `\N`, false, nil
}

colValString, err := values.ToString(colVal, colKind)
if err != nil {
return "", err
return "", false, err
}

// Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done.
return replaceExceededValues(colValString, colKind, truncateExceededValue), nil

colValue, shouldIncreaseColumn := replaceExceededValues(colValString, colKind, truncateExceededValue, increaseStringPrecision)
return colValue, shouldIncreaseColumn, nil
}
63 changes: 43 additions & 20 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,58 +45,58 @@ func (r *RedshiftTestSuite) TestReplaceExceededValues() {
// Irrelevant data type
{
// Integer
assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false))
assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false, false))

Check failure on line 48 in clients/redshift/cast_test.go

View workflow job for this annotation

GitHub Actions / test

multiple-value replaceExceededValues("123", typing.Integer, false, false) (value of type (string, bool)) in single-value context
}
{
// Returns the full value since it's not a struct or string
// This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings.
value := stringutil.Random(int(maxRedshiftLength + 1))
assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false))
assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false, false))

Check failure on line 54 in clients/redshift/cast_test.go

View workflow job for this annotation

GitHub Actions / test

multiple-value replaceExceededValues(value, typing.Integer, false, false) (value of type (string, bool)) in single-value context
}
}
{
// Exceeded
{
// String
{
// TruncateExceededValue = false
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false))
// TruncateExceededValue = false, IncreaseStringPrecision = false
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false))

Check failure on line 63 in clients/redshift/cast_test.go

View workflow job for this annotation

GitHub Actions / test

multiple-value replaceExceededValues(stringutil.Random(int(maxRedshiftLength) + 1), typing.String, false, false) (value of type (string, bool)) in single-value context
}
{
// TruncateExceededValue = false, string precision specified
// TruncateExceededValue = false, string precision specified, IncreaseStringPrecision = false
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false))
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false, false))

Check failure on line 72 in clients/redshift/cast_test.go

View workflow job for this annotation

GitHub Actions / test

multiple-value replaceExceededValues("hello", stringKd, false, false) (value of type (string, bool)) in single-value context
}
{
// TruncateExceededValue = true
// TruncateExceededValue = true, IncreaseStringPrecision = false
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true))
assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true, false))
}
{
// TruncateExceededValue = true, string precision specified
// TruncateExceededValue = true, string precision specified, IncreaseStringPrecision = false
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true))
assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true, false))
}
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false))
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false))
}
}
{
// Valid
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false))
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false, false))
}
}
}
Expand All @@ -107,22 +107,45 @@ func (r *RedshiftTestSuite) TestCastColValStaging() {
{
// String
{
// TruncateExceededValue = false
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)
// TruncateExceededValue = false, IncreaseStringPrecision = false
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
// TruncateExceededValue = true
// TruncateExceededValue = true, IncreaseStringPrecision = false
value := stringutil.Random(int(maxRedshiftLength) + 1)
value, err := castColValStaging(value, typing.String, true)
value, err := castColValStaging(value, typing.String, true, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), value[:maxRedshiftLength], value)
}
{
// TruncateExceededValue = false, IncreaseStringPrecision = true
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

value, err := castColValStaging("hello", stringKd, false, true)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "hello", value)
}
{
value := stringutil.Random(int(maxRedshiftLength) + 1)
// TruncateExceededValue = true, IncreaseStringPrecision = true
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

value, err := castColValStaging(value, stringKd, true, true)
assert.NoError(r.T(), err)
assert.Equal(r.T(), value[:maxRedshiftLength], value)
}
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
}
Expand All @@ -131,13 +154,13 @@ func (r *RedshiftTestSuite) TestCastColValStaging() {
// Not exceeded
{
// Valid string
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false)
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false)
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
}
Expand Down
4 changes: 4 additions & 0 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap {
}

// Dialect returns this store's SQL dialect typed as sql.Dialect; it simply delegates to dialect().
func (s *Store) Dialect() sql.Dialect {
return s.dialect()
}

// dialect returns the concrete dialect.RedshiftDialect value (a fresh zero-value struct on every call),
// so code inside this package can reach methods on the concrete type rather than going through sql.Dialect.
func (s *Store) dialect() dialect.RedshiftDialect {
return dialect.RedshiftDialect{}
}

Expand Down
11 changes: 10 additions & 1 deletion clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,20 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues)
castedValue, shouldIncreaseColumn, castErr := castColValStaging(
value[col.Name()],
col.KindDetails,
s.config.SharedDestinationSettings.TruncateExceededValues,
s.config.SharedDestinationSettings.IncreaseStringPrecision,
)
if castErr != nil {
return "", castErr
}

if shouldIncreaseColumn {
s.dialect().BuildIncreaseStringPrecisionQuery()

Check failure on line 108 in clients/redshift/staging.go

View workflow job for this annotation

GitHub Actions / test

not enough arguments in call to s.dialect().BuildIncreaseStringPrecisionQuery

Check failure on line 108 in clients/redshift/staging.go

View workflow job for this annotation

GitHub Actions / test

not enough arguments in call to s.dialect().BuildIncreaseStringPrecisionQuery
}

row = append(row, castedValue)
}

Expand Down

0 comments on commit 40a1a72

Please sign in to comment.