Skip to content

Commit

Permalink
feat(ydbcp): implement DeleteBackup
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 6, 2024
1 parent 75bbe4e commit 993f24f
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 35 deletions.
8 changes: 8 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ func main() {
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)

wg.Add(1)
Expand Down
23 changes: 23 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,29 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
SourcePaths: sourcePathsSlice,
Audit: auditFromDb(creator, createdAt, completedAt),
}, nil
} else if operationType == string(types.OperationTypeDB) {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for DB operation: %s", operationId)
}

var pathPrefix string
if len(sourcePathsSlice) > 0 {
pathPrefix = sourcePathsSlice[0]
}

return &types.DeleteBackupOperation{
ID: operationId,
BackupID: *backupId,
ContainerID: containerId,
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: databaseEndpoint,
DatabaseName: databaseName,
},
State: operationState,
Message: StringOrEmpty(message),
Audit: auditFromDb(creator, createdAt, completedAt),
PathPrefix: pathPrefix,
}, nil
}

return &types.GenericOperation{ID: operationId}, nil
Expand Down
30 changes: 28 additions & 2 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
d.AddValueParam("$id", table_types.StringValueFromString(operation.GetID()))
d.AddValueParam("$type", table_types.StringValueFromString(string(operation.GetType())))
d.AddValueParam("$status", table_types.StringValueFromString(operation.GetState().String()))
d.AddValueParam("$message", table_types.StringValueFromString(operation.GetMessage()))
if operation.GetAudit() != nil {
d.AddValueParam(
"$initiated",
Expand Down Expand Up @@ -103,7 +104,6 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
"$operation_id",
table_types.StringValueFromString(tb.YdbOperationId),
)
d.AddValueParam("$message", table_types.StringValueFromString(tb.Message))
if len(tb.SourcePaths) > 0 {
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tb.SourcePaths, ",")))
}
Expand Down Expand Up @@ -138,11 +138,37 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
"$operation_id",
table_types.StringValueFromString(rb.YdbOperationId),
)
d.AddValueParam("$message", table_types.StringValueFromString(rb.Message))

