Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return errors from Event.GetData and Event.GetColumns #454

Merged
merged 7 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type Event interface {
Operation() string
DeletePayload() bool
GetTableName() string
GetData(pkMap map[string]any, config *kafkalib.TopicConfig) map[string]any
GetData(pkMap map[string]any, config *kafkalib.TopicConfig) (map[string]any, error)
GetOptionalSchema() map[string]typing.KindDetails
// GetColumns will inspect the envelope's payload right now and return.
GetColumns() *columns.Columns
GetColumns() (*columns.Columns, error)
}

// FieldLabelKind is used when the schema is turned on. Each schema object will be labelled.
Expand Down
10 changes: 5 additions & 5 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails {
return nil
}

func (s *SchemaEventPayload) GetColumns() *columns.Columns {
func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) {
fieldsObject := s.Schema.GetSchemaFromLabel(cdc.After)
if fieldsObject == nil {
// AFTER schema does not exist.
return nil
return nil, nil
}

var cols columns.Columns
Expand All @@ -146,10 +146,10 @@ func (s *SchemaEventPayload) GetColumns() *columns.Columns {
cols.AddColumn(columns.NewColumn(columns.EscapeName(field.FieldName), typing.Invalid))
}

return &cols
return &cols, nil
}

func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) map[string]any {
func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) (map[string]any, error) {
var retMap map[string]any
if len(s.Payload.afterMap) == 0 {
// This is a delete event, so mark it as deleted.
Expand Down Expand Up @@ -187,5 +187,5 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon
retMap[constants.DatabaseUpdatedColumnMarker] = s.GetExecutionTime().Format(ext.ISO8601)
}

return retMap
return retMap, nil
}
18 changes: 12 additions & 6 deletions lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,25 @@ func (p *MongoTestSuite) TestMongoDBEventCustomer() {

evt, err := p.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
assert.NoError(p.T(), err)
evtData := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)
_, isOk := evtData[constants.UpdateColumnMarker]
assert.False(p.T(), isOk)
assert.Equal(p.T(), evtData["_id"], 1003)
assert.Equal(p.T(), evtData["first_name"], "Robin")
assert.Equal(p.T(), evtData["last_name"], "Tang")
assert.Equal(p.T(), evtData["email"], "[email protected]")

evtDataWithIncludedAt := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)
_, isOk = evtDataWithIncludedAt[constants.UpdateColumnMarker]
assert.False(p.T(), isOk)

evtDataWithIncludedAt = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{
evtDataWithIncludedAt, err = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{
IncludeDatabaseUpdatedAt: true,
IncludeArtieUpdatedAt: true,
})
assert.NoError(p.T(), err)

assert.Equal(p.T(), "2022-11-18T06:35:21+00:00", evtDataWithIncludedAt[constants.DatabaseUpdatedColumnMarker])
_, err = time.Parse(ext.ISO8601, evtDataWithIncludedAt[constants.UpdateColumnMarker].(string))
Expand Down Expand Up @@ -235,7 +238,8 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() {

evt, err := p.Debezium.GetEventFromBytes(typing.Settings{}, []byte(payload))
assert.NoError(p.T(), err)
evtData := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
evtData, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)
assert.Equal(p.T(), "customers123", evt.GetTableName())
_, isOk := evtData[constants.UpdateColumnMarker]
assert.False(p.T(), isOk)
Expand All @@ -245,9 +249,10 @@ func (p *MongoTestSuite) TestMongoDBEventCustomerBefore() {
time.Date(2022, time.November, 18, 6, 35, 21, 0, time.UTC))
assert.Equal(p.T(), true, evt.DeletePayload())

evtData = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{
evtData, err = evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{
IncludeArtieUpdatedAt: true,
})
assert.NoError(p.T(), err)
_, isOk = evtData[constants.UpdateColumnMarker]
assert.True(p.T(), isOk)

Expand Down Expand Up @@ -456,6 +461,7 @@ func (p *MongoTestSuite) TestMongoDBEventWithSchema() {
Type: debezium.String,
})
assert.False(p.T(), evt.DeletePayload())
cols := schemaEvt.GetColumns()
cols, err := schemaEvt.GetColumns()
assert.NoError(p.T(), err)
assert.NotNil(p.T(), cols)
}
9 changes: 6 additions & 3 deletions lib/cdc/mysql/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() {
"id": 1001,
}

