From 2e91dfeff642f75192f28be33071fcb9ce443bb4 Mon Sep 17 00:00:00 2001 From: Bharath Kumarasubramanian Date: Wed, 8 Nov 2023 10:27:55 -0800 Subject: [PATCH 1/5] SAMZA-2795: Set thread to daemon thread for operator executor service (#1690) Description As part of SAMZA-2781, we introduced operator executors to manage operator handoff execution. However, the threads created by the executor service are non-daemon and hence prevent the JVM from shutting down. For context, we don't have a clean way to shut down the executor due to lack of clean lifecycle management of the factory. Hence shutting down the executor service within TaskInstance is not an option and the fix is to make them daemon threads. Changes Make the threads spawned by the operator executor daemon threads Tests None API Changes None Upgrade Instructions None Usage Instructions None --- .../org/apache/samza/task/DefaultTaskExecutorFactory.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java index 5ed5165c1d..5ea0fb3170 100644 --- a/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java @@ -68,7 +68,9 @@ public ExecutorService getOperatorExecutor(TaskName taskName, Config config) { } else { LOG.info("Using single threaded thread pool as operator thread pool for task {}", key.getTaskName()); operatorExecutor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("Samza " + key.getTaskName() + " Thread-%d").build()); + new ThreadFactoryBuilder().setNameFormat("Samza " + key.getTaskName() + " Thread-%d") + .setDaemon(true) + .build()); } return operatorExecutor; From 65f31eb6e7da19a39b082635d20f730059aac8cb Mon Sep 17 00:00:00 2001 From: Bharath Kumarasubramanian Date: Mon, 20 Nov 2023 14:02:51 -0800 
Subject: [PATCH 2/5] SAMZA-2763: Support worker JVM opts for Samza Beam portable mode (#1689) Summary: Support JVM options for worker process in Samza Beam portable mode Description: With portable mode support for Samza Beam, we want to tune and configure the JVM options for worker process. In this PR, we add support by introducing worker.opts configuration and autosizing integration support. Changes: - Added worker.opts configuration - Add autosizing integration support for Xmx - Updated configuration table and website API Changes: None Usage Instructions: worker.opts can be used similar to other samza application configuration although it only applies to Samza Beam portable execution mode and is ignored otherwise. Upgrade Instructions: None --- .../versioned/jobs/configuration-table.html | 15 +++++ .../org/apache/samza/config/JobConfig.java | 1 + .../samza/config/ShellCommandConfig.java | 52 ++++++++++----- .../apache/samza/job/ShellCommandBuilder.java | 1 + .../samza/config/TestShellCommandConfig.java | 63 ++++++++++++++++++- .../samza/job/TestShellCommandBuilder.java | 3 + 6 files changed, 119 insertions(+), 16 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index f4c8d4d7ba..e00c983d8b 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -2048,6 +2048,21 @@

Samza Configuration Reference

+ + worker.opts + + + Any JVM options to include in the command line when executing the worker process in the portable execution mode of Samza using Beam. For example, + this can be used to set the JVM heap size, to tune the garbage collector, or to enable + remote debugging. + Anything you put in worker.opts gets forwarded directly to the command line of the worker process as part of the JVM invocation. + Note: This configuration only applies to Samza Beam portable mode.
+
Example: worker.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC
+
+ + + yarn.package.path diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index b9aa82cf2a..3d0b532625 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -158,6 +158,7 @@ public class JobConfig extends MapConfig { public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb"; public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb"; public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES = JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores"; + public static final String JOB_AUTOSIZING_WORKER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "worker.maxheap.mb"; public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory"; public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory"; diff --git a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java index 73bcf8ec60..90780c81db 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java @@ -18,7 +18,9 @@ */ package org.apache.samza.config; +import com.google.common.annotations.VisibleForTesting; import java.util.Optional; +import org.apache.commons.lang3.StringUtils; public class ShellCommandConfig extends MapConfig { @@ -77,6 +79,7 @@ public class ShellCommandConfig extends MapConfig { public static final String COMMAND_SHELL_EXECUTE = "task.execute"; public static final String TASK_JVM_OPTS = "task.opts"; + public static final String WORKER_JVM_OPTS = "worker.opts"; public static final String TASK_JAVA_HOME = 
"task.java.home"; /** @@ -97,20 +100,19 @@ public String getCommand() { } public Optional getTaskOpts() { - Optional jvmOpts = Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS)); - Optional maxHeapMbOptional = Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB)); - if (new JobConfig(this).getAutosizingEnabled() && maxHeapMbOptional.isPresent()) { - String maxHeapMb = maxHeapMbOptional.get(); - String xmxSetting = "-Xmx" + maxHeapMb + "m"; - if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) { - jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+", xmxSetting)); - } else if (jvmOpts.isPresent()) { - jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting)); - } else { - jvmOpts = Optional.of(xmxSetting); - } - } - return jvmOpts; + String taskOpts = get(ShellCommandConfig.TASK_JVM_OPTS); + String autosizingContainerMaxHeap = get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB); + + return Optional.ofNullable(getFinalJvmOptions(taskOpts, autosizingContainerMaxHeap)); + } + + /** + * Returns the worker opts for the application if available. 
+ */ + public Optional getWorkerOpts() { + String autosizingWorkerHeapMb = get(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB); + String workerOpts = get(ShellCommandConfig.WORKER_JVM_OPTS); + return Optional.ofNullable(getFinalJvmOptions(workerOpts, autosizingWorkerHeapMb)); } public Optional getJavaHome() { @@ -120,4 +122,26 @@ public Optional getJavaHome() { public Optional getAdditionalClasspathDir() { return Optional.ofNullable(get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR)); } + + /** + * Returns the final JVM options by applying the heap override if available to the jvm opts + */ + @VisibleForTesting + String getFinalJvmOptions(String jvmOpts, String maxHeapOverride) { + String finalJvmOpts = jvmOpts; + if (new JobConfig(this).getAutosizingEnabled() && StringUtils.isNotEmpty(maxHeapOverride)) { + String xmxSetting = "-Xmx" + maxHeapOverride + "m"; + if (StringUtils.isNotBlank(jvmOpts)) { + if (jvmOpts.contains("-Xmx")) { + finalJvmOpts = jvmOpts.replaceAll("-Xmx\\S+", xmxSetting); + } else { + finalJvmOpts = jvmOpts.concat(" " + xmxSetting); + } + } else { + finalJvmOpts = xmxSetting; + } + } + + return finalJvmOpts; + } } diff --git a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java index 37253442a4..504d9a26f7 100644 --- a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java @@ -44,6 +44,7 @@ public Map buildEnvironment() { envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id); envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL, this.url.toString()); envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse("")); + envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, shellCommandConfig.getWorkerOpts().orElse("")); envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, 
shellCommandConfig.getAdditionalClasspathDir().orElse("")); shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome)); diff --git a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java index 452883d120..d5f8085ba5 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java @@ -22,8 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; public class TestShellCommandConfig { @@ -81,6 +80,66 @@ public void testGetTaskOptsAutosizingEnabled() { assertEquals(Optional.of("-Dproperty=value -Xmx1024m"), shellCommandConfig.getTaskOpts()); } + @Test + public void testGetWorkerOptsAutosizingDisabled() { + ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig( + ImmutableMap.of(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB, + "1024", "worker.opts", "-Xmx10m -Dproperty=value"))); + + String workerOpts = shellCommandConfig.getWorkerOpts() + .orElse(null); + String expectedOpts = "-Xmx10m -Dproperty=value"; + + assertNotNull(workerOpts); + assertEquals(expectedOpts, workerOpts); + } + + @Test + public void testGetWorkerOptsAutosizingEnabled() { + ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig( + ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB, + "1024", "worker.opts", "-Xmx10m -Dproperty=value"))); + + String workerOpts = shellCommandConfig.getWorkerOpts() + .orElse(null); + String expectedOpts = "-Xmx1024m -Dproperty=value"; + + assertNotNull(workerOpts); + assertEquals(expectedOpts, workerOpts); + } + + @Test + public void 
testGetFinalJvmOptionsAutosizingDisabled() { + ShellCommandConfig shellCommandConfig = + new ShellCommandConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false"))); + String jvmOptions = ""; + String expectedJvmOptions = ""; + + // no override passed + assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "")); + + // ignore override since autosizing is disabled + assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048")); + } + + @Test + public void testGetFinalJvmOptionsAutosizingEnabled() { + ShellCommandConfig shellCommandConfig = + new ShellCommandConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true"))); + String jvmOptions = "-Xmx1024m"; + String expectedJvmOptions = "-Xmx1024m"; + assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "")); + + // override should take effect with autosizing enabled + expectedJvmOptions = "-Xmx2048m"; + assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048")); + + // override should take effect even if xmx is not set + jvmOptions = "-Dproperty=value"; + expectedJvmOptions = "-Dproperty=value -Xmx2048m"; + assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048")); + } + @Test public void testGetJavaHome() { ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig()); diff --git a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java index ca7be0e4a3..afb6bfeec5 100644 --- a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java +++ b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java @@ -45,6 +45,7 @@ public void testBasicBuild() throws MalformedURLException { ShellCommandConfig.ENV_CONTAINER_ID, "1", ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING, 
ShellCommandConfig.ENV_JAVA_OPTS, "", + ShellCommandConfig.WORKER_JVM_OPTS, "", ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, ""); // assertions when command path is not set assertEquals("foo", shellCommandBuilder.buildCommand()); @@ -60,6 +61,7 @@ public void testBuildEnvironment() throws MalformedURLException { Config config = new MapConfig(new ImmutableMap.Builder() .put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo") .put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g") + .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g") .put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath") .put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home") .build()); @@ -71,6 +73,7 @@ public void testBuildEnvironment() throws MalformedURLException { .put(ShellCommandConfig.ENV_CONTAINER_ID, "1") .put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING) .put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g") + .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g") .put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath") .put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home") .build(); From 66495b677a728ff75a8674b217672cd51aece640 Mon Sep 17 00:00:00 2001 From: Bharath Kumarasubramanian Date: Tue, 21 Nov 2023 11:56:08 -0800 Subject: [PATCH 3/5] SAMZA-2796: Introduce config knob for framework thread sub DAG execution (#1691) Description As part of SAMZA-2781, we use framework thread pool to execute hand-offs and sub-DAG execution. We want to add a config knob to enable users opt-in to the feature as opposed to enable it by default. Changes Introduce config knob to use the framework executor Tests Added unit tests Usage Instructions Refer to the configuration documentation. 
To enable framework thread pool for sub-DAG execution and message hand off, set job.operator.framework.executor.enabled to true --- .../versioned/jobs/configuration-table.html | 10 ++ .../org/apache/samza/config/JobConfig.java | 8 ++ .../samza/operators/impl/OperatorImpl.java | 92 ++++++++------ .../apache/samza/container/TaskInstance.scala | 6 +- .../operators/impl/TestOperatorImpl.java | 112 ++++++++++++++++-- 5 files changed, 175 insertions(+), 53 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index e00c983d8b..390be03761 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -494,6 +494,16 @@

