From 69cc7bd34e587c2512b3c071f03b6c57421c0772 Mon Sep 17 00:00:00 2001
From: leonardograf
Date: Mon, 19 Aug 2024 18:02:20 +0200
Subject: [PATCH 1/3] Improve error message to identify the exact problem on
 enlist failure.

---
 .../com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java b/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java
index bcb0b12f..5ec7cc3e 100644
--- a/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java
+++ b/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java
@@ -97,7 +97,7 @@ private boolean enlist() throws AtomikosSQLException {
 			} else if (state == TxState.MARKED_ABORT){
 				AtomikosSQLException.throwAtomikosSQLException("The transaction has been set to rollback-only");
 			} else {
-				AtomikosSQLException.throwAtomikosSQLException("The transaction has timed out - try increasing the timeout if needed");
+				AtomikosSQLException.throwAtomikosSQLException("The transaction has potentially timed out (state: " + state + ") - try increasing the timeout if needed");
 			}
 		} else {
 			if (!localTransactionMode) {

From e13619746d46a703b650fb38c1b3dbe749574187 Mon Sep 17 00:00:00 2001
From: leonardograf
Date: Sun, 8 Sep 2024 11:06:47 +0200
Subject: [PATCH 2/3] Allow optional configuration properties with a default
 value.

---
 .../icatch/provider/ConfigProperties.java | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java b/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java
index 3a61392d..927f1c5b 100644
--- a/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java
+++ b/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java
@@ -157,6 +157,22 @@ public String getProperty(String name) {
 		return ret;
 	}
 
+	/**
+	 * Allows optional properties.
+	 *
+	 * @param name the name of the property to look up
+	 * @param defaultValue the value to fall back on if the property is absent
+	 * @return the trimmed property value, or the trimmed default if the property is absent
+	 */
+	public String getProperty(String name, String defaultValue) {
+		completeProperties();
+		String ret = properties.getProperty(name);
+		if (ret == null) {
+			ret = defaultValue;
+		}
+		return ret == null ? null : ret.trim(); // guard against a null default being passed in
+	}
+
 	public void setProperty(String name, String value) {
 		properties.setProperty(name, value);
 	}
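
The new overload keeps the single-argument behaviour intact and is what PATCH 3/3 relies on for its optional tuning knobs. A minimal usage sketch (illustrative only, not part of the patch; assumes a booted Atomikos configuration):

    import com.atomikos.icatch.config.Configuration;
    import com.atomikos.icatch.provider.ConfigProperties;

    // Illustrative use of the new overload: an optional tuning knob with a safe default.
    public class OptionalPropertyExample {
        public static void main(String[] args) {
            ConfigProperties props = Configuration.getConfigProperties();
            // absent property: falls back to "0" instead of failing
            long waitMs = Long.parseLong(
                    props.getProperty("com.atomikos.icatch.disk_force_bundling_max_coordination_wait_time_ms", "0"));
            System.out.println("max coordination wait: " + waitMs + " ms");
        }
    }
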
From 41d99180fcfc83fba77d55117e93f7e6f08c85f5 Mon Sep 17 00:00:00 2001
From: leonardograf
Date: Sun, 8 Sep 2024 11:25:31 +0200
Subject: [PATCH 3/3] Disk-force-bundling optimization option for
 FileSystemRepository.

---
 .../recovery/fs/CachedRepository.java         |  36 ++-
 .../recovery/fs/FileSystemRepository.java     | 257 ++++++++++++++++--
 .../recovery/fs/InMemoryRepository.java       |   4 +-
 .../com/atomikos/recovery/fs/Repository.java  |   6 +-
 4 files changed, 273 insertions(+), 30 deletions(-)

diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java
index f6d43b01..ba0c7f18 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/CachedRepository.java
@@ -11,6 +11,8 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.atomikos.icatch.config.Configuration;
 import com.atomikos.icatch.provider.ConfigProperties;
@@ -33,6 +35,11 @@ public class CachedRepository implements Repository {
 	private volatile long numberOfPutsSinceLastCheckpoint = 0;
 	private long checkpointInterval;
 	private long forgetOrphanedLogEntriesDelay;
+
+	public static final String DISK_FORCE_BUNDLING_MAX_COORDINATION_WAIT_TIME_MS = "com.atomikos.icatch.disk_force_bundling_max_coordination_wait_time_ms";
+	// zero means wait as long as it takes
+	private long diskForceBundlingMaxCoordinationWaitTimeMs = 0;
+
 	public CachedRepository(
 			InMemoryRepository inMemoryCoordinatorLogEntryRepository,
 			Repository backupCoordinatorLogEntryRepository) {
@@ -47,6 +54,9 @@ public void init() {
 		ConfigProperties configProperties = Configuration.getConfigProperties();
 		checkpointInterval = configProperties.getCheckpointInterval();
 		forgetOrphanedLogEntriesDelay = configProperties.getForgetOrphanedLogEntriesDelay();
+
+		diskForceBundlingMaxCoordinationWaitTimeMs = Long.parseLong(configProperties.getProperty(DISK_FORCE_BUNDLING_MAX_COORDINATION_WAIT_TIME_MS, "0"));
+		LOGGER.logDebug("diskForceBundlingMaxCoordinationWaitTimeMs " + diskForceBundlingMaxCoordinationWaitTimeMs);
 
 		try {
 			Collection<PendingTransactionRecord> coordinatorLogEntries = backupCoordinatorLogEntryRepository.getAllCoordinatorLogEntries();
@@ -63,19 +73,39 @@
 	}
 
 	@Override
-	public synchronized void put(String id, PendingTransactionRecord coordinatorLogEntry)
+	public CountDownLatch put(String id, PendingTransactionRecord coordinatorLogEntry)
 			throws IllegalArgumentException, LogWriteException {
 		try {
+			CountDownLatch cdl;
+			synchronized (this) {
 				if(needsCheckpoint()){
 					performCheckpoint();
 				}
-			backupCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
+				cdl = backupCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
 				inMemoryCoordinatorLogEntryRepository.put(id, coordinatorLogEntry);
 				numberOfPutsSinceLastCheckpoint++;
+			}
+			// If a latch was returned, we are running in disk-force-bundling mode, so wait for the disk-force-bundling
+			// thread to signal that the disk force has occurred. The waiting is done outside of the synchronized block;
+			// otherwise no bundling would be possible in the first place.
+			if (cdl != null) {
+				if (diskForceBundlingMaxCoordinationWaitTimeMs > 0) {
+					boolean completed = cdl.await(diskForceBundlingMaxCoordinationWaitTimeMs, TimeUnit.MILLISECONDS);
+					if (!completed) {
+						LOGGER.logWarning("Disk force coordination time expired without completion for " + id + " - another attempt will be made via the checkpoint mechanism.");
+						throw new IllegalStateException("Disk force coordination time expired without completion for " + id);
+					}
+				}
+				else {
+					cdl.await();
+				}
+			}
 		} catch (Exception e) {
+			LOGGER.logDebug("Issue occurred during put - trying a checkpoint instead.", e);
 			performCheckpoint();
 		}
+		return null;
 	}
 
 	private synchronized void performCheckpoint() throws LogWriteException {
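
At the heart of the change is a cross-thread handshake: the transaction thread hands its log record to the repository, receives a CountDownLatch, and blocks until the background thread has forced a whole bundle of records to disk in one go. A distilled model of that handshake (illustrative sketch only, not Atomikos code; all names are invented):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;

    // Minimal model of the latch handshake behind disk-force bundling.
    public class BundlingSketch {

        static class Work {
            final String payload;
            final CountDownLatch done = new CountDownLatch(1);
            Work(String payload) { this.payload = payload; }
        }

        private final BlockingQueue<Work> queue = new ArrayBlockingQueue<>(1000);

        // Producer side (the transaction thread): enqueue, then block until forced.
        void putAndWait(String record) throws InterruptedException {
            Work w = new Work(record);
            queue.put(w);
            w.done.await(); // returns once the bundler has forced this record
        }

        // Consumer side (the bundling thread): drain a batch, force once, release all.
        void bundleLoop() throws InterruptedException {
            while (true) {
                List<Work> batch = new ArrayList<>();
                batch.add(queue.take());   // block for the first item
                queue.drainTo(batch, 19);  // grab up to 19 more without waiting
                forceToDiskOnce(batch);    // one fsync for the whole batch
                for (Work w : batch) {
                    w.done.countDown();    // wake every waiting producer
                }
            }
        }

        private void forceToDiskOnce(List<Work> batch) {
            // stand-in for FileChannel.write(...) per record plus a single FileChannel.force(false)
        }
    }

The pay-off is that N concurrently committing transactions share one fsync instead of issuing N of them.
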
diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java
index ea265d19..2242cf84 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/FileSystemRepository.java
@@ -18,10 +18,18 @@
 import java.io.StreamCorruptedException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.atomikos.icatch.config.Configuration;
 import com.atomikos.icatch.provider.ConfigProperties;
@@ -38,8 +46,36 @@ public class FileSystemRepository implements Repository {
 
 	private static final Logger LOGGER = LoggerFactory.createLogger(FileSystemRepository.class);
 	private VersionedFile file;
-	private FileChannel rwChannel = null;
+
+	// Volatile so that, in the DISK_FORCE_BUNDLING_ENABLED=true case, the null check in initChannelIfNecessary
+	// works properly without having to acquire the channel lock first.
+	private volatile FileChannel rwChannel = null;
 	private LogFileLock lock_;
+
+	// Enables/disables the disk-force-bundling optimization.
+	// Under heavy concurrent load this can significantly improve transaction throughput by bundling the disk forces
+	// of several transactions. Depending on the settings of the tuning options below, it might in turn increase
+	// individual transaction latency - test and tune, your mileage may vary.
+	public static final String DISK_FORCE_BUNDLING_ENABLED_PROPERTY = "com.atomikos.icatch.disk_force_bundling_enabled";
+	private boolean diskForceBundlingEnabled = false;
+	// The maximum number of items to collect in a single batch.
+	// Note that it makes no sense to make this number larger than the maximum number of concurrent threads; if you
+	// do, and also set the max wait time above 0, you will actually decrease commit throughput and increase latency.
+	public static final String DISK_FORCE_BUNDLING_MAX_BUNDLE_SIZE_PROPERTY = "com.atomikos.icatch.disk_force_bundling_max_bundle_size";
+	private int diskForceBundlingMaxBundleSize = 20;
+	// set this to non-zero to reduce load on the disk and tune for your load profile
+	public static final String DISK_FORCE_BUNDLING_MAX_BUNDLING_WAIT_TIME_MS_PROPERTY = "com.atomikos.icatch.disk_force_bundling_max_bundling_wait_time_ms";
+	private long diskForceBundlingMaxBundlingWaitTimeMs = 0;
+	// size limit of the disk-force-bundling queue
+	public static final String DISK_FORCE_BUNDLING_QUEUE_MAX_SIZE_PROPERTY = "com.atomikos.icatch.disk_force_bundling_max_queue_size";
+	private int diskForceBundlingMaxQueueSize = 1000;
+
+	// the queue into which ByteBuffer write requests are put;
+	// created in init() so that the configured maximum size actually takes effect
+	private BlockingQueue<BufferHolder> diskForceBundlingQueue;
+
+	// This read/write lock is used a bit atypically: the read locks guard the write & force operations,
+	// the write lock guards the renewal/checkpointing operations on the rwChannel.
+	private final ReadWriteLock diskForceBundlingRwChannelLock = new ReentrantReadWriteLock();
 
 	@Override
 	public void init() throws LogException {
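
The inverted use of the ReadWriteLock deserves a second look: many threads may write to and force the channel concurrently (FileChannel itself is thread-safe), so those operations only need the shared read lock; what must be exclusive is swapping the channel out during a checkpoint. A small sketch of the pattern (illustrative only; names are invented):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Shared lock for operations on a resource, exclusive lock for replacing it.
    class SwappableResource {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private volatile Resource current = new Resource();

        void use() {                  // many threads at once
            lock.readLock().lock();
            try {
                current.doWork();
            } finally {
                lock.readLock().unlock();
            }
        }

        void swap() {                 // exclusive: nobody may use the old resource mid-swap
            lock.writeLock().lock();
            try {
                current.close();
                current = new Resource();
            } finally {
                lock.writeLock().unlock();
            }
        }

        static class Resource {
            void doWork() { /* e.g. FileChannel.write / force */ }
            void close()  { /* e.g. FileChannel.close */ }
        }
    }
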
@@ -48,45 +84,148 @@ public void init() throws LogException {
 		String baseName = configProperties.getLogBaseName();
 		LOGGER.logDebug("baseDir " + baseDir);
 		LOGGER.logDebug("baseName " + baseName);
+
+		diskForceBundlingEnabled = Boolean.parseBoolean(configProperties.getProperty(DISK_FORCE_BUNDLING_ENABLED_PROPERTY, "false"));
+		diskForceBundlingMaxBundleSize = Integer.parseInt(configProperties.getProperty(DISK_FORCE_BUNDLING_MAX_BUNDLE_SIZE_PROPERTY, "20"));
+		diskForceBundlingMaxBundlingWaitTimeMs = Long.parseLong(configProperties.getProperty(DISK_FORCE_BUNDLING_MAX_BUNDLING_WAIT_TIME_MS_PROPERTY, "0"));
+		diskForceBundlingMaxQueueSize = Integer.parseInt(configProperties.getProperty(DISK_FORCE_BUNDLING_QUEUE_MAX_SIZE_PROPERTY, "1000"));
+		// create the queue only now, so that the configured size is actually used
+		diskForceBundlingQueue = new ArrayBlockingQueue<>(diskForceBundlingMaxQueueSize);
+
+		LOGGER.logDebug("diskForceBundlingEnabled " + diskForceBundlingEnabled);
+		LOGGER.logDebug("diskForceBundlingMaxBundleSize " + diskForceBundlingMaxBundleSize);
+		LOGGER.logDebug("diskForceBundlingMaxBundlingWaitTimeMs " + diskForceBundlingMaxBundlingWaitTimeMs);
+		LOGGER.logDebug("diskForceBundlingMaxQueueSize " + diskForceBundlingMaxQueueSize);
+
 		lock_ = new LogFileLock(baseDir, baseName);
 		LOGGER.logDebug("LogFileLock " + lock_);
 		lock_.acquireLock();
 		file = new VersionedFile(baseDir, baseName, ".log");
+
+		// if disk-force-bundling is enabled, start the bundling thread
+		if (diskForceBundlingEnabled) {
+			Thread t = new Thread(new Runnable() {
+				public void run() {
+					int totalCount = 0;
+					while (true) {
+						try {
+							// start at one to account for the initial take
+							int count = 1;
+							List<BufferHolder> holdersProcessed = new ArrayList<>(diskForceBundlingMaxBundleSize);
+							BufferHolder bh = diskForceBundlingQueue.take();
+							while (count < diskForceBundlingMaxBundleSize && bh != null) {
+								count++;
+								writeToChannel(bh.buff);
+								holdersProcessed.add(bh);
+								if (diskForceBundlingMaxBundlingWaitTimeMs <= 0) {
+									// performance tests have shown this to be faster than poll(0, TimeUnit.MILLISECONDS),
+									// at least on Windows (roughly +10-15%), under heavy load
+									bh = diskForceBundlingQueue.poll();
+								}
+								else {
+									bh = diskForceBundlingQueue.poll(diskForceBundlingMaxBundlingWaitTimeMs, TimeUnit.MILLISECONDS);
+								}
+							}
+							// the last poll may return a non-null holder even though the batch count has already been
+							// reached - don't forget to process that one too
+							if (bh != null) {
+								writeToChannel(bh.buff);
+								holdersProcessed.add(bh);
+							}
+							writeForceChannel(false);
+							for (BufferHolder bhp : holdersProcessed) {
+								bhp.latch.countDown();
+							}
+							if (LOGGER.isTraceEnabled()) {
+								totalCount = totalCount + holdersProcessed.size();
+								LOGGER.logTrace("TotalCount: " + totalCount + ", last bundle size: " + holdersProcessed.size());
+							}
+						}
+						catch (InterruptedException e) {
+							LOGGER.logError("InterruptedException in disk-force-bundling thread! Trying to continue.", e);
+							// restore the interrupted flag
+							Thread.currentThread().interrupt();
+						}
+						catch (IOException e) {
+							LOGGER.logError("IOException in disk-force-bundling thread! Trying to continue.", e);
+						}
+					}
+				}
+			}, "Disk-Force-Bundle-Thread");
+			t.setPriority(Thread.MAX_PRIORITY);
+			t.setDaemon(true);
+			t.start();
+			LOGGER.logInfo("Started Disk-Force-Bundle Thread");
+		}
+		else {
+			LOGGER.logDebug("Running in classic (non-disk-force-bundling) mode");
+		}
 	}
 
 	@Override
-	public void put(String id, PendingTransactionRecord pendingTransactionRecord)
+	public CountDownLatch put(String id, PendingTransactionRecord pendingTransactionRecord)
 			throws IllegalArgumentException, LogWriteException {
 		try {
 			initChannelIfNecessary();
-			write(pendingTransactionRecord, true);
+			return write(pendingTransactionRecord, true);
 		} catch (IOException e) {
 			throw new LogWriteException(e);
 		}
 	}
 
-	private synchronized void initChannelIfNecessary()
+	private void initChannelIfNecessary()
 			throws FileNotFoundException {
+		if (diskForceBundlingEnabled) {
 			if (rwChannel == null) {
+				diskForceBundlingRwChannelLock.writeLock().lock();
+				try {
+					if (rwChannel == null) { // re-check under the lock: another thread may have won the race
+						rwChannel = file.openNewVersionForNioWriting();
+					}
+				}
+				finally {
+					diskForceBundlingRwChannelLock.writeLock().unlock();
+				}
+			}
+		}
+		else {
+			synchronized (this) {
+				if (rwChannel == null) {
 					rwChannel = file.openNewVersionForNioWriting();
+				}
+			}
+		}
 	}
 
-	private void write(PendingTransactionRecord pendingTransactionRecord,
+	private CountDownLatch write(PendingTransactionRecord pendingTransactionRecord,
 			boolean flushImmediately) throws IOException {
 		String str = pendingTransactionRecord.toRecord();
 		byte[] buffer = str.getBytes();
 		ByteBuffer buff = ByteBuffer.wrap(buffer);
-		writeToFile(buff, flushImmediately);
+		return writeToFile(buff, flushImmediately);
 	}
 
-	private synchronized void writeToFile(ByteBuffer buff, boolean force)
+	private CountDownLatch writeToFile(ByteBuffer buff, boolean force)
 			throws IOException {
+		if (diskForceBundlingEnabled) {
+			if (force) {
+				BufferHolder bh = new BufferHolder();
+				bh.buff = buff;
+				// Offer without a timeout: it is unlikely that the queue ever becomes full (other mechanisms would
+				// become stuck first, i.e. threads hanging on latch.await and timing out there) - but fail loudly if
+				// it does, because silently dropping the record would leave the caller waiting on a latch that never
+				// opens.
+				if (!diskForceBundlingQueue.offer(bh)) {
+					throw new IOException("Disk-force-bundling queue is full - write request rejected");
+				}
+				return bh.latch;
+			}
+			else {
+				writeToChannel(buff);
+				return null;
+			}
+		}
+		else {
+			synchronized (this) {
 				rwChannel.write(buff);
 				if (force) {
 					rwChannel.force(false);
 				}
+				return null;
+			}
+		}
 	}
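
Everything is off by default; enabling the optimization is a matter of setting the new properties before the transaction service starts. A hedged bootstrap sketch (property names as defined above; setting them as system properties is shown here as an assumption - depending on your Atomikos setup they may instead belong in jta.properties):

    // Illustrative bootstrap sketch: enable bundling with a batch size of 32.
    public class EnableBundlingExample {
        public static void main(String[] args) {
            System.setProperty("com.atomikos.icatch.disk_force_bundling_enabled", "true");
            System.setProperty("com.atomikos.icatch.disk_force_bundling_max_bundle_size", "32");
            // 0 = drain eagerly; > 0 = linger up to N ms to grow the bundle
            System.setProperty("com.atomikos.icatch.disk_force_bundling_max_bundling_wait_time_ms", "0");
            System.setProperty("com.atomikos.icatch.disk_force_bundling_max_queue_size", "1000");
            // ... then start the transaction service / application as usual
        }
    }
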
 
@@ -187,14 +326,31 @@ private static void closeSilently(BufferedReader fis) {
 
 	public void writeCheckpoint(Collection<PendingTransactionRecord> checkpointContent)
 			throws LogWriteException {
 		try {
-			closeOutput();
-
-			rwChannel = file.openNewVersionForNioWriting();
-			for (PendingTransactionRecord coordinatorLogEntry : checkpointContent) {
-				write(coordinatorLogEntry, false);
-			}
-			rwChannel.force(false);
-			file.discardBackupVersion();
+			if (diskForceBundlingEnabled) {
+				diskForceBundlingRwChannelLock.writeLock().lock();
+				try {
+					closeOutput();
+					rwChannel = file.openNewVersionForNioWriting();
+					for (PendingTransactionRecord coordinatorLogEntry : checkpointContent) {
+						write(coordinatorLogEntry, false);
+					}
+					rwChannel.force(false);
+					file.discardBackupVersion();
+				}
+				finally {
+					diskForceBundlingRwChannelLock.writeLock().unlock();
+				}
+			}
+			else {
+				closeOutput();
+
+				rwChannel = file.openNewVersionForNioWriting();
+				for (PendingTransactionRecord coordinatorLogEntry : checkpointContent) {
+					write(coordinatorLogEntry, false);
+				}
+				rwChannel.force(false);
+				file.discardBackupVersion();
+			}
 		} catch (FileNotFoundException firstStart) {
 			// the file could not be opened for reading;
 			// merely return the default empty vector
@@ -216,14 +372,65 @@ protected void closeOutput() throws IllegalStateException {
 
 	@Override
 	public void close() {
+		if (diskForceBundlingEnabled) {
+			diskForceBundlingRwChannelLock.writeLock().lock();
+			try {
+				closeOutput();
+			} catch (Exception e) {
+				LOGGER.logWarning("Error closing file - ignoring", e);
+			} finally {
+				diskForceBundlingRwChannelLock.writeLock().unlock();
+				lock_.releaseLock();
+			}
+		}
+		else {
 			try {
 				closeOutput();
 			} catch (Exception e) {
 				LOGGER.logWarning("Error closing file - ignoring", e);
 			} finally {
 				lock_.releaseLock();
 			}
+		}
 	}
 
+	/**
+	 * Helper method to write to the channel with proper locking for disk-force-bundling.
+	 *
+	 * @param buff the buffer to write
+	 * @throws IOException if the channel write fails
+	 */
+	private void writeToChannel(ByteBuffer buff) throws IOException {
+		diskForceBundlingRwChannelLock.readLock().lock();
+		try {
+			rwChannel.write(buff);
+		}
+		finally {
+			diskForceBundlingRwChannelLock.readLock().unlock();
+		}
+	}
+
+	/**
+	 * Helper method to disk-force the channel with proper locking for disk-force-bundling.
+	 *
+	 * @param forceMeta whether to also force file metadata
+	 * @throws IOException if the channel force fails
+	 */
+	private void writeForceChannel(boolean forceMeta) throws IOException {
+		diskForceBundlingRwChannelLock.readLock().lock();
+		try {
+			rwChannel.force(forceMeta);
+		}
+		finally {
+			diskForceBundlingRwChannelLock.readLock().unlock();
+		}
+	}
+
+	/**
+	 * Simple helper class to transfer a ByteBuffer and a coordination latch
+	 * from the original/transaction thread to the disk-force thread.
+	 */
+	private static class BufferHolder {
+		private ByteBuffer buff;
+		// coordination latch: once it is counted down, the original/transaction thread
+		// knows the buffer has been disk-forced
+		private final CountDownLatch latch = new CountDownLatch(1);
+	}
 }

diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java
index 8bfb6c01..2fb91c03 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/InMemoryRepository.java
@@ -13,6 +13,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 import com.atomikos.recovery.PendingTransactionRecord;
 import com.atomikos.recovery.TxState;
@@ -30,7 +31,7 @@ public void init() {
 	}
 
 	@Override
-	public synchronized void put(String id, PendingTransactionRecord coordinatorLogEntry)
+	public synchronized CountDownLatch put(String id, PendingTransactionRecord coordinatorLogEntry)
 			throws IllegalArgumentException {
 		PendingTransactionRecord existing = storage.get(id);
 		if (existing != null && existing == coordinatorLogEntry) {
@@ -41,6 +42,7 @@ public synchronized void put(String id, PendingTransactionRecord coordinatorLogE
 		} else {
 			storage.put(id, coordinatorLogEntry);
 		}
+		return null;
 	}
 
 	@Override

diff --git a/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java b/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java
index 252cb244..64107770 100644
--- a/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java
+++ b/public/transactions/src/main/java/com/atomikos/recovery/fs/Repository.java
@@ -9,6 +9,7 @@
 package com.atomikos.recovery.fs;
 
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 
 import com.atomikos.recovery.LogException;
 import com.atomikos.recovery.LogReadException;
@@ -19,7 +20,10 @@ public interface Repository {
 
 	void init() throws LogException;
 
-	void put(String id, PendingTransactionRecord pendingTransactionRecord) throws LogWriteException;
+	// Returns the disk-force-bundling coordination latch when disk-force-bundling is enabled, or null otherwise.
+	// The latch is needed upstream because the CachedRepository wraps the FileSystemRepository and does
+	// its own synchronization.
+	CountDownLatch put(String id, PendingTransactionRecord pendingTransactionRecord) throws LogWriteException;
 
 	PendingTransactionRecord get(String coordinatorId) throws LogReadException;
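
For any other Repository implementation or caller, the contract is now: a null return means the record is already durable when put returns, while a non-null latch means durability is deferred until the latch opens. A hedged caller-side sketch (illustrative only; the surrounding class, timeout value, and error handling are invented):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import com.atomikos.recovery.PendingTransactionRecord;
    import com.atomikos.recovery.fs.Repository;

    // Illustrative caller of Repository.put under the new contract.
    class RepositoryClientSketch {

        void logDurably(Repository repository, String id, PendingTransactionRecord record) throws Exception {
            CountDownLatch latch = repository.put(id, record);
            if (latch == null) {
                return; // classic mode: the record was forced to disk before put returned
            }
            // bundling mode: wait (with an upper bound, as an example) for the shared disk force
            if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out waiting for bundled disk force of " + id);
            }
        }
    }
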