diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go index 22b5de1d..0181750e 100644 --- a/lib/debezium/converters/decimal.go +++ b/lib/debezium/converters/decimal.go @@ -2,7 +2,6 @@ package converters import ( "fmt" - "log/slog" "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/debezium/converters" @@ -10,50 +9,17 @@ import ( "github.com/cockroachdb/apd/v3" ) -// decimalWithNewExponent takes a [*apd.Decimal] and returns a new [*apd.Decimal] with a the given exponent. -// If the new exponent is less precise then the extra digits will be truncated. -func decimalWithNewExponent(decimal *apd.Decimal, newExponent int32) *apd.Decimal { - exponentDelta := newExponent - decimal.Exponent // Exponent is negative. - - if exponentDelta == 0 { - return new(apd.Decimal).Set(decimal) - } - - coefficient := new(apd.BigInt).Set(&decimal.Coeff) - - if exponentDelta < 0 { - multiplier := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(-exponentDelta)), nil) - coefficient.Mul(coefficient, multiplier) - } else if exponentDelta > 0 { - divisor := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(exponentDelta)), nil) - coefficient.Div(coefficient, divisor) - } - - return &apd.Decimal{ - Form: decimal.Form, - Negative: decimal.Negative, - Exponent: newExponent, - Coeff: *coefficient, - } -} - // encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal` // using a specific scale. -func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte { +func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) ([]byte, error) { targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative. if decimal.Exponent != targetExponent { - // TODO: We may be able to remove this conversion and just return an error to maintain parity with `org.apache.kafka.connect.data.Decimal` + // Return an error if the scales are different, this maintains parity with `org.apache.kafka.connect.data.Decimal`. // https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L69 - slog.Warn("Value scale is different from schema scale", - slog.Any("value", decimal.Text('f')), - slog.Any("actual", -decimal.Exponent), - slog.Any("expected", scale), - ) - - decimal = decimalWithNewExponent(decimal, targetExponent) + return nil, fmt.Errorf("value scale (%d) is different from schema scale (%d)", -decimal.Exponent, scale) } bytes, _ := converters.EncodeDecimal(decimal) - return bytes + return bytes, nil } type decimalConverter struct { @@ -93,7 +59,7 @@ func (d decimalConverter) Convert(value any) (any, error) { return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err) } - return encodeDecimalWithScale(decimal, int32(d.scale)), nil + return encodeDecimalWithScale(decimal, int32(d.scale)) } type VariableNumericConverter struct{} diff --git a/lib/debezium/converters/decimal_test.go b/lib/debezium/converters/decimal_test.go index adc09f8c..87221c05 100644 --- a/lib/debezium/converters/decimal_test.go +++ b/lib/debezium/converters/decimal_test.go @@ -9,36 +9,22 @@ import ( "github.com/artie-labs/transfer/lib/numbers" "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing/decimal" - "github.com/cockroachdb/apd/v3" "github.com/stretchr/testify/assert" ) -func TestDecimalWithNewExponent(t *testing.T) { - assert.Equal(t, "0", decimalWithNewExponent(apd.New(0, 0), 0).Text('f')) - assert.Equal(t, "00", decimalWithNewExponent(apd.New(0, 1), 1).Text('f')) - assert.Equal(t, "0", decimalWithNewExponent(apd.New(0, 100), 0).Text('f')) - assert.Equal(t, "00", decimalWithNewExponent(apd.New(0, 0), 1).Text('f')) - assert.Equal(t, "0.0", decimalWithNewExponent(apd.New(0, 0), -1).Text('f')) - - // Same exponent: - assert.Equal(t, "12.349", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -3).Text('f')) - // More precise exponent: - assert.Equal(t, "12.3490", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -4).Text('f')) - assert.Equal(t, "12.34900", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -5).Text('f')) - // Lest precise exponent: - // Extra digits should be truncated rather than rounded. - assert.Equal(t, "12.34", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -2).Text('f')) - assert.Equal(t, "12.3", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -1).Text('f')) - assert.Equal(t, "12", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), 0).Text('f')) - assert.Equal(t, "10", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), 1).Text('f')) -} - func TestEncodeDecimalWithScale(t *testing.T) { mustEncodeAndDecodeDecimal := func(value string, scale int32) string { - bytes := encodeDecimalWithScale(numbers.MustParseDecimal(value), scale) + bytes, err := encodeDecimalWithScale(numbers.MustParseDecimal(value), scale) + assert.NoError(t, err) return converters.DecodeDecimal(bytes, scale).String() } + mustReturnError := func(value string, scale int32) error { + _, err := encodeDecimalWithScale(numbers.MustParseDecimal(value), scale) + assert.Error(t, err) + return err + } + // Whole numbers: for i := range 100_000 { strValue := fmt.Sprint(i) @@ -52,28 +38,10 @@ func TestEncodeDecimalWithScale(t *testing.T) { // Scale of 15 that is equal to the amount of decimal places in the value: assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 15)) assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 15)) - // If scale is smaller than the amount of decimal places then the extra places should be truncated without rounding: - assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 14)) - assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000005", 14)) - assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000005", 14)) - assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000009", 14)) - assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000009", 14)) - assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 14)) - assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000001", 14)) - assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000001", 14)) - assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000004", 14)) - assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000004", 14)) - // If scale is larger than the amount of decimal places then the extra places should be padded with zeros: - assert.Equal(t, "145.1830000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 16)) - assert.Equal(t, "-145.1830000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 16)) - assert.Equal(t, "145.1830000000000010", mustEncodeAndDecodeDecimal("145.183000000000001", 16)) - assert.Equal(t, "-145.1830000000000010", mustEncodeAndDecodeDecimal("-145.183000000000001", 16)) - assert.Equal(t, "145.1830000000000040", mustEncodeAndDecodeDecimal("145.183000000000004", 16)) - assert.Equal(t, "-145.1830000000000040", mustEncodeAndDecodeDecimal("-145.183000000000004", 16)) - assert.Equal(t, "145.1830000000000050", mustEncodeAndDecodeDecimal("145.183000000000005", 16)) - assert.Equal(t, "-145.1830000000000050", mustEncodeAndDecodeDecimal("-145.183000000000005", 16)) - assert.Equal(t, "145.1830000000000090", mustEncodeAndDecodeDecimal("145.183000000000009", 16)) - assert.Equal(t, "-145.1830000000000090", mustEncodeAndDecodeDecimal("-145.183000000000009", 16)) + // If scale is smaller than the amount of decimal places then an error should be returned: + assert.ErrorContains(t, mustReturnError("145.183000000000000", 14), "value scale (15) is different from schema scale (14)") + // If scale is larger than the amount of decimal places then an error should be returned: + assert.ErrorContains(t, mustReturnError("-145.183000000000005", 16), "value scale (15) is different from schema scale (16)") assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9)) assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8)) diff --git a/lib/debezium/converters/money.go b/lib/debezium/converters/money.go index c1205062..2c64a620 100644 --- a/lib/debezium/converters/money.go +++ b/lib/debezium/converters/money.go @@ -56,5 +56,5 @@ func (m MoneyConverter) Convert(value any) (any, error) { return nil, fmt.Errorf(`unable to use %q as a money value: %w`, valString, err) } - return encodeDecimalWithScale(decimal, int32(m.Scale())), nil + return encodeDecimalWithScale(decimal, int32(m.Scale())) } diff --git a/lib/debezium/converters/money_test.go b/lib/debezium/converters/money_test.go index 6c412525..992c5c3f 100644 --- a/lib/debezium/converters/money_test.go +++ b/lib/debezium/converters/money_test.go @@ -70,11 +70,9 @@ func TestMoneyConverter_Convert(t *testing.T) { assert.Equal(t, "1234.56", decodeValue(converted)) } { - // string with $, comma, and no cents - converted, err := converter.Convert("$1000,234") - assert.NoError(t, err) - assert.Equal(t, []byte{0x5, 0xf6, 0x3c, 0x68}, converted) - assert.Equal(t, "1000234.00", decodeValue(converted)) + // string with missing cents + _, err := converter.Convert("$1000,234") + assert.ErrorContains(t, err, "value scale (0) is different from schema scale (2)") } { // Malformed string - empty string.