Skip to content

Commit

Permalink
Merge branch 'master' into dana/soft-delete-behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
danafallon authored Jun 25, 2024
2 parents ab8aabe + d317519 commit a88793b
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 49 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.18.19
github.com/aws/aws-sdk-go-v2/credentials v1.13.18
github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0
github.com/cockroachdb/apd/v3 v3.2.1
github.com/getsentry/sentry-go v0.27.0
github.com/google/uuid v1.6.0
github.com/jessevdk/go-flags v1.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/apd/v3 v3.2.1 h1:U+8j7t0axsIgvQUqthuNm82HIrYXodOV2iWLWtEaIwg=
github.com/cockroachdb/apd/v3 v3.2.1/go.mod h1:klXJcjp+FffLTHlhIG69tezTDvdP065naDsHzKhYSqc=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
Expand Down
61 changes: 42 additions & 19 deletions lib/debezium/decimal.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package debezium

import (
"fmt"
"math/big"
"slices"

"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/cockroachdb/apd/v3"
)

// encodeBigInt encodes a [big.Int] into a big-endian byte slice using two's complement.
Expand Down Expand Up @@ -62,30 +62,53 @@ func decodeBigInt(data []byte) *big.Int {
return bigInt
}

// EncodeDecimal is used to encode a string representation of a number to `org.apache.kafka.connect.data.Decimal`.
func EncodeDecimal(value string, scale uint16) ([]byte, error) {
bigFloatValue := new(big.Float)
if _, success := bigFloatValue.SetString(value); !success {
return nil, fmt.Errorf("unable to use %q as a floating-point number", value)
// decimalWithNewExponent takes a [apd.Decimal] and returns a new [apd.Decimal] with a the given exponent.
// If the new exponent is less precise then the extra digits will be truncated.
func decimalWithNewExponent(decimal *apd.Decimal, newExponent int32) *apd.Decimal {
exponentDelta := newExponent - decimal.Exponent // Exponent is negative.

if exponentDelta == 0 {
return new(apd.Decimal).Set(decimal)
}

if scale > 0 {
scaledValue := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
bigFloatValue.Mul(bigFloatValue, new(big.Float).SetInt(scaledValue))
coefficient := new(apd.BigInt).Set(&decimal.Coeff)

if exponentDelta < 0 {
multiplier := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(-exponentDelta)), nil)
coefficient.Mul(coefficient, multiplier)
} else if exponentDelta > 0 {
divisor := new(apd.BigInt).Exp(apd.NewBigInt(10), apd.NewBigInt(int64(exponentDelta)), nil)
coefficient.Div(coefficient, divisor)
}

// Extract the scaled integer value.
bigIntValue := new(big.Int)
if bigFloatValue.IsInt() {
bigFloatValue.Int(bigIntValue)
} else {
strValue := bigFloatValue.Text('f', 0)
if _, success := bigIntValue.SetString(strValue, 10); !success {
return nil, fmt.Errorf("unable to use %q as a big.Int", strValue)
}
return &apd.Decimal{
Form: decimal.Form,
Negative: decimal.Negative,
Exponent: newExponent,
Coeff: *coefficient,
}
}

// EncodeDecimal is used to encode a [apd.Decimal] to [org.apache.kafka.connect.data.Decimal].
// The scale of the value (which is the negated exponent of the decimal) is returned as the second argument.
func EncodeDecimal(decimal *apd.Decimal) ([]byte, int32) {
bigIntValue := decimal.Coeff.MathBigInt()
if decimal.Negative {
bigIntValue.Neg(bigIntValue)
}

return encodeBigInt(bigIntValue), nil
return encodeBigInt(bigIntValue), -decimal.Exponent
}

// EncodeDecimalWithScale is used to encode a [apd.Decimal] to [org.apache.kafka.connect.data.Decimal]
// using a specific scale.
func EncodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
decimal = decimalWithNewExponent(decimal, targetExponent)
}
bytes, _ := EncodeDecimal(decimal)
return bytes
}

// DecodeDecimal is used to decode `org.apache.kafka.connect.data.Decimal`.
Expand Down
109 changes: 79 additions & 30 deletions lib/debezium/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/big"
"testing"

"github.com/cockroachdb/apd/v3"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -40,23 +41,60 @@ func TestDecodeBigInt(t *testing.T) {
}
}

