Skip to content

Commit

Permalink
fix: catch broad exceptions to make sure kubewatcher always reconnects (#15)
Browse files Browse the repository at this point in the history

* fix: catch broad exceptions to make sure kubewatcher always reconnects

* fix: use internal logger for KubeWatcher
  • Loading branch information
bodom0015 authored Jul 12, 2023
1 parent 1ac267a commit a4400d2
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions pkg/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ def __init__(self):
self.thread = threading.Thread(target=self.run, name='kube-event-watcher', daemon=True)

self.stream = None
logger.info('Starting KubeWatcher')
self.logger.info('Starting KubeWatcher')
self.thread.start()
logger.info('Started KubeWatcher')
self.logger.info('Started KubeWatcher')

def run(self):
w = watch.Watch()
Expand All @@ -207,7 +207,7 @@ def run(self):
# Ignore kube-system namespace
# TODO: Parameterize this?
ignored_namespaces = ['kube-system']
logger.info('KubeWatcher watching all namespaces except for: ' + str(ignored_namespaces))
self.logger.info('KubeWatcher watching all namespaces except for: ' + str(ignored_namespaces))

# Include workbench app labels
# Example: 'labels': {'manager': 'workbench',
Expand All @@ -218,14 +218,14 @@ def run(self):
required_labels = {
'manager': 'workbench'
}
logger.info('KubeWatcher looking for required labels: ' + str(required_labels))
self.logger.info('KubeWatcher looking for required labels: ' + str(required_labels))

resource_version = None
k8s_event_stream = None

while True:
time.sleep(1)
logger.info('KubeWatcher is connecting...')
self.logger.info('KubeWatcher is connecting...')
try:
# Resource version is used to keep track of stream progress (in case of resume)
k8s_event_stream = w.stream(func=v1.list_pod_for_all_namespaces,
Expand All @@ -238,7 +238,7 @@ def run(self):

# Skip Pods in ignored namespaces
if event['object'].metadata.namespace in ignored_namespaces:
logger.debug('Skipping event in excluded namespace')
self.logger.debug('Skipping event in excluded namespace')
continue

# Examine labels, ignore if not workbench app
Expand All @@ -265,7 +265,7 @@ def run(self):
# sid-stackkey-svckey-deploymentsuffix-podsuffix => we want 3rd to last
if config.KUBE_WORKBENCH_SINGLEPOD:
# TODO: Status events for singlepod mode
logger.warning('Workbench cannot yet update stack status automatically when singlepod=True')
self.logger.warning('Workbench cannot yet update stack status automatically when singlepod=True')

userapp_key = segments[2]
continue
Expand All @@ -288,24 +288,25 @@ def run(self):
write_status_and_endpoints(userapp_id, username, service_key, service_status, pod_ip,
service_endpoints)

logger.info(
self.logger.info(
'UserappId=%s ServiceKey=%s type=%s phase=%s -> status=%s endpoints=%s' % (
userapp_id, service_key, type, phase, service_status, str(service_endpoints)))
except urllib3.exceptions.ProtocolError as e:
logger.error('KubeWatcher reconnecting to Kube API: %s' % str(e))
except (ApiException, HTTPError) as e:
self.logger.error('KubeWatcher reconnecting to Kube API: %s' % str(e))
if k8s_event_stream:
k8s_event_stream.close()
k8s_event_stream = None
if e.status == 410:
# Resource too old
resource_version = None
self.logger.warning("Resource too old (410) - reconnecting: " + str(e))
time.sleep(2)
continue
except (ApiException, HTTPError) as e:
except Exception as e:
self.logger.error('KubeWatcher reconnecting to Kube API: %s' % str(e))
if k8s_event_stream:
k8s_event_stream.close()
k8s_event_stream = None
logger.error("Connection to kube API failed: " + str(e))
if e.status == 410:
# Resource too old
resource_version = None
time.sleep(2)
continue

Expand Down

0 comments on commit a4400d2

Please sign in to comment.