Skip to content

Commit

Permalink
reset rotate cursor as appropriate
Browse files Browse the repository at this point in the history
  • Loading branch information
JinZhou5042 committed Aug 27, 2024
1 parent 0ddff27 commit 0252df5
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 18 deletions.
35 changes: 25 additions & 10 deletions dttools/src/priority_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,23 @@ void swap_elements(struct priority_queue *pq, int i, int j)
pq->elements[j] = temp;
}

void swim(struct priority_queue *pq, int k)
int swim(struct priority_queue *pq, int k)
{
if (!pq)
return;
return 1;

while (k > 1 && pq->elements[k / 2]->priority < pq->elements[k]->priority) {
swap_elements(pq, k, k / 2);
k /= 2;
}

return k;
}

void sink(struct priority_queue *pq, int k)
int sink(struct priority_queue *pq, int k)
{
if (!pq)
return;
return -1;

while (2 * k <= pq->size) {
int j = 2 * k;
Expand All @@ -108,6 +110,8 @@ void sink(struct priority_queue *pq, int k)
swap_elements(pq, k, j);
k = j;
}

return k;
}

int priority_queue_double_capacity(struct priority_queue *pq)
Expand Down Expand Up @@ -148,9 +152,15 @@ int priority_queue_push(struct priority_queue *pq, void *data, double priority)
e->priority = priority;

pq->elements[++pq->size] = e;
swim(pq, pq->size);

return 1;
int new_idx = swim(pq, pq->size);

if (new_idx <= pq->rotate_cursor) {
// reset the rotate cursor if the new element is inserted before/equal to it
priority_queue_rotate_reset(pq);
}

return new_idx;
}

void *priority_queue_pop(struct priority_queue *pq)
Expand Down Expand Up @@ -205,12 +215,10 @@ int priority_queue_update_priority(struct priority_queue *pq, void *data, double
pq->elements[idx]->priority = new_priority;

if (new_priority > old_priority) {
swim(pq, idx);
return swim(pq, idx);
} else if (new_priority < old_priority) {
sink(pq, idx);
return sink(pq, idx);
}

return 1;
}

int priority_queue_find_idx(struct priority_queue *pq, void *data)
Expand Down Expand Up @@ -295,6 +303,7 @@ int priority_queue_remove(struct priority_queue *pq, int idx)
struct element *e = pq->elements[idx];
pq->elements[idx] = pq->elements[pq->size];
pq->elements[pq->size--] = NULL;

sink(pq, idx);

if (pq->static_cursor == idx) {
Expand All @@ -307,6 +316,12 @@ int priority_queue_remove(struct priority_queue *pq, int idx)
pq->rotate_cursor--;
}
free(e);

if (idx <= pq->rotate_cursor) {
// reset the rotate cursor if the removed element is before/equal to it
priority_queue_rotate_reset(pq);
}

return 1;
}

Expand Down
4 changes: 2 additions & 2 deletions dttools/src/priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int priority_queue_size(struct priority_queue *pq);
@param pq A pointer to a priority queue.
@param data A pointer to store in the queue.
@param priority The specified priority with the given object.
@return One if the push succeeded, failure otherwise.
@return The idex of data if the push succeeded, -1 on failure.
*/
int priority_queue_push(struct priority_queue *pq, void *data, double priority);

Expand Down Expand Up @@ -133,7 +133,7 @@ void *priority_queue_get_element(struct priority_queue *pq, int index);
@param pq A pointer to a priority queue.
@param data The pointer to the element to update.
@param new_priority The new priority of the element.
@return One if the update succeeded, 0 on failure.
@return The new index if the update succeeded, -1 on failure.
*/
int priority_queue_update_priority(struct priority_queue *pq, void *data, double new_priority);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ def set_keepalive_timeout(self, timeout):
# - "max-retrievals" Sets the max number of tasks to retrieve per manager wait(). If less than 1, the manager prefers to retrieve all completed tasks before dispatching new tasks to workers. (default=1)
# - "min-transfer-timeout" Set the minimum number of seconds to wait for files to be transferred to or from a worker. (default=10)
# - "monitor-interval" Parameter to change how frequently the resource monitor records resource consumption of a task in a times series, if this feature is enabled. See @ref enable_monitoring.
# - "prefer-dispatch" If 1, try to dispatch tasks even if there are retrieved tasks ready to be reportedas done. (default=0)
# - "prefer-dispatch" If 1, try to dispatch tasks even if there are retrieved tasks ready to be reported as done. (default=0)
# - "proportional-resources" If set to 0, do not assign resources proportionally to tasks. The default is to use proportions.
# - "proportional-whole-tasks" Round up resource proportions such that only an integer number of tasks could be fit in the worker. The default is to use proportions.
# - "ramp-down-heuristic" If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. (default=0)
Expand Down
14 changes: 9 additions & 5 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -4951,6 +4951,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,

// only check for fixed location if any are present (high overhead)
if (q->fixed_location_in_queue) {

BEGIN_ACCUM_TIME(q, time_internal);
result = enforce_waiting_fixed_locations(q);
END_ACCUM_TIME(q, time_internal);
Expand All @@ -4960,8 +4961,12 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
}
}

if (retrieved_this_cycle && !q->prefer_dispatch) {
continue;
if (retrieved_this_cycle) {
// reset the rotate cursor on task retrieval
priority_queue_rotate_reset(q->ready_tasks);
if (!q->prefer_dispatch) {
continue;
}
}

sent_in_previous_cycle = 0;
Expand Down Expand Up @@ -5000,6 +5005,8 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
END_ACCUM_TIME(q, time_status_msgs);
if (result) {
// accepted at least one worker
// reset the rotate cursor on worker connection
priority_queue_rotate_reset(q->ready_tasks);
events++;
continue;
}
Expand All @@ -5015,7 +5022,6 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
}

if (q->process_pending_check) {

BEGIN_ACCUM_TIME(q, time_internal);
int pending = process_pending();
END_ACCUM_TIME(q, time_internal);
Expand All @@ -5029,8 +5035,6 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
// return if manager is empty and something interesting already happened
// in this wait.
if (events > 0) {
priority_queue_rotate_reset(q->ready_tasks);

BEGIN_ACCUM_TIME(q, time_internal);
int done = !priority_queue_size(q->ready_tasks) && !list_size(q->waiting_retrieval_list) && !itable_size(q->running_table);
END_ACCUM_TIME(q, time_internal);
Expand Down

0 comments on commit 0252df5

Please sign in to comment.