diff --git a/src/rockstor/storageadmin/tests/test_disks.py b/src/rockstor/storageadmin/tests/test_disks.py index 9c91be2e3..c87d86bfe 100644 --- a/src/rockstor/storageadmin/tests/test_disks.py +++ b/src/rockstor/storageadmin/tests/test_disks.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2013 RockStor, Inc. +Copyright (c) 2012-2019 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -26,180 +26,180 @@ class DiskTests(APITestMixin, APITestCase): # fixtures = ['fix1.json'] - fixtures = ['test_disks.json'] - BASE_URL = '/api/disks' + fixtures = ["test_disks.json"] + BASE_URL = "/api/disks" @classmethod def setUpClass(cls): super(DiskTests, cls).setUpClass() # post mocks - cls.patch_wipe_disk = patch('storageadmin.views.disk.wipe_disk') + cls.patch_wipe_disk = patch("storageadmin.views.disk.wipe_disk") cls.mock_wipe_disk = cls.patch_wipe_disk.start() - cls.mock_wipe_disk.return_value = 'out', 'err', 0 + cls.mock_wipe_disk.return_value = "out", "err", 0 - cls.patch_scan_disks = patch('storageadmin.views.disk.scan_disks') + cls.patch_scan_disks = patch("storageadmin.views.disk.scan_disks") cls.mock_scan_disks = cls.patch_scan_disks.start() - cls.patch_blink_disk = patch('storageadmin.views.disk.blink_disk') + cls.patch_blink_disk = patch("storageadmin.views.disk.blink_disk") cls.mock_blink_disk = cls.patch_blink_disk.start() - cls.patch_mount_root = patch('storageadmin.views.disk.mount_root') + cls.patch_mount_root = patch("storageadmin.views.disk.mount_root") cls.mock_mount_root = cls.patch_mount_root.start() - cls.patch_pool_raid = patch('storageadmin.views.disk.pool_raid') + cls.patch_pool_raid = patch("storageadmin.views.disk.pool_raid") cls.mock_pool_raid = cls.patch_pool_raid.start() - cls.mock_pool_raid.return_value = {'data': 'single', - 'metadata': 'single'} + cls.mock_pool_raid.return_value = {"data": "single", "metadata": "single"} - cls.patch_enable_quota = patch('storageadmin.views.disk.enable_quota') + cls.patch_enable_quota = patch("storageadmin.views.disk.enable_quota") cls.mock_enable_quota = cls.patch_enable_quota.start() - cls.patch_import_shares = patch( - 'storageadmin.views.disk.import_shares') + cls.patch_import_shares = patch("storageadmin.views.disk.import_shares") cls.mock_import_shares = cls.patch_import_shares.start() # TODO: maybe patch as storageadmin.views.disk.smart.toggle_smart - cls.patch_toggle_smart = patch('system.smart.toggle_smart') + cls.patch_toggle_smart = patch("system.smart.toggle_smart") cls.mock_toggle_smart = cls.patch_toggle_smart.start() - cls.mock_toggle_smart.return_value = [''], [''], 0 + cls.mock_toggle_smart.return_value = [""], [""], 0 # primarily for test_btrfs_disk_import (to emulate a successful import) - cls.patch_get_pool_info = patch( - 'storageadmin.views.disk.get_pool_info') + cls.patch_get_pool_info = patch("storageadmin.views.disk.get_pool_info") cls.mock_get_pool_info = cls.patch_get_pool_info.start() - cls.fake_pool_info = {'disks': ['mock-disk'], 'label': 'mock-label', - 'uuid': 'b3d201a8-b497-4365-a90d-a50c50b8e808'} + cls.fake_pool_info = { + "disks": ["mock-disk"], + "label": "mock-label", + "uuid": "b3d201a8-b497-4365-a90d-a50c50b8e808", + } cls.mock_get_pool_info.return_value = cls.fake_pool_info - cls.temp_disk = Disk(id=2, name='mock-disk', size=88025459, - parted=False) + cls.temp_disk = Disk(id=2, name="mock-disk", size=88025459, parted=False) @classmethod def tearDownClass(cls): super(DiskTests, cls).tearDownClass() def test_disk_scan(self): - response = self.client.post(('{}/scan'.format(self.BASE_URL)), - data=None, format='json') + response = self.client.post( + ("{}/scan".format(self.BASE_URL)), data=None, format="json" + ) self.assertEqual(response.status_code, status.HTTP_200_OK) def test_invalid_disk_wipe(self): fake_dId = 99999 - url = ('{}/{}/wipe'.format(self.BASE_URL, fake_dId)) - response = self.client.post(url, data=None, format='json') - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR) - e_msg = 'Disk id ({}) does not exist.'.format(fake_dId) + url = "{}/{}/wipe".format(self.BASE_URL, fake_dId) + response = self.client.post(url, data=None, format="json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + e_msg = "Disk id ({}) does not exist.".format(fake_dId) self.assertEqual(response.data[0], e_msg) def test_invalid_command(self): - url = ('{}/1/invalid'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR) - e_msg = ('Unsupported command (invalid). Valid commands are; wipe, ' - 'btrfs-wipe, luks-format, btrfs-disk-import, blink-drive, ' - 'enable-smart, disable-smart, smartcustom-drive, ' - 'spindown-drive, pause, role-drive, ' - 'luks-drive.') + url = "{}/1/invalid".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + e_msg = ( + "Unsupported command (invalid). Valid commands are; wipe, " + "btrfs-wipe, luks-format, btrfs-disk-import, blink-drive, " + "enable-smart, disable-smart, smartcustom-drive, " + "spindown-drive, pause, role-drive, " + "luks-drive." + ) self.assertEqual(response.data[0], e_msg) - @mock.patch('storageadmin.views.disk.Disk') + @mock.patch("storageadmin.views.disk.Disk") def test_disk_wipe(self, mock_disk): mock_disk.objects.get.return_value = self.temp_disk # btrfs-wipe is an aliase for wipe; ensure it exists. - url = ('{}/2/btrfs-wipe'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') + url = "{}/2/btrfs-wipe".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - url = ('{}/2/wipe'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') + url = "{}/2/wipe".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - e_msg = 'mock example exception surfaced from wipe_disk()' + e_msg = "mock example exception surfaced from wipe_disk()" self.mock_wipe_disk.side_effect = Exception(e_msg) - response = self.client.post(url, data=None, format='json') - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR) + response = self.client.post(url, data=None, format="json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) self.assertEqual(response.data[0], e_msg) self.mock_wipe_disk.side_effect = None - @mock.patch('storageadmin.views.disk.Disk') + @mock.patch("storageadmin.views.disk.Disk") def test_btrfs_disk_import(self, mock_disk): mock_disk.objects.get.return_value = self.temp_disk - url = ('{}/2/btrfs-disk-import'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') + url = "{}/2/btrfs-disk-import".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - @mock.patch('storageadmin.views.disk.Disk') + @mock.patch("storageadmin.views.disk.Disk") def test_btrfs_disk_import_fail(self, mock_disk): mock_disk.objects.get.return_value = self.temp_disk - mock_e_msg = ("Error running a command. cmd = /sbin/btrfs fi show " - "/dev/disk/by-id/{}. rc = 1. stdout = ['']. stderr = " - "['ERROR: not a valid btrfs filesystem: /dev/disk/by-id/" - "{}', '']").format(self.temp_disk.name, - self.temp_disk.name) + mock_e_msg = ( + "Error running a command. cmd = /sbin/btrfs fi show " + "/dev/disk/by-id/{}. rc = 1. stdout = ['']. stderr = " + "['ERROR: not a valid btrfs filesystem: /dev/disk/by-id/" + "{}', '']" + ).format(self.temp_disk.name, self.temp_disk.name) self.mock_get_pool_info.side_effect = Exception(mock_e_msg) - url = ('{}/2/btrfs-disk-import'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') - e_msg = "Failed to import any pool on device id ({}). " \ - "Error: ({}).".format(self.temp_disk.id, mock_e_msg) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR) + url = "{}/2/btrfs-disk-import".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") + e_msg = ( + "Failed to import any pool on device db id ({}). " + "Error: ({}).".format(self.temp_disk.id, mock_e_msg) + ) + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) self.assertEqual(response.data[0], e_msg) - @mock.patch('storageadmin.views.disk.Disk') + @mock.patch("storageadmin.views.disk.Disk") def test_blink_drive(self, mock_disk): mock_disk.objects.get.return_value = self.temp_disk - url = ('{}/2/blink-drive'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') + url = "{}/2/blink-drive".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - @mock.patch('storageadmin.views.disk.Disk') + @mock.patch("storageadmin.views.disk.Disk") def test_enable_smart(self, mock_disk): mock_disk.objects.get.return_value = self.temp_disk - url = ('{}/2/enable-smart'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR) - e_msg = 'S.M.A.R.T support is not available on ' \ - 'disk ({}).'.format(self.temp_disk.name) + url = "{}/2/enable-smart".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + e_msg = "S.M.A.R.T support is not available on disk ({}).".format( + self.temp_disk.name + ) self.assertEqual(response.data[0], e_msg) - @mock.patch('storageadmin.views.disk.Disk') + @mock.patch("storageadmin.views.disk.Disk") def test_enable_smart_when_available(self, mock_disk): self.temp_disk.smart_available = True mock_disk.objects.get.return_value = self.temp_disk - url = ('{}/2/enable-smart'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') - self.assertEqual(response.status_code, - status.HTTP_200_OK) + url = "{}/2/enable-smart".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) - @mock.patch('storageadmin.views.disk.Disk') + @mock.patch("storageadmin.views.disk.Disk") def test_disable_smart(self, mock_disk): mock_disk.objects.get.return_value = self.temp_disk - url = ('{}/2/disable-smart'.format(self.BASE_URL)) - response = self.client.post(url, data=None, format='json') - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR) - e_msg = 'S.M.A.R.T support is not available on ' \ - 'disk ({}).'.format(self.temp_disk.name) + url = "{}/2/disable-smart".format(self.BASE_URL) + response = self.client.post(url, data=None, format="json") + self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + e_msg = "S.M.A.R.T support is not available on disk ({}).".format( + self.temp_disk.name + ) self.assertEqual(response.data[0], e_msg) diff --git a/src/rockstor/storageadmin/tests/test_samba.py b/src/rockstor/storageadmin/tests/test_samba.py index a4a7767b3..7621d73ac 100644 --- a/src/rockstor/storageadmin/tests/test_samba.py +++ b/src/rockstor/storageadmin/tests/test_samba.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2013 RockStor, Inc. +Copyright (c) 2012-2019 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -20,77 +20,310 @@ from rest_framework.test import APITestCase from mock import patch -from storageadmin.models import Pool, Share, SambaCustomConfig, SambaShare +from storageadmin.exceptions import RockStorAPIException +from storageadmin.models import Pool, Share, SambaCustomConfig, SambaShare, User from storageadmin.tests.test_api import APITestMixin +from storageadmin.views.samba import SambaListView, logger -class SambaTests(APITestMixin, APITestCase): + +class SambaTests(APITestMixin, APITestCase, SambaListView): # fixture with: # share-smb - SMB exported with defaults: (comment "Samba-Export") # {'browsable': 'yes', 'guest_ok': 'no', 'read_only': 'no'} # share2 - no SMB export # fixtures = ['fix3.json'] - fixtures = ['test_smb.json'] - BASE_URL = '/api/samba' + fixtures = ["test_smb.json"] + BASE_URL = "/api/samba" @classmethod def setUpClass(cls): super(SambaTests, cls).setUpClass() # post mocks - cls.patch_mount_share = patch('storageadmin.views.samba.mount_share') + cls.patch_mount_share = patch("storageadmin.views.samba.mount_share") cls.mock_mount_share = cls.patch_mount_share.start() # mock Share model's mount_status utility # True = Share is mounted, False = Share unmounted - cls.patch_mount_status = patch('system.osi.mount_status') + cls.patch_mount_status = patch("system.osi.mount_status") cls.mock_mount_status = cls.patch_mount_status.start() cls.mock_mount_status.return_value = True - cls.patch_status = patch('storageadmin.views.samba.status') + cls.patch_status = patch("storageadmin.views.samba.status") cls.mock_status = cls.patch_status.start() - cls.mock_status.return_value = 'out', 'err', 0 + cls.mock_status.return_value = "out", "err", 0 - cls.patch_restart_samba = patch('storageadmin.views.samba.' - 'restart_samba') + cls.patch_restart_samba = patch("storageadmin.views.samba.restart_samba") cls.mock_status = cls.patch_restart_samba.start() - cls.patch_refresh_smb_config = patch('storageadmin.views.samba.' - 'refresh_smb_config') + cls.patch_refresh_smb_config = patch( + "storageadmin.views.samba.refresh_smb_config" + ) cls.mock_refresh_smb_config = cls.patch_refresh_smb_config.start() - cls.mock_refresh_smb_config.return_value = 'smbconfig' + cls.mock_refresh_smb_config.return_value = "smbconfig" # all values as per fixture - cls.temp_pool = Pool(id=11, name='rock-pool', size=5242880) - cls.temp_share_smb = Share(id=23, name='share-smb', pool=cls.temp_pool) + cls.temp_pool = Pool(id=11, name="rock-pool", size=5242880) + cls.temp_share_smb = Share(id=23, name="share-smb", pool=cls.temp_pool) cls.temp_sambashare = SambaShare(id=1, share=cls.temp_share_smb) # cls.temp_smb_custom_config = \ # SambaCustomConfig(id=1, smb_share=cls.temp_sambashare) - cls.temp_share2 = Share(id=24, name='share2', pool=cls.temp_pool) + cls.temp_share2 = Share(id=24, name="share2", pool=cls.temp_pool) @classmethod def tearDownClass(cls): super(SambaTests, cls).tearDownClass() - def test_get(self): + def test_validate_input(self): + """ + Test that _validate_input() returns a valid dict when: + 1. all input data are filled and valid + 2. no input data are specified (should return default options) + """ + # 1. all input data are filled and valid + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": ["test"], + "browsable": "yes", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["9", "10"], + "shadow_copy": False, + "guest_ok": "no", + } + expected_result = { + "comment": "Samba-Export", + "read_only": "no", + "browsable": "yes", + "custom_config": [], + "guest_ok": "no", + "shadow_copy": False, + } + returned = self._validate_input(rdata=data) + self.assertEqual( + returned, + expected_result, + msg="Un-expected _validate_input() result:\n " + "returned = ({}).\n " + "expected = ({}).".format(returned, expected_result), + ) + + # 2. no input data are specified (should return default options) + data = {} + expected_result = { + "comment": "samba export", + "read_only": "no", + "browsable": "yes", + "custom_config": [], + "guest_ok": "no", + "shadow_copy": False, + } + returned = self._validate_input(rdata=data) + self.assertEqual( + returned, + expected_result, + msg="Un-expected _validate_input() result:\n " + "returned = ({}).\n " + "expected = ({}).".format(returned, expected_result), + ) + + def test_validate_input_error(self): + """ + Test that _validate_input raises Exceptions when input data contain error(s). + The following cases are tested below: + 1. invalid 'custom_config' information + 2. invalid 'browsable' information + 3. invalid 'guest_ok' information + 4. invalid 'read_only' information + 5. invalid 'shadow_copy' information + """ + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": ["test"], + "browsable": "yes", + "custom_config": "not-a-list", + "snapshot_prefix": "", + "shares": ["9", "10"], + "shadow_copy": False, + "guest_ok": "no", + } + with self.assertRaises(RockStorAPIException): + self._validate_input(rdata=data) + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": ["test"], + "browsable": "maybe", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["9", "10"], + "shadow_copy": False, + "guest_ok": "no", + } + with self.assertRaises(RockStorAPIException): + self._validate_input(rdata=data) + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": ["test"], + "browsable": "yes", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["9", "10"], + "shadow_copy": False, + "guest_ok": "maybe", + } + with self.assertRaises(RockStorAPIException): + self._validate_input(rdata=data) + data = { + "read_only": "maybe", + "comment": "Samba-Export", + "admin_users": ["test"], + "browsable": "yes", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["9", "10"], + "shadow_copy": False, + "guest_ok": "no", + } + with self.assertRaises(RockStorAPIException): + self._validate_input(rdata=data) + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": ["test"], + "browsable": "yes", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["9", "10"], + "shadow_copy": True, + "guest_ok": "no", + } + with self.assertRaises(RockStorAPIException): + self._validate_input(rdata=data) + + @mock.patch("storageadmin.views.samba.ShareMixin._validate_share") + def test_create_samba_share(self, mock_validate_share): + """ + Test that create_samba_share() returns a correct SambaShare object + when all conditions are valid. + """ + mock_validate_share.return_value = self.temp_share_smb + + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": [], + "browsable": "yes", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["23"], + "shadow_copy": False, + "guest_ok": "no", + } + expected_result = self.temp_sambashare + returned = self.create_samba_share(data) + self.assertEqual( + returned, + expected_result, + msg="Un-expected create_samba_share() result:\n " + "returned = ({} with id {}).\n " + "expected = ({} with id {}).".format( + returned, returned.id, expected_result, expected_result.id + ), + ) + + def test_create_samba_share_incorrect_share(self): + """ + Test that create_samba_share() raises an exception + when a non-existing share is provided. + """ + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": [], + "browsable": "yes", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["32"], + "shadow_copy": False, + "guest_ok": "no", + } + with self.assertRaises(RockStorAPIException): + self.create_samba_share(rdata=data) + + @mock.patch("storageadmin.views.samba.ShareMixin._validate_share") + @mock.patch("storageadmin.views.samba.SambaShare.objects") + @mock.patch("storageadmin.views.samba.logger") + def test_create_samba_share_existing_export( + self, mock_validate_share, mock_sambashare, mock_logger + ): + """ + Test that create_samba_share() logs the appropriate error + when the given share is already exported via Samba. + """ + mock_validate_share.return_value = self.temp_share_smb + mock_sambashare.filter.return_value = mock_sambashare + mock_sambashare.exists.return_value = True + + data = { + "read_only": "no", + "comment": "Samba-Export", + "admin_users": [], + "browsable": "yes", + "custom_config": [], + "snapshot_prefix": "", + "shares": ["23"], + "shadow_copy": False, + "guest_ok": "no", + } + # e_msg = ("Share ({}) is already exported via Samba.").format( + # self.temp_share_smb.name + # ) + self.create_samba_share(rdata=data) + mock_logger.error.assert_called() + # mock_logger.error.assert_called_with(e_msg) + + @mock.patch("storageadmin.views.samba.SambaShare") + def test_get(self, mock_sambashare): """ Test GET request 1. Get base URL - 2. Get request with id + 2. Get request with valid id """ - # get base URL self.get_base(self.BASE_URL) + # Create mock SambaShare + # smb_id = self.temp_sambashare.id + # Some conflict exists from a previous mock_sambashare, so set smb_id manually + smb_id = 1 + # print("smb_id is {}".format(smb_id)) + mock_sambashare.objects.get.return_value = self.temp_sambashare + + # test get of detailed view for a valid smb_id + response = self.client.get("{}/{}".format(self.BASE_URL, smb_id)) + self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) + self.assertEqual(response.data["id"], smb_id) + + def test_get_non_existent(self): + """ + Test GET request + 1. Get base URL + 2. Get request with invalid id + """ + # get base URL + self.get_base(self.BASE_URL) # # get sambashare with id # response = self.client.get('{}/1'.format(self.BASE_URL)) # self.assertEqual(response.status_code, status.HTTP_200_OK, # msg=response) - - # get sambashare with non-existant id - response = self.client.get('{}/5'.format(self.BASE_URL)) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND, - msg=response) + # get sambashare with non-existent id + response = self.client.get("{}/5".format(self.BASE_URL)) + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND, msg=response) def test_post_requests_1(self): """ @@ -99,65 +332,100 @@ def test_post_requests_1(self): """ # create samba export with no share names - data = {'browsable': 'yes', 'guest_ok': 'yes', 'read_only': 'yes', } + data = {"browsable": "yes", "guest_ok": "yes", "read_only": "yes"} response = self.client.post(self.BASE_URL, data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = 'Must provide share names.' + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Must provide share names." self.assertEqual(response.data[0], e_msg) - def test_post_requests_2(self): + @mock.patch("storageadmin.views.samba.ShareMixin._validate_share") + @mock.patch("storageadmin.views.samba.User") + def test_post_requests_2(self, mock_user, mock_validate_share): """ . Create a samba export for the share that has already been exported """ + mock_validate_share.return_value = self.temp_share_smb # create samba with invalid browsable, guest_ok, read_only choices - data = {'shares': (24,), 'browsable': 'Y', 'guest_ok': 'yes', - 'read_only': 'yes'} + data = { + "shares": ["23"], + "browsable": "Y", + "guest_ok": "yes", + "read_only": "yes", + } response = self.client.post(self.BASE_URL, data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = ('Invalid choice for browsable. Possible choices ' - 'are yes or no.') + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Invalid choice for browsable. Possible choices are yes or no." self.assertEqual(response.data[0], e_msg) - data = {'shares': (24,), 'browsable': 'yes', 'guest_ok': 'Y', - 'read_only': 'yes'} + data = { + "shares": ["23"], + "browsable": "yes", + "guest_ok": "Y", + "read_only": "yes", + } response = self.client.post(self.BASE_URL, data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = ('Invalid choice for guest_ok. Possible options are ' - 'yes or no.') + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Invalid choice for guest_ok. Possible options are yes or no." self.assertEqual(response.data[0], e_msg) - data = {'shares': (24,), 'browsable': 'yes', 'guest_ok': 'yes', - 'read_only': 'Y'} + data = { + "shares": ["23"], + "browsable": "yes", + "guest_ok": "yes", + "read_only": "Y", + } response = self.client.post(self.BASE_URL, data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = ('Invalid choice for read_only. Possible options ' - 'are yes or no.') + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Invalid choice for read_only. Possible options are yes or no." self.assertEqual(response.data[0], e_msg) # create samba export # we use share id 24 (share2) as not yet smb exported. - data = {'shares': (24, ), 'browsable': 'yes', 'guest_ok': 'yes', - 'read_only': 'yes', 'admin_users': ('admin', ), - 'custom_config': ('CONFIG', 'XYZ')} - response = self.client.post(self.BASE_URL, data=data) - self.assertEqual(response.status_code, - status.HTTP_200_OK, msg=response.data) - smb_id = response.data['id'] + data = { + "shares": ["24"], + "browsable": "yes", + "guest_ok": "yes", + "read_only": "yes", + "admin_users": ("admin",), + "custom_config": ("CONFIG", "XYZ"), + } + mock_validate_share.return_value = self.temp_share2 + mock_user.objects.get.side_effects = None + temp_user = User.objects.create( + username="admin", uid=1, gid=1, admin=False, user=self.user + ) + mock_user.objects.get.return_value = temp_user - # test get of detailed view for the smb_id - response = self.client.get('{}/{}'.format(self.BASE_URL, smb_id)) - self.assertEqual(response.status_code, status.HTTP_200_OK, - msg=response.data) - self.assertEqual(response.data['id'], smb_id) + response = self.client.post(self.BASE_URL, data=data) + self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) + + ## The test below may better belong to test_get() above, so move it there. + # smb_id = response.data["id"] + # print("response.data is {}".format(response.data)) + # + # # test get of detailed view for the smb_id + # # First, save the SambaShare as created above (with id = 1) + # mock_sambashare.objects.get.return_value = self.temp_sambashare + # response = self.client.get("{}/{}".format(self.BASE_URL, smb_id)) + # self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) + # self.assertEqual(response.data["id"], smb_id) # # TODO: Needs multiple share instances mocked # # create samba exports for multiple(3) shares at once @@ -168,23 +436,42 @@ def test_post_requests_2(self): # self.assertEqual(response.status_code, # status.HTTP_200_OK, msg=response.data) - # create samba export with no admin users - data = {'shares': ('share5', ), 'browsable': 'yes', 'guest_ok': 'yes', - 'read_only': 'yes', 'custom_config': ('CONFIG', 'XYZ')} - response = self.client.post(self.BASE_URL, data=data) - self.assertEqual(response.status_code, - status.HTTP_200_OK, msg=response.data) + ## post() does not raise exception in this condition anymore, but rather + ## logs an error as "Share (share.name) is already exported via Samba". + # # create samba export with the share that has already been exported above + # data = { + # "shares": ["24"], + # "browsable": "no", + # "guest_ok": "yes", + # "read_only": "yes", + # } + # response = self.client.post(self.BASE_URL, data=data) + # self.assertEqual( + # response.status_code, + # status.HTTP_500_INTERNAL_SERVER_ERROR, + # msg=response.data, + # ) + # e_msg = "Share (share2) is already exported via Samba." + # self.assertEqual(response.data[0], e_msg) + + @mock.patch("storageadmin.views.samba.ShareMixin._validate_share") + def test_post_requests_no_admin(self, mock_validate_share): + """ + Test a valid post request creating a samba export + when no admin user is specified + """ + mock_validate_share.return_value = self.temp_share_smb - # create samba export with the share that has already been exported - # above - data = {'shares': (24, ), 'browsable': 'no', 'guest_ok': 'yes', - 'read_only': 'yes'} + # create samba export with no admin users + data = { + "shares": ["24"], + "browsable": "yes", + "guest_ok": "yes", + "read_only": "yes", + "custom_config": ("CONFIG", "XYZ"), + } response = self.client.post(self.BASE_URL, data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = 'Share (share2) is already exported via Samba.' - self.assertEqual(response.data[0], e_msg) + self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) def test_put_requests_1(self): """ @@ -193,18 +480,22 @@ def test_put_requests_1(self): # edit samba that does not exist smb_id = 99999 - data = {'browsable': 'yes', 'guest_ok': 'yes', 'read_only': 'yes', - 'admin_users': 'usr'} - response = self.client.put('{}/{}'.format(self.BASE_URL, smb_id), - data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = 'Samba export for the id ({}) does not exist.'.format(smb_id) + data = { + "browsable": "yes", + "guest_ok": "yes", + "read_only": "yes", + "admin_users": "usr", + } + response = self.client.put("{}/{}".format(self.BASE_URL, smb_id), data=data) + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Samba export for the id ({}) does not exist.".format(smb_id) self.assertEqual(response.data[0], e_msg) - @mock.patch('storageadmin.views.samba.SambaShare') - + @mock.patch("storageadmin.views.samba.SambaShare") def test_put_requests_2(self, mock_sambashare): """ 1. Edit samba that does not exists @@ -215,14 +506,19 @@ def test_put_requests_2(self, mock_sambashare): # edit samba with invalid custom config smb_id = 1 - data = {'browsable': 'yes', 'guest_ok': 'yes', 'read_only': 'yes', - 'custom_config': 'CONFIGXYZ'} - response = self.client.put('{}/{}'.format(self.BASE_URL, smb_id), - data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = 'Custom config must be a list of strings.' + data = { + "browsable": "yes", + "guest_ok": "yes", + "read_only": "yes", + "custom_config": "CONFIGXYZ", + } + response = self.client.put("{}/{}".format(self.BASE_URL, smb_id), data=data) + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Custom config must be a list of strings." self.assertEqual(response.data[0], e_msg) # test mount_share exception case @@ -233,14 +529,15 @@ def test_put_requests_2(self, mock_sambashare): self.mock_mount_status.return_value = False smb_id = 1 - data = {'browsable': 'yes', 'guest_ok': 'yes', 'read_only': 'yes', } - self.mock_mount_share.side_effect = KeyError('error') - response = self.client.put('{}/{}'.format(self.BASE_URL, smb_id), - data=data) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = 'Failed to mount share (share-smb) due to a low level error.' + data = {"browsable": "yes", "guest_ok": "yes", "read_only": "yes"} + self.mock_mount_share.side_effect = KeyError("error") + response = self.client.put("{}/{}".format(self.BASE_URL, smb_id), data=data) + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Failed to mount share (share-smb) due to a low level error." self.assertEqual(response.data[0], e_msg) # Return share mock mounted_state to mounted self.mock_mount_status.return_value = True @@ -248,31 +545,38 @@ def test_put_requests_2(self, mock_sambashare): # happy path smb_id = 1 self.mock_mount_share.side_effect = None - data = {'browsable': 'yes', 'guest_ok': 'yes', 'read_only': 'yes', - 'admin_users': ('admin', ), 'custom_config': ('CONFIG', 'XYZ')} - response = self.client.put('{}/{}'.format(self.BASE_URL, smb_id), - data=data) - self.assertEqual(response.status_code, - status.HTTP_200_OK, msg=response.data) + data = { + "browsable": "yes", + "guest_ok": "yes", + "read_only": "yes", + "admin_users": ("admin",), + "custom_config": ("CONFIG", "XYZ"), + } + response = self.client.put("{}/{}".format(self.BASE_URL, smb_id), data=data) + self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) # edit samba export with admin user other than admin smb_id = 1 - data = {'browsable': 'yes', 'guest_ok': 'yes', 'read_only': 'yes', - 'admin_users': ('admin2', ), 'custom_config': ('CONFIG', - 'XYZ')} - response = self.client.put('{}/{}'.format(self.BASE_URL, smb_id), - data=data) - self.assertEqual(response.status_code, - status.HTTP_200_OK, msg=response.data) + data = { + "browsable": "yes", + "guest_ok": "yes", + "read_only": "yes", + "admin_users": ("admin2",), + "custom_config": ("CONFIG", "XYZ"), + } + response = self.client.put("{}/{}".format(self.BASE_URL, smb_id), data=data) + self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) # edit samba export passing no admin users smb_id = 1 - data = {'browsable': 'yes', 'guest_ok': 'yes', 'read_only': 'yes', - 'custom_config': ('CONFIG', 'XYZ')} - response = self.client.put('{}/{}'.format(self.BASE_URL, smb_id), - data=data) - self.assertEqual(response.status_code, - status.HTTP_200_OK, msg=response.data) + data = { + "browsable": "yes", + "guest_ok": "yes", + "read_only": "yes", + "custom_config": ("CONFIG", "XYZ"), + } + response = self.client.put("{}/{}".format(self.BASE_URL, smb_id), data=data) + self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) def test_delete_requests_1(self): """ @@ -280,24 +584,24 @@ def test_delete_requests_1(self): """ # Delete samba that does nor exists smb_id = 99999 - response = self.client.delete('{}/{}'.format(self.BASE_URL, smb_id)) - self.assertEqual(response.status_code, - status.HTTP_500_INTERNAL_SERVER_ERROR, - msg=response.data) - e_msg = 'Samba export for the id ({}) does not exist.'.format(smb_id) + response = self.client.delete("{}/{}".format(self.BASE_URL, smb_id)) + self.assertEqual( + response.status_code, + status.HTTP_500_INTERNAL_SERVER_ERROR, + msg=response.data, + ) + e_msg = "Samba export for the id ({}) does not exist.".format(smb_id) self.assertEqual(response.data[0], e_msg) - @mock.patch('storageadmin.views.samba.SambaShare') + @mock.patch("storageadmin.views.samba.SambaShare") def test_delete_requests_2(self, mock_sambashare): """ . Delete samba - """ mock_sambashare.objects.get.return_value = self.temp_sambashare # happy path smb_id = 1 - response = self.client.delete('{}/{}'.format(self.BASE_URL, smb_id)) - self.assertEqual(response.status_code, - status.HTTP_200_OK, msg=response.data) + response = self.client.delete("{}/{}".format(self.BASE_URL, smb_id)) + self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data) diff --git a/src/rockstor/storageadmin/views/config_backup.py b/src/rockstor/storageadmin/views/config_backup.py index 6b9a7c301..2a98b67b2 100644 --- a/src/rockstor/storageadmin/views/config_backup.py +++ b/src/rockstor/storageadmin/views/config_backup.py @@ -82,9 +82,9 @@ def restore_samba_exports(ml): if (m['model'] == 'storageadmin.sambashare'): exports.append(m['fields']) for e in exports: - e['shares'] = [e['path'].split('/')[-1], ] - generic_post('%s/samba' % BASE_URL, e) - logger.debug('Finished restoring Samba exports.') + e['shares'] = [] + e['shares'].append(e['share']) + generic_post('{}/samba'.format(BASE_URL), exports) def restore_afp_exports(ml): diff --git a/src/rockstor/storageadmin/views/samba.py b/src/rockstor/storageadmin/views/samba.py index 4f3f6b3b1..3de35b38c 100644 --- a/src/rockstor/storageadmin/views/samba.py +++ b/src/rockstor/storageadmin/views/samba.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2013 RockStor, Inc. +Copyright (c) 2012-2019 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -21,83 +21,82 @@ from rest_framework.exceptions import NotFound from django.db import transaction from django.conf import settings -from storageadmin.models import (SambaShare, User, SambaCustomConfig) +from storageadmin.models import SambaShare, User, SambaCustomConfig from storageadmin.serializers import SambaShareSerializer from storageadmin.util import handle_exception import rest_framework_custom as rfc from share import ShareMixin -from system.samba import (refresh_smb_config, status, restart_samba) +from system.samba import refresh_smb_config, status, restart_samba from fs.btrfs import mount_share import logging + logger = logging.getLogger(__name__) class SambaMixin(object): serializer_class = SambaShareSerializer DEF_OPTS = { - 'comment': 'samba export', - 'browsable': 'yes', - 'guest_ok': 'no', - 'read_only': 'no', - 'custom_config': None, - 'shadow_copy': False, - 'snapshot_prefix': None, + "comment": "samba export", + "browsable": "yes", + "guest_ok": "no", + "read_only": "no", + "custom_config": None, + "shadow_copy": False, + "snapshot_prefix": None, } - BOOL_OPTS = ('yes', 'no',) + BOOL_OPTS = ("yes", "no") @staticmethod def _restart_samba(): out = status() - if (out[2] == 0): + if out[2] == 0: restart_samba(hard=True) @classmethod - def _validate_input(cls, request, smbo=None): + def _validate_input(cls, rdata, smbo=None): options = {} def_opts = cls.DEF_OPTS - if (smbo is not None): + if smbo is not None: def_opts = cls.DEF_OPTS.copy() - def_opts['comment'] = smbo.comment - def_opts['browsable'] = smbo.browsable - def_opts['guest_ok'] = smbo.guest_ok - def_opts['read_only'] = smbo.read_only - def_opts['shadow_copy'] = smbo.shadow_copy - - options['comment'] = request.data.get('comment', def_opts['comment']) - options['browsable'] = request.data.get('browsable', - def_opts['browsable']) - - options['custom_config'] = request.data.get('custom_config', []) - if (type(options['custom_config']) != list): - e_msg = 'Custom config must be a list of strings.' - handle_exception(Exception(e_msg), request) - if (options['browsable'] not in cls.BOOL_OPTS): - e_msg = ('Invalid choice for browsable. Possible ' - 'choices are yes or no.') - handle_exception(Exception(e_msg), request) - options['guest_ok'] = request.data.get('guest_ok', - def_opts['guest_ok']) - if (options['guest_ok'] not in cls.BOOL_OPTS): - e_msg = ('Invalid choice for guest_ok. Possible ' - 'options are yes or no.') - handle_exception(Exception(e_msg), request) - options['read_only'] = request.data.get('read_only', - def_opts['read_only']) - if (options['read_only'] not in cls.BOOL_OPTS): - e_msg = ('Invalid choice for read_only. Possible ' - 'options are yes or no.') - handle_exception(Exception(e_msg), request) - options['shadow_copy'] = request.data.get('shadow_copy', - def_opts['shadow_copy']) - if (options['shadow_copy']): - options['snapshot_prefix'] = request.data.get( - 'snapshot_prefix', def_opts['snapshot_prefix']) - if (options['snapshot_prefix'] is None or - len(options['snapshot_prefix'].strip()) == 0): - e_msg = ('Invalid choice for snapshot_prefix. It must be a ' - 'valid non-empty string.') - handle_exception(Exception(e_msg), request) + def_opts["comment"] = smbo.comment + def_opts["browsable"] = smbo.browsable + def_opts["guest_ok"] = smbo.guest_ok + def_opts["read_only"] = smbo.read_only + def_opts["shadow_copy"] = smbo.shadow_copy + + options["comment"] = rdata.get("comment", def_opts["comment"]) + options["browsable"] = rdata.get("browsable", def_opts["browsable"]) + + options["custom_config"] = rdata.get("custom_config", []) + if type(options["custom_config"]) != list: + e_msg = "Custom config must be a list of strings." + handle_exception(Exception(e_msg), rdata) + if options["browsable"] not in cls.BOOL_OPTS: + e_msg = "Invalid choice for browsable. Possible choices are yes or no." + handle_exception(Exception(e_msg), rdata) + options["guest_ok"] = rdata.get("guest_ok", def_opts["guest_ok"]) + if options["guest_ok"] not in cls.BOOL_OPTS: + e_msg = "Invalid choice for guest_ok. Possible options are yes or no." + handle_exception(Exception(e_msg), rdata) + options["read_only"] = rdata.get("read_only", def_opts["read_only"]) + if options["read_only"] not in cls.BOOL_OPTS: + e_msg = "Invalid choice for read_only. Possible options are yes or no." + handle_exception(Exception(e_msg), rdata) + options["shadow_copy"] = rdata.get("shadow_copy", def_opts["shadow_copy"]) + if options["shadow_copy"]: + options["snapshot_prefix"] = rdata.get( + "snapshot_prefix", def_opts["snapshot_prefix"] + ) + if ( + options["snapshot_prefix"] is None + or len(options["snapshot_prefix"].strip()) == 0 + ): + e_msg = ( + "Invalid choice for snapshot_prefix. It must be a " + "valid non-empty string." + ) + handle_exception(Exception(e_msg), rdata) return options @@ -111,14 +110,17 @@ def _set_admin_users(admin_users, smb_share): # object. try: system_user = pwd.getpwnam(au) - auo = User(username=au, uid=system_user.pw_uid, - gid=system_user.pw_gid, admin=False) + auo = User( + username=au, + uid=system_user.pw_uid, + gid=system_user.pw_gid, + admin=False, + ) auo.save() except KeyError: # raise the outer exception as it's more meaningful to the # user. - raise Exception('Requested admin user(%s) does ' - 'not exist.' % au) + raise Exception("Requested admin user(%s) does not exist." % au) finally: auo.smb_shares.add(smb_share) @@ -128,46 +130,55 @@ class SambaListView(SambaMixin, ShareMixin, rfc.GenericView): @transaction.atomic def post(self, request): - if ('shares' not in request.data): - e_msg = 'Must provide share names.' - handle_exception(Exception(e_msg), request) - shares = [self._validate_share(request, s) for s in - request.data['shares']] - options = self._validate_input(request) - custom_config = options['custom_config'] - del(options['custom_config']) - for share in shares: - if (SambaShare.objects.filter(share=share).exists()): - e_msg = ('Share ({}) is already exported via ' - 'Samba.').format(share.name) - handle_exception(Exception(e_msg), request) - with self._handle_exception(request): + if isinstance(request.data, list): + for se in request.data: + smb_share = self.create_samba_share(se) + else: + smb_share = self.create_samba_share(request.data) + refresh_smb_config(list(SambaShare.objects.all())) + self._restart_samba() + return Response(SambaShareSerializer(smb_share).data) + + def create_samba_share(self, rdata): + if "shares" not in rdata: + e_msg = "Must provide share names." + handle_exception(Exception(e_msg), rdata) + shares = [self._validate_share(rdata, s) for s in rdata["shares"]] + options = self._validate_input(rdata) + custom_config = options["custom_config"] + del options["custom_config"] + with self._handle_exception(rdata): for share in shares: - mnt_pt = ('%s%s' % (settings.MNT_PT, share.name)) - options['share'] = share - options['path'] = mnt_pt + if SambaShare.objects.filter(share=share).exists(): + e_msg = ("Share ({}) is already exported via Samba.").format( + share.name + ) + logger.error(e_msg) + smb_share = SambaShare.objects.get(share=share) + # handle_exception(Exception(e_msg), rdata) + continue + mnt_pt = "{}{}".format(settings.MNT_PT, share.name) + options["share"] = share + options["path"] = mnt_pt smb_share = SambaShare(**options) smb_share.save() for cc in custom_config: - cco = SambaCustomConfig(smb_share=smb_share, - custom_config=cc) + cco = SambaCustomConfig(smb_share=smb_share, custom_config=cc) cco.save() if not share.is_mounted: mount_share(share, mnt_pt) - admin_users = request.data.get('admin_users', []) - if (admin_users is None): + admin_users = rdata.get("admin_users", []) + if admin_users is None: admin_users = [] self._set_admin_users(admin_users, smb_share) - refresh_smb_config(list(SambaShare.objects.all())) - self._restart_samba() - return Response(SambaShareSerializer(smb_share).data) + return smb_share class SambaDetailView(SambaMixin, rfc.GenericView): def get(self, *args, **kwargs): try: - data = SambaShare.objects.get(id=self.kwargs['smb_id']) + data = SambaShare.objects.get(id=self.kwargs["smb_id"]) serialized_data = SambaShareSerializer(data) return Response(serialized_data.data) except SambaShare.DoesNotExist: @@ -180,8 +191,7 @@ def delete(self, request, smb_id): SambaCustomConfig.objects.filter(smb_share=smbo).delete() smbo.delete() except: - e_msg = ('Samba export for the id ({}) ' - 'does not exist.').format(smb_id) + e_msg = ("Samba export for the id ({}) does not exist.").format(smb_id) handle_exception(Exception(e_msg), request) with self._handle_exception(request): @@ -195,24 +205,25 @@ def put(self, request, smb_id): try: smbo = SambaShare.objects.get(id=smb_id) except: - e_msg = ('Samba export for the id ({}) ' - 'does not exist.').format(smb_id) + e_msg = ("Samba export for the id ({}) does not exist.").format( + smb_id + ) handle_exception(Exception(e_msg), request) - options = self._validate_input(request, smbo=smbo) - custom_config = options['custom_config'] - del(options['custom_config']) + options = self._validate_input(request.data, smbo=smbo) + custom_config = options["custom_config"] + del options["custom_config"] smbo.__dict__.update(**options) - admin_users = request.data.get('admin_users', []) - if (admin_users is None): + admin_users = request.data.get("admin_users", []) + if admin_users is None: admin_users = [] for uo in User.objects.filter(smb_shares=smbo): - if (uo.username not in admin_users): + if uo.username not in admin_users: uo.smb_shares.remove(smbo) self._set_admin_users(admin_users, smbo) smbo.save() for cco in SambaCustomConfig.objects.filter(smb_share=smbo): - if (cco.custom_config not in custom_config): + if cco.custom_config not in custom_config: cco.delete() else: custom_config.remove(cco.custom_config) @@ -221,14 +232,16 @@ def put(self, request, smb_id): cco.save() for smb_o in SambaShare.objects.all(): if not smb_o.share.is_mounted: - mnt_pt = ('%s%s' % (settings.MNT_PT, smb_o.share.name)) + mnt_pt = "%s%s" % (settings.MNT_PT, smb_o.share.name) try: mount_share(smb_o.share, mnt_pt) except Exception as e: logger.exception(e) - if (smb_o.id == smbo.id): - e_msg = ('Failed to mount share ({}) due to a low ' - 'level error.').format(smb_o.share.name) + if smb_o.id == smbo.id: + e_msg = ( + "Failed to mount share ({}) due to a low " + "level error." + ).format(smb_o.share.name) handle_exception(Exception(e_msg), request) refresh_smb_config(list(SambaShare.objects.all())) diff --git a/src/rockstor/system/config_backup.py b/src/rockstor/system/config_backup.py index 6aaf9e6c4..edb5fe3b9 100644 --- a/src/rockstor/system/config_backup.py +++ b/src/rockstor/system/config_backup.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2016 RockStor, Inc. +Copyright (c) 2012-2019 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -21,37 +21,42 @@ from datetime import datetime from django.core.management import call_command from storageadmin.models import ConfigBackup -from system.osi import (run_command, md5sum) +from system.osi import run_command, md5sum logger = logging.getLogger(__name__) def backup_config(): - models = {'storageadmin': - ['user', 'group', 'sambashare', 'sambacustomconfig', - 'netatalkshare', 'nfsexport', - 'nfsexportgroup', 'advancednfsexport', ], - 'smart_manager': - ['service', ], } + models = { + "storageadmin": [ + "user", + "group", + "sambashare", + "sambacustomconfig", + "netatalkshare", + "nfsexport", + "nfsexportgroup", + "advancednfsexport", + ], + "smart_manager": ["service"], + } model_list = [] for a in models: for m in models[a]: - model_list.append('%s.%s' % (a, m)) - logger.debug('model list = %s' % model_list) + model_list.append("{}.{}".format(a, m)) - filename = ('backup-%s.json' % datetime.now().strftime('%Y-%m-%d-%H%M%S')) + filename = "backup-{}.json".format(datetime.now().strftime("%Y-%m-%d-%H%M%S")) cb_dir = ConfigBackup.cb_dir() - if (not os.path.isdir(cb_dir)): + if not os.path.isdir(cb_dir): os.mkdir(cb_dir) fp = os.path.join(cb_dir, filename) - with open(fp, 'w') as dfo: - call_command('dumpdata', *model_list, stdout=dfo) - dfo.write('\n') - call_command('dumpdata', database='smart_manager', *model_list, - stdout=dfo) - run_command(['/usr/bin/gzip', fp]) - gz_name = ('%s.gz' % filename) + with open(fp, "w") as dfo: + call_command("dumpdata", *model_list, stdout=dfo) + dfo.write("\n") + call_command("dumpdata", database="smart_manager", *model_list, stdout=dfo) + run_command(["/usr/bin/gzip", fp]) + gz_name = "{}.gz".format(filename) fp = os.path.join(cb_dir, gz_name) size = os.stat(fp).st_size cbo = ConfigBackup(filename=gz_name, md5sum=md5sum(fp), size=size)