Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[debezium] Change function signature of EncodeDecimal #761

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions lib/debezium/decimal.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package debezium

import (
"fmt"
"math/big"
"slices"

Expand Down Expand Up @@ -90,24 +89,26 @@ func decimalWithNewExponent(decimal *apd.Decimal, newExponent int32) *apd.Decima
}
}

// EncodeDecimal is used to encode a string representation of a number to `org.apache.kafka.connect.data.Decimal`.
func EncodeDecimal(value string, scale uint16) ([]byte, error) {
decimal, _, err := new(apd.Decimal).SetString(value)
if err != nil {
return nil, fmt.Errorf("unable to use %q as a floating-point number: %w", value, err)
}

targetExponent := -int32(scale) // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
decimal = decimalWithNewExponent(decimal, targetExponent)
}

// EncodeDecimal is used to encode a [apd.Decimal] to [org.apache.kafka.connect.data.Decimal].
// The scale of the value (which is the negated exponent of the decimal) is returned as the second argument.
func EncodeDecimal(decimal *apd.Decimal) ([]byte, int32) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to use int32 for the scale because that's both what apd.Decimal uses as well as the wire representation of org.apache.kafka.connect.data.Decimal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this private?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless you are planning to have reader call EncodeDecimal and EncodeDecimalWithScale depending on whether we know the scale or not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reader is going to call both functions in different places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah your second comment didn't show up for me when I responded!

bigIntValue := decimal.Coeff.MathBigInt()
if decimal.Negative {
bigIntValue.Neg(bigIntValue)
}

return encodeBigInt(bigIntValue), nil
return encodeBigInt(bigIntValue), -decimal.Exponent
}

// EncodeDecimalWithScale is used to encode a [apd.Decimal] to [org.apache.kafka.connect.data.Decimal] using a specific
// scale.
func EncodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
decimal = decimalWithNewExponent(decimal, targetExponent)
}
out, _ := EncodeDecimal(decimal)
return out
}

// DecodeDecimal is used to decode `org.apache.kafka.connect.data.Decimal`.
Expand Down
138 changes: 65 additions & 73 deletions lib/debezium/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,12 @@ func TestDecodeBigInt(t *testing.T) {
}
}

