From 67e2055cf10ec3d98f68861225de70fd9069193e Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Fri, 16 Aug 2024 19:23:12 +0300 Subject: [PATCH] add audit fields --- cmd/ydbcp/main.go | 24 +++++++++--- internal/connectors/db/process_result_set.go | 26 ++++++++++--- internal/connectors/db/yql/queries/write.go | 8 +++- internal/handlers/restore_backup.go | 10 +++-- internal/handlers/take_backup.go | 16 ++++++-- internal/handlers/utils.go | 8 ++-- internal/types/backup.go | 41 ++++++++++++-------- 7 files changed, 96 insertions(+), 37 deletions(-) diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 26d20719..14dcac80 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -5,6 +5,8 @@ import ( "errors" "flag" "fmt" + _ "go.uber.org/automaxprocs" + "google.golang.org/protobuf/types/known/timestamppb" "net" "os" "os/signal" @@ -12,10 +14,8 @@ import ( "strings" "sync" "syscall" - "time" table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" - _ "go.uber.org/automaxprocs" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -94,7 +94,9 @@ func (s *server) checkAuth(ctx context.Context, permission, containerID, resourc return "", errPermissionDenied } if resp[0].Code != ap.AuthCodeSuccess { - xlog.Error(ctx, "auth plugin response", zap.Int("code", int(resp[0].Code)), zap.String("message", resp[0].Message)) + xlog.Error( + ctx, "auth plugin response", zap.Int("code", int(resp[0].Code)), zap.String("message", resp[0].Message), + ) return "", errPermissionDenied } return subject, nil @@ -187,6 +189,7 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb BackupID: types.GenerateObjectID(), // TODO: do we need backup id? } + now := timestamppb.Now() clientOperationID, err := s.clientConn.ExportToS3(ctx, client, s3Settings) if err != nil { xlog.Error(ctx, "can't start export operation", zap.Error(err), zap.String("dns", dsn)) @@ -204,6 +207,9 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb S3Bucket: s.s3.Bucket, S3PathPrefix: dstPrefix, Status: types.BackupStatePending, + AuditInfo: &pb.AuditInfo{ + CreatedAt: now, + }, } backupID, err := s.driver.CreateBackup(ctx, backup) if err != nil { @@ -225,8 +231,10 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb }, SourcePaths: req.GetSourcePaths(), SourcePathToExclude: req.GetSourcePathsToExclude(), - CreatedAt: time.Now(), YdbOperationId: clientOperationID, + Audit: &pb.AuditInfo{ + CreatedAt: now, + }, } operationID, err := s.driver.CreateOperation(ctx, op) @@ -242,7 +250,9 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb func (s *server) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) { xlog.Info(ctx, "MakeRestore", zap.String("request", req.String())) - subject, err := s.checkAuth(ctx, auth.PermissionBackupRestore, req.ContainerId, "") // TODO: check access to backup as resource + subject, err := s.checkAuth( + ctx, auth.PermissionBackupRestore, req.ContainerId, "", + ) // TODO: check access to backup as resource if err != nil { return nil, err } @@ -303,7 +313,9 @@ func (s *server) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (* DatabaseName: req.GetDatabaseName(), }, YdbOperationId: clientOperationID, - CreatedAt: time.Now(), + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + }, } operationID, err := s.driver.CreateOperation(ctx, op) diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index 53a13421..d5fc3e5c 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -2,12 +2,13 @@ package db import ( "fmt" - "time" - "ydbcp/internal/types" - "github.com/google/uuid" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "google.golang.org/protobuf/types/known/timestamppb" + "time" + "ydbcp/internal/types" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" ) type StructFromResultSet[T any] func(result result.Result) (*T, error) @@ -88,6 +89,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { ydbOperationId *string operationStateBuf *string message *string + completedAt *time.Time ) err := res.ScanNamed( named.Required("id", &operationId), @@ -101,6 +103,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { named.Optional("operation_id", &ydbOperationId), named.Optional("status", &operationStateBuf), named.Optional("message", &message), + named.Required("completed_at", &completedAt), ) if err != nil { return nil, err @@ -109,6 +112,11 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { if operationStateBuf != nil { operationState = types.OperationState(*operationStateBuf) } + var completedTs *timestamppb.Timestamp + completedTs = nil + if completedAt != nil { + completedTs = timestamppb.New(*completedAt) + } if operationType == string(types.OperationTypeTB) { if backupId == nil { return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId.String()) @@ -124,7 +132,11 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { DatabaseName: databaseName, }, YdbOperationId: StringOrEmpty(ydbOperationId), - CreatedAt: createdAt, + Audit: &pb.AuditInfo{ + Creator: "", + CreatedAt: timestamppb.New(createdAt), + CompletedAt: completedTs, + }, }, nil } else if operationType == string(types.OperationTypeRB) { if backupId == nil { @@ -141,7 +153,11 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { DatabaseName: databaseName, }, YdbOperationId: StringOrEmpty(ydbOperationId), - CreatedAt: createdAt, + Audit: &pb.AuditInfo{ + Creator: "", + CreatedAt: timestamppb.New(createdAt), + CompletedAt: completedTs, + }, }, nil } diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 7d7fb9ff..870c5bc6 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -86,7 +86,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle ) d.AddValueParam( "$created_at", - table_types.TimestampValueFromTime(tb.CreatedAt), + table_types.TimestampValueFromTime(tb.Audit.CreatedAt.AsTime()), ) d.AddValueParam( "$operation_id", @@ -110,6 +110,12 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle "$message", table_types.StringValueFromString(operation.GetMessage()), ) + if operation.GetCompletedAt() != nil { + d.AddValueParam( + "$completed_at", + table_types.TimestampValueFromTime(operation.GetCompletedAt().AsTime()), + ) + } return d } diff --git a/internal/handlers/restore_backup.go b/internal/handlers/restore_backup.go index e174428e..09761876 100644 --- a/internal/handlers/restore_backup.go +++ b/internal/handlers/restore_backup.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + "google.golang.org/protobuf/types/known/timestamppb" "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -59,7 +60,7 @@ func RBOperationHandler( defer func() { _ = client.Close(ctx, conn) }() ydbOpResponse, err := lookupYdbOperationStatus( - ctx, client, conn, operation, mr.YdbOperationId, mr.CreatedAt, config, + ctx, client, conn, operation, mr.YdbOperationId, mr.Audit.CreatedAt, config, ) if err != nil { return err @@ -67,6 +68,7 @@ func RBOperationHandler( if ydbOpResponse.shouldAbortHandler { operation.SetState(ydbOpResponse.opState) operation.SetMessage(ydbOpResponse.opMessage) + operation.SetCompletedAt(timestamppb.Now()) return db.UpdateOperation(ctx, operation) } @@ -79,7 +81,7 @@ func RBOperationHandler( case types.OperationStatePending: { if !opResponse.GetOperation().Ready { - if deadlineExceeded(mr.CreatedAt, config) { + if deadlineExceeded(mr.Audit.CreatedAt, config) { err = CancelYdbOperation(ctx, client, conn, operation, mr.YdbOperationId, "TTL") if err != nil { return err @@ -99,9 +101,10 @@ func RBOperationHandler( case types.OperationStateCancelling: { if !opResponse.GetOperation().Ready { - if deadlineExceeded(mr.CreatedAt, config) { + if deadlineExceeded(mr.Audit.CreatedAt, config) { operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") + operation.SetCompletedAt(timestamppb.Now()) return db.UpdateOperation(ctx, operation) } @@ -143,5 +146,6 @@ func RBOperationHandler( ) } + operation.SetCompletedAt(timestamppb.Now()) return db.UpdateOperation(ctx, operation) } diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index fd7ddf70..91495aa1 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + "google.golang.org/protobuf/types/known/timestamppb" "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -45,12 +46,13 @@ func TBOperationHandler( defer func() { _ = client.Close(ctx, conn) }() ydbOpResponse, err := lookupYdbOperationStatus( - ctx, client, conn, operation, tb.YdbOperationId, tb.CreatedAt, config, + ctx, client, conn, operation, tb.YdbOperationId, tb.Audit.CreatedAt, config, ) if err != nil { return err } + now := timestamppb.Now() backupToWrite := types.Backup{ ID: tb.BackupId, Status: types.BackupStateUnknown, @@ -59,7 +61,9 @@ func TBOperationHandler( if ydbOpResponse.shouldAbortHandler { operation.SetState(ydbOpResponse.opState) operation.SetMessage(ydbOpResponse.opMessage) + operation.SetCompletedAt(now) backupToWrite.Status = types.BackupStateError + backupToWrite.AuditInfo.CompletedAt = now return db.ExecuteUpsert( ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) @@ -73,12 +77,13 @@ func TBOperationHandler( case types.OperationStatePending: { if !opResponse.GetOperation().Ready { - if deadlineExceeded(tb.CreatedAt, config) { + if deadlineExceeded(tb.Audit.CreatedAt, config) { err = CancelYdbOperation(ctx, client, conn, operation, tb.YdbOperationId, "TTL") if err != nil { return err } backupToWrite.Status = types.BackupStateError + backupToWrite.AuditInfo.CompletedAt = operation.GetCompletedAt() return db.ExecuteUpsert( ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) @@ -102,10 +107,12 @@ func TBOperationHandler( case types.OperationStateCancelling: { if !opResponse.GetOperation().Ready { - if deadlineExceeded(tb.CreatedAt, config) { + if deadlineExceeded(tb.Audit.CreatedAt, config) { backupToWrite.Status = types.BackupStateError + backupToWrite.AuditInfo.CompletedAt = now operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") + operation.SetCompletedAt(now) return db.ExecuteUpsert( ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) @@ -113,6 +120,7 @@ func TBOperationHandler( return nil } } + opResponse.GetOperation() if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { backupToWrite.Status = types.BackupStateAvailable operation.SetState(types.OperationStateDone) @@ -146,6 +154,8 @@ func TBOperationHandler( types.IssuesToString(response.GetIssues()), ) } + backupToWrite.AuditInfo.CompletedAt = now + operation.SetCompletedAt(now) return db.ExecuteUpsert( ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) diff --git a/internal/handlers/utils.go b/internal/handlers/utils.go index 807b09cb..370739ff 100644 --- a/internal/handlers/utils.go +++ b/internal/handlers/utils.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + "google.golang.org/protobuf/types/known/timestamppb" "time" "ydbcp/internal/config" "ydbcp/internal/connectors/client" @@ -15,8 +16,8 @@ import ( "go.uber.org/zap" ) -func deadlineExceeded(createdAt time.Time, config config.Config) bool { - return time.Since(createdAt) > time.Duration(config.OperationTtlSeconds)*time.Second +func deadlineExceeded(createdAt *timestamppb.Timestamp, config config.Config) bool { + return time.Since(createdAt.AsTime()) > time.Duration(config.OperationTtlSeconds)*time.Second } func isValidStatus(status Ydb.StatusIds_StatusCode) bool { return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED @@ -41,7 +42,7 @@ func (r *LookupYdbOperationResponse) IssueString() string { func lookupYdbOperationStatus( ctx context.Context, client client.ClientConnector, conn *ydb.Driver, operation types.Operation, ydbOperationId string, - createdAt time.Time, config config.Config, + createdAt *timestamppb.Timestamp, config config.Config, ) (*LookupYdbOperationResponse, error) { xlog.Info( ctx, "getting operation status", @@ -124,5 +125,6 @@ func CancelYdbOperation( operation.SetState(types.OperationStateCancelling) operation.SetMessage("Operation deadline exceeded") + operation.SetCompletedAt(timestamppb.Now()) return nil } diff --git a/internal/types/backup.go b/internal/types/backup.go index 20aee706..8aecd3d6 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -3,14 +3,12 @@ package types import ( "context" "fmt" - "log" - "strings" - "time" - "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "go.uber.org/zap" "google.golang.org/protobuf/types/known/timestamppb" + "log" + "strings" "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -65,6 +63,7 @@ type Backup struct { S3PathPrefix string Status string Message string + AuditInfo *pb.AuditInfo } func (o *Backup) String() string { @@ -90,7 +89,7 @@ func (o *Backup) Proto() *pb.Backup { Bucket: o.S3Bucket, PathPrefix: o.S3PathPrefix, }, - Audit: nil, + Audit: o.AuditInfo, Size: 0, Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]), Message: o.Message, @@ -111,6 +110,8 @@ type Operation interface { SetState(s OperationState) GetMessage() string SetMessage(m string) + GetCompletedAt() *timestamppb.Timestamp + SetCompletedAt(t *timestamppb.Timestamp) Proto() *pb.Operation } @@ -124,7 +125,7 @@ type TakeBackupOperation struct { YdbOperationId string SourcePaths []string SourcePathToExclude []string - CreatedAt time.Time + Audit *pb.AuditInfo } func (o *TakeBackupOperation) GetId() ObjectID { @@ -153,7 +154,12 @@ func (o *TakeBackupOperation) GetMessage() string { func (o *TakeBackupOperation) SetMessage(m string) { o.Message = m } - +func (o *TakeBackupOperation) GetCompletedAt() *timestamppb.Timestamp { + return o.Audit.GetCompletedAt() +} +func (o *TakeBackupOperation) SetCompletedAt(t *timestamppb.Timestamp) { + o.Audit.CompletedAt = t +} func (o *TakeBackupOperation) Proto() *pb.Operation { return &pb.Operation{ Id: o.Id.String(), @@ -166,7 +172,7 @@ func (o *TakeBackupOperation) Proto() *pb.Operation { SourcePaths: o.SourcePaths, SourcePathsToExclude: o.SourcePathToExclude, RestorePaths: nil, - Audit: nil, + Audit: o.Audit, Status: o.State.Enum(), Message: o.Message, } @@ -181,7 +187,7 @@ type RestoreBackupOperation struct { YdbConnectionParams YdbConnectionParams YdbOperationId string DestinationPaths []string - CreatedAt time.Time + Audit *pb.AuditInfo } func (o *RestoreBackupOperation) GetId() ObjectID { @@ -210,6 +216,10 @@ func (o *RestoreBackupOperation) GetMessage() string { func (o *RestoreBackupOperation) SetMessage(m string) { o.Message = m } +func (o *RestoreBackupOperation) GetCompletedAt() *timestamppb.Timestamp { return o.Audit.CompletedAt } +func (o *RestoreBackupOperation) SetCompletedAt(t *timestamppb.Timestamp) { + o.Audit.CompletedAt = t +} func (o *RestoreBackupOperation) Proto() *pb.Operation { return &pb.Operation{ @@ -223,13 +233,9 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation { SourcePaths: nil, SourcePathsToExclude: nil, RestorePaths: o.DestinationPaths, - Audit: &pb.AuditInfo{ - Creator: "", - CreatedAt: timestamppb.New(o.CreatedAt), - CompletedAt: nil, - }, - Status: o.State.Enum(), - Message: o.Message, + Audit: o.Audit, + Status: o.State.Enum(), + Message: o.Message, } } @@ -268,6 +274,9 @@ func (o *GenericOperation) GetMessage() string { func (o *GenericOperation) SetMessage(m string) { o.Message = m } +func (o *GenericOperation) GetCompletedAt() *timestamppb.Timestamp { return nil } +func (o *GenericOperation) SetCompletedAt(_ *timestamppb.Timestamp) {} + func (o *GenericOperation) Proto() *pb.Operation { log.Fatalf("Converting GenericOperation to Proto: %s", o.Id) return nil