Skip to content

Commit

Permalink
no-commit-on-rollback: no commit on rollback
Browse files Browse the repository at this point in the history
Alexander Lavrukov authored and nvamelichev committed Dec 21, 2023
1 parent aeda533 commit 1e6697f
Showing 7 changed files with 69 additions and 150 deletions.
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@
import lombok.Getter;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.NormalExecutionWatcher;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
@@ -25,7 +24,6 @@ public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransact

private final long txId = txIdGenerator.incrementAndGet();
private final Stopwatch txStopwatch = Stopwatch.createStarted();
private final NormalExecutionWatcher normalExecutionWatcher = new NormalExecutionWatcher();
private final List<Runnable> pendingWrites = new ArrayList<>();

@Getter
@@ -75,10 +73,6 @@ private void commitImpl() {
pendingWrite.run();
}

if (normalExecutionWatcher.hasLastStatementCompletedExceptionally()) {
throw new IllegalStateException("Transaction should not be committed if the last statement finished exceptionally");
}

storage.commit(txId, getVersion(), watcher);
} catch (Exception e) {
storage.rollback(txId);
@@ -133,10 +127,8 @@ final <T extends Entity<T>> void doInWriteTransaction(
consumer.accept(shard);
});
if (options.isImmediateWrites()) {
normalExecutionWatcher.execute(() -> {
query.run();
transactionLocal.projectionCache().applyProjectionChanges(this);
});
query.run();
transactionLocal.projectionCache().applyProjectionChanges(this);
} else {
pendingWrites.add(query);
}
@@ -145,10 +137,10 @@ final <T extends Entity<T>> void doInWriteTransaction(
final <T extends Entity<T>, R> R doInTransaction(
String action, Class<T> type, Function<ReadOnlyTxDataShard<T>, R> func
) {
return normalExecutionWatcher.execute(() -> logTransaction(action, () -> {
return logTransaction(action, () -> {
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(type, txId, getVersion());
return func.apply(shard);
}));
});
}

private void logTransaction(String action, Runnable runnable) {
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
import lombok.Getter;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.NormalExecutionWatcher;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tech.ydb.yoj.repository.db;
package tech.ydb.yoj.repository.test.inmemory.legacy;

import java.util.function.Supplier;

Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
import org.slf4j.LoggerFactory;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.NormalExecutionWatcher;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
@@ -75,7 +74,6 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final NormalExecutionWatcher exceptionWatcher = new NormalExecutionWatcher();
private final List<Stream<?>> openedStreams = new ArrayList<>();

@Getter
@@ -112,25 +110,27 @@ public <T extends Entity<T>> Table<T> table(Class<T> c) {

@Override
public void commit() {
if (exceptionWatcher.hasLastStatementCompletedExceptionally()) {
log.error("Commit operation is not expected after the last statement threw an exception");
}
try {
flushPendingWrites();
} catch (Throwable t) {
performRollbackAsCleanup();
rollback();
throw t;
}
endTransaction("commit", this::doCommit);
}

@Override
public void rollback() {
if (exceptionWatcher.hasLastStatementCompletedExceptionally() || options.isImmediateWrites()) {
performRollbackAsCleanup();
} else {
endTransaction("commit (for consistency check only)", this::doCommit);
}
Interrupts.runInCleanupMode(() -> {
try {
endTransaction("rollback", () -> {
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
validate("rollback", status.getCode(), status.toString());
});
} catch (Throwable t) {
log.info("Failed to rollback the transaction", t);
}
});
}

private void doCommit() {
@@ -205,19 +205,15 @@ private void endTransaction(String actionName, Runnable finalAction) {
}

private TxControl<?> getTxControl() {
switch (options.getIsolationLevel()) {
case SERIALIZABLE_READ_WRITE:
return switch (options.getIsolationLevel()) {
case SERIALIZABLE_READ_WRITE -> {
TxControl<?> txControl = (txId != null ? TxControl.id(txId) : TxControl.serializableRw());
return txControl.setCommitTx(false);
case ONLINE_CONSISTENT_READ_ONLY:
return TxControl.onlineRo().setAllowInconsistentReads(false);
case ONLINE_INCONSISTENT_READ_ONLY:
return TxControl.onlineRo().setAllowInconsistentReads(true);
case STALE_CONSISTENT_READ_ONLY:
return TxControl.staleRo();
default:
return null;
}
yield txControl.setCommitTx(false);
}
case ONLINE_CONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(false);
case ONLINE_INCONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(true);
case STALE_CONSISTENT_READ_ONLY -> TxControl.staleRo();
};
}

private String getYql(Statement<?, ?> statement) {
@@ -236,19 +232,6 @@ private void flushPendingWrites() {
.forEach(this::execute);
}

private void performRollbackAsCleanup() {
Interrupts.runInCleanupMode(() -> {
try {
endTransaction("rollback", () -> {
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
validate("rollback", status.getCode(), status.toString());
});
} catch (Throwable t) {
log.info("Failed to rollback the transaction", t);
}
});
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
List<RESULT> result = statement.readFromCache(params, cache);
@@ -259,22 +242,21 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
return result;
}

result = exceptionWatcher.execute(() -> {
List<RESULT> callResult = doCall(statement.toDebugString(params), () -> {
if (options.isScan()) {
if (options.getScanOptions().isUseNewSpliterator()) {
return doExecuteScanQueryList(statement, params);
} else {
return doExecuteScanQueryLegacy(statement, params);
}
result = doCall(statement.toDebugString(params), () -> {
if (options.isScan()) {
if (options.getScanOptions().isUseNewSpliterator()) {
return doExecuteScanQueryList(statement, params);
} else {
return doExecuteDataQuery(statement, params);
return doExecuteScanQueryLegacy(statement, params);
}
});
trace(statement, callResult);
return callResult;
} else {
return doExecuteDataQuery(statement, params);
}
});

trace(statement, result);
statement.storeToCache(params, result, cache);

return result;
}

@@ -398,10 +380,8 @@ public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value
}
YdbRepository.Query<PARAMS> query = new YdbRepository.Query<>(statement, value);
if (options.isImmediateWrites()) {
exceptionWatcher.execute(() -> {
execute(query);
transactionLocal.projectionCache().applyProjectionChanges(this);
});
execute(query);
transactionLocal.projectionCache().applyProjectionChanges(this);
} else {
pendingWrites.add(query);
}
Original file line number Diff line number Diff line change
@@ -340,28 +340,24 @@ public void checkDBIsUnavailable() {
checkTxRetryableOnRequestError(StatusCode.UNAVAILABLE);
checkTxRetryableOnFlushingError(StatusCode.UNAVAILABLE);
checkTxNonRetryableOnCommit(StatusCode.UNAVAILABLE);
checkTxUnavailableOnNormalRollback(StatusCode.UNAVAILABLE);
}

@Test
public void checkDBIsOverloaded() {
checkTxRetryableOnRequestError(StatusCode.OVERLOADED);
checkTxRetryableOnFlushingError(StatusCode.OVERLOADED);
checkTxNonRetryableOnCommit(StatusCode.OVERLOADED);
checkTxUnavailableOnNormalRollback(StatusCode.OVERLOADED);
}

@Test
public void checkDBSessionBusy() {
checkTxRetryableOnRequestError(StatusCode.PRECONDITION_FAILED);
checkTxRetryableOnFlushingError(StatusCode.PRECONDITION_FAILED);
checkTxNonRetryableOnCommit(StatusCode.PRECONDITION_FAILED);
checkTxUnavailableOnNormalRollback(StatusCode.PRECONDITION_FAILED);

checkTxRetryableOnRequestError(StatusCode.SESSION_BUSY);
checkTxRetryableOnFlushingError(StatusCode.SESSION_BUSY);
checkTxNonRetryableOnCommit(StatusCode.SESSION_BUSY);
checkTxUnavailableOnNormalRollback(StatusCode.SESSION_BUSY);
}

@Test
@@ -848,18 +844,6 @@ private void checkTxNonRetryableOnCommit(StatusCode statusCode) {
);
}

private void checkTxUnavailableOnNormalRollback(StatusCode statusCode) {
RepositoryTransaction tx = repository.startTransaction();
tx.table(Project.class).findAll();

// This rollback is a checking consistency DB commit, since the last transaction statement finished normally.
runWithModifiedStatusCode(
statusCode,
() -> assertThatExceptionOfType(UnavailableException.class)
.isThrownBy(tx::rollback)
);
}

static StatusCode statusCode = null;

private void runWithModifiedStatusCode(StatusCode code, Runnable runnable) {
Original file line number Diff line number Diff line change
@@ -28,7 +28,6 @@
import tech.ydb.table.values.Value;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.NormalExecutionWatcher;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
@@ -76,7 +75,6 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final NormalExecutionWatcher exceptionWatcher = new NormalExecutionWatcher();
private final List<Stream<?>> openedStreams = new ArrayList<>();

@Getter
@@ -113,25 +111,27 @@ public <T extends Entity<T>> Table<T> table(Class<T> c) {

@Override
public void commit() {
if (exceptionWatcher.hasLastStatementCompletedExceptionally()) {
log.error("Commit operation is not expected after the last statement threw an exception");
}
try {
flushPendingWrites();
} catch (Throwable t) {
performRollbackAsCleanup();
rollback();
throw t;
}
endTransaction("commit", this::doCommit);
}

@Override
public void rollback() {
if (exceptionWatcher.hasLastStatementCompletedExceptionally() || options.isImmediateWrites()) {
performRollbackAsCleanup();
} else {
endTransaction("commit (for consistency check only)", this::doCommit);
}
Interrupts.runInCleanupMode(() -> {
try {
endTransaction("rollback", () -> {
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
validate("rollback", status.getCode(), status.toString());
});
} catch (Throwable t) {
log.info("Failed to rollback the transaction", t);
}
});
}

private void doCommit() {
@@ -206,19 +206,15 @@ private void endTransaction(String actionName, Runnable finalAction) {
}

private TxControl<?> getTxControl() {
switch (options.getIsolationLevel()) {
case SERIALIZABLE_READ_WRITE:
return switch (options.getIsolationLevel()) {
case SERIALIZABLE_READ_WRITE -> {
TxControl<?> txControl = (txId != null ? TxControl.id(txId) : TxControl.serializableRw());
return txControl.setCommitTx(false);
case ONLINE_CONSISTENT_READ_ONLY:
return TxControl.onlineRo().setAllowInconsistentReads(false);
case ONLINE_INCONSISTENT_READ_ONLY:
return TxControl.onlineRo().setAllowInconsistentReads(true);
case STALE_CONSISTENT_READ_ONLY:
return TxControl.staleRo();
default:
return null;
}
yield txControl.setCommitTx(false);
}
case ONLINE_CONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(false);
case ONLINE_INCONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(true);
case STALE_CONSISTENT_READ_ONLY -> TxControl.staleRo();
};
}

private String getYql(Statement<?, ?> statement) {
@@ -237,19 +233,6 @@ private void flushPendingWrites() {
.forEach(this::execute);
}

private void performRollbackAsCleanup() {
Interrupts.runInCleanupMode(() -> {
try {
endTransaction("rollback", () -> {
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
validate("rollback", status.getCode(), status.toString());
});
} catch (Throwable t) {
log.info("Failed to rollback the transaction", t);
}
});
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
List<RESULT> result = statement.readFromCache(params, cache);
@@ -260,22 +243,21 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
return result;
}

result = exceptionWatcher.execute(() -> {
List<RESULT> callResult = doCall(statement.toDebugString(params), () -> {
if (options.isScan()) {
if (options.getScanOptions().isUseNewSpliterator()) {
return doExecuteScanQueryList(statement, params);
} else {
return doExecuteScanQueryLegacy(statement, params);
}
result = doCall(statement.toDebugString(params), () -> {
if (options.isScan()) {
if (options.getScanOptions().isUseNewSpliterator()) {
return doExecuteScanQueryList(statement, params);
} else {
return doExecuteDataQuery(statement, params);
return doExecuteScanQueryLegacy(statement, params);
}
});
trace(statement, callResult);
return callResult;
} else {
return doExecuteDataQuery(statement, params);
}
});

trace(statement, result);
statement.storeToCache(params, result, cache);

return result;
}

@@ -398,10 +380,8 @@ public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value
}
YdbRepository.Query<PARAMS> query = new YdbRepository.Query<>(statement, value);
if (options.isImmediateWrites()) {
exceptionWatcher.execute(() -> {
execute(query);
transactionLocal.projectionCache().applyProjectionChanges(this);
});
execute(query);
transactionLocal.projectionCache().applyProjectionChanges(this);
} else {
pendingWrites.add(query);
}
Original file line number Diff line number Diff line change
@@ -341,28 +341,24 @@ public void checkDBIsUnavailable() {
checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE);
checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE);
checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE);
checkTxUnavailableOnNormalRollback(StatusCodesProtos.StatusIds.StatusCode.UNAVAILABLE);
}

@Test
public void checkDBIsOverloaded() {
checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.OVERLOADED);
checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.OVERLOADED);
checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.OVERLOADED);
checkTxUnavailableOnNormalRollback(StatusCodesProtos.StatusIds.StatusCode.OVERLOADED);
}

@Test
public void checkDBSessionBusy() {
checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED);
checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED);
checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED);
checkTxUnavailableOnNormalRollback(StatusCodesProtos.StatusIds.StatusCode.PRECONDITION_FAILED);

checkTxRetryableOnRequestError(StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY);
checkTxRetryableOnFlushingError(StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY);
checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY);
checkTxUnavailableOnNormalRollback(StatusCodesProtos.StatusIds.StatusCode.SESSION_BUSY);
}

@Test
@@ -849,18 +845,6 @@ private void checkTxNonRetryableOnCommit(StatusCodesProtos.StatusIds.StatusCode
);
}

private void checkTxUnavailableOnNormalRollback(StatusCodesProtos.StatusIds.StatusCode statusCode) {
RepositoryTransaction tx = repository.startTransaction();
tx.table(Project.class).findAll();

// This rollback is a checking consistency DB commit, since the last transaction statement finished normally.
runWithModifiedStatusCode(
statusCode,
() -> assertThatExceptionOfType(UnavailableException.class)
.isThrownBy(tx::rollback)
);
}

static StatusCodesProtos.StatusIds.StatusCode statusCode = null;

private void runWithModifiedStatusCode(StatusCodesProtos.StatusIds.StatusCode code, Runnable runnable) {

0 comments on commit 1e6697f

Please sign in to comment.