Skip to content

Commit

Permalink
Merge branch 'master' into nv/mongodb-uri
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Oct 9, 2024
2 parents e04c2be + 5349dd7 commit 47d5fa3
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/artie-labs/transfer v1.27.13
github.com/artie-labs/transfer v1.27.14
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.27.13 h1:kTQs/x5TheufGZm1yznLTkUFer+jB8GymAepVPvyAd8=
github.com/artie-labs/transfer v1.27.13/go.mod h1:Lbrj8nz/cCq5BycDR++l3K+kc2GUbEnGRyrVDyA8MfM=
github.com/artie-labs/transfer v1.27.14 h1:wN4tBGJmcCKlvUacFgBgug+1XtuTiW9QbmmwSh6OcDc=
github.com/artie-labs/transfer v1.27.14/go.mod h1:Lbrj8nz/cCq5BycDR++l3K+kc2GUbEnGRyrVDyA8MfM=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
Expand Down
36 changes: 36 additions & 0 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ CREATE TABLE %s (
c_bit bit,
c_bit1 bit(1),
c_bit5 bit(5),
c_bit_varying bit varying,
c_bit_varying5 bit varying(5),
c_bit_varying10 bit varying(10),
c_boolean boolean,
-- c_box box,
c_bytea bytea,
Expand Down Expand Up @@ -152,6 +155,12 @@ INSERT INTO %s VALUES (
B'1',
-- c_bit5
B'10101',
-- c_bit_varying
B'10101',
-- c_bit_varying5
B'10101',
-- c_bit_varying10
B'10101',
-- c_boolean
true,
-- c_box
Expand Down Expand Up @@ -320,6 +329,30 @@ const expectedPayloadTemplate = `{
"length": "5"
}
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_bit_varying",
"name": "io.debezium.data.Bits",
"parameters": null
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_bit_varying5",
"name": "io.debezium.data.Bits",
"parameters": null
},
{
"type": "bytes",
"optional": false,
"default": null,
"field": "c_bit_varying10",
"name": "io.debezium.data.Bits",
"parameters": null
},
{
"type": "boolean",
"optional": false,
Expand Down Expand Up @@ -686,6 +719,9 @@ const expectedPayloadTemplate = `{
"c_bit": true,
"c_bit1": true,
"c_bit5": "FQ==",
"c_bit_varying": "FQ==",
"c_bit_varying10": "FQ==",
"c_bit_varying5": "FQ==",
"c_boolean": true,
"c_bytea": "YWJjIGtsbSAqqVQ=",
"c_character": "X",
Expand Down
44 changes: 44 additions & 0 deletions lib/debezium/converters/bit_varying.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package converters

import (
"fmt"
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/typing"
)

// BitVaryingConverter - Precision here is optional, if it's not specified - then it's infinite.
// If it's specified, then it's the maximum number of bits that can be stored.
type BitVaryingConverter struct {
optionalCharMaxLength int
}

func NewBitVaryingConverter(optionalCharMaxLength int) BitVaryingConverter {
return BitVaryingConverter{optionalCharMaxLength: optionalCharMaxLength}
}

func (BitVaryingConverter) ToField(name string) debezium.Field {
return debezium.Field{
FieldName: name,
DebeziumType: debezium.Bits,
Type: debezium.Bytes,
}
}

func (b BitVaryingConverter) Convert(value any) (any, error) {
stringValue, err := typing.AssertType[string](value)
if err != nil {
return nil, err
}

if b.optionalCharMaxLength > 0 && len(stringValue) > b.optionalCharMaxLength {
return nil, fmt.Errorf("bit varying converter failed: value exceeds char max length, value: %q, length: %d", stringValue, len(stringValue))
}

for _, char := range stringValue {
if char != '0' && char != '1' {
return nil, fmt.Errorf("invalid binary string %q: contains non-binary characters", stringValue)
}
}

return stringToByteA(stringValue)
}
97 changes: 97 additions & 0 deletions lib/debezium/converters/bit_varying_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package converters

import (
"github.com/artie-labs/transfer/lib/debezium"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBitVaryingConverter_ToField(t *testing.T) {
{
// char size not specified
field := NewBitVaryingConverter(0).ToField("foo")
assert.Equal(t, "foo", field.FieldName)
assert.Equal(t, "bytes", string(field.Type))
assert.Equal(t, debezium.Bits, field.DebeziumType)
assert.Nil(t, field.Parameters)
}
{
// char max size 1
field := NewBitVaryingConverter(1).ToField("foo")
assert.Equal(t, "foo", field.FieldName)
assert.Equal(t, "bytes", string(field.Type))
assert.Nil(t, field.Parameters)
}
{
// char max size 5
field := NewBitVaryingConverter(5).ToField("foo")
assert.Equal(t, "foo", field.FieldName)
assert.Equal(t, "bytes", string(field.Type))
assert.Equal(t, debezium.Bits, field.DebeziumType)
assert.Nil(t, field.Parameters)
}
}

func TestBitVaryingConverter_Convert(t *testing.T) {
{
// char size not specified
_, err := BitVaryingConverter{}.Convert("foo")
assert.ErrorContains(t, err, `invalid binary string "foo": contains non-binary characters`)
}
{
// char max size 1
converter := NewBitVaryingConverter(1)
{
// Invalid value - wrong type
_, err := converter.Convert(1234)
assert.ErrorContains(t, err, "expected type string, got int")
}
{
// Valid value - 0
value, err := converter.Convert("0")
assert.NoError(t, err)
assert.Equal(t, []byte{}, value)
}
{
// Valid value - 1
value, err := converter.Convert("1")
assert.NoError(t, err)
assert.Equal(t, []byte{1}, value)
}
{
// Invalid value - 2
_, err := converter.Convert("2")
assert.ErrorContains(t, err, `invalid binary string "2": contains non-binary characters`)
}
}
{
// char max size - 5
{
// Length not matching, but it's fine.
converter := NewBitVaryingConverter(8)
value, err := converter.Convert("101111")
assert.NoError(t, err)
assert.Equal(t, []byte{47}, value)
}
{
// Invalid, value contains non 0s and 1s
converter := NewBitVaryingConverter(5)
_, err := converter.Convert("1011a")
assert.ErrorContains(t, err, "invalid binary string")
}
{
// Valid
converter := NewBitVaryingConverter(5)
value, err := converter.Convert("10101")
assert.NoError(t, err)
assert.Equal(t, []byte{21}, value)
}
{
// Valid #2
converter := NewBitVaryingConverter(5)
value, err := converter.Convert("10011")
assert.NoError(t, err)
assert.Equal(t, []byte{19}, value)
}
}
}
2 changes: 1 addition & 1 deletion lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error
return nil
}

func (b *BatchWriter) OnComplete() error {
func (b *BatchWriter) OnComplete(_ context.Context) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion lib/postgres/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestScanAdapter_ParsePrimaryKeyValueForOverrides(t *testing.T) {
name: "unsupported data type",
dataType: schema.Array,
value: "1234",
expectedErr: `DataType(21) for column "col" is not supported for use as a primary key`,
expectedErr: `DataType(22) for column "col" is not supported for use as a primary key`,
},
{
name: "boolean - malformed",
Expand Down
8 changes: 8 additions & 0 deletions lib/postgres/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type DataType int

const (
Bit DataType = iota + 1
BitVarying
Boolean
Int16
Int32
Expand Down Expand Up @@ -103,6 +104,13 @@ func parseColumnDataType(colKind string, precision *int, scale *uint16, charMaxL
}

return Bit, &Opts{CharMaxLength: *charMaxLength}, nil
case "bit varying":
opts := &Opts{}
if charMaxLength != nil {
opts.CharMaxLength = *charMaxLength
}

return BitVarying, opts, nil
case "boolean":
return Boolean, nil, nil
case "smallint":
Expand Down
6 changes: 6 additions & 0 deletions sources/postgres/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func valueConverterForType(dataType schema.DataType, opts *schema.Opts) (convert
}

return converters.NewBitConverter(opts.CharMaxLength), nil
case schema.BitVarying:
if opts == nil {
return nil, fmt.Errorf("missing options for bit varying data type")
}

return converters.NewBitVaryingConverter(opts.CharMaxLength), nil
case schema.Boolean:
return converters.BooleanPassthrough{}, nil
case schema.Int16:
Expand Down
25 changes: 25 additions & 0 deletions sources/postgres/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,5 +301,30 @@ func TestValueConverterForType_Convert(t *testing.T) {
assert.Equal(t, []byte{21}, actualValue)
}
}
{
// bit varying
{
// no options
_, err := valueConverterForType(schema.BitVarying, nil)
assert.ErrorContains(t, err, "missing options for bit varying data type")
}
{
// bit varying
converter, err := valueConverterForType(schema.BitVarying, &schema.Opts{CharMaxLength: 0})
assert.NoError(t, err)

actualValue, actualErr := converter.Convert("1")
assert.NoError(t, actualErr)
assert.Equal(t, []byte{1}, actualValue)
}
{
// bit varying (5)
converter, err := valueConverterForType(schema.BitVarying, &schema.Opts{CharMaxLength: 5})
assert.NoError(t, err)

actualValue, actualErr := converter.Convert("10101")
assert.NoError(t, actualErr)
assert.Equal(t, []byte{21}, actualValue)
}
}
}
14 changes: 7 additions & 7 deletions writers/transfer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (w *Writer) messageToEvent(message lib.RawMessage) (event.Event, error) {
return memoryEvent, nil
}

func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error {
func (w *Writer) Write(ctx context.Context, messages []lib.RawMessage) error {
if len(messages) == 0 {
return nil
}
Expand Down Expand Up @@ -150,7 +150,7 @@ func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error {
}

if shouldFlush {
if err = w.flush(flushReason); err != nil {
if err = w.flush(ctx, flushReason); err != nil {
return err
}
}
Expand All @@ -170,7 +170,7 @@ func (w *Writer) getTableData() (string, *models.TableData, error) {
return "", nil, fmt.Errorf("expected exactly one table")
}

func (w *Writer) flush(reason string) error {
func (w *Writer) flush(ctx context.Context, reason string) error {
tableName, tableData, err := w.getTableData()
if err != nil {
return err
Expand Down Expand Up @@ -198,7 +198,7 @@ func (w *Writer) flush(reason string) error {
tableData.ResetTempTableSuffix()
if isMicrosoftSQLServer(w.destination) {
// Microsoft SQL Server uses MERGE not append
if err = w.destination.Merge(tableData.TableData); err != nil {
if err = w.destination.Merge(ctx, tableData.TableData); err != nil {
tags["what"] = "merge_fail"
tags["retryable"] = fmt.Sprint(w.destination.IsRetryableError(err))
return fmt.Errorf("failed to merge data to destination: %w", err)
Expand All @@ -210,7 +210,7 @@ func (w *Writer) flush(reason string) error {
}

tableData.InMemoryColumns().DeleteColumn(constants.OnlySetDeleteColumnMarker)
if err = w.destination.Append(tableData.TableData, isBigQuery(w.destination)); err != nil {
if err = w.destination.Append(ctx, tableData.TableData, isBigQuery(w.destination)); err != nil {
tags["what"] = "merge_fail"
tags["retryable"] = fmt.Sprint(w.destination.IsRetryableError(err))
return fmt.Errorf("failed to append data to destination: %w", err)
Expand All @@ -221,12 +221,12 @@ func (w *Writer) flush(reason string) error {
return nil
}

func (w *Writer) OnComplete() error {
func (w *Writer) OnComplete(ctx context.Context) error {
if len(w.primaryKeys) == 0 {
return fmt.Errorf("primary keys not set")
}

if err := w.flush("complete"); err != nil {
if err := w.flush(ctx, "complete"); err != nil {
return fmt.Errorf("failed to flush: %w", err)
}

Expand Down
8 changes: 4 additions & 4 deletions writers/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type DestinationWriter interface {
Write(ctx context.Context, rawMsgs []lib.RawMessage) error
OnComplete() error
OnComplete(ctx context.Context) error
}

type Writer struct {
Expand Down Expand Up @@ -57,16 +57,16 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess

// Only run [OnComplete] if we wrote messages out. Otherwise, primary keys may not be loaded.
if count > 0 {
if err := w.destinationWriter.OnComplete(); err != nil {
if err := w.destinationWriter.OnComplete(ctx); err != nil {
return 0, fmt.Errorf("failed running destination OnComplete: %w", err)
}
}

return count, nil
}

func (w *Writer) OnComplete() error {
if err := w.destinationWriter.OnComplete(); err != nil {
func (w *Writer) OnComplete(ctx context.Context) error {
if err := w.destinationWriter.OnComplete(ctx); err != nil {
return fmt.Errorf("failed running destination OnComplete: %w", err)
}

Expand Down
Loading

0 comments on commit 47d5fa3

Please sign in to comment.