From e7e4dc9fa92a31694de1b37af243c65572bc8480 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Tue, 6 Aug 2024 14:36:17 +0300 Subject: [PATCH] small fixes for UI --- cmd/ydbcp/main.go | 92 +++++++++++-------- internal/connectors/db/yql/queries/read.go | 19 +--- .../connectors/db/yql/queries/read_test.go | 6 +- internal/connectors/db/yql/queries/write.go | 22 +---- .../connectors/db/yql/queries/write_test.go | 46 +--------- .../connectors/db/yql/schema/fill_tables.yql | 8 +- start-local-ydb.sh | 1 + test.sh | 10 +- 8 files changed, 76 insertions(+), 128 deletions(-) create mode 100644 start-local-ydb.sh diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 7683fba6..06e9732d 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -9,7 +9,6 @@ import ( "os" "os/signal" "path" - "strconv" "strings" "sync" "syscall" @@ -128,7 +127,10 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb xlog.Error(ctx, "can't start export operation", zap.Error(err), zap.String("dns", dsn)) return nil, fmt.Errorf("can't start export operation, dsn %s: %w", dsn, err) } - xlog.Debug(ctx, "export operation started", zap.String("clientOperationID", clientOperationID), zap.String("dsn", dsn)) + xlog.Debug( + ctx, "export operation started", zap.String("clientOperationID", clientOperationID), zap.String("dsn", dsn), + ) + //TODO: forbid empty container id backup := types.Backup{ ContainerID: req.GetContainerId(), @@ -175,25 +177,34 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) { xlog.Debug(ctx, "ListBackups", zap.String("request", request.String())) + queryFilters := make([]queries.QueryFilter, 0) + //TODO: forbid empty containerId + if request.GetContainerId() != "" { + queryFilters = append( + queryFilters, queries.QueryFilter{ + Field: "container_id", + Values: []table_types.Value{ + table_types.StringValueFromString(request.ContainerId), + }, + }, + ) + } + if request.GetDatabaseNameMask() != "" { + queryFilters = append( + queryFilters, queries.QueryFilter{ + Field: "database", + Values: []table_types.Value{ + table_types.StringValueFromString(request.DatabaseNameMask), + }, + IsLike: true, + }, + ) + } backups, err := s.driver.SelectBackups( ctx, queries.NewReadTableQuery( queries.WithTableName("Backups"), queries.WithSelectFields(queries.AllBackupFields...), - queries.WithQueryFilters( - queries.QueryFilter{ - Field: "container_id", - Values: []table_types.Value{ - table_types.StringValueFromString(request.ContainerId), - }, - }, - queries.QueryFilter{ - Field: "database", - Values: []table_types.Value{ - table_types.StringValueFromString(request.DatabaseNameMask), - }, - IsLike: true, - }, - ), + queries.WithQueryFilters(queryFilters...), ), ) if err != nil { @@ -204,8 +215,7 @@ func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest pbBackups = append(pbBackups, backup.Proto()) } return &pb.ListBackupsResponse{ - Backups: pbBackups, - NextPageToken: strconv.Itoa(len(backups)), + Backups: pbBackups, }, nil } @@ -213,37 +223,45 @@ func (s *server) ListOperations(ctx context.Context, request *pb.ListOperationsR *pb.ListOperationsResponse, error, ) { xlog.Debug(ctx, "ListOperations", zap.String("request", request.String())) + queryFilters := make([]queries.QueryFilter, 0) + //TODO: forbid empty containerId + if request.GetContainerId() != "" { + queryFilters = append( + queryFilters, queries.QueryFilter{ + Field: "container_id", + Values: []table_types.Value{ + table_types.StringValueFromString(request.ContainerId), + }, + }, + ) + } + if request.GetDatabaseNameMask() != "" { + queryFilters = append( + queryFilters, queries.QueryFilter{ + Field: "database", + Values: []table_types.Value{ + table_types.StringValueFromString(request.DatabaseNameMask), + }, + IsLike: true, + }, + ) + } operations, err := s.driver.SelectOperations( ctx, queries.NewReadTableQuery( queries.WithTableName("Operations"), queries.WithSelectFields(queries.AllOperationFields...), - queries.WithQueryFilters( - queries.QueryFilter{ - Field: "container_id", - Values: []table_types.Value{ - table_types.StringValueFromString(request.ContainerId), - }, - }, - queries.QueryFilter{ - Field: "database", - Values: []table_types.Value{ - table_types.StringValueFromString(request.DatabaseNameMask), - }, - IsLike: true, - }, - ), + queries.WithQueryFilters(queryFilters...), ), ) if err != nil { - return nil, fmt.Errorf("error getting backups: %w", err) + return nil, fmt.Errorf("error getting operations: %w", err) } pbOperations := make([]*pb.Operation, 0, len(operations)) for _, operation := range operations { pbOperations = append(pbOperations, operation.Proto()) } return &pb.ListOperationsResponse{ - Operations: pbOperations, - NextPageToken: strconv.Itoa(len(operations)), + Operations: pbOperations, }, nil } diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index f5da0b66..2576847e 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -100,15 +100,10 @@ func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) st return paramName } -func (d *ReadTableQueryImpl) DeclareParameters() string { - declares := make([]string, len(d.tableQueryParams)) - for i, param := range d.tableQueryParams { - declares[i] = fmt.Sprintf("DECLARE %s AS %s", param.Name(), param.Value().Type().String()) - } - return strings.Join(declares, ";\n") -} - func (d *ReadTableQueryImpl) MakeFilterString() string { + if len(d.filters) == 0 { + return "" + } filterStrings := make([]string, 0, len(d.filters)) for i := 0; i < len(d.filterFields); i++ { fieldFilterStrings := make([]string, 0, len(d.filters[i])) @@ -122,7 +117,7 @@ func (d *ReadTableQueryImpl) MakeFilterString() string { } filterStrings = append(filterStrings, fmt.Sprintf("(%s)", strings.Join(fieldFilterStrings, " OR "))) } - return strings.Join(filterStrings, " AND ") + return fmt.Sprintf(" WHERE %s", strings.Join(filterStrings, " AND ")) } func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) { @@ -132,13 +127,9 @@ func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResul if len(d.tableName) == 0 { return nil, errors.New("No table") } - if len(d.filters) == 0 { - return nil, errors.New("No filters") - } filter := d.MakeFilterString() res := fmt.Sprintf( - "%s;\nSELECT %s FROM %s WHERE %s", - d.DeclareParameters(), + "SELECT %s FROM %s%s", strings.Join(d.selectFields, ", "), d.tableName, filter, diff --git a/internal/connectors/db/yql/queries/read_test.go b/internal/connectors/db/yql/queries/read_test.go index 60333a2a..dc4b57b9 100644 --- a/internal/connectors/db/yql/queries/read_test.go +++ b/internal/connectors/db/yql/queries/read_test.go @@ -11,11 +11,7 @@ import ( func TestQueryBuilder_Read(t *testing.T) { const ( - queryString = `DECLARE $param0 AS String; -DECLARE $param1 AS String; -DECLARE $param2 AS String; -DECLARE $param3 AS String; -SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)` + queryString = `SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)` ) var ( queryParams = table.NewQueryParameters( diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 957e52f1..9de9468f 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -174,22 +174,6 @@ func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) Wri return d } -func (d *WriteSingleTableQueryImpl) DeclareParameters() string { - declares := make([]string, 0) - if d.updateParam != nil { - declares = append( - declares, - fmt.Sprintf("DECLARE %s AS %s", (*d.updateParam).Name(), (*d.updateParam).Value().Type().String()), - ) - } - for _, param := range d.tableQueryParams { - declares = append( - declares, fmt.Sprintf("DECLARE %s AS %s", param.Name(), param.Value().Type().String()), - ) - } - return strings.Join(declares, ";\n") -} - func ProcessUpsertQuery( queryStrings *[]string, allParams *[]table.ParameterOption, t *WriteSingleTableQueryImpl, ) error { @@ -199,10 +183,9 @@ func ProcessUpsertQuery( if len(t.tableName) == 0 { return errors.New("No table") } - declares := t.DeclareParameters() *queryStrings = append( *queryStrings, fmt.Sprintf( - "%s;\nUPSERT INTO %s (%s) VALUES (%s)", declares, t.tableName, strings.Join(t.upsertFields, ", "), + "UPSERT INTO %s (%s) VALUES (%s)", t.tableName, strings.Join(t.upsertFields, ", "), strings.Join(t.GetParamNames(), ", "), ), ) @@ -221,7 +204,6 @@ func ProcessUpdateQuery( if len(t.tableName) == 0 { return errors.New("No table") } - declares := t.DeclareParameters() paramNames := t.GetParamNames() keyParam := fmt.Sprintf("id = %s", (*t.updateParam).Name()) updates := make([]string, 0) @@ -230,7 +212,7 @@ func ProcessUpdateQuery( } *queryStrings = append( *queryStrings, fmt.Sprintf( - "%s;\nUPDATE %s SET %s WHERE %s", declares, t.tableName, strings.Join(updates, ", "), keyParam, + "UPDATE %s SET %s WHERE %s", t.tableName, strings.Join(updates, ", "), keyParam, ), ) *allParams = append(*allParams, *t.updateParam) diff --git a/internal/connectors/db/yql/queries/write_test.go b/internal/connectors/db/yql/queries/write_test.go index 24fa3fdc..43b41b4f 100644 --- a/internal/connectors/db/yql/queries/write_test.go +++ b/internal/connectors/db/yql/queries/write_test.go @@ -13,12 +13,7 @@ import ( func TestQueryBuilder_UpdateUpdate(t *testing.T) { const ( - queryString = `DECLARE $id_0 AS Uuid; -DECLARE $status_0 AS String; -UPDATE Backups SET status = $status_0 WHERE id = $id_0; -DECLARE $id_1 AS Uuid; -DECLARE $status_1 AS String; -DECLARE $message_1 AS String; + queryString = `UPDATE Backups SET status = $status_0 WHERE id = $id_0; UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1` ) opId := types.GenerateObjectID() @@ -55,29 +50,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1` func TestQueryBuilder_CreateCreate(t *testing.T) { const ( - queryString = `DECLARE $id_0 AS Uuid; -DECLARE $container_id_0 AS String; -DECLARE $database_0 AS String; -DECLARE $endpoint_0 AS String; -DECLARE $initiated_0 AS String; -DECLARE $s3_endpoint_0 AS String; -DECLARE $s3_region_0 AS String; -DECLARE $s3_bucket_0 AS String; -DECLARE $s3_path_prefix_0 AS String; -DECLARE $status_0 AS String; -DECLARE $message_0 AS String; -UPSERT INTO Backups (id, container_id, database, endpoint, initiated, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $initiated_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0); -DECLARE $id_1 AS Uuid; -DECLARE $type_1 AS String; -DECLARE $status_1 AS String; -DECLARE $container_id_1 AS String; -DECLARE $database_1 AS String; -DECLARE $endpoint_1 AS String; -DECLARE $backup_id_1 AS Uuid; -DECLARE $initiated_1 AS String; -DECLARE $created_at_1 AS Timestamp; -DECLARE $operation_id_1 AS String; -DECLARE $message_1 AS String; + queryString = `UPSERT INTO Backups (id, container_id, database, endpoint, initiated, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $initiated_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0); UPSERT INTO Operations (id, type, status, container_id, database, endpoint, backup_id, initiated, created_at, operation_id, message) VALUES ($id_1, $type_1, $status_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $initiated_1, $created_at_1, $operation_id_1, $message_1)` ) opId := types.GenerateObjectID() @@ -171,20 +144,7 @@ UPSERT INTO Operations (id, type, status, container_id, database, endpoint, back func TestQueryBuilder_UpdateCreate(t *testing.T) { const ( - queryString = `DECLARE $id_0 AS Uuid; -DECLARE $status_0 AS String; -UPDATE Backups SET status = $status_0 WHERE id = $id_0; -DECLARE $id_1 AS Uuid; -DECLARE $type_1 AS String; -DECLARE $status_1 AS String; -DECLARE $container_id_1 AS String; -DECLARE $database_1 AS String; -DECLARE $endpoint_1 AS String; -DECLARE $backup_id_1 AS Uuid; -DECLARE $initiated_1 AS String; -DECLARE $created_at_1 AS Timestamp; -DECLARE $operation_id_1 AS String; -DECLARE $message_1 AS String; + queryString = `UPDATE Backups SET status = $status_0 WHERE id = $id_0; UPSERT INTO Operations (id, type, status, container_id, database, endpoint, backup_id, initiated, created_at, operation_id, message) VALUES ($id_1, $type_1, $status_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $initiated_1, $created_at_1, $operation_id_1, $message_1)` ) opId := types.GenerateObjectID() diff --git a/internal/connectors/db/yql/schema/fill_tables.yql b/internal/connectors/db/yql/schema/fill_tables.yql index 85493a5f..b1fb6348 100644 --- a/internal/connectors/db/yql/schema/fill_tables.yql +++ b/internal/connectors/db/yql/schema/fill_tables.yql @@ -4,8 +4,8 @@ UPSERT INTO `OperationTypes` (code, description, is_cancellable) VALUES ('RM', 'Remove backup', False); --for testing purposes -UPSERT INTO `Backups` (id, container_id, database, status) VALUES - (Uuid('12345678-1234-5678-1234-567812345678'), '', '', 'PENDING'); +UPSERT INTO `Backups` (id, container_id, database, status, endpoint) VALUES + (Uuid('12345678-1234-5678-1234-567812345678'), '', '', 'PENDING', ''); -UPSERT INTO `Operations` (id, container_id, database, type, status, operation_id, backup_id, created_at) VALUES - (Uuid('11111111-1111-1111-1111-111111111111'), '', '', 'TB', 'DONE', '', Uuid('11111111-1111-1111-1111-111111111112'), CurrentUtcTimestamp()); +UPSERT INTO `Operations` (id, container_id, database, type, status, operation_id, backup_id, created_at, endpoint) VALUES + (Uuid('11111111-1111-1111-1111-111111111111'), '', '', 'TB', 'DONE', '', Uuid('11111111-1111-1111-1111-111111111112'), CurrentUtcTimestamp(), ''); diff --git a/start-local-ydb.sh b/start-local-ydb.sh new file mode 100644 index 00000000..e80c557c --- /dev/null +++ b/start-local-ydb.sh @@ -0,0 +1 @@ +docker run -d --rm --name ydb-local -h localhost --platform linux/amd64 -p 2135:2135 -p 2136:2136 -p 8765:8765 -v $(pwd)/ydb_certs:/ydb_certs -v $(pwd)/ydb_data:/ydb_data -e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 -e YDB_USE_IN_MEMORY_PDISKS=true -e YDB_FEATURE_FLAGS=enable_uuid_as_primary_key,enable_implicit_query_parameter_types cr.ai.nebius.cloud/crnca8q7ti1i7vpqs28l/ydb-local:24.1.16-stream-nb-1.1-1 diff --git a/test.sh b/test.sh index 1f630903..225f0358 100755 --- a/test.sh +++ b/test.sh @@ -3,19 +3,19 @@ if [[ -z "$1" ]]; then fi doneflag=0 if [[ "GetBackup" == "$1" ]]; then - grpcurl -plaintext -d '{"id": "12345678-1234-5678-1234-567812345678"}' localhost:50051 ydbcp.BackupService.GetBackup + grpcurl -plaintext -d '{"id": "12345678-1234-5678-1234-567812345678"}' localhost:50051 ydbcp.v1alpha1.BackupService.GetBackup doneflag=1 fi if [[ "ListBackups" == "$1" ]]; then - grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.BackupService.ListBackups + grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.v1alpha1.BackupService.ListBackups doneflag=1 fi if [[ "ListOperations" == "$1" ]]; then - grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.OperationService.ListOperations + grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.v1alpha1.OperationService.ListOperations doneflag=1 fi -if [[ "TakeBackup" == "$1" ]]; then - grpcurl -plaintext -d '{"database_name": "/testing-global/ydbc", "database_endpoint": "grpcs://localhost:2135", "source_paths": ["/testing-global/ydbc/orders"]}' localhost:50051 ydbcp.BackupService.MakeBackup +if [[ "MakeBackup" == "$1" ]]; then + grpcurl -plaintext -d '{"database_name": "/testing-global/ydbc", "database_endpoint": "grpcs://localhost:2135", "source_paths": ["/testing-global/ydbc/orders"]}' localhost:50051 ydbcp.v1alpha1.BackupService.MakeBackup doneflag=1 fi if [[ 0 == $doneflag ]]; then