Skip to content

Commit

Permalink
Add support for partitioned BigQuery tables (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
pbonito authored and medb committed May 15, 2019
1 parent 1194136 commit e6f742b
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 3 deletions.
4 changes: 4 additions & 0 deletions bigquery/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

2. Support nested record type in field schema in BigQuery connector.

3. Add a property to specify the BigQuery output table partitioning definition:

mapred.bq.output.table.partitioning


0.13.16 - 2019-02-25

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public class BigQueryConfiguration {
*/
public static final String OUTPUT_TABLE_SCHEMA_KEY = "mapred.bq.output.table.schema";

/**
* Configuration key for the output table partitioning used by the output format. This key is
* stored as a {@link String}.
*/
public static final String OUTPUT_TABLE_PARTITIONING_KEY = "mapred.bq.output.table.partitioning";

/**
* Configuration key for the Cloud KMS encryption key that will be used to protect output BigQuery
* table. This key is stored as a {@link String}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.google.cloud.hadoop.io.bigquery;

import static com.google.common.flogger.LazyArgs.lazy;

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.model.Dataset;
Expand All @@ -26,6 +28,7 @@
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -141,19 +144,62 @@ public void importFromGcs(
List<String> gcsPaths,
boolean awaitCompletion)
throws IOException, InterruptedException {
importFromGcs(
projectId,
tableRef,
schema,
/* timePartitioning= */ null,
kmsKeyName,
sourceFormat,
writeDisposition,
gcsPaths,
awaitCompletion);
}

/**
* Imports data from GCS into BigQuery via a load job. Optionally polls for completion before
* returning.
*
* @param projectId the project on whose behalf to perform the load.
* @param tableRef the reference to the destination table.
* @param schema the schema of the source data to populate the destination table by.
* @param timePartitioning time partitioning to populate the destination table.
* @param kmsKeyName the Cloud KMS encryption key used to protect the output table.
* @param sourceFormat the file format of the source data.
* @param writeDisposition the write disposition of the output table.
* @param gcsPaths the location of the source data in GCS.
* @param awaitCompletion if true, block and poll until job completes, otherwise return as soon as
* the job has been successfully dispatched.
* @throws IOException
* @throws InterruptedException if interrupted while waiting for job completion.
*/
public void importFromGcs(
String projectId,
TableReference tableRef,
@Nullable TableSchema schema,
@Nullable TimePartitioning timePartitioning,
@Nullable String kmsKeyName,
BigQueryFileFormat sourceFormat,
String writeDisposition,
List<String> gcsPaths,
boolean awaitCompletion)
throws IOException, InterruptedException {
logger.atInfo().log(
"Importing into table '%s' from %s paths; path[0] is '%s'; awaitCompletion: %s",
BigQueryStrings.toString(tableRef),
"Importing into table '%s' from %s paths; path[0] is '%s'; awaitCompletion: %s;"
+ " timePartitioning: %s",
lazy(() -> BigQueryStrings.toString(tableRef)),
gcsPaths.size(),
gcsPaths.isEmpty() ? "(empty)" : gcsPaths.get(0),
awaitCompletion);
awaitCompletion,
timePartitioning);

// Create load conf with minimal requirements.
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
loadConfig.setSchema(schema);
loadConfig.setSourceFormat(sourceFormat.getFormatIdentifier());
loadConfig.setSourceUris(gcsPaths);
loadConfig.setDestinationTable(tableRef);
loadConfig.setTimePartitioning(timePartitioning);
loadConfig.setWriteDisposition(writeDisposition);
if (!Strings.isNullOrEmpty(kmsKeyName)) {
loadConfig.setDestinationEncryptionConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
Expand Down Expand Up @@ -315,6 +316,31 @@ static Optional<BigQueryTableSchema> getTableSchema(Configuration conf) throws I
return Optional.empty();
}

/**
 * Derives the output table time partitioning from the given configuration.
 *
 * @param conf the configuration to reference the keys from.
 * @return the derived table time partitioning, absent value if no table time partitioning exists
 *     in the configuration.
 * @throws IOException if a table time partitioning was set in the configuration but couldn't be
 *     parsed.
 */
static Optional<BigQueryTimePartitioning> getTablePartitioning(Configuration conf)
    throws IOException {
  String partitioningJson = conf.get(BigQueryConfiguration.OUTPUT_TABLE_PARTITIONING_KEY);
  // Guard clause: nothing configured means no partitioning at all.
  if (Strings.isNullOrEmpty(partitioningJson)) {
    return Optional.empty();
  }
  try {
    return Optional.of(
        BigQueryTimePartitioning.wrap(BigQueryTimePartitioning.getFromJson(partitioningJson)));
  } catch (IOException e) {
    // Re-wrap so the failure message names the offending configuration key.
    throw new IOException(
        "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_PARTITIONING_KEY + "'.",
        e);
  }
}

