Skip to content

Commit

Permalink
kubeconfig_string initialization restored
Browse files Browse the repository at this point in the history
  • Loading branch information
tsebastiani committed Nov 30, 2023
1 parent 3e2d602 commit 0684c54
Showing 1 changed file with 76 additions and 40 deletions.
116 changes: 76 additions & 40 deletions src/krkn_lib/k8s/krkn_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
kubeconfig_path: str = None,
*,
kubeconfig_string: str = None,
request_chunk_size: int = 250
request_chunk_size: int = 250,
):
"""
KrknKubernetes Constructor. Can be invoked with kubeconfig_path
Expand All @@ -68,7 +68,7 @@ def __init__(
:param kubeconfig_string: (keyword argument)
kubeconfig in string format
:param: request_chunk_size: int of chunk size to limit requests to
Initialization with kubeconfig path:
>>> KrknKubernetes(log_writer, "/home/test/.kube/config", )
Expand All @@ -88,7 +88,9 @@ def __init__(
self.__kubeconfig_path = kubeconfig_path
self.request_chunk_size = request_chunk_size
if kubeconfig_string is not None:
self.__kubeconfig_string = kubeconfig_string
self.__initialize_clients_from_kconfig_string(kubeconfig_string)

else:
self.__initialize_clients(kubeconfig_path)

Expand Down Expand Up @@ -142,17 +144,14 @@ def __initialize_clients(self, kubeconfig_path: str = None):
)
self.dyn_client = DynamicClient(self.k8s_client)
self.watch_resource = watch.Watch()

except OSError:
raise Exception(
"Invalid kube-config file: {0}. "
"No configuration found.".format(kubeconfig_path)
)

def __initialize_clients_from_kconfig_string(
self,
kubeconfig_str: str
):
def __initialize_clients_from_kconfig_string(self, kubeconfig_str: str):
"""
Initialize all clients from kubeconfig yaml string
Expand Down Expand Up @@ -210,8 +209,8 @@ def get_host(self) -> str:
"""

return self.cli.api_client.configuration.get_default_copy().host
def list_continue_helper(self,func, *args, **keyword_args):

def list_continue_helper(self, func, *args, **keyword_args):
"""
List continue helper, be able to get all objects past the request limit
Expand All @@ -229,15 +228,16 @@ def list_continue_helper(self,func, *args, **keyword_args):
while continue_string:
ret = func(*args, **keyword_args, _continue=continue_string)
ret_overall.append(ret)

continue_string = ret.metadata._continue

except ApiException as e:
logging.error("Exception when calling CoreV1Api->%s: %s\n" % (str(func), e))
logging.error(
"Exception when calling CoreV1Api->%s: %s\n" % (str(func), e)
)

return ret_overall


