Skip to content

Commit

Permalink
Cleanups
Browse files Browse the repository at this point in the history
Signed-off-by: Alina Buzachis <[email protected]>
  • Loading branch information
alinabuzachis committed Aug 20, 2024
1 parent e7fd3c0 commit b3178ba
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 164 deletions.
32 changes: 23 additions & 9 deletions plugins/module_utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,30 @@

from __future__ import absolute_import, division, print_function

from typing import Any, Optional, Union

__metaclass__ = type


def to_list_of_dict(
result: Optional[Union[dict[str, bool], list[dict[Any, Any]]]]
) -> list[dict]:
if result is None:
return []
if not isinstance(result, list):
return [result]
from typing import Any, Optional

from ansible.module_utils.basic import AnsibleModule

from ..module_utils.controller import Controller
from ..module_utils.errors import EDAError


def lookup_resource_id(
module: AnsibleModule,
controller: Controller,
endpoint: str,
name: str,
params: Optional[dict[str, Any]] = None,
):
result = None

try:
result = controller.resolve_name_to_id(
endpoint, name, **params if params is not None else {}
)
except EDAError as e:
module.fail_json(msg=f"Failed to lookup resource: {e}")
return result
88 changes: 36 additions & 52 deletions plugins/module_utils/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
from __future__ import absolute_import, annotations, division, print_function

from typing import Any, List, NoReturn, Optional, Union
__metaclass__ = type

from ansible.module_utils.basic import AnsibleModule

from .client import Client
from typing import Any, List, NoReturn, Optional

__metaclass__ = type
from ansible.module_utils.basic import AnsibleModule

from .client import Client
from .errors import EDAError


Expand Down Expand Up @@ -76,43 +76,49 @@ def get_item_name(self, item):
raise EDAError(msg)

def fail_wanted_one(self, response, endpoint, query_params) -> NoReturn:
sample = response.json.copy()
if len(sample["results"]) > 1:
sample["results"] = sample["results"][:2] + ["...more results snipped..."]
url = self.client.build_url(endpoint, query_params)
host_length = len(self.client.host)
display_endpoint = url.geturl()[
host_length:
] # truncate to not include the base URL
msg = f"Request to {display_endpoint} returned {response.json['count']} items, expected 1"
msg = (
f"Request to {display_endpoint} returned {len(response)} items, expected 1"
)
raise EDAError(msg)

def get_exactly_one(
self, endpoint, name=None, **kwargs
) -> Optional[dict[str, bool]]:
result = self.get_one_or_many(
endpoint, name=name, allow_none=True, want_one=True, **kwargs
)
# typing: needed as get_one_or_many can also return lists, not expected
# to reach this ever.
if isinstance(result, list): # pragma: no cover
self.fail_wanted_one(result, endpoint, {})
return result
def get_exactly_one(self, endpoint, name=None, **kwargs) -> dict[str, bool]:
new_kwargs = kwargs.copy()

result = self.get_one_or_many(endpoint, name=name, **kwargs)

if len(result) == 0:
return {}

if len(result) > 1:
if name:
# Since we did a name or ID search and got > 1 return
# something if the id matches
for asset in result:
if str(asset["id"]) == name:
return asset
# We got > 1 and either didn't find something by ID (which means
# multiple names)
# Or we weren't running with a or search and just got back too
# many to begin with.
self.fail_wanted_one(result, endpoint, new_kwargs.get("data"))

def resolve_name_to_id(self, endpoint, name):
return self.get_exactly_one(endpoint, name)["id"]
return result[0]

def resolve_name_to_id(self, endpoint, name, **kwargs):
return self.get_exactly_one(endpoint, name, **kwargs)["id"]

def get_one_or_many(
self,
endpoint: str,
name: Optional[str] = None,
allow_none: bool = True,
check_exists: bool = False,
want_one: bool = True,
**kwargs: Any,
) -> Union[None, dict[str, bool], List]:
) -> List[Any]:
new_kwargs = kwargs.copy()
response = None

