Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Redshift] Increase String Precision - Part 2 #955

Merged
merged 24 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ func (rd RedshiftDialect) BuildMergeQueries(
return parts, nil
}

func (rd RedshiftDialect) BuildIncreaseStringPrecisionQuery(tableID sql.TableIdentifier, columnName string, newPrecision int32) string {
return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE VARCHAR(%d)", tableID.FullyQualifiedName(), rd.QuoteIdentifier(columnName), newPrecision)
}

func (RedshiftDialect) BuildSweepQuery(_ string, schemaName string) (string, []any) {
// `relkind` will filter for only ordinary tables and exclude sequences, views, etc.
return `
Expand Down
9 changes: 9 additions & 0 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,12 @@ func TestRedshiftDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
parts[2])
}
}

func TestRedshiftDialect_BuildIncreaseStringPrecisionQuery(t *testing.T) {
fakeTableID := &mocks.FakeTableIdentifier{}
fakeTableID.FullyQualifiedNameReturns("{PUBLIC}.{TABLE}")
assert.Equal(t,
`ALTER TABLE {PUBLIC}.{TABLE} ALTER COLUMN "bar" TYPE VARCHAR(5)`,
RedshiftDialect{}.BuildIncreaseStringPrecisionQuery(fakeTableID, "bar", 5),
)
}
73 changes: 51 additions & 22 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,31 @@ import (
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/s3lib"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
fp, colToNewLengthMap, err := s.loadTemporaryTable(tableData, tempTableID)
if err != nil {
return fmt.Errorf("failed to load temporary table: %w", err)
}

for colName, newValue := range colToNewLengthMap {
// Try to upsert columns first. If this fails, we won't need to update the destination table.
err = tableConfig.Columns().UpsertColumn(colName, columns.UpsertColumnArg{
StringPrecision: typing.ToPtr(newValue),
})

if err != nil {
return fmt.Errorf("failed to update table config with new string precision: %w", err)
}

if _, err = s.Exec(s.dialect().BuildIncreaseStringPrecisionQuery(parentTableID, colName, newValue)); err != nil {
return fmt.Errorf("failed to increase string precision for table %q: %w", parentTableID.FullyQualifiedName(), err)
}
}

if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dialect: s.Dialect(),
Expand All @@ -29,16 +51,11 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(s, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
if err = tempAlterTableArgs.AlterTable(s, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
}
}

fp, err := s.loadTemporaryTable(tableData, tempTableID)
if err != nil {
return fmt.Errorf("failed to load temporary table: %w", err)
}

defer func() {
// Remove file regardless of outcome to avoid fs build up.
if removeErr := os.RemoveAll(fp); removeErr != nil {
Expand All @@ -54,7 +71,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
})

if err != nil {
return fmt.Errorf("failed to upload %s to s3: %w", fp, err)
return fmt.Errorf("failed to upload %q to s3: %w", fp, err)
}

// COPY table_name FROM '/path/to/local/file' DELIMITER '\t' NULL '\\N' FORMAT csv;
Expand All @@ -75,25 +92,24 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
return nil
}

func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID sql.TableIdentifier) (string, error) {
func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID sql.TableIdentifier) (string, map[string]int32, error) {
filePath := fmt.Sprintf("/tmp/%s.csv.gz", newTableID.FullyQualifiedName())
file, err := os.Create(filePath)
if err != nil {
return "", err
return "", nil, err
}

defer file.Close()
gzipWriter := gzip.NewWriter(file)
defer gzipWriter.Close()

gzipWriter := gzip.NewWriter(file) // Create a new gzip writer
defer gzipWriter.Close() // Ensure to close the gzip writer after writing

writer := csv.NewWriter(gzipWriter) // Create a CSV writer on top of the gzip writer
writer := csv.NewWriter(gzipWriter)
writer.Comma = '\t'

columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
_columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
columnToNewLengthMap := make(map[string]int32)
for _, value := range tableData.Rows() {
var row []string
for _, col := range columns {
for _, col := range _columns {
result, err := castColValStaging(
value[col.Name()],
col.KindDetails,
Expand All @@ -102,22 +118,35 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
)

if err != nil {
return "", err
return "", nil, err
}

// TODO: Do something about result.NewLength
if result.NewLength > 0 {
_newLength, isOk := columnToNewLengthMap[col.Name()]
if !isOk || result.NewLength > _newLength {
// Update the new length if it's greater than the current one.
danafallon marked this conversation as resolved.
Show resolved Hide resolved
columnToNewLengthMap[col.Name()] = result.NewLength
}
}
row = append(row, result.Value)
}

if err = writer.Write(row); err != nil {
return "", fmt.Errorf("failed to write to csv: %w", err)
return "", nil, fmt.Errorf("failed to write to csv: %w", err)
}
}

writer.Flush()
if err = writer.Error(); err != nil {
return "", fmt.Errorf("failed to flush csv writer: %w", err)
return "", nil, fmt.Errorf("failed to flush csv writer: %w", err)
}

// This will update the staging columns with the new string precision.
for colName, newLength := range columnToNewLengthMap {
tableData.InMemoryColumns().UpsertColumn(colName, columns.UpsertColumnArg{
StringPrecision: typing.ToPtr(newLength),
})
}

return filePath, nil
return filePath, columnToNewLengthMap, nil
}
6 changes: 3 additions & 3 deletions lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type Kafka struct {
type SharedDestinationSettings struct {
// TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value`
TruncateExceededValues bool `yaml:"truncateExceededValues"`
// TODO: Update the yaml annotation once it's supported.
// ExpandStringPrecision - This will expand the string precision based on the values that come in, if the destination supports it.
ExpandStringPrecision bool `yaml:"_expandStringPrecision"`
// ExpandStringPrecision - This will expand the string precision if the incoming data has a higher precision than the destination table.
// This is only supported by Redshift at the moment.
ExpandStringPrecision bool `yaml:"expandStringPrecision"`
}

