Skip to content

Commit

Permalink
improve ACLInterfaceAssignmentForm clean method
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanmerolle committed Jan 30, 2023
1 parent 1861c56 commit ced12c5
Showing 1 changed file with 119 additions and 79 deletions.
198 changes: 119 additions & 79 deletions netbox_acls/forms/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,24 +182,24 @@ def clean(self):
acl_type = cleaned_data.get("type")

# Check if more than one host type selected.
host_types = self.get_host_types()
host_types = self._get_host_types()

# Check if no hosts selected.
self.validate_host_types(host_types)
self._validate_host_types(host_types)

host_type, host = host_types[0]

# Check if duplicate entry.
self.validate_duplicate_entry(name, host_type, host, error_message)
self._validate_duplicate_entry(name, host_type, host, error_message)
# Check if Access List has no existing rules before change the Access List's type.
self.validate_acl_type_change(acl_type, error_message)
self._validate_acl_type_change(acl_type, error_message)

if error_message:
raise forms.ValidationError(error_message)

return cleaned_data

def get_host_types(self):
def _get_host_types(self):
"""
Get host type assigned to the Access List.
"""
Expand All @@ -213,7 +213,7 @@ def get_host_types(self):
]
return [x for x in host_types if x[1]]

def validate_host_types(self, host_types):
def _validate_host_types(self, host_types):
"""
Check number of host types selected.
"""
Expand All @@ -227,7 +227,7 @@ def validate_host_types(self, host_types):
"Access Lists must be assigned to a device, virtual chassis or virtual machine.",
)

def validate_duplicate_entry(self, name, host_type, host, error_message):
def _validate_duplicate_entry(self, name, host_type, host, error_message):
"""
Check if duplicate entry. (Because of GFK.)
"""
Expand All @@ -246,7 +246,7 @@ def validate_duplicate_entry(self, name, host_type, host, error_message):
"name": [error_same_acl_name],
}

def validate_acl_type_change(self, acl_type, error_message):
def _validate_acl_type_change(self, acl_type, error_message):
"""
Check if Access List has no existing rules before change the Access List's type.
"""
Expand Down Expand Up @@ -374,6 +374,7 @@ class Meta:
),
}


def clean(self):
"""
Validates form inputs before submitting:
Expand All @@ -384,87 +385,126 @@ def clean(self):
- Check for duplicate entry. (Because of GFK)
- Check that the interface does not have an existing ACL applied in the direction already.
"""
# Call the parent class's `clean` method
cleaned_data = super().clean()
error_message = {}
access_list = cleaned_data.get("access_list")
direction = cleaned_data.get("direction")
interface = cleaned_data.get("interface")
vminterface = cleaned_data.get("vminterface")

# Check if both interface and vminterface are set.
if interface and vminterface:
error_too_many_interfaces = "Access Lists must be assigned to one type of interface at a time (VM interface or physical interface)"
error_message |= {
"interface": [error_too_many_interfaces],
"vminterface": [error_too_many_interfaces],
}
elif not (interface or vminterface):
error_no_interface = (
"An Access List assignment but specify an Interface or VM Interface."
)
error_message |= {
"interface": [error_no_interface],
"vminterface": [error_no_interface],
}
else:
if interface:
assigned_object = interface
assigned_object_type = "interface"
host_type = "device"

# Get the interface types assigned to the Access List
interface_types = self._get_interface_types()

# Initialize an error message variable
error_message = self._validate_interface_types(interface_types)

if not error_message:
assigned_object_type, assigned_object = interface_types[0]
host_type = ("device" if assigned_object_type == "interface" else "virtual_machine")

# Get the parent host (device or virtual machine) of the assigned interface
if assigned_object_type == "interface":
host = Interface.objects.get(pk=assigned_object.pk).device
assigned_object_id = Interface.objects.get(pk=assigned_object.pk).pk
else:
assigned_object = vminterface
assigned_object_type = "vminterface"
host_type = "virtual_machine"
host = VMInterface.objects.get(pk=assigned_object.pk).virtual_machine
assigned_object_id = VMInterface.objects.get(pk=assigned_object.pk).pk

assigned_object_type_id = ContentType.objects.get_for_model(
assigned_object,
).pk
access_list_host = AccessList.objects.get(pk=access_list.pk).assigned_object

