Skip to content

Commit

Permalink
Disable writing data during learning process (#205)
Browse files Browse the repository at this point in the history
* no more outputs written when learning
* separate setting for learning output
* adjusted learning dashboard
  • Loading branch information
nick-harder authored Sep 25, 2023
1 parent 5d9234d commit caabadb
Show file tree
Hide file tree
Showing 8 changed files with 813 additions and 392 deletions.
63 changes: 28 additions & 35 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from mango import Role
from pandas.api.types import is_numeric_dtype
from sqlalchemy import inspect, text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.exc import OperationalError, ProgrammingError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -339,52 +339,45 @@ async def on_stop(self):

# insert left records into db
await self.store_dfs()

if self.db is None:
return

queries = [
f"select 'avg_price' as variable, market_id as ident, avg(price) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_cost' as variable, market_id as ident, sum(price*demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_volume' as variable, market_id as ident, sum(demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'capacity_factor' as variable, market_id as ident, avg(power/max_power) as value from market_dispatch ud join power_plant_meta um on ud.unit_id = um.\"index\" and ud.simulation=um.simulation where um.simulation = '{self.simulation_id}' group by variable, market_id",
f"SELECT 'sum_reward' as variable, simulation as ident, sum(reward) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_regret' as variable, simulation as ident, sum(regret) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_profit' as variable, simulation as ident, sum(profit) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
]
dfs = []

learning_queries = self.learning_queries()
if learning_queries:
queries.extend(learning_queries)

try:
for query in queries:
dfs = []
for query in queries:
try:
df = pd.read_sql(query, self.db)
dfs.append(df)
df = pd.concat(dfs)
df.reset_index()
df["simulation"] = self.simulation_id
if self.export_csv_path:
kpi_data_path = self.p.joinpath("kpis.csv")
df.to_csv(
kpi_data_path,
mode="a",
header=not kpi_data_path.exists(),
index=None,
)
if self.db is not None and not df.empty:
with self.db.begin() as db:
df.to_sql("kpis", self.db, if_exists="append", index=None)
except ProgrammingError as e:
self.db.rollback()
logger.error(f"No scenario run Yet {e}")
except OperationalError:
continue

def learning_queries(self):
if not self.learning_mode:
return []
dfs.append(df)

queries = [
f"SELECT 'sum_reward' as variable, simulation as ident, sum(reward) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_regret' as variable, simulation as ident, sum(regret) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_profit' as variable, simulation as ident, sum(profit) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
]
return queries
df = pd.concat(dfs)
df.reset_index()
df["simulation"] = self.simulation_id

if self.export_csv_path:
kpi_data_path = self.p.joinpath("kpis.csv")
df.to_csv(
kpi_data_path,
mode="a",
header=not kpi_data_path.exists(),
index=None,
)

if self.db is not None and not df.empty:
with self.db.begin() as db:
df.to_sql("kpis", self.db, if_exists="append", index=None)

def get_sum_reward(self):
query = text(
Expand Down
39 changes: 20 additions & 19 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ def write_actual_dispatch(self):
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
Expand All @@ -255,10 +259,6 @@ def write_actual_dispatch(self):
},
)

self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)

