diff --git a/endpoints/remotehosts/remotehosts.py b/endpoints/remotehosts/remotehosts.py index dc445a88..762c9cdd 100755 --- a/endpoints/remotehosts/remotehosts.py +++ b/endpoints/remotehosts/remotehosts.py @@ -21,6 +21,7 @@ import tempfile import threading import time +import traceback TOOLBOX_HOME = os.environ.get('TOOLBOX_HOME') if TOOLBOX_HOME is None: @@ -101,7 +102,7 @@ def remote_connection(host, user): validate_comment(msg) else: log.info(msg) - break + break except (ssh_exception.AuthenticationException, ssh_exception.NoValidConnectionsError) as e: msg = "Failed to connect to remote '%s' as user '%s' on attempt %d due to '%s'" % (host, user, attempt, str(e)) if args.validate: @@ -1276,7 +1277,8 @@ def image_pull_worker_thread(thread_id, work_queue, threads_rcs): Returns: None """ - thread_name = "IPWT-%d" % (thread_id) + thread = threading.current_thread() + thread_name = thread.name thread_logger(thread_name, "Starting image pull thread with thread ID %d and name = '%s'" % (thread_id, thread_name)) rc = 0 job_count = 0 @@ -1287,10 +1289,10 @@ def image_pull_worker_thread(thread_id, work_queue, threads_rcs): remote = work_queue.get(block = False) except queue.Empty: thread_logger(thread_name, "Received a work queue empty exception") - continue + break if remote is None: - thread_logger(thread_name, "Received a null job") + thread_logger(thread_name, "Received a null job", log_level = "warning") continue job_count += 1 @@ -1426,9 +1428,13 @@ def create_thread_pool(description, acronym, work, worker_threads_count, worker_ if work.empty(): thread_logger("MAIN", "Aborting %s launch because no more work to do" % (acronym)) break - thread_logger("MAIN", "Creating and starting %s with thread ID %d" % (acronym, thread_id)) - worker_threads[thread_id] = threading.Thread(target = worker_thread_function, args = (thread_id, work, worker_threads_rcs)) - worker_threads[thread_id].start() + thread_name = "%s-%d" % (acronym, thread_id) + thread_logger("MAIN", "Creating and starting thread %s" % (thread_name)) + try: + worker_threads[thread_id] = threading.Thread(target = worker_thread_function, args = (thread_id, work, worker_threads_rcs), name = thread_name) + worker_threads[thread_id].start() + except RuntimeError as e: + thread_logger("MAIN", "Failed to create and start thread %s due to exception '%s'" % (thread_name, str(e)), log_level = "error") thread_logger("MAIN", "Waiting for all %s work jobs to be consumed" % (acronym)) work.join() @@ -1436,10 +1442,15 @@ def create_thread_pool(description, acronym, work, worker_threads_count, worker_ thread_logger("MAIN", "Joining %s" % (acronym)) for thread_id in range(0, worker_threads_count): - if worker_threads[thread_id] is None: - break - worker_threads[thread_id].join() - thread_logger("MAIN", "Joined %s with thread ID %d" % (acronym, thread_id)) + thread_name = "%s-%d" % (acronym, thread_id) + if not worker_threads[thread_id] is None: + if not worker_threads[thread_id].native_id is None: + worker_threads[thread_id].join() + thread_logger("MAIN", "Joined thread %s" % (thread_name)) + else: + thread_logger("MAIN", "Skipping join of thread %s because it was not started" % (thread_name), log_level = "warning") + else: + thread_logger("MAIN", "Skipping join of thread %s because it does not exist" % (thread_name), log_level = "warning") thread_logger("MAIN", "Return codes for each %s:\n%s" % (acronym, dump_json(worker_threads_rcs))) @@ -1500,7 +1511,8 @@ def remote_mkdirs_worker_thread(thread_id, work_queue, threads_rcs): Returns: None """ - thread_name = "RWMT-%d" % (thread_id) + thread = threading.current_thread() + thread_name = thread.name thread_logger(thread_name, "Starting remote mkdirs thread with thread ID %d and name = '%s'" % (thread_id, thread_name)) rc = 0 job_count = 0 @@ -1511,10 +1523,10 @@ def remote_mkdirs_worker_thread(thread_id, work_queue, threads_rcs): remote = work_queue.get(block = False) except queue.Empty: thread_logger(thread_name, "Received a work queue empty exception") - continue + break if remote is None: - thread_logger(thread_name, "Received a null job") + thread_logger(thread_name, "Received a null job", log_level = "warning") continue job_count += 1 @@ -1952,7 +1964,8 @@ def launch_engines_worker_thread(thread_id, work_queue, threads_rcs): Returns: None """ - thread_name = "LEWT-%d" % (thread_id) + thread = threading.current_thread() + thread_name = thread.name thread_logger(thread_name, "Starting launch engines thread with thread ID %d and name = '%s'" % (thread_id, thread_name)) rc = 0 job_count = 0 @@ -1963,10 +1976,11 @@ def launch_engines_worker_thread(thread_id, work_queue, threads_rcs): remote_idx = work_queue.get(block = False) except queue.Empty: thread_logger(thread_name, "Received a work queue empty exception") - continue + break if remote_idx is None: - thread_logger(thread_name, "Received a null job") + thread_logger(thread_name, "Received a null job", log_level = "warning") + continue job_count += 1 @@ -3295,7 +3309,8 @@ def shutdown_engines_worker_thread(thread_id, work_queue, threads_rcs): Returns: None """ - thread_name = "SEWT-%d" % (thread_id) + thread = threading.current_thread() + thread_name = thread.name thread_logger(thread_name, "Starting shutdown engines thread with thread ID %d and name = '%s'" % (thread_id, thread_name)) rc = 0 job_count = 0 @@ -3306,10 +3321,11 @@ def shutdown_engines_worker_thread(thread_id, work_queue, threads_rcs): remote_idx = work_queue.get(block = False) except queue.Empty: thread_logger(thread_name, "Received a work queue empty exception") - continue + break if remote_idx is None: - thread_logger(thread_name, "Received a null job") + thread_logger(thread_name, "Received a null job", log_level = "warning") + continue job_count += 1 @@ -3409,7 +3425,8 @@ def image_mgmt_worker_thread(thread_id, work_queue, threads_rcs): Returns: None """ - thread_name = "IMWT-%d" % (thread_id) + thread = threading.current_thread() + thread_name = thread.name thread_logger(thread_name, "Starting image management thread with thread ID %d and name = '%s'" % (thread_id, thread_name)) rc = 0 job_count = 0 @@ -3420,10 +3437,11 @@ def image_mgmt_worker_thread(thread_id, work_queue, threads_rcs): remote = work_queue.get(block = False) except queue.Empty: thread_logger(thread_name, "Received a work queue empty exception") - continue + break if remote is None: - thread_logger(thread_name, "Received a null job") + thread_logger(thread_name, "Received a null job", log_level = "warning") + continue job_count += 1 @@ -3500,7 +3518,8 @@ def collect_sysinfo_worker_thread(thread_id, work_queue, threads_rcs): Returns: None """ - thread_name = "CSWT-%d" % (thread_id) + thread = threading.current_thread() + thread_name = thread.name thread_logger(thread_name, "Starting collect sysinfo thread with thread ID %d and name = '%s'" % (thread_id, thread_name)) rc = 0 job_count = 0 @@ -3511,10 +3530,11 @@ def collect_sysinfo_worker_thread(thread_id, work_queue, threads_rcs): remote = work_queue.get(block = False) except queue.Empty: thread_logger(thread_name, "Received a work queue empty exception") - continue + break if remote is None: - thread_logger(thread_name, "Received a null job") + thread_logger(thread_name, "Received a null job", log_level = "warning") + continue job_count += 1 @@ -3702,6 +3722,28 @@ def setup_logger(): return logging.getLogger(__file__) +def thread_exception_hook(args): + """ + Generic thread exception handler + + Args: + args (namespace): information about the exception being handled + + Globals: + log: a logger instance + + Returns: + None + """ + thread_name = "UNKNOWN" + if not args.thread is None: + thread_name = args.thread.name + + msg = "[Thread %s] Thread failed with exception:\ntype: %s\nvalue: %s\ntraceback:\n%s" % (thread_name, args.exc_type, args.exc_value, "".join(traceback.format_list(traceback.extract_tb(args.exc_traceback)))) + log.error(msg, stacklevel = 3) + + return + def main(): """ Main control block @@ -3719,6 +3761,8 @@ def main(): global log global settings + threading.excepthook = thread_exception_hook + if args.validate: return(validate())