From dda0c5b714775ad83f4ba4e2c16d29abb825158e Mon Sep 17 00:00:00 2001 From: ulya-sidorina Date: Wed, 6 Nov 2024 18:01:50 +0100 Subject: [PATCH] feat(ydbcp): add metrics --- cmd/ydbcp/main.go | 17 ++- internal/connectors/db/yql/queries/write.go | 2 +- internal/connectors/s3/connector.go | 12 +- internal/connectors/s3/mock.go | 52 ++++--- internal/handlers/delete_backup.go | 43 ++++-- internal/handlers/delete_backup_test.go | 52 ++++--- internal/handlers/restore_backup.go | 21 ++- internal/handlers/restore_backup_test.go | 56 +++++-- internal/handlers/schedule_backup.go | 9 +- internal/handlers/schedule_backup_test.go | 2 + internal/handlers/take_backup.go | 139 ++++++++++++------ internal/handlers/take_backup_retry.go | 9 +- internal/handlers/take_backup_retry_test.go | 8 + internal/handlers/take_backup_test.go | 106 +++++++++---- internal/handlers/utils.go | 14 +- internal/metrics/metrics.go | 128 +++++++++++++++- internal/metrics/metrics_moc.go | 48 +++++- internal/metrics/metrics_test.go | 6 +- internal/processor/processor.go | 28 +--- .../server/services/backup/backup_service.go | 71 +++++++-- .../backup_schedule_service.go | 77 ++++++++-- .../services/operation/operation_service.go | 45 +++++- .../schedule_watcher/schedule_watcher_test.go | 4 + internal/watchers/ttl_watcher/ttl_watcher.go | 4 +- 24 files changed, 719 insertions(+), 234 deletions(-) diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 8b0609c3..5d74cd8a 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -121,9 +121,10 @@ func main() { authProvider, configInstance.ClientConnection.AllowedEndpointDomains, configInstance.ClientConnection.AllowInsecureEndpoint, + metrics, ).Register(server) - operation.NewOperationService(dbConnector, authProvider).Register(server) - backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider).Register(server) + operation.NewOperationService(dbConnector, authProvider, metrics).Register(server) + backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider, metrics).Register(server) if err := server.Start(ctx, &wg); err != nil { xlog.Error(ctx, "Error start GRPC server", zap.Error(err)) os.Exit(1) @@ -133,7 +134,7 @@ func main() { if err := handlersRegistry.Add( types.OperationTypeTB, handlers.NewTBOperationHandler( - dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery, + dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics, ), ); err != nil { xlog.Error(ctx, "failed to register TB handler", zap.Error(err)) @@ -142,7 +143,7 @@ func main() { if err := handlersRegistry.Add( types.OperationTypeRB, - handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance), + handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance, metrics), ); err != nil { xlog.Error(ctx, "failed to register RB handler", zap.Error(err)) os.Exit(1) @@ -150,7 +151,7 @@ func main() { if err := handlersRegistry.Add( types.OperationTypeDB, - handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery), + handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics), ); err != nil { xlog.Error(ctx, "failed to register DB handler", zap.Error(err)) os.Exit(1) @@ -164,7 +165,9 @@ func main() { configInstance.S3, configInstance.ClientConnection, queries.NewWriteTableQuery, - clockwork.NewRealClock()), + clockwork.NewRealClock(), + metrics, + ), ); err != nil { xlog.Error(ctx, 
"failed to register TBWR handler", zap.Error(err)) os.Exit(1) @@ -174,7 +177,7 @@ func main() { ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery) backupScheduleHandler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQuery, clockwork.NewRealClock(), + queries.NewWriteTableQuery, clockwork.NewRealClock(), metrics, ) schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler) xlog.Info(ctx, "YDBCP started") diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index d0ef7176..34906b92 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -37,7 +37,7 @@ type WriteSingleTableQueryImpl struct { } type WriteTableQueryImplOption func(*WriteTableQueryImpl) -type WriteQueryBulderFactory func() WriteTableQuery +type WriteQueryBuilderFactory func() WriteTableQuery func (d *WriteSingleTableQueryImpl) AddValueParam(name string, value table_types.Value) { d.upsertFields = append(d.upsertFields, name[1:]) diff --git a/internal/connectors/s3/connector.go b/internal/connectors/s3/connector.go index e3c5c87f..e42f6c5e 100644 --- a/internal/connectors/s3/connector.go +++ b/internal/connectors/s3/connector.go @@ -13,7 +13,7 @@ import ( ) type S3Connector interface { - ListObjects(pathPrefix string, bucket string) ([]string, error) + ListObjects(pathPrefix string, bucket string) ([]string, int64, error) GetSize(pathPrefix string, bucket string) (int64, error) DeleteObjects(keys []string, bucket string) error } @@ -49,7 +49,8 @@ func NewS3Connector(config config.S3Config) (*ClientS3Connector, error) { return &ClientS3Connector{s3: s3Client}, nil } -func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, error) { +func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, int64, error) { + var size int64 objects := make([]string, 0) objectsPtr := &objects @@ -65,6 +66,9 @@ func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]str func(p *s3.ListObjectsOutput, last bool) (shouldContinue bool) { for _, object := range p.Contents { *objectsPtr = append(*objectsPtr, *object.Key) + if object.Size != nil { + size += *object.Size + } } return true @@ -72,10 +76,10 @@ func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]str ) if err != nil { - return nil, err + return nil, 0, err } - return *objectsPtr, nil + return *objectsPtr, size, nil } func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, error) { diff --git a/internal/connectors/s3/mock.go b/internal/connectors/s3/mock.go index 67ddae65..941cdf23 100644 --- a/internal/connectors/s3/mock.go +++ b/internal/connectors/s3/mock.go @@ -1,53 +1,63 @@ package s3 import ( - "fmt" + "strings" "github.com/aws/aws-sdk-go/service/s3" ) +type Bucket map[string]*s3.Object + type MockS3Connector struct { - storage map[string][]*s3.Object + storage map[string]Bucket } -func NewMockS3Connector(storage map[string][]*s3.Object) *MockS3Connector { +func NewMockS3Connector(storage map[string]Bucket) *MockS3Connector { return &MockS3Connector{ storage: storage, } } -func (m *MockS3Connector) ListObjects(pathPrefix string, _ string) ([]string, error) { +func (m *MockS3Connector) ListObjects(pathPrefix string, bucketName string) ([]string, int64, error) { objects := make([]string, 0) + var size int64 + + if bucket, ok := m.storage[bucketName]; ok { + for key, object := range bucket { + if 
strings.HasPrefix(key, pathPrefix) { + objects = append(objects, key) - if content, ok := m.storage[pathPrefix]; ok { - for _, object := range content { - if object.Key == nil { - objects = append(objects, *object.Key) + if object.Size != nil { + size += *object.Size + } } } } - return objects, nil + return objects, size, nil } -func (m *MockS3Connector) GetSize(pathPrefix string, _ string) (int64, error) { - if content, ok := m.storage[pathPrefix]; ok { - var size int64 - for _, object := range content { - if object.Size != nil { - size += *object.Size +func (m *MockS3Connector) GetSize(pathPrefix string, bucketName string) (int64, error) { + var size int64 + + if bucket, ok := m.storage[bucketName]; ok { + for key, object := range bucket { + if strings.HasPrefix(key, pathPrefix) { + if object.Size != nil { + size += *object.Size + } } } - - return size, nil } - return 0, fmt.Errorf("objects not found, path: %s", pathPrefix) + return size, nil } -func (m *MockS3Connector) DeleteObjects(objects []string, _ string) error { - for _, object := range objects { - delete(m.storage, object) +func (m *MockS3Connector) DeleteObjects(objects []string, bucketName string) error { + if bucket, ok := m.storage[bucketName]; ok { + for _, key := range objects { + delete(bucket, key) + } } return nil diff --git a/internal/handlers/delete_backup.go b/internal/handlers/delete_backup.go index 2083ee08..368429f7 100644 --- a/internal/handlers/delete_backup.go +++ b/internal/handlers/delete_backup.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + "ydbcp/internal/metrics" "ydbcp/internal/config" "ydbcp/internal/connectors/db" @@ -20,10 +21,11 @@ func NewDBOperationHandler( db db.DBConnector, s3 s3.S3Connector, config config.Config, - queryBulderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, + mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory) + return DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory, mon) } } @@ -33,7 +35,8 @@ func DBOperationHandler( db db.DBConnector, s3 s3.S3Connector, config config.Config, - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, + mon metrics.MetricsRegistry, ) error { xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -54,14 +57,30 @@ func DBOperationHandler( Status: types.BackupStateUnknown, } + upsertAndReportMetrics := func(operation types.Operation, backup *types.Backup) error { + var err error + + if backup != nil { + err = db.ExecuteUpsert( + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(*backup), + ) + } else { + err = db.UpdateOperation(ctx, operation) + } + + if err == nil { + mon.ObserveOperationDuration(operation) + } + + return err + } + if deadlineExceeded(dbOp.Audit.CreatedAt, config) { backupToWrite.Status = types.BackupStateError operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") operation.GetAudit().CompletedAt = timestamppb.Now() - return db.ExecuteUpsert( - ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), - ) + return upsertAndReportMetrics(operation, &backupToWrite) } backups, err := db.SelectBackups( @@ -84,7 +103,7 @@ func DBOperationHandler( operation.SetState(types.OperationStateError) operation.SetMessage("Backup not 
found") operation.GetAudit().CompletedAt = timestamppb.Now() - return db.UpdateOperation(ctx, operation) + return upsertAndReportMetrics(operation, nil) } backup := backups[0] @@ -92,15 +111,17 @@ func DBOperationHandler( operation.SetState(types.OperationStateError) operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backup.Status)) operation.GetAudit().CompletedAt = timestamppb.Now() - return db.UpdateOperation(ctx, operation) + return upsertAndReportMetrics(operation, nil) } deleteBackup := func(pathPrefix string, bucket string) error { - err := DeleteBackupData(s3, pathPrefix, bucket) + size, err := DeleteBackupData(s3, pathPrefix, bucket) if err != nil { return fmt.Errorf("failed to delete backup data: %v", err) } + mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, backup.DatabaseName, size) + backupToWrite.Status = types.BackupStateDeleted operation.SetState(types.OperationStateDone) operation.SetMessage("Success") @@ -133,7 +154,5 @@ func DBOperationHandler( return fmt.Errorf("unexpected operation state %s", dbOp.State) } - return db.ExecuteUpsert( - ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), - ) + return upsertAndReportMetrics(operation, &backupToWrite) } diff --git a/internal/handlers/delete_backup_test.go b/internal/handlers/delete_backup_test.go index d9595e0f..2f905685 100644 --- a/internal/handlers/delete_backup_test.go +++ b/internal/handlers/delete_backup_test.go @@ -3,6 +3,7 @@ package handlers import ( "context" "testing" + "ydbcp/internal/metrics" "ydbcp/internal/config" "ydbcp/internal/connectors/db" @@ -12,7 +13,6 @@ import ( pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/s3" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -38,7 +38,7 @@ func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &dbOp dbConnector := db.NewMockDBConnector( @@ -52,6 +52,7 @@ func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { dbConnector, s3Connector, config.Config{ OperationTtlSeconds: 0, }, queries.NewWriteTableQueryMock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &dbOp) @@ -89,23 +90,24 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { ID: backupID, Status: types.BackupStateDeleting, S3PathPrefix: "pathPrefix", + S3Bucket: "bucket", } opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &dbOp - s3ObjectsMap["pathPrefix"] = []*s3.Object{ - { + s3ObjectsMap["bucket"] = s3Client.Bucket{ + "pathPrefix/data_1.csv": { Key: aws.String("data_1.csv"), Size: aws.Int64(100), }, - { + "pathPrefix/data_2.csv": { Key: aws.String("data_2.csv"), Size: aws.Int64(150), }, - { + "pathPrefix/data_3.csv": { Key: aws.String("data_3.csv"), Size: aws.Int64(200), }, @@ -118,10 +120,12 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) + mon := metrics.NewMockMetricsRegistry() handler := NewDBOperationHandler( dbConnector, s3Connector, config.Config{ 
OperationTtlSeconds: 1000, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &dbOp) @@ -141,9 +145,12 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { assert.Equal(t, types.BackupStateDeleted, b.Status) // check s3 objects (should be deleted) - objects, err := s3Connector.ListObjects("pathPrefix", "bucket") + objects, _, err := s3Connector.ListObjects("pathPrefix", "bucket") assert.Empty(t, err) assert.Empty(t, objects) + + // check metrics + assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"]) } func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { @@ -164,23 +171,24 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { ID: backupID, Status: types.BackupStateDeleting, S3PathPrefix: "pathPrefix", + S3Bucket: "bucket", } opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &dbOp - s3ObjectsMap["pathPrefix"] = []*s3.Object{ - { + s3ObjectsMap["bucket"] = s3Client.Bucket{ + "pathPrefix/data_1.csv": { Key: aws.String("data_1.csv"), Size: aws.Int64(100), }, - { + "pathPrefix/data_2.csv": { Key: aws.String("data_2.csv"), Size: aws.Int64(150), }, - { + "pathPrefix/data_3.csv": { Key: aws.String("data_3.csv"), Size: aws.Int64(200), }, @@ -193,10 +201,12 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) + mon := metrics.NewMockMetricsRegistry() handler := NewDBOperationHandler( dbConnector, s3Connector, config.Config{ OperationTtlSeconds: 1000, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &dbOp) @@ -216,9 +226,12 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { assert.Equal(t, types.BackupStateDeleted, b.Status) // check s3 objects (should be deleted) - objects, err := s3Connector.ListObjects("pathPrefix", "bucket") + objects, _, err := s3Connector.ListObjects("pathPrefix", "bucket") assert.Empty(t, err) assert.Empty(t, objects) + + // check metrics + assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"]) } func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) { @@ -243,19 +256,19 @@ func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &dbOp - s3ObjectsMap["pathPrefix"] = []*s3.Object{ - { + s3ObjectsMap["bucket"] = s3Client.Bucket{ + "pathPrefix/data_1.csv": { Key: aws.String("data_1.csv"), Size: aws.Int64(100), }, - { + "pathPrefix/data_2.csv": { Key: aws.String("data_2.csv"), Size: aws.Int64(150), }, - { + "pathPrefix/data_3.csv": { Key: aws.String("data_3.csv"), Size: aws.Int64(200), }, @@ -272,6 +285,7 @@ func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) { dbConnector, s3Connector, config.Config{ OperationTtlSeconds: 1000, }, queries.NewWriteTableQueryMock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &dbOp) diff --git a/internal/handlers/restore_backup.go b/internal/handlers/restore_backup.go index 9a8e10c5..6329ba92 100644 --- a/internal/handlers/restore_backup.go +++ b/internal/handlers/restore_backup.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + 
"ydbcp/internal/metrics" "ydbcp/internal/config" "ydbcp/internal/connectors/client" @@ -16,10 +17,10 @@ import ( ) func NewRBOperationHandler( - db db.DBConnector, client client.ClientConnector, config config.Config, + db db.DBConnector, client client.ClientConnector, config config.Config, mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return RBOperationHandler(ctx, op, db, client, config) + return RBOperationHandler(ctx, op, db, client, config, mon) } } @@ -29,6 +30,7 @@ func RBOperationHandler( db db.DBConnector, client client.ClientConnector, config config.Config, + mon metrics.MetricsRegistry, ) error { xlog.Info(ctx, "RBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -54,6 +56,16 @@ func RBOperationHandler( defer func() { _ = client.Close(ctx, conn) }() + upsertAndReportMetrics := func(operation types.Operation) error { + err := db.UpdateOperation(ctx, operation) + + if err == nil { + mon.ObserveOperationDuration(operation) + } + + return err + } + ydbOpResponse, err := lookupYdbOperationStatus( ctx, client, conn, operation, mr.YdbOperationId, mr.Audit.CreatedAt, config, ) @@ -64,7 +76,7 @@ func RBOperationHandler( operation.SetState(ydbOpResponse.opState) operation.SetMessage(ydbOpResponse.opMessage) operation.GetAudit().CompletedAt = timestamppb.Now() - return db.UpdateOperation(ctx, operation) + return upsertAndReportMetrics(operation) } if ydbOpResponse.opResponse == nil { @@ -114,6 +126,7 @@ func RBOperationHandler( operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") operation.GetAudit().CompletedAt = timestamppb.Now() + return upsertAndReportMetrics(operation) } return db.UpdateOperation(ctx, operation) @@ -160,5 +173,5 @@ func RBOperationHandler( } operation.GetAudit().CompletedAt = timestamppb.Now() - return db.UpdateOperation(ctx, operation) + return upsertAndReportMetrics(operation) } diff --git a/internal/handlers/restore_backup_test.go b/internal/handlers/restore_backup_test.go index 51b9dd8d..bd2f8b06 100644 --- a/internal/handlers/restore_backup_test.go +++ b/internal/handlers/restore_backup_test.go @@ -3,6 +3,7 @@ package handlers import ( "context" "testing" + "ydbcp/internal/metrics" "ydbcp/internal/config" "ydbcp/internal/connectors/client" @@ -41,7 +42,7 @@ func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) { ) // try to handle rb operation with non-existing ydb operation id - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}, metrics.NewMockMetricsRegistry()) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -84,7 +85,7 @@ func TestRBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) // try to handle pending rb operation with zero ttl - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}, metrics.NewMockMetricsRegistry()) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -133,7 +134,12 @@ func TestRBOperationHandlerRunningOperationInProgress(t *testing.T) { dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) // try to handle pending rb operation with ttl - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + handler := NewRBOperationHandler( + 
dbConnector, + clientConnector, + config.Config{OperationTtlSeconds: 1000}, + metrics.NewMockMetricsRegistry(), + ) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -181,7 +187,12 @@ func TestRBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + handler := NewRBOperationHandler( + dbConnector, + clientConnector, + config.Config{OperationTtlSeconds: 1000}, + metrics.NewMockMetricsRegistry(), + ) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -228,7 +239,12 @@ func TestRBOperationHandlerRunningOperationCancelled(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler( + dbConnector, + clientConnector, + config.Config{OperationTtlSeconds: 10}, + metrics.NewMockMetricsRegistry(), + ) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -277,7 +293,7 @@ func TestRBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) // try to handle cancelling rb operation with zero ttl - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}, metrics.NewMockMetricsRegistry()) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -326,7 +342,12 @@ func TestRBOperationHandlerCancellingOperationInProgress(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + handler := NewRBOperationHandler( + dbConnector, + clientConnector, + config.Config{OperationTtlSeconds: 1000}, + metrics.NewMockMetricsRegistry(), + ) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -374,7 +395,12 @@ func TestRBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler( + dbConnector, + clientConnector, + config.Config{OperationTtlSeconds: 10}, + metrics.NewMockMetricsRegistry(), + ) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -422,7 +448,12 @@ func TestRBOperationHandlerCancellingOperationCancelled(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler( + dbConnector, + clientConnector, + config.Config{OperationTtlSeconds: 10}, + metrics.NewMockMetricsRegistry(), + ) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -470,7 +501,12 @@ func TestRBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) 
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler( + dbConnector, + clientConnector, + config.Config{OperationTtlSeconds: 10}, + metrics.NewMockMetricsRegistry(), + ) err := handler(ctx, &rbOp) assert.Empty(t, err) diff --git a/internal/handlers/schedule_backup.go b/internal/handlers/schedule_backup.go index a692b51e..ba91151c 100644 --- a/internal/handlers/schedule_backup.go +++ b/internal/handlers/schedule_backup.go @@ -9,6 +9,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -17,13 +18,14 @@ import ( type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.BackupSchedule) error func NewBackupScheduleHandler( - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + mon metrics.MetricsRegistry, ) BackupScheduleHandlerType { return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error { return BackupScheduleHandler( ctx, driver, schedule, - queryBuilderFactory, clock, + queryBuilderFactory, clock, mon, ) } } @@ -32,8 +34,9 @@ func BackupScheduleHandler( ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule, - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + mon metrics.MetricsRegistry, ) error { if schedule.Status != types.BackupScheduleStateActive { xlog.Error(ctx, "backup schedule is not active", zap.String("scheduleID", schedule.ID)) diff --git a/internal/handlers/schedule_backup_test.go b/internal/handlers/schedule_backup_test.go index 33dae79c..afd19cb0 100644 --- a/internal/handlers/schedule_backup_test.go +++ b/internal/handlers/schedule_backup_test.go @@ -8,6 +8,7 @@ import ( "time" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" ) @@ -42,6 +43,7 @@ func TestBackupScheduleHandler(t *testing.T) { handler := NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, dbConnector, schedule) assert.Empty(t, err) diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index d1604b66..68eb200c 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -8,6 +8,7 @@ import ( "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/connectors/s3" + "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -20,10 +21,10 @@ import ( func NewTBOperationHandler( db db.DBConnector, client client.ClientConnector, s3 s3.S3Connector, config config.Config, - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory) + return TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory, mon) } } @@ -34,7 +35,8 @@ func TBOperationHandler( client 
client.ClientConnector, s3 s3.S3Connector, config config.Config, - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, + mon metrics.MetricsRegistry, ) error { xlog.Info(ctx, "TBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -53,6 +55,47 @@ func TBOperationHandler( defer func() { _ = client.Close(ctx, conn) }() + upsertAndReportMetrics := func( + operation types.Operation, + backup types.Backup, + containerId string, + database string, + status Ydb.StatusIds_StatusCode, + ) error { + err := db.ExecuteUpsert( + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backup), + ) + + if err == nil { + mon.ObserveOperationDuration(operation) + mon.IncCompletedBackupsCount(containerId, database, status) + } + + return err + } + + backups, err := db.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(tb.BackupID)}, + }, + ), + ), + ) + + if err != nil { + return fmt.Errorf("can't select backups: %v", err) + } + + if len(backups) == 0 { + return fmt.Errorf("backup not found: %s", tb.BackupID) + } + + backup := backups[0] + ydbOpResponse, err := lookupYdbOperationStatus( ctx, client, conn, operation, tb.YdbOperationId, tb.Audit.CreatedAt, config, ) @@ -74,8 +117,23 @@ func TBOperationHandler( backupToWrite.Status = types.BackupStateError backupToWrite.Message = operation.GetMessage() backupToWrite.AuditInfo.CompletedAt = now - return db.ExecuteUpsert( - ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + + if ydbOpResponse.opResponse != nil { + return upsertAndReportMetrics( + operation, + backupToWrite, + backup.ContainerID, + backup.DatabaseName, + ydbOpResponse.opResponse.GetOperation().Status, + ) + } + + return upsertAndReportMetrics( + operation, + backupToWrite, + backup.ContainerID, + backup.DatabaseName, + Ydb.StatusIds_TIMEOUT, ) } if ydbOpResponse.opResponse == nil { @@ -83,28 +141,6 @@ func TBOperationHandler( } opResponse := ydbOpResponse.opResponse - backups, err := db.SelectBackups( - ctx, queries.NewReadTableQuery( - queries.WithTableName("Backups"), - queries.WithQueryFilters( - queries.QueryFilter{ - Field: "id", - Values: []table_types.Value{table_types.StringValueFromString(tb.BackupID)}, - }, - ), - ), - ) - - if err != nil { - return fmt.Errorf("can't select backups: %v", err) - } - - if len(backups) == 0 { - return fmt.Errorf("backup not found: %s", tb.BackupID) - } - - backup := backups[0] - getBackupSize := func(s3PathPrefix string, s3Bucket string) (int64, error) { size, err := s3.GetSize(s3PathPrefix, s3Bucket) if err != nil { @@ -123,14 +159,15 @@ func TBOperationHandler( operation.SetMessage("Operation deadline exceeded") } return db.UpdateOperation(ctx, operation) - } else if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { - size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket) - if err != nil { - return err - } + } + + size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket) + if err != nil { + return err + } + if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { backupToWrite.Status = types.BackupStateAvailable - backupToWrite.Size = size operation.SetState(types.OperationStateDone) operation.SetMessage("Success") } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { @@ -147,6 +184,8 @@ func 
TBOperationHandler( operation.SetMessage(ydbOpResponse.IssueString()) } backupToWrite.Message = operation.GetMessage() + backupToWrite.Size = size + mon.IncBytesWrittenCounter(backup.ContainerID, backup.DatabaseName, backup.S3Bucket, size) } case types.OperationStateStartCancelling: { @@ -171,37 +210,47 @@ func TBOperationHandler( operation.SetMessage("Operation deadline exceeded") operation.GetAudit().CompletedAt = now backupToWrite.Message = operation.GetMessage() - return db.ExecuteUpsert( - ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + return upsertAndReportMetrics( + operation, + backupToWrite, + backup.ContainerID, + backup.DatabaseName, + Ydb.StatusIds_TIMEOUT, ) } return db.UpdateOperation(ctx, operation) } - if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { - size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket) - if err != nil { - return err - } + size, err := getBackupSize(backup.S3PathPrefix, backup.S3Bucket) + if err != nil { + return err + } + + if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { backupToWrite.Status = types.BackupStateAvailable backupToWrite.Size = size operation.SetState(types.OperationStateDone) operation.SetMessage("Operation was completed despite cancellation: " + tb.Message) } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { - err = DeleteBackupData(s3, backup.S3PathPrefix, backup.S3Bucket) + size, err = DeleteBackupData(s3, backup.S3PathPrefix, backup.S3Bucket) if err != nil { return err } + + mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, backup.DatabaseName, size) + backupToWrite.Status = types.BackupStateCancelled operation.SetState(types.OperationStateCancelled) operation.SetMessage(tb.Message) } else { backupToWrite.Status = types.BackupStateError + backupToWrite.Size = size operation.SetState(types.OperationStateError) operation.SetMessage(ydbOpResponse.IssueString()) } backupToWrite.Message = operation.GetMessage() + mon.IncBytesWrittenCounter(backup.ContainerID, backup.DatabaseName, backup.S3Bucket, size) } default: return fmt.Errorf("unexpected operation state %s", tb.State) @@ -226,7 +275,11 @@ func TBOperationHandler( } backupToWrite.AuditInfo.CompletedAt = now operation.GetAudit().CompletedAt = now - return db.ExecuteUpsert( - ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + return upsertAndReportMetrics( + operation, + backupToWrite, + backup.ContainerID, + backup.DatabaseName, + opResponse.GetOperation().Status, ) } diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go index a298f254..3db5ee42 100644 --- a/internal/handlers/take_backup_retry.go +++ b/internal/handlers/take_backup_retry.go @@ -14,6 +14,7 @@ import ( "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -24,11 +25,12 @@ func NewTBWROperationHandler( client client.ClientConnector, s3 config.S3Config, clientConfig config.ClientConnectionConfig, - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock) + return 
TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock, mon) } } @@ -142,8 +144,9 @@ func TBWROperationHandler( clientConn client.ClientConnector, s3 config.S3Config, clientConfig config.ClientConnectionConfig, - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + mon metrics.MetricsRegistry, ) error { ctx = xlog.With(ctx, zap.String("OperationID", operation.GetID())) diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go index 8097da46..53a72099 100644 --- a/internal/handlers/take_backup_retry_test.go +++ b/internal/handlers/take_backup_retry_test.go @@ -13,6 +13,7 @@ import ( "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" ) @@ -241,6 +242,7 @@ func TestTBWRHandlerSuccess(t *testing.T) { config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -301,6 +303,7 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -364,6 +367,7 @@ func TestTBWRHandlerSkipError(t *testing.T) { config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t3.AsTime()), + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -418,6 +422,7 @@ func TestTBWRHandlerError(t *testing.T) { config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t2.AsTime()), + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -470,6 +475,7 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { }, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -543,6 +549,7 @@ func TestTBWRHandlerStartCancel(t *testing.T) { }, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -617,6 +624,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) { }, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbwr) assert.Empty(t, err) diff --git a/internal/handlers/take_backup_test.go b/internal/handlers/take_backup_test.go index 8e272e6a..2ad69c80 100644 --- a/internal/handlers/take_backup_test.go +++ b/internal/handlers/take_backup_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "ydbcp/internal/metrics" "ydbcp/internal/config" "ydbcp/internal/connectors/client" @@ -14,7 +15,6 @@ import ( pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/s3" "github.com/stretchr/testify/assert" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" @@ -43,7 +43,7 @@ func TestTBOperationHandlerInvalidOperationResponse(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := 
make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp dbConnector := db.NewMockDBConnector( @@ -59,6 +59,7 @@ func TestTBOperationHandlerInvalidOperationResponse(t *testing.T) { s3Connector, config.Config{}, queries.NewWriteTableQueryMock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbOp) assert.Empty(t, err) @@ -100,7 +101,7 @@ func TestTBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp @@ -118,6 +119,7 @@ func TestTBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 0, }, queries.NewWriteTableQueryMock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbOp) @@ -172,7 +174,7 @@ func TestTBOperationHandlerRunningOperationInProgress(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp @@ -190,6 +192,7 @@ func TestTBOperationHandlerRunningOperationInProgress(t *testing.T) { dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, queries.NewWriteTableQueryMock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbOp) @@ -234,6 +237,7 @@ func TestTBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { ID: backupID, Status: types.BackupStateRunning, S3PathPrefix: "pathPrefix", + S3Bucket: "bucket", } ydbOp := &Ydb_Operations.Operation{ @@ -246,20 +250,20 @@ func TestTBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp - s3ObjectsMap["pathPrefix"] = []*s3.Object{ - { + s3ObjectsMap["bucket"] = s3Client.Bucket{ + "pathPrefix/data_1.csv": { Key: aws.String("data_1.csv"), Size: aws.Int64(100), }, - { + "pathPrefix/data_2.csv": { Key: aws.String("data_2.csv"), Size: aws.Int64(150), }, - { + "pathPrefix/data_3.csv": { Key: aws.String("data_3.csv"), Size: aws.Int64(200), }, @@ -273,11 +277,12 @@ func TestTBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { ) s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) - + mon := metrics.NewMockMetricsRegistry() handler := NewTBOperationHandler( dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &tbOp) @@ -300,6 +305,11 @@ func TestTBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) assert.Empty(t, err) assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) + + // check metrics + assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, int64(1), 
mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, int64(0), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerRunningOperationCancelled(t *testing.T) { @@ -332,7 +342,7 @@ func TestTBOperationHandlerRunningOperationCancelled(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp @@ -345,11 +355,12 @@ func TestTBOperationHandlerRunningOperationCancelled(t *testing.T) { ) s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) - + mon := metrics.NewMockMetricsRegistry() handler := NewTBOperationHandler( dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &tbOp) @@ -371,6 +382,11 @@ func TestTBOperationHandlerRunningOperationCancelled(t *testing.T) { ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) assert.Empty(t, err) assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) + + // check metrics + assert.Equal(t, int64(0), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) { @@ -403,7 +419,7 @@ func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp @@ -416,11 +432,12 @@ func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) ) s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) - + mon := metrics.NewMockMetricsRegistry() handler := NewTBOperationHandler( dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 0, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &tbOp) @@ -444,6 +461,11 @@ func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) assert.Empty(t, err) assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) + + // check metrics + assert.Equal(t, int64(0), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerCancellingOperationInProgress(t *testing.T) { @@ -476,7 +498,7 @@ func TestTBOperationHandlerCancellingOperationInProgress(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp @@ -494,6 +516,7 @@ func TestTBOperationHandlerCancellingOperationInProgress(t *testing.T) { dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, 
queries.NewWriteTableQueryMock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbOp) @@ -537,6 +560,7 @@ func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T ID: backupID, Status: types.BackupStateRunning, S3PathPrefix: "pathPrefix", + S3Bucket: "bucket", } ydbOp := &Ydb_Operations.Operation{ @@ -549,20 +573,20 @@ func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp - s3ObjectsMap["pathPrefix"] = []*s3.Object{ - { + s3ObjectsMap["bucket"] = s3Client.Bucket{ + "pathPrefix/data_1.csv": { Key: aws.String("data_1.csv"), Size: aws.Int64(100), }, - { + "pathPrefix/data_2.csv": { Key: aws.String("data_2.csv"), Size: aws.Int64(150), }, - { + "pathPrefix/data_3.csv": { Key: aws.String("data_3.csv"), Size: aws.Int64(200), }, @@ -576,11 +600,12 @@ func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T ) s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) - + mon := metrics.NewMockMetricsRegistry() handler := NewTBOperationHandler( dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &tbOp) @@ -603,6 +628,11 @@ func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) assert.Empty(t, err) assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) + + // check metrics + assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, int64(1), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, int64(0), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { @@ -635,7 +665,7 @@ func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp @@ -648,11 +678,12 @@ func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { ) s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) - + mon := metrics.NewMockMetricsRegistry() handler := NewTBOperationHandler( dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &tbOp) @@ -675,6 +706,10 @@ func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { assert.Empty(t, err) assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) + // check metrics + assert.Equal(t, int64(0), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t *testing.T) { @@ -696,6 +731,7 @@ func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t ID: backupID, Status: 
types.BackupStateRunning, S3PathPrefix: "pathPrefix", + S3Bucket: "bucket", } ydbOp := &Ydb_Operations.Operation{ @@ -708,20 +744,20 @@ func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp - s3ObjectsMap["pathPrefix"] = []*s3.Object{ - { + s3ObjectsMap["bucket"] = s3Client.Bucket{ + "pathPrefix/data_1.csv": { Key: aws.String("data_1.csv"), Size: aws.Int64(100), }, - { + "pathPrefix/data_2.csv": { Key: aws.String("data_2.csv"), Size: aws.Int64(150), }, - { + "pathPrefix/data_3.csv": { Key: aws.String("data_3.csv"), Size: aws.Int64(200), }, @@ -735,11 +771,12 @@ func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t ) s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) - + mon := metrics.NewMockMetricsRegistry() handler := NewTBOperationHandler( dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, queries.NewWriteTableQueryMock, + mon, ) err := handler(ctx, &tbOp) @@ -763,8 +800,14 @@ func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) // check s3 objects (should be removed) - objects, err := s3Connector.ListObjects("pathPrefix", "") + objects, _, err := s3Connector.ListObjects("pathPrefix", "") assert.Equal(t, 0, len(objects)) + + // check metrics + assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"]) + assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) { @@ -797,7 +840,7 @@ func TestTBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) { opMap := make(map[string]types.Operation) backupMap := make(map[string]types.Backup) ydbOpMap := make(map[string]*Ydb_Operations.Operation) - s3ObjectsMap := make(map[string][]*s3.Object) + s3ObjectsMap := make(map[string]s3Client.Bucket) backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp @@ -815,6 +858,7 @@ func TestTBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) { dbConnector, clientConnector, s3Connector, config.Config{ OperationTtlSeconds: 10000, }, queries.NewWriteTableQueryMock, + metrics.NewMockMetricsRegistry(), ) err := handler(ctx, &tbOp) diff --git a/internal/handlers/utils.go b/internal/handlers/utils.go index e8807476..7e88e657 100644 --- a/internal/handlers/utils.go +++ b/internal/handlers/utils.go @@ -4,10 +4,9 @@ import ( "context" "fmt" "time" - "ydbcp/internal/connectors/s3" - "ydbcp/internal/config" "ydbcp/internal/connectors/client" + "ydbcp/internal/connectors/s3" "ydbcp/internal/types" "ydbcp/internal/util/xlog" @@ -80,6 +79,7 @@ func lookupYdbOperationStatus( if !isValidStatus(opResponse.GetOperation().GetStatus()) { return &LookupYdbOperationResponse{ + opResponse: opResponse, shouldAbortHandler: true, opState: types.OperationStateError, opMessage: fmt.Sprintf( @@ -131,18 +131,18 @@ func CancelYdbOperation( return nil } -func DeleteBackupData(s3 s3.S3Connector, s3PathPrefix string, s3Bucket string) error { - objects, err := 
s3.ListObjects(s3PathPrefix, s3Bucket) +func DeleteBackupData(s3 s3.S3Connector, s3PathPrefix string, s3Bucket string) (int64, error) { + objects, size, err := s3.ListObjects(s3PathPrefix, s3Bucket) if err != nil { - return fmt.Errorf("failed to list S3 objects: %v", err) + return 0, fmt.Errorf("failed to list S3 objects: %v", err) } if len(objects) != 0 { err = s3.DeleteObjects(objects, s3Bucket) if err != nil { - return fmt.Errorf("failed to delete S3 objects: %v", err) + return 0, fmt.Errorf("failed to delete S3 objects: %v", err) } } - return nil + return size, nil } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 2ed37ee5..1030226d 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "net/http" "sync" "time" + "ydbcp/internal/types" "ydbcp/internal/config" "ydbcp/internal/util/xlog" @@ -18,17 +20,82 @@ import ( ) type MetricsRegistry interface { - Factory() promauto.Factory + IncApiCallsCounter(serviceName string, methodName string, status string) + IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) + IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) + ObserveOperationDuration(operation types.Operation) + IncHandlerRunsCount(containerId string, operationType string) + IncFailedHandlerRunsCount(containerId string, operationType string) + IncSuccessfulHandlerRunsCount(containerId string, operationType string) + IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) } type MetricsRegistryImpl struct { server *http.Server reg *prometheus.Registry cfg config.MetricsServerConfig + + // api metrics + apiCallsCounter *prometheus.CounterVec + + // storage metrics + bytesWrittenCounter *prometheus.CounterVec + bytesDeletedCounter *prometheus.CounterVec + + // operation metrics + operationsDuration *prometheus.HistogramVec + + // operation processor metrics + handlerRunsCount *prometheus.CounterVec + handlerFailedCount *prometheus.CounterVec + handlerSuccessfulCount *prometheus.CounterVec + + // backup metrics + backupsFailedCount *prometheus.CounterVec + backupsSucceededCount *prometheus.CounterVec +} + +func (s *MetricsRegistryImpl) IncApiCallsCounter(serviceName string, methodName string, code string) { + s.apiCallsCounter.WithLabelValues(serviceName, methodName, code).Inc() +} + +func (s *MetricsRegistryImpl) IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) { + s.bytesWrittenCounter.WithLabelValues(containerId, bucket, database).Add(float64(bytes)) +} + +func (s *MetricsRegistryImpl) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) { + s.bytesDeletedCounter.WithLabelValues(containerId, bucket, database).Add(float64(bytes)) +} + +func (s *MetricsRegistryImpl) ObserveOperationDuration(operation types.Operation) { + if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { + duration := operation.GetAudit().CompletedAt.AsTime().Sub(operation.GetAudit().CreatedAt.AsTime()) + s.operationsDuration.WithLabelValues( + operation.GetContainerID(), + operation.GetType().String(), + operation.GetState().String(), + ).Observe(duration.Seconds()) + } +} + +func (s *MetricsRegistryImpl) IncHandlerRunsCount(containerId string, operationType string) { + s.handlerRunsCount.WithLabelValues(containerId, operationType).Inc() +} + +func (s 
*MetricsRegistryImpl) IncFailedHandlerRunsCount(containerId string, operationType string) {
+	s.handlerFailedCount.WithLabelValues(containerId, operationType).Inc()
+}
+
+func (s *MetricsRegistryImpl) IncSuccessfulHandlerRunsCount(containerId string, operationType string) {
+	s.handlerSuccessfulCount.WithLabelValues(containerId, operationType).Inc()
 }
 
-func (s *MetricsRegistryImpl) Factory() promauto.Factory {
-	return promauto.With(s.reg)
+func (s *MetricsRegistryImpl) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) {
+	if code == Ydb.StatusIds_SUCCESS {
+		s.backupsSucceededCount.WithLabelValues(containerId, database).Inc()
+	} else {
+		s.backupsFailedCount.WithLabelValues(containerId, database, code.String()).Inc()
+	}
 }
 
 func NewMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig) *MetricsRegistryImpl {
@@ -37,6 +104,61 @@ func NewMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met
 		cfg: *cfg,
 	}
 
+	s.apiCallsCounter = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "api",
+		Name:      "calls_count",
+		Help:      "Total count of API calls",
+	}, []string{"service", "method", "status"})
+
+	s.bytesWrittenCounter = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "storage",
+		Name:      "bytes_written",
+		Help:      "Count of bytes written to storage",
+	}, []string{"container_id", "bucket", "database"})
+
+	s.bytesDeletedCounter = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "storage",
+		Name:      "bytes_deleted",
+		Help:      "Count of bytes deleted from storage",
+	}, []string{"container_id", "bucket", "database"})
+
+	s.operationsDuration = promauto.With(s.reg).NewHistogramVec(prometheus.HistogramOpts{
+		Subsystem: "operations",
+		Name:      "duration_seconds",
+		Help:      "Duration of operations in seconds",
+		Buckets:   prometheus.ExponentialBuckets(10, 2, 8),
+	}, []string{"container_id", "type", "status"})
+
+	s.handlerRunsCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "operation_processor",
+		Name:      "handler_runs_count",
+		Help:      "Total count of operation handler runs",
+	}, []string{"container_id", "operation_type"})
+
+	s.handlerFailedCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "operation_processor",
+		Name:      "handler_runs_failed_count",
+		Help:      "Total count of failed operation handler runs",
+	}, []string{"container_id", "operation_type"})
+
+	s.handlerSuccessfulCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "operation_processor",
+		Name:      "handler_runs_successful_count",
+		Help:      "Total count of successful operation handler runs",
+	}, []string{"container_id", "operation_type"})
+
+	s.backupsFailedCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "backups",
+		Name:      "failed_count",
+		Help:      "Total count of failed backups",
+	}, []string{"container_id", "database", "reason"})
+
+	s.backupsSucceededCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "backups",
+		Name:      "succeeded_count",
+		Help:      "Total count of successful backups",
+	}, []string{"container_id", "database"})
+
 	mux := http.NewServeMux()
 	mux.Handle("/metrics", promhttp.HandlerFor(s.reg, promhttp.HandlerOpts{Registry: s.reg}))
diff --git a/internal/metrics/metrics_moc.go b/internal/metrics/metrics_moc.go
index 889f91f6..14dfe368 100644
--- a/internal/metrics/metrics_moc.go
+++ b/internal/metrics/metrics_moc.go
@@ -1,20 +1,56 @@
 package metrics
 
 import (
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
+	"ydbcp/internal/types"
 )
 
 type MockMetricsRegistry struct {
-	reg *prometheus.Registry
+	metrics map[string]int64
 }
 
-func (s *MockMetricsRegistry) Factory() promauto.Factory {
-	return promauto.With(s.reg)
+func (s *MockMetricsRegistry) GetMetrics() map[string]int64 {
+	return s.metrics
+}
+
+func (s *MockMetricsRegistry) IncApiCallsCounter(serviceName string, methodName string, status string) {
+	s.metrics["api_calls_count"]++
+}
+
+func (s *MockMetricsRegistry) IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) {
+	s.metrics["storage_bytes_written"] += bytes
+}
+
+func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) {
+	s.metrics["storage_bytes_deleted"] += bytes
+}
+
+func (s *MockMetricsRegistry) ObserveOperationDuration(operation types.Operation) {
+	s.metrics["operations_duration_seconds"]++
+}
+
+func (s *MockMetricsRegistry) IncHandlerRunsCount(containerId string, operationType string) {
+	s.metrics["operation_processor_handler_runs_count"]++
+}
+
+func (s *MockMetricsRegistry) IncFailedHandlerRunsCount(containerId string, operationType string) {
+	s.metrics["operation_processor_handler_runs_failed_count"]++
+}
+
+func (s *MockMetricsRegistry) IncSuccessfulHandlerRunsCount(containerId string, operationType string) {
+	s.metrics["operation_processor_handler_runs_successful_count"]++
+}
+
+func (s *MockMetricsRegistry) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) {
+	if code == Ydb.StatusIds_SUCCESS {
+		s.metrics["backups_succeeded_count"]++
+	} else {
+		s.metrics["backups_failed_count"]++
+	}
 }
 
 func NewMockMetricsRegistry() *MockMetricsRegistry {
 	return &MockMetricsRegistry{
-		reg: prometheus.NewRegistry(),
+		metrics: make(map[string]int64),
 	}
 }
diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go
index 5f51be59..9e44b5bb 100644
--- a/internal/metrics/metrics_test.go
+++ b/internal/metrics/metrics_test.go
@@ -12,7 +12,6 @@ import (
 
 	"ydbcp/internal/config"
 
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 )
@@ -27,8 +26,7 @@ func TestMetricsCount(t *testing.T) {
 	}
 
 	p := NewMetricsRegistry(ctx, &wg, cfg)
-	count := p.Factory().NewCounter(prometheus.CounterOpts{Name: "test_counter"})
-	count.Add(123)
+	p.apiCallsCounter.WithLabelValues("test_service", "test_method", "test_status").Add(123)
 
 	repeat := 10
 	for {
@@ -45,7 +43,7 @@ func TestMetricsCount(t *testing.T) {
 	resBody, err := io.ReadAll(res.Body)
 	assert.NoError(t, err)
 
-	pattern := []byte("test_counter")
+	pattern := []byte("api_calls_count{method=\"test_method\",service=\"test_service\",status=\"test_status\"}")
 	val := 0
 	for _, line := range bytes.Split(resBody, []byte("\n")) {
 		if len(line) == 0 {
diff --git a/internal/processor/processor.go b/internal/processor/processor.go
index dcc58ee3..79d23679 100644
--- a/internal/processor/processor.go
+++ b/internal/processor/processor.go
@@ -12,7 +12,6 @@ import (
 	"ydbcp/internal/util/xlog"
 
 	"github.com/google/uuid"
-	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 )
 
@@ -33,9 +32,7 @@ type OperationProcessorImpl struct {
 	runningOperations map[string]bool
 	results           chan string
 
-	runsCount       *prometheus.CounterVec
-	failedCount     *prometheus.CounterVec
-	successfulCount
*prometheus.CounterVec + mon metrics.MetricsRegistry } type Option func(*OperationProcessorImpl) @@ -73,22 +70,7 @@ func NewOperationProcessor( tickerProvider: ticker.NewRealTicker, runningOperations: make(map[string]bool), results: make(chan string), - - runsCount: mon.Factory().NewCounterVec(prometheus.CounterOpts{ - Subsystem: metricsSubsystem, - Name: "operations_runs_count", - Help: "Total count of runs of the operation", - }, []string{"task_type"}), - failedCount: mon.Factory().NewCounterVec(prometheus.CounterOpts{ - Subsystem: metricsSubsystem, - Name: "operations_failed_count", - Help: "Total count of failed operations", - }, []string{"task_type"}), - successfulCount: mon.Factory().NewCounterVec(prometheus.CounterOpts{ - Subsystem: metricsSubsystem, - Name: "operations_successful_count", - Help: "Total count of successful operations", - }, []string{"task_type"}), + mon: mon, } for _, opt := range options { opt(op) @@ -165,7 +147,7 @@ func (o *OperationProcessorImpl) processOperation(op types.Operation) { ) return } - o.runsCount.WithLabelValues(op.GetType().String()).Inc() + o.mon.IncHandlerRunsCount(op.GetContainerID(), op.GetType().String()) o.runningOperations[op.GetID()] = true o.workersWaitGroup.Add(1) go func() { @@ -183,14 +165,14 @@ func (o *OperationProcessorImpl) processOperation(op types.Operation) { zap.String("operation", types.OperationToString(op)), zap.Error(err), ) - o.failedCount.WithLabelValues(op.GetType().String()).Inc() + o.mon.IncFailedHandlerRunsCount(op.GetContainerID(), op.GetType().String()) } else { xlog.Debug( ctx, "operation handler finished successfully", zap.String("operation", types.OperationToString(op)), ) - o.successfulCount.WithLabelValues(op.GetType().String()).Inc() + o.mon.IncSuccessfulHandlerRunsCount(op.GetContainerID(), op.GetType().String()) } o.results <- op.GetID() }() diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index f91ecc83..f60fd289 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -10,6 +10,7 @@ import ( "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/server" "ydbcp/internal/server/grpcinfo" "ydbcp/internal/types" @@ -33,15 +34,22 @@ type BackupService struct { allowedEndpointDomains []string allowInsecureEndpoint bool clock clockwork.Clock + mon metrics.MetricsRegistry +} + +func (s *BackupService) IncApiCallsCounter(methodName string, code codes.Code) { + s.mon.IncApiCallsCounter("BackupService", methodName, code.String()) } func (s *BackupService) GetBackup(ctx context.Context, request *pb.GetBackupRequest) (*pb.Backup, error) { + const methodName string = "GetBackup" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Debug(ctx, "GetBackup", zap.String("request", request.String())) + xlog.Debug(ctx, methodName, zap.String("request", request.String())) ctx = xlog.With(ctx, zap.String("BackupID", request.Id)) backupID, err := types.ParseObjectID(request.GetId()) if err != nil { xlog.Error(ctx, "failed to parse BackupID", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.InvalidArgument) return nil, status.Error(codes.InvalidArgument, "failed to parse BackupID") } ctx = xlog.With(ctx, zap.String("BackupID", backupID)) @@ -58,10 +66,12 @@ func (s *BackupService) GetBackup(ctx context.Context, request *pb.GetBackupRequ ) if err != nil { xlog.Error(ctx, "can't select backups", zap.Error(err)) + 
s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't select backups") } if len(backups) == 0 { xlog.Error(ctx, "backup not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "backup not found") // TODO: Permission denied? } backup := backups[0] @@ -69,21 +79,25 @@ func (s *BackupService) GetBackup(ctx context.Context, request *pb.GetBackupRequ // TODO: Need to check access to backup resource by backupID subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupGet, backup.ContainerID, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) - xlog.Debug(ctx, "GetBackup", zap.String("backup", backup.String())) + xlog.Debug(ctx, methodName, zap.String("backup", backup.String())) + s.IncApiCallsCounter(methodName, codes.OK) return backups[0].Proto(), nil } func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb.Operation, error) { + const methodName string = "MakeBackup" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Info(ctx, "MakeBackup", zap.String("request", req.String())) + xlog.Debug(ctx, methodName, zap.String("request", req.String())) ctx = xlog.With(ctx, zap.String("ContainerID", req.ContainerId)) subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupCreate, req.ContainerId, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ -116,6 +130,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques _, err = backup_operations.OpenConnAndValidateSourcePaths(ctx, backup_operations.FromTBWROperation(tbwr), s.clientConn) if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } @@ -123,22 +138,26 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques ctx, queries.NewWriteTableQuery().WithCreateOperation(tbwr), ) if err != nil { - return nil, err + s.IncApiCallsCounter(methodName, codes.Internal) + return nil, status.Error(codes.Internal, err.Error()) } ctx = xlog.With(ctx, zap.String("BackupID", tbwr.BackupID)) ctx = xlog.With(ctx, zap.String("OperationID", tbwr.GetID())) - xlog.Debug(ctx, "MakeBackup was started successfully", zap.String("operation", types.OperationToString(tbwr))) + xlog.Debug(ctx, methodName, zap.String("operation", types.OperationToString(tbwr))) + s.IncApiCallsCounter(methodName, codes.OK) return tbwr.Proto(), nil } func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRequest) (*pb.Operation, error) { + const methodName string = "DeleteBackup" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Info(ctx, "DeleteBackup", zap.String("request", req.String())) + xlog.Debug(ctx, methodName, zap.String("request", req.String())) ctx = xlog.With(ctx, zap.String("BackupID", req.BackupId)) backupID, err := types.ParseObjectID(req.BackupId) if err != nil { xlog.Error(ctx, "failed to parse BackupID", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.InvalidArgument) return nil, status.Error(codes.InvalidArgument, "failed to parse BackupID") } ctx = xlog.With(ctx, zap.String("BackupID", backupID)) @@ -157,11 +176,13 @@ func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRe if err != nil { xlog.Error(ctx, "can't select backups", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, 
"can't select backups") } if len(backups) == 0 { xlog.Error(ctx, "backup not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "backup not found") // TODO: Permission Denied? } @@ -170,12 +191,14 @@ func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRe subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupCreate, backup.ContainerID, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) if !backup.CanBeDeleted() { xlog.Error(ctx, "backup can't be deleted", zap.String("BackupStatus", backup.Status)) + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Errorf(codes.FailedPrecondition, "backup can't be deleted, status %s", backup.Status) } @@ -207,22 +230,26 @@ func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRe ) if err != nil { xlog.Error(ctx, "can't create operation", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't create operation") } ctx = xlog.With(ctx, zap.String("OperationID", op.GetID())) - xlog.Debug(ctx, "DeleteBackup was started successfully", zap.String("operation", types.OperationToString(op))) + xlog.Debug(ctx, methodName, zap.String("operation", types.OperationToString(op))) + s.IncApiCallsCounter(methodName, codes.OK) return op.Proto(), nil } func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) { + const methodName string = "MakeRestore" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Info(ctx, "MakeRestore", zap.String("request", req.String())) + xlog.Debug(ctx, methodName, zap.String("request", req.String())) ctx = xlog.With(ctx, zap.String("BackupID", req.BackupId)) backupID, err := types.ParseObjectID(req.BackupId) if err != nil { xlog.Error(ctx, "failed to parse BackupID", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.InvalidArgument) return nil, status.Error(codes.InvalidArgument, "failed to parse BackupID") } ctx = xlog.With(ctx, zap.String("BackupID", backupID)) @@ -240,10 +267,12 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ ) if err != nil { xlog.Error(ctx, "can't select backups", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't select backups") } if len(backups) == 0 { xlog.Error(ctx, "backup not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "backup not found") // TODO: Permission denied? 
} backup := backups[0] @@ -253,6 +282,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ ctx, s.auth, auth.PermissionBackupRestore, backup.ContainerID, "", ) // TODO: check access to backup as resource if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ -263,6 +293,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ "endpoint of database is invalid or not allowed", zap.String("DatabaseEndpoint", req.DatabaseEndpoint), ) + s.IncApiCallsCounter(methodName, codes.InvalidArgument) return nil, status.Errorf( codes.InvalidArgument, "endpoint of database is invalid or not allowed, endpoint %s", req.DatabaseEndpoint, ) @@ -270,6 +301,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ if backup.Status != types.BackupStateAvailable { xlog.Error(ctx, "backup is not available", zap.String("BackupStatus", backup.Status)) + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Errorf(codes.FailedPrecondition, "backup is not available, status %s", backup.Status) } @@ -282,6 +314,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ client, err := s.clientConn.Open(ctx, dsn) if err != nil { xlog.Error(ctx, "can't open client connection", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Unknown) return nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn) } defer func() { @@ -293,11 +326,13 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ accessKey, err := s.s3.AccessKey() if err != nil { xlog.Error(ctx, "can't get S3AccessKey", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't get S3AccessKey") } secretKey, err := s.s3.SecretKey() if err != nil { xlog.Error(ctx, "can't get S3SecretKey", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't get S3SecretKey") } @@ -310,6 +345,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ fullPath, ok := backup_operations.SafePathJoin(backup.S3PathPrefix, p) if !ok { xlog.Error(ctx, "incorrect source path", zap.String("path", p)) + s.IncApiCallsCounter(methodName, codes.InvalidArgument) return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p) } sourcePaths = append(sourcePaths, fullPath) @@ -333,6 +369,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ clientOperationID, err := s.clientConn.ImportFromS3(ctx, client, s3Settings) if err != nil { xlog.Error(ctx, "can't start import operation", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Unknown) return nil, status.Errorf(codes.Unknown, "can't start import operation, dsn %s", dsn) } ctx = xlog.With(ctx, zap.String("ClientOperationID", clientOperationID)) @@ -360,23 +397,27 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ operationID, err := s.driver.CreateOperation(ctx, op) if err != nil { xlog.Error(ctx, "can't create operation", zap.String("operation", types.OperationToString(op)), zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't create operation") } ctx = xlog.With(ctx, zap.String("OperationID", operationID)) - xlog.Info(ctx, "RestoreBackup operation created") - op.ID = 
operationID + + xlog.Debug(ctx, methodName, zap.String("operation", types.OperationToString(op))) + s.IncApiCallsCounter(methodName, codes.OK) return op.Proto(), nil } func (s *BackupService) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) ( *pb.ListBackupsResponse, error, ) { + const methodName string = "ListBackups" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Debug(ctx, "ListBackups", zap.String("request", request.String())) + xlog.Debug(ctx, methodName, zap.String("request", request.String())) ctx = xlog.With(ctx, zap.String("ContainerID", request.ContainerId)) subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupList, request.ContainerId, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ -421,10 +462,12 @@ func (s *BackupService) ListBackups(ctx context.Context, request *pb.ListBackups } pageSpec, err := queries.NewPageSpec(request.GetPageSize(), request.GetPageToken()) if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } orderSpec, err := queries.NewOrderSpec(request.GetOrder()) if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } @@ -438,6 +481,7 @@ func (s *BackupService) ListBackups(ctx context.Context, request *pb.ListBackups ) if err != nil { xlog.Error(ctx, "error getting backups", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting backups") } pbBackups := make([]*pb.Backup, 0, len(backups)) @@ -450,7 +494,8 @@ func (s *BackupService) ListBackups(ctx context.Context, request *pb.ListBackups if uint64(len(pbBackups)) == pageSpec.Limit { res.NextPageToken = strconv.FormatUint(pageSpec.Offset+pageSpec.Limit, 10) } - xlog.Debug(ctx, "ListBackups success") + xlog.Debug(ctx, methodName, zap.String("response", res.String())) + s.IncApiCallsCounter(methodName, codes.OK) return res, nil } @@ -465,6 +510,7 @@ func NewBackupService( auth ap.AuthProvider, allowedEndpointDomains []string, allowInsecureEndpoint bool, + mon metrics.MetricsRegistry, ) *BackupService { return &BackupService{ driver: driver, @@ -474,5 +520,6 @@ func NewBackupService( allowedEndpointDomains: allowedEndpointDomains, allowInsecureEndpoint: allowInsecureEndpoint, clock: clockwork.NewRealClock(), + mon: mon, } } diff --git a/internal/server/services/backup_schedule/backup_schedule_service.go b/internal/server/services/backup_schedule/backup_schedule_service.go index c57c2345..33c43ef3 100644 --- a/internal/server/services/backup_schedule/backup_schedule_service.go +++ b/internal/server/services/backup_schedule/backup_schedule_service.go @@ -11,6 +11,7 @@ import ( "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/server" "ydbcp/internal/server/grpcinfo" "ydbcp/internal/types" @@ -31,6 +32,11 @@ type BackupScheduleService struct { clientConn client.ClientConnector auth ap.AuthProvider clock clockwork.Clock + mon metrics.MetricsRegistry +} + +func (s *BackupScheduleService) IncApiCallsCounter(methodName string, code codes.Code) { + s.mon.IncApiCallsCounter("BackupScheduleService", methodName, code.String()) } func (s *BackupScheduleService) CheckClientDbAccess( @@ -54,11 +60,13 @@ func (s *BackupScheduleService) CheckClientDbAccess( func (s *BackupScheduleService) CreateBackupSchedule( ctx context.Context, request *pb.CreateBackupScheduleRequest, ) 
(*pb.BackupSchedule, error) { + const methodName string = "CreateBackupSchedule" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Info(ctx, "CreateBackupSchedule", zap.String("request", request.String())) + xlog.Debug(ctx, methodName, zap.String("request", request.String())) ctx = xlog.With(ctx, zap.String("ContainerID", request.ContainerId)) subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupCreate, request.ContainerId, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ -66,16 +74,19 @@ func (s *BackupScheduleService) CreateBackupSchedule( Endpoint: request.Endpoint, DatabaseName: request.DatabaseName, }); err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } if request.ScheduleSettings == nil { xlog.Error( ctx, "no backup schedule settings for CreateBackupSchedule", zap.String("request", request.String()), ) + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "no backup schedule settings for CreateBackupSchedule") } if request.ScheduleSettings.RecoveryPointObjective != nil && (request.ScheduleSettings.RecoveryPointObjective.Seconds == 0) { + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "recovery point objective should be greater than 0") } var scheduleName *string @@ -106,6 +117,7 @@ func (s *BackupScheduleService) CreateBackupSchedule( err = schedule.UpdateNextLaunch(s.clock.Now()) if err != nil { + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, err.Error()) } @@ -115,21 +127,24 @@ func (s *BackupScheduleService) CreateBackupSchedule( ctx, "can't create backup schedule", zap.String("backup schedule", schedule.Proto(s.clock).String()), zap.Error(err), ) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't create backup schedule") } - xlog.Info(ctx, "backup schedule created", zap.String("BackupScheduleID", schedule.ID)) + xlog.Debug(ctx, methodName, zap.Stringer("schedule", &schedule)) + s.IncApiCallsCounter(methodName, codes.OK) return schedule.Proto(s.clock), nil } func (s *BackupScheduleService) UpdateBackupSchedule( ctx context.Context, request *pb.UpdateBackupScheduleRequest, ) (*pb.BackupSchedule, error) { + const methodName string = "UpdateBackupSchedule" ctx = grpcinfo.WithGRPCInfo(ctx) scheduleID := request.GetId() ctx = xlog.With(ctx, zap.String("BackupScheduleID", scheduleID)) - xlog.Debug(ctx, "UpdateBackupSchedule", zap.Stringer("request", request)) + xlog.Debug(ctx, methodName, zap.Stringer("request", request)) schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( @@ -142,10 +157,12 @@ func (s *BackupScheduleService) UpdateBackupSchedule( if err != nil { xlog.Error(ctx, "error getting backup schedule", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting backup schedule") } if len(schedules) == 0 { xlog.Error(ctx, "backup schedule not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "backup schedule not found") } @@ -154,6 +171,7 @@ func (s *BackupScheduleService) UpdateBackupSchedule( // TODO: Need to check access to backup schedule not by container id? 
subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupCreate, schedule.ContainerID, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ -161,11 +179,13 @@ func (s *BackupScheduleService) UpdateBackupSchedule( Endpoint: schedule.DatabaseEndpoint, DatabaseName: schedule.DatabaseName, }); err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } if schedule.Status == types.BackupScheduleStateDeleted { xlog.Error(ctx, "backup schedule was deleted") + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "backup schedule was deleted") } @@ -180,17 +200,20 @@ func (s *BackupScheduleService) UpdateBackupSchedule( if request.ScheduleSettings.SchedulePattern != nil { _, err = types.ParseCronExpr(request.ScheduleSettings.SchedulePattern.Crontab) if err != nil { + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "failed to parse crontab") } } if request.ScheduleSettings.RecoveryPointObjective != nil && request.ScheduleSettings.RecoveryPointObjective.Seconds == 0 { + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "recovery point objective should be greater than 0") } schedule.ScheduleSettings = request.ScheduleSettings err = schedule.UpdateNextLaunch(s.clock.Now()) if err != nil { + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "failed to update next launch time") } } @@ -206,22 +229,25 @@ func (s *BackupScheduleService) UpdateBackupSchedule( ctx, "can't update backup schedule", zap.String("backup schedule", schedule.Proto(s.clock).String()), zap.Error(err), ) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't update backup schedule") } - xlog.Info(ctx, "UpdateBackupSchedule was completed successfully", zap.Stringer("schedule", schedule)) + xlog.Debug(ctx, methodName, zap.Stringer("schedule", schedule)) + s.IncApiCallsCounter(methodName, codes.OK) return schedule.Proto(s.clock), nil } func (s *BackupScheduleService) GetBackupSchedule( ctx context.Context, request *pb.GetBackupScheduleRequest, ) (*pb.BackupSchedule, error) { + const methodName string = "GetBackupSchedule" ctx = grpcinfo.WithGRPCInfo(ctx) scheduleID := request.GetId() ctx = xlog.With(ctx, zap.String("BackupScheduleID", scheduleID)) - xlog.Debug(ctx, "GetBackupSchedule", zap.Stringer("request", request)) + xlog.Debug(ctx, methodName, zap.Stringer("request", request)) schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( @@ -234,10 +260,12 @@ func (s *BackupScheduleService) GetBackupSchedule( if err != nil { xlog.Error(ctx, "error getting backup schedule", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting backup schedule") } if len(schedules) == 0 { xlog.Error(ctx, "backup schedule not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "backup schedule not found") // TODO: Permission denied? } @@ -246,23 +274,27 @@ func (s *BackupScheduleService) GetBackupSchedule( // TODO: Need to check access to backup schedule not by container id? 
subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupGet, schedule.ContainerID, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) - xlog.Debug(ctx, "GetBackupSchedule", zap.Stringer("schedule", schedule)) + xlog.Debug(ctx, methodName, zap.Stringer("schedule", schedule)) + s.IncApiCallsCounter(methodName, codes.OK) return schedule.Proto(s.clock), nil } func (s *BackupScheduleService) ListBackupSchedules( ctx context.Context, request *pb.ListBackupSchedulesRequest, ) (*pb.ListBackupSchedulesResponse, error) { + const methodName string = "ListBackupSchedules" ctx = grpcinfo.WithGRPCInfo(ctx) ctx = xlog.With(ctx, zap.String("ContainerID", request.ContainerId)) - xlog.Debug(ctx, "ListBackupSchedules", zap.String("request", request.String())) + xlog.Debug(ctx, methodName, zap.String("request", request.String())) subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupList, request.ContainerId, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ -293,6 +325,7 @@ func (s *BackupScheduleService) ListBackupSchedules( pageSpec, err := queries.NewPageSpec(request.GetPageSize(), request.GetPageToken()) if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } @@ -311,6 +344,7 @@ func (s *BackupScheduleService) ListBackupSchedules( ) if err != nil { xlog.Error(ctx, "error getting backup schedules", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting backup schedules") } pbSchedules := make([]*pb.BackupSchedule, 0, len(schedules)) @@ -321,19 +355,21 @@ func (s *BackupScheduleService) ListBackupSchedules( if uint64(len(pbSchedules)) == pageSpec.Limit { res.NextPageToken = strconv.FormatUint(pageSpec.Offset+pageSpec.Limit, 10) } - xlog.Debug(ctx, "ListBackupSchedules success") + xlog.Debug(ctx, methodName, zap.Stringer("response", res)) + s.IncApiCallsCounter(methodName, codes.OK) return res, nil } func (s *BackupScheduleService) ToggleBackupSchedule( ctx context.Context, request *pb.ToggleBackupScheduleRequest, ) (*pb.BackupSchedule, error) { + const methodName string = "ToggleBackupSchedule" ctx = grpcinfo.WithGRPCInfo(ctx) scheduleID := request.GetId() ctx = xlog.With(ctx, zap.String("BackupScheduleID", scheduleID)) - xlog.Debug(ctx, "ToggleBackupSchedule", zap.Stringer("request", request)) + xlog.Debug(ctx, methodName, zap.Stringer("request", request)) schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( @@ -346,10 +382,12 @@ func (s *BackupScheduleService) ToggleBackupSchedule( if err != nil { xlog.Error(ctx, "error getting backup schedule", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting backup schedule") } if len(schedules) == 0 { xlog.Error(ctx, "backup schedule not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "backup schedule not found") } @@ -357,6 +395,7 @@ func (s *BackupScheduleService) ToggleBackupSchedule( ctx = xlog.With(ctx, zap.String("ContainerID", schedule.ContainerID)) subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupCreate, schedule.ContainerID, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ 
-364,11 +403,13 @@ func (s *BackupScheduleService) ToggleBackupSchedule( Endpoint: schedule.DatabaseEndpoint, DatabaseName: schedule.DatabaseName, }); err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } if schedule.Status == types.BackupScheduleStateDeleted { xlog.Error(ctx, "backup schedule was deleted") + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "backup schedule was deleted") } @@ -381,6 +422,7 @@ func (s *BackupScheduleService) ToggleBackupSchedule( if schedule.ScheduleSettings != nil { err = schedule.UpdateNextLaunch(s.clock.Now()) if err != nil { + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "failed to update next launch time") } } @@ -391,22 +433,25 @@ func (s *BackupScheduleService) ToggleBackupSchedule( ctx, "can't update backup schedule", zap.String("backup schedule", schedule.Proto(s.clock).String()), zap.Error(err), ) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't update backup schedule") } - xlog.Info(ctx, "ToggleBackupSchedule was completed successfully", zap.Stringer("schedule", schedule)) + xlog.Debug(ctx, methodName, zap.Stringer("schedule", schedule)) + s.IncApiCallsCounter(methodName, codes.OK) return schedule.Proto(s.clock), nil } func (s *BackupScheduleService) DeleteBackupSchedule( ctx context.Context, request *pb.DeleteBackupScheduleRequest, ) (*pb.BackupSchedule, error) { + const methodName string = "DeleteBackupSchedule" ctx = grpcinfo.WithGRPCInfo(ctx) scheduleID := request.GetId() ctx = xlog.With(ctx, zap.String("BackupScheduleID", scheduleID)) - xlog.Debug(ctx, "DeleteBackupSchedule", zap.Stringer("request", request)) + xlog.Debug(ctx, methodName, zap.Stringer("request", request)) schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( @@ -419,10 +464,12 @@ func (s *BackupScheduleService) DeleteBackupSchedule( if err != nil { xlog.Error(ctx, "error getting backup schedule", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting backup schedule") } if len(schedules) == 0 { xlog.Error(ctx, "backup schedule not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "backup schedule not found") } @@ -431,12 +478,14 @@ func (s *BackupScheduleService) DeleteBackupSchedule( // TODO: Need to check access to backup schedule not by container id? 
subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupCreate, schedule.ContainerID, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) if schedule.Status == types.BackupScheduleStateDeleted { xlog.Error(ctx, "backup schedule already deleted") + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Error(codes.FailedPrecondition, "backup schedule already deleted") } @@ -447,10 +496,12 @@ func (s *BackupScheduleService) DeleteBackupSchedule( ctx, "can't delete backup schedule", zap.String("backup schedule", schedule.Proto(s.clock).String()), zap.Error(err), ) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't delete backup schedule") } - xlog.Info(ctx, "DeleteBackupSchedule was completed successfully", zap.Stringer("schedule", schedule)) + xlog.Debug(ctx, methodName, zap.Stringer("schedule", schedule)) + s.IncApiCallsCounter(methodName, codes.OK) return schedule.Proto(s.clock), nil } @@ -462,11 +513,13 @@ func NewBackupScheduleService( driver db.DBConnector, clientConn client.ClientConnector, auth ap.AuthProvider, + mon metrics.MetricsRegistry, ) *BackupScheduleService { return &BackupScheduleService{ driver: driver, clientConn: clientConn, auth: auth, clock: clockwork.NewRealClock(), + mon: mon, } } diff --git a/internal/server/services/operation/operation_service.go b/internal/server/services/operation/operation_service.go index 59a07b9b..ba291348 100644 --- a/internal/server/services/operation/operation_service.go +++ b/internal/server/services/operation/operation_service.go @@ -7,6 +7,7 @@ import ( "ydbcp/internal/auth" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/server" "ydbcp/internal/server/grpcinfo" "ydbcp/internal/types" @@ -24,17 +25,24 @@ type OperationService struct { pb.UnimplementedOperationServiceServer driver db.DBConnector auth ap.AuthProvider + mon metrics.MetricsRegistry +} + +func (s *OperationService) IncApiCallsCounter(methodName string, code codes.Code) { + s.mon.IncApiCallsCounter("OperationService", methodName, code.String()) } func (s *OperationService) ListOperations( ctx context.Context, request *pb.ListOperationsRequest, ) (*pb.ListOperationsResponse, error) { + const methodName string = "ListOperations" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Debug(ctx, "ListOperations", zap.String("request", request.String())) + xlog.Debug(ctx, methodName, zap.String("request", request.String())) ctx = xlog.With(ctx, zap.String("ContainerID", request.ContainerId)) subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupList, request.ContainerId, "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) @@ -77,6 +85,7 @@ func (s *OperationService) ListOperations( pageSpec, err := queries.NewPageSpec(request.GetPageSize(), request.GetPageToken()) if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } @@ -95,6 +104,7 @@ func (s *OperationService) ListOperations( ) if err != nil { xlog.Error(ctx, "error getting operations", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting operations") } pbOperations := make([]*pb.Operation, 0, len(operations)) @@ -105,7 +115,8 @@ func (s *OperationService) ListOperations( if uint64(len(pbOperations)) == 
pageSpec.Limit { res.NextPageToken = strconv.FormatUint(pageSpec.Offset+pageSpec.Limit, 10) } - xlog.Debug(ctx, "success ListOperations") + xlog.Debug(ctx, methodName, zap.Stringer("response", res)) + s.IncApiCallsCounter(methodName, codes.OK) return res, nil } @@ -113,8 +124,9 @@ func (s *OperationService) CancelOperation( ctx context.Context, request *pb.CancelOperationRequest, ) (*pb.Operation, error) { + const methodName string = "CancelOperation" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Debug(ctx, "CancelOperation", zap.String("request", request.String())) + xlog.Debug(ctx, methodName, zap.String("request", request.String())) ctx = xlog.With(ctx, zap.String("OperationID", request.OperationId)) operations, err := s.driver.SelectOperations( @@ -131,11 +143,13 @@ func (s *OperationService) CancelOperation( if err != nil { xlog.Error(ctx, "error getting operation", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error getting operation") } if len(operations) == 0 { xlog.Error(ctx, "operation not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "operation not found") } @@ -154,20 +168,24 @@ func (s *OperationService) CancelOperation( permission = auth.PermissionBackupRestore } else if operation.GetType() == types.OperationTypeDB { xlog.Error(ctx, "can't cancel DeleteBackup operation") + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Errorf(codes.FailedPrecondition, "can't cancel DeleteBackup operation: %s", types.OperationToString(operation)) } else { xlog.Error(ctx, "unknown operation type") + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Errorf(codes.Internal, "unknown operation type: %s", operation.GetType().String()) } subject, err := auth.CheckAuth(ctx, s.auth, permission, operation.GetContainerID(), "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) if operation.GetState() != types.OperationStatePending && operation.GetState() != types.OperationStateRunning { xlog.Error(ctx, "can't cancel operation with state", zap.String("OperationState", operation.GetState().String())) + s.IncApiCallsCounter(methodName, codes.FailedPrecondition) return nil, status.Errorf(codes.FailedPrecondition, "can't cancel operation with state: %s", operation.GetState().String()) } @@ -177,22 +195,26 @@ func (s *OperationService) CancelOperation( err = s.driver.UpdateOperation(ctx, operation) if err != nil { xlog.Error(ctx, "error updating operation", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "error updating operation") } xlog.Debug( - ctx, "CancelOperation was started", + ctx, methodName, zap.String("operation", types.OperationToString(operation)), ) + s.IncApiCallsCounter(methodName, codes.OK) return operation.Proto(), nil } func (s *OperationService) GetOperation(ctx context.Context, request *pb.GetOperationRequest) (*pb.Operation, error) { + const methodName string = "GetOperation" ctx = grpcinfo.WithGRPCInfo(ctx) - xlog.Debug(ctx, "GetOperation", zap.String("request", request.String())) + xlog.Debug(ctx, methodName, zap.String("request", request.String())) operationID, err := types.ParseObjectID(request.GetId()) if err != nil { xlog.Error(ctx, "failed to parse OperationID", zap.String("OperationID", request.GetId()), zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) 
return nil, status.Error(codes.Internal, "failed to parse ObjectID") } ctx = xlog.With(ctx, zap.String("OperationID", operationID)) @@ -210,11 +232,13 @@ func (s *OperationService) GetOperation(ctx context.Context, request *pb.GetOper ) if err != nil { xlog.Error(ctx, "can't select operations", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) return nil, status.Error(codes.Internal, "can't select operations") } if len(operations) == 0 { xlog.Error(ctx, "operation not found") + s.IncApiCallsCounter(methodName, codes.NotFound) return nil, status.Error(codes.NotFound, "operation not found") // TODO: permission denied? } operation := operations[0] @@ -222,11 +246,13 @@ func (s *OperationService) GetOperation(ctx context.Context, request *pb.GetOper // TODO: Need to check access to operation resource by operationID subject, err := auth.CheckAuth(ctx, s.auth, auth.PermissionBackupGet, operation.GetContainerID(), "") if err != nil { + s.IncApiCallsCounter(methodName, status.Code(err)) return nil, err } ctx = xlog.With(ctx, zap.String("SubjectID", subject)) - xlog.Debug(ctx, "GetOperation", zap.String("operation", types.OperationToString(operations[0]))) + xlog.Debug(ctx, methodName, zap.String("operation", types.OperationToString(operations[0]))) + s.IncApiCallsCounter(methodName, codes.OK) return operations[0].Proto(), nil } @@ -234,9 +260,14 @@ func (s *OperationService) Register(server server.Server) { pb.RegisterOperationServiceServer(server.GRPCServer(), s) } -func NewOperationService(driver db.DBConnector, auth ap.AuthProvider) *OperationService { +func NewOperationService( + driver db.DBConnector, + auth ap.AuthProvider, + mon metrics.MetricsRegistry, +) *OperationService { return &OperationService{ driver: driver, auth: auth, + mon: mon, } } diff --git a/internal/watchers/schedule_watcher/schedule_watcher_test.go b/internal/watchers/schedule_watcher/schedule_watcher_test.go index c589d728..14654b2b 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher_test.go +++ b/internal/watchers/schedule_watcher/schedule_watcher_test.go @@ -10,6 +10,7 @@ import ( "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/handlers" + "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/ticker" "ydbcp/internal/watchers" @@ -62,6 +63,7 @@ func TestScheduleWatcherSimple(t *testing.T) { handler := handlers.NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, + metrics.NewMockMetricsRegistry(), ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -170,6 +172,7 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { handler := handlers.NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, + metrics.NewMockMetricsRegistry(), ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -286,6 +289,7 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { handler := handlers.NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, + metrics.NewMockMetricsRegistry(), ) scheduleWatcherActionCompleted := make(chan struct{}) diff --git a/internal/watchers/ttl_watcher/ttl_watcher.go b/internal/watchers/ttl_watcher/ttl_watcher.go index a08eddef..eb30a6a8 100644 --- a/internal/watchers/ttl_watcher/ttl_watcher.go +++ b/internal/watchers/ttl_watcher/ttl_watcher.go @@ -19,7 +19,7 @@ func NewTtlWatcher( ctx context.Context, wg *sync.WaitGroup, db db.DBConnector, - queryBuilderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBuilderFactory, options ...watchers.Option, ) 
*watchers.WatcherImpl {
 	return watchers.NewWatcher(
@@ -38,7 +38,7 @@ func TtlWatcherAction(
 	baseCtx context.Context,
 	period time.Duration,
 	db db.DBConnector,
-	queryBuilderFactory queries.WriteQueryBulderFactory,
+	queryBuilderFactory queries.WriteQueryBuilderFactory,
 ) {
 	ctx, cancel := context.WithTimeout(baseCtx, period)
 	defer cancel()
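
A minimal usage sketch (not part of the patch; container ID, operation type, and byte values are illustrative) of how the MockMetricsRegistry introduced above can back a handler test, asserting the aggregated counters through GetMetrics():

// Sketch only: exercises the metrics methods added in this patch via the mock.
package metrics_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

	"ydbcp/internal/metrics"
)

func TestMockMetricsRegistrySketch(t *testing.T) {
	mon := metrics.NewMockMetricsRegistry()

	// Simulate what a successful handler run would report.
	mon.IncHandlerRunsCount("container-1", "TB")
	mon.IncSuccessfulHandlerRunsCount("container-1", "TB")
	mon.IncCompletedBackupsCount("container-1", "/some/db", Ydb.StatusIds_SUCCESS)
	mon.IncBytesWrittenCounter("container-1", "bucket", "/some/db", 1024)

	// The mock folds every label set into one counter per metric name.
	m := mon.GetMetrics()
	assert.Equal(t, int64(1), m["operation_processor_handler_runs_count"])
	assert.Equal(t, int64(1), m["operation_processor_handler_runs_successful_count"])
	assert.Equal(t, int64(1), m["backups_succeeded_count"])
	assert.Equal(t, int64(1024), m["storage_bytes_written"])
}

Since the real MetricsRegistryImpl exposes the same methods, handlers and services depend only on the shared metrics.MetricsRegistry interface and stay unaware of whether counts go to Prometheus or to the in-memory map.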