Skip to content

Commit

Permalink
Allow PARQUET format for uploading data. (#609)
Browse files Browse the repository at this point in the history
  • Loading branch information
apalacio9502 authored Jun 4, 2024
1 parent 7a624ea commit 3642c14
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 75 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ jobs:
- {os: macos-latest, r: 'release'}

- {os: windows-latest, r: 'release'}
# Use 3.6 to trigger usage of RTools35
- {os: windows-latest, r: '3.6'}
# use 4.1 to check with rtools40's older compiler
- {os: windows-latest, r: '4.1'}

Expand Down
7 changes: 4 additions & 3 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ License: MIT + file LICENSE
URL: https://bigrquery.r-dbi.org, https://github.com/r-dbi/bigrquery
BugReports: https://github.com/r-dbi/bigrquery/issues
Depends:
R (>= 3.6)
R (>= 4.0)
Imports:
bit64,
brio,
Expand All @@ -28,8 +28,9 @@ Imports:
methods,
prettyunits,
rlang (>= 1.1.0),
tibble
Suggests:
tibble,
nanoparquet
Suggests:
blob,
covr,
dbplyr (>= 2.4.0),
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ importFrom(httr,DELETE)
importFrom(httr,GET)
importFrom(httr,PATCH)
importFrom(httr,POST)
importFrom(httr,PUT)
importFrom(httr,add_headers)
importFrom(httr,config)
importFrom(httr,content)
importFrom(httr,headers)
importFrom(httr,http_status)
importFrom(httr,parse_media)
importFrom(httr,status_code)
Expand Down
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# bigrquery (development version)

* The `bq_perform_upload()` function now allows users to choose the transmission format (JSON or PARQUET) for data sent to BigQuery (@apalacio9502, #608).
* bigrquery now requires R 4.0, in line with our version support principles.

# bigrquery 1.5.1

* Forward compatibility with upcoming dbplyr release (#601).
Expand Down
50 changes: 39 additions & 11 deletions R/bq-perform.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ bq_perform_extract <- function(x,
#' @export
#' @name api-perform
#' @param values Data frame of values to insert.
#' @param source_format The format of the data files:
#' * For newline-delimited JSON, specify "NEWLINE_DELIMITED_JSON".
#' * For parquet, specify "PARQUET".
#' @param create_disposition Specifies whether the job is allowed to create
#' new tables.
#'
Expand All @@ -110,6 +113,7 @@ bq_perform_extract <- function(x,
#' 'duplicate' error is returned in the job result.
bq_perform_upload <- function(x, values,
fields = NULL,
source_format = c("NEWLINE_DELIMITED_JSON", "PARQUET"),
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_EMPTY",
...,
Expand All @@ -121,12 +125,13 @@ bq_perform_upload <- function(x, values,
cli::cli_abort("{.arg values} must be a data frame.")
}
fields <- as_bq_fields(fields)
arg_match(source_format)
check_string(create_disposition)
check_string(write_disposition)
check_string(billing)

load <- list(
sourceFormat = unbox("NEWLINE_DELIMITED_JSON"),
sourceFormat = unbox(source_format),
destinationTable = tableReference(x),
createDisposition = unbox(create_disposition),
writeDisposition = unbox(write_disposition)
Expand All @@ -139,22 +144,30 @@ bq_perform_upload <- function(x, values,
load$autodetect <- unbox(TRUE)
}

config <- list(configuration = list(load = load))
config <- bq_body(config, ...)
config_part <- part(
c("Content-type" = "application/json; charset=UTF-8"),
jsonlite::toJSON(config, pretty = TRUE)
metadata <- list(configuration = list(load = load))
metadata <- bq_body(metadata, ...)
metadata <- list(
"type" = "application/json; charset=UTF-8",
"content" = jsonlite::toJSON(metadata, pretty = TRUE)
)

data_part <- part(
c("Content-type" = "application/json; charset=UTF-8"),
export_json(values)
)
if (source_format == "NEWLINE_DELIMITED_JSON") {
media <- list(
"type" = "application/json; charset=UTF-8",
"content" = export_json(values)
)
} else {
media <- list(
"type" = "application/vnd.apache.parquet",
"content" = export_parquet(values)
)
}

url <- bq_path(billing, jobs = "")
res <- bq_upload(
url,
parts = c(config_part, data_part),
metadata,
media,
query = list(fields = "jobReference")
)
as_bq_job(res$jobReference)
Expand Down Expand Up @@ -186,6 +199,21 @@ export_json <- function(values) {
rawToChar(rawConnectionValue(con))
}

# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet
#
# Serialize a data frame to parquet and return the raw bytes, suitable for
# use as the body of a BigQuery media upload.
export_parquet <- function(values) {
  path <- tempfile(fileext = ".parquet")
  defer(unlink(path))

  # Write to a temporary file on disk, then read the whole file back as a
  # raw vector (nanoparquet has no direct in-memory serializer).
  nanoparquet::write_parquet(values, path)
  readBin(path, what = "raw", n = file.info(path)$size)
}

#' @export
#' @name api-perform
#' @param source_uris The fully-qualified URIs that point to your data in
Expand Down
77 changes: 28 additions & 49 deletions R/bq-request.R
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,36 @@ bq_patch <- function(url, body, ..., query = NULL, token = bq_token()) {
process_request(req)
}

#' @importFrom httr POST add_headers config
bq_upload <- function(url, parts, ..., query = list(), token = bq_token()) {
url <- paste0(upload_url, url)
req <- POST_multipart_related(
url,
parts = parts,
token,
#' @importFrom httr POST PUT add_headers headers config status_code
# https://cloud.google.com/bigquery/docs/reference/api-uploads
#
# Two-step resumable upload: POST the job metadata to open an upload
# session, then PUT the media payload to the session URI returned by the
# server. `metadata` and `media` are lists with `type` (Content-Type) and
# `content` (body) elements.
bq_upload <- function(url, metadata, media, query = list(), token = bq_token()) {
  # Caller-supplied query parameters override these defaults.
  query <- utils::modifyList(
    list(fields = "jobReference", uploadType = "resumable"),
    query
  )

  # Step 1: initiate the resumable session with the job configuration.
  req <- POST(
    paste0(upload_url, url),
    body = metadata[["content"]],
    httr::user_agent(bq_ua()),
    token,
    add_headers("Content-Type" = metadata[["type"]]),
    query = query
  )

  # Step 2: on success the server supplies the session URI in the Location
  # header; upload the actual data there. On failure, fall through so the
  # error response from step 1 is processed below.
  if (status_code(req) == 200) {
    req <- PUT(
      headers(req)$location,
      body = media[["content"]],
      httr::user_agent(bq_ua()),
      token,
      add_headers("Content-Type" = media[["type"]])
    )
  }

  process_request(req)
}

Expand Down Expand Up @@ -242,43 +261,3 @@ gargle_abort <- function(reason, message, status, call = caller_env()) {
cli::cli_abort(message, class = class, call = call)
}

# Multipart/related ------------------------------------------------------------


# http://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
#
# POST a multipart/related request: each element of `parts` becomes one
# part of the body, separated by the MIME boundary.
POST_multipart_related <- function(url, config = NULL, parts = NULL,
                                   query = list(), ...,
                                   boundary = random_boundary(),
                                   handle = NULL) {
  if (is.null(config)) {
    config <- config()
  }

  # Assemble the body: boundary line between parts, closing boundary at end.
  delim <- paste0("\n--", boundary, "\n")
  closing <- paste0("\n--", boundary, "--\n")
  payload <- paste0(delim, paste0(parts, collapse = delim), closing)

  content_type <- paste0("multipart/related; boundary=", boundary)
  config <- c(config, add_headers("Content-Type" = content_type))

  # Caller-supplied query parameters override the default uploadType.
  query <- utils::modifyList(list(uploadType = "multipart"), query)

  POST(url, config = config, body = payload, query = query, ..., handle = handle)
}

# Render one part of a multipart body: "Header: value" lines, a blank
# line, then the body (multiple body elements joined with newlines).
part <- function(headers, body) {
  header_block <- if (length(headers) == 0) {
    "\n"
  } else {
    paste0(names(headers), ": ", headers, "\n", collapse = "")
  }
  paste0(header_block, "\n", paste0(body, collapse = "\n"))
}

# Generate a 50-character alphanumeric MIME boundary. The commented-out
# punctuation characters are RFC-legal but deliberately not used.
random_boundary <- function() {
  alphabet <- c(LETTERS, letters, 0:9) # , "'", "(", ")", "+", ",", "-", ".", "/",
  # ":", "?")
  paste(sample(alphabet, 50, replace = TRUE), collapse = "")
}

21 changes: 11 additions & 10 deletions man/api-perform.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3642c14

Please sign in to comment.