Skip to content

Commit

Permalink
Merge pull request #11 from ministryofjustice/mist_plan_feature
Browse files Browse the repository at this point in the history
Added Mist plan
  • Loading branch information
jamesgreen-moj authored Dec 14, 2023
2 parents 5304745 + e5fe3dd commit 83e2eb6
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 76 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ env/
terraform.tfstate

data_src/**.csv
data_src/**mist_plan**.json
.idea
**/__pycache__/
1 change: 1 addition & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ run-dev: ## Run the python script while mounting the host. This enables using th
tests: ## Run unit tests for the python app
docker run -v $(shell pwd)/src:/app/src \
-v $(shell pwd)/test:/app/test \
-v $(shell pwd)/data_src:/data_src \
-e RUN_UNIT_TESTS=True $(NAME)

.PHONY: shell
Expand Down
206 changes: 133 additions & 73 deletions src/juniper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import getpass
import sys
from datetime import datetime

# Mist CRUD operations

Expand Down Expand Up @@ -117,9 +118,127 @@ def warn_if_using_org_id_production(org_id):
else:
raise ValueError('Invalid input')

# Main function

def build_payload(
d,
rf_template_id,
network_template_id,
site_group_ids
):
site = {'name': d.get('Site Name', ''),
'address': d.get('Site Address', ''),
"latlng": {"lat": d.get('gps', '')[0], "lng": d.get('gps', '')[1]},
"country_code": d.get('country_code', ''),
"rftemplate_id": rf_template_id,
"networktemplate_id": network_template_id,
"timezone": d.get('time_zone', ''),
"sitegroup_ids": check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups(
gov_wifi=d.get('Enable GovWifi', ''),
moj_wifi=d.get('Enable MoJWifi', ''),
site_group_ids=json.loads(site_group_ids)
),
}
# MOJ specific attributes
site_setting = {

"auto_upgrade": {
"enabled": True,
"version": "custom",
"time_of_day": "02:00",
"custom_versions": {
"AP45": "0.12.27066",
"AP32": "0.12.27066"
},
"day_of_week": ""
},

"rogue": {
"min_rssi": -80,
"min_duration": 20,
"enabled": True,
"honeypot_enabled": True,
"whitelisted_bssids": [
""
],
"whitelisted_ssids": [
"GovWifi"
]
},

"persist_config_on_device": True,

"engagement": {
"dwell_tags": {
"passerby": "1-300",
"bounce": "3600-14400",
"engaged": "25200-36000",
"stationed": "50400-86400"
},
"dwell_tag_names": {
"passerby": "Below 5 Min (Passerby)",
"bounce": "1-4 Hours",
"engaged": "7-10 Hours",
"stationed": "14-24 Hours"
}
},
"analytic": {
"enabled": True
},

"vars": {
"site_specific_radius_wired_nacs_secret": d.get('Wired NACS Radius Key', ''),
"site_specific_radius_govwifi_secret": d.get('GovWifi Radius Key', ''),
"address": d.get('Site Address', ''),
"site_name": d.get('Site Name', '')
}

}
return site, site_setting


def plan_of_action(
data,
rf_template_id,
network_template_id,
site_group_ids
):

# Generate a timestamp for the filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
plan_file_name = "../data_src/mist_plan_{time}.json".format(time=timestamp)
json_objects = []

for d in data:
site, site_setting = build_payload(
d,
rf_template_id,
network_template_id,
site_group_ids
)
json_objects.append(site)
json_objects.append(site_setting)

# Load file
with open(plan_file_name, "w") as plan_file:
json.dump(json_objects, plan_file, indent=2)

# Print to terminal
with open(plan_file_name, "r") as plan_file:
print(plan_file.read())

print("A file containing all the changes has been created: {file}".format(
file=plan_file_name))
user_input = input("Do you wish to continue? (y/n): ").upper()

if user_input == "Y":
print("Continuing with run")
elif user_input == "N":
sys.exit(0)
else:
raise ValueError('Invalid input')


