Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt fixes #222

Merged
merged 6 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,10 @@ def get_sum_reward(self):
f"select reward FROM rl_params where simulation='{self.simulation_id}'"
)

avg_reward = 0
with self.db.begin() as db:
reward = db.execute(query).fetchall()
if len(reward):
avg_reward = sum(r[0] for r in reward) / len(reward)

return avg_reward
5 changes: 4 additions & 1 deletion assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,10 @@ async def clear_market(self, market_products: list[MarketProduct]):

for meta in market_meta:
logger.debug(
f'clearing price for {self.marketconfig.name} is {meta["price"]:.2f}, volume: {meta["demand_volume"]}'
"clearing price for %s is %.2f, volume: %f",
self.marketconfig.name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please stick to one format for strings? In the whole code we are using f"{}" format. it would be confusing to have it different in some places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this view too for quite a while.
But unfortunately, formatting with f-strings is evaluated every time - even if the logger is set to warning (so the price is formatted even if the line is not printed).
Using it with the % notation does only execute the formatting when loglevel is set to debug.

But I would not like to switch to the %s notation on all other places.. 🤷
So I think we can't really do anything but to use the %s format in logger situations and format strings everywhere else

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, good to know, let's keep it this way

meta["price"],
meta["demand_volume"],
)
meta["market_id"] = self.marketconfig.name
meta["time"] = meta["product_start"]
Expand Down
19 changes: 9 additions & 10 deletions assume/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
learning_config: LearningConfig = {},
forecaster: Forecaster = None,
manager_address=None,
**kwargs,
):
self.clock = ExternalClock(0)
self.start = start
Expand Down Expand Up @@ -140,6 +141,7 @@
"broker_addr": "localhost",
"client_id": self.addr,
}
container_kwargs["mqtt_kwargs"].update(**kwargs)

Check warning on line 144 in assume/world.py

View check run for this annotation

Codecov / codecov/patch

assume/world.py#L144

Added line #L144 was not covered by tests

self.container = await create_container(
connection_type=connection_type,
Expand All @@ -150,18 +152,15 @@
)
self.learning_mode = self.learning_config.get("learning_mode", False)
self.output_agent_addr = (self.addr, "export_agent_1")
if self.distributed_role is True:
if self.distributed_role is False:
self.clock_agent = DistributedClockAgent(self.container)
self.output_agent_addr = (manager_address, "export_agent_1")

Check warning on line 157 in assume/world.py

View check run for this annotation

Codecov / codecov/patch

assume/world.py#L156-L157

Added lines #L156 - L157 were not covered by tests
else:
await self.setup_learning()
await self.setup_output_agent(simulation_id, save_frequency_hours)
self.clock_manager = DistributedClockManager(
self.container, receiver_clock_addresses=self.addresses
)
elif self.distributed_role is None:
await self.setup_learning()
await self.setup_output_agent(simulation_id, save_frequency_hours)
else:
self.clock_agent = DistributedClockAgent(self.container)
self.output_agent_addr = (manager_address, "export_agent_1")

async def setup_learning(self):
self.bidding_params.update(self.learning_config)
Expand Down Expand Up @@ -198,7 +197,7 @@

# mango multiprocessing is currently only supported on linux
# with single
if platform == "linux" and self.distributed_role is None:
if platform == "linux":
self.addresses.append(self.addr)

def creator(container):
Expand Down Expand Up @@ -380,7 +379,7 @@
self.markets[f"{market_config.name}"] = market_config

async def _step(self):
if self.distributed_role:
if self.distributed_role is not False:
next_activity = await self.clock_manager.distribute_time()
else:
next_activity = self.clock.get_next_activity()
Expand All @@ -404,7 +403,7 @@

# allow registration before first opening
self.clock.set_time(start_ts - 1)
if self.distributed_role:
if self.distributed_role is not False:
await self.clock_manager.broadcast(self.clock.time)
while self.clock.time < end_ts:
await asyncio.sleep(0)
Expand Down
8 changes: 8 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ services:
profiles: ["mqtt"]
ports:
- "1883:1883/tcp"
volumes:
- ./docker_configs/mqtt.conf:/mosquitto/config/mosquitto.conf
healthcheck:
test: "mosquitto_sub -t '$$SYS/#' -C 1 | grep -v Error || exit 1"
interval: 45s
Expand All @@ -90,13 +92,19 @@ services:
depends_on:
- assume_db
- mqtt-broker
environment:
DB_URI: "postgresql://assume:assume@assume_db:5432/assume"
MQTT_BROKER: mqtt-broker
entrypoint: python3 ./examples/distributed_world_manager.py

