Skip to content

Commit

Permalink
Upgrade Artie Transfer + updating ptrs (#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 18, 2024
1 parent 690c621 commit 70dc8cc
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 65 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/artie-labs/transfer v1.27.2
github.com/artie-labs/transfer v1.27.7
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.27.2 h1:4uQpU63XU/IpHQ9C46yjhpEE1WDZfuUbb6yVQfjpg+4=
github.com/artie-labs/transfer v1.27.2/go.mod h1:+a/UhlQVRIpdz3muS1yhSvyX42RQL0LHOdovGZfEsDE=
github.com/artie-labs/transfer v1.27.7 h1:X883+8drGY4W0bH60iER0jb/BVt/TB96E5y2SHOq2/U=
github.com/artie-labs/transfer v1.27.7/go.mod h1:+a/UhlQVRIpdz3muS1yhSvyX42RQL0LHOdovGZfEsDE=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
Expand Down
4 changes: 2 additions & 2 deletions lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/debezium/converters"
"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestDecimalConverter_ToField(t *testing.T) {
}
{
// With precision
converter := NewDecimalConverter(2, ptr.ToInt(3))
converter := NewDecimalConverter(2, typing.ToPtr(3))
expected := debezium.Field{
Type: "bytes",
FieldName: "col",
Expand Down
23 changes: 10 additions & 13 deletions lib/mssql/schema/schema_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package schema

import (
"testing"

ptr2 "github.com/artie-labs/reader/lib/ptr"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
"testing"
)

func TestParseColumnDataType(t *testing.T) {
Expand Down Expand Up @@ -63,7 +60,7 @@ func TestParseColumnDataType(t *testing.T) {
{
// valid
for _, colKind := range []string{"numeric", "decimal"} {
dataType, opts, err := ParseColumnDataType(colKind, ptr.ToInt(1), ptr2.ToUint16(2), nil)
dataType, opts, err := ParseColumnDataType(colKind, typing.ToPtr(1), typing.ToPtr(uint16(2)), nil)
assert.NoError(t, err, colKind)
assert.NotNil(t, opts, colKind)
assert.Equal(t, Numeric, dataType, colKind)
Expand All @@ -74,7 +71,7 @@ func TestParseColumnDataType(t *testing.T) {
{
// invalid, precision is missing
for _, colKind := range []string{"numeric", "decimal"} {
dataType, opts, err := ParseColumnDataType(colKind, nil, ptr2.ToUint16(2), nil)
dataType, opts, err := ParseColumnDataType(colKind, nil, typing.ToPtr(uint16(2)), nil)
assert.ErrorContains(t, err, "expected precision and scale to be not-nil", colKind)
assert.Nil(t, opts, colKind)
assert.Equal(t, -1, int(dataType), colKind)
Expand All @@ -86,7 +83,7 @@ func TestParseColumnDataType(t *testing.T) {
{
// Default
for i := 0; i <= 3; i++ {
dataType, opts, err := ParseColumnDataType("time", nil, nil, ptr.ToInt(i))
dataType, opts, err := ParseColumnDataType("time", nil, nil, typing.ToPtr(i))
assert.NoError(t, err, i)
assert.Nil(t, opts, i)
assert.Equal(t, Time, dataType, i)
Expand All @@ -95,15 +92,15 @@ func TestParseColumnDataType(t *testing.T) {
{
// Micro
for i := 4; i <= 6; i++ {
dataType, opts, err := ParseColumnDataType("time", nil, nil, ptr.ToInt(i))
dataType, opts, err := ParseColumnDataType("time", nil, nil, typing.ToPtr(i))
assert.NoError(t, err, i)
assert.Nil(t, opts, i)
assert.Equal(t, TimeMicro, dataType, i)
}
}
{
// Nano
dataType, opts, err := ParseColumnDataType("time", nil, nil, ptr.ToInt(7))
dataType, opts, err := ParseColumnDataType("time", nil, nil, typing.ToPtr(7))
assert.NoError(t, err)
assert.Nil(t, opts)
assert.Equal(t, TimeNano, dataType)
Expand Down Expand Up @@ -139,7 +136,7 @@ func TestParseColumnDataType(t *testing.T) {
{
// Default
for i := 0; i <= 3; i++ {
dataType, opts, err := ParseColumnDataType("datetime2", nil, nil, ptr.ToInt(i))
dataType, opts, err := ParseColumnDataType("datetime2", nil, nil, typing.ToPtr(i))
assert.NoError(t, err, i)
assert.Nil(t, opts, i)
assert.Equal(t, Datetime2, dataType, i)
Expand All @@ -148,15 +145,15 @@ func TestParseColumnDataType(t *testing.T) {
{
// Micro
for i := 4; i <= 6; i++ {
dataType, opts, err := ParseColumnDataType("datetime2", nil, nil, ptr.ToInt(i))
dataType, opts, err := ParseColumnDataType("datetime2", nil, nil, typing.ToPtr(i))
assert.NoError(t, err, i)
assert.Nil(t, opts, i)
assert.Equal(t, Datetime2Micro, dataType, i)
}
}
{
// nano
dataType, opts, err := ParseColumnDataType("datetime2", nil, nil, ptr.ToInt(7))
dataType, opts, err := ParseColumnDataType("datetime2", nil, nil, typing.ToPtr(7))
assert.NoError(t, err)
assert.Nil(t, opts)
assert.Equal(t, Datetime2Nano, dataType)
Expand Down
8 changes: 3 additions & 5 deletions lib/mysql/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@ import (
"database/sql"
"errors"
"fmt"
ptr2 "github.com/artie-labs/reader/lib/ptr"
"log/slog"
"strconv"
"strings"

"github.com/artie-labs/transfer/lib/ptr"

"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/lib/rdbms/column"
"github.com/artie-labs/reader/lib/rdbms/primary_key"
"github.com/artie-labs/transfer/lib/typing"
)

type DataType int
Expand Down Expand Up @@ -172,7 +170,7 @@ func parseColumnDataType(originalS string) (DataType, *Opts, error) {
if err != nil {
return -1, nil, fmt.Errorf("failed to parse scale value %q: %w", s, err)
}
return Decimal, &Opts{Precision: ptr.ToInt(precision), Scale: ptr2.ToUint16(uint16(scale))}, nil
return Decimal, &Opts{Precision: typing.ToPtr(precision), Scale: typing.ToPtr(uint16(scale))}, nil
case "float":
return Float, nil, nil
case "double":
Expand All @@ -196,7 +194,7 @@ func parseColumnDataType(originalS string) (DataType, *Opts, error) {
if err != nil {
return -1, nil, fmt.Errorf("failed to parse varchar size: %w", err)
}
return Varchar, &Opts{Size: ptr.ToInt(size)}, nil
return Varchar, &Opts{Size: typing.ToPtr(size)}, nil
case "binary":
return Binary, nil, nil
case "varbinary":
Expand Down
11 changes: 4 additions & 7 deletions lib/mysql/schema/schema_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package schema

import (
"testing"

ptr2 "github.com/artie-labs/reader/lib/ptr"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
"testing"
)

func TestQuoteIdentifier(t *testing.T) {
Expand Down Expand Up @@ -80,14 +77,14 @@ func TestParseColumnDataType(t *testing.T) {
dataType, opts, err := parseColumnDataType("varchar(255)")
assert.NoError(t, err)
assert.Equal(t, Varchar, dataType)
assert.Equal(t, &Opts{Size: ptr.ToInt(255)}, opts)
assert.Equal(t, &Opts{Size: typing.ToPtr(255)}, opts)
}
{
// Decimal
dataType, opts, err := parseColumnDataType("decimal(5,2)")
assert.NoError(t, err)
assert.Equal(t, Decimal, dataType)
assert.Equal(t, &Opts{Precision: ptr.ToInt(5), Scale: ptr2.ToUint16(2)}, opts)
assert.Equal(t, &Opts{Precision: typing.ToPtr(5), Scale: typing.ToPtr(uint16(2))}, opts)
}
{
// Blob
Expand Down
19 changes: 8 additions & 11 deletions lib/postgres/schema/schema_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package schema

import (
"testing"

ptr2 "github.com/artie-labs/reader/lib/ptr"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
"testing"
)

func TestParseColumnDataType(t *testing.T) {
Expand Down Expand Up @@ -81,8 +78,8 @@ func TestParseColumnDataType(t *testing.T) {
{
name: "numeric - with scale + precision",
colKind: "numeric",
scale: ptr2.ToUint16(2),
precision: ptr.ToInt(3),
scale: typing.ToPtr(uint16(2)),
precision: typing.ToPtr(3),
expectedDataType: Numeric,
expectedOpts: &Opts{
Scale: 2,
Expand All @@ -102,25 +99,25 @@ func TestParseColumnDataType(t *testing.T) {
{
name: "hstore",
colKind: "user-defined",
udtName: ptr.ToString("hstore"),
udtName: typing.ToPtr("hstore"),
expectedDataType: HStore,
},
{
name: "geometry",
colKind: "user-defined",
udtName: ptr.ToString("geometry"),
udtName: typing.ToPtr("geometry"),
expectedDataType: Geometry,
},
{
name: "geography",
colKind: "user-defined",
udtName: ptr.ToString("geography"),
udtName: typing.ToPtr("geography"),
expectedDataType: Geography,
},
{
name: "user-defined text",
colKind: "user-defined",
udtName: ptr.ToString("foo"),
udtName: typing.ToPtr("foo"),
expectedDataType: UserDefinedText,
},
{
Expand Down
29 changes: 14 additions & 15 deletions lib/s3lib/s3lib_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package s3lib

import (
"testing"

"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBucketAndPrefixFromFilePath(t *testing.T) {
Expand All @@ -18,39 +17,39 @@ func TestBucketAndPrefixFromFilePath(t *testing.T) {
{
name: "valid path (w/ S3 prefix)",
fp: "s3://bucket/prefix",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix"),
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix"),
},
{
name: "valid path (w/ S3 prefix) with trailing slash",
fp: "s3://bucket/prefix/",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/"),
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/"),
},
{
name: "valid path (w/ S3 prefix) with multiple slashes",
fp: "s3://bucket/prefix/with/multiple/slashes",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"),
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"),
},
// Without S3 prefix
{
name: "valid path (w/o S3 prefix)",
fp: "bucket/prefix",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix"),
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix"),
},
{
name: "valid path (w/o S3 prefix) with trailing slash",
fp: "bucket/prefix/",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/"),
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/"),
},
{
name: "valid path (w/o S3 prefix) with multiple slashes",
fp: "bucket/prefix/with/multiple/slashes",
expectedBucket: ptr.ToString("bucket"),
expectedPrefix: ptr.ToString("prefix/with/multiple/slashes"),
expectedBucket: typing.ToPtr("bucket"),
expectedPrefix: typing.ToPtr("prefix/with/multiple/slashes"),
},
{
name: "invalid path",
Expand Down
8 changes: 4 additions & 4 deletions sources/dynamodb/stream/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"

Expand Down Expand Up @@ -77,13 +77,13 @@ func (s *Store) processShard(ctx context.Context, shard types.Shard, writer writ
}

iteratorInput := &dynamodbstreams.GetShardIteratorInput{
StreamArn: ptr.ToString(s.streamArn),
StreamArn: typing.ToPtr(s.streamArn),
ShardId: shard.ShardId,
ShardIteratorType: iteratorType,
}

if startingSequenceNumber != "" {
iteratorInput.SequenceNumber = ptr.ToString(startingSequenceNumber)
iteratorInput.SequenceNumber = typing.ToPtr(startingSequenceNumber)
}

iteratorOutput, err := s.streams.GetShardIterator(ctx, iteratorInput)
Expand All @@ -97,7 +97,7 @@ func (s *Store) processShard(ctx context.Context, shard types.Shard, writer writ
for shardIterator != nil {
getRecordsInput := &dynamodbstreams.GetRecordsInput{
ShardIterator: shardIterator,
Limit: ptr.ToInt32(1000),
Limit: typing.ToPtr(int32(1000)),
}

getRecordsOutput, err := s.streams.GetRecords(ctx, getRecordsInput)
Expand Down
8 changes: 3 additions & 5 deletions sources/mysql/adapter/adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package adapter

import (
"github.com/artie-labs/transfer/lib/typing"
"testing"

ptr2 "github.com/artie-labs/reader/lib/ptr"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/reader/lib/mysql"
Expand Down Expand Up @@ -137,8 +135,8 @@ func TestValueConverterForType(t *testing.T) {
name: "decimal",
dataType: schema.Decimal,
opts: &schema.Opts{
Scale: ptr2.ToUint16(3),
Precision: ptr.ToInt(5),
Scale: typing.ToPtr(uint16(3)),
Precision: typing.ToPtr(5),
},
expected: debezium.Field{
Type: "bytes",
Expand Down

0 comments on commit 70dc8cc

Please sign in to comment.