Skip to content

Commit

Permalink
Merge branch 'master' into initiate-snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 2, 2024
2 parents f7d7f33 + 2a4bc1b commit 82c4c9e
Show file tree
Hide file tree
Showing 43 changed files with 654 additions and 298 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @artie-labs/engineering
2 changes: 2 additions & 0 deletions .github/workflows/gha-go-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ jobs:
uses: actions/setup-go@v5
with:
go-version: 1.23
- name: Run vet
run: make vet
- name: Run staticcheck
env:
SC_VERSION: "2024.1"
Expand Down
2 changes: 1 addition & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ archives:
checksum:
name_template: 'checksums.txt'
snapshot:
name_template: "{{ incpatch .Version }}-next"
version_template: "{{ incpatch .Version }}-next"
changelog:
sort: asc
filters:
Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
.PHONY: vet
vet:
go vet ./...

.PHONY: static
static:
staticcheck ./...
Expand Down Expand Up @@ -39,3 +43,9 @@ generate:
go get github.com/maxbrunsfeld/counterfeiter/v6
go generate ./...
go mod tidy

.PHONY: upgrade
upgrade:
go get github.com/artie-labs/transfer
go mod tidy
echo "Upgrade complete"
7 changes: 7 additions & 0 deletions config/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type MSSQLTable struct {
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (m *MSSQL) ToDSN() string {
Expand Down Expand Up @@ -95,6 +97,11 @@ func (m *MSSQL) Validate() error {
if stringutil.Empty(table.Name, table.Schema) {
return fmt.Errorf("table name and schema must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
20 changes: 20 additions & 0 deletions config/mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ func TestMSSQL_Validate(t *testing.T) {

assert.ErrorContains(t, m.Validate(), "table name and schema must be passed in")
}
{
// Exclude and include columns at the same time
m := &MSSQL{
Host: "host",
Port: 1,
Username: "username",
Password: "password",
Database: "database",
Tables: []*MSSQLTable{
{
Name: "name",
Schema: "schema",
IncludeColumns: []string{"foo"},
ExcludeColumns: []string{"bar"},
},
},
}

assert.ErrorContains(t, m.Validate(), "cannot exclude and include columns at the same time")
}
{
// Valid
m := &MSSQL{
Expand Down
7 changes: 7 additions & 0 deletions config/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type MySQLTable struct {
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (m *MySQLTable) GetBatchSize() uint {
Expand Down Expand Up @@ -91,6 +93,11 @@ func (m *MySQL) Validate() error {
if table.Name == "" {
return fmt.Errorf("table name must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
11 changes: 11 additions & 0 deletions config/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ func TestMySQL_Validate(t *testing.T) {
c.Tables = append(c.Tables, &MySQLTable{})
assert.ErrorContains(t, c.Validate(), "table name must be passed in")
}
{
// exclude and include at the same time
c := createValidConfig()
c.Tables = append(c.Tables, &MySQLTable{
Name: "foo",
IncludeColumns: []string{"foo"},
ExcludeColumns: []string{"bar"},
})

assert.ErrorContains(t, c.Validate(), "cannot exclude and include columns at the same time")
}
}

func TestMySQL_ToDSN(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions config/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ type PostgreSQLTable struct {

// Optional settings
BatchSize uint `yaml:"batchSize,omitempty"`
PrimaryKeysOverride []string `yaml:"primaryKeysOverride,omitempty"`
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (p *PostgreSQLTable) GetBatchSize() uint {
Expand Down Expand Up @@ -98,6 +101,11 @@ func (p *PostgreSQL) Validate() error {
if table.Schema == "" {
return fmt.Errorf("schema must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
20 changes: 20 additions & 0 deletions config/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,26 @@ func TestPostgreSQL_Validate(t *testing.T) {

assert.ErrorContains(t, p.Validate(), "schema must be passed in")
}
{
// Filtering and excluding at the same time
p := &PostgreSQL{
Host: "host",
Port: 1,
Username: "username",
Password: "password",
Database: "database",
Tables: []*PostgreSQLTable{
{
Name: "name",
Schema: "schema",
ExcludeColumns: []string{"a"},
IncludeColumns: []string{"b"},
},
},
}

assert.ErrorContains(t, p.Validate(), "cannot exclude and include columns at the same time")
}
{
// Valid
p := &PostgreSQL{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/artie-labs/transfer v1.26.8
github.com/artie-labs/transfer v1.27.11
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.26.8 h1:aNhd4f3KwHOl0NsCwS1c4SJfU+CGveleqQgMgCAZG/0=
github.com/artie-labs/transfer v1.26.8/go.mod h1:BlYxzzlXGHOMNSgbpcjzw1zQSD/wXmb93NoPBhOmcqA=
github.com/artie-labs/transfer v1.27.11 h1:2J5kV/q2RmB7PUJrqIMx+gL9Rp5zJS6Ey72koqSuEpk=
github.com/artie-labs/transfer v1.27.11/go.mod h1:+a/UhlQVRIpdz3muS1yhSvyX42RQL0LHOdovGZfEsDE=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
Expand Down
55 changes: 28 additions & 27 deletions integration_tests/mongo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"math/rand/v2"
"os"
"reflect"
"time"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/integration_tests/utils"
"github.com/artie-labs/reader/lib"
Expand All @@ -16,11 +22,6 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log/slog"
"math/rand/v2"
"os"
"reflect"
"time"
)

func main() {
Expand Down Expand Up @@ -96,25 +97,25 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB)
ts := time.Date(2020, 10, 5, 12, 0, 0, 0, time.UTC)

_, err = collection.InsertOne(ctx, bson.D{
{"_id", objId},
{"string", "This is a string"},
{"int32", int32(32)},
{"int64", int64(64)},
{"double", 3.14},
{"bool", true},
{"datetime", ts},
{"embeddedDocument", bson.D{
{"field1", "value1"},
{"field2", "value2"},
{Key: "_id", Value: objId},
{Key: "string", Value: "This is a string"},
{Key: "int32", Value: int32(32)},
{Key: "int64", Value: int64(64)},
{Key: "double", Value: 3.14},
{Key: "bool", Value: true},
{Key: "datetime", Value: ts},
{Key: "embeddedDocument", Value: bson.D{
{Key: "field1", Value: "value1"},
{Key: "field2", Value: "value2"},
}},
{"embeddedMap", bson.M{"foo": "bar", "hello": "world", "pi": 3.14159}},
{"array", bson.A{"item1", 2, true, 3.14}},
{"binary", []byte("binary data")},
{"objectId", objId},
{"null", nil},
{"timestamp", primitive.Timestamp{T: uint32(ts.Unix()), I: 1}},
{"minKey", primitive.MinKey{}},
{"maxKey", primitive.MaxKey{}},
{Key: "embeddedMap", Value: bson.M{"foo": "bar", "hello": "world", "pi": 3.14159}},
{Key: "array", Value: bson.A{"item1", 2, true, 3.14}},
{Key: "binary", Value: []byte("binary data")},
{Key: "objectId", Value: objId},
{Key: "null", Value: nil},
{Key: "timestamp", Value: primitive.Timestamp{T: uint32(ts.Unix()), I: 1}},
{Key: "minKey", Value: primitive.MinKey{}},
{Key: "maxKey", Value: primitive.MaxKey{}},
})
if err != nil {
return fmt.Errorf("failed to insert row: %w", err)
Expand Down Expand Up @@ -169,24 +170,24 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB)
return fmt.Errorf("failed to get event from bytes: %w", err)
}

pkMap, err := dbz.GetPrimaryKey(actualPkBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
pkMap, err := dbz.GetPrimaryKey(actualPkBytes, kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
if err != nil {
return fmt.Errorf("failed to get primary key: %w", err)
}

data, err := evt.GetData(pkMap, &kafkalib.TopicConfig{})
data, err := evt.GetData(pkMap, kafkalib.TopicConfig{})
if err != nil {
return fmt.Errorf("failed to get data: %w", err)
}

expectedPayload := map[string]any{
"objectId": "66a95fae3776c2f21f0ff568",
"array": []any{"item1", int32(2), true, 3.14},
"datetime": ts.Format(ext.ISO8601),
"datetime": ext.NewExtendedTime(ts, ext.TimestampTzKindType, "2006-01-02T15:04:05.999-07:00"),
"int64": int64(64),
"__artie_delete": false,
"__artie_only_set_delete": false,
"timestamp": ts.Format(ext.ISO8601),
"timestamp": ext.NewExtendedTime(ts, ext.TimestampTzKindType, "2006-01-02T15:04:05.999-07:00"),
"embeddedDocument": `{"field1":"value1","field2":"value2"}`,
"embeddedMap": `{"foo":"bar","hello":"world","pi":3.14159}`,
"binary": `{"$binary":{"base64":"YmluYXJ5IGRhdGE=","subType":"00"}}`,
Expand Down
31 changes: 23 additions & 8 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,36 @@ package converters

import (
"fmt"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/debezium/converters"
"github.com/artie-labs/transfer/lib/typing"
"github.com/cockroachdb/apd/v3"
)

type decimalConverter struct {
// encodeDecimalWithScale is used to encode a [*apd.Decimal] to `org.apache.kafka.connect.data.Decimal`
// using a specific scale.
func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) ([]byte, error) {
targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative.
if decimal.Exponent != targetExponent {
// Return an error if the scales are different, this maintains parity with `org.apache.kafka.connect.data.Decimal`.
// https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L69
return nil, fmt.Errorf("value scale (%d) is different from schema scale (%d)", -decimal.Exponent, scale)
}
bytes, _ := converters.EncodeDecimal(decimal)
return bytes, nil
}

type DecimalConverter struct {
scale uint16
precision *int
}

func NewDecimalConverter(scale uint16, precision *int) decimalConverter {
return decimalConverter{scale: scale, precision: precision}
func NewDecimalConverter(scale uint16, precision *int) DecimalConverter {
return DecimalConverter{scale: scale, precision: precision}
}

func (d decimalConverter) ToField(name string) debezium.Field {
func (d DecimalConverter) ToField(name string) debezium.Field {
field := debezium.Field{
FieldName: name,
Type: debezium.Bytes,
Expand All @@ -33,7 +48,7 @@ func (d decimalConverter) ToField(name string) debezium.Field {
return field
}

func (d decimalConverter) Convert(value any) (any, error) {
func (d DecimalConverter) Convert(value any) (any, error) {
stringValue, err := typing.AssertType[string](value)
if err != nil {
return nil, err
Expand All @@ -44,7 +59,7 @@ func (d decimalConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

return debezium.EncodeDecimalWithScale(decimal, int32(d.scale)), nil
return encodeDecimalWithScale(decimal, int32(d.scale))
}

type VariableNumericConverter struct{}
Expand All @@ -68,9 +83,9 @@ func (VariableNumericConverter) Convert(value any) (any, error) {
return nil, fmt.Errorf(`unable to use %q as a decimal: %w`, stringValue, err)
}

bytes, scale := debezium.EncodeDecimal(decimal)
bytes, scale := converters.EncodeDecimal(decimal)
return map[string]any{
"scale": int32(scale),
"scale": scale,
"value": bytes,
}, nil
}
Loading

0 comments on commit 82c4c9e

Please sign in to comment.