From 7bfb2c04687187ab0cd35365dd125e617e57917d Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 26 Apr 2024 16:06:40 -0700 Subject: [PATCH] Clean up. --- clients/snowflake/writes.go | 9 ++++++--- lib/kafkalib/partition/settings.go | 4 ++++ lib/kafkalib/topic.go | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/clients/snowflake/writes.go b/clients/snowflake/writes.go index f787098e7..5ad4c17da 100644 --- a/clients/snowflake/writes.go +++ b/clients/snowflake/writes.go @@ -4,6 +4,8 @@ import ( "fmt" "log/slog" + "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/logger" @@ -52,9 +54,10 @@ func (s *Store) Merge(tableData *optimization.TableData) error { } var additionalEqualityStrings []string - if len(tableData.TopicConfig.AdditionalMergeColumns) > 0 { - for _, additionalMergeCol := range tableData.TopicConfig.AdditionalMergeColumns { - additionalEqualityStrings = append(additionalEqualityStrings, fmt.Sprintf("c.%s = cc.%s", additionalMergeCol, additionalMergeCol)) + if len(tableData.TopicConfig().AdditionalMergePredicates) > 0 { + for _, additionalMergePredicate := range tableData.TopicConfig().AdditionalMergePredicates { + mergePredicateColumn := sql.EscapeName(additionalMergePredicate.PartitionField, s.ShouldUppercaseEscapedNames(), s.Label()) + additionalEqualityStrings = append(additionalEqualityStrings, fmt.Sprintf("c.%s = cc.%s", mergePredicateColumn, mergePredicateColumn)) } } diff --git a/lib/kafkalib/partition/settings.go b/lib/kafkalib/partition/settings.go index 6a21b2ef8..949584f47 100644 --- a/lib/kafkalib/partition/settings.go +++ b/lib/kafkalib/partition/settings.go @@ -17,6 +17,10 @@ var ValidPartitionBy = []string{ "daily", } +type MergePredicates struct { + PartitionField string +} + type BigQuerySettings struct { PartitionType string `yaml:"partitionType" json:"partitionType"` PartitionField string `yaml:"partitionField" json:"partitionField"` diff --git a/lib/kafkalib/topic.go b/lib/kafkalib/topic.go index a780878d1..77a5aac48 100644 --- a/lib/kafkalib/topic.go +++ b/lib/kafkalib/topic.go @@ -47,7 +47,7 @@ type TopicConfig struct { IncludeArtieUpdatedAt bool `yaml:"includeArtieUpdatedAt"` IncludeDatabaseUpdatedAt bool `yaml:"includeDatabaseUpdatedAt"` BigQueryPartitionSettings *partition.BigQuerySettings `yaml:"bigQueryPartitionSettings,omitempty"` - AdditionalMergeColumns []string `yaml:"additionalMergeColumns,omitempty"` + AdditionalMergePredicates []partition.MergePredicates `yaml:"additionalMergePredicates,omitempty"` // Internal metadata opsToSkipMap map[string]bool `yaml:"-"`