Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[debezium] Return apd.Decimal from DecodeDecimal #765

Merged
merged 28 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package bigquery

import (
"encoding/json"
"math/big"
"testing"
"time"

"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestRowToMessage(t *testing.T) {
"c_float_int32": int32(1234),
"c_float_int64": int64(1234),
"c_float_string": "4444.55555",
"c_numeric": decimal.NewDecimal(nil, 5, big.NewFloat(3.1415926)),
"c_numeric": decimal.NewDecimal(nil, numbers.MustParseDecimal("3.14159")),
Copy link
Contributor Author

@nathan-artie nathan-artie Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now rely on the exponent of the decimal instead of passing in a scale, so the decimal places here were trimmed to 5 to match the previous scale.

"c_string": "foo bar",
"c_string_decimal": decimal.NewDecimal(nil, 5, big.NewFloat(1.618033)),
"c_string_decimal": decimal.NewDecimal(nil, numbers.MustParseDecimal("1.61803")),
"c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""),
"c_date": ext.NewExtendedTime(time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC), ext.DateKindType, ""),
"c_datetime": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.DateTimeKindType, ""),
Expand Down
1 change: 1 addition & 0 deletions clients/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func DefaultValue(column columns.Column, dialect sql.Dialect, additionalDateFmts
return nil, fmt.Errorf("colVal is not type *decimal.Decimal")
}

// TODO: Call [String] instead.
return val.Value(), nil
case typing.String.Kind:
return sql.QuoteLiteral(fmt.Sprint(column.DefaultValue())), nil
Expand Down
31 changes: 9 additions & 22 deletions lib/cdc/util/relational_event_decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package util
import (
"encoding/json"
"io"
"math/big"
"os"
"testing"

Expand Down Expand Up @@ -47,25 +46,21 @@ func TestSchemaEventPayload_Numeric_GetData(t *testing.T) {
retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
assert.NoError(t, err)

assert.Equal(t, "123456.789", retMap["numeric_test"].(*decimal.Decimal).Value())
assert.Equal(t, 0, big.NewFloat(1234).Cmp(retMap["numeric_5"].(*decimal.Decimal).Value().(*big.Float)))
assert.Equal(t, "123456.789", retMap["numeric_test"].(*decimal.Decimal).String())
assert.Equal(t, "1234", retMap["numeric_5"].(*decimal.Decimal).String())
numericWithScaleMap := map[string]string{
"numeric_5_2": "568.01",
"numeric_5_6": "0.023456",
"numeric_5_0": "5",
}

for key, expectedValue := range numericWithScaleMap {
// Numeric data types that actually have scale fails when comparing *big.Float using `.Cmp`, so we are using STRING() instead.
_, isOk := retMap[key].(*decimal.Decimal).Value().(*big.Float)
assert.True(t, isOk)
// Now, we know the data type is *big.Float, let's check the .String() value.
assert.Equal(t, expectedValue, retMap[key].(*decimal.Decimal).String())
}

assert.Equal(t, "58569102859845154622791691858438258688", retMap["numeric_39_0"].(*decimal.Decimal).Value())
assert.Equal(t, "5856910285984515462279169185843825868.22", retMap["numeric_39_2"].(*decimal.Decimal).Value())
assert.Equal(t, "585691028598451546227958438258688.123456", retMap["numeric_39_6"].(*decimal.Decimal).Value())
assert.Equal(t, "58569102859845154622791691858438258688", retMap["numeric_39_0"].(*decimal.Decimal).String())
assert.Equal(t, "5856910285984515462279169185843825868.22", retMap["numeric_39_2"].(*decimal.Decimal).String())
assert.Equal(t, "585691028598451546227958438258688.123456", retMap["numeric_39_6"].(*decimal.Decimal).String())
}

func TestSchemaEventPayload_Decimal_GetData(t *testing.T) {
Expand All @@ -79,23 +74,19 @@ func TestSchemaEventPayload_Decimal_GetData(t *testing.T) {
assert.NoError(t, err)
retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
assert.NoError(t, err)
assert.Equal(t, "123.45", retMap["decimal_test"].(*decimal.Decimal).Value())
assert.Equal(t, "123.45", retMap["decimal_test"].(*decimal.Decimal).String())
decimalWithScaleMap := map[string]string{
"decimal_test_5": "123",
"decimal_test_5_2": "123.45",
}

for key, expectedValue := range decimalWithScaleMap {
// Numeric data types that actually have scale fails when comparing *big.Float using `.Cmp`, so we are using STRING() instead.
_, isOk := retMap[key].(*decimal.Decimal).Value().(*big.Float)
assert.True(t, isOk)
// Now, we know the data type is *big.Float, let's check the .String() value.
assert.Equal(t, expectedValue, retMap[key].(*decimal.Decimal).String(), key)
}

assert.Equal(t, "58569102859845154622791691858438258688", retMap["decimal_test_39"].(*decimal.Decimal).Value(), "decimal_test_39")
assert.Equal(t, "585691028598451546227916918584382586.22", retMap["decimal_test_39_2"].(*decimal.Decimal).Value(), "decimal_test_39_2")
assert.Equal(t, "585691028598451546227916918584388.123456", retMap["decimal_test_39_6"].(*decimal.Decimal).Value(), "decimal_test_39_6")
assert.Equal(t, "58569102859845154622791691858438258688", retMap["decimal_test_39"].(*decimal.Decimal).String(), "decimal_test_39")
assert.Equal(t, "585691028598451546227916918584382586.22", retMap["decimal_test_39_2"].(*decimal.Decimal).String(), "decimal_test_39_2")
assert.Equal(t, "585691028598451546227916918584388.123456", retMap["decimal_test_39_6"].(*decimal.Decimal).String(), "decimal_test_39_6")
}

func TestSchemaEventPayload_Money_GetData(t *testing.T) {
Expand All @@ -115,10 +106,6 @@ func TestSchemaEventPayload_Money_GetData(t *testing.T) {
}

for key, expectedValue := range decimalWithScaleMap {
// Numeric data types that actually have scale fails when comparing *big.Float using `.Cmp`, so we are using STRING() instead.
_, isOk := retMap[key].(*decimal.Decimal).Value().(*big.Float)
assert.True(t, isOk)
// Now, we know the data type is *big.Float, let's check the .String() value.
assert.Equal(t, expectedValue, retMap[key].(*decimal.Decimal).String(), key)
}
}
20 changes: 4 additions & 16 deletions lib/debezium/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"math/big"
"slices"

"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/cockroachdb/apd/v3"
)

Expand Down Expand Up @@ -100,7 +99,7 @@ func EncodeDecimal(decimal *apd.Decimal) ([]byte, int32) {
return encodeBigInt(bigIntValue), -decimal.Exponent
}

// EncodeDecimalWithScale is used to encode a [apd.Decimal] to [org.apache.kafka.connect.data.Decimal]
// EncodeDecimalWithScale is used to encode a [apd.Decimal] to [org.apache.kafka.connect.data.Decimal]
// using a specific scale.
func EncodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
Expand All @@ -111,18 +110,7 @@ func EncodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
return bytes
}

