Skip to content

Commit

Permalink
set audit for TBWR, improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Pleshakov committed Nov 25, 2024
1 parent 055eadc commit 7e28fdb
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
13 changes: 13 additions & 0 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ func main() {
if !done {
log.Panicln("failed to complete a backup in 30 seconds")
}
time.Sleep(time.Second * 11) // to wait for operation handler
tbwr, err = opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
Id: op.Id,
})
if err != nil {
log.Panicf("failed to get operation: %v", err)
}
if tbwr.Status.String() != string(types.OperationStateDone) {
log.Panicf("unexpected operation state: %v", tbwr.Status.String())
}
if tbwr.UpdatedAt == nil || tbwr.UpdatedAt.AsTime() != tbwr.Audit.CompletedAt.AsTime() {
log.Panicf("unexpected operation updatedAt/completedAt: %v, %v", tbwr.UpdatedAt, tbwr.Audit.CompletedAt)
}
restoreOperation, err := client.MakeRestore(
context.Background(), &pb.MakeRestoreRequest{
ContainerId: containerID,
Expand Down
42 changes: 33 additions & 9 deletions internal/handlers/take_backup_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"github.com/jonboulle/clockwork"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"math"
"strings"
"time"
"ydbcp/internal/backup_operations"
"ydbcp/internal/config"
Expand Down Expand Up @@ -184,6 +186,9 @@ func TBWROperationHandler(
if err != nil {
tbwr.State = types.OperationStateError
tbwr.Message = err.Error()
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())

errup := db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
if errup != nil {
return errup
Expand All @@ -202,21 +207,34 @@ func TBWROperationHandler(
{
tbwr.State = types.OperationStateDone
tbwr.Message = "Success"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
}
case Skip:
return nil
case Error:
{
operationIDs := strings.Join(func() []string {
var ids []string
for _, item := range ops {
ids = append(ids, item.GetID())
}
return ids
}(), ", ")
tbwr.State = types.OperationStateError
tbwr.Message = "retry attempts exhausted"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())

tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", len(ops))
fields := []zap.Field{
zap.Int("RetriesCount", len(ops)),
}
if len(ops) > 0 {
fields = append(fields, zap.String("TBOperationID", ops[len(ops)-1].GetID()))
tbwr.Message = tbwr.Message + fmt.Sprintf(" Launched operations %s", operationIDs)
fields = append(fields, zap.String("OperationIDs", operationIDs))
}
xlog.Error(ctx, "retry attempts exhausted for TBWR operation", fields...)
xlog.Error(ctx, "retry attempts exceeded limit for TBWR operation", fields...)
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
}
case RunNewTb:
Expand All @@ -240,6 +258,9 @@ func TBWROperationHandler(
default:
tbwr.State = types.OperationStateError
tbwr.Message = "unexpected operation state"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())

_ = db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
return errors.New(tbwr.Message)
}
Expand All @@ -250,21 +271,22 @@ func TBWROperationHandler(
//if cancelled, set cancelled to itself
{
xlog.Info(ctx, "cancelling TBWR operation")
if len(tbOps) == 0 {
tbwr.State = types.OperationStateCancelled
tbwr.Message = "Success"
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
var last *types.TakeBackupOperation
if len(tbOps) > 0 {
last = tbOps[len(tbOps)-1]
}
last := tbOps[len(tbOps)-1]
if !types.IsActive(last) {
if last == nil || !types.IsActive(last) {
tbwr.State = types.OperationStateCancelled
tbwr.Message = "Success"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
} else {
if last.State == types.OperationStatePending || last.State == types.OperationStateRunning {
xlog.Info(ctx, "cancelling TB operation", zap.String("TBOperationID", last.ID))
last.State = types.OperationStateStartCancelling
last.Message = "Cancelling by parent operation"
last.UpdatedAt = timestamppb.New(clock.Now())
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(last))
}
}
Expand All @@ -273,6 +295,8 @@ func TBWROperationHandler(
{
tbwr.State = types.OperationStateError
tbwr.Message = "unexpected operation state"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
_ = db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
return errors.New(tbwr.Message)
}
Expand Down
10 changes: 9 additions & 1 deletion internal/handlers/take_backup_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"context"
"fmt"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -203,6 +204,7 @@ func TestTBWRHandlerSuccess(t *testing.T) {
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
Audit: &pb.AuditInfo{},
},
RetryConfig: nil,
}
Expand Down Expand Up @@ -263,6 +265,7 @@ func TestTBWRHandlerSkipRunning(t *testing.T) {
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
Audit: &pb.AuditInfo{},
},
RetryConfig: nil,
}
Expand Down Expand Up @@ -327,6 +330,7 @@ func TestTBWRHandlerSkipError(t *testing.T) {
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
Audit: &pb.AuditInfo{},
},
RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 3}},
}
Expand Down Expand Up @@ -391,6 +395,7 @@ func TestTBWRHandlerError(t *testing.T) {
State: types.OperationStateRunning,
Message: "",
YdbConnectionParams: types.YdbConnectionParams{},
Audit: &pb.AuditInfo{},
},
RetryConfig: nil,
}
Expand Down Expand Up @@ -431,7 +436,7 @@ func TestTBWRHandlerError(t *testing.T) {
assert.Empty(t, err)
assert.NotEmpty(t, op)
assert.Equal(t, types.OperationStateError, op.GetState())
assert.Equal(t, "retry attempts exhausted", op.GetMessage())
assert.Equal(t, fmt.Sprintf("retry attempts exceeded limit: 1. Launched operations %s", ops[0].GetID()), op.GetMessage())
}

func TestTBWRHandlerAlwaysRunOnce(t *testing.T) {
Expand All @@ -448,6 +453,7 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) {
Endpoint: "i.valid.com",
DatabaseName: "/mydb",
},
Audit: &pb.AuditInfo{},
},
RetryConfig: nil,
}
Expand Down Expand Up @@ -514,6 +520,7 @@ func TestTBWRHandlerStartCancel(t *testing.T) {
Endpoint: "i.valid.com",
DatabaseName: "/mydb",
},
Audit: &pb.AuditInfo{},
},
RetryConfig: nil,
}
Expand Down Expand Up @@ -589,6 +596,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) {
Endpoint: "i.valid.com",
DatabaseName: "/mydb",
},
Audit: &pb.AuditInfo{},
},
RetryConfig: nil,
}
Expand Down

0 comments on commit 7e28fdb

Please sign in to comment.