diff --git a/NAMESPACE b/NAMESPACE index 4372a73..11acd24 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -7,3 +7,6 @@ importFrom(dplyr,desc) importFrom(dplyr,ends_with) importFrom(dplyr,everything) importFrom(dplyr,mutate) +importFrom(dplyr,rowwise) +importFrom(dplyr,select) +importFrom(dplyr,ungroup) diff --git a/R/main.R b/R/main.R index b6124f6..1697bfb 100644 --- a/R/main.R +++ b/R/main.R @@ -1,15 +1,13 @@ -# wpia-hn.hpc.dide.ic.ac.uk - asISO8601 <- function(t) strftime(t, "%Y-%m-%dT%H:%M:%SZ", tz = "UTC") -get_controller <- function(con, controller_id) { - key <- sprintf("%s:configuration", controller_id) - if (!as.logical(con$EXISTS(key))) { +get_controller_keys <- function(con, controller_id) { + keys <- rrq:::rrq_keys(controller_id) + if (!as.logical(con$EXISTS(keys$configuration))) { porcelain::porcelain_stop( sprintf("rrq controller %s does not exist", controller_id), status_code = 404L) } - rrq::rrq_controller(queue_id = controller_id, con = con) + keys } redis_scan <- function(con, pattern) { @@ -33,24 +31,36 @@ target_controller_list <- function(con) { } } +pipeline_check <- function(result) { + for (v in result) { + if (inherits(v, "redis_error")) { + stop(v) + } + } + invisible(result) +} + #' @importFrom dplyr mutate across ends_with arrange desc everything target_task_list <- function(con) { function(controller_id) { - controller <- get_controller(con, controller_id) + keys <- get_controller_keys(con, controller_id) - ids <- rrq::rrq_task_list(controller = controller) + ids <- unlist(con$HKEYS(keys$task_expr)) + if (length(ids) == 0) { + return(list(tasks=list())) + } redis <- redux::redis - data <- tibble::as_tibble(con$pipeline( - status = redis$HMGET(controller$keys$task_status, ids), - queue = redis$HMGET(controller$keys$task_queue, ids), - local = redis$HMGET(controller$keys$task_local, ids), - pid = redis$HMGET(controller$keys$task_pid, ids), - worker_id = redis$HMGET(controller$keys$task_worker, ids), - submit_time = redis$HMGET(controller$keys$task_time_submit, ids), - start_time = redis$HMGET(controller$keys$task_time_start, ids), - complete_time = redis$HMGET(controller$keys$task_time_complete, ids), - moved_time = redis$HMGET(controller$keys$task_time_moved, ids))) + data <- tibble::as_tibble(pipeline_check(con$pipeline( + status = redis$HMGET(keys$task_status, ids), + queue = redis$HMGET(keys$task_queue, ids), + local = redis$HMGET(keys$task_local, ids), + pid = redis$HMGET(keys$task_pid, ids), + worker_id = redis$HMGET(keys$task_worker, ids), + submit_time = redis$HMGET(keys$task_time_submit, ids), + start_time = redis$HMGET(keys$task_time_start, ids), + complete_time = redis$HMGET(keys$task_time_complete, ids), + moved_time = redis$HMGET(keys$task_time_moved, ids)))) data <- data |> tibble::add_column(id = ids) |> @@ -61,27 +71,39 @@ target_task_list <- function(con) { arrange(desc(submit_time)) |> mutate(across(ends_with("_time"), asISO8601)) - return (list(tasks=data)) + list(tasks=data) } } +#' @importFrom dplyr mutate select ungroup rowwise target_worker_list <- function(con) { function(controller_id) { - controller <- get_controller(con, controller_id) - - # rrq_workers_list only returns active workers. There is - # rrq_workers_list_exited too. The docs for rrq_worker_status says it only - # returns active workers, but empirically it actually returns all of them. - status <- rrq::rrq_worker_status(controller = controller) - info <- unname(rrq::rrq_worker_info(names(status), controller = controller)) - - list(workers = data.frame( - id = names(status), - status = unname(status), - hostname = vapply(info, function(i) i$hostname, character(1)), - username = vapply(info, function(i) i$username, character(1)), - platform = vapply(info, function(i) i$platform, character(1)) - )) + keys <- get_controller_keys(con, controller_id) + + ids <- unlist(con$SMEMBERS(keys$worker_id)) + if (length(ids) == 0) { + return(list(workers=list())) + } + + redis <- redux::redis + data <- tibble::as_tibble(pipeline_check(con$pipeline( + status = redis$HMGET(keys$worker_status, ids), + info = redis$HMGET(keys$worker_info, ids) + ))) |> tidyr::unchop(status) + + data <- data |> + tibble::add_column(id = ids) |> + rowwise() |> + mutate(as.data.frame(redux::bin_to_object(info))) |> + mutate(start_time = { + k <- rrq:::rrq_key_worker_log(keys$queue_id, id) + log <- rrq:::worker_log_parse(con$LINDEX(k, 0), id) + asISO8601(log$time) + }) |> + ungroup() |> + select(c(id, status, hostname, username, platform, start_time)) + + list(workers = data) } } diff --git a/inst/schema/worker_list.json b/inst/schema/worker_list.json index 137f300..14f8e6e 100644 --- a/inst/schema/worker_list.json +++ b/inst/schema/worker_list.json @@ -7,6 +7,14 @@ "type": "array", "items": { "type": "object", + "required": [ + "id", + "status", + "hostname", + "username", + "platform", + "start_time" + ], "properties": { "id": { "type": "string", @@ -29,6 +37,10 @@ }, "platform": { "type": "string", + }, + "start_time": { + "type": "string", + "format": "date-time" } } } diff --git a/web/src/ApiTable.js b/web/src/ApiTable.js index e088164..6577f6d 100644 --- a/web/src/ApiTable.js +++ b/web/src/ApiTable.js @@ -32,6 +32,7 @@ export default function ApiTable(props) { columns: props.columns, initialState: { showColumnFilters: true, + ...props.initialState }, state: { isLoading }, enableTopToolbar: false, diff --git a/web/src/App.js b/web/src/App.js index ff4c1ca..8f5788c 100644 --- a/web/src/App.js +++ b/web/src/App.js @@ -82,6 +82,7 @@ function App() { endpoint={queueId ? `/controller/${queueId}/tasks` : null} accessorKey="tasks" columns={taskColumns} + initialState={{ sorting: [ { id: "start_time", desc: true }] }} /> @@ -89,6 +90,7 @@ function App() { endpoint={queueId ? `/controller/${queueId}/workers` : null} accessorKey="workers" columns={workerColumns} + initialState={{ sorting: [ { id: "start_time", desc: true }] }} /> diff --git a/web/src/columns.js b/web/src/columns.js index cc06ed0..2301c27 100644 --- a/web/src/columns.js +++ b/web/src/columns.js @@ -70,6 +70,7 @@ export const taskColumns = [ filterFn: filterDateTime, }), columnHelper.accessor(dateAccessorFn("start_time"), { + id: "start_time", header: "Started", Cell: renderDateCell, filterVariant: 'date', @@ -77,6 +78,7 @@ export const taskColumns = [ filterFn: filterDateTime, }), columnHelper.accessor(dateAccessorFn("complete_time"), { + id: "complete_time", header: "Completed", Cell: renderDateCell, filterVariant: 'date', @@ -92,6 +94,14 @@ export const workerColumns = [ enableClickToCopy: true, enableSorting: false, }), + columnHelper.accessor(dateAccessorFn("start_time"), { + id: "start_time", + header: "Started", + Cell: renderDateCell, + filterVariant: 'date', + sortFn: "datetime", + filterFn: filterDateTime, + }), columnHelper.accessor("status", { header: "Status", filterVariant: 'multi-select', @@ -111,5 +121,3 @@ export const workerColumns = [ enableClickToCopy: true, }), ]; - -