# Main function
def juniper_script(
data,
org_id=None,
Expand All @@ -129,7 +248,6 @@ def juniper_script(
rf_template_id=None,
network_template_id=None
):

# Configure True/False to enable/disable additional logging of the API response objects
show_more_details = True

Expand All @@ -149,83 +267,27 @@ def juniper_script(
# Prompt user if we are using production org_id
warn_if_using_org_id_production(org_id)

plan_of_action(
data,
rf_template_id,
network_template_id,
site_group_ids
)

# Establish Mist session
admin = Admin(mist_username, mist_login_method)

# Create each site from the CSV file
for d in data:
# Variables
site_id = None
site = {'name': d.get('Site Name', ''),
'address': d.get('Site Address', ''),
"latlng": {"lat": d.get('gps', '')[0], "lng": d.get('gps', '')[1]},
"country_code": d.get('country_code', ''),
"rftemplate_id": rf_template_id,
"networktemplate_id": network_template_id,
"timezone": d.get('time_zone', ''),
"sitegroup_ids": check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups(
gov_wifi=d.get('Enable GovWifi', ''),
moj_wifi=d.get('Enable MoJWifi', ''),
site_group_ids=json.loads(site_group_ids)
),
}

# MOJ specific attributes
site_setting = {

"auto_upgrade": {
"enabled": True,
"version": "custom",
"time_of_day": "02:00",
"custom_versions": {
"AP45": "0.12.27066",
"AP32": "0.12.27066"
},
"day_of_week": ""
},

"rogue": {
"min_rssi": -80,
"min_duration": 20,
"enabled": True,
"honeypot_enabled": True,
"whitelisted_bssids": [
""
],
"whitelisted_ssids": [
"GovWifi"
]
},

"persist_config_on_device": True,

"engagement": {
"dwell_tags": {
"passerby": "1-300",
"bounce": "3600-14400",
"engaged": "25200-36000",
"stationed": "50400-86400"
},
"dwell_tag_names": {
"passerby": "Below 5 Min (Passerby)",
"bounce": "1-4 Hours",
"engaged": "7-10 Hours",
"stationed": "14-24 Hours"
}
},
"analytic": {
"enabled": True
},

"vars": {
"site_specific_radius_wired_nacs_secret": d.get('Wired NACS Radius Key', ''),
"site_specific_radius_govwifi_secret": d.get('GovWifi Radius Key', ''),
"address": d.get('Site Address', ''),
"site_name": d.get('Site Name', '')
},


}
site, site_setting = build_payload(
d,
rf_template_id,
network_template_id,
site_group_ids
)

print('Calling the Mist Create Site API...')
result = admin.post('/api/v1/orgs/' + org_id + '/sites', site)
Expand All @@ -244,8 +306,6 @@ def juniper_script(
print(json.dumps(result, sort_keys=True, indent=4))
print('\nUsing id in the Mist Update Setting API request')

print()

# Update Site Setting
print('Calling the Mist Update Setting API...')
result = admin.put('/api/v1/sites/' + site_id + '/setting',
Expand Down
45 changes: 42 additions & 3 deletions test/test_juniper.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import unittest
from unittest.mock import patch, MagicMock
from src.juniper import juniper_script, Admin, check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups, warn_if_using_org_id_production
from src.juniper import juniper_script, Admin, check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups, warn_if_using_org_id_production, plan_of_action
from io import StringIO
from datetime import datetime
import os


class TestJuniperScript(unittest.TestCase):

@patch('src.juniper.plan_of_action')
@patch('getpass.getpass', return_value='token')
@patch('src.juniper.requests.Session.get', return_value=MagicMock(status_code=200))
@patch('src.juniper.Admin.post')
@patch('src.juniper.Admin.put')
def test_juniper_script(self, mock_put, mock_post, mock_successful_login, api_token):
def test_juniper_script(self, mock_put, mock_post, mock_successful_login, api_token, mock_plan_of_action):
# Mock Mist API responses
mock_post.return_value = {'id': '123', 'name': 'TestSite'}
mock_put.return_value = {'status': 'success'}
Expand Down Expand Up @@ -154,8 +157,9 @@ def test_juniper_script_missing_network_template_id(self):

self.assertEqual(str(cm.exception), 'Must define network_template_id')

@patch('src.juniper.plan_of_action')
@patch('src.juniper.Admin')
def test_given_mist_login_method_not_defined_then_default_to_credentials(self, mock_admin):
def test_given_mist_login_method_not_defined_then_default_to_credentials(self, mock_admin, mock_plan_of_action):
output_buffer = StringIO()
with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
juniper_script([],
Expand Down Expand Up @@ -316,3 +320,38 @@ def test_append_neither_wifi(self):
result = check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups(
gov_wifi, moj_wifi, self.site_group_ids)
self.assertEqual(result, [])


class TestPlanOfActionFunction(unittest.TestCase):

def setUp(self):
self.data = {'Site Name': 'TestSite', 'Site Address': '123 Main St',
'gps': [1.23, 4.56], 'country_code': 'US', 'time_zone': 'UTC',
'Enable GovWifi': 'true', 'Enable MoJWifi': 'false',
'Wired NACS Radius Key': 'key1', 'GovWifi Radius Key': 'key2'},
self.rf_template_id = "rf_template_id",
self.network_template_id = "network_template_id",
self.site_group_ids = '{"moj_wifi": "foo","gov_wifi": "bar"}'

def test_plan_of_action_creates_file(self):
with patch('builtins.input', return_value='Y'), patch('sys.exit') as mock_exit:
plan_of_action(self.data, self.rf_template_id,
self.network_template_id, self.site_group_ids)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
expected_file_name = f"../data_src/mist_plan_{timestamp}.json"
self.assertTrue(os.path.exists(expected_file_name))
os.remove(expected_file_name)

@patch("builtins.open")
def test_plan_of_action_exit_on_no(self, mock_open_file):
with patch('builtins.input', return_value='N'), self.assertRaises(SystemExit) as cm:
plan_of_action(self.data, self.rf_template_id,
self.network_template_id, self.site_group_ids)
self.assertEqual(cm.exception.code, 0)

@patch("builtins.open")
def test_plan_of_action_invalid_input(self, mock_open_file):
with patch('builtins.input', return_value='invalid_input'), self.assertRaises(ValueError) as cm:
plan_of_action(self.data, self.rf_template_id,
self.network_template_id, self.site_group_ids)
self.assertEqual(str(cm.exception), 'Invalid input')

0 comments on commit 83e2eb6

Please sign in to comment.