Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(ydbcp): add docker-compose for ydbcp #46

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
services:
ydb:
image: cr.yandex/yc/yandex-docker-local-ydb:24.1
platform: linux/amd64
hostname: local-ydb
#volumes:
# - ./ydbd/ydb_certs:/ydb_certs
# - ./ydbd/ydb_data:/ydb_data
environment:
- GRPC_TLS_PORT=2135
- GRPC_PORT=2136
- MON_PORT=8765
- YDB_ANONYMOUS_CREDENTIALS=true
- YDB_USE_IN_MEMORY_PDISKS=true
- YDB_FEATURE_FLAGS=enable_uuid_as_primary_key,enable_implicit_query_parameter_types
ports:
- "8765:8765"
#- "2135:2135"
#- "2136:2136"
networks:
- ydbcp-net

setup_ydb:
image: cr.ai.nebius.cloud/crnca8q7ti1i7vpqs28l/ydb-local:24.1.16-stream-nb-1.1-1
platform: linux/amd64
volumes:
- ./init_db:/init_db
- ./internal/connectors/db/yql/schema:/init_db/schema
depends_on:
ydb:
condition: service_healthy
restart: "no"
command: bash -c "chmod +x ./init_db/create_tables.sh && ./init_db/create_tables.sh"
networks:
- ydbcp-net

s3:
image: quay.io/minio/minio
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: ydbcp
MINIO_ROOT_PASSWORD: password
ENABLE_VIRTUAL_STYLE: 0
ports:
#- "9000:9000"
- "9001:9001"
healthcheck:
test: ["CMD", "mc", "ready", "local"]
interval: 10s
timeout: 20s
retries: 3
networks:
- ydbcp-net

setup_s3:
image: amazon/aws-cli
environment:
- AWS_ACCESS_KEY_ID=ydbcp
- AWS_SECRET_ACCESS_KEY=password
- AWS_DEFAULT_REGION=us-east-1
- AWS_ENDPOINT_URL=http://ydbcp-s3-1.ydbcp-net:9000
depends_on:
s3:
condition: service_healthy
command: s3api create-bucket --bucket test-bucket
networks:
- ydbcp-net

ydbcp:
image: ydbcp:local
depends_on:
setup_ydb:
condition: service_completed_successfully
setup_s3:
condition: service_completed_successfully
build:
context: .
dockerfile: ./dockerfile
ports:
- "50051:50051"
networks:
- ydbcp-net

networks:
ydbcp-net:
name: ydbcp-net
external: true
20 changes: 20 additions & 0 deletions dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use the official Golang image as the base image
FROM golang:1.22-alpine as builder

# Set the Current Working Directory inside the container
WORKDIR ./ydbcp

# Copy source code into the container
COPY ./ ./

# Download all dependencies
RUN go mod download

# Install grpcurl
#RUN go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest

# Build the Go app
RUN go build -o . ./cmd/ydbcp/main.go

# Command to run the executable
CMD ["./main", "--config=local_config.yaml"]
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/google/uuid v1.6.0
github.com/jonboulle/clockwork v0.3.0
github.com/stretchr/testify v1.8.1
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240821162910-6cb364b2ccc8
github.com/ydb-platform/ydb-go-sdk/v3 v3.75.2
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.27.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 h1:nL8XwD6fSst7xFUirkaWJmE7kM0CdWRYgu6+YQer1d4=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240821162910-6cb364b2ccc8 h1:ZWxYw6L51aNAMLbTpC/VbXP0rcnvsCAJqx7EI/CjWmc=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240821162910-6cb364b2ccc8/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-sdk/v3 v3.75.2 h1:thrbvktqKA6LFZTnZrGuQi8LQVel1J2dDfoQFsgvcYs=
github.com/ydb-platform/ydb-go-sdk/v3 v3.75.2/go.mod h1:QMmT1fMKZnpid73USXLJawh+32bKySSE2WtEnBUIKd8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
Expand Down
6 changes: 6 additions & 0 deletions init_db/create_tables.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# create ydbcp tables
./ydb -e grpc://ydbcp-ydb-1.ydbcp-net:2136 -d /local scripting yql -f init_db/schema/create_tables.yql

