Skip to content

Commit

Permalink
Supporting tables named after keywords (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jul 11, 2023
1 parent 56269c9 commit 6fb875d
Show file tree
Hide file tree
Showing 46 changed files with 777 additions and 496 deletions.
9 changes: 5 additions & 4 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type Store struct {

func (s *Store) getTableConfig(ctx context.Context, tableData *optimization.TableData) (*types.DwhTableConfig, error) {
return utils.GetTableConfig(ctx, utils.GetTableCfgArgs{
Dwh: s,
FqName: tableData.ToFqName(ctx, constants.BigQuery),
ConfigMap: s.configMap,
Query: fmt.Sprintf("SELECT column_name, data_type, description FROM `%s.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` WHERE table_name='%s';", tableData.TopicConfig.Database, tableData.Name()),
Dwh: s,
FqName: tableData.ToFqName(ctx, constants.BigQuery, true),
ConfigMap: s.configMap,
Query: fmt.Sprintf("SELECT column_name, data_type, description FROM `%s.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` WHERE table_name='%s';",
tableData.TopicConfig.Database, tableData.Name(ctx, nil)),
ColumnNameLabel: describeNameCol,
ColumnTypeLabel: describeTypeCol,
ColumnDescLabel: describeCommentCol,
Expand Down
38 changes: 20 additions & 18 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/artie-labs/transfer/lib/sql"

"github.com/artie-labs/transfer/lib/ptr"

"github.com/artie-labs/transfer/lib/jitter"
Expand Down Expand Up @@ -36,11 +38,11 @@ func (r *Row) Save() (map[string]bigquery.Value, string, error) {
return r.data, bigquery.NoDedupeID, nil
}

func merge(tableData *optimization.TableData) ([]*Row, error) {
func merge(ctx context.Context, tableData *optimization.TableData) ([]*Row, error) {
var rows []*Row
for _, value := range tableData.RowsData() {
data := make(map[string]bigquery.Value)
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(nil) {
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(ctx, nil) {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal, err := CastColVal(value[col], colKind)
if err != nil {
Expand Down Expand Up @@ -76,13 +78,13 @@ func (s *Store) backfillColumn(ctx context.Context, column columns.Column, fqTab
}

fqTableName = strings.ToLower(fqTableName)
escapedCol := column.Name(&columns.NameArgs{Escape: true, DestKind: s.Label()})
escapedCol := column.Name(ctx, &sql.NameArgs{Escape: true, DestKind: s.Label()})
query := fmt.Sprintf(`UPDATE %s SET %s = %v WHERE %s IS NULL;`,
// UPDATE table SET col = default_val WHERE col IS NULL
fqTableName, escapedCol, defaultVal, escapedCol)

logger.FromContext(ctx).WithFields(map[string]interface{}{
"colName": column.Name(nil),
"colName": column.Name(ctx, nil),
"query": query,
"table": fqTableName,
}).Info("backfilling column")
Expand Down Expand Up @@ -113,12 +115,12 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

log := logger.FromContext(ctx)
// Check if all the columns exist in BigQuery
srcKeysMissing, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(),
srcKeysMissing, targetKeysMissing := columns.Diff(ctx, tableData.ReadOnlyInMemoryCols(),
tableConfig.Columns(), tableData.TopicConfig.SoftDelete, tableData.TopicConfig.IncludeArtieUpdatedAt)
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: tableData.ToFqName(ctx, s.Label()),
FqTableName: tableData.ToFqName(ctx, s.Label(), true),
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Expand All @@ -137,7 +139,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
deleteAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: tableData.ToFqName(ctx, s.Label()),
FqTableName: tableData.ToFqName(ctx, s.Label(), true),
CreateTable: false,
ColumnOp: constants.Delete,
ContainOtherOperations: tableData.ContainOtherOperations(),
Expand All @@ -155,7 +157,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
for colToDelete := range tableConfig.ReadOnlyColumnsToDelete() {
var found bool
for _, col := range srcKeysMissing {
if found = col.Name(nil) == colToDelete; found {
if found = col.Name(ctx, nil) == colToDelete; found {
// Found it.
break
}
Expand All @@ -168,13 +170,13 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

// Infer the right data types from BigQuery before temp table creation.
tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)
tableData.UpdateInMemoryColumnsFromDestination(ctx, tableConfig.Columns().GetColumns()...)

// Start temporary table creation
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: fmt.Sprintf("%s_%s", tableData.ToFqName(ctx, s.Label()), tableData.TempTableSuffix()),
FqTableName: fmt.Sprintf("%s_%s", tableData.ToFqName(ctx, s.Label(), false), tableData.TempTableSuffix()),
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Expand All @@ -193,9 +195,9 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

var attempts int
for {
err = s.backfillColumn(ctx, col, tableData.ToFqName(ctx, s.Label()))
err = s.backfillColumn(ctx, col, tableData.ToFqName(ctx, s.Label(), true))
if err == nil {
tableConfig.Columns().UpsertColumn(col.Name(nil), columns.UpsertColumnArg{
tableConfig.Columns().UpsertColumn(col.Name(ctx, nil), columns.UpsertColumnArg{
Backfilled: ptr.ToBool(true),
})
break
Expand All @@ -208,30 +210,30 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
} else {
defaultVal, _ := col.DefaultValue(nil)
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %v",
col.Name(nil), defaultVal, err)
col.Name(ctx, nil), defaultVal, err)
}
}

}

// Perform actual merge now
rows, err := merge(tableData)
rows, err := merge(ctx, tableData)
if err != nil {
log.WithError(err).Warn("failed to generate the merge query")
return err
}

tableName := fmt.Sprintf("%s_%s", tableData.Name(), tableData.TempTableSuffix())
tableName := fmt.Sprintf("%s_%s", tableData.Name(ctx, nil), tableData.TempTableSuffix())
err = s.PutTable(ctx, tableData.TopicConfig.Database, tableName, rows)
if err != nil {
return fmt.Errorf("failed to insert into temp table: %s, error: %v", tableName, err)
}

mergeQuery, err := dml.MergeStatement(&dml.MergeArgument{
FqTableName: tableData.ToFqName(ctx, constants.BigQuery),
mergeQuery, err := dml.MergeStatement(ctx, &dml.MergeArgument{
FqTableName: tableData.ToFqName(ctx, constants.BigQuery, true),
SubQuery: tempAlterTableArgs.FqTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys(&columns.NameArgs{
PrimaryKeys: tableData.PrimaryKeys(ctx, &sql.NameArgs{
Escape: true,
DestKind: s.Label(),
}),
Expand Down
27 changes: 16 additions & 11 deletions clients/redshift/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/artie-labs/transfer/lib/sql"

"github.com/artie-labs/transfer/clients/utils"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/dwh/ddl"
Expand All @@ -21,7 +23,10 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

tableConfig, err := s.getTableConfig(ctx, getTableConfigArgs{
Table: tableData.Name(),
Table: tableData.Name(ctx, &sql.NameArgs{
Escape: true,
DestKind: s.Label(),
}),
Schema: tableData.TopicConfig.Schema,
DropDeletedColumns: tableData.TopicConfig.DropDeletedColumns,
})
Expand All @@ -30,9 +35,9 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

log := logger.FromContext(ctx)
fqName := tableData.ToFqName(ctx, s.Label())
fqName := tableData.ToFqName(ctx, s.Label(), true)
// Check if all the columns exist in Redshift
srcKeysMissing, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
srcKeysMissing, targetKeysMissing := columns.Diff(ctx, tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
tableData.TopicConfig.SoftDelete, tableData.TopicConfig.IncludeArtieUpdatedAt)
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Expand Down Expand Up @@ -74,7 +79,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
for colToDelete := range tableConfig.ReadOnlyColumnsToDelete() {
var found bool
for _, col := range srcKeysMissing {
if found = col.Name(nil) == colToDelete; found {
if found = col.Name(ctx, nil) == colToDelete; found {
// Found it.
break
}
Expand All @@ -86,10 +91,10 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}
}

tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)
tableData.UpdateInMemoryColumnsFromDestination(ctx, tableConfig.Columns().GetColumns()...)

// Temporary tables cannot specify schemas, so we just prefix it instead.
temporaryTableName := fmt.Sprintf("%s_%s", tableData.ToFqName(ctx, s.Label()), tableData.TempTableSuffix())
temporaryTableName := fmt.Sprintf("%s_%s", tableData.ToFqName(ctx, s.Label(), false), tableData.TempTableSuffix())
if err = s.prepareTempTable(ctx, tableData, tableConfig, temporaryTableName); err != nil {
return err
}
Expand All @@ -100,24 +105,24 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
continue
}

err = utils.BackfillColumn(ctx, s, col, tableData.ToFqName(ctx, s.Label()))
err = utils.BackfillColumn(ctx, s, col, tableData.ToFqName(ctx, s.Label(), true))
if err != nil {
defaultVal, _ := col.DefaultValue(nil)
return fmt.Errorf("failed to backfill col: %v, default value: %v, error: %v",
col.Name(nil), defaultVal, err)
col.Name(ctx, nil), defaultVal, err)
}

tableConfig.Columns().UpsertColumn(col.Name(nil), columns.UpsertColumnArg{
tableConfig.Columns().UpsertColumn(col.Name(ctx, nil), columns.UpsertColumnArg{
Backfilled: ptr.ToBool(true),
})
}

// Prepare merge statement
mergeParts, err := dml.MergeStatementParts(&dml.MergeArgument{
mergeParts, err := dml.MergeStatementParts(ctx, &dml.MergeArgument{
FqTableName: fqName,
SubQuery: temporaryTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys(&columns.NameArgs{
PrimaryKeys: tableData.PrimaryKeys(ctx, &sql.NameArgs{
Escape: true,
DestKind: s.Label(),
}),
Expand Down
6 changes: 3 additions & 3 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *Store) prepareTempTable(ctx context.Context, tableData *optimization.Ta
return fmt.Errorf("failed to add comment to table, tableName: %v, err: %v", tempTableName, err)
}

fp, err := s.loadTemporaryTable(tableData, tempTableName)
fp, err := s.loadTemporaryTable(ctx, tableData, tempTableName)
if err != nil {
return fmt.Errorf("failed to load temporary table, err: %v", err)
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func (s *Store) prepareTempTable(ctx context.Context, tableData *optimization.Ta
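// loadTemporaryTable writes the table's rows to /tmp/<newTableName>.csv, using a tab as the field delimiter.
// This way, another function can call this and then upload the resulting file (e.g. by invoking a Snowflake PUT).
// NOTE(review): this file is clients/redshift/staging.go, so the "Snowflake PUT" wording may be stale — confirm the intended upload step.
// It returns the file path and any error encountered.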
func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableName string) (string, error) {
func (s *Store) loadTemporaryTable(ctx context.Context, tableData *optimization.TableData, newTableName string) (string, error) {
filePath := fmt.Sprintf("/tmp/%s.csv", newTableName)
file, err := os.Create(filePath)
if err != nil {
Expand All @@ -84,7 +84,7 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableNa
writer.Comma = '\t'
for _, value := range tableData.RowsData() {
var row []string
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(nil) {
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(ctx, nil) {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal := value[col]
// Check
Expand Down
12 changes: 6 additions & 6 deletions clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
nameCol := columns.NewColumn("name", typing.String)
tc := s.stageStore.configMap.TableConfig(fqName)

val := tc.ShouldDeleteColumn(s.ctx, nameCol.Name(nil), time.Now().Add(-1*6*time.Hour), true)
val := tc.ShouldDeleteColumn(s.ctx, nameCol.Name(s.ctx, nil), time.Now().Add(-1*6*time.Hour), true)
assert.False(s.T(), val, "should not try to delete this column")
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(fqName).ReadOnlyColumnsToDelete()), 1)

// Now let's try to add this column back, it should delete it from the cache.
tc.MutateInMemoryColumns(false, constants.Add, nameCol)
tc.MutateInMemoryColumns(s.ctx, false, constants.Add, nameCol)
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(fqName).ReadOnlyColumnsToDelete()), 0)
}

Expand All @@ -63,23 +63,23 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {

nameCol := columns.NewColumn("name", typing.String)
// Let's try to delete name.
allowed := s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(nil),
allowed := s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(s.ctx, nil),
time.Now().Add(-1*(6*time.Hour)), true)

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process tried to delete, but it's lagged.
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(nil),
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(s.ctx, nil),
time.Now().Add(-1*(6*time.Hour)), true)

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process now caught up, and is asking if we can delete, should still be no.
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(nil), time.Now(), true)
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(s.ctx, nil), time.Now(), true)
assert.Equal(s.T(), allowed, false, "should not be allowed to delete still")

// Process is finally ahead, has permission to delete now.
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(nil),
allowed = s.stageStore.configMap.TableConfig(fqName).ShouldDeleteColumn(s.ctx, nameCol.Name(s.ctx, nil),
time.Now().Add(2*constants.DeletionConfidencePadding), true)

assert.Equal(s.T(), allowed, true, "should now be allowed to delete")
Expand Down
20 changes: 10 additions & 10 deletions clients/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
}

tableData := optimization.NewTableData(&cols, []string{"id"}, topicConfig, "foo")
assert.Equal(s.T(), topicConfig.TableName, tableData.Name(), "override is working")
assert.Equal(s.T(), topicConfig.TableName, tableData.Name(s.ctx, nil), "override is working")

for pk, row := range rowsData {
tableData.InsertRow(pk, row, false)
Expand All @@ -64,7 +64,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeNilEdgeCase() {
anotherCols.AddColumn(columns.NewColumn(colName, kindDetails))
}

s.stageStore.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake),
s.stageStore.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true),
types.NewDwhTableConfig(&anotherCols, nil, false, true))

err := s.stageStore.Merge(s.ctx, tableData)
Expand Down Expand Up @@ -109,7 +109,7 @@ func (s *SnowflakeTestSuite) TestExecuteMergeReestablishAuth() {
tableData.InsertRow(pk, row, false)
}

s.stageStore.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake),
s.stageStore.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true),
types.NewDwhTableConfig(&cols, nil, false, true))

s.fakeStageStore.ExecReturnsOnCall(0, nil, fmt.Errorf("390114: Authentication token has expired. The user must authenticate again."))
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s *SnowflakeTestSuite) TestExecuteMerge() {

var idx int
for _, destKind := range []constants.DestinationKind{constants.Snowflake, constants.SnowflakeStages} {
fqName := tableData.ToFqName(s.ctx, destKind)
fqName := tableData.ToFqName(s.ctx, destKind, true)
s.stageStore.configMap.AddTableToConfig(fqName, types.NewDwhTableConfig(&cols, nil, false, true))
err := s.stageStore.Merge(s.ctx, tableData)
assert.Nil(s.T(), err)
Expand Down Expand Up @@ -244,18 +244,18 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {

sflkCols.AddColumn(columns.NewColumn("new", typing.String))
config := types.NewDwhTableConfig(&sflkCols, nil, false, true)
s.stageStore.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake), config)
s.stageStore.configMap.AddTableToConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true), config)

err := s.stageStore.Merge(s.ctx, tableData)
assert.Nil(s.T(), err)
s.fakeStageStore.ExecReturns(nil, nil)
assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 5, "called merge")

// Check the temp deletion table now.
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete()), 1,
s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete())
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true)).ReadOnlyColumnsToDelete()), 1,
s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true)).ReadOnlyColumnsToDelete())

_, isOk := s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete()["new"]
_, isOk := s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true)).ReadOnlyColumnsToDelete()["new"]
assert.True(s.T(), isOk)

// Now try to execute merge where 1 of the rows have the column now
Expand All @@ -276,8 +276,8 @@ func (s *SnowflakeTestSuite) TestExecuteMergeDeletionFlagRemoval() {
assert.Equal(s.T(), s.fakeStageStore.ExecCallCount(), 10, "called merge again")

// Caught up now, so columns should be 0.
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete()), 0,
s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake)).ReadOnlyColumnsToDelete())
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true)).ReadOnlyColumnsToDelete()), 0,
s.stageStore.configMap.TableConfig(tableData.ToFqName(s.ctx, constants.Snowflake, true)).ReadOnlyColumnsToDelete())
}

func (s *SnowflakeTestSuite) TestExecuteMergeExitEarly() {
Expand Down
Loading

0 comments on commit 6fb875d

Please sign in to comment.