// DecodeDecimal is used to decode `org.apache.kafka.connect.data.Decimal`.
func DecodeDecimal(data []byte, precision *int, scale int) *decimal.Decimal {
// Convert the big integer to a big float
bigFloat := new(big.Float).SetInt(decodeBigInt(data))

// Compute divisor as 10^scale with big.Int's Exp, then convert to big.Float
scaleInt := big.NewInt(int64(scale))
ten := big.NewInt(10)
divisorInt := new(big.Int).Exp(ten, scaleInt, nil)
divisorFloat := new(big.Float).SetInt(divisorInt)

// Perform the division
bigFloat.Quo(bigFloat, divisorFloat)
return decimal.NewDecimal(precision, scale, bigFloat)
// DecodeDecimal decodes the bytes of an [org.apache.kafka.connect.data.Decimal]
// into an [apd.Decimal]. The integer produced by decodeBigInt becomes the
// coefficient and the negated scale becomes the exponent.
func DecodeDecimal(data []byte, scale int32) *apd.Decimal {
	coefficient := new(apd.BigInt).SetMathBigInt(decodeBigInt(data))
	return apd.NewWithBigInt(coefficient, -scale)
}
34 changes: 15 additions & 19 deletions lib/debezium/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"math/big"
"testing"

"github.com/artie-labs/transfer/lib/numbers"
"github.com/cockroachdb/apd/v3"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -41,14 +42,6 @@
}
}

func mustParseDecimal(value string) *apd.Decimal {
decimal, _, err := apd.NewFromString(value)
if err != nil {
panic(err)
}
return decimal
}

