Skip to content

Commit

Permalink
Data Request mechanism (#247)
Browse files Browse the repository at this point in the history
This adds a new data_request mechanism which can be used to request the
power generation of a unit or the market price of a market.
This is needed for contracts, which are developed in a follow-up PR

* add data_request mechanism to market and units_operator
* do not shadow on_stop and setup methods
* unify usage of start_time and end_time

This allows workflows like:
1. request market_price of last week from market
2. wait for result from market
3. calculate contract result (e. g. hours in which the price is above or
below some value) in the contract_market_agent
  • Loading branch information
maurerle authored Nov 14, 2023
1 parent 61d58a8 commit 3bddb13
Show file tree
Hide file tree
Showing 7 changed files with 1,009 additions and 398 deletions.
37 changes: 26 additions & 11 deletions assume/common/market_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ class Order(TypedDict):
:param end_time: the end time of the order
:type end_time: datetime
:param volume: the volume of the order
:type volume: int
:type volume: float
:param price: the price of the order
:type price: int
:type price: float
:param accepted_volume: the accepted volume of the order
:type accepted_volume: float
:param accepted_price: the accepted price of the order
:type accepted_price: float
:param only_hours: tuple of hours from which this order is available, on multi day products
:type only_hours: OnlyHours | None
:param agent_id: the id of the agent
Expand Down Expand Up @@ -187,18 +191,18 @@ class OpeningMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param start: the start time of the market
:type start: float
:param stop: the stop time of the market
:type stop: float
:param start_time: the start time of the market
:type start_time: float
:param end_time: the stop time of the market
:type end_time: float
:param products: list of products which are available at the market to be traded
:type products: list[Product]
"""

context: str
market_id: str
start: float
stop: float
start_time: float
end_time: float
products: list[Product]


Expand All @@ -210,13 +214,16 @@ class ClearingMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param orderbook: the orderbook of the market
:type orderbook: Orderbook
:param accepted_orders: the orders accepted by the market
:type accepted_orders: Orderbook
:param rejected_orders: the orders rejected by the market
:type rejected_orders: Orderbook
"""

context: str
market_id: str
orderbook: Orderbook
accepted_orders: Orderbook
rejected_orders: Orderbook


class OrderBookMessage(TypedDict):
Expand All @@ -237,6 +244,14 @@ class RegistrationReplyMessage(TypedDict):
accepted: bool


class DataRequestMessage(TypedDict):
context: str
market_id: str
metric: str
start_time: datetime
end_time: datetime


class MetaDict(TypedDict):
"""
Message Meta of a FIPA ACL Message
Expand Down
2 changes: 2 additions & 0 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def setup(self):
"""
Sets up the WriteOutput instance by subscribing to messages and scheduling recurrent tasks of storing the data.
"""
super().setup()

self.context.subscribe_message(
self,
Expand Down Expand Up @@ -355,6 +356,7 @@ async def on_stop(self):
"""
This function makes it possible to calculate Key Performance Indicators
"""
await super().on_stop()

# insert left records into db
await self.store_dfs()
Expand Down
69 changes: 54 additions & 15 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

import logging
from collections import defaultdict
from datetime import datetime
from itertools import groupby
from operator import itemgetter
Expand All @@ -14,6 +15,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MetaDict,
OpeningMessage,
Expand Down Expand Up @@ -46,7 +48,6 @@ def __init__(
):
super().__init__()

self.bids_map = {}
self.available_markets = available_markets
self.registered_markets: dict[str, MarketConfig] = {}
self.last_sent_dispatch = 0
Expand All @@ -58,11 +59,12 @@ def __init__(
self.use_portfolio_opt = opt_portfolio[0]
self.portfolio_strategy = opt_portfolio[1]

# should be a list per product_type
self.valid_orders = []
# valid_orders per product_type
self.valid_orders = defaultdict(list)
self.units: dict[str, BaseUnit] = {}

def setup(self):
super().setup()
self.id = self.context.aid
self.context.subscribe_message(
self,
Expand All @@ -82,6 +84,12 @@ def setup(self):
lambda content, meta: content.get("context") == "registration",
)

self.context.subscribe_message(
self,
self.handle_data_request,
lambda content, meta: content.get("context") == "data_request",
)

for market in self.available_markets:
if self.participate(market):
self.context.schedule_timestamp_task(
Expand Down Expand Up @@ -149,6 +157,7 @@ async def register_market(self, market: MarketConfig):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": market.name,
},
),
logger.debug(f"{self.id} sent market registration to {market.name}")
Expand All @@ -164,9 +173,9 @@ def handle_opening(self, opening: OpeningMessage, meta: MetaDict):
:type meta: MetaDict
"""
logger.debug(
f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.'
f'{self.id} received opening from: {opening["market_id"]} {opening["start_time"]} until: {opening["end_time"]}.'
)
self.context.schedule_instant_task(coroutine=self.submit_bids(opening))
self.context.schedule_instant_task(coroutine=self.submit_bids(opening, meta))

def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):
"""
Expand All @@ -187,14 +196,12 @@ def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):

for order in orderbook:
order["market_id"] = content["market_id"]
# map bid id to unit id
order["unit_id"] = self.bids_map[order["bid_id"]]

self.valid_orders.extend(orderbook)
marketconfig = self.registered_markets[content["market_id"]]
self.valid_orders[marketconfig.product_type].extend(orderbook)
self.set_unit_dispatch(orderbook, marketconfig)
self.write_learning_params(orderbook, marketconfig)
self.write_actual_dispatch()
self.write_actual_dispatch(marketconfig.product_type)

def handle_registration_feedback(
self, content: RegistrationMessage, meta: MetaDict
Expand All @@ -214,6 +221,31 @@ def handle_registration_feedback(
else:
logger.error("Market %s did not accept registration", meta["sender_id"])

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
unit = content["unit"]
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

data = []
try:
data = self.units[unit].outputs[metric_type][start:end]
except Exception:
logger.exception("error handling data request")
self.context.schedule_instant_acl_message(
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
feeds the current market result back to the units
Expand All @@ -233,7 +265,7 @@ def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
orderbook=orderbook,
)

def write_actual_dispatch(self):
def write_actual_dispatch(self, product_type: str):
"""
sends the actual aggregated dispatch curve
works across multiple markets
Expand All @@ -250,7 +282,10 @@ def write_actual_dispatch(self):
start = datetime.utcfromtimestamp(last)

