Skip to content

Commit

Permalink
Refactor how we qualify a deleted row in relational_event (#965)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Oct 17, 2024
1 parent 8a44105 commit f8d1a64
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
6 changes: 2 additions & 4 deletions lib/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package cdc
import (
"time"

"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/transfer/lib/typing"

"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

type Format interface {
Expand Down
10 changes: 6 additions & 4 deletions lib/cdc/util/relational_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ func (s *SchemaEventPayload) GetTableName() string {
}

func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConfig) (map[string]any, error) {
var err error
var retMap map[string]any
if len(s.Payload.After) == 0 {
switch s.Operation() {
case "d":
if len(s.Payload.Before) > 0 {
var err error
retMap, err = s.parseAndMutateMapInPlace(s.Payload.Before, debezium.Before)
if err != nil {
return nil, err
Expand All @@ -100,14 +101,15 @@ func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc kafkalib.TopicConf
for k, v := range pkMap {
retMap[k] = v
}
} else {
var err error
case "r", "u", "c":
retMap, err = s.parseAndMutateMapInPlace(s.Payload.After, debezium.After)
if err != nil {
return nil, err
}
retMap[constants.DeleteColumnMarker] = false
retMap[constants.OnlySetDeleteColumnMarker] = false
default:
return nil, fmt.Errorf("unknown operation %q", s.Operation())
}

if tc.IncludeArtieUpdatedAt {
Expand Down

0 comments on commit f8d1a64

Please sign in to comment.