diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java index 99bbe4c1..2b394160 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java @@ -95,7 +95,9 @@ public synchronized void rollback(long txId) { } @Nullable - public synchronized T find(long txId, long version, Entity.Id id) { + public synchronized T find(long txId, long version, Entity.Id id, InMemoryTxLockWatcher watcher) { + checkLocks(version, watcher); + InMemoryEntityLine entityLine = entityLines.get(id); if (entityLine == null) { return null; @@ -105,7 +107,11 @@ public synchronized T find(long txId, long version, Entity.Id id) { } @Nullable - public synchronized V find(long txId, long version, Entity.Id id, Class viewType) { + public synchronized V find( + long txId, long version, Entity.Id id, Class viewType, InMemoryTxLockWatcher watcher + ) { + checkLocks(version, watcher); + InMemoryEntityLine entityLine = entityLines.get(id); if (entityLine == null) { return null; @@ -114,7 +120,9 @@ public synchronized V find(long txId, long version, Entit return columns != null ? columns.toSchema(ViewSchema.of(viewType)) : null; } - public synchronized List findAll(long txId, long version) { + public synchronized List findAll(long txId, long version, InMemoryTxLockWatcher watcher) { + checkLocks(version, watcher); + List entities = new ArrayList<>(); for (InMemoryEntityLine entityLine : entityLines.values()) { Columns columns = entityLine.get(txId, version); diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java index 77e908ae..89e15003 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java @@ -11,6 +11,7 @@ import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException; +import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import java.util.ArrayList; import java.util.List; @@ -34,8 +35,10 @@ public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransact private final InMemoryTxLockWatcher watcher; private final InMemoryStorage storage; + private boolean hasWrites = false; private Long version = null; private String closeAction = null; // used to detect of usage transaction after commit()/rollback() + private boolean isBadSession = false; public InMemoryRepositoryTransaction(TxOptions options, InMemoryRepository repository) { this.storage = repository.getStorage(); @@ -62,6 +65,9 @@ public final > InMemoryTable.DbMemory getMemory(Class @Override public void commit() { + if (isBadSession) { + throw new IllegalStateException("Transaction was invalidated. Commit isn't possible"); + } endTransaction("commit()", this::commitImpl); } @@ -125,6 +131,8 @@ final > void doInWriteTransaction( Runnable query = () -> logTransaction(log, () -> { WriteTxDataShard shard = storage.getWriteTxDataShard(type, txId, getVersion()); consumer.accept(shard); + + hasWrites = true; }); if (options.isImmediateWrites()) { query.run(); @@ -138,8 +146,14 @@ final , R> R doInTransaction( String action, Class type, Function, R> func ) { return logTransaction(action, () -> { - ReadOnlyTxDataShard shard = storage.getReadOnlyTxDataShard(type, txId, getVersion()); - return func.apply(shard); + InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS; + ReadOnlyTxDataShard shard = storage.getReadOnlyTxDataShard(type, txId, getVersion(), findWatcher); + try { + return func.apply(shard); + } catch (OptimisticLockException e) { + isBadSession = true; + throw e; + } }); } diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java index 94e02d2a..353ce90c 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java @@ -65,22 +65,24 @@ public synchronized > WriteTxDataShard getWriteTxDataShar Class type, long txId, long version ) { uncommited.computeIfAbsent(txId, __ -> new HashSet<>()).add(type); - return getTxDataShard(type, txId, version); + return getTxDataShard(type, txId, version, InMemoryTxLockWatcher.NO_LOCKS); } public synchronized > ReadOnlyTxDataShard getReadOnlyTxDataShard( - Class type, long txId, long version + Class type, long txId, long version, InMemoryTxLockWatcher watcher ) { - return getTxDataShard(type, txId, version); + return getTxDataShard(type, txId, version, watcher); } - private > TxDataShardImpl getTxDataShard(Class type, long txId, long version) { + private > TxDataShardImpl getTxDataShard( + Class type, long txId, long version, InMemoryTxLockWatcher watcher + ) { @SuppressWarnings("unchecked") InMemoryDataShard shard = (InMemoryDataShard) shards.get(type); if (shard == null) { throw new InMemoryRepositoryException("Table is not created: " + type.getSimpleName()); } - return new TxDataShardImpl<>(shard, txId, version); + return new TxDataShardImpl<>(shard, txId, version, watcher); } public synchronized void dropDb() { diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java index da266895..6d7d7b87 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java @@ -1,5 +1,7 @@ package tech.ydb.yoj.repository.test.inmemory; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.EntitySchema; import tech.ydb.yoj.repository.db.Range; @@ -11,9 +13,16 @@ import java.util.Map; import java.util.Set; +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) public final class InMemoryTxLockWatcher { - private final Map, Set>> readRows = new HashMap<>(); - private final Map, List>> readRanges = new HashMap<>(); + public static final InMemoryTxLockWatcher NO_LOCKS = new InMemoryTxLockWatcher(Map.of(), Map.of()); + + private final Map, Set>> readRows; + private final Map, List>> readRanges; + + public InMemoryTxLockWatcher() { + this(new HashMap<>(), new HashMap<>()); + } public > void markRowRead(Class type, Entity.Id id) { readRows.computeIfAbsent(type, __ -> new HashSet<>()).add(id); diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java index 6aa0110a..d2a649f2 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java @@ -10,28 +10,30 @@ final class TxDataShardImpl> implements ReadOnlyTxDataShard< private final InMemoryDataShard shard; private final long txId; private final long version; + private final InMemoryTxLockWatcher watcher; - public TxDataShardImpl(InMemoryDataShard shard, long txId, long version) { + public TxDataShardImpl(InMemoryDataShard shard, long txId, long version, InMemoryTxLockWatcher watcher) { this.shard = shard; this.txId = txId; this.version = version; + this.watcher = watcher; } @Nullable @Override public T find(Entity.Id id) { - return shard.find(txId, version, id); + return shard.find(txId, version, id, watcher); } @Nullable @Override public V find(Entity.Id id, Class viewType) { - return shard.find(txId, version, id, viewType); + return shard.find(txId, version, id, viewType, watcher); } @Override public List findAll() { - return shard.findAll(txId, version); + return shard.findAll(txId, version, watcher); } @Override diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java index 341e3bb4..103bc8aa 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java @@ -495,6 +495,88 @@ public void readTableViews() { .isThrownBy(() -> db.tx(() -> db.typeFreaks().readTableIds(ReadTableParams.getDefault()).count())); } + @Test + public void doNotCommitAfterTLI() { + Project.Id id1 = new Project.Id("id1"); + Project.Id id2 = new Project.Id("id2"); + + RepositoryTransaction tx = repository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withImmediateWrites(true) + .withFirstLevelCache(false) + ); + + tx.table(Project.class).find(id2); + + db.tx(() -> db.projects().save(new Project(id2, "name2"))); + + tx.table(Project.class).save(new Project(id1, "name1")); // make tx available for TLI + + assertThatExceptionOfType(OptimisticLockException.class) + .isThrownBy(() -> tx.table(Project.class).find(id2)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(tx::commit); + + tx.rollback(); // YOJ-tx rollback is possible. session.rollbackCommit() won't execute + } + + @Test + public void writeDontProduceTLI() { + Project.Id id = new Project.Id("id"); + + db.tx(() -> db.projects().save(new Project(id, "name"))); + + RepositoryTransaction tx = repository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withImmediateWrites(true) + .withFirstLevelCache(false) + ); + + tx.table(Project.class).find(id); + + db.tx(() -> { + db.projects().find(id); + db.projects().save(new Project(id, "name2")); + }); + + // write don't produce TLI + tx.table(Project.class).save(new Project(id, "name3")); + + assertThatExceptionOfType(OptimisticLockException.class) + .isThrownBy(tx::commit); + } + + @Test + public void consistencyCheckAllColumnsOnFind() { + Project.Id id1 = new Project.Id("id1"); + Project.Id id2 = new Project.Id("id2"); + + db.tx(() -> { + db.projects().save(new Project(id1, "name")); + db.projects().save(new Project(id2, "name")); + }); + + RepositoryTransaction tx = repository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withImmediateWrites(true) + .withFirstLevelCache(false) + ); + + tx.table(Project.class).save(new Project(new Project.Id("id3"), "name")); // make tx available for TLI + + tx.table(Project.class).find(id1); + tx.table(Project.class).find(id2); + + db.tx(() -> { + db.projects().find(id2); + db.projects().save(new Project(id2, "name2")); + }); + + assertThatExceptionOfType(OptimisticLockException.class) + .isThrownBy(() -> tx.table(Project.class).find(id1)); + } + @Test public void streamAllWithPartitioning() { db.tx(() -> { diff --git a/repository-ydb-v1/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java b/repository-ydb-v1/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java index b6554507..c68b7bcb 100644 --- a/repository-ydb-v1/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java +++ b/repository-ydb-v1/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java @@ -31,15 +31,12 @@ import tech.ydb.yoj.databind.schema.Column; import tech.ydb.yoj.databind.schema.ObjectSchema; import tech.ydb.yoj.repository.db.EntitySchema; -import tech.ydb.yoj.repository.db.IsolationLevel; import tech.ydb.yoj.repository.db.Repository; import tech.ydb.yoj.repository.db.RepositoryTransaction; import tech.ydb.yoj.repository.db.Tx; -import tech.ydb.yoj.repository.db.TxOptions; import tech.ydb.yoj.repository.db.bulk.BulkParams; import tech.ydb.yoj.repository.db.exception.ConversionException; import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; -import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.RetryableException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.list.ListRequest; @@ -866,32 +863,6 @@ public void schemaWithHint() { repository.schema(HintAutoPartitioningByLoad.class).create(); } - @Test - public void doNotCommitAfterTLI() { - Project.Id id1 = new Project.Id("id1"); - Project.Id id2 = new Project.Id("id2"); - - RepositoryTransaction tx = repository.startTransaction( - TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) - .withImmediateWrites(true) - .withFirstLevelCache(false) - ); - - tx.table(Project.class).find(id2); - - db.tx(() -> db.projects().save(new Project(id2, "name2"))); - - tx.table(Project.class).save(new Project(id1, "name1")); // make tx available for TLI - - assertThatExceptionOfType(OptimisticLockException.class) - .isThrownBy(() -> tx.table(Project.class).find(id2)); - - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(tx::commit); - - tx.rollback(); // YOJ-tx rollback is possible. session.rollbackCommit() won't execute - } - @AllArgsConstructor private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase { @Delegate diff --git a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java index 7ce1e70e..357026c9 100644 --- a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java +++ b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java @@ -33,15 +33,12 @@ import tech.ydb.yoj.databind.schema.Column; import tech.ydb.yoj.databind.schema.ObjectSchema; import tech.ydb.yoj.repository.db.EntitySchema; -import tech.ydb.yoj.repository.db.IsolationLevel; import tech.ydb.yoj.repository.db.Repository; import tech.ydb.yoj.repository.db.RepositoryTransaction; import tech.ydb.yoj.repository.db.Tx; -import tech.ydb.yoj.repository.db.TxOptions; import tech.ydb.yoj.repository.db.bulk.BulkParams; import tech.ydb.yoj.repository.db.exception.ConversionException; import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; -import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.RetryableException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.list.ListRequest; @@ -867,32 +864,6 @@ public void schemaWithHint() { repository.schema(HintAutoPartitioningByLoad.class).create(); } - @Test - public void doNotCommitAfterTLI() { - Project.Id id1 = new Project.Id("id1"); - Project.Id id2 = new Project.Id("id2"); - - RepositoryTransaction tx = repository.startTransaction( - TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) - .withImmediateWrites(true) - .withFirstLevelCache(false) - ); - - tx.table(Project.class).find(id2); - - db.tx(() -> db.projects().save(new Project(id2, "name2"))); - - tx.table(Project.class).save(new Project(id1, "name1")); // make tx available for TLI - - assertThatExceptionOfType(OptimisticLockException.class) - .isThrownBy(() -> tx.table(Project.class).find(id2)); - - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(tx::commit); - - tx.rollback(); // YOJ-tx rollback is possible. session.rollbackCommit() won't execute - } - @AllArgsConstructor private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase { @Delegate