market_dispatch = aggregate_step_amount(
self.valid_orders, start, now, groupby=["market_id", "unit_id"]
self.valid_orders[product_type],
start,
now,
groupby=["market_id", "unit_id"],
)
unit_dispatch_dfs = []
for unit_id, unit in self.units.items():
Expand All @@ -269,8 +304,11 @@ def write_actual_dispatch(self):
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

self.valid_orders = list(
filter(lambda x: x["end_time"] > now, self.valid_orders)
self.valid_orders[product_type] = list(
filter(
lambda x: x["end_time"] > now,
self.valid_orders[product_type],
)
)

db_aid = self.context.data_dict.get("output_agent_id")
Expand All @@ -297,7 +335,7 @@ def write_actual_dispatch(self):
},
)

async def submit_bids(self, opening: OpeningMessage):
async def submit_bids(self, opening: OpeningMessage, meta: MetaDict):
"""
formulates an orderbook and sends it to the market.
This will handle optional portfolio processing
Expand Down Expand Up @@ -330,6 +368,7 @@ async def submit_bids(self, opening: OpeningMessage):
"sender_id": self.context.aid,
"sender_addr": self.context.addr,
"conversation_id": "conversation01",
"in_reply_to": meta.get("reply_with"),
}
await self.context.send_acl_message(
content={
Expand Down Expand Up @@ -404,8 +443,8 @@ async def formulate_bids(
order["price"] = round(order["price"] / market.price_tick)

order["bid_id"] = f"{unit_id}_{i+1}"
order["unit_id"] = unit_id
orderbook.append(order)
self.bids_map[order["bid_id"]] = unit_id

return orderbook

Expand Down
50 changes: 47 additions & 3 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MarketProduct,
MetaDict,
Expand Down Expand Up @@ -41,9 +42,11 @@ class MarketMechanism:
name: str

def __init__(self, marketconfig: MarketConfig):
super().__init__()
self.marketconfig = marketconfig
self.open_auctions = set()
self.all_orders = []
self.results = []

def validate_registration(
self, content: RegistrationMessage, meta: MetaDict
Expand Down Expand Up @@ -163,6 +166,7 @@ def setup(self):
Schedules the opening() method to run at the next opening time of the market.
"""
super().setup()
self.marketconfig.addr = self.context.addr
self.marketconfig.aid = self.context.aid

Expand Down Expand Up @@ -204,6 +208,15 @@ def accept_get_unmatched(content: dict, meta: MetaDict):
and content.get("market_id") == self.marketconfig.name
)

def accept_data_request(content: dict, meta: MetaDict):
return (
content.get("context") == "data_request"
and content.get("market_id") == self.marketconfig.name
)

self.context.subscribe_message(
self, self.handle_data_request, accept_data_request
)
self.context.subscribe_message(self, self.handle_orderbook, accept_orderbook)
self.context.subscribe_message(
self, self.handle_registration, accept_registration
Expand Down Expand Up @@ -238,8 +251,8 @@ async def opening(self):
opening_message: OpeningMessage = {
"context": "opening",
"market_id": self.marketconfig.name,
"start": market_open,
"stop": market_closing,
"start_time": market_open,
"end_time": market_closing,
"products": products,
}

Expand All @@ -254,6 +267,7 @@ async def opening(self):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": f"{self.marketconfig.name}_{market_open}",
},
)

Expand Down Expand Up @@ -306,6 +320,7 @@ def handle_registration(self, content: RegistrationMessage, meta: MetaDict):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

Expand Down Expand Up @@ -337,10 +352,38 @@ def handle_orderbook(self, content: OrderBookMessage, meta: MetaDict):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": 1,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

data = []
try:
import pandas as pd

data = pd.DataFrame(self.results)
data.index = data["time"]
data = data[metric_type][start:end]
except Exception:
logger.exception("error handling data request")
self.context.schedule_instant_acl_message(
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_get_unmatched(self, content: dict, meta: MetaDict):
"""
A handler which sends the orderbook with unmatched orders to an agent.
Expand Down Expand Up @@ -442,6 +485,7 @@ async def clear_market(self, market_products: list[MarketProduct]):
)
meta["market_id"] = self.marketconfig.name
meta["time"] = meta["product_start"]
self.results.append(meta)

await self.store_market_results(market_meta)

Expand Down
Loading

0 comments on commit 3bddb13

Please sign in to comment.