Skip to content

Commit

Permalink
improve messaging with typedDicts (#245)
Browse files Browse the repository at this point in the history
fixes an error when
adjust marginal_costs for multiple markets
use market_id consistently
receive registration feedback from market at start of simulation
  • Loading branch information
maurerle authored Nov 8, 2023
1 parent 5834480 commit 2c10e71
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 57 deletions.
3 changes: 2 additions & 1 deletion assume/common/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def set_dispatch_plan(

self.calculate_cashflow(product_type, orderbook)

self.outputs[product_type + "_marginal_costs"].loc[start:end_excl] = (
self.outputs[product_type + "_marginal_costs"].loc[start:end_excl] += (
self.calculate_marginal_cost(start, self.outputs[product_type].loc[start])
* self.outputs[product_type].loc[start:end_excl]
)
Expand Down Expand Up @@ -195,6 +195,7 @@ def as_dict(self) -> dict:
"id": self.id,
"technology": self.technology,
"unit_operator": self.unit_operator,
"node": self.node,
"unit_type": "base_unit",
}

Expand Down
42 changes: 40 additions & 2 deletions assume/common/market_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class MarketConfig:

class OpeningMessage(TypedDict):
"""
Message which is sent to the market to open a market
Message which is sent from the market to participating agent to open a market
:param context: the context of the message
:type context: str
Expand All @@ -204,7 +204,7 @@ class OpeningMessage(TypedDict):

class ClearingMessage(TypedDict):
"""
Message which is sent to the market to clear a market
Message which is sent from the market to agents to clear a market
:param context: the context of the message
:type context: str
Expand All @@ -219,6 +219,44 @@ class ClearingMessage(TypedDict):
orderbook: Orderbook


class OrderBookMessage(TypedDict):
context: str
market_id: str
orderbook: Orderbook


class RegistrationMessage(TypedDict):
context: str
market_id: str
information: dict


class RegistrationReplyMessage(TypedDict):
context: str
market_id: str
accepted: bool


class MetaDict(TypedDict):
"""
Message Meta of a FIPA ACL Message
http://www.fipa.org/specs/fipa00061/SC00061G.html#_Toc26669700
"""

sender_addr: str | list
sender_id: str
reply_to: str # to which agent follow up messages should be sent
conversation_id: str
performative: str
protocol: str
language: str
encoding: str
ontology: str
reply_with: str # what the answer should contain as in_reply_to
in_reply_to: str # str used to reference an earlier action
reply_by: str # latest time to accept replies


# Class for a Smart Contract which can contain something like:
# - Contract for Differences (CfD) -> based on market result
# - Market Subvention -> based on market result
Expand Down
7 changes: 4 additions & 3 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

logger = logging.getLogger(__name__)

from assume.common.market_objects import MetaDict
from assume.common.utils import separate_orders


Expand Down Expand Up @@ -145,7 +146,7 @@ def setup(self):
)
self.context.schedule_recurrent_task(self.store_dfs, recurrency_task)

def handle_message(self, content, meta):
def handle_message(self, content, meta: MetaDict):
"""
Handles the incoming messages and performs corresponding actions.
Expand All @@ -157,7 +158,7 @@ def handle_message(self, content, meta):
"""

if content.get("type") == "store_order_book":
self.write_market_orders(content.get("data"), content.get("sender"))
self.write_market_orders(content.get("data"), content.get("market_id"))

elif content.get("type") == "store_market_results":
self.write_market_results(content.get("data"))
Expand Down Expand Up @@ -232,7 +233,7 @@ async def store_dfs(self):
try:
with self.db.begin() as db:
df.to_sql(table, db, if_exists="append")
except ProgrammingError:
except (ProgrammingError, OperationalError, DataError):
self.check_columns(table, df)
# now try again
with self.db.begin() as db:
Expand Down
73 changes: 52 additions & 21 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
from assume.common.market_objects import (
ClearingMessage,
MarketConfig,
MetaDict,
OpeningMessage,
Orderbook,
OrderBookMessage,
RegistrationMessage,
)
from assume.common.utils import aggregate_step_amount
from assume.strategies import BaseStrategy, LearningStrategy
Expand Down Expand Up @@ -73,10 +76,18 @@ def setup(self):
lambda content, meta: content.get("context") == "clearing",
)

self.context.subscribe_message(
self,
self.handle_registration_feedback,
lambda content, meta: content.get("context") == "registration",
)

for market in self.available_markets:
if self.participate(market):
self.register_market(market)
self.registered_markets[market.name] = market
self.context.schedule_timestamp_task(
self.register_market(market),
1, # register after time was updated for the first time
)

async def add_unit(
self,
Expand Down Expand Up @@ -119,43 +130,45 @@ def participate(self, market: MarketConfig):
"""
return True

def register_market(self, market: MarketConfig):
async def register_market(self, market: MarketConfig):
"""
Register a market.
:param market: the market to register
:type market: MarketConfig
"""
self.context.schedule_timestamp_task(
self.context.send_acl_message(
{"context": "registration", "market": market.name},
receiver_addr=market.addr,
receiver_id=market.aid,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
),
1, # register after time was updated for the first time
)
logger.debug(f"{self.id} tried to register at market {market.name}")

def handle_opening(self, opening: OpeningMessage, meta: dict[str, str]):
await self.context.send_acl_message(
{
"context": "registration",
"market_id": market.name,
"information": [u.as_dict() for u in self.units.values()],
},
receiver_addr=market.addr,
receiver_id=market.aid,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
),
logger.debug(f"{self.id} sent market registration to {market.name}")

def handle_opening(self, opening: OpeningMessage, meta: MetaDict):
"""
When we receive an opening from the market, we schedule sending back
our list of orders as a response
:param opening: the opening message
:type opening: OpeningMessage
:param meta: the meta data of the market
:type meta: dict[str, str]
:type meta: MetaDict
"""
logger.debug(
f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.'
)
self.context.schedule_instant_task(coroutine=self.submit_bids(opening))

def handle_market_feedback(self, content: ClearingMessage, meta: dict[str, str]):
def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):
"""
handles the feedback which is received from a market we did bid at
stores accepted orders, sets the received power
Expand All @@ -165,7 +178,7 @@ def handle_market_feedback(self, content: ClearingMessage, meta: dict[str, str])
:param content: the content of the clearing message
:type content: ClearingMessage
:param meta: the meta data of the market
:type meta: dict[str, str]
:type meta: MetaDict
"""
logger.debug(f"{self.id} got market result: {content}")
accepted_orders: Orderbook = content["accepted_orders"]
Expand All @@ -183,6 +196,24 @@ def handle_market_feedback(self, content: ClearingMessage, meta: dict[str, str])
self.write_learning_params(orderbook, marketconfig)
self.write_actual_dispatch()

def handle_registration_feedback(
self, content: RegistrationMessage, meta: MetaDict
):
logger.debug("Market %s accepted our registration", content["market_id"])
if content["accepted"]:
found = False
for market in self.available_markets:
if content["market_id"] == market.name:
self.registered_markets[market.name] = market
found = True
break
if not found:
logger.error(
"Market %s sent registation but is unknown", content["market_id"]
)
else:
logger.error("Market %s did not accept registration", meta["sender_id"])

def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
feeds the current market result back to the units
Expand Down Expand Up @@ -305,7 +336,7 @@ async def submit_bids(self, opening: OpeningMessage):
await self.context.send_acl_message(
content={
"context": "submit_bids",
"market": market.name,
"market_id": market.name,
"orderbook": orderbook,
},
receiver_addr=market.addr,
Expand Down
Loading

0 comments on commit 2c10e71

Please sign in to comment.