From d096ed756a1cbd442aaa3792ebd63ecfb3707f65 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Tue, 24 Dec 2013 00:12:53 +0800 Subject: [PATCH 1/6] support full set of storm commands --- .../java/com/yahoo/storm/yarn/Client.java | 4 + .../com/yahoo/storm/yarn/StormOnYarn.java | 2 +- .../storm/yarn/StormTopologyKillCommand.java | 102 +++++++++++ .../storm/yarn/StormTopologyListCommand.java | 90 ++++++++++ .../yarn/StormTopologyRebalanceCommand.java | 122 ++++++++++++++ .../yarn/StormTopologySubmitCommand.java | 115 +++++++++++++ src/main/java/com/yahoo/storm/yarn/Util.java | 159 +++++++++++++++--- 7 files changed, 571 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java create mode 100644 src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java create mode 100644 src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java create mode 100644 src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java diff --git a/src/main/java/com/yahoo/storm/yarn/Client.java b/src/main/java/com/yahoo/storm/yarn/Client.java index 06584f2..a13a3a5 100644 --- a/src/main/java/com/yahoo/storm/yarn/Client.java +++ b/src/main/java/com/yahoo/storm/yarn/Client.java @@ -103,6 +103,10 @@ public void execute(String[] args) throws Exception { HelpCommand help = new HelpCommand(commands); commands.put("help", help); commands.put("launch", new LaunchCommand()); + commands.put("submit", new StormTopologySubmitCommand()); + commands.put("kill", new StormTopologyKillCommand()); + commands.put("list", new StormTopologyListCommand()); + commands.put("rebalance", new StormTopologyRebalanceCommand()); commands.put("setStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.SET_STORM_CONFIG)); commands.put("getStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.GET_STORM_CONFIG)); commands.put("addSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.ADD_SUPERVISORS)); diff --git a/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java b/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java index 6445c95..e82da5c 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java +++ b/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java @@ -239,7 +239,7 @@ private void launchApp(String appName, String queue, int amMB, String storm_zip_ reader.close(); Apps.addToEnvironment(env, Environment.CLASSPATH.name(), yarn_class_path); - String stormHomeInZip = Util.getStormHomeInZip(fs, zip, stormVersion.version()); + String stormHomeInZip = Util.getStormHomeInZip(fs, zip); Apps.addToEnvironment(env, Environment.CLASSPATH.name(), "./storm/" + stormHomeInZip + "/*"); Apps.addToEnvironment(env, Environment.CLASSPATH.name(), "./storm/" + stormHomeInZip + "/lib/*"); diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java new file mode 100644 index 0000000..a60bc83 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java @@ -0,0 +1,102 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.io.Files; +import com.yahoo.storm.yarn.Client.ClientCommand; +import com.yahoo.storm.yarn.generated.StormMaster; + +class StormTopologyKillCommand implements ClientCommand { + private static final Logger LOG = LoggerFactory + .getLogger(StormMasterCommand.class); + + StormTopologyKillCommand() { + } + + @Override + public Options getOpts() { + Options opts = new Options(); + opts.addOption("appId", true, "(Required) The storm clusters app ID"); + opts.addOption("w", true, "seconds to wait"); + return opts; + } + + @Override + public String getHeaderDescription() { + return "storm-yarn kill -appId=xx -w wait-time-seconds topologyId"; + } + + @Override + public void process(CommandLine cl) throws Exception { + + Map stormConf = Config.readStormConfig(null); + + + String appId = cl.getOptionValue("appId"); + if (appId == null) { + throw new IllegalArgumentException("-appId is required"); + } + + String secondsToWait = cl.getOptionValue("w"); + + String[] args = cl.getArgs(); + if (args.length <= 0) { + throw new IllegalArgumentException("tpologyId required. storm-yarn kill -appId=xx -w wait-time-seconds topologyId"); + } + + String topologyId = args[0]; + + StormOnYarn storm = null; + File tmpStormConf = null; + + try { + storm = StormOnYarn.attachToApp(appId, stormConf); + StormMaster.Client client = storm.getClient(); + + File tmpStormConfDir = Files.createTempDir(); + tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); + StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); + + List commands = Util.buildTopologyKillCommands(tmpStormConf.getAbsolutePath(), topologyId, secondsToWait); + + LOG.info("Running: " + Joiner.on(" ").join(commands)); + ProcessBuilder builder = new ProcessBuilder(commands); + + Process process = builder.start(); + Util.redirectStreamAsync(process.getInputStream(), System.out); + Util.redirectStreamAsync(process.getErrorStream(), System.err); + + process.waitFor(); + + } finally { + if (storm != null) { + storm.stop(); + } + if (null != tmpStormConf) { + tmpStormConf.delete(); + } + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java new file mode 100644 index 0000000..fc041d5 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.io.Files; +import com.yahoo.storm.yarn.Client.ClientCommand; +import com.yahoo.storm.yarn.generated.StormMaster; + +class StormTopologyListCommand implements ClientCommand { + private static final Logger LOG = LoggerFactory + .getLogger(StormMasterCommand.class); + + StormTopologyListCommand() { + } + + @Override + public Options getOpts() { + Options opts = new Options(); + opts.addOption("appId", true, "(Required) The storm clusters app ID"); + return opts; + } + + @Override + public String getHeaderDescription() { + return "storm-yarn list -appId=xx"; + } + + @Override + public void process(CommandLine cl) throws Exception { + Map stormConf = Config.readStormConfig(null); + + String appId = cl.getOptionValue("appId"); + if (appId == null) { + throw new IllegalArgumentException("-appId is required"); + } + + StormOnYarn storm = null; + File tmpStormConf = null; + + try { + storm = StormOnYarn.attachToApp(appId, stormConf); + StormMaster.Client client = storm.getClient(); + + File tmpStormConfDir = Files.createTempDir(); + tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); + StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); + + List commands = Util.buildTopologyListCommands(tmpStormConf.getAbsolutePath()); + + LOG.info("Running: " + Joiner.on(" ").join(commands)); + ProcessBuilder builder = new ProcessBuilder(commands); + + Process process = builder.start(); + Util.redirectStreamAsync(process.getInputStream(), System.out); + Util.redirectStreamAsync(process.getErrorStream(), System.err); + + process.waitFor(); + + } finally { + if (storm != null) { + storm.stop(); + } + if (null != tmpStormConf) { + tmpStormConf.delete(); + } + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java new file mode 100644 index 0000000..7a8bc30 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java @@ -0,0 +1,122 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.io.Files; +import com.yahoo.storm.yarn.Client.ClientCommand; +import com.yahoo.storm.yarn.generated.StormMaster; + +class StormTopologyRebalanceCommand implements ClientCommand { + private static final Logger LOG = LoggerFactory + .getLogger(StormMasterCommand.class); + + StormTopologyRebalanceCommand() { + } + + @Override + public Options getOpts() { + Options opts = new Options(); + opts.addOption("appId", true, "(Required) The storm clusters app ID"); + opts.addOption("w", true, "(Optional) time(second) to wait"); + opts.addOption("n", true, "(Optional) new number of workers"); + opts.addOption("e", true, "(Optional) component:parallelism,component:parallelism,component:parallelism"); + return opts; + } + + @Override + public String getHeaderDescription() { + return "storm-yarn rebalance -appId= -w=wait-time-secs -n=new-num-workers -e=component:parallelism,component:parallelism... topology-name"; + } + + @Override + public void process(CommandLine cl) throws Exception { + + Map stormConf = Config.readStormConfig(null); + + String appId = cl.getOptionValue("appId"); + if (appId == null) { + throw new IllegalArgumentException("-appId is required"); + } + + int secondsToWait = cl.getOptionValue("w") != null ? Integer.getInteger(cl.getOptionValue("w")) : -1; + int numWorkers = cl.getOptionValue("n") != null ? Integer.getInteger(cl.getOptionValue("n")) : -1; + String parallelism = cl.getOptionValue("e"); + Map parallelismMap = new HashMap(); + if (null != parallelism) { + String[] keyValues = parallelism.split(","); + if (null != keyValues) { + for (String kv : keyValues) { + if (null != kv) { + String [] keyAndValue = kv.split(":"); + if (keyAndValue.length == 2) { + parallelismMap.put(keyAndValue[0], Integer.getInteger(keyAndValue[1])); + } + } + } + } + } + + String[] args = cl.getArgs(); + if (args.length <= 0) { + throw new IllegalArgumentException("tpologyId required. storm-yarn kill -appId=xx -w wait-time-seconds topologyId"); + } + + String topologyId = args[0]; + + + StormOnYarn storm = null; + File tmpStormConf = null; + + try { + storm = StormOnYarn.attachToApp(appId, stormConf); + StormMaster.Client client = storm.getClient(); + + File tmpStormConfDir = Files.createTempDir(); + tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); + StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); + + List commands = Util.buildRebalanceCommands(tmpStormConf.getAbsolutePath(), topologyId, + secondsToWait, numWorkers, parallelismMap); + + LOG.info("Running: " + Joiner.on(" ").join(commands)); + ProcessBuilder builder = new ProcessBuilder(commands); + + Process process = builder.start(); + Util.redirectStreamAsync(process.getInputStream(), System.out); + Util.redirectStreamAsync(process.getErrorStream(), System.err); + + process.waitFor(); + + } finally { + if (storm != null) { + storm.stop(); + } + if (null != tmpStormConf) { + tmpStormConf.delete(); + } + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java new file mode 100644 index 0000000..7782c99 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.io.Files; +import com.yahoo.storm.yarn.Client.ClientCommand; +import com.yahoo.storm.yarn.generated.StormMaster; + +class StormTopologySubmitCommand implements ClientCommand { + private static final Logger LOG = LoggerFactory + .getLogger(StormMasterCommand.class); + private String[] parameters; + + StormTopologySubmitCommand() { + } + + @Override + public Options getOpts() { + Options opts = new Options(); + opts.addOption("appId", true, "(Required) The storm clusters app ID"); + opts.addOption("jar", true, "Storm jar file"); + opts.addOption("class", true, "Storm Topology class name"); + return opts; + } + + @Override + public String getHeaderDescription() { + return "storm-yarn -appId=xx -jar=xx MainClass arg0 arg1 arg2"; + } + + @Override + public void process(CommandLine cl) throws Exception { + + Map stormConf = Config.readStormConfig(null); + + String appId = cl.getOptionValue("appId"); + if (appId == null) { + throw new IllegalArgumentException("-appId is required"); + } + + String jarName = cl.getOptionValue("jar"); + if (jarName == null) { + throw new IllegalArgumentException("-appId is required"); + } + + String[] args = cl.getArgs(); + if (args.length <= 0) { + throw new IllegalArgumentException("MainClass required. storm-yarn -appId= -jar= MainClass arg0 arg1..."); + } + + String className = args[0]; + this.parameters = new String[args.length - 1]; + for (int i = 1; i < args.length; i++) { + this.parameters[i - 1] = args[i]; + } + + if (className == null) { + throw new IllegalArgumentException("-appId is required"); + } + + StormOnYarn storm = null; + File tmpStormConf = null; + + try { + storm = StormOnYarn.attachToApp(appId, stormConf); + StormMaster.Client client = storm.getClient(); + + File tmpStormConfDir = Files.createTempDir(); + tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); + StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); + + List commands = Util.buildTopologySubmissionCommands(className, + jarName, tmpStormConf.getAbsolutePath(), parameters); + + LOG.info("Running: " + Joiner.on(" ").join(commands)); + ProcessBuilder builder = new ProcessBuilder(commands); + + Process process = builder.start(); + Util.redirectStreamAsync(process.getInputStream(), System.out); + Util.redirectStreamAsync(process.getErrorStream(), System.err); + + process.waitFor(); + + } finally { + if (storm != null) { + storm.stop(); + } + if (null != tmpStormConf) { + tmpStormConf.delete(); + } + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/Util.java b/src/main/java/com/yahoo/storm/yarn/Util.java index a2817f7..3094798 100644 --- a/src/main/java/com/yahoo/storm/yarn/Util.java +++ b/src/main/java/com/yahoo/storm/yarn/Util.java @@ -70,7 +70,6 @@ static String getStormHome() { return ret; } - @SuppressWarnings("rawtypes") static Version getStormVersion() throws IOException { String versionNumber = "Unknown"; @@ -99,15 +98,20 @@ static Version getStormVersion() throws IOException { return version; } - static String getStormHomeInZip(FileSystem fs, Path zip, String stormVersion) throws IOException, RuntimeException { + static String getStormHomeInZip(FileSystem fs, Path zip) throws IOException, RuntimeException { FSDataInputStream fsInputStream = fs.open(zip); ZipInputStream zipInputStream = new ZipInputStream(fsInputStream); ZipEntry entry = zipInputStream.getNextEntry(); while (entry != null) { String entryName = entry.getName(); - if (entryName.matches("^storm(-" + stormVersion + ")?/")) { + final String STORM_BIN = "bin/storm"; + if (entryName.endsWith(STORM_BIN)) { fsInputStream.close(); - return entryName.replace("/", ""); + String stormHome = entryName.substring(0, entryName.length() - STORM_BIN.length()); + if (stormHome.endsWith("/")) { + stormHome = stormHome.substring(0, stormHome.length() - 1); + } + return stormHome; } entry = zipInputStream.getNextEntry(); } @@ -221,20 +225,34 @@ private static void CreateLogbackXML(OutputStream out) throws IOException { } @SuppressWarnings("rawtypes") - private static List buildCommandPrefix(Map conf, String childOptsKey) + private static List buildCommandPrefix(String javaHome, String stormConfFile, Map conf, String childOptsKey) throws IOException { String stormHomePath = getStormHome(); List toRet = new ArrayList(); - if (System.getenv("JAVA_HOME") != null) - toRet.add(System.getenv("JAVA_HOME") + "/bin/java"); - else - toRet.add("java"); + + String java = javaHome + File.separator + "bin" + File.separator + "java"; + toRet.add(java); toRet.add("-server"); toRet.add("-Dstorm.home=" + stormHomePath); toRet.add("-Djava.library.path=" + conf.get(backtype.storm.Config.JAVA_LIBRARY_PATH)); - toRet.add("-Dstorm.conf.file=" + new File(STORM_CONF_PATH_STRING).getName()); + + String stormConfFileName = ""; + String stormConfDirPath = ""; + if (null == stormConfFile) { + stormConfFileName = new File(STORM_CONF_PATH_STRING).getName(); + } else { + int slashPosition = stormConfFile.lastIndexOf(File.separator); + if (-1 == slashPosition) { + stormConfFileName = stormConfFile; + } else { + stormConfFileName = stormConfFile.substring(slashPosition + 1); + stormConfDirPath = stormConfFile.substring(0, slashPosition); + } + } + + toRet.add("-Dstorm.conf.file=" + stormConfFileName); toRet.add("-cp"); - toRet.add(buildClassPathArgument()); + toRet.add(buildClassPathArgument(stormConfDirPath)); if (conf.containsKey(childOptsKey) && conf.get(childOptsKey) != null) { @@ -246,8 +264,9 @@ private static List buildCommandPrefix(Map conf, String childOptsKey) @SuppressWarnings("rawtypes") static List buildUICommands(Map conf) throws IOException { - List toRet = - buildCommandPrefix(conf, backtype.storm.Config.UI_CHILDOPTS); + String javaHome = System.getProperty("java.home"); + List toRet = + buildCommandPrefix(javaHome, null, conf, backtype.storm.Config.UI_CHILDOPTS); toRet.add("-Dstorm.options=" + backtype.storm.Config.NIMBUS_HOST + "=localhost"); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/ui.log"); @@ -258,9 +277,8 @@ static List buildUICommands(Map conf) throws IOException { @SuppressWarnings("rawtypes") static List buildNimbusCommands(Map conf) throws IOException { - List toRet = - buildCommandPrefix(conf, backtype.storm.Config.NIMBUS_CHILDOPTS); - + String javaHome = System.getProperty("java.home"); + List toRet = buildCommandPrefix(javaHome, null, conf, backtype.storm.Config.NIMBUS_CHILDOPTS); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/nimbus.log"); toRet.add("backtype.storm.daemon.nimbus"); @@ -269,19 +287,116 @@ static List buildNimbusCommands(Map conf) throws IOException { @SuppressWarnings("rawtypes") static List buildSupervisorCommands(Map conf) throws IOException { - List toRet = - buildCommandPrefix(conf, backtype.storm.Config.NIMBUS_CHILDOPTS); - + List toRet = + buildCommandPrefix("$JAVA_HOME", null, conf, backtype.storm.Config.NIMBUS_CHILDOPTS); toRet.add("-Dworker.logdir="+ ApplicationConstants.LOG_DIR_EXPANSION_VAR); toRet.add("-Dlogfile.name=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/supervisor.log"); toRet.add("backtype.storm.daemon.supervisor"); - return toRet; } - private static String buildClassPathArgument() throws IOException { + @SuppressWarnings("rawtypes") + static List buildTopologyListCommands(String stormConfPath) throws IOException { + Map storm_conf = Config.readStormConfig(stormConfPath); + String javaHome = System.getProperty("java.home"); + List toRet = buildCommandPrefix(javaHome, + stormConfPath, storm_conf, null); + + Map env = System.getenv(); + String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); + if (null != stormJarEnv) { + toRet.add(stormJarEnv); + } + toRet.add("backtype.storm.command.list"); + return toRet; + } + + @SuppressWarnings("rawtypes") + static List buildRebalanceCommands(String stormConfPath, + String topologyId, int timeToWait, int newWorkerNumber, Map parallism) throws IOException { + Map storm_conf = Config.readStormConfig(stormConfPath); + String javaHome = System.getProperty("java.home"); + List toRet = buildCommandPrefix(javaHome, + stormConfPath, storm_conf, null); + + Map env = System.getenv(); + String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); + if (null != stormJarEnv) { + toRet.add(stormJarEnv); + } + toRet.add("backtype.storm.command.rebalance"); + toRet.add(topologyId); + if (-1 != timeToWait) { + toRet.add("-w"); + toRet.add(Integer.toString(timeToWait)); + } + + if (-1 != newWorkerNumber) { + toRet.add("-n"); + toRet.add(Integer.toString(newWorkerNumber)); + } + + if (null != parallism) { + Set keys = parallism.keySet(); + for (String key : keys) { + toRet.add("-e"); + Integer value = parallism.get(key); + toRet.add(key + "=" + value.toString()); + } + } + return toRet; + } + + @SuppressWarnings("rawtypes") + static List buildTopologyKillCommands(String stormConfPath, String topologyId, String timeToWait) throws IOException { + Map storm_conf = Config.readStormConfig(stormConfPath); + String javaHome = System.getProperty("java.home"); + List toRet = buildCommandPrefix(javaHome, + stormConfPath, storm_conf, null); + + Map env = System.getenv(); + String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); + if (null != stormJarEnv) { + toRet.add(stormJarEnv); + } + toRet.add("backtype.storm.command.kill_topology"); + toRet.add(topologyId); + if (null != timeToWait) { + toRet.add("-w"); + toRet.add(timeToWait); + } + return toRet; + } + + @SuppressWarnings("rawtypes") + static List buildTopologySubmissionCommands(String stormJarClass, + String stormJar, String stormConfPath, String[] parameters) throws IOException { + Map storm_conf = Config.readStormConfig(stormConfPath); + String javaHome = System.getProperty("java.home"); + List toRet = buildCommandPrefix(javaHome, + stormConfPath, storm_conf, null); + + toRet.add("-Dstorm.jar=" + stormJar); + + Map env = System.getenv(); + String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); + if (null != stormJarEnv) { + toRet.add(stormJarEnv); + } + toRet.add(stormJarClass); + for (int i = 0; i < parameters.length; i++) { + toRet.add(parameters[i]); + } + return toRet; + } + private static String buildClassPathArgument(String stormConfPath) throws IOException { List paths = new ArrayList(); - paths.add(new File(STORM_CONF_PATH_STRING).getParent()); + + if (null == stormConfPath || stormConfPath.isEmpty()) { + paths.add(new File(STORM_CONF_PATH_STRING).getParent()); + } else { + paths.add(stormConfPath); + } paths.add(getStormHome()); for (String jarPath : findAllJarsInPaths(getStormHome(), getStormHome() + File.separator + "lib")) { paths.add(jarPath); From 55b80563335e983fd8c8a5f0e8acd494edf575dc Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Fri, 3 Jan 2014 13:53:50 +0800 Subject: [PATCH 2/6] change submit back to jar --- src/main/java/com/yahoo/storm/yarn/Client.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/yahoo/storm/yarn/Client.java b/src/main/java/com/yahoo/storm/yarn/Client.java index a13a3a5..5d13299 100644 --- a/src/main/java/com/yahoo/storm/yarn/Client.java +++ b/src/main/java/com/yahoo/storm/yarn/Client.java @@ -103,7 +103,7 @@ public void execute(String[] args) throws Exception { HelpCommand help = new HelpCommand(commands); commands.put("help", help); commands.put("launch", new LaunchCommand()); - commands.put("submit", new StormTopologySubmitCommand()); + commands.put("jar", new StormTopologySubmitCommand()); commands.put("kill", new StormTopologyKillCommand()); commands.put("list", new StormTopologyListCommand()); commands.put("rebalance", new StormTopologyRebalanceCommand()); From 187d54e44b8c0eb0a8e1e4acc7b6ce64bbcd486f Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Sun, 9 Mar 2014 16:17:32 +0800 Subject: [PATCH 3/6] Address the reivew comments, use storm python script to submit storm topology commands --- .../yahoo/storm/yarn/ClassPathCommand.java | 42 ++++ .../java/com/yahoo/storm/yarn/Client.java | 18 +- .../java/com/yahoo/storm/yarn/Config.java | 2 + .../com/yahoo/storm/yarn/LaunchCommand.java | 2 +- .../com/yahoo/storm/yarn/StormCommand.java | 107 ++++++++++ .../yarn/StormTopologyActivateCommand.java | 30 +++ .../yarn/StormTopologyDeactivateCommand.java | 30 +++ .../storm/yarn/StormTopologyKillCommand.java | 76 +------ .../storm/yarn/StormTopologyListCommand.java | 64 +----- .../yarn/StormTopologyRebalanceCommand.java | 96 +-------- .../yarn/StormTopologySubmitCommand.java | 91 +-------- src/main/java/com/yahoo/storm/yarn/Util.java | 187 +++++------------- .../com/yahoo/storm/yarn/VersionCommand.java | 12 -- 13 files changed, 283 insertions(+), 474 deletions(-) create mode 100644 src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java create mode 100644 src/main/java/com/yahoo/storm/yarn/StormCommand.java create mode 100644 src/main/java/com/yahoo/storm/yarn/StormTopologyActivateCommand.java create mode 100644 src/main/java/com/yahoo/storm/yarn/StormTopologyDeactivateCommand.java diff --git a/src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java b/src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java new file mode 100644 index 0000000..2b155a6 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import com.yahoo.storm.yarn.Client.ClientCommand; + +class ClassPathCommand implements ClientCommand { + + ClassPathCommand() { + } + + @Override + public Options getOpts() { + Options opts = new Options(); + return opts; + } + + @Override + public String getHeaderDescription() { + return "storm-yarn classpath"; + } + + @Override + public void process(CommandLine cl) throws Exception { + String classpath = System.getProperty("java.class.path"); + System.out.println(classpath); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/Client.java b/src/main/java/com/yahoo/storm/yarn/Client.java index 5d13299..064c465 100644 --- a/src/main/java/com/yahoo/storm/yarn/Client.java +++ b/src/main/java/com/yahoo/storm/yarn/Client.java @@ -101,12 +101,12 @@ public void printHelpFor(Collection args) { public void execute(String[] args) throws Exception { HashMap commands = new HashMap(); HelpCommand help = new HelpCommand(commands); + commands.put("help", help); + commands.put("version", new VersionCommand()); + commands.put("classpath", new ClassPathCommand()); commands.put("launch", new LaunchCommand()); - commands.put("jar", new StormTopologySubmitCommand()); - commands.put("kill", new StormTopologyKillCommand()); - commands.put("list", new StormTopologyListCommand()); - commands.put("rebalance", new StormTopologyRebalanceCommand()); + commands.put("setStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.SET_STORM_CONFIG)); commands.put("getStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.GET_STORM_CONFIG)); commands.put("addSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.ADD_SUPERVISORS)); @@ -117,7 +117,13 @@ public void execute(String[] args) throws Exception { commands.put("startSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.START_SUPERVISORS)); commands.put("stopSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.STOP_SUPERVISORS)); commands.put("shutdown", new StormMasterCommand(StormMasterCommand.COMMAND.SHUTDOWN)); - commands.put("version", new VersionCommand()); + + commands.put("jar", new StormTopologySubmitCommand()); + commands.put("kill", new StormTopologyKillCommand()); + commands.put("list", new StormTopologyListCommand()); + commands.put("rebalance", new StormTopologyRebalanceCommand()); + commands.put("activate", new StormTopologyActivateCommand()); + commands.put("deactivate", new StormTopologyDeactivateCommand()); String commandName = null; String[] commandArgs = null; @@ -138,7 +144,7 @@ public void execute(String[] args) throws Exception { if(!opts.hasOption("h")) { opts.addOption("h", "help", false, "print out a help message"); } - CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs); + CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs, true); if(cl.hasOption("help")) { help.printHelpFor(Arrays.asList(commandName)); } else { diff --git a/src/main/java/com/yahoo/storm/yarn/Config.java b/src/main/java/com/yahoo/storm/yarn/Config.java index 0d1d6df..4dba2c4 100644 --- a/src/main/java/com/yahoo/storm/yarn/Config.java +++ b/src/main/java/com/yahoo/storm/yarn/Config.java @@ -31,6 +31,8 @@ public class Config { final public static String MASTER_THRIFT_PORT = "master.thrift.port"; final public static String MASTER_TIMEOUT_SECS = "master.timeout.secs"; final public static String MASTER_SIZE_MB = "master.container.size-mb"; + final public static String SUPERVISOR_CONTAINER_SIZE_MB = "supervisor.container.size-mb"; + final public static String SUPERVISOR_VCORE_NUM = "supervisor.vcores"; final public static String MASTER_NUM_SUPERVISORS = "master.initial-num-supervisors"; final public static String MASTER_CONTAINER_PRIORITY = "master.container.priority"; //# of milliseconds to wait for YARN report on Storm Master host/port diff --git a/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java b/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java index f5dab8b..05eb0be 100644 --- a/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java @@ -70,7 +70,7 @@ public void process(CommandLine cl) throws Exception { queue, amSize, stormConf, storm_zip_location); - LOG.debug("Submitted application's ID:" + storm.getAppId()); + LOG.info("Submitted application's ID:" + storm.getAppId()); //download storm.yaml file String storm_yaml_output = cl.getOptionValue("stormConfOutput"); diff --git a/src/main/java/com/yahoo/storm/yarn/StormCommand.java b/src/main/java/com/yahoo/storm/yarn/StormCommand.java new file mode 100644 index 0000000..bc51e52 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormCommand.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package com.yahoo.storm.yarn; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.io.Files; +import com.yahoo.storm.yarn.Client.ClientCommand; +import com.yahoo.storm.yarn.generated.StormMaster; + +/** + * Works as Storm shell command + * + */ +public abstract class StormCommand implements ClientCommand { + + private static final Logger LOG = LoggerFactory.getLogger(StormCommand.class); + + protected String appId; + + @Override + public Options getOpts() { + Options opts = new Options(); + opts.addOption("appId", true, "(Required) The storm clusters app ID"); + return opts; + } + + protected void process(String command, CommandLine cl) throws Exception { + this.appId = cl.getOptionValue("appId"); + + if (appId == null) { + throw new IllegalArgumentException("-appId is required"); + } + + Map stormConf = Config.readStormConfig(null); + + StormOnYarn storm = null; + + try { + + storm = StormOnYarn.attachToApp(appId, stormConf); + StormMaster.Client client = storm.getClient(); + + File tmpStormConfDir = Files.createTempDir(); + File tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); + + StormMasterCommand.downloadStormYaml(client, + tmpStormConf.getAbsolutePath()); + + String stormHome = System.getProperty("storm.home"); + + List commands = new ArrayList(); + commands.add("python"); // TODO: Change this to python home + commands.add(stormHome + "/bin/storm"); + commands.add(command); + + String[] args = cl.getArgs(); + if (null != args && args.length > 0) { + for (int i = 0; i < args.length; i++) { + commands.add(args[i]); + } + } + commands.add("--config"); + commands.add(tmpStormConf.getAbsolutePath()); + + LOG.info("Running: " + Joiner.on(" ").join(commands)); + ProcessBuilder builder = new ProcessBuilder(commands); + + Process process = builder.start(); + Util.redirectStreamAsync(process.getInputStream(), System.out); + Util.redirectStreamAsync(process.getErrorStream(), System.err); + + process.waitFor(); + + if (process.exitValue() == 0) { + if (null != tmpStormConfDir) { + tmpStormConf.delete(); + } + } + } finally { + if (storm != null) { + storm.stop(); + } + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyActivateCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyActivateCommand.java new file mode 100644 index 0000000..9628427 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyActivateCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologyActivateCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn activate -appId=xx topologyId"; + } + + @Override + public void process(CommandLine cl) throws Exception { + super.process("activate", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyDeactivateCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyDeactivateCommand.java new file mode 100644 index 0000000..3e17c09 --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyDeactivateCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package com.yahoo.storm.yarn; + +import org.apache.commons.cli.CommandLine; + +class StormTopologyDeactivateCommand extends StormCommand { + + @Override + public String getHeaderDescription() { + return "storm-yarn deactivate -appId=xx topologyId"; + } + + @Override + public void process(CommandLine cl) throws Exception { + super.process("deactivate", cl); + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java index a60bc83..2026248 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyKillCommand.java @@ -14,35 +14,10 @@ package com.yahoo.storm.yarn; -import java.io.File; -import java.util.List; -import java.util.Map; - import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.io.Files; -import com.yahoo.storm.yarn.Client.ClientCommand; -import com.yahoo.storm.yarn.generated.StormMaster; -class StormTopologyKillCommand implements ClientCommand { - private static final Logger LOG = LoggerFactory - .getLogger(StormMasterCommand.class); - - StormTopologyKillCommand() { - } +class StormTopologyKillCommand extends StormCommand { - @Override - public Options getOpts() { - Options opts = new Options(); - opts.addOption("appId", true, "(Required) The storm clusters app ID"); - opts.addOption("w", true, "seconds to wait"); - return opts; - } - @Override public String getHeaderDescription() { return "storm-yarn kill -appId=xx -w wait-time-seconds topologyId"; @@ -50,53 +25,6 @@ public String getHeaderDescription() { @Override public void process(CommandLine cl) throws Exception { - - Map stormConf = Config.readStormConfig(null); - - - String appId = cl.getOptionValue("appId"); - if (appId == null) { - throw new IllegalArgumentException("-appId is required"); - } - - String secondsToWait = cl.getOptionValue("w"); - - String[] args = cl.getArgs(); - if (args.length <= 0) { - throw new IllegalArgumentException("tpologyId required. storm-yarn kill -appId=xx -w wait-time-seconds topologyId"); - } - - String topologyId = args[0]; - - StormOnYarn storm = null; - File tmpStormConf = null; - - try { - storm = StormOnYarn.attachToApp(appId, stormConf); - StormMaster.Client client = storm.getClient(); - - File tmpStormConfDir = Files.createTempDir(); - tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); - StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); - - List commands = Util.buildTopologyKillCommands(tmpStormConf.getAbsolutePath(), topologyId, secondsToWait); - - LOG.info("Running: " + Joiner.on(" ").join(commands)); - ProcessBuilder builder = new ProcessBuilder(commands); - - Process process = builder.start(); - Util.redirectStreamAsync(process.getInputStream(), System.out); - Util.redirectStreamAsync(process.getErrorStream(), System.err); - - process.waitFor(); - - } finally { - if (storm != null) { - storm.stop(); - } - if (null != tmpStormConf) { - tmpStormConf.delete(); - } - } + super.process("kill", cl); } } diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java index fc041d5..82ef1f8 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyListCommand.java @@ -14,34 +14,10 @@ package com.yahoo.storm.yarn; -import java.io.File; -import java.util.List; -import java.util.Map; - import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.io.Files; -import com.yahoo.storm.yarn.Client.ClientCommand; -import com.yahoo.storm.yarn.generated.StormMaster; -class StormTopologyListCommand implements ClientCommand { - private static final Logger LOG = LoggerFactory - .getLogger(StormMasterCommand.class); - - StormTopologyListCommand() { - } +class StormTopologyListCommand extends StormCommand { - @Override - public Options getOpts() { - Options opts = new Options(); - opts.addOption("appId", true, "(Required) The storm clusters app ID"); - return opts; - } - @Override public String getHeaderDescription() { return "storm-yarn list -appId=xx"; @@ -49,42 +25,6 @@ public String getHeaderDescription() { @Override public void process(CommandLine cl) throws Exception { - Map stormConf = Config.readStormConfig(null); - - String appId = cl.getOptionValue("appId"); - if (appId == null) { - throw new IllegalArgumentException("-appId is required"); - } - - StormOnYarn storm = null; - File tmpStormConf = null; - - try { - storm = StormOnYarn.attachToApp(appId, stormConf); - StormMaster.Client client = storm.getClient(); - - File tmpStormConfDir = Files.createTempDir(); - tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); - StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); - - List commands = Util.buildTopologyListCommands(tmpStormConf.getAbsolutePath()); - - LOG.info("Running: " + Joiner.on(" ").join(commands)); - ProcessBuilder builder = new ProcessBuilder(commands); - - Process process = builder.start(); - Util.redirectStreamAsync(process.getInputStream(), System.out); - Util.redirectStreamAsync(process.getErrorStream(), System.err); - - process.waitFor(); - - } finally { - if (storm != null) { - storm.stop(); - } - if (null != tmpStormConf) { - tmpStormConf.delete(); - } - } + process("list", cl); } } diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java index 7a8bc30..4099674 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java @@ -14,38 +14,10 @@ package com.yahoo.storm.yarn; -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.io.Files; -import com.yahoo.storm.yarn.Client.ClientCommand; -import com.yahoo.storm.yarn.generated.StormMaster; -class StormTopologyRebalanceCommand implements ClientCommand { - private static final Logger LOG = LoggerFactory - .getLogger(StormMasterCommand.class); - - StormTopologyRebalanceCommand() { - } +class StormTopologyRebalanceCommand extends StormCommand { - @Override - public Options getOpts() { - Options opts = new Options(); - opts.addOption("appId", true, "(Required) The storm clusters app ID"); - opts.addOption("w", true, "(Optional) time(second) to wait"); - opts.addOption("n", true, "(Optional) new number of workers"); - opts.addOption("e", true, "(Optional) component:parallelism,component:parallelism,component:parallelism"); - return opts; - } - @Override public String getHeaderDescription() { return "storm-yarn rebalance -appId= -w=wait-time-secs -n=new-num-workers -e=component:parallelism,component:parallelism... topology-name"; @@ -53,70 +25,6 @@ public String getHeaderDescription() { @Override public void process(CommandLine cl) throws Exception { - - Map stormConf = Config.readStormConfig(null); - - String appId = cl.getOptionValue("appId"); - if (appId == null) { - throw new IllegalArgumentException("-appId is required"); - } - - int secondsToWait = cl.getOptionValue("w") != null ? Integer.getInteger(cl.getOptionValue("w")) : -1; - int numWorkers = cl.getOptionValue("n") != null ? Integer.getInteger(cl.getOptionValue("n")) : -1; - String parallelism = cl.getOptionValue("e"); - Map parallelismMap = new HashMap(); - if (null != parallelism) { - String[] keyValues = parallelism.split(","); - if (null != keyValues) { - for (String kv : keyValues) { - if (null != kv) { - String [] keyAndValue = kv.split(":"); - if (keyAndValue.length == 2) { - parallelismMap.put(keyAndValue[0], Integer.getInteger(keyAndValue[1])); - } - } - } - } - } - - String[] args = cl.getArgs(); - if (args.length <= 0) { - throw new IllegalArgumentException("tpologyId required. storm-yarn kill -appId=xx -w wait-time-seconds topologyId"); - } - - String topologyId = args[0]; - - - StormOnYarn storm = null; - File tmpStormConf = null; - - try { - storm = StormOnYarn.attachToApp(appId, stormConf); - StormMaster.Client client = storm.getClient(); - - File tmpStormConfDir = Files.createTempDir(); - tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); - StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); - - List commands = Util.buildRebalanceCommands(tmpStormConf.getAbsolutePath(), topologyId, - secondsToWait, numWorkers, parallelismMap); - - LOG.info("Running: " + Joiner.on(" ").join(commands)); - ProcessBuilder builder = new ProcessBuilder(commands); - - Process process = builder.start(); - Util.redirectStreamAsync(process.getInputStream(), System.out); - Util.redirectStreamAsync(process.getErrorStream(), System.err); - - process.waitFor(); - - } finally { - if (storm != null) { - storm.stop(); - } - if (null != tmpStormConf) { - tmpStormConf.delete(); - } - } + process("rebalance", cl); } } diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java index 7782c99..fbcd0b5 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologySubmitCommand.java @@ -14,102 +14,17 @@ package com.yahoo.storm.yarn; -import java.io.File; -import java.util.List; -import java.util.Map; - import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.io.Files; -import com.yahoo.storm.yarn.Client.ClientCommand; -import com.yahoo.storm.yarn.generated.StormMaster; -class StormTopologySubmitCommand implements ClientCommand { - private static final Logger LOG = LoggerFactory - .getLogger(StormMasterCommand.class); - private String[] parameters; - - StormTopologySubmitCommand() { - } +class StormTopologySubmitCommand extends StormCommand { - @Override - public Options getOpts() { - Options opts = new Options(); - opts.addOption("appId", true, "(Required) The storm clusters app ID"); - opts.addOption("jar", true, "Storm jar file"); - opts.addOption("class", true, "Storm Topology class name"); - return opts; - } - @Override public String getHeaderDescription() { - return "storm-yarn -appId=xx -jar=xx MainClass arg0 arg1 arg2"; + return "storm-yarn jar -appId=xx jarPath MainClass arg0 arg1 arg2"; } @Override public void process(CommandLine cl) throws Exception { - - Map stormConf = Config.readStormConfig(null); - - String appId = cl.getOptionValue("appId"); - if (appId == null) { - throw new IllegalArgumentException("-appId is required"); - } - - String jarName = cl.getOptionValue("jar"); - if (jarName == null) { - throw new IllegalArgumentException("-appId is required"); - } - - String[] args = cl.getArgs(); - if (args.length <= 0) { - throw new IllegalArgumentException("MainClass required. storm-yarn -appId= -jar= MainClass arg0 arg1..."); - } - - String className = args[0]; - this.parameters = new String[args.length - 1]; - for (int i = 1; i < args.length; i++) { - this.parameters[i - 1] = args[i]; - } - - if (className == null) { - throw new IllegalArgumentException("-appId is required"); - } - - StormOnYarn storm = null; - File tmpStormConf = null; - - try { - storm = StormOnYarn.attachToApp(appId, stormConf); - StormMaster.Client client = storm.getClient(); - - File tmpStormConfDir = Files.createTempDir(); - tmpStormConf = new File(tmpStormConfDir, "storm.yaml"); - StormMasterCommand.downloadStormYaml(client, tmpStormConf.getAbsolutePath()); - - List commands = Util.buildTopologySubmissionCommands(className, - jarName, tmpStormConf.getAbsolutePath(), parameters); - - LOG.info("Running: " + Joiner.on(" ").join(commands)); - ProcessBuilder builder = new ProcessBuilder(commands); - - Process process = builder.start(); - Util.redirectStreamAsync(process.getInputStream(), System.out); - Util.redirectStreamAsync(process.getErrorStream(), System.err); - - process.waitFor(); - - } finally { - if (storm != null) { - storm.stop(); - } - if (null != tmpStormConf) { - tmpStormConf.delete(); - } - } + process("jar", cl); } } diff --git a/src/main/java/com/yahoo/storm/yarn/Util.java b/src/main/java/com/yahoo/storm/yarn/Util.java index 3094798..4a602ba 100644 --- a/src/main/java/com/yahoo/storm/yarn/Util.java +++ b/src/main/java/com/yahoo/storm/yarn/Util.java @@ -23,8 +23,10 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.io.OutputStreamWriter; @@ -143,6 +145,27 @@ static void rmNulls(Map map) { } } + static void writeStormConf(FileSystem fs, Map stormConf, Path dest) + throws IOException { + //storm.yaml + FSDataOutputStream out = fs.create(dest); + Yaml yaml = new Yaml(); + OutputStreamWriter writer = new OutputStreamWriter(out); + rmNulls(stormConf); + yaml.dump(stormConf, writer); + writer.close(); + out.close(); + } + + static void writeYarnConf(FileSystem fs, YarnConfiguration yarnConf, Path dest) + throws IOException { + FSDataOutputStream out = fs.create(dest); + OutputStreamWriter writer = new OutputStreamWriter(out); + yarnConf.writeXml(writer); + writer.close(); + out.close(); + } + @SuppressWarnings("rawtypes") static Path createConfigurationFileInFs(FileSystem fs, String appHome, Map stormConf, YarnConfiguration yarnConf) @@ -154,25 +177,15 @@ static Path createConfigurationFileInFs(FileSystem fs, fs.mkdirs(dirDst); //storm.yaml - FSDataOutputStream out = fs.create(confDst); - Yaml yaml = new Yaml(); - OutputStreamWriter writer = new OutputStreamWriter(out); - rmNulls(stormConf); - yaml.dump(stormConf, writer); - writer.close(); - out.close(); - + writeStormConf(fs, stormConf, confDst); + //yarn-site.xml Path yarn_site_xml = new Path(dirDst, "yarn-site.xml"); - out = fs.create(yarn_site_xml); - writer = new OutputStreamWriter(out); - yarnConf.writeXml(writer); - writer.close(); - out.close(); - + writeYarnConf(fs, yarnConf, yarn_site_xml); + //logback.xml Path logback_xml = new Path(dirDst, "logback.xml"); - out = fs.create(logback_xml); + FSDataOutputStream out = fs.create(logback_xml); CreateLogbackXML(out); out.close(); @@ -225,8 +238,15 @@ private static void CreateLogbackXML(OutputStream out) throws IOException { } @SuppressWarnings("rawtypes") - private static List buildCommandPrefix(String javaHome, String stormConfFile, Map conf, String childOptsKey) + private static List buildCommandPrefix(String javaHome, Map conf, String childOptsKey) throws IOException { + String stormConfFileName = new File(STORM_CONF_PATH_STRING).getName(); + return buildCommandPrefix(javaHome, stormConfFileName, conf, childOptsKey); + } + + @SuppressWarnings("rawtypes") + private static List buildCommandPrefix(String javaHome, String stormConfFilePath, Map conf, + String childOptsKey) throws IOException { String stormHomePath = getStormHome(); List toRet = new ArrayList(); @@ -235,24 +255,10 @@ private static List buildCommandPrefix(String javaHome, String stormConf toRet.add("-server"); toRet.add("-Dstorm.home=" + stormHomePath); toRet.add("-Djava.library.path=" + conf.get(backtype.storm.Config.JAVA_LIBRARY_PATH)); - - String stormConfFileName = ""; - String stormConfDirPath = ""; - if (null == stormConfFile) { - stormConfFileName = new File(STORM_CONF_PATH_STRING).getName(); - } else { - int slashPosition = stormConfFile.lastIndexOf(File.separator); - if (-1 == slashPosition) { - stormConfFileName = stormConfFile; - } else { - stormConfFileName = stormConfFile.substring(slashPosition + 1); - stormConfDirPath = stormConfFile.substring(0, slashPosition); - } - } - - toRet.add("-Dstorm.conf.file=" + stormConfFileName); + + toRet.add("-Dstorm.conf.file=" + stormConfFilePath); toRet.add("-cp"); - toRet.add(buildClassPathArgument(stormConfDirPath)); + toRet.add(buildClassPathArgument()); if (conf.containsKey(childOptsKey) && conf.get(childOptsKey) != null) { @@ -266,10 +272,12 @@ private static List buildCommandPrefix(String javaHome, String stormConf static List buildUICommands(Map conf) throws IOException { String javaHome = System.getProperty("java.home"); List toRet = - buildCommandPrefix(javaHome, null, conf, backtype.storm.Config.UI_CHILDOPTS); + buildCommandPrefix(javaHome, conf, backtype.storm.Config.UI_CHILDOPTS); toRet.add("-Dstorm.options=" + backtype.storm.Config.NIMBUS_HOST + "=localhost"); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/ui.log"); + toRet.add("-Daccess.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/access.log"); + toRet.add("-Dmetrics.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/metrics.log"); toRet.add("backtype.storm.ui.core"); return toRet; @@ -278,8 +286,10 @@ static List buildUICommands(Map conf) throws IOException { @SuppressWarnings("rawtypes") static List buildNimbusCommands(Map conf) throws IOException { String javaHome = System.getProperty("java.home"); - List toRet = buildCommandPrefix(javaHome, null, conf, backtype.storm.Config.NIMBUS_CHILDOPTS); + List toRet = buildCommandPrefix(javaHome, conf, backtype.storm.Config.NIMBUS_CHILDOPTS); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/nimbus.log"); + toRet.add("-Daccess.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/access.log"); + toRet.add("-Dmetrics.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/metrics.log"); toRet.add("backtype.storm.daemon.nimbus"); return toRet; @@ -288,115 +298,18 @@ static List buildNimbusCommands(Map conf) throws IOException { @SuppressWarnings("rawtypes") static List buildSupervisorCommands(Map conf) throws IOException { List toRet = - buildCommandPrefix("$JAVA_HOME", null, conf, backtype.storm.Config.NIMBUS_CHILDOPTS); + buildCommandPrefix("$JAVA_HOME", conf, backtype.storm.Config.SUPERVISOR_CHILDOPTS); toRet.add("-Dworker.logdir="+ ApplicationConstants.LOG_DIR_EXPANSION_VAR); toRet.add("-Dlogfile.name=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/supervisor.log"); + toRet.add("-Daccess.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/access.log"); + toRet.add("-Dmetrics.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/metrics.log"); toRet.add("backtype.storm.daemon.supervisor"); return toRet; } - - @SuppressWarnings("rawtypes") - static List buildTopologyListCommands(String stormConfPath) throws IOException { - Map storm_conf = Config.readStormConfig(stormConfPath); - String javaHome = System.getProperty("java.home"); - List toRet = buildCommandPrefix(javaHome, - stormConfPath, storm_conf, null); - - Map env = System.getenv(); - String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); - if (null != stormJarEnv) { - toRet.add(stormJarEnv); - } - toRet.add("backtype.storm.command.list"); - return toRet; - } - @SuppressWarnings("rawtypes") - static List buildRebalanceCommands(String stormConfPath, - String topologyId, int timeToWait, int newWorkerNumber, Map parallism) throws IOException { - Map storm_conf = Config.readStormConfig(stormConfPath); - String javaHome = System.getProperty("java.home"); - List toRet = buildCommandPrefix(javaHome, - stormConfPath, storm_conf, null); - - Map env = System.getenv(); - String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); - if (null != stormJarEnv) { - toRet.add(stormJarEnv); - } - toRet.add("backtype.storm.command.rebalance"); - toRet.add(topologyId); - if (-1 != timeToWait) { - toRet.add("-w"); - toRet.add(Integer.toString(timeToWait)); - } - - if (-1 != newWorkerNumber) { - toRet.add("-n"); - toRet.add(Integer.toString(newWorkerNumber)); - } - - if (null != parallism) { - Set keys = parallism.keySet(); - for (String key : keys) { - toRet.add("-e"); - Integer value = parallism.get(key); - toRet.add(key + "=" + value.toString()); - } - } - return toRet; - } - - @SuppressWarnings("rawtypes") - static List buildTopologyKillCommands(String stormConfPath, String topologyId, String timeToWait) throws IOException { - Map storm_conf = Config.readStormConfig(stormConfPath); - String javaHome = System.getProperty("java.home"); - List toRet = buildCommandPrefix(javaHome, - stormConfPath, storm_conf, null); - - Map env = System.getenv(); - String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); - if (null != stormJarEnv) { - toRet.add(stormJarEnv); - } - toRet.add("backtype.storm.command.kill_topology"); - toRet.add(topologyId); - if (null != timeToWait) { - toRet.add("-w"); - toRet.add(timeToWait); - } - return toRet; - } - - @SuppressWarnings("rawtypes") - static List buildTopologySubmissionCommands(String stormJarClass, - String stormJar, String stormConfPath, String[] parameters) throws IOException { - Map storm_conf = Config.readStormConfig(stormConfPath); - String javaHome = System.getProperty("java.home"); - List toRet = buildCommandPrefix(javaHome, - stormConfPath, storm_conf, null); - - toRet.add("-Dstorm.jar=" + stormJar); - - Map env = System.getenv(); - String stormJarEnv = env.get("STORM_JAR_JVM_OPTS"); - if (null != stormJarEnv) { - toRet.add(stormJarEnv); - } - toRet.add(stormJarClass); - for (int i = 0; i < parameters.length; i++) { - toRet.add(parameters[i]); - } - return toRet; - } - private static String buildClassPathArgument(String stormConfPath) throws IOException { + private static String buildClassPathArgument() throws IOException { List paths = new ArrayList(); - - if (null == stormConfPath || stormConfPath.isEmpty()) { - paths.add(new File(STORM_CONF_PATH_STRING).getParent()); - } else { - paths.add(stormConfPath); - } + paths.add(new File(STORM_CONF_PATH_STRING).getParent()); paths.add(getStormHome()); for (String jarPath : findAllJarsInPaths(getStormHome(), getStormHome() + File.separator + "lib")) { paths.add(jarPath); diff --git a/src/main/java/com/yahoo/storm/yarn/VersionCommand.java b/src/main/java/com/yahoo/storm/yarn/VersionCommand.java index 6fb1fd7..3b090a7 100644 --- a/src/main/java/com/yahoo/storm/yarn/VersionCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/VersionCommand.java @@ -14,23 +14,11 @@ package com.yahoo.storm.yarn; -import java.io.File; -import java.util.List; -import java.util.Map; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.io.Files; import com.yahoo.storm.yarn.Client.ClientCommand; -import com.yahoo.storm.yarn.generated.StormMaster; class VersionCommand implements ClientCommand { - private static final Logger LOG = LoggerFactory - .getLogger(StormMasterCommand.class); VersionCommand() { } From b030b4d1b7bbab72c029a8ad563128d6669f73bc Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Sun, 9 Mar 2014 16:37:40 +0800 Subject: [PATCH 4/6] remove changes that are not related --- .../java/com/yahoo/storm/yarn/Config.java | 2 - .../com/yahoo/storm/yarn/LaunchCommand.java | 2 +- .../com/yahoo/storm/yarn/StormOnYarn.java | 2 +- src/main/java/com/yahoo/storm/yarn/Util.java | 64 ++++++------------- 4 files changed, 23 insertions(+), 47 deletions(-) diff --git a/src/main/java/com/yahoo/storm/yarn/Config.java b/src/main/java/com/yahoo/storm/yarn/Config.java index 4dba2c4..0d1d6df 100644 --- a/src/main/java/com/yahoo/storm/yarn/Config.java +++ b/src/main/java/com/yahoo/storm/yarn/Config.java @@ -31,8 +31,6 @@ public class Config { final public static String MASTER_THRIFT_PORT = "master.thrift.port"; final public static String MASTER_TIMEOUT_SECS = "master.timeout.secs"; final public static String MASTER_SIZE_MB = "master.container.size-mb"; - final public static String SUPERVISOR_CONTAINER_SIZE_MB = "supervisor.container.size-mb"; - final public static String SUPERVISOR_VCORE_NUM = "supervisor.vcores"; final public static String MASTER_NUM_SUPERVISORS = "master.initial-num-supervisors"; final public static String MASTER_CONTAINER_PRIORITY = "master.container.priority"; //# of milliseconds to wait for YARN report on Storm Master host/port diff --git a/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java b/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java index 05eb0be..f5dab8b 100644 --- a/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/LaunchCommand.java @@ -70,7 +70,7 @@ public void process(CommandLine cl) throws Exception { queue, amSize, stormConf, storm_zip_location); - LOG.info("Submitted application's ID:" + storm.getAppId()); + LOG.debug("Submitted application's ID:" + storm.getAppId()); //download storm.yaml file String storm_yaml_output = cl.getOptionValue("stormConfOutput"); diff --git a/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java b/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java index e82da5c..6445c95 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java +++ b/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java @@ -239,7 +239,7 @@ private void launchApp(String appName, String queue, int amMB, String storm_zip_ reader.close(); Apps.addToEnvironment(env, Environment.CLASSPATH.name(), yarn_class_path); - String stormHomeInZip = Util.getStormHomeInZip(fs, zip); + String stormHomeInZip = Util.getStormHomeInZip(fs, zip, stormVersion.version()); Apps.addToEnvironment(env, Environment.CLASSPATH.name(), "./storm/" + stormHomeInZip + "/*"); Apps.addToEnvironment(env, Environment.CLASSPATH.name(), "./storm/" + stormHomeInZip + "/lib/*"); diff --git a/src/main/java/com/yahoo/storm/yarn/Util.java b/src/main/java/com/yahoo/storm/yarn/Util.java index 4a602ba..852bdb0 100644 --- a/src/main/java/com/yahoo/storm/yarn/Util.java +++ b/src/main/java/com/yahoo/storm/yarn/Util.java @@ -23,10 +23,8 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; -import java.io.InputStream; import java.io.PrintStream; import java.io.OutputStreamWriter; @@ -72,6 +70,7 @@ static String getStormHome() { return ret; } + @SuppressWarnings("rawtypes") static Version getStormVersion() throws IOException { String versionNumber = "Unknown"; @@ -100,20 +99,15 @@ static Version getStormVersion() throws IOException { return version; } - static String getStormHomeInZip(FileSystem fs, Path zip) throws IOException, RuntimeException { + static String getStormHomeInZip(FileSystem fs, Path zip, String stormVersion) throws IOException, RuntimeException { FSDataInputStream fsInputStream = fs.open(zip); ZipInputStream zipInputStream = new ZipInputStream(fsInputStream); ZipEntry entry = zipInputStream.getNextEntry(); while (entry != null) { String entryName = entry.getName(); - final String STORM_BIN = "bin/storm"; - if (entryName.endsWith(STORM_BIN)) { + if (entryName.matches("^storm(-" + stormVersion + ")?/")) { fsInputStream.close(); - String stormHome = entryName.substring(0, entryName.length() - STORM_BIN.length()); - if (stormHome.endsWith("/")) { - stormHome = stormHome.substring(0, stormHome.length() - 1); - } - return stormHome; + return entryName.replace("/", ""); } entry = zipInputStream.getNextEntry(); } @@ -145,27 +139,6 @@ static void rmNulls(Map map) { } } - static void writeStormConf(FileSystem fs, Map stormConf, Path dest) - throws IOException { - //storm.yaml - FSDataOutputStream out = fs.create(dest); - Yaml yaml = new Yaml(); - OutputStreamWriter writer = new OutputStreamWriter(out); - rmNulls(stormConf); - yaml.dump(stormConf, writer); - writer.close(); - out.close(); - } - - static void writeYarnConf(FileSystem fs, YarnConfiguration yarnConf, Path dest) - throws IOException { - FSDataOutputStream out = fs.create(dest); - OutputStreamWriter writer = new OutputStreamWriter(out); - yarnConf.writeXml(writer); - writer.close(); - out.close(); - } - @SuppressWarnings("rawtypes") static Path createConfigurationFileInFs(FileSystem fs, String appHome, Map stormConf, YarnConfiguration yarnConf) @@ -177,15 +150,25 @@ static Path createConfigurationFileInFs(FileSystem fs, fs.mkdirs(dirDst); //storm.yaml - writeStormConf(fs, stormConf, confDst); - + FSDataOutputStream out = fs.create(confDst); + Yaml yaml = new Yaml(); + OutputStreamWriter writer = new OutputStreamWriter(out); + rmNulls(stormConf); + yaml.dump(stormConf, writer); + writer.close(); + out.close(); + //yarn-site.xml Path yarn_site_xml = new Path(dirDst, "yarn-site.xml"); - writeYarnConf(fs, yarnConf, yarn_site_xml); - + out = fs.create(yarn_site_xml); + writer = new OutputStreamWriter(out); + yarnConf.writeXml(writer); + writer.close(); + out.close(); + //logback.xml Path logback_xml = new Path(dirDst, "logback.xml"); - FSDataOutputStream out = fs.create(logback_xml); + out = fs.create(logback_xml); CreateLogbackXML(out); out.close(); @@ -276,8 +259,6 @@ static List buildUICommands(Map conf) throws IOException { toRet.add("-Dstorm.options=" + backtype.storm.Config.NIMBUS_HOST + "=localhost"); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/ui.log"); - toRet.add("-Daccess.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/access.log"); - toRet.add("-Dmetrics.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/metrics.log"); toRet.add("backtype.storm.ui.core"); return toRet; @@ -288,8 +269,6 @@ static List buildNimbusCommands(Map conf) throws IOException { String javaHome = System.getProperty("java.home"); List toRet = buildCommandPrefix(javaHome, conf, backtype.storm.Config.NIMBUS_CHILDOPTS); toRet.add("-Dlogfile.name=" + System.getenv("STORM_LOG_DIR") + "/nimbus.log"); - toRet.add("-Daccess.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/access.log"); - toRet.add("-Dmetrics.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/metrics.log"); toRet.add("backtype.storm.daemon.nimbus"); return toRet; @@ -301,12 +280,11 @@ static List buildSupervisorCommands(Map conf) throws IOException { buildCommandPrefix("$JAVA_HOME", conf, backtype.storm.Config.SUPERVISOR_CHILDOPTS); toRet.add("-Dworker.logdir="+ ApplicationConstants.LOG_DIR_EXPANSION_VAR); toRet.add("-Dlogfile.name=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/supervisor.log"); - toRet.add("-Daccess.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/access.log"); - toRet.add("-Dmetrics.logfile.name=" + System.getenv("STORM_LOG_DIR") + "/metrics.log"); toRet.add("backtype.storm.daemon.supervisor"); + return toRet; } - + private static String buildClassPathArgument() throws IOException { List paths = new ArrayList(); paths.add(new File(STORM_CONF_PATH_STRING).getParent()); From c9a549d1ae7eccfb4d821b8dc1271941082af6f3 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Wed, 12 Mar 2014 11:12:31 +0800 Subject: [PATCH 5/6] update the help message for rebalance command --- .../com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java index 4099674..0b22648 100644 --- a/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java +++ b/src/main/java/com/yahoo/storm/yarn/StormTopologyRebalanceCommand.java @@ -20,7 +20,7 @@ class StormTopologyRebalanceCommand extends StormCommand { @Override public String getHeaderDescription() { - return "storm-yarn rebalance -appId= -w=wait-time-secs -n=new-num-workers -e=component:parallelism,component:parallelism... topology-name"; + return "storm-yarn rebalance -appId= topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]"; } @Override From c1aa93ff734c382ef38199ca50ba54f0b76dfb23 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Wed, 12 Mar 2014 11:28:14 +0800 Subject: [PATCH 6/6] Support app master dynamic port --- .../com/yahoo/storm/yarn/MasterServer.java | 63 ++++++++++++++++--- .../yahoo/storm/yarn/ServerSocketFactory.java | 49 +++++++++++++++ src/main/java/com/yahoo/storm/yarn/Util.java | 41 +++++++----- 3 files changed, 130 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java diff --git a/src/main/java/com/yahoo/storm/yarn/MasterServer.java b/src/main/java/com/yahoo/storm/yarn/MasterServer.java index 27b8cae..f5baedc 100644 --- a/src/main/java/com/yahoo/storm/yarn/MasterServer.java +++ b/src/main/java/com/yahoo/storm/yarn/MasterServer.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -28,6 +29,11 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; import org.apache.hadoop.service.Service; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -113,6 +119,40 @@ public void run() { return thread; } + private static void prepareStormConfig(Map stormConfig, + ServerSocketFactory socketFactory) throws IOException { + try { + String host_addr = InetAddress.getLocalHost().getHostAddress(); + LOG.info("Storm master host:" + host_addr); + stormConfig.put("nimbus.host", host_addr); + } catch (UnknownHostException ex) { + LOG.warn("Failed to get IP address of local host"); + throw ex; + } + + stormConfig.put("nimbus.thrift.port", socketFactory.create() + .getLocalPort()); + stormConfig.put("ui.port", socketFactory.create().getLocalPort()); + stormConfig.put("drpc.port", socketFactory.create().getLocalPort()); + stormConfig.put("drpc.invocations.port", socketFactory.create() + .getLocalPort()); + + stormConfig.put(Config.MASTER_THRIFT_PORT, socketFactory.create() + .getLocalPort()); + + //update the conf/storm.yaml + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path stormYaml = new Path("conf" + Path.SEPARATOR + "storm.yaml"); + + if (fs.exists(stormYaml)) { + LOG.info("storm.yaml exists"); + FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); + fs.setPermission(stormYaml, permission); + } + + Util.writeStormConf(fs, stormConfig, stormYaml); + } + @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { LOG.info("Starting the AM!!!!"); @@ -144,9 +184,11 @@ public static void main(String[] args) throws Exception { YarnConfiguration hadoopConf = new YarnConfiguration(); - final String host = InetAddress.getLocalHost().getHostName(); - storm_conf.put("nimbus.host", host); - + ServerSocketFactory socketFactory = new ServerSocketFactory(true); + + LOG.info("Prepare Storm config...."); + prepareStormConfig(storm_conf, socketFactory); + StormAMRMClient rmClient = new StormAMRMClient(appAttemptID, storm_conf, hadoopConf); rmClient.init(hadoopConf); @@ -154,17 +196,22 @@ public static void main(String[] args) throws Exception { BlockingQueue launcherQueue = new LinkedBlockingQueue(); - MasterServer server = new MasterServer(storm_conf, rmClient); + MasterServer server = null; try { - final int port = Utils.getInt(storm_conf.get(Config.MASTER_THRIFT_PORT)); - final String target = host + ":" + port; + final Integer masterPort = (Integer) storm_conf.get(Config.MASTER_THRIFT_PORT); + final String target = storm_conf.get("nimbus.host") + ":" + masterPort; InetSocketAddress addr = NetUtils.createSocketAddr(target); RegisterApplicationMasterResponse resp = - rmClient.registerApplicationMaster(addr.getHostName(), port, null); + rmClient.registerApplicationMaster(addr.getHostName(), masterPort, null); LOG.info("Got a registration response "+resp); LOG.info("Max Capability "+resp.getMaximumResourceCapability()); rmClient.setMaxResource(resp.getMaximumResourceCapability()); LOG.info("Starting HB thread"); + + //Free the master port so that it can be used by Master Thrift Service + socketFactory.free(masterPort); + + server = new MasterServer(storm_conf, rmClient); server.initAndStartHeartbeat(rmClient, launcherQueue, (Integer) storm_conf .get(Config.MASTER_HEARTBEAT_INTERVAL_MILLIS)); @@ -177,7 +224,7 @@ public static void main(String[] args) throws Exception { rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "AllDone", null); } finally { - if (server.isServing()) { + if (null != server && server.isServing()) { LOG.info("Stop Master Thrift Server"); server.stop(); } diff --git a/src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java b/src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java new file mode 100644 index 0000000..fccb14e --- /dev/null +++ b/src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.storm.yarn; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.HashMap; +import java.util.Map; + +public class ServerSocketFactory { + private boolean reuseAddress = false; + private Map sockets = new HashMap(); + + public ServerSocketFactory(boolean reusePort) { + this.reuseAddress = reusePort; + } + + public ServerSocket create() throws IOException { + ServerSocket socket = new ServerSocket(); + socket.setReuseAddress(reuseAddress); + socket.bind(null); + int port = socket.getLocalPort(); + sockets.put(port, socket); + return socket; + } + + public void free(int port) throws IOException { + ServerSocket socket = sockets.get(port); + if (null != socket) { + socket.close(); + sockets.remove(port); + } + } +} diff --git a/src/main/java/com/yahoo/storm/yarn/Util.java b/src/main/java/com/yahoo/storm/yarn/Util.java index 852bdb0..4eae80c 100644 --- a/src/main/java/com/yahoo/storm/yarn/Util.java +++ b/src/main/java/com/yahoo/storm/yarn/Util.java @@ -139,6 +139,27 @@ static void rmNulls(Map map) { } } + static void writeStormConf(FileSystem fs, Map stormConf, Path dest) + throws IOException { + //storm.yaml + FSDataOutputStream out = fs.create(dest); + Yaml yaml = new Yaml(); + OutputStreamWriter writer = new OutputStreamWriter(out); + rmNulls(stormConf); + yaml.dump(stormConf, writer); + writer.close(); + out.close(); + } + + static void writeYarnConf(FileSystem fs, YarnConfiguration yarnConf, Path dest) + throws IOException { + FSDataOutputStream out = fs.create(dest); + OutputStreamWriter writer = new OutputStreamWriter(out); + yarnConf.writeXml(writer); + writer.close(); + out.close(); + } + @SuppressWarnings("rawtypes") static Path createConfigurationFileInFs(FileSystem fs, String appHome, Map stormConf, YarnConfiguration yarnConf) @@ -150,25 +171,15 @@ static Path createConfigurationFileInFs(FileSystem fs, fs.mkdirs(dirDst); //storm.yaml - FSDataOutputStream out = fs.create(confDst); - Yaml yaml = new Yaml(); - OutputStreamWriter writer = new OutputStreamWriter(out); - rmNulls(stormConf); - yaml.dump(stormConf, writer); - writer.close(); - out.close(); - + writeStormConf(fs, stormConf, confDst); + //yarn-site.xml Path yarn_site_xml = new Path(dirDst, "yarn-site.xml"); - out = fs.create(yarn_site_xml); - writer = new OutputStreamWriter(out); - yarnConf.writeXml(writer); - writer.close(); - out.close(); - + writeYarnConf(fs, yarnConf, yarn_site_xml); + //logback.xml Path logback_xml = new Path(dirDst, "logback.xml"); - out = fs.create(logback_xml); + FSDataOutputStream out = fs.create(logback_xml); CreateLogbackXML(out); out.close();