Skip to content

Commit

Permalink
Added topic prefix for basic ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
lewis-chambers committed May 31, 2024
1 parent 680faf8 commit 5ef509b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 5 deletions.
32 changes: 28 additions & 4 deletions src/iotdevicesimulator/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class SensorSite:
max_cycles: Maximum number of cycles before shutdown.
inherit_logger: Override for the module logger.
delay_first_cycle: Adds a random delay to first invocation from 0 - `sleep_time`.
topic_prefix: Prefixes the sensor topic.
"""

cycle: int | None = 0
Expand All @@ -37,8 +38,27 @@ class SensorSite:
_instance_logger: logging.Logger
"""Logger used by the instance."""

topic_prefix: str = None
"""Added as prefix to topic string."""

@property
def topic(self):
"""MQTT message topic."""
return self._topic

@topic.setter
def topic(self, query):
"""Gets the topic"""
_topic = f"fdri/cosmos_site/{self.site_id}/{query}"

if self.topic_prefix is not None:
_topic = f"{self.topic_prefix}/{_topic}"

self._topic = _topic

def __init__(self, site_id: str,*, sleep_time: int|None=None, max_cycles: int|None=None, inherit_logger:logging.Logger|None=None, delay_first_cycle:bool|None=None) -> None:
def __init__(self, site_id: str,*, sleep_time: int|None=None,
max_cycles: int|None=None, inherit_logger:logging.Logger|None=None,
delay_first_cycle:bool|None=None, topic_prefix: str|None=None) -> None:

self.site_id = str(site_id)

Expand Down Expand Up @@ -69,6 +89,9 @@ def __init__(self, site_id: str,*, sleep_time: int|None=None, max_cycles: int|No

self.delay_first_cycle = delay_first_cycle

if topic_prefix is not None:
self.topic_prefix = str(topic_prefix)

self._instance_logger.info(f"Initialised Site: {repr(self)}")

def __repr__(self):
Expand All @@ -86,6 +109,8 @@ async def run(self, oracle: Oracle, query: CosmosQuery,
oracle: The oracle database.
query: Query to process by database.
"""

self.topic = query.name
while True:

if self.delay_first_cycle and self.cycle == 0:
Expand All @@ -99,9 +124,8 @@ async def run(self, oracle: Oracle, query: CosmosQuery,
self._instance_logger.warn(f"No data found.")
else:
self._instance_logger.debug(f"Cycle {self.cycle+1}/{self.max_cycles} Read data from: {row["DATE_TIME"]}")
mqtt_topic = f"fdri/cosmos_site/{self.site_id}/{query.name}"
message_connection.send_message(str(row), mqtt_topic)
self._instance_logger.info(f"Sent message to: {mqtt_topic}")
message_connection.send_message(str(row), self.topic)
self._instance_logger.info(f"Sent message to: {self.topic}")

self.cycle += 1
if self.max_cycles > 0 and self.cycle >= self.max_cycles:
Expand Down
3 changes: 3 additions & 0 deletions src/iotdevicesimulator/scripts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def test():
@click.option("--max-sites", type=click.IntRange(0))
@click.option("--swarm-name", type=click.STRING)
@click.option("--delay-start", type=click.BOOL, default=False)
@click.option("--topic-prefix", type=click.STRING)
def mqtt(
ctx,
provider,
Expand All @@ -91,6 +92,7 @@ def mqtt(
max_sites,
swarm_name,
delay_start,
topic_prefix,
):
"""Gets an MQTT connection"""
query = queries.CosmosQuery[query]
Expand Down Expand Up @@ -122,6 +124,7 @@ async def _swarm(query, mqtt_connection, credentials, *args, **kwargs):
max_sites=max_sites,
swarm_name=swarm_name,
delay_first_cycle=delay_start,
topic_prefix=topic_prefix,
)
)

Expand Down
18 changes: 17 additions & 1 deletion src/iotdevicesimulator/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class CosmosSwarm:
query: CosmosQuery
"""Query run in database."""

topic_prefix: str
"""Adds prefix to sensor topic."""

