From f8a0f589b80ef965ab45bfd4c65123281de1382d Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 14 Nov 2024 22:25:29 -0800 Subject: [PATCH] Remove more usages of columns. --- clients/shared/append.go | 4 ++-- clients/shared/merge.go | 4 ++-- lib/destination/types/table_config.go | 7 +++++++ lib/destination/types/table_config_test.go | 4 ++-- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/clients/shared/append.go b/clients/shared/append.go index b36e96d81..1c6823599 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -25,7 +25,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim // We don't care about srcKeysMissing because we don't drop columns when we append. _, targetKeysMissing := columns.Diff( tableData.ReadOnlyInMemoryCols().GetColumns(), - tableConfig.Columns().GetColumns(), + tableConfig.GetColumns(), tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt, tableData.TopicConfig().IncludeDatabaseUpdatedAt, @@ -48,7 +48,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim return fmt.Errorf("failed to alter table: %w", err) } - if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil { + if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { return fmt.Errorf("failed to merge columns from destination: %w", err) } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 9fd419de8..5ca79f16c 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -30,7 +30,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi srcKeysMissing, targetKeysMissing := columns.Diff( tableData.ReadOnlyInMemoryCols().GetColumns(), - tableConfig.Columns().GetColumns(), + tableConfig.GetColumns(), tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt, tableData.TopicConfig().IncludeDatabaseUpdatedAt, @@ -70,7 +70,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi } tableConfig.AuditColumnsToDelete(srcKeysMissing) - if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil { + if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { return fmt.Errorf("failed to merge columns from destination: %w", err) } diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go index 4f3c0f7d7..d517a47c3 100644 --- a/lib/destination/types/table_config.go +++ b/lib/destination/types/table_config.go @@ -52,6 +52,13 @@ func (d *DwhTableConfig) DropDeletedColumns() bool { return d.dropDeletedColumns } +func (d *DwhTableConfig) GetColumns() []columns.Column { + d.RLock() + defer d.RUnlock() + + return d.columns.GetColumns() +} + func (d *DwhTableConfig) Columns() *columns.Columns { if d == nil { return nil diff --git a/lib/destination/types/table_config_test.go b/lib/destination/types/table_config_test.go index f555a4fac..18f01082b 100644 --- a/lib/destination/types/table_config_test.go +++ b/lib/destination/types/table_config_test.go @@ -85,7 +85,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(col, typing.String)) } - assert.Len(t, tc.Columns().GetColumns(), 5) + assert.Len(t, tc.GetColumns(), 5) var wg sync.WaitGroup for _, addCol := range []string{"aa", "bb", "cc", "dd", "ee", "ff"} { wg.Add(1) @@ -104,7 +104,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { } wg.Wait() - assert.Len(t, tc.Columns().GetColumns(), 6) + assert.Len(t, tc.GetColumns(), 6) } func TestDwhTableConfig_ReadOnlyColumnsToDelete(t *testing.T) {