Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No commit on rollback #1

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import lombok.Getter;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.NormalExecutionWatcher;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
Expand All @@ -25,7 +24,6 @@ public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransact

private final long txId = txIdGenerator.incrementAndGet();
private final Stopwatch txStopwatch = Stopwatch.createStarted();
private final NormalExecutionWatcher normalExecutionWatcher = new NormalExecutionWatcher();
private final List<Runnable> pendingWrites = new ArrayList<>();

@Getter
Expand Down Expand Up @@ -75,10 +73,6 @@ private void commitImpl() {
pendingWrite.run();
}

if (normalExecutionWatcher.hasLastStatementCompletedExceptionally()) {
throw new IllegalStateException("Transaction should not be committed if the last statement finished exceptionally");
}

storage.commit(txId, getVersion(), watcher);
} catch (Exception e) {
storage.rollback(txId);
Expand Down Expand Up @@ -133,10 +127,8 @@ final <T extends Entity<T>> void doInWriteTransaction(
consumer.accept(shard);
});
if (options.isImmediateWrites()) {
normalExecutionWatcher.execute(() -> {
query.run();
transactionLocal.projectionCache().applyProjectionChanges(this);
});
query.run();
transactionLocal.projectionCache().applyProjectionChanges(this);
} else {
pendingWrites.add(query);
}
Expand All @@ -145,10 +137,10 @@ final <T extends Entity<T>> void doInWriteTransaction(
final <T extends Entity<T>, R> R doInTransaction(
String action, Class<T> type, Function<ReadOnlyTxDataShard<T>, R> func
) {
return normalExecutionWatcher.execute(() -> logTransaction(action, () -> {
return logTransaction(action, () -> {
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(type, txId, getVersion());
return func.apply(shard);
}));
});
}

private void logTransaction(String action, Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import lombok.Getter;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.NormalExecutionWatcher;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tech.ydb.yoj.repository.db;
package tech.ydb.yoj.repository.test.inmemory.legacy;

import java.util.function.Supplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.slf4j.LoggerFactory;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.NormalExecutionWatcher;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
Expand Down Expand Up @@ -75,7 +74,6 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final NormalExecutionWatcher exceptionWatcher = new NormalExecutionWatcher();
private final List<Stream<?>> openedStreams = new ArrayList<>();

@Getter
Expand Down Expand Up @@ -112,25 +110,27 @@ public <T extends Entity<T>> Table<T> table(Class<T> c) {

@Override
public void commit() {
if (exceptionWatcher.hasLastStatementCompletedExceptionally()) {
log.error("Commit operation is not expected after the last statement threw an exception");
}
try {
flushPendingWrites();
} catch (Throwable t) {
performRollbackAsCleanup();
rollback();
throw t;
}
endTransaction("commit", this::doCommit);
}

@Override
public void rollback() {
if (exceptionWatcher.hasLastStatementCompletedExceptionally() || options.isImmediateWrites()) {
performRollbackAsCleanup();
} else {
endTransaction("commit (for consistency check only)", this::doCommit);
}
Interrupts.runInCleanupMode(() -> {
try {
endTransaction("rollback", () -> {
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
validate("rollback", status.getCode(), status.toString());
});
} catch (Throwable t) {
log.info("Failed to rollback the transaction", t);
}
});
}

private void doCommit() {
Expand Down Expand Up @@ -205,19 +205,15 @@ private void endTransaction(String actionName, Runnable finalAction) {
}

private TxControl<?> getTxControl() {
switch (options.getIsolationLevel()) {
case SERIALIZABLE_READ_WRITE:
return switch (options.getIsolationLevel()) {
nvamelichev marked this conversation as resolved.
Show resolved Hide resolved
case SERIALIZABLE_READ_WRITE -> {
TxControl<?> txControl = (txId != null ? TxControl.id(txId) : TxControl.serializableRw());
return txControl.setCommitTx(false);
case ONLINE_CONSISTENT_READ_ONLY:
return TxControl.onlineRo().setAllowInconsistentReads(false);
case ONLINE_INCONSISTENT_READ_ONLY:
return TxControl.onlineRo().setAllowInconsistentReads(true);
case STALE_CONSISTENT_READ_ONLY:
return TxControl.staleRo();
default:
return null;
}
yield txControl.setCommitTx(false);
}
case ONLINE_CONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(false);
case ONLINE_INCONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(true);
case STALE_CONSISTENT_READ_ONLY -> TxControl.staleRo();
};
}

private String getYql(Statement<?, ?> statement) {
Expand All @@ -236,19 +232,6 @@ private void flushPendingWrites() {
.forEach(this::execute);
}

private void performRollbackAsCleanup() {
Interrupts.runInCleanupMode(() -> {
try {
endTransaction("rollback", () -> {
Status status = YdbOperations.safeJoin(session.rollbackTransaction(txId, new RollbackTxSettings()));
validate("rollback", status.getCode(), status.toString());
});
} catch (Throwable t) {
log.info("Failed to rollback the transaction", t);
}
});
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
List<RESULT> result = statement.readFromCache(params, cache);
Expand All @@ -259,22 +242,21 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
return result;
}

result = exceptionWatcher.execute(() -> {
List<RESULT> callResult = doCall(statement.toDebugString(params), () -> {
if (options.isScan()) {
if (options.getScanOptions().isUseNewSpliterator()) {
return doExecuteScanQueryList(statement, params);
} else {
return doExecuteScanQueryLegacy(statement, params);
}
result = doCall(statement.toDebugString(params), () -> {
if (options.isScan()) {
if (options.getScanOptions().isUseNewSpliterator()) {
return doExecuteScanQueryList(statement, params);
} else {
return doExecuteDataQuery(statement, params);
return doExecuteScanQueryLegacy(statement, params);
}
});
trace(statement, callResult);
return callResult;
} else {
return doExecuteDataQuery(statement, params);
}
});

trace(statement, result);
statement.storeToCache(params, result, cache);

return result;
}

Expand Down Expand Up @@ -398,10 +380,8 @@ public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value
}
YdbRepository.Query<PARAMS> query = new YdbRepository.Query<>(statement, value);
if (options.isImmediateWrites()) {
exceptionWatcher.execute(() -> {
execute(query);
transactionLocal.projectionCache().applyProjectionChanges(this);
});
execute(query);
transactionLocal.projectionCache().applyProjectionChanges(this);
} else {
pendingWrites.add(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,28 +340,24 @@ public void checkDBIsUnavailable() {
checkTxRetryableOnRequestError(StatusCode.UNAVAILABLE);
checkTxRetryableOnFlushingError(StatusCode.UNAVAILABLE);
checkTxNonRetryableOnCommit(StatusCode.UNAVAILABLE);
checkTxUnavailableOnNormalRollback(StatusCode.UNAVAILABLE);
}

@Test
public void checkDBIsOverloaded() {
checkTxRetryableOnRequestError(StatusCode.OVERLOADED);
checkTxRetryableOnFlushingError(StatusCode.OVERLOADED);
checkTxNonRetryableOnCommit(StatusCode.OVERLOADED);
checkTxUnavailableOnNormalRollback(StatusCode.OVERLOADED);
}

@Test
public void checkDBSessionBusy() {
checkTxRetryableOnRequestError(StatusCode.PRECONDITION_FAILED);
checkTxRetryableOnFlushingError(StatusCode.PRECONDITION_FAILED);
checkTxNonRetryableOnCommit(StatusCode.PRECONDITION_FAILED);
checkTxUnavailableOnNormalRollback(StatusCode.PRECONDITION_FAILED);

checkTxRetryableOnRequestError(StatusCode.SESSION_BUSY);
checkTxRetryableOnFlushingError(StatusCode.SESSION_BUSY);
checkTxNonRetryableOnCommit(StatusCode.SESSION_BUSY);
checkTxUnavailableOnNormalRollback(StatusCode.SESSION_BUSY);
}

@Test
Expand Down Expand Up @@ -848,18 +844,6 @@ private void checkTxNonRetryableOnCommit(StatusCode statusCode) {
);
}

private void checkTxUnavailableOnNormalRollback(StatusCode statusCode) {
RepositoryTransaction tx = repository.startTransaction();
tx.table(Project.class).findAll();

// This rollback is a checking consistency DB commit, since the last transaction statement finished normally.
runWithModifiedStatusCode(
statusCode,
() -> assertThatExceptionOfType(UnavailableException.class)
.isThrownBy(tx::rollback)
);
}

static StatusCode statusCode = null;

private void runWithModifiedStatusCode(StatusCode code, Runnable runnable) {
Expand Down
Loading