diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py index 5d6c978c..4e95c695 100644 --- a/src/krkn_lib/k8s/krkn_kubernetes.py +++ b/src/krkn_lib/k8s/krkn_kubernetes.py @@ -2127,3 +2127,56 @@ def collect_cluster_events( except Exception as e: logging.error(str(e)) return None + + def create_token_for_sa( + self, namespace: str, service_account: str, token_expiration=43200 + ) -> Optional[str]: + """ + Creates a token for an existing ServiceAccount in a namespace + that will expire in seconds (optional) + + :param namespace: the namespace where the SA belongs + :param service_account: the name of the SA + :param token_expiration: the duration of the SA in seconds, + default 12h + :return: the token or None if something went wrong. + """ + body = { + "kind": "TokenRequest", + "apiVersion": "authentication.k8s.io/v1", + "spec": { + "expirationSeconds": token_expiration, + }, + } + + path = ( + f"/api/v1/namespaces/{namespace}/" + f"serviceaccounts/{service_account}/token" + ) + + path_params: Dict[str, str] = {} + query_params: List[str] = [] + header_params: Dict[str, str] = {} + auth_settings = ["BearerToken"] + header_params["Accept"] = self.api_client.select_header_accept( + ["application/json"] + ) + try: + (data) = self.api_client.call_api( + path, + "POST", + path_params, + query_params, + header_params, + body=body, + response_type="str", + auth_settings=auth_settings, + ) + json_obj = ast.literal_eval(data[0]) + return json_obj["status"]["token"] + except Exception as e: + logging.error( + f"failed to create token for SA: {service_account} " + f"on namespace: {namespace} with error: {e}" + ) + return None diff --git a/src/krkn_lib/ocp/krkn_openshift.py b/src/krkn_lib/ocp/krkn_openshift.py index 6b6c1b2b..353c4fe9 100644 --- a/src/krkn_lib/ocp/krkn_openshift.py +++ b/src/krkn_lib/ocp/krkn_openshift.py @@ -5,6 +5,7 @@ import subprocess import tarfile import threading +from collections import namedtuple from pathlib import Path from queue import Queue @@ -353,3 +354,65 @@ def collect_filter_archive_ocp_logs( safe_logger.error(f"failed to create logs archive: {str(e)}") return archive_name + + def get_prometheus_api_connection_data( + self, namespace: str = "openshift-monitoring" + ) -> Optional[ + namedtuple("PrometheusConnectionData", ["token", "endpoint"]) # NOQA + ]: + named_tuple = namedtuple( + "PrometheusConnectionData", ["token", "endpoint"] + ) + prometheus_namespace = "openshift-monitoring" + prometheus_route = "prometheus-k8s" + token = self.create_token_for_sa( + prometheus_namespace, "prometheus-k8s" + ) + if token is None: + return None + + try: + path = ( + "/apis/route.openshift.io/v1/" + "namespaces/openshift-monitoring/routes" + ) + path_params: dict[str, str] = {} + query_params: list[str] = [] + header_params: dict[str, str] = {} + auth_settings = ["BearerToken"] + header_params["Accept"] = self.api_client.select_header_accept( + ["application/json"] + ) + + (data) = self.api_client.call_api( + path, + "GET", + path_params, + query_params, + header_params, + response_type="str", + auth_settings=auth_settings, + ) + json_obj = ast.literal_eval(data[0]) + endpoint = None + for item in json_obj["items"]: + if item["metadata"]["name"] == "prometheus-k8s": + endpoint = item["spec"]["host"] + break + + if endpoint is None: + logging.error( + f"prometheus Route not found in cluster, " + f"namespace{prometheus_namespace}, " + f"Route: {prometheus_route}" + ) + return None + + return named_tuple(token=token, endpoint=endpoint) + except Exception: + logging.error( + f"failed to fetch prometheus route" + f"namespace{prometheus_namespace}, " + f"Route: {prometheus_route}" + ) + return None diff --git a/src/krkn_lib/tests/test_krkn_kubernetes.py b/src/krkn_lib/tests/test_krkn_kubernetes.py index f6ab1b62..067970e7 100644 --- a/src/krkn_lib/tests/test_krkn_kubernetes.py +++ b/src/krkn_lib/tests/test_krkn_kubernetes.py @@ -746,6 +746,15 @@ def test_collect_cluster_events(self): obj_list = json.load(file) self.assertTrue(len(obj_list) > 0) + def test_create_token_for_namespace(self): + token = self.lib_k8s.create_token_for_sa("default", "default") + self.assertIsNotNone(token) + + not_token = self.lib_k8s.create_token_for_sa( + "do_not_exists", "do_not_exists" + ) + self.assertIsNone(not_token) + if __name__ == "__main__": unittest.main()