Skip to content

Commit

Permalink
feat(ydbcp): add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Nov 7, 2024
1 parent c9e554e commit ada2976
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 66 deletions.
14 changes: 8 additions & 6 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ func main() {
authProvider,
configInstance.ClientConnection.AllowedEndpointDomains,
configInstance.ClientConnection.AllowInsecureEndpoint,
metrics,
).Register(server)
operation.NewOperationService(dbConnector, authProvider).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider).Register(server)
operation.NewOperationService(dbConnector, authProvider, metrics).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider, metrics).Register(server)
if err := server.Start(ctx, &wg); err != nil {
xlog.Error(ctx, "Error start GRPC server", zap.Error(err))
os.Exit(1)
Expand All @@ -126,7 +127,7 @@ func main() {
if err := handlersRegistry.Add(
types.OperationTypeTB,
handlers.NewTBOperationHandler(
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery,
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
Expand All @@ -135,15 +136,15 @@ func main() {

if err := handlersRegistry.Add(
types.OperationTypeRB,
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance),
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance, metrics),
); err != nil {
xlog.Error(ctx, "failed to register RB handler", zap.Error(err))
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery),
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
Expand All @@ -153,7 +154,8 @@ func main() {
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, clockwork.NewRealClock(),
clientConnector, configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery,
clockwork.NewRealClock(), metrics,
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
Expand Down
12 changes: 8 additions & 4 deletions internal/connectors/s3/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type S3Connector interface {
ListObjects(pathPrefix string, bucket string) ([]string, error)
ListObjects(pathPrefix string, bucket string) ([]string, int64, error)
GetSize(pathPrefix string, bucket string) (int64, error)
DeleteObjects(keys []string, bucket string) error
}
Expand Down Expand Up @@ -49,7 +49,8 @@ func NewS3Connector(config config.S3Config) (*ClientS3Connector, error) {
return &ClientS3Connector{s3: s3Client}, nil
}

func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, error) {
func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, int64, error) {
var size int64
objects := make([]string, 0)
objectsPtr := &objects

Expand All @@ -65,17 +66,20 @@ func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]str
func(p *s3.ListObjectsOutput, last bool) (shouldContinue bool) {
for _, object := range p.Contents {
*objectsPtr = append(*objectsPtr, *object.Key)
if object.Size != nil {
size += *object.Size
}
}

return true
},
)

if err != nil {
return nil, err
return nil, 0, err
}

return *objectsPtr, nil
return *objectsPtr, size, nil
}

func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, error) {
Expand Down
9 changes: 7 additions & 2 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
Expand All @@ -21,9 +22,10 @@ func NewDBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBulderFactory queries.WriteQueryBulderFactory,
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory)
return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory, mon)
}
}

Expand All @@ -34,6 +36,7 @@ func DBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBuilderFactory queries.WriteQueryBulderFactory,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand Down Expand Up @@ -96,11 +99,13 @@ func DBOperationHandler(
}

deleteBackup := func(pathPrefix string, bucket string) error {
err := DeleteBackupData(s3, pathPrefix, bucket)
size, err := DeleteBackupData(s3, pathPrefix, bucket)
if err != nil {
return fmt.Errorf("failed to delete backup data: %v", err)
}

mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, size)

backupToWrite.Status = types.BackupStateDeleted
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
Expand Down
6 changes: 4 additions & 2 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
Expand All @@ -16,10 +17,10 @@ import (
)

func NewRBOperationHandler(
db db.DBConnector, client client.ClientConnector, config config.Config,
db db.DBConnector, client client.ClientConnector, config config.Config, mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return RBOperationHandler(ctx, op, db, client, config)
return RBOperationHandler(ctx, op, db, client, config, mon)
}
}

