Skip to content

Commit

Permalink
Merge branch 'master' into implement-databricks-sweep-volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 8, 2024
2 parents d067637 + 68b5f95 commit dd64ab4
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 83 deletions.
35 changes: 24 additions & 11 deletions clients/redshift/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,61 @@ import (
"github.com/artie-labs/transfer/lib/typing/values"
)

type Result struct {
Value string
// NewLength - If the value exceeded the maximum length, this will be the new length of the value.
// This is only applicable if [expandStringPrecision] is enabled.
NewLength int32
}

const maxRedshiftLength int32 = 65535

func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool) string {
func replaceExceededValues(colVal string, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) Result {
if colKind.Kind == typing.Struct.Kind || colKind.Kind == typing.String.Kind {
maxLength := maxRedshiftLength
// If the customer has specified the maximum string precision, let's use that as the max length.
if colKind.OptionalStringPrecision != nil {
maxLength = *colKind.OptionalStringPrecision
}

if shouldReplace := int32(len(colVal)) > maxLength; shouldReplace {
colValLength := int32(len(colVal))
// If [expandStringPrecision] is enabled and the value is greater than the maximum length, and lte Redshift's max length.
if expandStringPrecision && colValLength > maxLength && colValLength <= maxRedshiftLength {
return Result{Value: colVal, NewLength: colValLength}
}

if shouldReplace := colValLength > maxLength; shouldReplace {
if colKind.Kind == typing.Struct.Kind {
return fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)
return Result{Value: fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker)}
}

if truncateExceededValue {
return colVal[:maxLength]
return Result{Value: colVal[:maxLength]}
} else {
return constants.ExceededValueMarker
return Result{Value: constants.ExceededValueMarker}
}
}
}

return colVal
return Result{Value: colVal}
}

func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool) (string, error) {
func castColValStaging(colVal any, colKind typing.KindDetails, truncateExceededValue bool, expandStringPrecision bool) (Result, error) {
if colVal == nil {
if colKind == typing.Struct {
// Returning empty here because if it's a struct, it will go through JSON PARSE and JSON_PARSE("") = null
return "", nil
return Result{}, nil
}

// This matches the COPY clause for NULL terminator.
return `\N`, nil
return Result{Value: `\N`}, nil
}

colValString, err := values.ToString(colVal, colKind)
if err != nil {
return "", err
return Result{}, err
}

// Checks for DDL overflow needs to be done at the end in case there are any conversions that need to be done.
return replaceExceededValues(colValString, colKind, truncateExceededValue), nil
return replaceExceededValues(colValString, colKind, truncateExceededValue, expandStringPrecision), nil
}
181 changes: 113 additions & 68 deletions clients/redshift/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,114 +3,159 @@ package redshift
import (
"fmt"

"github.com/artie-labs/transfer/lib/stringutil"

"github.com/artie-labs/transfer/lib/config/constants"

"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
)

func (r *RedshiftTestSuite) TestReplaceExceededValues() {
{
// Irrelevant data type
// expandStringPrecision = false
{
// Integer
assert.Equal(r.T(), "123", replaceExceededValues("123", typing.Integer, false))
// Irrelevant data type
{
// Integer
result := replaceExceededValues("123", typing.Integer, false, false)
assert.Equal(r.T(), "123", result.Value)
assert.Zero(r.T(), result.NewLength)
}
{
// Returns the full value since it's not a struct or string
// This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings.
value := stringutil.Random(int(maxRedshiftLength + 1))
result := replaceExceededValues(value, typing.Integer, false, false)
assert.Equal(r.T(), value, result.Value)
assert.Zero(r.T(), result.NewLength)
}
}
{
// Exceeded
{
// String
{
// TruncateExceededValue = false
result := replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false, false)
assert.Equal(r.T(), constants.ExceededValueMarker, result.Value)
assert.Zero(r.T(), result.NewLength)
}
{
// TruncateExceededValue = false, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

result := replaceExceededValues("hello", stringKd, false, false)
assert.Equal(r.T(), constants.ExceededValueMarker, result.Value)
assert.Zero(r.T(), result.NewLength)
}
{
// TruncateExceededValue = true
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
result := replaceExceededValues(superLongString, typing.String, true, false)
assert.Equal(r.T(), superLongString[:maxRedshiftLength], result.Value)
assert.Zero(r.T(), result.NewLength)
}
{
// TruncateExceededValue = true, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

result := replaceExceededValues("hello", stringKd, true, false)
assert.Equal(r.T(), "hel", result.Value)
assert.Zero(r.T(), result.NewLength)
}
}
{
// Struct and masked
result := replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false, false)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), result.Value)
assert.Zero(r.T(), result.NewLength)
}
}
{
// Returns the full value since it's not a struct or string
// This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings.
value := stringutil.Random(int(maxRedshiftLength + 1))
assert.Equal(r.T(), value, replaceExceededValues(value, typing.Integer, false))
// Valid
{
// Not masked
{
result := replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false, false)
assert.Equal(r.T(), `{"foo": "bar"}`, result.Value)
assert.Zero(r.T(), result.NewLength)
}
{
result := replaceExceededValues("hello world", typing.String, false, false)
assert.Equal(r.T(), "hello world", result.Value)
assert.Zero(r.T(), result.NewLength)
}
}
}
}
{
// Exceeded
// expandStringPrecision = true
{
// String
// Irrelevant data type
{
// Integer
result := replaceExceededValues("123", typing.Integer, false, true)
assert.Equal(r.T(), "123", result.Value)
assert.Zero(r.T(), result.NewLength)
}
{
// TruncateExceededValue = false
assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false))
// Returns the full value since it's not a struct or string
// This is invalid and should not happen, but it's here to ensure we're only checking for structs and strings.
value := stringutil.Random(int(maxRedshiftLength + 1))
result := replaceExceededValues(value, typing.Integer, false, true)
assert.Equal(r.T(), value, result.Value)
assert.Zero(r.T(), result.NewLength)
}
}
{
// Exceeded the column string precision but not Redshift's max length
{
// TruncateExceededValue = false, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
}

assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd, false))
}
{
// TruncateExceededValue = true
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
assert.Equal(r.T(), superLongString[:maxRedshiftLength], replaceExceededValues(superLongString, typing.String, true))
result := replaceExceededValues("hello", stringKd, true, true)
assert.Equal(r.T(), "hello", result.Value)
assert.Equal(r.T(), int32(5), result.NewLength)
}
}
{
// Exceeded both column and Redshift precision, so the value got replaced with an exceeded placeholder.
{
// TruncateExceededValue = true, string precision specified
stringKd := typing.KindDetails{
Kind: typing.String.Kind,
OptionalStringPrecision: typing.ToPtr(int32(3)),
OptionalStringPrecision: typing.ToPtr(maxRedshiftLength),
}

assert.Equal(r.T(), "hel", replaceExceededValues("hello", stringKd, true))
superLongString := stringutil.Random(int(maxRedshiftLength) + 1)
result := replaceExceededValues(superLongString, stringKd, false, true)
assert.Equal(r.T(), constants.ExceededValueMarker, result.Value)
assert.Zero(r.T(), result.NewLength)
}
}
{
// Struct and masked
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), replaceExceededValues(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false))
}
}
{
// Valid
{
// Not masked
assert.Equal(r.T(), `{"foo": "bar"}`, replaceExceededValues(`{"foo": "bar"}`, typing.Struct, false))
assert.Equal(r.T(), "hello world", replaceExceededValues("hello world", typing.String, false))
}
}
}

