Skip to content

Commit

Permalink
Backport to branch(3) : Add an extension point to collect uncommitted…
Browse files Browse the repository at this point in the history
… read-set and write-set (#2287)

Co-authored-by: Mitsunori Komatsu <[email protected]>
  • Loading branch information
feeblefakie and komamitsu authored Oct 18, 2024
1 parent 29e5c18 commit e002cde
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 11 deletions.
6 changes: 6 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,12 @@ public enum CoreError implements ScalarDbError {
Category.INTERNAL_ERROR, "0044", "The Upsert operation failed. Details: %s", "", ""),
JDBC_TRANSACTION_UPDATE_OPERATION_FAILED(
Category.INTERNAL_ERROR, "0045", "The Update operation failed. Details: %s", "", ""),
HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED(
Category.INTERNAL_ERROR,
"0046",
"Handling the before-preparation snapshot hook failed. Details: %s",
"",
""),

//
// Errors for the unknown transaction status error category
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.scalar.db.transaction.consensuscommit;

import java.util.concurrent.Future;

public interface BeforePreparationSnapshotHook {
Future<Void> handle(
TransactionTableMetadataManager transactionTableMetadataManager,
Snapshot.ReadWriteSets readWriteSets);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.LazyInit;
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TransactionState;
import com.scalar.db.common.error.CoreError;
Expand All @@ -22,6 +23,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,6 +37,8 @@ public class CommitHandler {
private final TransactionTableMetadataManager tableMetadataManager;
private final ParallelExecutor parallelExecutor;

@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public CommitHandler(
DistributedStorage storage,
Expand All @@ -50,7 +55,52 @@ protected void onPrepareFailure(Snapshot snapshot) {}

protected void onValidateFailure(Snapshot snapshot) {}

private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snapshot)
throws UnknownTransactionStatusException, CommitException {
if (beforePreparationSnapshotHook == null) {
return Optional.empty();
}

try {
return Optional.of(
beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets()));
} catch (Exception e) {
abortState(snapshot.getId());
rollbackRecords(snapshot);
// TODO: This method is actually a part of preparation phase. But the callback method name
// `onPrepareFailure()` should be renamed to more reasonable one.
onPrepareFailure(snapshot);
throw new CommitException(
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
e,
snapshot.getId());
}
}

private void waitBeforePreparationSnapshotHookFuture(
Snapshot snapshot, @Nullable Future<Void> snapshotHookFuture)
throws UnknownTransactionStatusException, CommitException {
if (snapshotHookFuture == null) {
return;
}

try {
snapshotHookFuture.get();
} catch (Exception e) {
abortState(snapshot.getId());
rollbackRecords(snapshot);
// TODO: This method is actually a part of validation phase. But the callback method name
// `onValidateFailure()` should be renamed to more reasonable one.
onValidateFailure(snapshot);
throw new CommitException(
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
e,
snapshot.getId());
}
}

public void commit(Snapshot snapshot) throws CommitException, UnknownTransactionStatusException {
Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot);
try {
prepare(snapshot);
} catch (PreparationException e) {
Expand Down Expand Up @@ -79,6 +129,8 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction
throw e;
}

waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null));

commitState(snapshot);
commitRecords(snapshot);
}
Expand Down Expand Up @@ -234,4 +286,16 @@ public void rollbackRecords(Snapshot snapshot) {
// ignore since records are recovered lazily
}
}

