Skip to content

Commit

Permalink
Merge pull request #492 from perftool-incubator/dev-kmr
Browse files Browse the repository at this point in the history
update remotehosts.py to formally name the threads it launches
  • Loading branch information
k-rister authored May 6, 2024
2 parents 74c8371 + efdd6e7 commit f76281d
Showing 1 changed file with 70 additions and 26 deletions.
96 changes: 70 additions & 26 deletions endpoints/remotehosts/remotehosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tempfile
import threading
import time
import traceback

TOOLBOX_HOME = os.environ.get('TOOLBOX_HOME')
if TOOLBOX_HOME is None:
Expand Down Expand Up @@ -101,7 +102,7 @@ def remote_connection(host, user):
validate_comment(msg)
else:
log.info(msg)
break
break
except (ssh_exception.AuthenticationException, ssh_exception.NoValidConnectionsError) as e:
msg = "Failed to connect to remote '%s' as user '%s' on attempt %d due to '%s'" % (host, user, attempt, str(e))
if args.validate:
Expand Down Expand Up @@ -1276,7 +1277,8 @@ def image_pull_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "IPWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting image pull thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -1287,10 +1289,10 @@ def image_pull_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1
Expand Down Expand Up @@ -1426,20 +1428,29 @@ def create_thread_pool(description, acronym, work, worker_threads_count, worker_
if work.empty():
thread_logger("MAIN", "Aborting %s launch because no more work to do" % (acronym))
break
thread_logger("MAIN", "Creating and starting %s with thread ID %d" % (acronym, thread_id))
worker_threads[thread_id] = threading.Thread(target = worker_thread_function, args = (thread_id, work, worker_threads_rcs))
worker_threads[thread_id].start()
thread_name = "%s-%d" % (acronym, thread_id)
thread_logger("MAIN", "Creating and starting thread %s" % (thread_name))
try:
worker_threads[thread_id] = threading.Thread(target = worker_thread_function, args = (thread_id, work, worker_threads_rcs), name = thread_name)
worker_threads[thread_id].start()
except RuntimeError as e:
thread_logger("MAIN", "Failed to create and start thread %s due to exception '%s'" % (thread_name, str(e)), log_level = "error")

thread_logger("MAIN", "Waiting for all %s work jobs to be consumed" % (acronym))
work.join()
thread_logger("MAIN", "All %s work jobs have been consumed" % (acronym))

thread_logger("MAIN", "Joining %s" % (acronym))
for thread_id in range(0, worker_threads_count):
if worker_threads[thread_id] is None:
break
worker_threads[thread_id].join()
thread_logger("MAIN", "Joined %s with thread ID %d" % (acronym, thread_id))
thread_name = "%s-%d" % (acronym, thread_id)
if not worker_threads[thread_id] is None:
if not worker_threads[thread_id].native_id is None:
worker_threads[thread_id].join()
thread_logger("MAIN", "Joined thread %s" % (thread_name))
else:
thread_logger("MAIN", "Skipping join of thread %s because it was not started" % (thread_name), log_level = "warning")
else:
thread_logger("MAIN", "Skipping join of thread %s because it does not exist" % (thread_name), log_level = "warning")

thread_logger("MAIN", "Return codes for each %s:\n%s" % (acronym, dump_json(worker_threads_rcs)))

Expand Down Expand Up @@ -1500,7 +1511,8 @@ def remote_mkdirs_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "RWMT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting remote mkdirs thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -1511,10 +1523,10 @@ def remote_mkdirs_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1
Expand Down Expand Up @@ -1952,7 +1964,8 @@ def launch_engines_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "LEWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting launch engines thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -1963,10 +1976,11 @@ def launch_engines_worker_thread(thread_id, work_queue, threads_rcs):
remote_idx = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote_idx is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3295,7 +3309,8 @@ def shutdown_engines_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "SEWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting shutdown engines thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -3306,10 +3321,11 @@ def shutdown_engines_worker_thread(thread_id, work_queue, threads_rcs):
remote_idx = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote_idx is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3409,7 +3425,8 @@ def image_mgmt_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "IMWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting image management thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -3420,10 +3437,11 @@ def image_mgmt_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3500,7 +3518,8 @@ def collect_sysinfo_worker_thread(thread_id, work_queue, threads_rcs):
Returns:
None
"""
thread_name = "CSWT-%d" % (thread_id)
thread = threading.current_thread()
thread_name = thread.name
thread_logger(thread_name, "Starting collect sysinfo thread with thread ID %d and name = '%s'" % (thread_id, thread_name))
rc = 0
job_count = 0
Expand All @@ -3511,10 +3530,11 @@ def collect_sysinfo_worker_thread(thread_id, work_queue, threads_rcs):
remote = work_queue.get(block = False)
except queue.Empty:
thread_logger(thread_name, "Received a work queue empty exception")
continue
break

if remote is None:
thread_logger(thread_name, "Received a null job")
thread_logger(thread_name, "Received a null job", log_level = "warning")
continue

job_count += 1

Expand Down Expand Up @@ -3702,6 +3722,28 @@ def setup_logger():

return logging.getLogger(__file__)

def thread_exception_hook(args):
    """
    Report an exception that escaped a thread by writing it to the log

    Args:
        args (namespace): exception details; the thread, exc_type, exc_value,
                          and exc_traceback attributes are read

    Globals:
        log: a logger instance

    Returns:
        None
    """
    failed_thread = args.thread
    # fall back to a placeholder name when no thread object is attached
    thread_name = failed_thread.name if failed_thread is not None else "UNKNOWN"

    # turn the traceback into one block of text so it fits in a single log record
    frames = traceback.extract_tb(args.exc_traceback)
    traceback_text = "".join(traceback.format_list(frames))

    msg = "[Thread %s] Thread failed with exception:\ntype: %s\nvalue: %s\ntraceback:\n%s" % (thread_name, args.exc_type, args.exc_value, traceback_text)
    log.error(msg, stacklevel = 3)

    return

def main():
"""
Main control block
Expand All @@ -3719,6 +3761,8 @@ def main():
global log
global settings

threading.excepthook = thread_exception_hook

if args.validate:
return(validate())

Expand Down

0 comments on commit f76281d

Please sign in to comment.