Skip to content

Commit

Permalink
adding use of new common functions
Browse files Browse the repository at this point in the history
  • Loading branch information
paigerube14 committed Nov 30, 2023
1 parent 80d5c12 commit 3e2d602
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions src/krkn_lib/k8s/krkn_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ def __init__(
:param kubeconfig_path: kubeconfig path
:param kubeconfig_string: (keyword argument)
kubeconfig in string format
:param: request_chunk_size: int of chunk size to limit requests to
Initialization with kubeconfig path:
>>> KrknKubernetes(log_writer, "/home/test/.kube/config")
>>> KrknKubernetes(log_writer, "/home/test/.kube/config", )
Initialization with kubeconfig string:
>>> kubeconfig_string="apiVersion: v1 ....."
>>> KrknKubernetes(log_writer, kubeconfig_string=kubeconfig_string)
>>> KrknKubernetes(log_writer, kubeconfig_string=kubeconfig_string, r)
"""

if kubeconfig_string is not None and kubeconfig_path is not None:
Expand All @@ -85,17 +86,18 @@ def __init__(
)

self.__kubeconfig_path = kubeconfig_path
self.request_chunk_size = request_chunk_size
if kubeconfig_string is not None:
self.__initialize_clients_from_kconfig_string(kubeconfig_string, request_chunk_size)
self.__initialize_clients_from_kconfig_string(kubeconfig_string)
else:
self.__initialize_clients(kubeconfig_path,request_chunk_size)
self.__initialize_clients(kubeconfig_path)

def __del__(self):
self.api_client.rest_client.pool_manager.clear()
self.api_client.close()

# Load kubeconfig and initialize k8s python client
def __initialize_clients(self, kubeconfig_path: str = None, request_chunk_size: int = 250):
def __initialize_clients(self, kubeconfig_path: str = None):
"""
Initialize all clients from kubeconfig path
Expand Down Expand Up @@ -140,7 +142,7 @@ def __initialize_clients(self, kubeconfig_path: str = None, request_chunk_size:
)
self.dyn_client = DynamicClient(self.k8s_client)
self.watch_resource = watch.Watch()
self.request_chunk_size = request_chunk_size

except OSError:
raise Exception(
"Invalid kube-config file: {0}. "
Expand All @@ -149,8 +151,7 @@ def __initialize_clients(self, kubeconfig_path: str = None, request_chunk_size:

def __initialize_clients_from_kconfig_string(
self,
kubeconfig_str: str,
request_chunk_size: int
kubeconfig_str: str
):
"""
Initialize all clients from kubeconfig yaml string
Expand All @@ -174,7 +175,6 @@ def __initialize_clients_from_kconfig_string(
self.api_client
)
self.dyn_client = DynamicClient(self.api_client)
self.request_chunk_size = request_chunk_size
except ApiException as e:
logging.error("Failed to initialize k8s client: %s\n", str(e))
raise e
Expand Down Expand Up @@ -277,15 +277,11 @@ def list_namespaces(self, label_selector: str = None) -> list[str]:

namespaces = []
try:
if label_selector:
ret = self.list_continue_helper(self.cli.list_namespace,
pretty=True, label_selector=label_selector,limit=self.request_chunk_size
)
else:
ret = self.list_continue_helper(self.cli.list_namespace,pretty=True, limit=self.request_chunk_size)

ret = self.list_all_namespaces(label_selector)
except ApiException as e:
logging.error(
"Exception when calling CoreV1Api->list_namespaced_pod: %s\n",
"Exception when calling list_namespaces: %s\n",
str(e),
)
raise e
Expand Down Expand Up @@ -483,15 +479,10 @@ def list_pods(
"""
pods = []
try:
if label_selector:
ret = self.list_continue_helper(self.cli.list_namespaced_pod,
namespace, pretty=True, label_selector=label_selector, limit=self.request_chunk_size
)
else:
ret = self.list_continue_helper(self.cli.list_namespaced_pod,namespace, pretty=True, limit=self.request_chunk_size)
ret = self.get_all_pod_info(namespace, label_selector)
except ApiException as e:
logging.error(
"Exception when calling CoreV1Api->list_namespaced_pod: %s\n",
"Exception when calling list_pods: %s\n",
str(e),
)
raise e
Expand Down Expand Up @@ -749,15 +740,20 @@ def get_all_services(self, namespace: str) -> list[str]:


# Outputs a json blob with informataion about all pods in a given namespace
def get_all_pod_info(self, namespace: str = "default") -> list[str]:
def get_all_pod_info(self, namespace: str = "default", label_selector: str = None) -> list[str]:
"""
Get details of all pods in a namespace
:param namespace: namespace (optional default `default`)
:return list of pod details
"""
try:
ret = self.list_continue_helper(self.cli.list_namespaced_pod,namespace, limit=self.request_chunk_size)
if label_selector:
ret = self.list_continue_helper(self.cli.list_namespaced_pod,
namespace, pretty=True, label_selector=label_selector, limit=self.request_chunk_size
)
else:
ret = self.list_continue_helper(self.cli.list_namespaced_pod,namespace, limit=self.request_chunk_size)
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e)

Expand Down

0 comments on commit 3e2d602

Please sign in to comment.