Skip to content

Commit

Permalink
[FLINK-22091][yarn] Make Flink on YARN honor env.java.home (apache#25877
Browse files Browse the repository at this point in the history
)
  • Loading branch information
X-czh authored Jan 7, 2025
1 parent 9878e56 commit f0c8f18
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<td>String</td>
<td>A string of default JVM options to prepend to <code class="highlighter-rouge">env.java.opts.taskmanager</code>. This is intended to be set by administrators.</td>
</tr>
<tr>
<td><h5>env.java.home</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Location where Java is installed. If not specified, Flink will use your default Java installation.</td>
</tr>
<tr>
<td><h5>env.java.opts.all</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public final class ConfigConstants {

// ----------------------------- Environment Variables ----------------------------

public static final String ENV_JAVA_HOME = "JAVA_HOME";

/** The environment variable name which contains the location of the configuration directory. */
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,17 @@ public static String[] mergeListsToArray(List<String> base, List<String> append)
// process parameters
// ------------------------------------------------------------------------

public static final ConfigOption<String> FLINK_JAVA_HOME =
ConfigOptions.key("env.java.home")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Location where Java is installed. If not specified,"
+ " Flink will use your default Java installation.")
.build());

public static final ConfigOption<String> FLINK_JVM_OPTIONS =
ConfigOptions.key("env.java.opts.all")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.flink.runtime.clusterframework;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ResourceManagerOptions;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;

/** This class describes the basic parameters for launching a TaskManager process. */
public class ContaineredTaskManagerParameters implements java.io.Serializable {

Expand Down Expand Up @@ -90,6 +93,10 @@ public static ContaineredTaskManagerParameters create(
}
}

// set JAVA_HOME
config.getOptional(CoreOptions.FLINK_JAVA_HOME)
.ifPresent(javaHome -> envVars.put(ENV_JAVA_HOME, javaHome));

// done
return new ContaineredTaskManagerParameters(taskExecutorProcessSpec, envVars);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
Expand Down Expand Up @@ -1969,6 +1970,10 @@ Map<String, String> generateApplicationMasterEnv(
ConfigurationUtils.getPrefixedKeyValuePairs(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,
this.flinkConfiguration));
// set JAVA_HOME
this.flinkConfiguration
.getOptional(CoreOptions.FLINK_JAVA_HOME)
.ifPresent(javaHome -> env.put(ENV_JAVA_HOME, javaHome));
// set Flink app class path
env.put(ENV_FLINK_CLASSPATH, classPathStr);
// Set FLINK_LIB_DIR to `lib` folder under working dir in container
Expand Down
54 changes: 40 additions & 14 deletions flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -56,6 +59,19 @@ class UtilsTest {
private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";

private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC =
new TaskExecutorProcessSpec(
new CPUResource(1.0),
new MemorySize(0), // frameworkHeapSize
new MemorySize(0), // frameworkOffHeapSize
new MemorySize(111), // taskHeapSize
new MemorySize(0), // taskOffHeapSize
new MemorySize(222), // networkMemSize
new MemorySize(0), // managedMemorySize
new MemorySize(333), // jvmMetaspaceSize
new MemorySize(0), // jvmOverheadSize
Collections.emptyList());

@Test
void testDeleteApplicationFiles(@TempDir Path tempDir) throws Exception {
final Path applicationFilesDir = Files.createTempDirectory(tempDir, ".flink");
Expand Down Expand Up @@ -208,20 +224,8 @@ void testGetYarnConfiguration() {
@Test
void testGetTaskManagerShellCommand() {
final Configuration cfg = new Configuration();
final TaskExecutorProcessSpec taskExecutorProcessSpec =
new TaskExecutorProcessSpec(
new CPUResource(1.0),
new MemorySize(0), // frameworkHeapSize
new MemorySize(0), // frameworkOffHeapSize
new MemorySize(111), // taskHeapSize
new MemorySize(0), // taskOffHeapSize
new MemorySize(222), // networkMemSize
new MemorySize(0), // managedMemorySize
new MemorySize(333), // jvmMetaspaceSize
new MemorySize(0), // jvmOverheadSize
Collections.emptyList());
final ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>());
new ContaineredTaskManagerParameters(TASK_EXECUTOR_PROCESS_SPEC, new HashMap<>());

// no logging, with/out krb5
final String java = "$JAVA_HOME/bin/java";
Expand All @@ -238,7 +242,8 @@ void testGetTaskManagerShellCommand() {
+ " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set
final String mainClass = "org.apache.flink.yarn.UtilsTest";
final String dynamicConfigs =
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim();
TaskExecutorProcessUtils.generateDynamicConfigsStr(TASK_EXECUTOR_PROCESS_SPEC)
.trim();
final String basicArgs = "--configDir ./conf";
final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
Expand Down Expand Up @@ -674,6 +679,27 @@ void testGenerateJvmOptsString() {
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS));
}

@Test
void testGetTaskManagerEnvsWithJavaHomeSet() {
final Configuration cfg = new Configuration();
cfg.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
final ContaineredTaskManagerParameters containeredParams =
ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
final Map<String, String> envVars = containeredParams.taskManagerEnv();
assertThat(envVars).containsEntry(ENV_JAVA_HOME, "/opt/jdk").containsEntry("key", "val");
}

@Test
void testGetTaskManagerEnvsWithoutJavaHomeSet() {
final Configuration cfg = new Configuration();
cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
final ContaineredTaskManagerParameters containeredParams =
ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
final Map<String, String> envVars = containeredParams.taskManagerEnv();
assertThat(envVars).doesNotContainKey(ENV_JAVA_HOME);
}

private static void verifyUnitResourceVariousSchedulers(
YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,11 +921,14 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
final String fakeLocalFlinkJar = "./lib/flink_dist.jar";
final String fakeClassPath = fakeLocalFlinkJar + ":./usrlib/user.jar";
final ApplicationId appId = ApplicationId.newInstance(0, 0);
final Configuration flinkConfig = new Configuration();
flinkConfig.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
final Map<String, String> masterEnv =
getTestMasterEnv(
new Configuration(), flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);
flinkConfig, flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);

assertThat(masterEnv)
.containsEntry(ConfigConstants.ENV_JAVA_HOME, "/opt/jdk")
.containsEntry(ConfigConstants.ENV_FLINK_LIB_DIR, "./lib")
.containsEntry(YarnConfigKeys.ENV_APP_ID, appId.toString())
.containsEntry(
Expand All @@ -940,6 +943,20 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
.containsEntry(YarnConfigKeys.ENV_CLIENT_HOME_DIR, flinkHomeDir.getPath());
}

@Test
public void testContainerEnvJavaHomeNotOverriddenByDefault(@TempDir File flinkHomeDir)
throws IOException {
final Configuration flinkConfig = new Configuration();
final Map<String, String> masterEnv =
getTestMasterEnv(
flinkConfig,
flinkHomeDir,
"",
"./lib/flink_dist.jar",
ApplicationId.newInstance(0, 0));
assertThat(masterEnv).doesNotContainKey(ConfigConstants.ENV_JAVA_HOME);
}

@Test
public void testEnvFlinkLibDirVarNotOverriddenByContainerEnv(@TempDir File tmpDir)
throws IOException {
Expand Down

0 comments on commit f0c8f18

Please sign in to comment.