I210 dev fix #890
Open · wants to merge 2 commits into master
4 changes: 3 additions & 1 deletion environment.yml
@@ -21,9 +21,11 @@ dependencies:
- matplotlib==3.0.0
- dill
- lz4
- ray==0.7.3
- ray==0.8.0
- setproctitle
- psutil
- opencv-python
- boto3==1.4.8
- redis~=2.10.6
- tabulate
- pytz
2 changes: 1 addition & 1 deletion examples/exp_configs/non_rl/i210_subnetwork.py
@@ -101,7 +101,7 @@
edge_id = "119257908#1-AddedOnRampEdge"
custom_callables = {
"avg_merge_speed": lambda env: np.nan_to_num(np.mean(
env.k.vehicle.get_speed(env.k.vehicle.get_ids_by_edge(edge_id)))),
env.k.vehicle.get_speed(env.k.vehicle.get_ids()))),
"avg_outflow": lambda env: np.nan_to_num(
env.k.vehicle.get_outflow_rate(120)),
# we multiply by 5 to account for the vehicle length and by 1000 to convert
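The change above widens the "avg_merge_speed" callable from vehicles on the merge edge to every vehicle in the network (the metric keeps its old name in the config). A minimal side-by-side sketch of the two behaviours, using only the kernel calls that appear in the diff:

```python
import numpy as np

EDGE_ID = "119257908#1-AddedOnRampEdge"

def avg_merge_speed_edge_only(env):
    """Old behaviour: mean speed of the vehicles currently on the merge edge."""
    ids = env.k.vehicle.get_ids_by_edge(EDGE_ID)
    return np.nan_to_num(np.mean(env.k.vehicle.get_speed(ids)))

def avg_network_speed(env):
    """New behaviour: mean speed over all vehicles in the network."""
    ids = env.k.vehicle.get_ids()
    return np.nan_to_num(np.mean(env.k.vehicle.get_speed(ids)))
```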
39 changes: 18 additions & 21 deletions examples/exp_configs/rl/multiagent/multiagent_i210.py
@@ -5,11 +5,11 @@
"""
import os

from ray.rllib.agents.ppo.ppo_policy import PPOTFPolicy
from ray.tune.registry import register_env

from flow.controllers import RLController
from flow.controllers.car_following_models import IDMController
import flow.config as config
from flow.controllers.rlcontroller import RLController
from flow.core.params import EnvParams
from flow.core.params import NetParams
from flow.core.params import InitialConfig
@@ -23,25 +23,21 @@

# SET UP PARAMETERS FOR THE SIMULATION

# number of training iterations
N_TRAINING_ITERATIONS = 200
# number of rollouts per training iteration
N_ROLLOUTS = 2
# number of steps per rollout
HORIZON = 500
# number of parallel workers
N_CPUS = 1
HORIZON = 4000

# percentage of autonomous vehicles compared to human vehicles on highway
PENETRATION_RATE = 10

# SET UP PARAMETERS FOR THE ENVIRONMENT
additional_env_params = ADDITIONAL_ENV_PARAMS.copy()
additional_env_params.update({
'max_accel': 1,
'max_decel': 1,
'max_accel': 2.6,
'max_decel': 4.5,
# configure the observation space. Look at the I210MultiEnv class for more info.
'lead_obs': True,
# whether to add in a reward for the speed of nearby vehicles
"local_reward": True
})

# CREATE VEHICLE TYPES AND INFLOWS
@@ -50,9 +46,8 @@
vehicles.add(
"human",
num_vehicles=0,
lane_change_params=SumoLaneChangeParams(
lane_change_mode="strategic",
)
lane_change_params=SumoLaneChangeParams(lane_change_mode="strategic"),
acceleration_controller=(IDMController, {"a": .3, "b": 2.0, "noise": 0.6}),
)
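For context, a sketch of what the calibrated "human" type above implies. In Flow's IDMController, a is the maximum acceleration, b the comfortable deceleration, and noise the standard deviation of Gaussian noise added to the commanded acceleration; the desired speed v0 and exponent delta below are illustrative IDM defaults, not values from this PR:

```python
# a, b, noise are the values added in this PR; v0 and delta are assumed defaults.
a, b, noise = 0.3, 2.0, 0.6     # m/s^2, m/s^2, m/s^2
v0, delta = 30.0, 4             # desired speed (m/s), acceleration exponent

def idm_free_road_accel(v):
    """IDM acceleration with no leader ahead: a * (1 - (v / v0) ** delta)."""
    return a * (1 - (v / v0) ** delta)

print(round(idm_free_road_accel(20.0), 3))   # ~0.24 m/s^2 at 20 m/s: a sluggish, human-like driver
```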
vehicles.add(
"av",
@@ -68,11 +63,11 @@
inflow.add(
veh_type="human",
edge="119257914",
vehs_per_hour=8378 * pen_rate,
vehs_per_hour=int(10800 * (1 - pen_rate)),
# probability=1.0,
departLane="random",
departSpeed=20)
# on ramp
# # on ramp
# inflow.add(
# veh_type="human",
# edge="27414345",
@@ -91,7 +86,7 @@
inflow.add(
veh_type="av",
edge="119257914",
vehs_per_hour=int(8378 * pen_rate),
vehs_per_hour=int(10800 * pen_rate),
# probability=1.0,
departLane="random",
departSpeed=20)
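A worked example of the inflow split above. PENETRATION_RATE is set to 10 (percent) earlier in this file; the conversion to the fraction pen_rate is assumed to be PENETRATION_RATE / 100, since that definition falls outside the shown context:

```python
TOTAL_INFLOW = 10800                    # veh/hr injected on edge "119257914"
PENETRATION_RATE = 10                   # percent of AVs, as set above
pen_rate = PENETRATION_RATE / 100.0     # 0.10 (assumed conversion)

human_inflow = int(TOTAL_INFLOW * (1 - pen_rate))   # 9720 veh/hr
av_inflow = int(TOTAL_INFLOW * pen_rate)            # 1080 veh/hr
assert human_inflow + av_inflow == TOTAL_INFLOW
```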
@@ -128,16 +123,18 @@

# simulation-related parameters
sim=SumoParams(
sim_step=0.8,
sim_step=0.5,
render=False,
color_by_speed=True,
restart_instance=True
color_by_speed=False,
restart_instance=True,
use_ballistic=True
),
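The simulation parameters now use a shorter step with ballistic integration, which presumably switches SUMO from its default Euler position update (move with the new speed for the whole step) to the smoother average-of-old-and-new-speed update. An illustrative sketch of the difference at sim_step = 0.5 s:

```python
dt = 0.5                     # sim_step set above
v_old, accel = 10.0, 2.6     # illustrative speed (m/s) and the env's max_accel (m/s^2)
v_new = v_old + accel * dt   # 11.3 m/s

dx_euler = v_new * dt                     # Euler update: 5.650 m advanced this step
dx_ballistic = (v_old + v_new) / 2 * dt   # ballistic update: 5.325 m advanced this step
print(dx_euler, dx_ballistic)
```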

# environment related parameters (see flow.core.params.EnvParams)
env=EnvParams(
horizon=HORIZON,
sims_per_step=1,
warmup_steps=0,
Review comment (Collaborator): is this correct? do we want warm up steps?

additional_params=additional_env_params,
),

@@ -171,7 +168,7 @@
obs_space = test_env.observation_space
act_space = test_env.action_space

POLICY_GRAPHS = {'av': (PPOTFPolicy, obs_space, act_space, {})}
POLICY_GRAPHS = {'av': (None, obs_space, act_space, {})}

POLICIES_TO_TRAIN = ['av']
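Replacing the hard-coded PPOTFPolicy with None lets RLlib fall back to the default policy class of whichever trainer train.py selects (PPO or TD3), so one config file serves both. A minimal sketch of the resulting multi-agent setup; the observation and action spaces below are placeholders, since in this file they come from the registered test environment:

```python
import numpy as np
from gym.spaces import Box

# Placeholder spaces for illustration only.
obs_space = Box(low=-np.inf, high=np.inf, shape=(5,), dtype=np.float32)
act_space = Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)

# A policy class of None asks RLlib to build the trainer's default policy.
POLICY_GRAPHS = {'av': (None, obs_space, act_space, {})}

def policy_mapping_fn(_agent_id):
    """Map every RL vehicle to the single shared 'av' policy."""
    return 'av'

POLICIES_TO_TRAIN = ['av']
```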

144 changes: 107 additions & 37 deletions examples/train.py
@@ -7,25 +7,29 @@
python train.py EXP_CONFIG
"""
import argparse
from datetime import datetime
import json
import os
import sys
from time import strftime
from copy import deepcopy

import numpy as np
import pytz

from stable_baselines.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines import PPO2

import ray
from ray import tune
from ray.tune import run_experiments
from ray.tune.registry import register_env
try:
from ray.rllib.agents.agent import get_agent_class
except ImportError:
from ray.rllib.agents.registry import get_agent_class

from flow.core.util import ensure_dir
from flow.core.rewards import energy_consumption
from flow.utils.registry import env_constructor
from flow.utils.rllib import FlowParamsEncoder, get_flow_params
from flow.utils.registry import make_create_env
@@ -54,16 +58,35 @@ def parse_args(args):
parser.add_argument(
'--rl_trainer', type=str, default="rllib",
help='the RL trainer to use. either rllib or Stable-Baselines')

parser.add_argument(
'--algorithm', type=str, default="PPO",
help='RL algorithm to use. Options are PPO, TD3, MATD3 (MADDPG w/ TD3) right now.'
)
parser.add_argument('--exp_title', type=str, default='test',
help='Informative experiment title to help distinguish results')
parser.add_argument(
'--num_cpus', type=int, default=1,
help='How many CPUs to use')
parser.add_argument(
'--num_steps', type=int, default=5000,
help='How many total steps to perform learning over')
help='How many total steps to perform learning over. Relevant for stable-baselines')
parser.add_argument(
'--grid_search', action='store_true', default=False,
help='Whether to grid search over hyperparams')
parser.add_argument(
'--num_iterations', type=int, default=200,
help='How many iterations are in a training run.')
parser.add_argument(
'--num_rollouts', type=int, default=1,
help='How many rollouts are in a training batch')
parser.add_argument(
'--rollout_size', type=int, default=1000,
help='How many steps are in a training batch.')
parser.add_argument('--use_s3', action='store_true', help='If true, upload results to s3')
parser.add_argument('--local_mode', action='store_true', default=False,
help='If true only 1 CPU will be used')
parser.add_argument('--render', action='store_true', default=False,
help='If true, we render the display')
parser.add_argument(
'--checkpoint_path', type=str, default=None,
help='Directory with checkpoint to restore training from.')
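Taken together, the new flags give train.py a fairly wide command-line surface. The defaults below are copied from the argparse definitions above; the example invocation at the end is only illustrative (the exp config name is assumed):

```python
import argparse

# Defaults copied from the argparse definitions above; parse_args is assumed to
# return the parsed argparse.Namespace, as in the original script.
flags = argparse.Namespace(
    rl_trainer="rllib", algorithm="PPO", exp_title="test",
    num_cpus=1, num_steps=5000, grid_search=False,
    num_iterations=200, num_rollouts=1, rollout_size=1000,
    use_s3=False, local_mode=False, render=False, checkpoint_path=None,
)

# Illustrative invocation exercising the new options:
#   python train.py multiagent_i210 --algorithm TD3 --num_cpus 8 --num_rollouts 20 --use_s3
print(flags.algorithm, flags.num_rollouts)
```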
@@ -110,9 +133,11 @@ def run_model_stablebaseline(flow_params,
def setup_exps_rllib(flow_params,
n_cpus,
n_rollouts,
flags,
Review comment (Collaborator): will this break replay scripts? are there any other handles to setup_exps_rllib?

policy_graphs=None,
policy_mapping_fn=None,
policies_to_train=None):
policies_to_train=None,
):
"""Return the relevant components of an RLlib experiment.

Parameters
@@ -123,13 +148,14 @@ def setup_exps_rllib(flow_params,
number of CPUs to run the experiment over
n_rollouts : int
number of rollouts per training iteration
flags : argparse.Namespace
custom arguments parsed from the command line
policy_graphs : dict, optional
dictionary mapping policy names to (policy class, obs space, act space, config) tuples
policy_mapping_fn : function, optional
function mapping an agent id to the name of the policy that controls it
policies_to_train : list of str, optional
names of the policies whose weights are updated during training

Returns
-------
str
@@ -141,20 +167,59 @@
"""
horizon = flow_params['env'].horizon

alg_run = "PPO"

agent_cls = get_agent_class(alg_run)
config = deepcopy(agent_cls._default_config)

config["num_workers"] = n_cpus
config["train_batch_size"] = horizon * n_rollouts
config["gamma"] = 0.999 # discount rate
config["model"].update({"fcnet_hiddens": [32, 32, 32]})
config["use_gae"] = True
config["lambda"] = 0.97
config["kl_target"] = 0.02
config["num_sgd_iter"] = 10
config["horizon"] = horizon
alg_run = flags.algorithm.upper()

if alg_run == "PPO":
agent_cls = get_agent_class(alg_run)
config = deepcopy(agent_cls._default_config)

config["num_workers"] = n_cpus
config["horizon"] = horizon
config["model"].update({"fcnet_hiddens": [32, 32, 32]})
config["train_batch_size"] = horizon * n_rollouts
config["gamma"] = 0.999 # discount rate
config["use_gae"] = True
config["lambda"] = 0.97
config["kl_target"] = 0.02
config["num_sgd_iter"] = 10
elif alg_run == "TD3":
agent_cls = get_agent_class(alg_run)
config = deepcopy(agent_cls._default_config)

config["num_workers"] = n_cpus
config["horizon"] = horizon
config["buffer_size"] = 20000 # reduced to test if this is the source of memory problems
if flags.grid_search:
config["prioritized_replay"] = tune.grid_search(['True', 'False'])
config["actor_lr"] = tune.grid_search([1e-3, 1e-4])
config["critic_lr"] = tune.grid_search([1e-3, 1e-4])
config["n_step"] = tune.grid_search([1, 10])
else:
sys.exit("We only support PPO and TD3 right now.")
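For a sense of scale, a worked example of the PPO batch sizing above, assuming this PR's defaults (HORIZON = 4000 from multiagent_i210.py and the --num_rollouts default of 1):

```python
horizon = 4000
n_rollouts = 1

train_batch_size = horizon * n_rollouts   # 4000 env steps gathered per training iteration
gamma = 0.999                             # discount rate set above
effective_horizon = 1 / (1 - gamma)       # ~1000 steps of look-ahead implied by gamma
print(train_batch_size, effective_horizon)
```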

# define some standard and useful callbacks
def on_episode_start(info):
episode = info["episode"]
episode.user_data["avg_speed"] = []
episode.user_data["avg_energy"] = []

def on_episode_step(info):
episode = info["episode"]
env = info["env"].get_unwrapped()[0]
speed = np.mean([speed for speed in env.k.vehicle.get_speed(env.k.vehicle.get_ids()) if speed >= 0])
if not np.isnan(speed):
episode.user_data["avg_speed"].append(speed)
episode.user_data["avg_energy"].append(energy_consumption(env))

def on_episode_end(info):
episode = info["episode"]
avg_speed = np.mean(episode.user_data["avg_speed"])
episode.custom_metrics["avg_speed"] = avg_speed
episode.custom_metrics["avg_energy_per_veh"] = np.mean(episode.user_data["avg_energy"])

config["callbacks"] = {"on_episode_start": tune.function(on_episode_start),
"on_episode_step": tune.function(on_episode_step),
"on_episode_end": tune.function(on_episode_end)}

# save the flow params for replay
flow_json = json.dumps(
@@ -167,8 +232,7 @@ def setup_exps_rllib(flow_params,
print("policy_graphs", policy_graphs)
config['multiagent'].update({'policies': policy_graphs})
if policy_mapping_fn is not None:
config['multiagent'].update(
{'policy_mapping_fn': tune.function(policy_mapping_fn)})
config['multiagent'].update({'policy_mapping_fn': tune.function(policy_mapping_fn)})
if policies_to_train is not None:
config['multiagent'].update({'policies_to_train': policies_to_train})

@@ -182,34 +246,40 @@
def train_rllib(submodule, flags):
"""Train policies using the PPO algorithm in RLlib."""
flow_params = submodule.flow_params
n_cpus = submodule.N_CPUS
n_rollouts = submodule.N_ROLLOUTS
flow_params['sim'].render = flags.render
policy_graphs = getattr(submodule, "POLICY_GRAPHS", None)
policy_mapping_fn = getattr(submodule, "policy_mapping_fn", None)
policies_to_train = getattr(submodule, "policies_to_train", None)

alg_run, gym_name, config = setup_exps_rllib(
flow_params, n_cpus, n_rollouts,
flow_params, flags.num_cpus, flags.num_rollouts, flags,
policy_graphs, policy_mapping_fn, policies_to_train)

ray.init(num_cpus=n_cpus + 1, object_store_memory=200 * 1024 * 1024)
exp_config = {
"run": alg_run,
"env": gym_name,
"config": {
**config
},
config['num_workers'] = flags.num_cpus
config['env'] = gym_name

if flags.local_mode:
ray.init(local_mode=True)
else:
ray.init()
exp_dict = {
"run_or_experiment": alg_run,
"name": gym_name,
"config": config,
"checkpoint_freq": 20,
"checkpoint_at_end": True,
"max_failures": 999,
"max_failures": 0,
"stop": {
"training_iteration": flags.num_steps,
"training_iteration": flags.num_iterations,
},
}

if flags.checkpoint_path is not None:
exp_config['restore'] = flags.checkpoint_path
run_experiments({flow_params["exp_tag"]: exp_config})
date = datetime.now(tz=pytz.utc)
date = date.astimezone(pytz.timezone('US/Pacific')).strftime("%m-%d-%Y")
s3_string = "s3://i210.experiments/i210/" \
+ date + '/' + flags.exp_title
if flags.use_s3:
exp_dict['upload_dir'] = s3_string
tune.run(**exp_dict, queue_trials=False, raise_on_failed_trial=False)
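As a concrete illustration of the upload path built above, assuming a run launched with --exp_title test --use_s3 (the date is only an example):

```python
from datetime import datetime
import pytz

exp_title = "test"
date = datetime(2020, 6, 15, 12, 0, tzinfo=pytz.utc)          # illustrative launch time
date = date.astimezone(pytz.timezone('US/Pacific')).strftime("%m-%d-%Y")
s3_string = "s3://i210.experiments/i210/" + date + '/' + exp_title
print(s3_string)   # s3://i210.experiments/i210/06-15-2020/test
```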


def train_h_baselines(flow_params, args, multiagent):