From ced12c5af65b1e9a93809a9a2c91dd3c946822dd Mon Sep 17 00:00:00 2001 From: ryanmerolle Date: Mon, 30 Jan 2023 14:59:39 +0000 Subject: [PATCH] improve ACLInterfaceAssignmentForm clean method --- netbox_acls/forms/models.py | 198 ++++++++++++++++++++++-------------- 1 file changed, 119 insertions(+), 79 deletions(-) diff --git a/netbox_acls/forms/models.py b/netbox_acls/forms/models.py index 148fd5b..3c1f799 100644 --- a/netbox_acls/forms/models.py +++ b/netbox_acls/forms/models.py @@ -182,24 +182,24 @@ def clean(self): acl_type = cleaned_data.get("type") # Check if more than one host type selected. - host_types = self.get_host_types() + host_types = self._get_host_types() # Check if no hosts selected. - self.validate_host_types(host_types) + self._validate_host_types(host_types) host_type, host = host_types[0] # Check if duplicate entry. - self.validate_duplicate_entry(name, host_type, host, error_message) + self._validate_duplicate_entry(name, host_type, host, error_message) # Check if Access List has no existing rules before change the Access List's type. - self.validate_acl_type_change(acl_type, error_message) + self._validate_acl_type_change(acl_type, error_message) if error_message: raise forms.ValidationError(error_message) return cleaned_data - def get_host_types(self): + def _get_host_types(self): """ Get host type assigned to the Access List. """ @@ -213,7 +213,7 @@ def get_host_types(self): ] return [x for x in host_types if x[1]] - def validate_host_types(self, host_types): + def _validate_host_types(self, host_types): """ Check number of host types selected. """ @@ -227,7 +227,7 @@ def validate_host_types(self, host_types): "Access Lists must be assigned to a device, virtual chassis or virtual machine.", ) - def validate_duplicate_entry(self, name, host_type, host, error_message): + def _validate_duplicate_entry(self, name, host_type, host, error_message): """ Check if duplicate entry. (Because of GFK.) """ @@ -246,7 +246,7 @@ def validate_duplicate_entry(self, name, host_type, host, error_message): "name": [error_same_acl_name], } - def validate_acl_type_change(self, acl_type, error_message): + def _validate_acl_type_change(self, acl_type, error_message): """ Check if Access List has no existing rules before change the Access List's type. """ @@ -374,6 +374,7 @@ class Meta: ), } + def clean(self): """ Validates form inputs before submitting: @@ -384,87 +385,126 @@ def clean(self): - Check for duplicate entry. (Because of GFK) - Check that the interface does not have an existing ACL applied in the direction already. """ + # Call the parent class's `clean` method cleaned_data = super().clean() - error_message = {} - access_list = cleaned_data.get("access_list") - direction = cleaned_data.get("direction") - interface = cleaned_data.get("interface") - vminterface = cleaned_data.get("vminterface") - - # Check if both interface and vminterface are set. - if interface and vminterface: - error_too_many_interfaces = "Access Lists must be assigned to one type of interface at a time (VM interface or physical interface)" - error_message |= { - "interface": [error_too_many_interfaces], - "vminterface": [error_too_many_interfaces], - } - elif not (interface or vminterface): - error_no_interface = ( - "An Access List assignment but specify an Interface or VM Interface." - ) - error_message |= { - "interface": [error_no_interface], - "vminterface": [error_no_interface], - } - else: - if interface: - assigned_object = interface - assigned_object_type = "interface" - host_type = "device" + + # Get the interface types assigned to the Access List + interface_types = self._get_interface_types() + + # Initialize an error message variable + error_message = self._validate_interface_types(interface_types) + + if not error_message: + assigned_object_type, assigned_object = interface_types[0] + host_type = ("device" if assigned_object_type == "interface" else "virtual_machine") + + # Get the parent host (device or virtual machine) of the assigned interface + if assigned_object_type == "interface": host = Interface.objects.get(pk=assigned_object.pk).device assigned_object_id = Interface.objects.get(pk=assigned_object.pk).pk else: - assigned_object = vminterface - assigned_object_type = "vminterface" - host_type = "virtual_machine" host = VMInterface.objects.get(pk=assigned_object.pk).virtual_machine assigned_object_id = VMInterface.objects.get(pk=assigned_object.pk).pk - assigned_object_type_id = ContentType.objects.get_for_model( - assigned_object, - ).pk - access_list_host = AccessList.objects.get(pk=access_list.pk).assigned_object - - # Check that an interface's parent device/virtual_machine is assigned to the Access List. - if access_list_host != host: - error_acl_not_assigned_to_host = ( - "Access List not present on selected host." - ) - error_message |= { - "access_list": [error_acl_not_assigned_to_host], - assigned_object_type: [error_acl_not_assigned_to_host], - host_type: [error_acl_not_assigned_to_host], - } - # Check for duplicate entry. - if ACLInterfaceAssignment.objects.filter( - access_list=access_list, - assigned_object_id=assigned_object_id, - assigned_object_type=assigned_object_type_id, - direction=direction, - ).exists(): - error_duplicate_entry = "An ACL with this name is already associated to this interface & direction." - error_message |= { - "access_list": [error_duplicate_entry], - "direction": [error_duplicate_entry], - assigned_object_type: [error_duplicate_entry], - } - # Check that the interface does not have an existing ACL applied in the direction already. - if ACLInterfaceAssignment.objects.filter( - assigned_object_id=assigned_object_id, - assigned_object_type=assigned_object_type_id, - direction=direction, - ).exists(): - error_interface_already_assigned = ( - "Interfaces can only have 1 Access List assigned in each direction." - ) - error_message |= { - "direction": [error_interface_already_assigned], - assigned_object_type: [error_interface_already_assigned], - } + # Get the ContentType id for the assigned object + assigned_object_type_id = ContentType.objects.get_for_model(assigned_object).pk + + if not error_message: + # Check if the parent host is assigned to the Access List + error_message |= self._check_if_interface_parent_is_assigned_to_access_list(cleaned_data.get("access_list"), assigned_object_type, host_type, host) + + if not error_message: + # Check for duplicate entries in the Access List + error_message |= self._check_for_duplicate_entry(cleaned_data.get("access_list"), assigned_object_id, assigned_object_type_id, cleaned_data.get("direction"),) + + if not error_message: + # Check if the interface already has an ACL applied in the specified direction + error_message |= self._check_if_interface_already_has_acl_in_direction(assigned_object_id, assigned_object_type_id, cleaned_data.get("direction")) if error_message: raise forms.ValidationError(error_message) - return cleaned_data + else: + return cleaned_data + + + def _get_interface_types(self): + """ + Get interface type/model assigned to the Access List. + """ + interface = self.cleaned_data.get("interface") + vminterface = self.cleaned_data.get("vminterface") + interface_types = [ + ("interface", interface), + ("vminterface", vminterface), + ] + return [x for x in interface_types if x[1]] + + def _validate_interface_types(self, interface_types): + """ + Check if number of interface type selected is 1. + """ + # Check if more than 1 hosts selected. + if len(interface_types) > 1: + return "Assignment can only be to one interface at a time (either a interface or vm_interface)." + # Check if no hosts selected. + elif not interface_types: + return "No interface or vm_interface selected." + else: + return {} + + def _check_if_interface_parent_is_assigned_to_access_list(self, access_list, assigned_object_type, host_type, host): + """ + Check that an interface's parent device/virtual_machine is assigned to the Access List. + """ + + access_list_host = AccessList.objects.get(pk=access_list.pk).assigned_object + + if access_list_host != host: + ERROR_ACL_NOT_ASSIGNED_TO_HOST = ( + "Access List not present on selected host." + ) + return { + "access_list": [ERROR_ACL_NOT_ASSIGNED_TO_HOST], + assigned_object_type: [ERROR_ACL_NOT_ASSIGNED_TO_HOST], + host_type: [ERROR_ACL_NOT_ASSIGNED_TO_HOST], + } + else: + return {} + + def _check_for_duplicate_entry(self, access_list, assigned_object_id, assigned_object_type_id, direction): + """ + Check for duplicate entry. (Because of GFK) + """ + + if ACLInterfaceAssignment.objects.filter( + access_list=access_list, + assigned_object_id=assigned_object_id, + assigned_object_type=assigned_object_type_id, + direction=direction, + ).exists(): + return {"access_list": ["Duplicate entry."]} + else: + return {} + + def _check_if_interface_already_has_acl_in_direction(self, assigned_object_id, assigned_object_type_id, direction): + """ + Check that the interface does not have an existing ACL applied in the direction already. + """ + if ACLInterfaceAssignment.objects.filter( + assigned_object_id=assigned_object_id, + assigned_object_type=assigned_object_type_id, + direction=direction, + ).exists(): + error_interface_already_assigned = ( + "Interfaces can only have 1 Access List assigned in each direction." + ) + return { + "direction": [error_interface_already_assigned], + assigned_object_type: [error_interface_already_assigned], + } + else: + return {} + def save(self, *args, **kwargs): """