Skip to content

Commit

Permalink
BUG: Fixed bug where occasionally manager would recieve message from
Browse files Browse the repository at this point in the history
dead workers
  • Loading branch information
Fraser-Birks committed Oct 10, 2023
1 parent 8680f8f commit 7072308
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions scripts/fracture_mechanics/parallel_NCFlex.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,10 @@ def walk(x0,x1,direction,pipe_output,sc_dict):

#time.sleep(10)
#go idle
data_queue.put([os.getpid(),x0,direction])
status_queue.put([os.getpid(),'idle'],block=False)
delete_calc(sc)

def get_opt_K_alpha(walk_positions,trail_positions,already_killed_pids):
def get_opt_K_alpha(walk_positions,trail_positions,currently_walking_pids):
print('GETTING NEW OPT K ALPHA')
#get a random value of alpha that's not yet been
#searched, as well as corresponding estimate for K from
Expand All @@ -183,7 +182,7 @@ def get_opt_K_alpha(walk_positions,trail_positions,already_killed_pids):

alpha_covered = 0
for pid in walk_positions:
if pid not in already_killed_pids:
if pid in currently_walking_pids:
[[k_lead,alpha_lead],direction] = walk_positions[pid]
[k_trail,alpha_trail] = trail_positions[pid]
alpha_covered += (alpha_lead-alpha_trail)*direction
Expand Down Expand Up @@ -376,12 +375,12 @@ def main(K_range,alpha_range):

it_num = 0
killed_num = 0
already_killed_pids = []
currently_walking_pids = []
curve_explored = False
percentage_covered = 0
time.sleep(5)
while not curve_explored:
print(f'already killed pids, {already_killed_pids}')
print(f'currently walking pids, {currently_walking_pids}')
it_num += 1
#first, check the status queue to update any worker statuses.
#get PID list
Expand Down Expand Up @@ -424,7 +423,7 @@ def main(K_range,alpha_range):
if num_new_searches>0:
for i in range(num_new_searches):
print('LAUNCHING A NEW SEARCH')
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,already_killed_pids)
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,currently_walking_pids)
print('INITIAL K, ALPHA OF NEW SEARCH:',new_K,new_alpha)
worker_pool.apply_async(search, args=(new_K,new_alpha,sc_dict))
time.sleep(1)
Expand Down Expand Up @@ -452,7 +451,7 @@ def main(K_range,alpha_range):
if len(x0) == 2:
#this indicates that the search failed. Immediately restart it
print('search failed, restarting')
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,already_killed_pids)
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,currently_walking_pids)
worker_pool.apply_async(search, args=(new_K,new_alpha,sc_dict))
continue

Expand All @@ -464,7 +463,7 @@ def main(K_range,alpha_range):

#if found alpha is out of range, start a new search
if (found_alpha<alpha_range[0] or found_alpha>alpha_range[1]):
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,already_killed_pids)
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,currently_walking_pids)
worker_pool.apply_async(search, args=(new_K,new_alpha,sc_dict))
continue

Expand All @@ -480,7 +479,7 @@ def main(K_range,alpha_range):
#is at least 0.1 big.
if (alpha_lead>comp_alpha_range[0]) and (alpha_lead<comp_alpha_range[1]):
print('alpha detected in already searched range, starting new search')
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,already_killed_pids)
new_K, new_alpha = get_opt_K_alpha(walk_positions,trail_positions,currently_walking_pids)
worker_pool.apply_async(search, args=(new_K,new_alpha,sc_dict))
search_restarted=True
break
Expand Down Expand Up @@ -510,6 +509,7 @@ def main(K_range,alpha_range):
pipe_dict[pid] = pipes[pipe_id][1]
#add initial positions as trails to dict
trail_positions[pid] = [x0[-1],x0[-2]] #[K,alpha]
currently_walking_pids.append(pid)
except Empty:
queue_empty = True
elif (explore_direction == 1) or (explore_direction ==-1):
Expand All @@ -531,6 +531,7 @@ def main(K_range,alpha_range):
pipe_dict[pid] = pipes[pipe_id][1]
#add initial positions as trails to dict
trail_positions[pid] = [x0[-1],x0[-2]] #[K,alpha]
currently_walking_pids.append(pid)
except Empty:
queue_empty = True

Expand Down Expand Up @@ -560,7 +561,7 @@ def main(K_range,alpha_range):
kill_pids = []
for item in items:
[pid, x, direction] = item
if pid in already_killed_pids:
if pid not in currently_walking_pids:
#if the pid that sent the message
#has already had a command sent to kill
#but has not yet been killed
Expand Down Expand Up @@ -622,7 +623,7 @@ def main(K_range,alpha_range):
if len(kill_pids)>0:
for pid in kill_pids:
pipe_dict[pid].send(1) #send kill command
already_killed_pids.append(pid)
currently_walking_pids.remove(pid)
killed_num += 1
trail_positions[f'killed{killed_num}'] = trail_positions[pid]
walk_positions[f'killed{killed_num}'] = walk_positions[pid]
Expand All @@ -637,7 +638,6 @@ def main(K_range,alpha_range):
while not queue_empty:
try:
pid = kill_confirm_queue.get(block=False)
already_killed_pids.remove(pid)
del trail_positions[pid]
del walk_positions[pid]
except Empty:
Expand All @@ -646,7 +646,7 @@ def main(K_range,alpha_range):
#check if the search is finished and calculate %
total_alpha_covered = 0
for pid in walk_positions:
if pid in already_killed_pids:
if pid not in currently_walking_pids:
#need to avoid double counting when a process terminates
continue
[[k_lead,alpha_lead],direction] = walk_positions[pid]
Expand Down

0 comments on commit 7072308

Please sign in to comment.