From 18eb5f791f4911d7009774ed521b471565fa9c68 Mon Sep 17 00:00:00 2001 From: ulya-sidorina Date: Tue, 3 Sep 2024 09:55:41 +0200 Subject: [PATCH] feat(ydbcp): implement DeleteBackup --- cmd/ydbcp/main.go | 8 ++ internal/connectors/db/yql/queries/write.go | 20 ++- internal/connectors/s3/connector.go | 33 +++++ internal/handlers/delete_backup.go | 122 ++++++++++++++++++ .../server/services/backup/backupservice.go | 62 +++++++++ internal/types/backup.go | 63 +++++++++ 6 files changed, 306 insertions(+), 2 deletions(-) create mode 100644 internal/handlers/delete_backup.go diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 5ec131db..96226f2b 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -133,6 +133,14 @@ func main() { os.Exit(1) } + if err := handlersRegistry.Add( + types.OperationTypeDB, + handlers.NewDBOperationHandler(dbConnector, s3Connector, queries.NewWriteTableQuery), + ); err != nil { + xlog.Error(ctx, "failed to register DB handler", zap.Error(err)) + os.Exit(1) + } + processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry) wg.Add(1) diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 2a606458..2cf4e07c 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -61,6 +61,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i d.AddValueParam("$id", table_types.StringValueFromString(operation.GetID())) d.AddValueParam("$type", table_types.StringValueFromString(string(operation.GetType()))) d.AddValueParam("$status", table_types.StringValueFromString(operation.GetState().String())) + d.AddValueParam("$message", table_types.StringValueFromString(operation.GetMessage())) if operation.GetAudit() != nil { d.AddValueParam( "$initiated", @@ -103,7 +104,6 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i "$operation_id", table_types.StringValueFromString(tb.YdbOperationId), ) - d.AddValueParam("$message", table_types.StringValueFromString(tb.Message)) if len(tb.SourcePaths) > 0 { d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tb.SourcePaths, ","))) } @@ -138,11 +138,27 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i "$operation_id", table_types.StringValueFromString(rb.YdbOperationId), ) - d.AddValueParam("$message", table_types.StringValueFromString(rb.Message)) if len(rb.SourcePaths) > 0 { d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(rb.SourcePaths, ","))) } + } else if operation.GetType() == types.OperationTypeDB { + db, ok := operation.(*types.DeleteBackupOperation) + if !ok { + xlog.Error(ctx, "error cast operation to DeleteBackupOperation", zap.String("operation_id", operation.GetID())) + } + + d.AddValueParam( + "$container_id", table_types.StringValueFromString(db.ContainerID), + ) + + d.AddValueParam( + "$backup_id", + table_types.StringValueFromString(db.BackupID), + ) + + d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix)) + } else { xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType()))) } diff --git a/internal/connectors/s3/connector.go b/internal/connectors/s3/connector.go index 86ce0470..bae93caa 100644 --- a/internal/connectors/s3/connector.go +++ b/internal/connectors/s3/connector.go @@ -13,6 +13,7 @@ import ( type S3Connector interface { ListObjects(pathPrefix string, bucket string) ([]string, error) GetSize(pathPrefix string, bucket string) (int64, error) + DeleteObjects(keys []string, bucket string) error } type ClientS3Connector struct { @@ -104,3 +105,35 @@ func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, er return size, nil } + +func (c *ClientS3Connector) DeleteObjects(keys []string, bucket string) error { + if len(keys) == 0 { + return nil + } + + objectsPtr := make([]*s3.ObjectIdentifier, len(keys)) + for i, object := range keys { + objectsPtr[i] = &s3.ObjectIdentifier{ + Key: aws.String(object), + } + } + + input := s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{ + Objects: objectsPtr, + Quiet: aws.Bool(true), + }, + } + + delOut, err := c.s3.DeleteObjects(&input) + if err != nil { + return err + } + + if len(delOut.Errors) > 0 { + return fmt.Errorf("can't delete objects: %v", delOut.Errors) + } + + return nil +} diff --git a/internal/handlers/delete_backup.go b/internal/handlers/delete_backup.go new file mode 100644 index 00000000..6a27d97f --- /dev/null +++ b/internal/handlers/delete_backup.go @@ -0,0 +1,122 @@ +package handlers + +import ( + "context" + "fmt" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" + "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/connectors/s3" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" +) + +func NewDBOperationHandler( + db db.DBConnector, s3 s3.S3Connector, getQueryBuilder func(ctx context.Context) queries.WriteTableQuery, +) types.OperationHandler { + return func(ctx context.Context, op types.Operation) error { + return DBOperationHandler(ctx, op, db, s3, getQueryBuilder) + } +} + +func DBOperationHandler( + ctx context.Context, + operation types.Operation, + db db.DBConnector, + s3 s3.S3Connector, + getQueryBuilder func(ctx context.Context) queries.WriteTableQuery, +) error { + xlog.Info( + ctx, "received operation", + zap.String("id", operation.GetID()), + zap.String("type", string(operation.GetType())), + zap.String("state", string(operation.GetState())), + zap.String("message", operation.GetMessage()), + ) + + if operation.GetType() != types.OperationTypeDB { + return fmt.Errorf( + "wrong type %s != %s for operation %s", + operation.GetType(), types.OperationTypeDB, types.OperationToString(operation), + ) + } + + dbOp, ok := operation.(*types.DeleteBackupOperation) + if !ok { + return fmt.Errorf("can't cast operation to DeleteBackupOperation %s", types.OperationToString(operation)) + } + + backups, err := db.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(dbOp.BackupID)}, + }, + ), + ), + ) + + if err != nil { + return fmt.Errorf("can't select backups: %v", err) + } + + if len(backups) == 0 { + return fmt.Errorf("backup not found") + } + + backupToWrite := types.Backup{ + ID: dbOp.BackupID, + Status: types.BackupStateUnknown, + } + + deleteBackup := func(pathPrefix string, bucket string) error { + objects, err := s3.ListObjects(pathPrefix, bucket) + if err != nil { + return fmt.Errorf("failed to list S3 objects: %v", err) + } + + if len(objects) != 0 { + err = s3.DeleteObjects(objects, bucket) + if err != nil { + return fmt.Errorf("failed to delete S3 objects: %v", err) + } + } + + operation.SetState(types.OperationStateDone) + backupToWrite.Status = types.BackupStateDeleted + return nil + } + + switch dbOp.State { + case types.OperationStatePending: + { + operation.SetState(types.OperationStateRunning) + err := db.UpdateOperation(ctx, operation) + if err != nil { + return fmt.Errorf("can't update operation: %v", err) + } + + err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket) + if err != nil { + return err + } + } + case types.OperationStateRunning: + { + err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket) + if err != nil { + return err + } + } + default: + return fmt.Errorf("unexpected operation state %s", dbOp.State) + } + + return db.ExecuteUpsert( + ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ) +} diff --git a/internal/server/services/backup/backupservice.go b/internal/server/services/backup/backupservice.go index dcab49cb..6318d072 100644 --- a/internal/server/services/backup/backupservice.go +++ b/internal/server/services/backup/backupservice.go @@ -220,6 +220,68 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques return op.Proto(), nil } +func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRequest) (*pb.Operation, error) { + xlog.Info(ctx, "DeleteBackup", zap.String("request", req.String())) + + backupID, err := types.ParseObjectID(req.BackupId) + if err != nil { + xlog.Error(ctx, "failed to parse BackupId", zap.Error(err)) + return nil, status.Errorf(codes.InvalidArgument, "failed to parse BackupId %s: %v", req.BackupId, err) + } + + backups, err := s.driver.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(backupID)}, + }, + ), + ), + ) + + if err != nil { + xlog.Error(ctx, "can't select backups", zap.Error(err)) + return nil, status.Errorf(codes.Internal, "can't select backups: %v", err) + } + + if len(backups) == 0 { + return nil, status.Error(codes.NotFound, "backup not found") + } + + backup := backups[0] + + subject, err := auth.CheckAuth( + ctx, s.auth, auth.PermissionBackupCreate, backup.ContainerID, "", + ) + + if err != nil { + return nil, err + } + + op := &types.DeleteBackupOperation{ + ContainerID: backup.ContainerID, + BackupID: req.GetBackupId(), + State: types.OperationStatePending, + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + Creator: subject, + }, + PathPrefix: backup.S3PathPrefix, + } + + operationID, err := s.driver.CreateOperation(ctx, op) + if err != nil { + return nil, status.Errorf(codes.Internal, "can't create operation: %v", err) + } + + op.ID = operationID + xlog.Debug(ctx, "DeleteBackup was started successfully", zap.String("operation", types.OperationToString(op))) + return op.Proto(), nil +} + func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) { xlog.Info(ctx, "MakeRestore", zap.String("request", req.String())) diff --git a/internal/types/backup.go b/internal/types/backup.go index fe1297b0..8efc0dcd 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -226,6 +226,68 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation { } } +type DeleteBackupOperation struct { + ID string + ContainerID string + BackupID string + State OperationState + Message string + PathPrefix string + Audit *pb.AuditInfo +} + +func (o *DeleteBackupOperation) GetID() string { + return o.ID +} +func (o *DeleteBackupOperation) SetID(id string) { + o.ID = id +} +func (o *DeleteBackupOperation) GetContainerID() string { + return o.ContainerID +} +func (o *DeleteBackupOperation) GetType() OperationType { + return OperationTypeDB +} +func (o *DeleteBackupOperation) SetType(_ OperationType) { +} +func (o *DeleteBackupOperation) GetState() OperationState { + return o.State +} +func (o *DeleteBackupOperation) SetState(s OperationState) { + o.State = s +} +func (o *DeleteBackupOperation) GetMessage() string { + return o.Message +} +func (o *DeleteBackupOperation) SetMessage(m string) { + o.Message = m +} +func (o *DeleteBackupOperation) GetAudit() *pb.AuditInfo { + return o.Audit +} +func (o *DeleteBackupOperation) Copy() Operation { + copy := *o + return © +} + +func (o *DeleteBackupOperation) Proto() *pb.Operation { + return &pb.Operation{ + Id: o.ID, + ContainerId: o.ContainerID, + Type: string(OperationTypeDB), + DatabaseName: "", + DatabaseEndpoint: "", + YdbServerOperationId: "", + BackupId: o.BackupID, + SourcePaths: []string{o.PathPrefix}, + SourcePathsToExclude: nil, + RestorePaths: nil, + Audit: o.Audit, + Status: o.State.Enum(), + Message: o.Message, + } +} + type GenericOperation struct { ID string ContainerID string @@ -296,6 +358,7 @@ var ( const ( OperationTypeTB = OperationType("TB") OperationTypeRB = OperationType("RB") + OperationTypeDB = OperationType("DB") BackupTimestampFormat = "20060102_150405" S3ForcePathStyle = true )