Skip to content

Commit

Permalink
Enable TBWR operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Pleshakov committed Nov 7, 2024
1 parent c9e554e commit d689e30
Show file tree
Hide file tree
Showing 13 changed files with 252 additions and 161 deletions.
26 changes: 24 additions & 2 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {
}
}(conn)
client := pb.NewBackupServiceClient(conn)
opClient := pb.NewOperationServiceClient(conn)
backups, err := client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
log.Panicf("unexpected error code: %v", err)
}

backupOperation, err := client.MakeBackup(
tbwr, err := client.MakeBackup(
context.Background(), &pb.MakeBackupRequest{
ContainerId: containerID,
DatabaseName: databaseName,
Expand All @@ -73,6 +74,16 @@ func main() {
if err != nil {
log.Panicf("failed to make backup: %v", err)
}
op, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
Id: tbwr.Id,
})
if err != nil {
log.Panicf("failed to get operation: %v", err)
}
if op.GetType() != types.OperationTypeTBWR.String() {
log.Panicf("unexpected operation type: %v", op.GetType())
}
time.Sleep(time.Second * 11) // to wait for operation handler
backups, err = client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand All @@ -86,6 +97,18 @@ func main() {
log.Panicf("Did not list freshly made backup")
}
backupPb := backups.Backups[0]
ops, err := opClient.ListOperations(context.Background(), &pb.ListOperationsRequest{
ContainerId: containerID,
DatabaseNameMask: databaseName,
OperationTypes: []string{types.OperationTypeTB.String()},
})
if err != nil {
log.Panicf("failed to list operations: %v", err)
}
if len(ops.Operations) != 1 {
log.Panicf("expected one TB operation, got %d", len(ops.Operations))
}
backupOperation := ops.Operations[0]
if backupPb.Id != backupOperation.BackupId {
log.Panicf(
"backupOperation backupID %s does not match listed backup id %s", backupOperation.BackupId, backupPb.Id,
Expand Down Expand Up @@ -121,7 +144,6 @@ func main() {
if err != nil {
log.Panicf("failed to make restore: %v", err)
}
opClient := pb.NewOperationServiceClient(conn)
done = false
for range 30 {
op, err := opClient.GetOperation(
Expand Down
16 changes: 15 additions & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,25 @@ func main() {
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeTBWR,
handlers.NewTBWROperationHandler(
dbConnector,
clientConnector,
configInstance.S3,
configInstance.ClientConnection,
queries.NewWriteTableQuery,
clockwork.NewRealClock()),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry, metrics)
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(),
queries.NewWriteTableQuery, clockwork.NewRealClock(),
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
Expand Down
71 changes: 52 additions & 19 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backup_operations
import (
"context"
"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-sdk/v3"
"path"
"regexp"
"strings"
Expand Down Expand Up @@ -32,6 +33,7 @@ type MakeBackupInternalRequest struct {
SourcePathsToExclude []string
ScheduleID *string
Ttl *time.Duration
ParentOperationID *string
}

func FromGRPCRequest(request *pb.MakeBackupRequest, scheduleID *string) MakeBackupInternalRequest {
Expand All @@ -58,6 +60,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
SourcePathsToExclude: tbwr.SourcePathsToExclude,
ScheduleID: tbwr.ScheduleID,
Ttl: tbwr.Ttl,
ParentOperationID: &tbwr.ID,
}
}

Expand Down Expand Up @@ -93,6 +96,50 @@ func IsAllowedEndpoint(e string, allowedEndpointDomains []string, allowInsecureE
return false
}

func OpenConnAndValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector) ([]string, error) {
clientConnectionParams := types.YdbConnectionParams{
Endpoint: req.DatabaseEndpoint,
DatabaseName: req.DatabaseName,
}
dsn := types.MakeYdbConnectionString(clientConnectionParams)
ctx = xlog.With(ctx, zap.String("ClientDSN", dsn))
client, err := clientConn.Open(ctx, dsn)
if err != nil {
xlog.Error(ctx, "can't open client connection", zap.Error(err))
return nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn)
}
defer func() {
if err := clientConn.Close(ctx, client); err != nil {
xlog.Error(ctx, "can't close client connection", zap.Error(err))
}
}()
return ValidateSourcePaths(ctx, req, clientConn, client, dsn)
}

func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector, client *ydb.Driver, dsn string) ([]string, error) {
sourcePaths := make([]string, 0, len(req.SourcePaths))
for _, p := range req.SourcePaths {
fullPath, ok := SafePathJoin(req.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude)
if err != nil {
xlog.Error(ctx, "error preparing paths for export", zap.Error(err))
return nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
}
return pathsForExport, nil
}

func MakeBackup(
ctx context.Context,
clientConn client.ClientConnector,
Expand Down Expand Up @@ -153,25 +200,10 @@ func MakeBackup(
)
ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix))

sourcePaths := make([]string, 0, len(req.SourcePaths))
for _, p := range req.SourcePaths {
fullPath, ok := SafePathJoin(req.DatabaseName, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
}
sourcePaths = append(sourcePaths, fullPath)
}
pathsForExport, err := ValidateSourcePaths(ctx, req, clientConn, client, dsn)

pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude)
if err != nil {
xlog.Error(ctx, "error preparing paths for export", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn)
}

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
return nil, nil, err
}

s3Settings := types.ExportSettings{
Expand Down Expand Up @@ -236,8 +268,9 @@ func MakeBackup(
CreatedAt: now,
Creator: subject,
},
YdbOperationId: clientOperationID,
UpdatedAt: now,
YdbOperationId: clientOperationID,
UpdatedAt: now,
ParentOperationID: req.ParentOperationID,
}

return backup, op, nil
Expand Down
12 changes: 9 additions & 3 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,14 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
defer c.guard.Unlock()

queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
c.operations[queryBuilderMock.Operation.GetID()] = queryBuilderMock.Operation
c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = queryBuilderMock.BackupSchedule
if queryBuilderMock.Operation != nil {
c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation
}
if queryBuilderMock.Backup != nil {
c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup
}
if queryBuilderMock.BackupSchedule != nil {
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = *queryBuilderMock.BackupSchedule
}
return nil
}
18 changes: 9 additions & 9 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

type WriteTableQueryMock struct {
Operation types.Operation
Backup types.Backup
BackupSchedule types.BackupSchedule
Operation *types.Operation
Backup *types.Backup
BackupSchedule *types.BackupSchedule
}

type WriteTableQueryMockOption func(*WriteTableQueryMock)
Expand All @@ -23,31 +23,31 @@ func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult
}

