Skip to content

Commit

Permalink
added token creation and OCP route discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
tsebastiani committed Nov 10, 2023
1 parent 9092f07 commit 485db30
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
53 changes: 53 additions & 0 deletions src/krkn_lib/k8s/krkn_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2127,3 +2127,56 @@ def collect_cluster_events(
except Exception as e:
logging.error(str(e))
return None

def create_token_for_sa(
self, namespace: str, service_account: str, token_expiration=43200
) -> Optional[str]:
"""
Creates a token for an existing ServiceAccount in a namespace
that will expire in <token_expiration> seconds (optional)
:param namespace: the namespace where the SA belongs
:param service_account: the name of the SA
:param token_expiration: the duration of the SA in seconds,
default 12h
:return: the token or None if something went wrong.
"""
body = {
"kind": "TokenRequest",
"apiVersion": "authentication.k8s.io/v1",
"spec": {
"expirationSeconds": token_expiration,
},
}

path = (
f"/api/v1/namespaces/{namespace}/"
f"serviceaccounts/{service_account}/token"
)

path_params: Dict[str, str] = {}
query_params: List[str] = []
header_params: Dict[str, str] = {}
auth_settings = ["BearerToken"]
header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
)
try:
(data) = self.api_client.call_api(
path,
"POST",
path_params,
query_params,
header_params,
body=body,
response_type="str",
auth_settings=auth_settings,
)
json_obj = ast.literal_eval(data[0])
return json_obj["status"]["token"]
except Exception as e:
logging.error(
f"failed to create token for SA: {service_account} "
f"on namespace: {namespace} with error: {e}"
)
return None
63 changes: 63 additions & 0 deletions src/krkn_lib/ocp/krkn_openshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import subprocess
import tarfile
import threading
from collections import namedtuple

from pathlib import Path
from queue import Queue
Expand Down Expand Up @@ -353,3 +354,65 @@ def collect_filter_archive_ocp_logs(
safe_logger.error(f"failed to create logs archive: {str(e)}")

return archive_name

def get_prometheus_api_connection_data(
self, namespace: str = "openshift-monitoring"
) -> Optional[
namedtuple("PrometheusConnectionData", ["token", "endpoint"]) # NOQA
]:
named_tuple = namedtuple(
"PrometheusConnectionData", ["token", "endpoint"]
)
prometheus_namespace = "openshift-monitoring"
prometheus_route = "prometheus-k8s"
token = self.create_token_for_sa(
prometheus_namespace, "prometheus-k8s"
)
if token is None:
return None

try:
path = (
"/apis/route.openshift.io/v1/"
"namespaces/openshift-monitoring/routes"
)
path_params: dict[str, str] = {}
query_params: list[str] = []
header_params: dict[str, str] = {}
auth_settings = ["BearerToken"]
header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
)

(data) = self.api_client.call_api(
path,
"GET",
path_params,
query_params,
header_params,
response_type="str",
auth_settings=auth_settings,
)
json_obj = ast.literal_eval(data[0])
endpoint = None
for item in json_obj["items"]:
if item["metadata"]["name"] == "prometheus-k8s":
endpoint = item["spec"]["host"]
break

if endpoint is None:
logging.error(
f"prometheus Route not found in cluster, "
f"namespace{prometheus_namespace}, "
f"Route: {prometheus_route}"
)
return None

return named_tuple(token=token, endpoint=endpoint)
except Exception:
logging.error(
f"failed to fetch prometheus route"
f"namespace{prometheus_namespace}, "
f"Route: {prometheus_route}"
)
return None
9 changes: 9 additions & 0 deletions src/krkn_lib/tests/test_krkn_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,15 @@ def test_collect_cluster_events(self):
obj_list = json.load(file)
self.assertTrue(len(obj_list) > 0)

def test_create_token_for_namespace(self):
token = self.lib_k8s.create_token_for_sa("default", "default")
self.assertIsNotNone(token)

not_token = self.lib_k8s.create_token_for_sa(
"do_not_exists", "do_not_exists"
)
self.assertIsNone(not_token)


if __name__ == "__main__":
unittest.main()

0 comments on commit 485db30

Please sign in to comment.