Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 17, 2024
1 parent a326332 commit 9a450ff
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 23 deletions.
2 changes: 1 addition & 1 deletion clients/bigquery/storagewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestColumnToTableFieldSchema(t *testing.T) {
{
// ETime - Invalid:
_, err := typing.NewTimeDetailsFromTemplate(typing.ETime, "", "")
assert.ErrorContains(t, err, "unsupported extended time details type:")
assert.ErrorContains(t, err, "unsupported extended time kind type:")
}
{
// Struct:
Expand Down
16 changes: 13 additions & 3 deletions clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
tableID := NewTableIdentifier("coffee_shop", "public", "orders")

_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(s.T(), err)

var cols columns.Columns
for colName, kindDetails := range map[string]typing.KindDetails{
"id": typing.Integer,
"customer_id": typing.Integer,
"price": typing.Float,
"name": typing.String,
"created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType),
"created_at": _timestampTZ,
} {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
}
Expand All @@ -52,13 +55,17 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {

func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
tableID := NewTableIdentifier("coffee_shop", "orders", "public")

_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(s.T(), err)

var cols columns.Columns
for colName, kindDetails := range map[string]typing.KindDetails{
"id": typing.Integer,
"customer_id": typing.Integer,
"price": typing.Float,
"name": typing.String,
"created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType),
"created_at": _timestampTZ,
} {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
}
Expand Down Expand Up @@ -91,13 +98,16 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
}

func (s *SnowflakeTestSuite) TestManipulateShouldDeleteColumn() {
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(s.T(), err)

var cols columns.Columns
for colName, kindDetails := range map[string]typing.KindDetails{
"id": typing.Integer,
"customer_id": typing.Integer,
"price": typing.Float,
"name": typing.String,
"created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType),
"created_at": _timestampTZ,
} {
cols.AddColumn(columns.NewColumn(colName, kindDetails))
}
Expand Down
5 changes: 4 additions & 1 deletion clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,12 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
tableData.InsertRow(pk, row, false)
}

_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(s.T(), err)

snowflakeColToKindDetailsMap := map[string]typing.KindDetails{
"id": typing.Integer,
"created_at": typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType),
"created_at": _timestampTZ,
"name": typing.String,
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
Expand Down
21 changes: 16 additions & 5 deletions lib/debezium/converters/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import (
"testing"
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"
)

func TestZonedTimestamp_Convert(t *testing.T) {
Expand Down Expand Up @@ -126,7 +125,13 @@ func TestTime_Convert(t *testing.T) {
}

func TestNanoTime_Converter(t *testing.T) {
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType), NanoTime{}.ToKindDetails())
_time, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType, "")
assert.NoError(t, err)

kd, err := NanoTime{}.ToKindDetails()
assert.NoError(t, err)

assert.Equal(t, _time, kd)
{
// Invalid data
_, err := NanoTime{}.Convert("123")
Expand All @@ -141,7 +146,13 @@ func TestNanoTime_Converter(t *testing.T) {
}

func TestMicroTime_Converter(t *testing.T) {
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType), MicroTime{}.ToKindDetails())
_time, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType, "")
assert.NoError(t, err)

kd, err := MicroTime{}.ToKindDetails()
assert.NoError(t, err)

assert.Equal(t, _time, kd)
{
// Invalid data
_, err := MicroTime{}.Convert("123")
Expand Down
24 changes: 21 additions & 3 deletions lib/debezium/converters/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import (
)

func TestTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), Timestamp{}.ToKindDetails())
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(t, err)

kd, err := Timestamp{}.ToKindDetails()
assert.NoError(t, err)

assert.Equal(t, _timestampTZ, kd)
{
// Invalid conversion
_, err := Timestamp{}.Convert("invalid")
Expand All @@ -30,7 +36,13 @@ func TestTimestamp_Converter(t *testing.T) {
}

func TestMicroTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), MicroTimestamp{}.ToKindDetails())
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(t, err)

kd, err := MicroTimestamp{}.ToKindDetails()
assert.NoError(t, err)