def __len__(self):
"""Returns number of sites"""
return len(self.sites)
Expand All @@ -71,19 +74,21 @@ async def create(
max_sites: int | None = None,
swarm_name: str | None = None,
delay_first_cycle: bool | None = None,
topic_prefix: str | None = None,
) -> None:
"""Factory method for initialising the class.
Args:
query: A query retrieve from the database.
message_connection: Object used to send data.
credentials: A path to database credentials.
site_ids: A list of site ID strings.
sleep_time: Length of time to sleep after sending data in seconds.
max_cycles: Maximum number of data sending cycles.
max_sites: Maximum number of sites to initialise.
swarm_name: Name / ID given to swarm.
delay_first_cycle: Adds a random delay to first invocation from 0 - `sleep_time`.
credentials: A path to database credentials.
topic_prefix: Prefixes the sensor topic.
"""
self = cls()

Expand Down Expand Up @@ -147,6 +152,9 @@ async def create(
)
self.delay_first_cycle = delay_first_cycle

if topic_prefix is not None:
self.topic_prefix = str(topic_prefix)

self.oracle = await self._get_oracle(
credentials=credentials, inherit_logger=self._instance_logger
)
Expand All @@ -159,6 +167,7 @@ async def create(
max_sites=self.max_sites,
swarm_logger=self._instance_logger,
delay_first_cycle=self.delay_first_cycle,
topic_prefix=self.topic_prefix,
)
else:
self.sites = await self._init_sites_from_db(
Expand All @@ -169,6 +178,7 @@ async def create(
max_sites=self.max_sites,
swarm_logger=self._instance_logger,
delay_first_cycle=self.delay_first_cycle,
topic_prefix=self.topic_prefix,
)

self._instance_logger.debug("Swarm Ready")
Expand Down Expand Up @@ -235,6 +245,7 @@ def _init_sites(
max_sites: int = 0,
swarm_logger: logging.Logger | None = None,
delay_first_cycle: bool = False,
topic_prefix: str | None = None,
):
"""Initialises a list of SensorSites.
Expand All @@ -245,6 +256,7 @@ def _init_sites(
max_sites: Maximum number of sites to initialise. Picks randomly from list if given
swarm_logger: Passes the instance logger to sites
delay_first_cycle: Adds a random delay to first invocation from 0 - `sleep_time`.
topic_prefix: Prefixes the sensor topic.
Returns:
List[SensorSite]: A list of sensor sites.
Expand All @@ -259,6 +271,7 @@ def _init_sites(
max_cycles=max_cycles,
inherit_logger=swarm_logger,
delay_first_cycle=delay_first_cycle,
topic_prefix=topic_prefix,
)
for site_id in site_ids
]
Expand All @@ -272,6 +285,7 @@ async def _init_sites_from_db(
max_sites=0,
swarm_logger: logging.Logger | None = None,
delay_first_cycle: bool = False,
topic_prefix: str | None = None,
) -> List[SensorSite]:
"""Initialised sensor sites from the COSMOS DB.
Expand All @@ -283,6 +297,7 @@ async def _init_sites_from_db(
max_sites: Maximum number of sites to initialise. Picks randomly from list if less than number of sites found
swarm_logger: Passes the instance logger to sites
delay_first_cycle: Adds a random delay to first invocation from 0 - `sleep_time`.
topic_prefix: Prefixes the sensor topic.
Returns:
List[SensorSite]: A list of sensor sites.
Expand All @@ -300,6 +315,7 @@ async def _init_sites_from_db(
max_cycles=max_cycles,
inherit_logger=swarm_logger,
delay_first_cycle=delay_first_cycle,
topic_prefix=topic_prefix,
)
for site_id in site_ids
]
Expand Down
11 changes: 11 additions & 0 deletions src/tests/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ def test_sleep_time_bad_value_gives_error(self, sleep_time):
with self.assertRaises(ValueError):
SensorSite("SITE_ID", sleep_time=sleep_time)

def test_topic_prefix(self):
"""Tests that the topic prefix is set"""
site = SensorSite("SITE_ID")

site.topic = "topic1"
self.assertEqual(site.topic, "fdri/cosmos_site/SITE_ID/topic1")

site = SensorSite("SITE2", topic_prefix="$some/rule/path")
site.topic = "topic2"
self.assertEqual(site.topic, "$some/rule/path/fdri/cosmos_site/SITE2/topic2")


CONFIG_PATH = pathlib.Path(
pathlib.Path(__file__).parents[1], "iotdevicesimulator", "__assets__", "config.cfg"
Expand Down

0 comments on commit 5ef509b

Please sign in to comment.