diff --git a/lib/cdc/event.go b/lib/cdc/event.go index db6c43bbe..b146948c1 100644 --- a/lib/cdc/event.go +++ b/lib/cdc/event.go @@ -3,11 +3,9 @@ package cdc import ( "time" - "github.com/artie-labs/transfer/lib/typing/columns" - - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" ) type Format interface { diff --git a/lib/cdc/util/relational_event.go b/lib/cdc/util/relational_event.go index 2c500f412..3a0dda38b 100644 --- a/lib/cdc/util/relational_event.go +++ b/lib/cdc/util/relational_event.go @@ -77,10 +77,11 @@ func (s *SchemaEventPayload) GetTableName() string { } func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConfig) (map[string]any, error) { + var err error var retMap map[string]any - if len(s.Payload.After) == 0 { + switch s.Operation() { + case "d": if len(s.Payload.Before) > 0 { - var err error retMap, err = s.parseAndMutateMapInPlace(s.Payload.Before, debezium.Before) if err != nil { return nil, err @@ -100,14 +101,15 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf for k, v := range pkMap { retMap[k] = v } - } else { - var err error + case "r", "u", "c": retMap, err = s.parseAndMutateMapInPlace(s.Payload.After, debezium.After) if err != nil { return nil, err } retMap[constants.DeleteColumnMarker] = false retMap[constants.OnlySetDeleteColumnMarker] = false + default: + return nil, fmt.Errorf("unknown operation %q", s.Operation()) } if tc.IncludeArtieUpdatedAt {