diff --git a/assume/markets/clearing_algorithms/__init__.py b/assume/markets/clearing_algorithms/__init__.py index ab8f046f..02abff6b 100644 --- a/assume/markets/clearing_algorithms/__init__.py +++ b/assume/markets/clearing_algorithms/__init__.py @@ -6,6 +6,7 @@ from assume.markets.base_market import MarketRole from .all_or_nothing import PayAsBidAonRole, PayAsClearAonRole +from .contracts import PayAsBidContractRole from .simple import PayAsBidRole, PayAsClearRole clearing_mechanisms: dict[str, MarketRole] = { @@ -13,6 +14,7 @@ "pay_as_bid": PayAsBidRole, "pay_as_bid_aon": PayAsBidAonRole, "pay_as_clear_aon": PayAsClearAonRole, + "pay_as_bid_contract": PayAsBidContractRole, } # try importing pypsa if it is installed diff --git a/assume/markets/clearing_algorithms/contracts.py b/assume/markets/clearing_algorithms/contracts.py new file mode 100644 index 00000000..66b85468 --- /dev/null +++ b/assume/markets/clearing_algorithms/contracts.py @@ -0,0 +1,671 @@ +# SPDX-FileCopyrightText: ASSUME Developers +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import asyncio +import logging +from datetime import datetime, timedelta +from itertools import groupby +from operator import itemgetter +from typing import Callable + +import pandas as pd +from dateutil import rrule as rr +from dateutil.relativedelta import relativedelta as rd + +from assume.common.market_objects import ( + ClearingMessage, + MarketConfig, + MarketProduct, + MetaDict, + Order, + Orderbook, +) +from assume.markets.base_market import MarketRole + +log = logging.getLogger(__name__) + + +class PayAsBidContractRole(MarketRole): + """ + This market role handles contracts between agents. + Contracts can be long term agreements which are valid for a longer period of time. + Each contract has an evaluation frequency in which the actual cashflow of the contract is executed. + This can include a calculation depending on the market result and dispatched energy. + It can be limited which agent can agree to a offered contract at this market by using an `eligible_lambda` in the contract properties. + The available contracts can be configured through the `available_contracts` dictionary. + + Args: + marketconfig (MarketConfig): The market configuration. + limitation (str): a string for limitations - either being "only_co2emissionless" or "only_renewables" + + """ + + required_fields = [ + "sender_id", + "contract", + "eligible_lambda", + "evaluation_frequency", + ] + + def __init__( + self, + marketconfig: MarketConfig, + limitation: str = "only_co2emissionless", + ): + super().__init__(marketconfig) + self.limitation = limitation + self.futures = {} + + def setup(self): + super().setup() + + def accept_data_response(content: dict, meta: MetaDict): + return content.get("context") == "data_response" + + self.context.subscribe_message( + self, self.handle_data_response, accept_data_response + ) + + def handle_data_response(self, content: dict, meta: MetaDict) -> None: + """ + Handles the response data and finishes the async Future waiting for the data. + + Args: + content (dict): message content with data property + meta (MetaDict): message meta + """ + if meta["in_reply_to"] not in self.futures: + log.error(f'data response {meta["in_reply_to"]} not in awaited futures') + else: + self.futures[meta["in_reply_to"]].set_result(content["data"]) + + def validate_registration(self, content: dict, meta: MetaDict) -> bool: + """ + validation function called by handle_registration + Makes it possible to allow only a subset of agents to bid on this market + by using self.limitation of the clearing mechanism. + + Args: + content (dict): message content with registration message and agent information + meta (MetaDict): message meta + + Returns: + bool: True if agent fulfills requirements + """ + if self.limitation: + if self.limitation == "only_co2emissionless": + requirement = lambda x: x in [ + "demand", + "nuclear", + "wind", + "solar", + "biomass", + ] + elif self.limitation == "only_renewables": + requirement = lambda x: x in ["demand", "wind", "solar", "biomass"] + else: + log.error(f"unknown limitation {self.limitation}") + return all( + [requirement(info["technology"]) for info in content["information"]] + ) + else: + return True + + def check_working(self, supply_order: Order, demand_order: Order) -> bool: + """ + Checks if a given supply_order fulfills the criteria of the demand_order and vice versa. + Used to allow bidding on some policies only to some agents. + + Args: + supply_order (Order): the supply order in question + demand_order (Order): the demand order in question + + Returns: + bool: True if the orders are compatible + """ + s_information = self.registered_agents[supply_order["agent_id"]] + d_information = self.registered_agents[demand_order["agent_id"]] + return supply_order["eligible_lambda"](d_information) and demand_order[ + "eligible_lambda" + ](s_information) + + def clear( + self, orderbook: Orderbook, market_products + ) -> (Orderbook, Orderbook, list[dict]): + market_getter = itemgetter( + "start_time", "end_time", "only_hours", "contract", "evaluation_frequency" + ) + accepted_orders: Orderbook = [] + rejected_orders: Orderbook = [] + meta = [] + orderbook.sort(key=market_getter) + for product, product_orders in groupby(orderbook, market_getter): + accepted_demand_orders: Orderbook = [] + accepted_supply_orders: Orderbook = [] + if product[0:3] not in market_products: + rejected_orders.extend(product_orders) + # log.debug(f'found unwanted bids for {product} should be {market_products}') + continue + + accepted_product_orders = [] + + product_orders = list(product_orders) + demand_orders = list(filter(lambda x: x["volume"] < 0, product_orders)) + supply_orders = list(filter(lambda x: x["volume"] > 0, product_orders)) + + # generation + supply_orders.sort(key=lambda i: i["price"]) + # demand + demand_orders.sort(key=lambda i: i["price"], reverse=True) + dem_vol, gen_vol = 0, 0 + # the following algorithm is inspired by one bar for generation and one for demand + # add generation for currents demand price, until it matches demand + # generation above it has to be sold for the lower price (or not at all) + for demand_order in demand_orders: + if not supply_orders: + # if no more generation - reject left over demand + rejected_orders.append(demand_order) + continue + + dem_vol += -demand_order["volume"] + to_commit: Orderbook = [] + + while supply_orders and gen_vol < dem_vol: + supply_order = supply_orders.pop(0) + if supply_order["price"] <= demand_order[ + "price" + ] and self.check_working(supply_order, demand_order): + supply_order["accepted_volume"] = supply_order["volume"] + to_commit.append(supply_order) + gen_vol += supply_order["volume"] + else: + rejected_orders.append(supply_order) + # now we know which orders we need + # we only need to see how to arrange it. + + diff = gen_vol - dem_vol + + if diff < 0: + # gen < dem + # generation is not enough - split demand + split_demand_order = demand_order.copy() + split_demand_order["accepted_volume"] = diff + demand_order["accepted_volume"] = demand_order["volume"] - diff + rejected_orders.append(split_demand_order) + elif diff > 0: + # generation left over - split generation + supply_order = to_commit[-1] + split_supply_order = supply_order.copy() + split_supply_order["volume"] = diff + supply_order["accepted_volume"] = supply_order["volume"] - diff + # only volume-diff can be sold for current price + # add left over to supply_orders again + gen_vol -= diff + + supply_orders.insert(0, split_supply_order) + demand_order["accepted_volume"] = demand_order["volume"] + else: + # diff == 0 perfect match + demand_order["accepted_volume"] = demand_order["volume"] + + accepted_demand_orders.append(demand_order) + # pay as bid + for supply_order in to_commit: + supply_order["accepted_price"] = supply_order["price"] + demand_order["accepted_price"] = supply_order["price"] + supply_order["contractor_unit_id"] = demand_order["sender_id"] + supply_order["contractor_id"] = demand_order["agent_id"] + demand_order["contractor_unit_id"] = supply_order["sender_id"] + demand_order["contractor_id"] = supply_order["agent_id"] + accepted_supply_orders.extend(to_commit) + + for order in supply_orders: + rejected_orders.append(order) + + accepted_product_orders = accepted_demand_orders + accepted_supply_orders + + supply_volume = sum(map(itemgetter("volume"), accepted_supply_orders)) + demand_volume = sum(map(itemgetter("volume"), accepted_demand_orders)) + accepted_orders.extend(accepted_product_orders) + prices = list(map(itemgetter("price"), accepted_supply_orders)) + if not prices: + prices = [self.marketconfig.maximum_bid_price] + + meta.append( + { + "supply_volume": supply_volume, + "demand_volume": demand_volume, + "price": sum(prices) / len(prices), + "max_price": max(prices), + "min_price": min(prices), + "node_id": None, + "product_start": product[0], + "product_end": product[1], + "only_hours": product[2], + } + ) + # demand for contracts is maximum generation capacity of the buyer + # this is needed so that the seller of the contract can lower the volume + + from functools import partial + + for order in accepted_supply_orders: + recurrency_task = rr.rrule( + freq=order["evaluation_frequency"], + dtstart=order["start_time"], + until=order["end_time"], + cache=True, + ) + self.context.schedule_recurrent_task( + partial(self.execute_contract, contract=order), recurrency_task + ) + + # contract clearing (pay_as_bid) takes place + return accepted_orders, rejected_orders, meta + + async def execute_contract(self, contract: Order): + """ + Scheduled async method which executes a contract in the future. + For the execution, the actual generation of the selling agent is queried using the data_request mechanism. + This timeseries is then used as an input to the contract which is used. + + If the contract relies on a market price signal, this is also queried before executing the contract function and sending the result to the buyer and seller. + + Args: + contract (Order): the contract which gets executed + """ + # contract must be executed + # contract from supply is given + buyer, seller = contract["contractor_unit_id"], contract["unit_id"] + seller_agent = contract["agent_id"] + c_function: Callable[str, tuple[Orderbook, Orderbook]] = available_contracts[ + contract["contract"] + ] + + end = datetime.utcfromtimestamp(self.context.current_timestamp) + begin = end - rd(weeks=1) + begin = max(contract["start_time"], begin) + + reply_with = f'{buyer}_{contract["start_time"]}' + self.futures[reply_with] = asyncio.Future() + self.context.schedule_instant_acl_message( + { + "context": "data_request", + "unit": seller, + "metric": "energy", + "start_time": begin, + "end_time": end, + }, + receiver_addr=seller_agent[0], + receiver_id=seller_agent[1], + acl_metadata={ + "sender_addr": self.context.addr, + "sender_id": self.context.aid, + "reply_with": reply_with, + }, + ) + + if contract["contract"] in contract_needs_market: + reply_with_market = f'market_eom_{contract["start_time"]}' + self.futures[reply_with_market] = asyncio.Future() + self.context.schedule_instant_acl_message( + { + "context": "data_request", + # ID3 would be average price of orders cleared in last 3 hours before delivery + # monthly averages are used for EEG + # https://www.netztransparenz.de/de-de/Erneuerbare-Energien-und-Umlagen/EEG/Transparenzanforderungen/Marktpr%C3%A4mie/Marktwert%C3%BCbersicht + "market_id": "EOM", + "metric": "price", + "start_time": begin, + "end_time": end, + }, + # TODO other market might not always be the same agent + receiver_addr=self.context.addr, + receiver_id=self.context.aid, + acl_metadata={ + "sender_addr": self.context.addr, + "sender_id": self.context.aid, + "reply_with": reply_with_market, + }, + ) + market_series = await self.futures[reply_with_market] + else: + market_series = None + + client_series = await self.futures[reply_with] + buyer, seller = c_function(contract, market_series, client_series, begin, end) + + in_reply_to = f'{contract["contract"]}_{contract["start_time"]}' + await self.send_contract_result(contract["contractor_id"], buyer, in_reply_to) + await self.send_contract_result(contract["agent_id"], seller, in_reply_to) + + async def send_contract_result( + self, receiver: tuple, orderbook: Orderbook, in_reply_to: str + ): + """ + Send the result of a contract to the given receiver + + Args: + receiver (tuple): the address and agent id of the receiver + orderbook (Orderbook): the orderbook which is used as execution of the contract + in_reply_to (str): the contract to which this is the resulting response + """ + content: ClearingMessage = { + # using a clearing message is a way of giving payments to the participants + "context": "clearing", + "market_id": self.marketconfig.name, + "accepted_orders": orderbook, + "rejected_orders": [], + } + await self.context.send_acl_message( + content=content, + receiver_addr=receiver[0], + receiver_id=receiver[1], + acl_metadata={ + "sender_addr": self.context.addr, + "sender_id": self.context.aid, + "in_reply_to": in_reply_to, + }, + ) + + +def ppa( + contract: dict, + market_index: pd.Series, + future_generation_series: pd.Series, + start: datetime, + end: datetime, +): + """ + The Power Purchase Agreement (PPA) is used to have an agreement where the total amount is not fixed, but the price is fixed. + As the power is actually bought, the selling agent is not allowed to sell it on other markets. + The buying agent has an uncertainty of the actual traded amount at the time of contracting. + + Args: + contract (dict): the contract which is executed + market_index (pd.Series): the market_index + generation_series (pd.Series): the actual generation or demand of the agent + start (datetime.datetime): the start time of the contract + end (datetime.datetime): the end time of the contract + + Returns: + tuple[dict, dict]: the buyer order and the seller order as a tuple + """ + buyer_agent, seller_agent = contract["contractor_id"], contract["agent_id"] + volume = sum(future_generation_series[start:end]) + buyer: Orderbook = [ + { + "bid_id": contract["contractor_unit_id"], + "unit_id": contract["contractor_unit_id"], + "start_time": start, + "end_time": end, + "volume": volume, + "price": contract["price"], + "accepted_volume": volume, + "accepted_price": contract["price"], + "only_hours": None, + "agent_id": buyer_agent, + } + ] + seller: Orderbook = [ + { + "bid_id": contract["unit_id"], + "unit_id": contract["unit_id"], + "start_time": start, + "end_time": end, + "volume": -volume, + "price": contract["price"], + "accepted_volume": -volume, + "accepted_price": contract["price"], + "only_hours": None, + "agent_id": seller_agent, + } + ] + return buyer, seller + + +def swingcontract( + contract: dict, + market_index: pd.Series, + demand_series: pd.Series, + start: datetime, + end: datetime, +): + """ + The swing contract is used to provide a band in which one price is payed, while the second (higher) price is paid, when the band is left. + + Args: + contract (dict): the contract which is executed + market_index (pd.Series): the market_index + demand_series (pd.Series): the actual generation or demand of the agent + start (datetime.datetime): the start time of the contract + end (datetime.datetime): the end time of the contract + + Returns: + tuple[dict, dict]: the buyer order and the seller order as a tuple + """ + buyer_agent, seller_agent = contract["contractor_id"], contract["agent_id"] + + minDCQ = 80 # daily constraint quantity + maxDCQ = 100 + set_price = contract["price"] # ct/kWh + outer_price = contract["price"] * 1.5 # ct/kwh + # TODO does not work with multiple markets with differing time scales.. + # this only works for whole trading hours (as x MW*1h == x MWh) + demand = -demand_series[start:end] + normal = demand[minDCQ < demand and demand < maxDCQ] * set_price + expensive = ~demand[minDCQ < demand and demand < maxDCQ] * outer_price + price = sum(normal) + sum(expensive) + # volume is hard to calculate with differing units? + # unit conversion is quite hard regarding the different intervals + buyer: Orderbook = [ + { + "bid_id": contract["contractor_unit_id"], + "unit_id": contract["contractor_unit_id"], + "start_time": start, + "end_time": end, + "volume": demand, + "price": price, + "accepted_volume": demand, + "accepted_price": price, + "only_hours": None, + "agent_id": buyer_agent, + } + ] + seller: Orderbook = [ + { + "bid_id": contract["unit_id"], + "unit_id": contract["unit_id"], + "start_time": start, + "end_time": end, + "volume": -demand, + "price": price, + "accepted_volume": -demand, + "accepted_price": price, + "only_hours": None, + "agent_id": seller_agent, + } + ] + return buyer, seller + + +def cfd( + contract: dict, + market_index: pd.Series, + gen_series: pd.Series, + start: datetime, + end: datetime, +): + """ + The Contract for Differences does rely on the market signal and does pay the difference between a set price and the actual price when the contract is due - retrospectively. + + Args: + contract (dict): the contract which is executed + market_index (pd.Series): the market_index + demand_series (pd.Series): the actual generation or demand of the agent + start (datetime.datetime): the start time of the contract + end (datetime.datetime): the end time of the contract + + Returns: + tuple[dict, dict]: the buyer order and the seller order as a tuple + """ + buyer_agent, seller_agent = contract["contractor_id"], contract["agent_id"] + + # TODO does not work with multiple markets with differing time scales.. + # this only works for whole trading hours (as x MW*1h == x MWh) + # price_series = (contract["price"] - market_index[start:end]) * gen_series[seller][ + # start:end + # ] + price_series = (market_index[start:end] - contract["price"]) * gen_series[start:end] + price_series = price_series.dropna() + price = sum(price_series) + volume = sum(gen_series[start:end]) + # volume is hard to calculate with differing units? + # unit conversion is quite hard regarding the different intervals + buyer: Orderbook = [ + { + "bid_id": contract["contractor_unit_id"], + "unit_id": contract["contractor_unit_id"], + "start_time": start, + "end_time": end, + "volume": volume, + "price": price, + "accepted_volume": volume, + "accepted_price": price, + "only_hours": None, + "agent_id": buyer_agent, + } + ] + seller: Orderbook = [ + { + "bid_id": contract["unit_id"], + "unit_id": contract["unit_id"], + "start_time": start, + "end_time": end, + "volume": -volume, + "price": price, + "accepted_volume": -volume, + "accepted_price": price, + "only_hours": None, + "agent_id": seller_agent, + } + ] + return buyer, seller + + +def market_premium( + contract: dict, + market_index: pd.Series, + gen_series: pd.Series, + start: datetime, + end: datetime, +): + """ + The market premium calculates the difference of the market_index and the contracted price. + As the seller already sold the energy on the index market, + + Args: + contract (dict): the contract which is executed + market_index (pd.Series): the market_index + demand_series (pd.Series): the actual generation or demand of the agent + start (datetime.datetime): the start time of the contract + end (datetime.datetime): the end time of the contract + + Returns: + tuple[dict, dict]: the buyer order and the seller order as a tuple + """ + buyer_agent, seller_agent = contract["contractor_id"], contract["agent_id"] + # TODO does not work with multiple markets with differing time scales.. + # this only works for whole trading hours (as x MW*1h == x MWh) + price_series = (market_index[start:end] - contract["price"]) * gen_series[start:end] + price_series = price_series.dropna() + # sum only where market price is below contract price + price = sum(price_series[price_series < 0]) + volume = sum(gen_series[start:end]) + # volume is hard to calculate with differing units? + # unit conversion is quite hard regarding the different intervals + buyer: Orderbook = [ + { + "bid_id": contract["contractor_unit_id"], + "unit_id": contract["contractor_unit_id"], + "start_time": start, + "end_time": end, + "volume": volume, + "price": price, + "accepted_volume": volume, + "accepted_price": price, + "only_hours": None, + "agent_id": buyer_agent, + } + ] + seller: Orderbook = [ + { + "bid_id": contract["unit_id"], + "unit_id": contract["unit_id"], + "start_time": start, + "end_time": end, + "volume": -volume, + "price": price, + "accepted_volume": -volume, + "accepted_price": price, + "only_hours": None, + "agent_id": seller_agent, + } + ] + return buyer, seller + + +def feed_in_tariff( + contract: dict, + market_index: pd.Series, + client_series: pd.Series, + start: datetime, + end: datetime, +): + buyer_agent, seller_agent = contract["contractor_id"], contract["agent_id"] + # TODO does not work with multiple markets with differing time scales.. + # this only works for whole trading hours (as x MW*1h == x MWh) + price_series = contract["price"] * client_series[start:end] + price = sum(price_series) + # volume is hard to calculate with differing units? + # unit conversion is quite hard regarding the different intervals + volume = sum(client_series) + buyer: Orderbook = [ + { + "bid_id": contract["contractor_unit_id"], + "unit_id": contract["contractor_unit_id"], + "start_time": start, + "end_time": end, + "volume": volume, + "price": price, + "accepted_volume": volume, + "accepted_price": price, + "only_hours": None, + "agent_id": buyer_agent, + } + ] + seller: Orderbook = [ + { + "bid_id": contract["unit_id"], + "unit_id": contract["unit_id"], + "start_time": start, + "end_time": end, + "volume": -volume, + "price": price, + "accepted_volume": -volume, + "accepted_price": price, + "only_hours": None, + "agent_id": seller_agent, + } + ] + return buyer, seller + + +available_contracts: dict[str, Callable] = { + "ppa": ppa, + "CFD": cfd, + "FIT": feed_in_tariff, + "MPFIX": market_premium, +} +contract_needs_market = ["CFD", "MPFIX"] diff --git a/assume/scenario/loader_amiris.py b/assume/scenario/loader_amiris.py index 53387b9e..a7668800 100644 --- a/assume/scenario/loader_amiris.py +++ b/assume/scenario/loader_amiris.py @@ -2,16 +2,19 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later +import calendar import logging from datetime import timedelta import dateutil.rrule as rr import pandas as pd import yaml +from dateutil.relativedelta import relativedelta as rd from yamlinclude import YamlIncludeConstructor from assume.common.forecasts import NaiveForecast from assume.common.market_objects import MarketConfig, MarketProduct +from assume.strategies.extended import SupportStrategy from assume.world import World logger = logging.getLogger(__name__) @@ -48,7 +51,20 @@ def read_csv(base_path, filename): )["load"] -def get_send_receive_msgs_per_id(agent_id, contracts_config: list): +def get_send_receive_msgs_per_id(agent_id: int, contracts_config: list[dict]): + """ + AMIRIS contract conversion function which finds the list of ids which receive or send a message from/to the agent with agent_id. + + Args: + agent_id (int): the agent id to which the contracts are found + contracts_config (list[dict]): whole contracts dict read from yaml + + Returns: + tuple: A tuple containing the following: + - list: A list containing the ids of sending agents. + - list: A list containing the ids of receiving agents + """ + sends = [] receives = [] for contracts in contracts_config: @@ -118,11 +134,50 @@ def add_agent_to_world( contracts: list, base_path: str, markups: dict = {}, + supports: dict = {}, ): + """ + Adds an agent from a amiris agent definition to the ASSUME world. + It should be called in load_amiris_async, which loads agents in the correct order. + + Args: + agent (dict): AMIRIS agent dict + world (World): ASSUME world to add the agent to + prices (dict): prices read from amiris scenario beforehand + contracts (list): contracts read from the amiris scenario beforehand + base_path (str): base path to load profile csv files from + markups (dict, optional): markups read from former agents. Defaults to {}. + """ strategies = {m: "naive_eom" for m in list(world.markets.keys())} storage_strategies = {m: "flexable_eom_storage" for m in list(world.markets.keys())} match agent["Type"]: + case "SupportPolicy": + support_data = agent["Attributes"]["SetSupportData"] + supports |= {x.pop("Set"): x for x in support_data} + world.add_unit_operator(agent["Id"]) + + for name, support in supports.items(): + contract = list(support.keys())[0] + value = list(support[contract].values())[0] + # TODO + world.add_unit( + f"{name}_{agent['Id']}", + "demand", + agent["Id"], + { + "min_power": 0, + "max_power": 100000, + "bidding_strategies": { + "energy": "support", + "financial_support": "support", + }, + "technology": "demand", + "price": value, + }, + NaiveForecast(world.index, demand=100000), + ) case "EnergyExchange" | "DayAheadMarketSingleZone": + clearing_section = agent["Attributes"].get("Clearing", agent["Attributes"]) market_config = MarketConfig( market_id=f"Market_{agent['Id']}", opening_hours=rr.rrule( @@ -130,7 +185,7 @@ def add_agent_to_world( ), opening_duration=timedelta(hours=1), market_mechanism=translate_clearing[ - agent["Attributes"]["DistributionMethod"] + clearing_section["DistributionMethod"] ], market_products=[ MarketProduct(timedelta(hours=1), 1, timedelta(hours=1)) @@ -139,6 +194,30 @@ def add_agent_to_world( ) world.add_market_operator(f"Market_{agent['Id']}") world.add_market(f"Market_{agent['Id']}", market_config) + + if supports: + support_config = MarketConfig( + name=f"SupportMarket_{agent['Id']}", + opening_hours=rr.rrule( + rr.YEARLY, dtstart=world.start, until=world.end + ), + opening_duration=timedelta(hours=1), + market_mechanism="pay_as_bid_contract", + market_products=[ + MarketProduct(rd(months=12), 1, timedelta(hours=1)) + ], + additional_fields=[ + "sender_id", + "contract", # one of MPVAR, MPFIX, CFD + "eligible_lambda", + "evaluation_frequency", # monthly + ], + product_type="financial_support", + supports_get_unmatched=True, + maximum_bid_volume=1e6, + ) + world.add_market_operator(f"SupportMarket_{agent['Id']}") + world.add_market(f"SupportMarket_{agent['Id']}", support_config) case "CarbonMarket": co2_price = agent["Attributes"]["Co2Prices"] if isinstance(co2_price, str): @@ -181,7 +260,10 @@ def add_agent_to_world( world.add_unit_operator(operator_id) device = agent["Attributes"]["Device"] strategy = agent["Attributes"]["Strategy"] - if strategy["StrategistType"] != "SINGLE_AGENT_MIN_SYSTEM_COST": + if strategy["StrategistType"] not in [ + "SINGLE_AGENT_MIN_SYSTEM_COST", + "SINGLE_AGENT_MAX_PROFIT", + ]: logger.warning(f"unknown strategy for storage trader: {strategy}") forecast_price = prices.get("co2", 20) @@ -221,6 +303,7 @@ def add_agent_to_world( case "NoSupportTrader": # does not get support - just trades renewables # has a ShareOfRevenues (how much of the profit he keeps) + # can also have a ForecastError world.add_unit_operator(f"Operator_{agent['Id']}") case "SystemOperatorTrader": world.add_unit_operator(f"Operator_{agent['Id']}") @@ -233,12 +316,16 @@ def add_agent_to_world( max_markup = agent["Attributes"]["maxMarkup"] markups[agent["Id"]] = (min_markup, max_markup) case "PredefinedPlantBuilder": - # this is the actual powerplant + # this is the actual powerplant/PlantBuilder prototype = agent["Attributes"]["Prototype"] attr = agent["Attributes"] + # first get send and receives for our PlantBuilder send, receive = get_send_receive_msgs_per_id(agent["Id"], contracts) + # the first multi send includes message from us to our operator/portfolio raw_operator_id = get_matching_send_one_or_multi(agent["Id"], send[0]) + # we need to find send and receive for the raw operator too send_t, receive_t = get_send_receive_msgs_per_id(raw_operator_id, contracts) + # the third entry here is the multi send to the actual trader raw_trader_id = get_matching_send_one_or_multi(raw_operator_id, send_t[2]) operator_id = f"Operator_{raw_operator_id}" fuel_price = prices.get(translate_fuel_type[prototype["FuelType"]], 0) @@ -253,6 +340,7 @@ def add_agent_to_world( ) # TODO UnplannedAvailabilityFactor is not respected + # we get the markups from the trader id: min_markup, max_markup = markups.get(raw_trader_id, (0, 0)) # Amiris interpolates blocks linearly interpolated_values = interpolate_blocksizes( @@ -308,7 +396,25 @@ def add_agent_to_world( fuel_price=fuel_price, co2_price=prices.get("co2", 0), ) - # TODO attr["SupportInstrument"] and + support_instrument = attr.get("SupportInstrument") + support_conf = supports.get(attr.get("Set")) + bidding_params = {} + if support_instrument and support_conf: + for market in world.markets.keys(): + if "SupportMarket" in market: + strategies[market] = "support" + if support_instrument == "FIT": + conf_key = "TsFit" + elif support_instrument in ["CFD", "MPVAR"]: + conf_key = "Lcoe" + else: + conf_key = "Premium" + value = support_conf[support_instrument][conf_key] + bidding_params["contract_types"] = support_instrument + bidding_params["support_value"] = value + # ASSUME evaluates contracts on a monthly schedule + bidding_params["evaluation_frequency"] = rr.MONTHLY + world.add_unit( f"VariableRenewableOperator_{agent['Id']}", "power_plant", @@ -321,6 +427,7 @@ def add_agent_to_world( "fuel_type": translate_fuel_type[attr["EnergyCarrier"]], "emission_factor": 0, "efficiency": 1, + "bidding_params": bidding_params, }, forecast, ) @@ -342,25 +449,31 @@ async def load_amiris_async( study_case: str, base_path: str, ): + """ + Loads an Amiris scenario + + Args: + world (World): the ASSUME world + scenario (str): the scenario name + study_case (str): study case to define + base_path (str): base path from where to load the amrisi scenario + """ amiris_scenario = read_amiris_yaml(base_path) # DeliveryIntervalInSteps = 3600 # In practice - this seems to be a fixed number in AMIRIS - if study_case.lower() == "simple": - print("is simple - adjusting start time") - amiris_scenario["GeneralProperties"]["Simulation"][ - "StartTime" - ] = "2020-12-31_23:58:00" - start = amiris_scenario["GeneralProperties"]["Simulation"]["StartTime"] start = pd.to_datetime(start, format="%Y-%m-%d_%H:%M:%S") + if calendar.isleap(start.year): + start += timedelta(days=1) end = amiris_scenario["GeneralProperties"]["Simulation"]["StopTime"] end = pd.to_datetime(end, format="%Y-%m-%d_%H:%M:%S") # AMIRIS caveat: start and end is always two minutes before actual start start += timedelta(minutes=2) sim_id = f"{scenario}_{study_case}" - save_interval = amiris_scenario["GeneralProperties"]["Output"]["Interval"] // 2 + save_interval = amiris_scenario["GeneralProperties"]["Output"]["Interval"] // 4 prices = {} index = pd.date_range(start=start, end=end, freq="1h", inclusive="left") + world.bidding_strategies["support"] = SupportStrategy await world.setup( start=start, end=end, @@ -370,11 +483,13 @@ async def load_amiris_async( ) # helper dict to map trader markups/markdowns to powerplants markups = {} + supports = {} keyorder = [ "EnergyExchange", "DayAheadMarketSingleZone", "CarbonMarket", "FuelsMarket", + "SupportPolicy", "DemandTrader", "StorageTrader", "RenewableTrader", @@ -386,7 +501,6 @@ async def load_amiris_async( "VariableRenewableOperator", "Biogas", "MeritOrderForecaster", - "SupportPolicy", ] agents_sorted = sorted( amiris_scenario["Agents"], key=lambda agent: keyorder.index((agent["Type"])) @@ -399,6 +513,7 @@ async def load_amiris_async( amiris_scenario["Contracts"], base_path, markups, + supports, ) # calculate market price before simulation world @@ -408,7 +523,7 @@ async def load_amiris_async( # To use this with amiris run: # git clone https://gitlab.com/dlr-ve/esy/amiris/examples.git amiris-examples # next to the assume folder - scenario = "Simple" # Germany2019 or Austria2019 or Simple + scenario = "Germany2019" # Germany2019 or Austria2019 or Simple base_path = f"../amiris-examples/{scenario}/" amiris_scenario = read_amiris_yaml(base_path) sends, receives = get_send_receive_msgs_per_id( @@ -427,7 +542,7 @@ async def load_amiris_async( load_amiris_async( world, "amiris", - scenario, + scenario.lower(), base_path, ) ) diff --git a/assume/strategies/extended.py b/assume/strategies/extended.py index eaaa5f4b..3d090472 100644 --- a/assume/strategies/extended.py +++ b/assume/strategies/extended.py @@ -2,8 +2,11 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later +import dateutil.rrule as rr + from assume.common.base import BaseStrategy, SupportsMinMax from assume.common.market_objects import MarketConfig, Orderbook, Product +from assume.strategies.naive_strategies import NaiveSingleBidStrategy class OTCStrategy(BaseStrategy): @@ -67,6 +70,103 @@ def calculate_bids( return bids +def is_co2emissionless(units): + requirement = lambda x: x in ["demand", "nuclear", "wind", "solar", "biomass"] + return all([requirement(info["technology"]) for info in units]) + + +class SupportStrategy(NaiveSingleBidStrategy): + """ + Strategy for support markets. + A list of allowed `contract_types` is given, as well as a value which is used to bid contracts. + As often not everything which is possible should be bid on contracts, this can be defined through + `contract_amount_fraction` - as well as an execution schedule. + """ + + def __init__( + self, + contract_types: list[str] = [], + contract_value=0, + contract_amount_fraction=1, + evaluation_frequency=rr.WEEKLY, + *args, + **kwargs, + ): + """ + Init function of the support strategy. + Pass a list of contract_types for which this strategy creates individual bids each. + + Args: + contract_types (list[str], optional): List of contract types of available_contracts. Defaults to []. + contract_amount_fraction (float, optional): a fraction of how much of the maximum capacity should be bid at max on this contract. Defaults to 1. + contract_value (float, optional): the value used as a price for the given contract + evaluation_frequency (int, optional): the evaluation frequency as dateutil FREQ + """ + super().__init__(*args, **kwargs) + self.contract_types = contract_types + self.contract_amount_fraction = contract_amount_fraction + self.contract_value = contract_value + self.evaluation_frequency = evaluation_frequency + + def calculate_bids( + self, + unit: SupportsMinMax, + market_config: MarketConfig, + product_tuples: list[Product], + **kwargs, + ) -> Orderbook: + """ + Takes information from a unit that the unit operator manages and defines how it is dispatched to the market. + + Args: + unit (SupportsMinMax): Unit to dispatch. + market_config (MarketConfig): Market configuration. + product_tuples (List[Product]): List of products to dispatch. + **kwargs (dict): Additional arguments. + + Returns: + Orderbook: The orderbook. + """ + if "evaluation_frequency" not in market_config.additional_fields: + # seems like we do not have a contract market here + return super().calculate_bids(unit, market_config, product_tuples, **kwargs) + + bids = [] + # demand is the other way around + # TODO should be generic without asking for Demand name + power = unit.min_power if type(unit).__name__ == "Demand" else unit.max_power + # bid a fraction as support for showcase reasons - not everything should be bid on contracts + power *= self.contract_amount_fraction + for product in product_tuples: + start = product[0] + end = product[1] + current_power = unit.outputs["energy"].at[start] + if not self.contract_value: + price = unit.calculate_marginal_cost(start, current_power) + else: + price = self.contract_value + price = min(price, market_config.maximum_bid_price) + + for contract_type in self.contract_types: + bids.append( + { + "start_time": start, + "end_time": end, + "only_hours": product[2], + "price": price, + "volume": power, + "sender_id": unit.id, + "contract": contract_type, + # by default only bid on co2 emissionless contracts + "eligible_lambda": is_co2emissionless, + # lambda u: u.technology in ["nuclear"], + "evaluation_frequency": self.evaluation_frequency, + } + ) + + return bids + + class MarkupStrategy(BaseStrategy): """ Strategy for Markup (over the counter trading) markets diff --git a/docs/source/index.rst b/docs/source/index.rst index 2f0206b6..95bf079c 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -80,6 +80,7 @@ Documentation * :doc:`learning_algorithm` * :doc:`buffers` * :doc:`assume` +* :doc:`support_policies` .. toctree:: :hidden: @@ -94,6 +95,7 @@ Documentation learning_algorithm buffers assume + support_policies Indices and tables diff --git a/docs/source/support_policies.rst b/docs/source/support_policies.rst new file mode 100644 index 00000000..3a58abcf --- /dev/null +++ b/docs/source/support_policies.rst @@ -0,0 +1,72 @@ +.. SPDX-FileCopyrightText: ASSUME Developers +.. +.. SPDX-License-Identifier: AGPL-3.0-or-later + +###################### +Support Policies +###################### + +Support Policies are a very important feature when considering different energy market designs. +A support policy allows to influence the cash flow of unit, making decisions more profitable. + +One can differentiate between support policies which influence the available market capacity (product_type=`energy``) and those which do not. + +If the product_type is `energy`, the volume used for the contract can not be additionally bid on the EOM. + + +Example Policies +===================================== + + +Feed-In-Tariff - FIT +-------------------- + +To create a Feed-In-Tariff (Einspeisevergütung) one has a contract which sets a fixed price for all produced energy. +The energy can not be additionally sold somewhere else (product_type=`energy`). + +The Tariff is contracted at the beginning of the simulation and is valid for X days (1 year). + +The payout is executed on a different repetition schedule (monthly). +For this, the output_agent is asked how much energy an agent produced in the timeframe. + +This is essentially the same as a Power Purchase Agreement (PPA), except that the payment of FIT is continuous and not monthly or yearly. + + +Fixed Market Premium - MPFIX +---------------------------- + +A market premium is paid on top of the market results, based on the results. +As the volume does not influcence the market bidding, the product_type is `financial_support` +So a Market premium is contracted at the beginning of the simulation and is valid for X days (1 year). + +The payout is executed on a different repetition schedule (monthly). +For this, the output_agent is asked how much energy an agent produced in the timeframe and what the clearing price of the market with name "EOM" was. +The differences are then calculated and paid out on a monthly base. + +This mechanism is also known as One-Sided market premium + +Variable Market Premium - MPVAR +------------------------------- + +The Idea of the variable market premium is to be based on some kind of market index (like ID3) received from the output agent. + + +Capacity Premium - CP +--------------------- + +A capacity premium is paid on a yearly basis for a technology. +This is done in € per installed MW of capacity. +It allows to influence the financial flow of plants which would not be profitable. + +Contract for Differences - CfD +------------------------------ + +A fixed LCoE is set as a price, if an Agent accepts the CfD contract, +it has to bid at the hourly EOM - the difference of the market result is paid/received to/from the contractor. + + +Swing Contract +-------------- + +Actor +^^^^^ diff --git a/examples/world_script_policy.py b/examples/world_script_policy.py new file mode 100644 index 00000000..d923bd84 --- /dev/null +++ b/examples/world_script_policy.py @@ -0,0 +1,139 @@ +# SPDX-FileCopyrightText: ASSUME Developers +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import logging +from datetime import datetime, timedelta + +import pandas as pd +from dateutil import rrule as rr +from dateutil.relativedelta import relativedelta as rd + +from assume import World +from assume.common.forecasts import NaiveForecast +from assume.common.market_objects import MarketConfig, MarketProduct +from assume.markets.clearing_algorithms.extended import PayAsBidContractRole + +log = logging.getLogger(__name__) + +db_uri = "postgresql://assume:assume@localhost:5432/assume" + +world = World(database_uri=db_uri) + + +async def init(): + """ + Init function of the Policy Script Scenario + """ + start = datetime(2019, 1, 1) + end = datetime(2019, 3, 1) + index = pd.date_range( + start=start, + end=end + timedelta(hours=24), + freq="H", + ) + sim_id = "world_script_policy" + + world.clearing_mechanisms["pay_as_bid_contract"] = PayAsBidContractRole + from assume.strategies.extended import SupportStrategy + + world.bidding_strategies["support"] = SupportStrategy + + await world.setup( + start=start, + end=end, + save_frequency_hours=48, + simulation_id=sim_id, + index=index, + ) + + marketdesign = [ + MarketConfig( + "EOM", + rr.rrule( + rr.HOURLY, interval=24, dtstart=start + timedelta(hours=2), until=end + ), + timedelta(hours=1), + "pay_as_clear", + [MarketProduct(timedelta(hours=1), 24, timedelta(hours=1))], + product_type="energy", + ), + # MarketConfig( + # "SupportEnergy", + # rr.rrule(rr.MONTHLY, dtstart=start, until=end), + # timedelta(hours=1), + # "pay_as_bid_contract", + # [MarketProduct(rd(months=1), 1, timedelta(days=0))], + # additional_fields=[ + # "sender_id", + # "contract", # one of FIT, PPA + # "eligible_lambda", + # "evaluation_frequency", # monthly + # ], + # # it needs to be the same product_type to interfer with output + # product_type="energy", + # supports_get_unmatched=True, + # ), + MarketConfig( + "Support", + rr.rrule(rr.MONTHLY, dtstart=start, until=end), + timedelta(hours=1), + "pay_as_bid_contract", + [MarketProduct(rd(months=1), 1, timedelta(hours=1))], + additional_fields=[ + "sender_id", + "contract", # one of MPVAR, MPFIX, CFD + "eligible_lambda", + "evaluation_frequency", # monthly + ], + product_type="financial_support", + supports_get_unmatched=True, + ), + ] + + mo_id = "market_operator" + world.add_market_operator(id=mo_id) + for market_config in marketdesign: + world.add_market(mo_id, market_config) + + world.add_unit_operator("my_operator") + world.add_unit_operator("brd") + world.add_unit( + "demand1", + "demand", + "brd", + # the unit_params have no hints + { + "min_power": 0, + "max_power": 1000, + "bidding_strategies": {"energy": "support", "financial_support": "support"}, + "bidding_params": { + "contract_fraction": 1.0, + "contract_types": ["CFD"], + }, # Feed-In-Tariff + "technology": "demand", + }, + NaiveForecast(index, demand=1000), + ) + + nuclear_forecast = NaiveForecast(index, availability=1, fuel_price=3, co2_price=0.1) + world.add_unit( + "nuclear1", + "power_plant", + "my_operator", + { + "min_power": 200, + "max_power": 1000, + "bidding_strategies": {"energy": "support", "financial_support": "support"}, + "bidding_params": { + "contract_fraction": 1, + "contract_types": ["CFD"], + }, # Feed-In-Tariff + "technology": "nuclear", + }, + nuclear_forecast, + ) + + +world.loop.run_until_complete(init()) +world.run() diff --git a/tests/test_policies.py b/tests/test_policies.py new file mode 100644 index 00000000..14241950 --- /dev/null +++ b/tests/test_policies.py @@ -0,0 +1,131 @@ +# SPDX-FileCopyrightText: ASSUME Developers +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import asyncio +from datetime import datetime + +import pandas as pd +from dateutil import rrule as rr +from dateutil.relativedelta import relativedelta as rd +from mango import Agent, RoleAgent, create_container +from mango.util.clock import ExternalClock + +from assume.common.forecasts import NaiveForecast +from assume.common.market_objects import MarketConfig, MarketProduct +from assume.common.units_operator import UnitsOperator +from assume.markets.base_market import MarketRole +from assume.strategies.naive_strategies import NaiveSingleBidStrategy +from assume.units.demand import Demand + +start = datetime(2020, 1, 1) +end = datetime(2020, 12, 2) + + +class DataRequester(Agent): + def __init__(self, container, suggested_aid): + super().__init__(container, suggested_aid) + self.await_message: asyncio.Future = None + + async def send_data_request( + self, receiver_addr, receiver_id, content: dict, reply_with + ): + self.await_message = asyncio.Future() + await self.send_acl_message( + content, + receiver_addr=receiver_addr, + receiver_id=receiver_id, + acl_metadata={ + "sender_addr": self.addr, + "sender_id": self.aid, + "reply_with": reply_with, + }, + ) + + return await self.await_message + + def handle_message(self, content, meta): + self.await_message.set_result((content, meta)) + + +async def test_request_messages(): + market_name = "Test" + marketconfig = MarketConfig( + market_id=market_name, + opening_hours=rr.rrule(rr.HOURLY, dtstart=start, until=end), + opening_duration=rd(hours=1), + market_mechanism="pay_as_clear", + market_products=[MarketProduct(rd(hours=1), 1, rd(hours=1))], + ) + clock = ExternalClock(0) + container = await create_container( + addr="world", connection_type="external_connection", clock=clock + ) + units_agent = RoleAgent(container, "test_operator") + units_role = UnitsOperator(available_markets=[marketconfig]) + units_agent.add_role(units_role) + + index = pd.date_range(start=start, end=end + pd.Timedelta(hours=4), freq="1h") + + params_dict = { + "bidding_strategies": {"energy": NaiveSingleBidStrategy()}, + "technology": "energy", + "unit_operator": "test_operator", + "max_power": 1000, + "min_power": 0, + "forecaster": NaiveForecast(index, demand=1000), + } + unit = Demand("testdemand", index=index, **params_dict) + await units_role.add_unit(unit) + + market_role = MarketRole(marketconfig) + market_agent = RoleAgent(container, "market") + market_agent.add_role(market_role) + + dr = DataRequester(container, "data_requester") + + market_content = { + "context": "data_request", + "market_id": "Test", + "metric": "price", + "start_time": index[0], + "end_time": index[1], + } + unit_content = { + "context": "data_request", + "unit": "testdemand", + "metric": "energy", + "start_time": index[0], + "end_time": index[3], + } + + # market results are empty for now + content, meta = await dr.send_data_request( + "world", "market", market_content, "market_request" + ) + assert meta["in_reply_to"] == "market_request" + assert content["context"] == "data_response" + assert content["data"].empty + + market_role.results.append({"time": index[0], "price": 12}) + market_role.results.append({"time": index[1], "price": 18}) + content, meta = await dr.send_data_request( + "world", "market", market_content, "market_request" + ) + # price is now returned correctly + assert content["data"][index[0]] == 12 + + unit.outputs["energy"][index[1]] = 100 + unit.outputs["energy"][index[3]] = 200 + + content, meta = await dr.send_data_request( + "world", "test_operator", unit_content, "unit_request" + ) + assert meta["in_reply_to"] == "unit_request" + assert content["context"] == "data_response" + assert isinstance(content["data"], pd.Series) + assert content["data"][index[1]] == 100 + assert content["data"][index[2]] == 0 + assert content["data"][index[3]] == 200 + + await container.shutdown() diff --git a/tests/test_policies_contracts.py b/tests/test_policies_contracts.py new file mode 100644 index 00000000..5fc664f9 --- /dev/null +++ b/tests/test_policies_contracts.py @@ -0,0 +1,64 @@ +# SPDX-FileCopyrightText: ASSUME Developers +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import asyncio +from datetime import datetime + +import pandas as pd +from dateutil import rrule as rr +from dateutil.relativedelta import relativedelta as rd +from mango import Agent, RoleAgent, create_container +from mango.util.clock import ExternalClock + +from assume.common.forecasts import NaiveForecast +from assume.common.market_objects import MarketConfig, MarketProduct +from assume.common.units_operator import UnitsOperator +from assume.markets.base_market import MarketRole +from assume.markets.clearing_algorithms.contracts import ( + available_contracts, + market_premium, +) +from assume.strategies.extended import is_co2emissionless +from assume.strategies.naive_strategies import NaiveSingleBidStrategy +from assume.units.demand import Demand + + +def test_contract_functions(): + start = datetime(2019, 1, 1) + end = datetime(2019, 2, 1) + + index = pd.date_range( + start=start, + end=end, + freq="h", + ) + + contract = { + "start_time": start, + "end_time": end, + "only_hours": None, + "price": 10, + "volume": 1000, + "sender_id": "nuclear1", + "eligible_lambda": is_co2emissionless, + "evaluation_frequency": rr.WEEKLY, + "agent_id": ("world", "my_operator"), + "bid_id": "nuclear1_1", + "unit_id": "nuclear1", + "accepted_volume": 1000, + "accepted_price": 4.5, + "contractor_unit_id": "demand1", + "contractor_id": ("world", "brd"), + "market_id": "Support", + } + + market_idx = pd.Series(3, index) + gen_series = pd.Series(1000, index) + + for c_function in available_contracts.values(): + result = c_function(contract, market_idx, gen_series, start, end) + assert result + + result = market_premium(contract, market_idx, gen_series, start, end) + assert result diff --git a/tests/test_simple_market_mechanisms.py b/tests/test_simple_market_mechanisms.py index 42873b34..93913535 100644 --- a/tests/test_simple_market_mechanisms.py +++ b/tests/test_simple_market_mechanisms.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later +import copy from datetime import datetime, timedelta from dateutil import rrule as rr @@ -60,11 +61,13 @@ def test_market(): print(meta) -def test_simple_market_mechanism(): - import copy - +async def test_simple_market_mechanism(): for name, role in clearing_mechanisms.items(): - if "complex" in name or "redispatch" in name: + skip = False + for skip_name in ["complex", "redispatch", "contract"]: + if skip_name in name: + skip = True + if skip: continue print(name) diff --git a/tests/test_utils.py b/tests/test_utils.py index 4f356202..834c6e8d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -399,7 +399,7 @@ def test_plot_function(mock_pyplot): @patch("matplotlib.pyplot.show") -def test_plot_function(mock_pyplot): +def test_visualize_function(mock_pyplot): orderbook = create_orderbook() i = -1 for o in orderbook: