Skip to content

Commit

Permalink
feat(ydbcp): implement DeleteBackup
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 5, 2024
1 parent 75bbe4e commit 18eb5f7
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 2 deletions.
8 changes: 8 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ func main() {
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, queries.NewWriteTableQuery),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)

wg.Add(1)
Expand Down
20 changes: 18 additions & 2 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
d.AddValueParam("$id", table_types.StringValueFromString(operation.GetID()))
d.AddValueParam("$type", table_types.StringValueFromString(string(operation.GetType())))
d.AddValueParam("$status", table_types.StringValueFromString(operation.GetState().String()))
d.AddValueParam("$message", table_types.StringValueFromString(operation.GetMessage()))
if operation.GetAudit() != nil {
d.AddValueParam(
"$initiated",
Expand Down Expand Up @@ -103,7 +104,6 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
"$operation_id",
table_types.StringValueFromString(tb.YdbOperationId),
)
d.AddValueParam("$message", table_types.StringValueFromString(tb.Message))
if len(tb.SourcePaths) > 0 {
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tb.SourcePaths, ",")))
}
Expand Down Expand Up @@ -138,11 +138,27 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
"$operation_id",
table_types.StringValueFromString(rb.YdbOperationId),
)
d.AddValueParam("$message", table_types.StringValueFromString(rb.Message))

if len(rb.SourcePaths) > 0 {
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(rb.SourcePaths, ",")))
}
} else if operation.GetType() == types.OperationTypeDB {
db, ok := operation.(*types.DeleteBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to DeleteBackupOperation", zap.String("operation_id", operation.GetID()))
}

d.AddValueParam(
"$container_id", table_types.StringValueFromString(db.ContainerID),
)

d.AddValueParam(
"$backup_id",
table_types.StringValueFromString(db.BackupID),
)

d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix))

} else {
xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType())))
}
Expand Down
33 changes: 33 additions & 0 deletions internal/connectors/s3/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type S3Connector interface {
ListObjects(pathPrefix string, bucket string) ([]string, error)
GetSize(pathPrefix string, bucket string) (int64, error)
DeleteObjects(keys []string, bucket string) error
}

type ClientS3Connector struct {
Expand Down Expand Up @@ -104,3 +105,35 @@ func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, er

return size, nil
}

func (c *ClientS3Connector) DeleteObjects(keys []string, bucket string) error {
if len(keys) == 0 {
return nil
}

objectsPtr := make([]*s3.ObjectIdentifier, len(keys))
for i, object := range keys {
objectsPtr[i] = &s3.ObjectIdentifier{
Key: aws.String(object),
}
}

input := s3.DeleteObjectsInput{
Bucket: aws.String(bucket),
Delete: &s3.Delete{
Objects: objectsPtr,
Quiet: aws.Bool(true),
},
}

delOut, err := c.s3.DeleteObjects(&input)
if err != nil {
return err
}

if len(delOut.Errors) > 0 {
return fmt.Errorf("can't delete objects: %v", delOut.Errors)
}

return nil
}
122 changes: 122 additions & 0 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package handlers

import (
"context"
"fmt"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/connectors/s3"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
)

func NewDBOperationHandler(
db db.DBConnector, s3 s3.S3Connector, getQueryBuilder func(ctx context.Context) queries.WriteTableQuery,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DBOperationHandler(ctx, op, db, s3, getQueryBuilder)
}
}

func DBOperationHandler(
ctx context.Context,
operation types.Operation,
db db.DBConnector,
s3 s3.S3Connector,
getQueryBuilder func(ctx context.Context) queries.WriteTableQuery,
) error {
xlog.Info(
ctx, "received operation",
zap.String("id", operation.GetID()),
zap.String("type", string(operation.GetType())),
zap.String("state", string(operation.GetState())),
zap.String("message", operation.GetMessage()),
)

if operation.GetType() != types.OperationTypeDB {
return fmt.Errorf(
"wrong type %s != %s for operation %s",
operation.GetType(), types.OperationTypeDB, types.OperationToString(operation),
)
}

dbOp, ok := operation.(*types.DeleteBackupOperation)
if !ok {
return fmt.Errorf("can't cast operation to DeleteBackupOperation %s", types.OperationToString(operation))
}

backups, err := db.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "id",
Values: []table_types.Value{table_types.StringValueFromString(dbOp.BackupID)},
},
),
),
)

if err != nil {
return fmt.Errorf("can't select backups: %v", err)
}

if len(backups) == 0 {
return fmt.Errorf("backup not found")
}

backupToWrite := types.Backup{
ID: dbOp.BackupID,
Status: types.BackupStateUnknown,
}

