diff --git a/go.mod b/go.mod index 5aed3f408..aebcdd959 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22 require ( github.com/DataDog/datadog-go/v5 v5.5.0 - github.com/artie-labs/transfer v1.25.28 + github.com/artie-labs/transfer v1.25.29 github.com/aws/aws-sdk-go v1.44.327 github.com/aws/aws-sdk-go-v2 v1.18.1 github.com/aws/aws-sdk-go-v2/config v1.18.19 @@ -61,6 +61,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.6 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.18.7 // indirect github.com/aws/smithy-go v1.13.5 // indirect + github.com/cockroachdb/apd/v3 v3.2.1 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect diff --git a/go.sum b/go.sum index 75fa6ecc2..e10aa61c7 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,8 @@ github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo= github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= github.com/artie-labs/transfer v1.25.28 h1:SOezpfhKLKzR1SMHLW3MLJ60X5yNk8Bk5d85Dw9dpow= github.com/artie-labs/transfer v1.25.28/go.mod h1:PxZjjW1+OnZDgRRJwVXUoiGY2iPsLnY2TUMrdcY3zfg= +github.com/artie-labs/transfer v1.25.29 h1:afMF9YrRRidCs4QYh+fbrD+i7s6qnUUVd1E4ZGxT01E= +github.com/artie-labs/transfer v1.25.29/go.mod h1:pMn7/nkM2gQVw4rgjNIWIGUKtCkCO7yS6Y1IVDgrS3k= github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY= github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= @@ -167,6 +169,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cockroachdb/apd/v3 v3.2.1 h1:U+8j7t0axsIgvQUqthuNm82HIrYXodOV2iWLWtEaIwg= +github.com/cockroachdb/apd/v3 v3.2.1/go.mod h1:klXJcjp+FffLTHlhIG69tezTDvdP065naDsHzKhYSqc= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go index a03b781f8..a87469792 100644 --- a/lib/debezium/converters/decimal.go +++ b/lib/debezium/converters/decimal.go @@ -1,84 +1,77 @@ package converters import ( - "fmt" - "strings" + "fmt" - "github.com/artie-labs/transfer/lib/debezium" + "github.com/cockroachdb/apd/v3" + + "github.com/artie-labs/transfer/lib/debezium" ) type decimalConverter struct { - scale uint16 - precision *int + scale uint16 + precision *int } func NewDecimalConverter(scale uint16, precision *int) decimalConverter { - return decimalConverter{scale: scale, precision: precision} + return decimalConverter{scale: scale, precision: precision} } func (d decimalConverter) ToField(name string) debezium.Field { - field := debezium.Field{ - FieldName: name, - Type: debezium.Bytes, - DebeziumType: debezium.KafkaDecimalType, - Parameters: map[string]any{ - "scale": fmt.Sprint(d.scale), - }, - } - - if d.precision != nil { - field.Parameters[debezium.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision) - } - - return field + field := debezium.Field{ + FieldName: name, + Type: debezium.Bytes, + DebeziumType: debezium.KafkaDecimalType, + Parameters: map[string]any{ + "scale": fmt.Sprint(d.scale), + }, + } + + if d.precision != nil { + field.Parameters[debezium.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision) + } + + return field } func (d decimalConverter) Convert(value any) (any, error) { - castValue, isOk := value.(string) - if !isOk { - return nil, fmt.Errorf("expected string got %T with value: %v", value, value) - } - return debezium.EncodeDecimal(castValue, d.scale) -} - -func getScale(value string) uint16 { - // Find the index of the decimal point - i := strings.IndexRune(value, '.') + stringValue, isOk := value.(string) + if !isOk { + return nil, fmt.Errorf("expected string got %T with value: %v", value, value) + } - if i == -1 { - // No decimal point: scale is 0 - return 0 - } + decimal, _, err := apd.NewFromString(stringValue) + if err != nil { + return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err) + } - // The scale is the number of digits after the decimal point - return uint16(len(value[i+1:])) + return debezium.EncodeDecimalWithScale(decimal, int32(d.scale)), nil } type VariableNumericConverter struct{} func (VariableNumericConverter) ToField(name string) debezium.Field { - return debezium.Field{ - FieldName: name, - Type: debezium.Struct, - DebeziumType: debezium.KafkaVariableNumericType, - } + return debezium.Field{ + FieldName: name, + Type: debezium.Struct, + DebeziumType: debezium.KafkaVariableNumericType, + } } func (VariableNumericConverter) Convert(value any) (any, error) { - stringValue, ok := value.(string) - if !ok { - return nil, fmt.Errorf("expected string got %T with value: %v", value, value) - } - - scale := getScale(stringValue) - - bytes, err := debezium.EncodeDecimal(stringValue, scale) - if err != nil { - return nil, err - } - - return map[string]any{ - "scale": int32(scale), - "value": bytes, - }, nil + stringValue, ok := value.(string) + if !ok { + return nil, fmt.Errorf("expected string got %T with value: %v", value, value) + } + + decimal, _, err := apd.NewFromString(stringValue) + if err != nil { + return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err) + } + + bytes, scale := debezium.EncodeDecimal(decimal) + return map[string]any{ + "scale": scale, + "value": bytes, + }, nil } diff --git a/lib/debezium/converters/decimal_test.go b/lib/debezium/converters/decimal_test.go index 047ff8612..023b4f5e7 100644 --- a/lib/debezium/converters/decimal_test.go +++ b/lib/debezium/converters/decimal_test.go @@ -1,133 +1,102 @@ package converters import ( - "fmt" - "testing" + "fmt" + "testing" - "github.com/artie-labs/transfer/lib/debezium" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/debezium" + "github.com/artie-labs/transfer/lib/ptr" + "github.com/stretchr/testify/assert" ) func TestDecimalConverter_ToField(t *testing.T) { - { - // Without precision - converter := NewDecimalConverter(2, nil) - expected := debezium.Field{ - Type: "bytes", - FieldName: "col", - DebeziumType: "org.apache.kafka.connect.data.Decimal", - Parameters: map[string]any{ - "scale": "2", - }, - } - assert.Equal(t, expected, converter.ToField("col")) - } - { - // With precision - converter := NewDecimalConverter(2, ptr.ToInt(3)) - expected := debezium.Field{ - Type: "bytes", - FieldName: "col", - DebeziumType: "org.apache.kafka.connect.data.Decimal", - Parameters: map[string]any{ - "connect.decimal.precision": "3", - "scale": "2", - }, - } - assert.Equal(t, expected, converter.ToField("col")) - } + { + // Without precision + converter := NewDecimalConverter(2, nil) + expected := debezium.Field{ + Type: "bytes", + FieldName: "col", + DebeziumType: "org.apache.kafka.connect.data.Decimal", + Parameters: map[string]any{ + "scale": "2", + }, + } + assert.Equal(t, expected, converter.ToField("col")) + } + { + // With precision + converter := NewDecimalConverter(2, ptr.ToInt(3)) + expected := debezium.Field{ + Type: "bytes", + FieldName: "col", + DebeziumType: "org.apache.kafka.connect.data.Decimal", + Parameters: map[string]any{ + "connect.decimal.precision": "3", + "scale": "2", + }, + } + assert.Equal(t, expected, converter.ToField("col")) + } } func TestDecimalConverter_Convert(t *testing.T) { - converter := NewDecimalConverter(2, nil) - { - // Malformed value - empty string. - _, err := converter.Convert("") - assert.ErrorContains(t, err, `unable to use "" as a floating-point number`) - } - { - // Malformed value - not a floating-point. - _, err := converter.Convert("11qwerty00") - assert.ErrorContains(t, err, `unable to use "11qwerty00" as a floating-point number`) - } - { - // Happy path. - converted, err := converter.Convert("1.23") - assert.NoError(t, err) - bytes, ok := converted.([]byte) - assert.True(t, ok) - actualValue, err := converter.ToField("").DecodeDecimal(bytes) - assert.NoError(t, err) - assert.Equal(t, "1.23", fmt.Sprint(actualValue)) - } -} - -func TestGetScale(t *testing.T) { - type _testCase struct { - name string - value string - expectedScale uint16 - } - - testCases := []_testCase{ - { - name: "0 scale", - value: "5", - expectedScale: 0, - }, - { - name: "2 scale", - value: "9.99", - expectedScale: 2, - }, - { - name: "5 scale", - value: "9.12345", - expectedScale: 5, - }, - } - - for _, testCase := range testCases { - actualScale := getScale(testCase.value) - assert.Equal(t, testCase.expectedScale, actualScale, testCase.name) - } + converter := NewDecimalConverter(2, nil) + { + // Malformed value - empty string. + _, err := converter.Convert("") + assert.ErrorContains(t, err, `unable to use "" as a decimal: parse mantissa:`) + } + { + // Malformed value - not a floating-point. + _, err := converter.Convert("11qwerty00") + assert.ErrorContains(t, err, `unable to use "11qwerty00" as a decimal: parse exponent:`) + } + { + // Happy path. + converted, err := converter.Convert("1.23") + assert.NoError(t, err) + bytes, ok := converted.([]byte) + assert.True(t, ok) + actualValue, err := converter.ToField("").DecodeDecimal(bytes) + assert.NoError(t, err) + assert.Equal(t, "1.23", fmt.Sprint(actualValue)) + } } func TestVariableNumericConverter_ToField(t *testing.T) { - converter := VariableNumericConverter{} - expected := debezium.Field{ - FieldName: "col", - Type: "struct", - DebeziumType: "io.debezium.data.VariableScaleDecimal", - } - assert.Equal(t, expected, converter.ToField("col")) + converter := VariableNumericConverter{} + expected := debezium.Field{ + FieldName: "col", + Type: "struct", + DebeziumType: "io.debezium.data.VariableScaleDecimal", + } + assert.Equal(t, expected, converter.ToField("col")) } func TestVariableNumericConverter_Convert(t *testing.T) { - converter := VariableNumericConverter{} - { - // Wrong type - _, err := converter.Convert(1234) - assert.ErrorContains(t, err, "expected string got int with value: 1234") - } - { - // Malformed value - empty string. - _, err := converter.Convert("") - assert.ErrorContains(t, err, `unable to use "" as a floating-point number`) - } - { - // Malformed value - not a floating point. - _, err := converter.Convert("malformed") - assert.ErrorContains(t, err, `unable to use "malformed" as a floating-point number`) - } - { - // Happy path - converted, err := converter.Convert("12.34") - assert.NoError(t, err) - assert.Equal(t, map[string]any{"scale": int32(2), "value": []byte{0x4, 0xd2}}, converted) - actualValue, err := converter.ToField("").DecodeDebeziumVariableDecimal(converted) - assert.NoError(t, err) - assert.Equal(t, "12.34", actualValue.String()) - } + converter := VariableNumericConverter{} + { + // Wrong type + _, err := converter.Convert(1234) + assert.ErrorContains(t, err, "expected string got int with value: 1234") + } + { + // Malformed value - empty string. + _, err := converter.Convert("") + assert.ErrorContains(t, err, `unable to use "" as a decimal: parse mantissa:`) + } + { + // Malformed value - not a floating point. + _, err := converter.Convert("malformed") + assert.ErrorContains(t, err, `unable to use "malformed" as a decimal: parse exponent`) + } + { + // Happy path + converted, err := converter.Convert("12.34") + assert.NoError(t, err) + assert.Equal(t, map[string]any{"scale": int32(2), "value": []byte{0x4, 0xd2}}, converted) + actualValue, err := converter.ToField("").DecodeDebeziumVariableDecimal(converted) + assert.NoError(t, err) + assert.Equal(t, "12.34", actualValue.String()) + } } diff --git a/lib/debezium/converters/money.go b/lib/debezium/converters/money.go index 007e8dcd0..d634010d8 100644 --- a/lib/debezium/converters/money.go +++ b/lib/debezium/converters/money.go @@ -1,53 +1,60 @@ package converters import ( - "fmt" - "github.com/artie-labs/transfer/lib/debezium" - "strings" + "fmt" + "strings" + + "github.com/artie-labs/transfer/lib/debezium" + "github.com/cockroachdb/apd/v3" ) const defaultScale = uint16(2) type MoneyConverter struct { - // All of these configs are optional + // All of these configs are optional - StripCommas bool - CurrencySymbol string - ScaleOverride *uint16 + StripCommas bool + CurrencySymbol string + ScaleOverride *uint16 } func (m MoneyConverter) Scale() uint16 { - if m.ScaleOverride != nil { - return *m.ScaleOverride - } + if m.ScaleOverride != nil { + return *m.ScaleOverride + } - return defaultScale + return defaultScale } func (m MoneyConverter) ToField(name string) debezium.Field { - return debezium.Field{ - FieldName: name, - Type: debezium.Bytes, - DebeziumType: debezium.KafkaDecimalType, - Parameters: map[string]any{ - "scale": fmt.Sprint(m.Scale()), - }, - } + return debezium.Field{ + FieldName: name, + Type: debezium.Bytes, + DebeziumType: debezium.KafkaDecimalType, + Parameters: map[string]any{ + "scale": fmt.Sprint(m.Scale()), + }, + } } func (m MoneyConverter) Convert(value any) (any, error) { - valString, isOk := value.(string) - if !isOk { - return nil, fmt.Errorf("expected string got %T with value: %v", value, value) - } + valString, isOk := value.(string) + if !isOk { + return nil, fmt.Errorf("expected string got %T with value: %v", value, value) + } + + if m.CurrencySymbol != "" { + valString = strings.Trim(valString, m.CurrencySymbol) + } - if m.CurrencySymbol != "" { - valString = strings.Trim(valString, m.CurrencySymbol) - } + if m.StripCommas { + valString = strings.ReplaceAll(valString, ",", "") + } - if m.StripCommas { - valString = strings.ReplaceAll(valString, ",", "") - } + decimal, _, err := apd.NewFromString(valString) + if err != nil { + return nil, fmt.Errorf(`unable to use %q as a money value: %w`, valString, err) + } - return debezium.EncodeDecimal(valString, m.Scale()) + return debezium.EncodeDecimalWithScale(decimal, int32(m.Scale())), nil } diff --git a/lib/debezium/converters/money_test.go b/lib/debezium/converters/money_test.go index 1ea7b01a6..6e6d6e6cd 100644 --- a/lib/debezium/converters/money_test.go +++ b/lib/debezium/converters/money_test.go @@ -1,97 +1,98 @@ package converters import ( - "github.com/artie-labs/reader/lib/ptr" - transferDbz "github.com/artie-labs/transfer/lib/debezium" - "github.com/stretchr/testify/assert" - "testing" + "testing" + + "github.com/artie-labs/reader/lib/ptr" + transferDbz "github.com/artie-labs/transfer/lib/debezium" + "github.com/stretchr/testify/assert" ) func TestMoney_Scale(t *testing.T) { - { - // Not specified - converter := MoneyConverter{} - assert.Equal(t, defaultScale, converter.Scale()) - } - { - // Specified - converter := MoneyConverter{ - ScaleOverride: ptr.ToUint16(3), - } - assert.Equal(t, uint16(3), converter.Scale()) - } + { + // Not specified + converter := MoneyConverter{} + assert.Equal(t, defaultScale, converter.Scale()) + } + { + // Specified + converter := MoneyConverter{ + ScaleOverride: ptr.ToUint16(3), + } + assert.Equal(t, uint16(3), converter.Scale()) + } } func TestMoneyConverter_ToField(t *testing.T) { - converter := MoneyConverter{} - expected := transferDbz.Field{ - FieldName: "col", - Type: "bytes", - DebeziumType: "org.apache.kafka.connect.data.Decimal", - Parameters: map[string]any{ - "scale": "2", - }, - } - assert.Equal(t, expected, converter.ToField("col")) + converter := MoneyConverter{} + expected := transferDbz.Field{ + FieldName: "col", + Type: "bytes", + DebeziumType: "org.apache.kafka.connect.data.Decimal", + Parameters: map[string]any{ + "scale": "2", + }, + } + assert.Equal(t, expected, converter.ToField("col")) } func TestMoneyConverter_Convert(t *testing.T) { - decimalField := NewDecimalConverter(defaultScale, nil).ToField("") - decodeValue := func(value any) string { - bytes, ok := value.([]byte) - assert.True(t, ok) - val, err := decimalField.DecodeDecimal(bytes) - assert.NoError(t, err) - return val.String() - } - { - // Converter where mutateString is true - converter := MoneyConverter{ - StripCommas: true, - CurrencySymbol: "$", - } - { - // string - converted, err := converter.Convert("1234.56") - assert.NoError(t, err) - assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted) - assert.Equal(t, "1234.56", decodeValue(converted)) - } - { - // string with $ and comma - converted, err := converter.Convert("$1,234.56") - assert.NoError(t, err) - assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted) - assert.Equal(t, "1234.56", decodeValue(converted)) - } - { - // string with $, comma, and no cents - converted, err := converter.Convert("$1000,234") - assert.NoError(t, err) - assert.Equal(t, []byte{0x5, 0xf6, 0x3c, 0x68}, converted) - assert.Equal(t, "1000234.00", decodeValue(converted)) - } - { - // Malformed string - empty string. - _, err := converter.Convert("") - assert.ErrorContains(t, err, `unable to use "" as a floating-point number`) - } - { - // Malformed string - not a floating-point. - _, err := converter.Convert("malformed") - assert.ErrorContains(t, err, `unable to use "malformed" as a floating-point number`) - } - } - { - // Converter where mutateString is false - converter := MoneyConverter{} - { - // int - converted, err := converter.Convert("1234.56") - assert.NoError(t, err) - assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted) - assert.Equal(t, "1234.56", decodeValue(converted)) - } - } + decimalField := NewDecimalConverter(defaultScale, nil).ToField("") + decodeValue := func(value any) string { + bytes, ok := value.([]byte) + assert.True(t, ok) + val, err := decimalField.DecodeDecimal(bytes) + assert.NoError(t, err) + return val.String() + } + { + // Converter where mutateString is true + converter := MoneyConverter{ + StripCommas: true, + CurrencySymbol: "$", + } + { + // string + converted, err := converter.Convert("1234.56") + assert.NoError(t, err) + assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted) + assert.Equal(t, "1234.56", decodeValue(converted)) + } + { + // string with $ and comma + converted, err := converter.Convert("$1,234.56") + assert.NoError(t, err) + assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted) + assert.Equal(t, "1234.56", decodeValue(converted)) + } + { + // string with $, comma, and no cents + converted, err := converter.Convert("$1000,234") + assert.NoError(t, err) + assert.Equal(t, []byte{0x5, 0xf6, 0x3c, 0x68}, converted) + assert.Equal(t, "1000234.00", decodeValue(converted)) + } + { + // Malformed string - empty string. + _, err := converter.Convert("") + assert.ErrorContains(t, err, `unable to use "" as a money value: parse mantissa:`) + } + { + // Malformed string - not a floating-point. + _, err := converter.Convert("malformed") + assert.ErrorContains(t, err, `unable to use "malformed" as a money value: parse exponent`) + } + } + { + // Converter where mutateString is false + converter := MoneyConverter{} + { + // int + converted, err := converter.Convert("1234.56") + assert.NoError(t, err) + assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted) + assert.Equal(t, "1234.56", decodeValue(converted)) + } + } }