diff --git a/doc/install-on-macosx.md b/doc/install-on-macosx.md index 83e0b6ea..bef43a58 100644 --- a/doc/install-on-macosx.md +++ b/doc/install-on-macosx.md @@ -38,3 +38,27 @@ that you've cloned the git repository. ``` python test/runtest.py ``` + +## Optional - web UI + +Ray's web UI requires **Python 3**. To enable the web UI to work, install these +Python packages. + +``` +pip install aioredis asyncio websockets +``` + +Then install +[polymer](https://www.polymer-project.org/1.0/docs/tools/polymer-cli), which +also requires [Node.js](https://nodejs.org/en/download/) and +[Bower](http://bower.io/#install-bower). + +Once you've installed Polymer, run the following. + +``` +cd ray/webui +bower install +``` + +Then while Ray is running, you should be able to view the web UI at +`http://localhost:8080`. diff --git a/doc/install-on-ubuntu.md b/doc/install-on-ubuntu.md index 7c971d51..7a1a3a89 100644 --- a/doc/install-on-ubuntu.md +++ b/doc/install-on-ubuntu.md @@ -39,3 +39,27 @@ that you've cloned the git repository. ``` python test/runtest.py ``` + +## Optional - web UI + +Ray's web UI requires **Python 3**. To enable the web UI to work, install these +Python packages. + +``` +pip install aioredis asyncio websockets +``` + +Then install +[polymer](https://www.polymer-project.org/1.0/docs/tools/polymer-cli), which +also requires [Node.js](https://nodejs.org/en/download/) and +[Bower](http://bower.io/#install-bower). + +Once you've installed Polymer, run the following. + +``` +cd ray/webui +bower install +``` + +Then while Ray is running, you should be able to view the web UI at +`http://localhost:8080`. diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md index 1baed584..239e1101 100644 --- a/doc/using-ray-on-a-cluster.md +++ b/doc/using-ray-on-a-cluster.md @@ -45,6 +45,8 @@ Now we've started all of the Ray processes on each node Ray. This includes - A local scheduler on each machine. - One Redis server (on the head node). - One global scheduler (on the head node). +- Optionally, this may start up some processes for visualizing the system state + through a web UI. To run some commands, start up Python on one of the nodes in the cluster, and do the following. @@ -65,9 +67,36 @@ ray.get([f.remote(f.remote(f.remote(0))) for _ in range(1000)]) ``` ### Stopping Ray + When you want to stop the Ray processes, run `./ray/scripts/stop_ray.sh` on each node. +### Using the Web UI on a Cluster + +If you followed the instructions for setting up the web UI, then +`./ray/scripts/start_ray.sh --head` will attempt to start a Python 3 webserver +on the head node. In order to view the web UI from a machine that is not part of +the cluster (like your laptop), you can use SSH port forwarding. The web UI +requires ports 8080 and 8888, which you can forward using a command like the +following. + +``` +ssh -L 8080:localhost:8080 -L 8888:localhost:8888 ubuntu@ +``` + +Then you can view the web UI on your laptop by navigating to +`http://localhost:8080` in a browser. + +#### Troubleshooting the Web UI + +Note that to use the web UI, additional setup is required. In particular, you +must be using Python 3 or you must at least have `python3` on your path. + +If the web UI doesn't work, it's possible that the web UI processes were never +started (check `ps aux | grep polymer` and `ps aux | grep ray_ui.py`). If they +were started, see if you can fetch the web UI from within the head node (try +`wget http://localhost:8080` on the head node). + ### Copying Application Files to Other Nodes (Experimental) If you're running an application that imports Python files that are present diff --git a/python/ray/services.py b/python/ray/services.py index bd63ee20..0ffea40d 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -26,6 +26,7 @@ PROCESS_TYPE_PLASMA_STORE = "plasma_store" PROCESS_TYPE_GLOBAL_SCHEDULER = "global_scheduler" PROCESS_TYPE_REDIS_SERVER = "redis_server" +PROCESS_TYPE_WEB_UI = "web_ui" # This is a dictionary tracking all of the processes of different types that # have been started by this services module. Note that the order of the keys is @@ -37,7 +38,8 @@ (PROCESS_TYPE_PLASMA_MANAGER, []), (PROCESS_TYPE_PLASMA_STORE, []), (PROCESS_TYPE_GLOBAL_SCHEDULER, []), - (PROCESS_TYPE_REDIS_SERVER, [])]) + (PROCESS_TYPE_REDIS_SERVER, []), + (PROCESS_TYPE_WEB_UI, [])]) # True if processes are run in the valgrind profiler. RUN_PHOTON_PROFILER = False @@ -260,7 +262,7 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): Args: redis_address (str): The address of the Redis instance. cleanup (bool): True if using Ray in local mode. If cleanup is true, then - this process will be killed by serices.cleanup() when the Python process + this process will be killed by services.cleanup() when the Python process that imported services exits. redirect_output (bool): True if stdout and stderr should be redirected to /dev/null. @@ -269,6 +271,87 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): if cleanup: all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p) +def start_webui(redis_address, cleanup=True, redirect_output=False): + """Attempt to start the Ray web UI. + + Args: + redis_address (str): The address of the Redis server. + cleanup (bool): True if using Ray in local mode. If cleanup is True, then + this process will be killed by services.cleanup() when the Python process + that imported services exits. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. + + Return: + True if the web UI was successfully started, otherwise false. + """ + webui_backend_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../webui/backend/ray_ui.py") + webui_directory = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../webui/") + + if sys.version_info >= (3, 0): + python_executable = "python" + else: + # If the user is using Python 2, it is still possible to run the webserver + # separately with Python 3, so try to find a Python 3 executable. + try: + python_executable = subprocess.check_output(["which", "python3"]).decode("ascii").strip() + except Exception as e: + print("Not starting the web UI because the web UI requires Python 3.") + return False + + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + backend_process = subprocess.Popen([python_executable, + webui_backend_filepath, + "--redis-address", redis_address], + stdout=stdout, stderr=stderr) + + time.sleep(0.1) + if backend_process.poll() is not None: + # Failed to start the web UI. + print("The web UI failed to start.") + return False + + # Try to start polymer. If this fails, it may that port 8080 is already in + # use. It'd be nice to test for this, but doing so by calling "bind" may start + # using the port and prevent polymer from using it. + try: + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + polymer_process = subprocess.Popen(["polymer", "serve", "--port", "8080"], + cwd=webui_directory, + stdout=stdout, stderr=stderr) + except Exception as e: + print("Failed to start polymer.") + # Kill the backend since it won't work without polymer. + try: + backend_process.kill() + except Exception as e: + pass + return False + + # Unfortunately this block of code is unlikely to catch any problems because + # when polymer throws an error on startup, it is typically after several + # seconds. + time.sleep(0.1) + if polymer_process.poll() is not None: + # Failed to start polymer. + print("Failed to serve the web UI with polymer.") + # Kill the backend since it won't work without polymer. + try: + backend_process.kill() + except Exception as e: + pass + return False + + if cleanup: + all_processes[PROCESS_TYPE_WEB_UI].append(backend_process) + all_processes[PROCESS_TYPE_WEB_UI].append(polymer_process) + + return True + def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, @@ -414,6 +497,7 @@ def start_ray_processes(address_info=None, redirect_output=False, include_global_scheduler=False, include_redis=False, + include_webui=False, start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None): @@ -441,6 +525,8 @@ def start_ray_processes(address_info=None, start a global scheduler process. include_redis (bool): If include_redis is True, then start a Redis server process. + include_webui (bool): If True, then attempt to start the web UI. Note that + this is only possible with Python 3. start_workers_from_local_scheduler (bool): If this flag is True, then start the initial workers from the local scheduler. Else, start them from Python. @@ -579,6 +665,14 @@ def start_ray_processes(address_info=None, # Make sure that we've started all the workers. assert(sum(num_workers_per_local_scheduler) == 0) + # Try to start the web UI. + if include_webui: + successfully_started = start_webui(redis_address, cleanup=cleanup, + redirect_output=True) + + if successfully_started: + print("View the web UI at http://localhost:8080.") + # Return the addresses of the relevant processes. return address_info @@ -681,6 +775,7 @@ def start_ray_head(address_info=None, redirect_output=redirect_output, include_global_scheduler=True, include_redis=True, + include_webui=True, start_workers_from_local_scheduler=start_workers_from_local_scheduler, num_cpus=num_cpus, num_gpus=num_gpus) diff --git a/scripts/stop_ray.sh b/scripts/stop_ray.sh index 0191e028..4ef16d6d 100755 --- a/scripts/stop_ray.sh +++ b/scripts/stop_ray.sh @@ -7,3 +7,9 @@ kill $(ps aux | grep redis-server | awk '{ print $2 }') 2> /dev/null # Find the PIDs of the worker processes and kill them. kill $(ps aux | grep default_worker.py | awk '{ print $2 }') 2> /dev/null + +# Kill the processes related to the web UI. +killall polymer + +# Find the PID of the Ray UI backend process and kill it. +kill $(ps aux | grep ray_ui.py | awk '{ print $2 }') 2> /dev/null diff --git a/webui/README.md b/webui/README.md index 07cc2a76..e76880d4 100644 --- a/webui/README.md +++ b/webui/README.md @@ -28,7 +28,7 @@ The following must be done once. First start Ray and note down the address of the Redis server. Then run cd webui/backend - python ray_ui.py --redis-address 127.0.0.1:6379 --port 8888 + python ray_ui.py --redis-address 127.0.0.1:6379 where you substitute your Redis address appropriately. diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index 6c28f734..852bb7f1 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -6,18 +6,22 @@ import datetime import json import numpy as np +import os import redis +import sys import time import websockets parser = argparse.ArgumentParser(description="parse information for the web ui") -parser.add_argument("--port", type=int, help="port to use for the web ui") parser.add_argument("--redis-address", required=True, type=str, help="the address to use for redis") loop = asyncio.get_event_loop() IDENTIFIER_LENGTH = 20 +# This prefix must match the value defined in ray_redis_module.c. +DB_CLIENT_PREFIX = b"CL:" + def hex_identifier(identifier): return binascii.hexlify(identifier).decode() @@ -41,6 +45,9 @@ def key_to_hex_identifiers(key): worker_ids = [] +# Cache information about the local schedulers. +local_schedulers = {} + def duration_to_string(duration): """Format a duration in seconds as a string. @@ -161,6 +168,52 @@ async def handle_get_recent_tasks(websocket, redis_conn, num_tasks): "task_data": task_data} await websocket.send(json.dumps(reply)) +async def send_heartbeat_payload(websocket): + """Send heartbeat updates to the frontend every half second.""" + while True: + reply = [] + for local_scheduler_id, local_scheduler in local_schedulers.items(): + current_time = time.time() + local_scheduler_info = {"local scheduler ID": local_scheduler_id, + "time since heartbeat": duration_to_string(current_time - local_scheduler["last_heartbeat"]), + "time since heartbeat numeric": str(current_time - local_scheduler["last_heartbeat"]), + "node ip address": local_scheduler["node_ip_address"]} + reply.append(local_scheduler_info) + # Send the payload to the frontend. + await websocket.send(json.dumps(reply)) + # Wait for a little while so as not to overwhelm the frontend. + await asyncio.sleep(0.5) + +async def send_heartbeats(websocket, redis_conn): + # First update the local scheduler info locally. + client_keys = await redis_conn.execute("keys", "CL:*") + clients = [] + for client_key in client_keys: + client_fields = await redis_conn.execute("hgetall", client_key) + client_fields = {client_fields[2 * i]: client_fields[2 * i + 1] for i in range(len(client_fields) // 2)} + if client_fields[b"client_type"] == b"photon": + local_scheduler_id = hex_identifier(client_fields[b"ray_client_id"]) + local_schedulers[local_scheduler_id] = {"node_ip_address": client_fields[b"node_ip_address"].decode("ascii"), + "local_scheduler_socket_name": client_fields[b"local_scheduler_socket_name"].decode("ascii"), + "aux_address": client_fields[b"aux_address"].decode("ascii"), + "last_heartbeat": -1 * np.inf} + + # Subscribe to local scheduler heartbeats. + await redis_conn.execute_pubsub("subscribe", "local_schedulers") + + # Start a method in the background to periodically update the frontend. + asyncio.ensure_future(send_heartbeat_payload(websocket)) + + while True: + msg = await redis_conn.pubsub_channels["local_schedulers"].get() + local_scheduler_id_bytes = msg[:IDENTIFIER_LENGTH] + local_scheduler_id = hex_identifier(local_scheduler_id_bytes) + if local_scheduler_id not in local_schedulers: + # A new local scheduler has joined the cluster. Ignore it. This won't be + # displayed in the UI until the page is refreshed. + continue + local_schedulers[local_scheduler_id]["last_heartbeat"] = time.time() + async def serve_requests(websocket, path): redis_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) @@ -177,6 +230,8 @@ async def serve_requests(websocket, path): await handle_get_drivers(websocket, redis_conn) elif command["command"] == "get-recent-tasks": await handle_get_recent_tasks(websocket, redis_conn, command["num"]) + elif command["command"] == "get-heartbeats": + await send_heartbeats(websocket, redis_conn) if command["command"] == "get-workers": result = [] @@ -248,7 +303,10 @@ async def serve_requests(websocket, path): redis_address = args.redis_address.split(":") redis_ip_address, redis_port = redis_address[0], int(redis_address[1]) - start_server = websockets.serve(serve_requests, "localhost", args.port) + # The port here must match the value used by the frontend to connect over + # websockets. + port = 8888 + start_server = websockets.serve(serve_requests, "localhost", port) loop.run_until_complete(start_server) loop.run_forever() diff --git a/webui/src/ray-app.html b/webui/src/ray-app.html index beecbcfb..60a748b8 100644 --- a/webui/src/ray-app.html +++ b/webui/src/ray-app.html @@ -71,6 +71,7 @@ Menu Overview + Cluster Health Objects Tasks Events @@ -95,6 +96,7 @@ fallback-selection="view404" role="main"> + diff --git a/webui/src/ray-cluster-health.html b/webui/src/ray-cluster-health.html new file mode 100644 index 00000000..dbadd81c --- /dev/null +++ b/webui/src/ray-cluster-health.html @@ -0,0 +1,54 @@ + + + + + + + + diff --git a/webui/src/ray-recent-tasks.html b/webui/src/ray-recent-tasks.html index f8abdf0e..10a86ff8 100644 --- a/webui/src/ray-recent-tasks.html +++ b/webui/src/ray-recent-tasks.html @@ -28,7 +28,7 @@

Ray Recent Tasks