From 2700fda10f9a6796f37e36b53c1a6e039e0a7156 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Wed, 21 Aug 2024 19:11:28 +0300 Subject: [PATCH] write source_paths_to_exclude to db --- internal/connectors/db/process_result_set.go | 40 +++++++++++-------- internal/connectors/db/yql/queries/write.go | 6 +++ .../connectors/db/yql/queries/write_test.go | 15 +++++-- .../db/yql/schema/create_tables.yql | 1 + 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index cd820d16..7b2fe8be 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -49,6 +49,8 @@ func auditFromDb(initiated *string, createdAt *time.Time, completedAt *time.Time } } +//TODO: unit test this + func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) { var ( backupId string @@ -110,15 +112,15 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { databaseName string databaseEndpoint string - backupId *string - ydbOperationId *string - operationStateBuf *string - message *string - paths *string - - creator *string - createdAt *time.Time - completedAt *time.Time + backupId *string + ydbOperationId *string + operationStateBuf *string + message *string + sourcePaths *string + sourcePathsToExclude *string + creator *string + createdAt *time.Time + completedAt *time.Time ) err := res.ScanNamed( named.Required("id", &operationId), @@ -131,7 +133,8 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { named.Optional("operation_id", &ydbOperationId), named.Optional("status", &operationStateBuf), named.Optional("message", &message), - named.Optional("paths", &paths), + named.Optional("paths", &sourcePaths), + named.Optional("paths_to_exclude", &sourcePathsToExclude), named.Optional("created_at", &createdAt), named.Optional("completed_at", &completedAt), @@ -144,9 +147,13 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { if operationStateBuf != nil { operationState = types.OperationState(*operationStateBuf) } - sourcePaths := make([]string, 0) - if paths != nil { - sourcePaths = strings.Split(*paths, ",") + sourcePathsSlice := make([]string, 0) + sourcePathsToExcludeSlice := make([]string, 0) + if sourcePaths != nil { + sourcePathsSlice = strings.Split(*sourcePaths, ",") + } + if sourcePathsToExclude != nil { + sourcePathsToExcludeSlice = strings.Split(*sourcePathsToExclude, ",") } if operationType == string(types.OperationTypeTB) { if backupId == nil { @@ -162,9 +169,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { Endpoint: databaseEndpoint, DatabaseName: databaseName, }, - SourcePaths: sourcePaths, - YdbOperationId: StringOrEmpty(ydbOperationId), - Audit: auditFromDb(creator, createdAt, completedAt), + SourcePaths: sourcePathsSlice, + SourcePathToExclude: sourcePathsToExcludeSlice, + YdbOperationId: StringOrEmpty(ydbOperationId), + Audit: auditFromDb(creator, createdAt, completedAt), }, nil } else if operationType == string(types.OperationTypeRB) { if backupId == nil { diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 7a8630ef..43ac901a 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -102,6 +102,12 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i if len(tb.SourcePaths) > 0 { d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tb.SourcePaths, ","))) } + if len(tb.SourcePathToExclude) > 0 { + d.AddValueParam( + "$paths_to_exclude", + table_types.StringValueFromString(strings.Join(tb.SourcePathToExclude, ",")), + ) + } } else { //TODO: support RestoreBackup operation xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType()))) diff --git a/internal/connectors/db/yql/queries/write_test.go b/internal/connectors/db/yql/queries/write_test.go index b9b99bc4..0c86817d 100644 --- a/internal/connectors/db/yql/queries/write_test.go +++ b/internal/connectors/db/yql/queries/write_test.go @@ -2,6 +2,7 @@ package queries import ( "context" + "strings" "testing" "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -149,7 +150,7 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d func TestQueryBuilder_UpdateCreate(t *testing.T) { const ( queryString = `UPDATE Backups SET status = $status_0 WHERE id = $id_0; -UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, message) VALUES ($id_1, $type_1, $status_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $message_1)` +UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, message, paths, paths_to_exclude) VALUES ($id_1, $type_1, $status_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $message_1, $paths_1, $paths_to_exclude_1)` ) ctx := context.Background() opId := types.GenerateObjectID() @@ -165,8 +166,8 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d DatabaseName: "dbname", }, YdbOperationId: "1234", - SourcePaths: nil, - SourcePathToExclude: nil, + SourcePaths: []string{"path"}, + SourcePathToExclude: []string{"exclude1", "exclude2"}, Audit: &pb.AuditInfo{ CreatedAt: timestamppb.Now(), }, @@ -218,6 +219,14 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d "$message_1", table_types.StringValueFromString(tbOp.Message), ), + table.ValueParam( + "$paths_1", + table_types.StringValueFromString(strings.Join(tbOp.SourcePaths, ",")), + ), + table.ValueParam( + "$paths_to_exclude_1", + table_types.StringValueFromString(strings.Join(tbOp.SourcePathToExclude, ",")), + ), ) ) query, err := builder.FormatQuery(context.Background()) diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index 31c90cf4..de3b45ab 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -50,6 +50,7 @@ CREATE TABLE Operations ( message String, paths String, + paths_to_exclude String, operation_id String, INDEX idx_cc GLOBAL ON (container_id, created_at, id),