Skip to content

Commit

Permalink
[Postgres] Support BIT VARYING (#510)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 9, 2024
1 parent 9ba8e74 commit b3652de
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 1 deletion.
36 changes: 36 additions & 0 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ CREATE TABLE %s (
c_bit bit,
c_bit1 bit(1),
c_bit5 bit(5),
c_bit_varying bit varying,
c_bit_varying5 bit varying(5),
c_bit_varying10 bit varying(10),
c_boolean boolean,
-- c_box box,
c_bytea bytea,
Expand Down Expand Up @@ -152,6 +155,12 @@ INSERT INTO %s VALUES (
B'1',
-- c_bit5
B'10101',
-- c_bit_varying
B'10101',
-- c_bit_varying5
B'10101',
-- c_bit_varying10
B'10101',
-- c_boolean
true,
-- c_box
Expand Down Expand Up @@ -320,6 +329,30 @@ const expectedPayloadTemplate = `{
"length": "5"
}
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_bit_varying",
"name": "io.debezium.data.Bits",
"parameters": null
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_bit_varying5",
"name": "io.debezium.data.Bits",
"parameters": null
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_bit_varying10",
"name": "io.debezium.data.Bits",
"parameters": null
},
{
"type": "boolean",
"optional": false,
Expand Down Expand Up @@ -686,6 +719,9 @@ const expectedPayloadTemplate = `{
"c_bit": true,
"c_bit1": true,
"c_bit5": "FQ==",
"c_bit_varying": "FQ==",
"c_bit_varying10": "FQ==",
"c_bit_varying5": "FQ==",
"c_boolean": true,
"c_bytea": "YWJjIGtsbSAqqVQ=",
"c_character": "X",
Expand Down
44 changes: 44 additions & 0 deletions lib/debezium/converters/bit_varying.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package converters

import (
"fmt"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/typing"
)

// BitVaryingConverter - Precision here is optional, if it's not specified - then it's infinite.
// If it's specified, then it's the maximum number of bits that can be stored.
type BitVaryingConverter struct {
optionalCharMaxLength int
}

func NewBitVaryingConverter(optionalCharMaxLength int) BitVaryingConverter {
return BitVaryingConverter{optionalCharMaxLength: optionalCharMaxLength}
}

func (BitVaryingConverter) ToField(name string) debezium.Field {
return debezium.Field{
FieldName: name,
DebeziumType: debezium.Bits,
Type: debezium.Bytes,
}
}

func (b BitVaryingConverter) Convert(value any) (any, error) {
stringValue, err := typing.AssertType[string](value)
if err != nil {
return nil, err
}

if b.optionalCharMaxLength > 0 && len(stringValue) > b.optionalCharMaxLength {
return nil, fmt.Errorf("bit varying converter failed: value exceeds char max length, value: %q, length: %d", stringValue, len(stringValue))
}

for _, char := range stringValue {
if char != '0' && char != '1' {
return nil, fmt.Errorf("invalid binary string %q: contains non-binary characters", stringValue)
}
}

return stringToByteA(stringValue)
}
97 changes: 97 additions & 0 deletions lib/debezium/converters/bit_varying_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package converters

import (
"github.com/artie-labs/transfer/lib/debezium"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBitVaryingConverter_ToField(t *testing.T) {
{
// char size not specified
field := NewBitVaryingConverter(0).ToField("foo")
assert.Equal(t, "foo", field.FieldName)
assert.Equal(t, "bytes", string(field.Type))
assert.Equal(t, debezium.Bits, field.DebeziumType)
assert.Nil(t, field.Parameters)
}
{
// char max size 1
field := NewBitVaryingConverter(1).ToField("foo")
assert.Equal(t, "foo", field.FieldName)
assert.Equal(t, "bytes", string(field.Type))
assert.Nil(t, field.Parameters)
}
{
// char max size 5
field := NewBitVaryingConverter(5).ToField("foo")
assert.Equal(t, "foo", field.FieldName)
assert.Equal(t, "bytes", string(field.Type))
assert.Equal(t, debezium.Bits, field.DebeziumType)
assert.Nil(t, field.Parameters)
}
}

func TestBitVaryingConverter_Convert(t *testing.T) {
{
// char size not specified
_, err := BitVaryingConverter{}.Convert("foo")
assert.ErrorContains(t, err, `invalid binary string "foo": contains non-binary characters`)
}
{
// char max size 1
converter := NewBitVaryingConverter(1)
{
// Invalid value - wrong type
_, err := converter.Convert(1234)
assert.ErrorContains(t, err, "expected type string, got int")
}
{
// Valid value - 0
value, err := converter.Convert("0")
assert.NoError(t, err)
assert.Equal(t, []byte{}, value)
}
{
// Valid value - 1
value, err := converter.Convert("1")
assert.NoError(t, err)
assert.Equal(t, []byte{1}, value)
}
{
// Invalid value - 2
_, err := converter.Convert("2")
assert.ErrorContains(t, err, `invalid binary string "2": contains non-binary characters`)
}
}
{
// char max size - 5
{
// Length not matching, but it's fine.
converter := NewBitVaryingConverter(8)
value, err := converter.Convert("101111")
assert.NoError(t, err)
assert.Equal(t, []byte{47}, value)
}
{
// Invalid, value contains non 0s and 1s
converter := NewBitVaryingConverter(5)
_, err := converter.Convert("1011a")
assert.ErrorContains(t, err, "invalid binary string")
}
{
// Valid
converter := NewBitVaryingConverter(5)
value, err := converter.Convert("10101")
assert.NoError(t, err)
assert.Equal(t, []byte{21}, value)
}
{
// Valid #2
converter := NewBitVaryingConverter(5)
value, err := converter.Convert("10011")
assert.NoError(t, err)
assert.Equal(t, []byte{19}, value)
}
}
}
2 changes: 1 addition & 1 deletion lib/postgres/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestScanAdapter_ParsePrimaryKeyValueForOverrides(t *testing.T) {
name: "unsupported data type",
dataType: schema.Array,
value: "1234",
expectedErr: `DataType(21) for column "col" is not supported for use as a primary key`,
expectedErr: `DataType(22) for column "col" is not supported for use as a primary key`,
},
{
name: "boolean - malformed",
Expand Down
8 changes: 8 additions & 0 deletions lib/postgres/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type DataType int

const (
Bit DataType = iota + 1
BitVarying
Boolean
Int16
Int32
Expand Down Expand Up @@ -103,6 +104,13 @@ func parseColumnDataType(colKind string, precision *int, scale *uint16, charMaxL
}

return Bit, &Opts{CharMaxLength: *charMaxLength}, nil
case "bit varying":
opts := &Opts{}
if charMaxLength != nil {
opts.CharMaxLength = *charMaxLength
}

return BitVarying, opts, nil
case "boolean":
return Boolean, nil, nil
case "smallint":
Expand Down
6 changes: 6 additions & 0 deletions sources/postgres/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func valueConverterForType(dataType schema.DataType, opts *schema.Opts) (convert
}

return converters.NewBitConverter(opts.CharMaxLength), nil
case schema.BitVarying:
if opts == nil {
return nil, fmt.Errorf("missing options for bit varying data type")
}

return converters.NewBitVaryingConverter(opts.CharMaxLength), nil
case schema.Boolean:
return converters.BooleanPassthrough{}, nil
case schema.Int16:
Expand Down
25 changes: 25 additions & 0 deletions sources/postgres/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,5 +301,30 @@ func TestValueConverterForType_Convert(t *testing.T) {
assert.Equal(t, []byte{21}, actualValue)
}
}
{
// bit varying
{
// no options
_, err := valueConverterForType(schema.BitVarying, nil)
assert.ErrorContains(t, err, "missing options for bit varying data type")
}
{
// bit varying
converter, err := valueConverterForType(schema.BitVarying, &schema.Opts{CharMaxLength: 0})
assert.NoError(t, err)

actualValue, actualErr := converter.Convert("1")
assert.NoError(t, actualErr)
assert.Equal(t, []byte{1}, actualValue)
}
{
// bit varying (5)
converter, err := valueConverterForType(schema.BitVarying, &schema.Opts{CharMaxLength: 5})
assert.NoError(t, err)

actualValue, actualErr := converter.Convert("10101")
assert.NoError(t, actualErr)
assert.Equal(t, []byte{21}, actualValue)
}
}
}

0 comments on commit b3652de

Please sign in to comment.