Skip to content

Commit

Permalink
chore: authenticate in NMS prior to making other API calls (#476)
Browse files Browse the repository at this point in the history
* chore: authenticate in Notary prior to making other API calls

Signed-off-by: Guillaume Belanger <[email protected]>

* chore: improve docstrings

Signed-off-by: Guillaume Belanger <[email protected]>

* chore: remove unused ca_certificate field

Signed-off-by: Guillaume Belanger <[email protected]>

* chore: add waits to improve reliability

Signed-off-by: Guillaume Belanger <[email protected]>

* Fixes getting Juju secret ID

* Fixes DeviceGroup and NetworkSlice creation

* Increases time for network config propagation

---------

Signed-off-by: Guillaume Belanger <[email protected]>
Co-authored-by: Bartlomiej Gmerek <[email protected]>
Co-authored-by: Bartlomiej Gmerek <[email protected]>
  • Loading branch information
3 people authored Nov 20, 2024
1 parent a4310a5 commit 790013f
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 65 deletions.
87 changes: 86 additions & 1 deletion tests/integration/juju_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
from contextlib import contextmanager
from subprocess import CalledProcessError, call, check_output
from typing import Optional
from typing import Optional, Tuple

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -177,6 +177,91 @@ def set_application_config(model_name: str, application_name: str, config: dict)
juju_wait_for_active_idle(model_name, 60)


def _get_juju_secret_id(model_name: str, juju_secret_label: str) -> Optional[str]:
"""Get Juju secret ID.
The `juju secrets` command returns a JSON object with the following format:
{
"csuci57mp25c7993rgeg": {
"revision": 1,
"owner": "nms",
"label": "NMS_LOGIN",
"created": "2024-11-19T17:21:25Z",
"updated": "2024-11-19T17:21:25Z"
}
}
Args:
model_name(str): Juju model name
juju_secret_label(str): Juju secret label
"""
with juju_context(model_name):
cmd_out = check_output(["juju", "secrets", "--format=json"]).decode()
for key, value in json.loads(cmd_out).items():
if value["label"] == juju_secret_label:
return key
return None


def wait_for_nms_credentials(
model_name, juju_secret_label, timeout=300
) -> Tuple[Optional[str], Optional[str]]:
"""Wait for NMS credentials to be available in Juju secret.
Args:
model_name(str): Juju model name
juju_secret_label(str): Juju secret label
timeout(int): Time to wait for the credentials to be available
Raises:
TimeoutError: Raised if NMS credentials are not available within given time
"""
now = time.time()
while time.time() - now <= timeout:
username, password = get_nms_credentials(model_name, juju_secret_label)
if username and password:
return username, password
time.sleep(5)
raise TimeoutError(
f"Timed out waiting for NMS credentials in juju secret after {timeout} seconds!"
)


def get_nms_credentials(
model_name: str, juju_secret_label: str
) -> Tuple[Optional[str], Optional[str]]:
"""Get NMS credentials from Juju secret.
The `juju show-secret --reveal` command returns a JSON object with the following format:
{
"csuci57mp25c7993rgeg": {
"revision": 1,
"owner": "nms",
"label": "NMS_LOGIN",
"created": "2024-11-19T17:21:25Z",
"updated": "2024-11-19T17:21:25Z",
"content": {
"Data": {
"password": "whatever-password",
"token": "whatever-token",
"username": "whatever-username"
}
}
}
}
"""
secret_id = _get_juju_secret_id(model_name, juju_secret_label)
if not secret_id:
logger.warning("could not find secret with label %s", juju_secret_label)
return None, None
with juju_context(model_name):
cmd_out = check_output(
["juju", "show-secret", "--reveal", "--format=json", secret_id]
).decode()
secret_content = json.loads(cmd_out)[secret_id]["content"]["Data"]
return secret_content.get("username"), secret_content.get("password")


@contextmanager
def juju_context(model_name: str):
"""Allow changing currently active Juju model.
Expand Down
198 changes: 144 additions & 54 deletions tests/integration/nms_helper.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Module use to handle NMS API calls."""

import json
import logging
import time
from dataclasses import asdict, dataclass
from typing import Any, List, Optional

import requests

logger = logging.getLogger(__name__)

ACCOUNTS_URL = "config/v1/account"

JSON_HEADER = {"Content-Type": "application/json"}

SUBSCRIBER_CONFIG = {
"UeId": "PLACEHOLDER",
"plmnId": "00101",
"opc": "981d464c7c52eb6e5036234984ad0bcf",
"key": "5122250214c33e723a5dd523fc145fc0",
"sequenceNumber": "16f3b3f70fc2",
}

DEVICE_GROUP_CONFIG = {
"imsis": [],
"site-info": "demo",
Expand All @@ -34,6 +43,8 @@
},
},
}


NETWORK_SLICE_CONFIG = {
"slice-id": {"sst": "1", "sd": "102030"},
"site-device-group": [],
Expand All @@ -46,65 +57,144 @@
}


class Nms:
def __init__(self, nms_ip: str) -> None:
"""Construct the NMS class.
@dataclass
class StatusResponse:
"""Response from NMS when checking the status."""

Args:
nms_ip (str): IP address of the NMS application unit
"""
self.nms_ip = nms_ip
initialized: bool

