From 10bd41ade44af7a4f575448fd989d247327160f2 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 12 Sep 2024 17:15:59 +0300 Subject: [PATCH] * Added option `ydb.WithSessionPoolSessionIdleTimeToLive` for restrict idle time of query sessions * Fixed bug with leak of query transactions --- CHANGELOG.md | 2 + internal/pool/pool.go | 32 ++-- internal/pool/pool_test.go | 14 +- internal/query/client.go | 27 ++- internal/query/client_test.go | 288 +++++++++++++++++++++++++++++- internal/query/config/config.go | 11 +- internal/query/config/options.go | 10 ++ internal/query/session/session.go | 39 ++-- internal/query/session/status.go | 3 + internal/query/transaction.go | 32 ++-- internal/table/client.go | 2 +- options.go | 10 ++ 12 files changed, 402 insertions(+), 68 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df8ef0e32..fd3ddb311 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added option `ydb.WithSessionPoolSessionIdleTimeToLive` for restrict idle time of query sessions +* Fixed bug with leak of query transactions * Changed `ydb_go_sdk_ydb_driver_conn_requests` metrics splitted to `ydb_go_sdk_ydb_driver_conn_request_statuses` and `ydb_go_sdk_ydb_driver_conn_request_methods` * Fixed metadata for operation service connection * Fixed composing query traces in call `db.Query.Do[Tx]` using option `query.WithTrace` diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 17be47081..5fb9a5215 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -26,18 +26,18 @@ type ( Item } Config[PT ItemConstraint[T], T any] struct { - trace *Trace - clock clockwork.Clock - limit int - createTimeout time.Duration - createItem func(ctx context.Context) (PT, error) - closeTimeout time.Duration - closeItem func(ctx context.Context, item PT) - idleThreshold time.Duration + trace *Trace + clock clockwork.Clock + limit int + createTimeout time.Duration + createItem func(ctx context.Context) (PT, error) + closeTimeout time.Duration + closeItem func(ctx context.Context, item PT) + idleTimeToLive time.Duration } itemInfo[PT ItemConstraint[T], T any] struct { - idle *xlist.Element[PT] - touched time.Time + idle *xlist.Element[PT] + lastUsage time.Time } waitChPool[PT ItemConstraint[T], T any] interface { GetOrNew() *chan PT @@ -99,9 +99,9 @@ func WithTrace[PT ItemConstraint[T], T any](t *Trace) Option[PT, T] { } } -func WithIdleThreshold[PT ItemConstraint[T], T any](idleThreshold time.Duration) Option[PT, T] { +func WithIdleTimeToLive[PT ItemConstraint[T], T any](idleTTL time.Duration) Option[PT, T] { return func(c *Config[PT, T]) { - c.idleThreshold = idleThreshold + c.idleTimeToLive = idleTTL } } @@ -218,7 +218,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen if newItem != nil { p.mu.WithLock(func() { p.index[newItem] = itemInfo[PT, T]{ - touched: p.config.clock.Now(), + lastUsage: p.config.clock.Now(), } }) } @@ -461,7 +461,7 @@ func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) { panic(fmt.Sprintf("inconsistent index: (%v, %+v, %+v)", has, el, info.idle)) } - return item, info.touched + return item, info.lastUsage } // removes first session from idle and resets the keepAliveCount @@ -547,7 +547,7 @@ func (p *Pool[PT, T]) pushIdle(item PT, now time.Time) { } p.changeState(func() Stats { - info.touched = now + info.lastUsage = now info.idle = p.idle.PushBack(item) p.index[item] = info @@ -595,7 +595,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { / return info }) - if p.config.idleThreshold > 0 && p.config.clock.Since(info.touched) > p.config.idleThreshold { + if p.config.idleTimeToLive > 0 && p.config.clock.Since(info.lastUsage) > p.config.idleTimeToLive { p.closeItem(ctx, item) p.mu.WithLock(func() { p.changeState(func() Stats { diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index d22c48a66..2c6c58639 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -394,7 +394,7 @@ func TestPool(t *testing.T) { // replace default async closer for sync testing WithSyncCloseItem[*testItem, testItem](), WithClock[*testItem, testItem](fakeClock), - WithIdleThreshold[*testItem, testItem](idleThreshold), + WithIdleTimeToLive[*testItem, testItem](idleThreshold), WithTrace[*testItem, testItem](defaultTrace), ) @@ -402,14 +402,14 @@ func TestPool(t *testing.T) { s2 := mustGetItem(t, p) // Put both items at the absolutely same time. - // That is, both items must be updated their touched timestamp. + // That is, both items must be updated their lastUsage timestamp. mustPutItem(t, p, s1) mustPutItem(t, p, s2) require.Len(t, p.index, 2) require.Equal(t, 2, p.idle.Len()) - // Move clock to longer than idleThreshold + // Move clock to longer than idleTimeToLive fakeClock.Advance(idleThreshold + time.Nanosecond) // on get item from idle list the pool must check the item idle timestamp @@ -423,15 +423,15 @@ func TestPool(t *testing.T) { t.Fatal("unexpected number of closed items") } - // Move time to idleThreshold / 2 - this emulate a "spent" some time working within item. + // Move time to idleTimeToLive / 2 - this emulate a "spent" some time working within item. fakeClock.Advance(idleThreshold / 2) // Now put that item back - // pool must update a touched timestamp of item + // pool must update a lastUsage timestamp of item mustPutItem(t, p, s3) - // Move time to idleThreshold / 2 - // Total time since last updating touched timestampe is more than idleThreshold + // Move time to idleTimeToLive / 2 + // Total time since last updating lastUsage timestampe is more than idleTimeToLive fakeClock.Advance(idleThreshold/2 + time.Nanosecond) require.Len(t, p.index, 1) diff --git a/internal/query/client.go b/internal/query/client.go index 5f9f04104..3091de0d4 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -205,9 +205,7 @@ func do( err := op(ctx, s) if err != nil { - if xerrors.IsOperationError(err) { - s.SetStatus(session.StatusClosed) - } + s.SetStatus(session.StatusError) return xerrors.WithStackTrace(err) } @@ -263,27 +261,27 @@ func doTx( txSettings tx.Settings, opts ...retry.Option, ) (finalErr error) { - err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) { + err := do(ctx, pool, func(ctx context.Context, s *Session) (opErr error) { tx, err := s.Begin(ctx, txSettings) if err != nil { return xerrors.WithStackTrace(err) } - err = op(ctx, tx) - if err != nil { - errRollback := tx.Rollback(ctx) - if errRollback != nil { - return xerrors.WithStackTrace(xerrors.Join(err, errRollback)) + + defer func() { + _ = tx.Rollback(ctx) + + if opErr != nil { + s.SetStatus(session.StatusError) } + }() + err = op(ctx, tx) + if err != nil { return xerrors.WithStackTrace(err) } + err = tx.CommitTx(ctx) if err != nil { - errRollback := tx.Rollback(ctx) - if errRollback != nil { - return xerrors.WithStackTrace(xerrors.Join(err, errRollback)) - } - return xerrors.WithStackTrace(err) } @@ -530,6 +528,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())), pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()), pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()), + pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()), pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { var ( createCtx context.Context diff --git a/internal/query/client_test.go b/internal/query/client_test.go index eb6d494e1..1ba9fb625 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "math/rand" "testing" "github.com/stretchr/testify/require" @@ -13,6 +14,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" "go.uber.org/mock/gomock" + "google.golang.org/grpc" grpcCodes "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" @@ -279,6 +281,284 @@ func TestClient(t *testing.T) { require.NoError(t, err) require.Equal(t, 10, counter) }) + t.Run("TxLeak", func(t *testing.T) { + t.Run("OnExec", func(t *testing.T) { + t.Run("WithExplicitCommit", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + txInFlight := 0 + ctrl := gomock.NewController(t) + err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) ( + Ydb_Query_V1.QueryService_ExecuteQueryClient, error, + ) { + if rand.Int31n(100) < 50 { + return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION)) + } + + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + txInFlight++ + + stream.EXPECT().Recv().Return(nil, io.EOF) + + client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.CommitTransactionRequest, option ...grpc.CallOption) ( + *Ydb_Query.CommitTransactionResponse, error, + ) { + txInFlight-- + + return &Ydb_Query.CommitTransactionResponse{ + Status: Ydb.StatusIds_SUCCESS, + }, nil + }) + + return &Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "456", + }, + ExecStats: &Ydb_TableStats.QueryStats{}, + }, nil + }) + + return stream, nil + }) + + return newTestSessionWithClient("123", client), nil + }), func(ctx context.Context, tx query.TxActor) error { + return tx.Exec(ctx, "") + }, tx.NewSettings(tx.WithSerializableReadWrite())) + require.NoError(t, err) + require.Zero(t, txInFlight) + }) + }) + t.Run("WithLazyCommit", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + ctrl := gomock.NewController(t) + txInFlight := 0 + err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) ( + Ydb_Query_V1.QueryService_ExecuteQueryClient, error, + ) { + require.True(t, request.GetTxControl().GetCommitTx()) + + if rand.Int31n(100) < 50 { + return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION)) + } + + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + if rand.Int31n(100) < 50 { + return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION)) + } + + txInFlight++ + + stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + txInFlight-- + + return nil, io.EOF + }) + + return &Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "456", + }, + ExecStats: &Ydb_TableStats.QueryStats{}, + }, nil + }) + + return stream, nil + }) + + return newTestSessionWithClient("123", client), nil + }), func(ctx context.Context, tx query.TxActor) error { + return tx.Exec(ctx, "", options.WithCommit()) + }, tx.NewSettings(tx.WithSerializableReadWrite())) + require.NoError(t, err) + require.Zero(t, txInFlight) + }) + }) + }) + t.Run("OnSecondExec", func(t *testing.T) { + t.Run("WithExplicitCommit", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + ctrl := gomock.NewController(t) + txInFlight := 0 + err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) ( + Ydb_Query_V1.QueryService_ExecuteQueryClient, error, + ) { + if rand.Int31n(100) < 50 { + return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION)) + } + + firstStream := NewMockQueryService_ExecuteQueryClient(ctrl) + firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + txInFlight++ + + firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) ( + Ydb_Query_V1.QueryService_ExecuteQueryClient, error, + ) { + if rand.Int31n(100) < 50 { + client.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, + request *Ydb_Query.RollbackTransactionRequest, + option ...grpc.CallOption, + ) (*Ydb_Query.RollbackTransactionResponse, error) { + txInFlight-- + + return &Ydb_Query.RollbackTransactionResponse{}, nil + }) + + return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION)) + } + + secondStream := NewMockQueryService_ExecuteQueryClient(ctrl) + secondStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + secondStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.CommitTransactionRequest, option ...grpc.CallOption) ( + *Ydb_Query.CommitTransactionResponse, error, + ) { + txInFlight-- + + return &Ydb_Query.CommitTransactionResponse{ + Status: Ydb.StatusIds_SUCCESS, + }, nil + }) + + return nil, io.EOF + }) + + return &Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{}, + ExecStats: &Ydb_TableStats.QueryStats{}, + }, nil + }) + + return secondStream, nil + }) + + return nil, io.EOF + }) + + return &Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "456", + }, + ExecStats: &Ydb_TableStats.QueryStats{}, + }, nil + }) + + return firstStream, nil + }) + + return newTestSessionWithClient("123", client), nil + }), func(ctx context.Context, tx query.TxActor) error { + if err := tx.Exec(ctx, ""); err != nil { + return err + } + + return tx.Exec(ctx, "") + }, tx.NewSettings(tx.WithSerializableReadWrite())) + require.NoError(t, err) + }) + }) + t.Run("WithLazyCommit", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + ctrl := gomock.NewController(t) + txInFlight := 0 + err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) ( + Ydb_Query_V1.QueryService_ExecuteQueryClient, error, + ) { + if rand.Int31n(100) < 50 { + return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION)) + } + + firstStream := NewMockQueryService_ExecuteQueryClient(ctrl) + firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + txInFlight++ + + firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) ( + Ydb_Query_V1.QueryService_ExecuteQueryClient, error, + ) { + require.True(t, request.GetTxControl().GetCommitTx()) + + if rand.Int31n(100) < 50 { + client.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, + request *Ydb_Query.RollbackTransactionRequest, + option ...grpc.CallOption, + ) (*Ydb_Query.RollbackTransactionResponse, error) { + txInFlight-- + + return &Ydb_Query.RollbackTransactionResponse{}, nil + }) + + return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION)) + } + + secondStream := NewMockQueryService_ExecuteQueryClient(ctrl) + secondStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + secondStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + return nil, io.EOF + }) + + return &Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{}, + ExecStats: &Ydb_TableStats.QueryStats{}, + }, nil + }) + + return secondStream, nil + }) + + return nil, io.EOF + }) + + return &Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "456", + }, + ExecStats: &Ydb_TableStats.QueryStats{}, + }, nil + }) + + return firstStream, nil + }) + + return newTestSessionWithClient("123", client), nil + }), func(ctx context.Context, tx query.TxActor) error { + if err := tx.Exec(ctx, ""); err != nil { + return err + } + + return tx.Exec(ctx, "", options.WithCommit()) + }, tx.NewSettings(tx.WithSerializableReadWrite())) + require.NoError(t, err) + }) + }) + }) + }) }) t.Run("Exec", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { @@ -1151,12 +1431,7 @@ type sessionControllerMock struct { } func (s *sessionControllerMock) IsAlive() bool { - switch s.status { - case session.StatusClosed, session.StatusClosing: - return false - default: - return true - } + return session.IsAlive(s.status) } func (s *sessionControllerMock) Close(ctx context.Context) error { @@ -1201,6 +1476,7 @@ func testPool( return pool.New[*Session, Session](ctx, pool.WithLimit[*Session, Session](1), pool.WithCreateItemFunc(createSession), + pool.WithSyncCloseItem[*Session, Session](), ) } diff --git a/internal/query/config/config.go b/internal/query/config/config.go index 7adb08242..befa08447 100644 --- a/internal/query/config/config.go +++ b/internal/query/config/config.go @@ -19,8 +19,9 @@ type Config struct { poolLimit int - sessionCreateTimeout time.Duration - sessionDeleteTimeout time.Duration + sessionCreateTimeout time.Duration + sessionDeleteTimeout time.Duration + sessionIddleTimeToLive time.Duration trace *trace.Query } @@ -68,3 +69,9 @@ func (c *Config) SessionCreateTimeout() time.Duration { func (c *Config) SessionDeleteTimeout() time.Duration { return c.sessionDeleteTimeout } + +// SessionIdleTimeToLive limits maximum time to live of idle session +// If idleTimeToLive is less than or equal to zero then sessions will not be closed by idle +func (c *Config) SessionIdleTimeToLive() time.Duration { + return c.sessionIddleTimeToLive +} diff --git a/internal/query/config/options.go b/internal/query/config/options.go index 2b30c5be2..01e040da1 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -55,3 +55,13 @@ func WithSessionDeleteTimeout(deleteTimeout time.Duration) Option { } } } + +// WithSessionIdleTimeToLive limits maximum time to live of idle session +// If idleTimeToLive is less than or equal to zero then sessions will not be closed by idle +func WithSessionIdleTimeToLive(idleTimeToLive time.Duration) Option { + return func(c *Config) { + if idleTimeToLive > 0 { + c.sessionIddleTimeToLive = idleTimeToLive + } + } +} diff --git a/internal/query/session/session.go b/internal/query/session/session.go index 142728953..54612b5d7 100644 --- a/internal/query/session/session.go +++ b/internal/query/session/session.go @@ -2,7 +2,7 @@ package session import ( "context" - "sync/atomic" + "fmt" "time" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" @@ -50,11 +50,26 @@ func (c *core) NodeID() uint32 { } func (c *core) statusCode() Status { - return Status(atomic.LoadUint32((*uint32)(&c.status))) + return c.status } func (c *core) SetStatus(status Status) { - atomic.StoreUint32((*uint32)(&c.status), uint32(status)) + switch c.status { + case statusUnknown: + c.status = status + case StatusIdle: + c.status = status + case StatusInUse: + c.status = status + case StatusClosing: + c.status = status + case StatusClosed: + c.status = status + case StatusError: + c.status = status + default: + panic(fmt.Sprintf("Unknown%d", c.status)) + } } func (c *core) Status() string { @@ -81,7 +96,16 @@ func WithTrace(t *trace.Query) Option { } } -func Open( //nolint:funlen +func IsAlive(status Status) bool { + switch status { + case StatusClosed, StatusClosing, StatusError: + return false + default: + return true + } +} + +func Open( ctx context.Context, client Ydb_Query_V1.QueryServiceClient, opts ...Option, ) (_ *core, finalErr error) { core := &core{ @@ -90,12 +114,7 @@ func Open( //nolint:funlen status: statusUnknown, checks: []func(s *core) bool{ func(s *core) bool { - switch s.statusCode() { - case StatusClosed, StatusClosing: - return false - default: - return true - } + return IsAlive(s.status) }, }, } diff --git a/internal/query/session/status.go b/internal/query/session/status.go index e8123dee0..5f7985626 100644 --- a/internal/query/session/status.go +++ b/internal/query/session/status.go @@ -12,6 +12,7 @@ const ( StatusInUse StatusClosing StatusClosed + StatusError ) func (s Status) String() string { @@ -26,6 +27,8 @@ func (s Status) String() string { return "Closing" case StatusClosed: return "Closed" + case StatusError: + return "Error" default: return fmt.Sprintf("Unknown%d", s) } diff --git a/internal/query/transaction.go b/internal/query/transaction.go index 50e10a403..4b43a5288 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -113,7 +113,7 @@ func (tx *Transaction) QueryResultSet( } if settings.TxControl().Commit { - if txID != nil { + if txID != nil && tx.Identifier != nil { return nil, xerrors.WithStackTrace(errUnexpectedTxIDOnCommitFlag) } tx.completed = true @@ -193,7 +193,7 @@ func (tx *Transaction) txControl() *queryTx.Control { func (tx *Transaction) ID() string { if tx.Identifier == nil { - return "LAZY_TX" + return LazyTxID } return tx.Identifier.ID() @@ -236,7 +236,7 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu } if settings.TxControl().Commit { - if txID != nil { + if txID != nil && tx.Identifier != nil { return xerrors.WithStackTrace(errUnexpectedTxIDOnCommitFlag) } tx.completed = true @@ -322,7 +322,7 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec } if settings.TxControl().Commit { - if txID != nil { + if txID != nil && tx.Identifier != nil { return nil, xerrors.WithStackTrace(errUnexpectedTxIDOnCommitFlag) } tx.completed = true @@ -350,17 +350,21 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi return nil } -func (tx *Transaction) CommitTx(ctx context.Context) (err error) { - defer func() { - tx.notifyOnCompleted(err) - tx.completed = true - }() - +func (tx *Transaction) CommitTx(ctx context.Context) (finalErr error) { if tx.Identifier == nil { return nil } - err = commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID()) + if tx.completed { + return nil + } + + defer func() { + tx.notifyOnCompleted(finalErr) + tx.completed = true + }() + + err := commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID()) if err != nil { if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) { tx.s.SetStatus(session.StatusClosed) @@ -384,11 +388,15 @@ func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi return nil } -func (tx *Transaction) Rollback(ctx context.Context) error { +func (tx *Transaction) Rollback(ctx context.Context) (finalErr error) { if tx.Identifier == nil { return nil } + if tx.completed { + return nil + } + tx.completed = true tx.notifyOnCompleted(ErrTransactionRollingBack) diff --git a/internal/table/client.go b/internal/table/client.go index 5ce8e08dc..d65958ca0 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -33,7 +33,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config }, pool: pool.New[*session, session](ctx, pool.WithLimit[*session, session](config.SizeLimit()), - pool.WithIdleThreshold[*session, session](config.IdleThreshold()), + pool.WithIdleTimeToLive[*session, session](config.IdleThreshold()), pool.WithCreateItemTimeout[*session, session](config.CreateSessionTimeout()), pool.WithCloseItemTimeout[*session, session](config.DeleteTimeout()), pool.WithClock[*session, session](config.Clock()), diff --git a/options.go b/options.go index 304d1439a..668d607a1 100644 --- a/options.go +++ b/options.go @@ -501,6 +501,16 @@ func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { } } +// WithSessionPoolSessionIdleTimeToLive limits maximum time to live of idle session +// If idleTimeToLive is less than or equal to zero then sessions will not be closed by idle +func WithSessionPoolSessionIdleTimeToLive(idleThreshold time.Duration) Option { + return func(ctx context.Context, c *Driver) error { + c.queryOptions = append(c.queryOptions, queryConfig.WithSessionIdleTimeToLive(idleThreshold)) + + return nil + } +} + // WithSessionPoolCreateSessionTimeout set timeout for new session creation process in table.Client func WithSessionPoolCreateSessionTimeout(createSessionTimeout time.Duration) Option { return func(ctx context.Context, c *Driver) error {