diff --git a/cmd/integration/make_backup/main.go b/cmd/integration/make_backup/main.go index 0b2121a5..1cab2210 100644 --- a/cmd/integration/make_backup/main.go +++ b/cmd/integration/make_backup/main.go @@ -31,6 +31,7 @@ func main() { } }(conn) client := pb.NewBackupServiceClient(conn) + opClient := pb.NewOperationServiceClient(conn) backups, err := client.ListBackups( context.Background(), &pb.ListBackupsRequest{ ContainerId: containerID, @@ -61,7 +62,7 @@ func main() { log.Panicf("unexpected error code: %v", err) } - backupOperation, err := client.MakeBackup( + tbwr, err := client.MakeBackup( context.Background(), &pb.MakeBackupRequest{ ContainerId: containerID, DatabaseName: databaseName, @@ -73,6 +74,16 @@ func main() { if err != nil { log.Panicf("failed to make backup: %v", err) } + op, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{ + Id: tbwr.Id, + }) + if err != nil { + log.Panicf("failed to get operation: %v", err) + } + if op.GetType() != types.OperationTypeTBWR.String() { + log.Panicf("unexpected operation type: %v", op.GetType()) + } + time.Sleep(time.Second * 11) // to wait for operation handler backups, err = client.ListBackups( context.Background(), &pb.ListBackupsRequest{ ContainerId: containerID, @@ -86,6 +97,18 @@ func main() { log.Panicf("Did not list freshly made backup") } backupPb := backups.Backups[0] + ops, err := opClient.ListOperations(context.Background(), &pb.ListOperationsRequest{ + ContainerId: containerID, + DatabaseNameMask: databaseName, + OperationTypes: []string{types.OperationTypeTB.String()}, + }) + if err != nil { + log.Panicf("failed to list operations: %v", err) + } + if len(ops.Operations) != 1 { + log.Panicf("expected one TB operation, got %d", len(ops.Operations)) + } + backupOperation := ops.Operations[0] if backupPb.Id != backupOperation.BackupId { log.Panicf( "backupOperation backupID %s does not match listed backup id %s", backupOperation.BackupId, backupPb.Id, @@ -121,7 +144,6 @@ func main() { if err != nil { log.Panicf("failed to make restore: %v", err) } - opClient := pb.NewOperationServiceClient(conn) done = false for range 30 { op, err := opClient.GetOperation( diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index db80af67..dee03e04 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -149,11 +149,25 @@ func main() { os.Exit(1) } + if err := handlersRegistry.Add( + types.OperationTypeTBWR, + handlers.NewTBWROperationHandler( + dbConnector, + clientConnector, + configInstance.S3, + configInstance.ClientConnection, + queries.NewWriteTableQuery, + clockwork.NewRealClock()), + ); err != nil { + xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err)) + os.Exit(1) + } + processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry, metrics) ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery) backupScheduleHandler := handlers.NewBackupScheduleHandler( - clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(), + queries.NewWriteTableQuery, clockwork.NewRealClock(), ) schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler) xlog.Info(ctx, "YDBCP started") diff --git a/internal/backup_operations/make_backup.go b/internal/backup_operations/make_backup.go index f18bb50b..ad2efa76 100644 --- a/internal/backup_operations/make_backup.go +++ b/internal/backup_operations/make_backup.go @@ -3,6 +3,7 @@ package backup_operations import ( "context" "github.com/jonboulle/clockwork" + "github.com/ydb-platform/ydb-go-sdk/v3" "path" "regexp" "strings" @@ -32,6 +33,7 @@ type MakeBackupInternalRequest struct { SourcePathsToExclude []string ScheduleID *string Ttl *time.Duration + ParentOperationID *string } func FromGRPCRequest(request *pb.MakeBackupRequest, scheduleID *string) MakeBackupInternalRequest { @@ -58,6 +60,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter SourcePathsToExclude: tbwr.SourcePathsToExclude, ScheduleID: tbwr.ScheduleID, Ttl: tbwr.Ttl, + ParentOperationID: &tbwr.ID, } } @@ -93,6 +96,50 @@ func IsAllowedEndpoint(e string, allowedEndpointDomains []string, allowInsecureE return false } +func OpenConnAndValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector) ([]string, error) { + clientConnectionParams := types.YdbConnectionParams{ + Endpoint: req.DatabaseEndpoint, + DatabaseName: req.DatabaseName, + } + dsn := types.MakeYdbConnectionString(clientConnectionParams) + ctx = xlog.With(ctx, zap.String("ClientDSN", dsn)) + client, err := clientConn.Open(ctx, dsn) + if err != nil { + xlog.Error(ctx, "can't open client connection", zap.Error(err)) + return nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn) + } + defer func() { + if err := clientConn.Close(ctx, client); err != nil { + xlog.Error(ctx, "can't close client connection", zap.Error(err)) + } + }() + return ValidateSourcePaths(ctx, req, clientConn, client, dsn) +} + +func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, clientConn client.ClientConnector, client *ydb.Driver, dsn string) ([]string, error) { + sourcePaths := make([]string, 0, len(req.SourcePaths)) + for _, p := range req.SourcePaths { + fullPath, ok := SafePathJoin(req.DatabaseName, p) + if !ok { + xlog.Error(ctx, "incorrect source path", zap.String("path", p)) + return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p) + } + sourcePaths = append(sourcePaths, fullPath) + } + + pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude) + if err != nil { + xlog.Error(ctx, "error preparing paths for export", zap.Error(err)) + return nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn) + } + + if len(pathsForExport) == 0 { + xlog.Error(ctx, "empty list of paths for export") + return nil, status.Error(codes.FailedPrecondition, "empty list of paths for export") + } + return pathsForExport, nil +} + func MakeBackup( ctx context.Context, clientConn client.ClientConnector, @@ -153,25 +200,10 @@ func MakeBackup( ) ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix)) - sourcePaths := make([]string, 0, len(req.SourcePaths)) - for _, p := range req.SourcePaths { - fullPath, ok := SafePathJoin(req.DatabaseName, p) - if !ok { - xlog.Error(ctx, "incorrect source path", zap.String("path", p)) - return nil, nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p) - } - sourcePaths = append(sourcePaths, fullPath) - } + pathsForExport, err := ValidateSourcePaths(ctx, req, clientConn, client, dsn) - pathsForExport, err := clientConn.PreparePathsForExport(ctx, client, sourcePaths, req.SourcePathsToExclude) if err != nil { - xlog.Error(ctx, "error preparing paths for export", zap.Error(err)) - return nil, nil, status.Errorf(codes.Unknown, "error preparing paths for export, dsn %s", dsn) - } - - if len(pathsForExport) == 0 { - xlog.Error(ctx, "empty list of paths for export") - return nil, nil, status.Error(codes.FailedPrecondition, "empty list of paths for export") + return nil, nil, err } s3Settings := types.ExportSettings{ @@ -236,8 +268,9 @@ func MakeBackup( CreatedAt: now, Creator: subject, }, - YdbOperationId: clientOperationID, - UpdatedAt: now, + YdbOperationId: clientOperationID, + UpdatedAt: now, + ParentOperationID: req.ParentOperationID, } return backup, op, nil diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 8cdf7945..8d42b699 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -246,8 +246,14 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries. defer c.guard.Unlock() queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock) - c.operations[queryBuilderMock.Operation.GetID()] = queryBuilderMock.Operation - c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup - c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = queryBuilderMock.BackupSchedule + if queryBuilderMock.Operation != nil { + c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation + } + if queryBuilderMock.Backup != nil { + c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup + } + if queryBuilderMock.BackupSchedule != nil { + c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = *queryBuilderMock.BackupSchedule + } return nil } diff --git a/internal/connectors/db/yql/queries/write_mock.go b/internal/connectors/db/yql/queries/write_mock.go index ae1eff79..14f0c3c5 100644 --- a/internal/connectors/db/yql/queries/write_mock.go +++ b/internal/connectors/db/yql/queries/write_mock.go @@ -7,9 +7,9 @@ import ( ) type WriteTableQueryMock struct { - Operation types.Operation - Backup types.Backup - BackupSchedule types.BackupSchedule + Operation *types.Operation + Backup *types.Backup + BackupSchedule *types.BackupSchedule } type WriteTableQueryMockOption func(*WriteTableQueryMock) @@ -23,31 +23,31 @@ func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult } func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQuery { - w.Backup = backup + w.Backup = &backup return w } func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery { - w.Operation = operation + w.Operation = &operation return w } func (w *WriteTableQueryMock) WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery { - w.BackupSchedule = schedule + w.BackupSchedule = &schedule return w } func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQuery { - w.Backup = backup + w.Backup = &backup return w } func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery { - w.Operation = operation + w.Operation = &operation return w } func (w *WriteTableQueryMock) WithUpdateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery { - w.BackupSchedule = schedule + w.BackupSchedule = &schedule return w } diff --git a/internal/handlers/schedule_backup.go b/internal/handlers/schedule_backup.go index 799dadeb..a692b51e 100644 --- a/internal/handlers/schedule_backup.go +++ b/internal/handlers/schedule_backup.go @@ -5,9 +5,8 @@ import ( "errors" "github.com/jonboulle/clockwork" "go.uber.org/zap" - "ydbcp/internal/backup_operations" - "ydbcp/internal/config" - "ydbcp/internal/connectors/client" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" @@ -18,15 +17,12 @@ import ( type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.BackupSchedule) error func NewBackupScheduleHandler( - clientConn client.ClientConnector, - s3 config.S3Config, - clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBulderFactory, clock clockwork.Clock, ) BackupScheduleHandlerType { return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error { return BackupScheduleHandler( - ctx, driver, schedule, clientConn, s3, clientConfig, + ctx, driver, schedule, queryBuilderFactory, clock, ) } @@ -36,9 +32,6 @@ func BackupScheduleHandler( ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule, - clientConn client.ClientConnector, - s3 config.S3Config, - clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBulderFactory, clock clockwork.Clock, ) error { @@ -48,36 +41,53 @@ func BackupScheduleHandler( } // do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron. if schedule.NextLaunch != nil && schedule.NextLaunch.Before(clock.Now()) { - - backupRequest := &pb.MakeBackupRequest{ - ContainerId: schedule.ContainerID, - DatabaseName: schedule.DatabaseName, - DatabaseEndpoint: schedule.DatabaseEndpoint, - SourcePaths: schedule.SourcePaths, - SourcePathsToExclude: schedule.SourcePathsToExclude, + backoff, err := schedule.GetCronDuration() + if err != nil { + return err + } + now := timestamppb.New(clock.Now()) + schedule.ScheduleSettings.Ttl.AsDuration() + tbwr := &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ + ID: types.GenerateObjectID(), + ContainerID: schedule.ContainerID, + State: types.OperationStateRunning, + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: schedule.DatabaseEndpoint, + DatabaseName: schedule.DatabaseName, + }, + SourcePaths: schedule.SourcePaths, + SourcePathsToExclude: schedule.SourcePathsToExclude, + Audit: &pb.AuditInfo{ + Creator: types.OperationCreatorName, + CreatedAt: now, + }, + UpdatedAt: now, + }, + ScheduleID: &schedule.ID, + RetryConfig: &pb.RetryConfig{ + Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(backoff)}, + }, } if schedule.ScheduleSettings != nil { - backupRequest.Ttl = schedule.ScheduleSettings.Ttl + if schedule.ScheduleSettings.Ttl != nil { + d := schedule.ScheduleSettings.Ttl.AsDuration() + tbwr.Ttl = &d + } } + xlog.Info( - ctx, "call MakeBackup for schedule", zap.String("scheduleID", schedule.ID), - zap.String("backupRequest", backupRequest.String()), + ctx, "create TakeBackupWithRetryOperation for schedule", zap.String("scheduleID", schedule.ID), + zap.String("TakeBackupWithRetryOperation", tbwr.Proto().String()), ) - b, op, err := backup_operations.MakeBackup( - ctx, clientConn, s3, clientConfig.AllowedEndpointDomains, clientConfig.AllowInsecureEndpoint, - backup_operations.FromGRPCRequest(backupRequest, &schedule.ID), types.OperationCreatorName, clock, - ) - if err != nil { - return err - } err = schedule.UpdateNextLaunch(clock.Now()) if err != nil { return err } return driver.ExecuteUpsert( ctx, - queryBuilderFactory().WithCreateBackup(*b).WithCreateOperation(op).WithUpdateBackupSchedule(schedule), + queryBuilderFactory().WithCreateOperation(tbwr).WithUpdateBackupSchedule(schedule), ) } return nil diff --git a/internal/handlers/schedule_backup_test.go b/internal/handlers/schedule_backup_test.go index 8a2b7c06..33dae79c 100644 --- a/internal/handlers/schedule_backup_test.go +++ b/internal/handlers/schedule_backup_test.go @@ -4,11 +4,8 @@ import ( "context" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" "testing" "time" - "ydbcp/internal/config" - "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" @@ -34,7 +31,6 @@ func TestBackupScheduleHandler(t *testing.T) { } opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - ydbOpMap := make(map[string]*Ydb_Operations.Operation) scheduleMap := make(map[string]types.BackupSchedule) scheduleMap[schedule.ID] = schedule dbConnector := db.NewMockDBConnector( @@ -42,39 +38,26 @@ func TestBackupScheduleHandler(t *testing.T) { db.WithOperations(opMap), db.WithBackupSchedules(scheduleMap), ) - clientConnector := client.NewMockClientConnector( - client.WithOperations(ydbOpMap), - ) handler := NewBackupScheduleHandler( - clientConnector, - config.S3Config{ - S3ForcePathStyle: false, - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clock, ) err := handler(ctx, dbConnector, schedule) assert.Empty(t, err) - // check operation status (should be pending) + // check operation status (should be running) ops, err := dbConnector.SelectOperations(ctx, &queries.ReadTableQueryImpl{}) assert.Empty(t, err) assert.NotEmpty(t, ops) assert.Equal(t, len(ops), 1) + assert.Equal(t, types.OperationTypeTBWR, ops[0].GetType()) assert.Equal(t, types.OperationStateRunning, ops[0].GetState()) - // check backup status (should be running) + // check backup status (should be empty) backups, err := dbConnector.SelectBackups(ctx, &queries.ReadTableQueryImpl{}) assert.Empty(t, err) - assert.NotEmpty(t, backups) - assert.Equal(t, len(backups), 1) - assert.Equal(t, types.BackupStateRunning, backups[0].Status) + assert.Empty(t, backups) // check schedule next launch schedules, err := dbConnector.SelectBackupSchedules(ctx, &queries.ReadTableQueryImpl{}) diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go index 8d5d4258..1ee6a5df 100644 --- a/internal/handlers/take_backup_retry.go +++ b/internal/handlers/take_backup_retry.go @@ -36,12 +36,32 @@ const ( INTERNAL_MAX_RETRIES = 20 MIN_BACKOFF = time.Minute BACKOFF_EXP = 1.5 - Error = iota - RunNewTb = iota - Skip = iota - Success = iota ) +type RetryDecision int + +const ( + Error RetryDecision = iota + RunNewTb RetryDecision = iota + Skip RetryDecision = iota + Success RetryDecision = iota +) + +func (r RetryDecision) String() string { + switch r { + case Error: + return "Error" + case RunNewTb: + return "RunNewTb" + case Skip: + return "Skip" + case Success: + return "Success" + default: + return "Unknown" + } +} + func exp(p int) time.Duration { return time.Duration(math.Pow(BACKOFF_EXP, float64(p))) } @@ -78,7 +98,7 @@ func shouldRetry(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clo return &res } -func HandleTbOps(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clock clockwork.Clock) (int, error) { +func HandleTbOps(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clock clockwork.Clock) (RetryDecision, error) { //select last tbOp. //if nothing, run new, skip //if there is a tbOp, check its status @@ -137,7 +157,7 @@ func TBWROperationHandler( ops, err := db.SelectOperations(ctx, queries.NewReadTableQuery( queries.WithTableName("Operations"), - queries.WithIndex("idx_pc"), + queries.WithIndex("idx_p"), queries.WithQueryFilters(queries.QueryFilter{ Field: "parent_operation_id", Values: []table_types.Value{table_types.StringValueFromString(tbwr.ID)}, @@ -167,6 +187,12 @@ func TBWROperationHandler( } return err } + xlog.Info( + ctx, + "TBWROperationHandler", + zap.String("OperationID", operation.GetID()), + zap.String("decision", do.String()), + ) switch do { case Success: { diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 76c402e5..fbc06302 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -4,7 +4,6 @@ import ( "context" "github.com/jonboulle/clockwork" "strconv" - "ydbcp/internal/auth" "ydbcp/internal/backup_operations" "ydbcp/internal/config" @@ -87,23 +86,45 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) + now := timestamppb.Now() - backup, op, err := backup_operations.MakeBackup( - ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, backup_operations.FromGRPCRequest(req, nil), subject, s.clock, - ) + tbwr := &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ + ID: types.GenerateObjectID(), + ContainerID: req.ContainerId, + State: types.OperationStateRunning, + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: req.DatabaseEndpoint, + DatabaseName: req.DatabaseName, + }, + SourcePaths: req.SourcePaths, + SourcePathsToExclude: req.SourcePathsToExclude, + Audit: &pb.AuditInfo{ + Creator: subject, + CreatedAt: now, + }, + UpdatedAt: now, + }, + RetryConfig: nil, + } + if d := req.Ttl.AsDuration(); req.Ttl != nil { + tbwr.Ttl = &d + } + _, err = backup_operations.OpenConnAndValidateSourcePaths(ctx, backup_operations.FromTBWROperation(tbwr), s.clientConn) if err != nil { return nil, err } + err = s.driver.ExecuteUpsert( - ctx, queries.NewWriteTableQuery().WithCreateBackup(*backup).WithCreateOperation(op), + ctx, queries.NewWriteTableQuery().WithCreateOperation(tbwr), ) if err != nil { return nil, err } - ctx = xlog.With(ctx, zap.String("OperationID", op.GetID())) - xlog.Debug(ctx, "MakeBackup was started successfully", zap.String("operation", types.OperationToString(op))) - return op.Proto(), nil + ctx = xlog.With(ctx, zap.String("OperationID", tbwr.ID)) + xlog.Debug(ctx, "MakeBackup was started successfully", zap.String("operation", types.OperationToString(tbwr))) + return tbwr.Proto(), nil } func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRequest) (*pb.Operation, error) { diff --git a/internal/server/services/operation/operation_service.go b/internal/server/services/operation/operation_service.go index 895caaab..59a07b9b 100644 --- a/internal/server/services/operation/operation_service.go +++ b/internal/server/services/operation/operation_service.go @@ -62,6 +62,18 @@ func (s *OperationService) ListOperations( }, ) } + if len(request.OperationTypes) > 0 { + typeValues := make([]table_types.Value, len(request.OperationTypes)) + for i, opType := range request.OperationTypes { + typeValues[i] = table_types.StringValueFromString(opType) + } + queryFilters = append( + queryFilters, queries.QueryFilter{ + Field: "type", + Values: typeValues, + }, + ) + } pageSpec, err := queries.NewPageSpec(request.GetPageSize(), request.GetPageToken()) if err != nil { diff --git a/internal/types/backup_schedule.go b/internal/types/backup_schedule.go index e89c4cf8..37da86d2 100644 --- a/internal/types/backup_schedule.go +++ b/internal/types/backup_schedule.go @@ -106,3 +106,12 @@ func (b *BackupSchedule) UpdateNextLaunch(now time.Time) error { b.NextLaunch = &nextTime return nil } + +func (b *BackupSchedule) GetCronDuration() (time.Duration, error) { + expr, err := ParseCronExpr(b.ScheduleSettings.SchedulePattern.Crontab) + if err != nil { + return time.Duration(0), err + } + nextTime := expr.NextN(time.Now(), 2) + return nextTime[1].Sub(nextTime[0]), nil +} diff --git a/internal/types/operation.go b/internal/types/operation.go index 36a4613f..8e24a223 100644 --- a/internal/types/operation.go +++ b/internal/types/operation.go @@ -311,7 +311,7 @@ func (o *TakeBackupWithRetryOperation) Proto() *pb.Operation { return &pb.Operation{ Id: o.ID, ContainerId: o.ContainerID, - Type: string(OperationTypeTB), + Type: string(OperationTypeTBWR), DatabaseName: o.YdbConnectionParams.DatabaseName, DatabaseEndpoint: o.YdbConnectionParams.Endpoint, YdbServerOperationId: o.YdbOperationId, diff --git a/internal/watchers/schedule_watcher/schedule_watcher_test.go b/internal/watchers/schedule_watcher/schedule_watcher_test.go index 00ccc96e..c589d728 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher_test.go +++ b/internal/watchers/schedule_watcher/schedule_watcher_test.go @@ -4,12 +4,9 @@ import ( "context" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" "sync" "testing" "time" - "ydbcp/internal/config" - "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/handlers" @@ -54,7 +51,6 @@ func TestScheduleWatcherSimple(t *testing.T) { } opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - ydbOpMap := make(map[string]*Ydb_Operations.Operation) scheduleMap := make(map[string]types.BackupSchedule) scheduleMap[schedule.ID] = schedule dbConnector := db.NewMockDBConnector( @@ -62,20 +58,8 @@ func TestScheduleWatcherSimple(t *testing.T) { db.WithOperations(opMap), db.WithBackupSchedules(scheduleMap), ) - clientConnector := client.NewMockClientConnector( - client.WithOperations(ydbOpMap), - ) handler := handlers.NewBackupScheduleHandler( - clientConnector, - config.S3Config{ - S3ForcePathStyle: false, - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clock, ) @@ -117,13 +101,10 @@ func TestScheduleWatcherSimple(t *testing.T) { assert.Equal(t, len(ops), 1) assert.Equal(t, types.OperationStateRunning, ops[0].GetState()) - // check backup status (should be running) + // check backup status (should be empty) backups, err := dbConnector.SelectBackups(ctx, &queries.ReadTableQueryImpl{}) assert.Empty(t, err) - assert.NotEmpty(t, backups) - assert.Equal(t, len(backups), 1) - assert.Equal(t, types.BackupStateRunning, backups[0].Status) - assert.Equal(t, schedule.ID, *backups[0].ScheduleID) + assert.Empty(t, backups) // check schedule next launch schedules, err := dbConnector.SelectBackupSchedules(ctx, &queries.ReadTableQueryImpl{}) @@ -177,7 +158,6 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { } opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - ydbOpMap := make(map[string]*Ydb_Operations.Operation) scheduleMap := make(map[string]types.BackupSchedule) scheduleMap[s1.ID] = s1 scheduleMap[s2.ID] = s2 @@ -186,20 +166,8 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { db.WithOperations(opMap), db.WithBackupSchedules(scheduleMap), ) - clientConnector := client.NewMockClientConnector( - client.WithOperations(ydbOpMap), - ) handler := handlers.NewBackupScheduleHandler( - clientConnector, - config.S3Config{ - S3ForcePathStyle: false, - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clock, ) @@ -240,14 +208,13 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { assert.NotEmpty(t, ops) assert.Equal(t, len(ops), 1) assert.Equal(t, types.OperationStateRunning, ops[0].GetState()) + assert.Equal(t, types.OperationTypeTBWR, ops[0].GetType()) + assert.Equal(t, s1.ID, *ops[0].(*types.TakeBackupWithRetryOperation).ScheduleID) - // check backup status (should be running) + // check backup status (should be empty) backups, err := dbConnector.SelectBackups(ctx, &queries.ReadTableQueryImpl{}) assert.Empty(t, err) - assert.NotEmpty(t, backups) - assert.Equal(t, len(backups), 1) - assert.Equal(t, types.BackupStateRunning, backups[0].Status) - assert.Equal(t, s1.ID, *backups[0].ScheduleID) + assert.Empty(t, backups) m := map[string]time.Time{ "1": now.Add(time.Minute), @@ -307,7 +274,6 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { } opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - ydbOpMap := make(map[string]*Ydb_Operations.Operation) scheduleMap := make(map[string]types.BackupSchedule) scheduleMap[s1.ID] = s1 scheduleMap[s2.ID] = s2 @@ -316,20 +282,8 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { db.WithOperations(opMap), db.WithBackupSchedules(scheduleMap), ) - clientConnector := client.NewMockClientConnector( - client.WithOperations(ydbOpMap), - ) handler := handlers.NewBackupScheduleHandler( - clientConnector, - config.S3Config{ - S3ForcePathStyle: false, - IsMock: true, - }, - config.ClientConnectionConfig{ - AllowedEndpointDomains: []string{".valid.com"}, - AllowInsecureEndpoint: true, - }, queries.NewWriteTableQueryMock, clock, ) @@ -364,6 +318,11 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { wg.Wait() + m := map[string]time.Time{ + "1": now.Add(time.Minute * 61), + "2": now.Add(time.Hour * 2), + } + // check operation status (should be pending) ops, err := dbConnector.SelectOperations(ctx, &queries.ReadTableQueryImpl{}) assert.Empty(t, err) @@ -371,21 +330,15 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { assert.Equal(t, len(ops), 2) for _, op := range ops { assert.Equal(t, types.OperationStateRunning, op.GetState()) - assert.Equal(t, types.OperationTypeTB, op.GetType()) + assert.Equal(t, types.OperationTypeTBWR, op.GetType()) + _, ok := m[*op.(*types.TakeBackupWithRetryOperation).ScheduleID] + assert.True(t, ok) } - // check backup status (should be running) + // check backup status (should be none) backups, err := dbConnector.SelectBackups(ctx, &queries.ReadTableQueryImpl{}) assert.Empty(t, err) - assert.NotEmpty(t, backups) - assert.Equal(t, len(backups), 2) - assert.Equal(t, types.BackupStateRunning, backups[0].Status) - assert.Equal(t, types.BackupStateRunning, backups[1].Status) - - m := map[string]time.Time{ - "1": now.Add(time.Minute * 61), - "2": now.Add(time.Hour * 2), - } + assert.Empty(t, backups) // check schedule next launch schedules, err := dbConnector.SelectBackupSchedules(ctx, &queries.ReadTableQueryImpl{})