Skip to content

Commit

Permalink
Add worker start time. (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
plietar authored Oct 15, 2024
1 parent adedb91 commit 317271b
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 36 deletions.
3 changes: 3 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ importFrom(dplyr,desc)
importFrom(dplyr,ends_with)
importFrom(dplyr,everything)
importFrom(dplyr,mutate)
importFrom(dplyr,rowwise)
importFrom(dplyr,select)
importFrom(dplyr,ungroup)
90 changes: 56 additions & 34 deletions R/main.R
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
# wpia-hn.hpc.dide.ic.ac.uk

asISO8601 <- function(t) strftime(t, "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")

get_controller <- function(con, controller_id) {
key <- sprintf("%s:configuration", controller_id)
if (!as.logical(con$EXISTS(key))) {
get_controller_keys <- function(con, controller_id) {
keys <- rrq:::rrq_keys(controller_id)
if (!as.logical(con$EXISTS(keys$configuration))) {
porcelain::porcelain_stop(
sprintf("rrq controller %s does not exist", controller_id),
status_code = 404L)
}
rrq::rrq_controller(queue_id = controller_id, con = con)
keys
}

redis_scan <- function(con, pattern) {
Expand All @@ -33,24 +31,36 @@ target_controller_list <- function(con) {
}
}

pipeline_check <- function(result) {
for (v in result) {
if (inherits(v, "redis_error")) {
stop(v)
}
}
invisible(result)
}

#' @importFrom dplyr mutate across ends_with arrange desc everything
target_task_list <- function(con) {
function(controller_id) {
controller <- get_controller(con, controller_id)
keys <- get_controller_keys(con, controller_id)

ids <- rrq::rrq_task_list(controller = controller)
ids <- unlist(con$HKEYS(keys$task_expr))
if (length(ids) == 0) {
return(list(tasks=list()))
}

redis <- redux::redis
data <- tibble::as_tibble(con$pipeline(
status = redis$HMGET(controller$keys$task_status, ids),
queue = redis$HMGET(controller$keys$task_queue, ids),
local = redis$HMGET(controller$keys$task_local, ids),
pid = redis$HMGET(controller$keys$task_pid, ids),
worker_id = redis$HMGET(controller$keys$task_worker, ids),
submit_time = redis$HMGET(controller$keys$task_time_submit, ids),
start_time = redis$HMGET(controller$keys$task_time_start, ids),
complete_time = redis$HMGET(controller$keys$task_time_complete, ids),
moved_time = redis$HMGET(controller$keys$task_time_moved, ids)))
data <- tibble::as_tibble(pipeline_check(con$pipeline(
status = redis$HMGET(keys$task_status, ids),
queue = redis$HMGET(keys$task_queue, ids),
local = redis$HMGET(keys$task_local, ids),
pid = redis$HMGET(keys$task_pid, ids),
worker_id = redis$HMGET(keys$task_worker, ids),
submit_time = redis$HMGET(keys$task_time_submit, ids),
start_time = redis$HMGET(keys$task_time_start, ids),
complete_time = redis$HMGET(keys$task_time_complete, ids),
moved_time = redis$HMGET(keys$task_time_moved, ids))))

data <- data |>
tibble::add_column(id = ids) |>
Expand All @@ -61,27 +71,39 @@ target_task_list <- function(con) {
arrange(desc(submit_time)) |>
mutate(across(ends_with("_time"), asISO8601))

return (list(tasks=data))
list(tasks=data)
}
}

#' @importFrom dplyr mutate select ungroup rowwise
target_worker_list <- function(con) {
function(controller_id) {
controller <- get_controller(con, controller_id)

# rrq_workers_list only returns active workers. There is
# rrq_workers_list_exited too. The docs for rrq_worker_status says it only
# returns active workers, but empirically it actually returns all of them.
status <- rrq::rrq_worker_status(controller = controller)
info <- unname(rrq::rrq_worker_info(names(status), controller = controller))

list(workers = data.frame(
id = names(status),
status = unname(status),
hostname = vapply(info, function(i) i$hostname, character(1)),
username = vapply(info, function(i) i$username, character(1)),
platform = vapply(info, function(i) i$platform, character(1))
))
keys <- get_controller_keys(con, controller_id)

ids <- unlist(con$SMEMBERS(keys$worker_id))
if (length(ids) == 0) {
return(list(workers=list()))
}

redis <- redux::redis
data <- tibble::as_tibble(pipeline_check(con$pipeline(
status = redis$HMGET(keys$worker_status, ids),
info = redis$HMGET(keys$worker_info, ids)
))) |> tidyr::unchop(status)

data <- data |>
tibble::add_column(id = ids) |>
rowwise() |>
mutate(as.data.frame(redux::bin_to_object(info))) |>
mutate(start_time = {
k <- rrq:::rrq_key_worker_log(keys$queue_id, id)
log <- rrq:::worker_log_parse(con$LINDEX(k, 0), id)
asISO8601(log$time)
}) |>
ungroup() |>
select(c(id, status, hostname, username, platform, start_time))

list(workers = data)
}
}

Expand Down
12 changes: 12 additions & 0 deletions inst/schema/worker_list.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
"type": "array",
"items": {
"type": "object",
"required": [
"id",
"status",
"hostname",
"username",
"platform",
"start_time"
],
"properties": {
"id": {
"type": "string",
Expand All @@ -29,6 +37,10 @@
},
"platform": {
"type": "string",
},
"start_time": {
"type": "string",
"format": "date-time"
}
}
}
Expand Down
1 change: 1 addition & 0 deletions web/src/ApiTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export default function ApiTable(props) {
columns: props.columns,
initialState: {
showColumnFilters: true,
...props.initialState
},
state: { isLoading },
enableTopToolbar: false,
Expand Down
2 changes: 2 additions & 0 deletions web/src/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ function App() {
endpoint={queueId ? `/controller/${queueId}/tasks` : null}
accessorKey="tasks"
columns={taskColumns}
initialState={{ sorting: [ { id: "start_time", desc: true }] }}
/>
</Tabs.Panel>
<Tabs.Panel value="workers">
<ApiTable
endpoint={queueId ? `/controller/${queueId}/workers` : null}
accessorKey="workers"
columns={workerColumns}
initialState={{ sorting: [ { id: "start_time", desc: true }] }}
/>
</Tabs.Panel>
</Tabs>
Expand Down
12 changes: 10 additions & 2 deletions web/src/columns.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ export const taskColumns = [
filterFn: filterDateTime,
}),
columnHelper.accessor(dateAccessorFn("start_time"), {
id: "start_time",
header: "Started",
Cell: renderDateCell,
filterVariant: 'date',
sortFn: "datetime",
filterFn: filterDateTime,
}),
columnHelper.accessor(dateAccessorFn("complete_time"), {
id: "complete_time",
header: "Completed",
Cell: renderDateCell,
filterVariant: 'date',
Expand All @@ -92,6 +94,14 @@ export const workerColumns = [
enableClickToCopy: true,
enableSorting: false,
}),
columnHelper.accessor(dateAccessorFn("start_time"), {
id: "start_time",
header: "Started",
Cell: renderDateCell,
filterVariant: 'date',
sortFn: "datetime",
filterFn: filterDateTime,
}),
columnHelper.accessor("status", {
header: "Status",
filterVariant: 'multi-select',
Expand All @@ -111,5 +121,3 @@ export const workerColumns = [
enableClickToCopy: true,
}),
];


0 comments on commit 317271b

Please sign in to comment.