From 1e1b854b59877f122c2f1e908d9b96058b2eda64 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Wed, 17 Apr 2024 23:25:46 -0700 Subject: [PATCH 1/5] Hard error --- lib/cdc/event.go | 2 +- lib/cdc/mongo/debezium.go | 6 +++--- lib/cdc/mongo/debezium_test.go | 3 ++- lib/cdc/mysql/debezium_test.go | 3 ++- lib/cdc/util/relational_event.go | 11 +++++------ lib/cdc/util/relational_event_test.go | 3 ++- models/event/event.go | 9 ++++++--- models/event/event_test.go | 16 ++++++++++------ processes/consumer/process.go | 5 ++++- 9 files changed, 35 insertions(+), 23 deletions(-) diff --git a/lib/cdc/event.go b/lib/cdc/event.go index f14848824..ffd41bb08 100644 --- a/lib/cdc/event.go +++ b/lib/cdc/event.go @@ -24,7 +24,7 @@ type Event interface { GetData(pkMap map[string]any, config *kafkalib.TopicConfig) map[string]any GetOptionalSchema() map[string]typing.KindDetails // GetColumns will inspect the envelope's payload right now and return. - GetColumns() *columns.Columns + GetColumns() (*columns.Columns, error) } // FieldLabelKind is used when the schema is turned on. Each schema object will be labelled. diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index 17a22e9f8..f23062d83 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -132,11 +132,11 @@ func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails { return nil } -func (s *SchemaEventPayload) GetColumns() *columns.Columns { +func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) { fieldsObject := s.Schema.GetSchemaFromLabel(cdc.After) if fieldsObject == nil { // AFTER schema does not exist. - return nil + return nil, nil } var cols columns.Columns @@ -146,7 +146,7 @@ func (s *SchemaEventPayload) GetColumns() *columns.Columns { cols.AddColumn(columns.NewColumn(columns.EscapeName(field.FieldName), typing.Invalid)) } - return &cols + return &cols, nil } func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) map[string]any { diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index fd5db4c3e..6e76b943c 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -456,6 +456,7 @@ func (p *MongoTestSuite) TestMongoDBEventWithSchema() { Type: debezium.String, }) assert.False(p.T(), evt.DeletePayload()) - cols := schemaEvt.GetColumns() + cols, err := schemaEvt.GetColumns() + assert.NoError(p.T(), err) assert.NotNil(p.T(), cols) } diff --git a/lib/cdc/mysql/debezium_test.go b/lib/cdc/mysql/debezium_test.go index 568a15850..d1d5a0152 100644 --- a/lib/cdc/mysql/debezium_test.go +++ b/lib/cdc/mysql/debezium_test.go @@ -345,7 +345,8 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() { assert.Equal(m.T(), evtData["id"], 1001) assert.Equal(m.T(), evtData["first_name"], "Sally") assert.Equal(m.T(), evtData["bool_test"], false) - cols := evt.GetColumns() + cols, err := evt.GetColumns() + assert.NoError(m.T(), err) assert.NotNil(m.T(), cols) col, isOk := cols.GetColumn("abcdef") diff --git a/lib/cdc/util/relational_event.go b/lib/cdc/util/relational_event.go index 696254e56..a7a3d4a00 100644 --- a/lib/cdc/util/relational_event.go +++ b/lib/cdc/util/relational_event.go @@ -1,6 +1,7 @@ package util import ( + "fmt" "log/slog" "time" @@ -34,11 +35,11 @@ type Source struct { Table string `json:"table"` } -func (s *SchemaEventPayload) GetColumns() *columns.Columns { +func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) { fieldsObject := s.Schema.GetSchemaFromLabel(cdc.After) if fieldsObject == nil { // AFTER schema does not exist. - return nil + return nil, nil } var cols columns.Columns @@ -48,9 +49,7 @@ func (s *SchemaEventPayload) GetColumns() *columns.Columns { col := columns.NewColumn(columns.EscapeName(field.FieldName), typing.Invalid) val, parseErr := field.ParseValue(field.Default) if parseErr != nil { - slog.Warn("Failed to parse field, using original value", slog.Any("err", parseErr), - slog.String("field", field.FieldName), slog.Any("value", field.Default)) - col.SetDefaultValue(field.Default) + return nil, fmt.Errorf("failed to parse field %w: ", parseErr) } else { col.SetDefaultValue(val) } @@ -58,7 +57,7 @@ func (s *SchemaEventPayload) GetColumns() *columns.Columns { cols.AddColumn(col) } - return &cols + return &cols, nil } func (s *SchemaEventPayload) Operation() string { diff --git a/lib/cdc/util/relational_event_test.go b/lib/cdc/util/relational_event_test.go index 7e3327598..f8d82eade 100644 --- a/lib/cdc/util/relational_event_test.go +++ b/lib/cdc/util/relational_event_test.go @@ -64,7 +64,8 @@ func TestSource_GetOptionalSchema(t *testing.T) { assert.True(t, isOk) assert.Equal(t, value, typing.String) - cols := schemaEventPayload.GetColumns() + cols, err := schemaEventPayload.GetColumns() + assert.NoError(t, err) assert.Equal(t, 6, len(cols.GetColumns())) col, isOk := cols.GetColumn("boolean_column") diff --git a/models/event/event.go b/models/event/event.go index 06e476bd8..e4d457e66 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -35,8 +35,11 @@ type Event struct { mode config.Mode } -func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc *kafkalib.TopicConfig, cfgMode config.Mode) Event { - cols := event.GetColumns() +func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc *kafkalib.TopicConfig, cfgMode config.Mode) (Event, error) { + cols, err := event.GetColumns() + if err != nil { + return Event{}, err + } // Now iterate over pkMap and tag each column that is a primary key if cols != nil { for primaryKey := range pkMap { @@ -70,7 +73,7 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc *kafkalib.TopicConf Columns: cols, Data: evtData, Deleted: event.DeletePayload(), - } + }, nil } func (e *Event) IsValid() bool { diff --git a/models/event/event_test.go b/models/event/event_test.go index 6b2971942..9bed2f9e8 100644 --- a/models/event/event_test.go +++ b/models/event/event_test.go @@ -38,8 +38,8 @@ func (f fakeEvent) GetOptionalSchema() map[string]typing.KindDetails { return nil } -func (f fakeEvent) GetColumns() *columns.Columns { - return &columns.Columns{} +func (f fakeEvent) GetColumns() (*columns.Columns, error) { + return &columns.Columns{}, nil } func (f fakeEvent) GetData(pkMap map[string]any, config *kafkalib.TopicConfig) map[string]any { @@ -95,21 +95,25 @@ func (e *EventsTestSuite) TestEvent_TableName() { var f fakeEvent { // Don't pass in tableName. - evt := ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{}, config.Replication) + evt, err := ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{}, config.Replication) + assert.NoError(e.T(), err) assert.Equal(e.T(), f.GetTableName(), evt.Table) } { // Now pass it in, it should override. - evt := ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{TableName: "orders"}, config.Replication) + evt, err := ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{TableName: "orders"}, config.Replication) + assert.NoError(e.T(), err) assert.Equal(e.T(), "orders", evt.Table) } { // Now, if it's history mode... - evt := ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{TableName: "orders"}, config.History) + evt, err := ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{TableName: "orders"}, config.History) + assert.NoError(e.T(), err) assert.Equal(e.T(), "orders__history", evt.Table) // Table already has history suffix, so it won't add extra. - evt = ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{TableName: "dusty__history"}, config.History) + evt, err = ToMemoryEvent(f, idMap, &kafkalib.TopicConfig{TableName: "dusty__history"}, config.History) + assert.NoError(e.T(), err) assert.Equal(e.T(), "dusty__history", evt.Table) } } diff --git a/processes/consumer/process.go b/processes/consumer/process.go index ba5e7bebd..4d1338739 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -60,7 +60,10 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo } tags["op"] = _event.Operation() - evt := event.ToMemoryEvent(_event, pkMap, topicConfig.tc, cfg.Mode) + evt, err := event.ToMemoryEvent(_event, pkMap, topicConfig.tc, cfg.Mode) + if err != nil { + return "", err + } // Table name is only available after event has been cast tags["table"] = evt.Table From b04bc2d67deebcd01c0d816bd3ecaea2e84b5d24 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Wed, 17 Apr 2024 23:34:02 -0700 Subject: [PATCH 2/5] More --- lib/cdc/event.go | 2 +- lib/cdc/mongo/debezium.go | 4 +-- lib/cdc/mongo/debezium_test.go | 15 +++++++---- lib/cdc/mysql/debezium_test.go | 6 +++-- lib/cdc/postgres/debezium_test.go | 6 +++-- lib/cdc/util/relational_event.go | 27 +++++++++++-------- lib/cdc/util/relational_event_decimal_test.go | 12 ++++++--- lib/cdc/util/relational_event_test.go | 21 ++++++++++----- models/event/event.go | 5 +++- models/event/event_test.go | 4 +-- 10 files changed, 65 insertions(+), 37 deletions(-) diff --git a/lib/cdc/event.go b/lib/cdc/event.go index ffd41bb08..6022e98e6 100644 --- a/lib/cdc/event.go +++ b/lib/cdc/event.go @@ -21,7 +21,7 @@ type Event interface { Operation() string DeletePayload() bool GetTableName() string - GetData(pkMap map[string]any, config *kafkalib.TopicConfig) map[string]any + GetData(pkMap map[string]any, config *kafkalib.TopicConfig) (map[string]any, error) GetOptionalSchema() map[string]typing.KindDetails // GetColumns will inspect the envelope's payload right now and return. GetColumns() (*columns.Columns, error) diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index f23062d83..cea3ac467 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -149,7 +149,7 @@ func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) { return &cols, nil } -func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) map[string]any { +func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) (map[string]any, error) { var retMap map[string]any if len(s.Payload.afterMap) == 0 { // This is a delete event, so mark it as deleted. @@ -187,5 +187,5 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon retMap[constants.DatabaseUpdatedColumnMarker] = s.GetExecutionTime().Format(ext.ISO8601) } - return retMap + return retMap, nil } diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go index 6e76b943c..d01e21726 100644 --- a/lib/cdc/mongo/debezium_test.go +++ b/lib/cdc/mongo/debezium_test.go @@ -169,7 +169,8 @@ func (p *MongoTestSuite) TestMongoDBEventCustomer() { evt, err := p.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) assert.NoError(p.T(), err) - evtData := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) + evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) + assert.NoError(p.T(), err) _, isOk := evtData[constants.UpdateColumnMarker] assert.False(p.T(), isOk) assert.Equal(p.T(), evtData["_id"], 1003) @@ -177,14 +178,16 @@ func (p *MongoTestSuite) TestMongoDBEventCustomer() { assert.Equal(p.T(), evtData["last_name"], "Tang") assert.Equal(p.T(), evtData["email"], "robin@artie.so") - evtDataWithIncludedAt := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) + evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) + assert.NoError(p.T(), err) _, isOk = evtDataWithIncludedAt[constants.UpdateColumnMarker] assert.False(p.T(), isOk) - evtDataWithIncludedAt = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{ + evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{ IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true, }) + assert.NoError(p.T(), err) assert.Equal(p.T(), "2022-11-18T06:35:21+00:00", evtDataWithIncludedAt[constants.DatabaseUpdatedColumnMarker]) _, err = time.Parse(ext.ISO8601, evtDataWithIncludedAt[constants.UpdateColumnMarker].(string)) @@ -235,7 +238,8 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() { evt, err := p.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload)) assert.NoError(p.T(), err) - evtData := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) + evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{}) + assert.NoError(p.T(), err) assert.Equal(p.T(), "customers123", evt.GetTableName()) _, isOk := evtData[constants.UpdateColumnMarker] assert.False(p.T(), isOk) @@ -245,9 +249,10 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() { time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC)) assert.Equal(p.T(), true, evt.DeletePayload()) - evtData = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{ + evtData, err = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{ IncludeArtieUpdatedAt: true, }) + assert.NoError(p.T(), err) _, isOk = evtData[constants.UpdateColumnMarker] assert.True(p.T(), isOk) diff --git a/lib/cdc/mysql/debezium_test.go b/lib/cdc/mysql/debezium_test.go index d1d5a0152..783ab83c8 100644 --- a/lib/cdc/mysql/debezium_test.go +++ b/lib/cdc/mysql/debezium_test.go @@ -323,7 +323,8 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() { "id": 1001, } - evtData := evt.GetData(kvMap, &kafkalib.TopicConfig{}) + evtData, err := evt.GetData(kvMap, &kafkalib.TopicConfig{}) + assert.NoError(m.T(), err) // Should have no Artie updated or database updated fields _, isOk := evtData[constants.UpdateColumnMarker] @@ -332,10 +333,11 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() { _, isOk = evtData[constants.DatabaseUpdatedColumnMarker] assert.False(m.T(), isOk) - evtData = evt.GetData(kvMap, &kafkalib.TopicConfig{ + evtData, err = evt.GetData(kvMap, &kafkalib.TopicConfig{ IncludeDatabaseUpdatedAt: true, IncludeArtieUpdatedAt: true, }) + assert.NoError(m.T(), err) assert.Equal(m.T(), "2023-03-13T19:19:24+00:00", evtData[constants.DatabaseUpdatedColumnMarker]) diff --git a/lib/cdc/postgres/debezium_test.go b/lib/cdc/postgres/debezium_test.go index 229dd5206..1169ed456 100644 --- a/lib/cdc/postgres/debezium_test.go +++ b/lib/cdc/postgres/debezium_test.go @@ -85,9 +85,10 @@ func (p *PostgresTestSuite) TestPostgresEvent() { assert.Nil(p.T(), err) assert.False(p.T(), evt.DeletePayload()) - evtData := evt.GetData(map[string]any{"id": 59}, &kafkalib.TopicConfig{ + evtData, err := evt.GetData(map[string]any{"id": 59}, &kafkalib.TopicConfig{ IncludeDatabaseUpdatedAt: true, }) + assert.NoError(p.T(), err) assert.Equal(p.T(), float64(59), evtData["id"]) assert.Equal(p.T(), "2022-11-16T04:01:53+00:00", evtData[constants.DatabaseUpdatedColumnMarker]) @@ -190,7 +191,8 @@ func (p *PostgresTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() { assert.Nil(p.T(), err) assert.False(p.T(), evt.DeletePayload()) - evtData := evt.GetData(map[string]any{"id": 1001}, &kafkalib.TopicConfig{}) + evtData, err := evt.GetData(map[string]any{"id": 1001}, &kafkalib.TopicConfig{}) + assert.NoError(p.T(), err) // Testing typing. assert.Equal(p.T(), evtData["id"], 1001) diff --git a/lib/cdc/util/relational_event.go b/lib/cdc/util/relational_event.go index a7a3d4a00..a0a0f475d 100644 --- a/lib/cdc/util/relational_event.go +++ b/lib/cdc/util/relational_event.go @@ -2,7 +2,6 @@ package util import ( "fmt" - "log/slog" "time" "github.com/artie-labs/transfer/lib/cdc" @@ -49,7 +48,7 @@ func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) { col := columns.NewColumn(columns.EscapeName(field.FieldName), typing.Invalid) val, parseErr := field.ParseValue(field.Default) if parseErr != nil { - return nil, fmt.Errorf("failed to parse field %w: ", parseErr) + return nil, fmt.Errorf("failed to parse field %q: %w", field.FieldName, parseErr) } else { col.SetDefaultValue(val) } @@ -76,11 +75,15 @@ func (s *SchemaEventPayload) GetTableName() string { return s.Payload.Source.Table } -func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) map[string]any { +func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) (map[string]any, error) { var retMap map[string]any if len(s.Payload.After) == 0 { if len(s.Payload.Before) > 0 { - retMap = s.parseAndMutateMapInPlace(s.Payload.Before, cdc.Before) + var err error + retMap, err = s.parseAndMutateMapInPlace(s.Payload.Before, cdc.Before) + if err != nil { + return nil, err + } } else { retMap = make(map[string]any) } @@ -98,7 +101,11 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon retMap[tc.IdempotentKey] = s.GetExecutionTime().Format(ext.ISO8601) } } else { - retMap = s.parseAndMutateMapInPlace(s.Payload.After, cdc.After) + var err error + retMap, err = s.parseAndMutateMapInPlace(s.Payload.After, cdc.After) + if err != nil { + return nil, err + } retMap[constants.DeleteColumnMarker] = false } @@ -110,13 +117,13 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon retMap[constants.DatabaseUpdatedColumnMarker] = s.GetExecutionTime().Format(ext.ISO8601) } - return retMap + return retMap, nil } // parseAndMutateMapInPlace will take `retMap` and `kind` (which part of the schema should we be inspecting) and then parse the values accordingly. // This will unpack any Debezium-specific values and convert them back into their original types. // NOTE: `retMap` and the returned object are the same object. -func (s *SchemaEventPayload) parseAndMutateMapInPlace(retMap map[string]any, kind cdc.FieldLabelKind) map[string]any { +func (s *SchemaEventPayload) parseAndMutateMapInPlace(retMap map[string]any, kind cdc.FieldLabelKind) (map[string]any, error) { if schemaObject := s.Schema.GetSchemaFromLabel(kind); schemaObject != nil { for _, field := range schemaObject.Fields { fieldVal, isOk := retMap[field.FieldName] @@ -127,12 +134,10 @@ func (s *SchemaEventPayload) parseAndMutateMapInPlace(retMap map[string]any, kin if val, parseErr := field.ParseValue(fieldVal); parseErr == nil { retMap[field.FieldName] = val } else { - // TODO: Make this a hard failure, confirm this with Datadog logs. - slog.Warn("Failed to parse field, using original value", slog.Any("err", parseErr), - slog.String("field", field.FieldName), slog.Any("value", fieldVal)) + return nil, fmt.Errorf("failed to parse field %q: %w", field.FieldName, parseErr) } } } - return retMap + return retMap, nil } diff --git a/lib/cdc/util/relational_event_decimal_test.go b/lib/cdc/util/relational_event_decimal_test.go index 14acf50cc..47785fe03 100644 --- a/lib/cdc/util/relational_event_decimal_test.go +++ b/lib/cdc/util/relational_event_decimal_test.go @@ -24,7 +24,8 @@ func TestSchemaEventPayload_MiscNumbers_GetData(t *testing.T) { err = json.Unmarshal(bytes, &schemaEventPayload) assert.NoError(t, err) - retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + assert.NoError(t, err) assert.Equal(t, retMap["smallint_test"], 1) assert.Equal(t, retMap["smallserial_test"], 2) assert.Equal(t, retMap["int_test"], 3) @@ -43,7 +44,8 @@ func TestSchemaEventPayload_Numeric_GetData(t *testing.T) { var schemaEventPayload SchemaEventPayload err = json.Unmarshal(bytes, &schemaEventPayload) assert.NoError(t, err) - retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + assert.NoError(t, err) assert.Equal(t, "123456.789", retMap["numeric_test"].(*decimal.Decimal).Value()) assert.Equal(t, 0, big.NewFloat(1234).Cmp(retMap["numeric_5"].(*decimal.Decimal).Value().(*big.Float))) @@ -75,7 +77,8 @@ func TestSchemaEventPayload_Decimal_GetData(t *testing.T) { var schemaEventPayload SchemaEventPayload err = json.Unmarshal(bytes, &schemaEventPayload) assert.NoError(t, err) - retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + assert.NoError(t, err) assert.Equal(t, "123.45", retMap["decimal_test"].(*decimal.Decimal).Value()) decimalWithScaleMap := map[string]string{ "decimal_test_5": "123", @@ -104,7 +107,8 @@ func TestSchemaEventPayload_Money_GetData(t *testing.T) { var schemaEventPayload SchemaEventPayload err = json.Unmarshal(bytes, &schemaEventPayload) assert.NoError(t, err) - retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{}) + assert.NoError(t, err) decimalWithScaleMap := map[string]string{ "money_test": "123456.78", diff --git a/lib/cdc/util/relational_event_test.go b/lib/cdc/util/relational_event_test.go index f8d82eade..e5a421ed7 100644 --- a/lib/cdc/util/relational_event_test.go +++ b/lib/cdc/util/relational_event_test.go @@ -112,7 +112,8 @@ func TestGetDataTestInsert(t *testing.T) { assert.False(t, schemaEventPayload.DeletePayload()) - evtData := schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{}) + evtData, err := schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{}) + assert.NoError(t, err) assert.Equal(t, len(after), len(evtData), "has deletion flag") deletionFlag, isOk := evtData[constants.DeleteColumnMarker] @@ -125,9 +126,10 @@ func TestGetDataTestInsert(t *testing.T) { delete(evtData, constants.DeleteColumnMarker) assert.Equal(t, after, evtData) - evtData = schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{ + evtData, err = schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{ IncludeArtieUpdatedAt: true, }) + assert.NoError(t, err) _, isOk = evtData[constants.UpdateColumnMarker] assert.True(t, isOk) @@ -152,7 +154,8 @@ func TestGetData_TestDelete(t *testing.T) { var schemaEventPayload SchemaEventPayload assert.NoError(t, json.Unmarshal([]byte(PostgresDelete), &schemaEventPayload)) assert.True(t, schemaEventPayload.DeletePayload()) - data := schemaEventPayload.GetData(kvMap, tc) + data, err := schemaEventPayload.GetData(kvMap, tc) + assert.NoError(t, err) for expectedKey, expectedValue := range expectedKeyValues { value, isOk := data[expectedKey] assert.True(t, isOk) @@ -164,7 +167,8 @@ func TestGetData_TestDelete(t *testing.T) { var schemaEventPayload SchemaEventPayload assert.NoError(t, json.Unmarshal([]byte(MySQLDelete), &schemaEventPayload)) assert.True(t, schemaEventPayload.DeletePayload()) - data := schemaEventPayload.GetData(kvMap, tc) + data, err := schemaEventPayload.GetData(kvMap, tc) + assert.NoError(t, err) for expectedKey, expectedValue := range expectedKeyValues { value, isOk := data[expectedKey] assert.True(t, isOk) @@ -203,7 +207,8 @@ func TestGetDataTestUpdate(t *testing.T) { assert.False(t, schemaEventPayload.DeletePayload()) kvMap := map[string]any{"pk": 1} - evtData := schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{}) + evtData, err := schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{}) + assert.NoError(t, err) assert.Equal(t, len(after), len(evtData), "has deletion flag") deletionFlag, isOk := evtData[constants.DeleteColumnMarker] @@ -216,9 +221,10 @@ func TestGetDataTestUpdate(t *testing.T) { delete(evtData, constants.DeleteColumnMarker) assert.Equal(t, after, evtData) - evtData = schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{ + evtData, err = schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{ IncludeArtieUpdatedAt: true, }) + assert.NoError(t, err) _, isOk = evtData[constants.UpdateColumnMarker] assert.True(t, isOk) @@ -248,7 +254,8 @@ func TestSchemaEventPayload_ParseAndMutateMapInPlace(t *testing.T) { }, }, } - returnedMap := schemaEventPayload.parseAndMutateMapInPlace(mapToPassIn, cdc.After) + returnedMap, err := schemaEventPayload.parseAndMutateMapInPlace(mapToPassIn, cdc.After) + assert.NoError(t, err) assert.Equal(t, mapToPassIn, returnedMap) assert.Equal(t, 123, mapToPassIn["id"]) } diff --git a/models/event/event.go b/models/event/event.go index e4d457e66..acab51e86 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -49,7 +49,10 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc *kafkalib.TopicConf } } - evtData := event.GetData(pkMap, tc) + evtData, err := event.GetData(pkMap, tc) + if err != nil { + return Event{}, err + } tblName := stringutil.Override(event.GetTableName(), tc.TableName) if cfgMode == config.History { if !strings.HasSuffix(tblName, constants.HistoryModeSuffix) { diff --git a/models/event/event_test.go b/models/event/event_test.go index 9bed2f9e8..3a9d9a793 100644 --- a/models/event/event_test.go +++ b/models/event/event_test.go @@ -42,8 +42,8 @@ func (f fakeEvent) GetColumns() (*columns.Columns, error) { return &columns.Columns{}, nil } -func (f fakeEvent) GetData(pkMap map[string]any, config *kafkalib.TopicConfig) map[string]any { - return map[string]any{constants.DeleteColumnMarker: false} +func (f fakeEvent) GetData(pkMap map[string]any, config *kafkalib.TopicConfig) (map[string]any, error) { + return map[string]any{constants.DeleteColumnMarker: false}, nil } func (e *EventsTestSuite) TestEvent_IsValid() { From c5e34170652fc507e92b34e801939386f949efbe Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Fri, 19 Apr 2024 12:14:58 -0700 Subject: [PATCH 3/5] Add tag --- processes/consumer/process.go | 1 + 1 file changed, 1 insertion(+) diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 4d1338739..8f80ac960 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -62,6 +62,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo tags["op"] = _event.Operation() evt, err := event.ToMemoryEvent(_event, pkMap, topicConfig.tc, cfg.Mode) if err != nil { + tags["what"] = "to_mem_event_err" return "", err } // Table name is only available after event has been cast From 5988897ace5068f207dd07c257ad3ac436e52e47 Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 22 Apr 2024 13:24:17 -0700 Subject: [PATCH 4/5] Wrap error --- processes/consumer/process.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 8f80ac960..8e30a303b 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -63,7 +63,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo evt, err := event.ToMemoryEvent(_event, pkMap, topicConfig.tc, cfg.Mode) if err != nil { tags["what"] = "to_mem_event_err" - return "", err + return "", fmt.Errorf("cannot convert to memory event: %w", err) } // Table name is only available after event has been cast tags["table"] = evt.Table From 38aae9184a266560b3f5df5d70c59457662b01ce Mon Sep 17 00:00:00 2001 From: Nathan Villaescusa Date: Mon, 22 Apr 2024 13:28:29 -0700 Subject: [PATCH 5/5] Update message --- lib/cdc/util/relational_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/cdc/util/relational_event.go b/lib/cdc/util/relational_event.go index a0a0f475d..aa52dbe44 100644 --- a/lib/cdc/util/relational_event.go +++ b/lib/cdc/util/relational_event.go @@ -48,7 +48,7 @@ func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) { col := columns.NewColumn(columns.EscapeName(field.FieldName), typing.Invalid) val, parseErr := field.ParseValue(field.Default) if parseErr != nil { - return nil, fmt.Errorf("failed to parse field %q: %w", field.FieldName, parseErr) + return nil, fmt.Errorf("failed to parse field %q for default value: %w", field.FieldName, parseErr) } else { col.SetDefaultValue(val) }