From a1db207366a34f2e344c0f473cea8bac9c6b4bff Mon Sep 17 00:00:00 2001 From: bozenne Date: Mon, 21 Oct 2024 17:43:58 +0200 Subject: [PATCH] (2024.21.10) polishing, split.size in ate(store=...) for memory --- R/anova.ate.R | 7 +- R/ate-bootstrap.R | 10 +-- R/ate-iid.R | 16 ++-- R/ate-pointEstimate.R | 174 ++++++++++++++++++++++++------------------ R/ate.R | 79 ++++++++++++++----- R/autoplot.ate.R | 7 +- man/ate.Rd | 36 ++++++--- 7 files changed, 204 insertions(+), 125 deletions(-) diff --git a/R/anova.ate.R b/R/anova.ate.R index e203a09b..c1816401 100644 --- a/R/anova.ate.R +++ b/R/anova.ate.R @@ -3,9 +3,9 @@ ## Author: Brice Ozenne ## Created: aug 19 2020 (09:18) ## Version: -## Last-Updated: Oct 16 2024 (11:52) +## Last-Updated: Oct 21 2024 (17:43) ## By: Brice Ozenne -## Update #: 72 +## Update #: 73 ##---------------------------------------------------------------------- ## ### Commentary: @@ -38,6 +38,7 @@ ##' @examples ##' library(survival) ##' library(data.table) +##' library(ggplot2) ##' ##' \dontrun{ ##' ## simulate data @@ -48,7 +49,7 @@ ##' dtS <- dtS[dtS$X12!="D"] ##' ##' ## model fit -##' fit <- cph(formula = Surv(time,event)~ X1+X6,data=dtS,y=TRUE,x=TRUE) +##' fit <- coxph(formula = Surv(time,event)~ X1+X6,data=dtS,y=TRUE,x=TRUE) ##' seqTime <- 1:10 ##' ateFit <- ate(fit, data = dtS, treatment = "X1", contrasts = NULL, ##' times = seqTime, B = 0, iid = TRUE, se = TRUE, verbose = TRUE, band = TRUE) diff --git a/R/ate-bootstrap.R b/R/ate-bootstrap.R index 13547511..8c519d86 100644 --- a/R/ate-bootstrap.R +++ b/R/ate-bootstrap.R @@ -3,9 +3,9 @@ ## Author: Brice Ozenne ## Created: apr 11 2018 (17:05) ## Version: -## Last-Updated: Mar 7 2022 (08:36) -## By: Thomas Alexander Gerds -## Update #: 343 +## Last-Updated: Oct 21 2024 (16:44) +## By: Brice Ozenne +## Update #: 344 ##---------------------------------------------------------------------- ## ### Commentary: @@ -18,12 +18,12 @@ ## * calcBootATE ## generate a boot object for the ate function that will be used to compute CI and p.values calcBootATE <- function(args, n.obs, fct.pointEstimate, name.estimate, - handler, B, seed, mc.cores, cl, - verbose){ + handler, B, seed, mc.cores, cl){ # {{{ prepare arguments ## hard copy of the dataset before bootstrap + verbose <- args$verbose ls.data <- list(object.event = NULL, object.treatment = NULL, object.censor = NULL) diff --git a/R/ate-iid.R b/R/ate-iid.R index 33fc09f5..b9356e4f 100644 --- a/R/ate-iid.R +++ b/R/ate-iid.R @@ -3,9 +3,9 @@ ## Author: Brice Ozenne ## Created: apr 5 2018 (17:01) ## Version: -## Last-Updated: Oct 21 2024 (15:20) +## Last-Updated: Oct 21 2024 (16:40) ## By: Brice Ozenne -## Update #: 1396 +## Update #: 1397 ##---------------------------------------------------------------------- ## ### Commentary: @@ -149,7 +149,7 @@ iidATE <- function(estimator, attr(factor, "factor") <- lapply(1:n.contrasts, function(iC){cbind(-iW.IPTW[,iC]*iW.IPCW2[,iTime]*Y.tau[,iTime])}) term.censoring <- attr(predictRisk(object.censor, newdata = mydata, times = c(0,time.jumpC)[index.obsSINDEXjumpC[,iTime]+1], - diag = TRUE, product.limit = product.limit, average.iid = factor, store = store),"average.iid") + diag = TRUE, product.limit = product.limit, average.iid = factor, store = store[c("data","iid")]),"average.iid") for(iC in 1:n.contrasts){ ## iGrid <- 1 ## - because predictRisk outputs the risk instead of the survival @@ -195,7 +195,7 @@ iidATE <- function(estimator, ## attr(factor,"factor")[[iC]][c(26,30,372),] term.intF1_tau <- attr(predictRisk(object.event, newdata = mydataIntegral, times = times, cause = cause, - average.iid = factor, product.limit = product.limit, store = store),"average.iid") + average.iid = factor, product.limit = product.limit, store = store[c("data","iid")]),"average.iid") for(iC in 1:n.contrasts){ ## iC <- 1 iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + term.intF1_tau[[iC]]*n.obsIntegral/n.obs } @@ -206,7 +206,7 @@ iidATE <- function(estimator, }) integrand.F1t <- attr(predictRisk(object.event, newdata = mydataIntegral, times = time.jumpC, cause = cause, - average.iid = factor, product.limit = product.limit, store = store), "average.iid") + average.iid = factor, product.limit = product.limit, store = store[c("data","iid")]), "average.iid") for(iC in 1:n.contrasts){ ## iC <- 1 iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + subsetIndex(rowCumSum(integrand.F1t[[iC]])*n.obsIntegral/n.obs, @@ -240,7 +240,7 @@ iidATE <- function(estimator, return(-colMultiply_cpp(ls.F1tau_F1t_dM_SSG[[iTau]]*beforeEventIntegral.jumpC, scale = iW.IPTW[index.obsIntegral,iC])) }) integrand.St <- attr(predictRisk(object.event, type = "survival", newdata = mydataIntegral, times = time.jumpC-tol, cause = cause, - average.iid = factor, product.limit = product.limit, store = store), "average.iid") + average.iid = factor, product.limit = product.limit, store = store[c("data","iid")]), "average.iid") for(iGrid in 1:n.grid){ ## iGrid <- 1 iTau <- grid[iGrid,"tau"] iC <- grid[iGrid,"contrast"] @@ -262,7 +262,7 @@ iidATE <- function(estimator, }) integrand.G1 <- predictCox(object.censor, newdata = mydataIntegral, times = time.jumpC - tol, - average.iid = factor, store = store)$survival.average.iid + average.iid = factor, store = store[c("data","iid")])$survival.average.iid ## ## integral censoring martingale factor <- TRUE @@ -272,7 +272,7 @@ iidATE <- function(estimator, return(-colMultiply_cpp(ls.F1tau_F1t_SG[[iTau]]*beforeEventIntegral.jumpC, scale = iW.IPTW[index.obsIntegral,iC])) }) integrand.G2 <- predictCox(object.censor, newdata = mydataIntegral, times = time.jumpC, type = "hazard", - average.iid = factor, store = store)$hazard.average.iid + average.iid = factor, store = store[c("data","iid")])$hazard.average.iid for(iGrid in 1:n.grid){ ## iGrid <- 1 iTau <- grid[iGrid,"tau"] diff --git a/R/ate-pointEstimate.R b/R/ate-pointEstimate.R index 96ccadee..12f56b93 100644 --- a/R/ate-pointEstimate.R +++ b/R/ate-pointEstimate.R @@ -3,9 +3,9 @@ ## Author: Brice Ozenne ## Created: jun 27 2019 (10:43) ## Version: -## Last-Updated: Oct 21 2024 (15:25) +## Last-Updated: Oct 21 2024 (16:56) ## By: Brice Ozenne -## Update #: 1098 +## Update #: 1131 ##---------------------------------------------------------------------- ## ### Commentary: @@ -41,6 +41,7 @@ ATE_TI <- function(object.event, method.iid, product.limit, store, + verbose, ...){ tol <- 1e-12 ## difference in jump time must be above tol @@ -143,7 +144,7 @@ ATE_TI <- function(object.event, if(attr(estimator,"IPCW")){ iPred <- lapply(1:n.times, function(iT){ ## iT <- 1 1-predictRisk(object.censor, newdata = mydata, times = c(0,time.jumpC)[index.obsSINDEXjumpC[,iT]+1], - diag = TRUE, product.limit = product.limit, iid = (method.iid==2)*return.iid.nuisance, store = store) + diag = TRUE, product.limit = product.limit, iid = (method.iid==2)*return.iid.nuisance, store = store[c("data","iid")]) }) G.T_tau <- do.call(cbind,iPred) @@ -200,7 +201,7 @@ ATE_TI <- function(object.event, } outRisk <- predictRisk(object.event, newdata = data.i, times = times, average.iid = factor, cause = cause, - product.limit = product.limit, store = store) + product.limit = product.limit, store = store[c("data","iid")]) F1.ctf.tau[[iC]][ls.index.strata[[iC]],] <- outRisk if(return.iid.nuisance){ @@ -224,84 +225,109 @@ ATE_TI <- function(object.event, augTerm <- matrix(0, nrow = n.obs, ncol = n.times) - ## exclude individuals with event before the first censoring as they do not contribute to the integral term + ## *** exclude individuals with event before the first censoring as they do not contribute to the integral term if(method.iid==2){ ## slow but simple i.e. no subset - index.obsIntegral <- 1:n.obs - }else{ - ## index.obsIntegral <- 1:n.obs - index.obsIntegral <- which(index.obsSINDEXjumpC.int[,n.times]>0) ## WARNING do not use index.obsSINDEXjumpC as it is evaluated just before the jump (i.e. time-tol) + indexAll.obsIntegral <- 1:n.obs + }else{ + indexAll.obsIntegral <- which(index.obsSINDEXjumpC.int[,n.times]>0) ## WARNING do not use index.obsSINDEXjumpC as it is evaluated just before the jump (i.e. time-tol) } - mydataIntegral <- mydata[index.obsIntegral,,drop=FALSE] - n.obsIntegral <- NROW(mydataIntegral) - - ## *** evaluate survival functions (event, censoring) - ## absolute risk at event times and jump times of the censoring process - predTempo <- predictRisk(object.event, newdata = mydataIntegral, times = c(times, time.jumpC), cause = cause, product.limit = product.limit, - iid = (method.iid==2)*return.iid.nuisance, store = store) - F1.tau <- predTempo[,1:n.times,drop=FALSE] - F1.jump <- predTempo[,n.times + (1:index.lastjumpC),drop=FALSE] - if((method.iid==2)*return.iid.nuisance){ - out$store$iid.nuisance.outcome <- attr(predTempo,"iid") + + ## *** split dataset to avoid large matrices + if(method.iid==2 || return.iid.nuisance || is.null(store$size.split)){ + n.split <- 1 + }else{ + size.split <- store$size.split + nAll.obsIntegal <- NROW(indexAll.obsIntegral) + label.split <- cut(1:nAll.obsIntegal, breaks = ceiling(nAll.obsIntegal/size.split)) + n.split <- length(levels(label.split)) + if(verbose>1){ + cat(" ") + pb <- txtProgressBar(max = n.split, style = 1) + } } + + for(iSplit in 1:n.split){ ## iSplit <- 1 + if(n.split == 1){ + index.obsIntegral <- indexAll.obsIntegral + }else{ + if(verbose>1){ + setTxtProgressBar(pb = pb, value = iSplit) + } + index.obsIntegral <- indexAll.obsIntegral[label.split==levels(label.split)[iSplit]] + } + mydataIntegral <- mydata[index.obsIntegral,,drop=FALSE] + n.obsIntegral <- NROW(mydataIntegral) + + ## *** evaluate survival functions (event, censoring) + ## absolute risk at event times and jump times of the censoring process + predTempo <- predictRisk(object.event, newdata = mydataIntegral, times = c(times, time.jumpC), cause = cause, product.limit = product.limit, + iid = (method.iid==2)*return.iid.nuisance, store = store[c("data","iid")]) + F1.tau <- predTempo[,1:n.times,drop=FALSE] + F1.jump <- predTempo[,n.times + (1:index.lastjumpC),drop=FALSE] + if((method.iid==2)*return.iid.nuisance){ + out$store$iid.nuisance.outcome <- attr(predTempo,"iid") + } - ## survival at all jump of the censoring process - S.jump <- predictRisk(object.event, type = "survival", newdata = mydataIntegral, times = time.jumpC-tol, product.limit = product.limit, - iid = (method.iid==2)*return.iid.nuisance, store = store) - if((method.iid==2)*return.iid.nuisance){ - out$store$iid.nuisance.survival <- attr(S.jump,"iid") - attr(S.jump,"iid") <- NULL - } + ## survival at all jump of the censoring process + S.jump <- predictRisk(object.event, type = "survival", newdata = mydataIntegral, times = time.jumpC-tol, product.limit = product.limit, + iid = (method.iid==2)*return.iid.nuisance, store = store[c("data","iid")]) + if((method.iid==2)*return.iid.nuisance){ + out$store$iid.nuisance.survival <- attr(S.jump,"iid") + attr(S.jump,"iid") <- NULL + } - ## martingale for the censoring process - ## at all times of jump of the censoring process - G.jump <- 1-predictRisk(object.censor, newdata = mydataIntegral, times = if(index.lastjumpC>1){c(0,time.jumpC[1:(index.lastjumpC-1)])}else{0}, - product.limit = product.limit, iid = (method.iid==2)*return.iid.nuisance, store = store) + ## martingale for the censoring process + ## at all times of jump of the censoring process + G.jump <- 1-predictRisk(object.censor, newdata = mydataIntegral, times = if(index.lastjumpC>1){c(0,time.jumpC[1:(index.lastjumpC-1)])}else{0}, + product.limit = product.limit, iid = (method.iid==2)*return.iid.nuisance, store = store[c("data","iid")]) - if(return.iid.nuisance && (method.iid==2)){ - out$store$iid.nuisance.censoring <- -attr(G.jump,"iid") - attr(G.jump,"iid") <- NULL - } - dLambda.jump <- predictCox(object.censor, newdata = mydataIntegral, times = time.jumpC, type = "hazard", iid = (method.iid==2)*return.iid.nuisance, store = store) - if((method.iid==2)*return.iid.nuisance){ - out$store$iid.nuisance.martingale <- dLambda.jump$hazard.iid - } + if(return.iid.nuisance && (method.iid==2)){ + out$store$iid.nuisance.censoring <- -attr(G.jump,"iid") + attr(G.jump,"iid") <- NULL + } + dLambda.jump <- predictCox(object.censor, newdata = mydataIntegral, times = time.jumpC, type = "hazard", iid = (method.iid==2)*return.iid.nuisance, store = store[c("data","iid")]) + if((method.iid==2)*return.iid.nuisance){ + out$store$iid.nuisance.martingale <- dLambda.jump$hazard.iid + } - ## *** evaluate martingal w.r.t. the censoring mechanism - ## ## version 1: slow - ## dN.jump <- do.call(cbind,lapply(time.jumpC, function(iJump){(mydataIntegral[[eventVar.time]] == iJump)*(mydataIntegral[[eventVar.status]] == level.censoring)})) - ## dM.jump <- dN.jump - dLambda.jump$hazard - - ## version 2: fast - dM.jump <- - dLambda.jump$hazard - indexTime.jumpCindiv <- match(mydataIntegral[[eventVar.time]], time.jumpC) ## index of the individual event times matching the jumps - index.jumpCindiv <- intersect(which(!is.na(indexTime.jumpCindiv)), which(mydataIntegral[[eventVar.status]] == level.censoring)) ## index of the individual jumping at the right time and having a censoring event - ## range(which(dN.jump!=0) - sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)) - dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)] <- 1 + dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)] - - ## *** evaluate integral - ## integral = \int_0^min(T_i,\tau) (F1(\tau|A_i,W_i)-F1(t|A_i,W_i)) / S(t|A_i,W_i) * dM_i^C(t)/Gc(t|A_i,W_i) - ## = F1(\tau|A_i,W_i) \int_0^min(T_i,\tauf) dM_i^C(t) / (S(t|A_i,W_i) * Gc(t|A_i,W_i)) - \int_0^min(T_i,\tauf) F1(t|A_i,W_i) dM_i^C(t) / (S(t|A_i,W_i) * Gc(t|A_i,W_i)) - - ## ## version 1 (matrix product and sums): does not scale well with sample size - ## integrand <- matrix(0, nrow = n.obsIntegral, ncol = index.lastjumpC) - ## integrand2 <- matrix(0, nrow = n.obsIntegral, ncol = index.lastjumpC) - ## index.beforeEvent.jumpC <- which(beforeEvent.jumpC[index.obsIntegral,,drop=FALSE]) ## identify which increment to put in the integral, i.e. when the individual was still at risk of being censored - ## integrand[index.beforeEvent.jumpC] <- dM.jump[index.beforeEvent.jumpC] / (G.jump[index.beforeEvent.jumpC] * S.jump[index.beforeEvent.jumpC]) - ## integrand2[index.beforeEvent.jumpC] <- F1.jump[index.beforeEvent.jumpC] * integrand[index.beforeEvent.jumpC] - ## integral <- rowCumSum(integrand) - ## integral2 <- rowCumSum(integrand2) - ## augTerm[index.obsIntegral,beforeTau.nJumpC!=0] <- F1.tau[,beforeTau.nJumpC!=0,drop=FALSE] * integral[,beforeTau.nJumpC.n0,drop=FALSE] - integral2[,beforeTau.nJumpC.n0,drop=FALSE] - - ## version 2 (loop): scale ok with sample size - for(iObs in 1:n.obsIntegral){ ## iObs <- which(index.obsIntegral[iObs]==428) - iMax.jumpC <- index.obsSINDEXjumpC.int[index.obsIntegral[iObs],n.times] - if(iMax.jumpC==0){next} ## case where method.iid = 2 - iIndex.tau <- pmin(beforeTau.nJumpC.n0, iMax.jumpC) - iIntegrand <- dM.jump[iObs,1:iMax.jumpC] / (G.jump[iObs,1:iMax.jumpC] * S.jump[iObs, 1:iMax.jumpC]) - augTerm[index.obsIntegral[iObs],beforeTau.nJumpC!=0] <- F1.tau[iObs,beforeTau.nJumpC!=0,drop=FALSE] * cumsum(iIntegrand)[iIndex.tau] - cumsum(F1.jump[iObs,1:iMax.jumpC] * iIntegrand)[iIndex.tau] + ## *** evaluate martingal w.r.t. the censoring mechanism + ## ## version 1: slow + ## dN.jump <- do.call(cbind,lapply(time.jumpC, function(iJump){(mydataIntegral[[eventVar.time]] == iJump)*(mydataIntegral[[eventVar.status]] == level.censoring)})) + ## dM.jump <- dN.jump - dLambda.jump$hazard + + ## version 2: fast + dM.jump <- - dLambda.jump$hazard + indexTime.jumpCindiv <- match(mydataIntegral[[eventVar.time]], time.jumpC) ## index of the individual event times matching the jumps + index.jumpCindiv <- intersect(which(!is.na(indexTime.jumpCindiv)), which(mydataIntegral[[eventVar.status]] == level.censoring)) ## index of the individual jumping at the right time and having a censoring event + ## range(which(dN.jump!=0) - sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)) + dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)] <- 1 + dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)] + + ## *** evaluate integral + ## integral = \int_0^min(T_i,\tau) (F1(\tau|A_i,W_i)-F1(t|A_i,W_i)) / S(t|A_i,W_i) * dM_i^C(t)/Gc(t|A_i,W_i) + ## = F1(\tau|A_i,W_i) \int_0^min(T_i,\tauf) dM_i^C(t) / (S(t|A_i,W_i) * Gc(t|A_i,W_i)) - \int_0^min(T_i,\tauf) F1(t|A_i,W_i) dM_i^C(t) / (S(t|A_i,W_i) * Gc(t|A_i,W_i)) + + ## ## version 1 (matrix product and sums): does not scale well with sample size + ## integrand <- matrix(0, nrow = n.obsIntegral, ncol = index.lastjumpC) + ## integrand2 <- matrix(0, nrow = n.obsIntegral, ncol = index.lastjumpC) + ## index.beforeEvent.jumpC <- which(beforeEvent.jumpC[index.obsIntegral,,drop=FALSE]) ## identify which increment to put in the integral, i.e. when the individual was still at risk of being censored + ## integrand[index.beforeEvent.jumpC] <- dM.jump[index.beforeEvent.jumpC] / (G.jump[index.beforeEvent.jumpC] * S.jump[index.beforeEvent.jumpC]) + ## integrand2[index.beforeEvent.jumpC] <- F1.jump[index.beforeEvent.jumpC] * integrand[index.beforeEvent.jumpC] + ## integral <- rowCumSum(integrand) + ## integral2 <- rowCumSum(integrand2) + ## augTerm[index.obsIntegral,beforeTau.nJumpC!=0] <- F1.tau[,beforeTau.nJumpC!=0,drop=FALSE] * integral[,beforeTau.nJumpC.n0,drop=FALSE] - integral2[,beforeTau.nJumpC.n0,drop=FALSE] + + ## version 2 (loop): scale ok with sample size + for(iObs in 1:n.obsIntegral){ ## iObs <- which(index.obsIntegral[iObs]==428) + iMax.jumpC <- index.obsSINDEXjumpC.int[index.obsIntegral[iObs],n.times] + if(iMax.jumpC==0){next} ## case where method.iid = 2 + iIndex.tau <- pmin(beforeTau.nJumpC.n0, iMax.jumpC) + iIntegrand <- dM.jump[iObs,1:iMax.jumpC] / (G.jump[iObs,1:iMax.jumpC] * S.jump[iObs, 1:iMax.jumpC]) + augTerm[index.obsIntegral[iObs],beforeTau.nJumpC!=0] <- F1.tau[iObs,beforeTau.nJumpC!=0,drop=FALSE] * cumsum(iIntegrand)[iIndex.tau] - cumsum(F1.jump[iObs,1:iMax.jumpC] * iIntegrand)[iIndex.tau] + } } - ## augTerm.GS <<- augTerm - augTerm.test <<- augTerm + + if(n.split>1 && verbose>1){close(pb)} + } ## ** Compute individual contribution to the ATE + influence function for the G-formula diff --git a/R/ate.R b/R/ate.R index 11fa4d19..879524a2 100644 --- a/R/ate.R +++ b/R/ate.R @@ -3,9 +3,9 @@ ## author: Thomas Alexander Gerds ## created: Oct 23 2016 (08:53) ## Version: -## last-updated: Oct 21 2024 (11:04) +## last-updated: Oct 21 2024 (17:18) ## By: Brice Ozenne -## Update #: 2543 +## Update #: 2566 #---------------------------------------------------------------------- ## ### Commentary: @@ -92,8 +92,20 @@ #' \item G-formula estimator: NULL #' \item IPTW or AIPTW estimator: NULL if no censoring and otherwise a survival model (e.g. Cox \code{"survival::coxph"}, Kaplan Meier \code{"prodlim::prodlim"}) #' } +#' #' \bold{estimator}: when using a IPCW logistic model (\code{"riskRegression::wglm"}), the integral term w.r.t. to the martingale of the censoring process is not computed for augmented estimator, -#' i.e. AIPTW,IPCW estimator instead AIPTW,AIPCW with the notation of Ozenne et al. 2020. +#' i.e. AIPTW,IPCW estimator instead AIPTW,AIPCW with the notation of Ozenne et al. 2020. \cr +#' +#' In presence of censoring, the computation time and memory usage for the evaluation of the AIPTW estimator and its uncertainty do not scale well with the number of observations (n) or the number of unique timepoints (T). +#' Point estimation involves n by T matrices and influence function n by T by n arrays. \itemize{ +#' \item for large datasets (e.g. n>5000), bootstrap is recommended as the memory need for the influence function may be prohibitive. +#' \item it is possible to decrease the memory usage for the point estimation by setting the (hidden) argument \code{store=c(size.split=1000)}. The integral term of the AIPTW estimator is then evaluated for 1000 observations at a time, i.e. involing matrices of size 1000 by T instead of n by T. This may lead to increased computation time. +#' \item reducing the number of unique timepoints (e.g. by rounding them) will lead to a less efficient but faster and less memory demanding estimation procedure. +#' } +#' +#' +#' +#' #' #' @references #' Brice Maxime Hugues Ozenne, Thomas Harder Scheike, Laila Staerk, and Thomas @@ -275,7 +287,7 @@ #' treatment = m.treatment, #' censor = m.censor, #' data = dt, times = 5:10, -#' cause = 1, band = TRUE) +#' cause = 1) #' summary(ateRobust) #' #' ## compare various estimators @@ -290,20 +302,28 @@ #' as.data.table(ateRobust2, type = "meanRisk") #' as.data.table(ateRobust2, type = "diffRisk") #' +#' ## reduce memory load by computing the integral term +#' ## on only 100 observations at a time (only relevant when iid = FALSE) +#' ateRobust3 <- ate(event = m.event, +#' treatment = m.treatment, +#' censor = m.censor, +#' data = dt, times = 5:10, +#' cause = 1, se = FALSE, +#' store = c("size.split" = 100), verbose = 2) +#' coef(ateRobust3) - coef(ateRobust) ## same +#' #' ## approximation to speed up calculations #' dt$time.round <- round(dt$time/0.5)*0.5 ## round to the nearest half #' dt$time.round[dt$time.round==0] <- 0.1 ## ensure strictly positive event times #' mRound.event <- CSC(Hist(time.round,event)~ X1+X2+X3+X5+X8,data=dt) #' mRound.censor <- coxph(Surv(time.round,event==0)~ X1+X2+X3+X5+X8,data=dt, x = TRUE, y = TRUE) -#' system.time( ## about 0.6s -#' ateRobust3 <- ate(event = mRound.event, -#' treatment = m.treatment, -#' censor = mRound.censor, -#' estimator = c("GFORMULA","IPTW","AIPTW"), -#' data = dt, times = c(5:10), -#' cause = 1, se = TRUE) +#' system.time( ## about 0.4s +#' ateRobustFast <- ate(event = mRound.event, treatment = m.treatment, +#' censor = mRound.censor, +#' estimator = c("GFORMULA","IPTW","AIPTW"), product.limit = FALSE, +#' data = dt, times = c(5:10), cause = 1, se = TRUE) #' ) -#' ateRobust2$meanRisk$estimate - ateRobust3$meanRisk$estimate ## inaccuracy +#' coef(ateRobustFast) - coef(ateRobust) ## inaccuracy #' } #' #' ################################### @@ -626,7 +646,8 @@ ate <- function(event, return.iid.nuisance = return.iid.nuisance, data.index = data.index, method.iid = method.iid, - store = store), + store = store, + verbose = verbose), dots[names(dots) %in% "store" == FALSE]) if (attr(estimator,"TD")){ args.pointEstimate <- c(args.pointEstimate,list(formula=formula)) @@ -679,8 +700,7 @@ ate <- function(event, B = B, seed = seed, mc.cores = mc.cores, - cl = cl, - verbose = verbose) + cl = cl) ## store out$boot <- list(t0 = estimate, @@ -893,7 +913,16 @@ ate_initArgs <- function(object.event, ## Not reliable due to re-ordering and reverse arguments ## unique(mydata[[censorVar.status]][model.censor$model.response[,"status"]==1]) }else{ - level.censoring <- unique(mydata[[censorVar.status]][model.censor$y[,2]==1]) + mydata.censor <- try(eval(model.censor$call$data), silent = TRUE) + if(!inherits(mydata.censor, "try-error") && (inherits(mydata.censor,"list") || inherits(mydata.censor,"data.frame"))){ + level.censoring <- unique(mydata.censor[[censorVar.status]][model.censor$y[,2]==1]) + }else if(is.numeric(mydata[[censorVar.status]])){ + level.censoring <- 0 + }else if(is.character(mydata[[censorVar.status]])){ + level.censoring <- sort(unique(mydata[[censorVar.status]]))[1] + }else if(is.factor(mydata[[censorVar.status]])){ + level.censoring <- levels(mydata[[censorVar.status]])[1] + } } }else{ ## G-formula or IPTW (no censoring) censorVar.status <- NA @@ -1170,31 +1199,39 @@ ate_initArgs <- function(object.event, ## ** store store.data <- NULL store.iid <- "full" + store.size.split <- NULL if(!is.null(store)){ if(length(store) > 2){ stop("Argument \'store\' should contain at most two elements. \n", "For instance store = c(data = \"full\", iid = \"full\") or store = c(data = \"minimal\", iid = \"minimal\").\n") } - if(is.null(names(store)) || any(names(store) %in% c("data","iid") == FALSE)){ - stop("Incorrect names for argument \'store\': should \"data\" and \"iid\". \n", + if(is.null(names(store)) || any(names(store) %in% c("data","iid", "size.split") == FALSE)){ + stop("Incorrect names for argument \'store\': should be \"data\", \"iid\", \"size.split\". \n", "For instance store = c(data = \"full\", iid = \"full\") or store = c(data = \"minimal\", iid = \"minimal\").\n") } if("data" %in% names(store) && !is.null(store[["data"]])){ if(store[["data"]] %in% c("minimal","full") == FALSE){ - stop("Element in argument \'store\' should take value \'minimal\' or \'full\'.\n", + stop("Element \"data\" in argument \'store\' should take value \'minimal\' or \'full\'.\n", "For instance store = c(data = \"full\") or store = c(data = \"minimal\").\n") } store.data <- store[["data"]] } if("iid" %in% names(store) && !is.null(store[["iid"]])){ if(store[["iid"]] %in% c("minimal","full") == FALSE){ - stop("Element in argument \'store\' should take value \'minimal\' or \'full\'.\n", + stop("Element \"iid\" in argument \'store\' should take value \'minimal\' or \'full\'.\n", "For instance store = c(iid = \"full\") or store = c(iid = \"minimal\").\n") } store.iid <- store[["iid"]] } + if("size.split" %in% names(store)){ + if(!is.numeric(store[["size.split"]]) || store[["size.split"]] < 0 || (store[["size.split"]] %% 1>0)){ + stop("Element \"size.split\" in argument \'store\' should be a positive integer.\n") + } + store.size.split <- store[["size.split"]] + } + } - store <- list(data = store.data, iid = store.iid) + store <- list(data = store.data, iid = store.iid, size.split = store.size.split) ## ** output return(list(object.event = model.event, diff --git a/R/autoplot.ate.R b/R/autoplot.ate.R index 305c3c01..8fc897b1 100644 --- a/R/autoplot.ate.R +++ b/R/autoplot.ate.R @@ -3,9 +3,9 @@ ## author: Brice Ozenne ## created: apr 28 2017 (14:19) ## Version: -## last-updated: okt 7 2021 (20:54) +## last-updated: Oct 21 2024 (17:37) ## By: Brice Ozenne -## Update #: 191 +## Update #: 192 #---------------------------------------------------------------------- ## ### Commentary: @@ -52,7 +52,6 @@ ## * autoplot.ate (examples) #' @examples #' library(survival) -#' library(rms) #' library(ggplot2) #' #' #### simulate data #### @@ -62,7 +61,7 @@ #' seqTimes <- c(0,sort(dtS$time[dtS$event==1]),max(dtS$time)) #' #' #### Cox model #### -#' fit <- cph(formula = Surv(time,event)~ X1+X2,data=dtS,y=TRUE,x=TRUE) +#' fit <- coxph(formula = Surv(time,event)~ X1+X2,data=dtS,y=TRUE,x=TRUE) #' #' #### plot.type = 1: for few timepoints #### #' ateFit <- ate(fit, data = dtS, treatment = "X1", diff --git a/man/ate.Rd b/man/ate.Rd index 28b43425..b0149f0b 100644 --- a/man/ate.Rd +++ b/man/ate.Rd @@ -127,8 +127,16 @@ as the influence function of the estimator will typically not be available. \item G-formula estimator: NULL \item IPTW or AIPTW estimator: NULL if no censoring and otherwise a survival model (e.g. Cox \code{"survival::coxph"}, Kaplan Meier \code{"prodlim::prodlim"}) } + \bold{estimator}: when using a IPCW logistic model (\code{"riskRegression::wglm"}), the integral term w.r.t. to the martingale of the censoring process is not computed for augmented estimator, -i.e. AIPTW,IPCW estimator instead AIPTW,AIPCW with the notation of Ozenne et al. 2020. +i.e. AIPTW,IPCW estimator instead AIPTW,AIPCW with the notation of Ozenne et al. 2020. \cr + +In presence of censoring, the computation time and memory usage for the evaluation of the AIPTW estimator and its uncertainty do not scale well with the number of observations (n) or the number of unique timepoints (T). +Point estimation involves n by T matrices and influence function n by T by n arrays. \itemize{ +\item for large datasets (e.g. n>5000), bootstrap is recommended as the memory need for the influence function may be prohibitive. +\item it is possible to decrease the memory usage for the point estimation by setting the (hidden) argument \code{store=c(size.split=1000)}. The integral term of the AIPTW estimator is then evaluated for 1000 observations at a time, i.e. involing matrices of size 1000 by T instead of n by T. This may lead to increased computation time. +\item reducing the number of unique timepoints (e.g. by rounding them) will lead to a less efficient but faster and less memory demanding estimation procedure. +} } \examples{ library(survival) @@ -296,7 +304,7 @@ ateRobust <- ate(event = m.event, treatment = m.treatment, censor = m.censor, data = dt, times = 5:10, - cause = 1, band = TRUE) + cause = 1) summary(ateRobust) ## compare various estimators @@ -311,20 +319,28 @@ ateRobust2 <- ate(event = m.event, as.data.table(ateRobust2, type = "meanRisk") as.data.table(ateRobust2, type = "diffRisk") +## reduce memory load by computing the integral term +## on only 100 observations at a time (only relevant when iid = FALSE) +ateRobust3 <- ate(event = m.event, + treatment = m.treatment, + censor = m.censor, + data = dt, times = 5:10, + cause = 1, se = FALSE, + store = c("size.split" = 100), verbose = 2) +coef(ateRobust3) - coef(ateRobust) ## same + ## approximation to speed up calculations dt$time.round <- round(dt$time/0.5)*0.5 ## round to the nearest half dt$time.round[dt$time.round==0] <- 0.1 ## ensure strictly positive event times mRound.event <- CSC(Hist(time.round,event)~ X1+X2+X3+X5+X8,data=dt) mRound.censor <- coxph(Surv(time.round,event==0)~ X1+X2+X3+X5+X8,data=dt, x = TRUE, y = TRUE) -system.time( ## about 0.6s -ateRobust3 <- ate(event = mRound.event, - treatment = m.treatment, - censor = mRound.censor, - estimator = c("GFORMULA","IPTW","AIPTW"), - data = dt, times = c(5:10), - cause = 1, se = TRUE) +system.time( ## about 0.4s +ateRobustFast <- ate(event = mRound.event, treatment = m.treatment, + censor = mRound.censor, + estimator = c("GFORMULA","IPTW","AIPTW"), product.limit = FALSE, + data = dt, times = c(5:10), cause = 1, se = TRUE) ) -ateRobust2$meanRisk$estimate - ateRobust3$meanRisk$estimate ## inaccuracy +coef(ateRobustFast) - coef(ateRobust) ## inaccuracy } ###################################