Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use pkgdepends for package installation #80

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 75 additions & 143 deletions package_installs.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,151 +2,83 @@
REPO <- 'http://cran.us.r-project.org'
options(repos = c("CRAN" = REPO))

options(install.packages.compile.from.source = "never")

# Number of parallel installs.
# Experimentally optimized. A too high value (128) crashes.
M <- 16

# Make use of all CPUs available.
options(Ncpus = parallel::detectCores())

# Install parallel library.
library(parallel)
unlink("install_log_parallel")

# Install util packages.
utilPackages <- c('Rcpp', 'repr', 'rmutil', 'testthat', 'hrbrthemes')
for (p in utilPackages) {
install.packages(p, verbose=FALSE, quiet=FALSE, repos=REPO)
}

# Install older version of packages.
library(devtools)
install_version("randomForest", version='4.6.14') # [b/219681100]
install_version("terra", version='1.5-34') # [b/240934971]
install_version("ranger", version='0.14.1') # [b/291120269]
install_version("imager", version='0.42.16') # [b/325867887]

# All packages available in the repo.
allPackages <- as.data.frame(available.packages(repos=REPO))

# Already installed packages.
existingPackages <- installed.packages()

# Get list of packages to install from files.
library("rmutil")
p <- read.table(file="packages")
pu <- read.table(file="packages_users")
pmerged <- rbind(p, pu)
pkgs <- pmerged[,1]

M <- min(M, length(pkgs))

do_one <- function(pkg, repos){
h <- function(e) structure(conditionMessage(e), class=c("snow-try-error","try-error"))
# Treat warnings as errors. (An example 'warning' is that the package is not found!)
tryCatch(
install.packages(pkg, verbose=FALSE, quiet=FALSE, repos=repos),
error=h,
warning=h)
}

alreadyInstalled <- function(pkg){
if(pkg %in% rownames(existingPackages)){
if(!is.na(allPackages$Version[pkg]) && existingPackages[pkg,"Version"] == as.character(allPackages$Version[pkg])) {
return(TRUE)
}
}
return(FALSE)
}
vecAlreadyInstalled <- Vectorize(alreadyInstalled)

source('/tmp/utils.R')
print("Generating dependency list...")
dl <- make_dependency_list(pkgs, allPackages)
dl <- dl[!vecAlreadyInstalled(names(dl))]
dl <- lapply(dl, function(x) x[x %in% names(dl)])
lens <- sapply(dl, length)
ready <- names(dl[lens == 0L])
n <- length(ready)
total <- length(dl)

print(paste("Ready packages: ", n))
print(paste("Total packages to install: ", total))

cl <- makeCluster(M, outfile = "install_log_parallel")

submit <- function(node, pkg) {
parallel:::sendCall(cl[[node]], do_one, list(pkg, repos=REPO), tag = pkg)
}

for (i in 1:min(n, M)) {
submit(i, ready[i])
install.packages('pkgdepends')
library(pkgdepends)
library(dplyr)
library(purrr)

p <- trimws(readLines("packages"))
pu <- trimws(readLines('packages_users'))
pkgs <- union(p, pu)

# Remove items listed on packages_users that aren't valid package names
pkgs <- keep(pkgs, is_valid_package_name)

# Check if packages are available
p <- new_pkg_deps(pkgs)
p$resolve()
df <- p$get_resolution()

# List all installable packages
# removes items listed on packages_users that aren't actual packages
pkgs <- filter(df, directpkg, status=="OK") |> pull(package)

# make a install plan
p <- new_pkg_installation_proposal(pkgs)
p$solve()
p$download()
# this will apt-get missing sysreqs if the package defines them
p$install_sysreqs()

failed_pkgs <- c()
errors <- list()
cant_install_pkgs <- c()
start_time <- Sys.time()
repeat {
tryCatch({
p$install()
break
},error=function(e) {
if(!inherits(e, c("package_build_error", "install_input_error"))) {
# if this isnt a package install error, stop
stop(e)
}
# else, error installing e$package
errors <<- c(errors, list(e))
failed_pkgs <<- union(failed_pkgs, e$package)
install_plan <- p$get_install_plan()
fps <- c(e$package)
# we need to remove all dependants of e$package from the install list
while(length(fps)>0) {
fp <- fps[[1]] # take 1st item
fps <- fps[-1] # remove from queue
# find packages that depended on the failed package
p_deps <- pull(install_plan, dependencies,name=package) |> keep(~has_element(.x, fp)) |> names()
# remove failed package from install_plan
install_plan <- filter(install_plan, package != fp)
if(fp %in% pkgs) {
# note if this was a package we requested to install
cant_install_pkgs <<- union(cant_install_pkgs, fp)
}
# repeat this loop for the dependencies of failed package
fps <- union(fps, p_deps)
}
# make a new plan without the failed package + deps
# directpkg are packages we explicitly asked to install
p <<- new_pkg_installation_proposal(filter(install_plan, directpkg)$package)
# the new plan will skip packages already installed
p$solve()
# the downloads are cached and wont be re-done
p$download()
})
}
dl <- dl[!names(dl) %in% ready[1:min(n, M)]]
av <- if(n < M) (n+1L):M else integer() # available workers

success <- character(0)
errors <- character(0)
start <- Sys.time()
while(length(dl) > 0 || length(av) != M) {
if (length(av) == M) {
stop("deadlock")
}

d <- parallel:::recvOneResult(cl)

# Handle errors reported by the worker.
if (inherits(d$value, 'try-error')) {
msg <- paste("ERROR: worker", d$node, "for package ", d$tag, ":", d$value)
print(msg)
warning(msg)
errors <- c(errors, d$tag)
} else {
success <- c(success, d$tag)
}

# Find work to be done.
av <- c(av, d$node)
dl <- lapply(dl, function(x) x[x != d$tag])
lens <- sapply(dl, length)
ready <- names(dl[lens == 0L])
m <- min(length(ready), length(av)) # >= 1

# Report for this iteration.
eta <- start + (Sys.time() - start) / (length(success) + length(errors)) * total
print(paste(
"done:", d$tag, "on", d$node,
", success:", length(success),
", failed:", length(errors),
", remaining:", length(dl),
", ready:", length(ready),
", next:", if (m) paste(ready[1:m], "on", av[1:m]) else "<none>",
", eta:", eta))

# Possibly schedule next work. Typically submits exactly 1 task, though occasionally:
# - 0 (when blocked on ongoing installs to complete dependencies first)
# - or >1 (possibly after being unblocked from the previously described condition)
if (m) {
for (i in 1:m) {
submit(av[i], ready[i])
}
av <- av[-(1:m)]
dl <- dl[!names(dl) %in% ready[1:m]]
}
}

# Make sure the packages from the file `packages` are properly installed
# otherwise reinstalling in a single thread from a different repo, as they sometimes
# fail in the previous technique.
for (p in p[,1]) {
if (!require(p, character.only = TRUE)) {
install.packages(p, verbose=FALSE, quiet=FALSE, repos='http://cran.rstudio.com')
}
}

print(errors)
print("Done!")
print(paste("Successfully installed:", success))
print(paste("Likely failed:", errors))
print(paste("Elapsed:", Sys.time() - start))
print(paste("Successfully installed:",nrow(p$get_install_plan()),'packages'))
print(paste("Failed to install:",paste(failed_pkgs,collapse=', ')))
print(paste("Could not install due to failed dependencies:",paste(setdiff(cant_install_pkgs,failed_pkgs),collapse=', ')))
print(paste("Elapsed:", lubridate::as.duration(Sys.time() - start_time)))