-
Notifications
You must be signed in to change notification settings - Fork 1
Home
Nadeem Mohammad edited this page Sep 4, 2016
·
11 revisions
To distribute the execution of tasks using Infinispan, change the execution engine to InfinispanExecutionEngine. To do that, follow these steps:
<dependency>
<groupId>com.github.dexecutor</groupId>
<artifactId>dexecutor-infinispan</artifactId>
<version>0.0.1</version>
</dependency>
The following code shows the example in action:
/**
 * Runs the sample task graph through a dexecutor backed by an Infinispan
 * distributed executor service.
 *
 * <p>The test builds a cache manager, wraps its default cache in a
 * {@code DefaultExecutorService}, builds and prints the graph, and executes it
 * with {@code ExecutionBehavior.RETRY_ONCE_TERMINATING}. The cache manager and
 * the executor service are always released in the {@code finally} block.
 */
@Test
public void testDistrutedExecutorService() {
    DefaultCacheManager cacheManager = buildCacheManager();
    // Create a distributed executor service using the distributed cache to determine the nodes on which to run
    DefaultExecutorService distributedExecutorService = new DefaultExecutorService(cacheManager.getCache());
    try {
        DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(distributedExecutorService);
        buildGraph(dexecutor);
        printGraph(dexecutor);
        dexecutor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
        System.out.println("*** Done ***");
    } finally {
        try {
            // Shuts down the cache manager and all associated resources
            cacheManager.stop();
            distributedExecutorService.shutdownNow();
            // Give in-flight tasks a short grace period to finish
            distributedExecutorService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so the
            // calling thread (e.g. the test runner) can still observe it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Registers the sample task graph on the given executor.
 *
 * <p>Every pair {@code {a, b}} in the table below is passed to
 * {@code addDependency(a, b)} in the listed order; task 11 is then added as an
 * independent task.
 *
 * @param dexecutor the executor that receives the graph definition
 */
private void buildGraph(DefaultDependentTasksExecutor<Integer, Integer> dexecutor) {
    final int[][] dependencyPairs = {
        {1, 2},
        {1, 2}, // NOTE(review): same pair registered twice; looks redundant, confirm before removing
        {1, 3},
        {3, 4},
        {3, 5},
        {3, 6},
        // {10, 2} is intentionally left out: it would introduce a cycle
        {2, 7},
        {2, 9},
        {2, 8},
        {9, 10},
        {12, 13},
        {13, 4},
        {13, 14},
    };
    for (int[] pair : dependencyPairs) {
        dexecutor.addDependency(pair[0], pair[1]);
    }
    dexecutor.addIndependent(11);
}
/**
 * Prints the executor's graph to standard output.
 *
 * <p>The graph is first rendered into an in-memory writer through
 * {@code dexecutor.print(...)} and the collected text is then written out in
 * one go.
 *
 * @param dexecutor the executor whose graph should be printed
 */
private void printGraph(DefaultDependentTasksExecutor<Integer, Integer> dexecutor) {
    final StringWriter graphDump = new StringWriter();
    dexecutor.print(graphDump);
    System.out.println(graphDump.toString());
}
/**
 * Creates the cache manager used by the example.
 *
 * <p>The global configuration comes from
 * {@code GlobalConfigurationBuilder.defaultClusteredBuilder()} and the default
 * cache is configured with {@code CacheMode.DIST_SYNC}.
 *
 * @return the newly created cache manager (the caller is responsible for stopping it)
 */
private DefaultCacheManager buildCacheManager() {
    // Default cache configuration: distributed, synchronous mode
    ConfigurationBuilder defaultCacheConfig = new ConfigurationBuilder();
    defaultCacheConfig.clustering().cacheMode(CacheMode.DIST_SYNC);

    // Clustered global configuration combined with the default cache configuration
    return new DefaultCacheManager(
            GlobalConfigurationBuilder.defaultClusteredBuilder().build(),
            defaultCacheConfig.build());
}
/**
 * Builds a dexecutor that runs its tasks through the given executor service.
 *
 * <p>The executor service is wrapped in an {@code InfinispanExecutionEngine}
 * and paired with a {@code SleepyTaskProvider} as the source of tasks.
 *
 * @param executorService the distributed executor service to run tasks on
 * @return a new executor configured with that engine and task provider
 */
private DefaultDependentTasksExecutor<Integer, Integer> newTaskExecutor(DistributedExecutorService executorService) {
    InfinispanExecutionEngine<Integer, Integer> engine =
            new InfinispanExecutionEngine<Integer, Integer>(executorService);
    SleepyTaskProvider taskProvider = new SleepyTaskProvider();
    DependentTasksExecutorConfig<Integer, Integer> config =
            new DependentTasksExecutorConfig<Integer, Integer>(engine, taskProvider);
    return new DefaultDependentTasksExecutor<Integer, Integer>(config);
}
/**
 * Task provider for the example: every task it hands out sleeps for 500 ms
 * and then returns the id it was created for.
 */
private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {
    /**
     * Creates the task for the given id.
     *
     * @param id the task id; it is also the value the task returns
     * @return a task that sleeps for 500 ms and then returns {@code id}
     */
    public Task<Integer, Integer> provid(final Integer id) {
        return new Task<Integer, Integer>() {
            public Integer execute() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status so whoever owns this thread
                    // can still see that it was asked to stop.
                    Thread.currentThread().interrupt();
                }
                return id;
            }
        };
    }
}