diff --git a/Makefile b/Makefile index a54d6fef..efe698a9 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ proto: $(MAKE) -C plugins/auth_nebius/proto test: - go test -v ./... -short + go test -v ./... -short -race fmt: go fmt $(PACKAGES) diff --git a/internal/config/config.go b/internal/config/config.go index 0cdcbab8..578fc5dc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -33,6 +33,7 @@ type ClientConnectionConfig struct { Insecure bool `yaml:"insecure"` Discovery bool `yaml:"discovery" default:"true"` DialTimeoutSeconds uint32 `yaml:"dial_timeout_seconds" default:"5"` + OAuth2KeyFile string `yaml:"oauth2_key_file"` } type AuthConfig struct { diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index 272cfae0..cabab61e 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -63,7 +63,6 @@ func (d *ClientYdbConnector) Open(ctx context.Context, dsn string) (*ydb.Driver, xlog.Info(ctx, "Connecting to client db", zap.String("dsn", dsn)) opts := []ydb.Option{ - ydb.WithAnonymousCredentials(), ydb.WithDialTimeout(time.Second * time.Duration(d.config.DialTimeoutSeconds)), } if d.config.Insecure { @@ -72,6 +71,11 @@ func (d *ClientYdbConnector) Open(ctx context.Context, dsn string) (*ydb.Driver, if !d.config.Discovery { opts = append(opts, ydb.WithBalancer(balancers.SingleConn())) } + if len(d.config.OAuth2KeyFile) > 0 { + opts = append(opts, ydb.WithOauth2TokenExchangeCredentialsFile(d.config.OAuth2KeyFile)) + } else { + opts = append(opts, ydb.WithAnonymousCredentials()) + } db, err := ydb.Open(ctx, dsn, opts...) if err != nil { diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 4cb294ab..9b34c0a4 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" @@ -11,6 +12,7 @@ import ( ) type MockDBConnector struct { + guard sync.Mutex operations map[string]types.Operation backups map[string]types.Backup } @@ -43,6 +45,9 @@ func WithBackups(backups map[string]types.Backup) Option { func (c *MockDBConnector) SelectBackups( _ context.Context, _ queries.ReadTableQuery, ) ([]*types.Backup, error) { + c.guard.Lock() + defer c.guard.Unlock() + backups := make([]*types.Backup, 0, len(c.backups)) for _, backup := range c.backups { backups = append(backups, &backup) @@ -53,6 +58,9 @@ func (c *MockDBConnector) SelectBackups( func (c *MockDBConnector) SelectBackupsByStatus( _ context.Context, _ string, ) ([]*types.Backup, error) { + c.guard.Lock() + defer c.guard.Unlock() + backups := make([]*types.Backup, 0, len(c.backups)) for _, backup := range c.backups { backups = append(backups, &backup) @@ -63,6 +71,9 @@ func (c *MockDBConnector) SelectBackupsByStatus( func (c *MockDBConnector) UpdateBackup( _ context.Context, id string, backupStatus string, ) error { + c.guard.Lock() + defer c.guard.Unlock() + if _, ok := c.backups[id]; !ok { return fmt.Errorf("no backup found for id %v", id) } @@ -78,6 +89,9 @@ func (c *MockDBConnector) GetTableClient() table.Client { } func (c *MockDBConnector) CreateBackup(_ context.Context, backup types.Backup) (string, error) { + c.guard.Lock() + defer c.guard.Unlock() + var id string for { id = types.GenerateObjectID() @@ -93,6 +107,9 @@ func (c *MockDBConnector) CreateBackup(_ context.Context, backup types.Backup) ( func (c *MockDBConnector) ActiveOperations(_ context.Context) ( []types.Operation, error, ) { + c.guard.Lock() + defer c.guard.Unlock() + operations := make([]types.Operation, 0, len(c.operations)) for _, op := range c.operations { if types.IsActive(op) { @@ -105,6 +122,9 @@ func (c *MockDBConnector) ActiveOperations(_ context.Context) ( func (c *MockDBConnector) UpdateOperation( _ context.Context, op types.Operation, ) error { + c.guard.Lock() + defer c.guard.Unlock() + if _, exist := c.operations[op.GetID()]; !exist { return fmt.Errorf( "update nonexistent operation %s", types.OperationToString(op), @@ -117,6 +137,9 @@ func (c *MockDBConnector) UpdateOperation( func (c *MockDBConnector) CreateOperation( _ context.Context, op types.Operation, ) (string, error) { + c.guard.Lock() + defer c.guard.Unlock() + var id string for { id = types.GenerateObjectID() @@ -132,6 +155,9 @@ func (c *MockDBConnector) CreateOperation( func (c *MockDBConnector) GetOperation( _ context.Context, operationID string, ) (types.Operation, error) { + c.guard.Lock() + defer c.guard.Unlock() + if op, exist := c.operations[operationID]; exist { return op, nil } @@ -143,6 +169,9 @@ func (c *MockDBConnector) GetOperation( func (c *MockDBConnector) GetBackup( _ context.Context, backupID string, ) (types.Backup, error) { + c.guard.Lock() + defer c.guard.Unlock() + if backup, exist := c.backups[backupID]; exist { return backup, nil } @@ -158,6 +187,9 @@ func (c *MockDBConnector) SelectOperations( } func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.WriteTableQuery) error { + c.guard.Lock() + defer c.guard.Unlock() + queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock) c.operations[queryBuilderMock.Operation.GetID()] = queryBuilderMock.Operation c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup diff --git a/internal/processor/processor_test.go b/internal/processor/processor_test.go index 9064042c..702c9848 100644 --- a/internal/processor/processor_test.go +++ b/internal/processor/processor_test.go @@ -33,10 +33,9 @@ func TestProcessor(t *testing.T) { db := db.NewMockDBConnector() handlers := NewOperationHandlerRegistry() - operationTypeTB := types.OperationType("TB") handlerCalled := make(chan struct{}) handlers.Add( - operationTypeTB, + types.OperationTypeTB, func(ctx context.Context, op types.Operation) error { xlog.Debug( ctx, "TB handler called for operation", @@ -57,6 +56,7 @@ func TestProcessor(t *testing.T) { handlers, WithTickerProvider(tickerProvider), WithPeriod(time.Second*10), + WithHandleOperationTimeout(time.Second*60), ) select { diff --git a/test.sh b/test.sh index d128fcbe..ef371161 100755 --- a/test.sh +++ b/test.sh @@ -33,7 +33,7 @@ if [[ "ListOperations" == "$1" ]]; then doneflag=1 fi if [[ "MakeBackup" == "$1" ]]; then - $GRPCURL "${ARGS[@]}" -d '{"database_name": "/testing-global/ydbc", "database_endpoint": "grpcs://localhost:2135", "source_paths": ["/testing-global/ydbc/orders"], "containerId": "'"$CONTAINER_ID"'"}' localhost:50051 ydbcp.v1alpha1.BackupService.MakeBackup + $GRPCURL "${ARGS[@]}" -d '{"database_name": "/testing-global/ydbc", "database_endpoint": "grpcs://localhost:2135", "source_paths": ["/testing-global/ydbc"], "containerId": "'"$CONTAINER_ID"'"}' localhost:50051 ydbcp.v1alpha1.BackupService.MakeBackup doneflag=1 fi if [[ 0 == $doneflag ]]; then