diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index e7647f40ff..ef921a1ef9 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -175,6 +175,11 @@ public class ClusterBasedJobCoordinator { private final SystemAdmins systemAdmins; + /** + * Util to close the KafkaCheckpointManager later on + */ + private MetadataResourceUtil metadataResourceUtil; + /** * Internal variable for the instance of {@link JmxServer} */ @@ -259,7 +264,7 @@ public void run() { //create necessary checkpoint and changelog streams, if not created JobModel jobModel = jobModelManager.jobModel(); - MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config); + metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config); metadataResourceUtil.createResources(); // fan out the startpoints @@ -341,6 +346,7 @@ private void onShutDown() { systemAdmins.stop(); shutDowncontainerPlacementRequestAllocatorAndUtils(); containerProcessManager.stop(); + metadataResourceUtil.stop(); metadataStore.close(); } catch (Throwable e) { LOG.error("Exception while stopping cluster based job coordinator", e); diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java index 1050662920..c2081c67e3 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java @@ -59,6 +59,10 @@ public void createResources() { createChangelogStreams(); } + public void stop() { + checkpointManager.stop(); + } + @VisibleForTesting void createChangelogStreams() { ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions); diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 9fb9975ea2..6fb4141517 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -67,6 +67,7 @@ public class PassthroughJobCoordinator implements JobCoordinator { private final String processorId; private final Config config; private final LocationId locationId; + private MetadataResourceUtil metadataResourceUtil; private JobCoordinatorListener coordinatorListener = null; public PassthroughJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) { @@ -85,7 +86,7 @@ public void start() { try { jobModel = getJobModel(); // TODO metrics registry has been null here for a while; is it safe? - MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, null, config); + metadataResourceUtil = new MetadataResourceUtil(jobModel, null, config); metadataResourceUtil.createResources(); } catch (Exception e) { LOGGER.error("Exception while trying to getJobModel.", e); @@ -111,6 +112,7 @@ public void stop() { coordinatorListener.onJobModelExpired(); coordinatorListener.onCoordinatorStop(); } + metadataResourceUtil.stop(); } @Override diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index feaabba34b..8175ffa73e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -114,6 +114,7 @@ public class ZkJobCoordinator implements JobCoordinator { private JobModel newJobModel; private boolean hasLoadedMetadataResources = false; private String cachedJobModelVersion = null; + private MetadataResourceUtil metadataResourceUtil; @VisibleForTesting ZkSessionMetrics zkSessionMetrics; @@ -312,7 +313,7 @@ JobModel readJobModelFromMetadataStore(String zkJobModelVersion) { @VisibleForTesting void loadMetadataResources(JobModel jobModel) { try { - MetadataResourceUtil metadataResourceUtil = createMetadataResourceUtil(jobModel, config); + metadataResourceUtil = createMetadataResourceUtil(jobModel, config); metadataResourceUtil.createResources(); if (coordinatorStreamStore != null) { diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala index 9e0285254e..dd606d1943 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala @@ -21,7 +21,7 @@ package org.apache.samza.job.local import java.util.concurrent.CountDownLatch -import org.apache.samza.coordinator.JobModelManager +import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil} import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish} import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob} @@ -45,7 +45,8 @@ object ProcessJob { class ProcessJob( commandBuilder: CommandBuilder, val jobModelManager: JobModelManager, - val coordinatorStreamStore: CoordinatorStreamStore) extends StreamJob with Logging { + val coordinatorStreamStore: CoordinatorStreamStore, + val metadataResourceUtil: MetadataResourceUtil) extends StreamJob with Logging { import ProcessJob._ @@ -76,6 +77,7 @@ class ProcessJob( case e: Exception => error("Encountered an error during job start: %s".format(e.getMessage)) } finally { jobModelManager.stop + metadataResourceUtil.stop coordinatorStreamStore.close setStatus(if (processExitCode == 0) SuccessfulFinish else UnsuccessfulFinish) } diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index fab8c6ee69..c4f40e03e3 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -113,6 +113,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging { .setId("0") .setUrl(jobModelManager.server.getUrl) - new ProcessJob(commandBuilder, jobModelManager, coordinatorStreamStore) + new ProcessJob(commandBuilder, jobModelManager, coordinatorStreamStore, metadataResourceUtil) } } diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index c1a36837f5..e70510851e 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -158,6 +158,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { threadJob } finally { jobModelManager.stop + metadataResourceUtil.stop if (jmxServer != null) { jmxServer.stop } diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala index d4f4d41fed..70a0194763 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala @@ -21,7 +21,7 @@ package org.apache.samza.job.local import com.google.common.collect.ImmutableMap import org.apache.samza.config.MapConfig -import org.apache.samza.coordinator.JobModelManager +import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil} import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish} @@ -59,7 +59,8 @@ object TestProcessJob { new ProcessJob( commandBuilder, new MockJobModelManager, - new MockCoordinateStreamStore(MockConfigs, systemProducer, systemConsumer, systemAdmin)) + new MockCoordinateStreamStore(MockConfigs, systemProducer, systemConsumer, systemAdmin), + new MockMetadataResourceUtil) } private def getMockJobModelManager(processJob: ProcessJob): MockJobModelManager = { @@ -69,6 +70,10 @@ object TestProcessJob { private def getMockCoordinatorStreamStore(processJob: ProcessJob): MockCoordinateStreamStore = { processJob.coordinatorStreamStore.asInstanceOf[MockCoordinateStreamStore] } + + private def getMockMetadataResourceUtil(processJob: ProcessJob): MockMetadataResourceUtil = { + processJob.metadataResourceUtil.asInstanceOf[MockMetadataResourceUtil] + } } class TestProcessJob { @@ -84,6 +89,7 @@ class TestProcessJob { assertEquals(SuccessfulFinish, status) assertTrue(getMockJobModelManager(processJob).stopped) assertTrue(getMockCoordinatorStreamStore(processJob).closed) + assertTrue(getMockMetadataResourceUtil(processJob).stopped) } @Test @@ -95,6 +101,7 @@ class TestProcessJob { assertEquals(UnsuccessfulFinish, status) assertTrue(getMockJobModelManager(processJob).stopped) assertTrue(getMockCoordinatorStreamStore(processJob).closed) + assertTrue(getMockMetadataResourceUtil(processJob).stopped) } @Test @@ -116,6 +123,7 @@ class TestProcessJob { assertEquals(UnsuccessfulFinish, processJob.getStatus) assertTrue(getMockJobModelManager(processJob).stopped) assertTrue(getMockCoordinatorStreamStore(processJob).closed) + assertTrue(getMockMetadataResourceUtil(processJob).stopped) } @Test @@ -127,6 +135,7 @@ class TestProcessJob { assertEquals(UnsuccessfulFinish, processJob.getStatus) assertTrue(getMockJobModelManager(processJob).stopped) assertTrue(getMockCoordinatorStreamStore(processJob).closed) + assertTrue(getMockMetadataResourceUtil(processJob).stopped) } @Test @@ -138,6 +147,7 @@ class TestProcessJob { assertEquals(SuccessfulFinish, processJob.getStatus) assertTrue(getMockJobModelManager(processJob).stopped) assertTrue(getMockCoordinatorStreamStore(processJob).closed) + assertTrue(getMockMetadataResourceUtil(processJob).stopped) } @Test @@ -186,4 +196,14 @@ class MockCoordinateStreamStore( override def close: Unit = { closed = true } -} \ No newline at end of file +} + +class MockMetadataResourceUtil extends MetadataResourceUtil(null, null, null) { + var stopped: Boolean = false + + override def createResources: Unit = {} + + override def stop: Unit = { + stopped = true + } +}