Skip to content

Commit

Permalink
[debezium] Use app.Decimal
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 25, 2024
1 parent aee91c0 commit 0a8abaf
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 287 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/artie-labs/transfer v1.25.28
github.com/artie-labs/transfer v1.25.29
github.com/aws/aws-sdk-go v1.44.327
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.19
Expand Down Expand Up @@ -61,6 +61,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.7 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/cockroachdb/apd/v3 v3.2.1 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.25.28 h1:SOezpfhKLKzR1SMHLW3MLJ60X5yNk8Bk5d85Dw9dpow=
github.com/artie-labs/transfer v1.25.28/go.mod h1:PxZjjW1+OnZDgRRJwVXUoiGY2iPsLnY2TUMrdcY3zfg=
github.com/artie-labs/transfer v1.25.29 h1:afMF9YrRRidCs4QYh+fbrD+i7s6qnUUVd1E4ZGxT01E=
github.com/artie-labs/transfer v1.25.29/go.mod h1:pMn7/nkM2gQVw4rgjNIWIGUKtCkCO7yS6Y1IVDgrS3k=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY=
github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
Expand Down Expand Up @@ -167,6 +169,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/apd/v3 v3.2.1 h1:U+8j7t0axsIgvQUqthuNm82HIrYXodOV2iWLWtEaIwg=
github.com/cockroachdb/apd/v3 v3.2.1/go.mod h1:klXJcjp+FffLTHlhIG69tezTDvdP065naDsHzKhYSqc=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
Expand Down
107 changes: 50 additions & 57 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,77 @@
package converters

import (
"fmt"
"strings"
"fmt"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/cockroachdb/apd/v3"

"github.com/artie-labs/transfer/lib/debezium"
)

type decimalConverter struct {
scale uint16
precision *int
scale uint16
precision *int
}

func NewDecimalConverter(scale uint16, precision *int) decimalConverter {
return decimalConverter{scale: scale, precision: precision}
return decimalConverter{scale: scale, precision: precision}
}

func (d decimalConverter) ToField(name string) debezium.Field {
field := debezium.Field{
FieldName: name,
Type: debezium.Bytes,
DebeziumType: debezium.KafkaDecimalType,
Parameters: map[string]any{
"scale": fmt.Sprint(d.scale),
},
}

if d.precision != nil {
field.Parameters[debezium.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision)
}

return field
field := debezium.Field{
FieldName: name,
Type: debezium.Bytes,
DebeziumType: debezium.KafkaDecimalType,
Parameters: map[string]any{
"scale": fmt.Sprint(d.scale),
},
}

if d.precision != nil {
field.Parameters[debezium.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision)
}

return field
}

func (d decimalConverter) Convert(value any) (any, error) {
castValue, isOk := value.(string)
if !isOk {
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)
}
return debezium.EncodeDecimal(castValue, d.scale)
}

func getScale(value string) uint16 {
// Find the index of the decimal point
i := strings.IndexRune(value, '.')
stringValue, isOk := value.(string)
if !isOk {
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)
}

if i == -1 {
// No decimal point: scale is 0
return 0
}
decimal, _, err := apd.NewFromString(stringValue)
if err != nil {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

// The scale is the number of digits after the decimal point
return uint16(len(value[i+1:]))
return debezium.EncodeDecimalWithScale(decimal, int32(d.scale)), nil
}

type VariableNumericConverter struct{}

func (VariableNumericConverter) ToField(name string) debezium.Field {
return debezium.Field{
FieldName: name,
Type: debezium.Struct,
DebeziumType: debezium.KafkaVariableNumericType,
}
return debezium.Field{
FieldName: name,
Type: debezium.Struct,
DebeziumType: debezium.KafkaVariableNumericType,
}
}

func (VariableNumericConverter) Convert(value any) (any, error) {
stringValue, ok := value.(string)
if !ok {
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)
}

scale := getScale(stringValue)

bytes, err := debezium.EncodeDecimal(stringValue, scale)
if err != nil {
return nil, err
}

return map[string]any{
"scale": int32(scale),
"value": bytes,
}, nil
stringValue, ok := value.(string)
if !ok {
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)
}

decimal, _, err := apd.NewFromString(stringValue)
if err != nil {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

bytes, scale := debezium.EncodeDecimal(decimal)
return map[string]any{
"scale": scale,
"value": bytes,
}, nil
}
201 changes: 85 additions & 116 deletions lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
@@ -1,133 +1,102 @@
package converters

import (
"fmt"
"testing"
"fmt"
"testing"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/stretchr/testify/assert"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/stretchr/testify/assert"
)