if len(rb.SourcePaths) > 0 {
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(rb.SourcePaths, ",")))
}
} else if operation.GetType() == types.OperationTypeDB {
db, ok := operation.(*types.DeleteBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to DeleteBackupOperation", zap.String("operation_id", operation.GetID()))
}

d.AddValueParam(
"$container_id", table_types.StringValueFromString(db.ContainerID),
)

d.AddValueParam(
"$database",
table_types.StringValueFromString(db.YdbConnectionParams.DatabaseName),
)

d.AddValueParam(
"$endpoint",
table_types.StringValueFromString(db.YdbConnectionParams.Endpoint),
)

d.AddValueParam(
"$backup_id",
table_types.StringValueFromString(db.BackupID),
)

d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix))

} else {
xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType())))
}
Expand Down
14 changes: 7 additions & 7 deletions internal/connectors/db/yql/queries/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
func TestQueryBuilder_CreateCreate(t *testing.T) {
const (
queryString = `UPSERT INTO Backups (id, container_id, database, endpoint, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message, size, initiated, created_at) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0, $size_0, $initiated_0, $created_at_0);
UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, message) VALUES ($id_1, $type_1, $status_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $message_1)`
UPSERT INTO Operations (id, type, status, message, initiated, created_at, container_id, database, endpoint, backup_id, operation_id) VALUES ($id_1, $type_1, $status_1, $message_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1)`
)
opId := types.GenerateObjectID()
backupId := types.GenerateObjectID()
Expand Down Expand Up @@ -119,6 +119,7 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d
table.ValueParam(
"$status_1", table_types.StringValueFromString(string(tbOp.State)),
),
table.ValueParam("$message_1", table_types.StringValueFromString("msg op")),
table.ValueParam(
"$initiated_1",
table_types.StringValueFromString("author"),
Expand Down Expand Up @@ -146,7 +147,6 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d
"$operation_id_1",
table_types.StringValueFromString(tbOp.YdbOperationId),
),
table.ValueParam("$message_1", table_types.StringValueFromString("msg op")),
)
)
query, err := builder.FormatQuery(context.Background())
Expand All @@ -161,7 +161,7 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d
func TestQueryBuilder_UpdateCreate(t *testing.T) {
const (
queryString = `UPDATE Backups SET status = $status_0 WHERE id = $id_0;
UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, message, paths, paths_to_exclude) VALUES ($id_1, $type_1, $status_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $message_1, $paths_1, $paths_to_exclude_1)`
UPSERT INTO Operations (id, type, status, message, initiated, created_at, container_id, database, endpoint, backup_id, operation_id, paths, paths_to_exclude) VALUES ($id_1, $type_1, $status_1, $message_1, $initiated_1, $created_at_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $operation_id_1, $paths_1, $paths_to_exclude_1)`
)
ctx := context.Background()
opId := types.GenerateObjectID()
Expand Down Expand Up @@ -199,6 +199,10 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d
table.ValueParam(
"$status_1", table_types.StringValueFromString(string(tbOp.State)),
),
table.ValueParam(
"$message_1",
table_types.StringValueFromString(tbOp.Message),
),
table.ValueParam(
"$initiated_1",
table_types.StringValueFromString(""),
Expand Down Expand Up @@ -226,10 +230,6 @@ UPSERT INTO Operations (id, type, status, initiated, created_at, container_id, d
"$operation_id_1",
table_types.StringValueFromString(tbOp.YdbOperationId),
),
table.ValueParam(
"$message_1",
table_types.StringValueFromString(tbOp.Message),
),
table.ValueParam(
"$paths_1",
table_types.StringValueFromString(strings.Join(tbOp.SourcePaths, ",")),
Expand Down
33 changes: 33 additions & 0 deletions internal/connectors/s3/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type S3Connector interface {
ListObjects(pathPrefix string, bucket string) ([]string, error)
GetSize(pathPrefix string, bucket string) (int64, error)
DeleteObjects(keys []string, bucket string) error
}

type ClientS3Connector struct {
Expand Down Expand Up @@ -104,3 +105,35 @@ func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, er

return size, nil
}

func (c *ClientS3Connector) DeleteObjects(keys []string, bucket string) error {
if len(keys) == 0 {
return nil
}

objectsPtr := make([]*s3.ObjectIdentifier, len(keys))
for i, object := range keys {
objectsPtr[i] = &s3.ObjectIdentifier{
Key: aws.String(object),
}
}

input := s3.DeleteObjectsInput{
Bucket: aws.String(bucket),
Delete: &s3.Delete{
Objects: objectsPtr,
Quiet: aws.Bool(true),
},
}

delOut, err := c.s3.DeleteObjects(&input)
if err != nil {
return err
}

if len(delOut.Errors) > 0 {
return fmt.Errorf("can't delete objects: %v", delOut.Errors)
}

return nil
}
12 changes: 11 additions & 1 deletion internal/connectors/s3/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (m *MockS3Connector) ListObjects(pathPrefix string, _ string) ([]string, er
return nil, fmt.Errorf("objects not found, path: %s", pathPrefix)
}

func (m *MockS3Connector) GetSize(pathPrefix string, bucket string) (int64, error) {
func (m *MockS3Connector) GetSize(pathPrefix string, _ string) (int64, error) {
if content, ok := m.storage[pathPrefix]; ok {
var size int64
for _, object := range content {
Expand All @@ -44,3 +44,13 @@ func (m *MockS3Connector) GetSize(pathPrefix string, bucket string) (int64, erro

return 0, fmt.Errorf("objects not found, path: %s", pathPrefix)
}

func (m *MockS3Connector) DeleteObjects(objects []string, _ string) error {
for _, object := range objects {
if _, ok := m.storage[object]; ok {
delete(m.storage, object)
}
}

return nil
}
136 changes: 136 additions & 0 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package handlers

import (
"context"
"fmt"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/connectors/s3"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
)

func NewDBOperationHandler(
db db.DBConnector,
s3 s3.S3Connector,
config config.Config,
getQueryBuilder func(ctx context.Context) queries.WriteTableQuery,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DBOperationHandler(ctx, op, db, s3, config, getQueryBuilder)
}
}

func DBOperationHandler(
ctx context.Context,
operation types.Operation,
db db.DBConnector,
s3 s3.S3Connector,
config config.Config,
getQueryBuilder func(ctx context.Context) queries.WriteTableQuery,
) error {
xlog.Info(
ctx, "received operation",
zap.String("id", operation.GetID()),
zap.String("type", string(operation.GetType())),
zap.String("state", string(operation.GetState())),
zap.String("message", operation.GetMessage()),
)

if operation.GetType() != types.OperationTypeDB {
return fmt.Errorf(
"wrong type %s != %s for operation %s",
operation.GetType(), types.OperationTypeDB, types.OperationToString(operation),
)
}

dbOp, ok := operation.(*types.DeleteBackupOperation)
if !ok {
return fmt.Errorf("can't cast operation to DeleteBackupOperation %s", types.OperationToString(operation))
}

if deadlineExceeded(dbOp.Audit.CreatedAt, config) {
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperation(ctx, operation)
}

backups, err := db.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "id",
Values: []table_types.Value{table_types.StringValueFromString(dbOp.BackupID)},
},
),
),
)

if err != nil {
return fmt.Errorf("can't select backups: %v", err)
}

if len(backups) == 0 {
return fmt.Errorf("backup not found")
}

backupToWrite := types.Backup{
ID: dbOp.BackupID,
Status: types.BackupStateUnknown,
}

deleteBackup := func(pathPrefix string, bucket string) error {
objects, err := s3.ListObjects(pathPrefix, bucket)
if err != nil {
return fmt.Errorf("failed to list S3 objects: %v", err)
}

if len(objects) != 0 {
err = s3.DeleteObjects(objects, bucket)
if err != nil {
return fmt.Errorf("failed to delete S3 objects: %v", err)
}
}

backupToWrite.Status = types.BackupStateDeleted
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
operation.GetAudit().CompletedAt = timestamppb.Now()
return nil
}

switch dbOp.State {
case types.OperationStatePending:
{
operation.SetState(types.OperationStateRunning)
err := db.UpdateOperation(ctx, operation)
if err != nil {
return fmt.Errorf("can't update operation: %v", err)
}

err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket)
if err != nil {
return err
}
}
case types.OperationStateRunning:
{
err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket)
if err != nil {
return err
}
}
default:
return fmt.Errorf("unexpected operation state %s", dbOp.State)
}

return db.ExecuteUpsert(
ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
}
Loading

0 comments on commit 993f24f

Please sign in to comment.