Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[debezium] Return apd.Decimal from DecodeDecimal #765

Merged
merged 28 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package bigquery

import (
"encoding/json"
"math/big"
"testing"
"time"

"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestRowToMessage(t *testing.T) {
"c_float_int32": int32(1234),
"c_float_int64": int64(1234),
"c_float_string": "4444.55555",
"c_numeric": decimal.NewDecimal(nil, 5, big.NewFloat(3.1415926)),
"c_numeric": decimal.NewDecimal(nil, numbers.MustParseDecimal("3.14159")),
Copy link
Contributor Author

@nathan-artie nathan-artie Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now rely on the exponent of the decimal instead of passing in a scale, so the decimal places here were trimmed to 5 to match the previous scale.

"c_string": "foo bar",
"c_string_decimal": decimal.NewDecimal(nil, 5, big.NewFloat(1.618033)),
"c_string_decimal": decimal.NewDecimal(nil, numbers.MustParseDecimal("1.61803")),
"c_time": ext.NewExtendedTime(time.Date(0, 0, 0, 4, 5, 6, 7, time.UTC), ext.TimeKindType, ""),
"c_date": ext.NewExtendedTime(time.Date(2001, 2, 3, 0, 0, 0, 0, time.UTC), ext.DateKindType, ""),
"c_datetime": ext.NewExtendedTime(time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC), ext.DateTimeKindType, ""),
Expand Down
1 change: 1 addition & 0 deletions clients/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func DefaultValue(column columns.Column, dialect sql.Dialect, additionalDateFmts
return nil, fmt.Errorf("colVal is not type *decimal.Decimal")
}

// TODO: Call [String] instead.
return val.Value(), nil
case typing.String.Kind:
return sql.QuoteLiteral(fmt.Sprint(column.DefaultValue())), nil
Expand Down
21 changes: 5 additions & 16 deletions lib/debezium/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"math/big"
"slices"

"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/cockroachdb/apd/v3"
)

Expand Down Expand Up @@ -89,7 +88,7 @@ func decimalWithNewExponent(decimal *apd.Decimal, newExponent int32) *apd.Decima
}
}