# create and fill user table kv_test
./ydb -e grpc://ydbcp-ydb-1.ydbcp-net:2136 -d /local workload kv init
./ydb -e grpc://ydbcp-ydb-1.ydbcp-net:2136 -d /local workload kv run upsert --rows 100
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type S3Config struct {
PathPrefix string `yaml:"path_prefix"`
AccessKeyIDPath string `yaml:"access_key_id_path"`
SecretAccessKeyPath string `yaml:"secret_access_key_path"`
S3ForcePathStyle bool `yaml:"s3_force_path_style"`
}

type YDBConnectionConfig struct {
Expand Down
51 changes: 29 additions & 22 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,11 @@ func prepareItemsForExport(
items := make([]*Ydb_Export.ExportToS3Settings_Item, len(sources))

for i, source := range sources {
// Destination prefix format: s3_destination_prefix/database_name/timestamp_backup_id/rel_source_path
// Destination prefix format: s3_destination_prefix/database_name/timestamp/rel_source_path
destinationPrefix := path.Join(
s3Settings.DestinationPrefix,
clientDb.Scheme().Database(),
time.Now().Format(types.BackupTimestampFormat)+"_"+s3Settings.BackupID,
time.Now().Format(types.BackupTimestampFormat),
strings.TrimPrefix(source, clientDb.Scheme().Database()+"/"),
)

Expand Down Expand Up @@ -255,14 +255,15 @@ func (d *ClientYdbConnector) ExportToS3(
CancelAfter: durationpb.New(time.Second),
},
Settings: &Ydb_Export.ExportToS3Settings{
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
Region: s3Settings.Region,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
NumberOfRetries: s3Settings.NumberOfRetries,
Items: items,
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
Region: s3Settings.Region,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
NumberOfRetries: s3Settings.NumberOfRetries,
Items: items,
DisableVirtualAddressing: s3Settings.S3ForcePathStyle,
},
},
)
Expand Down Expand Up @@ -294,7 +295,8 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings
},
)

items := make([]*Ydb_Import.ImportFromS3Settings_Item, len(s3Settings.SourcePaths))
items := make([]*Ydb_Import.ImportFromS3Settings_Item, 0)
itemsPtr := &items

for _, sourcePath := range s3Settings.SourcePaths {
if sourcePath[len(sourcePath)-1] != '/' {
Expand All @@ -311,8 +313,8 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings

key, found := strings.CutSuffix(*object.Key, "scheme.pb")
if found {
items = append(
items,
*itemsPtr = append(
*itemsPtr,
&Ydb_Import.ImportFromS3Settings_Item{
SourcePrefix: key,
DestinationPath: path.Join(
Expand All @@ -334,7 +336,7 @@ func prepareItemsForImport(ctx context.Context, clientDb *ydb.Driver, s3Settings
}
}

return items, nil
return *itemsPtr, nil
}

func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings types.ImportSettings) (string, error) {
Expand All @@ -347,6 +349,10 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri
return "", fmt.Errorf("error preparing list of items for import: %s", err.Error())
}

if len(items) == 0 {
return "", fmt.Errorf("empty list of items for import")
}

importClient := Ydb_Import_V1.NewImportServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(
ctx, "Importing data from s3",
Expand All @@ -364,14 +370,15 @@ func (d *ClientYdbConnector) ImportFromS3(ctx context.Context, clientDb *ydb.Dri
CancelAfter: durationpb.New(time.Second),
},
Settings: &Ydb_Import.ImportFromS3Settings{
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
Region: s3Settings.Region,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
NumberOfRetries: s3Settings.NumberOfRetries,
Items: items,
Endpoint: s3Settings.Endpoint,
Bucket: s3Settings.Bucket,
Region: s3Settings.Region,
AccessKey: s3Settings.AccessKey,
SecretKey: s3Settings.SecretKey,
Description: s3Settings.Description,
NumberOfRetries: s3Settings.NumberOfRetries,
Items: items,
DisableVirtualAddressing: s3Settings.S3ForcePathStyle,
},
},
)
Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
DatabaseName: databaseName,
},
YdbOperationId: StringOrEmpty(ydbOperationId),
SourcePaths: sourcePathsSlice,
Audit: auditFromDb(creator, createdAt, completedAt),
}, nil
}
Expand Down
40 changes: 37 additions & 3 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
)
}
}
tb, ok := operation.(*types.TakeBackupOperation)
if ok {

if operation.GetType() == types.OperationTypeTB {
tb, ok := operation.(*types.TakeBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to TakeBackupOperation", zap.String("operation_id", operation.GetID()))
}

d.AddValueParam(
"$container_id", table_types.StringValueFromString(tb.ContainerID),
)
Expand Down Expand Up @@ -108,8 +113,37 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
table_types.StringValueFromString(strings.Join(tb.SourcePathToExclude, ",")),
)
}
} else if operation.GetType() == types.OperationTypeRB {
rb, ok := operation.(*types.RestoreBackupOperation)
if !ok {
xlog.Error(ctx, "error cast operation to RestoreBackupOperation", zap.String("operation_id", operation.GetID()))
}

d.AddValueParam(
"$container_id", table_types.StringValueFromString(rb.ContainerID),
)
d.AddValueParam(
"$database",
table_types.StringValueFromString(rb.YdbConnectionParams.DatabaseName),
)
d.AddValueParam(
"$endpoint",
table_types.StringValueFromString(rb.YdbConnectionParams.Endpoint),
)
d.AddValueParam(
"$backup_id",
table_types.StringValueFromString(rb.BackupId),
)
d.AddValueParam(
"$operation_id",
table_types.StringValueFromString(rb.YdbOperationId),
)
d.AddValueParam("$message", table_types.StringValueFromString(rb.Message))

if len(rb.SourcePaths) > 0 {
d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(rb.SourcePaths, ",")))
}
} else {
//TODO: support RestoreBackup operation
xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType())))
}

