Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 19, 2024
1 parent 7106826 commit 4e7dec4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/kafkalib/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type TopicConfig struct {
// TODO: Deprecate BigQueryPartitionSettings and use AdditionalMergePredicates instead.
BigQueryPartitionSettings *partition.BigQuerySettings `yaml:"bigQueryPartitionSettings,omitempty"`
AdditionalMergePredicates []partition.MergePredicates `yaml:"additionalMergePredicates,omitempty"`
ColumnsToHash []string `yaml:"columnsToHash,omitempty"`

// Internal metadata
opsToSkipMap map[string]bool `yaml:"-"`
Expand Down
15 changes: 15 additions & 0 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"github.com/artie-labs/transfer/lib/cryptography"

"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/config"
Expand Down Expand Up @@ -134,13 +136,26 @@ func (e *Event) PrimaryKeyValue() string {
return key
}

func (e *Event) hashData(tc kafkalib.TopicConfig) {
for _, columnToHash := range tc.ColumnsToHash {
if value, isOk := e.Data[columnToHash]; isOk {
e.Data[columnToHash] = cryptography.HashValue(value)
}
}

return

Check failure on line 146 in models/event/event.go

View workflow job for this annotation

GitHub Actions / test

redundant return statement (S1023)
}

// Save will save the event into our in memory event
// It will return (flush bool, flushReason string, err error)
func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkalib.TopicConfig, message artie.Message) (bool, string, error) {
if !e.IsValid() {
return false, "", errors.New("event not valid")
}

// Are there any columns that need to be hashed?
e.hashData(tc)

// Does the table exist?
td := inMemDB.GetOrCreateTableData(e.Table)
td.Lock()
Expand Down

0 comments on commit 4e7dec4

Please sign in to comment.