From e24dd2606572f17e8694766969e85fc243c66c76 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 22 Oct 2024 11:50:22 -0700 Subject: [PATCH] Clean up. --- clients/snowflake/snowflake_test.go | 8 +++--- lib/cdc/relational/debezium_test.go | 2 +- lib/typing/parse.go | 39 ++++++++++++++----------- lib/typing/parse_test.go | 44 ++++++++++++++--------------- lib/typing/typing_bench_test.go | 6 ++-- models/event/event.go | 14 +++++++-- 6 files changed, 65 insertions(+), 48 deletions(-) diff --git a/clients/snowflake/snowflake_test.go b/clients/snowflake/snowflake_test.go index a3e28899c..1fa828ba7 100644 --- a/clients/snowflake/snowflake_test.go +++ b/clients/snowflake/snowflake_test.go @@ -92,7 +92,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() { constants.DeleteColumnMarker: typing.Boolean, constants.OnlySetDeleteColumnMarker: typing.Boolean, // Add kindDetails to created_at - "created_at": typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)), + "created_at": typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)), } var cols columns.Columns @@ -134,7 +134,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() { constants.DeleteColumnMarker: typing.Boolean, constants.OnlySetDeleteColumnMarker: typing.Boolean, // Add kindDetails to created_at - "created_at": typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)), + "created_at": typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)), } var cols columns.Columns @@ -224,7 +224,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { constants.DeleteColumnMarker: typing.Boolean, constants.OnlySetDeleteColumnMarker: typing.Boolean, // Add kindDetails to created_at - "created_at": typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)), + "created_at": typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)), } var cols columns.Columns @@ -273,7 +273,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() { inMemColumns := tableData.ReadOnlyInMemoryCols() // Since sflkColumns overwrote the format, let's set it correctly again. - inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.ParseValue("", nil, time.Now().Format(time.RFC3339Nano)))) + inMemColumns.UpdateColumn(columns.NewColumn("created_at", typing.MustParseValue("", nil, time.Now().Format(time.RFC3339Nano)))) tableData.SetInMemoryColumns(inMemColumns) break } diff --git a/lib/cdc/relational/debezium_test.go b/lib/cdc/relational/debezium_test.go index 20b777770..b34fc9671 100644 --- a/lib/cdc/relational/debezium_test.go +++ b/lib/cdc/relational/debezium_test.go @@ -200,7 +200,7 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() { optionalSchema, err := evt.GetOptionalSchema() assert.NoError(r.T(), err) - assert.Equal(r.T(), typing.Integer, typing.ParseValue("another_id", optionalSchema, evtData["another_id"])) + assert.Equal(r.T(), typing.Integer, typing.MustParseValue("another_id", optionalSchema, evtData["another_id"])) assert.Equal(r.T(), "sally.thomas@acme.com", evtData["email"]) // Datetime without TZ is emitted in microseconds which is 1000x larger than nanoseconds. diff --git a/lib/typing/parse.go b/lib/typing/parse.go index 7cd6274fa..4d9b575cb 100644 --- a/lib/typing/parse.go +++ b/lib/typing/parse.go @@ -2,59 +2,66 @@ package typing import ( "fmt" - "log/slog" "reflect" "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" ) -func ParseValue(key string, optionalSchema map[string]KindDetails, val any) KindDetails { +// MustParseValue - panics if the value cannot be parsed. This is used only for tests. +func MustParseValue(key string, optionalSchema map[string]KindDetails, val any) KindDetails { + kindDetail, err := ParseValue(key, optionalSchema, val) + if err != nil { + panic(err) + } + + return kindDetail +} + +func ParseValue(key string, optionalSchema map[string]KindDetails, val any) (KindDetails, error) { if kindDetail, isOk := optionalSchema[key]; isOk { - return kindDetail + return kindDetail, nil } switch convertedVal := val.(type) { case nil: - return Invalid + return Invalid, nil case uint, int, uint8, uint16, uint32, uint64, int8, int16, int32, int64: - return Integer + return Integer, nil case float32, float64: // Integers will be parsed as Floats if they come from JSON // This is a limitation with JSON - https://github.com/golang/go/issues/56719 // UNLESS Transfer is provided with a schema object, and we deliberately typecast the value to an integer // before calling ParseValue(). - return Float + return Float, nil case bool: - return Boolean + return Boolean, nil case string: if IsJSON(convertedVal) { - return Struct + return Struct, nil } - return String + return String, nil case *decimal.Decimal: extendedDetails := convertedVal.Details() return KindDetails{ Kind: EDecimal.Kind, ExtendedDecimalDetails: &extendedDetails, - } + }, nil case *ext.ExtendedTime: nestedKind := convertedVal.GetNestedKind() return KindDetails{ Kind: ETime.Kind, ExtendedTimeDetails: &nestedKind, - } + }, nil default: // Check if the val is one of our custom-types if reflect.TypeOf(val).Kind() == reflect.Slice { - return Array + return Array, nil } else if reflect.TypeOf(val).Kind() == reflect.Map { - return Struct + return Struct, nil } - - slog.Warn("Unhandled value, we returning Invalid for this type", slog.String("type", fmt.Sprintf("%T", val)), slog.Any("value", val)) } - return Invalid + return Invalid, fmt.Errorf("unknown type: %T", val) } diff --git a/lib/typing/parse_test.go b/lib/typing/parse_test.go index d7da0ab36..a5bb1df7b 100644 --- a/lib/typing/parse_test.go +++ b/lib/typing/parse_test.go @@ -14,36 +14,36 @@ func Test_ParseValue(t *testing.T) { // Optional schema exists, so we are using it optionalSchema := map[string]KindDetails{"created_at": String} for _, val := range []any{"2023-01-01", nil} { - assert.Equal(t, String, ParseValue("created_at", optionalSchema, val)) + assert.Equal(t, String, MustParseValue("created_at", optionalSchema, val)) } } { // Invalid - assert.Equal(t, ParseValue("", nil, nil), Invalid) - assert.Equal(t, ParseValue("", nil, errors.New("hello")), Invalid) + assert.Equal(t, MustParseValue("", nil, nil), Invalid) + assert.Equal(t, MustParseValue("", nil, errors.New("hello")), Invalid) } { // Nil - assert.Equal(t, ParseValue("", nil, ""), String) - assert.Equal(t, ParseValue("", nil, "nil"), String) - assert.Equal(t, ParseValue("", nil, nil), Invalid) + assert.Equal(t, MustParseValue("", nil, ""), String) + assert.Equal(t, MustParseValue("", nil, "nil"), String) + assert.Equal(t, MustParseValue("", nil, nil), Invalid) } { // Floats - assert.Equal(t, ParseValue("", nil, 7.5), Float) - assert.Equal(t, ParseValue("", nil, -7.4999999), Float) - assert.Equal(t, ParseValue("", nil, 7.0), Float) + assert.Equal(t, MustParseValue("", nil, 7.5), Float) + assert.Equal(t, MustParseValue("", nil, -7.4999999), Float) + assert.Equal(t, MustParseValue("", nil, 7.0), Float) } { // Integers - assert.Equal(t, ParseValue("", nil, 9), Integer) - assert.Equal(t, ParseValue("", nil, math.MaxInt), Integer) - assert.Equal(t, ParseValue("", nil, -1*math.MaxInt), Integer) + assert.Equal(t, MustParseValue("", nil, 9), Integer) + assert.Equal(t, MustParseValue("", nil, math.MaxInt), Integer) + assert.Equal(t, MustParseValue("", nil, -1*math.MaxInt), Integer) } { // Boolean - assert.Equal(t, ParseValue("", nil, true), Boolean) - assert.Equal(t, ParseValue("", nil, false), Boolean) + assert.Equal(t, MustParseValue("", nil, true), Boolean) + assert.Equal(t, MustParseValue("", nil, false), Boolean) } { // Strings @@ -54,20 +54,20 @@ func Test_ParseValue(t *testing.T) { } for _, possibleString := range possibleStrings { - assert.Equal(t, ParseValue("", nil, possibleString), String) + assert.Equal(t, MustParseValue("", nil, possibleString), String) } } { // Arrays - assert.Equal(t, ParseValue("", nil, []string{"a", "b", "c"}), Array) - assert.Equal(t, ParseValue("", nil, []any{"a", 123, "c"}), Array) - assert.Equal(t, ParseValue("", nil, []int64{1}), Array) - assert.Equal(t, ParseValue("", nil, []bool{false}), Array) - assert.Equal(t, ParseValue("", nil, []any{false, true}), Array) + assert.Equal(t, MustParseValue("", nil, []string{"a", "b", "c"}), Array) + assert.Equal(t, MustParseValue("", nil, []any{"a", 123, "c"}), Array) + assert.Equal(t, MustParseValue("", nil, []int64{1}), Array) + assert.Equal(t, MustParseValue("", nil, []bool{false}), Array) + assert.Equal(t, MustParseValue("", nil, []any{false, true}), Array) } { // Time in string w/ no schema - kindDetails := ParseValue("", nil, "00:18:11.13116+00") + kindDetails := MustParseValue("", nil, "00:18:11.13116+00") assert.Equal(t, String, kindDetails) } { @@ -97,7 +97,7 @@ func Test_ParseValue(t *testing.T) { } for _, randomMap := range randomMaps { - assert.Equal(t, ParseValue("", nil, randomMap), Struct, fmt.Sprintf("Failed message is: %v", randomMap)) + assert.Equal(t, MustParseValue("", nil, randomMap), Struct, fmt.Sprintf("Failed message is: %v", randomMap)) } } } diff --git a/lib/typing/typing_bench_test.go b/lib/typing/typing_bench_test.go index 8efe7e3ff..d29bac89c 100644 --- a/lib/typing/typing_bench_test.go +++ b/lib/typing/typing_bench_test.go @@ -36,7 +36,7 @@ func BenchmarkLargeMapLengthQuery_WithMassiveValues(b *testing.B) { func BenchmarkParseValueIntegerArtie(b *testing.B) { for n := 0; n < b.N; n++ { - ParseValue("", nil, 45456312) + MustParseValue("", nil, 45456312) } } @@ -48,7 +48,7 @@ func BenchmarkParseValueIntegerGo(b *testing.B) { func BenchmarkParseValueBooleanArtie(b *testing.B) { for n := 0; n < b.N; n++ { - ParseValue("", nil, true) + MustParseValue("", nil, true) } } @@ -60,7 +60,7 @@ func BenchmarkParseValueBooleanGo(b *testing.B) { func BenchmarkParseValueFloatArtie(b *testing.B) { for n := 0; n < b.N; n++ { - ParseValue("", nil, 7.44) + MustParseValue("", nil, 7.44) } } diff --git a/models/event/event.go b/models/event/event.go index 889bc2845..87b5f009f 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -238,14 +238,24 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali retrievedColumn, isOk := inMemoryColumns.GetColumn(newColName) if !isOk { // This would only happen if the columns did not get passed in initially. - inMemoryColumns.AddColumn(columns.NewColumn(newColName, typing.ParseValue(_col, e.OptionalSchema, val))) + kindDetails, err := typing.ParseValue(_col, e.OptionalSchema, val) + if err != nil { + return false, "", fmt.Errorf("failed to parse value: %w", err) + } + + inMemoryColumns.AddColumn(columns.NewColumn(newColName, kindDetails)) } else { if retrievedColumn.KindDetails == typing.Invalid { // If colType is Invalid, let's see if we can update it to a better type // If everything is nil, we don't need to add a column // However, it's important to create a column even if it's nil. // This is because we don't want to think that it's okay to drop a column in DWH - if kindDetails := typing.ParseValue(_col, e.OptionalSchema, val); kindDetails.Kind != typing.Invalid.Kind { + kindDetails, err := typing.ParseValue(_col, e.OptionalSchema, val) + if err != nil { + return false, "", fmt.Errorf("failed to parse value: %w", err) + } + + if kindDetails.Kind != typing.Invalid.Kind { retrievedColumn.KindDetails = kindDetails inMemoryColumns.UpdateColumn(retrievedColumn) }