async def submit_bids(self, opening: OpeningMessage):
"""
formulates an orderbook and sends it to the market.
Expand Down Expand Up @@ -394,8 +394,9 @@ def write_learning_to_output(self, start: datetime, marketconfig: MarketConfig):
output_dict[f"actions_{i}"] = action_tuple[i]

output_agent_list.append(output_dict)
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

db_aid = self.context.data_dict.get("learning_output_agent_id")
db_addr = self.context.data_dict.get("learning_output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
Expand All @@ -416,17 +417,13 @@ def write_to_learning(
device: str,
learning_unit_count: int,
):
learning_role_id = "learning_agent"
learning_role_addr = self.context.addr

all_observations = []
all_rewards = []
try:
import torch as th

except ImportError:
logger.error("tried writing learning_params, but torch is not installed")
all_actions = np.zeros((learning_unit_count, act_dim))
return

all_observations = th.zeros((learning_unit_count, obs_dim), device=device)
Expand All @@ -450,15 +447,19 @@ def write_to_learning(
all_rewards = np.array(all_rewards)
rl_agent_data = (np.array(all_observations), all_actions, all_rewards)

self.context.schedule_instant_acl_message(
receiver_id=learning_role_id,
receiver_addr=learning_role_addr,
content={
"context": "rl_training",
"type": "replay_buffer",
"data": rl_agent_data,
},
)
learning_role_id = self.context.data_dict.get("learning_agent_id")
learning_role_addr = self.context.data_dict.get("learning_agent_addr")

if learning_role_id and learning_role_addr:
self.context.schedule_instant_acl_message(
receiver_id=learning_role_id,
receiver_addr=learning_role_addr,
content={
"context": "rl_training",
"type": "replay_buffer",
"data": rl_agent_data,
},
)

def write_learning_params(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
Expand Down
28 changes: 16 additions & 12 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,17 @@ async def store_order_book(self, orderbook: Orderbook):
:param orderbook: The order book to be stored
:type orderbook: Orderbook
"""
message = {
"context": "write_results",
"type": "store_order_book",
"sender": self.marketconfig.name,
"data": orderbook,
}

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

if db_aid and db_addr:
message = {
"context": "write_results",
"type": "store_order_book",
"sender": self.marketconfig.name,
"data": orderbook,
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
Expand All @@ -429,15 +431,17 @@ async def store_market_results(self, market_meta):
:param market_meta: The metadata of the market
:type market_meta: any
"""
message = {
"context": "write_results",
"type": "store_market_results",
"sender": self.marketconfig.name,
"data": market_meta,
}

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

if db_aid and db_addr:
message = {
"context": "write_results",
"type": "store_market_results",
"sender": self.marketconfig.name,
"data": market_meta,
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
Expand Down
31 changes: 23 additions & 8 deletions assume/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from mango.util.termination_detection import tasks_complete_or_sleeping
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
from tqdm import tqdm

from assume.common import (
Expand Down Expand Up @@ -158,7 +157,10 @@ async def setup_learning(self):
# if self.same_process:
# separate process does not support buffer and learning
if True:
rl_agent = RoleAgent(self.container, suggested_aid="learning_agent")
self.learning_agent_addr = (self.addr, "learning_agent")
rl_agent = RoleAgent(
self.container, suggested_aid=self.learning_agent_addr[1]
)
rl_agent.add_role(self.learning_role)
else:

Expand Down Expand Up @@ -220,10 +222,19 @@ def add_unit_operator(
self.unit_operators[id] = units_operator

# after creation of an agent - we set additional context params
unit_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
}
unit_operator_agent._role_context.data_dict = {}
if self.learning_mode:
unit_operator_agent._role_context.data_dict = {
"learning_output_agent_addr": self.output_agent_addr[0],
"learning_output_agent_id": self.output_agent_addr[1],
"learning_agent_addr": self.learning_agent_addr[0],
"learning_agent_id": self.learning_agent_addr[1],
}
else:
unit_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
}

async def async_add_unit(
self,
Expand Down Expand Up @@ -313,8 +324,12 @@ def add_market_operator(

# after creation of an agent - we set additional context params
market_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
"output_agent_addr": None
if self.learning_mode
else self.output_agent_addr[0],
"output_agent_id": None
if self.learning_mode
else self.output_agent_addr[1],
}
self.market_operators[id] = market_operator_agent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2329,7 +2329,7 @@
},
"timepicker": {},
"timezone": "",
"title": "ASSUME Comparison",
"title": "ASSUME: Compare scenarios",
"uid": "vP8U8-q4k",
"version": 2,
"weekStart": ""
Expand Down
Loading

0 comments on commit caabadb

Please sign in to comment.