Skip to content

Commit

Permalink
add audit fields
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Aug 20, 2024
1 parent a9c5147 commit 67e2055
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 37 deletions.
24 changes: 18 additions & 6 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"errors"
"flag"
"fmt"
_ "go.uber.org/automaxprocs"
"google.golang.org/protobuf/types/known/timestamppb"
"net"
"os"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"

table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -94,7 +94,9 @@ func (s *server) checkAuth(ctx context.Context, permission, containerID, resourc
return "", errPermissionDenied
}
if resp[0].Code != ap.AuthCodeSuccess {
xlog.Error(ctx, "auth plugin response", zap.Int("code", int(resp[0].Code)), zap.String("message", resp[0].Message))
xlog.Error(
ctx, "auth plugin response", zap.Int("code", int(resp[0].Code)), zap.String("message", resp[0].Message),
)
return "", errPermissionDenied
}
return subject, nil
Expand Down Expand Up @@ -187,6 +189,7 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
BackupID: types.GenerateObjectID(), // TODO: do we need backup id?
}

now := timestamppb.Now()
clientOperationID, err := s.clientConn.ExportToS3(ctx, client, s3Settings)
if err != nil {
xlog.Error(ctx, "can't start export operation", zap.Error(err), zap.String("dns", dsn))
Expand All @@ -204,6 +207,9 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
S3Bucket: s.s3.Bucket,
S3PathPrefix: dstPrefix,
Status: types.BackupStatePending,
AuditInfo: &pb.AuditInfo{
CreatedAt: now,
},
}
backupID, err := s.driver.CreateBackup(ctx, backup)
if err != nil {
Expand All @@ -225,8 +231,10 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
},
SourcePaths: req.GetSourcePaths(),
SourcePathToExclude: req.GetSourcePathsToExclude(),
CreatedAt: time.Now(),
YdbOperationId: clientOperationID,
Audit: &pb.AuditInfo{
CreatedAt: now,
},
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand All @@ -242,7 +250,9 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
func (s *server) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) {
xlog.Info(ctx, "MakeRestore", zap.String("request", req.String()))

subject, err := s.checkAuth(ctx, auth.PermissionBackupRestore, req.ContainerId, "") // TODO: check access to backup as resource
subject, err := s.checkAuth(
ctx, auth.PermissionBackupRestore, req.ContainerId, "",
) // TODO: check access to backup as resource
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,7 +313,9 @@ func (s *server) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*
DatabaseName: req.GetDatabaseName(),
},
YdbOperationId: clientOperationID,
CreatedAt: time.Now(),
Audit: &pb.AuditInfo{
CreatedAt: timestamppb.Now(),
},
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand Down
26 changes: 21 additions & 5 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package db

import (
"fmt"
"time"
"ydbcp/internal/types"

"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"google.golang.org/protobuf/types/known/timestamppb"
"time"
"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

type StructFromResultSet[T any] func(result result.Result) (*T, error)
Expand Down Expand Up @@ -88,6 +89,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
ydbOperationId *string
operationStateBuf *string
message *string
completedAt *time.Time
)
err := res.ScanNamed(
named.Required("id", &operationId),
Expand All @@ -101,6 +103,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
named.Optional("operation_id", &ydbOperationId),
named.Optional("status", &operationStateBuf),
named.Optional("message", &message),
named.Required("completed_at", &completedAt),
)
if err != nil {
return nil, err
Expand All @@ -109,6 +112,11 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
if operationStateBuf != nil {
operationState = types.OperationState(*operationStateBuf)
}
var completedTs *timestamppb.Timestamp
completedTs = nil
if completedAt != nil {
completedTs = timestamppb.New(*completedAt)
}
if operationType == string(types.OperationTypeTB) {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId.String())
Expand All @@ -124,7 +132,11 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
DatabaseName: databaseName,
},
YdbOperationId: StringOrEmpty(ydbOperationId),
CreatedAt: createdAt,
Audit: &pb.AuditInfo{
Creator: "",
CreatedAt: timestamppb.New(createdAt),
CompletedAt: completedTs,
},
}, nil
} else if operationType == string(types.OperationTypeRB) {
if backupId == nil {
Expand All @@ -141,7 +153,11 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
DatabaseName: databaseName,
},
YdbOperationId: StringOrEmpty(ydbOperationId),
CreatedAt: createdAt,
Audit: &pb.AuditInfo{
Creator: "",
CreatedAt: timestamppb.New(createdAt),
CompletedAt: completedTs,
},
}, nil
}

Expand Down
8 changes: 7 additions & 1 deletion internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
)
d.AddValueParam(
"$created_at",
table_types.TimestampValueFromTime(tb.CreatedAt),
table_types.TimestampValueFromTime(tb.Audit.CreatedAt.AsTime()),
)
d.AddValueParam(
"$operation_id",
Expand All @@ -110,6 +110,12 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
"$message",
table_types.StringValueFromString(operation.GetMessage()),
)
if operation.GetCompletedAt() != nil {
d.AddValueParam(
"$completed_at",
table_types.TimestampValueFromTime(operation.GetCompletedAt().AsTime()),
)
}
return d
}