func encodeAndDecodeDecimal(value string, scale uint16) (string, error) {
bytes, err := EncodeDecimal(value, scale)
func mustParseDecimal(value string) *apd.Decimal {
decimal, _, err := apd.NewFromString(value)
if err != nil {
return "", err
panic(err)
}
return DecodeDecimal(bytes, nil, int(scale)).String(), nil
return decimal
}

func mustEncodeAndDecodeDecimal(value string, scale uint16) string {
out, err := encodeAndDecodeDecimal(value, scale)
if err != nil {
panic(err)
}
return out
func TestDecimalWithNewExponent(t *testing.T) {
assert.Equal(t, "0", decimalWithNewExponent(apd.New(0, 0), 0).Text('f'))
assert.Equal(t, "00", decimalWithNewExponent(apd.New(0, 1), 1).Text('f'))
assert.Equal(t, "0", decimalWithNewExponent(apd.New(0, 100), 0).Text('f'))
assert.Equal(t, "00", decimalWithNewExponent(apd.New(0, 0), 1).Text('f'))
assert.Equal(t, "0.0", decimalWithNewExponent(apd.New(0, 0), -1).Text('f'))

// Same exponent:
assert.Equal(t, "12.349", decimalWithNewExponent(mustParseDecimal("12.349"), -3).Text('f'))
// More precise exponent:
assert.Equal(t, "12.3490", decimalWithNewExponent(mustParseDecimal("12.349"), -4).Text('f'))
assert.Equal(t, "12.34900", decimalWithNewExponent(mustParseDecimal("12.349"), -5).Text('f'))
// Lest precise exponent:
// Extra digits should be truncated rather than rounded.
assert.Equal(t, "12.34", decimalWithNewExponent(mustParseDecimal("12.349"), -2).Text('f'))
assert.Equal(t, "12.3", decimalWithNewExponent(mustParseDecimal("12.349"), -1).Text('f'))
assert.Equal(t, "12", decimalWithNewExponent(mustParseDecimal("12.349"), 0).Text('f'))
assert.Equal(t, "10", decimalWithNewExponent(mustParseDecimal("12.349"), 1).Text('f'))
}

func TestEncodeDecimal(t *testing.T) {
testEncodeDecimal := func(value string, expectedScale int32) {
bytes, scale := EncodeDecimal(mustParseDecimal(value))
actual := DecodeDecimal(bytes, nil, int(scale)).String()
assert.Equal(t, value, actual, value)
assert.Equal(t, expectedScale, scale, value)
}

testEncodeDecimal("0", 0)
testEncodeDecimal("0.0", 1)
testEncodeDecimal("0.00", 2)
testEncodeDecimal("0.00000", 5)
testEncodeDecimal("1", 0)
testEncodeDecimal("1.0", 1)
testEncodeDecimal("-1", 0)
testEncodeDecimal("-1.0", 1)
testEncodeDecimal("145.183000000000009", 15)
testEncodeDecimal("-145.183000000000009", 15)
}

func TestEncodeDecimalWithScale(t *testing.T) {
mustEncodeAndDecodeDecimal := func(value string, scale int32) string {
bytes := EncodeDecimalWithScale(mustParseDecimal(value), scale)
return DecodeDecimal(bytes, nil, int(scale)).String()
}

// Whole numbers:
for i := range 100_000 {
strValue := fmt.Sprint(i)
Expand All @@ -67,12 +105,38 @@ func TestEncodeDecimal(t *testing.T) {
}
}

// Scale of 15 that is equal to the amount of decimal places in the value:
assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 15))
assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 15))
// If scale is smaller than the amount of decimal places then the extra places should be truncated without rounding:
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000005", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000005", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000009", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000001", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000001", 14))
assert.Equal(t, "145.18300000000000", mustEncodeAndDecodeDecimal("145.183000000000004", 14))
assert.Equal(t, "-145.18300000000000", mustEncodeAndDecodeDecimal("-145.183000000000004", 14))
// If scale is larger than the amount of decimal places then the extra places should be padded with zeros:
assert.Equal(t, "145.1830000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 16))
assert.Equal(t, "-145.1830000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 16))
assert.Equal(t, "145.1830000000000010", mustEncodeAndDecodeDecimal("145.183000000000001", 16))
assert.Equal(t, "-145.1830000000000010", mustEncodeAndDecodeDecimal("-145.183000000000001", 16))
assert.Equal(t, "145.1830000000000040", mustEncodeAndDecodeDecimal("145.183000000000004", 16))
assert.Equal(t, "-145.1830000000000040", mustEncodeAndDecodeDecimal("-145.183000000000004", 16))
assert.Equal(t, "145.1830000000000050", mustEncodeAndDecodeDecimal("145.183000000000005", 16))
assert.Equal(t, "-145.1830000000000050", mustEncodeAndDecodeDecimal("-145.183000000000005", 16))
assert.Equal(t, "145.1830000000000090", mustEncodeAndDecodeDecimal("145.183000000000009", 16))
assert.Equal(t, "-145.1830000000000090", mustEncodeAndDecodeDecimal("-145.183000000000009", 16))

assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))

testCases := []struct {
name string
value string
scale uint16

expectedErr string
scale int32
}{
{
name: "0 scale",
Expand Down Expand Up @@ -151,25 +215,10 @@ func TestEncodeDecimal(t *testing.T) {
value: "145.183000000000000",
scale: 15,
},
{
name: "malformed - empty string",
value: "",
expectedErr: `unable to use "" as a floating-point number`,
},
{
name: "malformed - not a floating-point",
value: "abcdefg",
expectedErr: `unable to use "abcdefg" as a floating-point number`,
},
}

for _, testCase := range testCases {
actual, err := encodeAndDecodeDecimal(testCase.value, testCase.scale)
if testCase.expectedErr == "" {
assert.NoError(t, err)
assert.Equal(t, testCase.value, actual, testCase.name)
} else {
assert.ErrorContains(t, err, testCase.expectedErr, testCase.name)
}
actual := mustEncodeAndDecodeDecimal(testCase.value, testCase.scale)
assert.Equal(t, testCase.value, actual, testCase.name)
}
}

0 comments on commit a88793b

Please sign in to comment.