Samza Configuration Reference

+ + job.operator.framework.executor.enabled + false + + If enabled, framework thread pool will be used for message hand off and sub DAG execution. Otherwise, the + execution will fall back to using caller thread or java fork join pool depending on the type of work + chained as part of message hand off. + + + Zookeeper-based job configuration diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java index 3d0b532625..17f527252e 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java @@ -197,6 +197,10 @@ public class JobConfig extends MapConfig { public static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor"; public static final int DEFAULT_JOB_ELASTICITY_FACTOR = 1; + public static final String JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED = "job.operator.framework.executor.enabled"; + + public static final boolean DEFAULT_JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED = false; + public JobConfig(Config config) { super(config); } @@ -528,4 +532,8 @@ public int getElasticityFactor() { public String getCoordinatorExecuteCommand() { return get(COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND); } + + public boolean getOperatorFrameworkExecutorEnabled() { + return getBoolean(JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED, DEFAULT_JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED); + } } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 8b477d42db..c870264e91 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import 
java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -95,6 +97,7 @@ public abstract class OperatorImpl { private ControlMessageSender controlMessageSender; private int elasticityFactor; private ExecutorService operatorExecutor; + private boolean operatorExecutorEnabled; /** * Initialize this {@link OperatorImpl} and its user-defined functions. @@ -136,7 +139,9 @@ public final void init(InternalTaskContext internalTaskContext) { this.taskModel = taskContext.getTaskModel(); this.callbackScheduler = taskContext.getCallbackScheduler(); handleInit(context); - this.elasticityFactor = new JobConfig(config).getElasticityFactor(); + JobConfig jobConfig = new JobConfig(config); + this.elasticityFactor = jobConfig.getElasticityFactor(); + this.operatorExecutorEnabled = jobConfig.getOperatorFrameworkExecutorEnabled(); this.operatorExecutor = context.getTaskContext().getOperatorExecutor(); initialized = true; @@ -192,21 +197,20 @@ public final CompletionStage onMessageAsync(M message, MessageCollector co getOpImplId(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e); } - CompletionStage result = completableResultsFuture.thenComposeAsync(results -> { + CompletionStage result = composeFutureWithExecutor(completableResultsFuture, results -> { long endNs = this.highResClock.nanoTime(); this.handleMessageNs.update(endNs - startNs); return CompletableFuture.allOf(results.stream() - .flatMap(r -> this.registeredOperators.stream() - .map(op -> op.onMessageAsync(r, collector, coordinator))) + .flatMap(r -> this.registeredOperators.stream().map(op -> op.onMessageAsync(r, collector, coordinator))) .toArray(CompletableFuture[]::new)); - }, operatorExecutor); + }); WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn(); if (watermarkFn != null) { // check whether there is new 
watermark emitted from the user function Long outputWm = watermarkFn.getOutputWatermark(); - return result.thenComposeAsync(ignored -> propagateWatermark(outputWm, collector, coordinator), operatorExecutor); + return composeFutureWithExecutor(result, ignored -> propagateWatermark(outputWm, collector, coordinator)); } return result; @@ -245,11 +249,9 @@ public final CompletionStage onTimer(MessageCollector collector, TaskCoord .map(op -> op.onMessageAsync(r, collector, coordinator))) .toArray(CompletableFuture[]::new)); - return resultFuture.thenComposeAsync(x -> - CompletableFuture.allOf(this.registeredOperators - .stream() - .map(op -> op.onTimer(collector, coordinator)) - .toArray(CompletableFuture[]::new)), operatorExecutor); + return composeFutureWithExecutor(resultFuture, x -> CompletableFuture.allOf(this.registeredOperators.stream() + .map(op -> op.onTimer(collector, coordinator)) + .toArray(CompletableFuture[]::new))); } /** @@ -315,15 +317,14 @@ public final CompletionStage aggregateEndOfStream(EndOfStreamMessage eos, } // populate the end-of-stream through the dag - endOfStreamFuture = onEndOfStream(collector, coordinator) - .thenAcceptAsync(result -> { - if (eosStates.allEndOfStream()) { - // all inputs have been end-of-stream, shut down the task - LOG.info("All input streams have reached the end for task {}", taskName.getTaskName()); - coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK); - coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); - } - }, operatorExecutor); + endOfStreamFuture = acceptFutureWithExecutor(onEndOfStream(collector, coordinator), result -> { + if (eosStates.allEndOfStream()) { + // all inputs have been end-of-stream, shut down the task + LOG.info("All input streams have reached the end for task {}", taskName.getTaskName()); + coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK); + coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); + } + }); } return endOfStreamFuture; @@ -347,10 +348,10 
@@ private CompletionStage onEndOfStream(MessageCollector collector, TaskCoor .map(op -> op.onMessageAsync(r, collector, coordinator))) .toArray(CompletableFuture[]::new)); - endOfStreamFuture = resultFuture.thenComposeAsync(x -> - CompletableFuture.allOf(this.registeredOperators.stream() + endOfStreamFuture = composeFutureWithExecutor(resultFuture, x -> CompletableFuture.allOf( + this.registeredOperators.stream() .map(op -> op.onEndOfStream(collector, coordinator)) - .toArray(CompletableFuture[]::new)), operatorExecutor); + .toArray(CompletableFuture[]::new))); } return endOfStreamFuture; @@ -406,15 +407,14 @@ public final CompletionStage aggregateDrainMessages(DrainMessage drainMess controlMessageSender.broadcastToOtherPartitions(new DrainMessage(drainMessage.getRunId()), ssp, collector); } - drainFuture = onDrainOfStream(collector, coordinator) - .thenAcceptAsync(result -> { - if (drainStates.areAllStreamsDrained()) { - // All input streams have been drained, shut down the task - LOG.info("All input streams have been drained for task {}. Requesting shutdown.", taskName.getTaskName()); - coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK); - coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); - } - }, operatorExecutor); + drainFuture = acceptFutureWithExecutor(onDrainOfStream(collector, coordinator), result -> { + if (drainStates.areAllStreamsDrained()) { + // All input streams have been drained, shut down the task + LOG.info("All input streams have been drained for task {}. 
Requesting shutdown.", taskName.getTaskName()); + coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK); + coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); + } + }); } return drainFuture; @@ -439,10 +439,10 @@ private CompletionStage onDrainOfStream(MessageCollector collector, TaskCo .toArray(CompletableFuture[]::new)); // propagate DrainMessage to downstream operators - drainFuture = resultFuture.thenComposeAsync(x -> - CompletableFuture.allOf(this.registeredOperators.stream() + drainFuture = composeFutureWithExecutor(resultFuture, x -> CompletableFuture.allOf( + this.registeredOperators.stream() .map(op -> op.onDrainOfStream(collector, coordinator)) - .toArray(CompletableFuture[]::new)), operatorExecutor); + .toArray(CompletableFuture[]::new))); } return drainFuture; @@ -474,8 +474,8 @@ public final CompletionStage aggregateWatermark(WatermarkMessage watermark controlMessageSender.broadcastToOtherPartitions(new WatermarkMessage(watermark), ssp, collector); } // populate the watermark through the dag - watermarkFuture = onWatermark(watermark, collector, coordinator) - .thenAcceptAsync(ignored -> watermarkStates.updateAggregateMetric(ssp, watermark), operatorExecutor); + watermarkFuture = acceptFutureWithExecutor(onWatermark(watermark, collector, coordinator), + ignored -> watermarkStates.updateAggregateMetric(ssp, watermark)); } return watermarkFuture; @@ -530,8 +530,8 @@ private CompletionStage onWatermark(long watermark, MessageCollector colle .toArray(CompletableFuture[]::new)); } - watermarkFuture = watermarkFuture.thenComposeAsync(res -> propagateWatermark(outputWm, collector, coordinator), - operatorExecutor); + watermarkFuture = + composeFutureWithExecutor(watermarkFuture, res -> propagateWatermark(outputWm, collector, coordinator)); } return watermarkFuture; @@ -679,6 +679,20 @@ final Collection handleMessage(M message, MessageCollector collector, TaskCo .toCompletableFuture().join(); } + @VisibleForTesting + final CompletionStage 
composeFutureWithExecutor(CompletionStage futureToChain, + Function> fn) { + return operatorExecutorEnabled ? futureToChain.thenComposeAsync(fn, operatorExecutor) + : futureToChain.thenCompose(fn); + } + + @VisibleForTesting + final CompletionStage acceptFutureWithExecutor(CompletionStage futureToChain, + Consumer consumer) { + return operatorExecutorEnabled ? futureToChain.thenAcceptAsync(consumer, operatorExecutor) + : futureToChain.thenAccept(consumer); + } + private HighResolutionClock createHighResClock(Config config) { MetricsConfig metricsConfig = new MetricsConfig(config); // The timer metrics calculation here is only enabled for debugging diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 285e7c8778..89738e2de0 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -93,9 +93,13 @@ class TaskInstance( val jobConfig = new JobConfig(jobContext.getConfig) val taskExecutorFactory = ReflectionUtil.getObj(jobConfig.getTaskExecutorFactory, classOf[TaskExecutorFactory]) + var operatorExecutor = Option.empty[java.util.concurrent.ExecutorService].orNull + if (jobConfig.getOperatorFrameworkExecutorEnabled) { + operatorExecutor = taskExecutorFactory.getOperatorExecutor(taskName, jobContext.getConfig) + } new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager, new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache, - systemStreamPartitions, taskExecutorFactory.getOperatorExecutor(taskName, jobContext.getConfig)) + systemStreamPartitions, operatorExecutor) } // need separate field for this instead of using it through Context, since Context throws an exception if it is null private val applicationTaskContextOption = applicationTaskContextFactoryOption diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 9cb307d57a..6709417b98 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -18,16 +18,24 @@ */ package org.apache.samza.operators.impl; +import com.google.common.collect.ImmutableMap; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.context.ContainerContext; import org.apache.samza.context.Context; import org.apache.samza.context.InternalTaskContext; -import org.apache.samza.context.MockContext; +import org.apache.samza.context.JobContext; +import org.apache.samza.context.TaskContext; import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistryMap; @@ -44,33 +52,111 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; public class TestOperatorImpl { private Context context; private InternalTaskContext internalTaskContext; + private 
JobContext jobContext; + + private TaskContext taskContext; + + private ContainerContext containerContext; + @Before public void setup() { - this.context = new MockContext(); + this.context = mock(Context.class); this.internalTaskContext = mock(InternalTaskContext.class); + this.jobContext = mock(JobContext.class); + this.taskContext = mock(TaskContext.class); + this.containerContext = mock(ContainerContext.class); when(this.internalTaskContext.getContext()).thenReturn(this.context); // might be necessary in the future when(this.internalTaskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(mock(EndOfStreamStates.class)); when(this.internalTaskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class)); - when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - when(this.context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class)); - when(this.context.getTaskContext().getOperatorExecutor()).thenReturn(Executors.newSingleThreadExecutor()); - when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(this.context.getJobContext()).thenReturn(jobContext); + when(this.context.getTaskContext()).thenReturn(taskContext); + when(this.taskContext.getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(this.taskContext.getTaskModel()).thenReturn(mock(TaskModel.class)); + when(this.taskContext.getOperatorExecutor()).thenReturn(Executors.newSingleThreadExecutor()); + when(this.context.getContainerContext()).thenReturn(containerContext); + when(containerContext.getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + } + + @Test + public void testComposeFutureWithExecutorWithFrameworkExecutorEnabled() { + OperatorImpl opImpl = new TestOpImpl(mock(Object.class)); + ExecutorService mockExecutor = mock(ExecutorService.class); + CompletionStage mockFuture = mock(CompletionStage.class); + 
Function> mockFunction = mock(Function.class); + + Config config = new MapConfig(ImmutableMap.of("job.operator.framework.executor.enabled", "true")); + + when(this.taskContext.getOperatorExecutor()).thenReturn(mockExecutor); + when(this.jobContext.getConfig()).thenReturn(config); + + opImpl.init(this.internalTaskContext); + opImpl.composeFutureWithExecutor(mockFuture, mockFunction); + + verify(mockFuture).thenComposeAsync(eq(mockFunction), eq(mockExecutor)); + } + + @Test + public void testComposeFutureWithExecutorWithFrameworkExecutorDisabled() { + OperatorImpl opImpl = new TestOpImpl(mock(Object.class)); + ExecutorService mockExecutor = mock(ExecutorService.class); + CompletionStage mockFuture = mock(CompletionStage.class); + Function> mockFunction = mock(Function.class); + + Config config = new MapConfig(ImmutableMap.of("job.operator.framework.executor.enabled", "false")); + + when(this.taskContext.getOperatorExecutor()).thenReturn(mockExecutor); + when(this.jobContext.getConfig()).thenReturn(config); + + opImpl.init(this.internalTaskContext); + opImpl.composeFutureWithExecutor(mockFuture, mockFunction); + + verify(mockFuture).thenCompose(eq(mockFunction)); } + @Test + public void testAcceptFutureWithExecutorWithFrameworkExecutorDisabled() { + OperatorImpl opImpl = new TestOpImpl(mock(Object.class)); + ExecutorService mockExecutor = mock(ExecutorService.class); + CompletionStage mockFuture = mock(CompletionStage.class); + Consumer mockConsumer = mock(Consumer.class); + + Config config = new MapConfig(ImmutableMap.of("job.operator.framework.executor.enabled", "false")); + + when(this.taskContext.getOperatorExecutor()).thenReturn(mockExecutor); + when(this.jobContext.getConfig()).thenReturn(config); + + opImpl.init(this.internalTaskContext); + opImpl.acceptFutureWithExecutor(mockFuture, mockConsumer); + + verify(mockFuture).thenAccept(eq(mockConsumer)); + } + + @Test + public void testAcceptFutureWithExecutorWithFrameworkExecutorEnabled() { + OperatorImpl opImpl = 
new TestOpImpl(mock(Object.class)); + ExecutorService mockExecutor = mock(ExecutorService.class); + CompletionStage mockFuture = mock(CompletionStage.class); + Consumer mockConsumer = mock(Consumer.class); + + Config config = new MapConfig(ImmutableMap.of("job.operator.framework.executor.enabled", "true")); + + when(this.taskContext.getOperatorExecutor()).thenReturn(mockExecutor); + when(this.jobContext.getConfig()).thenReturn(config); + + opImpl.init(this.internalTaskContext); + opImpl.acceptFutureWithExecutor(mockFuture, mockConsumer); + + verify(mockFuture).thenAcceptAsync(eq(mockConsumer), eq(mockExecutor)); + } @Test(expected = IllegalStateException.class) public void testMultipleInitShouldThrow() { OperatorImpl opImpl = new TestOpImpl(mock(Object.class)); From e1816f3e7f09c27b3642a3e62a356198feb020f7 Mon Sep 17 00:00:00 2001 From: ajo thomas Date: Tue, 21 Nov 2023 17:21:29 -0800 Subject: [PATCH 4/5] SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (#1692) --- .../stream/CoordinatorStreamSystemProducer.java | 8 ++++++++ .../samza/coordinator/stream/CoordinatorStreamWriter.java | 1 + 2 files changed, 9 insertions(+) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java index 61c0ed93e4..43f9dffcfa 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java @@ -110,6 +110,14 @@ public void stop() { isStarted = false; } + /** + * Flushes underlying system producer. + * */ + public void flush(String source) { + log.info("Flushing coordinator stream producer."); + systemProducer.flush(source); + } + /** * Serialize and send a coordinator stream message. 
* diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java index da659818a2..5bc10f1e95 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java @@ -65,6 +65,7 @@ public void start() { */ public void stop() { log.info("Stopping the coordinator stream producer."); + coordinatorStreamSystemProducer.flush(SOURCE); coordinatorStreamSystemProducer.stop(); } From 8ed3572bee37c04481be7c831fa455a8305f3fe8 Mon Sep 17 00:00:00 2001 From: Bharath Kumarasubramanian Date: Wed, 22 Nov 2023 13:49:33 -0800 Subject: [PATCH 5/5] SAMZA-2798: Populate worker.opts in environment variable only if available (#1693) Description Populate worker.opts in the environment variable only if available in the configs. Changes Check if worker.opts is present and then add it to environment variable Tests Updated unit tests --- .../main/java/org/apache/samza/job/ShellCommandBuilder.java | 3 ++- .../java/org/apache/samza/job/TestShellCommandBuilder.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java index 504d9a26f7..4262145eef 100644 --- a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java @@ -44,9 +44,10 @@ public Map buildEnvironment() { envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id); envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL, this.url.toString()); envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse("")); - envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, shellCommandConfig.getWorkerOpts().orElse("")); 
envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, shellCommandConfig.getAdditionalClasspathDir().orElse("")); + shellCommandConfig.getWorkerOpts() + .ifPresent(workerOpts -> envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, workerOpts)); shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome)); return envBuilder.build(); } diff --git a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java index afb6bfeec5..4acba9487c 100644 --- a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java +++ b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java @@ -45,7 +45,6 @@ public void testBasicBuild() throws MalformedURLException { ShellCommandConfig.ENV_CONTAINER_ID, "1", ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING, ShellCommandConfig.ENV_JAVA_OPTS, "", - ShellCommandConfig.WORKER_JVM_OPTS, "", ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, ""); // assertions when command path is not set assertEquals("foo", shellCommandBuilder.buildCommand());