Skip to content
Nadeem Mohammad edited this page Sep 4, 2016 · 11 revisions

To distribute the execution of tasks using Infinispan, change the execution engine to InfinispanExecutionEngine by following these steps:

Step 1: Add dexecutor-infinispan dependency

  <dependency>
	<groupId>com.github.dexecutor</groupId>
	<artifactId>dexecutor-infinispan</artifactId>
	<version>0.0.1</version>
  </dependency>

Step 2: Build the cache manager

Step 3: Create the distributed Executor service using the cache manager

Step 4: Use that distributed executor service to construct the distributed execution engine.

The following code shows the example in action:

/**
 * Runs the sample task graph through a Dexecutor backed by an Infinispan
 * distributed executor service.
 *
 * Builds the cache manager, creates a {@code DefaultExecutorService} on its
 * default cache, builds and prints the graph, then executes it with
 * {@code ExecutionBehavior.RETRY_ONCE_TERMINATING}. All resources are released
 * in {@code finally} blocks regardless of how execution ends.
 */
@Test
public void testDistrutedExecutorService() {

	DefaultCacheManager cacheManager = buildCacheManager();

	try {
		// Create a distributed executor service using the distributed cache to determine the nodes on which to run
		DefaultExecutorService distributedExecutorService = new DefaultExecutorService(cacheManager.getCache());

		try {
			DefaultDependentTasksExecutor<Integer, Integer> dexecutor = newTaskExecutor(distributedExecutorService);

			buildGraph(dexecutor);
			printGraph(dexecutor);
			dexecutor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
			System.out.println("*** Done ***");
		} finally {
			// Shut the executor down before the cache it was built on is stopped
			distributedExecutorService.shutdownNow();
			try {
				distributedExecutorService.awaitTermination(1, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				// Do not swallow the interruption: restore the flag for the caller
				Thread.currentThread().interrupt();
			}
		}
	} finally {
		// Shuts down the cache manager and all associated resources; the outer
		// finally guarantees this runs even if the executor setup or shutdown fails
		cacheManager.stop();
	}
}

/**
 * Populates the given executor with the sample task graph used by this example.
 *
 * Registers dependency edges among task ids 1-10 and 12-14 via
 * {@code addDependency}, and registers task 11 via {@code addIndependent}.
 * NOTE(review): presumably {@code addDependency(a, b)} means "a runs before b" -
 * confirm against the Dexecutor API.
 *
 * @param dexecutor the executor to add the tasks and dependencies to
 */
private void buildGraph(DefaultDependentTasksExecutor<Integer, Integer> dexecutor) {
	dexecutor.addDependency(1, 2);
	dexecutor.addDependency(1, 3);
	dexecutor.addDependency(3, 4);
	dexecutor.addDependency(3, 5);
	dexecutor.addDependency(3, 6);
	// An extra edge 10 -> 2 would close the loop 2 -> 9 -> 10 -> 2, i.e. introduce a cycle
	dexecutor.addDependency(2, 7);
	dexecutor.addDependency(2, 9);
	dexecutor.addDependency(2, 8);
	dexecutor.addDependency(9, 10);
	dexecutor.addDependency(12, 13);
	dexecutor.addDependency(13, 4);
	dexecutor.addDependency(13, 14);
	dexecutor.addIndependent(11);
}

/**
 * Writes the executor's graph representation to standard output.
 *
 * @param dexecutor the executor whose {@code print} output is displayed
 */
private void printGraph(DefaultDependentTasksExecutor<Integer, Integer> dexecutor) {
	// Collect the printed graph in memory, then emit it in one go
	StringWriter buffer = new StringWriter();
	dexecutor.print(buffer);
	String rendered = buffer.toString();
	System.out.println(rendered);
}

/**
 * Creates the cache manager used by the example.
 *
 * The manager is built from the default clustered global configuration and a
 * default cache configuration whose cache mode is {@code CacheMode.DIST_SYNC}.
 *
 * @return the newly constructed cache manager
 */
private DefaultCacheManager buildCacheManager() {
	// Default cache configuration: distributed, synchronous
	ConfigurationBuilder defaultCacheConfig = new ConfigurationBuilder();
	defaultCacheConfig.clustering().cacheMode(CacheMode.DIST_SYNC);

	// Global configuration for a clustered cache manager
	GlobalConfigurationBuilder clusteredGlobalConfig = GlobalConfigurationBuilder.defaultClusteredBuilder();

	return new DefaultCacheManager(clusteredGlobalConfig.build(), defaultCacheConfig.build());
}

/**
 * Creates a dependent-tasks executor that runs its tasks through Infinispan.
 *
 * Wraps the given executor service in an {@code InfinispanExecutionEngine},
 * pairs it with a {@code SleepyTaskProvider} in a
 * {@code DependentTasksExecutorConfig}, and builds the executor from that config.
 *
 * @param executorService the distributed executor service the engine delegates to
 * @return a new executor configured with the Infinispan execution engine
 */
private DefaultDependentTasksExecutor<Integer, Integer> newTaskExecutor(DistributedExecutorService executorService) {
	InfinispanExecutionEngine<Integer, Integer> engine =
			new InfinispanExecutionEngine<Integer, Integer>(executorService);
	SleepyTaskProvider taskProvider = new SleepyTaskProvider();

	DependentTasksExecutorConfig<Integer, Integer> executorConfig =
			new DependentTasksExecutorConfig<Integer, Integer>(engine, taskProvider);

	return new DefaultDependentTasksExecutor<Integer, Integer>(executorConfig);
}

/**
 * Task provider for the example: every task it supplies sleeps for 500 ms and
 * then returns its own id as the result.
 */
private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {

	/**
	 * Supplies the task for the given id.
	 *
	 * @param id the task id; also the value the task returns
	 * @return a task that sleeps for 500 ms and then returns {@code id}
	 */
	public Task<Integer, Integer> provid(final Integer id) {

		return new Task<Integer, Integer>() {

			public Integer execute() {
				try {
					Thread.sleep(500);
				} catch (InterruptedException e) {
					// Restore the interrupt status so the executing thread's owner
					// can still observe the interruption; the task still yields its id
					Thread.currentThread().interrupt();
				}
				return id;
			}
		};
	}
}
Clone this wiki locally