func TestDecimalConverter_ToField(t *testing.T) {
{
// Without precision
converter := NewDecimalConverter(2, nil)
expected := debezium.Field{
Type: "bytes",
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
"scale": "2",
},
}
assert.Equal(t, expected, converter.ToField("col"))
}
{
// With precision
converter := NewDecimalConverter(2, ptr.ToInt(3))
expected := debezium.Field{
Type: "bytes",
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
"connect.decimal.precision": "3",
"scale": "2",
},
}
assert.Equal(t, expected, converter.ToField("col"))
}
{
// Without precision
converter := NewDecimalConverter(2, nil)
expected := debezium.Field{
Type: "bytes",
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
"scale": "2",
},
}
assert.Equal(t, expected, converter.ToField("col"))
}
{
// With precision
converter := NewDecimalConverter(2, ptr.ToInt(3))
expected := debezium.Field{
Type: "bytes",
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
"connect.decimal.precision": "3",
"scale": "2",
},
}
assert.Equal(t, expected, converter.ToField("col"))
}
}

func TestDecimalConverter_Convert(t *testing.T) {
converter := NewDecimalConverter(2, nil)
{
// Malformed value - empty string.
_, err := converter.Convert("")
assert.ErrorContains(t, err, `unable to use "" as a floating-point number`)
}
{
// Malformed value - not a floating-point.
_, err := converter.Convert("11qwerty00")
assert.ErrorContains(t, err, `unable to use "11qwerty00" as a floating-point number`)
}
{
// Happy path.
converted, err := converter.Convert("1.23")
assert.NoError(t, err)
bytes, ok := converted.([]byte)
assert.True(t, ok)
actualValue, err := converter.ToField("").DecodeDecimal(bytes)
assert.NoError(t, err)
assert.Equal(t, "1.23", fmt.Sprint(actualValue))
}
}

func TestGetScale(t *testing.T) {
type _testCase struct {
name string
value string
expectedScale uint16
}

testCases := []_testCase{
{
name: "0 scale",
value: "5",
expectedScale: 0,
},
{
name: "2 scale",
value: "9.99",
expectedScale: 2,
},
{
name: "5 scale",
value: "9.12345",
expectedScale: 5,
},
}

for _, testCase := range testCases {
actualScale := getScale(testCase.value)
assert.Equal(t, testCase.expectedScale, actualScale, testCase.name)
}
converter := NewDecimalConverter(2, nil)
{
// Malformed value - empty string.
_, err := converter.Convert("")
assert.ErrorContains(t, err, `unable to use "" as a decimal: parse mantissa:`)
}
{
// Malformed value - not a floating-point.
_, err := converter.Convert("11qwerty00")
assert.ErrorContains(t, err, `unable to use "11qwerty00" as a decimal: parse exponent:`)
}
{
// Happy path.
converted, err := converter.Convert("1.23")
assert.NoError(t, err)
bytes, ok := converted.([]byte)
assert.True(t, ok)
actualValue, err := converter.ToField("").DecodeDecimal(bytes)
assert.NoError(t, err)
assert.Equal(t, "1.23", fmt.Sprint(actualValue))
}
}

func TestVariableNumericConverter_ToField(t *testing.T) {
converter := VariableNumericConverter{}
expected := debezium.Field{
FieldName: "col",
Type: "struct",
DebeziumType: "io.debezium.data.VariableScaleDecimal",
}
assert.Equal(t, expected, converter.ToField("col"))
converter := VariableNumericConverter{}
expected := debezium.Field{
FieldName: "col",
Type: "struct",
DebeziumType: "io.debezium.data.VariableScaleDecimal",
}
assert.Equal(t, expected, converter.ToField("col"))
}

func TestVariableNumericConverter_Convert(t *testing.T) {
converter := VariableNumericConverter{}
{
// Wrong type
_, err := converter.Convert(1234)
assert.ErrorContains(t, err, "expected string got int with value: 1234")
}
{
// Malformed value - empty string.
_, err := converter.Convert("")
assert.ErrorContains(t, err, `unable to use "" as a floating-point number`)
}
{
// Malformed value - not a floating point.
_, err := converter.Convert("malformed")
assert.ErrorContains(t, err, `unable to use "malformed" as a floating-point number`)
}
{
// Happy path
converted, err := converter.Convert("12.34")
assert.NoError(t, err)
assert.Equal(t, map[string]any{"scale": int32(2), "value": []byte{0x4, 0xd2}}, converted)
actualValue, err := converter.ToField("").DecodeDebeziumVariableDecimal(converted)
assert.NoError(t, err)
assert.Equal(t, "12.34", actualValue.String())
}
converter := VariableNumericConverter{}
{
// Wrong type
_, err := converter.Convert(1234)
assert.ErrorContains(t, err, "expected string got int with value: 1234")
}
{
// Malformed value - empty string.
_, err := converter.Convert("")
assert.ErrorContains(t, err, `unable to use "" as a decimal: parse mantissa:`)
}
{
// Malformed value - not a floating point.
_, err := converter.Convert("malformed")
assert.ErrorContains(t, err, `unable to use "malformed" as a decimal: parse exponent`)
}
{
// Happy path
converted, err := converter.Convert("12.34")
assert.NoError(t, err)
assert.Equal(t, map[string]any{"scale": int32(2), "value": []byte{0x4, 0xd2}}, converted)
actualValue, err := converter.ToField("").DecodeDebeziumVariableDecimal(converted)
assert.NoError(t, err)
assert.Equal(t, "12.34", actualValue.String())
}
}
Loading

0 comments on commit 0a8abaf

Please sign in to comment.