Skip to content

Commit

Permalink
Update to dbplyr interface 2e (#556)
Browse files Browse the repository at this point in the history
Fixes #508
  • Loading branch information
hadley authored Nov 6, 2023
1 parent d901bea commit a229f2c
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 62 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Collate:
'dbi-result.R'
'dplyr.R'
'gs-object.R'
'import-standalone-s3-register.R'
'old-auth.R'
'old-dataset.R'
'old-id.R'
Expand Down
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# bigrquery (development version)

* Now uses 2nd edition of dbplyr interface (#508).

* Compatible with dbplyr 2.4.0 (#550).

* `con |> tbl(sql("..."))` now works robustly once more (#540). (No more
Expand Down
54 changes: 28 additions & 26 deletions R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
#' arrange(desc(n))
#' }
src_bigquery <- function(project, dataset, billing = project, max_pages = 10) {
if (!requireNamespace("dplyr", quietly = TRUE)) {
stop("dplyr is required to use src_bigquery", call. = FALSE)
}
check_installed("dbplyr")

con <- DBI::dbConnect(
bigquery(),
Expand All @@ -44,27 +42,26 @@ src_bigquery <- function(project, dataset, billing = project, max_pages = 10) {
}

# registered onLoad
db_query_fields.BigQueryConnection <- function(con, sql) {
if (dbplyr::is.sql(sql)) {
ds <- bq_dataset(con@project, con@dataset)
fields <- bq_query_fields(sql, con@billing, default_dataset = ds)
} else {
tb <- as_bq_table(con, sql)
fields <- bq_table_fields(tb)
}

vapply(fields, "[[", "name", FUN.VALUE = character(1))
}
dbplyr_edition.BigQueryConnection <- function(con) 2L

# registered onLoad
db_save_query.BigQueryConnection <- function(con, sql, name, temporary = TRUE, ...) {
db_compute.BigQueryConnection <- function(con,
table,
sql,
...,
overwrite = FALSE,
temporary = TRUE,
unique_indexes = list(),
indexes = list(),
analyze = TRUE,
in_transaction = FALSE) {

if (is.null(con@dataset)) {
destination_table <- if (!temporary) as_bq_table(con, name)
destination_table <- if (!temporary) as_bq_table(con, table)
tb <- bq_project_query(con@project, sql, destination_table = destination_table)
} else {
ds <- bq_dataset(con@project, con@dataset)
destination_table <- if (!temporary) as_bq_table(con, name)
destination_table <- if (!temporary) as_bq_table(con, table)

tb <- bq_dataset_query(ds,
query = sql,
Expand All @@ -73,18 +70,22 @@ db_save_query.BigQueryConnection <- function(con, sql, name, temporary = TRUE, .
}

paste0(tb$project, ".", tb$dataset, ".", tb$table)
}

# registered onLoad
db_analyze.BigQueryConnection <- function(con, table, ...) {
TRUE
table
}

# registered onLoad
db_copy_to.BigQueryConnection <- function(con, table, values,
overwrite = FALSE, types = NULL, temporary = TRUE,
unique_indexes = NULL, indexes = NULL,
analyze = TRUE, ...) {
db_copy_to.BigQueryConnection <- function(con,
table,
values,
...,
overwrite = FALSE,
types = NULL,
temporary = TRUE,
unique_indexes = NULL,
indexes = NULL,
analyze = TRUE,
in_transaction = TRUE) {

if (temporary) {
abort("BigQuery does not support temporary tables")
Expand All @@ -105,6 +106,7 @@ collect.tbl_BigQueryConnection <- function(x, ...,
max_connections = 6L,
n = Inf,
warn_incomplete = TRUE) {

assert_that(length(n) == 1, n > 0L)
con <- dbplyr::remote_con(x)

Expand Down Expand Up @@ -211,7 +213,7 @@ sql_join_suffix.BigQueryConnection <- function(con, ...) {
}

# registered onLoad
sql_translate_env.BigQueryConnection <- function(x) {
sql_translation.BigQueryConnection <- function(x) {
dbplyr::sql_variant(
dbplyr::sql_translator(.parent = dbplyr::base_scalar,

Expand Down
187 changes: 187 additions & 0 deletions R/import-standalone-s3-register.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Standalone file: do not edit by hand
# Source: <https://github.com/r-lib/rlang/blob/main/R/standalone-s3-register.R>
# ----------------------------------------------------------------------
#
# ---
# repo: r-lib/rlang
# file: standalone-s3-register.R
# last-updated: 2022-08-29
# license: https://unlicense.org
# ---
#
# nocov start

#' Register a method for a suggested dependency
#'
#' Generally, the recommended way to register an S3 method is to use the
#' `S3Method()` namespace directive (often generated automatically by the
#' `@export` roxygen2 tag). However, this technique requires that the generic
#' be in an imported package, and sometimes you want to suggest a package,
#' and only provide a method when that package is loaded. `s3_register()`
#' can be called from your package's `.onLoad()` to dynamically register
#' a method only if the generic's package is loaded.
#'
#' For R 3.5.0 and later, `s3_register()` is also useful when demonstrating
#' class creation in a vignette, since method lookup no longer always involves
#' the lexical scope. For R 3.6.0 and later, you can achieve a similar effect
#' by using "delayed method registration", i.e. placing the following in your
#' `NAMESPACE` file:
#'
#' ```
#' if (getRversion() >= "3.6.0") {
#' S3method(package::generic, class)
#' }
#' ```
#'
#' @section Usage in other packages:
#' To avoid taking a dependency on vctrs, you copy the source of
#' [`s3_register()`](https://github.com/r-lib/rlang/blob/main/R/standalone-s3-register.R)
#' into your own package. It is licensed under the permissive
#' [unlicense](https://choosealicense.com/licenses/unlicense/) to make it
#' crystal clear that we're happy for you to do this. There's no need to include
#' the license or even credit us when using this function.
#'
#' @param generic Name of the generic in the form `"pkg::generic"`.
#' @param class Name of the class
#' @param method Optionally, the implementation of the method. By default,
#' this will be found by looking for a function called `generic.class`
#' in the package environment.
#' @examples
#' # A typical use case is to dynamically register tibble/pillar methods
#' # for your class. That way you avoid creating a hard dependency on packages
#' # that are not essential, while still providing finer control over
#' # printing when they are used.
#'
#' .onLoad <- function(...) {
#' s3_register("pillar::pillar_shaft", "vctrs_vctr")
#' s3_register("tibble::type_sum", "vctrs_vctr")
#' }
#' @keywords internal
#' @noRd
s3_register <- function(generic, class, method = NULL) {
stopifnot(is.character(generic), length(generic) == 1)
stopifnot(is.character(class), length(class) == 1)

pieces <- strsplit(generic, "::")[[1]]
stopifnot(length(pieces) == 2)
package <- pieces[[1]]
generic <- pieces[[2]]

caller <- parent.frame()

get_method_env <- function() {
top <- topenv(caller)
if (isNamespace(top)) {
asNamespace(environmentName(top))
} else {
caller
}
}
get_method <- function(method) {
if (is.null(method)) {
get(paste0(generic, ".", class), envir = get_method_env())
} else {
method
}
}

register <- function(...) {
envir <- asNamespace(package)

# Refresh the method each time, it might have been updated by
# `devtools::load_all()`
method_fn <- get_method(method)
stopifnot(is.function(method_fn))


# Only register if generic can be accessed
if (exists(generic, envir)) {
registerS3method(generic, class, method_fn, envir = envir)
} else if (identical(Sys.getenv("NOT_CRAN"), "true")) {
warn <- .rlang_s3_register_compat("warn")

warn(c(
sprintf(
"Can't find generic `%s` in package %s to register S3 method.",
generic,
package
),
"i" = "This message is only shown to developers using devtools.",
"i" = sprintf("Do you need to update %s to the latest version?", package)
))
}
}

# Always register hook in case package is later unloaded & reloaded
setHook(packageEvent(package, "onLoad"), function(...) {
register()
})

# For compatibility with R < 4.1.0 where base isn't locked
is_sealed <- function(pkg) {
identical(pkg, "base") || environmentIsLocked(asNamespace(pkg))
}

# Avoid registration failures during loading (pkgload or regular).
# Check that environment is locked because the registering package
# might be a dependency of the package that exports the generic. In
# that case, the exports (and the generic) might not be populated
# yet (#1225).
if (isNamespaceLoaded(package) && is_sealed(package)) {
register()
}

invisible()
}

.rlang_s3_register_compat <- function(fn, try_rlang = TRUE) {
# Compats that behave the same independently of rlang's presence
out <- switch(
fn,
is_installed = return(function(pkg) requireNamespace(pkg, quietly = TRUE))
)

# Only use rlang if it is fully loaded (#1482)
if (try_rlang &&
requireNamespace("rlang", quietly = TRUE) &&
environmentIsLocked(asNamespace("rlang"))) {
switch(
fn,
is_interactive = return(rlang::is_interactive)
)

# Make sure rlang knows about "x" and "i" bullets
if (utils::packageVersion("rlang") >= "0.4.2") {
switch(
fn,
abort = return(rlang::abort),
warn = return((rlang::warn)),
inform = return(rlang::inform)
)
}
}

# Fall back to base compats

is_interactive_compat <- function() {
opt <- getOption("rlang_interactive")
if (!is.null(opt)) {
opt
} else {
interactive()
}
}

format_msg <- function(x) paste(x, collapse = "\n")
switch(
fn,
is_interactive = return(is_interactive_compat),
abort = return(function(msg) stop(format_msg(msg), call. = FALSE)),
warn = return(function(msg) warning(format_msg(msg), call. = FALSE)),
inform = return(function(msg) message(format_msg(msg)))
)

stop(sprintf("Internal error in rlang shims: Unknown function `%s()`.", fn))
}

# nocov end
39 changes: 6 additions & 33 deletions R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
)

# S3 methods --------------------------------------------------------------
register_s3_method("dplyr", "collect", "tbl_BigQueryConnection")
register_s3_method("dplyr", "db_analyze", "BigQueryConnection")
register_s3_method("dplyr", "db_query_fields", "BigQueryConnection")
register_s3_method("dplyr", "db_save_query", "BigQueryConnection")
register_s3_method("dplyr", "sql_translate_env", "BigQueryConnection")
register_s3_method("dbplyr", "db_copy_to", "BigQueryConnection")
s3_register("dplyr::collect", "tbl_BigQueryConnection")

if (is_installed("dbplyr") && utils::packageVersion("dbplyr") > "1.99") {
register_s3_method("dbplyr", "sql_join_suffix", "BigQueryConnection")
}
s3_register("dbplyr::dbplyr_edition", "BigQueryConnection")
s3_register("dbplyr::db_compute", "BigQueryConnection")
s3_register("dbplyr::db_copy_to", "BigQueryConnection")
s3_register("dbplyr::sql_join_suffix", "BigQueryConnection")
s3_register("dbplyr::sql_translation", "BigQueryConnection")

# Default options --------------------------------------------------------
op <- options()
Expand All @@ -31,27 +28,3 @@

PACKAGE_NAME <- utils::packageName()
PACKAGE_VERSION <- utils::packageVersion(PACKAGE_NAME)

register_s3_method <- function(pkg, generic, class, fun = NULL) {
stopifnot(is.character(pkg), length(pkg) == 1)
stopifnot(is.character(generic), length(generic) == 1)
stopifnot(is.character(class), length(class) == 1)

if (is.null(fun)) {
fun <- get(paste0(generic, ".", class), envir = parent.frame())
} else {
stopifnot(is.function(fun))
}

if (pkg %in% loadedNamespaces()) {
registerS3method(generic, class, fun, envir = asNamespace(pkg))
}

# Always register hook in case package is later unloaded & reloaded
setHook(
packageEvent(pkg, "onLoad"),
function(...) {
registerS3method(generic, class, fun, envir = asNamespace(pkg))
}
)
}
4 changes: 1 addition & 3 deletions tests/testthat/test-dplyr.R
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
test_that("historical API continues to work", {
reset_warning_verbosity("BigQueryConnection-edition")
src <- src_bigquery(bq_test_project(), "basedata")

# old dbplyr interface warning
expect_warning(x <- dplyr::tbl(src, "mtcars"))
x <- dplyr::tbl(src, "mtcars")

expect_s3_class(x, "tbl")
expect_true("cyl" %in% dbplyr::op_vars(x))
Expand Down

0 comments on commit a229f2c

Please sign in to comment.