# Check that an interface's parent device/virtual_machine is assigned to the Access List.
if access_list_host != host:
error_acl_not_assigned_to_host = (
"Access List not present on selected host."
)
error_message |= {
"access_list": [error_acl_not_assigned_to_host],
assigned_object_type: [error_acl_not_assigned_to_host],
host_type: [error_acl_not_assigned_to_host],
}
# Check for duplicate entry.
if ACLInterfaceAssignment.objects.filter(
access_list=access_list,
assigned_object_id=assigned_object_id,
assigned_object_type=assigned_object_type_id,
direction=direction,
).exists():
error_duplicate_entry = "An ACL with this name is already associated to this interface & direction."
error_message |= {
"access_list": [error_duplicate_entry],
"direction": [error_duplicate_entry],
assigned_object_type: [error_duplicate_entry],
}
# Check that the interface does not have an existing ACL applied in the direction already.
if ACLInterfaceAssignment.objects.filter(
assigned_object_id=assigned_object_id,
assigned_object_type=assigned_object_type_id,
direction=direction,
).exists():
error_interface_already_assigned = (
"Interfaces can only have 1 Access List assigned in each direction."
)
error_message |= {
"direction": [error_interface_already_assigned],
assigned_object_type: [error_interface_already_assigned],
}
# Get the ContentType id for the assigned object
assigned_object_type_id = ContentType.objects.get_for_model(assigned_object).pk

if not error_message:
# Check if the parent host is assigned to the Access List
error_message |= self._check_if_interface_parent_is_assigned_to_access_list(cleaned_data.get("access_list"), assigned_object_type, host_type, host)

if not error_message:
# Check for duplicate entries in the Access List
error_message |= self._check_for_duplicate_entry(cleaned_data.get("access_list"), assigned_object_id, assigned_object_type_id, cleaned_data.get("direction"),)

if not error_message:
# Check if the interface already has an ACL applied in the specified direction
error_message |= self._check_if_interface_already_has_acl_in_direction(assigned_object_id, assigned_object_type_id, cleaned_data.get("direction"))

if error_message:
raise forms.ValidationError(error_message)
return cleaned_data
else:
return cleaned_data


def _get_interface_types(self):
"""
Get interface type/model assigned to the Access List.
"""
interface = self.cleaned_data.get("interface")
vminterface = self.cleaned_data.get("vminterface")
interface_types = [
("interface", interface),
("vminterface", vminterface),
]
return [x for x in interface_types if x[1]]

def _validate_interface_types(self, interface_types):
"""
Check if number of interface type selected is 1.
"""
# Check if more than 1 hosts selected.
if len(interface_types) > 1:
return "Assignment can only be to one interface at a time (either a interface or vm_interface)."
# Check if no hosts selected.
elif not interface_types:
return "No interface or vm_interface selected."
else:
return {}

def _check_if_interface_parent_is_assigned_to_access_list(self, access_list, assigned_object_type, host_type, host):
"""
Check that an interface's parent device/virtual_machine is assigned to the Access List.
"""

access_list_host = AccessList.objects.get(pk=access_list.pk).assigned_object

if access_list_host != host:
ERROR_ACL_NOT_ASSIGNED_TO_HOST = (
"Access List not present on selected host."
)
return {
"access_list": [ERROR_ACL_NOT_ASSIGNED_TO_HOST],
assigned_object_type: [ERROR_ACL_NOT_ASSIGNED_TO_HOST],
host_type: [ERROR_ACL_NOT_ASSIGNED_TO_HOST],
}
else:
return {}

def _check_for_duplicate_entry(self, access_list, assigned_object_id, assigned_object_type_id, direction):
"""
Check for duplicate entry. (Because of GFK)
"""

if ACLInterfaceAssignment.objects.filter(
access_list=access_list,
assigned_object_id=assigned_object_id,
assigned_object_type=assigned_object_type_id,
direction=direction,
).exists():
return {"access_list": ["Duplicate entry."]}
else:
return {}

def _check_if_interface_already_has_acl_in_direction(self, assigned_object_id, assigned_object_type_id, direction):
"""
Check that the interface does not have an existing ACL applied in the direction already.
"""
if ACLInterfaceAssignment.objects.filter(
assigned_object_id=assigned_object_id,
assigned_object_type=assigned_object_type_id,
direction=direction,
).exists():
error_interface_already_assigned = (
"Interfaces can only have 1 Access List assigned in each direction."
)
return {
"direction": [error_interface_already_assigned],
assigned_object_type: [error_interface_already_assigned],
}
else:
return {}


def save(self, *args, **kwargs):
"""
Expand Down

0 comments on commit ced12c5

Please sign in to comment.