func TestDecimalWithNewExponent(t *testing.T) {
assert.Equal(t, "0", decimalWithNewExponent(apd.New(0, 0), 0).Text('f'))
assert.Equal(t, "00", decimalWithNewExponent(apd.New(0, 1), 1).Text('f'))
Expand All @@ -57,24 +50,26 @@
assert.Equal(t, "0.0", decimalWithNewExponent(apd.New(0, 0), -1).Text('f'))

// Same exponent:
assert.Equal(t, "12.349", decimalWithNewExponent(mustParseDecimal("12.349"), -3).Text('f'))
assert.Equal(t, "12.349", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -3).Text('f'))
// More precise exponent:
assert.Equal(t, "12.3490", decimalWithNewExponent(mustParseDecimal("12.349"), -4).Text('f'))
assert.Equal(t, "12.34900", decimalWithNewExponent(mustParseDecimal("12.349"), -5).Text('f'))
assert.Equal(t, "12.3490", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -4).Text('f'))
assert.Equal(t, "12.34900", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -5).Text('f'))
// Less precise exponent:
// Extra digits should be truncated rather than rounded.
assert.Equal(t, "12.34", decimalWithNewExponent(mustParseDecimal("12.349"), -2).Text('f'))
assert.Equal(t, "12.3", decimalWithNewExponent(mustParseDecimal("12.349"), -1).Text('f'))
assert.Equal(t, "12", decimalWithNewExponent(mustParseDecimal("12.349"), 0).Text('f'))
assert.Equal(t, "10", decimalWithNewExponent(mustParseDecimal("12.349"), 1).Text('f'))
assert.Equal(t, "12.34", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -2).Text('f'))
assert.Equal(t, "12.3", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), -1).Text('f'))
assert.Equal(t, "12", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), 0).Text('f'))
assert.Equal(t, "10", decimalWithNewExponent(numbers.MustParseDecimal("12.349"), 1).Text('f'))
}

func TestEncodeDecimal(t *testing.T) {
testEncodeDecimal := func(value string, expectedScale int32) {
bytes, scale := EncodeDecimal(mustParseDecimal(value))
actual := DecodeDecimal(bytes, nil, int(scale)).String()
assert.Equal(t, value, actual, value)
bytes, scale := EncodeDecimal(numbers.MustParseDecimal(value))
assert.Equal(t, expectedScale, scale, value)

actual := DecodeDecimal(bytes, scale)
assert.Equal(t, value, actual.Text('f'), value)
assert.Equal(t, expectedScale, -actual.Exponent, value)
}

testEncodeDecimal("0", 0)
Expand All @@ -91,8 +86,8 @@

func TestEncodeDecimalWithScale(t *testing.T) {
mustEncodeAndDecodeDecimal := func(value string, scale int32) string {
bytes := EncodeDecimalWithScale(mustParseDecimal(value), scale)

Check failure on line 89 in lib/debezium/decimal_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: mustParseDecimal (compile)
return DecodeDecimal(bytes, nil, int(scale)).String()
return DecodeDecimal(bytes, scale).String()
}

// Whole numbers:
Expand Down Expand Up @@ -132,6 +127,7 @@
assert.Equal(t, "-145.1830000000000090", mustEncodeAndDecodeDecimal("-145.183000000000009", 16))

assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))
assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails on master.


