Skip to content

Commit

Permalink
Changing behaviour to grab COSMOS site IDs from table related to quer…
Browse files Browse the repository at this point in the history
…y made. (#2)

* Renamed messaging module

* * BUGFIX: Getting site_ids from query if no IDs given

---------

Co-authored-by: Lewis Chambers <[email protected]>
  • Loading branch information
lewis-chambers and lewis-chambers authored May 31, 2024
1 parent bf2cf77 commit 617ad60
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 99 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ addopts = "--cov=iotdevicesimulator"
markers = [
"asyncio: Tests asynchronous functions.",
"oracle: Requires oracle connection and required config credentials",
"slow: Marks slow tests",
]

[tool.coverage.run]
omit = ["*example.py", "*__init__.py"]
omit = ["*example.py", "*__init__.py", "queries.py", "loggers.py"]
12 changes: 0 additions & 12 deletions src/iotdevicesimulator/__assets__/configs.cfg

This file was deleted.

28 changes: 27 additions & 1 deletion src/iotdevicesimulator/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import getpass
import logging

from iotdevicesimulator.queries import CosmosQuery
from iotdevicesimulator.queries import CosmosQuery, CosmosSiteQuery

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -83,3 +83,29 @@ async def query_latest_from_site(self, site_id: str, query: CosmosQuery) -> dict
return None

return dict(zip(columns, data))

async def query_site_ids(self, query: CosmosSiteQuery) -> list:
"""query_site_ids returns a list of site IDs from COSMOS database
Args:
query (CosmosSiteQuery): The query to run.
Returns:
List[str]: A list of site ID strings.
"""

if not isinstance(query, CosmosSiteQuery):
raise TypeError(
f"`query` must be a `CosmosSiteQuery` Enum, not a `{type(query)}`"
)

async with self.connection.cursor() as cursor:
await cursor.execute(query.value)

data = await cursor.fetchall()
data = [x[0] for x in data]

if not data:
data = []

return data
2 changes: 1 addition & 1 deletion src/iotdevicesimulator/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from iotdevicesimulator.queries import CosmosQuery
from iotdevicesimulator.db import Oracle
from iotdevicesimulator.mqtt.aws import IotCoreMQTTConnection
from iotdevicesimulator.messaging.aws import IotCoreMQTTConnection
import random

logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion src/iotdevicesimulator/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from iotdevicesimulator.queries import CosmosQuery
from iotdevicesimulator.swarm import CosmosSwarm
from iotdevicesimulator.mqtt.aws import IotCoreMQTTConnection
from iotdevicesimulator.messaging.aws import IotCoreMQTTConnection
import asyncio
import config
from pathlib import Path
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
import json
from awscrt.exceptions import AwsCrtError
from iotdevicesimulator.mqtt.core import MessagingBaseClass
from iotdevicesimulator.messaging.core import MessagingBaseClass
import backoff
import logging

Expand Down Expand Up @@ -157,18 +157,20 @@ def _on_connection_closed(connection, callback_data): # pragma: no cover
print("Connection closed")

@backoff.on_exception(backoff.expo, exception=AwsCrtError, logger=logger)
def _connect(self):
def _connect(self): # pragma: no cover
connect_future = self.connection.connect()
connect_future.result()
print("Connected!")

@backoff.on_exception(backoff.expo, exception=AwsCrtError, logger=logger)
def _disconnect(self):
def _disconnect(self): # pragma: no cover
print("Disconnecting...")
disconnect_future = self.connection.disconnect()
disconnect_future.result()

def send_message(self, message: str, topic: str, count: int = 1) -> None:
def send_message(
self, message: str, topic: str, count: int = 1
) -> None: # pragma: no cover
"""Sends a message to the endpoint.
Args:
Expand Down
File renamed without changes.
41 changes: 41 additions & 0 deletions src/iotdevicesimulator/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,44 @@ class CosmosQuery(StrEnum):
ORDER BY date_time DESC
FETCH NEXT 1 ROWS ONLY
"""


@enum.unique
class CosmosSiteQuery(StrEnum):
LEVEL_1_SOILMET_30MIN = "SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_SOILMET_30MIN"

"""Queries unique site IDs from LEVEL1_SOILMET_30MIN.
.. code-block:: sql
SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_SOILMET_30MIN
"""

LEVEL_1_NMDB_1HOUR = "SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_NMDB_1HOUR"

"""Queries unique site IDs from LEVEL1_NMDB_1HOUR table.
.. code-block:: sql
SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_NMDB_1HOUR
"""

LEVEL_1_PRECIP_1MIN = "SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_PRECIP_1MIN"

"""Queries unique site IDs from the LEVEL1_PRECIP_1MIN table.
.. code-block:: sql
SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_PRECIP_1MIN
"""

LEVEL_1_PRECIP_RAINE_1MIN = (
"SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_PRECIP_RAINE_1MIN"
)

"""Queries unique site IDs from the LEVEL1_PRECIP_RAINE_1MIN table.
.. code-block:: sql
SELECT UNIQUE(site_id) FROM COSMOS.LEVEL1_PRECIP_RAINE_1MIN
"""
38 changes: 18 additions & 20 deletions src/iotdevicesimulator/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from iotdevicesimulator.devices import SensorSite
from iotdevicesimulator.db import Oracle
from iotdevicesimulator.queries import CosmosQuery
from iotdevicesimulator.mqtt.core import MessagingBaseClass
from iotdevicesimulator.queries import CosmosQuery, CosmosSiteQuery
from iotdevicesimulator.messaging.core import MessagingBaseClass
import logging.config
from typing import List
from pathlib import Path
Expand Down Expand Up @@ -163,6 +163,7 @@ async def create(
else:
self.sites = await self._init_sites_from_db(
self.oracle,
CosmosSiteQuery[self.query.name],
sleep_time=self.sleep_time,
max_cycles=self.max_cycles,
max_sites=self.max_sites,
Expand Down Expand Up @@ -265,6 +266,7 @@ def _init_sites(
@staticmethod
async def _init_sites_from_db(
oracle: Oracle,
query: CosmosQuery,
sleep_time: int = 10,
max_cycles: int = 3,
max_sites=-1,
Expand All @@ -275,6 +277,7 @@ async def _init_sites_from_db(
Args:
oracle: Oracle DB to query
query: Query to get data from
sleep_time: Length of time to sleep after sending data in seconds.
max_cycles: Maximum number of data sending cycles.
max_sites: Maximum number of sites to initialise. Picks randomly from list if less than number of sites found
Expand All @@ -283,28 +286,23 @@ async def _init_sites_from_db(
Returns:
List[SensorSite]: A list of sensor sites.
TODO: Update to grab sites from unique items in DB tables.
"""

async with oracle.connection.cursor() as cursor:
await cursor.execute("SELECT UNIQUE(SITE_ID) from COSMOS.SITES")

site_ids = await cursor.fetchall()
site_ids = await oracle.query_site_ids(query)

if max_sites != -1:
site_ids = CosmosSwarm._random_list_items(site_ids, max_sites)
if max_sites != -1:
site_ids = CosmosSwarm._random_list_items(site_ids, max_sites)

return [
SensorSite(
site_id[0],
sleep_time=sleep_time,
max_cycles=max_cycles,
inherit_logger=swarm_logger,
delay_first_cycle=delay_first_cycle,
)
for site_id in site_ids
]
return [
SensorSite(
site_id,
sleep_time=sleep_time,
max_cycles=max_cycles,
inherit_logger=swarm_logger,
delay_first_cycle=delay_first_cycle,
)
for site_id in site_ids
]

@staticmethod
def _random_list_items(list_in: List[object], max_count: int) -> List[object]:
Expand Down
32 changes: 29 additions & 3 deletions src/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import config
import pathlib
from iotdevicesimulator import db
from iotdevicesimulator.queries import CosmosQuery
from iotdevicesimulator.queries import CosmosQuery, CosmosSiteQuery
from parameterized import parameterized

CONFIG_PATH = pathlib.Path(
pathlib.Path(__file__).parents[1], "iotdevicesimulator", "__assets__", "config.cfg"
Expand Down Expand Up @@ -39,7 +40,7 @@ async def test_instantiation(self):
@pytest.mark.oracle
@pytest.mark.asyncio
@config_exists
async def test_query(self):
async def test_latest_data_query(self):

site_id = "MORLY"
query = CosmosQuery.LEVEL_1_SOILMET_30MIN
Expand All @@ -48,16 +49,41 @@ async def test_query(self):

self.assertEqual(row["SITE_ID"], site_id)

@pytest.mark.oracle
@pytest.mark.asyncio
@pytest.mark.slow
@config_exists
async def test_site_id_query(self):

queries = [
CosmosSiteQuery.LEVEL_1_NMDB_1HOUR,
CosmosSiteQuery.LEVEL_1_SOILMET_30MIN,
CosmosSiteQuery.LEVEL_1_PRECIP_1MIN,
CosmosSiteQuery.LEVEL_1_PRECIP_RAINE_1MIN,
]

for query in queries:
sites = await self.oracle.query_site_ids(query)

self.assertIsInstance(sites, list)

for site in sites:
self.assertIsInstance(site, str)
self.assertGreater(len(site), 1)

self.assertNotEqual(len(sites), 0)

@pytest.mark.asyncio
@pytest.mark.oracle
@config_exists
async def test_bad_query_type(self):
async def test_bad_latest_data_query_type(self):

site_id = "MORLY"
query = "sql injection goes brr"

with self.assertRaises(TypeError):
await self.oracle.query_latest_from_site(site_id, query)
await self.oracle.query_site_ids(query)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from iotdevicesimulator.devices import SensorSite
from iotdevicesimulator.db import Oracle
from iotdevicesimulator.queries import CosmosQuery
from iotdevicesimulator.mqtt.core import MockMessageConnection
from iotdevicesimulator.messaging.core import MockMessageConnection
from parameterized import parameterized
import pathlib, config

Expand Down
21 changes: 0 additions & 21 deletions src/tests/test_loggers.py

This file was deleted.

16 changes: 8 additions & 8 deletions src/tests/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import pytest
import unittest
from unittest.mock import patch
from iotdevicesimulator.mqtt.core import MockMessageConnection, MessagingBaseClass
from iotdevicesimulator.mqtt.aws import IotCoreMQTTConnection
from iotdevicesimulator.messaging.core import MockMessageConnection, MessagingBaseClass
from iotdevicesimulator.messaging.aws import IotCoreMQTTConnection
from config import Config
from pathlib import Path
import awscrt.mqtt
Expand Down Expand Up @@ -62,7 +62,7 @@ def test_instantiation(self):
def test_non_string_arguments(self):

with self.assertRaises(TypeError):
instance = IotCoreMQTTConnection(
IotCoreMQTTConnection(
1,
self.config["cert_path"],
self.config["key_path"],
Expand All @@ -71,16 +71,16 @@ def test_non_string_arguments(self):
)

with self.assertRaises(TypeError):
instance = IotCoreMQTTConnection(
IotCoreMQTTConnection(
self.config["endpoint"],
self.config["cert_path"],
1,
self.config["key_path"],
self.config["ca_cert_path"],
"client_id",
)

with self.assertRaises(TypeError):
instance = IotCoreMQTTConnection(
IotCoreMQTTConnection(
self.config["endpoint"],
self.config["cert_path"],
1,
Expand All @@ -89,7 +89,7 @@ def test_non_string_arguments(self):
)

with self.assertRaises(TypeError):
instance = IotCoreMQTTConnection(
IotCoreMQTTConnection(
self.config["endpoint"],
self.config["cert_path"],
self.config["key_path"],
Expand All @@ -98,7 +98,7 @@ def test_non_string_arguments(self):
)

with self.assertRaises(TypeError):
instance = IotCoreMQTTConnection(
IotCoreMQTTConnection(
self.config["endpoint"],
self.config["cert_path"],
self.config["key_path"],
Expand Down
Loading

0 comments on commit 617ad60

Please sign in to comment.