Skip to content

Commit

Permalink
-no more outputs written when learning
Browse files Browse the repository at this point in the history
-adjusted learning dashboard
  • Loading branch information
nick-harder committed Sep 25, 2023
1 parent 8ede6e7 commit 40cb44b
Show file tree
Hide file tree
Showing 6 changed files with 688 additions and 314 deletions.
26 changes: 12 additions & 14 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,19 +341,20 @@ async def on_stop(self):
await self.store_dfs()
if self.db is None:
return
queries = [
f"select 'avg_price' as variable, market_id as ident, avg(price) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_cost' as variable, market_id as ident, sum(price*demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_volume' as variable, market_id as ident, sum(demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'capacity_factor' as variable, market_id as ident, avg(power/max_power) as value from market_dispatch ud join power_plant_meta um on ud.unit_id = um.\"index\" and ud.simulation=um.simulation where um.simulation = '{self.simulation_id}' group by variable, market_id",
]
dfs = []

learning_queries = self.learning_queries()
if learning_queries:
queries.extend(learning_queries)
queries = []
if self.learning_mode:
queries.extend(self.learning_queries())
else:
queries = [
f"select 'avg_price' as variable, market_id as ident, avg(price) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_cost' as variable, market_id as ident, sum(price*demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_volume' as variable, market_id as ident, sum(demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'capacity_factor' as variable, market_id as ident, avg(power/max_power) as value from market_dispatch ud join power_plant_meta um on ud.unit_id = um.\"index\" and ud.simulation=um.simulation where um.simulation = '{self.simulation_id}' group by variable, market_id",
]

try:
dfs = []
for query in queries:
df = pd.read_sql(query, self.db)
dfs.append(df)
Expand All @@ -372,13 +373,10 @@ async def on_stop(self):
with self.db.begin() as db:
df.to_sql("kpis", self.db, if_exists="append", index=None)
except ProgrammingError as e:
self.db.rollback()
# self.db.rollback() not working, no rollback function
logger.error(f"No scenario run Yet {e}")

def learning_queries(self):
if not self.learning_mode:
return []

queries = [
f"SELECT 'sum_reward' as variable, simulation as ident, sum(reward) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_regret' as variable, simulation as ident, sum(regret) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
Expand Down
105 changes: 54 additions & 51 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,25 @@ async def add_unit(
"""
self.units[unit.id] = unit

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
# send unit data to db agent to store it
message = {
"context": "write_results",
"type": "store_units",
"data": self.units[unit.id].as_dict(),
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content=message,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
)
if not self.context.data_dict.get("learning_mode"):
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
# send unit data to db agent to store it
message = {
"context": "write_results",
"type": "store_units",
"data": self.units[unit.id].as_dict(),
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content=message,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
)

def participate(self, market: MarketConfig):
"""
Expand Down Expand Up @@ -231,33 +232,34 @@ def write_actual_dispatch(self):
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "market_dispatch",
"data": market_dispatch,
},
)
if unit_dispatch_dfs:
unit_dispatch = pd.concat(unit_dispatch_dfs)
self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)

if not self.context.data_dict.get("learning_mode"):
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "unit_dispatch",
"data": unit_dispatch,
"type": "market_dispatch",
"data": market_dispatch,
},
)

self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)
if unit_dispatch_dfs:
unit_dispatch = pd.concat(unit_dispatch_dfs)
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "unit_dispatch",
"data": unit_dispatch,
},
)

async def submit_bids(self, opening: OpeningMessage):
"""
Expand Down Expand Up @@ -394,18 +396,20 @@ def write_learning_to_output(self, start: datetime, marketconfig: MarketConfig):
output_dict[f"actions_{i}"] = action_tuple[i]

output_agent_list.append(output_dict)
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "rl_learning_params",
"data": output_agent_list,
},
)

if self.context.data_dict.get("learning_mode"):
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "rl_learning_params",
"data": output_agent_list,
},
)

def write_to_learning(
self,
Expand All @@ -426,7 +430,6 @@ def write_to_learning(

except ImportError:
logger.error("tried writing learning_params, but torch is not installed")
all_actions = np.zeros((learning_unit_count, act_dim))
return

all_observations = th.zeros((learning_unit_count, obs_dim), device=device)
Expand Down
8 changes: 8 additions & 0 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,16 @@ async def store_order_book(self, orderbook: Orderbook):
:param orderbook: The order book to be stored
:type orderbook: Orderbook
"""
if self.context.data_dict.get("learning_mode"):
return

message = {
"context": "write_results",
"type": "store_order_book",
"sender": self.marketconfig.name,
"data": orderbook,
}

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
Expand All @@ -429,6 +433,10 @@ async def store_market_results(self, market_meta):
:param market_meta: The metadata of the market
:type market_meta: any
"""

if self.context.data_dict.get("learning_mode"):
return

message = {
"context": "write_results",
"type": "store_market_results",
Expand Down
3 changes: 2 additions & 1 deletion assume/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from mango.util.termination_detection import tasks_complete_or_sleeping
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
from tqdm import tqdm

from assume.common import (
Expand Down Expand Up @@ -214,6 +213,7 @@ def add_unit_operator(
unit_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
"learning_mode": self.learning_mode,
}

async def async_add_unit(
Expand Down Expand Up @@ -306,6 +306,7 @@ def add_market_operator(
market_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
"learning_mode": self.learning_mode,
}
self.market_operators[id] = market_operator_agent

Expand Down
Loading

0 comments on commit 40cb44b

Please sign in to comment.