Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve messaging with typedDicts #245

Merged
merged 2 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion assume/common/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def set_dispatch_plan(

self.calculate_cashflow(product_type, orderbook)

self.outputs[product_type + "_marginal_costs"].loc[start:end_excl] = (
self.outputs[product_type + "_marginal_costs"].loc[start:end_excl] += (
self.calculate_marginal_cost(start, self.outputs[product_type].loc[start])
* self.outputs[product_type].loc[start:end_excl]
)
Expand Down Expand Up @@ -195,6 +195,7 @@ def as_dict(self) -> dict:
"id": self.id,
"technology": self.technology,
"unit_operator": self.unit_operator,
"node": self.node,
"unit_type": "base_unit",
}

Expand Down
42 changes: 40 additions & 2 deletions assume/common/market_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class MarketConfig:

class OpeningMessage(TypedDict):
"""
Message which is sent to the market to open a market
Message which is sent from the market to participating agent to open a market
:param context: the context of the message
:type context: str
Expand All @@ -204,7 +204,7 @@ class OpeningMessage(TypedDict):

class ClearingMessage(TypedDict):
"""
Message which is sent to the market to clear a market
Message which is sent from the market to agents to clear a market
:param context: the context of the message
:type context: str
Expand All @@ -219,6 +219,44 @@ class ClearingMessage(TypedDict):
orderbook: Orderbook


class OrderBookMessage(TypedDict):
context: str
market_id: str
orderbook: Orderbook


class RegistrationMessage(TypedDict):
context: str
market_id: str
information: dict


class RegistrationReplyMessage(TypedDict):
context: str
market_id: str
accepted: bool


class MetaDict(TypedDict):
"""
Message Meta of a FIPA ACL Message
http://www.fipa.org/specs/fipa00061/SC00061G.html#_Toc26669700
"""

sender_addr: str | list
sender_id: str
reply_to: str # to which agent follow up messages should be sent
conversation_id: str
performative: str
protocol: str
language: str
encoding: str
ontology: str
reply_with: str # what the answer should contain as in_reply_to
in_reply_to: str # str used to reference an earlier action
reply_by: str # latest time to accept replies


# Class for a Smart Contract which can contain something like:
# - Contract for Differences (CfD) -> based on market result
# - Market Subvention -> based on market result
Expand Down
7 changes: 4 additions & 3 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

logger = logging.getLogger(__name__)

from assume.common.market_objects import MetaDict
from assume.common.utils import separate_orders


Expand Down Expand Up @@ -145,7 +146,7 @@
)
self.context.schedule_recurrent_task(self.store_dfs, recurrency_task)

def handle_message(self, content, meta):
def handle_message(self, content, meta: MetaDict):
"""
Handles the incoming messages and performs corresponding actions.

Expand All @@ -157,7 +158,7 @@
"""

if content.get("type") == "store_order_book":
self.write_market_orders(content.get("data"), content.get("sender"))
self.write_market_orders(content.get("data"), content.get("market_id"))

elif content.get("type") == "store_market_results":
self.write_market_results(content.get("data"))
Expand Down Expand Up @@ -232,7 +233,7 @@
try:
with self.db.begin() as db:
df.to_sql(table, db, if_exists="append")
except ProgrammingError:
except (ProgrammingError, OperationalError, DataError):

Check warning on line 236 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L236

Added line #L236 was not covered by tests
self.check_columns(table, df)
# now try again
with self.db.begin() as db:
Expand Down
73 changes: 52 additions & 21 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
from assume.common.market_objects import (
ClearingMessage,
MarketConfig,
MetaDict,
OpeningMessage,
Orderbook,
OrderBookMessage,
RegistrationMessage,
)
from assume.common.utils import aggregate_step_amount
from assume.strategies import BaseStrategy, LearningStrategy
Expand Down Expand Up @@ -73,10 +76,18 @@
lambda content, meta: content.get("context") == "clearing",
)

self.context.subscribe_message(
self,
self.handle_registration_feedback,
lambda content, meta: content.get("context") == "registration",
)

for market in self.available_markets:
if self.participate(market):
self.register_market(market)
self.registered_markets[market.name] = market
self.context.schedule_timestamp_task(
self.register_market(market),
1, # register after time was updated for the first time
)

async def add_unit(
self,
Expand Down Expand Up @@ -119,43 +130,45 @@
"""
return True

def register_market(self, market: MarketConfig):
async def register_market(self, market: MarketConfig):
"""
Register a market.

:param market: the market to register
:type market: MarketConfig
"""
self.context.schedule_timestamp_task(
self.context.send_acl_message(
{"context": "registration", "market": market.name},
receiver_addr=market.addr,
receiver_id=market.aid,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
),
1, # register after time was updated for the first time
)
logger.debug(f"{self.id} tried to register at market {market.name}")

def handle_opening(self, opening: OpeningMessage, meta: dict[str, str]):
await self.context.send_acl_message(
{
"context": "registration",
"market_id": market.name,
"information": [u.as_dict() for u in self.units.values()],
},
receiver_addr=market.addr,
receiver_id=market.aid,
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
},
),
logger.debug(f"{self.id} sent market registration to {market.name}")

def handle_opening(self, opening: OpeningMessage, meta: MetaDict):
"""
When we receive an opening from the market, we schedule sending back
our list of orders as a response

:param opening: the opening message
:type opening: OpeningMessage
:param meta: the meta data of the market
:type meta: dict[str, str]
:type meta: MetaDict
"""
logger.debug(
f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.'
)
self.context.schedule_instant_task(coroutine=self.submit_bids(opening))

def handle_market_feedback(self, content: ClearingMessage, meta: dict[str, str]):
def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):
"""
handles the feedback which is received from a market we did bid at
stores accepted orders, sets the received power
Expand All @@ -165,7 +178,7 @@
:param content: the content of the clearing message
:type content: ClearingMessage
:param meta: the meta data of the market
:type meta: dict[str, str]
:type meta: MetaDict
"""
logger.debug(f"{self.id} got market result: {content}")
accepted_orders: Orderbook = content["accepted_orders"]
Expand All @@ -183,6 +196,24 @@
self.write_learning_params(orderbook, marketconfig)
self.write_actual_dispatch()

def handle_registration_feedback(
self, content: RegistrationMessage, meta: MetaDict
):
logger.debug("Market %s accepted our registration", content["market_id"])
if content["accepted"]:
found = False
for market in self.available_markets:
if content["market_id"] == market.name:
self.registered_markets[market.name] = market
found = True
break
if not found:
logger.error(

Check warning on line 211 in assume/common/units_operator.py

View check run for this annotation

Codecov / codecov/patch

assume/common/units_operator.py#L211

Added line #L211 was not covered by tests
"Market %s sent registation but is unknown", content["market_id"]
)
else:
logger.error("Market %s did not accept registration", meta["sender_id"])

Check warning on line 215 in assume/common/units_operator.py

View check run for this annotation

Codecov / codecov/patch

assume/common/units_operator.py#L215

Added line #L215 was not covered by tests

def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
feeds the current market result back to the units
Expand Down Expand Up @@ -305,7 +336,7 @@
await self.context.send_acl_message(
content={
"context": "submit_bids",
"market": market.name,
"market_id": market.name,
"orderbook": orderbook,
},
receiver_addr=market.addr,
Expand Down
Loading