Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhars-li authored Nov 29, 2023
2 parents 80b03c6 + 8ed3572 commit a8080d8
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 70 deletions.
25 changes: 25 additions & 0 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,16 @@ <h1>Samza Configuration Reference</h1>
</td>
</tr>

<tr>
<td class="property" id="job.operator.framework.executor.enabled">job.operator.framework.executor.enabled</td>
<td class="default">false</td>
<td class="description">
If enabled, the framework thread pool is used for message hand-off and sub-DAG execution. Otherwise,
execution falls back to the caller thread or the Java fork-join pool, depending on the type of work
chained as part of the message hand-off.
</td>
</tr>

<tr>
<!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
<th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>
Expand Down Expand Up @@ -2048,6 +2058,21 @@ <h1>Samza Configuration Reference</h1>
</th>
</tr>

<tr>
<td class="property" id="worker-opts">worker.opts</td>
<td class="default"></td>
<td class="description">
Any JVM options to include in the command line when executing the worker process in portable execution of Samza using Beam. For example,
this can be used to set the JVM heap size, to tune the garbage collector, or to enable
<a href="/learn/tutorials/{{site.version}}/remote-debugging-samza.html">remote debugging</a>.
Anything you put in <code>worker.opts</code> gets forwarded directly to the command line of the worker process as part of the JVM invocation.
<b>Note:</b> This configuration applies only to Samza Beam portable mode.
<dl>
<dt>Example: <code>worker.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC</code></dt>
</dl>
</td>
</tr>