if name:
name_field = self.get_name_field_from_endpoint(endpoint)
Expand All @@ -133,31 +139,9 @@ def get_one_or_many(
raise EDAError("The endpoint did not provide count, results")

if response.json["count"] == 0:
if allow_none:
return None
self.fail_wanted_one(response, endpoint, new_kwargs.get("data"))
if response.json["count"] == 1:
return response.json["results"][0]
if response.json["count"] > 1:
if want_one:
if name:
# Since we did a name or ID search and got > 1 return
# something if the id matches
for asset in response.json["results"]:
if str(asset["id"]) == name:
return asset
# We got > 1 and either didn't find something by ID (which means
# multiple names)
# Or we weren't running with a or search and just got back too
# many to begin with.
self.fail_wanted_one(response, endpoint, new_kwargs.get("data"))
else:
return response.json["results"]
return []

if check_exists:
self.result["id"] = response.json["results"][0]["id"]
return self.result
return None
return response.json["results"]

def create_if_needed(
self,
Expand Down Expand Up @@ -304,7 +288,7 @@ def update_if_needed(
# to you to process the response and exit from the module.
# Note: common error codes from the EDA API can cause the module to fail
response = None
if existing_item is None:
if not existing_item:
raise RuntimeError(
"update_if_needed called incorrectly without existing_item"
)
Expand Down Expand Up @@ -387,7 +371,7 @@ def create_or_update_if_needed(
)

def delete_if_needed(self, existing_item, endpoint, on_delete=None):
if existing_item is None:
if not existing_item:
return self.result

# If we have an item, we can try to delete it
Expand Down
47 changes: 22 additions & 25 deletions plugins/modules/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,50 +151,43 @@

from ..module_utils.arguments import AUTH_ARGSPEC
from ..module_utils.client import Client
from ..module_utils.common import lookup_resource_id
from ..module_utils.controller import Controller
from ..module_utils.errors import EDAError


def lookup(module: AnsibleModule, controller: Controller, endpoint, name):
result = None
try:
result = controller.resolve_name_to_id(endpoint, name)
except EDAError as e:
module.fail_json(msg=f"Failed to lookup resource: {e}")
return result


def create_params(module: AnsibleModule, controller: Controller) -> dict[str, Any]:
activation_params: dict[str, Any] = {}

# Get the project id
project_id = None
if module.params.get("project_name"):
project_id = lookup(
project_id = lookup_resource_id(
module, controller, "projects", module.params["project_name"]
)
if project_id is not None:
activation_params["project_id"] = project_id

# Get the rulebook id
rulebook = None
rulebook_id = None
params = {}
if project_id is not None:
params = {"data": {"project_id": project_id}}
if module.params.get("rulebook_name"):
try:
rulebook = controller.get_exactly_one(
"rulebooks", name=module.params["rulebook_name"], **params
)
except EDAError as e:
module.fail_json(msg=f"Failed to lookup rulebook: {e}")
if rulebook is not None:
activation_params["rulebook_id"] = rulebook["id"]
rulebook_id = lookup_resource_id(
module,
controller,
"rulebooks",
module.params["rulebook_name"],
params,
)
if rulebook_id is not None:
activation_params["rulebook_id"] = rulebook_id

# Get the decision environment id
decision_environment_id = None
if module.params.get("decision_environment_name"):
decision_environment_id = lookup(
decision_environment_id = lookup_resource_id(
module,
controller,
"decision-environments",
Expand All @@ -206,7 +199,7 @@ def create_params(module: AnsibleModule, controller: Controller) -> dict[str, An
# Get the organization id
organization_id = None
if module.params.get("organization_name"):
organization_id = lookup(
organization_id = lookup_resource_id(
module, controller, "organizations", module.params["organization_name"]
)
if organization_id is not None:
Expand All @@ -221,7 +214,7 @@ def create_params(module: AnsibleModule, controller: Controller) -> dict[str, An
# Get the AWX token id
awx_token_id = None
if module.params.get("awx_token_name"):
awx_token_id = lookup(
awx_token_id = lookup_resource_id(
module, controller, "/users/me/awx-tokens/", module.params["awx_token_name"]
)
if awx_token_id is not None:
Expand All @@ -241,7 +234,7 @@ def create_params(module: AnsibleModule, controller: Controller) -> dict[str, An
if module.params.get("eda_credentials"):
eda_credential_ids = []
for item in module.params["eda_credentials"]:
cred_id = lookup(module, controller, "eda-credentials", item)
cred_id = lookup_resource_id(module, controller, "eda-credentials", item)
if cred_id is not None:
eda_credential_ids.append(cred_id)

Expand All @@ -256,7 +249,7 @@ def create_params(module: AnsibleModule, controller: Controller) -> dict[str, An
if module.params.get("webhooks"):
webhooks_ids = []
for item in module.params["webhooks"]:
webhook_id = lookup(module, controller, "webhooks", item)
webhook_id = lookup_resource_id(module, controller, "webhooks", item)
if webhook_id is not None:
webhooks_ids.append(webhook_id)
if webhooks_ids is not None:
Expand Down Expand Up @@ -303,7 +296,11 @@ def main() -> None:
argument_spec.update(AUTH_ARGSPEC)

required_if = [
("state", "present", ("name", "rulebook_name", "decision_environment_name", "organization_name"))
(
"state",
"present",
("name", "rulebook_name", "decision_environment_name", "organization_name"),
)
]

module = AnsibleModule(
Expand Down
5 changes: 2 additions & 3 deletions plugins/modules/activation_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@

from ..module_utils.arguments import AUTH_ARGSPEC
from ..module_utils.client import Client
from ..module_utils.common import to_list_of_dict
from ..module_utils.controller import Controller
from ..module_utils.errors import EDAError

Expand All @@ -109,11 +108,11 @@ def main() -> None:

# Attempt to look up credential based on the provided name
try:
result = controller.get_one_or_many("activations", name=name, want_one=False)
result = controller.get_one_or_many("activations", name=name)
except EDAError as e:
module.fail_json(msg=f"Failed to get rulebook activations: {e}")

module.exit_json(activations=to_list_of_dict(result))
module.exit_json(activations=result)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion plugins/modules/controller_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def main() -> None:
ret = {}

try:
token_type = controller.get_one_or_many(token_endpoint, name=token_name)
token_type = controller.get_exactly_one(token_endpoint, name=token_name)
except EDAError as eda_err:
module.fail_json(msg=str(eda_err))

Expand Down
Loading

0 comments on commit b3178ba

Please sign in to comment.