Skip to content

Commit

Permalink
small fixes for UI
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Aug 9, 2024
1 parent 15bdc6d commit e7e4dc9
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 128 deletions.
92 changes: 55 additions & 37 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"os/signal"
"path"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -128,7 +127,10 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
xlog.Error(ctx, "can't start export operation", zap.Error(err), zap.String("dns", dsn))
return nil, fmt.Errorf("can't start export operation, dsn %s: %w", dsn, err)
}
xlog.Debug(ctx, "export operation started", zap.String("clientOperationID", clientOperationID), zap.String("dsn", dsn))
xlog.Debug(
ctx, "export operation started", zap.String("clientOperationID", clientOperationID), zap.String("dsn", dsn),
)
//TODO: forbid empty container id

backup := types.Backup{
ContainerID: req.GetContainerId(),
Expand Down Expand Up @@ -175,25 +177,34 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb

func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) {
xlog.Debug(ctx, "ListBackups", zap.String("request", request.String()))
queryFilters := make([]queries.QueryFilter, 0)
//TODO: forbid empty containerId
if request.GetContainerId() != "" {
queryFilters = append(
queryFilters, queries.QueryFilter{
Field: "container_id",
Values: []table_types.Value{
table_types.StringValueFromString(request.ContainerId),
},
},
)
}
if request.GetDatabaseNameMask() != "" {
queryFilters = append(
queryFilters, queries.QueryFilter{
Field: "database",
Values: []table_types.Value{
table_types.StringValueFromString(request.DatabaseNameMask),
},
IsLike: true,
},
)
}
backups, err := s.driver.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "container_id",
Values: []table_types.Value{
table_types.StringValueFromString(request.ContainerId),
},
},
queries.QueryFilter{
Field: "database",
Values: []table_types.Value{
table_types.StringValueFromString(request.DatabaseNameMask),
},
IsLike: true,
},
),
queries.WithQueryFilters(queryFilters...),
),
)
if err != nil {
Expand All @@ -204,46 +215,53 @@ func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest
pbBackups = append(pbBackups, backup.Proto())
}
return &pb.ListBackupsResponse{
Backups: pbBackups,
NextPageToken: strconv.Itoa(len(backups)),
Backups: pbBackups,
}, nil
}

func (s *server) ListOperations(ctx context.Context, request *pb.ListOperationsRequest) (
*pb.ListOperationsResponse, error,
) {
xlog.Debug(ctx, "ListOperations", zap.String("request", request.String()))
queryFilters := make([]queries.QueryFilter, 0)
//TODO: forbid empty containerId
if request.GetContainerId() != "" {
queryFilters = append(
queryFilters, queries.QueryFilter{
Field: "container_id",
Values: []table_types.Value{
table_types.StringValueFromString(request.ContainerId),
},
},
)
}
if request.GetDatabaseNameMask() != "" {
queryFilters = append(
queryFilters, queries.QueryFilter{
Field: "database",
Values: []table_types.Value{
table_types.StringValueFromString(request.DatabaseNameMask),
},
IsLike: true,
},
)
}
operations, err := s.driver.SelectOperations(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Operations"),
queries.WithSelectFields(queries.AllOperationFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "container_id",
Values: []table_types.Value{
table_types.StringValueFromString(request.ContainerId),
},
},
queries.QueryFilter{
Field: "database",
Values: []table_types.Value{
table_types.StringValueFromString(request.DatabaseNameMask),
},
IsLike: true,
},
),
queries.WithQueryFilters(queryFilters...),
),
)
if err != nil {
return nil, fmt.Errorf("error getting backups: %w", err)
return nil, fmt.Errorf("error getting operations: %w", err)
}
pbOperations := make([]*pb.Operation, 0, len(operations))
for _, operation := range operations {
pbOperations = append(pbOperations, operation.Proto())
}
return &pb.ListOperationsResponse{
Operations: pbOperations,
NextPageToken: strconv.Itoa(len(operations)),
Operations: pbOperations,
}, nil
}