// EncodeDecimal is used to encode a [*apd.Decimal] to [org.apache.kafka.connect.data.Decimal].
// EncodeDecimal is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`.
// The scale of the value (which is the negated exponent of the decimal) is returned as the second argument.
func EncodeDecimal(decimal *apd.Decimal) ([]byte, int32) {
bigIntValue := decimal.Coeff.MathBigInt()
Expand All @@ -100,7 +99,7 @@ func EncodeDecimal(decimal *apd.Decimal) ([]byte, int32) {
return encodeBigInt(bigIntValue), -decimal.Exponent
}

// EncodeDecimalWithScale is used to encode a [*apd.Decimal] to [org.apache.kafka.connect.data.Decimal].
// EncodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`
// using a specific scale.
func EncodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
Expand All @@ -112,17 +111,7 @@ func EncodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte {
}

// DecodeDecimal is used to decode `org.apache.kafka.connect.data.Decimal`.
func DecodeDecimal(data []byte, precision *int, scale int) *decimal.Decimal {
// Convert the big integer to a big float
bigFloat := new(big.Float).SetInt(decodeBigInt(data))

// Compute divisor as 10^scale with big.Int's Exp, then convert to big.Float
scaleInt := big.NewInt(int64(scale))
ten := big.NewInt(10)
divisorInt := new(big.Int).Exp(ten, scaleInt, nil)
divisorFloat := new(big.Float).SetInt(divisorInt)

// Perform the division
bigFloat.Quo(bigFloat, divisorFloat)
return decimal.NewDecimal(precision, scale, bigFloat)
func DecodeDecimal(data []byte, scale int32) *apd.Decimal {
bigInt := new(apd.BigInt).SetMathBigInt(decodeBigInt(data))
return apd.NewWithBigInt(bigInt, -scale)
}
9 changes: 6 additions & 3 deletions lib/debezium/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ func TestDecimalWithNewExponent(t *testing.T) {
func TestEncodeDecimal(t *testing.T) {
testEncodeDecimal := func(value string, expectedScale int32) {
bytes, scale := EncodeDecimal(numbers.MustParseDecimal(value))
actual := DecodeDecimal(bytes, nil, int(scale)).String()
assert.Equal(t, value, actual, value)
assert.Equal(t, expectedScale, scale, value)

actual := DecodeDecimal(bytes, scale)
assert.Equal(t, value, actual.Text('f'), value)
assert.Equal(t, expectedScale, -actual.Exponent, value)
}

testEncodeDecimal("0", 0)
Expand All @@ -85,7 +87,7 @@ func TestEncodeDecimal(t *testing.T) {
func TestEncodeDecimalWithScale(t *testing.T) {
mustEncodeAndDecodeDecimal := func(value string, scale int32) string {
bytes := EncodeDecimalWithScale(numbers.MustParseDecimal(value), scale)
return DecodeDecimal(bytes, nil, int(scale)).String()
return DecodeDecimal(bytes, scale).String()
}

// Whole numbers:
Expand Down Expand Up @@ -125,6 +127,7 @@ func TestEncodeDecimalWithScale(t *testing.T) {
assert.Equal(t, "-145.1830000000000090", mustEncodeAndDecodeDecimal("-145.183000000000009", 16))

assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))
assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails on master.


testCases := []struct {
name string
Expand Down
6 changes: 4 additions & 2 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ func (f Field) DecodeDecimal(encoded []byte) (*decimal.Decimal, error) {
if err != nil {
return nil, fmt.Errorf("failed to get scale and/or precision: %w", err)
}
return DecodeDecimal(encoded, precision, scale), nil
_decimal := DecodeDecimal(encoded, int32(scale))
return decimal.NewDecimal(precision, _decimal), nil
}

func (f Field) DecodeDebeziumVariableDecimal(value any) (*decimal.Decimal, error) {
Expand All @@ -259,5 +260,6 @@ func (f Field) DecodeDebeziumVariableDecimal(value any) (*decimal.Decimal, error
if err != nil {
return nil, err
}
return DecodeDecimal(bytes, ptr.ToInt(decimal.PrecisionNotSpecified), scale), nil
_decimal := DecodeDecimal(bytes, int32(scale))
return decimal.NewDecimal(ptr.ToInt(decimal.PrecisionNotSpecified), _decimal), nil
}
2 changes: 1 addition & 1 deletion lib/debezium/types_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func BenchmarkDecodeDecimal_P64_S10(b *testing.B) {
assert.NoError(b, err)
dec, err := field.DecodeDecimal(bytes)
assert.NoError(b, err)
assert.Equal(b, "123456789012345678901234567890123456789012345678901234.1234567889", dec.Value())
assert.Equal(b, "123456789012345678901234567890123456789012345678901234.1234567890", dec.String())
Copy link
Contributor Author

@nathan-artie nathan-artie Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was decoding incorrectly, the unscaled value is 1234567890123456789012345678901234567890123456789012341234567890

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which makes sense since this number is just 1234567890 repeating (with a 1234 before the decimal point).

require.NoError(b, err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/parquetutil/parse_values_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package parquetutil

import (
"math/big"
"testing"

"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing/ext"

Expand Down Expand Up @@ -66,7 +66,7 @@ func TestParseValue(t *testing.T) {
},
{
name: "decimal",
colVal: decimal.NewDecimal(ptr.ToInt(30), 5, big.NewFloat(5000.2232)),
colVal: decimal.NewDecimal(ptr.ToInt(30), numbers.MustParseDecimal("5000.22320")),
colKind: columns.NewColumn("", eDecimal),
expectedValue: "5000.22320",
},
Expand Down
24 changes: 15 additions & 9 deletions lib/typing/decimal/decimal.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package decimal

import (
"log/slog"
"math/big"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/cockroachdb/apd/v3"
)

// Decimal is Artie's wrapper around *big.Float which can store large numbers w/ no precision loss.
// Decimal is Artie's wrapper around [*apd.Decimal] which can store large numbers w/ no precision loss.
type Decimal struct {
scale int
precision *int
value *big.Float
value *apd.Decimal
}

const (
Expand All @@ -21,8 +22,9 @@ const (
MaxPrecisionBeforeString = 38
)

func NewDecimal(precision *int, scale int, value *big.Float) *Decimal {
func NewDecimal(precision *int, value *apd.Decimal) *Decimal {
if precision != nil {
scale := int(-value.Exponent)
if scale > *precision && *precision != -1 {
// Note: -1 precision means it's not specified.

Expand All @@ -33,14 +35,13 @@ func NewDecimal(precision *int, scale int, value *big.Float) *Decimal {
}

return &Decimal{
scale: scale,
precision: precision,
value: value,
}
}

func (d *Decimal) Scale() int {
return d.scale
return int(-d.value.Exponent)
}

func (d *Decimal) Precision() *int {
Expand All @@ -51,7 +52,7 @@ func (d *Decimal) Precision() *int {
// This is particularly useful for Snowflake because we're writing all the values as STRINGS into TSV format.
// This function guarantees backwards compatibility.
func (d *Decimal) String() string {
return d.value.Text('f', d.scale)
return d.value.Text('f')
}

func (d *Decimal) Value() any {
Expand All @@ -62,9 +63,14 @@ func (d *Decimal) Value() any {
}

// Depending on the precision, we will want to convert value to STRING or keep as a FLOAT.
return d.value
// TODO: [Value] is only called in one place, look into calling [String] instead.
if out, ok := new(big.Float).SetString(d.String()); ok {
return out
}
slog.Error("Failed to convert apd.Decimal to big.Float", slog.String("value", d.String()))
return d.String()
}

func (d *Decimal) Details() DecimalDetails {
return DecimalDetails{scale: d.scale, precision: d.precision}
return DecimalDetails{scale: d.Scale(), precision: d.precision}
}
4 changes: 2 additions & 2 deletions lib/typing/values/string_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package values

import (
"math/big"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestToString(t *testing.T) {
assert.Equal(t, "123.45", val)

// Decimals
value := decimal.NewDecimal(ptr.ToInt(38), 2, big.NewFloat(585692791691858.25))
value := decimal.NewDecimal(ptr.ToInt(38), numbers.MustParseDecimal("585692791691858.25"))
val, err = ToString(value, columns.Column{KindDetails: typing.EDecimal}, nil)
assert.NoError(t, err)
assert.Equal(t, "585692791691858.25", val)
Expand Down