Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update remotehosts.py to formally name the threads it launches #492

Merged
merged 5 commits into from
May 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 70 additions & 26 deletions endpoints/remotehosts/remotehosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tempfile
import threading
import time
import traceback

TOOLBOX_HOME = os.environ.get('TOOLBOX_HOME')
if TOOLBOX_HOME is None:
Expand Down Expand Up @@ -101,7 +102,7 @@ def remote_connection(host, user):
validate_comment(msg)
else:
log.info(msg)
break
k-rister marked this conversation as resolved.
Show resolved Hide resolved
break
except (ssh_exception.AuthenticationException, ssh_exception.NoValidConnectionsError) as e:
msg = "Failed to connect to remote '%s' as user '%s' on attempt %d due to '%s'" % (host, user, attempt, str(e))
if args.validate:
Expand Down Expand Up @@ -1276,7 +1277,8 @@ def image_pull_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "IPWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting image pull thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -1287,10 +1289,10 @@ def image_pull_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1
Expand Down Expand Up @@ -1426,20 +1428,29 @@ def create_thread_pool(description, acronym, work, worker_threads_count, worker_
if work.empty():
thread_logger("MAIN", "Aborting %s launch because no more work to do" % (acronym))
break
thread_logger("MAIN", "Creating and starting %s with thread ID %d" % (acronym, thread_id))
worker_threads[thread_id] = threading.Thread(target = worker_thread_function, args = (thread_id, work, worker_threads_rcs))
worker_threads[thread_id].start()
thread_name = "%s-%d" % (acronym, thread_id)
thread_logger("MAIN", "Creating and starting thread %s" % (thread_name))
try:
worker_threads[thread_id] = threading.Thread(target = worker_thread_function, args = (thread_id, work, worker_threads_rcs), name = thread_name)
worker_threads[thread_id].start()
except RuntimeError as e:
thread_logger("MAIN", "Failed to create and start thread %s due to exception '%s'" % (thread_name, str(e)), log_level = "error")

thread_logger("MAIN", "Waiting for all %s work jobs to be consumed" % (acronym))
work.join()
thread_logger("MAIN", "All %s work jobs have been consumed" % (acronym))

thread_logger("MAIN", "Joining %s" % (acronym))
for thread_id in range(0, worker_threads_count):
if worker_threads[thread_id] is None:
break
worker_threads[thread_id].join()
thread_logger("MAIN", "Joined %s with thread ID %d" % (acronym, thread_id))
thread_name = "%s-%d" % (acronym, thread_id)
if not worker_threads[thread_id] is None:
if not worker_threads[thread_id].native_id is None:
worker_threads[thread_id].join()
thread_logger("MAIN", "Joined thread %s" % (thread_name))
else:
thread_logger("MAIN", "Skipping join of thread %s because it was not started" % (thread_name), log_level = "warning")
else:
thread_logger("MAIN", "Skipping join of thread %s because it does not exist" % (thread_name), log_level = "warning")

thread_logger("MAIN", "Return codes for each %s:\n%s" % (acronym, dump_json(worker_threads_rcs)))

Expand Down Expand Up @@ -1500,7 +1511,8 @@ def remote_mkdirs_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "RWMT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting remote mkdirs thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -1511,10 +1523,10 @@ def remote_mkdirs_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1
Expand Down Expand Up @@ -1952,7 +1964,8 @@ def launch_engines_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "LEWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting launch engines thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -1963,10 +1976,11 @@ def launch_engines_worker_thread(thread_id, work_queue, threads_rcs):
remote_idx = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote_idx is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3295,7 +3309,8 @@ def shutdown_engines_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "SEWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting shutdown engines thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -3306,10 +3321,11 @@ def shutdown_engines_worker_thread(thread_id, work_queue, threads_rcs):
remote_idx = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote_idx is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3409,7 +3425,8 @@ def image_mgmt_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "IMWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting image management thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -3420,10 +3437,11 @@ def image_mgmt_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3500,7 +3518,8 @@ def collect_sysinfo_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "CSWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting collect sysinfo thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -3511,10 +3530,11 @@ def collect_sysinfo_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3702,6 +3722,28 @@ def setup_logger():

return logging.getLogger(__file__)

def thread_exception_hook(args):
    """
    Log an exception that escaped a thread (installed by main() as threading.excepthook)

    Args:
        args (namespace): exception details; this function reads its
                          thread, exc_type, exc_value and exc_traceback attributes

    Globals:
        log: a logger instance

    Returns:
        None
    """
    # The thread attribute may be None, so fall back to a placeholder name
    failed_thread_name = args.thread.name if args.thread is not None else "UNKNOWN"

    # Render the traceback frames as a single printable string
    formatted_traceback = "".join(traceback.format_tb(args.exc_traceback))

    msg = "[Thread %s] Thread failed with exception:\ntype: %s\nvalue: %s\ntraceback:\n%s" % (
        failed_thread_name,
        args.exc_type,
        args.exc_value,
        formatted_traceback,
    )
    log.error(msg, stacklevel = 3)

def main():
"""
Main control block
Expand All @@ -3719,6 +3761,8 @@ def main():
global log
global settings

threading.excepthook = thread_exception_hook

if args.validate:
return(validate())

Expand Down
Loading