diff --git a/README.md b/README.md
index 4bfb54e1f..48c3bbfab 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,7 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
 - [Destinations](https://docs.artie.so/real-time-destinations/overview):
   - Snowflake
   - BigQuery
+  - Redshift
 
 - [Sources](https://docs.artie.so/real-time-sources/overview):
   - MongoDB
diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go
index bbdbe79f2..4e709fdca 100644
--- a/clients/bigquery/merge.go
+++ b/clients/bigquery/merge.go
@@ -68,7 +68,7 @@ func (s *Store) backfillColumn(ctx context.Context, column columns.Column, fqTab
 
 	defaultVal, err := column.DefaultValue(&columns.DefaultValueArgs{
 		Escape:   true,
-		BigQuery: true,
+		DestKind: s.Label(),
 	})
 	if err != nil {
diff --git a/clients/redshift/cast.go b/clients/redshift/cast.go
new file mode 100644
index 000000000..5f33589dc
--- /dev/null
+++ b/clients/redshift/cast.go
@@ -0,0 +1,88 @@
+package redshift
+
+import (
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"strings"
+
+	"github.com/artie-labs/transfer/lib/array"
+
+	"github.com/artie-labs/transfer/lib/config/constants"
+	"github.com/artie-labs/transfer/lib/stringutil"
+	"github.com/artie-labs/transfer/lib/typing"
+	"github.com/artie-labs/transfer/lib/typing/columns"
+	"github.com/artie-labs/transfer/lib/typing/decimal"
+	"github.com/artie-labs/transfer/lib/typing/ext"
+)
+
+// CastColValStaging - takes `colVal` interface{} and `colKind` columns.Column and converts the value into its string representation.
+// This is necessary because CSV writers require all values to be in `string` form.
+func CastColValStaging(colVal interface{}, colKind columns.Column) (string, error) {
+	if colVal == nil {
+		// This matches the NULL terminator in the COPY clause.
+		return `\N`, nil
+	}
+
+	colValString := fmt.Sprint(colVal)
+	switch colKind.KindDetails.Kind {
+	// All the other types do not need string wrapping.
+	case typing.ETime.Kind:
+		extTime, err := ext.ParseFromInterface(colVal)
+		if err != nil {
+			return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
+		}
+
+		switch extTime.NestedKind.Type {
+		case ext.TimeKindType:
+			colValString = extTime.String(ext.PostgresTimeFormatNoTZ)
+		default:
+			colValString = extTime.String("")
+		}
+
+	case typing.String.Kind:
+		// TODO: Worth writing a benchmark on whether we should check for the prefix and suffix of `[ ]` instead.
+		// Check if it's an array.
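+		// If the value parses as an array, serialize it as a JSON-style list; otherwise fall back to wrapping it as an escaped string literal.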
+ list, err := array.InterfaceToArrayString(colVal) + if err == nil { + colValString = "[" + strings.Join(list, ",") + "]" + } else { + colValString = stringutil.Wrap(colVal, true) + } + + case typing.Struct.Kind: + if colKind.KindDetails == typing.Struct { + if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { + colVal = map[string]interface{}{ + "key": constants.ToastUnavailableValuePlaceholder, + } + } + + if reflect.TypeOf(colVal).Kind() != reflect.String { + colValBytes, err := json.Marshal(colVal) + if err != nil { + return "", err + } + + colValString = string(colValBytes) + } + } + case typing.Array.Kind: + colValBytes, err := json.Marshal(colVal) + if err != nil { + return "", err + } + + colValString = string(colValBytes) + case typing.EDecimal.Kind: + val, isOk := colVal.(*decimal.Decimal) + if !isOk { + return "", fmt.Errorf("colVal is not *decimal.Decimal type") + } + + return val.String(), nil + } + + return colValString, nil + +} diff --git a/clients/redshift/cast_test.go b/clients/redshift/cast_test.go new file mode 100644 index 000000000..7cdc68c41 --- /dev/null +++ b/clients/redshift/cast_test.go @@ -0,0 +1,263 @@ +package redshift + +import ( + "fmt" + "math/big" + "testing" + "time" + + "github.com/artie-labs/transfer/lib/ptr" + + "github.com/artie-labs/transfer/lib/typing/decimal" + + "github.com/artie-labs/transfer/lib/typing/columns" + + "github.com/artie-labs/transfer/lib/config/constants" + + "github.com/artie-labs/transfer/lib/typing/ext" + + "github.com/artie-labs/transfer/lib/typing" + "github.com/stretchr/testify/assert" +) + +type _testCase struct { + name string + colVal interface{} + colKind columns.Column + + expectedString string + expectErr bool +} + +func evaluateTestCase(t *testing.T, testCase _testCase) { + actualString, actualErr := CastColValStaging(testCase.colVal, testCase.colKind) + if testCase.expectErr { + assert.Error(t, actualErr, testCase.name) + } else { + assert.NoError(t, actualErr, testCase.name) + } + assert.Equal(t, testCase.expectedString, actualString, testCase.name) +} + +func (r *RedshiftTestSuite) TestCastColValStaging_Basic() { + testCases := []_testCase{ + { + name: "empty string", + colVal: "", + colKind: columns.Column{ + KindDetails: typing.String, + }, + + expectedString: "", + }, + { + name: "null value (string, not that it matters)", + colVal: nil, + colKind: columns.Column{ + KindDetails: typing.String, + }, + + expectedString: `\N`, + }, + { + name: "string", + colVal: "foo", + colKind: columns.Column{ + KindDetails: typing.String, + }, + + expectedString: "foo", + }, + { + name: "integer", + colVal: 7, + colKind: columns.Column{ + KindDetails: typing.Integer, + }, + expectedString: "7", + }, + { + name: "boolean", + colVal: true, + colKind: columns.Column{ + KindDetails: typing.Boolean, + }, + expectedString: "true", + }, + { + name: "array", + colVal: []string{"hello", "there"}, + colKind: columns.Column{ + KindDetails: typing.Array, + }, + expectedString: `["hello","there"]`, + }, + { + name: "array (string with interface type)", + colVal: []interface{}{"hello", "there", "world"}, + colKind: columns.Column{ + KindDetails: typing.String, + }, + expectedString: `["hello","there","world"]`, + }, + { + name: "JSON string", + colVal: `{"hello": "world"}`, + colKind: columns.Column{ + KindDetails: typing.Struct, + }, + expectedString: `{"hello": "world"}`, + }, + { + name: "JSON struct", + colVal: map[string]string{"hello": "world"}, + colKind: columns.Column{ + KindDetails: typing.Struct, + 
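+				// Unlike the raw JSON string case above (passed through verbatim), a Go map is re-marshalled, so the output drops the space after the colon.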
}, + expectedString: `{"hello":"world"}`, + }, + { + name: "numeric data types (backwards compatibility)", + colVal: decimal.NewDecimal(2, ptr.ToInt(5), big.NewFloat(55.22)), + colKind: columns.Column{ + KindDetails: typing.Float, + }, + + expectedString: "55.22", + }, + { + name: "numeric data types", + colVal: decimal.NewDecimal(2, ptr.ToInt(38), big.NewFloat(585692791691858.25)), + colKind: columns.Column{ + KindDetails: typing.EDecimal, + }, + expectedString: "585692791691858.25", + }, + } + + for _, testCase := range testCases { + evaluateTestCase(r.T(), testCase) + } +} + +func (r *RedshiftTestSuite) TestCastColValStaging_Array() { + testCases := []_testCase{ + { + name: "array w/ numbers", + colVal: []int{1, 2, 3, 4, 5}, + colKind: columns.Column{ + KindDetails: typing.Array, + }, + expectedString: `[1,2,3,4,5]`, + }, + { + name: "array w/ nested objects (JSON)", + colKind: columns.Column{ + KindDetails: typing.Array, + }, + colVal: []map[string]interface{}{ + { + "dusty": "the mini aussie", + }, + { + "robin": "tang", + }, + { + "foo": "bar", + }, + }, + expectedString: `[{"dusty":"the mini aussie"},{"robin":"tang"},{"foo":"bar"}]`, + }, + { + name: "array w/ bools", + colKind: columns.Column{ + KindDetails: typing.Array, + }, + colVal: []bool{ + true, + true, + false, + false, + true, + }, + expectedString: `[true,true,false,false,true]`, + }, + } + + for _, testCase := range testCases { + evaluateTestCase(r.T(), testCase) + } +} + +// TestCastColValStaging_Time - will test all the variants of date, time and date time. +func (r *RedshiftTestSuite) TestCastColValStaging_Time() { + birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC) + // date + dateKind := typing.ETime + dateKind.ExtendedTimeDetails = &ext.Date + // time + timeKind := typing.ETime + timeKind.ExtendedTimeDetails = &ext.Time + // date time + dateTimeKind := typing.ETime + dateTimeKind.ExtendedTimeDetails = &ext.DateTime + + birthdate, err := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "") + assert.NoError(r.T(), err) + + birthTime, err := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "") + assert.NoError(r.T(), err) + + birthDateTime, err := ext.NewExtendedTime(birthday, dateTimeKind.ExtendedTimeDetails.Type, "") + assert.NoError(r.T(), err) + + testCases := []_testCase{ + { + name: "date", + colVal: birthdate, + colKind: columns.Column{ + KindDetails: dateKind, + }, + expectedString: "2022-09-06", + }, + { + name: "time", + colVal: birthTime, + colKind: columns.Column{ + KindDetails: timeKind, + }, + expectedString: "03:19:24.942", + }, + { + name: "datetime", + colVal: birthDateTime, + colKind: columns.Column{ + KindDetails: dateTimeKind, + }, + expectedString: "2022-09-06T03:19:24.942Z", + }, + } + + for _, testCase := range testCases { + evaluateTestCase(r.T(), testCase) + } +} + +func (r *RedshiftTestSuite) TestCastColValStaging_TOAST() { + // Toast only really matters for JSON blobs since it'll return a STRING value that's not a JSON object. + // We're testing that we're casting the unavailable value correctly into a JSON object so that it can compile. 
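+	// The expected output mirrors the substitution in CastColValStaging: {"key": "__debezium_unavailable_value"}.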
+ testCases := []_testCase{ + { + name: "struct with TOAST value", + colVal: constants.ToastUnavailableValuePlaceholder, + colKind: columns.Column{ + KindDetails: typing.Struct, + }, + expectedString: fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), + }, + } + + for _, testCase := range testCases { + evaluateTestCase(r.T(), testCase) + } +} diff --git a/clients/redshift/ddl.go b/clients/redshift/ddl.go new file mode 100644 index 000000000..32ca83cf8 --- /dev/null +++ b/clients/redshift/ddl.go @@ -0,0 +1,118 @@ +package redshift + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/typing" + + "github.com/artie-labs/transfer/lib/dwh/types" + "github.com/artie-labs/transfer/lib/logger" + "github.com/artie-labs/transfer/lib/typing/columns" +) + +type getTableConfigArgs struct { + Table string + Schema string + DropDeletedColumns bool +} + +const ( + describeNameCol = "column_name" + describeTypeCol = "data_type" + describeDescriptionCol = "description" +) + +func (s *Store) getTableConfig(ctx context.Context, args getTableConfigArgs) (*types.DwhTableConfig, error) { + fqName := fmt.Sprintf("%s.%s", args.Schema, args.Table) + + // Check if it already exists in cache + tableConfig := s.configMap.TableConfig(fqName) + if tableConfig != nil { + return tableConfig, nil + } + + log := logger.FromContext(ctx) + // This query is a modified fork from: https://gist.github.com/alexanderlz/7302623 + query := fmt.Sprintf(`select c.column_name,c.data_type,d.description +from information_schema.columns c +left join pg_class c1 on c.table_name=c1.relname +left join pg_catalog.pg_namespace n on c.table_schema=n.nspname and c1.relnamespace=n.oid +left join pg_catalog.pg_description d on d.objsubid=c.ordinal_position and d.objoid=c1.oid +where c.table_name='%s' and c.table_schema='%s'`, args.Table, args.Schema) + rows, err := s.Query(query) + defer func() { + if rows != nil { + err = rows.Close() + if err != nil { + log.WithError(err).Warn("Failed to close the row") + } + } + }() + + var tableMissing bool + if err != nil { + return nil, fmt.Errorf("failed to query redshift, err: %v", err) + } + + var redshiftCols columns.Columns + for rows != nil && rows.Next() { + // figure out what columns were returned + // the column names will be the JSON object field keys + cols, err := rows.ColumnTypes() + if err != nil { + return nil, err + } + + var columnNameList []string + // Scan needs an array of pointers to the values it is setting + // This creates the object and sets the values correctly + values := make([]interface{}, len(cols)) + for idx, column := range cols { + values[idx] = new(interface{}) + columnNameList = append(columnNameList, strings.ToLower(column.Name())) + } + + err = rows.Scan(values...) + if err != nil { + return nil, err + } + + row := make(map[string]string) + for idx, val := range values { + interfaceVal, isOk := val.(*interface{}) + if !isOk || interfaceVal == nil { + return nil, errors.New("invalid value") + } + + row[columnNameList[idx]] = strings.ToLower(fmt.Sprint(*interfaceVal)) + } + + col := columns.NewColumn(row[describeNameCol], typing.RedshiftTypeToKind(row[describeTypeCol])) + if comment, isOk := row[describeDescriptionCol]; isOk && comment != "" { + // Try to parse the comment. 
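+			// Column comments are written by BackfillColumn as JSON, e.g. {"backfilled": true}.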
+			var _colComment constants.ColComment
+			err = json.Unmarshal([]byte(comment), &_colComment)
+			if err != nil {
+				return nil, fmt.Errorf("failed to unmarshal comment, err: %v", err)
+			}
+
+			col.SetBackfilled(_colComment.Backfilled)
+		}
+
+		redshiftCols.AddColumn(col)
+	}
+
+	// Determine table existence from the column count rather than by calling rows.Next() again, since advancing the iterator would cause us to miss a column.
+	if len(redshiftCols.GetColumns()) == 0 {
+		tableMissing = true
+	}
+
+	redshiftTableCfg := types.NewDwhTableConfig(&redshiftCols, nil, tableMissing, args.DropDeletedColumns)
+	s.configMap.AddTableToConfig(fqName, redshiftTableCfg)
+	return redshiftTableCfg, nil
+}
diff --git a/clients/redshift/redshift.go b/clients/redshift/redshift.go
new file mode 100644
index 000000000..94bd03fbd
--- /dev/null
+++ b/clients/redshift/redshift.go
@@ -0,0 +1,196 @@
+package redshift
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/artie-labs/transfer/clients/utils"
+
+	"github.com/artie-labs/transfer/lib/dwh/dml"
+	"github.com/artie-labs/transfer/lib/ptr"
+
+	"github.com/artie-labs/transfer/lib/dwh/ddl"
+	"github.com/artie-labs/transfer/lib/logger"
+	"github.com/artie-labs/transfer/lib/typing/columns"
+
+	"github.com/artie-labs/transfer/lib/config"
+	_ "github.com/lib/pq"
+
+	"github.com/artie-labs/transfer/lib/config/constants"
+	"github.com/artie-labs/transfer/lib/db"
+	"github.com/artie-labs/transfer/lib/dwh/types"
+	"github.com/artie-labs/transfer/lib/optimization"
+)
+
+type Store struct {
+	credentialsClause string
+	bucket            string
+	optionalS3Prefix  string
+	configMap         *types.DwhToTablesConfigMap
+	db.Store
+}
+
+func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap {
+	if s == nil {
+		return nil
+	}
+
+	return s.configMap
+}
+
+func (s *Store) Label() constants.DestinationKind {
+	return constants.Redshift
+}
+
+func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
+	if tableData.Rows() == 0 || tableData.ReadOnlyInMemoryCols() == nil {
+		// There are no rows or columns. Let's skip.
+		return nil
+	}
+
+	tableConfig, err := s.getTableConfig(ctx, getTableConfigArgs{
+		Table:              tableData.Name(),
+		Schema:             tableData.TopicConfig.Schema,
+		DropDeletedColumns: tableData.TopicConfig.DropDeletedColumns,
+	})
+	if err != nil {
+		return err
+	}
+
+	log := logger.FromContext(ctx)
+	fqName := tableData.ToFqName(ctx, s.Label())
+	// Check if all the columns exist in Redshift
+	srcKeysMissing, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(), tableData.TopicConfig.SoftDelete)
+	createAlterTableArgs := ddl.AlterTableArgs{
+		Dwh:         s,
+		Tc:          tableConfig,
+		FqTableName: fqName,
+		CreateTable: tableConfig.CreateTable(),
+		ColumnOp:    constants.Add,
+		CdcTime:     tableData.LatestCDCTs,
+	}
+
+	// Keys that exist in the CDC stream, but not in Redshift
+	err = ddl.AlterTable(ctx, createAlterTableArgs, targetKeysMissing...)
+	if err != nil {
+		log.WithError(err).Warn("failed to apply alter table")
+		return err
+	}
+
+	// Keys that exist in Redshift, but don't exist in our CDC stream.
+	// createTable is set to false because table creation requires a column to be added,
+	// which means we'll only create the table upon adding columns.
+	deleteAlterTableArgs := ddl.AlterTableArgs{
+		Dwh:         s,
+		Tc:          tableConfig,
+		FqTableName: fqName,
+		CreateTable: false,
+		ColumnOp:    constants.Delete,
+		CdcTime:     tableData.LatestCDCTs,
+	}
+
+	err = ddl.AlterTable(ctx, deleteAlterTableArgs, srcKeysMissing...)
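+	// srcKeysMissing = columns that exist in Redshift but are absent from the CDC stream; these are the deletion candidates reconciled below.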
+	if err != nil {
+		log.WithError(err).Warn("failed to apply alter table")
+		return err
+	}
+
+	// Make sure we are still trying to delete it.
+	// If not, then we should assume the column is good and then remove it from our in-mem store.
+	for colToDelete := range tableConfig.ReadOnlyColumnsToDelete() {
+		var found bool
+		for _, col := range srcKeysMissing {
+			if found = col.Name(nil) == colToDelete; found {
+				// Found it.
+				break
+			}
+		}
+
+		if !found {
+			// Only if it is NOT found do we remove it from the in-memory store (because we've caught up).
+			tableConfig.ClearColumnsToDeleteByColName(colToDelete)
+		}
+	}
+
+	tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...)
+
+	// Temporary tables cannot specify schemas, so we just prefix it instead.
+	temporaryTableName := fmt.Sprintf("%s_%s", tableData.ToFqName(ctx, s.Label()), tableData.TempTableSuffix())
+	if err = s.prepareTempTable(ctx, tableData, tableConfig, temporaryTableName); err != nil {
+		return err
+	}
+
+	// Now iterate over all the in-memory cols and see which one requires backfill.
+	for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() {
+		err = utils.BackfillColumn(ctx, s, col, tableData.ToFqName(ctx, s.Label()))
+		if err != nil {
+			defaultVal, _ := col.DefaultValue(nil)
+			return fmt.Errorf("failed to backfill col: %v, default value: %v, error: %v",
+				col.Name(nil), defaultVal, err)
+		}
+
+		tableConfig.Columns().UpsertColumn(col.Name(nil), columns.UpsertColumnArg{
+			Backfilled: ptr.ToBool(true),
+		})
+	}
+
+	// Prepare merge statement
+	mergeParts, err := dml.MergeStatementParts(dml.MergeArgument{
+		FqTableName:   fqName,
+		SubQuery:      temporaryTableName,
+		IdempotentKey: tableData.TopicConfig.IdempotentKey,
+		PrimaryKeys: tableData.PrimaryKeys(&columns.NameArgs{
+			Escape:   true,
+			DestKind: s.Label(),
+		}),
+		Columns: tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(&columns.NameArgs{
+			Escape:   true,
+			DestKind: s.Label(),
+		}),
+		ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(),
+		SoftDelete:     tableData.TopicConfig.SoftDelete,
+		Redshift:       true,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to generate merge statement, err: %v", err)
+	}
+
+	tx, err := s.Begin()
+	if err != nil {
+		return fmt.Errorf("failed to start tx, err: %v", err)
+	}
+
+	for _, mergeQuery := range mergeParts {
+		_, err = tx.Exec(mergeQuery)
+		if err != nil {
+			return fmt.Errorf("failed to merge, query: %v, err: %v", mergeQuery, err)
+		}
+	}
+
+	if err = tx.Commit(); err != nil {
+		return fmt.Errorf("failed to merge, parts: %v, err: %v", mergeParts, err)
+	}
+
+	_ = ddl.DropTemporaryTable(ctx, s, temporaryTableName, false)
+	return err
+}
+
+func LoadRedshift(ctx context.Context, _store *db.Store) *Store {
+	if _store != nil {
+		// Used for tests.
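+		// A non-nil _store bypasses the connection string below and uses the injected (typically mocked) connection as-is.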
+ return &Store{ + Store: *_store, + configMap: &types.DwhToTablesConfigMap{}, + } + } + + settings := config.FromContext(ctx) + connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=require", + settings.Config.Redshift.Host, settings.Config.Redshift.Port, settings.Config.Redshift.Username, + settings.Config.Redshift.Password, settings.Config.Redshift.Database) + + return &Store{ + credentialsClause: settings.Config.Redshift.CredentialsClause, + bucket: settings.Config.Redshift.Bucket, + optionalS3Prefix: settings.Config.Redshift.OptionalS3Prefix, + Store: db.Open(ctx, "postgres", connStr), + configMap: &types.DwhToTablesConfigMap{}, + } +} diff --git a/clients/redshift/redshift_suite_test.go b/clients/redshift/redshift_suite_test.go new file mode 100644 index 000000000..6a6c4fabf --- /dev/null +++ b/clients/redshift/redshift_suite_test.go @@ -0,0 +1,32 @@ +package redshift + +import ( + "context" + "testing" + + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/db" + "github.com/artie-labs/transfer/lib/mocks" + "github.com/stretchr/testify/suite" +) + +type RedshiftTestSuite struct { + suite.Suite + fakeStore *mocks.FakeStore + store *Store + ctx context.Context +} + +func (r *RedshiftTestSuite) SetupTest() { + r.ctx = config.InjectSettingsIntoContext(context.Background(), &config.Settings{ + VerboseLogging: false, + }) + + r.fakeStore = &mocks.FakeStore{} + store := db.Store(r.fakeStore) + r.store = LoadRedshift(r.ctx, &store) +} + +func TestRedshiftTestSuite(t *testing.T) { + suite.Run(t, new(RedshiftTestSuite)) +} diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go new file mode 100644 index 000000000..f911998bc --- /dev/null +++ b/clients/redshift/staging.go @@ -0,0 +1,106 @@ +package redshift + +import ( + "context" + "encoding/csv" + "fmt" + "os" + "time" + + "github.com/artie-labs/transfer/lib/typing" + + "github.com/artie-labs/transfer/lib/s3" + + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/dwh/ddl" + "github.com/artie-labs/transfer/lib/dwh/types" + "github.com/artie-labs/transfer/lib/logger" + "github.com/artie-labs/transfer/lib/optimization" +) + +func (s *Store) prepareTempTable(ctx context.Context, tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableName string) error { + tempAlterTableArgs := ddl.AlterTableArgs{ + Dwh: s, + Tc: tableConfig, + FqTableName: tempTableName, + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + } + + if err := ddl.AlterTable(ctx, tempAlterTableArgs, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + return fmt.Errorf("failed to create temp table, error: %v", err) + } + + expiryString := typing.ExpiresDate(time.Now().UTC().Add(ddl.TempTableTTL)) + // Now add a comment to the temporary table. + if _, err := s.Exec(fmt.Sprintf(`COMMENT ON TABLE %s IS '%s';`, tempTableName, ddl.ExpiryComment(expiryString))); err != nil { + return fmt.Errorf("failed to add comment to table, tableName: %v, err: %v", tempTableName, err) + } + + fp, err := s.loadTemporaryTable(tableData, tempTableName) + if err != nil { + return fmt.Errorf("failed to load temporary table, err: %v", err) + } + + // Load fp into s3, get S3 URI and pass it down. 
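+	// The COPY statement below then reads the staged CSV straight out of S3 using the configured credentials clause.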
+	s3Uri, err := s3.UploadLocalFileToS3(ctx, s3.UploadArgs{
+		OptionalS3Prefix: s.optionalS3Prefix,
+		Bucket:           s.bucket,
+		FilePath:         fp,
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to upload this to s3, err: %v", err)
+	}
+
+	// COPY table_name FROM '/path/to/local/file' DELIMITER '\t' NULL '\\N' FORMAT csv;
+	// Note, we need to specify `\\N` here while `CastColValStaging(..)` only emits `\N`; this is because Redshift treats backslashes as an escape character.
+	// So, it'll convert `\N` => `\\N` during COPY.
+	copyStmt := fmt.Sprintf(`COPY %s FROM '%s' DELIMITER '\t' NULL AS '\\N' FORMAT CSV %s dateformat 'auto' timeformat 'auto';`, tempTableName, s3Uri, s.credentialsClause)
+	if _, err = s.Exec(copyStmt); err != nil {
+		return fmt.Errorf("failed to run COPY for temporary table, err: %v, copy: %v", err, copyStmt)
+	}
+
+	if deleteErr := os.RemoveAll(fp); deleteErr != nil {
+		logger.FromContext(ctx).WithError(deleteErr).WithField("filePath", fp).Warn("failed to delete temp file")
+	}
+
+	return nil
+}
+
+// loadTemporaryTable will write the data into /tmp/newTableName.csv
+// This way, another function can call this and then upload the file to S3 for the Redshift COPY.
+// Returns the file path and potential error
+func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableName string) (string, error) {
+	filePath := fmt.Sprintf("/tmp/%s.csv", newTableName)
+	file, err := os.Create(filePath)
+	if err != nil {
+		return "", err
+	}
+
+	defer file.Close()
+	writer := csv.NewWriter(file)
+	writer.Comma = '\t'
+	for _, value := range tableData.RowsData() {
+		var row []string
+		for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(nil) {
+			colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
+			colVal := value[col]
+			// Cast the value into a string that the CSV writer (and COPY) can ingest.
+			castedValue, castErr := CastColValStaging(colVal, colKind)
+			if castErr != nil {
+				return "", castErr
+			}
+
+			row = append(row, castedValue)
+		}
+
+		if err = writer.Write(row); err != nil {
+			return "", fmt.Errorf("failed to write to csv, err: %v", err)
+		}
+	}
+
+	writer.Flush()
+	return filePath, writer.Error()
+}
diff --git a/clients/redshift/sweep.go b/clients/redshift/sweep.go
new file mode 100644
index 000000000..7e5f1bd85
--- /dev/null
+++ b/clients/redshift/sweep.go
@@ -0,0 +1,60 @@
+package redshift
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+
+	"github.com/artie-labs/transfer/lib/config"
+	"github.com/artie-labs/transfer/lib/config/constants"
+	"github.com/artie-labs/transfer/lib/dwh/ddl"
+	"github.com/artie-labs/transfer/lib/kafkalib"
+	"github.com/artie-labs/transfer/lib/logger"
+)
+
+func (s *Store) Sweep(ctx context.Context) error {
+	logger.FromContext(ctx).Info("looking to see if there are any dangling artie temporary tables to delete...")
+	// Find all the database and schema pairings
+	// Then iterate over information schema
+	// Find anything that has __artie__ in the table name
+	// Find the comment
+	// If the table has expired, drop it.
+	tcs, err := config.FromContext(ctx).Config.TopicConfigs()
+	if err != nil {
+		return err
+	}
+
+	dbAndSchemaPairs := kafkalib.GetUniqueDatabaseAndSchema(tcs)
+	for _, dbAndSchemaPair := range dbAndSchemaPairs {
+		// ILIKE is used so the table name match is case-insensitive.
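+		// Look up every table in this schema whose name carries the __artie prefix, along with its expiry comment.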
+ var rows *sql.Rows + rows, err = s.Store.Query(fmt.Sprintf( + `select c.relname, d.description from pg_catalog.pg_description d +JOIN pg_class c on d.objoid = c.oid +JOIN pg_catalog.pg_namespace n on n.oid = c.relnamespace +WHERE n.nspname = '%s' and c.relname ILIKE '%s';`, + dbAndSchemaPair.Schema, + "%"+constants.ArtiePrefix+"%")) + if err != nil { + return err + } + + for rows != nil && rows.Next() { + var tableName, comment string + err = rows.Scan(&tableName, &comment) + if err != nil { + return err + } + + if ddl.ShouldDelete(comment) { + err = ddl.DropTemporaryTable(ctx, s, + fmt.Sprintf("%s.%s.%s", dbAndSchemaPair.Database, dbAndSchemaPair.Schema, tableName), true) + if err != nil { + return err + } + } + } + } + + return nil +} diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 5a08e8598..2edf0c0f9 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -7,6 +7,8 @@ import ( "os" "strings" + "github.com/artie-labs/transfer/clients/utils" + "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing/columns" @@ -19,43 +21,6 @@ import ( "github.com/artie-labs/transfer/lib/optimization" ) -// BackfillColumn will perform a backfill to the destination and also update the comment within a transaction. -// Source: https://docs.snowflake.com/en/sql-reference/sql/comment -func (s *Store) backfillColumn(ctx context.Context, column columns.Column, fqTableName string) error { - if !column.ShouldBackfill() { - // If we don't need to backfill, don't backfill. - return nil - } - - fqTableName = strings.ToLower(fqTableName) - defaultVal, err := column.DefaultValue(&columns.DefaultValueArgs{ - Escape: true, - }) - if err != nil { - return fmt.Errorf("failed to escape default value, err: %v", err) - } - - escapedCol := column.Name(&columns.NameArgs{Escape: true, DestKind: s.Label()}) - query := fmt.Sprintf(`UPDATE %s SET %s = %v WHERE %s IS NULL;`, - // UPDATE table SET col = default_val WHERE col IS NULL - fqTableName, escapedCol, defaultVal, escapedCol, - ) - logger.FromContext(ctx).WithFields(map[string]interface{}{ - "colName": column.Name(nil), - "query": query, - "table": fqTableName, - }).Info("backfilling column") - - _, err = s.Exec(query) - if err != nil { - return fmt.Errorf("failed to backfill, err: %v, query: %v", err, query) - } - - query = fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS '%v';`, fqTableName, escapedCol, `{"backfilled": true}`) - _, err = s.Exec(query) - return err -} - // prepareTempTable does the following: // 1) Create the temporary table // 2) Load in-memory table -> CSV @@ -216,7 +181,7 @@ func (s *Store) mergeWithStages(ctx context.Context, tableData *optimization.Tab // Now iterate over all the in-memory cols and see which one requires backfill. 
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() { - err = s.backfillColumn(ctx, col, tableData.ToFqName(ctx, s.Label())) + err = utils.BackfillColumn(ctx, s, col, tableData.ToFqName(ctx, s.Label())) if err != nil { defaultVal, _ := col.DefaultValue(nil) return fmt.Errorf("failed to backfill col: %v, default value: %v, error: %v", diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go index 510ad220b..c4c8fb782 100644 --- a/clients/snowflake/staging_test.go +++ b/clients/snowflake/staging_test.go @@ -7,6 +7,8 @@ import ( "os" "strings" + "github.com/artie-labs/transfer/clients/utils" + "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/dwh/types" @@ -53,7 +55,7 @@ func (s *SnowflakeTestSuite) TestBackfillColumn() { } for _, testCase := range testCases { - err := s.stageStore.backfillColumn(s.ctx, testCase.col, fqTableName) + err := utils.BackfillColumn(s.ctx, s.stageStore, testCase.col, fqTableName) if testCase.expectErr { assert.Error(s.T(), err, testCase.name) continue diff --git a/clients/snowflake/sweep.go b/clients/snowflake/sweep.go index 582c390ce..20998c7c3 100644 --- a/clients/snowflake/sweep.go +++ b/clients/snowflake/sweep.go @@ -4,8 +4,6 @@ import ( "context" "database/sql" "fmt" - "strings" - "time" "github.com/artie-labs/transfer/lib/dwh/ddl" @@ -13,25 +11,8 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/logger" - "github.com/artie-labs/transfer/lib/typing" ) -func shouldDelete(comment string) (shouldDelete bool) { - // expires:2023-05-26 05:57:48 UTC - if strings.HasPrefix(comment, constants.SnowflakeExpireCommentPrefix) { - trimmedComment := strings.TrimPrefix(comment, constants.SnowflakeExpireCommentPrefix) - ts, err := typing.FromExpiresDateStringToTime(trimmedComment) - if err != nil { - return false - } - - // We should delete it if the time right now is AFTER the ts in the comment. 
- return time.Now().After(ts) - } - - return false -} - func (s *Store) Sweep(ctx context.Context) error { if !s.useStaging { return nil @@ -67,7 +48,7 @@ func (s *Store) Sweep(ctx context.Context) error { return err } - if shouldDelete(comment) { + if ddl.ShouldDelete(comment) { err = ddl.DropTemporaryTable(ctx, s, fmt.Sprintf("%s.%s.%s", dbAndSchemaPair.Database, dbAndSchemaPair.Schema, tableName), true) if err != nil { diff --git a/clients/snowflake/sweep_test.go b/clients/snowflake/sweep_test.go index 27eccfd87..4c6971e9e 100644 --- a/clients/snowflake/sweep_test.go +++ b/clients/snowflake/sweep_test.go @@ -1,70 +1,13 @@ package snowflake import ( - "fmt" - "time" - "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" ) -func (s *SnowflakeTestSuite) TestShouldDelete() { - type _testCase struct { - name string - comment string - expectDelete bool - } - now := time.Now() - oneHourAgo := now.Add(-1 * time.Hour) - oneHourFromNow := now.Add(1 * time.Hour) - testCases := []_testCase{ - { - name: "random", - comment: "random", - expectDelete: false, - }, - { - name: "one hour from now, but no expires: prefix", - comment: typing.ExpiresDate(oneHourFromNow), - expectDelete: false, - }, - { - name: "one hour ago, but no expires: prefix", - comment: typing.ExpiresDate(oneHourAgo), - expectDelete: false, - }, - { - name: "one hour ago, with prefix, but extra space", - comment: fmt.Sprintf("%s %s", constants.SnowflakeExpireCommentPrefix, typing.ExpiresDate(oneHourAgo)), - expectDelete: false, - }, - { - name: "one hour from now, with prefix, but extra space", - comment: fmt.Sprintf("%s %s", constants.SnowflakeExpireCommentPrefix, typing.ExpiresDate(oneHourFromNow)), - expectDelete: false, - }, - { - name: "one hour ago (expired)", - comment: fmt.Sprintf("%s%s", constants.SnowflakeExpireCommentPrefix, typing.ExpiresDate(oneHourAgo)), - expectDelete: true, - }, - { - name: "one hour from now (not yet expired)", - comment: fmt.Sprintf("%s%s", constants.SnowflakeExpireCommentPrefix, typing.ExpiresDate(oneHourFromNow)), - expectDelete: false, - }, - } - - for _, testCase := range testCases { - actualShouldDelete := shouldDelete(testCase.comment) - assert.Equal(s.T(), testCase.expectDelete, actualShouldDelete, testCase.name) - } -} - func (s *SnowflakeTestSuite) TestSweep() { // This is a no-op, since store isn't a store w/ stages. assert.NoError(s.T(), s.store.Sweep(s.ctx)) diff --git a/clients/utils/utils.go b/clients/utils/utils.go new file mode 100644 index 000000000..cf9a6a5b9 --- /dev/null +++ b/clients/utils/utils.go @@ -0,0 +1,53 @@ +package utils + +import ( + "context" + "fmt" + "strings" + + "github.com/artie-labs/transfer/lib/config/constants" + + "github.com/artie-labs/transfer/lib/dwh" + "github.com/artie-labs/transfer/lib/logger" + "github.com/artie-labs/transfer/lib/typing/columns" +) + +func BackfillColumn(ctx context.Context, dwh dwh.DataWarehouse, column columns.Column, fqTableName string) error { + if dwh.Label() == constants.BigQuery { + return fmt.Errorf("bigquery does not use this method") + } + + if !column.ShouldBackfill() { + // If we don't need to backfill, don't backfill. 
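+		// (A column stops backfilling once it has been flagged via the {"backfilled": true} column comment set at the bottom of this function.)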
+ return nil + } + + fqTableName = strings.ToLower(fqTableName) + defaultVal, err := column.DefaultValue(&columns.DefaultValueArgs{ + Escape: true, + DestKind: dwh.Label(), + }) + if err != nil { + return fmt.Errorf("failed to escape default value, err: %v", err) + } + + escapedCol := column.Name(&columns.NameArgs{Escape: true, DestKind: dwh.Label()}) + query := fmt.Sprintf(`UPDATE %s SET %s = %v WHERE %s IS NULL;`, + // UPDATE table SET col = default_val WHERE col IS NULL + fqTableName, escapedCol, defaultVal, escapedCol, + ) + logger.FromContext(ctx).WithFields(map[string]interface{}{ + "colName": column.Name(nil), + "query": query, + "table": fqTableName, + }).Info("backfilling column") + + _, err = dwh.Exec(query) + if err != nil { + return fmt.Errorf("failed to backfill, err: %v, query: %v", err, query) + } + + query = fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS '%v';`, fqTableName, escapedCol, `{"backfilled": true}`) + _, err = dwh.Exec(query) + return err +} diff --git a/go.mod b/go.mod index 1dc45b271..e3a264bd9 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,13 @@ require ( cloud.google.com/go/bigquery v1.51.2 cloud.google.com/go/pubsub v1.30.0 github.com/DataDog/datadog-go v4.8.3+incompatible + github.com/aws/aws-sdk-go-v2 v1.18.1 github.com/aws/aws-sdk-go-v2/config v1.17.8 + github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0 github.com/evalphobia/logrus_sentry v0.8.2 github.com/google/uuid v1.3.0 github.com/jessevdk/go-flags v1.5.0 + github.com/lib/pq v1.10.9 github.com/segmentio/kafka-go v0.4.38 github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0 github.com/sirupsen/logrus v1.9.0 @@ -33,24 +36,22 @@ require ( github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect github.com/apache/arrow/go/v12 v12.0.0 // indirect github.com/apache/thrift v0.16.0 // indirect - github.com/aws/aws-sdk-go-v2 v1.16.16 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.12.21 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.17 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.34 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.18 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.17 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.17 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.3 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.11.23 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.6 // indirect 
github.com/aws/aws-sdk-go-v2/service/sts v1.16.19 // indirect - github.com/aws/smithy-go v1.13.3 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d // indirect @@ -93,14 +94,14 @@ require ( github.com/viant/xunsafe v0.8.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/crypto v0.7.0 // indirect - golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.9.0 // indirect + golang.org/x/crypto v0.10.0 // indirect + golang.org/x/mod v0.11.0 // indirect + golang.org/x/net v0.11.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.7.0 // indirect - golang.org/x/text v0.9.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/sync v0.2.0 // indirect + golang.org/x/sys v0.9.0 // indirect + golang.org/x/text v0.10.0 // indirect + golang.org/x/tools v0.9.3 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect diff --git a/go.sum b/go.sum index 2e206801d..851c8616d 100644 --- a/go.sum +++ b/go.sum @@ -59,10 +59,12 @@ github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/P github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts= -github.com/aws/aws-sdk-go-v2 v1.16.16 h1:M1fj4FE2lB4NzRb9Y0xdWsn2P0+2UHVxwKyOa4YJNjk= github.com/aws/aws-sdk-go-v2 v1.16.16/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 h1:tcFliCWne+zOuUfKNRn8JdFBuWPDuISDH08wD2ULkhk= +github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo= +github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8/go.mod h1:JTnlBSot91steJeti4ryyu/tLd4Sk84O5W22L7O2EQU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10/go.mod h1:VeTZetY5KRJLuD/7fkQXMU6Mw7H5m/KP2J5Iy9osMno= github.com/aws/aws-sdk-go-v2/config v1.17.2/go.mod h1:jumS/AMwul4WaG8vyXsF6kUndG9zndR+yfYBwl4i9ds= github.com/aws/aws-sdk-go-v2/config v1.17.8 h1:b9LGqNnOdg9vR4Q43tBTVWk4J6F+W774MSchvKJsqnE= github.com/aws/aws-sdk-go-v2/config v1.17.8/go.mod h1:UkCI3kb0sCdvtjiXYiU4Zx5h07BOpgBTtkPu/49r+kA= @@ -75,27 +77,35 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.17/go.mod h1:yIkQcCDYNsZfXpd github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.34 h1:1PNtaCM+2ruo1dfYL2RweUdtbuPvinjAejjNcPa/RQY= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.34/go.mod h1:+Six+CXNHYllXam32j+YW8ixk82+am345ei89kEz8p4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.19/go.mod h1:llxE6bwUZhuCas0K7qGiu5OgMis3N7kdWtFSxoHmJ7E= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23 h1:s4g/wnzMf+qepSNgTvaQQHNxyMLKSawNhKCPNy++2xY= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23/go.mod h1:2DFxAQ9pfIRy0imBCJv+vZ2X6RKxves6fbnEuSry6b4= 
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 h1:A5UqQEmPaCFpedKouS4v+dHCTUo2sKqhoKO9U5kxyWo= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34/go.mod h1:wZpTEecJe0Btj3IYnDx/VlUzor9wm3fJHyvLpQF0VwY= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.13/go.mod h1:lB12mkZqCSo5PsdBFLNqc2M/OOYgNAy8UtaktyuWvE8= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17 h1:/K482T5A3623WJgWT8w1yRAFK4RzGzEl7y39yhtn9eA= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17/go.mod h1:pRwaTYCJemADaqCbUAxltMoHKata7hmB5PjEXeu0kfg= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 h1:srIVS45eQuewqz6fKKu6ZGXaq6FuFg5NzgQBAM6g8Y4= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28/go.mod h1:7VRpKQQedkfIEXb4k52I7swUnZP0wohVajJMRn3vsUw= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.20/go.mod h1:bfTcsThj5a9P5pIGRy0QudJ8k4+issxXX+O6Djnd5Cs= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 h1:wj5Rwc05hvUSvKuOF29IYb9QrCLjU+rHAy/x/o0DK2c= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24/go.mod h1:jULHjqqjDlbyTa7pfM7WICATnOv+iOhjletM3N0Xbu8= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 h1:ZSIPAkAsCCjYrhqfw2+lNzWDzxzHXEckFkTePL5RSWQ= github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14/go.mod h1:AyGgqiKv9ECM6IZeNQtdT8NnMvUb3/2wokeq2Fgryto= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9 h1:Lh1AShsuIJTwMkoxVCAYPJgNG5H+eN6SmoUn8nOZ5wE= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26 h1:wscW+pnn3J1OYnanMnza5ZVYXLX4cKk5rAvUAl4Qu+c= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26/go.mod h1:MtYiox5gvyB+OyP0Mr0Sm/yzbEAIPL9eijj/ouHAPw0= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9/go.mod h1:a9j48l6yL5XINLHLcOKInjdvknN+vWqPBxqeIDw7ktw= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.18 h1:BBYoNQt2kUZUUK4bIPsKrCcjVPUMNsgQpNAwhznK/zo= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 h1:y2+VQzC6Zh2ojtV2LoC0MNwHWc6qXv/j2vrQtlftkdA= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11/go.mod h1:iV4q2hsqtNECrfmlXyord9u4zyuFEJX9eLgLpSPzWA8= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.18/go.mod h1:NS55eQ4YixUJPTC+INxi2/jCqe1y2Uw3rnh9wEOVJxY= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29 h1:zZSLP3v3riMOP14H7b4XP0uyfREDQOYv2cqIrvTXDNQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29/go.mod h1:z7EjRjVwZ6pWcWdI2H64dKttvzaP99jRIj5hphW0M5U= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.13/go.mod h1:V390DK4MQxLpDdXxFqizyz8KUxuWImkW/xzgXMz0yyk= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.17 h1:Jrd/oMh0PKQc6+BowB+pLEwLIgaQF29eYbe7E1Av9Ug= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.17/go.mod h1:4nYOrY41Lrbk2170/BGkcJKBhws9Pfn8MG3aGqjjeFI= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.17 h1:HfVVR1vItaG6le+Bpw6P4midjBDMKnjMyZnw9MXYUcE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 h1:bkRyG4a929RCnpVSTvLM2j/T4ls015ZhhYApbmYs15s= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28/go.mod h1:jj7znCIg05jXlaGBlFMGP8+7UN3VtCkRBG2spnmRQkU= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.17/go.mod h1:YqMdV+gEKCQ59NrB7rzrJdALeBIsYiVi8Inj3+KcqHI= -github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11 h1:3/gm/JTX9bX8CpzTgIlrtYpB3EVBDxyg/GY/QdcIEZw= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.3 h1:dBL3StFxHtpBzJJ/mNEsjXVgfO+7jR0dAIEwLqMapEA= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared 
v1.14.3/go.mod h1:f1QyiAsvIv4B49DmCqrhlXqyaR+0IxMmyX+1P+AnzOM= github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11/go.mod h1:fmgDANqTUCxciViKl9hb/zD5LFbvPINFRgWhDbR+vZo= +github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0 h1:ya7fmrN2fE7s1P2gaPbNg5MTkERVWfsH8ToP1YC4Z9o= +github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0/go.mod h1:aVbf0sko/TsLWHx30c/uVu7c62+0EAJ3vbxaJga0xCw= github.com/aws/aws-sdk-go-v2/service/sso v1.11.18/go.mod h1:ytmEi5+qwcSNcV2pVA8PIb1DnKT/0Bu/K4nfJHwoM6c= github.com/aws/aws-sdk-go-v2/service/sso v1.11.23 h1:pwvCchFUEnlceKIgPUouBJwK81aCkQ8UDMORfeFtW10= github.com/aws/aws-sdk-go-v2/service/sso v1.11.23/go.mod h1:/w0eg9IhFGjGyyncHIQrXtU8wvNsTJOP0R6PPj0wf80= @@ -106,8 +116,9 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.16.14/go.mod h1:Y+BUV19q3OmQVqNUlbZ4 github.com/aws/aws-sdk-go-v2/service/sts v1.16.19 h1:9pPi0PsFNAGILFfPCk8Y0iyEBGc6lu6OQ97U7hmdesg= github.com/aws/aws-sdk-go-v2/service/sts v1.16.19/go.mod h1:h4J3oPZQbxLhzGnk+j9dfYHi5qIOVJ5kczZd658/ydM= github.com/aws/smithy-go v1.13.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= -github.com/aws/smithy-go v1.13.3 h1:l7LYxGuzK6/K+NzJ2mC+VvLUbae0sL3bXU//04MkmnA= github.com/aws/smithy-go v1.13.3/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= @@ -280,6 +291,8 @@ github.com/lestrrat-go/jwx v1.2.25 h1:tAx93jN2SdPvFn08fHNAhqFJazn5mBBOB8Zli0g0ot github.com/lestrrat-go/jwx v1.2.25/go.mod h1:zoNuZymNl5lgdcu6P7K6ie2QRll5HVfF4xwxBBK1NxY= github.com/lestrrat-go/option v1.0.0 h1:WqAWL8kh8VcSoD6xjSH34/1m8yxluXQbDeKNfvFeEO4= github.com/lestrrat-go/option v1.0.0/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= @@ -414,8 +427,8 @@ golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= +golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp 
v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -442,8 +455,8 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= +golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -468,8 +481,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220630215102-69896b714898/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= +golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -484,8 +497,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -509,11 +522,11 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys 
v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= +golang.org/x/term v0.9.0 h1:GRRCnKYhdQrD8kfRAdQ6Zcw1P0OcELxGLKJvtjVMZ28= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -521,8 +534,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58= +golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -538,8 +551,8 @@ golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM= +golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/lib/config/config.go b/lib/config/config.go index 13797f979..27081858b 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -5,6 +5,8 @@ import ( "io" "os" + "github.com/artie-labs/transfer/lib/stringutil" + "github.com/artie-labs/transfer/lib/numbers" "gopkg.in/yaml.v3" @@ -56,6 +58,18 @@ type BigQuery struct { ProjectID string `yaml:"projectID"` } +type Redshift struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Database string `yaml:"database"` + Username 
string `yaml:"username"` + Password string `yaml:"password"` + Bucket string `yaml:"bucket"` + OptionalS3Prefix string `yaml:"optionalS3Prefix"` + // https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html + CredentialsClause string `yaml:"credentialsClause"` +} + type Snowflake struct { AccountID string `yaml:"account"` Username string `yaml:"username"` @@ -106,6 +120,7 @@ type Config struct { // Supported destinations BigQuery *BigQuery `yaml:"bigquery"` Snowflake *Snowflake `yaml:"snowflake"` + Redshift *Redshift `yaml:"redshift"` Reporting struct { Sentry *Sentry `yaml:"sentry"` @@ -159,6 +174,27 @@ func readFileToConfig(pathToConfig string) (*Config, error) { return &config, nil } +func (c *Config) ValidateRedshift() error { + if c.Output != constants.Redshift { + return fmt.Errorf("output is not redshift, output: %v", c.Output) + } + + if c.Redshift == nil { + return fmt.Errorf("redshift cfg is nil") + } + + if empty := stringutil.Empty(c.Redshift.Host, c.Redshift.Database, c.Redshift.Username, + c.Redshift.Password, c.Redshift.Bucket, c.Redshift.CredentialsClause); empty { + return fmt.Errorf("one of redshift settings is empty") + } + + if c.Redshift.Port <= 0 { + return fmt.Errorf("redshift invalid port") + } + + return nil +} + // Validate will check the output source validity // It will also check if a topic exists + iterate over each topic to make sure it's valid. // The actual output source (like Snowflake) and CDC parser will be loaded and checked by other funcs. @@ -180,14 +216,21 @@ func (c *Config) Validate() error { return fmt.Errorf("config is invalid, buffer pool is too small, min value: %v, actual: %v", bufferPoolSizeStart, int(c.BufferRows)) } - if c.Output == constants.Snowflake && int(c.BufferRows) > bufferPoolSizeEnd { - return fmt.Errorf("snowflake does not allow more than 15k rows, actual: %v", int(c.BufferRows)) - } - if !constants.IsValidDestination(c.Output) { return fmt.Errorf("config is invalid, output: %s is invalid", c.Output) } + switch c.Output { + case constants.Redshift: + if err := c.ValidateRedshift(); err != nil { + return err + } + case constants.Snowflake: + if int(c.BufferRows) > bufferPoolSizeEnd { + return fmt.Errorf("snowflake does not allow more than 15k rows, actual: %v", int(c.BufferRows)) + } + } + if c.Queue == constants.Kafka { if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 { return fmt.Errorf("config is invalid, no kafka topic configs, kafka: %v", c.Kafka) diff --git a/lib/config/config_validate_test.go b/lib/config/config_validate_test.go new file mode 100644 index 000000000..4e9a612ec --- /dev/null +++ b/lib/config/config_validate_test.go @@ -0,0 +1,80 @@ +package config + +import ( + "testing" + + "github.com/artie-labs/transfer/lib/config/constants" + + "github.com/stretchr/testify/assert" +) + +func TestCfg_ValidateRedshift(t *testing.T) { + type _testCase struct { + Name string + Redshift *Redshift + ExpectErr bool + } + + testCases := []_testCase{ + { + Name: "nil", + Redshift: nil, + ExpectErr: true, + }, + { + Name: "redshift settings exist, but all empty", + Redshift: &Redshift{}, + ExpectErr: true, + }, + { + Name: "redshift settings all set (missing port)", + Redshift: &Redshift{ + Host: "host", + Database: "db", + Username: "user", + Password: "pw", + Bucket: "bucket", + CredentialsClause: "creds", + }, + ExpectErr: true, + }, + { + Name: "redshift settings all set (neg port)", + Redshift: &Redshift{ + Host: "host", + Port: -500, + Database: "db", + Username: "user", + Password: "pw", + Bucket: 
"bucket", + CredentialsClause: "creds", + }, + ExpectErr: true, + }, + { + Name: "redshift settings all set", + Redshift: &Redshift{ + Host: "host", + Port: 123, + Database: "db", + Username: "user", + Password: "pw", + Bucket: "bucket", + CredentialsClause: "creds", + }, + }, + } + + for _, testCase := range testCases { + cfg := &Config{ + Redshift: testCase.Redshift, + Output: constants.Redshift, + } + err := cfg.ValidateRedshift() + if testCase.ExpectErr { + assert.Error(t, err, testCase.Name) + } else { + assert.NoError(t, err, testCase.Name) + } + } +} diff --git a/lib/config/constants/constants.go b/lib/config/constants/constants.go index 417d61eaf..01b7da077 100644 --- a/lib/config/constants/constants.go +++ b/lib/config/constants/constants.go @@ -1,6 +1,8 @@ package constants -import "time" +import ( + "time" +) const ( ToastUnavailableValuePlaceholder = "__debezium_unavailable_value" @@ -10,18 +12,15 @@ const ( // We will strip this out from our partition key parsing. DebeziumTopicRoutingKey = "__dbz__physicalTableIdentifier" - SnowflakeExpireCommentPrefix = "expires:" - ArtiePrefix = "__artie" - DeleteColumnMarker = ArtiePrefix + "_delete" - DeletionConfidencePadding = 4 * time.Hour + ArtiePrefix = "__artie" + DeleteColumnMarker = ArtiePrefix + "_delete" + DeletionConfidencePadding = 4 * time.Hour // DBZPostgresFormat is the only supported CDC format right now DBZPostgresFormat = "debezium.postgres" DBZPostgresAltFormat = "debezium.postgres.wal2json" DBZMongoFormat = "debezium.mongodb" DBZMySQLFormat = "debezium.mysql" - - BigQueryTempTableTTL = 6 * time.Hour ) // ReservedKeywords is populated from: https://docs.snowflake.com/en/sql-reference/reserved-keywords @@ -115,12 +114,14 @@ const ( Snowflake DestinationKind = "snowflake" Test DestinationKind = "test" BigQuery DestinationKind = "bigquery" + Redshift DestinationKind = "redshift" ) var validDestinations = []DestinationKind{ BigQuery, Snowflake, SnowflakeStages, + Redshift, Test, } diff --git a/lib/db/db.go b/lib/db/db.go index 74a5046fd..21a4997e4 100644 --- a/lib/db/db.go +++ b/lib/db/db.go @@ -10,6 +10,7 @@ import ( type Store interface { Exec(query string, args ...any) (sql.Result, error) Query(query string, args ...any) (*sql.Rows, error) + Begin() (*sql.Tx, error) } func Open(ctx context.Context, driverName, dsn string) Store { diff --git a/lib/db/mock/db.go b/lib/db/mock/db.go index ece7c388f..2eabf439c 100644 --- a/lib/db/mock/db.go +++ b/lib/db/mock/db.go @@ -22,3 +22,8 @@ func (m *DB) Query(query string, args ...any) (*sql.Rows, error) { fmt.Println("Mock DB is querying", "query", query, "args", args) return m.Fake.Query(query, args) } + +func (m *DB) Begin() (*sql.Tx, error) { + fmt.Println("Mock DB Begin()") + return m.Fake.Begin() +} diff --git a/lib/dwh/ddl/ddl.go b/lib/dwh/ddl/ddl.go index 36476f1c9..0a4aaa4e3 100644 --- a/lib/dwh/ddl/ddl.go +++ b/lib/dwh/ddl/ddl.go @@ -108,8 +108,10 @@ func AlterTable(ctx context.Context, args AlterTableArgs, cols ...columns.Column if args.CreateTable { var sqlQuery string if args.TemporaryTable { - expiryString := typing.ExpiresDate(time.Now().UTC().Add(constants.BigQueryTempTableTTL)) + expiryString := typing.ExpiresDate(time.Now().UTC().Add(TempTableTTL)) switch args.Dwh.Label() { + case constants.Redshift: + sqlQuery = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", args.FqTableName, strings.Join(colSQLParts, ",")) case constants.BigQuery: sqlQuery = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s) OPTIONS (expiration_timestamp = TIMESTAMP("%s"))`, args.FqTableName, 
strings.Join(colSQLParts, ","), expiryString) @@ -121,7 +123,8 @@ func AlterTable(ctx context.Context, args AlterTableArgs, cols ...columns.Column sqlQuery = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE) COMMENT='%s'`, args.FqTableName, strings.Join(colSQLParts, ","), // Comment on the table - fmt.Sprintf("%s%s", constants.SnowflakeExpireCommentPrefix, expiryString)) + ExpiryComment(expiryString), + ) default: return fmt.Errorf("unexpected dwh: %v trying to create a temporary table", args.Dwh.Label()) } diff --git a/lib/dwh/ddl/expiry.go b/lib/dwh/ddl/expiry.go new file mode 100644 index 000000000..2982b9d20 --- /dev/null +++ b/lib/dwh/ddl/expiry.go @@ -0,0 +1,34 @@ +package ddl + +import ( + "fmt" + "strings" + "time" + + "github.com/artie-labs/transfer/lib/typing" +) + +const ( + ExpireCommentPrefix = "expires:" + TempTableTTL = 6 * time.Hour +) + +func ExpiryComment(expiryString string) string { + return fmt.Sprintf("%s%s", ExpireCommentPrefix, expiryString) +} + +func ShouldDelete(comment string) (shouldDelete bool) { + // expires:2023-05-26 05:57:48 UTC + if strings.HasPrefix(comment, ExpireCommentPrefix) { + trimmedComment := strings.TrimPrefix(comment, ExpireCommentPrefix) + ts, err := typing.FromExpiresDateStringToTime(trimmedComment) + if err != nil { + return false + } + + // We should delete it if the time right now is AFTER the ts in the comment. + return time.Now().After(ts) + } + + return false +} diff --git a/lib/dwh/ddl/expiry_test.go b/lib/dwh/ddl/expiry_test.go new file mode 100644 index 000000000..cd36b50e5 --- /dev/null +++ b/lib/dwh/ddl/expiry_test.go @@ -0,0 +1,63 @@ +package ddl + +import ( + "fmt" + "testing" + "time" + + "github.com/artie-labs/transfer/lib/typing" + "github.com/stretchr/testify/assert" +) + +func TestShouldDelete(t *testing.T) { + type _testCase struct { + name string + comment string + expectDelete bool + } + now := time.Now() + oneHourAgo := now.Add(-1 * time.Hour) + oneHourFromNow := now.Add(1 * time.Hour) + testCases := []_testCase{ + { + name: "random", + comment: "random", + expectDelete: false, + }, + { + name: "one hour from now, but no expires: prefix", + comment: typing.ExpiresDate(oneHourFromNow), + expectDelete: false, + }, + { + name: "one hour ago, but no expires: prefix", + comment: typing.ExpiresDate(oneHourAgo), + expectDelete: false, + }, + { + name: "one hour ago, with prefix, but extra space", + comment: fmt.Sprintf("%s %s", ExpireCommentPrefix, typing.ExpiresDate(oneHourAgo)), + expectDelete: false, + }, + { + name: "one hour from now, with prefix, but extra space", + comment: fmt.Sprintf("%s %s", ExpireCommentPrefix, typing.ExpiresDate(oneHourFromNow)), + expectDelete: false, + }, + { + name: "one hour ago (expired)", + comment: fmt.Sprintf("%s%s", ExpireCommentPrefix, typing.ExpiresDate(oneHourAgo)), + expectDelete: true, + }, + { + name: "one hour from now (not yet expired)", + comment: fmt.Sprintf("%s%s", ExpireCommentPrefix, typing.ExpiresDate(oneHourFromNow)), + expectDelete: false, + }, + } + + for _, testCase := range testCases { + actualShouldDelete := ShouldDelete(testCase.comment) + assert.Equal(t, testCase.expectDelete, actualShouldDelete, testCase.name) + } +} diff --git a/lib/dwh/dml/merge.go b/lib/dwh/dml/merge.go index 598df9681..af166e87f 100644 --- a/lib/dwh/dml/merge.go +++ b/lib/dwh/dml/merge.go @@ -27,10 +27,119 @@ type MergeArgument 
struct { // BigQuery is used to: // 1) escape JSON columns // 2) merge temp table vs. subquery - BigQuery bool + BigQuery bool + // Redshift is used for: + // 1) Being used as part of MergeStatementParts + Redshift bool SoftDelete bool } +func MergeStatementParts(m MergeArgument) ([]string, error) { + if !m.Redshift { + return nil, fmt.Errorf("MergeStatementParts is only supported for Redshift") + } + + // We should not need an idempotency key for DELETE + // This is based on the assumption that the primary key would be monotonically increasing or UUID based + // With auto-increment (AI), the sequence will only increment (never decrement). And UUIDs are there to prevent universal hash collisions + // However, there may be edge cases where folks end up restoring deleted rows (which will contain the same PK). + + // We also need to ensure that the staged table's idempotency key is >= the target table's idempotency key + // This is because Snowflake does not respect NS granularity. + var idempotentClause string + if m.IdempotentKey != "" { + idempotentClause = fmt.Sprintf(" AND cc.%s >= c.%s", m.IdempotentKey, m.IdempotentKey) + } + + var equalitySQLParts []string + for _, primaryKey := range m.PrimaryKeys { + // We'll need to escape the primary key as well. + equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName()) + equalitySQLParts = append(equalitySQLParts, equalitySQL) + } + + // TODO solve for idempotency + if m.SoftDelete { + return []string{ + // INSERT + fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, + // insert into target (col1, col2, col3) + m.FqTableName, strings.Join(m.Columns, ","), + // SELECT cc.col1, cc.col2, ... FROM staging as CC + array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ + Vals: m.Columns, + Separator: ",", + Prefix: "cc.", + }), m.SubQuery, + // LEFT JOIN table on pk(s) + m.FqTableName, strings.Join(equalitySQLParts, " and "), + // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) + m.PrimaryKeys[0].EscapedName()), + // UPDATE + fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`, + // UPDATE table SET col1 = cc.col1 + m.FqTableName, columns.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.Redshift), + // FROM table (temp) WHERE join on PK(s) + m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, + ), + }, nil + } + + // We also need to remove the __artie delete flag since it does not exist in the destination table + var removed bool + for idx, col := range m.Columns { + if col == constants.DeleteColumnMarker { + m.Columns = append(m.Columns[:idx], m.Columns[idx+1:]...) + removed = true + break + } + } + + if !removed { + return nil, errors.New("artie delete flag doesn't exist") + } + + var pks []string + for _, pk := range m.PrimaryKeys { + pks = append(pks, pk.EscapedName()) + } + + return []string{ + // INSERT + fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`, + // insert into target (col1, col2, col3) + m.FqTableName, strings.Join(m.Columns, ","), + // SELECT cc.col1, cc.col2, ...
FROM staging as CC + array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ + Vals: m.Columns, + Separator: ",", + Prefix: "cc.", + }), m.SubQuery, + // LEFT JOIN table on pk(s) + m.FqTableName, strings.Join(equalitySQLParts, " and "), + // Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts) + m.PrimaryKeys[0].EscapedName()), + // UPDATE + fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`, + // UPDATE table SET col1 = cc.col1 + m.FqTableName, columns.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.Redshift), + // FROM staging WHERE join on PK(s) + m.SubQuery, strings.Join(equalitySQLParts, " and "), idempotentClause, constants.DeleteColumnMarker, + ), + // DELETE + fmt.Sprintf(`DELETE FROM %s WHERE (%s) IN (SELECT %s FROM %s as cc WHERE cc.%s = true);`, + // DELETE from table where (pk_1, pk_2) + m.FqTableName, strings.Join(pks, ","), + // IN (cc.pk_1, cc.pk_2) FROM staging + array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ + Vals: pks, + Separator: ",", + Prefix: "cc.", + }), m.SubQuery, constants.DeleteColumnMarker, + ), + }, nil +} + // TODO - add validation to merge argument // TODO - simplify the whole escape / unescape columns logic. func MergeStatement(m MergeArgument) (string, error) { @@ -133,5 +242,4 @@ func MergeStatement(m MergeArgument) (string, error) { Separator: ",", Prefix: "cc.", })), nil - } diff --git a/lib/dwh/dml/merge_parts_test.go b/lib/dwh/dml/merge_parts_test.go new file mode 100644 index 000000000..f19f77443 --- /dev/null +++ b/lib/dwh/dml/merge_parts_test.go @@ -0,0 +1,264 @@ +package dml + +import ( + "testing" + + "github.com/artie-labs/transfer/lib/config/constants" + + "github.com/artie-labs/transfer/lib/typing" + + "github.com/artie-labs/transfer/lib/typing/columns" + "github.com/stretchr/testify/assert" +) + +func TestMergeStatementPartsValidation(t *testing.T) { + for _, arg := range []MergeArgument{ + {BigQuery: true}, + {}, + } { + parts, err := MergeStatementParts(arg) + assert.Error(t, err) + assert.Nil(t, parts) + } +} + +type result struct { + PrimaryKeys []columns.Wrapper + Columns []string + ColumnsToTypes columns.Columns +} + +// getBasicColumnsForTest - returns a `result` with all the columns needed for these tests.
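+// * toast_text is flagged as a TOAST column, so the UPDATE assertions below exercise the CASE WHEN ... ELSE fallback that keeps the existing value when the unavailable-value placeholder shows up.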
+// * In here, we'll return if compositeKey=false - id (pk), email, first_name, last_name, created_at, toast_text (TOAST-able) +// * Else if compositeKey=true - id(pk), email (pk), first_name, last_name, created_at, toast_text (TOAST-able) +func getBasicColumnsForTest(compositeKey bool) result { + idCol := columns.NewColumn("id", typing.Float) + emailCol := columns.NewColumn("email", typing.String) + textToastCol := columns.NewColumn("toast_text", typing.String) + textToastCol.ToastColumn = true + + var cols columns.Columns + cols.AddColumn(idCol) + cols.AddColumn(emailCol) + cols.AddColumn(columns.NewColumn("first_name", typing.String)) + cols.AddColumn(columns.NewColumn("last_name", typing.String)) + cols.AddColumn(columns.NewColumn("created_at", typing.ETime)) + cols.AddColumn(textToastCol) + cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)) + + var pks []columns.Wrapper + pks = append(pks, columns.NewWrapper(idCol, &columns.NameArgs{ + Escape: true, + DestKind: constants.Redshift, + })) + + if compositeKey { + pks = append(pks, columns.NewWrapper(emailCol, &columns.NameArgs{ + Escape: true, + DestKind: constants.Redshift, + })) + } + + var rawCols []string + for _, col := range cols.GetColumns() { + rawCols = append(rawCols, col.Name(&columns.NameArgs{ + Escape: true, + DestKind: constants.Redshift, + })) + } + + return result{ + PrimaryKeys: pks, + ColumnsToTypes: cols, + Columns: rawCols, + } +} + +func TestMergeStatementPartsSoftDelete(t *testing.T) { + fqTableName := "public.tableName" + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(false) + m := MergeArgument{ + FqTableName: fqTableName, + SubQuery: tempTableName, + PrimaryKeys: res.PrimaryKeys, + Columns: res.Columns, + ColumnsToTypes: res.ColumnsToTypes, + Redshift: true, + SoftDelete: true, + } + + parts, err := MergeStatementParts(m) + assert.NoError(t, err) + assert.Equal(t, 2, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text,__artie_delete) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text,cc.__artie_delete FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id WHERE c.id IS NULL;`, + parts[0]) + assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text END,__artie_delete=cc.__artie_delete FROM public.tableName__temp as cc WHERE c.id = cc.id;`, + parts[1]) + + m.IdempotentKey = "created_at" + parts, err = MergeStatementParts(m) + // Parts[0] for insertion should be identical + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text,__artie_delete) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text,cc.__artie_delete FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id WHERE c.id IS NULL;`, + parts[0]) + // Parts[1] where we're doing UPDATES will have idempotency key. 
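+ // (the trailing cc.created_at >= c.created_at predicate below is that idempotent clause)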
+ assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text END,__artie_delete=cc.__artie_delete FROM public.tableName__temp as cc WHERE c.id = cc.id AND cc.created_at >= c.created_at;`, + parts[1]) +} + +func TestMergeStatementPartsSoftDeleteComposite(t *testing.T) { + fqTableName := "public.tableName" + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(true) + m := MergeArgument{ + FqTableName: fqTableName, + SubQuery: tempTableName, + PrimaryKeys: res.PrimaryKeys, + Columns: res.Columns, + ColumnsToTypes: res.ColumnsToTypes, + Redshift: true, + SoftDelete: true, + } + + parts, err := MergeStatementParts(m) + assert.NoError(t, err) + assert.Equal(t, 2, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text,__artie_delete) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text,cc.__artie_delete FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id and c.email = cc.email WHERE c.id IS NULL;`, + parts[0]) + assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text END,__artie_delete=cc.__artie_delete FROM public.tableName__temp as cc WHERE c.id = cc.id and c.email = cc.email;`, + parts[1]) + + m.IdempotentKey = "created_at" + parts, err = MergeStatementParts(m) + // Parts[0] for insertion should be identical + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text,__artie_delete) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text,cc.__artie_delete FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id and c.email = cc.email WHERE c.id IS NULL;`, + parts[0]) + // Parts[1] where we're doing UPDATES will have idempotency key. + assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text END,__artie_delete=cc.__artie_delete FROM public.tableName__temp as cc WHERE c.id = cc.id and c.email = cc.email AND cc.created_at >= c.created_at;`, + parts[1]) +} + +func TestMergeStatementParts(t *testing.T) { + // Biggest differences with this test are: + // 1. We are not saving the `__artie_delete` column + // 2.
There are 3 SQL queries (INSERT, UPDATE and DELETE) + fqTableName := "public.tableName" + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(false) + m := MergeArgument{ + FqTableName: fqTableName, + SubQuery: tempTableName, + PrimaryKeys: res.PrimaryKeys, + Columns: res.Columns, + ColumnsToTypes: res.ColumnsToTypes, + Redshift: true, + } + + parts, err := MergeStatementParts(m) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id WHERE c.id IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text END FROM public.tableName__temp as cc WHERE c.id = cc.id AND COALESCE(cc.__artie_delete, false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE (id) IN (SELECT cc.id FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`, + parts[2]) + + m = MergeArgument{ + FqTableName: fqTableName, + SubQuery: tempTableName, + PrimaryKeys: res.PrimaryKeys, + Columns: res.Columns, + ColumnsToTypes: res.ColumnsToTypes, + Redshift: true, + IdempotentKey: "created_at", + } + + parts, err = MergeStatementParts(m) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id WHERE c.id IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text END FROM public.tableName__temp as cc WHERE c.id = cc.id AND cc.created_at >= c.created_at AND COALESCE(cc.__artie_delete, false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE (id) IN (SELECT cc.id FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`, + parts[2]) +} + +func TestMergeStatementPartsCompositeKey(t *testing.T) { + fqTableName := "public.tableName" + tempTableName := "public.tableName__temp" + res := getBasicColumnsForTest(true) + m := MergeArgument{ + FqTableName: fqTableName, + SubQuery: tempTableName, + PrimaryKeys: res.PrimaryKeys, + Columns: res.Columns, + ColumnsToTypes: res.ColumnsToTypes, + Redshift: true, + } + + parts, err := MergeStatementParts(m) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id and c.email = cc.email WHERE c.id IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text 
END FROM public.tableName__temp as cc WHERE c.id = cc.id and c.email = cc.email AND COALESCE(cc.__artie_delete, false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE (id,email) IN (SELECT cc.id,cc.email FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`, + parts[2]) + + m = MergeArgument{ + FqTableName: fqTableName, + SubQuery: tempTableName, + PrimaryKeys: res.PrimaryKeys, + Columns: res.Columns, + ColumnsToTypes: res.ColumnsToTypes, + Redshift: true, + IdempotentKey: "created_at", + } + + parts, err = MergeStatementParts(m) + assert.NoError(t, err) + assert.Equal(t, 3, len(parts)) + + assert.Equal(t, + `INSERT INTO public.tableName (id,email,first_name,last_name,created_at,toast_text) SELECT cc.id,cc.email,cc.first_name,cc.last_name,cc.created_at,cc.toast_text FROM public.tableName__temp as cc LEFT JOIN public.tableName as c on c.id = cc.id and c.email = cc.email WHERE c.id IS NULL;`, + parts[0]) + + assert.Equal(t, + `UPDATE public.tableName as c SET id=cc.id,email=cc.email,first_name=cc.first_name,last_name=cc.last_name,created_at=cc.created_at,toast_text= CASE WHEN cc.toast_text != '__debezium_unavailable_value' THEN cc.toast_text ELSE c.toast_text END FROM public.tableName__temp as cc WHERE c.id = cc.id and c.email = cc.email AND cc.created_at >= c.created_at AND COALESCE(cc.__artie_delete, false) = false;`, + parts[1]) + + assert.Equal(t, + `DELETE FROM public.tableName WHERE (id,email) IN (SELECT cc.id,cc.email FROM public.tableName__temp as cc WHERE cc.__artie_delete = true);`, + parts[2]) +} diff --git a/lib/dwh/utils/load.go b/lib/dwh/utils/load.go index f96f177e1..291ec0c67 100644 --- a/lib/dwh/utils/load.go +++ b/lib/dwh/utils/load.go @@ -3,6 +3,8 @@ package utils import ( "context" + "github.com/artie-labs/transfer/clients/redshift" + "github.com/artie-labs/transfer/clients/bigquery" "github.com/artie-labs/transfer/clients/snowflake" "github.com/artie-labs/transfer/lib/config" @@ -35,10 +37,15 @@ func DataWarehouse(ctx context.Context, store *db.Store) dwh.DataWarehouse { if err := s.Sweep(ctx); err != nil { logger.FromContext(ctx).WithError(err).Fatalf("failed to clean up snowflake") } - return s case constants.BigQuery: return bigquery.LoadBigQuery(ctx, store) + case constants.Redshift: + s := redshift.LoadRedshift(ctx, store) + if err := s.Sweep(ctx); err != nil { + logger.FromContext(ctx).WithError(err).Fatalf("failed to clean up redshift") + } + return s } logger.FromContext(ctx).WithFields(map[string]interface{}{ diff --git a/lib/mocks/db.store.mock.go b/lib/mocks/db.store.mock.go index 562c8df8e..3ae721b0d 100644 --- a/lib/mocks/db.store.mock.go +++ b/lib/mocks/db.store.mock.go @@ -9,6 +9,18 @@ import ( ) type FakeStore struct { + BeginStub func() (*sql.Tx, error) + beginMutex sync.RWMutex + beginArgsForCall []struct { + } + beginReturns struct { + result1 *sql.Tx + result2 error + } + beginReturnsOnCall map[int]struct { + result1 *sql.Tx + result2 error + } ExecStub func(string, ...any) (sql.Result, error) execMutex sync.RWMutex execArgsForCall []struct { @@ -41,6 +53,62 @@ type FakeStore struct { invocationsMutex sync.RWMutex } +func (fake *FakeStore) Begin() (*sql.Tx, error) { + fake.beginMutex.Lock() + ret, specificReturn := fake.beginReturnsOnCall[len(fake.beginArgsForCall)] + fake.beginArgsForCall = append(fake.beginArgsForCall, struct { + }{}) + stub := fake.BeginStub + fakeReturns := fake.beginReturns + fake.recordInvocation("Begin", []interface{}{}) + fake.beginMutex.Unlock() + if stub != nil { + return stub() 
+ } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeStore) BeginCallCount() int { + fake.beginMutex.RLock() + defer fake.beginMutex.RUnlock() + return len(fake.beginArgsForCall) +} + +func (fake *FakeStore) BeginCalls(stub func() (*sql.Tx, error)) { + fake.beginMutex.Lock() + defer fake.beginMutex.Unlock() + fake.BeginStub = stub +} + +func (fake *FakeStore) BeginReturns(result1 *sql.Tx, result2 error) { + fake.beginMutex.Lock() + defer fake.beginMutex.Unlock() + fake.BeginStub = nil + fake.beginReturns = struct { + result1 *sql.Tx + result2 error + }{result1, result2} +} + +func (fake *FakeStore) BeginReturnsOnCall(i int, result1 *sql.Tx, result2 error) { + fake.beginMutex.Lock() + defer fake.beginMutex.Unlock() + fake.BeginStub = nil + if fake.beginReturnsOnCall == nil { + fake.beginReturnsOnCall = make(map[int]struct { + result1 *sql.Tx + result2 error + }) + } + fake.beginReturnsOnCall[i] = struct { + result1 *sql.Tx + result2 error + }{result1, result2} +} + func (fake *FakeStore) Exec(arg1 string, arg2 ...any) (sql.Result, error) { fake.execMutex.Lock() ret, specificReturn := fake.execReturnsOnCall[len(fake.execArgsForCall)] @@ -174,6 +242,8 @@ func (fake *FakeStore) QueryReturnsOnCall(i int, result1 *sql.Rows, result2 erro func (fake *FakeStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.beginMutex.RLock() + defer fake.beginMutex.RUnlock() fake.execMutex.RLock() defer fake.execMutex.RUnlock() fake.queryMutex.RLock() diff --git a/lib/optimization/event.go b/lib/optimization/event.go index c63bd18e0..dfc0e8897 100644 --- a/lib/optimization/event.go +++ b/lib/optimization/event.go @@ -132,6 +132,10 @@ func (t *TableData) RowsData() map[string]map[string]interface{} { func (t *TableData) ToFqName(ctx context.Context, kind constants.DestinationKind) string { switch kind { + case constants.Redshift: + // Redshift is Postgres compatible, so when establishing a connection, we'll specify a database. + // Thus, we only need to specify schema and table name here. + return fmt.Sprintf("%s.%s", t.TopicConfig.Schema, t.Name()) case constants.BigQuery: // The fully qualified name for BigQuery is: project_id.dataset.tableName. 
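+ // e.g. artie.db.food in event_test.go below, versus public.food for the Redshift branch above.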
return fmt.Sprintf("%s.%s.%s", config.FromContext(ctx).Config.BigQuery.ProjectID, t.TopicConfig.Database, t.Name()) diff --git a/lib/optimization/event_test.go b/lib/optimization/event_test.go index 38136161c..42206cd55 100644 --- a/lib/optimization/event_test.go +++ b/lib/optimization/event_test.go @@ -19,29 +19,41 @@ import ( func TestNewTableData_TableName(t *testing.T) { type _testCase struct { - name string - tableName string - overrideName string + name string + tableName string + overrideName string + schema string + db string + expectedName string expectedSnowflakeFqName string expectedBigQueryFqName string + expectedRedshiftFqName string } testCases := []_testCase{ { - name: "no override is provided", - tableName: "food", + name: "no override is provided", + tableName: "food", + schema: "public", + db: "db", + expectedName: "food", - expectedSnowflakeFqName: "..food", - expectedBigQueryFqName: "artie..food", + expectedSnowflakeFqName: "db.public.food", + expectedBigQueryFqName: "artie.db.food", + expectedRedshiftFqName: "public.food", }, { - name: "override is provided", - tableName: "food", - overrideName: "drinks", + name: "override is provided", + tableName: "food", + schema: "public", + overrideName: "drinks", + db: "db", + expectedName: "drinks", - expectedSnowflakeFqName: "..drinks", // db, schema - expectedBigQueryFqName: "artie..drinks", // data set only + expectedSnowflakeFqName: "db.public.drinks", + expectedBigQueryFqName: "artie.db.drinks", + expectedRedshiftFqName: "public.drinks", }, } @@ -54,12 +66,17 @@ func TestNewTableData_TableName(t *testing.T) { }) for _, testCase := range testCases { - td := NewTableData(nil, nil, kafkalib.TopicConfig{TableName: testCase.overrideName}, testCase.tableName) + td := NewTableData(nil, nil, kafkalib.TopicConfig{ + Database: testCase.db, + TableName: testCase.overrideName, + Schema: testCase.schema, + }, testCase.tableName) assert.Equal(t, testCase.expectedName, td.Name(), testCase.name) assert.Equal(t, testCase.expectedName, td.name, testCase.name) assert.Equal(t, testCase.expectedSnowflakeFqName, td.ToFqName(ctx, constants.SnowflakeStages)) assert.Equal(t, testCase.expectedSnowflakeFqName, td.ToFqName(ctx, constants.Snowflake)) assert.Equal(t, testCase.expectedBigQueryFqName, td.ToFqName(ctx, constants.BigQuery)) + assert.Equal(t, testCase.expectedRedshiftFqName, td.ToFqName(ctx, constants.Redshift)) } } diff --git a/lib/s3/s3.go b/lib/s3/s3.go new file mode 100644 index 000000000..ed745ad4b --- /dev/null +++ b/lib/s3/s3.go @@ -0,0 +1,55 @@ +package s3 + +import ( + "context" + "fmt" + "os" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +type UploadArgs struct { + Bucket string + OptionalS3Prefix string + FilePath string +} + +// UploadLocalFileToS3 - takes a local file path, the destination bucket and an optional S3 prefix +// It will then upload the file and return the S3 URI and any error(s).
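+// e.g. (illustrative values) Bucket "artie-transfer", OptionalS3Prefix "staging" and FilePath "/tmp/orders.csv" +// would upload under the object key staging/orders.csv and return s3://artie-transfer/staging/orders.csv.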
+func UploadLocalFileToS3(ctx context.Context, args UploadArgs) (string, error) { + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return "", err + } + + s3Client := s3.NewFromConfig(cfg) + file, err := os.Open(args.FilePath) + if err != nil { + return "", err + } + + defer file.Close() + fileInfo, err := file.Stat() + if err != nil { + return "", err + } + + objectKey := fileInfo.Name() + if args.OptionalS3Prefix != "" { + objectKey = fmt.Sprintf("%s/%s", args.OptionalS3Prefix, objectKey) + } + + _, err = s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(args.Bucket), + Key: aws.String(objectKey), + Body: file, + }) + + if err != nil { + return "", err + } + + return fmt.Sprintf("s3://%s/%s", args.Bucket, objectKey), nil +} diff --git a/lib/typing/columns/default.go b/lib/typing/columns/default.go index 78c1e02e2..fb0c34efb 100644 --- a/lib/typing/columns/default.go +++ b/lib/typing/columns/default.go @@ -3,6 +3,8 @@ package columns import ( "fmt" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/typing/ext" "github.com/artie-labs/transfer/lib/stringutil" @@ -12,7 +14,7 @@ import ( type DefaultValueArgs struct { Escape bool - BigQuery bool + DestKind constants.DestinationKind } func (c *Column) DefaultValue(args *DefaultValueArgs) (interface{}, error) { @@ -27,8 +29,11 @@ func (c *Column) DefaultValue(args *DefaultValueArgs) (interface{}, error) { switch c.KindDetails.Kind { case typing.Struct.Kind, typing.Array.Kind: - if args.BigQuery { + switch args.DestKind { + case constants.BigQuery: return "JSON" + stringutil.Wrap(c.defaultValue, false), nil + case constants.Redshift: + return stringutil.Wrap(c.defaultValue, false), nil } case typing.ETime.Kind: extTime, err := ext.ParseFromInterface(c.defaultValue) diff --git a/lib/typing/columns/default_test.go b/lib/typing/columns/default_test.go index 3202c6c6b..00ecaa118 100644 --- a/lib/typing/columns/default_test.go +++ b/lib/typing/columns/default_test.go @@ -3,6 +3,8 @@ package columns import ( "testing" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" @@ -65,10 +67,22 @@ func TestColumn_DefaultValue(t *testing.T) { }, args: &DefaultValueArgs{ Escape: true, - BigQuery: true, + DestKind: constants.BigQuery, }, expectedValue: "JSON'{}'", }, + { + name: "json (redshift)", + col: &Column{ + KindDetails: typing.Struct, + defaultValue: "{}", + }, + args: &DefaultValueArgs{ + Escape: true, + DestKind: constants.Redshift, + }, + expectedValue: "'{}'", + }, } for _, testCase := range testCases { diff --git a/lib/typing/decimal/decimal.go b/lib/typing/decimal/decimal.go index b9665c116..c2e8f2be3 100644 --- a/lib/typing/decimal/decimal.go +++ b/lib/typing/decimal/decimal.go @@ -67,6 +67,7 @@ func (d *Decimal) Value() interface{} { return d.value } +// SnowflakeKind - is used to determine whether a NUMERIC data type should be a STRING or NUMERIC(p, s). func (d *Decimal) SnowflakeKind() string { precision := MaxPrecisionBeforeString if d.precision != nil { @@ -80,6 +81,21 @@ func (d *Decimal) SnowflakeKind() string { return fmt.Sprintf("NUMERIC(%v, %v)", precision, d.scale) } +// RedshiftKind - is used to determine whether a NUMERIC data type should be a STRING or NUMERIC(p, s). +// This has the same max precision of 38 digits like Snowflake. 
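+// e.g. a NUMERIC(38, 2) stays NUMERIC(38, 2), while NUMERIC(39, 5) or an unknown precision (-1) falls back to TEXT.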
+func (d *Decimal) RedshiftKind() string { + precision := MaxPrecisionBeforeString + if d.precision != nil { + precision = *d.precision + } + + if precision > MaxPrecisionBeforeString || precision == -1 { + return "TEXT" + } + + return fmt.Sprintf("NUMERIC(%v, %v)", precision, d.scale) +} + func (d *Decimal) BigQueryKind() string { precision := MaxPrecisionBeforeStringBigQuery if d.precision != nil { diff --git a/lib/typing/decimal/decimal_test.go b/lib/typing/decimal/decimal_test.go new file mode 100644 index 000000000..1e819c019 --- /dev/null +++ b/lib/typing/decimal/decimal_test.go @@ -0,0 +1,62 @@ +package decimal + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/ptr" +) + +func TestDecimalKind(t *testing.T) { + type _testCase struct { + Name string + Precision int + Scale int + + ExpectedSnowflakeKind string + ExpectedRedshiftKind string + ExpectedBigQueryKind string + } + + testCases := []_testCase{ + { + Name: "-1 precision", + Precision: -1, + ExpectedSnowflakeKind: "STRING", + ExpectedRedshiftKind: "TEXT", + ExpectedBigQueryKind: "STRING", + }, + { + Name: "numeric(39, 5)", + Precision: 39, + Scale: 5, + ExpectedSnowflakeKind: "STRING", + ExpectedRedshiftKind: "TEXT", + ExpectedBigQueryKind: "STRING", + }, + { + Name: "numeric(38, 2)", + Precision: 38, + Scale: 2, + ExpectedSnowflakeKind: "NUMERIC(38, 2)", + ExpectedRedshiftKind: "NUMERIC(38, 2)", + ExpectedBigQueryKind: "STRING", + }, + { + Name: "numeric(31, 2)", + Precision: 31, + Scale: 2, + ExpectedSnowflakeKind: "NUMERIC(31, 2)", + ExpectedRedshiftKind: "NUMERIC(31, 2)", + ExpectedBigQueryKind: "NUMERIC(31, 2)", + }, + } + + for _, testCase := range testCases { + d := NewDecimal(testCase.Scale, ptr.ToInt(testCase.Precision), nil) + assert.Equal(t, testCase.ExpectedSnowflakeKind, d.SnowflakeKind(), testCase.Name) + assert.Equal(t, testCase.ExpectedRedshiftKind, d.RedshiftKind(), testCase.Name) + assert.Equal(t, testCase.ExpectedBigQueryKind, d.BigQueryKind(), testCase.Name) + } +} diff --git a/lib/typing/redshift.go b/lib/typing/redshift.go new file mode 100644 index 000000000..e92ff2481 --- /dev/null +++ b/lib/typing/redshift.go @@ -0,0 +1,59 @@ +package typing + +import ( + "strings" + + "github.com/artie-labs/transfer/lib/typing/ext" +) + +func RedshiftTypeToKind(rawType string) KindDetails { + switch strings.ToLower(rawType) { + case "integer", "bigint": + return Integer + case "character varying": + return String + case "double precision": + return Float + case "timestamp with time zone", "timestamp without time zone": + return NewKindDetailsFromTemplate(ETime, ext.DateTimeKindType) + case "time without time zone": + return NewKindDetailsFromTemplate(ETime, ext.TimeKindType) + case "date": + return NewKindDetailsFromTemplate(ETime, ext.DateKindType) + case "boolean": + return Boolean + case "numeric": + return EDecimal + } + + return Invalid +} + +func kindToRedShift(kd KindDetails) string { + switch kd.Kind { + case Integer.Kind: + // int4 is 2^31, whereas int8 is 2^63. + // we're using a larger data type to not have an integer overflow. + return "INT8" + case String.Kind, Struct.Kind, Array.Kind: + // Redshift does not have a built-in JSON type (which means we'll cast STRUCT and ARRAY kinds as TEXT). + // As a result, Artie will store this in JSON string and customers will need to extract this data out via SQL. + return "text" + case Boolean.Kind: + // We need to append `NULL` to let Redshift know that NULL is an acceptable data type. 
+ return "BOOLEAN NULL" + case ETime.Kind: + switch kd.ExtendedTimeDetails.Type { + case ext.DateTimeKindType: + return "timestamp with time zone" + case ext.DateKindType: + return "date" + case ext.TimeKindType: + return "time" + } + case EDecimal.Kind: + return kd.ExtendedDecimalDetails.RedshiftKind() + } + + return kd.Kind +} diff --git a/lib/typing/redshift_test.go b/lib/typing/redshift_test.go new file mode 100644 index 000000000..e25e777fc --- /dev/null +++ b/lib/typing/redshift_test.go @@ -0,0 +1,55 @@ +package typing + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRedshiftTypeToKind(t *testing.T) { + type _testCase struct { + name string + rawTypes []string + expectedKd KindDetails + } + + testCases := []_testCase{ + { + name: "Integer", + rawTypes: []string{"integer", "bigint", "INTEGER"}, + expectedKd: Integer, + }, + { + name: "String", + rawTypes: []string{"character varying"}, + expectedKd: String, + }, + { + name: "Double Precision", + rawTypes: []string{"double precision", "DOUBLE precision"}, + expectedKd: Float, + }, + { + name: "Time", + rawTypes: []string{"timestamp with time zone", "timestamp without time zone", "time without time zone", "date"}, + expectedKd: ETime, + }, + { + name: "Boolean", + rawTypes: []string{"boolean"}, + expectedKd: Boolean, + }, + { + name: "Numeric", + rawTypes: []string{"numeric"}, + expectedKd: EDecimal, + }, + } + + for _, testCase := range testCases { + for _, rawType := range testCase.rawTypes { + kd := RedshiftTypeToKind(rawType) + assert.Equal(t, testCase.expectedKd.Kind, kd.Kind, testCase.name) + } + } +} diff --git a/lib/typing/typing.go b/lib/typing/typing.go index db17a9a6f..ae6d7ae14 100644 --- a/lib/typing/typing.go +++ b/lib/typing/typing.go @@ -172,6 +172,8 @@ func KindToDWHType(kd KindDetails, dwh constants.DestinationKind) string { return kindToSnowflake(kd) case constants.BigQuery: return kindToBigQuery(kd) + case constants.Redshift: + return kindToRedShift(kd) } return ""