type Reporting struct {
Expand Down
24 changes: 21 additions & 3 deletions lib/typing/columns/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ type Columns struct {
}

type UpsertColumnArg struct {
ToastCol *bool
PrimaryKey *bool
Backfilled *bool
ToastCol *bool
PrimaryKey *bool
Backfilled *bool
StringPrecision *int32
}

// UpsertColumn - just a wrapper around UpdateColumn and AddColumn
Expand All @@ -127,6 +128,19 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) error {
col.backfilled = *arg.Backfilled
}

if arg.StringPrecision != nil {
var currentPrecision int32
if col.KindDetails.OptionalStringPrecision != nil {
currentPrecision = *col.KindDetails.OptionalStringPrecision
}

if currentPrecision > *arg.StringPrecision {
return fmt.Errorf("cannot decrease string precision from %d to %d", currentPrecision, *arg.StringPrecision)
}

col.KindDetails.OptionalStringPrecision = arg.StringPrecision
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this has access to the existing column metadata, is this a better place to check that the new precision is greater than the previous?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have guardrail in how NewLength is being set, I can add something here as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern was addressed on the other comment, but yeah it might be nice to add a backup check here too, just in case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't see why not. I'll add it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna open up another PR to add error to the signature for UpsertColumn so there's less diff

}

c.UpdateColumn(col)
} else {
_col := Column{
Expand All @@ -146,6 +160,10 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) error {
_col.backfilled = *arg.Backfilled
}

if arg.StringPrecision != nil {
_col.KindDetails.OptionalStringPrecision = arg.StringPrecision
}

c.AddColumn(_col)
}

Expand Down
108 changes: 99 additions & 9 deletions lib/typing/columns/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,107 @@ func TestColumns_UpsertColumns(t *testing.T) {
keys := []string{"a", "b", "c", "d", "e"}
var cols Columns
for _, key := range keys {
cols.AddColumn(Column{
name: key,
KindDetails: typing.String,
})
cols.AddColumn(Column{name: key, KindDetails: typing.String})
}

// Now inspect prior to change.
for _, col := range cols.GetColumns() {
assert.False(t, col.ToastColumn)
{
// Now inspect prior to change.
for _, col := range cols.GetColumns() {
assert.False(t, col.ToastColumn)
}
}
{
// Now update a and b to be toast columns
for _, key := range []string{"a", "b"} {
assert.NoError(t, cols.UpsertColumn(key, UpsertColumnArg{
ToastCol: typing.ToPtr(true),
}))

// Now inspect.
col, _ := cols.GetColumn(key)
assert.True(t, col.ToastColumn)
}
}
{
// Increase string precision
{
// Valid - Current column does not have string precision set
assert.NoError(t, cols.UpsertColumn("string_precision_a", UpsertColumnArg{}))

colA, _ := cols.GetColumn("string_precision_a")
assert.Nil(t, colA.KindDetails.OptionalStringPrecision)

assert.NoError(t,
cols.UpsertColumn("string_precision_a",
UpsertColumnArg{
StringPrecision: typing.ToPtr(int32(55)),
},
),
)
colA, _ = cols.GetColumn("string_precision_a")
assert.Equal(t, int32(55), *colA.KindDetails.OptionalStringPrecision)
}
{
// Valid - Current column does have string precision set (but it's less)
assert.NoError(t,
cols.UpsertColumn("string_precision_b",
UpsertColumnArg{
StringPrecision: typing.ToPtr(int32(5)),
},
),
)

colB, _ := cols.GetColumn("string_precision_b")
assert.Equal(t, int32(5), *colB.KindDetails.OptionalStringPrecision)
assert.NoError(t,
cols.UpsertColumn("string_precision_b",
UpsertColumnArg{
StringPrecision: typing.ToPtr(int32(100)),
},
),
)

colB, _ = cols.GetColumn("string_precision_b")
assert.Equal(t, int32(100), *colB.KindDetails.OptionalStringPrecision)
}
{
// Invalid - Cannot decrease string precision
assert.NoError(t,
cols.UpsertColumn("string_precision_b",
UpsertColumnArg{
StringPrecision: typing.ToPtr(int32(500)),
},
),
)

assert.ErrorContains(t,
cols.UpsertColumn("string_precision_b",
UpsertColumnArg{
StringPrecision: typing.ToPtr(int32(100)),
},
),
"cannot decrease string precision from 500 to 100",
)
}
}
{
// Create a new column zzz
assert.NoError(t, cols.UpsertColumn("zzz", UpsertColumnArg{}))
zzzCol, _ := cols.GetColumn("zzz")
assert.False(t, zzzCol.ToastColumn)
assert.False(t, zzzCol.primaryKey)
assert.Equal(t, zzzCol.KindDetails, typing.Invalid)
}
{
// Create a new column aaa
assert.NoError(t, cols.UpsertColumn("aaa", UpsertColumnArg{
ToastCol: typing.ToPtr(true),
PrimaryKey: typing.ToPtr(true),
}))
aaaCol, _ := cols.GetColumn("aaa")
assert.True(t, aaaCol.ToastColumn)
assert.True(t, aaaCol.primaryKey)
assert.Equal(t, aaaCol.KindDetails, typing.Invalid)
}

// Now selectively update only a, b
for _, key := range []string{"a", "b"} {
assert.NoError(t, cols.UpsertColumn(key, UpsertColumnArg{
Expand Down
Loading