func encodeAndDecodeDecimal(value string, scale uint16) (string, error) {
bytes, err := EncodeDecimal(value, scale)
func mustParseDecimal(value string) *apd.Decimal {
decimal, _, err := apd.NewFromString(value)
if err != nil {
return "", err
panic(fmt.Errorf("unable to use %q as a floating-point number: %w", value, err))
}
return DecodeDecimal(bytes, nil, int(scale)).String(), nil
}

func mustEncodeAndDecodeDecimal(value string, scale uint16) string {
out, err := encodeAndDecodeDecimal(value, scale)
if err != nil {
panic(err)
}
return out
}

func mustParseDecimalFromString(in string) *apd.Decimal {
out, _, err := apd.NewFromString(in)
if err != nil {
panic(err)
}
return out
return decimal
}

func TestDecimalWithNewExponent(t *testing.T) {
Expand All @@ -73,63 +57,86 @@ func TestDecimalWithNewExponent(t *testing.T) {
assert.Equal(t, "0.0", decimalWithNewExponent(apd.New(0, 0), -1).Text('f'))

// Same exponent:
assert.Equal(t, "12.349", decimalWithNewExponent(mustParseDecimalFromString("12.349"), -3).Text('f'))
assert.Equal(t, "12.349", decimalWithNewExponent(mustParseDecimal("12.349"), -3).Text('f'))
// More precise exponent:
assert.Equal(t, "12.3490", decimalWithNewExponent(mustParseDecimalFromString("12.349"), -4).Text('f'))
assert.Equal(t, "12.34900", decimalWithNewExponent(mustParseDecimalFromString("12.349"), -5).Text('f'))
assert.Equal(t, "12.3490", decimalWithNewExponent(mustParseDecimal("12.349"), -4).Text('f'))
assert.Equal(t, "12.34900", decimalWithNewExponent(mustParseDecimal("12.349"), -5).Text('f'))
// Lest precise exponent:
// Extra digits should be truncated rather than rounded.
assert.Equal(t, "12.34", decimalWithNewExponent(mustParseDecimalFromString("12.349"), -2).Text('f'))
assert.Equal(t, "12.3", decimalWithNewExponent(mustParseDecimalFromString("12.349"), -1).Text('f'))
assert.Equal(t, "12", decimalWithNewExponent(mustParseDecimalFromString("12.349"), 0).Text('f'))
assert.Equal(t, "10", decimalWithNewExponent(mustParseDecimalFromString("12.349"), 1).Text('f'))
assert.Equal(t, "12.34", decimalWithNewExponent(mustParseDecimal("12.349"), -2).Text('f'))
assert.Equal(t, "12.3", decimalWithNewExponent(mustParseDecimal("12.349"), -1).Text('f'))
assert.Equal(t, "12", decimalWithNewExponent(mustParseDecimal("12.349"), 0).Text('f'))
assert.Equal(t, "10", decimalWithNewExponent(mustParseDecimal("12.349"), 1).Text('f'))
}

func TestEncodeDecimal(t *testing.T) {
testValue := func(value string, expectedScale int32) {
bytes, scale := EncodeDecimal(mustParseDecimal(value))
result := DecodeDecimal(bytes, nil, int(scale)).String()
assert.Equal(t, result, value, value)
assert.Equal(t, expectedScale, scale, value)
}

testValue("0", 0)
testValue("0.0", 1)
testValue("0.00", 2)
testValue("0.00000", 5)
testValue("1", 0)
testValue("1.0", 1)
testValue("-1", 0)
testValue("-1.0", 1)
testValue("145.183000000000009", 15)
testValue("-145.183000000000009", 15)
}

func TestEncodeDecimalWithScale(t *testing.T) {
mustEncodeAndDecodeDecimalWithScale := func(value string, scale int32) string {
bytes := EncodeDecimalWithScale(mustParseDecimal(value), scale)
return DecodeDecimal(bytes, nil, int(scale)).String()
}

// Whole numbers:
for i := range 100_000 {
strValue := fmt.Sprint(i)
assert.Equal(t, strValue, mustEncodeAndDecodeDecimal(strValue, 0))
assert.Equal(t, strValue, mustEncodeAndDecodeDecimalWithScale(strValue, 0))
if i != 0 {
strValue := "-" + strValue
assert.Equal(t, strValue, mustEncodeAndDecodeDecimal(strValue, 0))
assert.Equal(t, strValue, mustEncodeAndDecodeDecimalWithScale(strValue, 0))
}
}

// Scale of 15 that is equal to the amount of decimal places in the value:
assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 15))
assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 15))
assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimalWithScale("145.183000000000000", 15))
assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimalWithScale("-145.183000000000000", 15))
// If scale is smaller than the amount of decimal places then the extra places should be truncated without rounding:
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000005", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000005", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000001", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000001", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000004", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000004", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimalWithScale("145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimalWithScale("145.183000000000005", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimalWithScale("-145.183000000000005", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimalWithScale("145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimalWithScale("-145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimalWithScale("-145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimalWithScale("145.183000000000001", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimalWithScale("-145.183000000000001", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimalWithScale("145.183000000000004", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimalWithScale("-145.183000000000004", 14))
// If scale is larger than the amount of decimal places then the extra places should be padded with zeros:
assert.Equal(t, "145.1830000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 16))
assert.Equal(t, "-145.1830000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 16))
assert.Equal(t, "145.1830000000000010", mustEncodeAndDecodeDecimal("145.183000000000001", 16))
assert.Equal(t, "-145.1830000000000010", mustEncodeAndDecodeDecimal("-145.183000000000001", 16))
assert.Equal(t, "145.1830000000000040", mustEncodeAndDecodeDecimal("145.183000000000004", 16))
assert.Equal(t, "-145.1830000000000040", mustEncodeAndDecodeDecimal("-145.183000000000004", 16))
assert.Equal(t, "145.1830000000000050", mustEncodeAndDecodeDecimal("145.183000000000005", 16))
assert.Equal(t, "-145.1830000000000050", mustEncodeAndDecodeDecimal("-145.183000000000005", 16))
assert.Equal(t, "145.1830000000000090", mustEncodeAndDecodeDecimal("145.183000000000009", 16))
assert.Equal(t, "-145.1830000000000090", mustEncodeAndDecodeDecimal("-145.183000000000009", 16))

assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))
assert.Equal(t, "145.1830000000000000", mustEncodeAndDecodeDecimalWithScale("145.183000000000000", 16))
assert.Equal(t, "-145.1830000000000000", mustEncodeAndDecodeDecimalWithScale("-145.183000000000000", 16))
assert.Equal(t, "145.1830000000000010", mustEncodeAndDecodeDecimalWithScale("145.183000000000001", 16))
assert.Equal(t, "-145.1830000000000010", mustEncodeAndDecodeDecimalWithScale("-145.183000000000001", 16))
assert.Equal(t, "145.1830000000000040", mustEncodeAndDecodeDecimalWithScale("145.183000000000004", 16))
assert.Equal(t, "-145.1830000000000040", mustEncodeAndDecodeDecimalWithScale("-145.183000000000004", 16))
assert.Equal(t, "145.1830000000000050", mustEncodeAndDecodeDecimalWithScale("145.183000000000005", 16))
assert.Equal(t, "-145.1830000000000050", mustEncodeAndDecodeDecimalWithScale("-145.183000000000005", 16))
assert.Equal(t, "145.1830000000000090", mustEncodeAndDecodeDecimalWithScale("145.183000000000009", 16))
assert.Equal(t, "-145.1830000000000090", mustEncodeAndDecodeDecimalWithScale("-145.183000000000009", 16))

assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimalWithScale("-9063701308.217222135", 9))

testCases := []struct {
name string
value string
scale uint16

expectedErr string
scale int32
}{
{
name: "0 scale",
Expand Down Expand Up @@ -208,25 +215,10 @@ func TestEncodeDecimal(t *testing.T) {
value: "145.183000000000000",
scale: 15,
},
{
name: "malformed - empty string",
value: "",
expectedErr: `unable to use "" as a floating-point number`,
},
{
name: "malformed - not a floating-point",
value: "abcdefg",
expectedErr: `unable to use "abcdefg" as a floating-point number`,
},
}

for _, testCase := range testCases {
actual, err := encodeAndDecodeDecimal(testCase.value, testCase.scale)
if testCase.expectedErr == "" {
assert.NoError(t, err)
assert.Equal(t, testCase.value, actual, testCase.name)
} else {
assert.ErrorContains(t, err, testCase.expectedErr, testCase.name)
}
actual := mustEncodeAndDecodeDecimalWithScale(testCase.value, testCase.scale)
assert.Equal(t, testCase.value, actual, testCase.name)
}
}