/**
* Sets the {@link BeforePreparationSnapshotHook}. This method must be called immediately after
* the constructor is invoked.
*
* @param beforePreparationSnapshotHook The snapshot hook to set.
* @throws NullPointerException If the argument is null.
*/
public void setBeforePreparationSnapshotHook(
BeforePreparationSnapshotHook beforePreparationSnapshotHook) {
this.beforePreparationSnapshotHook = checkNotNull(beforePreparationSnapshotHook);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ConsensusCommitManager extends ActiveTransactionManagedDistributedT
private final Coordinator coordinator;
private final ParallelExecutor parallelExecutor;
private final RecoveryHandler recovery;
private final CommitHandler commit;
protected final CommitHandler commit;
private final boolean isIncludeMetadataEnabled;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
@Nullable private final CoordinatorGroupCommitter groupCommitter;
Expand All @@ -75,7 +75,7 @@ public ConsensusCommitManager(
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}

ConsensusCommitManager(DatabaseConfig databaseConfig) {
protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
super(databaseConfig);
StorageFactory storageFactory = StorageFactory.create(databaseConfig.getProperties());
storage = storageFactory.getStorage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

public final class ConsensusCommitUtils {

Expand Down Expand Up @@ -299,4 +300,18 @@ static String convertUnsatisfiedConditionExceptionMessageForUpdate(
}
return message;
}

/**
* Returns the next `tx_version` based on the current value.
*
* @param currentTxVersion The current `tx_version`, if it exists, or null otherwise.
* @return The next `tx_version`.
*/
public static int getNextTxVersion(@Nullable Integer currentTxVersion) {
if (currentTxVersion == null) {
return 1;
} else {
return currentTxVersion + 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.scalar.db.transaction.consensuscommit.Attribute.ID;
import static com.scalar.db.transaction.consensuscommit.Attribute.VERSION;
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getNextTxVersion;

import com.google.common.annotations.VisibleForTesting;
import com.scalar.db.api.ConditionBuilder;
Expand Down Expand Up @@ -66,7 +67,7 @@ private void add(Put base, @Nullable TransactionResult result) throws ExecutionE
if (!base.isInsertModeEnabled() && result != null) { // overwrite existing record
createBeforeColumns(base, result).forEach(putBuilder::value);
int version = result.getVersion();
putBuilder.intValue(Attribute.VERSION, version + 1);
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(version));

// check if the record is not interrupted by other conflicting transactions
if (result.isDeemedAsCommitted()) {
Expand All @@ -82,7 +83,7 @@ private void add(Put base, @Nullable TransactionResult result) throws ExecutionE
.build());
}
} else { // initial record or insert mode enabled
putBuilder.intValue(Attribute.VERSION, 1);
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(null));

// check if the record is not created by other conflicting transactions
putBuilder.condition(ConditionBuilder.putIfNotExists());
Expand All @@ -107,7 +108,7 @@ private void add(Delete base, @Nullable TransactionResult result) throws Executi
if (result != null) {
createBeforeColumns(base, result).forEach(putBuilder::value);
int version = result.getVersion();
putBuilder.intValue(Attribute.VERSION, version + 1);
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(version));

// check if the record is not interrupted by other conflicting transactions
if (result.isDeemedAsCommitted()) {
Expand All @@ -122,7 +123,7 @@ private void add(Delete base, @Nullable TransactionResult result) throws Executi
.build());
}
} else {
putBuilder.intValue(Attribute.VERSION, 1);
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(null));

// check if the record is not created by other conflicting transactions
putBuilder.condition(ConditionBuilder.putIfNotExists());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.scalar.db.util.ScalarDbUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -161,6 +162,10 @@ public List<Delete> getDeletesInDeleteSet() {
return new ArrayList<>(deleteSet.values());
}

public ReadWriteSets getReadWriteSets() {
return new ReadWriteSets(id, readSet, writeSet.entrySet(), deleteSet.entrySet());
}

public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
throws CrudException {
if (deleteSet.containsKey(key)) {
Expand Down Expand Up @@ -653,4 +658,38 @@ public String toString() {
.toString();
}
}

public static class ReadWriteSets {
public final String transactionId;
public final Map<Key, Optional<TransactionResult>> readSetMap;
public final List<Entry<Key, Put>> writeSet;
public final List<Entry<Key, Delete>> deleteSet;

public ReadWriteSets(
String transactionId,
Map<Key, Optional<TransactionResult>> readSetMap,
Collection<Entry<Key, Put>> writeSet,
Collection<Entry<Key, Delete>> deleteSet) {
this.transactionId = transactionId;
this.readSetMap = new HashMap<>(readSetMap);
this.writeSet = new ArrayList<>(writeSet);
this.deleteSet = new ArrayList<>(deleteSet);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ReadWriteSets)) return false;
ReadWriteSets that = (ReadWriteSets) o;
return Objects.equals(transactionId, that.transactionId)
&& Objects.equals(readSetMap, that.readSetMap)
&& Objects.equals(writeSet, that.writeSet)
&& Objects.equals(deleteSet, that.deleteSet);
}

@Override
public int hashCode() {
return Objects.hash(transactionId, readSetMap, writeSet, deleteSet);
}
}
}
Loading

0 comments on commit e002cde

Please sign in to comment.