/**
* Gets the output table KMS key name based on the given configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.hadoop.io.bigquery.output;

import com.google.api.client.json.JsonParser;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.io.IOException;

/**
* Wrapper for BigQuery {@link TimePartitioning}.
*
 * <p>This class is used to prevent client code from depending on BigQuery API classes directly, so
 * that there is no potential conflict between different versions of BigQuery API libraries in the
 * client.
 *
 * @see TimePartitioning
*/
public class BigQueryTimePartitioning {
  /** The wrapped BigQuery API object holding all partitioning state. */
  private final TimePartitioning timePartitioning;

  /** Creates a wrapper around an empty {@link TimePartitioning}. */
  public BigQueryTimePartitioning() {
    this.timePartitioning = new TimePartitioning();
  }

  /** Creates a wrapper around the given {@link TimePartitioning}. */
  public BigQueryTimePartitioning(TimePartitioning timePartitioning) {
    this.timePartitioning = timePartitioning;
  }

  /** Returns the partitioning type (e.g. {@code "DAY"}), or {@code null} if not set. */
  public String getType() {
    return timePartitioning.getType();
  }

  /** Sets the partitioning type (e.g. {@code "DAY"}). */
  public void setType(String type) {
    timePartitioning.setType(type);
  }

  /** Returns the field on which the table is partitioned, or {@code null} if not set. */
  public String getField() {
    return timePartitioning.getField();
  }

  /** Sets the field on which the table is partitioned. */
  public void setField(String field) {
    timePartitioning.setField(field);
  }

  /**
   * Returns the partition expiration in milliseconds, or {@code null} if not set.
   *
   * <p>Returns the boxed {@link Long} (not primitive {@code long}) because the underlying
   * {@link TimePartitioning#getExpirationMs()} is nullable; auto-unboxing an unset value would
   * otherwise throw a {@link NullPointerException}. Callers that know the value is set may still
   * assign the result to a {@code long}.
   */
  public Long getExpirationMs() {
    return timePartitioning.getExpirationMs();
  }

  /** Sets the partition expiration in milliseconds. */
  public void setExpirationMs(long expirationMs) {
    timePartitioning.setExpirationMs(expirationMs);
  }

  /** Returns whether queries must include a partition filter, or {@code null} if not set. */
  public Boolean getRequirePartitionFilter() {
    return timePartitioning.getRequirePartitionFilter();
  }

  /** Sets whether queries over the table must include a partition filter. */
  public void setRequirePartitionFilter(Boolean requirePartitionFilter) {
    timePartitioning.setRequirePartitionFilter(requirePartitionFilter);
  }