testCases := []struct {
name string
Expand Down
6 changes: 4 additions & 2 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ func (f Field) DecodeDecimal(encoded []byte) (*decimal.Decimal, error) {
if err != nil {
return nil, fmt.Errorf("failed to get scale and/or precision: %w", err)
}
return DecodeDecimal(encoded, precision, scale), nil
_decimal := DecodeDecimal(encoded, int32(scale))
return decimal.NewDecimal(precision, _decimal), nil
}

func (f Field) DecodeDebeziumVariableDecimal(value any) (*decimal.Decimal, error) {
Expand All @@ -259,5 +260,6 @@ func (f Field) DecodeDebeziumVariableDecimal(value any) (*decimal.Decimal, error
if err != nil {
return nil, err
}
return DecodeDecimal(bytes, ptr.ToInt(decimal.PrecisionNotSpecified), scale), nil
_decimal := DecodeDecimal(bytes, int32(scale))
return decimal.NewDecimal(ptr.ToInt(decimal.PrecisionNotSpecified), _decimal), nil
}
2 changes: 1 addition & 1 deletion lib/debezium/types_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func BenchmarkDecodeDecimal_P64_S10(b *testing.B) {
assert.NoError(b, err)
dec, err := field.DecodeDecimal(bytes)
assert.NoError(b, err)
assert.Equal(b, "123456789012345678901234567890123456789012345678901234.1234567889", dec.Value())
assert.Equal(b, "123456789012345678901234567890123456789012345678901234.1234567890", dec.String())
Copy link
Contributor Author

@nathan-artie nathan-artie Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was decoding incorrectly, the unscaled value is 1234567890123456789012345678901234567890123456789012341234567890

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which makes sense since this number is just 1234567890 repeating (with a 1234 before the decimal point).

require.NoError(b, err)
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/debezium/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ func TestField_DecodeDebeziumVariableDecimal(t *testing.T) {
assert.Equal(t, -1, *dec.Precision(), testCase.name)
assert.Equal(t, testCase.expectedScale, dec.Scale(), testCase.name)
assert.Equal(t, testCase.expectedValue, dec.Value(), testCase.name)
assert.Equal(t, testCase.expectedValue, dec.String(), testCase.name)
}

}
11 changes: 11 additions & 0 deletions lib/numbers/numbers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
package numbers

import "github.com/cockroachdb/apd/v3"

// BetweenEq reports whether number lies within the closed interval
// [start, end], i.e. start <= number <= end.
func BetweenEq[T int | int32 | int64](start, end, number T) bool {
	if number < start {
		return false
	}
	return number <= end
}

// MustParseDecimal converts a string into an [apd.Decimal] and panics with the
// parse error if the conversion fails -- used for tests.
func MustParseDecimal(value string) *apd.Decimal {
	parsed, _, parseErr := apd.NewFromString(value)
	if parseErr != nil {
		panic(parseErr)
	}
	return parsed
}
4 changes: 2 additions & 2 deletions lib/parquetutil/parse_values_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package parquetutil

import (
"math/big"
"testing"

"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing/ext"

Expand Down Expand Up @@ -66,7 +66,7 @@ func TestParseValue(t *testing.T) {
},
{
name: "decimal",
colVal: decimal.NewDecimal(ptr.ToInt(30), 5, big.NewFloat(5000.2232)),
colVal: decimal.NewDecimal(ptr.ToInt(30), numbers.MustParseDecimal("5000.22320")),
colKind: columns.NewColumn("", eDecimal),
expectedValue: "5000.22320",
},
Expand Down
25 changes: 16 additions & 9 deletions lib/typing/decimal/decimal.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package decimal

import (
"log/slog"
"math/big"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/cockroachdb/apd/v3"
)

// Decimal is Artie's wrapper around *big.Float which can store large numbers w/ no precision loss.
// Decimal is Artie's wrapper around [apd.Decimal] which can store large numbers w/ no precision loss.
type Decimal struct {
scale int
precision *int
value *big.Float
value *apd.Decimal
}

const (
Expand All @@ -21,7 +22,9 @@ const (
MaxPrecisionBeforeString = 38
)

func NewDecimal(precision *int, scale int, value *big.Float) *Decimal {
func NewDecimal(precision *int, value *apd.Decimal) *Decimal {
scale := int(-value.Exponent)

if precision != nil {
if scale > *precision && *precision != -1 {
// Note: -1 precision means it's not specified.
Expand All @@ -33,14 +36,13 @@ func NewDecimal(precision *int, scale int, value *big.Float) *Decimal {
}

return &Decimal{
scale: scale,
precision: precision,
value: value,
}
}

func (d *Decimal) Scale() int {
return d.scale
return int(-d.value.Exponent)
}

func (d *Decimal) Precision() *int {
Expand All @@ -51,7 +53,7 @@ func (d *Decimal) Precision() *int {
// This is particularly useful for Snowflake because we're writing all the values as STRINGS into TSV format.
// This function guarantees backwards compatibility.
func (d *Decimal) String() string {
return d.value.Text('f', d.scale)
return d.value.Text('f')
}

func (d *Decimal) Value() any {
Expand All @@ -62,9 +64,14 @@ func (d *Decimal) Value() any {
}

// Depending on the precision, we will want to convert value to STRING or keep as a FLOAT.
return d.value
// TODO: [Value] is only called in one place, look into calling [String] instead.
if out, ok := new(big.Float).SetString(d.String()); ok {
return out
}
slog.Error("Failed to convert apd.Decimal to big.Float", slog.String("value", d.String()))
return d.String()
}

func (d *Decimal) Details() DecimalDetails {
return DecimalDetails{scale: d.scale, precision: d.precision}
return DecimalDetails{scale: d.Scale(), precision: d.precision}
}
6 changes: 4 additions & 2 deletions lib/typing/values/string_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package values

import (
"math/big"
"testing"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config/constants"
Expand Down Expand Up @@ -123,7 +123,9 @@ func TestToString(t *testing.T) {
assert.Equal(t, "123.45", val)

// Decimals
value := decimal.NewDecimal(ptr.ToInt(38), 2, big.NewFloat(585692791691858.25))
_decimal, _, err := apd.NewFromString("585692791691858.25")
assert.NoError(t, err)
value := decimal.NewDecimal(ptr.ToInt(38), _decimal)
val, err = ToString(value, columns.Column{KindDetails: typing.EDecimal}, nil)
assert.NoError(t, err)
assert.Equal(t, "585692791691858.25", val)
Expand Down
Loading