diff --git a/src/krkn_lib/k8s/krkn_kubernetes.py b/src/krkn_lib/k8s/krkn_kubernetes.py index 4e95c695..920d481f 100644 --- a/src/krkn_lib/k8s/krkn_kubernetes.py +++ b/src/krkn_lib/k8s/krkn_kubernetes.py @@ -41,6 +41,7 @@ class KrknKubernetes: """ """ + request_chunk_size: int = 250 api_client: client.ApiClient = None cli: client.CoreV1Api = None batch_cli: client.BatchV1Api = None @@ -56,6 +57,7 @@ def __init__( kubeconfig_path: str = None, *, kubeconfig_string: str = None, + request_chunk_size: int = 250, ): """ KrknKubernetes Constructor. Can be invoked with kubeconfig_path @@ -65,6 +67,7 @@ def __init__( :param kubeconfig_path: kubeconfig path :param kubeconfig_string: (keyword argument) kubeconfig in string format + :param: request_chunk_size: int of chunk size to limit requests to Initialization with kubeconfig path: @@ -82,10 +85,12 @@ def __init__( "or a valid kubeconfig string" ) + self.request_chunk_size = request_chunk_size if kubeconfig_string is not None: - self.__initialize_clients_from_kconfig_string(kubeconfig_string) self.__kubeconfig_string = kubeconfig_string - else: + self.__initialize_clients_from_kconfig_string(kubeconfig_string) + + if kubeconfig_path is not None: self.__initialize_clients(kubeconfig_path) self.__kubeconfig_path = kubeconfig_path @@ -139,16 +144,14 @@ def __initialize_clients(self, kubeconfig_path: str = None): ) self.dyn_client = DynamicClient(self.k8s_client) self.watch_resource = watch.Watch() + except OSError: raise Exception( "Invalid kube-config file: {0}. " "No configuration found.".format(kubeconfig_path) ) - def __initialize_clients_from_kconfig_string( - self, - kubeconfig_str: str, - ): + def __initialize_clients_from_kconfig_string(self, kubeconfig_str: str): """ Initialize all clients from kubeconfig yaml string @@ -207,6 +210,67 @@ def get_host(self) -> str: return self.cli.api_client.configuration.get_default_copy().host + def list_continue_helper(self, func, *args, **keyword_args): + """ + List continue helper, be able to get all objects past the request limit + + :param func: function to call of the kubernetes cli + :param args: any set arguments for the function + :param keyword_args: key value pair arguments to pass to the function + :return: list of all resources after segmentation + """ + ret_overall = [] + try: + ret = func(*args, **keyword_args) + ret_overall.append(ret) + continue_string = ret.metadata._continue + + while continue_string: + ret = func(*args, **keyword_args, _continue=continue_string) + ret_overall.append(ret) + + continue_string = ret.metadata._continue + + except ApiException as e: + logging.error( + "Exception when calling CoreV1Api->%s: %s\n" % (str(func), e) + ) + + return ret_overall + + # Return of all data of namespaces + def list_all_namespaces(self, label_selector: str = None) -> list[str]: + """ + List all namespaces with info + + :param label_selector: filter by label + selector (optional default `None`) + :return: list of namespaces json data + """ + + try: + if label_selector: + ret = self.list_continue_helper( + self.cli.list_namespace, + pretty=True, + label_selector=label_selector, + limit=self.request_chunk_size, + ) + else: + ret = self.list_continue_helper( + self.cli.list_namespace, + pretty=True, + limit=self.request_chunk_size, + ) + except ApiException as e: + logging.error( + "Exception when calling CoreV1Api->list_namespaced_pod: %s\n", + str(e), + ) + raise e + + return ret + # def list_namespaces(self, label_selector: str = None) -> list[str]: """ @@ -219,20 +283,16 @@ def list_namespaces(self, label_selector: str = None) -> list[str]: namespaces = [] try: - if label_selector: - ret = self.cli.list_namespace( - pretty=True, label_selector=label_selector - ) - else: - ret = self.cli.list_namespace(pretty=True) + ret = self.list_all_namespaces(label_selector) except ApiException as e: logging.error( - "Exception when calling CoreV1Api->list_namespaced_pod: %s\n", + "Exception when calling list_namespaces: %s\n", str(e), ) raise e - for namespace in ret.items: - namespaces.append(namespace.metadata.name) + for ret_list in ret: + for namespace in ret_list.items: + namespaces.append(namespace.metadata.name) return namespaces def get_namespace_status(self, namespace_name: str) -> str: @@ -325,18 +385,26 @@ def list_nodes(self, label_selector: str = None) -> list[str]: nodes = [] try: if label_selector: - ret = self.cli.list_node( - pretty=True, label_selector=label_selector + ret = self.list_continue_helper( + self.cli.list_node, + pretty=True, + label_selector=label_selector, + limit=self.request_chunk_size, ) else: - ret = self.cli.list_node(pretty=True) + ret = self.list_continue_helper( + self.cli.list_node, + pretty=True, + limit=self.request_chunk_size, + ) except ApiException as e: logging.error( "Exception when calling CoreV1Api->list_node: %s\n", str(e) ) raise ApiRequestException(str(e)) - for node in ret.items: - nodes.append(node.metadata.name) + for ret_list in ret: + for node in ret_list.items: + nodes.append(node.metadata.name) return nodes # TODO: refactoring to work both in k8s and OpenShift @@ -423,20 +491,16 @@ def list_pods( """ pods = [] try: - if label_selector: - ret = self.cli.list_namespaced_pod( - namespace, pretty=True, label_selector=label_selector - ) - else: - ret = self.cli.list_namespaced_pod(namespace, pretty=True) + ret = self.get_all_pod_info(namespace, label_selector) except ApiException as e: logging.error( - "Exception when calling CoreV1Api->list_namespaced_pod: %s\n", + "Exception when calling list_pods: %s\n", str(e), ) raise e - for pod in ret.items: - pods.append(pod.metadata.name) + for ret_list in ret: + for pod in ret_list.items: + pods.append(pod.metadata.name) return pods def get_daemonset(self, namespace: str) -> list[str]: @@ -609,13 +673,21 @@ def get_all_pods(self, label_selector: str = None) -> list[[str, str]]: """ pods = [] if label_selector: - ret = self.cli.list_pod_for_all_namespaces( - pretty=True, label_selector=label_selector + ret = self.list_continue_helper( + self.cli.list_pod_for_all_namespaces, + pretty=True, + label_selector=label_selector, + limit=self.request_chunk_size, ) else: - ret = self.cli.list_pod_for_all_namespaces(pretty=True) - for pod in ret.items: - pods.append([pod.metadata.name, pod.metadata.namespace]) + ret = self.list_continue_helper( + self.cli.list_pod_for_all_namespaces, + pretty=True, + limit=self.request_chunk_size, + ) + for ret_list in ret: + for pod in ret_list.items: + pods.append([pod.metadata.name, pod.metadata.namespace]) return pods def get_all_statefulset(self, namespace) -> list[str]: @@ -685,6 +757,39 @@ def get_all_services(self, namespace: str) -> list[str]: services.append(serv.metadata.name) return services + # Outputs a json blob with informataion about all pods in a given namespace + def get_all_pod_info( + self, namespace: str = "default", label_selector: str = None + ) -> list[str]: + """ + Get details of all pods in a namespace + + :param namespace: namespace (optional default `default`) + :return list of pod details + """ + try: + if label_selector: + ret = self.list_continue_helper( + self.cli.list_namespaced_pod, + namespace, + pretty=True, + label_selector=label_selector, + limit=self.request_chunk_size, + ) + else: + ret = self.list_continue_helper( + self.cli.list_namespaced_pod, + namespace, + limit=self.request_chunk_size, + ) + except ApiException as e: + logging.error( + "Exception when calling CoreV1Api->list_namespaced_pod: %s\n" + % e + ) + + return ret + # to be tested, return value not sure def exec_cmd_in_pod( self, @@ -1698,41 +1803,49 @@ def get_nodes_infos(self) -> list[NodeInfo]: :return: the list of NodeInfo objects """ instance_type_label = "node.k8s.io/instance-type" - node_type_master_label = "node-role.k8s.io/master" - node_type_worker_label = "node-role.k8s.io/worker" - node_type_infra_label = "node-role.k8s.io/infra" - node_type_workload_label = "node-role.k8s.io/workload" - node_type_application_label = "node-role.k8s.io/app" + node_type_master_label = "node-role.kubernetes.io/master" + node_type_worker_label = "node-role.kubernetes.io/worker" + node_type_infra_label = "node-role.kubernetes.io/infra" + node_type_workload_label = "node-role.kubernetes.io/workload" + node_type_application_label = "node-role.kubernetes.io/app" result = list[NodeInfo]() - resp = self.cli.list_node() - for node in resp.items: - node_info = NodeInfo() - if instance_type_label in node.metadata.labels.keys(): - node_info.instance_type = node.metadata.labels[ - instance_type_label - ] - else: - node_info.instance_type = "unknown" - - if node_type_infra_label in node.metadata.labels.keys(): - node_info.node_type = "infra" - elif node_type_worker_label in node.metadata.labels.keys(): - node_info.node_type = "worker" - elif node_type_master_label in node.metadata.labels.keys(): - node_info.node_type = "master" - elif node_type_workload_label in node.metadata.labels.keys(): - node_info.node_type = "workload" - elif node_type_application_label in node.metadata.labels.keys(): - node_info.node_type = "application" - else: - node_info.node_type = "unknown" - - node_info.architecture = node.status.node_info.architecture - node_info.kernel_version = node.status.node_info.kernel_version - node_info.kubelet_version = node.status.node_info.kubelet_version - node_info.os_version = node.status.node_info.os_image - result.append(node_info) - + resp = self.list_continue_helper( + self.cli.list_node, limit=self.request_chunk_size + ) + for node_resp in resp: + for node in node_resp.items: + node_info = NodeInfo(taint=node.spec.taints) + if instance_type_label in node.metadata.labels.keys(): + node_info.instance_type = node.metadata.labels[ + instance_type_label + ] + else: + node_info.instance_type = "unknown" + + if node_type_infra_label in node.metadata.labels.keys(): + node_info.node_type = "infra" + elif node_type_worker_label in node.metadata.labels.keys(): + node_info.node_type = "worker" + elif node_type_master_label in node.metadata.labels.keys(): + node_info.node_type = "master" + elif node_type_workload_label in node.metadata.labels.keys(): + node_info.node_type = "workload" + elif ( + node_type_application_label in node.metadata.labels.keys() + ): + node_info.node_type = "application" + else: + node_info.node_type = "unknown" + + node_info.name = node.metadata.name + node_info.architecture = node.status.node_info.architecture + node_info.architecture = node.status.node_info.architecture + node_info.kernel_version = node.status.node_info.kernel_version + node_info.kubelet_version = ( + node.status.node_info.kubelet_version + ) + node_info.os_version = node.status.node_info.os_image + result.append(node_info) return result def delete_file_from_pod( diff --git a/src/krkn_lib/models/telemetry/models.py b/src/krkn_lib/models/telemetry/models.py index 34194b56..64413a99 100644 --- a/src/krkn_lib/models/telemetry/models.py +++ b/src/krkn_lib/models/telemetry/models.py @@ -76,6 +76,14 @@ class NodeInfo: """ Cluster node telemetry informations """ + taint: dict + """ + Taint on the node""" + + name: str = "" + """ + Name of the node + """ architecture: str = "" """ diff --git a/src/krkn_lib/tests/test_krkn_kubernetes.py b/src/krkn_lib/tests/test_krkn_kubernetes.py index 067970e7..e9cf5cdd 100644 --- a/src/krkn_lib/tests/test_krkn_kubernetes.py +++ b/src/krkn_lib/tests/test_krkn_kubernetes.py @@ -72,6 +72,34 @@ def test_get_kubeconfig_path(self): test_kubeconfig = test.read() self.assertEqual(test_kubeconfig, kubeconfig_str) + + def test_list_all_namespaces(self): + # test list all namespaces + result = self.lib_k8s.list_all_namespaces() + result_count = 0 + for r in result: + for item in r.items: + result_count+=1 + print('result type' + str((result_count))) + self.assertTrue(result_count > 1) + # test filter by label + result = self.lib_k8s.list_all_namespaces( + "kubernetes.io/metadata.name=default" + ) + + self.assertTrue(len(result) == 1) + namespace_names = [] + for r in result: + for item in r.items: + namespace_names.append(item.metadata.name) + self.assertIn("default", namespace_names) + + # test unexisting filter + result = self.lib_k8s.list_namespaces( + "k8s.io/metadata.name=donotexist" + ) + self.assertTrue(len(result) == 0) + def test_list_namespaces(self): # test all namespaces result = self.lib_k8s.list_namespaces()