Skip to content

Commit

Permalink
feat(ydbcp): add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Nov 13, 2024
1 parent b338e5c commit 314870e
Show file tree
Hide file tree
Showing 22 changed files with 606 additions and 182 deletions.
17 changes: 10 additions & 7 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func main() {
authProvider,
configInstance.ClientConnection.AllowedEndpointDomains,
configInstance.ClientConnection.AllowInsecureEndpoint,
metrics,
).Register(server)
operation.NewOperationService(dbConnector, authProvider).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider).Register(server)
operation.NewOperationService(dbConnector, authProvider, metrics).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider, metrics).Register(server)
if err := server.Start(ctx, &wg); err != nil {
xlog.Error(ctx, "Error start GRPC server", zap.Error(err))
os.Exit(1)
Expand All @@ -133,7 +134,7 @@ func main() {
if err := handlersRegistry.Add(
types.OperationTypeTB,
handlers.NewTBOperationHandler(
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery,
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
Expand All @@ -142,15 +143,15 @@ func main() {

if err := handlersRegistry.Add(
types.OperationTypeRB,
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance),
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance, metrics),
); err != nil {
xlog.Error(ctx, "failed to register RB handler", zap.Error(err))
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery),
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
Expand All @@ -164,7 +165,9 @@ func main() {
configInstance.S3,
configInstance.ClientConnection,
queries.NewWriteTableQuery,
clockwork.NewRealClock()),
clockwork.NewRealClock(),
metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
os.Exit(1)
Expand All @@ -174,7 +177,7 @@ func main() {
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
queries.NewWriteTableQuery, clockwork.NewRealClock(),
queries.NewWriteTableQuery, clockwork.NewRealClock(), metrics,
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
Expand Down
12 changes: 8 additions & 4 deletions internal/connectors/s3/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type S3Connector interface {
ListObjects(pathPrefix string, bucket string) ([]string, error)
ListObjects(pathPrefix string, bucket string) ([]string, int64, error)
GetSize(pathPrefix string, bucket string) (int64, error)
DeleteObjects(keys []string, bucket string) error
}
Expand Down Expand Up @@ -49,7 +49,8 @@ func NewS3Connector(config config.S3Config) (*ClientS3Connector, error) {
return &ClientS3Connector{s3: s3Client}, nil
}

func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, error) {
func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, int64, error) {
var size int64
objects := make([]string, 0)
objectsPtr := &objects

Expand All @@ -65,17 +66,20 @@ func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]str
func(p *s3.ListObjectsOutput, last bool) (shouldContinue bool) {
for _, object := range p.Contents {
*objectsPtr = append(*objectsPtr, *object.Key)
if object.Size != nil {
size += *object.Size
}
}

return true
},
)

if err != nil {
return nil, err
return nil, 0, err
}

return *objectsPtr, nil
return *objectsPtr, size, nil
}

func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, error) {
Expand Down
55 changes: 34 additions & 21 deletions internal/connectors/s3/mock.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,66 @@
package s3

import (
"fmt"
"strings"

"github.com/aws/aws-sdk-go/service/s3"
)

type Bucket map[string]*s3.Object

type MockS3Connector struct {
storage map[string][]*s3.Object
storage map[string]Bucket
}

func NewMockS3Connector(storage map[string][]*s3.Object) *MockS3Connector {
func NewMockS3Connector(storage map[string]Bucket) *MockS3Connector {
return &MockS3Connector{
storage: storage,
}
}

func (m *MockS3Connector) ListObjects(pathPrefix string, _ string) ([]string, error) {
func (m *MockS3Connector) ListObjects(pathPrefix string, bucketName string) ([]string, int64, error) {
objects := make([]string, 0)
var size int64

if bucket, ok := m.storage[bucketName]; ok {
for key, object := range bucket {
if strings.HasPrefix(key, pathPrefix) {
objects = append(objects, key)

if content, ok := m.storage[pathPrefix]; ok {
for _, object := range content {
if object.Key == nil {
objects = append(objects, *object.Key)
if object.Size != nil {
size += *object.Size
}
}
}
}

return objects, nil
return objects, size, nil
}

func (m *MockS3Connector) GetSize(pathPrefix string, _ string) (int64, error) {
if content, ok := m.storage[pathPrefix]; ok {
var size int64
for _, object := range content {
if object.Size != nil {
size += *object.Size
func (m *MockS3Connector) GetSize(pathPrefix string, bucketName string) (int64, error) {
var size int64

if bucket, ok := m.storage[bucketName]; ok {
for key, object := range bucket {
print(key, pathPrefix, "\n")
if strings.HasPrefix(key, pathPrefix) {
print("yes", "\n")
if object.Size != nil {
print(*object.Size, "\n")
size += *object.Size
}
}
}

return size, nil
}

return 0, fmt.Errorf("objects not found, path: %s", pathPrefix)
return size, nil
}

func (m *MockS3Connector) DeleteObjects(objects []string, _ string) error {
for _, object := range objects {
delete(m.storage, object)
func (m *MockS3Connector) DeleteObjects(objects []string, bucketName string) error {
if bucket, ok := m.storage[bucketName]; ok {
for _, key := range objects {
delete(bucket, key)
}
}

return nil
Expand Down
41 changes: 35 additions & 6 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
Expand All @@ -21,9 +22,10 @@ func NewDBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBulderFactory queries.WriteQueryBulderFactory,
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory)
return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory, mon)
}
}

Expand All @@ -34,6 +36,7 @@ func DBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBuilderFactory queries.WriteQueryBulderFactory,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand All @@ -59,9 +62,15 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.ExecuteUpsert(
err := db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

backups, err := db.SelectBackups(
Expand All @@ -84,23 +93,37 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Backup not found")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
err = db.UpdateOperation(ctx, operation)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

backup := backups[0]
if backup.Status != types.BackupStateDeleting {
operation.SetState(types.OperationStateError)
operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backup.Status))
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
err = db.UpdateOperation(ctx, operation)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

deleteBackup := func(pathPrefix string, bucket string) error {
err := DeleteBackupData(s3, pathPrefix, bucket)
size, err := DeleteBackupData(s3, pathPrefix, bucket)
if err != nil {
return fmt.Errorf("failed to delete backup data: %v", err)
}

mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, backup.DatabaseName, size)

backupToWrite.Status = types.BackupStateDeleted
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
Expand Down Expand Up @@ -133,7 +156,13 @@ func DBOperationHandler(
return fmt.Errorf("unexpected operation state %s", dbOp.State)
}

return db.ExecuteUpsert(
err = db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}
Loading

0 comments on commit 314870e

Please sign in to comment.