Skip to content

Commit

Permalink
Merge pull request #57 from ydb-platform/feature/NBYDB-303
Browse files Browse the repository at this point in the history
feat(ydbcp): add running state for operations and backups
  • Loading branch information
ulya-sidorina authored Sep 5, 2024
2 parents 4e816ce + 68ead2f commit 75bbe4e
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 87 deletions.
1 change: 1 addition & 0 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
Field: "status",
Values: []table_types.Value{
table_types.StringValueFromString(types.OperationStatePending.String()),
table_types.StringValueFromString(types.OperationStateRunning.String()),
table_types.StringValueFromString(types.OperationStateCancelling.String()),
table_types.StringValueFromString(types.OperationStateStartCancelling.String()),
},
Expand Down
10 changes: 8 additions & 2 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func RBOperationHandler(
opResponse := ydbOpResponse.opResponse

switch mr.State {
case types.OperationStatePending:
case types.OperationStateRunning:
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(mr.Audit.CreatedAt, config) {
Expand All @@ -94,7 +94,11 @@ func RBOperationHandler(
operation.SetMessage("Success")
} else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED {
operation.SetState(types.OperationStateError)
operation.SetMessage("Pending operation was cancelled")
if opResponse.GetOperation().Issues != nil {
operation.SetMessage(ydbOpResponse.IssueString())
} else {
operation.SetMessage("Running operation was cancelled")
}
} else {
operation.SetState(types.OperationStateError)
operation.SetMessage(ydbOpResponse.IssueString())
Expand Down Expand Up @@ -132,6 +136,8 @@ func RBOperationHandler(
operation.SetMessage(ydbOpResponse.IssueString())
}
}
default:
return fmt.Errorf("unexpected operation state %s", mr.State)
}

xlog.Info(
Expand Down
28 changes: 14 additions & 14 deletions internal/handlers/restore_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) {
rbOp := types.RestoreBackupOperation{
ID: types.GenerateObjectID(),
BackupId: types.GenerateObjectID(),
State: types.OperationStatePending,
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
YdbOperationId: "1",
Expand Down Expand Up @@ -52,7 +52,7 @@ func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) {
assert.Equal(t, "Error status: NOT_FOUND, issues: message:\"operation not found\"", op.GetMessage())
}

func TestRBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) {
func TestRBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -66,7 +66,7 @@ func TestRBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) {
rbOp := types.RestoreBackupOperation{
ID: types.GenerateObjectID(),
BackupId: types.GenerateObjectID(),
State: types.OperationStatePending,
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
YdbOperationId: ydbOp.Id,
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestRBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) {
assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus())
}

func TestRBOperationHandlerPendingOperationInProgress(t *testing.T) {
func TestRBOperationHandlerRunningOperationInProgress(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -115,7 +115,7 @@ func TestRBOperationHandlerPendingOperationInProgress(t *testing.T) {
rbOp := types.RestoreBackupOperation{
ID: types.GenerateObjectID(),
BackupId: types.GenerateObjectID(),
State: types.OperationStatePending,
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
YdbOperationId: ydbOp.Id,
Expand All @@ -141,7 +141,7 @@ func TestRBOperationHandlerPendingOperationInProgress(t *testing.T) {
op, err := dbConnector.GetOperation(ctx, rbOp.ID)
assert.Empty(t, err)
assert.NotEmpty(t, op)
assert.Equal(t, types.OperationStatePending, op.GetState())
assert.Equal(t, types.OperationStateRunning, op.GetState())

// check ydb operation status (should be in progress
ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId)
Expand All @@ -150,7 +150,7 @@ func TestRBOperationHandlerPendingOperationInProgress(t *testing.T) {
assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady())
}

func TestRBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) {
func TestRBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -164,7 +164,7 @@ func TestRBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) {
rbOp := types.RestoreBackupOperation{
ID: types.GenerateObjectID(),
BackupId: types.GenerateObjectID(),
State: types.OperationStatePending,
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
YdbOperationId: ydbOp.Id,
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestRBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) {
assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus())
}

func TestRBOperationHandlerPendingOperationCancelled(t *testing.T) {
func TestRBOperationHandlerRunningOperationCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -211,7 +211,7 @@ func TestRBOperationHandlerPendingOperationCancelled(t *testing.T) {
rbOp := types.RestoreBackupOperation{
ID: types.GenerateObjectID(),
BackupId: types.GenerateObjectID(),
State: types.OperationStatePending,
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
YdbOperationId: ydbOp.Id,
Expand All @@ -237,7 +237,7 @@ func TestRBOperationHandlerPendingOperationCancelled(t *testing.T) {
assert.Empty(t, err)
assert.NotEmpty(t, op)
assert.Equal(t, types.OperationStateError, op.GetState())
assert.Equal(t, "Pending operation was cancelled", op.GetMessage())
assert.Equal(t, "Running operation was cancelled", op.GetMessage())

// check ydb operation status (should be forgotten)
ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId)
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestRBOperationHandlerCancellingOperationCancelled(t *testing.T) {

}

func TestRBOperationHandlerRetriableErrorForPendingOperation(t *testing.T) {
func TestRBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -453,7 +453,7 @@ func TestRBOperationHandlerRetriableErrorForPendingOperation(t *testing.T) {
rbOp := types.RestoreBackupOperation{
ID: types.GenerateObjectID(),
BackupId: types.GenerateObjectID(),
State: types.OperationStatePending,
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
YdbOperationId: ydbOp.Id,
Expand All @@ -478,7 +478,7 @@ func TestRBOperationHandlerRetriableErrorForPendingOperation(t *testing.T) {
op, err := dbConnector.GetOperation(ctx, rbOp.ID)
assert.Empty(t, err)
assert.NotEmpty(t, op)
assert.Equal(t, types.OperationStatePending, op.GetState())
assert.Equal(t, types.OperationStateRunning, op.GetState())

// check ydb operation status
ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId)
Expand Down
10 changes: 8 additions & 2 deletions internal/handlers/take_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TBOperationHandler(
}

switch tb.State {
case types.OperationStatePending:
case types.OperationStateRunning:
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(tb.Audit.CreatedAt, config) {
Expand All @@ -133,7 +133,11 @@ func TBOperationHandler(
} else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED {
backupToWrite.Status = types.BackupStateError
operation.SetState(types.OperationStateError)
operation.SetMessage("got CANCELLED status for PENDING operation")
if opResponse.GetOperation().Issues != nil {
operation.SetMessage(ydbOpResponse.IssueString())
} else {
operation.SetMessage("got CANCELLED status for running operation")
}
} else {
backupToWrite.Status = types.BackupStateError
operation.SetState(types.OperationStateError)
Expand Down Expand Up @@ -188,6 +192,8 @@ func TBOperationHandler(
operation.SetMessage(ydbOpResponse.IssueString())
}
}
default:
return fmt.Errorf("unexpected operation state %s", tb.State)
}
response, err := client.ForgetOperation(ctx, conn, tb.YdbOperationId)
if err != nil {
Expand Down
Loading

0 comments on commit 75bbe4e

Please sign in to comment.