From 2195ec9d8484edcc407ed4b770b01eb1bd6dd123 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Thu, 3 Oct 2024 23:01:02 -0700 Subject: [PATCH] [postgres] Support numeric columns with "NaN" values --- integration_tests/postgres/main.go | 27 +++++++++++++++++++++++++ lib/debezium/converters/decimal.go | 12 +++++++++++ lib/debezium/converters/decimal_test.go | 19 ++++++++++++++++- 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/integration_tests/postgres/main.go b/integration_tests/postgres/main.go index 5980063a9..8566b1888 100644 --- a/integration_tests/postgres/main.go +++ b/integration_tests/postgres/main.go @@ -103,7 +103,9 @@ CREATE TABLE %s ( c_macaddr8 macaddr8, c_money money, c_numeric numeric(7, 2), + c_numeric_nan numeric(7, 2), c_numeric_variable numeric, + c_numeric_variable_nan numeric, -- c_path path, -- c_pg_lsn pg_lsn, -- c_pg_snapshot pg_snapshot, @@ -194,8 +196,12 @@ INSERT INTO %s VALUES ( '52093.89', -- c_numeric '987.654', + -- c_numeric_nan + 'NaN', -- c_numeric_variable, '10987.65401', + -- c_numeric_variable_nan, + 'NaN', -- c_path -- Not supported -- c_pg_lsn @@ -451,6 +457,17 @@ const expectedPayloadTemplate = `{ "scale": "2" } }, + { + "type": "bytes", + "optional": false, + "default": null, + "field": "c_numeric_nan", + "name": "org.apache.kafka.connect.data.Decimal", + "parameters": { + "connect.decimal.precision": "7", + "scale": "2" + } + }, { "type": "struct", "optional": false, @@ -459,6 +476,14 @@ const expectedPayloadTemplate = `{ "name": "io.debezium.data.VariableScaleDecimal", "parameters": null }, + { + "type": "struct", + "optional": false, + "default": null, + "field": "c_numeric_variable_nan", + "name": "io.debezium.data.VariableScaleDecimal", + "parameters": null + }, { "type": "struct", "optional": false, @@ -680,10 +705,12 @@ const expectedPayloadTemplate = `{ "c_macaddr8": "12:34:56:78:90:ab:cd:ef", "c_money": "T30t", "c_numeric": "AYHN", + "c_numeric_nan": null, "c_numeric_variable": { "scale": 5, "value": "QX3UWQ==" }, + "c_numeric_variable_nan": null, "c_numrange": "[11.1,22.2)", "c_point": { "x": 12.34, diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go index e73421849..d6f8aaa51 100644 --- a/lib/debezium/converters/decimal.go +++ b/lib/debezium/converters/decimal.go @@ -12,6 +12,10 @@ import ( // encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal` // using a specific scale. func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) ([]byte, error) { + if decimal.Form != apd.Finite { + return nil, fmt.Errorf("decimal (%v) is not finite", decimal) + } + targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative. if decimal.Exponent != targetExponent { // Return an error if the scales are different, this maintains parity with `org.apache.kafka.connect.data.Decimal`. @@ -59,6 +63,10 @@ func (d DecimalConverter) Convert(value any) (any, error) { return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err) } + if decimal.Form == apd.NaN { + return nil, nil + } + return encodeDecimalWithScale(decimal, int32(d.scale)) } @@ -83,6 +91,10 @@ func (VariableNumericConverter) Convert(value any) (any, error) { return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err) } + if decimal.Form == apd.NaN { + return nil, nil + } + bytes, scale := converters.EncodeDecimal(decimal) return map[string]any{ "scale": scale, diff --git a/lib/debezium/converters/decimal_test.go b/lib/debezium/converters/decimal_test.go index 8446ff1c4..79eda8a67 100644 --- a/lib/debezium/converters/decimal_test.go +++ b/lib/debezium/converters/decimal_test.go @@ -36,7 +36,7 @@ func TestEncodeDecimalWithScale(t *testing.T) { } // Scale of 15 that is equal to the amount of decimal places in the value: - assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 15)) + assert.Equal(t, "145.000000000000000", mustEncodeAndDecodeDecimal("145.000000000000000", 15)) assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 15)) // If scale is smaller than the amount of decimal places then an error should be returned: assert.ErrorContains(t, mustReturnError("145.183000000000000", 14), "value scale (15) is different from schema scale (14)") @@ -46,6 +46,11 @@ func TestEncodeDecimalWithScale(t *testing.T) { assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9)) assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8)) + // Values that are not finite: + assert.ErrorContains(t, mustReturnError("NaN", 5), "decimal (NaN) is not finite") + assert.ErrorContains(t, mustReturnError("Infinity", 5), "decimal (Infinity) is not finite") + assert.ErrorContains(t, mustReturnError("-Infinity", 5), "decimal (-Infinity) is not finite") + testCases := []struct { name string value string @@ -189,6 +194,12 @@ func TestDecimalConverter_Convert(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "1.23", actualValue.(*decimal.Decimal).String()) } + { + // NaN: + converted, err := converter.Convert("NaN") + assert.NoError(t, err) + assert.Nil(t, converted) + } } func TestVariableNumericConverter_ToField(t *testing.T) { @@ -228,4 +239,10 @@ func TestVariableNumericConverter_Convert(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "12.34", actualValue.(*decimal.Decimal).String()) } + { + // NaN: + converted, err := converter.Convert("NaN") + assert.NoError(t, err) + assert.Nil(t, converted) + } }