
Commit: adding eval episodes (#209)

- restructuring load scenario
- adjusting outputs

---------

Co-authored-by: Florian Maurer <[email protected]>
nick-harder and maurerle authored Sep 27, 2023
1 parent ac75b59 commit db2fa3f
Showing 8 changed files with 287 additions and 69 deletions.
File renamed without changes.
File renamed without changes.
22 changes: 16 additions & 6 deletions assume/common/outputs.py
@@ -9,7 +9,7 @@
from mango import Role
from pandas.api.types import is_numeric_dtype
from sqlalchemy import inspect, text
-from sqlalchemy.exc import OperationalError, ProgrammingError
+from sqlalchemy.exc import DataError, OperationalError, ProgrammingError

logger = logging.getLogger(__name__)

@@ -43,6 +43,7 @@ def __init__(
        export_csv_path: str = "",
        save_frequency_hours: int = None,
        learning_mode: bool = False,
+        evaluation_mode: bool = False,
    ):
        super().__init__()

@@ -56,16 +57,23 @@ def __init__(
            self.p = Path(self.export_csv_path, simulation_id)
            shutil.rmtree(self.p, ignore_errors=True)
            self.p.mkdir(parents=True)

        self.db = db_engine

        self.learning_mode = learning_mode
+        self.evaluation_mode = evaluation_mode

-        # learning
+        # get episode number if in learning or evaluation mode
        self.episode = None
-        if self.learning_mode:
+        if self.learning_mode or self.evaluation_mode:
            episode = self.simulation_id.split("_")[-1]
            if episode.isdigit():
                self.episode = int(episode)

+            # check if episode=0 and delete all similar runs
+            if self.episode == 0:
+                self.del_similar_runs()

        # construct all timeframes under which hourly values are written to excel and db
        self.start = start
        self.end = end
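
A quick illustration of the episode parsing above (a hypothetical standalone helper, not part of the commit): the text after the last "_" of the simulation id becomes the episode number, and ids without a numeric tail leave it unset.

def parse_episode(simulation_id: str) -> int | None:
    # mirrors the logic in __init__: take the text after the last "_"
    suffix = simulation_id.split("_")[-1]
    return int(suffix) if suffix.isdigit() else None

assert parse_episode("example_01_4") == 4        # training episode 4
assert parse_episode("example_01_eval_2") == 2   # evaluation episode 2
assert parse_episode("base_case") is None        # no numeric tail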
@@ -99,7 +107,7 @@ def delete_db_scenario(self, simulation_id):
                logger.debug("deleted %s rows from %s", rowcount, table_name)

    def del_similar_runs(self):
-        query = text("select distinct simulation from market_meta")
+        query = text("select distinct simulation from rl_params")

        try:
            with self.db.begin() as db:
@@ -173,10 +181,12 @@ def write_rl_params(self, rl_params):
        df = pd.DataFrame.from_records(rl_params, index="datetime")
        if df.empty:
            return

        df["simulation"] = self.simulation_id
        df["learning_mode"] = self.learning_mode
-        # get characters after last "_" of simulation id string
+        df["evaluation_mode"] = self.evaluation_mode
        df["episode"] = self.episode

        self.write_dfs["rl_params"].append(df)

    def write_market_results(self, market_meta):
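
A rough sketch of the frame this method now buffers (records invented, column names taken from the hunk above); the two new columns make evaluation runs distinguishable from training runs in the rl_params table.

import pandas as pd

rl_params = [{"datetime": "2019-01-01 00:00", "unit": "pp_1", "reward": 0.42}]
df = pd.DataFrame.from_records(rl_params, index="datetime")
df["simulation"] = "example_01_eval_2"  # hypothetical simulation id
df["learning_mode"] = False
df["evaluation_mode"] = True
df["episode"] = 2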
@@ -357,7 +367,7 @@ async def on_stop(self):
        for query in queries:
            try:
                df = pd.read_sql(query, self.db)
-            except (OperationalError, ProgrammingError):
+            except (OperationalError, DataError):
                continue
            except Exception as e:
                logger.error("could not read query: %s", e)
58 changes: 38 additions & 20 deletions assume/common/scenario_loader.py
@@ -230,8 +230,10 @@ async def load_scenario_folder_async(
    inputs_path: str,
    scenario: str,
    study_case: str,
-    disable_learning: bool = False,
+    perform_learning: bool = True,
+    perform_evaluation: bool = False,
    episode: int = 0,
+    eval_episode: int = 0,
    load_learned_path: str = "",
):
    """Load a scenario from a given path. Raises: ValueError: If the scenario or study case is not found.
@@ -248,11 +250,10 @@

    # load the config file
    path = f"{inputs_path}/{scenario}"
-    with open(f"{path}/config.yaml", "r") as f:
-        config = yaml.safe_load(f)
-        if not study_case:
-            study_case = list(config.keys())[0]
-        config = config[study_case]
+    config = yaml.safe_load(open(f"{path}/config.yaml", "r"))
+    if not study_case:
+        study_case = list(config.keys())[0]
+    config = config[study_case]
    logger.info(f"Starting Scenario {scenario}/{study_case} from {inputs_path}")

    world.reset()
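
For orientation, a hypothetical shape of what yaml.safe_load returns here (study-case names and values invented, only keys read by this function shown): the top level maps study cases to their configs, and an empty study_case argument falls back to the first key.

config = {
    "base_case": {"learning_mode": False},
    "learning_case": {
        "learning_mode": True,
        "learning_config": {"training_episodes": 10},
    },
}
study_case = ""  # caller did not pick one
if not study_case:
    study_case = list(config.keys())[0]  # falls back to "base_case"
config = config[study_case]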
@@ -298,8 +299,10 @@ async def load_scenario_folder_async(
    learning_config: LearningConfig = config.get("learning_config", {})
    bidding_strategy_params = config.get("bidding_strategy_params", {})

-    if disable_learning:
-        learning_config["learning_mode"] = False
+    learning_config["learning_mode"] = (
+        config.get("learning_mode", False) and perform_learning
+    )
+    learning_config["evaluation_mode"] = perform_evaluation

    if "load_learned_path" not in learning_config.keys():
        if load_learned_path:
@@ -312,6 +315,9 @@
    if learning_config.get("learning_mode", False):
        sim_id = f"{sim_id}_{episode}"

+    if learning_config.get("evaluation_mode", False):
+        sim_id = f"{sim_id}_eval_{eval_episode}"

    # add forecast provider
    logger.info("Adding forecast")
    forecaster = CsvForecaster(
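
How the suffixes compose (base id invented): training runs append the episode number, evaluation runs append an "eval" marker plus the evaluation counter. Either way the id ends in digits, which is exactly what the episode parsing in outputs.py expects.

sim_id = "example_01"                 # hypothetical base id
training_id = f"{sim_id}_{3}"         # -> "example_01_3"
evaluation_id = f"{sim_id}_eval_{2}"  # -> "example_01_eval_2"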
@@ -515,8 +521,10 @@ def load_scenario_folder(
    inputs_path: str,
    scenario: str,
    study_case: str,
-    disable_learning: bool = False,
+    perform_learning: bool = True,
+    perform_evaluation: bool = False,
    episode: int = 0,
+    eval_episode: int = 0,
    load_learned_path="",
):
    """
@@ -537,8 +545,10 @@
            inputs_path=inputs_path,
            scenario=scenario,
            study_case=study_case,
-            disable_learning=disable_learning,
+            perform_learning=perform_learning,
+            perform_evaluation=perform_evaluation,
            episode=episode,
+            eval_episode=eval_episode,
            load_learned_path=load_learned_path,
        )
    )
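
A minimal driver for the new synchronous entry point, as a sketch: the inputs layout follows the repository convention, and the World constructor arguments are assumptions rather than part of this commit.

from assume import World
from assume.common.scenario_loader import load_scenario_folder

world = World(database_uri="sqlite:///./assume.db")  # assumed constructor args
load_scenario_folder(
    world,
    inputs_path="inputs",
    scenario="example_01",
    study_case="base_case",
    perform_learning=False,  # plain simulation, no training or evaluation
)
world.run()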
@@ -564,28 +574,30 @@ def run_learning(world: World, inputs_path: str, scenario: str, study_case: str)
    actors_and_critics = None
    world.output_role.del_similar_runs()

+    eval_episode = 1
    for episode in tqdm(
-        range(world.learning_role.training_episodes),
+        range(1, world.learning_role.training_episodes + 1),
        desc="Training Episodes",
    ):
        # TODO normally, loading twice should not create issues, somehow a scheduling issue is raised currently
-        if episode:
+        if episode != 1:
            load_scenario_folder(
                world,
                inputs_path,
                scenario,
                study_case,
+                perform_learning=True,
                episode=episode,
-                disable_learning=False,
            )

        # give the newly created rl_agent the buffer that we stored from the beginning
        world.learning_role.create_actors_and_critics(
            actors_and_critics=actors_and_critics
        )
        world.learning_role.buffer = buffer
        world.learning_role.episodes_done = episode

-        if episode + 1 > world.learning_role.episodes_collecting_initial_experience:
+        if episode > world.learning_role.episodes_collecting_initial_experience:
            world.learning_role.turn_off_initial_exploration()

        world.run()
@@ -594,9 +606,10 @@ def run_learning(world: World, inputs_path: str, scenario: str, study_case: str)
            world.learning_role.training_episodes,
            world.learning_config.get("validation_episodes_interval", 5),
        )
-        if (episode + 1) % validation_interval == 0 and (
-            episode + 1
-        ) > world.learning_role.episodes_collecting_initial_experience:
+        if (
+            episode % validation_interval == 0
+            and episode > world.learning_role.episodes_collecting_initial_experience
+        ):
            old_path = world.learning_config["load_learned_path"]
            new_path = f"{old_path}_eval"
            # save validation params in validation path
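
With episodes now numbered from 1, the condition reads directly as "every validation_interval-th episode after the initial experience phase", replacing the earlier episode + 1 arithmetic. A small check with assumed values:

training_episodes = 10
validation_interval = 5
initial_experience = 3  # episodes_collecting_initial_experience

eval_points = [
    ep
    for ep in range(1, training_episodes + 1)
    if ep % validation_interval == 0 and ep > initial_experience
]
assert eval_points == [5, 10]  # evaluated as eval_episode 1 and 2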
@@ -609,7 +622,9 @@
                inputs_path,
                scenario,
                study_case,
-                disable_learning=True,
+                perform_learning=False,
+                perform_evaluation=True,
+                eval_episode=eval_episode,
                load_learned_path=new_path,
            )
            world.run()
@@ -620,13 +635,16 @@
                # save new best params for simulation
                best_reward = avg_reward
                world.learning_role.save_params(directory=old_path)

+            eval_episode += 1

        world.reset()

        # in load_scenario_folder_async, we initiate new container and kill old if present
        # as long as we do not skip setup container should be handled correctly
        # if enough initial experience was collected according to specifications in learning config
        # turn off initial exploration and go into full learning mode
-        if episode + 1 >= world.learning_role.episodes_collecting_initial_experience:
+        if episode >= world.learning_role.episodes_collecting_initial_experience:
            world.learning_role.turn_off_initial_exploration()

        # container shutdown implicitly with new initialisation
@@ -640,5 +658,5 @@ def run_learning(world: World, inputs_path: str, scenario: str, study_case: str)
        inputs_path,
        scenario,
        study_case,
-        disable_learning=True,
+        perform_learning=False,
    )
2 changes: 1 addition & 1 deletion assume/common/units_operator.py
@@ -398,7 +398,7 @@ def write_learning_to_output(self, start: datetime, marketconfig: MarketConfig):
        db_aid = self.context.data_dict.get("learning_output_agent_id")
        db_addr = self.context.data_dict.get("learning_output_agent_addr")

-        if db_aid and db_addr:
+        if db_aid and db_addr and output_agent_list:
            self.context.schedule_instant_acl_message(
                receiver_id=db_aid,
                receiver_addr=db_addr,
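
The added output_agent_list operand relies on list truthiness: an empty list now short-circuits the condition, so no message is scheduled when there is nothing to write. A toy check with invented values:

output_agent_list = []  # nothing learned in this interval
db_aid, db_addr = "export_agent_1", ("localhost", 9099)
assert not (db_aid and db_addr and output_agent_list)  # send is skipped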
20 changes: 12 additions & 8 deletions assume/world.py
@@ -143,8 +143,11 @@ async def setup(

    async def setup_learning(self):
        self.bidding_params.update(self.learning_config)

+        # initiate learning if the learning mode is on and hence we want to learn new strategies
        self.learning_mode = self.learning_config.get("learning_mode", False)
+        self.evaluation_mode = self.learning_config.get("evaluation_mode", False)

        if self.learning_mode:
            # if so, we initiate the rl learning role with parameters
            from assume.reinforcement_learning.learning_role import Learning
@@ -182,6 +185,7 @@ async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int
            export_csv_path=self.export_csv_path,
            save_frequency_hours=save_frequency_hours,
            learning_mode=self.learning_mode,
+            evaluation_mode=self.evaluation_mode,
        )
        if self.same_process:
            output_agent = RoleAgent(
@@ -325,14 +329,14 @@ def add_market_operator(
        market_operator_agent.markets = []

        # after creation of an agent - we set additional context params
-        market_operator_agent._role_context.data_dict = {
-            "output_agent_addr": None
-            if self.learning_mode
-            else self.output_agent_addr[0],
-            "output_agent_id": None
-            if self.learning_mode
-            else self.output_agent_addr[1],
-        }
+        market_operator_agent._role_context.data_dict = {}
+        if not self.learning_mode and not self.evaluation_mode:
+            market_operator_agent._role_context.data_dict.update(
+                {
+                    "output_agent_addr": self.output_agent_addr[0],
+                    "output_agent_id": self.output_agent_addr[1],
+                }
+            )
        self.market_operators[id] = market_operator_agent

    def add_market(
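
The refactor swaps explicit None entries for a dict that simply stays empty in learning and evaluation mode, so consumers reading it with .get(...) receive None without any special casing. A condensed sketch with invented values:

learning_mode, evaluation_mode = False, True
data_dict = {}
if not learning_mode and not evaluation_mode:
    data_dict.update(
        {
            "output_agent_addr": ("localhost", 9099),
            "output_agent_id": "export_agent_1",
        }
    )
assert data_dict.get("output_agent_addr") is None  # suppressed during evaluation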