Expand Down
4 changes: 0 additions & 4 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
DROP TABLE Backups;
DROP TABLE OperationTypes;
DROP TABLE Operations;

CREATE TABLE Backups (
id String NOT NULL,
container_id String NOT NULL,
Expand Down
3 changes: 3 additions & 0 deletions internal/connectors/db/yql/schema/drop_tables.yql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE Backups;
DROP TABLE OperationTypes;
DROP TABLE Operations;
7 changes: 5 additions & 2 deletions internal/server/services/backup/backupservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
SourcePaths: req.GetSourcePaths(),
SourcePathToExclude: req.GetSourcePathsToExclude(),
DestinationPrefix: s.s3.PathPrefix,
BackupID: types.GenerateObjectID(), // TODO: do we need backup id?
S3ForcePathStyle: s.s3.S3ForcePathStyle,
}

clientOperationID, err := s.clientConn.ExportToS3(ctx, client, s3Settings)
Expand Down Expand Up @@ -250,14 +250,15 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ

s3Settings := types.ImportSettings{
Endpoint: s.s3.Endpoint,
Region: s.s3.Region,
Bucket: s.s3.Bucket,
AccessKey: accessKey,
SecretKey: secretKey,
Description: "ydbcp restore", // TODO: write description
NumberOfRetries: 10, // TODO: get value from configuration
BackupID: req.GetBackupId(),
SourcePaths: req.GetSourcePaths(),
S3ForcePathStyle: true,
S3ForcePathStyle: s.s3.S3ForcePathStyle,
DestinationPrefix: req.GetDestinationPrefix(),
}

Expand All @@ -283,6 +284,8 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ
CreatedAt: timestamppb.Now(),
Creator: subject,
},
SourcePaths: req.GetSourcePaths(),
DestinationPrefix: req.GetDestinationPrefix(),
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand Down
12 changes: 8 additions & 4 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,12 @@ type RestoreBackupOperation struct {
Message string
YdbConnectionParams YdbConnectionParams
YdbOperationId string
DestinationPaths []string
Audit *pb.AuditInfo
// TODO: this field is not used, because DestinationPaths
// will be constructed from SourcePaths and DestinationPrefix
DestinationPaths []string
SourcePaths []string
DestinationPrefix string
Audit *pb.AuditInfo
}

func (o *RestoreBackupOperation) GetID() string {
Expand Down Expand Up @@ -203,7 +207,7 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation {
DatabaseEndpoint: o.YdbConnectionParams.Endpoint,
YdbServerOperationId: o.YdbOperationId,
BackupId: o.BackupId,
SourcePaths: nil,
SourcePaths: o.SourcePaths,
SourcePathsToExclude: nil,
RestorePaths: o.DestinationPaths,
Audit: o.Audit,
Expand Down Expand Up @@ -339,7 +343,7 @@ type ExportSettings struct {
SourcePaths []string
SourcePathToExclude []string
DestinationPrefix string
BackupID string
S3ForcePathStyle bool
}

type ImportSettings struct {
Expand Down
Loading
Loading