Expand Down
10 changes: 7 additions & 3 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
Expand Down Expand Up @@ -59,14 +60,15 @@ func RBOperationHandler(
defer func() { _ = client.Close(ctx, conn) }()

ydbOpResponse, err := lookupYdbOperationStatus(
ctx, client, conn, operation, mr.YdbOperationId, mr.CreatedAt, config,
ctx, client, conn, operation, mr.YdbOperationId, mr.Audit.CreatedAt, config,
)
if err != nil {
return err
}
if ydbOpResponse.shouldAbortHandler {
operation.SetState(ydbOpResponse.opState)
operation.SetMessage(ydbOpResponse.opMessage)
operation.SetCompletedAt(timestamppb.Now())
return db.UpdateOperation(ctx, operation)
}

Expand All @@ -79,7 +81,7 @@ func RBOperationHandler(
case types.OperationStatePending:
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(mr.CreatedAt, config) {
if deadlineExceeded(mr.Audit.CreatedAt, config) {
err = CancelYdbOperation(ctx, client, conn, operation, mr.YdbOperationId, "TTL")
if err != nil {
return err
Expand All @@ -99,9 +101,10 @@ func RBOperationHandler(
case types.OperationStateCancelling:
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(mr.CreatedAt, config) {
if deadlineExceeded(mr.Audit.CreatedAt, config) {
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.SetCompletedAt(timestamppb.Now())
return db.UpdateOperation(ctx, operation)
}

Expand Down Expand Up @@ -143,5 +146,6 @@ func RBOperationHandler(
)
}

operation.SetCompletedAt(timestamppb.Now())
return db.UpdateOperation(ctx, operation)
}
16 changes: 13 additions & 3 deletions internal/handlers/take_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
Expand Down Expand Up @@ -45,12 +46,13 @@ func TBOperationHandler(
defer func() { _ = client.Close(ctx, conn) }()

ydbOpResponse, err := lookupYdbOperationStatus(
ctx, client, conn, operation, tb.YdbOperationId, tb.CreatedAt, config,
ctx, client, conn, operation, tb.YdbOperationId, tb.Audit.CreatedAt, config,
)
if err != nil {
return err
}

now := timestamppb.Now()
backupToWrite := types.Backup{
ID: tb.BackupId,
Status: types.BackupStateUnknown,
Expand All @@ -59,7 +61,9 @@ func TBOperationHandler(
if ydbOpResponse.shouldAbortHandler {
operation.SetState(ydbOpResponse.opState)
operation.SetMessage(ydbOpResponse.opMessage)
operation.SetCompletedAt(now)
backupToWrite.Status = types.BackupStateError
backupToWrite.AuditInfo.CompletedAt = now
return db.ExecuteUpsert(
ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
Expand All @@ -73,12 +77,13 @@ func TBOperationHandler(
case types.OperationStatePending:
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(tb.CreatedAt, config) {
if deadlineExceeded(tb.Audit.CreatedAt, config) {
err = CancelYdbOperation(ctx, client, conn, operation, tb.YdbOperationId, "TTL")
if err != nil {
return err
}
backupToWrite.Status = types.BackupStateError
backupToWrite.AuditInfo.CompletedAt = operation.GetCompletedAt()
return db.ExecuteUpsert(
ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
Expand All @@ -102,17 +107,20 @@ func TBOperationHandler(
case types.OperationStateCancelling:
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(tb.CreatedAt, config) {
if deadlineExceeded(tb.Audit.CreatedAt, config) {
backupToWrite.Status = types.BackupStateError
backupToWrite.AuditInfo.CompletedAt = now
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.SetCompletedAt(now)
return db.ExecuteUpsert(
ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
} else {
return nil
}
}
opResponse.GetOperation()
if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
backupToWrite.Status = types.BackupStateAvailable
operation.SetState(types.OperationStateDone)
Expand Down Expand Up @@ -146,6 +154,8 @@ func TBOperationHandler(
types.IssuesToString(response.GetIssues()),
)
}
backupToWrite.AuditInfo.CompletedAt = now
operation.SetCompletedAt(now)
return db.ExecuteUpsert(
ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
Expand Down
8 changes: 5 additions & 3 deletions internal/handlers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"time"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
Expand All @@ -15,8 +16,8 @@ import (
"go.uber.org/zap"
)

func deadlineExceeded(createdAt time.Time, config config.Config) bool {
return time.Since(createdAt) > time.Duration(config.OperationTtlSeconds)*time.Second
func deadlineExceeded(createdAt *timestamppb.Timestamp, config config.Config) bool {
return time.Since(createdAt.AsTime()) > time.Duration(config.OperationTtlSeconds)*time.Second
}
func isValidStatus(status Ydb.StatusIds_StatusCode) bool {
return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED
Expand All @@ -41,7 +42,7 @@ func (r *LookupYdbOperationResponse) IssueString() string {
func lookupYdbOperationStatus(
ctx context.Context, client client.ClientConnector, conn *ydb.Driver, operation types.Operation,
ydbOperationId string,
createdAt time.Time, config config.Config,
createdAt *timestamppb.Timestamp, config config.Config,
) (*LookupYdbOperationResponse, error) {
xlog.Info(
ctx, "getting operation status",
Expand Down Expand Up @@ -124,5 +125,6 @@ func CancelYdbOperation(

operation.SetState(types.OperationStateCancelling)
operation.SetMessage("Operation deadline exceeded")
operation.SetCompletedAt(timestamppb.Now())
return nil
}
Loading

0 comments on commit 67e2055

Please sign in to comment.