diff --git a/src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java b/src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java new file mode 100644 index 0000000..2b155a6 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import com.yahoo.storm.yarn.Client.ClientCommand; + +class ClassPathCommand implements ClientCommand { + + ClassPathCommand() { + } + + @Override + public Options getOpts() { + Options opts = new Options(); + return opts; + } + + @Override + public String getHeaderDescription() { + return "storm-yarn classpath"; + } + + @Override + public void process(CommandLine cl) throws Exception { + String classpath = System.getProperty("java.class.path"); + System.out.println(classpath); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/Client.java b/src/main/java/com/yahoo/storm/yarn/Client.java index 06584f2..064c465 100644 --- a/src/main/java/com/yahoo/storm/yarn/Client.java +++ b/src/main/java/com/yahoo/storm/yarn/Client.java @@ -101,8 +101,12 @@ public void printHelpFor(Collection args) { public void execute(String[] args) throws Exception { HashMap commands = new HashMap(); HelpCommand help = new HelpCommand(commands); + commands.put("help", help); + commands.put("version", new VersionCommand()); + commands.put("classpath", new ClassPathCommand()); commands.put("launch", new LaunchCommand()); + commands.put("setStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.SET_STORM_CONFIG)); commands.put("getStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.GET_STORM_CONFIG)); commands.put("addSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.ADD_SUPERVISORS)); @@ -113,7 +117,13 @@ public void execute(String[] args) throws Exception { commands.put("startSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.START_SUPERVISORS)); commands.put("stopSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.STOP_SUPERVISORS)); commands.put("shutdown", new StormMasterCommand(StormMasterCommand.COMMAND.SHUTDOWN)); - commands.put("version", new VersionCommand()); + + commands.put("jar", new StormTopologySubmitCommand()); + commands.put("kill", new StormTopologyKillCommand()); + commands.put("list", new StormTopologyListCommand()); + commands.put("rebalance", new StormTopologyRebalanceCommand()); + commands.put("activate", new StormTopologyActivateCommand()); + commands.put("deactivate", new StormTopologyDeactivateCommand()); String commandName = null; String[] commandArgs = null; @@ -134,7 +144,7 @@ public void execute(String[] args) throws Exception { if(!opts.hasOption("h")) { opts.addOption("h", "help", false, "print out a help message"); } - CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs); + CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs, true); if(cl.hasOption("help")) { help.printHelpFor(Arrays.asList(commandName)); } else { diff --git a/src/main/java/com/yahoo/storm/yarn/MasterServer.java b/src/main/java/com/yahoo/storm/yarn/MasterServer.java index 27b8cae..f5baedc 100644 --- a/src/main/java/com/yahoo/storm/yarn/MasterServer.java +++ b/src/main/java/com/yahoo/storm/yarn/MasterServer.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -28,6 +29,11 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; import org.apache.hadoop.service.Service; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -113,6 +119,40 @@ public void run() { return thread; } + private static void prepareStormConfig(Map stormConfig, + ServerSocketFactory socketFactory) throws IOException { + try { + String host_addr = InetAddress.getLocalHost().getHostAddress(); + LOG.info("Storm master host:" + host_addr); + stormConfig.put("nimbus.host", host_addr); + } catch (UnknownHostException ex) { + LOG.warn("Failed to get IP address of local host"); + throw ex; + } + + stormConfig.put("nimbus.thrift.port", socketFactory.create() + .getLocalPort()); + stormConfig.put("ui.port", socketFactory.create().getLocalPort()); + stormConfig.put("drpc.port", socketFactory.create().getLocalPort()); + stormConfig.put("drpc.invocations.port", socketFactory.create() + .getLocalPort()); + + stormConfig.put(Config.MASTER_THRIFT_PORT, socketFactory.create() + .getLocalPort()); + + //update the conf/storm.yaml + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path stormYaml = new Path("conf" + Path.SEPARATOR + "storm.yaml"); + + if (fs.exists(stormYaml)) { + LOG.info("storm.yaml exists"); + FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); + fs.setPermission(stormYaml, permission); + } + + Util.writeStormConf(fs, stormConfig, stormYaml); + } + @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { LOG.info("Starting the AM!!!!"); @@ -144,9 +184,11 @@ public static void main(String[] args) throws Exception { YarnConfiguration hadoopConf = new YarnConfiguration(); - final String host = InetAddress.getLocalHost().getHostName(); - storm_conf.put("nimbus.host", host); - + ServerSocketFactory socketFactory = new ServerSocketFactory(true); + + LOG.info("Prepare Storm config...."); + prepareStormConfig(storm_conf, socketFactory); + StormAMRMClient rmClient = new StormAMRMClient(appAttemptID, storm_conf, hadoopConf); rmClient.init(hadoopConf); @@ -154,17 +196,22 @@ public static void main(String[] args) throws Exception { BlockingQueue launcherQueue = new LinkedBlockingQueue(); - MasterServer server = new MasterServer(storm_conf, rmClient); + MasterServer server = null; try { - final int port = Utils.getInt(storm_conf.get(Config.MASTER_THRIFT_PORT)); - final String target = host + ":" + port; + final Integer masterPort = (Integer) storm_conf.get(Config.MASTER_THRIFT_PORT); + final String target = storm_conf.get("nimbus.host") + ":" + masterPort; InetSocketAddress addr = NetUtils.createSocketAddr(target); RegisterApplicationMasterResponse resp = - rmClient.registerApplicationMaster(addr.getHostName(), port, null); + rmClient.registerApplicationMaster(addr.getHostName(), masterPort, null); LOG.info("Got a registration response "+resp); LOG.info("Max Capability "+resp.getMaximumResourceCapability()); rmClient.setMaxResource(resp.getMaximumResourceCapability()); LOG.info("Starting HB thread"); + + //Free the master port so that it can be used by Master Thrift Service + socketFactory.free(masterPort); + + server = new MasterServer(storm_conf, rmClient); server.initAndStartHeartbeat(rmClient, launcherQueue, (Integer) storm_conf .get(Config.MASTER_HEARTBEAT_INTERVAL_MILLIS)); @@ -177,7 +224,7 @@ public static void main(String[] args) throws Exception { rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "AllDone", null); } finally { - if (server.isServing()) { + if (null != server && server.isServing()) { LOG.info("Stop Master Thrift Server"); server.stop(); } diff --git a/src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java b/src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java new file mode 100644 index 0000000..fccb14e --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.storm.yarn; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.HashMap; +import java.util.Map; + +public class ServerSocketFactory { + private boolean reuseAddress = false; + private Map sockets = new HashMap(); + + public ServerSocketFactory(boolean reusePort) { + this.reuseAddress = reusePort; + } + + public ServerSocket create() throws IOException { + ServerSocket socket = new ServerSocket(); + socket.setReuseAddress(reuseAddress); + socket.bind(null); + int port = socket.getLocalPort(); + sockets.put(port, socket); + return socket; + } + + public void free(int port) throws IOException { + ServerSocket socket = sockets.get(port); + if (null != socket) { + socket.close(); + sockets.remove(port); + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormCommand.java b/src/main/java/com/yahoo/storm/yarn/StormCommand.java new file mode 100644 index 0000000..bc51e52 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormCommand.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package com.yahoo.storm.yarn; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.io.Files; +import com.yahoo.storm.yarn.Client.ClientCommand; +import com.yahoo.storm.yarn.generated.StormMaster; + +/** + * Works as Storm shell command + * + */ +public abstract class StormCommand implements ClientCommand { + + private static final Logger LOG = LoggerFactory.getLogger(StormCommand.class); + + protected String appId; + + @Override + public Options getOpts() { + Options opts = new Options(); + opts.addOption("appId", true, "(Required) The storm clusters app ID"); + return opts; + } + + protected void process(String command, CommandLine cl) throws Exception { + this.appId = cl.getOptionValue("appId"); + + if (appId == null) { + throw new IllegalArgumentException("-appId is required"); + } + + Map stormConf = Config.readStormConfig(null); + + StormOnYarn storm = null; + + try { + + storm = StormOnYarn.attachToApp(appId, stormConf); + StormMaster.Client client = storm.getClient(); + + File tmpStormConfDir = Files.createTempDir(); + File tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); + + StormMasterCommand.downloadStormYaml(client, + tmpStormConf.getAbsolutePath()); + + String stormHome = System.getProperty("storm.home"); + + List commands = new ArrayList(); + commands.add("python"); // TODO: Change this to python home + commands.add(stormHome + "/bin/storm"); + commands.add(command); + + String[] args = cl.getArgs(); + if (null != args && args.length > 0) { + for (int i = 0; i < args.length; i++) { + commands.add(args[i]); + } + } + commands.add("--config"); + commands.add(tmpStormConf.getAbsolutePath()); + + LOG.info("Running: " + Joiner.on(" ").join(commands)); + ProcessBuilder builder = new ProcessBuilder(commands); + + Process process = builder.start(); + Util.redirectStreamAsync(process.getInputStream(), System.out); + Util.redirectStreamAsync(process.getErrorStream(), System.err); + + process.waitFor(); + + if (process.exitValue() == 0) { + if (null != tmpStormConfDir) { + tmpStormConf.delete(); + } + } + } finally { + if (storm != null) { + storm.stop(); + } + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyActivateCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyActivateCommand.java new file mode 100644 index 0000000..9628427 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyActivateCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologyActivateCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn activate -appId=xx topologyId"; + } + + @Override + public void process(CommandLine cl) throws Exception { + super.process("activate", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyDeactivateCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyDeactivateCommand.java new file mode 100644 index 0000000..3e17c09 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyDeactivateCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologyDeactivateCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn deactivate -appId=xx topologyId"; + } + + @Override + public void process(CommandLine cl) throws Exception { + super.process("deactivate", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java new file mode 100644 index 0000000..2026248 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologyKillCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn kill -appId=xx -w wait-time-seconds topologyId"; + } + + @Override + public void process(CommandLine cl) throws Exception { + super.process("kill", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java new file mode 100644 index 0000000..82ef1f8 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologyListCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn list -appId=xx"; + } + + @Override + public void process(CommandLine cl) throws Exception { + process("list", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java new file mode 100644 index 0000000..0b22648 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologyRebalanceCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn rebalance -appId= topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]"; + } + + @Override + public void process(CommandLine cl) throws Exception { + process("rebalance", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java new file mode 100644 index 0000000..fbcd0b5 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologySubmitCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn jar -appId=xx jarPath MainClass arg0 arg1 arg2"; + } + + @Override + public void process(CommandLine cl) throws Exception { + process("jar", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/Util.java b/src/main/java/com/yahoo/storm/yarn/Util.java index a2817f7..4eae80c 100644 --- a/src/main/java/com/yahoo/storm/yarn/Util.java +++ b/src/main/java/com/yahoo/storm/yarn/Util.java @@ -139,6 +139,27 @@ static void rmNulls(Map map) { } } + static void writeStormConf(FileSystem fs, Map stormConf, Path dest) + throws IOException { + //storm.yaml + FSDataOutputStream out = fs.create(dest); + Yaml yaml = new Yaml(); + OutputStreamWriter writer = new OutputStreamWriter(out); + rmNulls(stormConf); + yaml.dump(stormConf, writer); + writer.close(); + out.close(); + } + + static void writeYarnConf(FileSystem fs, YarnConfiguration yarnConf, Path dest) + throws IOException { + FSDataOutputStream out = fs.create(dest); + OutputStreamWriter writer = new OutputStreamWriter(out); + yarnConf.writeXml(writer); + writer.close(); + out.close(); + } + @SuppressWarnings("rawtypes") static Path createConfigurationFileInFs(FileSystem fs, String appHome, Map stormConf, YarnConfiguration yarnConf) @@ -150,25 +171,15 @@ static Path createConfigurationFileInFs(FileSystem fs, fs.mkdirs(dirDst); //storm.yaml - FSDataOutputStream out = fs.create(confDst); - Yaml yaml = new Yaml(); - OutputStreamWriter writer = new OutputStreamWriter(out); - rmNulls(stormConf); - yaml.dump(stormConf, writer); - writer.close(); - out.close(); - + writeStormConf(fs, stormConf, confDst); + //yarn-site.xml Path yarn_site_xml = new Path(dirDst, "yarn-site.xml"); - out = fs.create(yarn_site_xml); - writer = new OutputStreamWriter(out); - yarnConf.writeXml(writer); - writer.close(); - out.close(); - + writeYarnConf(fs, yarnConf, yarn_site_xml); + //logback.xml Path logback_xml = new Path(dirDst, "logback.xml"); - out = fs.create(logback_xml); + FSDataOutputStream out = fs.create(logback_xml); CreateLogbackXML(out); out.close(); @@ -221,18 +232,25 @@ private static void CreateLogbackXML(OutputStream out) throws IOException { } @SuppressWarnings("rawtypes") - private static List buildCommandPrefix(Map conf, String childOptsKey) + private static List buildCommandPrefix(String javaHome, Map conf, String childOptsKey) throws IOException { + String stormConfFileName = new File(STORM_CONF_PATH_STRING).getName(); + return buildCommandPrefix(javaHome, stormConfFileName, conf, childOptsKey); + } + + @SuppressWarnings("rawtypes") + private static List buildCommandPrefix(String javaHome, String stormConfFilePath, Map conf, + String childOptsKey) throws IOException { String stormHomePath = getStormHome(); List toRet = new ArrayList(); - if (System.getenv("JAVA_HOME") != null) - toRet.add(System.getenv("JAVA_HOME") + "/bin/java"); - else - toRet.add("java"); + + String java = javaHome + File.separator + "bin" + File.separator + "java"; + toRet.add(java); toRet.add("-server"); toRet.add("-Dstorm.home=" + stormHomePath); toRet.add("-Djava.library.path=" + conf.get(backtype.storm.Config.JAVA_LIBRARY_PATH)); - toRet.add("-Dstorm.conf.file=" + new File(STORM_CONF_PATH_STRING).getName()); + + toRet.add("-Dstorm.conf.file=" + stormConfFilePath); toRet.add("-cp"); toRet.add(buildClassPathArgument()); @@ -246,8 +264,9 @@ private static List buildCommandPrefix(Map conf, String childOptsKey) @SuppressWarnings("rawtypes") static List buildUICommands(Map conf) throws IOException { - List toRet = - buildCommandPrefix(conf, backtype.storm.Config.UI_CHILDOPTS); + String javaHome = System.getProperty("java.home"); + List toRet = + buildCommandPrefix(javaHome, conf, backtype.storm.Config.UI_CHILDOPTS); toRet.add("-Dstorm.options=" + backtype.storm.Config.NIMBUS_HOST + "=localhost"); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/ui.log"); @@ -258,9 +277,8 @@ static List buildUICommands(Map conf) throws IOException { @SuppressWarnings("rawtypes") static List buildNimbusCommands(Map conf) throws IOException { - List toRet = - buildCommandPrefix(conf, backtype.storm.Config.NIMBUS_CHILDOPTS); - + String javaHome = System.getProperty("java.home"); + List toRet = buildCommandPrefix(javaHome, conf, backtype.storm.Config.NIMBUS_CHILDOPTS); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/nimbus.log"); toRet.add("backtype.storm.daemon.nimbus"); @@ -269,9 +287,8 @@ static List buildNimbusCommands(Map conf) throws IOException { @SuppressWarnings("rawtypes") static List buildSupervisorCommands(Map conf) throws IOException { - List toRet = - buildCommandPrefix(conf, backtype.storm.Config.NIMBUS_CHILDOPTS); - + List toRet = + buildCommandPrefix("$JAVA_HOME", conf, backtype.storm.Config.SUPERVISOR_CHILDOPTS); toRet.add("-Dworker.logdir="+ ApplicationConstants.LOG_DIR_EXPANSION_VAR); toRet.add("-Dlogfile.name=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/supervisor.log"); toRet.add("backtype.storm.daemon.supervisor"); diff --git a/src/main/java/com/yahoo/storm/yarn/VersionCommand.java b/src/main/java/com/yahoo/storm/yarn/VersionCommand.java index 6fb1fd7..3b090a7 100644 --- a/src/main/java/com/yahoo/storm/yarn/VersionCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/VersionCommand.java @@ -14,23 +14,11 @@ package com.yahoo.storm.yarn; -import java.io.File; -import java.util.List; -import java.util.Map; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.io.Files; import com.yahoo.storm.yarn.Client.ClientCommand; -import com.yahoo.storm.yarn.generated.StormMaster; class VersionCommand implements ClientCommand { - private static final Logger LOG = LoggerFactory - .getLogger(StormMasterCommand.class); VersionCommand() { }