Disable writing data during learning process #205

Merged · 6 commits · Sep 25, 2023

Changes from 4 commits
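This PR has no written description, but the diff follows one consistent pattern: every write of simulation results is gated on whether a learning agent is registered in the role context, so market results, unit data and dispatch are not written to the database during training episodes, while RL-specific outputs still are. Below is a minimal sketch of that pattern, assembled from the calls visible in the diff; the helper name `maybe_write_results` is hypothetical and for illustration only.

```python
# Hypothetical helper illustrating the gating pattern this PR applies throughout:
# results are forwarded to the output agent only when no learning agent is
# registered in the role context's data_dict (i.e. not in learning mode).
def maybe_write_results(context, result_type: str, data) -> None:
    if context.data_dict.get("learning_agent_addr") is not None:
        return  # learning mode: skip writing simulation outputs

    db_aid = context.data_dict.get("output_agent_id")
    db_addr = context.data_dict.get("output_agent_addr")
    if db_aid and db_addr:
        context.schedule_instant_acl_message(
            receiver_id=db_aid,
            receiver_addr=db_addr,
            content={
                "context": "write_results",
                "type": result_type,
                "data": data,
            },
        )
```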
25 changes: 11 additions & 14 deletions assume/common/outputs.py
@@ -341,19 +341,20 @@ async def on_stop(self):
await self.store_dfs()
if self.db is None:
return
queries = [
f"select 'avg_price' as variable, market_id as ident, avg(price) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_cost' as variable, market_id as ident, sum(price*demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_volume' as variable, market_id as ident, sum(demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'capacity_factor' as variable, market_id as ident, avg(power/max_power) as value from market_dispatch ud join power_plant_meta um on ud.unit_id = um.\"index\" and ud.simulation=um.simulation where um.simulation = '{self.simulation_id}' group by variable, market_id",
]
dfs = []

learning_queries = self.learning_queries()
if learning_queries:
queries.extend(learning_queries)
queries = []
if self.learning_mode:
queries.extend(self.learning_queries())
else:
queries = [
f"select 'avg_price' as variable, market_id as ident, avg(price) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_cost' as variable, market_id as ident, sum(price*demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'total_volume' as variable, market_id as ident, sum(demand_volume_energy) as value from market_meta where simulation = '{self.simulation_id}' group by market_id",
f"select 'capacity_factor' as variable, market_id as ident, avg(power/max_power) as value from market_dispatch ud join power_plant_meta um on ud.unit_id = um.\"index\" and ud.simulation=um.simulation where um.simulation = '{self.simulation_id}' group by variable, market_id",
]

try:
dfs = []
for query in queries:
df = pd.read_sql(query, self.db)
dfs.append(df)
@@ -372,13 +373,9 @@ async def on_stop(self):
with self.db.begin() as db:
df.to_sql("kpis", self.db, if_exists="append", index=None)
except ProgrammingError as e:
self.db.rollback()
logger.error(f"No scenario run Yet {e}")

def learning_queries(self):
if not self.learning_mode:
return []

queries = [
f"SELECT 'sum_reward' as variable, simulation as ident, sum(reward) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
f"SELECT 'sum_regret' as variable, simulation as ident, sum(regret) as value FROM rl_params WHERE episode='{self.episode}' AND simulation='{self.simulation_id}' GROUP BY simulation",
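In outputs.py, on_stop now selects its KPI queries by mode rather than always running the market KPI queries and appending the learning ones; learning_queries therefore no longer checks self.learning_mode itself, and the rollback() call in the ProgrammingError handler was dropped. A condensed, method-body sketch of the new control flow (simplified from the diff above; error handling omitted):

```python
# Sketch of the reworked query selection inside on_stop (see diff above).
queries = []
if self.learning_mode:
    # only RL KPIs (sum_reward, sum_regret) per simulation
    queries.extend(self.learning_queries())
else:
    # market KPIs: avg_price, total_cost, total_volume, capacity_factor
    queries = [
        f"select 'avg_price' as variable, market_id as ident, avg(price) as value "
        f"from market_meta where simulation = '{self.simulation_id}' group by market_id",
        # ... remaining market KPI queries as shown in the diff
    ]

dfs = []
for query in queries:
    dfs.append(pd.read_sql(query, self.db))
```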
111 changes: 57 additions & 54 deletions assume/common/units_operator.py
@@ -86,24 +86,25 @@ async def add_unit(
"""
self.units[unit.id] = unit

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
# send unit data to db agent to store it
message = {
"context": "write_results",
"type": "store_units",
"data": self.units[unit.id].as_dict(),
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content=message,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
)
if self.context.data_dict.get("learning_agent_addr") is None:
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
# send unit data to db agent to store it
message = {
"context": "write_results",
"type": "store_units",
"data": self.units[unit.id].as_dict(),
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content=message,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
)

def participate(self, market: MarketConfig):
"""
@@ -231,33 +232,34 @@ def write_actual_dispatch(self):
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "market_dispatch",
"data": market_dispatch,
},
)
if unit_dispatch_dfs:
unit_dispatch = pd.concat(unit_dispatch_dfs)
self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)

if self.context.data_dict.get("learning_agent_addr") is None:
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "unit_dispatch",
"data": unit_dispatch,
"type": "market_dispatch",
"data": market_dispatch,
},
)

self.valid_orders = list(
filter(lambda x: x["end_time"] >= now, self.valid_orders)
)
if unit_dispatch_dfs:
unit_dispatch = pd.concat(unit_dispatch_dfs)
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "unit_dispatch",
"data": unit_dispatch,
},
)

async def submit_bids(self, opening: OpeningMessage):
"""
@@ -394,18 +396,20 @@ def write_learning_to_output(self, start: datetime, marketconfig: MarketConfig):
output_dict[f"actions_{i}"] = action_tuple[i]

output_agent_list.append(output_dict)
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "rl_learning_params",
"data": output_agent_list,
},
)

if self.context.data_dict.get("learning_agent_addr"):
db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")
if db_aid and db_addr:
self.context.schedule_instant_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
content={
"context": "write_results",
"type": "rl_learning_params",
"data": output_agent_list,
},
)

def write_to_learning(
self,
@@ -416,17 +420,13 @@
device: str,
learning_unit_count: int,
):
learning_role_id = "learning_agent"
learning_role_addr = self.context.addr

all_observations = []
all_rewards = []
try:
import torch as th

except ImportError:
logger.error("tried writing learning_params, but torch is not installed")
all_actions = np.zeros((learning_unit_count, act_dim))
return

all_observations = th.zeros((learning_unit_count, obs_dim), device=device)
@@ -450,6 +450,9 @@
all_rewards = np.array(all_rewards)
rl_agent_data = (np.array(all_observations), all_actions, all_rewards)

learning_role_id = self.context.data_dict.get("learning_agent_id")
learning_role_addr = self.context.data_dict.get("learning_agent_addr")

self.context.schedule_instant_acl_message(
receiver_id=learning_role_id,
receiver_addr=learning_role_addr,
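In units_operator.py the same gate appears in add_unit and write_actual_dispatch (unit and dispatch data are sent to the output agent only when learning_agent_addr is absent), write_learning_to_output sends only when a learning agent is present, and write_to_learning no longer assumes the learning role lives on the local container under the fixed aid "learning_agent" but resolves it from the role context. A sketch of that lookup; the message payload is truncated in the diff above and therefore elided here:

```python
# Resolve the learning role's address from the shared data_dict instead of
# assuming it is co-located with this units operator (sketch of the new lookup).
learning_role_id = self.context.data_dict.get("learning_agent_id")
learning_role_addr = self.context.data_dict.get("learning_agent_addr")

self.context.schedule_instant_acl_message(
    receiver_id=learning_role_id,
    receiver_addr=learning_role_addr,
    content=...,  # payload built from rl_agent_data; not shown in the diff
)
```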
28 changes: 16 additions & 12 deletions assume/markets/base_market.py
@@ -406,15 +406,17 @@ async def store_order_book(self, orderbook: Orderbook):
:param orderbook: The order book to be stored
:type orderbook: Orderbook
"""
message = {
"context": "write_results",
"type": "store_order_book",
"sender": self.marketconfig.name,
"data": orderbook,
}

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

if db_aid and db_addr:
message = {
"context": "write_results",
"type": "store_order_book",
"sender": self.marketconfig.name,
"data": orderbook,
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
@@ -429,15 +431,17 @@ async def store_market_results(self, market_meta):
:param market_meta: The metadata of the market
:type market_meta: any
"""
message = {
"context": "write_results",
"type": "store_market_results",
"sender": self.marketconfig.name,
"data": market_meta,
}

db_aid = self.context.data_dict.get("output_agent_id")
db_addr = self.context.data_dict.get("output_agent_addr")

if db_aid and db_addr:
message = {
"context": "write_results",
"type": "store_market_results",
"sender": self.marketconfig.name,
"data": market_meta,
}
await self.context.send_acl_message(
receiver_id=db_aid,
receiver_addr=db_addr,
20 changes: 16 additions & 4 deletions assume/world.py
@@ -13,7 +13,6 @@
from mango.util.termination_detection import tasks_complete_or_sleeping
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
from tqdm import tqdm

from assume.common import (
@@ -158,7 +157,10 @@ async def setup_learning(self):
# if self.same_process:
# separate process does not support buffer and learning
if True:
rl_agent = RoleAgent(self.container, suggested_aid="learning_agent")
self.learning_agent_addr = (self.addr, "learning_agent")
rl_agent = RoleAgent(
self.container, suggested_aid=self.learning_agent_addr[1]
)
rl_agent.add_role(self.learning_role)
else:

@@ -223,6 +225,12 @@ def add_unit_operator(
unit_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
"learning_agent_addr": self.learning_agent_addr[0]
if self.learning_mode
else None,
"learning_agent_id": self.learning_agent_addr[1]
if self.learning_mode
else None,
}

async def async_add_unit(
@@ -313,8 +321,12 @@ def add_market_operator(

# after creation of an agent - we set additional context params
market_operator_agent._role_context.data_dict = {
"output_agent_addr": self.output_agent_addr[0],
"output_agent_id": self.output_agent_addr[1],
"output_agent_addr": None
if self.learning_mode
else self.output_agent_addr[0],
"output_agent_id": None
if self.learning_mode
else self.output_agent_addr[1],
}
self.market_operators[id] = market_operator_agent

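world.py provides the wiring these gates rely on: setup_learning records the learning agent's address, unit operators receive the output agent and, in learning mode, the learning agent in their data_dict, and market operators receive no output agent at all in learning mode, which is why store_order_book and store_market_results in base_market.py now skip building their messages entirely. A sketch of the resulting data_dict contents when learning_mode is enabled (`world` denotes the World instance; names taken from the diff):

```python
# Role-context data_dict values in learning mode, per the diff above.
# Unit operators still know the output agent (used for rl_learning_params)
# but additionally see the learning agent, which suppresses their unit and
# dispatch writes.
unit_operator_data = {
    "output_agent_addr": world.output_agent_addr[0],
    "output_agent_id": world.output_agent_addr[1],
    "learning_agent_addr": world.learning_agent_addr[0],
    "learning_agent_id": world.learning_agent_addr[1],  # "learning_agent"
}

# Market operators get no output agent in learning mode, so no market results
# or order books are stored.
market_operator_data = {
    "output_agent_addr": None,
    "output_agent_id": None,
}
```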
@@ -2329,7 +2329,7 @@
},
"timepicker": {},
"timezone": "",
"title": "ASSUME Comparison",
"title": "ASSUME: Compare scenarios",
"uid": "vP8U8-q4k",
"version": 2,
"weekStart": ""