Skip to content

Commit

Permalink
Started implementing CR1000X device
Browse files Browse the repository at this point in the history
  • Loading branch information
lewis-chambers committed Jun 5, 2024
1 parent 0ad4f34 commit 75379ed
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 38 deletions.
79 changes: 60 additions & 19 deletions src/iotdevicesimulator/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,14 @@ class BaseDevice(abc.ABC):
topic_prefix: str = None
"""Added as prefix to topic string."""

topic_suffix: str = None
"""Adds a topic suffix"""

data_source: str = "cosmos"
"""Specifies the source of data to use."""

@property
def topic(self):
"""MQTT message topic."""
return self._topic

@topic.setter
def topic(self, query):
"""Gets the topic"""
_topic = f"fdri/cosmos_site/{self.sensor_type}/{self.device_id}/{query}"

if self.topic_prefix is not None:
_topic = f"{self.topic_prefix}/{_topic}"

self._topic = _topic
mqtt_topic: str
"""Topic used to send MQTT messages"""

def __init__(
self,
Expand All @@ -65,6 +56,7 @@ def __init__(
inherit_logger: logging.Logger | None = None,
delay_start: bool | None = None,
topic_prefix: str | None = None,
topic_suffix: str | None = None,
data_source: str | None = None,
) -> None:
"""Initializer
Expand All @@ -76,6 +68,7 @@ def __init__(
inherit_logger: Override for the module logger.
delay_start: Adds a random delay to first invocation from 0 - `sleep_time`.
topic_prefix: Prefixes the sensor topic.
topic_prefix: Suffixes the sensor topic.
data_source: Source of data to retrieve
"""
self.device_id = str(device_id)
Expand Down Expand Up @@ -115,8 +108,8 @@ def __init__(
)
self.data_source = data_source

if topic_prefix is not None:
self.topic_prefix = str(topic_prefix)
topics = self._get_mqtt_topics(prefix=topic_prefix, suffix=topic_suffix)
self.mqtt_topic = topics[0]

self._instance_logger.info(f"Initialised Site: {repr(self)}")

Expand All @@ -135,6 +128,19 @@ def __repr__(self):
def __str__(self):
return f'Site ID: "{self.device_id}", Sleep Time: {self.sleep_time}, Max Cycles: {self.max_cycles}, Cycle: {self.cycle}'

def _get_mqtt_topics(self, prefix: str | None = None, suffix: str | None = None):

mqtt_topic = f"{self.device_type}/{self.device_id}"

if prefix is not None:
prefix = str(prefix)
mqtt_topic = f"{prefix}/{mqtt_topic}"

if suffix is not None:
mqtt_topic = f"{mqtt_topic}/{suffix}"

return mqtt_topic, prefix, suffix

async def _add_delay(self):
delay = random.randint(0, self.sleep_time)
self._instance_logger.debug(f"Delaying first cycle for: {delay}s")
Expand All @@ -148,8 +154,8 @@ def _send_payload(
return

if isinstance(message_connection, IotCoreMQTTConnection):
message_connection.send_message(payload, self.topic)
self._instance_logger.info(f"Sent message to: {self.topic}")
message_connection.send_message(payload, self.mqtt_topic)
self._instance_logger.info(f"Sent message to: {self.mqtt_topic}")
elif isinstance(message_connection, MessagingBaseClass):
message_connection.send_message()
self._instance_logger.info("Ate a message")
Expand Down Expand Up @@ -243,8 +249,43 @@ def __repr__(self):
class CosmosSensorDevice(CosmosDevice):
"""Digital representation of a site used in FDRI"""

sensor_type = "cosmos-sensor-site"
device_type = "cosmos-sensor-device"

def __init__(self, *args, **kwargs) -> None:

super().__init__(*args, **kwargs)


class CR1000X(CosmosDevice):
"Represents a CR1000X datalogger."

device_type = "CR1000X"

def __init__(self, *args, **kwargs) -> None:

super().__init__(*args, **kwargs)

def _format_payload(self, payload: dict):
"""Formats the payload into datalogger method."""

f_payload = dict()

f_payload["head"] = {
"transaction": 0,
"signature": 111111,
"environment": {
"station_name": self.device_id,
"table_name": self.topic_suffix,
"model": self.device_type,
"os_version": "Not a real OS",
"prog_name": "test",
},
}

time = payload["DATE_TIME"]

payload.pop("DATE_TIME")

f_payload["data"] = {"time": time, "vals": list(payload.values())}
f_payload["fields"] = [{"name": key} for key in payload.keys()]
return f_payload
8 changes: 8 additions & 0 deletions src/iotdevicesimulator/scripts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,14 @@ async def _list_sites(ctx, query):
@click.option(
"--topic-prefix",
type=click.STRING,
default="fdri/cosmos_test",
help="Prefixes the MQTT topic with a string. Can augment the calculated MQTT topic returned by each site.",
)
@click.option(
"--topic-suffix",
type=click.STRING,
help="Suffixes the MQTT topic with a string. Can augment the calculated MQTT topic returned by each site.",
)
@click.option("--dry", is_flag=True, default=False, help="Doesn't send out any data.")
def mqtt(
ctx,
Expand All @@ -176,6 +182,7 @@ def mqtt(
swarm_name,
delay_start,
topic_prefix,
topic_suffix,
dry,
):
"""Sends The cosmos data via MQTT protocol using PROVIDER.
Expand Down Expand Up @@ -214,6 +221,7 @@ async def _swarm(query, mqtt_connection, credentials, *args, **kwargs):
swarm_name=swarm_name,
delay_start=delay_start,
topic_prefix=topic_prefix,
topic_suffix=topic_suffix,
)
)

Expand Down
20 changes: 15 additions & 5 deletions src/iotdevicesimulator/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from iotdevicesimulator.messaging.core import MessagingBaseClass
import logging.config
from typing import List
from pathlib import Path
import asyncio
import random
import uuid
Expand Down Expand Up @@ -58,6 +57,9 @@ class CosmosSwarm:
topic_prefix: str | None = None
"""Adds prefix to sensor topic."""

topic_suffix: str | None = None
"""Adds suffix to sensor topic."""

def __len__(self):
"""Returns number of sites"""
return len(self.sites)
Expand Down Expand Up @@ -94,6 +96,7 @@ async def create(
swarm_name: str | None = None,
delay_start: bool | None = None,
topic_prefix: str | None = None,
topic_suffix: str | None = None,
) -> None:
"""Factory method for initialising the class.
Expand All @@ -108,6 +111,7 @@ async def create(
swarm_name: Name / ID given to swarm.
delay_start: Adds a random delay to first invocation from 0 - `sleep_time`.
topic_prefix: Prefixes the sensor topic.
topic_suffix: Suffixes the sensor topic.
"""
self = cls()

Expand Down Expand Up @@ -174,6 +178,11 @@ async def create(
if topic_prefix is not None:
self.topic_prefix = str(topic_prefix)

if topic_suffix is not None:
self.topic_suffix = str(topic_suffix)
else:
self.topic_suffix = self.query.name

self.database = await self._get_database(
credentials=credentials, inherit_logger=self._instance_logger
)
Expand All @@ -194,6 +203,7 @@ async def create(
swarm_logger=self._instance_logger,
delay_start=self.delay_start,
topic_prefix=self.topic_prefix,
topic_suffix=self.topic_suffix,
)

self._instance_logger.debug("Swarm Ready")
Expand All @@ -208,10 +218,7 @@ async def run(self) -> None:

self._instance_logger.debug("Running main loop")
await asyncio.gather(
*[
site.run(self.database, self.query, self.message_connection)
for site in self.sites
]
*[site.run(self.message_connection) for site in self.sites]
)

self._instance_logger.info("Finished")
Expand Down Expand Up @@ -261,6 +268,7 @@ def _init_sites(
swarm_logger: logging.Logger | None = None,
delay_start: bool = False,
topic_prefix: str | None = None,
topic_suffix: str | None = None,
):
"""Initialises a list of CosmosSensorDevices.
Expand All @@ -272,6 +280,7 @@ def _init_sites(
swarm_logger: Passes the instance logger to sites
delay_start: Adds a random delay to first invocation from 0 - `sleep_time`.
topic_prefix: Prefixes the sensor topic.
topic_suffix: Suffixes the sensor topic.
Returns:
List[CosmosSensorDevice]: A list of sensor sites.
Expand All @@ -289,6 +298,7 @@ def _init_sites(
inherit_logger=swarm_logger,
delay_start=delay_start,
topic_prefix=topic_prefix,
topic_suffix=topic_suffix,
)
for site_id in site_ids
]
Expand Down
58 changes: 44 additions & 14 deletions src/tests/test_devices.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import unittest
import asyncio
import pytest
from iotdevicesimulator.devices import CosmosSensorDevice, BaseDevice, CosmosDevice
from iotdevicesimulator.devices import (
CosmosSensorDevice,
BaseDevice,
CosmosDevice,
CR1000X,
)
from iotdevicesimulator.db import Oracle, BaseDatabase
from iotdevicesimulator.queries import CosmosQuery, CosmosSiteQuery
from iotdevicesimulator.messaging.core import MockMessageConnection
from parameterized import parameterized
from unittest.mock import patch
from datetime import datetime


class TestBaseClass(unittest.TestCase):
Expand Down Expand Up @@ -77,6 +83,22 @@ def test_payload_formatter(self, payload):

self.assertEqual(payload, CosmosSensorDevice._format_payload(payload))

@parameterized.expand(
[
["MORLY", None, None, "cosmos-device/MORLY"],
["ALIC1", "prefix", None, "prefix/cosmos-device/ALIC1"],
["TEST", None, "suffix", "cosmos-device/TEST/suffix"],
["ANOTHER", "prefix", "suffix", "prefix/cosmos-device/ANOTHER/suffix"],
]
)
def test_topic_set(self, site_id, prefix, suffix, expected):
"""Tests the topic setting"""

device = CosmosDevice(self.query, self.database, site_id)
self.assertEqual(
device._get_mqtt_topics(prefix=prefix, suffix=suffix)[0], expected
)


class TestCosmosSensorDeviceInstantiation(unittest.TestCase):
"""Suite to test objects for simulating FDRI site objects."""
Expand Down Expand Up @@ -164,22 +186,13 @@ def test_sleep_time_bad_value_gives_error(self, sleep_time):
self.query, self.database, "device_id", sleep_time=sleep_time
)

def test_topic_prefix(self):
@parameterized.expand([["MORLY", None, None, "cosmos-sensor-device/MORLY"]])
def test_topic_prefix(self, site_id, prefix, suffix, expected):
"""Tests that the topic prefix is set"""
site = CosmosSensorDevice(self.query, self.database, "device_id")
site = CosmosSensorDevice(self.query, self.database, site_id)

site.topic = "topic1"
self.assertEqual(
site.topic, "fdri/cosmos_site/cosmos-sensor-site/device_id/topic1"
)

site = CosmosSensorDevice(
self.query, self.database, "SITE2", topic_prefix="$some/rule/path"
)
site.topic = "topic2"
self.assertEqual(
site.topic,
"$some/rule/path/fdri/cosmos_site/cosmos-sensor-site/SITE2/topic2",
site._get_mqtt_topics(prefix=prefix, suffix=suffix)[0], expected
)


Expand Down Expand Up @@ -223,5 +236,22 @@ async def test_multi_instances_stop_at_max_cycles(self):
self.assertEqual(site.cycle, max_cycles[i])


class TestCR1000X(unittest.TestCase):

@patch.multiple(BaseDatabase, __abstractmethods__=set())
def setUp(self) -> None:
self.database = BaseDatabase()
self.query = CosmosQuery.LEVEL_1_SOILMET_30MIN

def test_payload_formatting(self):

device = CR1000X(self.query, self.database, "site")

payload = {"DATE_TIME": datetime.now(), "A": 1, "B": 2.1233}
formatted = device._format_payload(payload)

print(formatted)


if __name__ == "__main__":
unittest.main()

0 comments on commit 75379ed

Please sign in to comment.