Skip to content

Commit

Permalink
... faster robust ate when no censoring at the beginning ...
Browse files Browse the repository at this point in the history
  • Loading branch information
bozenne committed Oct 21, 2024
1 parent ac51af6 commit 83e0aea
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 46 deletions.
65 changes: 36 additions & 29 deletions R/ate-iid.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
## Author: Brice Ozenne
## Created: apr 5 2018 (17:01)
## Version:
## Last-Updated: Oct 21 2024 (11:56)
## Last-Updated: Oct 21 2024 (15:20)
## By: Brice Ozenne
## Update #: 1361
## Update #: 1396
##----------------------------------------------------------------------
##
### Commentary:
Expand Down Expand Up @@ -40,6 +40,8 @@ iidATE <- function(estimator,
G.jump,
dM.jump,
index.obsSINDEXjumpC,
index.obsSINDEXjumpC.int,
index.obsIntegral,
eventVar.time,
time.jumpC,
beforeEvent.jumpC,
Expand Down Expand Up @@ -67,11 +69,12 @@ iidATE <- function(estimator,
iSG <- matrix(0, nrow = NROW(SG), ncol = NCOL(SG))
iSGG <- matrix(0, nrow = NROW(SG), ncol = NCOL(SG))
iSSG <- matrix(0, nrow = NROW(SG), ncol = NCOL(SG))
index.beforeEvent.jumpC <- which(beforeEvent.jumpC)
if(length(index.beforeEvent.jumpC)>0){
iSG[index.beforeEvent.jumpC] <- 1/SG[index.beforeEvent.jumpC]
iSGG[index.beforeEvent.jumpC] <- 1/SGG[index.beforeEvent.jumpC]
iSSG[index.beforeEvent.jumpC] <- 1/SSG[index.beforeEvent.jumpC]

beforeEventIntegral.jumpC <- beforeEvent.jumpC[index.obsIntegral,,drop=FALSE]
if(any(beforeEventIntegral.jumpC>0)){
iSG[beforeEventIntegral.jumpC] <- 1/SG[beforeEventIntegral.jumpC]
iSGG[beforeEventIntegral.jumpC] <- 1/SGG[beforeEventIntegral.jumpC]
iSSG[beforeEventIntegral.jumpC] <- 1/SSG[beforeEventIntegral.jumpC]
}

dM_SG <- dM.jump * iSG
Expand Down Expand Up @@ -166,6 +169,9 @@ iidATE <- function(estimator,
## *** augmentation censoring term
if(attr(estimator,"integral")){

mydataIntegral <- mydata[index.obsIntegral,,drop=FALSE]
n.obsIntegral <- length(index.obsIntegral)

## **** outcome term
## at tau

Expand All @@ -175,34 +181,35 @@ iidATE <- function(estimator,
## extract integral over the right time spand
factor <- TRUE
attr(factor,"factor") <- lapply(1:n.contrasts, function(iC){
matrix(NA, nrow = n.obs, ncol = n.times)
matrix(NA, nrow = n.obsIntegral, ncol = n.times)
})

for(iTau in 1:n.times){ ## iTau <- 1
index.col <- prodlim::sindex(jump.times = c(0,time.jumpC), eval.times = pmin(mydata[[eventVar.time]],times[iTau]))
index.col <- prodlim::sindex(jump.times = c(0,time.jumpC), eval.times = pmin(mydataIntegral[[eventVar.time]],times[iTau]))
for(iC in 1:n.contrasts){
attr(factor,"factor")[[iC]][,iTau] <- cbind(iW.IPTW[,iC] * int.IFF1_tau[(1:n.obs) + (index.col-1) * n.obs])
attr(factor,"factor")[[iC]][,iTau] <- cbind(iW.IPTW[index.obsIntegral,iC] * int.IFF1_tau[(1:n.obsIntegral) + (index.col-1) * n.obsIntegral])
}
}
attr(factor,"factor")[[1]]
## setdiff(1:n.obs,index.obsIntegral) ## 26 30 372
## attr(factor,"factor")[[iC]][c(26,30,372),]

term.intF1_tau <- attr(predictRisk(object.event, newdata = mydata, times = times, cause = cause,
term.intF1_tau <- attr(predictRisk(object.event, newdata = mydataIntegral, times = times, cause = cause,
average.iid = factor, product.limit = product.limit, store = store),"average.iid")

for(iC in 1:n.contrasts){ ## iC <- 1
iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + term.intF1_tau[[iC]]
iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + term.intF1_tau[[iC]]*n.obsIntegral/n.obs
}

## ## at t
factor <- TRUE
attr(factor, "factor") <- lapply(1:n.contrasts, function(iC){
-colMultiply_cpp(dM_SG*beforeEvent.jumpC, scale = iW.IPTW[,iC])
attr(factor, "factor") <- lapply(1:n.contrasts, function(iC){ ## iC <- 2
-colMultiply_cpp(dM_SG*beforeEventIntegral.jumpC, scale = iW.IPTW[index.obsIntegral,iC])
})

integrand.F1t <- attr(predictRisk(object.event, newdata = mydata, times = time.jumpC, cause = cause,
integrand.F1t <- attr(predictRisk(object.event, newdata = mydataIntegral, times = time.jumpC, cause = cause,
average.iid = factor, product.limit = product.limit, store = store), "average.iid")

for(iC in 1:n.contrasts){ ## iC <- 1
iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + subsetIndex(rowCumSum(integrand.F1t[[iC]]),
iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + subsetIndex(rowCumSum(integrand.F1t[[iC]])*n.obsIntegral/n.obs,
index = beforeTau.nJumpC,
default = 0, col = TRUE)
}
Expand All @@ -212,15 +219,15 @@ iidATE <- function(estimator,
## **** treatment term
for(iC in 1:n.contrasts){ ## iC <- 1
factor <- TRUE
attr(factor,"factor") <- list(AIPTW = colMultiply_cpp(augTerm, scale = -iW.IPTW2[,iC]))
attr(factor,"factor") <- list(AIPTW = colMultiply_cpp(augTerm[index.obsIntegral,,drop=FALSE], scale = -iW.IPTW2[index.obsIntegral,iC]))

term.treatment <- attr(predictRisk(object.treatment,
newdata = mydata,
newdata = mydataIntegral,
average.iid = factor,
level = contrasts[iC]), "average.iid")

## print(colSums(term.treatment[["AIPTW"]]^2))
iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + term.treatment[["AIPTW"]]
iid.AIPTW[[iC]] <- iid.AIPTW[[iC]] + term.treatment[["AIPTW"]]*n.obsIntegral/n.obs
}
## cat("Augmentation treatment (method=1) \n")
## print(sapply(lapply(iid.AIPTW,abs),colSums))
Expand All @@ -230,16 +237,16 @@ iidATE <- function(estimator,
attr(factor,"factor") <- lapply(1:n.grid, function(iGrid){ ## iGrid <- 1
iTau <- grid[iGrid,"tau"]
iC <- grid[iGrid,"contrast"]
return(-colMultiply_cpp(ls.F1tau_F1t_dM_SSG[[iTau]]*beforeEvent.jumpC, scale = iW.IPTW[,iC]))
return(-colMultiply_cpp(ls.F1tau_F1t_dM_SSG[[iTau]]*beforeEventIntegral.jumpC, scale = iW.IPTW[index.obsIntegral,iC]))
})
integrand.St <- attr(predictRisk(object.event, type = "survival", newdata = mydata, times = time.jumpC-tol, cause = cause,
integrand.St <- attr(predictRisk(object.event, type = "survival", newdata = mydataIntegral, times = time.jumpC-tol, cause = cause,
average.iid = factor, product.limit = product.limit, store = store), "average.iid")
for(iGrid in 1:n.grid){ ## iGrid <- 1
iTau <- grid[iGrid,"tau"]
iC <- grid[iGrid,"contrast"]

if(beforeTau.nJumpC[iTau]>0){
iid.AIPTW[[iC]][,iTau] <- iid.AIPTW[[iC]][,iTau] + rowSums(integrand.St[[iGrid]][,1:beforeTau.nJumpC[iTau],drop=FALSE])
iid.AIPTW[[iC]][,iTau] <- iid.AIPTW[[iC]][,iTau] + rowSums(integrand.St[[iGrid]][,1:beforeTau.nJumpC[iTau],drop=FALSE])*n.obsIntegral/n.obs
}
}
## cat("Augmentation survival (method=1) \n")
Expand All @@ -251,27 +258,27 @@ iidATE <- function(estimator,
attr(factor,"factor") <- lapply(1:n.grid, function(iGrid){ ## iGrid <- 1
iTau <- grid[iGrid,"tau"]
iC <- grid[iGrid,"contrast"]
return(-colMultiply_cpp(ls.F1tau_F1t_dM_SGG[[iTau]]*beforeEvent.jumpC, scale = iW.IPTW[,iC]))
return(-colMultiply_cpp(ls.F1tau_F1t_dM_SGG[[iTau]]*beforeEventIntegral.jumpC, scale = iW.IPTW[index.obsIntegral,iC]))
})

integrand.G1 <- predictCox(object.censor, newdata = mydata, times = time.jumpC - tol,
integrand.G1 <- predictCox(object.censor, newdata = mydataIntegral, times = time.jumpC - tol,
average.iid = factor, store = store)$survival.average.iid

## ## integral censoring martingale
factor <- TRUE
attr(factor,"factor") <- lapply(1:n.grid, function(iGrid){ ## iGrid <- 1
iTau <- grid[iGrid,"tau"]
iC <- grid[iGrid,"contrast"]
return(-colMultiply_cpp(ls.F1tau_F1t_SG[[iTau]]*beforeEvent.jumpC, scale = iW.IPTW[,iC]))
return(-colMultiply_cpp(ls.F1tau_F1t_SG[[iTau]]*beforeEventIntegral.jumpC, scale = iW.IPTW[index.obsIntegral,iC]))
})
integrand.G2 <- predictCox(object.censor, newdata = mydata, times = time.jumpC, type = "hazard",
integrand.G2 <- predictCox(object.censor, newdata = mydataIntegral, times = time.jumpC, type = "hazard",
average.iid = factor, store = store)$hazard.average.iid

for(iGrid in 1:n.grid){ ## iGrid <- 1
iTau <- grid[iGrid,"tau"]
iC <- grid[iGrid,"contrast"]
if(beforeTau.nJumpC[iTau]>0){
iid.AIPTW[[iC]][,iTau] <- iid.AIPTW[[iC]][,iTau] + rowSums(integrand.G1[[iGrid]][,1:beforeTau.nJumpC[iTau],drop=FALSE] + integrand.G2[[iGrid]][,1:beforeTau.nJumpC[iTau],drop=FALSE])
iid.AIPTW[[iC]][,iTau] <- iid.AIPTW[[iC]][,iTau] + rowSums(integrand.G1[[iGrid]][,1:beforeTau.nJumpC[iTau],drop=FALSE] + integrand.G2[[iGrid]][,1:beforeTau.nJumpC[iTau],drop=FALSE])*n.obsIntegral/n.obs
}
}
} ## end attr(estimator,"integral")
Expand Down
48 changes: 31 additions & 17 deletions R/ate-pointEstimate.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
## Author: Brice Ozenne
## Created: jun 27 2019 (10:43)
## Version:
## Last-Updated: Oct 21 2024 (13:25)
## Last-Updated: Oct 21 2024 (15:25)
## By: Brice Ozenne
## Update #: 1048
## Update #: 1098
##----------------------------------------------------------------------
##
### Commentary:
Expand Down Expand Up @@ -224,9 +224,19 @@ ATE_TI <- function(object.event,

augTerm <- matrix(0, nrow = n.obs, ncol = n.times)

## exclude individuals with event before the first censoring as they do not contribute to the integral term
if(method.iid==2){ ## slow but simple i.e. no subset
index.obsIntegral <- 1:n.obs
}else{
## index.obsIntegral <- 1:n.obs
index.obsIntegral <- which(index.obsSINDEXjumpC.int[,n.times]>0) ## WARNING do not use index.obsSINDEXjumpC as it is evaluated just before the jump (i.e. time-tol)
}
mydataIntegral <- mydata[index.obsIntegral,,drop=FALSE]
n.obsIntegral <- NROW(mydataIntegral)

## *** evaluate survival functions (event, censoring)
## absolute risk at event times and jump times of the censoring process
predTempo <- predictRisk(object.event, newdata = mydata, times = c(times, time.jumpC), cause = cause, product.limit = product.limit,
predTempo <- predictRisk(object.event, newdata = mydataIntegral, times = c(times, time.jumpC), cause = cause, product.limit = product.limit,
iid = (method.iid==2)*return.iid.nuisance, store = store)
F1.tau <- predTempo[,1:n.times,drop=FALSE]
F1.jump <- predTempo[,n.times + (1:index.lastjumpC),drop=FALSE]
Expand All @@ -235,7 +245,7 @@ ATE_TI <- function(object.event,
}

## survival at all jump of the censoring process
S.jump <- predictRisk(object.event, type = "survival", newdata = mydata, times = time.jumpC-tol, product.limit = product.limit,
S.jump <- predictRisk(object.event, type = "survival", newdata = mydataIntegral, times = time.jumpC-tol, product.limit = product.limit,
iid = (method.iid==2)*return.iid.nuisance, store = store)
if((method.iid==2)*return.iid.nuisance){
out$store$iid.nuisance.survival <- attr(S.jump,"iid")
Expand All @@ -244,14 +254,14 @@ ATE_TI <- function(object.event,

## martingale for the censoring process
## at all times of jump of the censoring process
G.jump <- 1-predictRisk(object.censor, newdata = mydata, times = if(index.lastjumpC>1){c(0,time.jumpC[1:(index.lastjumpC-1)])}else{0},
G.jump <- 1-predictRisk(object.censor, newdata = mydataIntegral, times = if(index.lastjumpC>1){c(0,time.jumpC[1:(index.lastjumpC-1)])}else{0},
product.limit = product.limit, iid = (method.iid==2)*return.iid.nuisance, store = store)

if(return.iid.nuisance && (method.iid==2)){
out$store$iid.nuisance.censoring <- -attr(G.jump,"iid")
attr(G.jump,"iid") <- NULL
}
dLambda.jump <- predictCox(object.censor, newdata = mydata, times = time.jumpC, type = "hazard", iid = (method.iid==2)*return.iid.nuisance, store = store)
dLambda.jump <- predictCox(object.censor, newdata = mydataIntegral, times = time.jumpC, type = "hazard", iid = (method.iid==2)*return.iid.nuisance, store = store)
if((method.iid==2)*return.iid.nuisance){
out$store$iid.nuisance.martingale <- dLambda.jump$hazard.iid
}
Expand All @@ -263,32 +273,35 @@ ATE_TI <- function(object.event,

## version 2: fast
dM.jump <- - dLambda.jump$hazard
indexTime.jumpCindiv <- match(mydata[[eventVar.time]], time.jumpC) ## index of the individual event times matching the jumps
index.jumpCindiv <- intersect(which(!is.na(indexTime.jumpCindiv)), which(mydata[[eventVar.status]] == level.censoring)) ## index of the individual jumping at the right time and having a censoring event
indexTime.jumpCindiv <- match(mydataIntegral[[eventVar.time]], time.jumpC) ## index of the individual event times matching the jumps
index.jumpCindiv <- intersect(which(!is.na(indexTime.jumpCindiv)), which(mydataIntegral[[eventVar.status]] == level.censoring)) ## index of the individual jumping at the right time and having a censoring event
## range(which(dN.jump!=0) - sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral))
dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obs)] <- 1 + dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obs)]
dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)] <- 1 + dM.jump[sort(index.jumpCindiv + (indexTime.jumpCindiv[index.jumpCindiv]-1)*n.obsIntegral)]

## *** evaluate integral
## integral = \int_0^min(T_i,\tau) (F1(\tau|A_i,W_i)-F1(t|A_i,W_i)) / S(t|A_i,W_i) * dM_i^C(t)/Gc(t|A_i,W_i)
## = F1(\tau|A_i,W_i) \int_0^min(T_i,\tauf) dM_i^C(t) / (S(t|A_i,W_i) * Gc(t|A_i,W_i)) - \int_0^min(T_i,\tauf) F1(t|A_i,W_i) dM_i^C(t) / (S(t|A_i,W_i) * Gc(t|A_i,W_i))

## ## version 1 (matrix product and sums): does not scale well with sample size
## integrand <- matrix(0, nrow = n.obs, ncol = index.lastjumpC)
## integrand2 <- matrix(0, nrow = n.obs, ncol = index.lastjumpC)
## index.beforeEvent.jumpC <- which(beforeEvent.jumpC) ## identify which increment to put in the integral, i.e. when the individual was still at risk of being censored
## integrand <- matrix(0, nrow = n.obsIntegral, ncol = index.lastjumpC)
## integrand2 <- matrix(0, nrow = n.obsIntegral, ncol = index.lastjumpC)
## index.beforeEvent.jumpC <- which(beforeEvent.jumpC[index.obsIntegral,,drop=FALSE]) ## identify which increment to put in the integral, i.e. when the individual was still at risk of being censored
## integrand[index.beforeEvent.jumpC] <- dM.jump[index.beforeEvent.jumpC] / (G.jump[index.beforeEvent.jumpC] * S.jump[index.beforeEvent.jumpC])
## integrand2[index.beforeEvent.jumpC] <- F1.jump[index.beforeEvent.jumpC] * integrand[index.beforeEvent.jumpC]
## integral <- rowCumSum(integrand)
## integral2 <- rowCumSum(integrand2)
## augTerm[,beforeTau.nJumpC!=0] <- F1.tau[,beforeTau.nJumpC!=0,drop=FALSE] * integral[,beforeTau.nJumpC.n0,drop=FALSE] - integral2[,beforeTau.nJumpC.n0,drop=FALSE]
## augTerm[index.obsIntegral,beforeTau.nJumpC!=0] <- F1.tau[,beforeTau.nJumpC!=0,drop=FALSE] * integral[,beforeTau.nJumpC.n0,drop=FALSE] - integral2[,beforeTau.nJumpC.n0,drop=FALSE]

## version 1 (loop): scale ok with sample size
for(iObs in which(index.obsSINDEXjumpC.int[,n.times]>0)){ ## iObs <- 1
iMax.jumpC <- index.obsSINDEXjumpC.int[iObs,n.times]
## version 2 (loop): scale ok with sample size
for(iObs in 1:n.obsIntegral){ ## iObs <- which(index.obsIntegral[iObs]==428)
iMax.jumpC <- index.obsSINDEXjumpC.int[index.obsIntegral[iObs],n.times]
if(iMax.jumpC==0){next} ## case where method.iid = 2
iIndex.tau <- pmin(beforeTau.nJumpC.n0, iMax.jumpC)
iIntegrand <- dM.jump[iObs,1:iMax.jumpC] / (G.jump[iObs,1:iMax.jumpC] * S.jump[iObs, 1:iMax.jumpC])
augTerm[iObs,beforeTau.nJumpC!=0] <- F1.tau[iObs,beforeTau.nJumpC!=0,drop=FALSE] * cumsum(iIntegrand)[iIndex.tau] - cumsum(F1.jump[iObs,1:iMax.jumpC] * iIntegrand)[iIndex.tau]
augTerm[index.obsIntegral[iObs],beforeTau.nJumpC!=0] <- F1.tau[iObs,beforeTau.nJumpC!=0,drop=FALSE] * cumsum(iIntegrand)[iIndex.tau] - cumsum(F1.jump[iObs,1:iMax.jumpC] * iIntegrand)[iIndex.tau]
}
## augTerm.GS <<- augTerm
augTerm.test <<- augTerm
}

## ** Compute individual contribution to the ATE + influence function for the G-formula
Expand Down Expand Up @@ -374,6 +387,7 @@ ATE_TI <- function(object.event,
out$store$beforeEvent.jumpC <- beforeEvent.jumpC
out$store$beforeTau.nJumpC <- beforeTau.nJumpC
out$store$index.obsSINDEXjumpC.int <- index.obsSINDEXjumpC.int
out$store$index.obsIntegral <- index.obsIntegral
}
}

Expand Down

0 comments on commit 83e0aea

Please sign in to comment.