Skip to content

Commit

Permalink
[postgres] Support numeric columns with "NaN" values
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Oct 4, 2024
1 parent 6c23184 commit c8c114f
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 1 deletion.
28 changes: 28 additions & 0 deletions .github/workflows/gha-go-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Go tests

on: [push]

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.23
- name: Install dependencies
run: go mod download
- name: Run vet
run: make vet
- name: Run staticcheck
env:
SC_VERSION: "2024.1.1"
run: |
SC_URL="https://github.com/dominikh/go-tools/releases/download/$SC_VERSION/staticcheck_linux_amd64.tar.gz"
wget -q ${SC_URL} -O - | tar -xzf - --strip-components 1 -C /usr/local/bin staticcheck/staticcheck
make static
- name: Run tests + race condition check
run: make race
- name: Check Go files are properly formatted
run: test -z $(gofmt -l .)
64 changes: 64 additions & 0 deletions .github/workflows/gha-integration-tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: Integration tests

on: [push]

jobs:
Postgres:
runs-on: ubuntu-latest
container: golang:1.23
services:
postgres:
image: postgis/postgis:16-3.4-alpine
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.23
- name: Run integration test
run: PG_HOST=postgres make postgres-itest
MySQL:
runs-on: ubuntu-latest
container: golang:1.23
services:
mysql:
image: mysql:8.3
env:
MYSQL_ROOT_PASSWORD: mysql
options: >-
--health-cmd "mysqladmin ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.23
- name: Run integration test
run: MYSQL_HOST=mysql make mysql-itest
MongoDB:
runs-on: ubuntu-latest
container: golang:1.23
services:
mongo:
image: mongo:7.0
env:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: example
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.23
- name: Run integration test
run: MONGO_HOST=mongodb://mongo make mongo-itest
27 changes: 27 additions & 0 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ CREATE TABLE %s (
c_macaddr8 macaddr8,
c_money money,
c_numeric numeric(7, 2),
c_numeric_nan numeric(7, 2),
c_numeric_variable numeric,
c_numeric_variable_nan numeric,
-- c_path path,
-- c_pg_lsn pg_lsn,
-- c_pg_snapshot pg_snapshot,
Expand Down Expand Up @@ -194,8 +196,12 @@ INSERT INTO %s VALUES (
'52093.89',
-- c_numeric
'987.654',
-- c_numeric_nan
'NaN',
-- c_numeric_variable,
'10987.65401',
-- c_numeric_variable_nan,
'NaN',
-- c_path
-- Not supported
-- c_pg_lsn
Expand Down Expand Up @@ -451,6 +457,17 @@ const expectedPayloadTemplate = `{
"scale": "2"
}
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_numeric_nan",
"name": "org.apache.kafka.connect.data.Decimal",
"parameters": {
"connect.decimal.precision": "7",
"scale": "2"
}
},
{
"type": "struct",
"optional": false,
Expand All @@ -459,6 +476,14 @@ const expectedPayloadTemplate = `{
"name": "io.debezium.data.VariableScaleDecimal",
"parameters": null
},
{
"type": "struct",
"optional": false,
"default": null,
"field": "c_numeric_variable_nan",
"name": "io.debezium.data.VariableScaleDecimal",
"parameters": null
},
{
"type": "struct",
"optional": false,
Expand Down Expand Up @@ -680,10 +705,12 @@ const expectedPayloadTemplate = `{
"c_macaddr8": "12:34:56:78:90:ab:cd:ef",
"c_money": "T30t",
"c_numeric": "AYHN",
"c_numeric_nan": null,
"c_numeric_variable": {
"scale": 5,
"value": "QX3UWQ=="
},
"c_numeric_variable_nan": null,
"c_numrange": "[11.1,22.2)",
"c_point": {
"x": 12.34,
Expand Down
12 changes: 12 additions & 0 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
// encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`
// using a specific scale.
func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) ([]byte, error) {
if decimal.Form != apd.Finite {
return nil, fmt.Errorf("decimal (%v) is not finite", decimal)
}

targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
// Return an error if the scales are different, this maintains parity with `org.apache.kafka.connect.data.Decimal`.
Expand Down Expand Up @@ -59,6 +63,10 @@ func (d DecimalConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

if decimal.Form == apd.NaN {
return nil, nil
}

return encodeDecimalWithScale(decimal, int32(d.scale))
}

Expand All @@ -83,6 +91,10 @@ func (VariableNumericConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

if decimal.Form == apd.NaN {
return nil, nil
}

bytes, scale := converters.EncodeDecimal(decimal)
return map[string]any{
"scale": scale,
Expand Down
19 changes: 18 additions & 1 deletion lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestEncodeDecimalWithScale(t *testing.T) {
}

// Scale of 15 that is equal to the amount of decimal places in the value:
assert.Equal(t, "145.183000000000000", mustEncodeAndDecodeDecimal("145.183000000000000", 15))
assert.Equal(t, "145.000000000000000", mustEncodeAndDecodeDecimal("145.000000000000000", 15))
assert.Equal(t, "-145.183000000000000", mustEncodeAndDecodeDecimal("-145.183000000000000", 15))
// If scale is smaller than the amount of decimal places then an error should be returned:
assert.ErrorContains(t, mustReturnError("145.183000000000000", 14), "value scale (15) is different from schema scale (14)")
Expand All @@ -46,6 +46,11 @@ func TestEncodeDecimalWithScale(t *testing.T) {
assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))
assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8))

// Values that are not finite:
assert.ErrorContains(t, mustReturnError("NaN", 5), "decimal (NaN) is not finite")
assert.ErrorContains(t, mustReturnError("Infinity", 5), "decimal (Infinity) is not finite")
assert.ErrorContains(t, mustReturnError("-Infinity", 5), "decimal (-Infinity) is not finite")

testCases := []struct {
name string
value string
Expand Down Expand Up @@ -189,6 +194,12 @@ func TestDecimalConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "1.23", actualValue.(*decimal.Decimal).String())
}
{
// NaN:
converted, err := converter.Convert("NaN")
assert.NoError(t, err)
assert.Nil(t, converted)
}
}

func TestVariableNumericConverter_ToField(t *testing.T) {
Expand Down Expand Up @@ -228,4 +239,10 @@ func TestVariableNumericConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "12.34", actualValue.(*decimal.Decimal).String())
}
{
// NaN:
converted, err := converter.Convert("NaN")
assert.NoError(t, err)
assert.Nil(t, converted)
}
}

0 comments on commit c8c114f

Please sign in to comment.