evtData := evt.GetData(kvMap, &kafkalib.TopicConfig{})
evtData, err := evt.GetData(kvMap, &kafkalib.TopicConfig{})
assert.NoError(m.T(), err)

// Should have no Artie updated or database updated fields
_, isOk := evtData[constants.UpdateColumnMarker]
Expand All @@ -332,10 +333,11 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() {
_, isOk = evtData[constants.DatabaseUpdatedColumnMarker]
assert.False(m.T(), isOk)

evtData = evt.GetData(kvMap, &kafkalib.TopicConfig{
evtData, err = evt.GetData(kvMap, &kafkalib.TopicConfig{
IncludeDatabaseUpdatedAt: true,
IncludeArtieUpdatedAt: true,
})
assert.NoError(m.T(), err)

assert.Equal(m.T(), "2023-03-13T19:19:24+00:00", evtData[constants.DatabaseUpdatedColumnMarker])

Expand All @@ -345,7 +347,8 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() {
assert.Equal(m.T(), evtData["id"], 1001)
assert.Equal(m.T(), evtData["first_name"], "Sally")
assert.Equal(m.T(), evtData["bool_test"], false)
cols := evt.GetColumns()
cols, err := evt.GetColumns()
assert.NoError(m.T(), err)
assert.NotNil(m.T(), cols)

col, isOk := cols.GetColumn("abcdef")
Expand Down
6 changes: 4 additions & 2 deletions lib/cdc/postgres/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ func (p *PostgresTestSuite) TestPostgresEvent() {
assert.Nil(p.T(), err)
assert.False(p.T(), evt.DeletePayload())

evtData := evt.GetData(map[string]any{"id": 59}, &kafkalib.TopicConfig{
evtData, err := evt.GetData(map[string]any{"id": 59}, &kafkalib.TopicConfig{
IncludeDatabaseUpdatedAt: true,
})
assert.NoError(p.T(), err)
assert.Equal(p.T(), float64(59), evtData["id"])
assert.Equal(p.T(), "2022-11-16T04:01:53+00:00", evtData[constants.DatabaseUpdatedColumnMarker])

Expand Down Expand Up @@ -190,7 +191,8 @@ func (p *PostgresTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
assert.Nil(p.T(), err)
assert.False(p.T(), evt.DeletePayload())

evtData := evt.GetData(map[string]any{"id": 1001}, &kafkalib.TopicConfig{})
evtData, err := evt.GetData(map[string]any{"id": 1001}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)

// Testing typing.
assert.Equal(p.T(), evtData["id"], 1001)
Expand Down
44 changes: 20 additions & 24 deletions lib/cdc/util/relational_event.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package util

import (
"log/slog"
"fmt"
"time"

"github.com/artie-labs/transfer/lib/cdc"
Expand Down Expand Up @@ -34,11 +34,11 @@ type Source struct {
Table string `json:"table"`
}

func (s *SchemaEventPayload) GetColumns() *columns.Columns {
func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) {
fieldsObject := s.Schema.GetSchemaFromLabel(cdc.After)
if fieldsObject == nil {
// AFTER schema does not exist.
return nil
return nil, nil
}

var cols columns.Columns
Expand All @@ -48,21 +48,15 @@ func (s *SchemaEventPayload) GetColumns() *columns.Columns {
col := columns.NewColumn(columns.EscapeName(field.FieldName), typing.Invalid)
val, parseErr := field.ParseValue(field.Default)
if parseErr != nil {
slog.Warn("Failed to parse field for default value, using original value",
slog.Any("err", parseErr),
slog.String("field", field.FieldName),
slog.Any("value", field.Default),
slog.String("debezium_type", string(field.DebeziumType)),
)
col.SetDefaultValue(field.Default)
return nil, fmt.Errorf("failed to parse field %q: %w", field.FieldName, parseErr)
} else {
col.SetDefaultValue(val)
}

cols.AddColumn(col)
}

return &cols
return &cols, nil
}

func (s *SchemaEventPayload) Operation() string {
Expand All @@ -81,11 +75,15 @@ func (s *SchemaEventPayload) GetTableName() string {
return s.Payload.Source.Table
}

func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) map[string]any {
func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) (map[string]any, error) {
var retMap map[string]any
if len(s.Payload.After) == 0 {
if len(s.Payload.Before) > 0 {
retMap = s.parseAndMutateMapInPlace(s.Payload.Before, cdc.Before)
var err error
retMap, err = s.parseAndMutateMapInPlace(s.Payload.Before, cdc.Before)
if err != nil {
return nil, err
}
} else {
retMap = make(map[string]any)
}
Expand All @@ -103,7 +101,11 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon
retMap[tc.IdempotentKey] = s.GetExecutionTime().Format(ext.ISO8601)
}
} else {
retMap = s.parseAndMutateMapInPlace(s.Payload.After, cdc.After)
var err error
retMap, err = s.parseAndMutateMapInPlace(s.Payload.After, cdc.After)
if err != nil {
return nil, err
}
retMap[constants.DeleteColumnMarker] = false
}

Expand All @@ -115,13 +117,13 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicCon
retMap[constants.DatabaseUpdatedColumnMarker] = s.GetExecutionTime().Format(ext.ISO8601)
}

return retMap
return retMap, nil
}

// parseAndMutateMapInPlace will take `retMap` and `kind` (which part of the schema should we be inspecting) and then parse the values accordingly.
// This will unpack any Debezium-specific values and convert them back into their original types.
// NOTE: `retMap` and the returned object are the same object.
func (s *SchemaEventPayload) parseAndMutateMapInPlace(retMap map[string]any, kind cdc.FieldLabelKind) map[string]any {
func (s *SchemaEventPayload) parseAndMutateMapInPlace(retMap map[string]any, kind cdc.FieldLabelKind) (map[string]any, error) {
if schemaObject := s.Schema.GetSchemaFromLabel(kind); schemaObject != nil {
for _, field := range schemaObject.Fields {
fieldVal, isOk := retMap[field.FieldName]
Expand All @@ -132,16 +134,10 @@ func (s *SchemaEventPayload) parseAndMutateMapInPlace(retMap map[string]any, kin
if val, parseErr := field.ParseValue(fieldVal); parseErr == nil {
retMap[field.FieldName] = val
} else {
// TODO: Make this a hard failure, confirm this with Datadog logs.
slog.Warn("Failed to parse field, using original value",
slog.Any("err", parseErr),
slog.String("field", field.FieldName),
slog.Any("value", fieldVal),
slog.String("debezium_type", string(field.DebeziumType)),
)
return nil, fmt.Errorf("failed to parse field %q: %w", field.FieldName, parseErr)
}
}
}

return retMap
return retMap, nil
}
12 changes: 8 additions & 4 deletions lib/cdc/util/relational_event_decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func TestSchemaEventPayload_MiscNumbers_GetData(t *testing.T) {
err = json.Unmarshal(bytes, &schemaEventPayload)
assert.NoError(t, err)

retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
assert.NoError(t, err)
assert.Equal(t, retMap["smallint_test"], 1)
assert.Equal(t, retMap["smallserial_test"], 2)
assert.Equal(t, retMap["int_test"], 3)
Expand All @@ -43,7 +44,8 @@ func TestSchemaEventPayload_Numeric_GetData(t *testing.T) {
var schemaEventPayload SchemaEventPayload
err = json.Unmarshal(bytes, &schemaEventPayload)
assert.NoError(t, err)
retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
assert.NoError(t, err)

assert.Equal(t, "123456.789", retMap["numeric_test"].(*decimal.Decimal).Value())
assert.Equal(t, 0, big.NewFloat(1234).Cmp(retMap["numeric_5"].(*decimal.Decimal).Value().(*big.Float)))
Expand Down Expand Up @@ -75,7 +77,8 @@ func TestSchemaEventPayload_Decimal_GetData(t *testing.T) {
var schemaEventPayload SchemaEventPayload
err = json.Unmarshal(bytes, &schemaEventPayload)
assert.NoError(t, err)
retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
assert.NoError(t, err)
assert.Equal(t, "123.45", retMap["decimal_test"].(*decimal.Decimal).Value())
decimalWithScaleMap := map[string]string{
"decimal_test_5": "123",
Expand Down Expand Up @@ -104,7 +107,8 @@ func TestSchemaEventPayload_Money_GetData(t *testing.T) {
var schemaEventPayload SchemaEventPayload
err = json.Unmarshal(bytes, &schemaEventPayload)
assert.NoError(t, err)
retMap := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
retMap, err := schemaEventPayload.GetData(nil, &kafkalib.TopicConfig{})
assert.NoError(t, err)

decimalWithScaleMap := map[string]string{
"money_test": "123456.78",
Expand Down
24 changes: 16 additions & 8 deletions lib/cdc/util/relational_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func TestSource_GetOptionalSchema(t *testing.T) {
assert.True(t, isOk)
assert.Equal(t, value, typing.String)

cols := schemaEventPayload.GetColumns()
cols, err := schemaEventPayload.GetColumns()
assert.NoError(t, err)
assert.Equal(t, 6, len(cols.GetColumns()))

col, isOk := cols.GetColumn("boolean_column")
Expand Down Expand Up @@ -111,7 +112,8 @@ func TestGetDataTestInsert(t *testing.T) {

assert.False(t, schemaEventPayload.DeletePayload())

evtData := schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{})
evtData, err := schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{})
assert.NoError(t, err)
assert.Equal(t, len(after), len(evtData), "has deletion flag")

deletionFlag, isOk := evtData[constants.DeleteColumnMarker]
Expand All @@ -124,9 +126,10 @@ func TestGetDataTestInsert(t *testing.T) {
delete(evtData, constants.DeleteColumnMarker)
assert.Equal(t, after, evtData)

evtData = schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{
evtData, err = schemaEventPayload.GetData(map[string]any{"pk": 1}, &kafkalib.TopicConfig{
IncludeArtieUpdatedAt: true,
})
assert.NoError(t, err)

_, isOk = evtData[constants.UpdateColumnMarker]
assert.True(t, isOk)
Expand All @@ -151,7 +154,8 @@ func TestGetData_TestDelete(t *testing.T) {
var schemaEventPayload SchemaEventPayload
assert.NoError(t, json.Unmarshal([]byte(PostgresDelete), &schemaEventPayload))
assert.True(t, schemaEventPayload.DeletePayload())
data := schemaEventPayload.GetData(kvMap, tc)
data, err := schemaEventPayload.GetData(kvMap, tc)
assert.NoError(t, err)
for expectedKey, expectedValue := range expectedKeyValues {
value, isOk := data[expectedKey]
assert.True(t, isOk)
Expand All @@ -163,7 +167,8 @@ func TestGetData_TestDelete(t *testing.T) {
var schemaEventPayload SchemaEventPayload
assert.NoError(t, json.Unmarshal([]byte(MySQLDelete), &schemaEventPayload))
assert.True(t, schemaEventPayload.DeletePayload())
data := schemaEventPayload.GetData(kvMap, tc)
data, err := schemaEventPayload.GetData(kvMap, tc)
assert.NoError(t, err)
for expectedKey, expectedValue := range expectedKeyValues {
value, isOk := data[expectedKey]
assert.True(t, isOk)
Expand Down Expand Up @@ -202,7 +207,8 @@ func TestGetDataTestUpdate(t *testing.T) {
assert.False(t, schemaEventPayload.DeletePayload())
kvMap := map[string]any{"pk": 1}

evtData := schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{})
evtData, err := schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{})
assert.NoError(t, err)
assert.Equal(t, len(after), len(evtData), "has deletion flag")

deletionFlag, isOk := evtData[constants.DeleteColumnMarker]
Expand All @@ -215,9 +221,10 @@ func TestGetDataTestUpdate(t *testing.T) {
delete(evtData, constants.DeleteColumnMarker)
assert.Equal(t, after, evtData)

evtData = schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{
evtData, err = schemaEventPayload.GetData(kvMap, &kafkalib.TopicConfig{
IncludeArtieUpdatedAt: true,
})
assert.NoError(t, err)

_, isOk = evtData[constants.UpdateColumnMarker]
assert.True(t, isOk)
Expand Down Expand Up @@ -247,7 +254,8 @@ func TestSchemaEventPayload_ParseAndMutateMapInPlace(t *testing.T) {
},
},
}
returnedMap := schemaEventPayload.parseAndMutateMapInPlace(mapToPassIn, cdc.After)
returnedMap, err := schemaEventPayload.parseAndMutateMapInPlace(mapToPassIn, cdc.After)
assert.NoError(t, err)
assert.Equal(t, mapToPassIn, returnedMap)
assert.Equal(t, 123, mapToPassIn["id"])
}
Loading