deleteBackup := func(pathPrefix string, bucket string) error {
objects, err := s3.ListObjects(pathPrefix, bucket)
if err != nil {
return fmt.Errorf("failed to list S3 objects: %v", err)
}

if len(objects) != 0 {
err = s3.DeleteObjects(objects, bucket)
if err != nil {
return fmt.Errorf("failed to delete S3 objects: %v", err)
}
}

operation.SetState(types.OperationStateDone)
backupToWrite.Status = types.BackupStateDeleted
return nil
}

switch dbOp.State {
case types.OperationStatePending:
{
operation.SetState(types.OperationStateRunning)
err := db.UpdateOperation(ctx, operation)
if err != nil {
return fmt.Errorf("can't update operation: %v", err)
}

err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket)
if err != nil {
return err
}
}
case types.OperationStateRunning:
{
err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket)
if err != nil {
return err
}
}
default:
return fmt.Errorf("unexpected operation state %s", dbOp.State)
}

return db.ExecuteUpsert(
ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
}
62 changes: 62 additions & 0 deletions internal/server/services/backup/backupservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,68 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
return op.Proto(), nil
}

func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRequest) (*pb.Operation, error) {
xlog.Info(ctx, "DeleteBackup", zap.String("request", req.String()))

backupID, err := types.ParseObjectID(req.BackupId)
if err != nil {
xlog.Error(ctx, "failed to parse BackupId", zap.Error(err))
return nil, status.Errorf(codes.InvalidArgument, "failed to parse BackupId %s: %v", req.BackupId, err)
}

backups, err := s.driver.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "id",
Values: []table_types.Value{table_types.StringValueFromString(backupID)},
},
),
),
)

if err != nil {
xlog.Error(ctx, "can't select backups", zap.Error(err))
return nil, status.Errorf(codes.Internal, "can't select backups: %v", err)
}

if len(backups) == 0 {
return nil, status.Error(codes.NotFound, "backup not found")
}

backup := backups[0]

subject, err := auth.CheckAuth(
ctx, s.auth, auth.PermissionBackupCreate, backup.ContainerID, "",
)

if err != nil {
return nil, err
}

op := &types.DeleteBackupOperation{
ContainerID: backup.ContainerID,
BackupID: req.GetBackupId(),
State: types.OperationStatePending,
Audit: &pb.AuditInfo{
CreatedAt: timestamppb.Now(),
Creator: subject,
},
PathPrefix: backup.S3PathPrefix,
}

operationID, err := s.driver.CreateOperation(ctx, op)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't create operation: %v", err)
}

op.ID = operationID
xlog.Debug(ctx, "DeleteBackup was started successfully", zap.String("operation", types.OperationToString(op)))
return op.Proto(), nil
}

func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) {
xlog.Info(ctx, "MakeRestore", zap.String("request", req.String()))

Expand Down
63 changes: 63 additions & 0 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,68 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation {
}
}

type DeleteBackupOperation struct {
ID string
ContainerID string
BackupID string
State OperationState
Message string
PathPrefix string
Audit *pb.AuditInfo
}

func (o *DeleteBackupOperation) GetID() string {
return o.ID
}
func (o *DeleteBackupOperation) SetID(id string) {
o.ID = id
}
func (o *DeleteBackupOperation) GetContainerID() string {
return o.ContainerID
}
func (o *DeleteBackupOperation) GetType() OperationType {
return OperationTypeDB
}
func (o *DeleteBackupOperation) SetType(_ OperationType) {
}
func (o *DeleteBackupOperation) GetState() OperationState {
return o.State
}
func (o *DeleteBackupOperation) SetState(s OperationState) {
o.State = s
}
func (o *DeleteBackupOperation) GetMessage() string {
return o.Message
}
func (o *DeleteBackupOperation) SetMessage(m string) {
o.Message = m
}
func (o *DeleteBackupOperation) GetAudit() *pb.AuditInfo {
return o.Audit
}
func (o *DeleteBackupOperation) Copy() Operation {
copy := *o
return &copy
}

func (o *DeleteBackupOperation) Proto() *pb.Operation {
return &pb.Operation{
Id: o.ID,
ContainerId: o.ContainerID,
Type: string(OperationTypeDB),
DatabaseName: "",
DatabaseEndpoint: "",
YdbServerOperationId: "",
BackupId: o.BackupID,
SourcePaths: []string{o.PathPrefix},
SourcePathsToExclude: nil,
RestorePaths: nil,
Audit: o.Audit,
Status: o.State.Enum(),
Message: o.Message,
}
}

type GenericOperation struct {
ID string
ContainerID string
Expand Down Expand Up @@ -296,6 +358,7 @@ var (
const (
OperationTypeTB = OperationType("TB")
OperationTypeRB = OperationType("RB")
OperationTypeDB = OperationType("DB")
BackupTimestampFormat = "20060102_150405"
S3ForcePathStyle = true
)
Expand Down

0 comments on commit 18eb5f7

Please sign in to comment.