Skip to content

Commit

Permalink
[postgres] Support numeric columns with "NaN" values
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Oct 4, 2024
1 parent db98a68 commit 2195ec9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
27 changes: 27 additions & 0 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ CREATE TABLE %s (
c_macaddr8 macaddr8,
c_money money,
c_numeric numeric(7, 2),
c_numeric_nan numeric(7, 2),
c_numeric_variable numeric,
c_numeric_variable_nan numeric,
-- c_path path,
-- c_pg_lsn pg_lsn,
-- c_pg_snapshot pg_snapshot,
Expand Down Expand Up @@ -194,8 +196,12 @@ INSERT INTO %s VALUES (
'52093.89',
-- c_numeric
'987.654',
-- c_numeric_nan
'NaN',
-- c_numeric_variable,
'10987.65401',
-- c_numeric_variable_nan,
'NaN',
-- c_path
-- Not supported
-- c_pg_lsn
Expand Down Expand Up @@ -451,6 +457,17 @@ const expectedPayloadTemplate = `{
"scale": "2"
}
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_numeric_nan",
"name": "org.apache.kafka.connect.data.Decimal",
"parameters": {
"connect.decimal.precision": "7",
"scale": "2"
}
},
{
"type": "struct",
"optional": false,
Expand All @@ -459,6 +476,14 @@ const expectedPayloadTemplate = `{
"name": "io.debezium.data.VariableScaleDecimal",
"parameters": null
},
{
"type": "struct",
"optional": false,
"default": null,
"field": "c_numeric_variable_nan",
"name": "io.debezium.data.VariableScaleDecimal",
"parameters": null
},
{
"type": "struct",
"optional": false,
Expand Down Expand Up @@ -680,10 +705,12 @@ const expectedPayloadTemplate = `{
"c_macaddr8": "12:34:56:78:90:ab:cd:ef",
"c_money": "T30t",
"c_numeric": "AYHN",
"c_numeric_nan": null,
"c_numeric_variable": {
"scale": 5,
"value": "QX3UWQ=="
},
"c_numeric_variable_nan": null,
"c_numrange": "[11.1,22.2)",
"c_point": {
"x": 12.34,
Expand Down
12 changes: 12 additions & 0 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
// encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`
// using a specific scale.
func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) ([]byte, error) {
if decimal.Form != apd.Finite {
return nil, fmt.Errorf("decimal (%v) is not finite", decimal)
}

targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
// Return an error if the scales are different, this maintains parity with `org.apache.kafka.connect.data.Decimal`.
Expand Down Expand Up @@ -59,6 +63,10 @@ func (d DecimalConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

if decimal.Form == apd.NaN {
return nil, nil
}

return encodeDecimalWithScale(decimal, int32(d.scale))
}

Expand All @@ -83,6 +91,10 @@ func (VariableNumericConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

if decimal.Form == apd.NaN {
return nil, nil
}

bytes, scale := converters.EncodeDecimal(decimal)
return map[string]any{
"scale": scale,
Expand Down
19 changes: 18 additions & 1 deletion lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestEncodeDecimalWithScale(t *testing.T) {
}

// Scale of 15 that is equal to the amount of decimal places in the value:
assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 15))
assert.Equal(t, "145.000000000000000", mustEncodeAndDecodeDecimal("145.000000000000000", 15))
assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 15))
// If scale is smaller than the amount of decimal places then an error should be returned:
assert.ErrorContains(t, mustReturnError("145.183000000000000", 14), "value scale (15) is different from schema scale (14)")
Expand All @@ -46,6 +46,11 @@ func TestEncodeDecimalWithScale(t *testing.T) {
assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))
assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8))

// Values that are not finite:
assert.ErrorContains(t, mustReturnError("NaN", 5), "decimal (NaN) is not finite")
assert.ErrorContains(t, mustReturnError("Infinity", 5), "decimal (Infinity) is not finite")
assert.ErrorContains(t, mustReturnError("-Infinity", 5), "decimal (-Infinity) is not finite")

testCases := []struct {
name string
value string
Expand Down Expand Up @@ -189,6 +194,12 @@ func TestDecimalConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "1.23", actualValue.(*decimal.Decimal).String())
}
{
// NaN:
converted, err := converter.Convert("NaN")
assert.NoError(t, err)
assert.Nil(t, converted)
}
}

func TestVariableNumericConverter_ToField(t *testing.T) {
Expand Down Expand Up @@ -228,4 +239,10 @@ func TestVariableNumericConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "12.34", actualValue.(*decimal.Decimal).String())
}
{
// NaN:
converted, err := converter.Convert("NaN")
assert.NoError(t, err)
assert.Nil(t, converted)
}
}

0 comments on commit 2195ec9

Please sign in to comment.