Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error message to identify exact problem on enlist failure. #229

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,22 @@ public String getProperty(String name) {
return ret;
}

/** Allow optional properties
*
* @param name
* @param defaultValue
* @return
*/
public String getProperty(String name, String defaultValue) {
completeProperties();
String ret = properties.getProperty(name);
if (ret == null) {
ret = defaultValue;
}
ret = ret.trim();
return ret;
}

public void setProperty(String name, String value) {
properties.setProperty(name, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private boolean enlist() throws AtomikosSQLException {
} else if (state == TxState.MARKED_ABORT){
AtomikosSQLException.throwAtomikosSQLException("The transaction has been set to rollback-only");
} else {
AtomikosSQLException.throwAtomikosSQLException("The transaction has timed out - try increasing the timeout if needed");
AtomikosSQLException.throwAtomikosSQLException("The transaction has potentially timed out (state: " + state + ") - try increasing the timeout if needed");
}
} else {
if (!localTransactionMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.atomikos.icatch.config.Configuration;
import com.atomikos.icatch.provider.ConfigProperties;
Expand All @@ -33,6 +35,11 @@ public class CachedRepository implements Repository {
private volatile long numberOfPutsSinceLastCheckpoint = 0;
private long checkpointInterval;
private long forgetOrphanedLogEntriesDelay;

public static final String DISK_FORCE_BUNDLING_MAX_COORDINATION_WAIT_TIME_MS = "com.atomikos.icatch.disk_force_bunding_max_coordination_wait_time_ms";
// zero means wait as long as it takes
private long diskForceBundlingMaxCoordinationWaitTimeMs = 0;

public CachedRepository(
InMemoryRepository inMemoryCoordinatorLogEntryRepository,
Repository backupCoordinatorLogEntryRepository) {
Expand All @@ -47,6 +54,9 @@ public void init() {
ConfigProperties configProperties = Configuration.getConfigProperties();
checkpointInterval = configProperties.getCheckpointInterval();
forgetOrphanedLogEntriesDelay = configProperties.getForgetOrphanedLogEntriesDelay();

diskForceBundlingMaxCoordinationWaitTimeMs = Long.parseLong(configProperties.getProperty(DISK_FORCE_BUNDLING_MAX_COORDINATION_WAIT_TIME_MS, "0"));
LOGGER.logDebug("diskForceBundlingMaxCoordinationWaitTimeMs " + diskForceBundlingMaxCoordinationWaitTimeMs);

try {
Collection<PendingTransactionRecord> coordinatorLogEntries = backupCoordinatorLogEntryRepository.getAllCoordinatorLogEntries();
Expand All @@ -63,19 +73,39 @@ public void init() {
}

@Override
public synchronized void put(String id, PendingTransactionRecord coordinatorLogEntry)
public CountDownLatch put(String id, PendingTransactionRecord coordinatorLogEntry)
throws IllegalArgumentException, LogWriteException {

try {
CountDownLatch cdl;
synchronized (this) {
if(needsCheckpoint()){
performCheckpoint();
}
backupCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
cdl = backupCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
inMemoryCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
numberOfPutsSinceLastCheckpoint++;
}
// If there is a latch returned, we are running in disk-force-bundling mode, so wait for the disk-force-bundling thread
// to signal disk-force has occured. The waiting is done outside of the synchronized block, otherwise no bundling would be
// possible in the first place.
if (cdl != null) {
if (diskForceBundlingMaxCoordinationWaitTimeMs > 0) {
boolean completed = cdl.await(diskForceBundlingMaxCoordinationWaitTimeMs, TimeUnit.MILLISECONDS);
if (!completed) {
LOGGER.logWarning("Disk force coordination time expired without completion for " + id + ", throwing exception. Another try will be done via checkpoint mechanism.");
throw new IllegalStateException("Disk force coordination time expired without completion for " + id + ", throwing exception. Another try will be done via checkpoint mechanism.");
}
}
else {
cdl.await();
}
}
} catch (Exception e) {
performCheckpoint();
LOGGER.logDebug("Issue occurred during write put, trying checkpoint.", e);
performCheckpoint();
}
return null;
}

private synchronized void performCheckpoint() throws LogWriteException {
Expand Down
Loading