Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Oct 22, 2024
1 parent ae18c5c commit e24dd26
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 48 deletions.
8 changes: 4 additions & 4 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() {
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
// Add kindDetails to created_at
"created_at": typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)),
"created_at": typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)),
}

var cols columns.Columns
Expand Down Expand Up @@ -134,7 +134,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
// Add kindDetails to created_at
"created_at": typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)),
"created_at": typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)),
}

var cols columns.Columns
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
constants.DeleteColumnMarker: typing.Boolean,
constants.OnlySetDeleteColumnMarker: typing.Boolean,
// Add kindDetails to created_at
"created_at": typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)),
"created_at": typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)),
}

var cols columns.Columns
Expand Down Expand Up @@ -273,7 +273,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {

inMemColumns := tableData.ReadOnlyInMemoryCols()
// Since sflkColumns overwrote the format, let's set it correctly again.
inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano))))
inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano))))
tableData.SetInMemoryColumns(inMemColumns)
break
}
Expand Down
2 changes: 1 addition & 1 deletion lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {

optionalSchema, err := evt.GetOptionalSchema()
assert.NoError(r.T(), err)
assert.Equal(r.T(), typing.Integer, typing.ParseValue("another_id", optionalSchema, evtData["another_id"]))
assert.Equal(r.T(), typing.Integer, typing.MustParseValue("another_id", optionalSchema, evtData["another_id"]))
assert.Equal(r.T(), "[email protected]", evtData["email"])

// Datetime without TZ is emitted in microseconds which is 1000x larger than nanoseconds.
Expand Down
39 changes: 23 additions & 16 deletions lib/typing/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,66 @@ package typing

import (
"fmt"
"log/slog"
"reflect"

"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
)

func ParseValue(key string, optionalSchema map[string]KindDetails, val any) KindDetails {
// MustParseValue is the panicking counterpart of ParseValue: it returns the
// parsed KindDetails, and panics with the underlying error if ParseValue
// reports one. It is meant to be used only in tests.
func MustParseValue(key string, optionalSchema map[string]KindDetails, val any) KindDetails {
	parsed, parseErr := ParseValue(key, optionalSchema, val)
	if parseErr == nil {
		return parsed
	}

	panic(parseErr)
}

// ParseValue infers the KindDetails for val.
// If optionalSchema has an entry for key, that entry is returned without inspecting val.
// Otherwise the kind is chosen from val's dynamic type; a nil val yields Invalid with no error.
// A value that matches none of the handled types yields Invalid together with an "unknown type" error.
func ParseValue(key string, optionalSchema map[string]KindDetails, val any) (KindDetails, error) {
if kindDetail, isOk := optionalSchema[key]; isOk {
return kindDetail
return kindDetail, nil
}

switch convertedVal := val.(type) {
case nil:
return Invalid
return Invalid, nil
case uint, int, uint8, uint16, uint32, uint64, int8, int16, int32, int64:
return Integer
return Integer, nil
case float32, float64:
// Integers will be parsed as Floats if they come from JSON
// This is a limitation with JSON - https://github.com/golang/go/issues/56719
// UNLESS Transfer is provided with a schema object, and we deliberately typecast the value to an integer
// before calling ParseValue().
return Float
return Float, nil
case bool:
return Boolean
return Boolean, nil
case string:
if IsJSON(convertedVal) {
return Struct
return Struct, nil
}

return String
return String, nil
case *decimal.Decimal:
extendedDetails := convertedVal.Details()
return KindDetails{
Kind: EDecimal.Kind,
ExtendedDecimalDetails: &extendedDetails,
}
}, nil
case *ext.ExtendedTime:
nestedKind := convertedVal.GetNestedKind()
return KindDetails{
Kind: ETime.Kind,
ExtendedTimeDetails: &nestedKind,
}
}, nil
default:
// No concrete case matched: fall back to reflection so that any slice type maps to Array
// and any map type maps to Struct.
if reflect.TypeOf(val).Kind() == reflect.Slice {
return Array
return Array, nil
} else if reflect.TypeOf(val).Kind() == reflect.Map {
return Struct
return Struct, nil
}

slog.Warn("Unhandled value, we returning Invalid for this type", slog.String("type", fmt.Sprintf("%T", val)), slog.Any("value", val))
}

return Invalid
return Invalid, fmt.Errorf("unknown type: %T", val)
}
44 changes: 22 additions & 22 deletions lib/typing/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,36 @@ func Test_ParseValue(t *testing.T) {
// Optional schema exists, so we are using it
optionalSchema := map[string]KindDetails{"created_at": String}
for _, val := range []any{"2023-01-01", nil} {
assert.Equal(t, String, ParseValue("created_at", optionalSchema, val))
assert.Equal(t, String, MustParseValue("created_at", optionalSchema, val))
}
}
{
// Invalid
assert.Equal(t, ParseValue("", nil, nil), Invalid)
assert.Equal(t, ParseValue("", nil, errors.New("hello")), Invalid)
assert.Equal(t, MustParseValue("", nil, nil), Invalid)
assert.Equal(t, MustParseValue("", nil, errors.New("hello")), Invalid)
}
{
// Nil
assert.Equal(t, ParseValue("", nil, ""), String)
assert.Equal(t, ParseValue("", nil, "nil"), String)
assert.Equal(t, ParseValue("", nil, nil), Invalid)
assert.Equal(t, MustParseValue("", nil, ""), String)
assert.Equal(t, MustParseValue("", nil, "nil"), String)
assert.Equal(t, MustParseValue("", nil, nil), Invalid)
}
{
// Floats
assert.Equal(t, ParseValue("", nil, 7.5), Float)
assert.Equal(t, ParseValue("", nil, -7.4999999), Float)
assert.Equal(t, ParseValue("", nil, 7.0), Float)
assert.Equal(t, MustParseValue("", nil, 7.5), Float)
assert.Equal(t, MustParseValue("", nil, -7.4999999), Float)
assert.Equal(t, MustParseValue("", nil, 7.0), Float)
}
{
// Integers
assert.Equal(t, ParseValue("", nil, 9), Integer)
assert.Equal(t, ParseValue("", nil, math.MaxInt), Integer)
assert.Equal(t, ParseValue("", nil, -1*math.MaxInt), Integer)
assert.Equal(t, MustParseValue("", nil, 9), Integer)
assert.Equal(t, MustParseValue("", nil, math.MaxInt), Integer)
assert.Equal(t, MustParseValue("", nil, -1*math.MaxInt), Integer)
}
{
// Boolean
assert.Equal(t, ParseValue("", nil, true), Boolean)
assert.Equal(t, ParseValue("", nil, false), Boolean)
assert.Equal(t, MustParseValue("", nil, true), Boolean)
assert.Equal(t, MustParseValue("", nil, false), Boolean)
}
{
// Strings
Expand All @@ -54,20 +54,20 @@ func Test_ParseValue(t *testing.T) {
}

for _, possibleString := range possibleStrings {
assert.Equal(t, ParseValue("", nil, possibleString), String)
assert.Equal(t, MustParseValue("", nil, possibleString), String)
}
}
{
// Arrays
assert.Equal(t, ParseValue("", nil, []string{"a", "b", "c"}), Array)
assert.Equal(t, ParseValue("", nil, []any{"a", 123, "c"}), Array)
assert.Equal(t, ParseValue("", nil, []int64{1}), Array)
assert.Equal(t, ParseValue("", nil, []bool{false}), Array)
assert.Equal(t, ParseValue("", nil, []any{false, true}), Array)
assert.Equal(t, MustParseValue("", nil, []string{"a", "b", "c"}), Array)
assert.Equal(t, MustParseValue("", nil, []any{"a", 123, "c"}), Array)
assert.Equal(t, MustParseValue("", nil, []int64{1}), Array)
assert.Equal(t, MustParseValue("", nil, []bool{false}), Array)
assert.Equal(t, MustParseValue("", nil, []any{false, true}), Array)
}
{
// Time in string w/ no schema
kindDetails := ParseValue("", nil, "00:18:11.13116+00")
kindDetails := MustParseValue("", nil, "00:18:11.13116+00")
assert.Equal(t, String, kindDetails)
}
{
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test_ParseValue(t *testing.T) {
}

for _, randomMap := range randomMaps {
assert.Equal(t, ParseValue("", nil, randomMap), Struct, fmt.Sprintf("Failed message is: %v", randomMap))
assert.Equal(t, MustParseValue("", nil, randomMap), Struct, fmt.Sprintf("Failed message is: %v", randomMap))
}
}
}
6 changes: 3 additions & 3 deletions lib/typing/typing_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func BenchmarkLargeMapLengthQuery_WithMassiveValues(b *testing.B) {

// BenchmarkParseValueIntegerArtie times value parsing for an integer constant,
// using an empty key and no optional schema.
func BenchmarkParseValueIntegerArtie(b *testing.B) {
for n := 0; n < b.N; n++ {
ParseValue("", nil, 45456312)
MustParseValue("", nil, 45456312)
}
}

Expand All @@ -48,7 +48,7 @@ func BenchmarkParseValueIntegerGo(b *testing.B) {

// BenchmarkParseValueBooleanArtie times value parsing for a boolean constant,
// using an empty key and no optional schema.
func BenchmarkParseValueBooleanArtie(b *testing.B) {
for n := 0; n < b.N; n++ {
ParseValue("", nil, true)
MustParseValue("", nil, true)
}
}

Expand All @@ -60,7 +60,7 @@ func BenchmarkParseValueBooleanGo(b *testing.B) {

// BenchmarkParseValueFloatArtie times value parsing for a floating-point constant,
// using an empty key and no optional schema.
func BenchmarkParseValueFloatArtie(b *testing.B) {
for n := 0; n < b.N; n++ {
ParseValue("", nil, 7.44)
MustParseValue("", nil, 7.44)
}
}

Expand Down
14 changes: 12 additions & 2 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,24 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali
retrievedColumn, isOk := inMemoryColumns.GetColumn(newColName)
if !isOk {
// This would only happen if the columns did not get passed in initially.
inMemoryColumns.AddColumn(columns.NewColumn(newColName, typing.ParseValue(_col, e.OptionalSchema, val)))
kindDetails, err := typing.ParseValue(_col, e.OptionalSchema, val)
if err != nil {
return false, "", fmt.Errorf("failed to parse value: %w", err)
}

inMemoryColumns.AddColumn(columns.NewColumn(newColName, kindDetails))
} else {
if retrievedColumn.KindDetails == typing.Invalid {
// If colType is Invalid, let's see if we can update it to a better type
// If everything is nil, we don't need to add a column
// However, it's important to create a column even if it's nil.
// This is because we don't want to think that it's okay to drop a column in DWH
if kindDetails := typing.ParseValue(_col, e.OptionalSchema, val); kindDetails.Kind != typing.Invalid.Kind {
kindDetails, err := typing.ParseValue(_col, e.OptionalSchema, val)
if err != nil {
return false, "", fmt.Errorf("failed to parse value: %w", err)
}

if kindDetails.Kind != typing.Invalid.Kind {
retrievedColumn.KindDetails = kindDetails
inMemoryColumns.UpdateColumn(retrievedColumn)
}
Expand Down

0 comments on commit e24dd26

Please sign in to comment.