diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 1d9583130..102297aeb 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -309,8 +309,8 @@ func (rd RedshiftDialect) BuildMergeQueries( return parts, nil } -func (rd RedshiftDialect) BuildIncreaseStringPrecisionQuery(tableID sql.TableIdentifier, column columns.Column, newPrecision int32) string { - return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE VARCHAR(%d)", tableID.FullyQualifiedName(), rd.QuoteIdentifier(column.Name()), newPrecision) +func (rd RedshiftDialect) BuildIncreaseStringPrecisionQuery(tableID sql.TableIdentifier, columnName string, newPrecision int32) string { + return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE VARCHAR(%d)", tableID.FullyQualifiedName(), rd.QuoteIdentifier(columnName), newPrecision) } func (RedshiftDialect) BuildSweepQuery(_ string, schemaName string) (string, []any) { diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 0d10afa86..c6c3a086e 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/destination" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/optimization" @@ -20,11 +21,24 @@ import ( ) func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error { - fp, err := s.loadTemporaryTable(tableData, tempTableID) + fp, colToNewLengthMap, err := s.loadTemporaryTable(tableData, tempTableID) if err != nil { return fmt.Errorf("failed to load temporary table: %w", err) } + var queries []string + for colName, newValue := range colToNewLengthMap { + // Generate queries and then update the [tableConfig] with the new string precision. + queries = append(queries, s.dialect().BuildIncreaseStringPrecisionQuery(parentTableID, colName, newValue)) + tableConfig.Columns().UpsertColumn(colName, columns.UpsertColumnArg{ + StringPrecision: typing.ToPtr(newValue), + }) + } + + if err = destination.ExecStatements(s, queries); err != nil { + return fmt.Errorf("failed to increase string precision for table %q: %w", parentTableID.FullyQualifiedName(), err) + } + if createTempTable { tempAlterTableArgs := ddl.AlterTableArgs{ Dialect: s.Dialect(), @@ -77,11 +91,11 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo return nil } -func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID sql.TableIdentifier) (string, error) { +func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID sql.TableIdentifier) (string, map[string]int32, error) { filePath := fmt.Sprintf("/tmp/%s.csv.gz", newTableID.FullyQualifiedName()) file, err := os.Create(filePath) if err != nil { - return "", err + return "", nil, err } defer file.Close() @@ -103,7 +117,7 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID ) if err != nil { - return "", err + return "", nil, err } if result.NewLength > 0 { @@ -117,20 +131,21 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableID } if err = writer.Write(row); err != nil { - return "", fmt.Errorf("failed to write to csv: %w", err) + return "", nil, fmt.Errorf("failed to write to csv: %w", err) } } writer.Flush() if err = writer.Error(); err != nil { - return "", fmt.Errorf("failed to flush csv writer: %w", err) + return "", nil, fmt.Errorf("failed to flush csv writer: %w", err) } + // This will update the staging columns with the new string precision. for colName, newLength := range columnToNewLengthMap { tableData.InMemoryColumns().UpsertColumn(colName, columns.UpsertColumnArg{ StringPrecision: typing.ToPtr(newLength), }) } - return filePath, nil + return filePath, columnToNewLengthMap, nil }