Skip to content

Commit

Permalink
Return error if value scale is different from schema scale
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Sep 10, 2024
1 parent f033a07 commit 2d84d43
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 84 deletions.
44 changes: 5 additions & 39 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,24 @@ package converters

import (
"fmt"
"log/slog"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/debezium/converters"
"github.com/artie-labs/transfer/lib/typing"
"github.com/cockroachdb/apd/v3"
)

// decimalWithNewExponent takes a [*apd.Decimal] and returns a new [*apd.Decimal] with a the given exponent.
// If the new exponent is less precise then the extra digits will be truncated.
func decimalWithNewExponent(decimal *apd.Decimal, newExponent int32) *apd.Decimal {
exponentDelta := newExponent - decimal.Exponent // Exponent is negative.

if exponentDelta == 0 {
return new(apd.Decimal).Set(decimal)
}

coefficient := new(apd.BigInt).Set(&decimal.Coeff)

if exponentDelta < 0 {
multiplier := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(-exponentDelta)), nil)
coefficient.Mul(coefficient, multiplier)
} else if exponentDelta > 0 {
divisor := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(exponentDelta)), nil)
coefficient.Div(coefficient, divisor)
}

return &apd.Decimal{
Form: decimal.Form,
Negative: decimal.Negative,
Exponent: newExponent,
Coeff: *coefficient,
}
}

// encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`
// using a specific scale.
func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) ([]byte, error) {
targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
// TODO: We may be able to remove this conversion and just return an error to maintain parity with `org.apache.kafka.connect.data.Decimal`
// Return an error if the scales are different, this maintains parity with `org.apache.kafka.connect.data.Decimal`.
// https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L69
slog.Warn("Value scale is different from schema scale",
slog.Any("value", decimal.Text('f')),
slog.Any("actual", -decimal.Exponent),
slog.Any("expected", scale),
)

decimal = decimalWithNewExponent(decimal, targetExponent)
return nil, fmt.Errorf("value scale (%d) is different from schema scale (%d)", -decimal.Exponent, scale)
}
bytes, _ := converters.EncodeDecimal(decimal)
return bytes
return bytes, nil
}

type decimalConverter struct {
Expand Down Expand Up @@ -93,7 +59,7 @@ func (d decimalConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

return encodeDecimalWithScale(decimal, int32(d.scale)), nil
return encodeDecimalWithScale(decimal, int32(d.scale))
}

type VariableNumericConverter struct{}
Expand Down
56 changes: 12 additions & 44 deletions lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,22 @@ import (
"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/cockroachdb/apd/v3"
"github.com/stretchr/testify/assert"
)

func TestDecimalWithNewExponent(t *testing.T) {
assert.Equal(t, "0", decimalWithNewExponent(apd.New(0, 0), 0).Text('f'))
assert.Equal(t, "00", decimalWithNewExponent(apd.New(0, 1), 1).Text('f'))
assert.Equal(t, "0", decimalWithNewExponent(apd.New(0, 100), 0).Text('f'))
assert.Equal(t, "00", decimalWithNewExponent(apd.New(0, 0), 1).Text('f'))
assert.Equal(t, "0.0", decimalWithNewExponent(apd.New(0, 0), -1).Text('f'))

// Same exponent:
assert.Equal(t, "12.349", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -3).Text('f'))
// More precise exponent:
assert.Equal(t, "12.3490", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -4).Text('f'))
assert.Equal(t, "12.34900", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -5).Text('f'))
// Lest precise exponent:
// Extra digits should be truncated rather than rounded.
assert.Equal(t, "12.34", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -2).Text('f'))
assert.Equal(t, "12.3", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -1).Text('f'))
assert.Equal(t, "12", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), 0).Text('f'))
assert.Equal(t, "10", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), 1).Text('f'))
}

func TestEncodeDecimalWithScale(t *testing.T) {
mustEncodeAndDecodeDecimal := func(value string, scale int32) string {
bytes := encodeDecimalWithScale(numbers.MustParseDecimal(value), scale)
bytes, err := encodeDecimalWithScale(numbers.MustParseDecimal(value), scale)
assert.NoError(t, err)
return converters.DecodeDecimal(bytes, scale).String()
}

mustReturnError := func(value string, scale int32) error {
_, err := encodeDecimalWithScale(numbers.MustParseDecimal(value), scale)
assert.Error(t, err)
return err
}

// Whole numbers:
for i := range 100_000 {
strValue := fmt.Sprint(i)
Expand All @@ -52,28 +38,10 @@ func TestEncodeDecimalWithScale(t *testing.T) {
// Scale of 15 that is equal to the amount of decimal places in the value:
assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 15))
assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 15))
// If scale is smaller than the amount of decimal places then the extra places should be truncated without rounding:
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000005", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000005", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000001", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000001", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000004", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000004", 14))
// If scale is larger than the amount of decimal places then the extra places should be padded with zeros:
assert.Equal(t, "145.1830000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 16))
assert.Equal(t, "-145.1830000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 16))
assert.Equal(t, "145.1830000000000010", mustEncodeAndDecodeDecimal("145.183000000000001", 16))
assert.Equal(t, "-145.1830000000000010", mustEncodeAndDecodeDecimal("-145.183000000000001", 16))
assert.Equal(t, "145.1830000000000040", mustEncodeAndDecodeDecimal("145.183000000000004", 16))
assert.Equal(t, "-145.1830000000000040", mustEncodeAndDecodeDecimal("-145.183000000000004", 16))
assert.Equal(t, "145.1830000000000050", mustEncodeAndDecodeDecimal("145.183000000000005", 16))
assert.Equal(t, "-145.1830000000000050", mustEncodeAndDecodeDecimal("-145.183000000000005", 16))
assert.Equal(t, "145.1830000000000090", mustEncodeAndDecodeDecimal("145.183000000000009", 16))
assert.Equal(t, "-145.1830000000000090", mustEncodeAndDecodeDecimal("-145.183000000000009", 16))
// If scale is smaller than the amount of decimal places then an error should be returned:
assert.ErrorContains(t, mustReturnError("145.183000000000000", 14), "value scale (15) is different from schema scale (14)")
// If scale is larger than the amount of decimal places then an error should be returned:
assert.ErrorContains(t, mustReturnError("-145.183000000000005", 16), "value scale (15) is different from schema scale (16)")

assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))
assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8))
Expand Down
2 changes: 1 addition & 1 deletion lib/debezium/converters/money.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ func (m MoneyConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a money value: %w`, valString, err)
}

return encodeDecimalWithScale(decimal, int32(m.Scale())), nil
return encodeDecimalWithScale(decimal, int32(m.Scale()))
}

0 comments on commit 2d84d43

Please sign in to comment.