diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 059854b0..d5d3f18e 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -49,7 +49,7 @@ func (s *server) GetBackup(ctx context.Context, request *pb.GetBackupRequest) (* return nil, fmt.Errorf("failed to parse uuid %s: %w", request.GetId(), err) } backups, err := s.driver.SelectBackups( - ctx, queries.MakeReadTableQuery( + ctx, queries.NewReadTableQuery( queries.WithTableName("Backups"), queries.WithSelectFields(queries.AllBackupFields...), queries.WithQueryFilters( @@ -118,7 +118,7 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) { xlog.Debug(ctx, "ListBackups", zap.String("request", request.String())) backups, err := s.driver.SelectBackups( - ctx, queries.MakeReadTableQuery( + ctx, queries.NewReadTableQuery( queries.WithTableName("Backups"), queries.WithSelectFields(queries.AllBackupFields...), queries.WithQueryFilters( @@ -156,7 +156,7 @@ func (s *server) ListOperations(ctx context.Context, request *pb.ListOperationsR ) { xlog.Debug(ctx, "ListOperations", zap.String("request", request.String())) operations, err := s.driver.SelectOperations( - ctx, queries.MakeReadTableQuery( + ctx, queries.NewReadTableQuery( queries.WithTableName("Operations"), queries.WithSelectFields(queries.AllOperationFields...), queries.WithQueryFilters( @@ -270,7 +270,10 @@ func main() { handlersRegistry := processor.NewOperationHandlerRegistry() err = handlersRegistry.Add( - types.OperationTypeTB, handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()), + types.OperationTypeTB, + handlers.NewTBOperationHandler( + dbConnector, client.NewClientYdbConnector(), configInstance, queries.NewWriteTableQuery, + ), ) if err != nil { xlog.Error(ctx, "failed to register TB handler", zap.Error(err)) @@ -279,7 +282,7 @@ func main() { err = handlersRegistry.Add( types.OperationTypeRB, - handlers.MakeRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance), + handlers.NewRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance), ) if err != nil { diff --git a/go.mod b/go.mod index 0a052210..d0ed20f1 100644 --- a/go.mod +++ b/go.mod @@ -22,9 +22,9 @@ require ( github.com/golang-jwt/jwt/v4 v4.4.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect ) diff --git a/go.sum b/go.sum index 81ae04e5..2cb8bb77 100644 --- a/go.sum +++ b/go.sum @@ -103,28 +103,28 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -147,8 +147,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= +google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/connectors/db/connector.go b/internal/connectors/db/connector.go index 84d0f638..e4b03e62 100644 --- a/internal/connectors/db/connector.go +++ b/internal/connectors/db/connector.go @@ -47,6 +47,7 @@ type DBConnector interface { CreateOperation(context.Context, types.Operation) (types.ObjectID, error) CreateBackup(context.Context, types.Backup) (types.ObjectID, error) UpdateBackup(context context.Context, id types.ObjectID, backupState string) error + ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error Close() } @@ -238,7 +239,7 @@ func (d *YdbConnector) SelectBackupsByStatus( return DoStructSelect[types.Backup]( ctx, d, - queries.MakeReadTableQuery( + queries.NewReadTableQuery( queries.WithTableName("Backups"), queries.WithSelectFields("id"), queries.WithQueryFilters( @@ -280,7 +281,7 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) ( return DoInterfaceSelect[types.Operation]( ctx, d, - queries.MakeReadTableQuery( + queries.NewReadTableQuery( queries.WithTableName("Operations"), queries.WithSelectFields(queries.AllOperationFields...), queries.WithQueryFilters( @@ -300,14 +301,14 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) ( func (d *YdbConnector) UpdateOperation( ctx context.Context, operation types.Operation, ) error { - return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithUpdateOperation(operation))) + return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation)) } func (d *YdbConnector) CreateOperation( ctx context.Context, operation types.Operation, ) (types.ObjectID, error) { operation.SetId(types.GenerateObjectID()) - err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateOperation(operation))) + err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateOperation(operation)) if err != nil { return types.ObjectID{}, err } @@ -319,7 +320,7 @@ func (d *YdbConnector) CreateBackup( ) (types.ObjectID, error) { id := types.GenerateObjectID() backup.ID = id - err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup))) + err := d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithCreateBackup(backup)) if err != nil { return types.ObjectID{}, err } @@ -333,5 +334,5 @@ func (d *YdbConnector) UpdateBackup( ID: id, Status: backupStatus, } - return d.ExecuteUpsert(context, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup))) + return d.ExecuteUpsert(context, queries.NewWriteTableQuery().WithUpdateBackup(backup)) } diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index dc9f73ea..e7cc0d2d 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -140,8 +140,26 @@ func (c *MockDBConnector) GetOperation( ) } -func (d *MockDBConnector) SelectOperations( - ctx context.Context, queryBuilder queries.ReadTableQuery, +func (c *MockDBConnector) GetBackup( + _ context.Context, backupID types.ObjectID, +) (types.Backup, error) { + if backup, exist := c.backups[backupID]; exist { + return backup, nil + } + return types.Backup{}, fmt.Errorf( + "backup not found, id %s", backupID.String(), + ) +} + +func (c *MockDBConnector) SelectOperations( + _ context.Context, _ queries.ReadTableQuery, ) ([]types.Operation, error) { return nil, errors.New("Do not call this method") } + +func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.WriteTableQuery) error { + queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock) + c.operations[queryBuilderMock.Operation.GetId()] = queryBuilderMock.Operation + c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup + return nil +} diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index a6a6c12b..473f597c 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -5,6 +5,7 @@ import ( "github.com/google/uuid" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "time" "ydbcp/internal/types" ) @@ -70,21 +71,23 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { operationId types.ObjectID containerId string operationType string + createdAt time.Time + database string - operationStateBuf *string backupId *types.ObjectID ydbOperationId *string - database *string + operationStateBuf *string ) err := res.ScanNamed( named.Required("id", &operationId), named.Required("container_id", &containerId), named.Required("type", &operationType), + named.Required("created_at", &createdAt), + named.Required("database", &database), - named.Optional("status", &operationStateBuf), named.Optional("backup_id", &backupId), named.Optional("operation_id", &ydbOperationId), - named.Optional("database", &database), + named.Optional("status", &operationStateBuf), ) if err != nil { return nil, err @@ -93,31 +96,37 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { if operationStateBuf != nil { operationState = types.OperationState(*operationStateBuf) } + ydbOpId := "" + if ydbOperationId != nil { + ydbOpId = *ydbOperationId + } if operationType == string(types.OperationTypeTB) { - if backupId == nil || database == nil || ydbOperationId == nil { - return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String()) + if backupId == nil { + return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId.String()) } return &types.TakeBackupOperation{ Id: operationId, - BackupId: types.ObjectID(*backupId), + BackupId: *backupId, ContainerID: containerId, State: operationState, Message: "", - YdbConnectionParams: types.GetYdbConnectionParams(*database), - YdbOperationId: *ydbOperationId, + YdbConnectionParams: types.GetYdbConnectionParams(database), + YdbOperationId: ydbOpId, + CreatedAt: createdAt, }, nil } else if operationType == string(types.OperationTypeRB) { - if backupId == nil || database == nil || ydbOperationId == nil { - return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String()) + if backupId == nil { + return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId.String()) } return &types.RestoreBackupOperation{ Id: operationId, - BackupId: types.ObjectID(*backupId), + BackupId: *backupId, ContainerID: containerId, State: operationState, Message: "", - YdbConnectionParams: types.GetYdbConnectionParams(*database), - YdbOperationId: *ydbOperationId, + YdbConnectionParams: types.GetYdbConnectionParams(database), + YdbOperationId: ydbOpId, + CreatedAt: createdAt, }, nil } diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index 97c047fb..c9472b0a 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -52,7 +52,7 @@ type ReadTableQueryImpl struct { type ReadTableQueryOption func(*ReadTableQueryImpl) -func MakeReadTableQuery(options ...ReadTableQueryOption) *ReadTableQueryImpl { +func NewReadTableQuery(options ...ReadTableQueryOption) *ReadTableQueryImpl { d := &ReadTableQueryImpl{} d.filters = make([][]table_types.Value, 0) d.filterFields = make([]string, 0) diff --git a/internal/connectors/db/yql/queries/read_test.go b/internal/connectors/db/yql/queries/read_test.go index 19b3047e..e62e9ee4 100644 --- a/internal/connectors/db/yql/queries/read_test.go +++ b/internal/connectors/db/yql/queries/read_test.go @@ -24,7 +24,7 @@ SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 table.ValueParam("$param3", table_types.StringValueFromString("yyy")), ) ) - builder := MakeReadTableQuery( + builder := NewReadTableQuery( WithTableName("table1"), WithSelectFields("column1", "column2", "column3"), WithQueryFilters( diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index da83cb79..4a211a2a 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -14,6 +14,10 @@ import ( type WriteTableQuery interface { FormatQuery(ctx context.Context) (*FormatQueryResult, error) + WithCreateBackup(backup types.Backup) WriteTableQuery + WithCreateOperation(operation types.Operation) WriteTableQuery + WithUpdateBackup(backup types.Backup) WriteTableQuery + WithUpdateOperation(operation types.Operation) WriteTableQuery } type WriteTableQueryImpl struct { @@ -123,42 +127,36 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl return d } -type WriteTableQueryOption func(*WriteTableQueryImpl) +type WriteTableQueryImplOption func(*WriteTableQueryImpl) -func MakeWriteTableQuery(options ...WriteTableQueryOption) *WriteTableQueryImpl { - d := &WriteTableQueryImpl{} - for _, opt := range options { - opt(d) - } - return d +type WriteTableQueryMockOption func(*WriteTableQueryMock) + +func NewWriteTableQuery() WriteTableQuery { + return &WriteTableQueryImpl{} } -func WithCreateBackup(backup types.Backup) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index)) - } +func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index)) + return d } -func WithUpdateBackup(backup types.Backup) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildUpdateBackupQuery(backup, index)) - } +func (d *WriteTableQueryImpl) WithUpdateBackup(backup types.Backup) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildUpdateBackupQuery(backup, index)) + return d } -func WithUpdateOperation(operation types.Operation) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index)) - } +func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index)) + return d } -func WithCreateOperation(operation types.Operation) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index)) - } +func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index)) + return d } func (d *WriteSingleTableQueryImpl) DeclareParameters() string { @@ -180,9 +178,14 @@ func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResu return nil, errors.New("No table") } declares := t.DeclareParameters() + paramNames := t.GetParamNames() + keyParam := fmt.Sprintf("%s = %s", t.upsertFields[0], paramNames[0]) + updates := make([]string, 0) + for j := 1; j < len(t.upsertFields); j++ { + updates = append(updates, fmt.Sprintf("%s = %s", t.upsertFields[j], paramNames[j])) + } queryStrings[i] = fmt.Sprintf( - "%s;\nUPSERT INTO %s (%s) VALUES (%s)", declares, t.tableName, strings.Join(t.upsertFields, ", "), - strings.Join(t.GetParamNames(), ", "), + "%s;\nUPDATE %s SET %s WHERE %s", declares, t.tableName, strings.Join(updates, ", "), keyParam, ) for _, p := range t.tableQueryParams { allParams = append(allParams, p) diff --git a/internal/connectors/db/yql/queries/write_mock.go b/internal/connectors/db/yql/queries/write_mock.go new file mode 100644 index 00000000..6f20d099 --- /dev/null +++ b/internal/connectors/db/yql/queries/write_mock.go @@ -0,0 +1,39 @@ +package queries + +import ( + "context" + "ydbcp/internal/types" +) + +type WriteTableQueryMock struct { + Operation types.Operation + Backup types.Backup +} + +func NewWriteTableQueryMock() WriteTableQuery { + return &WriteTableQueryMock{} +} + +func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) { + return &FormatQueryResult{}, nil +} + +func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQuery { + w.Backup = backup + return w +} + +func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery { + w.Operation = operation + return w +} + +func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQuery { + w.Backup = backup + return w +} + +func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery { + w.Operation = operation + return w +} diff --git a/internal/connectors/db/yql/queries/write_test.go b/internal/connectors/db/yql/queries/write_test.go index 4c039aab..95b4b7c0 100644 --- a/internal/connectors/db/yql/queries/write_test.go +++ b/internal/connectors/db/yql/queries/write_test.go @@ -13,11 +13,11 @@ func TestQueryBuilder_Write(t *testing.T) { const ( queryString = `DECLARE $id_0 AS Uuid; DECLARE $status_0 AS String; -UPSERT INTO Backups (id, status) VALUES ($id_0, $status_0); +UPDATE Backups SET status = $status_0 WHERE id = $id_0; DECLARE $id_1 AS Uuid; DECLARE $status_1 AS String; DECLARE $message_1 AS String; -UPSERT INTO Operations (id, status, message) VALUES ($id_1, $status_1, $message_1)` +UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1` ) opId := types.GenerateObjectID() backupId := types.GenerateObjectID() @@ -30,10 +30,9 @@ UPSERT INTO Operations (id, status, message) VALUES ($id_1, $status_1, $message_ ID: backupId, Status: "Available", } - builder := MakeWriteTableQuery( - WithUpdateBackup(backup), - WithUpdateOperation(&op), - ) + builder := NewWriteTableQuery(). + WithUpdateBackup(backup). + WithUpdateOperation(&op) var ( queryParams = table.NewQueryParameters( table.ValueParam("$id_0", table_types.UUIDValue(backupId)), diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index d0f75cb8..a3ebd907 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -40,7 +40,7 @@ CREATE TABLE Operations ( backup_id UUID, initiated String, - created_at Timestamp, + created_at Timestamp NOT NULL, completed_at Timestamp, status String, diff --git a/internal/connectors/db/yql/schema/fill_tables.yql b/internal/connectors/db/yql/schema/fill_tables.yql index 7dfa4be1..85493a5f 100644 --- a/internal/connectors/db/yql/schema/fill_tables.yql +++ b/internal/connectors/db/yql/schema/fill_tables.yql @@ -7,5 +7,5 @@ UPSERT INTO `OperationTypes` (code, description, is_cancellable) VALUES UPSERT INTO `Backups` (id, container_id, database, status) VALUES (Uuid('12345678-1234-5678-1234-567812345678'), '', '', 'PENDING'); -UPSERT INTO `Operations` (id, container_id, database, type, status, operation_id, backup_id) VALUES - (Uuid('11111111-1111-1111-1111-111111111111'), '', '', 'TB', 'DONE', '', Uuid('11111111-1111-1111-1111-111111111112')); +UPSERT INTO `Operations` (id, container_id, database, type, status, operation_id, backup_id, created_at) VALUES + (Uuid('11111111-1111-1111-1111-111111111111'), '', '', 'TB', 'DONE', '', Uuid('11111111-1111-1111-1111-111111111112'), CurrentUtcTimestamp()); diff --git a/internal/handlers/restore_backup.go b/internal/handlers/restore_backup.go index b19ef796..465f1190 100644 --- a/internal/handlers/restore_backup.go +++ b/internal/handlers/restore_backup.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "go.uber.org/zap" - "time" "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -13,20 +12,14 @@ import ( "ydbcp/internal/util/xlog" ) -func MakeRBOperationHandler(db db.DBConnector, client client.ClientConnector, config config.Config) types.OperationHandler { +func NewRBOperationHandler( + db db.DBConnector, client client.ClientConnector, config config.Config, +) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { return RBOperationHandler(ctx, op, db, client, config) } } -func valid(status Ydb.StatusIds_StatusCode) bool { - return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED -} - -func retriable(status Ydb.StatusIds_StatusCode) bool { - return status == Ydb.StatusIds_OVERLOADED || status == Ydb.StatusIds_UNAVAILABLE -} - func RBOperationHandler( ctx context.Context, operation types.Operation, @@ -34,7 +27,8 @@ func RBOperationHandler( client client.ClientConnector, config config.Config, ) error { - xlog.Info(ctx, "received operation", + xlog.Info( + ctx, "received operation", zap.String("id", operation.GetId().String()), zap.String("type", string(operation.GetType())), zap.String("state", string(operation.GetState())), @@ -42,7 +36,8 @@ func RBOperationHandler( ) if operation.GetType() != types.OperationTypeRB { - return fmt.Errorf("wrong type %s != %s for operation %s", + return fmt.Errorf( + "wrong type %s != %s for operation %s", operation.GetType(), types.OperationTypeRB, types.OperationToString(operation), ) } @@ -54,104 +49,56 @@ func RBOperationHandler( conn, err := client.Open(ctx, types.MakeYdbConnectionString(mr.YdbConnectionParams)) if err != nil { - return fmt.Errorf("error initializing client connector for operation #%s: %w", + return fmt.Errorf( + "error initializing client connector for operation #%s: %w", mr.GetId().String(), err, ) } defer func() { _ = client.Close(ctx, conn) }() - xlog.Info(ctx, "getting operation status", - zap.String("id", mr.Id.String()), - zap.String("type", string(operation.GetType())), - zap.String("ydb_operation_id", mr.YdbOperationId), + ydbOpResponse, err := lookupYdbOperationStatus( + ctx, client, conn, operation, mr.YdbOperationId, mr.CreatedAt, config, ) - - opResponse, err := client.GetOperationStatus(ctx, conn, mr.YdbOperationId) if err != nil { - if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { - operation.SetState(types.OperationStateError) - operation.SetMessage("Operation deadline exceeded") - return db.UpdateOperation(ctx, operation) - } - - return fmt.Errorf( - "failed to get operation status for operation #%s, import operation id %s: %w", - mr.GetId().String(), - mr.YdbOperationId, - err, - ) + return err } - - if retriable(opResponse.GetOperation().GetStatus()) { - xlog.Info(ctx, "received retriable error", - zap.String("id", mr.Id.String()), - zap.String("type", string(operation.GetType())), - zap.String("ydb_operation_id", mr.YdbOperationId), - ) - - return nil + if ydbOpResponse.shouldAbortHandler { + operation.SetState(ydbOpResponse.opState) + operation.SetMessage(ydbOpResponse.opMessage) + return db.UpdateOperation(ctx, operation) } - if !valid(opResponse.GetOperation().GetStatus()) { - operation.SetState(types.OperationStateError) - operation.SetMessage(fmt.Sprintf("Error status: %s, issues: %s", - opResponse.GetOperation().GetStatus(), - types.IssuesToString(opResponse.GetOperation().Issues)), - ) - return db.UpdateOperation(ctx, operation) + if ydbOpResponse.opResponse == nil { + return nil } + opResponse := ydbOpResponse.opResponse switch mr.State { case types.OperationStatePending: { if !opResponse.GetOperation().Ready { - if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { - xlog.Info(ctx, "cancelling operation due to ttl", - zap.String("id", mr.Id.String()), - zap.String("type", string(operation.GetType())), - zap.String("ydb_operation_id", mr.YdbOperationId), - ) - - response, err := client.CancelOperation(ctx, conn, mr.YdbOperationId) + if deadlineExceeded(mr.CreatedAt, config) { + err = CancelYdbOperation(ctx, client, conn, operation, mr.YdbOperationId, "TTL") if err != nil { - return fmt.Errorf( - "error cancelling operation #%s, import operation id %s: %w", - mr.GetId().String(), - mr.YdbOperationId, - err, - ) + return err } - - if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS { - return fmt.Errorf( - "error cancelling operation id %s, import operation id %s, issues: %s", - mr.GetId().String(), - mr.YdbOperationId, - types.IssuesToString(response.GetIssues()), - ) - } - - operation.SetState(types.OperationStateCancelling) - operation.SetMessage("Operation deadline exceeded") return db.UpdateOperation(ctx, operation) } - return nil } - if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { operation.SetState(types.OperationStateDone) operation.SetMessage("Success") } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { - operation.SetState(types.OperationStateCancelled) - operation.SetMessage("Pending operation wac cancelled") + operation.SetState(types.OperationStateError) + operation.SetMessage("Pending operation was cancelled") } } case types.OperationStateCancelling: { if !opResponse.GetOperation().Ready { - if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { + if deadlineExceeded(mr.CreatedAt, config) { operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") return db.UpdateOperation(ctx, operation) @@ -159,7 +106,6 @@ func RBOperationHandler( return nil } - if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { operation.SetState(types.OperationStateDone) operation.SetMessage("Operation was completed despite cancellation") @@ -170,7 +116,8 @@ func RBOperationHandler( } } - xlog.Info(ctx, "forgetting operation", + xlog.Info( + ctx, "forgetting operation", zap.String("id", mr.Id.String()), zap.String("type", string(operation.GetType())), zap.String("ydb_operation_id", mr.YdbOperationId), diff --git a/internal/handlers/restore_backup_test.go b/internal/handlers/restore_backup_test.go index b502aad7..15e0d496 100644 --- a/internal/handlers/restore_backup_test.go +++ b/internal/handlers/restore_backup_test.go @@ -35,7 +35,7 @@ func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) { ) // try to handle rb operation with non-existing ydb operation id - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -76,7 +76,7 @@ func TestRBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) { dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) // try to handle pending rb operation with zero ttl - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -123,7 +123,7 @@ func TestRBOperationHandlerPendingOperationInProgress(t *testing.T) { dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) // try to handle pending rb operation with ttl - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -169,7 +169,7 @@ func TestRBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -214,15 +214,16 @@ func TestRBOperationHandlerPendingOperationCancelled(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) err := handler(ctx, &rbOp) assert.Empty(t, err) - // check operation status (should be cancelled) + // check operation status (should be error) op, err := dbConnector.GetOperation(ctx, rbOp.Id) assert.Empty(t, err) assert.NotEmpty(t, op) - assert.Equal(t, types.OperationStateCancelled, op.GetState()) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Pending operation was cancelled", op.GetMessage()) // check ydb operation status (should be forgotten) ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) @@ -260,7 +261,7 @@ func TestRBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) // try to handle cancelling rb operation with zero ttl - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -307,7 +308,7 @@ func TestRBOperationHandlerCancellingOperationInProgress(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -353,7 +354,7 @@ func TestRBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -399,7 +400,7 @@ func TestRBOperationHandlerCancellingOperationCancelled(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) err := handler(ctx, &rbOp) assert.Empty(t, err) @@ -445,7 +446,7 @@ func TestRBOperationHandlerRetriableErrorForPendingOperation(t *testing.T) { clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) - handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) err := handler(ctx, &rbOp) assert.Empty(t, err) diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index c74e4274..00a2c695 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -3,16 +3,20 @@ package handlers import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" - - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" ) -func MakeTBOperationHandler(db db.DBConnector, client client.ClientConnector) types.OperationHandler { +func NewTBOperationHandler( + db db.DBConnector, client client.ClientConnector, config config.Config, + getQueryBuilder func() queries.WriteTableQuery, +) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBOperationHandler(ctx, op, db, client) + return TBOperationHandler(ctx, op, db, client, config, getQueryBuilder) } } @@ -21,6 +25,8 @@ func TBOperationHandler( operation types.Operation, db db.DBConnector, client client.ClientConnector, + config config.Config, + getQueryBuilder func() queries.WriteTableQuery, ) error { if operation.GetType() != types.OperationTypeTB { return fmt.Errorf("wrong operation type %s != %s", operation.GetType(), types.OperationTypeTB) @@ -37,90 +43,87 @@ func TBOperationHandler( defer func() { _ = client.Close(ctx, conn) }() - //lookup YdbServerOperationStatus - opInfo, err := client.GetOperationStatus(ctx, conn, tb.YdbOperationId) + ydbOpResponse, err := lookupYdbOperationStatus( + ctx, client, conn, operation, tb.YdbOperationId, tb.CreatedAt, config, + ) if err != nil { - //skip, write log - //upsert message into operation? - return fmt.Errorf( - "failed to lookup operation status for operation id %s, export operation id %s: %w", - tb.GetId().String(), - tb.YdbOperationId, - err, + return err + } + + backupToWrite := types.Backup{ + ID: tb.BackupId, + Status: types.BackupStateUnknown, + } + + if ydbOpResponse.shouldAbortHandler { + operation.SetState(ydbOpResponse.opState) + operation.SetMessage(ydbOpResponse.opMessage) + backupToWrite.Status = types.BackupStateError + return db.ExecuteUpsert( + ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } + if ydbOpResponse.opResponse == nil { + return nil + } + opResponse := ydbOpResponse.opResponse + switch tb.State { case types.OperationStatePending: { - if !opInfo.GetOperation().Ready { - //if pending: return op, nil - //if backup deadline failed: cancel operation. (skip for now) - return nil - } - if opInfo.GetOperation().Status == Ydb.StatusIds_SUCCESS { - //upsert into operations (id, status) values (id, done)? - //db.StartUpdate() - //.WithUpdateBackup() - //.WithYUpdateOperation() - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateAvailable) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, + if !opResponse.GetOperation().Ready { + if deadlineExceeded(tb.CreatedAt, config) { + err = CancelYdbOperation(ctx, client, conn, operation, tb.YdbOperationId, "TTL") + if err != nil { + return err + } + backupToWrite.Status = types.BackupStateError + return db.ExecuteUpsert( + ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) + } else { + return nil } + } else if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { + backupToWrite.Status = types.BackupStateAvailable operation.SetState(types.OperationStateDone) operation.SetMessage("Success") + } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { + backupToWrite.Status = types.BackupStateError + operation.SetState(types.OperationStateError) + operation.SetMessage("got CANCELLED status for PENDING operation") } else { - //op.State = Error - //upsert into operations (id, status, message) values (id, error, message)? - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, - ) - } - if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED { - operation.SetMessage("got CANCELLED status for PENDING operation") - } else { - operation.SetMessage(types.IssuesToString(opInfo.GetOperation().Issues)) - } + backupToWrite.Status = types.BackupStateError operation.SetState(types.OperationStateError) + operation.SetMessage(ydbOpResponse.IssueString()) } } case types.OperationStateCancelling: { - if !opInfo.GetOperation().Ready { - //can this hang in cancelling state? - return nil - } - if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED { - //upsert into operations (id, status, message) values (id, cancelled)? - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateCancelled) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, + if !opResponse.GetOperation().Ready { + if deadlineExceeded(tb.CreatedAt, config) { + backupToWrite.Status = types.BackupStateError + operation.SetState(types.OperationStateError) + operation.SetMessage("Operation deadline exceeded") + return db.ExecuteUpsert( + ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) + } else { + return nil } + } + if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { + backupToWrite.Status = types.BackupStateAvailable + operation.SetState(types.OperationStateDone) + operation.SetMessage("Operation was completed despite cancellation") + } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { + backupToWrite.Status = types.BackupStateCancelled operation.SetState(types.OperationStateCancelled) operation.SetMessage("Success") } else { - //upsert into operations (id, status, message) values (id, error, error.message)? - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, - ) - } + backupToWrite.Status = types.BackupStateError operation.SetState(types.OperationStateError) - operation.SetMessage(types.IssuesToString(opInfo.GetOperation().Issues)) + operation.SetMessage(ydbOpResponse.IssueString()) } } } @@ -142,5 +145,7 @@ func TBOperationHandler( types.IssuesToString(response.GetIssues()), ) } - return db.UpdateOperation(ctx, operation) + return db.ExecuteUpsert( + ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ) } diff --git a/internal/handlers/take_backup_test.go b/internal/handlers/take_backup_test.go index 41871ce1..0f037cf4 100644 --- a/internal/handlers/take_backup_test.go +++ b/internal/handlers/take_backup_test.go @@ -2,8 +2,10 @@ package handlers import ( "context" - "sync" "testing" + "time" + "ydbcp/internal/config" + "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -14,11 +16,46 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" ) -func TestTBOperationHandler(t *testing.T) { - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestTBOperationHandlerInvalidOperationResponse(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + backupMap[backupID] = backup + opMap[opId] = &tbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector() + // try to handle tb operation with non-existing ydb operation id + handler := NewTBOperationHandler(dbConnector, clientConnector, config.Config{}, queries.NewWriteTableQueryMock) + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Error status: NOT_FOUND, issues: message:\"operation not found\"", op.GetMessage()) +} + +func TestTBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) { + ctx := context.Background() opId := types.GenerateObjectID() backupID := types.GenerateObjectID() tbOp := types.TakeBackupOperation{ @@ -34,6 +71,408 @@ func TestTBOperationHandler(t *testing.T) { Status: types.BackupStatePending, } + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 0, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be cancelled because of deadline exceeded) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelling, op.GetState()) + assert.Equal(t, "Operation deadline exceeded", op.GetMessage()) + + // check backup status (should be error) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateError, b.Status) + + // check ydb operation status (should be cancelled) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_CANCELLED, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerPendingOperationInProgress(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be pending) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStatePending, op.GetState()) + assert.Equal(t, "", op.GetMessage()) + + // check backup status (should be in pending) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStatePending, b.Status) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestTBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + + // check backup status (should be done) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateAvailable, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerPendingOperationCancelled(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_CANCELLED, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be error) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + + // check backup status (should be error) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateError, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 0, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be failed because of deadline exceeded) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Operation deadline exceeded", op.GetMessage()) + + // check backup status (should be error) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateError, b.Status) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestTBOperationHandlerCancellingOperationInProgress(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be the same as before) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelling, op.GetState()) + + // check backup status (should be pending) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStatePending, b.Status) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + ydbOp := &Ydb_Operations.Operation{ Id: "1", Ready: true, @@ -55,23 +494,162 @@ func TestTBOperationHandler(t *testing.T) { client.WithOperations(ydbOpMap), ) - handler := MakeTBOperationHandler(dbConnector, clientConnector) + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + assert.Equal(t, "Operation was completed despite cancellation", op.GetMessage()) + + // check backup status (should be available) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateAvailable, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_CANCELLED, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.NewWriteTableQueryMock, + ) err := handler(ctx, &tbOp) assert.Empty(t, err) - result, err := dbConnector.GetOperation(ctx, opId) + // check operation status (should be cancelled) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelled, op.GetState()) + + // check backup status (should be cancelled) + b, err := dbConnector.GetBackup(ctx, backupID) assert.Empty(t, err) - assert.Equal( - t, result.GetState(), types.OperationStateDone, - "operation state should be Done", + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateCancelled, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) + +} + +func TestTBOperationHandlerRetriableErrorForPendingOperation(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_UNAVAILABLE, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := NewTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be the same as before) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStatePending, op.GetState()) - backups, err2 := dbConnector.SelectBackupsByStatus(ctx, types.BackupStateAvailable) - assert.Empty(t, err2) - assert.Equal(t, 1, len(backups)) - assert.Equal(t, types.BackupStateAvailable, backups[0].Status) + // check backup status (should be pending) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStatePending, b.Status) - cancel() - wg.Wait() + // check ydb operation status + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_UNAVAILABLE, ydbOpStatus.GetOperation().GetStatus()) } diff --git a/internal/handlers/utils.go b/internal/handlers/utils.go new file mode 100644 index 00000000..299240c2 --- /dev/null +++ b/internal/handlers/utils.go @@ -0,0 +1,127 @@ +package handlers + +import ( + "context" + "fmt" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" + "github.com/ydb-platform/ydb-go-sdk/v3" + "go.uber.org/zap" + "time" + "ydbcp/internal/config" + "ydbcp/internal/connectors/client" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" +) + +func deadlineExceeded(createdAt time.Time, config config.Config) bool { + return time.Since(createdAt) > time.Duration(config.OperationTtlSeconds)*time.Second +} +func isValidStatus(status Ydb.StatusIds_StatusCode) bool { + return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED +} + +func isRetriableStatus(status Ydb.StatusIds_StatusCode) bool { + return status == Ydb.StatusIds_OVERLOADED || status == Ydb.StatusIds_UNAVAILABLE +} + +type LookupYdbOperationResponse struct { + opResponse *Ydb_Operations.GetOperationResponse + shouldAbortHandler bool + + opState types.OperationState + opMessage string +} + +func (r *LookupYdbOperationResponse) IssueString() string { + return types.IssuesToString(r.opResponse.GetOperation().Issues) +} + +func lookupYdbOperationStatus( + ctx context.Context, client client.ClientConnector, conn *ydb.Driver, operation types.Operation, + ydbOperationId string, + createdAt time.Time, config config.Config, +) (*LookupYdbOperationResponse, error) { + xlog.Info( + ctx, "getting operation status", + zap.String("id", operation.GetId().String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", ydbOperationId), + ) + opResponse, err := client.GetOperationStatus(ctx, conn, ydbOperationId) + if err != nil { + if deadlineExceeded(createdAt, config) { + return &LookupYdbOperationResponse{ + shouldAbortHandler: true, + opState: types.OperationStateError, + opMessage: "Operation deadline exceeded", + }, nil + } + + return nil, fmt.Errorf( + "failed to get operation status for operation #%s, import operation id %s: %w", + operation.GetId().String(), ydbOperationId, err, + ) + } + + if isRetriableStatus(opResponse.GetOperation().GetStatus()) { + xlog.Info( + ctx, "received retriable error", + zap.String("id", operation.GetId().String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", ydbOperationId), + ) + return &LookupYdbOperationResponse{}, nil + } + + if !isValidStatus(opResponse.GetOperation().GetStatus()) { + return &LookupYdbOperationResponse{ + shouldAbortHandler: true, + opState: types.OperationStateError, + opMessage: fmt.Sprintf( + "Error status: %s, issues: %s", + opResponse.GetOperation().GetStatus(), + types.IssuesToString(opResponse.GetOperation().Issues), + ), + }, nil + } + + return &LookupYdbOperationResponse{ + opResponse: opResponse, + }, nil +} + +func CancelYdbOperation( + ctx context.Context, client client.ClientConnector, conn *ydb.Driver, + operation types.Operation, ydbOperationId string, reason string, +) error { + xlog.Info( + ctx, "cancelling operation", zap.String("reason", reason), + zap.String("id", operation.GetId().String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", ydbOperationId), + ) + + response, err := client.CancelOperation(ctx, conn, ydbOperationId) + if err != nil { + return fmt.Errorf( + "error cancelling operation #%s, import operation id %s: %w", + operation.GetId().String(), + ydbOperationId, + err, + ) + } + + if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS { + return fmt.Errorf( + "error cancelling operation id %s, import operation id %s, issues: %s", + operation.GetId().String(), + ydbOperationId, + types.IssuesToString(response.GetIssues()), + ) + } + + operation.SetState(types.OperationStateCancelling) + operation.SetMessage("Operation deadline exceeded") + return nil +} diff --git a/internal/types/backup.go b/internal/types/backup.go index 16ed7fb8..13a8d822 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -110,6 +110,7 @@ type TakeBackupOperation struct { YdbOperationId string SourcePaths []string SourcePathToExclude []string + CreatedAt time.Time } func (o *TakeBackupOperation) GetId() ObjectID {