diff --git a/environment.yml b/environment.yml
index f57c8d33d..480ea7eba 100644
--- a/environment.yml
+++ b/environment.yml
@@ -21,9 +21,11 @@ dependencies:
   - matplotlib==3.0.0
   - dill
   - lz4
-  - ray==0.7.3
+  - ray==0.8.0
   - setproctitle
   - psutil
   - opencv-python
   - boto3==1.4.8
   - redis~=2.10.6
+  - tabulate
+  - pytz
diff --git a/examples/exp_configs/non_rl/i210_subnetwork.py b/examples/exp_configs/non_rl/i210_subnetwork.py
index dd85c56cf..d993ae93a 100644
--- a/examples/exp_configs/non_rl/i210_subnetwork.py
+++ b/examples/exp_configs/non_rl/i210_subnetwork.py
@@ -101,7 +101,7 @@
 edge_id = "119257908#1-AddedOnRampEdge"
 custom_callables = {
     "avg_merge_speed": lambda env: np.nan_to_num(np.mean(
-        env.k.vehicle.get_speed(env.k.vehicle.get_ids_by_edge(edge_id)))),
+        env.k.vehicle.get_speed(env.k.vehicle.get_ids()))),
     "avg_outflow": lambda env: np.nan_to_num(
         env.k.vehicle.get_outflow_rate(120)),
     # we multiply by 5 to account for the vehicle length and by 1000 to convert
diff --git a/examples/exp_configs/rl/multiagent/multiagent_i210.py b/examples/exp_configs/rl/multiagent/multiagent_i210.py
index 94f709ff4..1779adf69 100644
--- a/examples/exp_configs/rl/multiagent/multiagent_i210.py
+++ b/examples/exp_configs/rl/multiagent/multiagent_i210.py
@@ -5,11 +5,11 @@
 """
 import os

-from ray.rllib.agents.ppo.ppo_policy import PPOTFPolicy
 from ray.tune.registry import register_env

+from flow.controllers import RLController
+from flow.controllers.car_following_models import IDMController
 import flow.config as config
-from flow.controllers.rlcontroller import RLController
 from flow.core.params import EnvParams
 from flow.core.params import NetParams
 from flow.core.params import InitialConfig
@@ -23,14 +23,8 @@
 # SET UP PARAMETERS FOR THE SIMULATION

-# number of training iterations
-N_TRAINING_ITERATIONS = 200
-# number of rollouts per training iteration
-N_ROLLOUTS = 2
 # number of steps per rollout
-HORIZON = 500
-# number of parallel workers
-N_CPUS = 1
+HORIZON = 4000

 # percentage of autonomous vehicles compared to human vehicles on highway
 PENETRATION_RATE = 10
@@ -38,10 +32,12 @@
 # SET UP PARAMETERS FOR THE ENVIRONMENT
 additional_env_params = ADDITIONAL_ENV_PARAMS.copy()
 additional_env_params.update({
-    'max_accel': 1,
-    'max_decel': 1,
+    'max_accel': 2.6,
+    'max_decel': 4.5,
     # configure the observation space. Look at the I210MultiEnv class for more info.
     'lead_obs': True,
+    # whether to add in a reward for the speed of nearby vehicles
+    "local_reward": True
 })

 # CREATE VEHICLE TYPES AND INFLOWS
@@ -50,9 +46,8 @@
 vehicles.add(
     "human",
     num_vehicles=0,
-    lane_change_params=SumoLaneChangeParams(
-        lane_change_mode="strategic",
-    )
+    lane_change_params=SumoLaneChangeParams(lane_change_mode="strategic"),
+    acceleration_controller=(IDMController, {"a": .3, "b": 2.0, "noise": 0.6}),
 )
 vehicles.add(
     "av",
@@ -68,11 +63,11 @@
 inflow.add(
     veh_type="human",
     edge="119257914",
-    vehs_per_hour=8378 * pen_rate,
+    vehs_per_hour=int(10800 * (1 - pen_rate)),
     # probability=1.0,
     departLane="random",
     departSpeed=20)
-# on ramp
+# # on ramp
 # inflow.add(
 #     veh_type="human",
 #     edge="27414345",
@@ -91,7 +86,7 @@
 inflow.add(
     veh_type="av",
     edge="119257914",
-    vehs_per_hour=int(8378 * pen_rate),
+    vehs_per_hour=int(10800 * pen_rate),
     # probability=1.0,
     departLane="random",
     departSpeed=20)
@@ -128,16 +123,18 @@

     # simulation-related parameters
     sim=SumoParams(
-        sim_step=0.8,
+        sim_step=0.5,
         render=False,
-        color_by_speed=True,
-        restart_instance=True
+        color_by_speed=False,
+        restart_instance=True,
+        use_ballistic=True
     ),

     # environment related parameters (see flow.core.params.EnvParams)
     env=EnvParams(
         horizon=HORIZON,
         sims_per_step=1,
+        warmup_steps=0,
         additional_params=additional_env_params,
     ),
@@ -171,7 +168,7 @@
 obs_space = test_env.observation_space
 act_space = test_env.action_space

-POLICY_GRAPHS = {'av': (PPOTFPolicy, obs_space, act_space, {})}
+POLICY_GRAPHS = {'av': (None, obs_space, act_space, {})}

 POLICIES_TO_TRAIN = ['av']
diff --git a/examples/train.py b/examples/train.py
index a1288e2f0..aaa9caddb 100644
--- a/examples/train.py
+++ b/examples/train.py
@@ -7,18 +7,21 @@
     python train.py EXP_CONFIG
 """
 import argparse
+from datetime import datetime
 import json
 import os
 import sys
 from time import strftime
 from copy import deepcopy

+import numpy as np
+import pytz
+
 from stable_baselines.common.vec_env import DummyVecEnv, SubprocVecEnv
 from stable_baselines import PPO2

 import ray
 from ray import tune
-from ray.tune import run_experiments
 from ray.tune.registry import register_env
 try:
     from ray.rllib.agents.agent import get_agent_class
@@ -26,6 +29,7 @@
     from ray.rllib.agents.registry import get_agent_class

 from flow.core.util import ensure_dir
+from flow.core.rewards import energy_consumption
 from flow.utils.registry import env_constructor
 from flow.utils.rllib import FlowParamsEncoder, get_flow_params
 from flow.utils.registry import make_create_env
@@ -54,16 +58,35 @@ def parse_args(args):
     parser.add_argument(
         '--rl_trainer', type=str, default="rllib",
         help='the RL trainer to use. either rllib or Stable-Baselines')
-
+    parser.add_argument(
+        '--algorithm', type=str, default="PPO",
+        help='RL algorithm to use. Options are PPO, TD3, MATD3 (MADDPG w/ TD3) right now.'
+    )
+    parser.add_argument('--exp_title', type=str, default='test',
+                        help='Informative experiment title to help distinguish results')
     parser.add_argument(
         '--num_cpus', type=int, default=1,
         help='How many CPUs to use')
     parser.add_argument(
         '--num_steps', type=int, default=5000,
-        help='How many total steps to perform learning over')
+        help='How many total steps to perform learning over. Relevant for stable-baselines')
+    parser.add_argument(
+        '--grid_search', action='store_true', default=False,
+        help='Whether to grid search over hyperparams')
+    parser.add_argument(
+        '--num_iterations', type=int, default=200,
+        help='How many iterations are in a training run.')
+    parser.add_argument(
+        '--num_rollouts', type=int, default=1,
+        help='How many rollouts are in a training batch')
     parser.add_argument(
         '--rollout_size', type=int, default=1000,
         help='How many steps are in a training batch.')
+    parser.add_argument('--use_s3', action='store_true', help='If true, upload results to s3')
+    parser.add_argument('--local_mode', action='store_true', default=False,
+                        help='If true only 1 CPU will be used')
+    parser.add_argument('--render', action='store_true', default=False,
+                        help='If true, we render the display')
     parser.add_argument(
         '--checkpoint_path', type=str, default=None,
         help='Directory with checkpoint to restore training from.')
@@ -110,9 +133,11 @@ def run_model_stablebaseline(flow_params,
 def setup_exps_rllib(flow_params,
                      n_cpus,
                      n_rollouts,
+                     flags,
                      policy_graphs=None,
                      policy_mapping_fn=None,
-                     policies_to_train=None):
+                     policies_to_train=None,
+                     ):
     """Return the relevant components of an RLlib experiment.

     Parameters
@@ -123,13 +148,14 @@
         number of CPUs to run the experiment over
     n_rollouts : int
         number of rollouts per training iteration
+    flags:
+        custom arguments
     policy_graphs : dict, optional
         TODO
     policy_mapping_fn : function, optional
         TODO
     policies_to_train : list of str, optional
         TODO
-
     Returns
     -------
     str
@@ -141,20 +167,59 @@
     """
     horizon = flow_params['env'].horizon

-    alg_run = "PPO"
-
-    agent_cls = get_agent_class(alg_run)
-    config = deepcopy(agent_cls._default_config)
-
-    config["num_workers"] = n_cpus
-    config["train_batch_size"] = horizon * n_rollouts
-    config["gamma"] = 0.999  # discount rate
-    config["model"].update({"fcnet_hiddens": [32, 32, 32]})
-    config["use_gae"] = True
-    config["lambda"] = 0.97
-    config["kl_target"] = 0.02
-    config["num_sgd_iter"] = 10
-    config["horizon"] = horizon
+    alg_run = flags.algorithm.upper()
+
+    if alg_run == "PPO":
+        agent_cls = get_agent_class(alg_run)
+        config = deepcopy(agent_cls._default_config)
+
+        config["num_workers"] = n_cpus
+        config["horizon"] = horizon
+        config["model"].update({"fcnet_hiddens": [32, 32, 32]})
+        config["train_batch_size"] = horizon * n_rollouts
+        config["gamma"] = 0.999  # discount rate
+        config["use_gae"] = True
+        config["lambda"] = 0.97
+        config["kl_target"] = 0.02
+        config["num_sgd_iter"] = 10
+    elif alg_run == "TD3":
+        agent_cls = get_agent_class(alg_run)
+        config = deepcopy(agent_cls._default_config)
+
+        config["num_workers"] = n_cpus
+        config["horizon"] = horizon
+        config["buffer_size"] = 20000  # reduced to test if this is the source of memory problems
+        if flags.grid_search:
+            config["prioritized_replay"] = tune.grid_search(['True', 'False'])
+            config["actor_lr"] = tune.grid_search([1e-3, 1e-4])
+            config["critic_lr"] = tune.grid_search([1e-3, 1e-4])
+            config["n_step"] = tune.grid_search([1, 10])
+    else:
+        sys.exit("We only support PPO and TD3 right now.")
+
+    # define some standard and useful callbacks
+    def on_episode_start(info):
+        episode = info["episode"]
+        episode.user_data["avg_speed"] = []
+        episode.user_data["avg_energy"] = []
+
+    def on_episode_step(info):
+        episode = info["episode"]
+        env = info["env"].get_unwrapped()[0]
+        speed = np.mean([speed for speed in env.k.vehicle.get_speed(env.k.vehicle.get_ids()) if speed >= 0])
+        if not np.isnan(speed):
+            episode.user_data["avg_speed"].append(speed)
+            episode.user_data["avg_energy"].append(energy_consumption(env))
+
+    def on_episode_end(info):
+        episode = info["episode"]
+        avg_speed = np.mean(episode.user_data["avg_speed"])
+        episode.custom_metrics["avg_speed"] = avg_speed
+        episode.custom_metrics["avg_energy_per_veh"] = np.mean(episode.user_data["avg_energy"])
+
+    config["callbacks"] = {"on_episode_start": tune.function(on_episode_start),
+                           "on_episode_step": tune.function(on_episode_step),
+                           "on_episode_end": tune.function(on_episode_end)}

     # save the flow params for replay
     flow_json = json.dumps(
@@ -167,8 +232,7 @@
         print("policy_graphs", policy_graphs)
         config['multiagent'].update({'policies': policy_graphs})
     if policy_mapping_fn is not None:
-        config['multiagent'].update(
-            {'policy_mapping_fn': tune.function(policy_mapping_fn)})
+        config['multiagent'].update({'policy_mapping_fn': tune.function(policy_mapping_fn)})
     if policies_to_train is not None:
         config['multiagent'].update({'policies_to_train': policies_to_train})
@@ -182,34 +246,40 @@
 def train_rllib(submodule, flags):
     """Train policies using the PPO algorithm in RLlib."""
     flow_params = submodule.flow_params
-    n_cpus = submodule.N_CPUS
-    n_rollouts = submodule.N_ROLLOUTS
+    flow_params['sim'].render = flags.render

     policy_graphs = getattr(submodule, "POLICY_GRAPHS", None)
     policy_mapping_fn = getattr(submodule, "policy_mapping_fn", None)
     policies_to_train = getattr(submodule, "policies_to_train", None)

     alg_run, gym_name, config = setup_exps_rllib(
-        flow_params, n_cpus, n_rollouts,
+        flow_params, flags.num_cpus, flags.num_rollouts, flags,
         policy_graphs, policy_mapping_fn, policies_to_train)

-    ray.init(num_cpus=n_cpus + 1, object_store_memory=200 * 1024 * 1024)
-    exp_config = {
-        "run": alg_run,
-        "env": gym_name,
-        "config": {
-            **config
-        },
+    config['num_workers'] = flags.num_cpus
+    config['env'] = gym_name
+
+    if flags.local_mode:
+        ray.init(local_mode=True)
+    else:
+        ray.init()
+    exp_dict = {
+        "run_or_experiment": alg_run,
+        "name": gym_name,
+        "config": config,
         "checkpoint_freq": 20,
         "checkpoint_at_end": True,
-        "max_failures": 999,
+        "max_failures": 0,
         "stop": {
-            "training_iteration": flags.num_steps,
+            "training_iteration": flags.num_iterations,
         },
     }
-
-    if flags.checkpoint_path is not None:
-        exp_config['restore'] = flags.checkpoint_path
-    run_experiments({flow_params["exp_tag"]: exp_config})
+    date = datetime.now(tz=pytz.utc)
+    date = date.astimezone(pytz.timezone('US/Pacific')).strftime("%m-%d-%Y")
+    s3_string = "s3://i210.experiments/i210/" \
+                + date + '/' + flags.exp_title
+    if flags.use_s3:
+        exp_dict['upload_dir'] = s3_string
+    tune.run(**exp_dict, queue_trials=False, raise_on_failed_trial=False)


 def train_h_baselines(flow_params, args, multiagent):
diff --git a/flow/envs/multiagent/i210.py b/flow/envs/multiagent/i210.py
index 409aeb14f..4082eb415 100644
--- a/flow/envs/multiagent/i210.py
+++ b/flow/envs/multiagent/i210.py
@@ -16,6 +16,8 @@
     "max_decel": 1,
     # whether we use an obs space that contains adjacent lane info or just the lead obs
     "lead_obs": True,
+    # whether the reward should come from local vehicles instead of global rewards
+    "local_reward": True
 }

@@ -137,35 +139,47 @@ def compute_reward(self, rl_actions, **kwargs):
             return {}

         rewards = {}
-        for rl_id in self.k.vehicle.get_rl_ids():
-            if self.env_params.evaluate:
-                # reward is speed of vehicle if we are in evaluation mode
-                reward = self.k.vehicle.get_speed(rl_id)
-            elif kwargs['fail']:
-                # reward is 0 if a collision occurred
-                reward = 0
-            else:
-                # reward high system-level velocities
-                cost1 = average_velocity(self, fail=kwargs['fail'])
-
-                # penalize small time headways
-                cost2 = 0
-                t_min = 1  # smallest acceptable time headway
-
-                lead_id = self.k.vehicle.get_leader(rl_id)
-                if lead_id not in ["", None] \
-                        and self.k.vehicle.get_speed(rl_id) > 0:
-                    t_headway = max(
-                        self.k.vehicle.get_headway(rl_id) /
-                        self.k.vehicle.get_speed(rl_id), 0)
-                    cost2 += min((t_headway - t_min) / t_min, 0)
-
-                # weights for cost1, cost2, and cost3, respectively
-                eta1, eta2 = 1.00, 0.10
-
-                reward = max(eta1 * cost1 + eta2 * cost2, 0)
-
-            rewards[rl_id] = reward
+        if self.env_params.additional_params["local_reward"]:
+            for rl_id in self.k.vehicle.get_rl_ids():
+                rewards[rl_id] = 0
+                speeds = []
+                follow_speed = self.k.vehicle.get_speed(self.k.vehicle.get_follower(rl_id))
+                speeds.extend([speed for speed in follow_speed if speed >= 0])
+                if self.k.vehicle.get_speed(rl_id) >= 0:
+                    speeds.append(self.k.vehicle.get_speed(rl_id))
+                if len(speeds) > 0:
+                    # rescale so the q function can estimate it quickly
+                    rewards[rl_id] = np.mean(speeds) / 500.0
+        else:
+            for rl_id in self.k.vehicle.get_rl_ids():
+                if self.env_params.evaluate:
+                    # reward is speed of vehicle if we are in evaluation mode
+                    reward = self.k.vehicle.get_speed(rl_id)
+                elif kwargs['fail']:
+                    # reward is 0 if a collision occurred
+                    reward = 0
+                else:
+                    # reward high system-level velocities
+                    cost1 = average_velocity(self, fail=kwargs['fail'])
+
+                    # penalize small time headways
+                    cost2 = 0
+                    t_min = 1  # smallest acceptable time headway
+
+                    lead_id = self.k.vehicle.get_leader(rl_id)
+                    if lead_id not in ["", None] \
+                            and self.k.vehicle.get_speed(rl_id) > 0:
+                        t_headway = max(
+                            self.k.vehicle.get_headway(rl_id) /
+                            self.k.vehicle.get_speed(rl_id), 0)
+                        cost2 += min((t_headway - t_min) / t_min, 0)
+
+                    # weights for cost1, cost2, and cost3, respectively
+                    eta1, eta2 = 1.00, 0.10
+
+                    reward = max(eta1 * cost1 + eta2 * cost2, 0)
+
+                rewards[rl_id] = reward
         return rewards

     def additional_command(self):
diff --git a/requirements.txt b/requirements.txt
index 546cb4e26..4569dfca5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,7 +14,7 @@ pyglet==1.3.2
 matplotlib==3.1.0
 imutils==0.5.1
 numpydoc
-ray==0.7.3
+ray==0.8.0
 opencv-python
 dill
 lz4
@@ -25,3 +25,5 @@ boto3==1.4.8
 redis~=2.10.6
 pandas==0.24.2
 plotly==2.4.0
+tabulate
+pytz
\ No newline at end of file
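
Usage sketch (not part of the patch): the snippet below mirrors how the reworked examples/train.py threads the new CLI flags into setup_exps_rllib(), based only on the code added above. The import paths, the "multiagent_i210" exp-config module name, and the flag values are illustrative assumptions.

# Hypothetical driver script, assuming it is run from the flow repository root
# and that the modules below are importable from there.
from examples.exp_configs.rl.multiagent import multiagent_i210 as submodule  # assumed import path
from examples.train import parse_args, setup_exps_rllib                      # assumed import path

# Parse the flags added in this diff; PPO remains the default --algorithm and the
# leading positional is the exp config name expected by train.py's usage string.
flags = parse_args(['multiagent_i210',
                    '--rl_trainer', 'rllib',
                    '--algorithm', 'TD3',
                    '--num_cpus', '4',
                    '--num_rollouts', '20',
                    '--num_iterations', '200',
                    '--exp_title', 'i210_td3_test'])

# setup_exps_rllib() now receives the parsed flags, branches on flags.algorithm,
# and attaches the avg_speed / avg_energy episode callbacks to the returned config.
alg_run, gym_name, config = setup_exps_rllib(
    submodule.flow_params, flags.num_cpus, flags.num_rollouts, flags,
    policy_graphs=getattr(submodule, "POLICY_GRAPHS", None),
    policy_mapping_fn=getattr(submodule, "policy_mapping_fn", None),
    policies_to_train=getattr(submodule, "policies_to_train", None))

# train_rllib(submodule, flags) performs the same setup internally, then calls
# ray.init() and tune.run(**exp_dict); passing --use_s3 additionally sets the
# dated S3 upload_dir built in train_rllib().

Equivalently, the same run can be launched from the command line as the module docstring suggests (python train.py EXP_CONFIG) with the new flags appended.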