Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ydbcp): add metrics #97

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func main() {
authProvider,
configInstance.ClientConnection.AllowedEndpointDomains,
configInstance.ClientConnection.AllowInsecureEndpoint,
metrics,
).Register(server)
operation.NewOperationService(dbConnector, authProvider).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider).Register(server)
operation.NewOperationService(dbConnector, authProvider, metrics).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider, metrics).Register(server)
if err := server.Start(ctx, &wg); err != nil {
xlog.Error(ctx, "Error start GRPC server", zap.Error(err))
os.Exit(1)
Expand All @@ -133,7 +134,7 @@ func main() {
if err := handlersRegistry.Add(
types.OperationTypeTB,
handlers.NewTBOperationHandler(
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery,
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
Expand All @@ -142,15 +143,15 @@ func main() {

if err := handlersRegistry.Add(
types.OperationTypeRB,
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance),
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance, metrics),
); err != nil {
xlog.Error(ctx, "failed to register RB handler", zap.Error(err))
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery),
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
Expand All @@ -164,7 +165,9 @@ func main() {
configInstance.S3,
configInstance.ClientConnection,
queries.NewWriteTableQuery,
clockwork.NewRealClock()),
clockwork.NewRealClock(),
metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
os.Exit(1)
Expand All @@ -174,7 +177,7 @@ func main() {
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

backupScheduleHandler := handlers.NewBackupScheduleHandler(
queries.NewWriteTableQuery, clockwork.NewRealClock(),
queries.NewWriteTableQuery, clockwork.NewRealClock(), metrics,
)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
xlog.Info(ctx, "YDBCP started")
Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type WriteSingleTableQueryImpl struct {
}

type WriteTableQueryImplOption func(*WriteTableQueryImpl)
type WriteQueryBulderFactory func() WriteTableQuery
type WriteQueryBuilderFactory func() WriteTableQuery

func (d *WriteSingleTableQueryImpl) AddValueParam(name string, value table_types.Value) {
d.upsertFields = append(d.upsertFields, name[1:])
Expand Down
12 changes: 8 additions & 4 deletions internal/connectors/s3/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type S3Connector interface {
ListObjects(pathPrefix string, bucket string) ([]string, error)
ListObjects(pathPrefix string, bucket string) ([]string, int64, error)
GetSize(pathPrefix string, bucket string) (int64, error)
DeleteObjects(keys []string, bucket string) error
}
Expand Down Expand Up @@ -49,7 +49,8 @@ func NewS3Connector(config config.S3Config) (*ClientS3Connector, error) {
return &ClientS3Connector{s3: s3Client}, nil
}

func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, error) {
func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]string, int64, error) {
var size int64
objects := make([]string, 0)
objectsPtr := &objects

Expand All @@ -65,17 +66,20 @@ func (c *ClientS3Connector) ListObjects(pathPrefix string, bucket string) ([]str
func(p *s3.ListObjectsOutput, last bool) (shouldContinue bool) {
for _, object := range p.Contents {
*objectsPtr = append(*objectsPtr, *object.Key)
if object.Size != nil {
size += *object.Size
}
}

return true
},
)

if err != nil {
return nil, err
return nil, 0, err
}

return *objectsPtr, nil
return *objectsPtr, size, nil
}

func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, error) {
Expand Down
52 changes: 31 additions & 21 deletions internal/connectors/s3/mock.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,63 @@
package s3

import (
"fmt"
"strings"

"github.com/aws/aws-sdk-go/service/s3"
)

type Bucket map[string]*s3.Object

type MockS3Connector struct {
storage map[string][]*s3.Object
storage map[string]Bucket
}

func NewMockS3Connector(storage map[string][]*s3.Object) *MockS3Connector {
func NewMockS3Connector(storage map[string]Bucket) *MockS3Connector {
return &MockS3Connector{
storage: storage,
}
}

func (m *MockS3Connector) ListObjects(pathPrefix string, _ string) ([]string, error) {
func (m *MockS3Connector) ListObjects(pathPrefix string, bucketName string) ([]string, int64, error) {
objects := make([]string, 0)
var size int64

if bucket, ok := m.storage[bucketName]; ok {
for key, object := range bucket {
if strings.HasPrefix(key, pathPrefix) {
objects = append(objects, key)

if content, ok := m.storage[pathPrefix]; ok {
for _, object := range content {
if object.Key == nil {
objects = append(objects, *object.Key)
if object.Size != nil {
size += *object.Size
}
}
}
}

return objects, nil
return objects, size, nil
}

func (m *MockS3Connector) GetSize(pathPrefix string, _ string) (int64, error) {
if content, ok := m.storage[pathPrefix]; ok {
var size int64
for _, object := range content {
if object.Size != nil {
size += *object.Size
func (m *MockS3Connector) GetSize(pathPrefix string, bucketName string) (int64, error) {
var size int64

if bucket, ok := m.storage[bucketName]; ok {
for key, object := range bucket {
if strings.HasPrefix(key, pathPrefix) {
if object.Size != nil {
size += *object.Size
}
}
}

return size, nil
}

return 0, fmt.Errorf("objects not found, path: %s", pathPrefix)
return size, nil
ulya-sidorina marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *MockS3Connector) DeleteObjects(objects []string, _ string) error {
for _, object := range objects {
delete(m.storage, object)
func (m *MockS3Connector) DeleteObjects(objects []string, bucketName string) error {
if bucket, ok := m.storage[bucketName]; ok {
for _, key := range objects {
delete(bucket, key)
}
}

return nil
Expand Down
43 changes: 31 additions & 12 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"fmt"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
Expand All @@ -20,10 +21,11 @@ func NewDBOperationHandler(
db db.DBConnector,
s3 s3.S3Connector,
config config.Config,
queryBulderFactory queries.WriteQueryBulderFactory,
queryBuilderFactory queries.WriteQueryBuilderFactory,
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DBOperationHandler(ctx, op, db, s3, config, queryBulderFactory)
return DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory, mon)
}
}

Expand All @@ -33,7 +35,8 @@ func DBOperationHandler(
db db.DBConnector,
s3 s3.S3Connector,
config config.Config,
queryBuilderFactory queries.WriteQueryBulderFactory,
queryBuilderFactory queries.WriteQueryBuilderFactory,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand All @@ -54,14 +57,30 @@ func DBOperationHandler(
Status: types.BackupStateUnknown,
}

upsertAndReportMetrics := func(operation types.Operation, backup *types.Backup) error {
var err error

if backup != nil {
err = db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(*backup),
)
} else {
err = db.UpdateOperation(ctx, operation)
}

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

if deadlineExceeded(dbOp.Audit.CreatedAt, config) {
backupToWrite.Status = types.BackupStateError
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
return upsertAndReportMetrics(operation, &backupToWrite)
}

backups, err := db.SelectBackups(
Expand All @@ -84,23 +103,25 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Backup not found")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
return upsertAndReportMetrics(operation, nil)
}

backup := backups[0]
if backup.Status != types.BackupStateDeleting {
operation.SetState(types.OperationStateError)
operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backup.Status))
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
return upsertAndReportMetrics(operation, nil)
}

deleteBackup := func(pathPrefix string, bucket string) error {
err := DeleteBackupData(s3, pathPrefix, bucket)
size, err := DeleteBackupData(s3, pathPrefix, bucket)
if err != nil {
return fmt.Errorf("failed to delete backup data: %v", err)
}

mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, backup.DatabaseName, size)

backupToWrite.Status = types.BackupStateDeleted
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
Expand Down Expand Up @@ -133,7 +154,5 @@ func DBOperationHandler(
return fmt.Errorf("unexpected operation state %s", dbOp.State)
}

return db.ExecuteUpsert(
ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
return upsertAndReportMetrics(operation, &backupToWrite)
}
Loading
Loading