From 0cbb7015742e4ee8cca9ef089dbc2784c13a7b40 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Tue, 7 Nov 2023 11:07:17 +0100 Subject: [PATCH] improve messaging with typedDicts fixes an error when adjust marginal_costs for multiple markets use market_id consistently receive registration feedback from market at start of simulation --- assume/common/base.py | 3 +- assume/common/market_objects.py | 42 ++++++++++++++++- assume/common/outputs.py | 7 +-- assume/common/units_operator.py | 73 ++++++++++++++++++++--------- assume/markets/base_market.py | 83 ++++++++++++++++++++++----------- tests/test_market.py | 11 +++-- 6 files changed, 162 insertions(+), 57 deletions(-) diff --git a/assume/common/base.py b/assume/common/base.py index 28e04575..965e1f0d 100644 --- a/assume/common/base.py +++ b/assume/common/base.py @@ -136,7 +136,7 @@ def set_dispatch_plan( self.calculate_cashflow(product_type, orderbook) - self.outputs[product_type + "_marginal_costs"].loc[start:end_excl] = ( + self.outputs[product_type + "_marginal_costs"].loc[start:end_excl] += ( self.calculate_marginal_cost(start, self.outputs[product_type].loc[start]) * self.outputs[product_type].loc[start:end_excl] ) @@ -195,6 +195,7 @@ def as_dict(self) -> dict: "id": self.id, "technology": self.technology, "unit_operator": self.unit_operator, + "node": self.node, "unit_type": "base_unit", } diff --git a/assume/common/market_objects.py b/assume/common/market_objects.py index 72c860d6..2d4f49bd 100644 --- a/assume/common/market_objects.py +++ b/assume/common/market_objects.py @@ -181,7 +181,7 @@ class MarketConfig: class OpeningMessage(TypedDict): """ - Message which is sent to the market to open a market + Message which is sent from the market to participating agent to open a market :param context: the context of the message :type context: str @@ -204,7 +204,7 @@ class OpeningMessage(TypedDict): class ClearingMessage(TypedDict): """ - Message which is sent to the market to clear a market + Message which is sent from the market to agents to clear a market :param context: the context of the message :type context: str @@ -219,6 +219,44 @@ class ClearingMessage(TypedDict): orderbook: Orderbook +class OrderBookMessage(TypedDict): + context: str + market_id: str + orderbook: Orderbook + + +class RegistrationMessage(TypedDict): + context: str + market_id: str + information: dict + + +class RegistrationReplyMessage(TypedDict): + context: str + market_id: str + accepted: bool + + +class MetaDict(TypedDict): + """ + Message Meta of a FIPA ACL Message + http://www.fipa.org/specs/fipa00061/SC00061G.html#_Toc26669700 + """ + + sender_addr: str | list + sender_id: str + reply_to: str # to which agent follow up messages should be sent + conversation_id: str + performative: str + protocol: str + language: str + encoding: str + ontology: str + reply_with: str # what the answer should contain as in_reply_to + in_reply_to: str # str used to reference an earlier action + reply_by: str # latest time to accept replies + + # Class for a Smart Contract which can contain something like: # - Contract for Differences (CfD) -> based on market result # - Market Subvention -> based on market result diff --git a/assume/common/outputs.py b/assume/common/outputs.py index e564d870..1c09e454 100644 --- a/assume/common/outputs.py +++ b/assume/common/outputs.py @@ -17,6 +17,7 @@ logger = logging.getLogger(__name__) +from assume.common.market_objects import MetaDict from assume.common.utils import separate_orders @@ -145,7 +146,7 @@ def setup(self): ) self.context.schedule_recurrent_task(self.store_dfs, recurrency_task) - def handle_message(self, content, meta): + def handle_message(self, content, meta: MetaDict): """ Handles the incoming messages and performs corresponding actions. @@ -157,7 +158,7 @@ def handle_message(self, content, meta): """ if content.get("type") == "store_order_book": - self.write_market_orders(content.get("data"), content.get("sender")) + self.write_market_orders(content.get("data"), content.get("market_id")) elif content.get("type") == "store_market_results": self.write_market_results(content.get("data")) @@ -232,7 +233,7 @@ async def store_dfs(self): try: with self.db.begin() as db: df.to_sql(table, db, if_exists="append") - except ProgrammingError: + except (ProgrammingError, OperationalError, DataError): self.check_columns(table, df) # now try again with self.db.begin() as db: diff --git a/assume/common/units_operator.py b/assume/common/units_operator.py index b48aeca0..3ef14334 100644 --- a/assume/common/units_operator.py +++ b/assume/common/units_operator.py @@ -15,8 +15,11 @@ from assume.common.market_objects import ( ClearingMessage, MarketConfig, + MetaDict, OpeningMessage, Orderbook, + OrderBookMessage, + RegistrationMessage, ) from assume.common.utils import aggregate_step_amount from assume.strategies import BaseStrategy, LearningStrategy @@ -73,10 +76,18 @@ def setup(self): lambda content, meta: content.get("context") == "clearing", ) + self.context.subscribe_message( + self, + self.handle_registration_feedback, + lambda content, meta: content.get("context") == "registration", + ) + for market in self.available_markets: if self.participate(market): - self.register_market(market) - self.registered_markets[market.name] = market + self.context.schedule_timestamp_task( + self.register_market(market), + 1, # register after time was updated for the first time + ) async def add_unit( self, @@ -119,28 +130,30 @@ def participate(self, market: MarketConfig): """ return True - def register_market(self, market: MarketConfig): + async def register_market(self, market: MarketConfig): """ Register a market. :param market: the market to register :type market: MarketConfig """ - self.context.schedule_timestamp_task( - self.context.send_acl_message( - {"context": "registration", "market": market.name}, - receiver_addr=market.addr, - receiver_id=market.aid, - acl_metadata={ - "sender_addr": self.context.addr, - "sender_id": self.context.aid, - }, - ), - 1, # register after time was updated for the first time - ) - logger.debug(f"{self.id} tried to register at market {market.name}") - def handle_opening(self, opening: OpeningMessage, meta: dict[str, str]): + await self.context.send_acl_message( + { + "context": "registration", + "market_id": market.name, + "information": [u.as_dict() for u in self.units.values()], + }, + receiver_addr=market.addr, + receiver_id=market.aid, + acl_metadata={ + "sender_addr": self.context.addr, + "sender_id": self.context.aid, + }, + ), + logger.debug(f"{self.id} sent market registration to {market.name}") + + def handle_opening(self, opening: OpeningMessage, meta: MetaDict): """ When we receive an opening from the market, we schedule sending back our list of orders as a response @@ -148,14 +161,14 @@ def handle_opening(self, opening: OpeningMessage, meta: dict[str, str]): :param opening: the opening message :type opening: OpeningMessage :param meta: the meta data of the market - :type meta: dict[str, str] + :type meta: MetaDict """ logger.debug( f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.' ) self.context.schedule_instant_task(coroutine=self.submit_bids(opening)) - def handle_market_feedback(self, content: ClearingMessage, meta: dict[str, str]): + def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict): """ handles the feedback which is received from a market we did bid at stores accepted orders, sets the received power @@ -165,7 +178,7 @@ def handle_market_feedback(self, content: ClearingMessage, meta: dict[str, str]) :param content: the content of the clearing message :type content: ClearingMessage :param meta: the meta data of the market - :type meta: dict[str, str] + :type meta: MetaDict """ logger.debug(f"{self.id} got market result: {content}") accepted_orders: Orderbook = content["accepted_orders"] @@ -183,6 +196,24 @@ def handle_market_feedback(self, content: ClearingMessage, meta: dict[str, str]) self.write_learning_params(orderbook, marketconfig) self.write_actual_dispatch() + def handle_registration_feedback( + self, content: RegistrationMessage, meta: MetaDict + ): + logger.debug("Market %s accepted our registration", content["market_id"]) + if content["accepted"]: + found = False + for market in self.available_markets: + if content["market_id"] == market.name: + self.registered_markets[market.name] = market + found = True + break + if not found: + logger.error( + "Market %s sent registation but is unknown", content["market_id"] + ) + else: + logger.error("Market %s did not accept registration", meta["sender_id"]) + def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig): """ feeds the current market result back to the units @@ -305,7 +336,7 @@ async def submit_bids(self, opening: OpeningMessage): await self.context.send_acl_message( content={ "context": "submit_bids", - "market": market.name, + "market_id": market.name, "orderbook": orderbook, }, receiver_addr=market.addr, diff --git a/assume/markets/base_market.py b/assume/markets/base_market.py index ef048b92..2b9c0dcf 100644 --- a/assume/markets/base_market.py +++ b/assume/markets/base_market.py @@ -15,8 +15,12 @@ ClearingMessage, MarketConfig, MarketProduct, + MetaDict, OpeningMessage, Orderbook, + OrderBookMessage, + RegistrationMessage, + RegistrationReplyMessage, ) from assume.common.utils import get_available_products, separate_orders @@ -41,12 +45,16 @@ def __init__(self, marketconfig: MarketConfig): self.open_auctions = set() self.all_orders = [] - def validate_registration(self, meta: dict) -> bool: + def validate_registration( + self, content: RegistrationMessage, meta: MetaDict + ) -> bool: """ method to validate a given registration. Used to check if a participant is eligible to bid on this market """ - return True + # simple check that 1 MW can be bid at least + requirement = lambda unit: unit["max_power"] >= 1 or unit["min_power"] <= -1 + return all([requirement(info) for info in content["information"]]) def validate_orderbook(self, orderbook: Orderbook, agent_tuple: tuple) -> None: """ @@ -115,12 +123,12 @@ class MarketRole(MarketMechanism, Role): longitude: float latitude: float marketconfig: MarketConfig - registered_agents: list[tuple[str, str]] + registered_agents: dict[tuple[str, str], dict] required_fields: list[str] = [] def __init__(self, marketconfig: MarketConfig): super().__init__(marketconfig) - self.registered_agents = [] + self.registered_agents = {} if marketconfig.price_tick: if marketconfig.maximum_bid_price % marketconfig.price_tick != 0: logger.warning( @@ -158,7 +166,7 @@ def setup(self): for field in self.required_fields: assert field in self.marketconfig.additional_fields, "missing field" - def accept_orderbook(content: dict, meta: dict): + def accept_orderbook(content: OrderBookMessage, meta: MetaDict): if not isinstance(content, dict): return False @@ -166,12 +174,13 @@ def accept_orderbook(content: dict, meta: dict): meta["sender_addr"] = tuple(meta["sender_addr"]) return ( - content.get("market") == self.marketconfig.name + content.get("market_id") == self.marketconfig.name and content.get("orderbook") is not None - and (meta["sender_addr"], meta["sender_id"]) in self.registered_agents + and (meta["sender_addr"], meta["sender_id"]) + in self.registered_agents.keys() ) - def accept_registration(content: dict, meta: dict): + def accept_registration(content: RegistrationMessage, meta: MetaDict): if not isinstance(content, dict): return False if isinstance(meta["sender_addr"], list): @@ -179,17 +188,17 @@ def accept_registration(content: dict, meta: dict): return ( content.get("context") == "registration" - and content.get("market") == self.marketconfig.name + and content.get("market_id") == self.marketconfig.name ) - def accept_get_unmatched(content: dict, meta: dict): + def accept_get_unmatched(content: dict, meta: MetaDict): if not isinstance(content, dict): return False if isinstance(meta["sender_addr"], list): meta["sender_addr"] = tuple(meta["sender_addr"]) return ( content.get("context") == "get_unmatched" - and content.get("market") == self.marketconfig.name + and content.get("market_id") == self.marketconfig.name ) self.context.subscribe_message(self, self.handle_orderbook, accept_orderbook) @@ -233,7 +242,7 @@ async def opening(self): self.open_auctions |= set(opening_message["products"]) - for agent in self.registered_agents: + for agent in self.registered_agents.keys(): agent_addr, agent_id = agent await self.context.send_acl_message( opening_message, @@ -255,12 +264,15 @@ async def opening(self): next_opening_ts = calendar.timegm(next_opening.utctimetuple()) self.context.schedule_timestamp_task(self.opening(), next_opening_ts) logger.debug( - f"market opening: {self.marketconfig.name} - {market_open} - {market_closing}" + f"market opening: %s - %s - %s", + self.marketconfig.name, + market_open, + market_closing, ) else: - logger.debug(f"market {self.marketconfig.name} - does not reopen") + logger.debug("market %s - does not reopen", self.marketconfig.name) - def handle_registration(self, content: dict, meta: dict): + def handle_registration(self, content: RegistrationMessage, meta: MetaDict): """ This method handles incoming registration messages. It adds the sender of the message to the list of registered agents @@ -270,12 +282,31 @@ def handle_registration(self, content: dict, meta: dict): :param meta: The metadata of the message :type meta: any """ - agent = meta["sender_id"] + agent_id = meta["sender_id"] agent_addr = meta["sender_addr"] - if self.validate_registration(meta): - self.registered_agents.append((agent_addr, agent)) + assert content["market_id"] == self.marketconfig.name + if self.validate_registration(content, meta): + self.registered_agents[(agent_addr, agent_id)] = content["information"] + accepted = True + else: + accepted = False + + msg: RegistrationReplyMessage = { + "context": "registration", + "market_id": self.marketconfig.name, + "accepted": accepted, + } + self.context.schedule_instant_acl_message( + content=msg, + receiver_addr=agent_addr, + receiver_id=agent_id, + acl_metadata={ + "sender_addr": self.context.addr, + "sender_id": self.context.aid, + }, + ) - def handle_orderbook(self, content: dict, meta: dict): + def handle_orderbook(self, content: OrderBookMessage, meta: MetaDict): """ This method handles incoming order book messages. It validates the order book and adds it to the list of all orders. @@ -303,11 +334,11 @@ def handle_orderbook(self, content: dict, meta: dict): acl_metadata={ "sender_addr": self.context.addr, "sender_id": self.context.aid, - "reply_to": 1, + "in_reply_to": 1, }, ) - def handle_get_unmatched(self, content: dict, meta: dict): + def handle_get_unmatched(self, content: dict, meta: MetaDict): """ A handler which sends the orderbook with unmatched orders to an agent. Allows to query a subset of the orderbook. @@ -315,7 +346,7 @@ def handle_get_unmatched(self, content: dict, meta: dict): :param content: The content of the message :type content: dict :param meta: The metadata of the message - :type meta: dict + :type meta: MetaDict :raises AssertionError: If the order book is invalid """ @@ -342,7 +373,7 @@ def order_matches_req(o): acl_metadata={ "sender_addr": self.context.addr, "sender_id": self.context.aid, - "reply_to": 1, + "in_reply_to": 1, }, ) @@ -376,7 +407,7 @@ async def clear_market(self, market_products: list[MarketProduct]): for agent, bids in groupby(rejected_orderbook, itemgetter("agent_id")) } - for agent in self.registered_agents: + for agent in self.registered_agents.keys(): addr, aid = agent meta = {"sender_addr": self.context.addr, "sender_id": self.context.aid} closing: ClearingMessage = { @@ -429,7 +460,7 @@ async def store_order_book(self, orderbook: Orderbook): message = { "context": "write_results", "type": "store_order_book", - "sender": self.marketconfig.name, + "market_id": self.marketconfig.name, "data": orderbook, } await self.context.send_acl_message( @@ -454,7 +485,7 @@ async def store_market_results(self, market_meta): message = { "context": "write_results", "type": "store_market_results", - "sender": self.marketconfig.name, + "market_id": self.marketconfig.name, "data": market_meta, } await self.context.send_acl_message( diff --git a/tests/test_market.py b/tests/test_market.py index 0c299d97..54fe7a29 100644 --- a/tests/test_market.py +++ b/tests/test_market.py @@ -207,10 +207,13 @@ async def test_market_registration(market_role: MarketRole): "sender_id": "test_aid", } - assert market_role.registered_agents == [] - market_role.handle_registration({}, meta=meta) - assert len(market_role.registered_agents) == 1 - assert market_role.registered_agents[0] == tuple(meta.values()) + assert market_role.registered_agents == {} + info = [{"technology": "nuclear", "max_power": 2}] + market_role.handle_registration( + {"market_id": market_role.marketconfig.name, "information": info}, meta=meta + ) + assert len(market_role.registered_agents.keys()) == 1 + assert market_role.registered_agents[tuple(meta.values())] == info async def test_market_unmatched(market_role: MarketRole):