From 36e82ce683f47b4a3e3396a67a8c33107e3b2034 Mon Sep 17 00:00:00 2001 From: ulya-sidorina Date: Tue, 3 Sep 2024 09:55:41 +0200 Subject: [PATCH] feat(ydbcp): implement DeleteBackup --- cmd/ydbcp/main.go | 8 + internal/connectors/db/process_result_set.go | 23 ++ internal/connectors/db/yql/queries/write.go | 30 ++- .../connectors/db/yql/queries/write_test.go | 14 +- internal/connectors/s3/connector.go | 33 +++ internal/connectors/s3/mock.go | 12 +- internal/handlers/delete_backup.go | 143 ++++++++++++ internal/handlers/delete_backup_test.go | 220 ++++++++++++++++++ .../server/services/backup/backupservice.go | 85 ++++++- internal/types/backup.go | 69 ++++++ pkg/proto/ydbcp/v1alpha1/backup.pb.go | 54 +++-- pkg/proto/ydbcp/v1alpha1/backup.proto | 1 + 12 files changed, 650 insertions(+), 42 deletions(-) create mode 100644 internal/handlers/delete_backup.go create mode 100644 internal/handlers/delete_backup_test.go diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 5ec131db..320b8852 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -133,6 +133,14 @@ func main() { os.Exit(1) } + if err := handlersRegistry.Add( + types.OperationTypeDB, + handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery(ctx)), + ); err != nil { + xlog.Error(ctx, "failed to register DB handler", zap.Error(err)) + os.Exit(1) + } + processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry) wg.Add(1) diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index 9d1333b4..e172cc5f 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -202,6 +202,29 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { SourcePaths: sourcePathsSlice, Audit: auditFromDb(creator, createdAt, completedAt), }, nil + } else if operationType == string(types.OperationTypeDB) { + if backupId == nil { + return nil, fmt.Errorf("failed to read backup_id for DB operation: %s", operationId) + } + + var pathPrefix string + if len(sourcePathsSlice) > 0 { + pathPrefix = sourcePathsSlice[0] + } + + return &types.DeleteBackupOperation{ + ID: operationId, + BackupID: *backupId, + ContainerID: containerId, + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: databaseEndpoint, + DatabaseName: databaseName, + }, + State: operationState, + Message: StringOrEmpty(message), + Audit: auditFromDb(creator, createdAt, completedAt), + PathPrefix: pathPrefix, + }, nil } return &types.GenericOperation{ID: operationId}, nil diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 2a606458..05e8350e 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -61,6 +61,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i d.AddValueParam("$id", table_types.StringValueFromString(operation.GetID())) d.AddValueParam("$type", table_types.StringValueFromString(string(operation.GetType()))) d.AddValueParam("$status", table_types.StringValueFromString(operation.GetState().String())) + d.AddValueParam("$message", table_types.StringValueFromString(operation.GetMessage())) if operation.GetAudit() != nil { d.AddValueParam( "$initiated", @@ -103,7 +104,6 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i "$operation_id", table_types.StringValueFromString(tb.YdbOperationId), ) - d.AddValueParam("$message", table_types.StringValueFromString(tb.Message)) if len(tb.SourcePaths) > 0 { d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tb.SourcePaths, ","))) } @@ -138,11 +138,37 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i "$operation_id", table_types.StringValueFromString(rb.YdbOperationId), ) - d.AddValueParam("$message", table_types.StringValueFromString(rb.Message)) if len(rb.SourcePaths) > 0 { d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(rb.SourcePaths, ","))) } + } else if operation.GetType() == types.OperationTypeDB { + db, ok := operation.(*types.DeleteBackupOperation) + if !ok { + xlog.Error(ctx, "error cast operation to DeleteBackupOperation", zap.String("operation_id", operation.GetID())) + } + + d.AddValueParam( + "$container_id", table_types.StringValueFromString(db.ContainerID), + ) + + d.AddValueParam( + "$database", + table_types.StringValueFromString(db.YdbConnectionParams.DatabaseName), + ) + + d.AddValueParam( + "$endpoint", + table_types.StringValueFromString(db.YdbConnectionParams.Endpoint), + ) + + d.AddValueParam( + "$backup_id", + table_types.StringValueFromString(db.BackupID), + ) + + d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix)) + } else { xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType()))) } diff --git a/internal/connectors/db/yql/queries/write_test.go b/internal/connectors/db/yql/queries/write_test.go index eb97da5a..915266fc 100644 --- a/internal/connectors/db/yql/queries/write_test.go +++ b/internal/connectors/db/yql/queries/write_test.go @@ -54,7 +54,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1` func TestQueryBuilder_CreateCreate(t *testing.T) { const ( queryString = `UPSERT INTO Backups (id, container_id, database, endpoint, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message, size, initiated, created_at) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0, $size_0, $initiated_0, $created_at_0); -UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, message) VALUES ($id_1, $type_1, $status_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $message_1)` +UPSERT INTO Operations (id, type, status, message, initiated, created_at, container_id, database, endpoint, backup_id, operation_id) VALUES ($id_1, $type_1, $status_1, $message_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1)` ) opId := types.GenerateObjectID() backupId := types.GenerateObjectID() @@ -119,6 +119,7 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d table.ValueParam( "$status_1", table_types.StringValueFromString(string(tbOp.State)), ), + table.ValueParam("$message_1", table_types.StringValueFromString("msg op")), table.ValueParam( "$initiated_1", table_types.StringValueFromString("author"), @@ -146,7 +147,6 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d "$operation_id_1", table_types.StringValueFromString(tbOp.YdbOperationId), ), - table.ValueParam("$message_1", table_types.StringValueFromString("msg op")), ) ) query, err := builder.FormatQuery(context.Background()) @@ -161,7 +161,7 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d func TestQueryBuilder_UpdateCreate(t *testing.T) { const ( queryString = `UPDATE Backups SET status = $status_0 WHERE id = $id_0; -UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, message, paths, paths_to_exclude) VALUES ($id_1, $type_1, $status_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $message_1, $paths_1, $paths_to_exclude_1)` +UPSERT INTO Operations (id, type, status, message, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, paths, paths_to_exclude) VALUES ($id_1, $type_1, $status_1, $message_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $paths_1, $paths_to_exclude_1)` ) ctx := context.Background() opId := types.GenerateObjectID() @@ -199,6 +199,10 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d table.ValueParam( "$status_1", table_types.StringValueFromString(string(tbOp.State)), ), + table.ValueParam( + "$message_1", + table_types.StringValueFromString(tbOp.Message), + ), table.ValueParam( "$initiated_1", table_types.StringValueFromString(""), @@ -226,10 +230,6 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d "$operation_id_1", table_types.StringValueFromString(tbOp.YdbOperationId), ), - table.ValueParam( - "$message_1", - table_types.StringValueFromString(tbOp.Message), - ), table.ValueParam( "$paths_1", table_types.StringValueFromString(strings.Join(tbOp.SourcePaths, ",")), diff --git a/internal/connectors/s3/connector.go b/internal/connectors/s3/connector.go index 86ce0470..bae93caa 100644 --- a/internal/connectors/s3/connector.go +++ b/internal/connectors/s3/connector.go @@ -13,6 +13,7 @@ import ( type S3Connector interface { ListObjects(pathPrefix string, bucket string) ([]string, error) GetSize(pathPrefix string, bucket string) (int64, error) + DeleteObjects(keys []string, bucket string) error } type ClientS3Connector struct { @@ -104,3 +105,35 @@ func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, er return size, nil } + +func (c *ClientS3Connector) DeleteObjects(keys []string, bucket string) error { + if len(keys) == 0 { + return nil + } + + objectsPtr := make([]*s3.ObjectIdentifier, len(keys)) + for i, object := range keys { + objectsPtr[i] = &s3.ObjectIdentifier{ + Key: aws.String(object), + } + } + + input := s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{ + Objects: objectsPtr, + Quiet: aws.Bool(true), + }, + } + + delOut, err := c.s3.DeleteObjects(&input) + if err != nil { + return err + } + + if len(delOut.Errors) > 0 { + return fmt.Errorf("can't delete objects: %v", delOut.Errors) + } + + return nil +} diff --git a/internal/connectors/s3/mock.go b/internal/connectors/s3/mock.go index 1dd788d6..e5c9cb95 100644 --- a/internal/connectors/s3/mock.go +++ b/internal/connectors/s3/mock.go @@ -30,7 +30,7 @@ func (m *MockS3Connector) ListObjects(pathPrefix string, _ string) ([]string, er return nil, fmt.Errorf("objects not found, path: %s", pathPrefix) } -func (m *MockS3Connector) GetSize(pathPrefix string, bucket string) (int64, error) { +func (m *MockS3Connector) GetSize(pathPrefix string, _ string) (int64, error) { if content, ok := m.storage[pathPrefix]; ok { var size int64 for _, object := range content { @@ -44,3 +44,13 @@ func (m *MockS3Connector) GetSize(pathPrefix string, bucket string) (int64, erro return 0, fmt.Errorf("objects not found, path: %s", pathPrefix) } + +func (m *MockS3Connector) DeleteObjects(objects []string, _ string) error { + for _, object := range objects { + if _, ok := m.storage[object]; ok { + delete(m.storage, object) + } + } + + return nil +} diff --git a/internal/handlers/delete_backup.go b/internal/handlers/delete_backup.go new file mode 100644 index 00000000..5f9b4f82 --- /dev/null +++ b/internal/handlers/delete_backup.go @@ -0,0 +1,143 @@ +package handlers + +import ( + "context" + "fmt" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + "ydbcp/internal/config" + "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/connectors/s3" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" +) + +func NewDBOperationHandler( + db db.DBConnector, + s3 s3.S3Connector, + config config.Config, + queryBuilder queries.WriteTableQuery, +) types.OperationHandler { + return func(ctx context.Context, op types.Operation) error { + return DBOperationHandler(ctx, op, db, s3, config, queryBuilder) + } +} + +func DBOperationHandler( + ctx context.Context, + operation types.Operation, + db db.DBConnector, + s3 s3.S3Connector, + config config.Config, + queryBuilder queries.WriteTableQuery, +) error { + xlog.Info( + ctx, "received operation", + zap.String("id", operation.GetID()), + zap.String("type", string(operation.GetType())), + zap.String("state", string(operation.GetState())), + zap.String("message", operation.GetMessage()), + ) + + if operation.GetType() != types.OperationTypeDB { + return fmt.Errorf( + "wrong type %s != %s for operation %s", + operation.GetType(), types.OperationTypeDB, types.OperationToString(operation), + ) + } + + dbOp, ok := operation.(*types.DeleteBackupOperation) + if !ok { + return fmt.Errorf("can't cast operation to DeleteBackupOperation %s", types.OperationToString(operation)) + } + + backupToWrite := types.Backup{ + ID: dbOp.BackupID, + Status: types.BackupStateUnknown, + } + + if deadlineExceeded(dbOp.Audit.CreatedAt, config) { + backupToWrite.Status = types.BackupStateError + operation.SetState(types.OperationStateError) + operation.SetMessage("Operation deadline exceeded") + operation.GetAudit().CompletedAt = timestamppb.Now() + return db.ExecuteUpsert( + ctx, queryBuilder.WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ) + } + + backups, err := db.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(dbOp.BackupID)}, + }, + ), + ), + ) + + if err != nil { + return fmt.Errorf("can't select backups: %v", err) + } + + if len(backups) == 0 { + return fmt.Errorf("backup not found") + } + + deleteBackup := func(pathPrefix string, bucket string) error { + objects, err := s3.ListObjects(pathPrefix, bucket) + if err != nil { + return fmt.Errorf("failed to list S3 objects: %v", err) + } + + if len(objects) != 0 { + err = s3.DeleteObjects(objects, bucket) + if err != nil { + return fmt.Errorf("failed to delete S3 objects: %v", err) + } + } + + backupToWrite.Status = types.BackupStateDeleted + operation.SetState(types.OperationStateDone) + operation.SetMessage("Success") + operation.GetAudit().CompletedAt = timestamppb.Now() + return nil + } + + switch dbOp.State { + case types.OperationStatePending: + { + backupToWrite.Status = types.BackupStateDeleting + operation.SetState(types.OperationStateRunning) + err := db.ExecuteUpsert( + ctx, queryBuilder.WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ) + if err != nil { + return fmt.Errorf("can't update operation: %v", err) + } + + err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket) + if err != nil { + return err + } + } + case types.OperationStateRunning: + { + err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket) + if err != nil { + return err + } + } + default: + return fmt.Errorf("unexpected operation state %s", dbOp.State) + } + + return db.ExecuteUpsert( + ctx, queryBuilder.WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ) +} diff --git a/internal/handlers/delete_backup_test.go b/internal/handlers/delete_backup_test.go new file mode 100644 index 00000000..8326cc99 --- /dev/null +++ b/internal/handlers/delete_backup_test.go @@ -0,0 +1,220 @@ +package handlers + +import ( + "context" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" + "testing" + "ydbcp/internal/config" + "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" + s3Client "ydbcp/internal/connectors/s3" + "ydbcp/internal/types" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" +) + +func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + dbOp := types.DeleteBackupOperation{ + ID: opId, + BackupID: backupID, + State: types.OperationStateRunning, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + }, + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStateAvailable, + } + + opMap := make(map[string]types.Operation) + backupMap := make(map[string]types.Backup) + s3ObjectsMap := make(map[string][]*s3.Object) + backupMap[backupID] = backup + opMap[opId] = &dbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + + s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) + + handler := NewDBOperationHandler( + dbConnector, s3Connector, config.Config{ + OperationTtlSeconds: 0, + }, queries.NewWriteTableQueryMock(ctx), + ) + + err := handler(ctx, &dbOp) + assert.Empty(t, err) + + // check operation status (should be error because of deadline exceeded) + op, err := dbConnector.GetOperation(ctx, dbOp.ID) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Operation deadline exceeded", op.GetMessage()) + + // check backup status (should be error) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateError, b.Status) +} + +func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + dbOp := types.DeleteBackupOperation{ + ID: opId, + BackupID: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + }, + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStateAvailable, + S3PathPrefix: "pathPrefix", + } + + opMap := make(map[string]types.Operation) + backupMap := make(map[string]types.Backup) + s3ObjectsMap := make(map[string][]*s3.Object) + backupMap[backupID] = backup + opMap[opId] = &dbOp + s3ObjectsMap["pathPrefix"] = []*s3.Object{ + { + Key: aws.String("data_1.csv"), + Size: aws.Int64(100), + }, + { + Key: aws.String("data_2.csv"), + Size: aws.Int64(150), + }, + { + Key: aws.String("data_3.csv"), + Size: aws.Int64(200), + }, + } + + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + + s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) + + handler := NewDBOperationHandler( + dbConnector, s3Connector, config.Config{ + OperationTtlSeconds: 1000, + }, queries.NewWriteTableQueryMock(ctx), + ) + + err := handler(ctx, &dbOp) + assert.Empty(t, err) + + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, dbOp.ID) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + assert.Equal(t, "Success", op.GetMessage()) + + // check backup status (should be deleted) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateDeleted, b.Status) + + // check s3 objects (should be deleted) + objects, err := s3Connector.ListObjects("pathPrefix", "bucket") + assert.Empty(t, err) + assert.Empty(t, objects) +} + +func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + dbOp := types.DeleteBackupOperation{ + ID: opId, + BackupID: backupID, + State: types.OperationStateRunning, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + }, + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStateAvailable, + S3PathPrefix: "pathPrefix", + } + + opMap := make(map[string]types.Operation) + backupMap := make(map[string]types.Backup) + s3ObjectsMap := make(map[string][]*s3.Object) + backupMap[backupID] = backup + opMap[opId] = &dbOp + s3ObjectsMap["pathPrefix"] = []*s3.Object{ + { + Key: aws.String("data_1.csv"), + Size: aws.Int64(100), + }, + { + Key: aws.String("data_2.csv"), + Size: aws.Int64(150), + }, + { + Key: aws.String("data_3.csv"), + Size: aws.Int64(200), + }, + } + + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + + s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) + + handler := NewDBOperationHandler( + dbConnector, s3Connector, config.Config{ + OperationTtlSeconds: 1000, + }, queries.NewWriteTableQueryMock(ctx), + ) + + err := handler(ctx, &dbOp) + assert.Empty(t, err) + + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, dbOp.ID) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + assert.Equal(t, "Success", op.GetMessage()) + + // check backup status (should be deleted) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateDeleted, b.Status) + + // check s3 objects (should be deleted) + objects, err := s3Connector.ListObjects("pathPrefix", "bucket") + assert.Empty(t, err) + assert.Empty(t, objects) +} diff --git a/internal/server/services/backup/backupservice.go b/internal/server/services/backup/backupservice.go index dcab49cb..a1e198e3 100644 --- a/internal/server/services/backup/backupservice.go +++ b/internal/server/services/backup/backupservice.go @@ -171,13 +171,14 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques now := timestamppb.Now() backup := types.Backup{ - ContainerID: req.GetContainerId(), - DatabaseName: req.GetDatabaseName(), - S3Endpoint: s.s3.Endpoint, - S3Region: s.s3.Region, - S3Bucket: s.s3.Bucket, - S3PathPrefix: destinationPrefix, - Status: types.BackupStateRunning, + ContainerID: req.GetContainerId(), + DatabaseName: req.GetDatabaseName(), + DatabaseEndpoint: req.GetDatabaseEndpoint(), + S3Endpoint: s.s3.Endpoint, + S3Region: s.s3.Region, + S3Bucket: s.s3.Bucket, + S3PathPrefix: destinationPrefix, + Status: types.BackupStateRunning, AuditInfo: &pb.AuditInfo{ CreatedAt: now, Creator: subject, @@ -220,6 +221,76 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques return op.Proto(), nil } +func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRequest) (*pb.Operation, error) { + xlog.Info(ctx, "DeleteBackup", zap.String("request", req.String())) + + backupID, err := types.ParseObjectID(req.BackupId) + if err != nil { + xlog.Error(ctx, "failed to parse BackupId", zap.Error(err)) + return nil, status.Errorf(codes.InvalidArgument, "failed to parse BackupId %s: %v", req.BackupId, err) + } + + backups, err := s.driver.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(backupID)}, + }, + ), + ), + ) + + if err != nil { + xlog.Error(ctx, "can't select backups", zap.Error(err)) + return nil, status.Errorf(codes.Internal, "can't select backups: %v", err) + } + + if len(backups) == 0 { + return nil, status.Error(codes.NotFound, "backup not found") + } + + backup := backups[0] + + subject, err := auth.CheckAuth( + ctx, s.auth, auth.PermissionBackupCreate, backup.ContainerID, "", + ) + + if err != nil { + return nil, err + } + + if !backup.CanBeDeleted() { + return nil, status.Errorf(codes.FailedPrecondition, "backup can't be deleted, status %s", backup.Status) + } + + op := &types.DeleteBackupOperation{ + ContainerID: backup.ContainerID, + BackupID: req.GetBackupId(), + State: types.OperationStatePending, + YdbConnectionParams: types.YdbConnectionParams{ + DatabaseName: backup.DatabaseName, + Endpoint: backup.DatabaseEndpoint, + }, + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + Creator: subject, + }, + PathPrefix: backup.S3PathPrefix, + } + + operationID, err := s.driver.CreateOperation(ctx, op) + if err != nil { + return nil, status.Errorf(codes.Internal, "can't create operation: %v", err) + } + + op.ID = operationID + xlog.Debug(ctx, "DeleteBackup was started successfully", zap.String("operation", types.OperationToString(op))) + return op.Proto(), nil +} + func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) { xlog.Info(ctx, "MakeRestore", zap.String("request", req.String())) diff --git a/internal/types/backup.go b/internal/types/backup.go index fe1297b0..941f56a5 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -75,6 +75,10 @@ func (o *Backup) Proto() *pb.Backup { } } +func (o *Backup) CanBeDeleted() bool { + return o.Status == BackupStateAvailable || o.Status == BackupStateError || o.Status == BackupStateCancelled +} + type OperationType string type OperationState string @@ -226,6 +230,69 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation { } } +type DeleteBackupOperation struct { + ID string + ContainerID string + YdbConnectionParams YdbConnectionParams + BackupID string + State OperationState + Message string + PathPrefix string + Audit *pb.AuditInfo +} + +func (o *DeleteBackupOperation) GetID() string { + return o.ID +} +func (o *DeleteBackupOperation) SetID(id string) { + o.ID = id +} +func (o *DeleteBackupOperation) GetContainerID() string { + return o.ContainerID +} +func (o *DeleteBackupOperation) GetType() OperationType { + return OperationTypeDB +} +func (o *DeleteBackupOperation) SetType(_ OperationType) { +} +func (o *DeleteBackupOperation) GetState() OperationState { + return o.State +} +func (o *DeleteBackupOperation) SetState(s OperationState) { + o.State = s +} +func (o *DeleteBackupOperation) GetMessage() string { + return o.Message +} +func (o *DeleteBackupOperation) SetMessage(m string) { + o.Message = m +} +func (o *DeleteBackupOperation) GetAudit() *pb.AuditInfo { + return o.Audit +} +func (o *DeleteBackupOperation) Copy() Operation { + copy := *o + return © +} + +func (o *DeleteBackupOperation) Proto() *pb.Operation { + return &pb.Operation{ + Id: o.ID, + ContainerId: o.ContainerID, + Type: string(OperationTypeDB), + DatabaseName: o.YdbConnectionParams.DatabaseName, + DatabaseEndpoint: o.YdbConnectionParams.Endpoint, + YdbServerOperationId: "", + BackupId: o.BackupID, + SourcePaths: []string{o.PathPrefix}, + SourcePathsToExclude: nil, + RestorePaths: nil, + Audit: o.Audit, + Status: o.State.Enum(), + Message: o.Message, + } +} + type GenericOperation struct { ID string ContainerID string @@ -290,12 +357,14 @@ var ( BackupStateAvailable = pb.Backup_AVAILABLE.String() BackupStateError = pb.Backup_ERROR.String() BackupStateCancelled = pb.Backup_CANCELLED.String() + BackupStateDeleting = pb.Backup_DELETING.String() BackupStateDeleted = pb.Backup_DELETED.String() ) const ( OperationTypeTB = OperationType("TB") OperationTypeRB = OperationType("RB") + OperationTypeDB = OperationType("DB") BackupTimestampFormat = "20060102_150405" S3ForcePathStyle = true ) diff --git a/pkg/proto/ydbcp/v1alpha1/backup.pb.go b/pkg/proto/ydbcp/v1alpha1/backup.pb.go index 093b7520..b0702d0a 100644 --- a/pkg/proto/ydbcp/v1alpha1/backup.pb.go +++ b/pkg/proto/ydbcp/v1alpha1/backup.pb.go @@ -31,6 +31,7 @@ const ( Backup_CANCELLED Backup_Status = 4 Backup_DELETED Backup_Status = 5 Backup_RUNNING Backup_Status = 6 + Backup_DELETING Backup_Status = 7 ) // Enum value maps for Backup_Status. @@ -43,6 +44,7 @@ var ( 4: "CANCELLED", 5: "DELETED", 6: "RUNNING", + 7: "DELETING", } Backup_Status_value = map[string]int32{ "STATUS_UNSPECIFIED": 0, @@ -52,6 +54,7 @@ var ( "CANCELLED": 4, "DELETED": 5, "RUNNING": 6, + "DELETING": 7, } ) @@ -350,7 +353,7 @@ var file_ydbcp_v1alpha1_backup_proto_rawDesc = []byte{ 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa7, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb5, 0x04, 0x0a, 0x06, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, @@ -378,36 +381,37 @@ var file_ydbcp_v1alpha1_backup_proto_rawDesc = []byte{ 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x41, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x63, 0x68, 0x65, - 0x64, 0x75, 0x6c, 0x65, 0x49, 0x64, 0x22, 0x70, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x64, 0x75, 0x6c, 0x65, 0x49, 0x64, 0x22, 0x7e, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x03, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x05, 0x12, 0x0b, 0x0a, 0x07, 0x52, - 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x22, 0x79, 0x0a, 0x0a, 0x53, 0x33, 0x4c, 0x6f, - 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, - 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, - 0x6e, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, - 0x67, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x67, 0x69, - 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x61, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x65, 0x66, 0x69, - 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x70, 0x61, 0x74, 0x68, 0x50, 0x72, 0x65, - 0x66, 0x69, 0x78, 0x22, 0x9f, 0x01, 0x0a, 0x09, 0x41, 0x75, 0x64, 0x69, 0x74, 0x49, 0x6e, 0x66, - 0x6f, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x39, 0x0a, 0x0a, 0x63, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x63, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x3d, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, - 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, - 0x74, 0x65, 0x64, 0x41, 0x74, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, - 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, - 0x79, 0x64, 0x62, 0x63, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x45, 0x4c, 0x45, + 0x54, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x22, 0x79, 0x0a, 0x0a, 0x53, 0x33, 0x4c, 0x6f, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x67, 0x69, + 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, + 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x61, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x70, 0x61, 0x74, 0x68, 0x50, 0x72, 0x65, 0x66, 0x69, + 0x78, 0x22, 0x9f, 0x01, 0x0a, 0x09, 0x41, 0x75, 0x64, 0x69, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, + 0x18, 0x0a, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x39, 0x0a, 0x0a, 0x63, 0x72, 0x65, + 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x64, 0x41, 0x74, 0x12, 0x3d, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, + 0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, + 0x64, 0x41, 0x74, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x79, + 0x64, 0x62, 0x63, 0x70, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x79, + 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x79, 0x64, + 0x62, 0x63, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/proto/ydbcp/v1alpha1/backup.proto b/pkg/proto/ydbcp/v1alpha1/backup.proto index b459521d..c1f03b37 100644 --- a/pkg/proto/ydbcp/v1alpha1/backup.proto +++ b/pkg/proto/ydbcp/v1alpha1/backup.proto @@ -14,6 +14,7 @@ message Backup { CANCELLED = 4; DELETED = 5; RUNNING = 6; + DELETING = 7; } string id = 1; string container_id = 2;