def create_subscriber(self, imsi: str) -> None:
"""Create a subscriber.

Args:
imsi (str): Subscriber's IMSI
"""
SUBSCRIBER_CONFIG["UeId"] = imsi
url = f"https://{self.nms_ip}:5000/api/subscriber/imsi-{imsi}"
response = requests.post(url=url, data=json.dumps(SUBSCRIBER_CONFIG), verify=False)
response.raise_for_status()
logger.info(f"Created subscriber with IMSI {imsi}.")
@dataclass
class LoginParams:
"""Parameters to login to NMS."""

def create_device_group(self, device_group_name: str, imsis: list) -> None:
"""Create a device group.
username: str
password: str

Args:
device_group_name (str): Device group name
imsis (list): List of IMSIs to be included in the device group
"""
DEVICE_GROUP_CONFIG["imsis"] = imsis
url = f"https://{self.nms_ip}:5000/config/v1/device-group/{device_group_name}"
response = requests.post(url, json=DEVICE_GROUP_CONFIG, verify=False)
response.raise_for_status()
now = time.time()
timeout = 5
while time.time() - now <= timeout:
if requests.get(url, verify=False).json():
logger.info(f"Created device group {device_group_name}.")

@dataclass
class LoginResponse:
"""Response from NMS when logging in."""

token: str


class NMS:
"""Handle NMS API calls."""

def __init__(self, url: str):
if url.endswith("/"):
url = url[:-1]
self.url = url

def _make_request(
self,
method: str,
endpoint: str,
token: Optional[str] = None,
data: any = None, # type: ignore[reportGeneralTypeIssues]
) -> Any | None:
"""Make an HTTP request and handle common error patterns."""
headers = JSON_HEADER
if token:
headers["Authorization"] = f"Bearer {token}"
url = f"{self.url}{endpoint}"
try:
response = requests.request(
method=method,
url=url,
headers=headers,
json=data,
verify=False,
)
except requests.exceptions.SSLError as e:
logger.error("SSL error: %s", e)
return None
except requests.RequestException as e:
logger.error("HTTP request failed: %s", e)
return None
except OSError as e:
logger.error("couldn't complete HTTP request: %s", e)
return None
try:
response.raise_for_status()
except requests.HTTPError:
logger.error(
"Request failed: code %s",
response.status_code,
)
return None
try:
json_response = response.json()
except json.JSONDecodeError:
return None
return json_response

def is_initialized(self) -> bool:
"""Return if NMS is initialized."""
status = self.get_status()
return status.initialized if status else False

def is_api_available(self) -> bool:
"""Return if NMS is reachable."""
status = self.get_status()
return status is not None

def get_status(self) -> StatusResponse | None:
"""Return if NMS is initialized."""
response = self._make_request("GET", "/status")
if response:
return StatusResponse(
initialized=response.get("initialized"),
)
return None

def wait_for_api_to_be_available(self, timeout: int = 300) -> None:
"""Wait for NMS API to be available."""
t0 = time.time()
while time.time() - t0 < timeout:
if self.is_api_available():
return
time.sleep(5)
raise TimeoutError(f"NMS API is not available after {timeout} seconds.")

def wait_for_initialized(self, timeout: int = 300) -> None:
"""Wait for NMS to be initialized."""
t0 = time.time()
while time.time() - t0 < timeout:
if self.is_initialized():
return
else:
time.sleep(1)
raise TimeoutError("Timed out creating device group.")
time.sleep(5)
raise TimeoutError(f"NMS is not initialized after {timeout} seconds.")

def login(self, username: str, password: str) -> LoginResponse | None:
"""Login to NMS by sending the username and password and return a Token."""
login_params = LoginParams(username=username, password=password)
response = self._make_request("POST", "/login", data=asdict(login_params))
if response:
return LoginResponse(
token=response.get("token"),
)
return None

def create_subscriber(self, imsi: str, token: str) -> None:
"""Create a subscriber."""
url = f"/api/subscriber/imsi-{imsi}"
data = SUBSCRIBER_CONFIG.copy()
data["UeId"] = imsi
self._make_request("POST", url, token=token, data=data)
logger.info(f"Created subscriber with IMSI {imsi}.")

def create_network_slice(self, network_slice_name: str, device_groups: list) -> None:
"""Create a network slice.
def create_device_group(self, name: str, imsis: List[str], token: str) -> None:
"""Create a device group."""
DEVICE_GROUP_CONFIG["imsis"] = imsis
url = f"/config/v1/device-group/{name}"
self._make_request("POST", url, token=token, data=DEVICE_GROUP_CONFIG)
logger.info(f"Created device group {name}.")

Args:
network_slice_name (str): Network slice name
device_groups (list): List of device groups to be included in the network slice
"""
def create_network_slice(self, name: str, device_groups: List[str], token: str) -> None:
"""Create a network slice."""
NETWORK_SLICE_CONFIG["site-device-group"] = device_groups
url = f"https://{self.nms_ip}:5000/config/v1/network-slice/{network_slice_name}"
response = requests.post(url, json=NETWORK_SLICE_CONFIG, verify=False)
response.raise_for_status()
now = time.time()
timeout = 5
while time.time() - now <= timeout:
if requests.get(url, verify=False).json():
logger.info(f"Created network slice {network_slice_name}.")
return
else:
time.sleep(1)
raise TimeoutError("Timed out creating network slice.")
url = f"/config/v1/network-slice/{name}"
self._make_request("POST", url, token=token, data=NETWORK_SLICE_CONFIG)
logger.info(f"Created network slice {name}.")
Loading

0 comments on commit 790013f

Please sign in to comment.