func (r *RedshiftTestSuite) TestCastColValStaging() {
{
// Exceeded
{
// String
{
// TruncateExceededValue = false
value, err := castColValStaging(stringutil.Random(int(maxRedshiftLength)+1), typing.String, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), constants.ExceededValueMarker, value)
}
{
// TruncateExceededValue = true
value := stringutil.Random(int(maxRedshiftLength) + 1)
value, err := castColValStaging(value, typing.String, true)
assert.NoError(r.T(), err)
assert.Equal(r.T(), value[:maxRedshiftLength], value)
}
}
{
// Masked struct
value, err := castColValStaging(fmt.Sprintf(`{"foo": "%s"}`, stringutil.Random(int(maxRedshiftLength)+1)), typing.Struct, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), fmt.Sprintf(`{"key":"%s"}`, constants.ExceededValueMarker), value)
}
}
{
// Not exceeded
// nil
{
// Valid string
value, err := castColValStaging("thisissuperlongbutnotlongenoughtogetmasked", typing.String, false)
// Struct
result, err := castColValStaging(nil, typing.Struct, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), "thisissuperlongbutnotlongenoughtogetmasked", value)
assert.Empty(r.T(), result.Value)
}
{
// Valid struct
value, err := castColValStaging(`{"foo": "bar"}`, typing.Struct, false)
// Not struct
result, err := castColValStaging(nil, typing.String, false, false)
assert.NoError(r.T(), err)
assert.Equal(r.T(), `{"foo": "bar"}`, value)
assert.Equal(r.T(), `\N`, result.Value)
}
}
}
15 changes: 11 additions & 4 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,19 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
castedValue, castErr := castColValStaging(value[col.Name()], col.KindDetails, s.config.SharedDestinationSettings.TruncateExceededValues)
if castErr != nil {
return "", castErr
result, err := castColValStaging(
value[col.Name()],
col.KindDetails,
s.config.SharedDestinationSettings.TruncateExceededValues,
s.config.SharedDestinationSettings.ExpandStringPrecision,
)

if err != nil {
return "", err
}

row = append(row, castedValue)
// TODO: Do something about result.NewLength
row = append(row, result.Value)
}

if err = writer.Write(row); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Kafka struct {
type SharedDestinationSettings struct {
// TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value`
TruncateExceededValues bool `yaml:"truncateExceededValues"`
// TODO: Update the yaml annotation once it's supported.
// ExpandStringPrecision - This will expand the string precision based on the values that come in, if the destination supports it.
ExpandStringPrecision bool `yaml:"_expandStringPrecision"`
}

type Reporting struct {
Expand Down

0 comments on commit dd64ab4

Please sign in to comment.