func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
w.Backup = &backup
return w
}

func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
w.Operation = &operation
return w
}

func (w *WriteTableQueryMock) WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery {
w.BackupSchedule = schedule
w.BackupSchedule = &schedule
return w
}

func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
w.Backup = &backup
return w
}

func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
w.Operation = &operation
return w
}

func (w *WriteTableQueryMock) WithUpdateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery {
w.BackupSchedule = schedule
w.BackupSchedule = &schedule
return w
}
66 changes: 38 additions & 28 deletions internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"errors"
"github.com/jonboulle/clockwork"
"go.uber.org/zap"
"ydbcp/internal/backup_operations"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
Expand All @@ -18,15 +17,12 @@ import (
type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.BackupSchedule) error

func NewBackupScheduleHandler(
clientConn client.ClientConnector,
s3 config.S3Config,
clientConfig config.ClientConnectionConfig,
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
) BackupScheduleHandlerType {
return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error {
return BackupScheduleHandler(
ctx, driver, schedule, clientConn, s3, clientConfig,
ctx, driver, schedule,
queryBuilderFactory, clock,
)
}
Expand All @@ -36,9 +32,6 @@ func BackupScheduleHandler(
ctx context.Context,
driver db.DBConnector,
schedule types.BackupSchedule,
clientConn client.ClientConnector,
s3 config.S3Config,
clientConfig config.ClientConnectionConfig,
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
) error {
Expand All @@ -48,36 +41,53 @@ func BackupScheduleHandler(
}
// do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron.
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(clock.Now()) {

backupRequest := &pb.MakeBackupRequest{
ContainerId: schedule.ContainerID,
DatabaseName: schedule.DatabaseName,
DatabaseEndpoint: schedule.DatabaseEndpoint,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
backoff, err := schedule.GetCronDuration()
if err != nil {
return err
}
now := timestamppb.New(clock.Now())
schedule.ScheduleSettings.Ttl.AsDuration()
tbwr := &types.TakeBackupWithRetryOperation{
TakeBackupOperation: types.TakeBackupOperation{
ID: types.GenerateObjectID(),
ContainerID: schedule.ContainerID,
State: types.OperationStateRunning,
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: schedule.DatabaseEndpoint,
DatabaseName: schedule.DatabaseName,
},
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
Audit: &pb.AuditInfo{
Creator: types.OperationCreatorName,
CreatedAt: now,
},
UpdatedAt: now,
},
ScheduleID: &schedule.ID,
RetryConfig: &pb.RetryConfig{
Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(backoff)},
},
}
if schedule.ScheduleSettings != nil {
backupRequest.Ttl = schedule.ScheduleSettings.Ttl
if schedule.ScheduleSettings.Ttl != nil {
d := schedule.ScheduleSettings.Ttl.AsDuration()
tbwr.Ttl = &d
}
}

xlog.Info(
ctx, "call MakeBackup for schedule", zap.String("scheduleID", schedule.ID),
zap.String("backupRequest", backupRequest.String()),
ctx, "create TakeBackupWithRetryOperation for schedule", zap.String("scheduleID", schedule.ID),
zap.String("TakeBackupWithRetryOperation", tbwr.Proto().String()),
)

b, op, err := backup_operations.MakeBackup(
ctx, clientConn, s3, clientConfig.AllowedEndpointDomains, clientConfig.AllowInsecureEndpoint,
backup_operations.FromGRPCRequest(backupRequest, &schedule.ID), types.OperationCreatorName, clock,
)
if err != nil {
return err
}
err = schedule.UpdateNextLaunch(clock.Now())
if err != nil {
return err
}
return driver.ExecuteUpsert(
ctx,
queryBuilderFactory().WithCreateBackup(*b).WithCreateOperation(op).WithUpdateBackupSchedule(schedule),
queryBuilderFactory().WithCreateOperation(tbwr).WithUpdateBackupSchedule(schedule),
)
}
return nil
Expand Down
Loading

0 comments on commit d689e30

Please sign in to comment.