Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable writing data during learning process #205

Merged
merged 6 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 28 additions & 35 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from mango import Role
from pandas.api.types import is_numeric_dtype
from sqlalchemy import inspect, text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.exc import OperationalError, ProgrammingError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -339,52 +339,45 @@

# insert left records into db
await self.store_dfs()

if self.db is None:
return

queries = [
f"select 'avg_price' as variable, market_id as ident, avg(price) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_cost' as variable, market_id as ident, sum(price*demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_volume' as variable, market_id as ident, sum(demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'capacity_factor' as variable, market_id as ident, avg(power/max_power) as value from market_dispatch ud join power_plant_meta um on ud.unit_id = um.\"index\" and ud.simulation=um.simulation where um.simulation = '{self.simulation_id}' group by variable, market_id",
f"SELECT 'sum_reward' as variable, simulation as ident, sum(reward) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_regret' as variable, simulation as ident, sum(regret) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_profit' as variable, simulation as ident, sum(profit) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
]
dfs = []

learning_queries = self.learning_queries()
if learning_queries:
queries.extend(learning_queries)

try:
for query in queries:
dfs = []
for query in queries:
try:
df = pd.read_sql(query, self.db)
dfs.append(df)
df = pd.concat(dfs)
df.reset_index()
df["simulation"] = self.simulation_id
if self.export_csv_path:
kpi_data_path = self.p.joinpath("kpis.csv")
df.to_csv(
kpi_data_path,
mode="a",
header=not kpi_data_path.exists(),
index=None,
)
if self.db is not None and not df.empty:
with self.db.begin() as db:
df.to_sql("kpis", self.db, if_exists="append", index=None)
except ProgrammingError as e:
self.db.rollback()
logger.error(f"No scenario run Yet {e}")
except OperationalError:
continue

def learning_queries(self):
if not self.learning_mode:
return []
dfs.append(df)

queries = [
f"SELECT 'sum_reward' as variable, simulation as ident, sum(reward) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_regret' as variable, simulation as ident, sum(regret) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_profit' as variable, simulation as ident, sum(profit) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
]
return queries
df = pd.concat(dfs)
df.reset_index()
df["simulation"] = self.simulation_id

if self.export_csv_path:
kpi_data_path = self.p.joinpath("kpis.csv")
df.to_csv(

Check warning on line 371 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L370-L371

Added lines #L370 - L371 were not covered by tests
kpi_data_path,
mode="a",
header=not kpi_data_path.exists(),
index=None,
)

if self.db is not None and not df.empty:
with self.db.begin() as db:
df.to_sql("kpis", self.db, if_exists="append", index=None)

def get_sum_reward(self):
query = text(
Expand Down
39 changes: 20 additions & 19 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ def write_actual_dispatch(self):
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
Expand All @@ -255,10 +259,6 @@ def write_actual_dispatch(self):
},
)

self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)

async def submit_bids(self, opening: OpeningMessage):
"""
formulates an orderbook and sends it to the market.
Expand Down Expand Up @@ -394,8 +394,9 @@ def write_learning_to_output(self, start: datetime, marketconfig: MarketConfig):
output_dict[f"actions_{i}"] = action_tuple[i]

output_agent_list.append(output_dict)
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

db_aid = self.context.data_dict.get("learning_output_agent_id")
db_addr = self.context.data_dict.get("learning_output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
Expand All @@ -416,17 +417,13 @@ def write_to_learning(
device: str,
learning_unit_count: int,
):
learning_role_id = "learning_agent"
learning_role_addr = self.context.addr

all_observations = []
all_rewards = []
try:
import torch as th

except ImportError:
logger.error("tried writing learning_params, but torch is not installed")
all_actions = np.zeros((learning_unit_count, act_dim))
return

all_observations = th.zeros((learning_unit_count, obs_dim), device=device)
Expand All @@ -450,15 +447,19 @@ def write_to_learning(
all_rewards = np.array(all_rewards)
rl_agent_data = (np.array(all_observations), all_actions, all_rewards)

self.context.schedule_instant_acl_message(
receiver_id=learning_role_id,
receiver_addr=learning_role_addr,
content={
"context": "rl_training",
"type": "replay_buffer",
"data": rl_agent_data,
},
)
learning_role_id = self.context.data_dict.get("learning_agent_id")
learning_role_addr = self.context.data_dict.get("learning_agent_addr")

nick-harder marked this conversation as resolved.
Show resolved Hide resolved
if learning_role_id and learning_role_addr:
self.context.schedule_instant_acl_message(
receiver_id=learning_role_id,
receiver_addr=learning_role_addr,
content={
"context": "rl_training",
"type": "replay_buffer",
"data": rl_agent_data,
},
)

def write_learning_params(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
Expand Down
28 changes: 16 additions & 12 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,17 @@ async def store_order_book(self, orderbook: Orderbook):
:param orderbook: The order book to be stored
:type orderbook: Orderbook
"""
message = {
"context": "write_results",
"type": "store_order_book",
"sender": self.marketconfig.name,
"data": orderbook,
}

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

if db_aid and db_addr:
message = {
"context": "write_results",
"type": "store_order_book",
"sender": self.marketconfig.name,
"data": orderbook,
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
Expand All @@ -429,15 +431,17 @@ async def store_market_results(self, market_meta):
:param market_meta: The metadata of the market
:type market_meta: any
"""
message = {
"context": "write_results",
"type": "store_market_results",
"sender": self.marketconfig.name,
"data": market_meta,
}

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

if db_aid and db_addr:
message = {
"context": "write_results",
"type": "store_market_results",
"sender": self.marketconfig.name,
"data": market_meta,
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
Expand Down
31 changes: 23 additions & 8 deletions assume/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from mango.util.termination_detection import tasks_complete_or_sleeping
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
from tqdm import tqdm

from assume.common import (
Expand Down Expand Up @@ -158,7 +157,10 @@ async def setup_learning(self):
# if self.same_process:
# separate process does not support buffer and learning
if True:
rl_agent = RoleAgent(self.container, suggested_aid="learning_agent")
self.learning_agent_addr = (self.addr, "learning_agent")
rl_agent = RoleAgent(
self.container, suggested_aid=self.learning_agent_addr[1]
)
rl_agent.add_role(self.learning_role)
else:

Expand Down Expand Up @@ -220,10 +222,19 @@ def add_unit_operator(
self.unit_operators[id] = units_operator

# after creation of an agent - we set additional context params
unit_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
}
unit_operator_agent._role_context.data_dict = {}
if self.learning_mode:
unit_operator_agent._role_context.data_dict = {
"learning_output_agent_addr": self.output_agent_addr[0],
"learning_output_agent_id": self.output_agent_addr[1],
"learning_agent_addr": self.learning_agent_addr[0],
"learning_agent_id": self.learning_agent_addr[1],
}
else:
unit_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
}

async def async_add_unit(
self,
Expand Down Expand Up @@ -313,8 +324,12 @@ def add_market_operator(

# after creation of an agent - we set additional context params
market_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
"output_agent_addr": None
if self.learning_mode
else self.output_agent_addr[0],
"output_agent_id": None
if self.learning_mode
else self.output_agent_addr[1],
}
self.market_operators[id] = market_operator_agent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2329,7 +2329,7 @@
},
"timepicker": {},
"timezone": "",
"title": "ASSUME Comparison",
"title": "ASSUME: Compare scenarios",
"uid": "vP8U8-q4k",
"version": 2,
"weekStart": ""
Expand Down
Loading