diff --git a/assume/common/outputs.py b/assume/common/outputs.py index 3e5e21f0..3f0b96f2 100644 --- a/assume/common/outputs.py +++ b/assume/common/outputs.py @@ -411,8 +411,10 @@ def get_sum_reward(self): f"select reward FROM rl_params where simulation='{self.simulation_id}'" ) + avg_reward = 0 with self.db.begin() as db: reward = db.execute(query).fetchall() + if len(reward): avg_reward = sum(r[0] for r in reward) / len(reward) return avg_reward diff --git a/assume/markets/base_market.py b/assume/markets/base_market.py index 97093277..55bf60dc 100644 --- a/assume/markets/base_market.py +++ b/assume/markets/base_market.py @@ -397,7 +397,10 @@ async def clear_market(self, market_products: list[MarketProduct]): for meta in market_meta: logger.debug( - f'clearing price for {self.marketconfig.name} is {meta["price"]:.2f}, volume: {meta["demand_volume"]}' + "clearing price for %s is %.2f, volume: %f", + self.marketconfig.name, + meta["price"], + meta["demand_volume"], ) meta["market_id"] = self.marketconfig.name meta["time"] = meta["product_start"] diff --git a/assume/world.py b/assume/world.py index f13ce4e0..6b577c6a 100644 --- a/assume/world.py +++ b/assume/world.py @@ -110,6 +110,7 @@ async def setup( learning_config: LearningConfig = {}, forecaster: Forecaster = None, manager_address=None, + **kwargs, ): self.clock = ExternalClock(0) self.start = start @@ -140,6 +141,7 @@ async def setup( "broker_addr": "localhost", "client_id": self.addr, } + container_kwargs["mqtt_kwargs"].update(**kwargs) self.container = await create_container( connection_type=connection_type, @@ -150,18 +152,15 @@ async def setup( ) self.learning_mode = self.learning_config.get("learning_mode", False) self.output_agent_addr = (self.addr, "export_agent_1") - if self.distributed_role is True: + if self.distributed_role is False: + self.clock_agent = DistributedClockAgent(self.container) + self.output_agent_addr = (manager_address, "export_agent_1") + else: await self.setup_learning() await self.setup_output_agent(simulation_id, save_frequency_hours) self.clock_manager = DistributedClockManager( self.container, receiver_clock_addresses=self.addresses ) - elif self.distributed_role is None: - await self.setup_learning() - await self.setup_output_agent(simulation_id, save_frequency_hours) - else: - self.clock_agent = DistributedClockAgent(self.container) - self.output_agent_addr = (manager_address, "export_agent_1") async def setup_learning(self): self.bidding_params.update(self.learning_config) @@ -198,7 +197,7 @@ async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int # mango multiprocessing is currently only supported on linux # with single - if platform == "linux" and self.distributed_role is None: + if platform == "linux": self.addresses.append(self.addr) def creator(container): @@ -380,7 +379,7 @@ def add_market( self.markets[f"{market_config.name}"] = market_config async def _step(self): - if self.distributed_role: + if self.distributed_role is not False: next_activity = await self.clock_manager.distribute_time() else: next_activity = self.clock.get_next_activity() @@ -404,7 +403,7 @@ async def async_run(self, start_ts, end_ts): # allow registration before first opening self.clock.set_time(start_ts - 1) - if self.distributed_role: + if self.distributed_role is not False: await self.clock_manager.broadcast(self.clock.time) while self.clock.time < end_ts: await asyncio.sleep(0) diff --git a/compose.yml b/compose.yml index 3c2d56fe..1e716211 100644 --- a/compose.yml +++ b/compose.yml @@ -76,6 +76,8 @@ services: profiles: ["mqtt"] ports: - "1883:1883/tcp" + volumes: + - ./docker_configs/mqtt.conf:/mosquitto/config/mosquitto.conf healthcheck: test: "mosquitto_sub -t '$$SYS/#' -C 1 | grep -v Error || exit 1" interval: 45s @@ -90,6 +92,9 @@ services: depends_on: - assume_db - mqtt-broker + environment: + DB_URI: "postgresql://assume:assume@assume_db:5432/assume" + MQTT_BROKER: mqtt-broker entrypoint: python3 ./examples/distributed_world_manager.py simulation_client01: @@ -97,6 +102,9 @@ services: image: ghcr.io/assume-framework/assume:latest profiles: ["mqtt"] build: . + environment: + DB_URI: "postgresql://assume:assume@assume_db:5432/assume" + MQTT_BROKER: mqtt-broker depends_on: - assume_db - mqtt-broker diff --git a/docker_configs/mqtt.conf b/docker_configs/mqtt.conf new file mode 100644 index 00000000..60e38cb7 --- /dev/null +++ b/docker_configs/mqtt.conf @@ -0,0 +1,5 @@ +# https://github.com/eclipse/mosquitto/blob/master/mosquitto.conf +listener 1883 +allow_anonymous true + +max_keepalive 3600 diff --git a/examples/distributed_simulation/config.py b/examples/distributed_simulation/config.py new file mode 100644 index 00000000..03c89714 --- /dev/null +++ b/examples/distributed_simulation/config.py @@ -0,0 +1,84 @@ +import asyncio +import calendar +import logging +import os +from datetime import datetime, timedelta + +import pandas as pd +from dateutil import rrule as rr + +from assume import World +from assume.common.market_objects import MarketConfig, MarketProduct + +log = logging.getLogger(__name__) + + +db_uri = os.getenv("DB_URI", "postgresql://assume:assume@localhost:5432/assume") + +use_mqtt = False + +if use_mqtt: + manager_addr = "manager" + agent_adress = "agent" + agent_adresses = ["agent"] + market_operator_addr = "manager" +else: + manager_addr = ("0.0.0.0", 9099) + agent_adress = ("0.0.0.0", 9098) + agent_adresses = [("0.0.0.0", 9098)] + market_operator_addr = ("0.0.0.0", 9099) + +market_operator_aid = "market_operator" +broker_addr = os.getenv("MQTT_BROKER", ("0.0.0.0", 1883, 600)) + +start = datetime(2023, 10, 4) +end = datetime(2023, 12, 5) +index = pd.date_range( + start=start, + end=end + timedelta(hours=24), + freq="H", +) +sim_id = "handmade_simulation" + +marketdesign = [ + MarketConfig( + "EOM", + rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end), + timedelta(hours=1), + "pay_as_clear", + [MarketProduct(timedelta(hours=1), 24, timedelta(hours=1))], + additional_fields=["block_id", "link", "exclusive_id"], + ) +] + + +async def worker(world: World, marketdesign: list[MarketConfig], create_worker): + if world.distributed_role: + world.addresses.extend(agent_adresses) + + await world.setup( + start=start, + end=end, + save_frequency_hours=48, + simulation_id=sim_id, + index=index, + manager_address=manager_addr, + broker_addr=broker_addr, + ) + + await create_worker(world, marketdesign) + + await asyncio.sleep(0) + + # wait until done if we are a worker agent + if world.distributed_role: + world.logger.info("sleeping 2s") + await asyncio.sleep(2) + world.logger.info("starting simulation") + await world.async_run( + start_ts=calendar.timegm(world.start.utctimetuple()), + end_ts=calendar.timegm(world.end.utctimetuple()), + ) + elif world.distributed_role is False: + await world.clock_agent.stopped + await world.container.shutdown() diff --git a/examples/distributed_simulation/world_agent.py b/examples/distributed_simulation/world_agent.py new file mode 100644 index 00000000..71c3e0c0 --- /dev/null +++ b/examples/distributed_simulation/world_agent.py @@ -0,0 +1,38 @@ +from config import ( + agent_adress, + db_uri, + index, + market_operator_addr, + market_operator_aid, + marketdesign, + worker, +) + +from assume import MarketConfig, World +from assume.common.forecasts import NaiveForecast + + +async def create_worker(world: World, marketdesign: list[MarketConfig]): + for market_config in marketdesign: + market_config.addr = market_operator_addr + market_config.aid = market_operator_aid + world.markets[f"{market_config.name}"] = market_config + + world.add_unit_operator("my_demand") + world.add_unit( + "demand1", + "demand", + "my_demand", + # the unit_params have no hints + { + "min_power": 0, + "max_power": 1000, + "bidding_strategies": {"energy": "naive"}, + "technology": "demand", + }, + NaiveForecast(index, demand=100), + ) + + +world = World(database_uri=db_uri, addr=agent_adress, distributed_role=False) +world.loop.run_until_complete(worker(world, marketdesign, create_worker)) diff --git a/examples/distributed_simulation/world_manager.py b/examples/distributed_simulation/world_manager.py new file mode 100644 index 00000000..43dec05a --- /dev/null +++ b/examples/distributed_simulation/world_manager.py @@ -0,0 +1,40 @@ +from config import ( + agent_adress, + agent_adresses, + db_uri, + index, + manager_addr, + market_operator_addr, + market_operator_aid, + marketdesign, + worker, +) + +from assume import MarketConfig, World +from assume.common.forecasts import NaiveForecast + + +async def create_worker(world: World, marketdesign: list[MarketConfig]): + world.add_market_operator(id=market_operator_aid) + for market_config in marketdesign: + world.add_market(market_operator_aid, market_config) + + world.add_unit_operator("my_operator") + + nuclear_forecast = NaiveForecast(index, availability=1, fuel_price=3, co2_price=0.1) + world.add_unit( + "nuclear1", + "power_plant", + "my_operator", + { + "min_power": 200, + "max_power": 1000, + "bidding_strategies": {"energy": "naive"}, + "technology": "nuclear", + }, + nuclear_forecast, + ) + + +world = World(database_uri=db_uri, addr=manager_addr, distributed_role=True) +world.loop.run_until_complete(worker(world, marketdesign, create_worker)) diff --git a/examples/distributed_world_agent.py b/examples/distributed_world_agent.py deleted file mode 100644 index 40aeba2c..00000000 --- a/examples/distributed_world_agent.py +++ /dev/null @@ -1,78 +0,0 @@ -import logging -from datetime import datetime, timedelta - -import pandas as pd -from dateutil import rrule as rr - -from assume import World -from assume.common.forecasts import NaiveForecast -from assume.common.market_objects import MarketConfig, MarketProduct - -log = logging.getLogger(__name__) - -db_uri = "postgresql://assume:assume@localhost:5432/assume" - -manager_addr = ("0.0.0.0", 9099) -agent_adress = [("0.0.0.0", 9098)] -manager_addr = "manager" -agent_adress = "agent" - -world = World(database_uri=db_uri, addr=agent_adress, distributed_role=False) - - -async def worker(): - start = datetime(2023, 10, 4) - end = datetime(2023, 12, 5) - index = pd.date_range( - start=start, - end=end + timedelta(hours=24), - freq="H", - ) - sim_id = "handmade_simulation" - - await world.setup( - start=start, - end=end, - save_frequency_hours=48, - simulation_id=sim_id, - index=index, - manager_address=manager_addr, - ) - - marketdesign = [ - MarketConfig( - "EOM", - rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end), - timedelta(hours=1), - "pay_as_clear", - [MarketProduct(timedelta(hours=1), 1, timedelta(hours=1))], - additional_fields=["block_id", "link", "exclusive_id"], - ) - ] - - mo_id = "market_operator" - world.add_market_operator(id=mo_id) - for market_config in marketdesign: - world.add_market(mo_id, market_config) - - world.add_unit_operator("my_operator") - - nuclear_forecast = NaiveForecast(index, availability=1, fuel_price=3, co2_price=0.1) - world.add_unit( - "nuclear1", - "power_plant", - "my_operator", - { - "min_power": 200, - "max_power": 1000, - "bidding_strategies": {"energy": "naive"}, - "technology": "nuclear", - }, - nuclear_forecast, - ) - - await world.clock_agent.stopped - await world.container.shutdown() - - -world.loop.run_until_complete(worker()) diff --git a/examples/distributed_world_manager.py b/examples/distributed_world_manager.py deleted file mode 100644 index 2c123285..00000000 --- a/examples/distributed_world_manager.py +++ /dev/null @@ -1,78 +0,0 @@ -import logging -from datetime import datetime, timedelta - -import pandas as pd -from dateutil import rrule as rr - -from assume import World -from assume.common.forecasts import NaiveForecast -from assume.common.market_objects import MarketConfig, MarketProduct - -log = logging.getLogger(__name__) - -db_uri = "postgresql://assume:assume@localhost:5432/assume" - -manager_addr = ("0.0.0.0", 9099) -agent_adresses = [("0.0.0.0", 9098)] -manager_addr = "manager" -agent_adresses = ["agent"] - -world = World(database_uri=db_uri, addr=manager_addr, distributed_role=True) - - -async def init(): - start = datetime(2023, 10, 4) - end = datetime(2023, 12, 5) - index = pd.date_range( - start=start, - end=end + timedelta(hours=24), - freq="H", - ) - sim_id = "handmade_simulation" - world.addresses.extend(agent_adresses) - - await world.setup( - start=start, - end=end, - save_frequency_hours=48, - simulation_id=sim_id, - index=index, - ) - - marketdesign = [ - MarketConfig( - "EOM", - rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end), - timedelta(hours=1), - "pay_as_clear", - [MarketProduct(timedelta(hours=1), 1, timedelta(hours=1))], - additional_fields=["block_id", "link", "exclusive_id"], - ) - ] - - for market_config in marketdesign: - market_config.addr = agent_adresses[0] - market_config.aid = "market_operator" - world.markets[f"{market_config.name}"] = market_config - - world.add_unit_operator("my_demand") - world.add_unit( - "demand1", - "demand", - "my_demand", - # the unit_params have no hints - { - "min_power": 0, - "max_power": 1000, - "bidding_strategies": {"energy": "naive"}, - "technology": "demand", - }, - NaiveForecast(index, demand=100), - ) - - -world.loop.run_until_complete(init()) -import time - -time.sleep(3) -world.run() diff --git a/examples/world_script.py b/examples/world_script.py index 4643f8ac..bded4a42 100644 --- a/examples/world_script.py +++ b/examples/world_script.py @@ -23,7 +23,7 @@ async def init(): end=end + timedelta(hours=24), freq="H", ) - sim_id = "handmade_simulation" + sim_id = "world_script_simulation" await world.setup( start=start,