  // hashCode/equals delegate to the wrapped object so two wrappers are equal exactly when
  // their underlying TimePartitioning values are equal (contract-consistent pair).
  @Override
  public int hashCode() {
    return timePartitioning.hashCode();
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof BigQueryTimePartitioning)) {
      return false;
    }
    BigQueryTimePartitioning other = (BigQueryTimePartitioning) obj;
    return timePartitioning.equals(other.timePartitioning);
  }

  /** Returns the wrapped {@link TimePartitioning}; package-private to keep the API type hidden. */
  TimePartitioning get() {
    return timePartitioning;
  }

  /**
   * Parses a {@link TimePartitioning} from its JSON representation.
   *
   * @throws IOException if the JSON cannot be parsed.
   */
  static TimePartitioning getFromJson(String json) throws IOException {
    JsonParser parser = JacksonFactory.getDefaultInstance().createJsonParser(json);
    return parser.parseAndClose(TimePartitioning.class);
  }

  /**
   * Serializes the wrapped partitioning to JSON.
   *
   * @throws IOException if serialization fails.
   */
  public String getAsJson() throws IOException {
    return JacksonFactory.getDefaultInstance().toString(timePartitioning);
  }

  /** Wraps an existing {@link TimePartitioning}; package-private to keep the API type hidden. */
  static BigQueryTimePartitioning wrap(TimePartitioning tableTimePartitioning) {
    return new BigQueryTimePartitioning(tableTimePartitioning);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void commitJob(JobContext context) throws IOException {
String destProjectId = BigQueryOutputConfiguration.getProjectId(conf);
String writeDisposition = BigQueryOutputConfiguration.getWriteDisposition(conf);
Optional<BigQueryTableSchema> destSchema = BigQueryOutputConfiguration.getTableSchema(conf);
Optional<BigQueryTimePartitioning> timePartitioning =
BigQueryOutputConfiguration.getTablePartitioning(conf);
String kmsKeyName = BigQueryOutputConfiguration.getKmsKeyName(conf);
BigQueryFileFormat outputFileFormat = BigQueryOutputConfiguration.getFileFormat(conf);
List<String> sourceUris = getOutputFileURIs();
Expand All @@ -72,6 +74,7 @@ public void commitJob(JobContext context) throws IOException {
destProjectId,
destTable,
destSchema.isPresent() ? destSchema.get().get() : null,
timePartitioning.isPresent() ? timePartitioning.get().get() : null,
kmsKeyName,
outputFileFormat,
writeDisposition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public Job answer(InvocationOnMock invocationOnMock) throws Throwable {
jobProjectId,
tableRef,
fakeTableSchema,
/* timePartitioning= */ null,
kmsKeyName,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.hadoop.io.bigquery.output;

import static com.google.common.truth.Truth.assertThat;

import java.io.IOException;
import org.junit.Test;

public class BigQueryTimePartitioningTest {

  public static final String TIME_PARTITIONING_JSON =
      "{\"expirationMs\":\"1000\",\"field\":\"ingestDate\",\"requirePartitionFilter\":true,"
          + "\"type\":\"DAY\"}";

  /** Builds a wrapper with every supported property populated. */
  private static BigQueryTimePartitioning fullyPopulatedPartitioning() {
    BigQueryTimePartitioning partitioning = new BigQueryTimePartitioning();
    partitioning.setType("DAY");
    partitioning.setExpirationMs(1000L);
    partitioning.setField("ingestDate");
    partitioning.setRequirePartitionFilter(true);
    return partitioning;
  }

  @Test
  public void testConvertToJson() throws IOException {
    // Serializing a fully populated wrapper must produce the canonical JSON form.
    assertThat(fullyPopulatedPartitioning().getAsJson()).isEqualTo(TIME_PARTITIONING_JSON);
  }

  @Test
  public void testConvertFromJson() throws IOException {
    // Parsing the canonical JSON must yield a TimePartitioning equal to the populated one.
    assertThat(BigQueryTimePartitioning.getFromJson(TIME_PARTITIONING_JSON))
        .isEqualTo(fullyPopulatedPartitioning().get());
  }

  @Test
  public void testConversion_OnlyTypeIsPresent() throws IOException {
    // A wrapper with only the type set round-trips through JSON without extra fields.
    BigQueryTimePartitioning typeOnly = new BigQueryTimePartitioning();
    typeOnly.setType("DAY");
    String json = typeOnly.getAsJson();

    assertThat(json).isEqualTo("{\"type\":\"DAY\"}");
    assertThat(BigQueryTimePartitioning.getFromJson(json).getType()).isEqualTo("DAY");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.fs.gcs.InMemoryGoogleHadoopFileSystem;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
Expand Down Expand Up @@ -69,6 +70,10 @@ public class IndirectBigQueryOutputCommitterTest {
private static final String QUALIFIED_TEST_TABLE_ID =
String.format("%s:%s.%s", TEST_PROJECT_ID, TEST_DATASET_ID, TEST_TABLE_ID);

/** Sample table time partitioning used for output. */
private static final BigQueryTimePartitioning TEST_TIME_PARTITIONING =
BigQueryTimePartitioning.wrap(new TimePartitioning().setType("DAY"));

/** Sample output file format for the committer. */
private static final BigQueryFileFormat TEST_FILE_FORMAT =
BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
Expand Down Expand Up @@ -151,6 +156,8 @@ public void setUp() throws IOException {
TEST_FILE_FORMAT,
TEST_OUTPUT_CLASS);
BigQueryOutputConfiguration.setKmsKeyName(conf, TEST_KMS_KEY_NAME);
conf.set(
BigQueryConfiguration.OUTPUT_TABLE_PARTITIONING_KEY, TEST_TIME_PARTITIONING.getAsJson());

// Setup sample data.
outputTableRef = BigQueryOutputConfiguration.getTableReference(conf);
Expand Down Expand Up @@ -204,6 +211,7 @@ public void testCommitJob() throws IOException, InterruptedException {
eq(TEST_PROJECT_ID),
eq(outputTableRef),
eq(TEST_TABLE_SCHEMA.get()),
eq(TEST_TIME_PARTITIONING.get()),
eq(TEST_KMS_KEY_NAME),
eq(TEST_FILE_FORMAT),
eq(TEST_WRITE_DISPOSITION),
Expand Down Expand Up @@ -234,6 +242,7 @@ public void testCommitJobInterrupt() throws IOException, InterruptedException {
any(String.class),
any(TableReference.class),
any(TableSchema.class),
any(TimePartitioning.class),
anyString(),
any(BigQueryFileFormat.class),
any(String.class),
Expand All @@ -249,6 +258,7 @@ public void testCommitJobInterrupt() throws IOException, InterruptedException {
eq(TEST_PROJECT_ID),
eq(outputTableRef),
eq(TEST_TABLE_SCHEMA.get()),
eq(TEST_TIME_PARTITIONING.get()),
eq(TEST_KMS_KEY_NAME),
eq(TEST_FILE_FORMAT),
eq(TEST_WRITE_DISPOSITION),
Expand Down

0 comments on commit e6f742b

Please sign in to comment.