From 81cc2bea49a4f78597ba93a41cf7c189cccc483a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 30 Jul 2024 13:06:55 -0700 Subject: [PATCH] Checkpoint. --- lib/cdc/event.go | 2 +- lib/cdc/mongo/debezium.go | 4 +- lib/cdc/relational/debezium_test.go | 9 +- lib/cdc/util/optional_schema.go | 13 +- lib/cdc/util/optional_schema_test.go | 4 +- lib/cdc/util/relational_event_test.go | 5 +- lib/debezium/converters/decimal.go | 86 +++++ lib/debezium/converters/decimal_bench_test.go | 44 +++ lib/debezium/converters/decimal_test.go | 313 ++++++++++++++++++ lib/debezium/{ => encode}/decimal.go | 2 +- lib/debezium/{ => encode}/decimal_test.go | 2 +- lib/debezium/schema.go | 86 ++--- lib/debezium/schema_test.go | 5 +- lib/debezium/types.go | 82 +---- lib/debezium/types_bench_test.go | 56 ---- lib/debezium/types_test.go | 306 ----------------- models/event/event.go | 7 +- models/event/event_test.go | 4 +- 18 files changed, 536 insertions(+), 494 deletions(-) create mode 100644 lib/debezium/converters/decimal.go create mode 100644 lib/debezium/converters/decimal_bench_test.go create mode 100644 lib/debezium/converters/decimal_test.go rename lib/debezium/{ => encode}/decimal.go (99%) rename lib/debezium/{ => encode}/decimal_test.go (99%) delete mode 100644 lib/debezium/types_bench_test.go diff --git a/lib/cdc/event.go b/lib/cdc/event.go index f32ee4924..4acea4475 100644 --- a/lib/cdc/event.go +++ b/lib/cdc/event.go @@ -22,7 +22,7 @@ type Event interface { DeletePayload() bool GetTableName() string GetData(pkMap map[string]any, config *kafkalib.TopicConfig) (map[string]any, error) - GetOptionalSchema() map[string]typing.KindDetails + GetOptionalSchema() (map[string]typing.KindDetails, error) // GetColumns will inspect the envelope's payload right now and return. GetColumns() (*columns.Columns, error) } diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index e416dfc29..ee2e0ad4e 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -126,9 +126,9 @@ func (s *SchemaEventPayload) GetTableName() string { return s.Payload.Source.Collection } -func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails { +func (s *SchemaEventPayload) GetOptionalSchema() (map[string]typing.KindDetails, error) { // MongoDB does not have a schema at the database level. - return nil + return nil, nil } func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) { diff --git a/lib/cdc/relational/debezium_test.go b/lib/cdc/relational/debezium_test.go index 4d802a37f..2d496cd7c 100644 --- a/lib/cdc/relational/debezium_test.go +++ b/lib/cdc/relational/debezium_test.go @@ -199,7 +199,11 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() { // Testing typing. assert.Equal(r.T(), evtData["id"], int64(1001)) assert.Equal(r.T(), evtData["another_id"], int64(333)) - assert.Equal(r.T(), typing.ParseValue(typing.Settings{}, "another_id", evt.GetOptionalSchema(), evtData["another_id"]), typing.Integer) + + schema, err := evt.GetOptionalSchema() + assert.NoError(r.T(), err) + + assert.Equal(r.T(), typing.ParseValue(typing.Settings{}, "another_id", schema, evtData["another_id"]), typing.Integer) assert.Equal(r.T(), evtData["email"], "sally.thomas@acme.com") @@ -518,7 +522,8 @@ func (r *RelationTestSuite) TestGetEventFromBytes_MySQL() { assert.Equal(r.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime()) assert.Equal(r.T(), "customers", evt.GetTableName()) - schema := evt.GetOptionalSchema() + schema, err := evt.GetOptionalSchema() + assert.NoError(r.T(), err) assert.Equal(r.T(), typing.Struct, schema["custom_fields"]) kvMap := map[string]any{ diff --git a/lib/cdc/util/optional_schema.go b/lib/cdc/util/optional_schema.go index 0dc5edf1b..a2f415862 100644 --- a/lib/cdc/util/optional_schema.go +++ b/lib/cdc/util/optional_schema.go @@ -1,22 +1,27 @@ package util import ( + "fmt" "log/slog" "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/typing" ) -func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails { +func (s *SchemaEventPayload) GetOptionalSchema() (map[string]typing.KindDetails, error) { fieldsObject := s.Schema.GetSchemaFromLabel(debezium.After) if fieldsObject == nil { // AFTER schema does not exist. - return nil + return nil, nil } schema := make(map[string]typing.KindDetails) for _, field := range fieldsObject.Fields { - kd := field.ToKindDetails() + kd, err := field.ToKindDetails() + if err != nil { + return nil, fmt.Errorf("failed to convert field (%v), to kind details: %w", field, err) + } + if kd == typing.Invalid { slog.Warn("Skipping field from optional schema b/c we cannot determine the data type", slog.String("field", field.FieldName)) continue @@ -25,5 +30,5 @@ func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails { schema[field.FieldName] = kd } - return schema + return schema, nil } diff --git a/lib/cdc/util/optional_schema_test.go b/lib/cdc/util/optional_schema_test.go index c126ba17e..c3347abc1 100644 --- a/lib/cdc/util/optional_schema_test.go +++ b/lib/cdc/util/optional_schema_test.go @@ -81,7 +81,9 @@ func TestGetOptionalSchema(t *testing.T) { err := json.Unmarshal([]byte(tc.body), &schemaEventPayload) assert.NoError(t, err, idx) - actualData := schemaEventPayload.GetOptionalSchema() + actualData, err := schemaEventPayload.GetOptionalSchema() + assert.NoError(t, err) + for actualKey, actualVal := range actualData { testMsg := fmt.Sprintf("key: %s, actualKind: %s, index: %d", actualKey, actualVal.Kind, idx) diff --git a/lib/cdc/util/relational_event_test.go b/lib/cdc/util/relational_event_test.go index 00ce05f81..5bd61140c 100644 --- a/lib/cdc/util/relational_event_test.go +++ b/lib/cdc/util/relational_event_test.go @@ -57,7 +57,10 @@ func TestSource_GetOptionalSchema(t *testing.T) { }`), &schemaEventPayload) assert.NoError(t, err) - optionalSchema := schemaEventPayload.GetOptionalSchema() + + optionalSchema, err := schemaEventPayload.GetOptionalSchema() + assert.NoError(t, err) + value, isOk := optionalSchema["last_modified"] assert.True(t, isOk) assert.Equal(t, value, typing.String) diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go new file mode 100644 index 000000000..314229a78 --- /dev/null +++ b/lib/debezium/converters/decimal.go @@ -0,0 +1,86 @@ +package converters + +import ( + "encoding/base64" + "fmt" + + "github.com/artie-labs/transfer/lib/debezium/encode" + + "github.com/artie-labs/transfer/lib/maputil" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/decimal" +) + +// toBytes attempts to convert a value (type []byte, or string) to a slice of bytes. +// - If value is already a slice of bytes it will be directly returned. +// - If value is a string we will attempt to base64 decode it. +func toBytes(value any) ([]byte, error) { + var stringVal string + + switch typedValue := value.(type) { + case []byte: + return typedValue, nil + case string: + stringVal = typedValue + default: + return nil, fmt.Errorf("failed to cast value '%v' with type '%T' to []byte", value, value) + } + + data, err := base64.StdEncoding.DecodeString(stringVal) + if err != nil { + return nil, fmt.Errorf("failed to base64 decode: %w", err) + } + return data, nil +} + +type Decimal struct { + precision int32 + scale int32 + + variableNumeric bool +} + +func NewDecimal(precision int32, scale int32, variableNumeric bool) *Decimal { + return &Decimal{ + precision: precision, + scale: scale, + variableNumeric: variableNumeric, + } +} + +func (d Decimal) ToKindDetails() typing.KindDetails { + return typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(d.precision, d.scale)) +} + +func (d Decimal) Convert(val any) (any, error) { + if d.variableNumeric { + valueStruct, isOk := val.(map[string]any) + if !isOk { + return nil, fmt.Errorf("value is not map[string]any type") + } + + scale, err := maputil.GetInt32FromMap(valueStruct, "scale") + if err != nil { + return nil, err + } + + val, isOk := valueStruct["value"] + if !isOk { + return nil, fmt.Errorf("encoded value does not exist") + } + + bytes, err := toBytes(val) + if err != nil { + return nil, err + } + + return decimal.NewDecimal(encode.DecodeDecimal(bytes, scale)), nil + } else { + bytes, err := toBytes(val) + if err != nil { + return nil, err + } + + return decimal.NewDecimalWithPrecision(encode.DecodeDecimal(bytes, d.scale), d.precision), nil + } +} diff --git a/lib/debezium/converters/decimal_bench_test.go b/lib/debezium/converters/decimal_bench_test.go new file mode 100644 index 000000000..70a0320ff --- /dev/null +++ b/lib/debezium/converters/decimal_bench_test.go @@ -0,0 +1,44 @@ +package converters + +import ( + "testing" + + "github.com/artie-labs/transfer/lib/typing/decimal" + "github.com/stretchr/testify/assert" +) + +func BenchmarkDecodeDecimal_P64_S10(b *testing.B) { + converter := NewDecimal(64, 10, false) + for i := 0; i < b.N; i++ { + val, err := converter.Convert([]byte("AwBGAw8m9GLXrCGifrnVP/8jPHrNEtd1r4rS")) + assert.NoError(b, err) + + dec, isOk := val.(decimal.Decimal) + assert.True(b, isOk) + assert.Equal(b, "123456789012345678901234567890123456789012345678901234.1234567890", dec.String()) + } +} + +func BenchmarkDecodeDecimal_P38_S2(b *testing.B) { + converter := NewDecimal(38, 2, false) + for i := 0; i < b.N; i++ { + val, err := converter.Convert("AMCXznvJBxWzS58P/////w==") + assert.NoError(b, err) + + dec, isOk := val.(decimal.Decimal) + assert.True(b, isOk) + assert.Equal(b, "9999999999999999999999999999999999.99", dec.String()) + } +} + +func BenchmarkDecodeDecimal_P5_S2(b *testing.B) { + converter := NewDecimal(5, 2, false) + for i := 0; i < b.N; i++ { + val, err := converter.Convert("AOHJ") + assert.NoError(b, err) + + dec, isOk := val.(decimal.Decimal) + assert.True(b, isOk) + assert.Equal(b, "578.01", dec.String()) + } +} diff --git a/lib/debezium/converters/decimal_test.go b/lib/debezium/converters/decimal_test.go new file mode 100644 index 000000000..b617b67fd --- /dev/null +++ b/lib/debezium/converters/decimal_test.go @@ -0,0 +1,313 @@ +package converters + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestToBytes(t *testing.T) { + type _testCase struct { + name string + value any + + expectedValue []byte + expectedErr string + } + + testCases := []_testCase{ + { + name: "[]byte", + value: []byte{40, 39, 38}, + expectedValue: []byte{40, 39, 38}, + }, + { + name: "base64 encoded string", + value: "aGVsbG8gd29ybGQK", + expectedValue: []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0xa}, + }, + { + name: "malformed string", + value: "asdf$$$", + expectedErr: "failed to base64 decode", + }, + { + name: "type that isn't a string or []byte", + value: map[string]any{}, + expectedErr: "failed to cast value 'map[]' with type 'map[string]interface {}", + }, + } + + for _, testCase := range testCases { + actual, err := toBytes(testCase.value) + + if testCase.expectedErr == "" { + assert.Equal(t, testCase.expectedValue, actual, testCase.name) + } else { + assert.ErrorContains(t, err, testCase.expectedErr, testCase.name) + } + } +} + +func TestField_DecodeDecimal(t *testing.T) { + testCases := []struct { + name string + encoded string + params map[string]any + + expectedValue string + expectedPrecision int32 + expectNilPtrPrecision bool + expectedScale int32 + expectedErr string + }{ + { + name: "No scale (nil map)", + expectedErr: "object is empty", + }, + { + name: "No scale (not provided)", + params: map[string]any{ + "connect.decimal.precision": "5", + }, + expectedErr: "key: scale does not exist in object", + }, + { + name: "Precision is not an integer", + params: map[string]any{ + "scale": "2", + "connect.decimal.precision": "abc", + }, + expectedErr: "key: connect.decimal.precision is not type integer", + }, + { + name: "NUMERIC(5,0)", + encoded: "BQ==", + params: map[string]any{ + "scale": "0", + "connect.decimal.precision": "5", + }, + expectedValue: "5", + expectedPrecision: 5, + expectedScale: 0, + }, + { + name: "NUMERIC(5,2)", + encoded: "AOHJ", + params: map[string]any{ + "scale": "2", + "connect.decimal.precision": "5", + }, + expectedValue: "578.01", + expectedPrecision: 5, + expectedScale: 2, + }, + { + name: "NUMERIC(38, 0) - small #", + encoded: "Ajc=", + params: map[string]any{ + "scale": "0", + "connect.decimal.precision": "38", + }, + expectedValue: "567", + expectedPrecision: 38, + expectedScale: 0, + }, + { + name: "NUMERIC(38, 0) - large #", + encoded: "SztMqFqGxHoJiiI//////w==", + params: map[string]any{ + "scale": "0", + "connect.decimal.precision": "38", + }, + expectedValue: "99999999999999999999999999999999999999", + expectedPrecision: 38, + expectedScale: 0, + }, + { + name: "NUMERIC(38, 2) - small #", + encoded: "DPk=", + params: map[string]any{ + "scale": "2", + "connect.decimal.precision": "38", + }, + expectedValue: "33.21", + expectedPrecision: 38, + expectedScale: 2, + }, + { + name: "NUMERIC(38, 2) - large #", + encoded: "AMCXznvJBxWzS58P/////w==", + params: map[string]any{ + "scale": "2", + "connect.decimal.precision": "38", + }, + expectedValue: "9999999999999999999999999999999999.99", + expectedPrecision: 38, + expectedScale: 2, + }, + { + name: "NUMERIC(38, 4) - small #", + encoded: "SeuD", + params: map[string]any{ + "scale": "4", + "connect.decimal.precision": "38", + }, + expectedValue: "484.4419", + expectedPrecision: 38, + expectedScale: 4, + }, + { + name: "NUMERIC(38, 4) - large #", + encoded: "Ae0Jvq2HwDeNjmP/////", + params: map[string]any{ + "scale": "4", + "connect.decimal.precision": "38", + }, + expectedValue: "999999999999999999999999999999.9999", + expectedPrecision: 38, + expectedScale: 4, + }, + { + name: "NUMERIC(39,4) - small #", + encoded: "AKQQ", + params: map[string]any{ + "scale": "4", + "connect.decimal.precision": "39", + }, + expectedValue: "4.2000", + expectedPrecision: 39, + expectedScale: 4, + }, + { + name: "NUMERIC(39,4) - large # ", + encoded: "AuM++mE16PeIpWp/trI=", + params: map[string]any{ + "scale": "4", + "connect.decimal.precision": "39", + }, + expectedValue: "5856910285916918584382586878.1234", + expectedPrecision: 39, + expectedScale: 4, + }, + { + name: "MONEY", + encoded: "ALxhYg==", + params: map[string]any{ + "scale": "2", + }, + expectedValue: "123456.98", + expectedPrecision: -1, + expectedScale: 2, + }, + } + + for _, testCase := range testCases { + field := Field{ + Parameters: testCase.params, + } + + bytes, err := toBytes(testCase.encoded) + assert.NoError(t, err) + + dec, err := field.DecodeDecimal(bytes) + if testCase.expectedErr != "" { + assert.ErrorContains(t, err, testCase.expectedErr, testCase.name) + continue + } + + assert.NoError(t, err) + assert.Equal(t, testCase.expectedValue, dec.String(), testCase.name) + + assert.Equal(t, testCase.expectedPrecision, dec.Details().Precision(), testCase.name) + assert.Equal(t, testCase.expectedScale, dec.Details().Scale(), testCase.name) + } +} + +func TestField_DecodeDebeziumVariableDecimal(t *testing.T) { + type _testCase struct { + name string + value any + + expectedValue string + expectedScale int32 + expectedErr string + } + + testCases := []_testCase{ + { + name: "empty val (nil)", + expectedErr: "value is not map[string]any type", + }, + { + name: "empty map", + value: map[string]any{}, + expectedErr: "object is empty", + }, + { + name: "scale is not an integer", + value: map[string]any{ + "scale": "foo", + }, + expectedErr: "key: scale is not type integer", + }, + { + name: "value exists (scale 3)", + value: map[string]any{ + "scale": 3, + "value": "SOx4FQ==", + }, + expectedValue: "1223456.789", + expectedScale: 3, + }, + { + name: "value exists (scale 2)", + value: map[string]any{ + "scale": 2, + "value": "MDk=", + }, + expectedValue: "123.45", + expectedScale: 2, + }, + { + name: "negative numbers (scale 7)", + value: map[string]any{ + "scale": 7, + "value": "wT9Wmw==", + }, + expectedValue: "-105.2813669", + expectedScale: 7, + }, + { + name: "malformed base64 value", + value: map[string]any{ + "scale": 7, + "value": "==wT9Wmw==", + }, + expectedErr: "failed to base64 decode", + }, + { + name: "[]byte value", + value: map[string]any{ + "scale": 7, + "value": []byte{193, 63, 86, 155}, + }, + expectedValue: "-105.2813669", + expectedScale: 7, + }, + } + + for _, testCase := range testCases { + field := Field{} + dec, err := field.DecodeDebeziumVariableDecimal(testCase.value) + if testCase.expectedErr != "" { + assert.ErrorContains(t, err, testCase.expectedErr, testCase.name) + continue + } + + assert.Equal(t, int32(-1), dec.Details().Precision(), testCase.name) + assert.Equal(t, testCase.expectedScale, dec.Details().Scale(), testCase.name) + assert.Equal(t, testCase.expectedValue, dec.String(), testCase.name) + } + +} diff --git a/lib/debezium/decimal.go b/lib/debezium/encode/decimal.go similarity index 99% rename from lib/debezium/decimal.go rename to lib/debezium/encode/decimal.go index 7e46c997c..0ac50f26d 100644 --- a/lib/debezium/decimal.go +++ b/lib/debezium/encode/decimal.go @@ -1,4 +1,4 @@ -package debezium +package encode import ( "math/big" diff --git a/lib/debezium/decimal_test.go b/lib/debezium/encode/decimal_test.go similarity index 99% rename from lib/debezium/decimal_test.go rename to lib/debezium/encode/decimal_test.go index 931468710..c6dbd99e0 100644 --- a/lib/debezium/decimal_test.go +++ b/lib/debezium/encode/decimal_test.go @@ -1,4 +1,4 @@ -package debezium +package encode import ( "fmt" diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 2453f6066..23cc62b47 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -1,6 +1,8 @@ package debezium import ( + "fmt" + "github.com/artie-labs/transfer/lib/debezium/converters" "github.com/artie-labs/transfer/lib/maputil" "github.com/artie-labs/transfer/lib/ptr" @@ -80,46 +82,26 @@ func (f Field) GetScaleAndPrecision() (int32, *int32, error) { return scale, precisionPtr, nil } -func (f Field) ToValueConverter() converters.ValueConverter { +func (f Field) ToValueConverter() (converters.ValueConverter, error) { switch f.DebeziumType { case DateTimeWithTimezone: - return converters.DateTimeWithTimezone{} + return converters.DateTimeWithTimezone{}, nil case TimeWithTimezone: - return converters.TimeWithTimezone{} + return converters.TimeWithTimezone{}, nil case GeometryPointType: - return converters.GeometryPoint{} + return converters.GeometryPoint{}, nil case GeographyType, GeometryType: - return converters.Geometry{} + return converters.Geometry{}, nil case JSON: - return converters.JSON{} + return converters.JSON{}, nil case Date, DateKafkaConnect: - return converters.Date{} + return converters.Date{}, nil case Time, TimeKafkaConnect: - return converters.Time{} - } - - return nil -} - -func (f Field) ToKindDetails() typing.KindDetails { - // Prioritize converters - if converter := f.ToValueConverter(); converter != nil { - return converter.ToKindDetails() - } - - // TODO: Deprecate this in favor of the converters - - // We'll first cast based on Debezium types - // Then, we'll fall back on the actual data types. - switch f.DebeziumType { - case Timestamp, MicroTimestamp, NanoTimestamp, DateTimeKafkaConnect: - return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType) - case MicroTime, NanoTime: - return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType) + return converters.Time{}, nil case KafkaDecimalType: scale, precisionPtr, err := f.GetScaleAndPrecision() if err != nil { - return typing.Invalid + return nil, fmt.Errorf("failed to get scale and precision: %w", err) } precision := decimal.PrecisionNotSpecified @@ -127,30 +109,56 @@ func (f Field) ToKindDetails() typing.KindDetails { precision = *precisionPtr } - return typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(precision, scale)) + return converters.NewDecimal(precision, scale, false), nil case KafkaVariableNumericType: // For variable numeric types, we are defaulting to a scale of 5 // This is because scale is not specified at the column level, rather at the row level // It shouldn't matter much anyway since the column type we are creating is `TEXT` to avoid boundary errors. - return typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(decimal.PrecisionNotSpecified, decimal.DefaultScale)) + return converters.NewDecimal(decimal.PrecisionNotSpecified, decimal.DefaultScale, true), nil + } + + return nil, nil +} + +func (f Field) ToKindDetails() (typing.KindDetails, error) { + // Prioritize converters + converter, err := f.ToValueConverter() + if err != nil { + return typing.Invalid, err + } + + if converter != nil { + return converter.ToKindDetails(), nil + } + + // TODO: Deprecate this in favor of the converters + + // We'll first cast based on Debezium types + // Then, we'll fall back on the actual data types. + switch f.DebeziumType { + case Timestamp, MicroTimestamp, NanoTimestamp, DateTimeKafkaConnect: + return typing.NewKindDetailsFromTemplate(typing.ETime, ext.DateTimeKindType), nil + case MicroTime, NanoTime: + return typing.NewKindDetailsFromTemplate(typing.ETime, ext.TimeKindType), nil } switch f.Type { case Map: - return typing.Struct + return typing.Struct, nil case Int16, Int32, Int64: - return typing.Integer + return typing.Integer, nil case Float, Double: - return typing.Float + return typing.Float, nil case String, Bytes: - return typing.String + return typing.String, nil case Struct: - return typing.Struct + return typing.Struct, nil case Boolean: - return typing.Boolean + return typing.Boolean, nil case Array: - return typing.Array + return typing.Array, nil default: - return typing.Invalid + // TODO: Throw an error + return typing.Invalid, nil } } diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index e7c74f462..7d4a26885 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -274,6 +274,9 @@ func TestField_ToKindDetails(t *testing.T) { } for _, tc := range tcs { - assert.Equal(t, tc.expectedKindDetails, tc.field.ToKindDetails(), tc.name) + kd, err := tc.field.ToKindDetails() + assert.NoError(t, err) + + assert.Equal(t, tc.expectedKindDetails, kd, tc.name) } } diff --git a/lib/debezium/types.go b/lib/debezium/types.go index 8ce2a17b6..fd006db5d 100644 --- a/lib/debezium/types.go +++ b/lib/debezium/types.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - "github.com/artie-labs/transfer/lib/maputil" - "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" ) @@ -58,28 +56,6 @@ const ( KafkaDecimalPrecisionKey = "connect.decimal.precision" ) -// toBytes attempts to convert a value (type []byte, or string) to a slice of bytes. -// - If value is already a slice of bytes it will be directly returned. -// - If value is a string we will attempt to base64 decode it. -func toBytes(value any) ([]byte, error) { - var stringVal string - - switch typedValue := value.(type) { - case []byte: - return typedValue, nil - case string: - stringVal = typedValue - default: - return nil, fmt.Errorf("failed to cast value '%v' with type '%T' to []byte", value, value) - } - - data, err := base64.StdEncoding.DecodeString(stringVal) - if err != nil { - return nil, fmt.Errorf("failed to base64 decode: %w", err) - } - return data, nil -} - // toInt64 attempts to convert a value of unknown type to a an int64. // - If the value is coming from Kafka it will be decoded as a float64 when it is unmarshalled from JSON. // - If the value is coming from reader the value will be an int16/int32/int64. @@ -114,19 +90,16 @@ func (f Field) ParseValue(value any) (any, error) { } } - if converter := f.ToValueConverter(); converter != nil { + converter, err := f.ToValueConverter() + if err != nil { + return nil, fmt.Errorf("failed to get value converter: %w", err) + } + + if converter != nil { return converter.Convert(value) } switch f.DebeziumType { - case KafkaDecimalType: - bytes, err := toBytes(value) - if err != nil { - return nil, err - } - return f.DecodeDecimal(bytes) - case KafkaVariableNumericType: - return f.DecodeDebeziumVariableDecimal(value) case Timestamp, MicroTimestamp, @@ -181,46 +154,3 @@ func FromDebeziumTypeToTime(supportedType SupportedDebeziumType, val int64) (*ex return extTime, nil } - -// DecodeDecimal is used to handle `org.apache.kafka.connect.data.Decimal` where this would be emitted by Debezium when the `decimal.handling.mode` is `precise` -// * Encoded - takes the encoded value as a slice of bytes -// * Parameters - which contains: -// - `scale` (number of digits following decimal point) -// - `connect.decimal.precision` which is an optional parameter. (If -1, then it's variable and .Value() will be in STRING). -func (f Field) DecodeDecimal(encoded []byte) (*decimal.Decimal, error) { - scale, precision, err := f.GetScaleAndPrecision() - if err != nil { - return nil, fmt.Errorf("failed to get scale and/or precision: %w", err) - } - - _decimal := DecodeDecimal(encoded, scale) - if precision == nil { - return decimal.NewDecimal(_decimal), nil - } - - return decimal.NewDecimalWithPrecision(_decimal, *precision), nil -} - -func (Field) DecodeDebeziumVariableDecimal(value any) (*decimal.Decimal, error) { - valueStruct, isOk := value.(map[string]any) - if !isOk { - return nil, fmt.Errorf("value is not map[string]any type") - } - - scale, err := maputil.GetInt32FromMap(valueStruct, "scale") - if err != nil { - return nil, err - } - - val, isOk := valueStruct["value"] - if !isOk { - return nil, fmt.Errorf("encoded value does not exist") - } - - bytes, err := toBytes(val) - if err != nil { - return nil, err - } - - return decimal.NewDecimal(DecodeDecimal(bytes, scale)), nil -} diff --git a/lib/debezium/types_bench_test.go b/lib/debezium/types_bench_test.go deleted file mode 100644 index c014c9fb7..000000000 --- a/lib/debezium/types_bench_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package debezium - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/stretchr/testify/require" -) - -func BenchmarkDecodeDecimal_P64_S10(b *testing.B) { - parameters := map[string]any{ - "scale": 10, - KafkaDecimalPrecisionKey: 64, - } - field := Field{Parameters: parameters} - for i := 0; i < b.N; i++ { - bytes, err := toBytes("AwBGAw8m9GLXrCGifrnVP/8jPHrNEtd1r4rS") - assert.NoError(b, err) - dec, err := field.DecodeDecimal(bytes) - assert.NoError(b, err) - assert.Equal(b, "123456789012345678901234567890123456789012345678901234.1234567890", dec.String()) - require.NoError(b, err) - } -} - -func BenchmarkDecodeDecimal_P38_S2(b *testing.B) { - parameters := map[string]any{ - "scale": 2, - KafkaDecimalPrecisionKey: 38, - } - field := Field{Parameters: parameters} - for i := 0; i < b.N; i++ { - bytes, err := toBytes(`AMCXznvJBxWzS58P/////w==`) - assert.NoError(b, err) - dec, err := field.DecodeDecimal(bytes) - assert.NoError(b, err) - assert.Equal(b, "9999999999999999999999999999999999.99", dec.String()) - } -} - -func BenchmarkDecodeDecimal_P5_S2(b *testing.B) { - parameters := map[string]any{ - "scale": 2, - KafkaDecimalPrecisionKey: 5, - } - - field := Field{Parameters: parameters} - for i := 0; i < b.N; i++ { - bytes, err := toBytes(`AOHJ`) - assert.NoError(b, err) - dec, err := field.DecodeDecimal(bytes) - assert.NoError(b, err) - assert.Equal(b, "578.01", dec.String()) - } -} diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index e0084ce7f..a3a5dba72 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -10,49 +10,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestToBytes(t *testing.T) { - type _testCase struct { - name string - value any - - expectedValue []byte - expectedErr string - } - - testCases := []_testCase{ - { - name: "[]byte", - value: []byte{40, 39, 38}, - expectedValue: []byte{40, 39, 38}, - }, - { - name: "base64 encoded string", - value: "aGVsbG8gd29ybGQK", - expectedValue: []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0xa}, - }, - { - name: "malformed string", - value: "asdf$$$", - expectedErr: "failed to base64 decode", - }, - { - name: "type that isn't a string or []byte", - value: map[string]any{}, - expectedErr: "failed to cast value 'map[]' with type 'map[string]interface {}", - }, - } - - for _, testCase := range testCases { - actual, err := toBytes(testCase.value) - - if testCase.expectedErr == "" { - assert.Equal(t, testCase.expectedValue, actual, testCase.name) - } else { - assert.ErrorContains(t, err, testCase.expectedErr, testCase.name) - } - } -} - func TestToInt64(t *testing.T) { testCases := []struct { name string @@ -389,266 +346,3 @@ func TestFromDebeziumTypeTimePrecisionConnect(t *testing.T) { assert.NoError(t, err) assert.Equal(t, time.Date(2023, 03, 15, 17, 24, 10, 700000000, time.UTC), extendedTimestamp.Time) } - -func TestField_DecodeDecimal(t *testing.T) { - testCases := []struct { - name string - encoded string - params map[string]any - - expectedValue string - expectedPrecision int32 - expectNilPtrPrecision bool - expectedScale int32 - expectedErr string - }{ - { - name: "No scale (nil map)", - expectedErr: "object is empty", - }, - { - name: "No scale (not provided)", - params: map[string]any{ - "connect.decimal.precision": "5", - }, - expectedErr: "key: scale does not exist in object", - }, - { - name: "Precision is not an integer", - params: map[string]any{ - "scale": "2", - "connect.decimal.precision": "abc", - }, - expectedErr: "key: connect.decimal.precision is not type integer", - }, - { - name: "NUMERIC(5,0)", - encoded: "BQ==", - params: map[string]any{ - "scale": "0", - "connect.decimal.precision": "5", - }, - expectedValue: "5", - expectedPrecision: 5, - expectedScale: 0, - }, - { - name: "NUMERIC(5,2)", - encoded: "AOHJ", - params: map[string]any{ - "scale": "2", - "connect.decimal.precision": "5", - }, - expectedValue: "578.01", - expectedPrecision: 5, - expectedScale: 2, - }, - { - name: "NUMERIC(38, 0) - small #", - encoded: "Ajc=", - params: map[string]any{ - "scale": "0", - "connect.decimal.precision": "38", - }, - expectedValue: "567", - expectedPrecision: 38, - expectedScale: 0, - }, - { - name: "NUMERIC(38, 0) - large #", - encoded: "SztMqFqGxHoJiiI//////w==", - params: map[string]any{ - "scale": "0", - "connect.decimal.precision": "38", - }, - expectedValue: "99999999999999999999999999999999999999", - expectedPrecision: 38, - expectedScale: 0, - }, - { - name: "NUMERIC(38, 2) - small #", - encoded: "DPk=", - params: map[string]any{ - "scale": "2", - "connect.decimal.precision": "38", - }, - expectedValue: "33.21", - expectedPrecision: 38, - expectedScale: 2, - }, - { - name: "NUMERIC(38, 2) - large #", - encoded: "AMCXznvJBxWzS58P/////w==", - params: map[string]any{ - "scale": "2", - "connect.decimal.precision": "38", - }, - expectedValue: "9999999999999999999999999999999999.99", - expectedPrecision: 38, - expectedScale: 2, - }, - { - name: "NUMERIC(38, 4) - small #", - encoded: "SeuD", - params: map[string]any{ - "scale": "4", - "connect.decimal.precision": "38", - }, - expectedValue: "484.4419", - expectedPrecision: 38, - expectedScale: 4, - }, - { - name: "NUMERIC(38, 4) - large #", - encoded: "Ae0Jvq2HwDeNjmP/////", - params: map[string]any{ - "scale": "4", - "connect.decimal.precision": "38", - }, - expectedValue: "999999999999999999999999999999.9999", - expectedPrecision: 38, - expectedScale: 4, - }, - { - name: "NUMERIC(39,4) - small #", - encoded: "AKQQ", - params: map[string]any{ - "scale": "4", - "connect.decimal.precision": "39", - }, - expectedValue: "4.2000", - expectedPrecision: 39, - expectedScale: 4, - }, - { - name: "NUMERIC(39,4) - large # ", - encoded: "AuM++mE16PeIpWp/trI=", - params: map[string]any{ - "scale": "4", - "connect.decimal.precision": "39", - }, - expectedValue: "5856910285916918584382586878.1234", - expectedPrecision: 39, - expectedScale: 4, - }, - { - name: "MONEY", - encoded: "ALxhYg==", - params: map[string]any{ - "scale": "2", - }, - expectedValue: "123456.98", - expectedPrecision: -1, - expectedScale: 2, - }, - } - - for _, testCase := range testCases { - field := Field{ - Parameters: testCase.params, - } - - bytes, err := toBytes(testCase.encoded) - assert.NoError(t, err) - - dec, err := field.DecodeDecimal(bytes) - if testCase.expectedErr != "" { - assert.ErrorContains(t, err, testCase.expectedErr, testCase.name) - continue - } - - assert.NoError(t, err) - assert.Equal(t, testCase.expectedValue, dec.String(), testCase.name) - - assert.Equal(t, testCase.expectedPrecision, dec.Details().Precision(), testCase.name) - assert.Equal(t, testCase.expectedScale, dec.Details().Scale(), testCase.name) - } -} - -func TestField_DecodeDebeziumVariableDecimal(t *testing.T) { - type _testCase struct { - name string - value any - - expectedValue string - expectedScale int32 - expectedErr string - } - - testCases := []_testCase{ - { - name: "empty val (nil)", - expectedErr: "value is not map[string]any type", - }, - { - name: "empty map", - value: map[string]any{}, - expectedErr: "object is empty", - }, - { - name: "scale is not an integer", - value: map[string]any{ - "scale": "foo", - }, - expectedErr: "key: scale is not type integer", - }, - { - name: "value exists (scale 3)", - value: map[string]any{ - "scale": 3, - "value": "SOx4FQ==", - }, - expectedValue: "1223456.789", - expectedScale: 3, - }, - { - name: "value exists (scale 2)", - value: map[string]any{ - "scale": 2, - "value": "MDk=", - }, - expectedValue: "123.45", - expectedScale: 2, - }, - { - name: "negative numbers (scale 7)", - value: map[string]any{ - "scale": 7, - "value": "wT9Wmw==", - }, - expectedValue: "-105.2813669", - expectedScale: 7, - }, - { - name: "malformed base64 value", - value: map[string]any{ - "scale": 7, - "value": "==wT9Wmw==", - }, - expectedErr: "failed to base64 decode", - }, - { - name: "[]byte value", - value: map[string]any{ - "scale": 7, - "value": []byte{193, 63, 86, 155}, - }, - expectedValue: "-105.2813669", - expectedScale: 7, - }, - } - - for _, testCase := range testCases { - field := Field{} - dec, err := field.DecodeDebeziumVariableDecimal(testCase.value) - if testCase.expectedErr != "" { - assert.ErrorContains(t, err, testCase.expectedErr, testCase.name) - continue - } - - assert.Equal(t, int32(-1), dec.Details().Precision(), testCase.name) - assert.Equal(t, testCase.expectedScale, dec.Details().Scale(), testCase.name) - assert.Equal(t, testCase.expectedValue, dec.String(), testCase.name) - } - -} diff --git a/models/event/event.go b/models/event/event.go index 3966e6950..20684b7cc 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -72,12 +72,17 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc *kafkalib.TopicConf delete(evtData, constants.OnlySetDeleteColumnMarker) } + optionalSchema, err := event.GetOptionalSchema() + if err != nil { + return Event{}, err + } + return Event{ mode: cfgMode, Table: tblName, PrimaryKeyMap: pkMap, ExecutionTime: event.GetExecutionTime(), - OptionalSchema: event.GetOptionalSchema(), + OptionalSchema: optionalSchema, Columns: cols, Data: evtData, Deleted: event.DeletePayload(), diff --git a/models/event/event_test.go b/models/event/event_test.go index a66f05745..d1d2d2ec1 100644 --- a/models/event/event_test.go +++ b/models/event/event_test.go @@ -34,8 +34,8 @@ func (f fakeEvent) GetTableName() string { return "foo" } -func (f fakeEvent) GetOptionalSchema() map[string]typing.KindDetails { - return nil +func (f fakeEvent) GetOptionalSchema() (map[string]typing.KindDetails, error) { + return nil, nil } func (f fakeEvent) GetColumns() (*columns.Columns, error) {