Skip to content

Commit

Permalink
better-session-handling: SessionControl
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Dec 26, 2023
1 parent 98e3ce8 commit cbbcd74
Showing 1 changed file with 79 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.client.ResultSetConverter;
import tech.ydb.yoj.repository.ydb.client.SessionManager;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
import tech.ydb.yoj.repository.ydb.client.YdbValidator;
import tech.ydb.yoj.repository.ydb.exception.BadSessionException;
Expand Down Expand Up @@ -84,18 +85,16 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
@Getter
private final TransactionLocal transactionLocal;
private final RepositoryCache cache;
private final SessionControl session;

protected final REPO repo;

private Session session = null;
private Stopwatch sessionSw;
protected String txId = null;
private String firstNonNullTxId = null; // used for logs
private String closeAction = null; // used to detect of usage transaction after commit()/rollback()
private boolean isBadSession = false;

public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.repo = repo;
this.session = new SessionControl(repo.getSessionManager());
this.options = options;
this.transactionLocal = new TransactionLocal(options);
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
Expand Down Expand Up @@ -128,7 +127,7 @@ public void rollback() {
Interrupts.runInCleanupMode(() -> {
try {
endTransaction("rollback", () -> {
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
Status status = YdbOperations.safeJoin(session.get().rollbackTransaction(txId, new RollbackTxSettings()));
validate("rollback", status.getCode(), status.toString());
});
} catch (Throwable t) {
Expand All @@ -139,7 +138,7 @@ public void rollback() {

private void doCommit() {
try {
Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings()));
Status status = YdbOperations.safeJoin(session.get().commitTransaction(txId, new CommitTxSettings()));
validatePkConstraint(status.getIssues());
validate("commit", status.getCode(), status.toString());
} catch (YdbComponentUnavailableException | YdbOverloadedException e) {
Expand Down Expand Up @@ -171,13 +170,13 @@ private void validate(String request, StatusCode statusCode, String response) {
YdbValidator.validate(request, statusCode, response);
} catch (BadSessionException | OptimisticLockException e) {
transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName());
isBadSession = true;
session.release();
throw e;
}
}

private boolean isFinalActionNeeded(String actionName) {
if (session == null || isBadSession) {
if (!session.isActive()) {
transactionLocal.log().info("No-op %s: no active DB session", actionName);
return false;
}
Expand Down Expand Up @@ -208,12 +207,11 @@ private void endTransaction(String actionName, Runnable finalAction) {
} catch (Exception e) {
throw new UnexpectedException("Could not " + actionName + " " + txId, e);
} finally {
closeAction = actionName;
if (session != null) {
transactionLocal.log().info("[[%s]] TOTAL (txId=%s,sessionId=%s)", sessionSw, firstNonNullTxId, session.getId());
// NB: We use getSessionManager() method to allow mocking YdbRepository
repo.getSessionManager().release(session);
session = null;
if (session.isActive()) {
transactionLocal.log().info("[[%s]] TOTAL (txId=%s,sessionId=%s)",
session.getStopwatch(), firstNonNullTxId, session.get().getId()
);
session.release();
}
}
}
Expand Down Expand Up @@ -297,7 +295,9 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
// todo
// settings.setTraceId();

Result<DataQueryResult> result = YdbOperations.safeJoin(session.executeDataQuery(yql, txControl, sdkParams, settings));
Result<DataQueryResult> result = YdbOperations.safeJoin(
session.initAndGet().executeDataQuery(yql, txControl, sdkParams, settings)
);

if (result.isSuccess()) {
txId = emptyToNull(result.getValue().getTxId());
Expand Down Expand Up @@ -332,7 +332,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS,
Params sdkParams = getSdkParams(statement, params);

List<RESULT> result = new ArrayList<>();
Status status = YdbOperations.safeJoin(session.executeScanQuery(yql, sdkParams, settings, rs -> {
Status status = YdbOperations.safeJoin(session.initAndGet().executeScanQuery(yql, sdkParams, settings, rs -> {
if (result.size() + rs.getRowCount() > options.getScanOptions().getMaxSize()) {
throw new ResultTruncatedException(
format("Query result size became greater than %d", options.getScanOptions().getMaxSize()),
Expand Down Expand Up @@ -374,8 +374,7 @@ private <PARAMS, RESULT> Stream<RESULT> doExecuteScanQuery(Statement<PARAMS, RES

YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>("scanQuery: " + yql, false);

initSession();
session.executeScanQuery(
session.initAndGet().executeScanQuery(
yql, sdkParams, settings,
rs -> new ResultSetConverter(rs).stream(statement::readResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);
Expand Down Expand Up @@ -432,7 +431,7 @@ public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams pa

try {
Status status = YdbOperations.safeJoin(
session.executeBulkUpsert(
session.initAndGet().executeBulkUpsert(
tableName,
ListValue.of(values),
settings
Expand Down Expand Up @@ -474,8 +473,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
if (params.isUseNewSpliterator()) {
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>("readTable: " + tableName, params.isOrdered());

initSession();
session.readTable(
session.initAndGet().readTable(
tableName, settings.build(),
resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)
).whenComplete(spliterator::onSupplierThreadComplete);
Expand All @@ -487,7 +485,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
doCall("read table " + mapper.getTableName(""), () -> {
Status status = YdbOperations.safeJoin(
session.readTable(
session.initAndGet().readTable(
tableName,
settings.build(),
rs -> new ResultSetConverter(rs).stream(mapper::mapResult).forEach(action)
Expand All @@ -512,20 +510,7 @@ private void doCall(String actionStr, Runnable call) {
});
}

private void initSession() {
if (closeAction != null) {
throw new IllegalStateException("Transaction already closed by " + closeAction);
}
if (session == null) {
// NB: We use getSessionManager() method to allow mocking YdbRepository
session = repo.getSessionManager().getSession();
sessionSw = Stopwatch.createStarted();
}
}

private <R> R doCall(String actionStr, Supplier<R> call) {
initSession();

Stopwatch sw = Stopwatch.createStarted();
String resultStr = "";
try {
Expand Down Expand Up @@ -553,8 +538,66 @@ private void trace(@NonNull Statement<?, ?> statement, Object results) {
log.trace("{}", new Object() {
@Override
public String toString() {
return format("[txId=%s,sessionId=%s] %s%s", firstNonNullTxId, session.getId(), statement, debugResult(results));
return format("[txId=%s,sessionId=%s] %s%s", firstNonNullTxId, session.get().getId(), statement, debugResult(results));
}
});
}

private static final class SessionControl {
private final SessionManager sessionManager;

private State state = State.EMPTY;
private Session session;
private Stopwatch stopwatch;

public SessionControl(SessionManager sessionManager) {
this.sessionManager = sessionManager;
}

public boolean isActive() {
return state == State.ACTIVE;
}

public Stopwatch getStopwatch() {
if (state == State.EMPTY) {
throw new IllegalStateException("Can't get session: session wasn't activated");
}
return stopwatch;
}

public Session get() {
if (state != State.ACTIVE) {
throw new IllegalStateException("Can't get session: session isn't activated");
}

return session;
}

public Session initAndGet() {
if (state == State.EMPTY) {
// NB: We use getSessionManager() method to allow mocking YdbRepository
session = sessionManager.getSession();
stopwatch = Stopwatch.createStarted();
state = State.ACTIVE;
}

return get();
}

public void release() {
if (state != State.ACTIVE) {
throw new IllegalStateException("Can't release session: session isn't activated");
}
sessionManager.release(session);
session = null;
stopwatch.stop();
state = State.RELEASED;
}

private enum State {
EMPTY,
ACTIVE,
RELEASED
}
}
}

0 comments on commit cbbcd74

Please sign in to comment.