Skip to content

Commit

Permalink
Merge pull request #58 from feedzai/bf-remove-deadlock-AbstractBatch
Browse files Browse the repository at this point in the history
ref PULSEDEV-18139 pdb: Fix incorrect shutdown in AbstractBatch
  • Loading branch information
henriquevcosta authored Apr 24, 2017
2 parents 75a9298 + c49daeb commit 6fd034a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public abstract class AbstractBatch implements Runnable {
/**
* Timestamp of the last flush.
*/
protected long lastFlush;
protected volatile long lastFlush;
/**
* EntityEntry buffer.
*/
Expand Down Expand Up @@ -151,15 +151,23 @@ protected void start() {
/**
* Destroys this batch.
*/
public synchronized void destroy() {
public void destroy() {
logger.trace("{} - Destroy called on Batch", name);
scheduler.shutdownNow();
scheduler.shutdown();

try {
if (!scheduler.awaitTermination(maxAwaitTimeShutdown, TimeUnit.MILLISECONDS)) {
logger.warn("Could not terminate batch within {}", DurationFormatUtils.formatDurationWords(maxAwaitTimeShutdown, true, true));
logger.warn(
"Could not terminate batch within {}. Forcing shutdown.",
DurationFormatUtils.formatDurationWords(
maxAwaitTimeShutdown,
true,
true
)
);
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
logger.debug("Interrupted while waiting.", e);
}
Expand Down Expand Up @@ -302,7 +310,7 @@ public void flush(boolean sync) {
}

@Override
public synchronized void run() {
public void run() {
if (System.currentTimeMillis() - lastFlush >= batchTimeout) {
logger.trace("[{}] Flush timeout occurred", name);
flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void flushFreesConnectionOnFailure() throws DatabaseEngineException {
* @since 2.1.6
*/
@Test
public void testFlushBashSync() throws DatabaseEngineException, InterruptedException {
public void testFlushBatchSync() throws DatabaseEngineException, InterruptedException {
final AtomicInteger transactions = new AtomicInteger();
// Mock the begin transaction to force waiting, so that flushes have to wait for one another.
new MockUp<AbstractDatabaseEngine>() {
Expand Down Expand Up @@ -287,6 +287,41 @@ void beginTransaction(Invocation inv) throws DatabaseEngineRuntimeException {
assertEquals("check that only 1 transaction was really executed", 1, transactions.get());
}

/**
 * Regression test for PULSEDEV-18139: checks that {@link AbstractBatch#destroy()} and
 * {@link AbstractBatch#run()} do not race with each other.
 * <p>
 * In the original defect the scheduler could attempt to call {@code run} while another thread was already
 * inside {@code destroy} but had not yet called shutdown on the scheduler. Since both methods were synchronized,
 * {@code run} could not finish while {@code destroy} was waiting for all tasks in the Executor to finish.
 * <p>
 * For this test to work properly it is critical that the batch is configured to wait longer for the scheduler
 * termination than the test timeout.
 *
 * @throws DatabaseEngineException If the operations on the engine fail.
 * @since 2.1.10
 */
@Test(timeout = 30000)
public void testBatchRunDestroyRace() throws DatabaseEngineException {
    final DbEntity testEntity = dbEntity()
            .name("TEST")
            .addColumn("COL1", INT)
            .addColumn("COL2", BOOLEAN)
            .addColumn("COL3", DOUBLE)
            .addColumn("COL4", LONG)
            .addColumn("COL5", STRING)
            .build();
    engine.addEntity(testEntity);

    // Batch configuration; the shutdown await time must be larger than the test timeout above.
    final int batchSize = 5;
    final long batchTimeout = 10L;
    final int maxAwaitTimeShutdown = 50000;

    // Repeat the create/add/destroy cycle 40 times.
    int remainingAttempts = 40;
    while (remainingAttempts > 0) {
        final MockedBatch batch = MockedBatch.create(engine, "test", batchSize, batchTimeout, maxAwaitTimeShutdown);
        batch.add("TEST", entry().set("COL1", 1).build());

        // If the data race occurs, `destroy` waits for longer than the test timeout and the test fails.
        batch.destroy();

        remainingAttempts--;
    }
}

/**
* Create test table.
Expand Down Expand Up @@ -361,6 +396,11 @@ private void checkTestEntry(int idx, Map<String,ResultColumn> row) {
*/
private static class MockedBatch extends AbstractBatch {

/**
* Duration of the sleep in the beginning of the destroy method.
*/
static final long PRE_DESTROY_SLEEP_DURATION = 500L;

private List<BatchEntry> failedEntries = new ArrayList<>();

private MockedBatch(DatabaseEngine de, String name, int batchSize, long batchTimeout, long maxAwaitTimeShutdown) {
Expand All @@ -382,6 +422,12 @@ public void onFlushFailure(BatchEntry[] entries) {
public List<BatchEntry> getFailedEntries() {
return failedEntries;
}

/**
* Sleeps uninterruptibly for {@link #PRE_DESTROY_SLEEP_DURATION} milliseconds and then delegates to the
* superclass {@code destroy()}.
* <p>
* The method is {@code synchronized}, so the monitor of this batch is held during the sleep and during the
* superclass call.
*/
@Override
public synchronized void destroy() {
// The delay must come before the real destroy. NOTE(review): presumably this widens the window in which the
// scheduled run() can fire before the scheduler is shut down (see testBatchRunDestroyRace) - confirm.
Uninterruptibles.sleepUninterruptibly(PRE_DESTROY_SLEEP_DURATION, TimeUnit.MILLISECONDS);
super.destroy();
}
}

}

0 comments on commit 6fd034a

Please sign in to comment.