diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go
index cfd457e53..97f4ed9a1 100644
--- a/clients/redshift/dialect/dialect.go
+++ b/clients/redshift/dialect/dialect.go
@@ -201,6 +201,10 @@ func (rd RedshiftDialect) BuildMergeQueries(
     return parts, nil
 }
 
+func (rd RedshiftDialect) BuildIncreaseStringPrecisionQuery(tableID sql.TableIdentifier, columnName string, newPrecision int32) string {
+    return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE VARCHAR(%d)", tableID.FullyQualifiedName(), rd.QuoteIdentifier(columnName), newPrecision)
+}
+
 func (RedshiftDialect) BuildSweepQuery(_ string, schemaName string) (string, []any) {
     // `relkind` will filter for only ordinary tables and exclude sequences, views, etc.
     return `
diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go
index 8084eee5e..c04d9adbe 100644
--- a/clients/redshift/dialect/dialect_test.go
+++ b/clients/redshift/dialect/dialect_test.go
@@ -339,3 +339,12 @@ func TestRedshiftDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
         parts[2])
     }
 }
+
+func TestRedshiftDialect_BuildIncreaseStringPrecisionQuery(t *testing.T) {
+    fakeTableID := &mocks.FakeTableIdentifier{}
+    fakeTableID.FullyQualifiedNameReturns("{PUBLIC}.{TABLE}")
+    assert.Equal(t,
+        `ALTER TABLE {PUBLIC}.{TABLE} ALTER COLUMN "bar" TYPE VARCHAR(5)`,
+        RedshiftDialect{}.BuildIncreaseStringPrecisionQuery(fakeTableID, "bar", 5),
+    )
+}
diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go
index 6be9007d5..f1084dbf7 100644
--- a/clients/redshift/staging.go
+++ b/clients/redshift/staging.go
@@ -15,9 +15,31 @@ import (
     "github.com/artie-labs/transfer/lib/optimization"
     "github.com/artie-labs/transfer/lib/s3lib"
     "github.com/artie-labs/transfer/lib/sql"
+    "github.com/artie-labs/transfer/lib/typing"
+    "github.com/artie-labs/transfer/lib/typing/columns"
 )
 
-func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
+func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
+    fp, colToNewLengthMap, err := s.loadTemporaryTable(tableData, tempTableID)
+    if err != nil {
+        return fmt.Errorf("failed to load temporary table: %w", err)
+    }
+
+    for colName, newValue := range colToNewLengthMap {
+        // Try to upsert the column first; if this fails, we won't need to alter the destination table.
+        err = tableConfig.Columns().UpsertColumn(colName, columns.UpsertColumnArg{
+            StringPrecision: typing.ToPtr(newValue),
+        })
+
+        if err != nil {
+            return fmt.Errorf("failed to update table config with new string precision: %w", err)
+        }
+
+        if _, err = s.Exec(s.dialect().BuildIncreaseStringPrecisionQuery(parentTableID, colName, newValue)); err != nil {
+            return fmt.Errorf("failed to increase string precision for table %q: %w", parentTableID.FullyQualifiedName(), err)
+        }
+    }
+
     if createTempTable {
         tempAlterTableArgs := ddl.AlterTableArgs{
             Dialect: s.Dialect(),
@@ -29,16 +51,11 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
             Mode: tableData.Mode(),
         }
 
-        if err := tempAlterTableArgs.AlterTable(s, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
+        if err = tempAlterTableArgs.AlterTable(s, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
             return fmt.Errorf("failed to create temp table: %w", err)
         }
     }
 
-    fp, err := s.loadTemporaryTable(tableData, tempTableID)
-    if err != nil {
-        return fmt.Errorf("failed to load temporary table: %w", err)
-    }
-
     defer func() {
         // Remove file regardless of outcome to avoid fs build up.
         if removeErr := os.RemoveAll(fp); removeErr != nil {
@@ -54,7 +71,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
     })
 
     if err != nil {
-        return fmt.Errorf("failed to upload %s to s3: %w", fp, err)
+        return fmt.Errorf("failed to upload %q to s3: %w", fp, err)
     }
 
     // COPY table_name FROM '/path/to/local/file' DELIMITER '\t' NULL '\\N' FORMAT csv;
@@ -75,25 +92,24 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
     return nil
 }
 
-func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID sql.TableIdentifier) (string, error) {
+func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID sql.TableIdentifier) (string, map[string]int32, error) {
     filePath := fmt.Sprintf("/tmp/%s.csv.gz", newTableID.FullyQualifiedName())
     file, err := os.Create(filePath)
     if err != nil {
-        return "", err
+        return "", nil, err
     }
     defer file.Close()
+    gzipWriter := gzip.NewWriter(file)
+    defer gzipWriter.Close()
 
-    gzipWriter := gzip.NewWriter(file)  // Create a new gzip writer
-    defer gzipWriter.Close()            // Ensure to close the gzip writer after writing
-
-    writer := csv.NewWriter(gzipWriter) // Create a CSV writer on top of the gzip writer
+    writer := csv.NewWriter(gzipWriter)
     writer.Comma = '\t'
-
-    columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
+    _columns := tableData.ReadOnlyInMemoryCols().ValidColumns()
+    columnToNewLengthMap := make(map[string]int32)
 
     for _, value := range tableData.Rows() {
         var row []string
-        for _, col := range columns {
+        for _, col := range _columns {
             result, err := castColValStaging(
                 value[col.Name()],
                 col.KindDetails,
@@ -102,22 +118,35 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID
             )
 
             if err != nil {
-                return "", err
+                return "", nil, err
             }
 
-            // TODO: Do something about result.NewLength
+            if result.NewLength > 0 {
+                _newLength, isOk := columnToNewLengthMap[col.Name()]
+                if !isOk || result.NewLength > _newLength {
+                    // Record the new length if we haven't tracked this column yet, or if it exceeds the current value.
+                    columnToNewLengthMap[col.Name()] = result.NewLength
+                }
+            }
 
             row = append(row, result.Value)
         }
 
         if err = writer.Write(row); err != nil {
-            return "", fmt.Errorf("failed to write to csv: %w", err)
+            return "", nil, fmt.Errorf("failed to write to csv: %w", err)
         }
     }
 
     writer.Flush()
     if err = writer.Error(); err != nil {
-        return "", fmt.Errorf("failed to flush csv writer: %w", err)
+        return "", nil, fmt.Errorf("failed to flush csv writer: %w", err)
+    }
+
+    // This will update the staging columns with the new string precision.
+    for colName, newLength := range columnToNewLengthMap {
+        tableData.InMemoryColumns().UpsertColumn(colName, columns.UpsertColumnArg{
+            StringPrecision: typing.ToPtr(newLength),
+        })
     }
 
-    return filePath, nil
+    return filePath, columnToNewLengthMap, nil
 }
diff --git a/lib/config/types.go b/lib/config/types.go
index 665c55ccb..7ba7c4e8c 100644
--- a/lib/config/types.go
+++ b/lib/config/types.go
@@ -39,9 +39,9 @@ type Kafka struct {
 type SharedDestinationSettings struct {
     // TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value`
     TruncateExceededValues bool `yaml:"truncateExceededValues"`
-    // TODO: Update the yaml annotation once it's supported.
-    // ExpandStringPrecision - This will expand the string precision based on the values that come in, if the destination supports it.
-    ExpandStringPrecision bool `yaml:"_expandStringPrecision"`
+    // ExpandStringPrecision - This will expand the string precision if the incoming data has a higher precision than the destination table.
+    // This is only supported by Redshift at the moment.
+    ExpandStringPrecision bool `yaml:"expandStringPrecision"`
 }
 
 type Reporting struct {
diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go
index 1af0a2c23..4d29ae82b 100644
--- a/lib/typing/columns/columns.go
+++ b/lib/typing/columns/columns.go
@@ -102,9 +102,10 @@ type Columns struct {
 }
 
 type UpsertColumnArg struct {
-    ToastCol   *bool
-    PrimaryKey *bool
-    Backfilled *bool
+    ToastCol        *bool
+    PrimaryKey      *bool
+    Backfilled      *bool
+    StringPrecision *int32
 }
 
 // UpsertColumn - just a wrapper around UpdateColumn and AddColumn
@@ -127,6 +128,19 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) error {
             col.backfilled = *arg.Backfilled
         }
 
+        if arg.StringPrecision != nil {
+            var currentPrecision int32
+            if col.KindDetails.OptionalStringPrecision != nil {
+                currentPrecision = *col.KindDetails.OptionalStringPrecision
+            }
+
+            if currentPrecision > *arg.StringPrecision {
+                return fmt.Errorf("cannot decrease string precision from %d to %d", currentPrecision, *arg.StringPrecision)
+            }
+
+            col.KindDetails.OptionalStringPrecision = arg.StringPrecision
+        }
+
         c.UpdateColumn(col)
     } else {
         _col := Column{
@@ -146,6 +160,10 @@ func (c *Columns) UpsertColumn(colName string, arg UpsertColumnArg) error {
             _col.backfilled = *arg.Backfilled
         }
 
+        if arg.StringPrecision != nil {
+            _col.KindDetails.OptionalStringPrecision = arg.StringPrecision
+        }
+
         c.AddColumn(_col)
diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go
index dca59855e..33a65830c 100644
--- a/lib/typing/columns/columns_test.go
+++ b/lib/typing/columns/columns_test.go
@@ -181,17 +181,107 @@ func TestColumns_UpsertColumns(t *testing.T) {
     keys := []string{"a", "b", "c", "d", "e"}
     var cols Columns
     for _, key := range keys {
-        cols.AddColumn(Column{
-            name:        key,
-            KindDetails: typing.String,
-        })
+        cols.AddColumn(Column{name: key, KindDetails: typing.String})
     }
-
-    // Now inspect prior to change.
-    for _, col := range cols.GetColumns() {
-        assert.False(t, col.ToastColumn)
+    {
+        // Now inspect prior to change.
+        for _, col := range cols.GetColumns() {
+            assert.False(t, col.ToastColumn)
+        }
+    }
+    {
+        // Now update a and b to be toast columns
+        for _, key := range []string{"a", "b"} {
+            assert.NoError(t, cols.UpsertColumn(key, UpsertColumnArg{
+                ToastCol: typing.ToPtr(true),
+            }))
+
+            // Now inspect.
+            col, _ := cols.GetColumn(key)
+            assert.True(t, col.ToastColumn)
+        }
+    }
+    {
+        // Increase string precision
+        {
+            // Valid - Current column does not have string precision set
+            assert.NoError(t, cols.UpsertColumn("string_precision_a", UpsertColumnArg{}))
+
+            colA, _ := cols.GetColumn("string_precision_a")
+            assert.Nil(t, colA.KindDetails.OptionalStringPrecision)
+
+            assert.NoError(t,
+                cols.UpsertColumn("string_precision_a",
+                    UpsertColumnArg{
+                        StringPrecision: typing.ToPtr(int32(55)),
+                    },
+                ),
+            )
+            colA, _ = cols.GetColumn("string_precision_a")
+            assert.Equal(t, int32(55), *colA.KindDetails.OptionalStringPrecision)
+        }
+        {
+            // Valid - Current column does have string precision set (but it's less)
+            assert.NoError(t,
+                cols.UpsertColumn("string_precision_b",
+                    UpsertColumnArg{
+                        StringPrecision: typing.ToPtr(int32(5)),
+                    },
+                ),
+            )
+
+            colB, _ := cols.GetColumn("string_precision_b")
+            assert.Equal(t, int32(5), *colB.KindDetails.OptionalStringPrecision)
+            assert.NoError(t,
+                cols.UpsertColumn("string_precision_b",
+                    UpsertColumnArg{
+                        StringPrecision: typing.ToPtr(int32(100)),
+                    },
+                ),
+            )
+
+            colB, _ = cols.GetColumn("string_precision_b")
+            assert.Equal(t, int32(100), *colB.KindDetails.OptionalStringPrecision)
+        }
+        {
+            // Invalid - Cannot decrease string precision
+            assert.NoError(t,
+                cols.UpsertColumn("string_precision_b",
+                    UpsertColumnArg{
+                        StringPrecision: typing.ToPtr(int32(500)),
+                    },
+                ),
+            )
+
+            assert.ErrorContains(t,
+                cols.UpsertColumn("string_precision_b",
+                    UpsertColumnArg{
+                        StringPrecision: typing.ToPtr(int32(100)),
+                    },
+                ),
+                "cannot decrease string precision from 500 to 100",
+            )
+        }
+    }
+    {
+        // Create a new column zzz
+        assert.NoError(t, cols.UpsertColumn("zzz", UpsertColumnArg{}))
+        zzzCol, _ := cols.GetColumn("zzz")
+        assert.False(t, zzzCol.ToastColumn)
+        assert.False(t, zzzCol.primaryKey)
+        assert.Equal(t, typing.Invalid, zzzCol.KindDetails)
+    }
+    {
+        // Create a new column aaa
+        assert.NoError(t, cols.UpsertColumn("aaa", UpsertColumnArg{
+            ToastCol:   typing.ToPtr(true),
+            PrimaryKey: typing.ToPtr(true),
+        }))
+        aaaCol, _ := cols.GetColumn("aaa")
+        assert.True(t, aaaCol.ToastColumn)
+        assert.True(t, aaaCol.primaryKey)
+        assert.Equal(t, typing.Invalid, aaaCol.KindDetails)
     }
-    // Now selectively update only a, b
     for _, key := range []string{"a", "b"} {
         assert.NoError(t, cols.UpsertColumn(key, UpsertColumnArg{
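For reviewers who want to see the new pieces end to end, here is a minimal, self-contained Go sketch of the expansion flow. toyColumn, upsertPrecision, and buildAlter are invented stand-ins for columns.Column, Columns.UpsertColumn, and RedshiftDialect.BuildIncreaseStringPrecisionQuery; they mirror the semantics in this diff but are not the library's actual types.

package main

import "fmt"

// toyColumn is a hypothetical stand-in for columns.Column, keeping only the
// piece this PR touches: an optional VARCHAR precision on a string column.
type toyColumn struct {
    name      string
    precision *int32 // nil means no explicit precision has been recorded yet
}

// upsertPrecision mirrors the grow-only rule that UpsertColumn now enforces:
// precision may be introduced or increased, but never decreased.
func (c *toyColumn) upsertPrecision(newPrecision int32) error {
    if c.precision != nil && *c.precision > newPrecision {
        return fmt.Errorf("cannot decrease string precision from %d to %d", *c.precision, newPrecision)
    }
    c.precision = &newPrecision
    return nil
}

// buildAlter mirrors BuildIncreaseStringPrecisionQuery for an already-quoted
// table and column name.
func buildAlter(table, column string, newPrecision int32) string {
    return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s TYPE VARCHAR(%d)`, table, column, newPrecision)
}

func main() {
    col := toyColumn{name: `"bar"`}

    // A 5-character value shows up during staging: the precision is recorded
    // and an ALTER is issued against the parent table.
    if err := col.upsertPrecision(5); err == nil {
        fmt.Println(buildAlter("public.table", col.name, *col.precision))
    }

    // A 100-character value arrives later: the precision grows and a new
    // ALTER widens the column.
    if err := col.upsertPrecision(100); err == nil {
        fmt.Println(buildAlter("public.table", col.name, *col.precision))
    }

    // Shrinking is rejected, matching the error returned by UpsertColumn.
    fmt.Println(col.upsertPrecision(50))
}

The design point the sketch mirrors is that precision only ratchets upward: castColValStaging reports a NewLength per value, loadTemporaryTable keeps the maximum per column, and PrepareTemporaryTable applies it to the in-memory table config before issuing the ALTER, so a failed upsert never leaves the destination table half-altered.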