# Return of all data of namespaces
def list_all_namespaces(self, label_selector: str = None) -> list[str]:
"""
Expand All @@ -250,20 +250,26 @@ def list_all_namespaces(self, label_selector: str = None) -> list[str]:

try:
if label_selector:
ret = self.list_continue_helper(self.cli.list_namespace,
pretty=True, label_selector=label_selector,limit=self.request_chunk_size
ret = self.list_continue_helper(
self.cli.list_namespace,
pretty=True,
label_selector=label_selector,
limit=self.request_chunk_size,
)
else:
ret = self.list_continue_helper(self.cli.list_namespace,pretty=True, limit=self.request_chunk_size)
ret = self.list_continue_helper(
self.cli.list_namespace,
pretty=True,
limit=self.request_chunk_size,
)
except ApiException as e:
logging.error(
"Exception when calling CoreV1Api->list_namespaced_pod: %s\n",
str(e),
)
raise e

return ret

return ret

#
def list_namespaces(self, label_selector: str = None) -> list[str]:
Expand All @@ -277,15 +283,14 @@ def list_namespaces(self, label_selector: str = None) -> list[str]:

namespaces = []
try:

ret = self.list_all_namespaces(label_selector)
except ApiException as e:
logging.error(
"Exception when calling list_namespaces: %s\n",
str(e),
)
raise e
for ret_list in ret:
for ret_list in ret:
for namespace in ret_list.items:
namespaces.append(namespace.metadata.name)
return namespaces
Expand Down Expand Up @@ -380,17 +385,24 @@ def list_nodes(self, label_selector: str = None) -> list[str]:
nodes = []
try:
if label_selector:
ret = self.list_continue_helper(self.cli.list_node,
pretty=True, label_selector=label_selector, limit=self.request_chunk_size
ret = self.list_continue_helper(
self.cli.list_node,
pretty=True,
label_selector=label_selector,
limit=self.request_chunk_size,
)
else:
ret = self.list_continue_helper(self.cli.list_node,pretty=True, limit=self.request_chunk_size)
ret = self.list_continue_helper(
self.cli.list_node,
pretty=True,
limit=self.request_chunk_size,
)
except ApiException as e:
logging.error(
"Exception when calling CoreV1Api->list_node: %s\n", str(e)
)
raise ApiRequestException(str(e))
for ret_list in ret:
for ret_list in ret:
for node in ret_list.items:
nodes.append(node.metadata.name)
return nodes
Expand Down Expand Up @@ -486,7 +498,7 @@ def list_pods(
str(e),
)
raise e
for ret_list in ret:
for ret_list in ret:
for pod in ret_list.items:
pods.append(pod.metadata.name)
return pods
Expand Down Expand Up @@ -661,12 +673,19 @@ def get_all_pods(self, label_selector: str = None) -> list[[str, str]]:
"""
pods = []
if label_selector:
ret = self.list_continue_helper(self.cli.list_pod_for_all_namespaces,
pretty=True, label_selector=label_selector, limit=self.request_chunk_size
ret = self.list_continue_helper(
self.cli.list_pod_for_all_namespaces,
pretty=True,
label_selector=label_selector,
limit=self.request_chunk_size,
)
else:
ret = self.list_continue_helper(self.cli.list_pod_for_all_namespaces,pretty=True, limit=self.request_chunk_size)
for ret_list in ret:
ret = self.list_continue_helper(
self.cli.list_pod_for_all_namespaces,
pretty=True,
limit=self.request_chunk_size,
)
for ret_list in ret:
for pod in ret_list.items:
pods.append([pod.metadata.name, pod.metadata.namespace])
return pods
Expand Down Expand Up @@ -737,10 +756,11 @@ def get_all_services(self, namespace: str) -> list[str]:
for serv in ret.items:
services.append(serv.metadata.name)
return services


# Outputs a json blob with informataion about all pods in a given namespace
def get_all_pod_info(self, namespace: str = "default", label_selector: str = None) -> list[str]:
def get_all_pod_info(
self, namespace: str = "default", label_selector: str = None
) -> list[str]:
"""
Get details of all pods in a namespace
Expand All @@ -749,13 +769,24 @@ def get_all_pod_info(self, namespace: str = "default", label_selector: str = No
"""
try:
if label_selector:
ret = self.list_continue_helper(self.cli.list_namespaced_pod,
namespace, pretty=True, label_selector=label_selector, limit=self.request_chunk_size
ret = self.list_continue_helper(
self.cli.list_namespaced_pod,
namespace,
pretty=True,
label_selector=label_selector,
limit=self.request_chunk_size,
)
else:
ret = self.list_continue_helper(self.cli.list_namespaced_pod,namespace, limit=self.request_chunk_size)
ret = self.list_continue_helper(
self.cli.list_namespaced_pod,
namespace,
limit=self.request_chunk_size,
)
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e)
logging.error(
"Exception when calling CoreV1Api->list_namespaced_pod: %s\n"
% e
)

return ret

Expand Down Expand Up @@ -1778,8 +1809,10 @@ def get_nodes_infos(self) -> list[NodeInfo]:
node_type_workload_label = "node-role.kubernetes.io/workload"
node_type_application_label = "node-role.kubernetes.io/app"
result = list[NodeInfo]()
resp = self.list_continue_helper(self.cli.list_node, limit=self.request_chunk_size)
for node_resp in resp:
resp = self.list_continue_helper(
self.cli.list_node, limit=self.request_chunk_size
)
for node_resp in resp:
for node in node_resp.items:
node_info = NodeInfo(taint=node.spec.taints)
if instance_type_label in node.metadata.labels.keys():
Expand All @@ -1797,17 +1830,20 @@ def get_nodes_infos(self) -> list[NodeInfo]:
node_info.node_type = "master"
elif node_type_workload_label in node.metadata.labels.keys():
node_info.node_type = "workload"
elif node_type_application_label in node.metadata.labels.keys():
elif (
node_type_application_label in node.metadata.labels.keys()
):
node_info.node_type = "application"
else:
node_info.node_type = "unknown"


node_info.name = node.metadata.name
node_info.architecture = node.status.node_info.architecture
node_info.architecture = node.status.node_info.architecture
node_info.kernel_version = node.status.node_info.kernel_version
node_info.kubelet_version = node.status.node_info.kubelet_version
node_info.kubelet_version = (
node.status.node_info.kubelet_version
)
node_info.os_version = node.status.node_info.os_image
result.append(node_info)
return result
Expand Down Expand Up @@ -2256,4 +2292,4 @@ def create_token_for_sa(
f"failed to create token for SA: {service_account} "
f"on namespace: {namespace} with error: {e}"
)
return None
return None

0 comments on commit 0684c54

Please sign in to comment.