Expand All @@ -29,6 +30,7 @@ func RBOperationHandler(
db db.DBConnector,
client client.ClientConnector,
config config.Config,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "RBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand Down
5 changes: 4 additions & 1 deletion internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
Expand All @@ -23,11 +24,12 @@ func NewBackupScheduleHandler(
clientConfig config.ClientConnectionConfig,
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
) BackupScheduleHandlerType {
return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error {
return BackupScheduleHandler(
ctx, driver, schedule, clientConn, s3, clientConfig,
queryBuilderFactory, clock,
queryBuilderFactory, clock, mon,
)
}
}
Expand All @@ -41,6 +43,7 @@ func BackupScheduleHandler(
clientConfig config.ClientConnectionConfig,
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
) error {
if schedule.Status != types.BackupScheduleStateActive {
xlog.Error(ctx, "backup schedule is not active", zap.String("scheduleID", schedule.ID))
Expand Down
39 changes: 25 additions & 14 deletions internal/handlers/take_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/connectors/s3"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
Expand All @@ -20,10 +21,10 @@ import (

func NewTBOperationHandler(
db db.DBConnector, client client.ClientConnector, s3 s3.S3Connector, config config.Config,
queryBuilderFactory queries.WriteQueryBulderFactory,
queryBuilderFactory queries.WriteQueryBulderFactory, mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory)
return TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory, mon)
}
}

Expand All @@ -35,6 +36,7 @@ func TBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBuilderFactory queries.WriteQueryBulderFactory,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "TBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand Down Expand Up @@ -123,14 +125,15 @@ func TBOperationHandler(
operation.SetMessage("Operation deadline exceeded")
}
return db.UpdateOperation(ctx, operation)
} else if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket)
if err != nil {
return err
}
}

size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket)
if err != nil {
return err
}

if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
backupToWrite.Status = types.BackupStateAvailable
backupToWrite.Size = size
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
} else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED {
Expand All @@ -147,6 +150,8 @@ func TBOperationHandler(
operation.SetMessage(ydbOpResponse.IssueString())
}
backupToWrite.Message = operation.GetMessage()
backupToWrite.Size = size
mon.IncBytesWrittenCounter(backup.ContainerID, backup.S3Bucket, size)
}
case types.OperationStateStartCancelling:
{
Expand Down Expand Up @@ -178,30 +183,36 @@ func TBOperationHandler(

return db.UpdateOperation(ctx, operation)
}
if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket)
if err != nil {
return err
}

size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket)
if err != nil {
return err
}

if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
backupToWrite.Status = types.BackupStateAvailable
backupToWrite.Size = size
operation.SetState(types.OperationStateDone)
operation.SetMessage("Operation was completed despite cancellation: " + tb.Message)
} else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED {
err = DeleteBackupData(s3, backup.S3PathPrefix, backup.S3Bucket)
size, err = DeleteBackupData(s3, backup.S3PathPrefix, backup.S3Bucket)
if err != nil {
return err
}

mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, size)

backupToWrite.Status = types.BackupStateCancelled
operation.SetState(types.OperationStateCancelled)
operation.SetMessage(tb.Message)
} else {
backupToWrite.Status = types.BackupStateError
backupToWrite.Size = size
operation.SetState(types.OperationStateError)
operation.SetMessage(ydbOpResponse.IssueString())
}
backupToWrite.Message = operation.GetMessage()
mon.IncBytesWrittenCounter(backup.ContainerID, backup.S3Bucket, size)
}
default:
return fmt.Errorf("unexpected operation state %s", tb.State)
Expand Down
5 changes: 4 additions & 1 deletion internal/handlers/take_backup_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
Expand All @@ -26,9 +27,10 @@ func NewTBWROperationHandler(
clientConfig config.ClientConnectionConfig,
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock)
return TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock, mon)
}
}

Expand Down Expand Up @@ -124,6 +126,7 @@ func TBWROperationHandler(
clientConfig config.ClientConnectionConfig,
queryBuilderFactory queries.WriteQueryBulderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "TBWROperationHandler", zap.String("OperationID", operation.GetID()))

Expand Down
10 changes: 5 additions & 5 deletions internal/handlers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,18 @@ func CancelYdbOperation(
return nil
}

func DeleteBackupData(s3 s3.S3Connector, s3PathPrefix string, s3Bucket string) error {
objects, err := s3.ListObjects(s3PathPrefix, s3Bucket)
func DeleteBackupData(s3 s3.S3Connector, s3PathPrefix string, s3Bucket string) (int64, error) {
objects, size, err := s3.ListObjects(s3PathPrefix, s3Bucket)
if err != nil {
return fmt.Errorf("failed to list S3 objects: %v", err)
return 0, fmt.Errorf("failed to list S3 objects: %v", err)
}

if len(objects) != 0 {
err = s3.DeleteObjects(objects, s3Bucket)
if err != nil {
return fmt.Errorf("failed to delete S3 objects: %v", err)
return 0, fmt.Errorf("failed to delete S3 objects: %v", err)
}
}

return nil
return size, nil
}
Loading

0 comments on commit ada2976

Please sign in to comment.