Skip to content

Commit

Permalink
Fix handling of jobs which disallow concurrent execution
Browse files Browse the repository at this point in the history
  • Loading branch information
jlinn committed Dec 5, 2016
1 parent 9d149b6 commit c15078f
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 118 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Maven dependency:
<dependency>
<groupId>net.joelinn</groupId>
<artifactId>quartz-redis-jobstore</artifactId>
<version>1.1.6</version>
<version>1.1.7</version>
</dependency>
```

Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Changelog
### 2016-12-04
* Fixed handling of jobs marked with `@DisallowConcurrentExecution`.

### 2016-10-30
* Fix serialization of HolidayCalendar

Expand Down
14 changes: 11 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<additionalparam>-Xdoclint:none</additionalparam>
<quartz.version>2.2.1</quartz.version>
<jackson.version>2.6.1</jackson.version>
<logback.version>1.1.7</logback.version>
</properties>

<repositories>
Expand Down Expand Up @@ -140,9 +141,16 @@
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.7</version>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,9 @@ public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount,
for (Tuple triggerTuple : jedis.zrangeByScoreWithScores(redisSchema.triggerStateKey(RedisTriggerState.WAITING), 0, (double) (noLaterThan + timeWindow), 0, maxCount)) {
OperableTrigger trigger = retrieveTrigger(redisSchema.triggerKey(triggerTuple.getElement()), jedis);
if(applyMisfire(trigger, jedis)){
logger.debug("misfired trigger: " + triggerTuple.getElement());
if (logger.isDebugEnabled()) {
logger.debug("misfired trigger: " + triggerTuple.getElement());
}
retry = true;
break;
}
Expand All @@ -745,11 +747,19 @@ public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount,
final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey());
JobDetail job = retrieveJob(trigger.getJobKey(), jedis);
if(job != null && isJobConcurrentExecutionDisallowed(job.getJobClass())){
if(acquiredJobHashKeysForNoConcurrentExec.contains(jobHashKey)){
if (logger.isTraceEnabled()) {
logger.trace("Attempting to acquire job " + job.getKey() + " with concurrent execution disallowed.");
}
if (acquiredJobHashKeysForNoConcurrentExec.contains(jobHashKey)) {
// a trigger is already acquired for this job
if (logger.isTraceEnabled()) {
logger.trace("Job " + job.getKey() + " with concurrent execution disallowed already acquired.");
}
continue;
}
else{
} else {
if (logger.isTraceEnabled()) {
logger.trace("Job " + job.getKey() + " with concurrent execution disallowed not yet acquired. Acquiring.");
}
acquiredJobHashKeysForNoConcurrentExec.add(jobHashKey);
}
}
Expand Down
47 changes: 29 additions & 18 deletions src/main/java/net/joelinn/quartz/jobstore/RedisStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.quartz.*;
import org.quartz.Calendar;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.impl.matchers.StringMatcher;
import org.quartz.spi.OperableTrigger;
Expand Down Expand Up @@ -223,18 +223,20 @@ public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, Jedis
Response<Boolean> triggerPausedResponse = pipe.sismember(redisSchema.pausedTriggerGroupsSet(), triggerGroupSetKey);
Response<Boolean> jobPausedResponse = pipe.sismember(redisSchema.pausedJobGroupsSet(), redisSchema.jobGroupSetKey(trigger.getJobKey()));
pipe.sync();
if(triggerPausedResponse.get() || jobPausedResponse.get()){
final long nextFireTime = trigger.getNextFireTime() != null ? trigger.getNextFireTime().getTime() : -1;
final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey());
if(jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)){
final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey());
final long nextFireTime = trigger.getNextFireTime() != null ? trigger.getNextFireTime().getTime() : -1;
if (triggerPausedResponse.get() || jobPausedResponse.get()){
if (jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)) {
setTriggerState(RedisTriggerState.PAUSED_BLOCKED, (double) nextFireTime, triggerHashKey, jedis);
}
else{
} else {
setTriggerState(RedisTriggerState.PAUSED, (double) nextFireTime, triggerHashKey, jedis);
}
}
else if(trigger.getNextFireTime() != null){
setTriggerState(RedisTriggerState.WAITING, (double) trigger.getNextFireTime().getTime(), triggerHashKey, jedis);
} else if(trigger.getNextFireTime() != null){
if (jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)) {
setTriggerState(RedisTriggerState.BLOCKED, nextFireTime, triggerHashKey, jedis);
} else {
setTriggerState(RedisTriggerState.WAITING, (double) trigger.getNextFireTime().getTime(), triggerHashKey, jedis);
}
}
}

Expand Down Expand Up @@ -688,21 +690,35 @@ public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, Je
final Date previousFireTime = trigger.getPreviousFireTime();
trigger.triggered(calendar);

// set the trigger state to WAITING
final long nextFireTime = trigger.getNextFireTime().getTime();
jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, Long.toString(nextFireTime));
setTriggerState(RedisTriggerState.WAITING, (double) nextFireTime, triggerHashKey, jedis);

JobDetail job = retrieveJob(trigger.getJobKey(), jedis);
TriggerFiredBundle triggerFiredBundle = new TriggerFiredBundle(job, trigger, calendar, false, new Date(), previousFireTime, previousFireTime, trigger.getNextFireTime());

// handling jobs for which concurrent execution is disallowed
if(isJobConcurrentExecutionDisallowed(job.getJobClass())){
if (isJobConcurrentExecutionDisallowed(job.getJobClass())){
if (logger.isTraceEnabled()) {
logger.trace("Firing trigger " + trigger.getKey() + " for job " + job.getKey() + " for which concurrent execution is disallowed. Adding job to blocked jobs set.");
}
final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey());
final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(job.getKey());
for (String nonConcurrentTriggerHashKey : jedis.smembers(jobTriggerSetKey)) {
Double score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.WAITING), nonConcurrentTriggerHashKey);
if(score != null){
if (logger.isTraceEnabled()) {
logger.trace("Setting state of trigger " + trigger.getKey() + " for non-concurrent job " + job.getKey() + " to BLOCKED.");
}
setTriggerState(RedisTriggerState.BLOCKED, score, nonConcurrentTriggerHashKey, jedis);
}
else{
score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.PAUSED), nonConcurrentTriggerHashKey);
if(score != null){
if (logger.isTraceEnabled()) {
logger.trace("Setting state of trigger " + trigger.getKey() + " for non-concurrent job " + job.getKey() + " to PAUSED_BLOCKED.");
}
setTriggerState(RedisTriggerState.PAUSED_BLOCKED, score, nonConcurrentTriggerHashKey, jedis);
}
}
Expand All @@ -711,16 +727,11 @@ public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, Je
pipe.set(redisSchema.jobBlockedKey(job.getKey()), schedulerInstanceId);
pipe.sadd(redisSchema.blockedJobsSet(), jobHashKey);
pipe.sync();
}

// release the fired trigger
if(trigger.getNextFireTime() != null){
final long nextFireTime = trigger.getNextFireTime().getTime();
} else if(trigger.getNextFireTime() != null){
jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, Long.toString(nextFireTime));
logger.debug(String.format("Releasing trigger %s with next fire time %s. Setting state to WAITING.", triggerHashKey, nextFireTime));
setTriggerState(RedisTriggerState.WAITING, (double) nextFireTime, triggerHashKey, jedis);
}
else{
} else {
jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, "");
unsetTriggerState(triggerHashKey, jedis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
import net.joelinn.quartz.jobstore.RedisJobStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.NameMatcher;
import org.quartz.simpl.PropertySettingJobFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
Expand All @@ -18,21 +15,21 @@
import redis.embedded.RedisServer;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import static net.joelinn.quartz.TestUtils.*;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static net.joelinn.quartz.TestUtils.getPort;

/**
* @author Joe Linn
* 10/4/2016
* 12/4/2016
*/
public class RedisJobStoreIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(RedisJobStoreIntegrationTest.class);
public abstract class BaseIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(BaseIntegrationTest.class);

protected RedisServer redisServer;
protected Scheduler scheduler;
protected Pool<Jedis> jedisPool;

private RedisServer redisServer;
private Scheduler scheduler;
private Pool<Jedis> jedisPool;

@Before
public void setUp() throws Exception {
Expand All @@ -45,14 +42,20 @@ public void setUp() throws Exception {

jedisPool = new JedisPool(host, port);


scheduler = new StdSchedulerFactory(schedulerConfig(host, port)).getScheduler();
scheduler.start();
}


protected Properties schedulerConfig(String host, int port) {
Properties config = new Properties();
config.setProperty("org.quartz.jobStore.class", RedisJobStore.class.getName());
config.setProperty("org.quartz.jobStore.host", host);
config.setProperty("org.quartz.jobStore.port", String.valueOf(port));
config.setProperty("org.quartz.threadPool.threadCount", "1");
config.setProperty("org.quartz.jobStore.misfireThreshold", "500");
scheduler = new StdSchedulerFactory(config).getScheduler();
scheduler.start();
return config;
}


Expand All @@ -66,80 +69,6 @@ public void tearDown() throws Exception {
}


@Test
public void testCompleteListener() throws Exception {
final String jobName = "oneJob";
JobDetail jobDetail = createJob(TestJob.class, jobName, "oneGroup");

final String triggerName = "trigger1";
CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", "* * * * * ?");

Waiter waiter = new Waiter();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.getListenerManager().addTriggerListener(new CompleteListener(waiter), NameMatcher.triggerNameEquals(triggerName));

// wait for CompleteListener.triggerComplete() to be called
waiter.await(1500);
}


@Test
public void testMisfireListener() throws Exception {
final String jobName = "oneJob";
JobDetail jobDetail = createJob(TestJob.class, jobName, "oneGroup");

final String triggerName = "trigger1";
final String everySecond = "* * * * * ?";
CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", everySecond);


JobDetail sleepJob = createJob(SleepJob.class, "sleepJob", "twoGroup");
CronTrigger sleepTrigger = createCronTrigger("sleepTrigger", "twoGroup", everySecond);
Waiter waiter = new Waiter();
scheduler.scheduleJob(sleepJob, sleepTrigger);
scheduler.scheduleJob(jobDetail, trigger);

scheduler.getListenerManager().addTriggerListener(new MisfireListener(waiter), NameMatcher.triggerNameEquals(triggerName));

// wait for MisfireListener.triggerMisfired() to be called
waiter.await(2500);
}


@Test
public void testTriggerData() throws Exception {
final String jobName = "good";
JobDetail jobDetail = createJob(DataJob.class, jobName, "goodGroup");

final String triggerName = "trigger1";
final String everySecond = "* * * * * ?";
CronTrigger trigger = createCronTrigger(triggerName, "oneGroup", everySecond);
trigger = trigger.getTriggerBuilder()
.usingJobData("foo", "bar")
.build();
scheduler.setJobFactory(new RedisJobFactory());
scheduler.scheduleJob(jobDetail, trigger);
Waiter waiter = new Waiter();
scheduler.getListenerManager().addTriggerListener(new CompleteListener(waiter), NameMatcher.triggerNameEquals(triggerName));

// wait for CompleteListener.triggerComplete() to be called
waiter.await(1500);

try (Jedis jedis = jedisPool.getResource()) {
assertThat(jedis.get("foo"), equalTo("bar"));
}
}


private class RedisJobFactory extends PropertySettingJobFactory {
@Override
protected void setBeanProps(Object obj, JobDataMap data) throws SchedulerException {
data.put("jedisPool", jedisPool);
super.setBeanProps(obj, data);
}
}


public static class DataJob implements Job {
private Pool<Jedis> jedisPool;

Expand Down Expand Up @@ -173,10 +102,34 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
}


private class CompleteListener implements TriggerListener {
@DisallowConcurrentExecution
public static class SingletonSleepJob extends SleepJob {
public static final AtomicInteger currentlyExecuting = new AtomicInteger(0);
public static final AtomicInteger concurrentExecutions = new AtomicInteger(0);

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
log.info("Starting job: " + context.getJobDetail().getKey() + " due to trigger " + context.getTrigger().getKey());
if (currentlyExecuting.incrementAndGet() > 1) {
log.error("Concurrent execution detected!!");
concurrentExecutions.incrementAndGet();
throw new JobExecutionException("Concurrent execution not allowed!");
}
try {
Thread.sleep(1000); // add some extra sleep time to ensure that concurrent execution will be attempted
} catch (InterruptedException e) {
throw new JobExecutionException("Interrupted while sleeping.", e);
}
super.execute(context);
currentlyExecuting.decrementAndGet();
}
}


protected class CompleteListener implements TriggerListener {
private final Waiter waiter;

private CompleteListener(Waiter waiter) {
protected CompleteListener(Waiter waiter) {
this.waiter = waiter;
}

Expand Down Expand Up @@ -207,10 +160,10 @@ public void triggerComplete(Trigger trigger, JobExecutionContext context, Trigge
}


private class MisfireListener implements TriggerListener {
protected class MisfireListener implements TriggerListener {
private final Waiter waiter;

private MisfireListener(Waiter waiter) {
protected MisfireListener(Waiter waiter) {
this.waiter = waiter;
}

Expand Down
Loading

0 comments on commit c15078f

Please sign in to comment.