<tr>
<td class="property" id="yarn-package-path">yarn.package.path</td>
<td class="default"></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public class JobConfig extends MapConfig {
// Max heap (in MB) override for the container; ShellCommandConfig#getTaskOpts applies it to task.opts as -Xmx<value>m.
public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES = JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores";
// Max heap (in MB) override for the worker process; ShellCommandConfig#getWorkerOpts applies it to worker.opts as -Xmx<value>m.
public static final String JOB_AUTOSIZING_WORKER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "worker.maxheap.mb";

public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
Expand Down Expand Up @@ -196,6 +197,10 @@ public class JobConfig extends MapConfig {
public static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor";
public static final int DEFAULT_JOB_ELASTICITY_FACTOR = 1;

/**
 * Config key controlling whether the framework thread pool (operator executor) is used for message hand-off
 * and sub-DAG execution in the operator layer. Read through {@link #getOperatorFrameworkExecutorEnabled()}.
 */
public static final String JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED = "job.operator.framework.executor.enabled";

/** Default for {@link #JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED}: the framework executor is disabled. */
public static final boolean DEFAULT_JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED = false;

/**
 * Creates a {@code JobConfig} view over the given configuration.
 *
 * @param config the configuration handed to the superclass constructor
 */
public JobConfig(Config config) {
super(config);
}
Expand Down Expand Up @@ -527,4 +532,8 @@ public int getElasticityFactor() {
/**
 * Looks up the coordinator execute command.
 *
 * @return the value configured under {@code COORDINATOR_EXECUTE_COMMAND}, or
 *         {@code DEFAULT_COORDINATOR_EXECUTE_COMMAND} when the key is not set
 */
public String getCoordinatorExecuteCommand() {
  final String coordinatorCommand = get(COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND);
  return coordinatorCommand;
}

/**
 * Tells whether the framework executor should be used by the operator layer.
 *
 * @return the boolean configured under {@code job.operator.framework.executor.enabled}, or
 *         {@code DEFAULT_JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED} when the key is not set
 */
public boolean getOperatorFrameworkExecutorEnabled() {
  final boolean frameworkExecutorEnabled =
      getBoolean(JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED, DEFAULT_JOB_OPERATOR_FRAMEWORK_EXECUTOR_ENABLED);
  return frameworkExecutorEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.samza.config;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;


public class ShellCommandConfig extends MapConfig {
Expand Down Expand Up @@ -77,6 +79,7 @@ public class ShellCommandConfig extends MapConfig {

public static final String COMMAND_SHELL_EXECUTE = "task.execute";
// JVM options for the task; read by getTaskOpts().
public static final String TASK_JVM_OPTS = "task.opts";
// JVM options for the worker process (Samza Beam portable mode); read by getWorkerOpts().
public static final String WORKER_JVM_OPTS = "worker.opts";
public static final String TASK_JAVA_HOME = "task.java.home";

/**
Expand All @@ -97,20 +100,19 @@ public String getCommand() {
}

public Optional<String> getTaskOpts() {
Optional<String> jvmOpts = Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS));
Optional<String> maxHeapMbOptional = Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB));
if (new JobConfig(this).getAutosizingEnabled() && maxHeapMbOptional.isPresent()) {
String maxHeapMb = maxHeapMbOptional.get();
String xmxSetting = "-Xmx" + maxHeapMb + "m";
if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) {
jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+", xmxSetting));
} else if (jvmOpts.isPresent()) {
jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting));
} else {
jvmOpts = Optional.of(xmxSetting);
}
}
return jvmOpts;
String taskOpts = get(ShellCommandConfig.TASK_JVM_OPTS);
String autosizingContainerMaxHeap = get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB);

return Optional.ofNullable(getFinalJvmOptions(taskOpts, autosizingContainerMaxHeap));
}

/**
 * Resolves the JVM options for the worker process.
 *
 * <p>The configured {@code worker.opts} value is combined with the autosizing worker max-heap override
 * through {@link #getFinalJvmOptions(String, String)}.
 *
 * @return the resulting worker JVM options, or an empty {@link Optional} when the result is {@code null}
 */
public Optional<String> getWorkerOpts() {
  final String heapOverrideMb = get(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB);
  final String configuredOpts = get(ShellCommandConfig.WORKER_JVM_OPTS);
  final String effectiveOpts = getFinalJvmOptions(configuredOpts, heapOverrideMb);
  return Optional.ofNullable(effectiveOpts);
}

public Optional<String> getJavaHome() {
Expand All @@ -120,4 +122,26 @@ public Optional<String> getJavaHome() {
/**
 * Looks up the additional classpath directory.
 *
 * @return the value configured under {@code ADDITIONAL_CLASSPATH_DIR}, or an empty {@link Optional}
 *         when the key is not set
 */
public Optional<String> getAdditionalClasspathDir() {
  final String classpathDir = get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR);
  return Optional.ofNullable(classpathDir);
}

/**
 * Returns the final JVM options by applying the max-heap override, if available, to the given JVM options.
 *
 * <p>The override is applied only when autosizing is enabled and {@code maxHeapOverride} is non-empty:
 * <ul>
 *   <li>if {@code jvmOpts} is blank, the result is just the {@code -Xmx<override>m} setting;</li>
 *   <li>if {@code jvmOpts} contains {@code -Xmx}, every {@code -Xmx<value>} token is replaced by the override;</li>
 *   <li>otherwise the {@code -Xmx} setting is appended, separated by a single space.</li>
 * </ul>
 * In all other cases {@code jvmOpts} is returned unchanged.
 *
 * @param jvmOpts the configured JVM options; may be {@code null}
 * @param maxHeapOverride the max heap size in MB to enforce; may be {@code null} or empty
 * @return the resulting JVM options; {@code null} when {@code jvmOpts} is {@code null} and no override applies
 */
@VisibleForTesting
String getFinalJvmOptions(String jvmOpts, String maxHeapOverride) {
  if (!new JobConfig(this).getAutosizingEnabled() || StringUtils.isEmpty(maxHeapOverride)) {
    return jvmOpts;
  }

  String xmxSetting = "-Xmx" + maxHeapOverride + "m";
  if (StringUtils.isBlank(jvmOpts)) {
    return xmxSetting;
  }

  if (jvmOpts.contains("-Xmx")) {
    // The override comes from config, so quote it: replaceAll would otherwise interpret '$' and '\'
    // in the replacement as group references / escapes and either throw or corrupt the options.
    return jvmOpts.replaceAll("-Xmx\\S+", java.util.regex.Matcher.quoteReplacement(xmxSetting));
  }

  return jvmOpts.concat(" " + xmxSetting);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ public void stop() {
isStarted = false;
}

/**
 * Flushes the underlying system producer.
 *
 * @param source the source name passed through to the underlying system producer's {@code flush}
 */
public void flush(String source) {
log.info("Flushing coordinator stream producer.");
systemProducer.flush(source);
}

/**
* Serialize and send a coordinator stream message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void start() {
*/
public void stop() {
log.info("Stopping the coordinator stream producer.");
// Flush before stopping; presumably so that messages still pending for SOURCE are sent before the
// producer shuts down (NOTE(review): confirm against the system producer's stop() contract).
coordinatorStreamSystemProducer.flush(SOURCE);
coordinatorStreamSystemProducer.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public Map<String, String> buildEnvironment() {
envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse(""));
envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
shellCommandConfig.getAdditionalClasspathDir().orElse(""));
shellCommandConfig.getWorkerOpts()
.ifPresent(workerOpts -> envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, workerOpts));
shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
return envBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
Expand Down Expand Up @@ -95,6 +97,7 @@ public abstract class OperatorImpl<M, RM> {
private ControlMessageSender controlMessageSender;
private int elasticityFactor;
private ExecutorService operatorExecutor;
private boolean operatorExecutorEnabled;

/**
* Initialize this {@link OperatorImpl} and its user-defined functions.
Expand Down Expand Up @@ -136,7 +139,9 @@ public final void init(InternalTaskContext internalTaskContext) {
this.taskModel = taskContext.getTaskModel();
this.callbackScheduler = taskContext.getCallbackScheduler();
handleInit(context);
this.elasticityFactor = new JobConfig(config).getElasticityFactor();
JobConfig jobConfig = new JobConfig(config);
this.elasticityFactor = jobConfig.getElasticityFactor();
this.operatorExecutorEnabled = jobConfig.getOperatorFrameworkExecutorEnabled();
this.operatorExecutor = context.getTaskContext().getOperatorExecutor();

initialized = true;
Expand Down Expand Up @@ -192,21 +197,20 @@ public final CompletionStage<Void> onMessageAsync(M message, MessageCollector co
getOpImplId(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
}

CompletionStage<Void> result = completableResultsFuture.thenComposeAsync(results -> {
CompletionStage<Void> result = composeFutureWithExecutor(completableResultsFuture, results -> {
long endNs = this.highResClock.nanoTime();
this.handleMessageNs.update(endNs - startNs);

return CompletableFuture.allOf(results.stream()
.flatMap(r -> this.registeredOperators.stream()
.map(op -> op.onMessageAsync(r, collector, coordinator)))
.flatMap(r -> this.registeredOperators.stream().map(op -> op.onMessageAsync(r, collector, coordinator)))
.toArray(CompletableFuture[]::new));
}, operatorExecutor);
});

WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
if (watermarkFn != null) {
// check whether there is new watermark emitted from the user function
Long outputWm = watermarkFn.getOutputWatermark();
return result.thenComposeAsync(ignored -> propagateWatermark(outputWm, collector, coordinator), operatorExecutor);
return composeFutureWithExecutor(result, ignored -> propagateWatermark(outputWm, collector, coordinator));
}

return result;
Expand Down Expand Up @@ -245,11 +249,9 @@ public final CompletionStage<Void> onTimer(MessageCollector collector, TaskCoord
.map(op -> op.onMessageAsync(r, collector, coordinator)))
.toArray(CompletableFuture[]::new));

return resultFuture.thenComposeAsync(x ->
CompletableFuture.allOf(this.registeredOperators
.stream()
.map(op -> op.onTimer(collector, coordinator))
.toArray(CompletableFuture[]::new)), operatorExecutor);
return composeFutureWithExecutor(resultFuture, x -> CompletableFuture.allOf(this.registeredOperators.stream()
.map(op -> op.onTimer(collector, coordinator))
.toArray(CompletableFuture[]::new)));
}

/**
Expand Down Expand Up @@ -315,15 +317,14 @@ public final CompletionStage<Void> aggregateEndOfStream(EndOfStreamMessage eos,
}

// populate the end-of-stream through the dag
endOfStreamFuture = onEndOfStream(collector, coordinator)
.thenAcceptAsync(result -> {
if (eosStates.allEndOfStream()) {
// all inputs have been end-of-stream, shut down the task
LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
}, operatorExecutor);
endOfStreamFuture = acceptFutureWithExecutor(onEndOfStream(collector, coordinator), result -> {
if (eosStates.allEndOfStream()) {
// all inputs have been end-of-stream, shut down the task
LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
});
}

return endOfStreamFuture;
Expand All @@ -347,10 +348,10 @@ private CompletionStage<Void> onEndOfStream(MessageCollector collector, TaskCoor
.map(op -> op.onMessageAsync(r, collector, coordinator)))
.toArray(CompletableFuture[]::new));

endOfStreamFuture = resultFuture.thenComposeAsync(x ->
CompletableFuture.allOf(this.registeredOperators.stream()
endOfStreamFuture = composeFutureWithExecutor(resultFuture, x -> CompletableFuture.allOf(
this.registeredOperators.stream()
.map(op -> op.onEndOfStream(collector, coordinator))
.toArray(CompletableFuture[]::new)), operatorExecutor);
.toArray(CompletableFuture[]::new)));
}

return endOfStreamFuture;
Expand Down Expand Up @@ -406,15 +407,14 @@ public final CompletionStage<Void> aggregateDrainMessages(DrainMessage drainMess
controlMessageSender.broadcastToOtherPartitions(new DrainMessage(drainMessage.getRunId()), ssp, collector);
}

drainFuture = onDrainOfStream(collector, coordinator)
.thenAcceptAsync(result -> {
if (drainStates.areAllStreamsDrained()) {
// All input streams have been drained, shut down the task
LOG.info("All input streams have been drained for task {}. Requesting shutdown.", taskName.getTaskName());
coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
}, operatorExecutor);
drainFuture = acceptFutureWithExecutor(onDrainOfStream(collector, coordinator), result -> {
if (drainStates.areAllStreamsDrained()) {
// All input streams have been drained, shut down the task
LOG.info("All input streams have been drained for task {}. Requesting shutdown.", taskName.getTaskName());
coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
});
}

return drainFuture;
Expand All @@ -439,10 +439,10 @@ private CompletionStage<Void> onDrainOfStream(MessageCollector collector, TaskCo
.toArray(CompletableFuture[]::new));

// propagate DrainMessage to downstream operators
drainFuture = resultFuture.thenComposeAsync(x ->
CompletableFuture.allOf(this.registeredOperators.stream()
drainFuture = composeFutureWithExecutor(resultFuture, x -> CompletableFuture.allOf(
this.registeredOperators.stream()
.map(op -> op.onDrainOfStream(collector, coordinator))
.toArray(CompletableFuture[]::new)), operatorExecutor);
.toArray(CompletableFuture[]::new)));
}

return drainFuture;
Expand Down Expand Up @@ -474,8 +474,8 @@ public final CompletionStage<Void> aggregateWatermark(WatermarkMessage watermark
controlMessageSender.broadcastToOtherPartitions(new WatermarkMessage(watermark), ssp, collector);
}
// populate the watermark through the dag
watermarkFuture = onWatermark(watermark, collector, coordinator)
.thenAcceptAsync(ignored -> watermarkStates.updateAggregateMetric(ssp, watermark), operatorExecutor);
watermarkFuture = acceptFutureWithExecutor(onWatermark(watermark, collector, coordinator),
ignored -> watermarkStates.updateAggregateMetric(ssp, watermark));
}

return watermarkFuture;
Expand Down Expand Up @@ -530,8 +530,8 @@ private CompletionStage<Void> onWatermark(long watermark, MessageCollector colle
.toArray(CompletableFuture[]::new));
}

watermarkFuture = watermarkFuture.thenComposeAsync(res -> propagateWatermark(outputWm, collector, coordinator),
operatorExecutor);
watermarkFuture =
composeFutureWithExecutor(watermarkFuture, res -> propagateWatermark(outputWm, collector, coordinator));
}

return watermarkFuture;
Expand Down Expand Up @@ -679,6 +679,20 @@ final Collection<RM> handleMessage(M message, MessageCollector collector, TaskCo
.toCompletableFuture().join();
}

/**
 * Chains {@code fn} onto {@code futureToChain} with compose semantics.
 *
 * <p>When the operator framework executor is enabled, the function is chained asynchronously on
 * {@code operatorExecutor}; otherwise it is chained without an explicit executor.
 *
 * @param futureToChain the stage whose result feeds {@code fn}
 * @param fn the function producing the next stage
 * @param <T> result type of {@code futureToChain}
 * @param <U> result type of the stage returned by {@code fn}
 * @return the composed stage
 */
@VisibleForTesting
final <T, U> CompletionStage<U> composeFutureWithExecutor(CompletionStage<T> futureToChain,
    Function<? super T, ? extends CompletionStage<U>> fn) {
  if (operatorExecutorEnabled) {
    return futureToChain.thenComposeAsync(fn, operatorExecutor);
  }
  return futureToChain.thenCompose(fn);
}

/**
 * Chains {@code consumer} onto {@code futureToChain} with accept semantics.
 *
 * <p>When the operator framework executor is enabled, the consumer is chained asynchronously on
 * {@code operatorExecutor}; otherwise it is chained without an explicit executor.
 *
 * @param futureToChain the stage whose result is handed to {@code consumer}
 * @param consumer the action to run with the stage's result
 * @param <T> result type of {@code futureToChain}
 * @return a stage that completes after {@code consumer} has run
 */
@VisibleForTesting
final <T> CompletionStage<Void> acceptFutureWithExecutor(CompletionStage<T> futureToChain,
    Consumer<? super T> consumer) {
  if (operatorExecutorEnabled) {
    return futureToChain.thenAcceptAsync(consumer, operatorExecutor);
  }
  return futureToChain.thenAccept(consumer);
}

private HighResolutionClock createHighResClock(Config config) {
MetricsConfig metricsConfig = new MetricsConfig(config);
// The timer metrics calculation here is only enabled for debugging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public ExecutorService getOperatorExecutor(TaskName taskName, Config config) {
} else {
LOG.info("Using single threaded thread pool as operator thread pool for task {}", key.getTaskName());
operatorExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("Samza " + key.getTaskName() + " Thread-%d").build());
new ThreadFactoryBuilder().setNameFormat("Samza " + key.getTaskName() + " Thread-%d")
.setDaemon(true)
.build());
}

return operatorExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,13 @@ class TaskInstance(
val jobConfig = new JobConfig(jobContext.getConfig)
val taskExecutorFactory = ReflectionUtil.getObj(jobConfig.getTaskExecutorFactory, classOf[TaskExecutorFactory])

var operatorExecutor = Option.empty[java.util.concurrent.ExecutorService].orNull
if (jobConfig.getOperatorFrameworkExecutorEnabled) {
operatorExecutor = taskExecutorFactory.getOperatorExecutor(taskName, jobContext.getConfig)
}
new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache,
systemStreamPartitions, taskExecutorFactory.getOperatorExecutor(taskName, jobContext.getConfig))
systemStreamPartitions, operatorExecutor)
}
// need separate field for this instead of using it through Context, since Context throws an exception if it is null
private val applicationTaskContextOption = applicationTaskContextFactoryOption
Expand Down
Loading

0 comments on commit a8080d8

Please sign in to comment.