simulation_client01:
container_name: simulation_client01
image: ghcr.io/assume-framework/assume:latest
profiles: ["mqtt"]
build: .
environment:
DB_URI: "postgresql://assume:assume@assume_db:5432/assume"
MQTT_BROKER: mqtt-broker
depends_on:
- assume_db
- mqtt-broker
Expand Down
5 changes: 5 additions & 0 deletions docker_configs/mqtt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# https://github.com/eclipse/mosquitto/blob/master/mosquitto.conf
listener 1883
allow_anonymous true

max_keepalive 3600
84 changes: 84 additions & 0 deletions examples/distributed_simulation/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
import calendar
import logging
import os
from datetime import datetime, timedelta

import pandas as pd
from dateutil import rrule as rr

from assume import World
from assume.common.market_objects import MarketConfig, MarketProduct

log = logging.getLogger(__name__)


db_uri = os.getenv("DB_URI", "postgresql://assume:assume@localhost:5432/assume")

use_mqtt = False

if use_mqtt:
manager_addr = "manager"
agent_adress = "agent"
agent_adresses = ["agent"]
market_operator_addr = "manager"
else:
manager_addr = ("0.0.0.0", 9099)
agent_adress = ("0.0.0.0", 9098)
agent_adresses = [("0.0.0.0", 9098)]
market_operator_addr = ("0.0.0.0", 9099)

market_operator_aid = "market_operator"
broker_addr = os.getenv("MQTT_BROKER", ("0.0.0.0", 1883, 600))

start = datetime(2023, 10, 4)
end = datetime(2023, 12, 5)
index = pd.date_range(
start=start,
end=end + timedelta(hours=24),
freq="H",
)
sim_id = "handmade_simulation"

marketdesign = [
MarketConfig(
"EOM",
rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end),
timedelta(hours=1),
"pay_as_clear",
[MarketProduct(timedelta(hours=1), 24, timedelta(hours=1))],
additional_fields=["block_id", "link", "exclusive_id"],
)
]


async def worker(world: World, marketdesign: list[MarketConfig], create_worker):
if world.distributed_role:
world.addresses.extend(agent_adresses)

await world.setup(
start=start,
end=end,
save_frequency_hours=48,
simulation_id=sim_id,
index=index,
manager_address=manager_addr,
broker_addr=broker_addr,
)

await create_worker(world, marketdesign)

await asyncio.sleep(0)

# wait until done if we are a worker agent
if world.distributed_role:
world.logger.info("sleeping 2s")
await asyncio.sleep(2)
world.logger.info("starting simulation")
await world.async_run(
start_ts=calendar.timegm(world.start.utctimetuple()),
end_ts=calendar.timegm(world.end.utctimetuple()),
)
elif world.distributed_role is False:
await world.clock_agent.stopped
await world.container.shutdown()
38 changes: 38 additions & 0 deletions examples/distributed_simulation/world_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from config import (
agent_adress,
db_uri,
index,
market_operator_addr,
market_operator_aid,
marketdesign,
worker,
)

from assume import MarketConfig, World
from assume.common.forecasts import NaiveForecast


async def create_worker(world: World, marketdesign: list[MarketConfig]):
for market_config in marketdesign:
market_config.addr = market_operator_addr
market_config.aid = market_operator_aid
world.markets[f"{market_config.name}"] = market_config

world.add_unit_operator("my_demand")
world.add_unit(
"demand1",
"demand",
"my_demand",
# the unit_params have no hints
{
"min_power": 0,
"max_power": 1000,
"bidding_strategies": {"energy": "naive"},
"technology": "demand",
},
NaiveForecast(index, demand=100),
)


world = World(database_uri=db_uri, addr=agent_adress, distributed_role=False)
world.loop.run_until_complete(worker(world, marketdesign, create_worker))
40 changes: 40 additions & 0 deletions examples/distributed_simulation/world_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from config import (
agent_adress,
agent_adresses,
db_uri,
index,
manager_addr,
market_operator_addr,
market_operator_aid,
marketdesign,
worker,
)

from assume import MarketConfig, World
from assume.common.forecasts import NaiveForecast


async def create_worker(world: World, marketdesign: list[MarketConfig]):
world.add_market_operator(id=market_operator_aid)
for market_config in marketdesign:
world.add_market(market_operator_aid, market_config)

world.add_unit_operator("my_operator")

nuclear_forecast = NaiveForecast(index, availability=1, fuel_price=3, co2_price=0.1)
world.add_unit(
"nuclear1",
"power_plant",
"my_operator",
{
"min_power": 200,
"max_power": 1000,
"bidding_strategies": {"energy": "naive"},
"technology": "nuclear",
},
nuclear_forecast,
)


world = World(database_uri=db_uri, addr=manager_addr, distributed_role=True)
world.loop.run_until_complete(worker(world, marketdesign, create_worker))
78 changes: 0 additions & 78 deletions examples/distributed_world_agent.py

This file was deleted.

Loading