From bf9e3b8e16ea742e65bf1eddfc030c93d9f6be02 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Sun, 3 Nov 2024 18:31:09 +0100 Subject: [PATCH] make a more slim version of the MarketMechanism --- emarketpy/base_market.py | 625 +++++++++++++++++++++++++-------------- 1 file changed, 405 insertions(+), 220 deletions(-) diff --git a/emarketpy/base_market.py b/emarketpy/base_market.py index 2753894..ef7ebb2 100644 --- a/emarketpy/base_market.py +++ b/emarketpy/base_market.py @@ -4,11 +4,10 @@ import logging import math -from decimal import Decimal from itertools import groupby from operator import itemgetter -from mango import Role +from mango import AgentAddress, Role, create_acl, sender_addr from .market_objects import ( ClearingMessage, @@ -41,96 +40,15 @@ class MarketMechanism: In the Marketmechanism, all data needed for the clearing is present. Attributes: - all_orders (Orderbook): The list of all orders. marketconfig (MarketConfig): The configuration of the market. - open_auctions (set): The list of open auctions. - results (list[dict]): The list of market metadata. Args: marketconfig (MarketConfig): The configuration of the market. """ def __init__(self, marketconfig: MarketConfig): - super().__init__() + super().__init__(marketconfig) self.marketconfig = marketconfig - self.open_auctions = set() - self.all_orders = [] - self.results = [] - - def validate_registration( - self, content: RegistrationMessage, meta: MetaDict - ) -> bool: - """ - Validates a given registration. - Used to check if a participant is eligible to bid on this market. - - Args: - content (RegistrationMessage): The content of the message. - meta (MetaDict): The metadata of the message. - - Returns: - bool: True if the registration is valid, False otherwise. - """ - - # simple check that 1 MW can be bid at least by powerplants - def requirement(unit: dict): - return unit.get("unit_type") != "power_plant" or abs(unit["max_power"]) >= 0 - - return all([requirement(info) for info in content["information"]]) - - def validate_orderbook(self, orderbook: Orderbook, agent_tuple: tuple) -> None: - """ - Validates a given orderbook. - - This is needed to check if all required fields for this mechanism are present. - - Args: - orderbook (Orderbook): The orderbook to be validated. - agent_tuple (tuple): The tuple of the agent. - - Raises: - AssertionError: If the orderbook is invalid. - """ - max_price = self.marketconfig.maximum_bid_price - min_price = self.marketconfig.minimum_bid_price - max_volume = self.marketconfig.maximum_bid_volume - - if self.marketconfig.price_tick: - assert max_price is not None, "max_price unset" - assert min_price is not None, "min_price unset" - assert max_volume is not None, "max_volume unset" - # max and min should be in units - max_price = math.floor(max_price / self.marketconfig.price_tick) - min_price = math.ceil(min_price / self.marketconfig.price_tick) - if self.marketconfig.volume_tick: - assert max_volume is not None, "max_volume unset" - max_volume = math.floor(max_volume / self.marketconfig.volume_tick) - - for order in orderbook: - order["agent_id"] = agent_tuple - if not order.get("only_hours"): - order["only_hours"] = None - for field in self.marketconfig.additional_fields: - assert field in order.keys(), f"missing field: {field}" - - sep_orders = separate_orders(orderbook.copy()) - for order in sep_orders: - assert order["price"] <= max_price, f"maximum_bid_price {order['price']}" - assert order["price"] >= min_price, f"minimum_bid_price {order['price']}" - - # check that the product is part of an open auction - product = (order["start_time"], order["end_time"], order["only_hours"]) - assert product in self.open_auctions, "no open auction" - - if max_volume: - assert ( - abs(order["volume"]) <= max_volume - ), f"max_volume {order['volume']}" - - if self.marketconfig.price_tick: - assert isinstance(order["price"], int) - if self.marketconfig.volume_tick: - assert isinstance(order["volume"], int) def clear( self, orderbook: Orderbook, market_products: list[MarketProduct] @@ -163,12 +81,15 @@ class MarketRole(MarketMechanism, Role): longitude: float latitude: float marketconfig: MarketConfig - registered_agents: dict[tuple[str, str], dict] + registered_agents: dict[AgentAddress, dict] required_fields: list[str] = [] def __init__(self, marketconfig: MarketConfig): super().__init__(marketconfig) self.registered_agents = {} + self.open_auctions = set() + self.all_orders = [] + self.results = [] if marketconfig.price_tick: if marketconfig.maximum_bid_price % marketconfig.price_tick != 0: logger.warning( @@ -180,12 +101,7 @@ def __init__(self, marketconfig: MarketConfig): ) if marketconfig.volume_tick and marketconfig.maximum_bid_volume: - # modulo with floating point can be a disaster - if ( - Decimal(str(marketconfig.maximum_bid_volume)) - % Decimal(str(marketconfig.volume_tick)) - != 0 - ): + if marketconfig.maximum_bid_volume % marketconfig.volume_tick != 0: logger.warning( f"{marketconfig.market_id} - max volume not a multiple of tick size" ) @@ -194,31 +110,45 @@ def __init__(self, marketconfig: MarketConfig): def setup(self): """ - This method sets up the initial configuration and subscriptions for the market role. + Sets up the initial configuration and subscriptions for the market role. - It sets the address and agent ID of the market config to match the current context. - It Defines three filter methods (accept_orderbook, accept_registration, and accept_get_unmatched) - that serve as validation steps for different types of incoming messages. - Subscribes the role to handle incoming order book messages using the handle_orderbook method. - Subscribes the role to handle incoming registration messages using the handle_registration method - If the market configuration supports "get unmatched" functionality, subscribes the role to handle - such messages using the handle_get_unmatched. - Schedules the opening() method to run at the next opening time of the market. + This method performs the following actions: + + - Sets the address and agent ID of the market configuration to match the current context. + - Validates that all required fields are present in the market configuration. + - Defines filter methods (`accept_orderbook`, `accept_registration`, `accept_get_unmatched`, `accept_data_request`) that serve as validation steps for different types of incoming messages. + - Subscribes the role to handle incoming messages using the appropriate handler methods. + - Schedules the `opening()` method to run at the next opening time of the market. + - Sends grid topology data once, if available. Raises: - AssertionError: If a required field is missing. + ValueError: If a required field is missing from the market configuration. """ + super().setup() self.marketconfig.addr = self.context.addr - self.marketconfig.aid = self.context.aid - for field in self.required_fields: - assert ( - field in self.marketconfig.additional_fields - ), f"{field} missing from additional_fiels" + market_id = getattr(self.marketconfig, "market_id", "Unknown Market ID") + + # Validate required fields in market configuration + missing_fields = [ + field + for field in self.required_fields + if field not in self.marketconfig.additional_fields + ] + if missing_fields: + error_message = ( + f"Missing required field(s) {missing_fields} from additional_fields " + f"for market '{market_id}'." + ) + logger.error(error_message) + raise ValueError(error_message) def accept_orderbook(content: OrderBookMessage, meta: MetaDict): - if not isinstance(content, dict): + if ( + not isinstance(content, dict) + and content.get("context") == "submit_bids" + ): return False if isinstance(meta["sender_addr"], list): @@ -227,20 +157,19 @@ def accept_orderbook(content: OrderBookMessage, meta: MetaDict): return ( content.get("market_id") == self.marketconfig.market_id and content.get("orderbook") is not None - and (meta["sender_addr"], meta["sender_id"]) - in self.registered_agents.keys() + and sender_addr(meta) in self.registered_agents.keys() ) def accept_registration(content: RegistrationMessage, meta: MetaDict): - if not isinstance(content, dict): + if ( + not isinstance(content, dict) + or content.get("context") != "registration" + ): return False if isinstance(meta["sender_addr"], list): meta["sender_addr"] = tuple(meta["sender_addr"]) - return ( - content.get("context") == "registration" - and content.get("market_id") == self.marketconfig.market_id - ) + return content.get("market_id") == self.marketconfig.market_id def accept_get_unmatched(content: dict, meta: MetaDict): if not isinstance(content, dict): @@ -271,14 +200,15 @@ def accept_data_request(content: dict, meta: MetaDict): self, self.handle_get_unmatched, accept_get_unmatched ) + def on_ready(self): current = timestamp2datetime(self.context.current_timestamp) next_opening = self.marketconfig.opening_hours.after(current, inc=True) opening_ts = datetime2timestamp(next_opening) self.context.schedule_timestamp_task(self.opening(), opening_ts) # send grid topology data once - if self.grid_data is not None: - self.context.schedule_instant_acl_message( + if self.grid_data is not None and self.context.data.get("output_agent_addr"): + self.context.schedule_instant_message( { "context": "write_results", "type": "grid_topology", @@ -286,7 +216,6 @@ def accept_data_request(content: dict, meta: MetaDict): "market_id": self.marketconfig.market_id, }, receiver_addr=self.context.data.get("output_agent_addr"), - receiver_id=self.context.data.get("output_agent_id"), ) async def opening(self): @@ -316,16 +245,16 @@ async def opening(self): self.open_auctions |= set(opening_message["products"]) for agent in self.registered_agents.keys(): - agent_addr, agent_id = agent - await self.context.send_acl_message( - opening_message, - receiver_addr=agent_addr, - receiver_id=agent_id, - acl_metadata={ - "sender_addr": self.context.addr, - "sender_id": self.context.aid, - "reply_with": f"{self.marketconfig.market_id}_{market_open}", - }, + await self.context.send_message( + create_acl( + opening_message, + receiver_addr=agent, + sender_addr=self.context.addr, + acl_metadata={ + "reply_with": f"{self.marketconfig.market_id}_{market_open}", + }, + ), + receiver_addr=agent, ) # schedule closing this market @@ -346,80 +275,257 @@ async def opening(self): else: logger.debug("market %s - does not reopen", self.marketconfig.market_id) - def handle_registration(self, content: RegistrationMessage, meta: MetaDict): + + def validate_registration( + self, content: RegistrationMessage, meta: MetaDict + ) -> bool: """ - This method handles incoming registration messages and adds the sender of the message to the list of registered agents. + Validates a given registration. + Used to check if a participant is eligible to bid on this market. Args: content (RegistrationMessage): The content of the message. meta (MetaDict): The metadata of the message. + + Returns: + bool: True if the registration is valid, False otherwise. """ - agent_id = meta["sender_id"] - agent_addr = meta["sender_addr"] - assert content["market_id"] == self.marketconfig.market_id - if self.validate_registration(content, meta): - self.registered_agents[(agent_addr, agent_id)] = content["information"] - accepted = True - else: + + # simple check that 1 MW can be bid at least by powerplants + def requirement(unit: dict): + return unit.get("unit_type") != "power_plant" or abs(unit["max_power"]) >= 0 + + return all([requirement(info) for info in content["information"]]) + + def validate_orderbook( + self, orderbook: Orderbook, agent_addr: AgentAddress + ) -> None: + """ + Validates a given orderbook. + + This is needed to check if all required fields for this mechanism are present. + + Args: + orderbook (Orderbook): The orderbook to be validated. + agent_addr (AgentAddress): The address of the agent. + + Raises: + ValueError: If max_price, min_price, or max_volume are unset when required. + KeyError: If a required field is missing in an order. + TypeError: If order['price'] or order['volume'] is not an integer when required. + """ + max_price = self.marketconfig.maximum_bid_price + min_price = self.marketconfig.minimum_bid_price + max_volume = self.marketconfig.maximum_bid_volume + + market_id = self.marketconfig.market_id # Get the market ID for error messages + + # Validate and adjust max_price, min_price, and max_volume if price_tick is set + if self.marketconfig.price_tick: + if max_price is None: + raise ValueError(f"max_price is unset for market '{market_id}'.") + if min_price is None: + raise ValueError(f"min_price is unset for market '{market_id}'.") + if max_volume is None: + raise ValueError(f"max_volume is unset for market '{market_id}'.") + + # Convert prices to units based on price_tick + max_price = math.floor(max_price / self.marketconfig.price_tick) + min_price = math.ceil(min_price / self.marketconfig.price_tick) + + # Validate and adjust max_volume if volume_tick is set + if self.marketconfig.volume_tick: + if max_volume is None: + raise ValueError(f"max_volume is unset for market '{market_id}'.") + max_volume = math.floor(max_volume / self.marketconfig.volume_tick) + + # Validate each order in the orderbook + for order in orderbook: + order["agent_addr"] = agent_addr + + # Ensure 'only_hours' field is present + if not order.get("only_hours"): + order["only_hours"] = None + + # Check for additional required fields + for field in self.marketconfig.additional_fields: + if field not in order: + raise KeyError( + f"Missing required field '{field}' for order {order} in market '{market_id}'." + ) + + # Process separated orders + sep_orders = separate_orders(orderbook.copy()) + for order in sep_orders: + # Adjust order price if it exceeds max_price or is below min_price + if order["price"] > max_price: + logger.warning( + f"Order price {order['price']} exceeds maximum price {max_price} in market '{market_id}'. Setting to max_price." + ) + order["price"] = max_price + elif order["price"] < min_price: + logger.warning( + f"Order price {order['price']} is below minimum price {min_price} in market '{market_id}'. Setting to min_price." + ) + order["price"] = min_price + + # Check that the product is part of an open auction + product = (order["start_time"], order["end_time"], order["only_hours"]) + if product not in self.open_auctions: + logger.warning( + f"Product {product} is not part of an open auction in market '{market_id}'. Skipping this order." + ) + continue # Skip to the next order + + # Adjust order volume if it exceeds max_volume + if abs(order["volume"]) > max_volume: + logger.warning( + f"Order volume {order['volume']} exceeds max_volume {max_volume} in market '{market_id}'. Adjusting volume." + ) + order["volume"] = max_volume if order["volume"] > 0 else -max_volume + + # Ensure 'price' is an integer if price_tick is set + if self.marketconfig.price_tick: + if not isinstance(order["price"], int): + raise TypeError( + f"Order price {order['price']} must be an integer when price_tick is set in market '{market_id}'." + ) + + # Ensure 'volume' is an integer if volume_tick is set + if self.marketconfig.volume_tick: + if not isinstance(order["volume"], int): + raise TypeError( + f"Order volume {order['volume']} must be an integer when volume_tick is set in market '{market_id}'." + ) + + def handle_registration(self, content: RegistrationMessage, meta: MetaDict): + """ + Handles incoming registration messages and adds the sender to the list of registered agents. + + This method performs the following actions: + - Validates that the incoming message's market ID matches the current market configuration. + - Validates the registration details using the `validate_registration` method. + - Registers the agent if validation is successful. + - Sends a registration reply message indicating acceptance or rejection. + + Args: + content (RegistrationMessage): The content of the registration message. + meta (MetaDict): The metadata of the message, including sender information. + + Raises: + KeyError: If required keys are missing in `content` or `meta`. + """ + agent_addr = sender_addr(meta) + + incoming_market_id = content.get("market_id") + current_market_id = self.marketconfig.market_id + + # Validate market ID + if incoming_market_id != current_market_id: + logger.warning( + f"Received registration for market '{incoming_market_id}' which does not match current market '{current_market_id}'. Registration rejected." + ) accepted = False + else: + # Validate registration details + if self.validate_registration(content, meta): + self.registered_agents[agent_addr] = content["information"] + accepted = True + logger.debug( + f"Agent '{agent_addr}' successfully registered for market '{current_market_id}'." + ) + else: + accepted = False + logger.warning( + f"Agent '{agent_addr}' failed registration validation for market '{current_market_id}'." + ) + # Prepare the registration reply message msg: RegistrationReplyMessage = { "context": "registration", - "market_id": self.marketconfig.market_id, + "market_id": current_market_id, "accepted": accepted, } - self.context.schedule_instant_acl_message( - content=msg, + + self.context.schedule_instant_message( + create_acl( + content=msg, + receiver_addr=agent_addr, + sender_addr=self.context.addr, + acl_metadata={ + "in_reply_to": meta.get("reply_with"), + }, + ), receiver_addr=agent_addr, - receiver_id=agent_id, - acl_metadata={ - "sender_addr": self.context.addr, - "sender_id": self.context.aid, - "in_reply_to": meta.get("reply_with"), - }, ) + logger.debug(f"Sent registration reply to agent '{agent_addr}': {msg}") def handle_orderbook(self, content: OrderBookMessage, meta: MetaDict): """ - Handles incoming order book messages, validates the order book and adds it to the list of all orders. + Handles incoming order book messages, validates the order book, and adds valid orders to the list of all orders. + + If the order book is invalid or an error occurs during processing, logs the error and sends a single rejection + message to the sender. Args: - content (OrderBookMessage): The content of the message. - meta (MetaDict): The metadata of the message. + content (OrderBookMessage): The content of the message, expected to contain an 'orderbook'. + meta (MetaDict): The metadata of the message, expected to contain 'sender_addr' and 'sender_id'. Raises: - AssertionError: If the order book is invalid. + KeyError: If required keys ('orderbook', 'sender_addr', 'sender_id') are missing in the message. + ValueError: If the order book fails validation. + Exception: If an unexpected error occurs during processing. """ - orderbook: Orderbook = content["orderbook"] - agent_addr = meta["sender_addr"] - agent_id = meta["sender_id"] try: - self.validate_orderbook(orderbook, (agent_addr, agent_id)) + # Safely retrieve required keys from 'content' and 'meta' + orderbook: Orderbook = content.get("orderbook") + if orderbook is None: + raise KeyError("Missing 'orderbook' in content.") + + agent_addr = sender_addr(meta) + # Validate the order book + self.validate_orderbook(orderbook, agent_addr) + + # Add each validated order to 'all_orders' for order in orderbook: self.all_orders.append(order) + except Exception as e: - logger.error(f"error handling message from {agent_id} - {e}") - self.context.schedule_instant_acl_message( - content={"context": "submit_bids", "message": "Rejected"}, + # Log the error with agent details for better traceability + logger.error( + f"Error handling orderbook message from agent '{agent_addr}': {e}" + ) + + # Prepare a rejection message with a generic error description + rejection_message = { + "context": "submit_bids", + "message": "Rejected: Unable to process your orderbook submission due to an error.", + } + + # Send the single rejection message back to the agent + self.context.schedule_instant_message( + create_acl( + content=rejection_message, + receiver_addr=agent_addr, + sender_addr=self.context.addr, + acl_metadata={ + "in_reply_to": meta.get("reply_with", 1), + }, + ), receiver_addr=agent_addr, - receiver_id=agent_id, - acl_metadata={ - "sender_addr": self.context.addr, - "sender_id": self.context.aid, - "in_reply_to": meta.get("reply_with"), - }, + ) + logger.debug( + f"Sent rejection message to agent '{agent_addr}': {rejection_message}" ) def handle_data_request(self, content: DataRequestMessage, meta: MetaDict): """ - This method handles incoming data request messages. + Handles incoming data request messages. Args: content (DataRequestMessage): The content of the message. meta (MetaDict): The metadata of the message. - Raises: - AssertionError: If the order book is invalid. """ metric_type = content["metric"] start = content["start_time"] @@ -433,58 +539,63 @@ def handle_data_request(self, content: DataRequestMessage, meta: MetaDict): data.index = data["time"] data = data[metric_type][start:end] except Exception: - logger.exception("error handling data request") - self.context.schedule_instant_acl_message( - content={ - "context": "data_response", - "data": data, - }, - receiver_addr=meta["sender_addr"], - receiver_id=meta["sender_id"], - acl_metadata={ - "sender_addr": self.context.addr, - "sender_id": self.context.aid, - "in_reply_to": meta.get("reply_with"), - }, + logger.exception("Error handling data request") + + self.context.schedule_instant_message( + create_acl( + content={ + "context": "data_response", + "data": data, + }, + receiver_addr=sender_addr(meta), + sender_addr=self.context.addr, + acl_metadata={ + "in_reply_to": meta.get("reply_with"), + }, + ), + receiver_addr=sender_addr(meta), ) def handle_get_unmatched(self, content: dict, meta: MetaDict): """ - A handler which sends the orderbook with unmatched orders to an agent and allows to query a subset of the orderbook. + Sends the orderbook with unmatched orders to an agent and allows querying a subset of the orderbook. Args: content (dict): The content of the message. meta (MetaDict): The metadata of the message. Raises: - AssertionError: If the order book is invalid. + KeyError: If required keys ('sender_addr', 'sender_id') are missing in `meta`. """ - order = content.get("order") - agent_addr = meta["sender_addr"] - agent_id = meta["sender_id"] - if order: - - def order_matches_req(o): - return ( - o["start_time"] == order["start_time"] - and o["end_time"] == order["end_time"] - and o["only_hours"] == order["only_hours"] - ) - - available_orders = list(filter(order_matches_req, self.all_orders)) - else: - available_orders = self.all_orders + try: + order = content.get("order") + agent_addr = sender_addr(meta) + + if order: + + def order_matches_req(o): + return ( + o.get("start_time") == order.get("start_time") + and o.get("end_time") == order.get("end_time") + and o.get("only_hours") == order.get("only_hours") + ) + + available_orders = list(filter(order_matches_req, self.all_orders)) + else: + available_orders = self.all_orders + + self.context.schedule_instant_message( + content={ + "context": "get_unmatched", + "available_orders": available_orders, + }, + receiver_addr=agent_addr, + ) + logger.debug(f"Sent unmatched orders to agent '{agent_addr}'.") - self.context.schedule_instant_acl_message( - content={"context": "get_unmatched", "available_orders": available_orders}, - receiver_addr=agent_addr, - receiver_id=agent_id, - acl_metadata={ - "sender_addr": self.context.addr, - "sender_id": self.context.aid, - "in_reply_to": 1, - }, - ) + except KeyError as ke: + logger.error(f"Missing key in meta data: {ke}") + # Optionally, handle the missing key scenario here async def clear_market(self, market_products: list[MarketProduct]): """ @@ -494,11 +605,9 @@ async def clear_market(self, market_products: list[MarketProduct]): market_products (list[MarketProduct]): The products to be traded. """ try: - ( - accepted_orderbook, - rejected_orderbook, - market_meta, - ) = self.clear(self.all_orders, market_products) + (accepted_orderbook, rejected_orderbook, market_meta, flows) = self.clear( + self.all_orders, market_products + ) except Exception as e: logger.error("clearing failed: %s", e) raise e @@ -521,20 +630,23 @@ async def clear_market(self, market_products: list[MarketProduct]): self.open_auctions - set(market_products) - accepted_orderbook.sort(key=itemgetter("agent_id")) - rejected_orderbook.sort(key=itemgetter("agent_id")) + accepted_orderbook = sorted( + accepted_orderbook, key=lambda x: str(x["agent_addr"]) + ) + rejected_orderbook = sorted( + rejected_orderbook, key=lambda x: str(x["agent_addr"]) + ) accepted_orders = { agent: list(bids) - for agent, bids in groupby(accepted_orderbook, itemgetter("agent_id")) + for agent, bids in groupby(accepted_orderbook, itemgetter("agent_addr")) } rejected_orders = { agent: list(bids) - for agent, bids in groupby(rejected_orderbook, itemgetter("agent_id")) + for agent, bids in groupby(rejected_orderbook, itemgetter("agent_addr")) } for agent in self.registered_agents.keys(): - addr, aid = agent meta = {"sender_addr": self.context.addr, "sender_id": self.context.aid} closing: ClearingMessage = { "context": "clearing", @@ -542,11 +654,14 @@ async def clear_market(self, market_products: list[MarketProduct]): "accepted_orders": accepted_orders.get(agent, []), "rejected_orders": rejected_orders.get(agent, []), } - await self.context.send_acl_message( - closing, - receiver_addr=addr, - receiver_id=aid, - acl_metadata=meta, + await self.context.send_message( + create_acl( + closing, + acl_metadata=meta, + receiver_addr=agent, + sender_addr=self.context.addr, + ), + receiver_addr=agent, ) # store order book in db agent if not accepted_orderbook: @@ -569,4 +684,74 @@ async def clear_market(self, market_products: list[MarketProduct]): await self.store_market_results(market_meta) + if flows and len(flows) > 0: + await self.store_flows(flows) + return accepted_orderbook, market_meta + + async def store_order_book(self, orderbook: Orderbook): + # Send a message to the OutputRole to update data in the database + """ + Sends a message to the OutputRole to update data in the database. + + Args: + orderbook (Orderbook): The order book to be stored. + """ + + db_addr = self.context.data.get("output_agent_addr") + + if db_addr: + message = { + "context": "write_results", + "type": "store_order_book", + "market_id": self.marketconfig.market_id, + "data": orderbook, + } + await self.context.send_message( + receiver_addr=db_addr, + content=message, + ) + + async def store_market_results(self, market_meta): + """ + Sends a message to the OutputRole to update data in the database. + + Args: + market_meta: The metadata of the market. + """ + + db_addr = self.context.data.get("output_agent_addr") + + if db_addr: + message = { + "context": "write_results", + "type": "store_market_results", + "market_id": self.marketconfig.market_id, + "data": market_meta, + } + await self.context.send_message( + receiver_addr=db_addr, + content=message, + ) + + async def store_flows(self, flows: dict[tuple, float]): + """ + Sends a message to the OutputRole to update data in the database. + + Args: + flows (flows): The electric flows between nodes to be stored. + """ + + db_addr = self.context.data.get("output_agent_addr") + + if db_addr: + message = { + "context": "write_results", + "type": "store_flows", + "market_id": self.marketconfig.market_id, + "data": flows, + } + await self.context.send_message( + content=message, + receiver_addr=db_addr, + )