Commit e6e0083

Merge branch 'master' of https://github.com/cloudyr/bigQueryR

MarkEdmondson1234 committed Jun 8, 2018
2 parents 4012df7 + 52eee84 commit e6e0083
Showing 15 changed files with 480 additions and 44 deletions.
4 changes: 3 additions & 1 deletion DESCRIPTION
@@ -25,6 +25,8 @@ Suggests:
shiny (>= 0.12.1),
knitr,
rmarkdown,
testthat
testthat,
data.table,
purrr
RoxygenNote: 6.0.1
VignetteBuilder: knitr
2 changes: 2 additions & 0 deletions NAMESPACE
@@ -6,9 +6,11 @@ export(bq_get_global_project)
export(bq_global_dataset)
export(bq_global_project)
export(bqr_auth)
export(bqr_copy_table)
export(bqr_create_table)
export(bqr_delete_table)
export(bqr_download_extract)
export(bqr_download_query)
export(bqr_extract_data)
export(bqr_get_global_dataset)
export(bqr_get_global_project)
5 changes: 4 additions & 1 deletion NEWS.md
@@ -1,9 +1,12 @@
# bigQueryR 0.3.2.9000

* support `nullMarker`, `maxBadRecords` in upload jobs
* support `nullMarker`, `maxBadRecords`, `fieldDelimiter` in upload jobs
* Support BigQuery type `DATE` for R class `Date` data.frame columns (BigQuery type `TIMESTAMP` is still the default for `POSIXct` columns) (#48)
* Allow custom user schema for uploads of data.frames (#48)
* Rename misnamed global functions from `bq_` prefix to `bqr_` prefix
* Add `allowJaggedRows` and `allowQuotedNewlines` options to upload via `bqr_upload_data()`
* `bqr_get_job` now accepts a job object as well as the jobId
* Fix bug with `bqr_upload_data` where `autodetect=TRUE` didn't work with `gcs://` loads from Cloud Storage

# bigQueryR 0.3.2

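The new upload options listed above can be combined in a single `bqr_upload_data()` call. A minimal sketch, assuming all five options are exposed as arguments of `bqr_upload_data()` (the NEWS entry confirms this for `allowJaggedRows`/`allowQuotedNewlines`); the project, dataset, table and option values are illustrative placeholders, not taken from this commit:

library(bigQueryR)
bqr_auth()

# Sketch: upload a data.frame while tolerating a few bad rows, with a custom
# NULL marker and field delimiter (values are placeholders; the CSV-style
# options mainly matter for file/Cloud Storage loads)
bqr_upload_data(projectId = "my-project",
                datasetId = "my_dataset",
                tableId = "my_table",
                upload_data = my_data_frame,
                nullMarker = "NA",
                maxBadRecords = 10,
                fieldDelimiter = ",",
                allowJaggedRows = TRUE,
                allowQuotedNewlines = TRUE)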
182 changes: 182 additions & 0 deletions R/fastBqDownload.R
@@ -0,0 +1,182 @@
#' Download data from BigQuery to local folder
#'
#' Requires you to make a bucket at https://console.cloud.google.com/storage/browser
#'
#' @param query The query you want to run.
#' @param target_folder Target folder on your local computer.
#' @param result_file_name Name of your downloaded file.
#' @param refetch Boolean, whether you would like to refetch previously downloaded data.
#' @param useLegacySql Boolean, whether to use Legacy SQL. Default is FALSE.
#' @param clean_intermediate_results Boolean, whether to delete the intermediate result table on BigQuery and the extracted file(s) on Google Cloud Storage after download. Default TRUE.
#' @param global_project_name BigQuery project ID in which the intermediate result table is created.
#' @param global_dataset_name BigQuery dataset in which the intermediate result table is created.
#' @param global_bucket_name Google Cloud Storage bucket to which the compressed extract is written before download.
#'
#' @examples
#'
#' \dontrun{
#' library(bigQueryR)
#'
#' ## Auth with a project that has at least BigQuery and Google Cloud Storage scope
#' bqr_auth()
#'
#' # Create a bucket at Google Cloud Storage at
#' # https://console.cloud.google.com/storage/browser
#'
#' bqr_download_query(query = "select * from `your_project.your_dataset.your_table`")
#'
#' }
#'
#' @return a data.table.
#'
#' @export
bqr_download_query <- function(query = NULL,
target_folder = "data",
result_file_name = NULL,
refetch = FALSE,
useLegacySql = FALSE,
clean_intermediate_results = TRUE,
global_project_name = bqr_get_global_project(),
global_dataset_name = bqr_get_global_dataset(),
global_bucket_name = googleCloudStorageR::gcs_get_global_bucket()
) {
invisible(sapply(c("data.table", "purrr"), assertRequirement))

if (is.null(result_file_name)) {
result_file_name <- "fast_bq_download_result"
} else {
result_file_name <- gsub("(\\.csv$)|(\\.csv\\.gz$)", "", result_file_name)
}

full_result_path <- paste0(target_folder, "/", result_file_name, ".csv.gz")
if (file.exists(full_result_path) & !refetch) {
return(data.table::fread(paste("gunzip -c", full_result_path)))
}

setFastSqlDownloadOptions(global_project_name, global_dataset_name, global_bucket_name)

gcp_result_name_raw <- paste0(result_file_name, "_", Sys.getenv("LOGNAME"), "_", Sys.time())
gcp_result_name <- gsub("[^[:alnum:]]+", "_", gcp_result_name_raw)

object_names <- saveQueryToStorage(query, gcp_result_name, useLegacySql)

tryCatch(
{
output_dt <- readFromStorage(object_names, target_folder)
unifyLocalChunks(output_dt, object_names, result_file_name, target_folder)
},
error = function(e) {
message("\n\nError while saving from Storage to local. Running cleanup of Storage and BigQuery. See original error message below:\n\n")
message(paste0(e, "\n\n"))
},
finally = {if (clean_intermediate_results == TRUE) {
cleanIntermediateResults(object_names, gcp_result_name, target_folder)
}
}
)

output_dt
}


setFastSqlDownloadOptions <- function(global_project_name, global_dataset_name, global_bucket_name) {
options(googleAuthR.scopes.selected = "https://www.googleapis.com/auth/cloud-platform")

bigQueryR::bqr_global_project(global_project_name)
bigQueryR::bqr_global_dataset(global_dataset_name)
googleCloudStorageR::gcs_global_bucket(global_bucket_name)
}

saveQueryToStorage <- function(query, result_name, useLegacySql){
time <- Sys.time()
message("Querying data and saving to BigQuery table")
query_job <- bigQueryR::bqr_query_asynch(
query = query,
useLegacySql = useLegacySql,
destinationTableId = result_name,
writeDisposition = "WRITE_TRUNCATE"
)

if (suppressMessages(bigQueryR::bqr_wait_for_job(query_job, wait = 2))$status$state == "DONE") {
time_elapsed <- difftime(Sys.time(), time)
message(paste("Querying job is finished, time elapsed:", format(time_elapsed,format = "%H:%M:%S")))

time <- Sys.time()
message("Writing data to storage")
extract_job <- suppressMessages(bigQueryR::bqr_extract_data(
tableId = result_name,
cloudStorageBucket = googleCloudStorageR::gcs_get_global_bucket(),
compression = "GZIP",
filename = paste0(result_name, "_*.csv.gz")
))
}

if (suppressMessages(bigQueryR::bqr_wait_for_job(extract_job, wait = 2))$status$state == "DONE") {
time_elapsed <- difftime(Sys.time(), time)
message(paste("Writing data to storage is finished, time elapsed:", format(time_elapsed,format = "%H:%M:%S")))
object_names <- grep(
result_name,
googleCloudStorageR::gcs_list_objects()$name,
value = TRUE
)
}
object_names
}

readFromStorage <- function(object_names, target_folder) {
createFolder(target_folder)
chunk_dt_list <- purrr::map(object_names, ~ {
object <- .
googleCloudStorageR::gcs_get_object(
object_name = object,
saveToDisk = paste0(target_folder, "/", object),
overwrite = TRUE
)
data.table::fread(paste0("gunzip -c ", target_folder, "/", object))
})
data.table::rbindlist(chunk_dt_list)
}

unifyLocalChunks <- function(output_dt, object_names, result_file_name, target_folder) {
if (length(object_names) > 1) {
data.table::fwrite(output_dt, paste0(target_folder, "/", result_file_name, ".csv"))
gzipDataAtPath(paste0(target_folder, "/", result_file_name, ".csv"))
} else{
file.rename(
paste0(target_folder, "/", object_names[[1]]),
paste0(target_folder, "/", result_file_name, ".csv.gz")
)
}
}

cleanIntermediateResults <- function(object_names, table_id, target_folder) {
purrr::walk(
object_names,
~ googleCloudStorageR::gcs_delete_object(object = .x)
)
bigQueryR::bqr_delete_table(tableId = table_id)
if (length(object_names) > 1) {
purrr::walk(paste0(target_folder, "/", object_names), file.remove)
}
message("The queried table on BigQuery and saved file(s) on GoogleCloudStorage have been cleaned up.
If you want to keep them, use clean_intermediate_results = TRUE.")
}

createFolder <- function(target_folder) {
if (!dir.exists(target_folder)) {
dir.create(target_folder, recursive = TRUE)
message(paste0(target_folder, ' folder does not exist. Creating folder.'))
}
}

gzipDataAtPath <- function(full_result_file_name) {
system(paste0("rm -f ", full_result_file_name, ".gz"))
system(paste0("gzip ", full_result_file_name))
}

assertRequirement <- function(package_name) {
if (!requireNamespace(package_name, quietly = TRUE)) {
stop(paste0(package_name, " needed for this function to work. Please install it via install.packages('", package_name, "')"),
call. = FALSE)
}
}
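A sketch of an end-to-end call to the new downloader, based on the signature above; the project, dataset, bucket names and query are placeholders, and reading the gzipped result relies on gunzip being available on the system PATH (as the implementation does):

library(bigQueryR)

## Auth with a project that has BigQuery and Cloud Storage scope
bqr_auth()

dt <- bqr_download_query(
  query = "SELECT * FROM `my-project.my_dataset.my_table`",  # placeholder query
  target_folder = "data",
  result_file_name = "my_table_extract",
  refetch = FALSE,                    # reuse data/my_table_extract.csv.gz if it already exists
  clean_intermediate_results = TRUE,  # drop the temporary BigQuery table and Cloud Storage objects
  global_project_name = "my-project",
  global_dataset_name = "my_dataset",
  global_bucket_name = "my-bucket"
)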
4 changes: 2 additions & 2 deletions R/globals.R
@@ -78,7 +78,7 @@ bq_get_global_project <- function(...){
#' @export
#' @import assertthat
bqr_global_dataset <- function(dataset){
.Deprecated("bqr_global_dataset")

assert_that(is.string(dataset))

.bqr_env$dataset <- dataset
@@ -106,7 +106,7 @@ bq_global_dataset <- function(...){
#' @family dataset functions
#' @export
bqr_get_global_dataset <- function(){
.Deprecated("bqr_get_global_dataset")

if(!exists("dataset", envir = .bqr_env)){
stop("dataset is NULL and couldn't find global dataset ID name.
Set it via bq_global_dataset")
22 changes: 20 additions & 2 deletions R/jobs.R
@@ -9,6 +9,20 @@ is.job <- function(x){
inherits(x, "bqr_job")
}

# metadata only jobs
call_job <- function(projectId, config){
l <-
googleAuthR::gar_api_generator("https://www.googleapis.com/bigquery/v2",
"POST",
path_args = list(projects = projectId,
jobs = ""),
data_parse_function = function(x) x
)

o <- l(the_body = config)
as.job(o)
}


#' Wait for a bigQuery job
#'
@@ -66,7 +80,7 @@ bqr_wait_for_job <- function(job, wait=5){
#' Poll a jobId
#'
#' @param projectId projectId of job
#' @param jobId jobId to poll
#' @param jobId jobId to poll, or a job Object
#'
#' @return A Jobs resource
#'
@@ -120,8 +134,12 @@ bqr_wait_for_job <- function(job, wait=5){
#'
#' @family BigQuery asynch query functions
#' @export
bqr_get_job <- function(projectId = bqr_get_global_project(), jobId){
bqr_get_job <- function(jobId = .Last.value, projectId = bqr_get_global_project()){
check_bq_auth()

if(is.job(jobId)){
jobId <- jobId$jobReference$jobId
}
stopifnot(inherits(projectId, "character"),
inherits(jobId, "character"))

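With the change above, `bqr_get_job()` accepts either a jobId string or a job object, from which the jobId is extracted. A minimal sketch; the query text and destination table are placeholders, and the job is assumed to run in the globally set project/dataset:

job <- bqr_query_asynch(query = "SELECT 1 AS x",            # placeholder query
                        destinationTableId = "tmp_result",  # placeholder table
                        useLegacySql = FALSE,
                        writeDisposition = "WRITE_TRUNCATE")

# Pass the whole job object; the jobId is pulled from job$jobReference$jobId
bqr_get_job(job)

# Equivalent: pass the jobId string explicitly
bqr_get_job(job$jobReference$jobId)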
34 changes: 18 additions & 16 deletions R/partition.R
@@ -7,15 +7,18 @@
#' @param projectId The project ID
#' @param datasetId The dataset ID
#'
#' @details
#' @examples
#'
#' WARNING: This can be an expensive operation for large datasets as it does a full column scan.
#' \dontrun{
#'
#' bqr_partition("ga_sessions_", "ga_partition")
#'
#' From \href{partitioned tables background}{https://cloud.google.com/bigquery/docs/partitioned-tables}:
#' }
#'
#' Replicates the functionality of the bq tool
#' \code{bq query --allow_large_results --replace --noflatten_results --destination_table 'mydataset.table1$20160301' 'SELECT field1 + 10, field2 FROM mydataset.table1$20160301'}
#' @details
#'
#' Performs lots of copy table operations via \link{bqr_copy_table}
#'
#' Before partitioned tables became available, BigQuery users would often divide
#' large datasets into separate tables organized by time period; usually daily tables,
#' where each table represented data loaded on that particular date.
@@ -33,7 +36,7 @@
#' tables increases. There is also a limit of 1,000 tables that can be referenced in a
#' single query. Partitioned tables have none of these disadvantages.
#'
#' @return \code{TRUE} if all partition jobs start running successfully
#' @return A list of copy jobs for the sharded tables that will be copied to one partitioned table
#'
#' @seealso \href{https://cloud.google.com/bigquery/docs/creating-partitioned-tables}{Partitioned Tables Help}
#' @export
@@ -76,19 +79,18 @@ bqr_partition <- function(sharded,
part_query <- function(sdn){

myMessage("Partitioning ", sdn, level = 3)
## build queries
bqr_query_asynch(projectId = projectId,
datasetId = datasetId,
query = paste0('SELECT * FROM ',sdn),
destinationTableId = paste0(partition,"$",shard_dates[[sdn]]))

bqr_copy_table(source_projectid = projectId,
source_datasetid = datasetId,
source_tableid = sdn,
destination_projectid = projectId,
destination_datasetid = datasetId,
destination_tableid = paste0(partition,"$",shard_dates[[sdn]]),
writeDisposition = "WRITE_EMPTY")
}

result <- lapply(names(shard_dates), part_query)

if(all(vapply(result, function(x) x$status$state, character(1))) == "RUNNING"){
myMessage("All partition jobs running, check project job queue for outcome.")
}

TRUE
setNames(result, names(shard_dates))

}
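Given the new return value, each element of the list returned by `bqr_partition()` is a copy job that can be polled. A sketch using the sharded prefix and partition name from the roxygen example above:

copy_jobs <- bqr_partition("ga_sessions_", "ga_partition")

# Wait for each copy job to finish (uses bqr_wait_for_job shown earlier)
finished <- lapply(copy_jobs, bqr_wait_for_job, wait = 5)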
4 changes: 4 additions & 0 deletions R/print_methods.R
@@ -11,5 +11,9 @@ print.bqr_job <- function(x, ...){
cat("## View job configuration via job$configuration\n")

cat0("## Job had error: \n", x$status$errorResult$message)
if(!is.null(x$status$errors)){
print(x$status$errors$message)
}


}
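With the added lines, printing a job that failed also lists every message in `job$status$errors`, not just the top-level errorResult. A brief sketch (the jobId is a placeholder):

failed_job <- bqr_get_job("job_abc123")  # placeholder jobId
print(failed_job)                        # prints all error messages when present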