diff --git a/lib/kafkalib/topic.go b/lib/kafkalib/topic.go
index 6ec8bacd4..a2151fae8 100644
--- a/lib/kafkalib/topic.go
+++ b/lib/kafkalib/topic.go
@@ -39,6 +39,7 @@ type TopicConfig struct {
 	// TODO: Deprecate BigQueryPartitionSettings and use AdditionalMergePredicates instead.
 	BigQueryPartitionSettings *partition.BigQuerySettings `yaml:"bigQueryPartitionSettings,omitempty"`
 	AdditionalMergePredicates []partition.MergePredicates `yaml:"additionalMergePredicates,omitempty"`
+	ColumnsToHash             []string                    `yaml:"columnsToHash,omitempty"`
 
 	// Internal metadata
 	opsToSkipMap map[string]bool `yaml:"-"`
diff --git a/models/event/event.go b/models/event/event.go
index 4d87a13af..c148263a3 100644
--- a/models/event/event.go
+++ b/models/event/event.go
@@ -9,6 +9,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/artie-labs/transfer/lib/cryptography"
+
 	"github.com/artie-labs/transfer/lib/artie"
 	"github.com/artie-labs/transfer/lib/cdc"
 	"github.com/artie-labs/transfer/lib/config"
@@ -134,6 +136,16 @@ func (e *Event) PrimaryKeyValue() string {
 	return key
 }
 
+// hashData overwrites, in place, the value of every column named in tc.ColumnsToHash
+// with the result of cryptography.HashValue. Columns missing from e.Data are skipped.
+func (e *Event) hashData(tc kafkalib.TopicConfig) {
+	for _, columnToHash := range tc.ColumnsToHash {
+		if value, isOk := e.Data[columnToHash]; isOk {
+			e.Data[columnToHash] = cryptography.HashValue(value)
+		}
+	}
+}
+
 // Save will save the event into our in memory event
 // It will return (flush bool, flushReason string, err error)
 func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkalib.TopicConfig, message artie.Message) (bool, string, error) {
@@ -141,6 +153,9 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali
 		return false, "", errors.New("event not valid")
 	}
 
+	// Are there any columns that need to be hashed?
+	e.hashData(tc)
+
 	// Does the table exist?
 	td := inMemDB.GetOrCreateTableData(e.Table)
 	td.Lock()