Skip to content

Commit

Permalink
fix(db_connector): add timeout for db connection check
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Nov 29, 2024
1 parent 979b3c1 commit b208c04
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
11 changes: 8 additions & 3 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ type YdbConnector struct {
driver *ydb.Driver
}

func select1(ctx context.Context, db *ydb.Driver) error {
func select1(baseCtx context.Context, db *ydb.Driver, timeout time.Duration) error {
readTx := table.TxControl(
table.BeginTx(
table.WithOnlineReadOnly(),
),
table.CommitTx(),
)

ctx, cancel := context.WithTimeout(baseCtx, timeout)
defer cancel()

return db.Table().Do(
ctx, func(ctx context.Context, s table.Session) error {
_, res, err := s.Execute(
Expand All @@ -93,8 +97,9 @@ func select1(ctx context.Context, db *ydb.Driver) error {
}

func NewYdbConnector(ctx context.Context, config config.YDBConnectionConfig) (*YdbConnector, error) {
dialTimeout := time.Second * time.Duration(config.DialTimeoutSeconds)
opts := []ydb.Option{
ydb.WithDialTimeout(time.Second * time.Duration(config.DialTimeoutSeconds)),
ydb.WithDialTimeout(dialTimeout),
}
if config.Insecure {
opts = append(opts, ydb.WithTLSSInsecureSkipVerify())
Expand All @@ -113,7 +118,7 @@ func NewYdbConnector(ctx context.Context, config config.YDBConnectionConfig) (*Y
if err != nil {
return nil, fmt.Errorf("can't connect to YDB, dsn %s: %w", config.ConnectionString, err)
}
err = select1(ctx, driver)
err = select1(ctx, driver, dialTimeout)
if err != nil {
return nil, fmt.Errorf("can't connect to YDB, dsn %s: %w", config.ConnectionString, err)
}
Expand Down
30 changes: 18 additions & 12 deletions internal/handlers/take_backup_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func TBWROperationHandler(
if err != nil {
tbwr.State = types.OperationStateError
tbwr.Message = err.Error()
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
now := clock.Now()
tbwr.UpdatedAt = timestamppb.New(now)
tbwr.Audit.CompletedAt = timestamppb.New(now)

errup := db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
if errup != nil {
Expand All @@ -207,8 +208,9 @@ func TBWROperationHandler(
{
tbwr.State = types.OperationStateDone
tbwr.Message = "Success"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
now := clock.Now()
tbwr.UpdatedAt = timestamppb.New(now)
tbwr.Audit.CompletedAt = timestamppb.New(now)
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
}
case Skip:
Expand All @@ -223,8 +225,9 @@ func TBWROperationHandler(
return ids
}(), ", ")
tbwr.State = types.OperationStateError
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
now := clock.Now()
tbwr.UpdatedAt = timestamppb.New(now)
tbwr.Audit.CompletedAt = timestamppb.New(now)

tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", len(ops))
fields := []zap.Field{
Expand Down Expand Up @@ -258,8 +261,9 @@ func TBWROperationHandler(
default:
tbwr.State = types.OperationStateError
tbwr.Message = "unexpected operation state"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
now := clock.Now()
tbwr.UpdatedAt = timestamppb.New(now)
tbwr.Audit.CompletedAt = timestamppb.New(now)

_ = db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
return errors.New(tbwr.Message)
Expand All @@ -278,8 +282,9 @@ func TBWROperationHandler(
if last == nil || !types.IsActive(last) {
tbwr.State = types.OperationStateCancelled
tbwr.Message = "Success"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
now := clock.Now()
tbwr.UpdatedAt = timestamppb.New(now)
tbwr.Audit.CompletedAt = timestamppb.New(now)
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
} else {
if last.State == types.OperationStatePending || last.State == types.OperationStateRunning {
Expand All @@ -295,8 +300,9 @@ func TBWROperationHandler(
{
tbwr.State = types.OperationStateError
tbwr.Message = "unexpected operation state"
tbwr.UpdatedAt = timestamppb.New(clock.Now())
tbwr.Audit.CompletedAt = timestamppb.New(clock.Now())
now := clock.Now()
tbwr.UpdatedAt = timestamppb.New(now)
tbwr.Audit.CompletedAt = timestamppb.New(now)
_ = db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
return errors.New(tbwr.Message)
}
Expand Down

0 comments on commit b208c04

Please sign in to comment.