Expand Down
19 changes: 5 additions & 14 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,10 @@ func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) st
return paramName
}

func (d *ReadTableQueryImpl) DeclareParameters() string {
declares := make([]string, len(d.tableQueryParams))
for i, param := range d.tableQueryParams {
declares[i] = fmt.Sprintf("DECLARE %s AS %s", param.Name(), param.Value().Type().String())
}
return strings.Join(declares, ";\n")
}

func (d *ReadTableQueryImpl) MakeFilterString() string {
if len(d.filters) == 0 {
return ""
}
filterStrings := make([]string, 0, len(d.filters))
for i := 0; i < len(d.filterFields); i++ {
fieldFilterStrings := make([]string, 0, len(d.filters[i]))
Expand All @@ -122,7 +117,7 @@ func (d *ReadTableQueryImpl) MakeFilterString() string {
}
filterStrings = append(filterStrings, fmt.Sprintf("(%s)", strings.Join(fieldFilterStrings, " OR ")))
}
return strings.Join(filterStrings, " AND ")
return fmt.Sprintf(" WHERE %s", strings.Join(filterStrings, " AND "))
}

func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
Expand All @@ -132,13 +127,9 @@ func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResul
if len(d.tableName) == 0 {
return nil, errors.New("No table")
}
if len(d.filters) == 0 {
return nil, errors.New("No filters")
}
filter := d.MakeFilterString()
res := fmt.Sprintf(
"%s;\nSELECT %s FROM %s WHERE %s",
d.DeclareParameters(),
"SELECT %s FROM %s%s",
strings.Join(d.selectFields, ", "),
d.tableName,
filter,
Expand Down
6 changes: 1 addition & 5 deletions internal/connectors/db/yql/queries/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import (

func TestQueryBuilder_Read(t *testing.T) {
const (
queryString = `DECLARE $param0 AS String;
DECLARE $param1 AS String;
DECLARE $param2 AS String;
DECLARE $param3 AS String;
SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)`
queryString = `SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)`
)
var (
queryParams = table.NewQueryParameters(
Expand Down
22 changes: 2 additions & 20 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,6 @@ func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) Wri
return d
}

func (d *WriteSingleTableQueryImpl) DeclareParameters() string {
declares := make([]string, 0)
if d.updateParam != nil {
declares = append(
declares,
fmt.Sprintf("DECLARE %s AS %s", (*d.updateParam).Name(), (*d.updateParam).Value().Type().String()),
)
}
for _, param := range d.tableQueryParams {
declares = append(
declares, fmt.Sprintf("DECLARE %s AS %s", param.Name(), param.Value().Type().String()),
)
}
return strings.Join(declares, ";\n")
}

func ProcessUpsertQuery(
queryStrings *[]string, allParams *[]table.ParameterOption, t *WriteSingleTableQueryImpl,
) error {
Expand All @@ -199,10 +183,9 @@ func ProcessUpsertQuery(
if len(t.tableName) == 0 {
return errors.New("No table")
}
declares := t.DeclareParameters()
*queryStrings = append(
*queryStrings, fmt.Sprintf(
"%s;\nUPSERT INTO %s (%s) VALUES (%s)", declares, t.tableName, strings.Join(t.upsertFields, ", "),
"UPSERT INTO %s (%s) VALUES (%s)", t.tableName, strings.Join(t.upsertFields, ", "),
strings.Join(t.GetParamNames(), ", "),
),
)
Expand All @@ -221,7 +204,6 @@ func ProcessUpdateQuery(
if len(t.tableName) == 0 {
return errors.New("No table")
}
declares := t.DeclareParameters()
paramNames := t.GetParamNames()
keyParam := fmt.Sprintf("id = %s", (*t.updateParam).Name())
updates := make([]string, 0)
Expand All @@ -230,7 +212,7 @@ func ProcessUpdateQuery(
}
*queryStrings = append(
*queryStrings, fmt.Sprintf(
"%s;\nUPDATE %s SET %s WHERE %s", declares, t.tableName, strings.Join(updates, ", "), keyParam,
"UPDATE %s SET %s WHERE %s", t.tableName, strings.Join(updates, ", "), keyParam,
),
)
*allParams = append(*allParams, *t.updateParam)
Expand Down
46 changes: 3 additions & 43 deletions internal/connectors/db/yql/queries/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ import (

func TestQueryBuilder_UpdateUpdate(t *testing.T) {
const (
queryString = `DECLARE $id_0 AS Uuid;
DECLARE $status_0 AS String;
UPDATE Backups SET status = $status_0 WHERE id = $id_0;
DECLARE $id_1 AS Uuid;
DECLARE $status_1 AS String;
DECLARE $message_1 AS String;
queryString = `UPDATE Backups SET status = $status_0 WHERE id = $id_0;
UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`
)
opId := types.GenerateObjectID()
Expand Down Expand Up @@ -55,29 +50,7 @@ UPDATE Operations SET status = $status_1, message = $message_1 WHERE id = $id_1`

func TestQueryBuilder_CreateCreate(t *testing.T) {
const (
queryString = `DECLARE $id_0 AS Uuid;
DECLARE $container_id_0 AS String;
DECLARE $database_0 AS String;
DECLARE $endpoint_0 AS String;
DECLARE $initiated_0 AS String;
DECLARE $s3_endpoint_0 AS String;
DECLARE $s3_region_0 AS String;
DECLARE $s3_bucket_0 AS String;
DECLARE $s3_path_prefix_0 AS String;
DECLARE $status_0 AS String;
DECLARE $message_0 AS String;
UPSERT INTO Backups (id, container_id, database, endpoint, initiated, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $initiated_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0);
DECLARE $id_1 AS Uuid;
DECLARE $type_1 AS String;
DECLARE $status_1 AS String;
DECLARE $container_id_1 AS String;
DECLARE $database_1 AS String;
DECLARE $endpoint_1 AS String;
DECLARE $backup_id_1 AS Uuid;
DECLARE $initiated_1 AS String;
DECLARE $created_at_1 AS Timestamp;
DECLARE $operation_id_1 AS String;
DECLARE $message_1 AS String;
queryString = `UPSERT INTO Backups (id, container_id, database, endpoint, initiated, s3_endpoint, s3_region, s3_bucket, s3_path_prefix, status, message) VALUES ($id_0, $container_id_0, $database_0, $endpoint_0, $initiated_0, $s3_endpoint_0, $s3_region_0, $s3_bucket_0, $s3_path_prefix_0, $status_0, $message_0);
UPSERT INTO Operations (id, type, status, container_id, database, endpoint, backup_id, initiated, created_at, operation_id, message) VALUES ($id_1, $type_1, $status_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $initiated_1, $created_at_1, $operation_id_1, $message_1)`
)
opId := types.GenerateObjectID()
Expand Down Expand Up @@ -171,20 +144,7 @@ UPSERT INTO Operations (id, type, status, container_id, database, endpoint, back

func TestQueryBuilder_UpdateCreate(t *testing.T) {
const (
queryString = `DECLARE $id_0 AS Uuid;
DECLARE $status_0 AS String;
UPDATE Backups SET status = $status_0 WHERE id = $id_0;
DECLARE $id_1 AS Uuid;
DECLARE $type_1 AS String;
DECLARE $status_1 AS String;
DECLARE $container_id_1 AS String;
DECLARE $database_1 AS String;
DECLARE $endpoint_1 AS String;
DECLARE $backup_id_1 AS Uuid;
DECLARE $initiated_1 AS String;
DECLARE $created_at_1 AS Timestamp;
DECLARE $operation_id_1 AS String;
DECLARE $message_1 AS String;
queryString = `UPDATE Backups SET status = $status_0 WHERE id = $id_0;
UPSERT INTO Operations (id, type, status, container_id, database, endpoint, backup_id, initiated, created_at, operation_id, message) VALUES ($id_1, $type_1, $status_1, $container_id_1, $database_1, $endpoint_1, $backup_id_1, $initiated_1, $created_at_1, $operation_id_1, $message_1)`
)
opId := types.GenerateObjectID()
Expand Down
8 changes: 4 additions & 4 deletions internal/connectors/db/yql/schema/fill_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ UPSERT INTO `OperationTypes` (code, description, is_cancellable) VALUES
('RM', 'Remove backup', False);

--for testing purposes
UPSERT INTO `Backups` (id, container_id, database, status) VALUES
(Uuid('12345678-1234-5678-1234-567812345678'), '', '', 'PENDING');
UPSERT INTO `Backups` (id, container_id, database, status, endpoint) VALUES
(Uuid('12345678-1234-5678-1234-567812345678'), '', '', 'PENDING', '');

UPSERT INTO `Operations` (id, container_id, database, type, status, operation_id, backup_id, created_at) VALUES
(Uuid('11111111-1111-1111-1111-111111111111'), '', '', 'TB', 'DONE', '', Uuid('11111111-1111-1111-1111-111111111112'), CurrentUtcTimestamp());
UPSERT INTO `Operations` (id, container_id, database, type, status, operation_id, backup_id, created_at, endpoint) VALUES
(Uuid('11111111-1111-1111-1111-111111111111'), '', '', 'TB', 'DONE', '', Uuid('11111111-1111-1111-1111-111111111112'), CurrentUtcTimestamp(), '');
1 change: 1 addition & 0 deletions start-local-ydb.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker run -d --rm --name ydb-local -h localhost --platform linux/amd64 -p 2135:2135 -p 2136:2136 -p 8765:8765 -v $(pwd)/ydb_certs:/ydb_certs -v $(pwd)/ydb_data:/ydb_data -e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 -e YDB_USE_IN_MEMORY_PDISKS=true -e YDB_FEATURE_FLAGS=enable_uuid_as_primary_key,enable_implicit_query_parameter_types cr.ai.nebius.cloud/crnca8q7ti1i7vpqs28l/ydb-local:24.1.16-stream-nb-1.1-1
10 changes: 5 additions & 5 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ if [[ -z "$1" ]]; then
fi
doneflag=0
if [[ "GetBackup" == "$1" ]]; then
grpcurl -plaintext -d '{"id": "12345678-1234-5678-1234-567812345678"}' localhost:50051 ydbcp.BackupService.GetBackup
grpcurl -plaintext -d '{"id": "12345678-1234-5678-1234-567812345678"}' localhost:50051 ydbcp.v1alpha1.BackupService.GetBackup
doneflag=1
fi
if [[ "ListBackups" == "$1" ]]; then
grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.BackupService.ListBackups
grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.v1alpha1.BackupService.ListBackups
doneflag=1
fi
if [[ "ListOperations" == "$1" ]]; then
grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.OperationService.ListOperations
grpcurl -plaintext -d '{"databaseNameMask": "%", "containerId": ""}' localhost:50051 ydbcp.v1alpha1.OperationService.ListOperations
doneflag=1
fi
if [[ "TakeBackup" == "$1" ]]; then
grpcurl -plaintext -d '{"database_name": "/testing-global/ydbc", "database_endpoint": "grpcs://localhost:2135", "source_paths": ["/testing-global/ydbc/orders"]}' localhost:50051 ydbcp.BackupService.MakeBackup
if [[ "MakeBackup" == "$1" ]]; then
grpcurl -plaintext -d '{"database_name": "/testing-global/ydbc", "database_endpoint": "grpcs://localhost:2135", "source_paths": ["/testing-global/ydbc/orders"]}' localhost:50051 ydbcp.v1alpha1.BackupService.MakeBackup
doneflag=1
fi
if [[ 0 == $doneflag ]]; then
Expand Down

0 comments on commit e7e4dc9

Please sign in to comment.