assert.Equal(t, _timestampTZ, kd)
{
// Invalid conversion
_, err := MicroTimestamp{}.Convert("invalid")
Expand All @@ -51,7 +63,13 @@ func TestMicroTimestamp_Converter(t *testing.T) {
}

func TestNanoTimestamp_Converter(t *testing.T) {
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), NanoTimestamp{}.ToKindDetails())
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(t, err)

kd, err := NanoTimestamp{}.ToKindDetails()
assert.NoError(t, err)

assert.Equal(t, _timestampTZ, kd)
{
// Invalid conversion
_, err := NanoTimestamp{}.Convert("invalid")
Expand Down
34 changes: 28 additions & 6 deletions lib/debezium/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,26 +231,48 @@ func TestField_ToKindDetails(t *testing.T) {
{
// Timestamp
// Datetime (for now)
for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect, MicroTimestamp, NanoTimestamp, ZonedTimestamp} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType), kd)
{
for _, dbzType := range []SupportedDebeziumType{MicroTimestamp, NanoTimestamp, ZonedTimestamp} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)

_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(t, err)
assert.Equal(t, _timestampTZ, kd)
}
}
{
for _, dbzType := range []SupportedDebeziumType{Timestamp, TimestampKafkaConnect} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)

_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(t, err)
assert.Equal(t, _timestampTZ, kd)
}
}

}
{
// Dates
for _, dbzType := range []SupportedDebeziumType{Date, DateKafkaConnect} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.DateKindType), kd)

_date, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.DateKindType, "")
assert.NoError(t, err)
assert.Equal(t, _date, kd)
}
}
{
// Time
for _, dbzType := range []SupportedDebeziumType{Time, TimeKafkaConnect, MicroTime, NanoTime, TimeWithTimezone} {
kd, err := Field{DebeziumType: dbzType}.ToKindDetails()
assert.NoError(t, err)
assert.Equal(t, typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType), kd)

_time, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimeKindType, "")
assert.NoError(t, err)
assert.Equal(t, _time, kd)
}
}
{
Expand Down
20 changes: 16 additions & 4 deletions lib/destination/ddl/ddl_sflk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ func (d *DDLTestSuite) TestAlterComplexObjects() {
}

func (d *DDLTestSuite) TestAlterIdempotency() {
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(d.T(), err)

cols := []columns.Column{
columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)),
columns.NewColumn("created_at", _timestampTZ),
columns.NewColumn("id", typing.Integer),
columns.NewColumn("order_name", typing.String),
columns.NewColumn("start", typing.String),
Expand Down Expand Up @@ -80,8 +83,11 @@ func (d *DDLTestSuite) TestAlterIdempotency() {

func (d *DDLTestSuite) TestAlterTableAdd() {
// Test adding a bunch of columns
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(d.T(), err)

cols := []columns.Column{
columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)),
columns.NewColumn("created_at", _timestampTZ),
columns.NewColumn("id", typing.Integer),
columns.NewColumn("order_name", typing.String),
columns.NewColumn("start", typing.String),
Expand Down Expand Up @@ -122,8 +128,11 @@ func (d *DDLTestSuite) TestAlterTableAdd() {

func (d *DDLTestSuite) TestAlterTableDeleteDryRun() {
// Test adding a bunch of columns
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(d.T(), err)

cols := []columns.Column{
columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)),
columns.NewColumn("created_at", _timestampTZ),
columns.NewColumn("id", typing.Integer),
columns.NewColumn("name", typing.String),
columns.NewColumn("start", typing.String),
Expand Down Expand Up @@ -179,8 +188,11 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() {

func (d *DDLTestSuite) TestAlterTableDelete() {
// Test adding a bunch of columns
_timestampTZ, err := typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType, "")
assert.NoError(d.T(), err)

cols := []columns.Column{
columns.NewColumn("created_at", typing.NewTimeDetailsFromTemplate(typing.ETime, ext.TimestampTzKindType)),
columns.NewColumn("created_at", _timestampTZ),
columns.NewColumn("id", typing.Integer),
columns.NewColumn("name", typing.String),
columns.NewColumn("col_to_delete", typing.String),
Expand Down

0 comments on commit 9a450ff

Please sign in to comment.