Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Appmaster dynamic port #71

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/ClassPathCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import com.yahoo.storm.yarn.Client.ClientCommand;

class ClassPathCommand implements ClientCommand {

ClassPathCommand() {
}

@Override
public Options getOpts() {
Options opts = new Options();
return opts;
}

@Override
public String getHeaderDescription() {
return "storm-yarn classpath";
}

@Override
public void process(CommandLine cl) throws Exception {
String classpath = System.getProperty("java.class.path");
System.out.println(classpath);
}
}
14 changes: 12 additions & 2 deletions src/main/java/com/yahoo/storm/yarn/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ public void printHelpFor(Collection<String> args) {
public void execute(String[] args) throws Exception {
HashMap<String, ClientCommand> commands = new HashMap<String, ClientCommand>();
HelpCommand help = new HelpCommand(commands);

commands.put("help", help);
commands.put("version", new VersionCommand());
commands.put("classpath", new ClassPathCommand());
commands.put("launch", new LaunchCommand());

commands.put("setStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.SET_STORM_CONFIG));
commands.put("getStormConfig", new StormMasterCommand(StormMasterCommand.COMMAND.GET_STORM_CONFIG));
commands.put("addSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.ADD_SUPERVISORS));
Expand All @@ -113,7 +117,13 @@ public void execute(String[] args) throws Exception {
commands.put("startSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.START_SUPERVISORS));
commands.put("stopSupervisors", new StormMasterCommand(StormMasterCommand.COMMAND.STOP_SUPERVISORS));
commands.put("shutdown", new StormMasterCommand(StormMasterCommand.COMMAND.SHUTDOWN));
commands.put("version", new VersionCommand());

commands.put("jar", new StormTopologySubmitCommand());
commands.put("kill", new StormTopologyKillCommand());
commands.put("list", new StormTopologyListCommand());
commands.put("rebalance", new StormTopologyRebalanceCommand());
commands.put("activate", new StormTopologyActivateCommand());
commands.put("deactivate", new StormTopologyDeactivateCommand());

String commandName = null;
String[] commandArgs = null;
Expand All @@ -134,7 +144,7 @@ public void execute(String[] args) throws Exception {
if(!opts.hasOption("h")) {
opts.addOption("h", "help", false, "print out a help message");
}
CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs);
CommandLine cl = new GnuParser().parse(command.getOpts(), commandArgs, true);
if(cl.hasOption("help")) {
help.printHelpFor(Arrays.asList(commandName));
} else {
Expand Down
63 changes: 55 additions & 8 deletions src/main/java/com/yahoo/storm/yarn/MasterServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand All @@ -28,6 +29,11 @@
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
Expand Down Expand Up @@ -113,6 +119,40 @@ public void run() {
return thread;
}

private static void prepareStormConfig(Map stormConfig,
ServerSocketFactory socketFactory) throws IOException {
try {
String host_addr = InetAddress.getLocalHost().getHostAddress();
LOG.info("Storm master host:" + host_addr);
stormConfig.put("nimbus.host", host_addr);
} catch (UnknownHostException ex) {
LOG.warn("Failed to get IP address of local host");
throw ex;
}

stormConfig.put("nimbus.thrift.port", socketFactory.create()
.getLocalPort());
stormConfig.put("ui.port", socketFactory.create().getLocalPort());
stormConfig.put("drpc.port", socketFactory.create().getLocalPort());
stormConfig.put("drpc.invocations.port", socketFactory.create()
.getLocalPort());

stormConfig.put(Config.MASTER_THRIFT_PORT, socketFactory.create()
.getLocalPort());

//update the conf/storm.yaml
FileSystem fs = FileSystem.getLocal(new Configuration());
Path stormYaml = new Path("conf" + Path.SEPARATOR + "storm.yaml");

if (fs.exists(stormYaml)) {
LOG.info("storm.yaml exists");
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(stormYaml, permission);
}

Util.writeStormConf(fs, stormConfig, stormYaml);
}

@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
LOG.info("Starting the AM!!!!");
Expand Down Expand Up @@ -144,27 +184,34 @@ public static void main(String[] args) throws Exception {

YarnConfiguration hadoopConf = new YarnConfiguration();

final String host = InetAddress.getLocalHost().getHostName();
storm_conf.put("nimbus.host", host);

ServerSocketFactory socketFactory = new ServerSocketFactory(true);

LOG.info("Prepare Storm config....");
prepareStormConfig(storm_conf, socketFactory);

StormAMRMClient rmClient =
new StormAMRMClient(appAttemptID, storm_conf, hadoopConf);
rmClient.init(hadoopConf);
rmClient.start();

BlockingQueue<Container> launcherQueue = new LinkedBlockingQueue<Container>();

MasterServer server = new MasterServer(storm_conf, rmClient);
MasterServer server = null;
try {
final int port = Utils.getInt(storm_conf.get(Config.MASTER_THRIFT_PORT));
final String target = host + ":" + port;
final Integer masterPort = (Integer) storm_conf.get(Config.MASTER_THRIFT_PORT);
final String target = storm_conf.get("nimbus.host") + ":" + masterPort;
InetSocketAddress addr = NetUtils.createSocketAddr(target);
RegisterApplicationMasterResponse resp =
rmClient.registerApplicationMaster(addr.getHostName(), port, null);
rmClient.registerApplicationMaster(addr.getHostName(), masterPort, null);
LOG.info("Got a registration response "+resp);
LOG.info("Max Capability "+resp.getMaximumResourceCapability());
rmClient.setMaxResource(resp.getMaximumResourceCapability());
LOG.info("Starting HB thread");

//Free the master port so that it can be used by Master Thrift Service
socketFactory.free(masterPort);

server = new MasterServer(storm_conf, rmClient);
server.initAndStartHeartbeat(rmClient, launcherQueue,
(Integer) storm_conf
.get(Config.MASTER_HEARTBEAT_INTERVAL_MILLIS));
Expand All @@ -177,7 +224,7 @@ public static void main(String[] args) throws Exception {
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
"AllDone", null);
} finally {
if (server.isServing()) {
if (null != server && server.isServing()) {
LOG.info("Stop Master Thrift Server");
server.stop();
}
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/ServerSocketFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.storm.yarn;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;

public class ServerSocketFactory {
private boolean reuseAddress = false;
private Map<Integer, ServerSocket> sockets = new HashMap<Integer, ServerSocket>();

public ServerSocketFactory(boolean reusePort) {
this.reuseAddress = reusePort;
}

public ServerSocket create() throws IOException {
ServerSocket socket = new ServerSocket();
socket.setReuseAddress(reuseAddress);
socket.bind(null);
int port = socket.getLocalPort();
sockets.put(port, socket);
return socket;
}

public void free(int port) throws IOException {
ServerSocket socket = sockets.get(port);
if (null != socket) {
socket.close();
sockets.remove(port);
}
}
}
107 changes: 107 additions & 0 deletions src/main/java/com/yahoo/storm/yarn/StormCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package com.yahoo.storm.yarn;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.io.Files;
import com.yahoo.storm.yarn.Client.ClientCommand;
import com.yahoo.storm.yarn.generated.StormMaster;

/**
* Works as Storm shell command
*
*/
public abstract class StormCommand implements ClientCommand {

private static final Logger LOG = LoggerFactory.getLogger(StormCommand.class);

protected String appId;

@Override
public Options getOpts() {
Options opts = new Options();
opts.addOption("appId", true, "(Required) The storm clusters app ID");
return opts;
}

protected void process(String command, CommandLine cl) throws Exception {
this.appId = cl.getOptionValue("appId");

if (appId == null) {
throw new IllegalArgumentException("-appId is required");
}

Map stormConf = Config.readStormConfig(null);

StormOnYarn storm = null;

try {

storm = StormOnYarn.attachToApp(appId, stormConf);
StormMaster.Client client = storm.getClient();

File tmpStormConfDir = Files.createTempDir();
File tmpStormConf = new File(tmpStormConfDir, "storm.yaml");

StormMasterCommand.downloadStormYaml(client,
tmpStormConf.getAbsolutePath());

String stormHome = System.getProperty("storm.home");

List<String> commands = new ArrayList<String>();
commands.add("python"); // TODO: Change this to python home
commands.add(stormHome + "/bin/storm");
commands.add(command);

String[] args = cl.getArgs();
if (null != args && args.length > 0) {
for (int i = 0; i < args.length; i++) {
commands.add(args[i]);
}
}
commands.add("--config");
commands.add(tmpStormConf.getAbsolutePath());

LOG.info("Running: " + Joiner.on(" ").join(commands));
ProcessBuilder builder = new ProcessBuilder(commands);

Process process = builder.start();
Util.redirectStreamAsync(process.getInputStream(), System.out);
Util.redirectStreamAsync(process.getErrorStream(), System.err);

process.waitFor();

if (process.exitValue() == 0) {
if (null != tmpStormConfDir) {
tmpStormConf.delete();
}
}
} finally {
if (storm != null) {
storm.stop();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/

package com.yahoo.storm.yarn;

import org.apache.commons.cli.CommandLine;

class StormTopologyActivateCommand extends StormCommand {

@Override
public String getHeaderDescription() {
return "storm-yarn activate -appId=xx topologyId";
}

@Override
public void process(CommandLine cl) throws Exception {
super.process("activate", cl);
}
}
Loading