Skip to content

Commit

Permalink
feat(ydbcp): add metrics
Browse files (browse the repository at this point in the history)
  • Loading branch information
ulya-sidorina committed Nov 12, 2024
1 parent: b338e5c; commit: 8aa1e0f
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 102 deletions.
17 changes: 10 additions & 7 deletions cmd/ydbcp/main.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -121,9 +121,10 @@ func main() {
authProvider,
configInstance.ClientConnection.AllowedEndpointDomains,
configInstance.ClientConnection.AllowInsecureEndpoint,
metrics,
).Register(server)
operation.NewOperationService(dbConnector, authProvider).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider).Register(server)
operation.NewOperationService(dbConnector, authProvider, metrics).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider, metrics).Register(server)
if err := server.Start(ctx, &wg); err != nil {
xlog.Error(ctx, "Error start GRPC server", zap.Error(err))
os.Exit(1)
Expand All @@ -133,7 +134,7 @@ func main() {
if err := handlersRegistry.Add(
types.OperationTypeTB,
handlers.NewTBOperationHandler(
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery,
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
Expand All @@ -142,15 +143,15 @@ func main() {

if err := handlersRegistry.Add(
types.OperationTypeRB,
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance),
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance, metrics),
); err != nil {
xlog.Error(ctx, "failed to register RB handler", zap.Error(err))
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery),
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
Expand All @@ -164,7 +165,9 @@ func main() {
configInstance.S3,
configInstance.ClientConnection,
queries.NewWriteTableQuery,
clockwork.NewRealClock()),
clockwork.NewRealClock(),
metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
os.Exit(1)
Expand All @@ -174,7 +177,7 @@ func main() {
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
queries.NewWriteTableQuery, clockwork.NewRealClock(),
queries.NewWriteTableQuery, clockwork.NewRealClock(), metrics,
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
Expand Down
12 changes: 8 additions & 4 deletions internal/connectors/s3/connector.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type S3Connector interface {
ListObjects(pathPrefix string, bucket string) ([]string, error)
ListObjects(pathPrefix string, bucket string) ([]string, int64, error)
GetSize(pathPrefix string, bucket string) (int64, error)
DeleteObjects(keys []string, bucket string) error
}
Expand Down Expand Up @@ -49,7 +49,8 @@ func NewS3Connector(config config.S3Config) (*ClientS3Connector, error) {
return &ClientS3Connector{s3: s3Client}, nil
}

func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, error) {
func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, int64, error) {
var size int64
objects := make([]string, 0)
objectsPtr := &objects

Expand All @@ -65,17 +66,20 @@ func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]str
func(p *s3.ListObjectsOutput, last bool) (shouldContinue bool) {
for _, object := range p.Contents {
*objectsPtr = append(*objectsPtr, *object.Key)
if object.Size != nil {
size += *object.Size
}
}

return true
},
)

if err != nil {
return nil, err
return nil, 0, err
}

return *objectsPtr, nil
return *objectsPtr, size, nil
}

func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, error) {
Expand Down
41 changes: 35 additions & 6 deletions internal/handlers/delete_backup.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
Expand All @@ -21,9 +22,10 @@ func NewDBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBulderFactory queries.WriteQueryBulderFactory,
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory)
return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory, mon)
}
}

Expand All @@ -34,6 +36,7 @@ func DBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBuilderFactory queries.WriteQueryBulderFactory,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand All @@ -59,9 +62,15 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.ExecuteUpsert(
err := db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

backups, err := db.SelectBackups(
Expand All @@ -84,23 +93,37 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Backup not found")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
err = db.UpdateOperation(ctx, operation)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

backup := backups[0]
if backup.Status != types.BackupStateDeleting {
operation.SetState(types.OperationStateError)
operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backup.Status))
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
err = db.UpdateOperation(ctx, operation)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

deleteBackup := func(pathPrefix string, bucket string) error {
err := DeleteBackupData(s3, pathPrefix, bucket)
size, err := DeleteBackupData(s3, pathPrefix, bucket)
if err != nil {
return fmt.Errorf("failed to delete backup data: %v", err)
}

mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, size)

backupToWrite.Status = types.BackupStateDeleted
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
Expand Down Expand Up @@ -133,7 +156,13 @@ func DBOperationHandler(
return fmt.Errorf("unexpected operation state %s", dbOp.State)
}

return db.ExecuteUpsert(
err = db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}
29 changes: 23 additions & 6 deletions internal/handlers/restore_backup.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
Expand All @@ -16,10 +17,10 @@ import (
)

func NewRBOperationHandler(
db db.DBConnector, client client.ClientConnector, config config.Config,
db db.DBConnector, client client.ClientConnector, config config.Config, mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return RBOperationHandler(ctx, op, db, client, config)
return RBOperationHandler(ctx, op, db, client, config, mon)
}
}

Expand All @@ -29,6 +30,7 @@ func RBOperationHandler(
db db.DBConnector,
client client.ClientConnector,
config config.Config,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "RBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand Down Expand Up @@ -64,7 +66,12 @@ func RBOperationHandler(
operation.SetState(ydbOpResponse.opState)
operation.SetMessage(ydbOpResponse.opMessage)
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
err = db.UpdateOperation(ctx, operation)
if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

if ydbOpResponse.opResponse == nil {
Expand Down Expand Up @@ -114,9 +121,14 @@ func RBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
}

return db.UpdateOperation(ctx, operation)
err := db.UpdateOperation(ctx, operation)
if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}
}
if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
operation.SetState(types.OperationStateDone)
Expand Down Expand Up @@ -160,5 +172,10 @@ func RBOperationHandler(
}

operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
err = db.UpdateOperation(ctx, operation)
if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}
5 changes: 4 additions & 1 deletion internal/handlers/schedule_backup.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
Expand All @@ -19,11 +20,12 @@ type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.Backu
func NewBackupScheduleHandler(
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
) BackupScheduleHandlerType {
return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error {
return BackupScheduleHandler(
ctx, driver, schedule,
queryBuilderFactory, clock,
queryBuilderFactory, clock, mon,
)
}
}
Expand All @@ -34,6 +36,7 @@ func BackupScheduleHandler(
schedule types.BackupSchedule,
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
) error {
if schedule.Status != types.BackupScheduleStateActive {
xlog.Error(ctx, "backup schedule is not active", zap.String("scheduleID", schedule.ID))
Expand Down
Loading

0 comments on commit 8aa1e0f

Please sign in to comment.