Skip to content

Commit

Permalink
Merge branch 'master' into postgres-bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 4, 2024
2 parents 4d615b2 + 901b6e6 commit e51f5ce
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
name: Go tests
name: Go checks

on: [push]

jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-24.04

steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.23
- name: Install dependencies

- name: Download dependencies
run: go mod download

- name: Run vet
run: make vet

- name: Run staticcheck
env:
SC_VERSION: "2024.1.1"
run: |
SC_URL="https://github.com/dominikh/go-tools/releases/download/$SC_VERSION/staticcheck_linux_amd64.tar.gz"
wget -q ${SC_URL} -O - | tar -xzf - --strip-components 1 -C /usr/local/bin staticcheck/staticcheck
make static
- name: Run tests + race condition check
run: make race

- name: Check Go files are properly formatted
run: test -z $(gofmt -l .)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: [push]

jobs:
Postgres:
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
container: golang:1.23
services:
postgres:
Expand All @@ -24,8 +24,9 @@ jobs:
go-version: 1.23
- name: Run integration test
run: PG_HOST=postgres make postgres-itest

MySQL:
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
container: golang:1.23
services:
mysql:
Expand All @@ -45,8 +46,9 @@ jobs:
go-version: 1.23
- name: Run integration test
run: MYSQL_HOST=mysql make mysql-itest

MongoDB:
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
container: golang:1.23
services:
mongo:
Expand Down
27 changes: 27 additions & 0 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ CREATE TABLE %s (
c_macaddr8 macaddr8,
c_money money,
c_numeric numeric(7, 2),
c_numeric_nan numeric(7, 2),
c_numeric_variable numeric,
c_numeric_variable_nan numeric,
-- c_path path,
-- c_pg_lsn pg_lsn,
-- c_pg_snapshot pg_snapshot,
Expand Down Expand Up @@ -196,8 +198,12 @@ INSERT INTO %s VALUES (
'52093.89',
-- c_numeric
'987.654',
-- c_numeric_nan
'NaN',
-- c_numeric_variable,
'10987.65401',
-- c_numeric_variable_nan,
'NaN',
-- c_path
-- Not supported
-- c_pg_lsn
Expand Down Expand Up @@ -471,6 +477,17 @@ const expectedPayloadTemplate = `{
"scale": "2"
}
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_numeric_nan",
"name": "org.apache.kafka.connect.data.Decimal",
"parameters": {
"connect.decimal.precision": "7",
"scale": "2"
}
},
{
"type": "struct",
"optional": false,
Expand All @@ -479,6 +496,14 @@ const expectedPayloadTemplate = `{
"name": "io.debezium.data.VariableScaleDecimal",
"parameters": null
},
{
"type": "struct",
"optional": false,
"default": null,
"field": "c_numeric_variable_nan",
"name": "io.debezium.data.VariableScaleDecimal",
"parameters": null
},
{
"type": "struct",
"optional": false,
Expand Down Expand Up @@ -702,10 +727,12 @@ const expectedPayloadTemplate = `{
"c_macaddr8": "12:34:56:78:90:ab:cd:ef",
"c_money": "T30t",
"c_numeric": "AYHN",
"c_numeric_nan": null,
"c_numeric_variable": {
"scale": 5,
"value": "QX3UWQ=="
},
"c_numeric_variable_nan": null,
"c_numrange": "[11.1,22.2)",
"c_point": {
"x": 12.34,
Expand Down
12 changes: 12 additions & 0 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
// encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`
// using a specific scale.
func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) ([]byte, error) {
if decimal.Form != apd.Finite {
return nil, fmt.Errorf("decimal (%v) is not finite", decimal)
}

targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
// Return an error if the scales are different, this maintains parity with `org.apache.kafka.connect.data.Decimal`.
Expand Down Expand Up @@ -59,6 +63,10 @@ func (d DecimalConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

if decimal.Form == apd.NaN {
return nil, nil
}

return encodeDecimalWithScale(decimal, int32(d.scale))
}

Expand All @@ -83,6 +91,10 @@ func (VariableNumericConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

if decimal.Form == apd.NaN {
return nil, nil
}

bytes, scale := converters.EncodeDecimal(decimal)
return map[string]any{
"scale": scale,
Expand Down
17 changes: 17 additions & 0 deletions lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func TestEncodeDecimalWithScale(t *testing.T) {
assert.Equal(t, "-9063701308.217222135", mustEncodeAndDecodeDecimal("-9063701308.217222135", 9))
assert.Equal(t, "-74961544796695.89960242", mustEncodeAndDecodeDecimal("-74961544796695.89960242", 8))

// Values that are not finite:
assert.ErrorContains(t, mustReturnError("NaN", 5), "decimal (NaN) is not finite")
assert.ErrorContains(t, mustReturnError("Infinity", 5), "decimal (Infinity) is not finite")
assert.ErrorContains(t, mustReturnError("-Infinity", 5), "decimal (-Infinity) is not finite")

testCases := []struct {
name string
value string
Expand Down Expand Up @@ -189,6 +194,12 @@ func TestDecimalConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "1.23", actualValue.(*decimal.Decimal).String())
}
{
// NaN:
converted, err := converter.Convert("NaN")
assert.NoError(t, err)
assert.Nil(t, converted)
}
}

func TestVariableNumericConverter_ToField(t *testing.T) {
Expand Down Expand Up @@ -228,4 +239,10 @@ func TestVariableNumericConverter_Convert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "12.34", actualValue.(*decimal.Decimal).String())
}
{
// NaN:
converted, err := converter.Convert("NaN")
assert.NoError(t, err)
assert.Nil(t, converted)
}
}

0 comments on commit e51f5ce

Please sign in to comment.