From c15078f6c83ef4354f7cd522a65ef44d102f4f3f Mon Sep 17 00:00:00 2001 From: jlinn Date: Sun, 4 Dec 2016 20:36:29 -0800 Subject: [PATCH] Fix handling of jobs which disallow concurrent execution --- README.md | 2 +- changelog.md | 3 + pom.xml | 14 +- .../quartz/jobstore/AbstractRedisStorage.java | 18 ++- .../joelinn/quartz/jobstore/RedisStorage.java | 47 +++--- ...tionTest.java => BaseIntegrationTest.java} | 137 ++++++------------ .../quartz/MultiThreadedIntegrationTest.java | 105 ++++++++++++++ .../quartz/SingleThreadedIntegrationTest.java | 38 +++++ src/test/resources/logback-test.xml | 12 ++ 9 files changed, 258 insertions(+), 118 deletions(-) rename src/test/java/net/joelinn/quartz/{RedisJobStoreIntegrationTest.java => BaseIntegrationTest.java} (52%) create mode 100644 src/test/java/net/joelinn/quartz/MultiThreadedIntegrationTest.java create mode 100644 src/test/java/net/joelinn/quartz/SingleThreadedIntegrationTest.java create mode 100644 src/test/resources/logback-test.xml diff --git a/README.md b/README.md index f12a254..9a9aec6 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Maven dependency: net.joelinn quartz-redis-jobstore - 1.1.6 + 1.1.7 ``` diff --git a/changelog.md b/changelog.md index 8abf7ba..de48a32 100644 --- a/changelog.md +++ b/changelog.md @@ -1,4 +1,7 @@ # Changelog +### 2016-12-04 +* Fixed handling of jobs marked with `@DisallowConcurrentExecution`. + ### 2016-10-30 * Fix serialization of HolidayCalendar diff --git a/pom.xml b/pom.xml index bcd4b5d..1a2d838 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ -Xdoclint:none 2.2.1 2.6.1 + 1.1.7 @@ -140,9 +141,16 @@ - org.slf4j - slf4j-simple - 1.7.7 + ch.qos.logback + logback-classic + ${logback.version} + test + + + + ch.qos.logback + logback-core + ${logback.version} test diff --git a/src/main/java/net/joelinn/quartz/jobstore/AbstractRedisStorage.java b/src/main/java/net/joelinn/quartz/jobstore/AbstractRedisStorage.java index 57557d6..4320d1b 100644 --- a/src/main/java/net/joelinn/quartz/jobstore/AbstractRedisStorage.java +++ b/src/main/java/net/joelinn/quartz/jobstore/AbstractRedisStorage.java @@ -733,7 +733,9 @@ public List acquireNextTriggers(long noLaterThan, int maxCount, for (Tuple triggerTuple : jedis.zrangeByScoreWithScores(redisSchema.triggerStateKey(RedisTriggerState.WAITING), 0, (double) (noLaterThan + timeWindow), 0, maxCount)) { OperableTrigger trigger = retrieveTrigger(redisSchema.triggerKey(triggerTuple.getElement()), jedis); if(applyMisfire(trigger, jedis)){ - logger.debug("misfired trigger: " + triggerTuple.getElement()); + if (logger.isDebugEnabled()) { + logger.debug("misfired trigger: " + triggerTuple.getElement()); + } retry = true; break; } @@ -745,11 +747,19 @@ public List acquireNextTriggers(long noLaterThan, int maxCount, final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey()); JobDetail job = retrieveJob(trigger.getJobKey(), jedis); if(job != null && isJobConcurrentExecutionDisallowed(job.getJobClass())){ - if(acquiredJobHashKeysForNoConcurrentExec.contains(jobHashKey)){ + if (logger.isTraceEnabled()) { + logger.trace("Attempting to acquire job " + job.getKey() + " with concurrent execution disallowed."); + } + if (acquiredJobHashKeysForNoConcurrentExec.contains(jobHashKey)) { // a trigger is already acquired for this job + if (logger.isTraceEnabled()) { + logger.trace("Job " + job.getKey() + " with concurrent execution disallowed already acquired."); + } continue; - } - else{ + } else { + if (logger.isTraceEnabled()) { + logger.trace("Job " + job.getKey() + " with concurrent execution disallowed not yet acquired. Acquiring."); + } acquiredJobHashKeysForNoConcurrentExec.add(jobHashKey); } } diff --git a/src/main/java/net/joelinn/quartz/jobstore/RedisStorage.java b/src/main/java/net/joelinn/quartz/jobstore/RedisStorage.java index fec7ab9..832b745 100644 --- a/src/main/java/net/joelinn/quartz/jobstore/RedisStorage.java +++ b/src/main/java/net/joelinn/quartz/jobstore/RedisStorage.java @@ -3,8 +3,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import org.quartz.*; import org.quartz.Calendar; +import org.quartz.*; import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.matchers.StringMatcher; import org.quartz.spi.OperableTrigger; @@ -223,18 +223,20 @@ public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, Jedis Response triggerPausedResponse = pipe.sismember(redisSchema.pausedTriggerGroupsSet(), triggerGroupSetKey); Response jobPausedResponse = pipe.sismember(redisSchema.pausedJobGroupsSet(), redisSchema.jobGroupSetKey(trigger.getJobKey())); pipe.sync(); - if(triggerPausedResponse.get() || jobPausedResponse.get()){ - final long nextFireTime = trigger.getNextFireTime() != null ? trigger.getNextFireTime().getTime() : -1; - final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey()); - if(jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)){ + final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey()); + final long nextFireTime = trigger.getNextFireTime() != null ? trigger.getNextFireTime().getTime() : -1; + if (triggerPausedResponse.get() || jobPausedResponse.get()){ + if (jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)) { setTriggerState(RedisTriggerState.PAUSED_BLOCKED, (double) nextFireTime, triggerHashKey, jedis); - } - else{ + } else { setTriggerState(RedisTriggerState.PAUSED, (double) nextFireTime, triggerHashKey, jedis); } - } - else if(trigger.getNextFireTime() != null){ - setTriggerState(RedisTriggerState.WAITING, (double) trigger.getNextFireTime().getTime(), triggerHashKey, jedis); + } else if(trigger.getNextFireTime() != null){ + if (jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)) { + setTriggerState(RedisTriggerState.BLOCKED, nextFireTime, triggerHashKey, jedis); + } else { + setTriggerState(RedisTriggerState.WAITING, (double) trigger.getNextFireTime().getTime(), triggerHashKey, jedis); + } } } @@ -688,21 +690,35 @@ public List triggersFired(List triggers, Je final Date previousFireTime = trigger.getPreviousFireTime(); trigger.triggered(calendar); + // set the trigger state to WAITING + final long nextFireTime = trigger.getNextFireTime().getTime(); + jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, Long.toString(nextFireTime)); + setTriggerState(RedisTriggerState.WAITING, (double) nextFireTime, triggerHashKey, jedis); + JobDetail job = retrieveJob(trigger.getJobKey(), jedis); TriggerFiredBundle triggerFiredBundle = new TriggerFiredBundle(job, trigger, calendar, false, new Date(), previousFireTime, previousFireTime, trigger.getNextFireTime()); // handling jobs for which concurrent execution is disallowed - if(isJobConcurrentExecutionDisallowed(job.getJobClass())){ + if (isJobConcurrentExecutionDisallowed(job.getJobClass())){ + if (logger.isTraceEnabled()) { + logger.trace("Firing trigger " + trigger.getKey() + " for job " + job.getKey() + " for which concurrent execution is disallowed. Adding job to blocked jobs set."); + } final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey()); final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(job.getKey()); for (String nonConcurrentTriggerHashKey : jedis.smembers(jobTriggerSetKey)) { Double score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.WAITING), nonConcurrentTriggerHashKey); if(score != null){ + if (logger.isTraceEnabled()) { + logger.trace("Setting state of trigger " + trigger.getKey() + " for non-concurrent job " + job.getKey() + " to BLOCKED."); + } setTriggerState(RedisTriggerState.BLOCKED, score, nonConcurrentTriggerHashKey, jedis); } else{ score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.PAUSED), nonConcurrentTriggerHashKey); if(score != null){ + if (logger.isTraceEnabled()) { + logger.trace("Setting state of trigger " + trigger.getKey() + " for non-concurrent job " + job.getKey() + " to PAUSED_BLOCKED."); + } setTriggerState(RedisTriggerState.PAUSED_BLOCKED, score, nonConcurrentTriggerHashKey, jedis); } } @@ -711,16 +727,11 @@ public List triggersFired(List triggers, Je pipe.set(redisSchema.jobBlockedKey(job.getKey()), schedulerInstanceId); pipe.sadd(redisSchema.blockedJobsSet(), jobHashKey); pipe.sync(); - } - - // release the fired trigger - if(trigger.getNextFireTime() != null){ - final long nextFireTime = trigger.getNextFireTime().getTime(); + } else if(trigger.getNextFireTime() != null){ jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, Long.toString(nextFireTime)); logger.debug(String.format("Releasing trigger %s with next fire time %s. Setting state to WAITING.", triggerHashKey, nextFireTime)); setTriggerState(RedisTriggerState.WAITING, (double) nextFireTime, triggerHashKey, jedis); - } - else{ + } else { jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, ""); unsetTriggerState(triggerHashKey, jedis); } diff --git a/src/test/java/net/joelinn/quartz/RedisJobStoreIntegrationTest.java b/src/test/java/net/joelinn/quartz/BaseIntegrationTest.java similarity index 52% rename from src/test/java/net/joelinn/quartz/RedisJobStoreIntegrationTest.java rename to src/test/java/net/joelinn/quartz/BaseIntegrationTest.java index 7ee0263..0aa6b26 100644 --- a/src/test/java/net/joelinn/quartz/RedisJobStoreIntegrationTest.java +++ b/src/test/java/net/joelinn/quartz/BaseIntegrationTest.java @@ -5,11 +5,8 @@ import net.joelinn.quartz.jobstore.RedisJobStore; import org.junit.After; import org.junit.Before; -import org.junit.Test; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; -import org.quartz.impl.matchers.NameMatcher; -import org.quartz.simpl.PropertySettingJobFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; @@ -18,21 +15,21 @@ import redis.embedded.RedisServer; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; -import static net.joelinn.quartz.TestUtils.*; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; +import static net.joelinn.quartz.TestUtils.getPort; /** * @author Joe Linn - * 10/4/2016 + * 12/4/2016 */ -public class RedisJobStoreIntegrationTest { - private static final Logger log = LoggerFactory.getLogger(RedisJobStoreIntegrationTest.class); +public abstract class BaseIntegrationTest { + private static final Logger log = LoggerFactory.getLogger(BaseIntegrationTest.class); + + protected RedisServer redisServer; + protected Scheduler scheduler; + protected Pool jedisPool; - private RedisServer redisServer; - private Scheduler scheduler; - private Pool jedisPool; @Before public void setUp() throws Exception { @@ -45,14 +42,20 @@ public void setUp() throws Exception { jedisPool = new JedisPool(host, port); + + scheduler = new StdSchedulerFactory(schedulerConfig(host, port)).getScheduler(); + scheduler.start(); + } + + + protected Properties schedulerConfig(String host, int port) { Properties config = new Properties(); config.setProperty("org.quartz.jobStore.class", RedisJobStore.class.getName()); config.setProperty("org.quartz.jobStore.host", host); config.setProperty("org.quartz.jobStore.port", String.valueOf(port)); config.setProperty("org.quartz.threadPool.threadCount", "1"); config.setProperty("org.quartz.jobStore.misfireThreshold", "500"); - scheduler = new StdSchedulerFactory(config).getScheduler(); - scheduler.start(); + return config; } @@ -66,80 +69,6 @@ public void tearDown() throws Exception { } - @Test - public void testCompleteListener() throws Exception { - final String jobName = "oneJob"; - JobDetail jobDetail = createJob(TestJob.class, jobName, "oneGroup"); - - final String triggerName = "trigger1"; - CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", "* * * * * ?"); - - Waiter waiter = new Waiter(); - scheduler.scheduleJob(jobDetail, trigger); - scheduler.getListenerManager().addTriggerListener(new CompleteListener(waiter), NameMatcher.triggerNameEquals(triggerName)); - - // wait for CompleteListener.triggerComplete() to be called - waiter.await(1500); - } - - - @Test - public void testMisfireListener() throws Exception { - final String jobName = "oneJob"; - JobDetail jobDetail = createJob(TestJob.class, jobName, "oneGroup"); - - final String triggerName = "trigger1"; - final String everySecond = "* * * * * ?"; - CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", everySecond); - - - JobDetail sleepJob = createJob(SleepJob.class, "sleepJob", "twoGroup"); - CronTrigger sleepTrigger = createCronTrigger("sleepTrigger", "twoGroup", everySecond); - Waiter waiter = new Waiter(); - scheduler.scheduleJob(sleepJob, sleepTrigger); - scheduler.scheduleJob(jobDetail, trigger); - - scheduler.getListenerManager().addTriggerListener(new MisfireListener(waiter), NameMatcher.triggerNameEquals(triggerName)); - - // wait for MisfireListener.triggerMisfired() to be called - waiter.await(2500); - } - - - @Test - public void testTriggerData() throws Exception { - final String jobName = "good"; - JobDetail jobDetail = createJob(DataJob.class, jobName, "goodGroup"); - - final String triggerName = "trigger1"; - final String everySecond = "* * * * * ?"; - CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", everySecond); - trigger = trigger.getTriggerBuilder() - .usingJobData("foo", "bar") - .build(); - scheduler.setJobFactory(new RedisJobFactory()); - scheduler.scheduleJob(jobDetail, trigger); - Waiter waiter = new Waiter(); - scheduler.getListenerManager().addTriggerListener(new CompleteListener(waiter), NameMatcher.triggerNameEquals(triggerName)); - - // wait for CompleteListener.triggerComplete() to be called - waiter.await(1500); - - try (Jedis jedis = jedisPool.getResource()) { - assertThat(jedis.get("foo"), equalTo("bar")); - } - } - - - private class RedisJobFactory extends PropertySettingJobFactory { - @Override - protected void setBeanProps(Object obj, JobDataMap data) throws SchedulerException { - data.put("jedisPool", jedisPool); - super.setBeanProps(obj, data); - } - } - - public static class DataJob implements Job { private Pool jedisPool; @@ -173,10 +102,34 @@ public void execute(JobExecutionContext context) throws JobExecutionException { } - private class CompleteListener implements TriggerListener { + @DisallowConcurrentExecution + public static class SingletonSleepJob extends SleepJob { + public static final AtomicInteger currentlyExecuting = new AtomicInteger(0); + public static final AtomicInteger concurrentExecutions = new AtomicInteger(0); + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + log.info("Starting job: " + context.getJobDetail().getKey() + " due to trigger " + context.getTrigger().getKey()); + if (currentlyExecuting.incrementAndGet() > 1) { + log.error("Concurrent execution detected!!"); + concurrentExecutions.incrementAndGet(); + throw new JobExecutionException("Concurrent execution not allowed!"); + } + try { + Thread.sleep(1000); // add some extra sleep time to ensure that concurrent execution will be attempted + } catch (InterruptedException e) { + throw new JobExecutionException("Interrupted while sleeping.", e); + } + super.execute(context); + currentlyExecuting.decrementAndGet(); + } + } + + + protected class CompleteListener implements TriggerListener { private final Waiter waiter; - private CompleteListener(Waiter waiter) { + protected CompleteListener(Waiter waiter) { this.waiter = waiter; } @@ -207,10 +160,10 @@ public void triggerComplete(Trigger trigger, JobExecutionContext context, Trigge } - private class MisfireListener implements TriggerListener { + protected class MisfireListener implements TriggerListener { private final Waiter waiter; - private MisfireListener(Waiter waiter) { + protected MisfireListener(Waiter waiter) { this.waiter = waiter; } diff --git a/src/test/java/net/joelinn/quartz/MultiThreadedIntegrationTest.java b/src/test/java/net/joelinn/quartz/MultiThreadedIntegrationTest.java new file mode 100644 index 0000000..162519d --- /dev/null +++ b/src/test/java/net/joelinn/quartz/MultiThreadedIntegrationTest.java @@ -0,0 +1,105 @@ +package net.joelinn.quartz; + +import net.jodah.concurrentunit.Waiter; +import org.junit.Test; +import org.quartz.CronTrigger; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.SchedulerException; +import org.quartz.impl.matchers.NameMatcher; +import org.quartz.simpl.PropertySettingJobFactory; +import redis.clients.jedis.Jedis; + +import java.util.Properties; + +import static net.joelinn.quartz.TestUtils.createCronTrigger; +import static net.joelinn.quartz.TestUtils.createJob; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * @author Joe Linn + * 10/4/2016 + */ +public class MultiThreadedIntegrationTest extends BaseIntegrationTest { + + @Override + protected Properties schedulerConfig(String host, int port) { + Properties config = super.schedulerConfig(host, port); + config.setProperty("org.quartz.threadPool.threadCount", "2"); + return config; + } + + @Test + public void testCompleteListener() throws Exception { + final String jobName = "oneJob"; + JobDetail jobDetail = createJob(TestJob.class, jobName, "oneGroup"); + + final String triggerName = "trigger1"; + CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", "* * * * * ?"); + + Waiter waiter = new Waiter(); + scheduler.scheduleJob(jobDetail, trigger); + scheduler.getListenerManager().addTriggerListener(new CompleteListener(waiter), NameMatcher.triggerNameEquals(triggerName)); + + // wait for CompleteListener.triggerComplete() to be called + waiter.await(1500); + } + + + @Test + public void testTriggerData() throws Exception { + final String jobName = "good"; + JobDetail jobDetail = createJob(DataJob.class, jobName, "goodGroup"); + + final String triggerName = "trigger1"; + final String everySecond = "* * * * * ?"; + CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", everySecond); + trigger = trigger.getTriggerBuilder() + .usingJobData("foo", "bar") + .build(); + scheduler.setJobFactory(new RedisJobFactory()); + scheduler.scheduleJob(jobDetail, trigger); + Waiter waiter = new Waiter(); + scheduler.getListenerManager().addTriggerListener(new CompleteListener(waiter), NameMatcher.triggerNameEquals(triggerName)); + + // wait for CompleteListener.triggerComplete() to be called + waiter.await(1500); + + try (Jedis jedis = jedisPool.getResource()) { + assertThat(jedis.get("foo"), equalTo("bar")); + } + } + + + @Test + public void testDisallowConcurrent() throws Exception { + JobDetail job1 = createJob(SingletonSleepJob.class, "job1", "group1"); + CronTrigger trigger1 = createCronTrigger("trigger1", "group1", "* * * * * ?"); + CronTrigger trigger2 = createCronTrigger("trigger2", "group2", "* * * * * ?") + .getTriggerBuilder() + .forJob(job1) + .build(); + + Waiter waiter = new Waiter(); + scheduler.getListenerManager().addTriggerListener(new CompleteListener(waiter), NameMatcher.triggerNameEquals(trigger1.getKey().getName())); + scheduler.scheduleJob(job1, trigger1); + //scheduler.scheduleJob(trigger2); + + waiter.await(6000, 2); + + assertThat(SingletonSleepJob.concurrentExecutions.get(), equalTo(0)); + } + + + private class RedisJobFactory extends PropertySettingJobFactory { + @Override + protected void setBeanProps(Object obj, JobDataMap data) throws SchedulerException { + data.put("jedisPool", jedisPool); + super.setBeanProps(obj, data); + } + } + + + +} diff --git a/src/test/java/net/joelinn/quartz/SingleThreadedIntegrationTest.java b/src/test/java/net/joelinn/quartz/SingleThreadedIntegrationTest.java new file mode 100644 index 0000000..0f891a0 --- /dev/null +++ b/src/test/java/net/joelinn/quartz/SingleThreadedIntegrationTest.java @@ -0,0 +1,38 @@ +package net.joelinn.quartz; + +import net.jodah.concurrentunit.Waiter; +import org.junit.Test; +import org.quartz.CronTrigger; +import org.quartz.JobDetail; +import org.quartz.impl.matchers.NameMatcher; + +import static net.joelinn.quartz.TestUtils.createCronTrigger; +import static net.joelinn.quartz.TestUtils.createJob; + +/** + * @author Joe Linn + * 12/4/2016 + */ +public class SingleThreadedIntegrationTest extends BaseIntegrationTest { + @Test + public void testMisfireListener() throws Exception { + final String jobName = "oneJob"; + JobDetail jobDetail = createJob(TestJob.class, jobName, "oneGroup"); + + final String triggerName = "trigger1"; + final String everySecond = "* * * * * ?"; + CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", everySecond); + + + JobDetail sleepJob = createJob(SleepJob.class, "sleepJob", "twoGroup"); + CronTrigger sleepTrigger = createCronTrigger("sleepTrigger", "twoGroup", everySecond); + Waiter waiter = new Waiter(); + scheduler.scheduleJob(sleepJob, sleepTrigger); + scheduler.scheduleJob(jobDetail, trigger); + + scheduler.getListenerManager().addTriggerListener(new MisfireListener(waiter), NameMatcher.triggerNameEquals(triggerName)); + + // wait for MisfireListener.triggerMisfired() to be called + waiter.await(2500); + } +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 0000000..2cbe2f5 --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,12 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %C{5}:%L - %msg%n + + + + + + + \ No newline at end of file