Skip to content

Commit

Permalink
Close iterators in reduce-based collectors
Browse files Browse the repository at this point in the history
And fix bug that caused cleaning up twice in generators
  • Loading branch information
lionel- committed Nov 3, 2024
1 parent ca2b314 commit cd47d65
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 16 deletions.
44 changes: 29 additions & 15 deletions R/generator.R
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ generator0 <- function(fn, type = "generator") {
base::local(envir = `_private`, {
generator_env <- environment()$generator_env
caller_env <- environment()$caller_env

# Prevent lints about unknown bindings
exits <- NULL
exited <- NULL
cleanup <- NULL
close_active_iterators <- NULL

info <- machine_info(type, env = caller_env)

Expand All @@ -149,8 +154,10 @@ generator0 <- function(fn, type = "generator") {
env <- new_generator_env(env, info)
user_env <- env$user_env

# The compiler caches function bodies, so inline a weak
# reference to avoid leaks (#36)
# The compiler caches function bodies, so inline a weak reference to avoid
# leaks (#36). This weak reference is injected inside the body of the
# generator instance to work around a scoping issue. See where we install
# the user's exit handlers.
weak_env <- new_weakref(env)

# Forward arguments inside the user space of the state machine
Expand All @@ -171,6 +178,16 @@ generator0 <- function(fn, type = "generator") {
}
}

env$close_active_iterators <- close_active_iterators

env$cleanup <- function() {
env$close_active_iterators()

# Prevent user exit handlers from running again
env$exits <- NULL
}


# Create the generator instance. This is a function that resumes
# a state machine.
instance <- inject(function(arg, close = FALSE) {
Expand All @@ -181,12 +198,12 @@ generator0 <- function(fn, type = "generator") {
if (!undebugged && (debugged || is_true(peek_option("coro_debug")))) {
env_browse(user_env)

on.exit({
defer({
# `f` was pressed, disable debugging for this generator
if (!env_is_browsed(user_env)) {
undebugged <<- TRUE
}
}, add = TRUE)
})
}

if (is_true(env$exhausted)) {
Expand Down Expand Up @@ -236,20 +253,15 @@ generator0 <- function(fn, type = "generator") {
# expressions. Then evaluate state machine in its private
# environment.
env$jumped <- TRUE
out <- evalq(envir = user_env,
env$exited <- TRUE

out <- evalq(envir = user_env, {
base::evalq(envir = rlang::wref_key(!!weak_env), {
env_poke_exits(
user_env,
c(
# Thunk scoped in this environment
!!list(call2(function() close_active_iterators())),
# User expressions scoped in the user environment
exits
)
)
defer(if (exited) cleanup())
env_poke_exits(user_env, exits)
!!state_machine
})
)
})
env$jumped <- FALSE

out
Expand Down Expand Up @@ -281,6 +293,7 @@ new_generator_env <- function(parent, info) {
env$iterators <- list()
env$handlers <- list()
env$exits <- NULL
env$exited <- TRUE
env$.last_value <- NULL

with(env, {
Expand All @@ -292,6 +305,7 @@ new_generator_env <- function(parent, info) {
}

suspend <- function() {
exited <<- FALSE
exits <<- env_poke_exits(user_env, NULL)
}
})
Expand Down
3 changes: 3 additions & 0 deletions R/step-reduce.R
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ iter_reduce_impl <- function(.x, .f, ..., .init, .left = TRUE) {
abort("Can't right-reduce with an iterator.")
}

# TODO: How do we close transducers?
defer(iter_close(.x))

.f <- as_function(.f)

out <- NULL
Expand Down
4 changes: 4 additions & 0 deletions tests/testthat/test-async.R
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ test_that("async function returns invisibly (#46)", {

test_that("async functions do not cause CMD check notes (#40)", {
skip_on_cran()
invisible(compiler::cmpfun(
async(function() NULL),
options = list(suppressAll = FALSE)
))
expect_silent(
invisible(compiler::cmpfun(
async(function() NULL),
Expand Down
4 changes: 3 additions & 1 deletion tests/testthat/test-generator.R
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,12 @@ test_that("disabled generators only clean up once", {
called <- NULL
g <- coro::generator(function() {
on.exit(called <<- c(called, TRUE))
stop("foo")
yield(1)
stop("foo")
})()

expect_equal(g(), 1)

expect_error(g(), "foo")
expect_equal(called, TRUE)

Expand Down

0 comments on commit cd47d65

Please sign in to comment.