From fc237cefddbd797e915e63921a4228510384762d Mon Sep 17 00:00:00 2001 From: Julien Phalip Date: Mon, 30 Oct 2023 23:22:10 -0700 Subject: [PATCH] Add support for Spark SQL --- CHANGES.md | 2 + README.md | 22 ++ cloudbuild/cloudbuild.yaml | 23 +- cloudbuild/presubmit.sh | 24 +- hive-2-bigquery-connector/pom.xml | 22 +- .../sparksql/SparkSQLIntegrationTests.java | 262 ++++++++++++++++++ .../connector/sparksql/SparkUtils.java | 140 ++++++++++ hive-3-bigquery-connector/pom.xml | 38 ++- hive-bigquery-connector-common/pom.xml | 6 - .../connector/BigQueryMetaHookBase.java | 35 ++- .../connector/BigQueryStorageHandlerBase.java | 8 + .../output/BigQueryOutputCommitter.java | 66 +---- .../output/BigQueryOutputFormat.java | 1 + .../output/MapReduceOutputCommitter.java | 63 +++++ .../output/MapReduceOutputFormat.java | 44 +++ .../output/OutputCommitterUtils.java | 76 +++++ .../output/direct/DirectRecordWriter.java | 16 +- .../indirect/IndirectAvroRecordWriter.java | 8 +- .../bigquery/connector/utils/JobUtils.java | 8 +- .../connector/utils/hive/HiveUtils.java | 63 +---- .../integration/ReadIntegrationTestsBase.java | 7 +- .../connector/utils/JobUtilsTest.java | 12 +- hive-bigquery-parent/pom.xml | 106 +++++-- hive-v2 | 1 + pom.xml | 40 ++- .../pom.xml | 5 +- 26 files changed, 872 insertions(+), 226 deletions(-) create mode 100644 hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkSQLIntegrationTests.java create mode 100644 hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkUtils.java create mode 100644 hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputCommitter.java create mode 100644 hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputFormat.java create mode 100644 hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java create mode 160000 hive-v2 rename {shaded-sparksql => shaded-deps-sparksql}/pom.xml (97%) diff --git a/CHANGES.md b/CHANGES.md index 26d4c58b..6ad7877c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## Next +* Added support for Hive 2.X. +* Added support for Spark SQL. * Fixed case sensitivity bug with column names. This particularly affected pseudo columns like `_PARTITIONTIME` and `_PARTITIONDATE` in time-ingestion partitioned BigQuery tables. * **Backward-incompatible change:** The type of the `_PARTITION_TIME` pseudo-column in diff --git a/README.md b/README.md index b45a6cde..fc2a815d 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ software versions: * Hive 2.3.6, 2.3.9, 3.1.2, and 3.1.3. * Hadoop 2.10.2, 3.2.3, and 3.3.3. * Tez 0.9.2 on Hadoop 2, and Tez 0.10.1 on Hadoop 3. +* Spark SQL 3.4.1. ## Installation @@ -474,6 +475,27 @@ session creation time (i.e. when the `SELECT` query is initiated). Note that this consistency model currently only applies to the table data, not its metadata. +## Spark SQL integration + +The connector supports versions of Spark SQL that vendor Hive v2.X. Therefore, to use Spark SQL, +you must use the Hive 2 (not Hive 3) version of the connector. See more information in the +[Installation](#installation) section on how to install the correct connector version in your +environment. 
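+
+The connector jar must also be available on the Spark driver and executor classpath. For example,
+assuming you have copied the Hive 2 connector jar to a GCS bucket (the path below is only an
+illustration), you can pass it to Spark with the standard `--jars` option:
+
+```sh
+spark-sql --jars gs://my-bucket/hive-2-bigquery-connector-<version>.jar
+```
+
+The same option works with `spark-shell` and `spark-submit`.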
+
+Example (Java):
+
+```java
+SparkConf sparkConf = new SparkConf().setMaster("local");
+SparkSession spark =
+    SparkSession.builder()
+        .appName("example")
+        .config(sparkConf)
+        .enableHiveSupport()
+        .getOrCreate();
+Dataset<Row> ds = spark.sql("SELECT * FROM mytable");
+Row[] rows = (Row[]) ds.collect();
+```
+
 ## BigLake integration
 
 [BigLake](https://cloud.google.com/biglake) allows you to store your data in open formats
diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml
index d35c5a44..fbd1e8e5 100644
--- a/cloudbuild/cloudbuild.yaml
+++ b/cloudbuild/cloudbuild.yaml
@@ -1,10 +1,10 @@
 steps:
-# 1. Create a Docker image containing hadoop-connectors repo
+# 0. Create a Docker image containing hadoop-connectors repo
 - name: 'gcr.io/cloud-builders/docker'
   id: 'docker-build'
   args: ['build', '--tag=gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit', '-f', 'cloudbuild/Dockerfile', '.']
 
-# 2. Build the connector and download dependencies without running tests.
+# 1. Run the Java code style checks.
 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
   id: 'check'
   waitFor: ['docker-build']
@@ -13,7 +13,7 @@ steps:
   env:
   - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
-# 3. Build the connector and download dependencies without running tests.
+# 2. Build the connector and download dependencies without running tests.
 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
   id: 'build'
   waitFor: ['check']
@@ -22,7 +22,7 @@ steps:
   env:
   - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
-# 4. Run unit tests for Hive 2
+# 3. Run unit tests for Hive 2
 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
   id: 'unit-tests-hive2'
   waitFor: ['build']
@@ -31,7 +31,7 @@ steps:
   env:
   - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
-# 5. Run unit tests for Hive 3
+# 4. Run unit tests for Hive 3
 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
   id: 'unit-tests-hive3'
   waitFor: ['build']
@@ -40,7 +40,7 @@ steps:
   env:
   - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
-# 6. Run integration tests for Hive 2
+# 5. Run integration tests for Hive 2
 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
   id: 'integration-tests-hive2'
   waitFor: ['unit-tests-hive2']
@@ -49,7 +49,7 @@ steps:
   env:
   - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
-# 7. Run integration tests for Hive 3
+# 6. Run integration tests for Hive 3
 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
   id: 'integration-tests-hive3'
   waitFor: ['unit-tests-hive3']
@@ -58,6 +58,15 @@ steps:
   env:
   - 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
 
+# 7. 
Run integration tests for Spark SQL +- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit' + id: 'integration-tests-sparksql' + waitFor: ['unit-tests-hive2'] + entrypoint: 'bash' + args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_sparksql'] + env: + - 'CODECOV_TOKEN=${_CODECOV_TOKEN}' + # Tests should take under 90 mins timeout: 5400s diff --git a/cloudbuild/presubmit.sh b/cloudbuild/presubmit.sh index bed0bb10..4ef94435 100644 --- a/cloudbuild/presubmit.sh +++ b/cloudbuild/presubmit.sh @@ -26,6 +26,7 @@ readonly ACTION=$1 readonly HIVE2_PROFILE="hive2-generic" readonly HIVE3_PROFILE="hive3-generic" +readonly HIVE3_SHADED_DEPS="shaded-deps-hive3.1.2-hadoop2.10.2" readonly MVN="./mvnw -B -e -Dmaven.repo.local=/workspace/.repository" export TEST_BUCKET=dataproc-integ-tests @@ -37,16 +38,18 @@ cd /workspace case "$ACTION" in # Java code style check check) - ./mvnw spotless:check -P"${HIVE2_PROFILE}" && ./mvnw spotless:check -P"${HIVE3_PROFILE}" + $MVN spotless:check -P"${HIVE2_PROFILE}" && $MVN spotless:check -P"${HIVE3_PROFILE}" exit ;; - # Download maven and all the dependencies + # Build the Maven packages and dependencies build) - # Install all modules for Hive 2, including parent modules - $MVN install -DskipTests -P"${HIVE2_PROFILE}" - # Install the shaded deps for Hive 3 (all the other shaded & parent modules have already been installed with the previous command) - $MVN install -DskipTests -P"${HIVE3_PROFILE}" -pl shaded-deps-${HIVE3_PROFILE} + # Install shaded dependencies for Spark SQL + $MVN install -DskipTests -P sparksql -pl shaded-deps-sparksql + # Install all modules for Hive 2 + $MVN install -DskipTests -P"${HIVE2_PROFILE},sparksql-integration" + # Install the shaded dependencies for Hive 3 (all the other shaded & parent modules have already been installed with the previous command) + $MVN install -DskipTests -P"${HIVE3_PROFILE}" -pl ${HIVE3_SHADED_DEPS} exit ;; @@ -84,6 +87,15 @@ case "$ACTION" in exit ;; + # Run integration tests for Spark SQL + integrationtest_sparksql) + $MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate \ + -P${HIVE2_PROFILE},sparksql-integration,coverage + # Upload test coverage report to Codecov + bash <(curl -s https://codecov.io/bash) -K -F "${ACTION}" + exit + ;; + *) echo "Unknown action: $ACTION" exit 1 diff --git a/hive-2-bigquery-connector/pom.xml b/hive-2-bigquery-connector/pom.xml index 0e750d2d..355e8b9e 100644 --- a/hive-2-bigquery-connector/pom.xml +++ b/hive-2-bigquery-connector/pom.xml @@ -36,14 +36,6 @@ test - - ${project.groupId} - shaded-acceptance-tests-dependencies - ${project.version} - shaded - test - - io.github.hiverunner hiverunner @@ -53,6 +45,20 @@ + + + + hive2-generic + + + ${project.groupId} + shaded-deps-hive2.3.9-hadoop2.10.2 + ${project.version} + shaded + provided + + + hive2.3.6-hadoop2.7.0 diff --git a/hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkSQLIntegrationTests.java b/hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkSQLIntegrationTests.java new file mode 100644 index 00000000..3af2f47f --- /dev/null +++ b/hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkSQLIntegrationTests.java @@ -0,0 +1,262 @@ +/* + * Copyright 2023 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.hive.bigquery.connector.sparksql; + +import static com.google.cloud.hive.bigquery.connector.sparksql.SparkUtils.DerbyDiskDB; +import static com.google.cloud.hive.bigquery.connector.sparksql.SparkUtils.getSparkSession; +import static org.junit.jupiter.api.Assertions.*; + +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.hive.bigquery.connector.TestUtils; +import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; +import com.google.cloud.hive.bigquery.connector.integration.IntegrationTestsBase; +import com.google.common.collect.Streams; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import scala.collection.immutable.Map; +import scala.collection.mutable.WrappedArray; + +public class SparkSQLIntegrationTests extends IntegrationTestsBase { + + public Row[] runSparkSQLQuery(DerbyDiskDB derby, String queryTemplate) { + SparkSession spark = getSparkSession(derby); + Dataset ds = spark.sql(renderQueryTemplate(queryTemplate)); + Row[] rows = (Row[]) ds.collect(); + spark.stop(); + return rows; + } + + @ParameterizedTest + @MethodSource(IntegrationTestsBase.EXECUTION_ENGINE_READ_FORMAT) + public void testWhereClause(String engine, String readDataFormat) { + DerbyDiskDB derby = new DerbyDiskDB(hive); + initHive(engine, readDataFormat); + createExternalTable( + TestUtils.TEST_TABLE_NAME, + TestUtils.HIVE_TEST_TABLE_DDL, + TestUtils.BIGQUERY_TEST_TABLE_DDL); + // Insert data into BQ using the BQ SDK + runBqQuery( + String.format( + "INSERT `${dataset}.%s` VALUES (123, 'hello'), (999, 'abcd')", + TestUtils.TEST_TABLE_NAME)); + // Read data with Spark SQL + Row[] rows = + runSparkSQLQuery( + derby, + String.format( + "SELECT * FROM default.%s WHERE number = 999", TestUtils.TEST_TABLE_NAME)); + assertArrayEquals( + new Object[] { + new Object[] {999L, "abcd"}, + }, + SparkUtils.simplifySparkRows(rows)); + } + + // --------------------------------------------------------------------------------------------------- + + /** Check that we can read all types of data from BigQuery. 
*/ + @ParameterizedTest + @MethodSource(IntegrationTestsBase.READ_FORMAT) + public void testReadAllTypes(String readDataFormat) throws IOException { + DerbyDiskDB derby = new DerbyDiskDB(hive); + initHive(IntegrationTestsBase.getDefaultExecutionEngine(), readDataFormat); + createExternalTable( + TestUtils.ALL_TYPES_TABLE_NAME, + TestUtils.HIVE_ALL_TYPES_TABLE_DDL, + TestUtils.BIGQUERY_ALL_TYPES_TABLE_DDL); + // Insert data into the BQ table using the BQ SDK + runBqQuery( + String.join( + "\n", + String.format("INSERT `${dataset}.%s` VALUES (", TestUtils.ALL_TYPES_TABLE_NAME), + "11,", + "22,", + "33,", + "44,", + "true,", + "\"fixed char\",", + "\"var char\",", + "\"string\",", + "cast(\"2019-03-18\" as date),", + // Wall clock (no timezone) + "cast(\"2000-01-01T00:23:45.123456\" as datetime),", + "cast(\"bytes\" as bytes),", + "2.0,", + "4.2,", + "struct(", + " cast(\"-99999999999999999999999999999.999999999\" as numeric),", + " cast(\"99999999999999999999999999999.999999999\" as numeric),", + " cast(3.14 as numeric),", + " cast(\"31415926535897932384626433832.795028841\" as numeric)", + "),", + "[1, 2, 3],", + "[(select as struct 111), (select as struct 222), (select as struct 333)],", + "struct(4.2, cast(\"2019-03-18 11:23:45.678901\" as datetime)),", + "[struct('a_key', [struct('a_subkey', 888)]), struct('b_key', [struct('b_subkey'," + + " 999)])]", + ")")); + // Read the data using Spark SQL + Row[] rows = runSparkSQLQuery(derby, "SELECT * FROM default." + TestUtils.ALL_TYPES_TABLE_NAME); + assertEquals(1, rows.length); + Row row = rows[0]; + assertEquals(18, row.size()); // Number of columns + assertEquals((byte) 11, row.get(0)); + assertEquals((short) 22, row.get(1)); + assertEquals((int) 33, row.get(2)); + assertEquals((long) 44, row.get(3)); + assertEquals(true, row.get(4)); + assertEquals("fixed char", row.get(5)); + assertEquals("var char", row.get(6)); + assertEquals("string", row.get(7)); + assertEquals("2019-03-18", row.get(8).toString()); + assertEquals("2000-01-01 00:23:45.123456", row.get(9).toString()); + assertArrayEquals("bytes".getBytes(), (byte[]) row.get(10)); + assertEquals(2.0, row.getFloat(11)); + assertEquals(4.2, row.getDouble(12)); + assertEquals( + "{min=-99999999999999999999999999999.999999999, max=99999999999999999999999999999.999999999, pi=3.140000000, big_pi=31415926535897932384626433832.795028841}", + SparkUtils.convertSparkRowToMap((GenericRowWithSchema) row.get(13)).toString()); + assertArrayEquals( + new Long[] {1l, 2l, 3l}, SparkUtils.convertSparkArray((WrappedArray) row.get(14))); + assertEquals( + "{i=111},{i=222},{i=333}", + Arrays.stream(SparkUtils.convertSparkArray((WrappedArray) row.get(15))) + .map(s -> s.toString()) + .collect(Collectors.joining(","))); + assertEquals( + "{float_field=4.2, ts_field=2019-03-18 11:23:45.678901}", + SparkUtils.convertSparkRowToMap((GenericRowWithSchema) row.get(16)).toString()); + // Map type + Map map = (Map) row.get(17); + assertEquals(2, map.size()); + assertEquals(888, ((Map) map.get("a_key").get()).get("a_subkey").get()); + assertEquals(999, ((Map) map.get("b_key").get()).get("b_subkey").get()); + } + + // --------------------------------------------------------------------------------------------------- + + /** Check that we can write all types of data to BigQuery. 
*/ + @ParameterizedTest + @MethodSource(IntegrationTestsBase.EXECUTION_ENGINE_WRITE_METHOD) + public void testWriteAllTypes(String engine, String writeMethod) { + DerbyDiskDB derby = new DerbyDiskDB(hive); + hive.setHiveConfValue(HiveBigQueryConfig.WRITE_METHOD_KEY, writeMethod); + initHive(engine, HiveBigQueryConfig.AVRO); + // Create the BQ table + createExternalTable( + TestUtils.ALL_TYPES_TABLE_NAME, + TestUtils.HIVE_ALL_TYPES_TABLE_DDL, + TestUtils.BIGQUERY_ALL_TYPES_TABLE_DDL); + // Insert data into the BQ table using Spark SQL + SparkSession spark = SparkUtils.getSparkSession(derby); + spark.sql( + String.join( + "\n", + "INSERT INTO " + TestUtils.ALL_TYPES_TABLE_NAME + " SELECT", + "11,", + "22,", + "33,", + "44,", + "true,", + "\"fixed char\",", + "\"var char\",", + "\"string\",", + "CAST(\"2019-03-18\" AS DATE),", + // Wall clock (no timezone) + "CAST(\"2000-01-01 00:23:45.123456\" as TIMESTAMP),", + "CAST(\"bytes\" AS BINARY),", + "2.0,", + "4.2,", + "NAMED_STRUCT(", + " 'min', CAST(-99999999999999999999999999999.999999999 AS" + " DECIMAL(38,9)),", + " 'max', CAST(99999999999999999999999999999.999999999 AS" + " DECIMAL(38,9)),", + " 'pi', CAST(3.14 AS DECIMAL(38,9)),", + " 'big_pi', CAST(31415926535897932384626433832.795028841 AS" + " DECIMAL(38,9))", + "),", + "ARRAY(CAST (1 AS BIGINT), CAST (2 AS BIGINT), CAST (3 AS" + " BIGINT)),", + "ARRAY(NAMED_STRUCT('i', CAST (1 AS BIGINT))),", + "NAMED_STRUCT('float_field', CAST(4.2 AS FLOAT), 'ts_field', CAST" + + " (\"2019-03-18 01:23:45.678901\" AS TIMESTAMP)),", + "MAP('mykey', MAP('subkey', 999))", + "FROM (select '1') t")); + // Read the data using the BQ SDK + TableResult result = + runBqQuery(String.format("SELECT * FROM `${dataset}.%s`", TestUtils.ALL_TYPES_TABLE_NAME)); + // Verify we get the expected values + assertEquals(1, result.getTotalRows()); + List rows = Streams.stream(result.iterateAll()).collect(Collectors.toList()); + FieldValueList row = rows.get(0); + assertEquals(18, row.size()); // Number of columns + assertEquals(11L, row.get(0).getLongValue()); + assertEquals(22L, row.get(1).getLongValue()); + assertEquals(33L, row.get(2).getLongValue()); + assertEquals(44L, row.get(3).getLongValue()); + assertTrue(row.get(4).getBooleanValue()); + assertEquals("fixed char", row.get(5).getStringValue()); + assertEquals("var char", row.get(6).getStringValue()); + assertEquals("string", row.get(7).getStringValue()); + assertEquals("2019-03-18", row.get(8).getStringValue()); + assertEquals("2000-01-01T00:23:45.123456", row.get(9).getStringValue()); + assertArrayEquals("bytes".getBytes(), row.get(10).getBytesValue()); + assertEquals(2.0, row.get(11).getDoubleValue()); + assertEquals(4.2, row.get(12).getDoubleValue()); + FieldValueList struct = row.get(13).getRecordValue(); + assertEquals( + "-99999999999999999999999999999.999999999", + struct.get("min").getNumericValue().toPlainString()); + assertEquals( + "99999999999999999999999999999.999999999", + struct.get("max").getNumericValue().toPlainString()); + assertEquals("3.14", struct.get("pi").getNumericValue().toPlainString()); + assertEquals( + "31415926535897932384626433832.795028841", + struct.get("big_pi").getNumericValue().toPlainString()); + FieldValueList array = (FieldValueList) row.get(14).getValue(); + assertEquals(3, array.size()); + assertEquals(1, array.get(0).getLongValue()); + assertEquals(2, array.get(1).getLongValue()); + assertEquals(3, array.get(2).getLongValue()); + FieldValueList arrayOfStructs = (FieldValueList) row.get(15).getValue(); + assertEquals(1, 
arrayOfStructs.size()); + struct = (FieldValueList) arrayOfStructs.get(0).getValue(); + assertEquals(1L, struct.get(0).getLongValue()); + // Mixed struct + struct = row.get(16).getRecordValue(); + assertEquals( + 4.199999809265137, + struct.get("float_field").getDoubleValue()); // TODO: Address discrepancy here + assertEquals("2019-03-18T01:23:45.678901", struct.get("ts_field").getStringValue()); + // Check the Map type + FieldValueList map = (FieldValueList) row.get(17).getRepeatedValue(); + assertEquals(1, map.size()); + FieldValueList entry = map.get(0).getRecordValue(); + assertEquals("mykey", entry.get(0).getStringValue()); + assertEquals(1, entry.get(1).getRepeatedValue().size()); + FieldValueList subEntry = entry.get(1).getRepeatedValue().get(0).getRecordValue(); + assertEquals("subkey", subEntry.get(0).getStringValue()); + assertEquals(999, subEntry.get(1).getLongValue()); + } +} diff --git a/hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkUtils.java b/hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkUtils.java new file mode 100644 index 00000000..b4c175f2 --- /dev/null +++ b/hive-2-bigquery-connector/src/test/java/com/google/cloud/hive/bigquery/connector/sparksql/SparkUtils.java @@ -0,0 +1,140 @@ +/* + * Copyright 2023 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.hive.bigquery.connector.sparksql; + +import com.klarna.hiverunner.HiveShell; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructType; +import scala.collection.mutable.WrappedArray; + +public class SparkUtils { + + /** + * Creates the Metastore derby database on disk instead of in-memory so it can be shared between + * Hive and Spark + */ + public static class DerbyDiskDB { + public String url; + public Path dir; + + public DerbyDiskDB(HiveShell hive) { + dir = hive.getBaseDir().resolve("derby_" + UUID.randomUUID()); + url = String.format("jdbc:derby:%s;create=true", dir); + hive.setHiveConfValue("javax.jdo.option.ConnectionURL", url); + } + + /** + * Deletes the locks on the Derby database so Spark can take it over from Hive (or vice versa). 
+ * Otherwise, you'll run into this exception: "Another instance of Derby may have already booted + * the database" + */ + public void releaseLock() { + try { + Files.deleteIfExists(dir.resolve("db.lck")); + Files.deleteIfExists(dir.resolve("dbex.lck")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public static SparkSession getSparkSession(DerbyDiskDB derby) { + derby.releaseLock(); + SparkConf sparkConf = + new SparkConf() + .set("spark.sql.defaultUrlStreamHandlerFactory.enabled", "false") + .set("spark.hadoop.javax.jdo.option.ConnectionURL", derby.url) + .setMaster("local"); + SparkSession spark = + SparkSession.builder() + .appName("example") + .config(sparkConf) + .enableHiveSupport() + .getOrCreate(); + return spark; + } + + /** + * Converts the given Spark SQL rows to an array of arrays of primitives for easier comparison in + * tests. + */ + public static Object[] simplifySparkRows(Row[] rows) { + Object[][] result = new Object[rows.length][]; + for (int i = 0; i < rows.length; i++) { + result[i] = new Object[rows[i].size()]; + for (int j = 0; j < rows[i].size(); j++) { + result[i][j] = rows[i].get(j); + } + } + return result; + } + + /** Converts a Spark row to a map of primitives. */ + public static Map convertSparkRowToMap(GenericRowWithSchema row) { + StructType schema = row.schema(); + String[] fieldNames = schema.fieldNames(); + Map map = new HashMap<>(); + for (int i = 0; i < fieldNames.length; i++) { + String fieldName = fieldNames[i]; + DataType fieldType = schema.fields()[i].dataType(); + Object value = row.get(i); + if (value == null) { + map.put(fieldName, null); + } else if (fieldType instanceof StructType) { + map.put(fieldName, convertSparkRowToMap((GenericRowWithSchema) value)); + } else if (fieldType instanceof ArrayType) { + List list = new ArrayList<>(); + for (Object element : (Row[]) value) { + if (element instanceof GenericRowWithSchema) { + list.add(convertSparkRowToMap((GenericRowWithSchema) element)); + } else { + list.add(element); + } + } + map.put(fieldName, list); + } else { + map.put(fieldName, value); + } + } + return map; + } + + /** Converts a Spark array to a simple array of primitives. 
*/ + public static Object[] convertSparkArray(WrappedArray wrappedArray) { + List list = new ArrayList<>(); + scala.collection.Iterator iterator = wrappedArray.iterator(); + while (iterator.hasNext()) { + Object value = iterator.next(); + if (value instanceof GenericRowWithSchema) { + list.add(convertSparkRowToMap((GenericRowWithSchema) value)); + } else if (value instanceof WrappedArray) { + list.add(convertSparkArray((WrappedArray) value)); + } else { + list.add(value); + } + } + return list.toArray(); + } +} diff --git a/hive-3-bigquery-connector/pom.xml b/hive-3-bigquery-connector/pom.xml index 23217a62..9f58502e 100644 --- a/hive-3-bigquery-connector/pom.xml +++ b/hive-3-bigquery-connector/pom.xml @@ -36,14 +36,6 @@ test - - ${project.groupId} - shaded-acceptance-tests-dependencies - ${project.version} - shaded - test - - io.github.hiverunner hiverunner @@ -52,8 +44,21 @@ - + + + + hive3-generic + + + ${project.groupId} + shaded-deps-hive3.1.2-hadoop2.10.2 + ${project.version} + shaded + provided + + + hive3.1.2-hadoop2.10.2 @@ -76,6 +81,13 @@ shaded provided + + ${project.groupId} + shaded-acceptance-tests-dependencies + ${project.version} + shaded + test + @@ -88,11 +100,17 @@ shaded provided + + ${project.groupId} + shaded-acceptance-tests-dependencies + ${project.version} + shaded + test + - diff --git a/hive-bigquery-connector-common/pom.xml b/hive-bigquery-connector-common/pom.xml index 376ef1db..f527a3a7 100644 --- a/hive-bigquery-connector-common/pom.xml +++ b/hive-bigquery-connector-common/pom.xml @@ -184,12 +184,6 @@ test - - com.google.truth - truth - test - - diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHookBase.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHookBase.java index 6baca4dd..81a87db9 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHookBase.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHookBase.java @@ -27,6 +27,7 @@ import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConnectorModule; import com.google.cloud.hive.bigquery.connector.output.BigQueryOutputCommitter; +import com.google.cloud.hive.bigquery.connector.output.OutputCommitterUtils; import com.google.cloud.hive.bigquery.connector.utils.JobUtils; import com.google.cloud.hive.bigquery.connector.utils.JobUtils.CleanMessage; import com.google.cloud.hive.bigquery.connector.utils.avro.AvroUtils; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.*; @@ -66,8 +68,8 @@ public abstract class BigQueryMetaHookBase extends DefaultHiveMetaHook { Configuration conf; - protected List getSupportedTypes() { - List types = new ArrayList<>(); + protected List getSupportedTypes() { + List types = new ArrayList<>(); types.addAll( Arrays.asList( PrimitiveCategory.BYTE, // Tiny Int @@ -389,22 +391,19 @@ public void preInsertTable(Table table, boolean 
overwrite) throws MetaException @Override public void commitInsertTable(Table table, boolean overwrite) throws MetaException { String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).toLowerCase(); - if (engine.equals("mr")) { - // In Hive v3, `commitInsertTable()` never gets called for MR -- only for Tez. - // But in Hive v2, it does get called. So we exit here since the BigQueryOutputCommitter - // is already called automatically for MR. - return; - } - try { - JobDetails jobDetails = JobDetails.readJobDetailsFile(conf, HiveUtils.getDbTableName(table)); - BigQueryOutputCommitter.commit(conf, jobDetails); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - // deleteOnExit in case of other jobs using the same workdir - JobUtils.cleanNotFail( - () -> JobUtils.deleteQueryWorkDirOnExit(conf), - CleanMessage.DELETE_QUERY_TEMPORARY_DIRECTORY); + if (engine.equals("tez")) { + try { + JobDetails jobDetails = + JobDetails.readJobDetailsFile(conf, HiveUtils.getDbTableName(table)); + OutputCommitterUtils.commitJob(conf, jobDetails); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + // deleteOnExit in case of other jobs using the same workdir + JobUtils.cleanNotFail( + () -> JobUtils.deleteQueryWorkDirOnExit(conf), + CleanMessage.DELETE_QUERY_TEMPORARY_DIRECTORY); + } } } diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java index 99dc6a81..72e35371 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java @@ -26,6 +26,7 @@ import com.google.cloud.hive.bigquery.connector.input.BigQueryInputFormat; import com.google.cloud.hive.bigquery.connector.output.BigQueryOutputCommitter; import com.google.cloud.hive.bigquery.connector.output.BigQueryOutputFormat; +import com.google.cloud.hive.bigquery.connector.output.MapReduceOutputFormat; import com.google.cloud.hive.bigquery.connector.utils.JobUtils; import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils; import com.google.inject.Guice; @@ -166,6 +167,13 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map JobUtils.deleteJobTempOutput(conf, jobDetails), - CleanMessage.DELETE_JOB_TEMPORARY_DIRECTORY); - } - } - @Override public void commitJob(JobContext jobContext) throws IOException { - JobConf jobConf = jobContext.getJobConf(); - Set outputTables = getOutputTables(jobConf); - LOG.info("Committing job {} with output tables {}", jobContext.getJobID(), outputTables); + commitJob(jobContext.getJobConf()); + super.commitJob(jobContext); + } + + public static void commitJob(Configuration conf) throws IOException { + Set outputTables = OutputCommitterUtils.getOutputTables(conf); + LOG.info("Committing job {} with output tables {}", HiveUtils.getQueryId(conf), outputTables); for (String hmsDbTableName : outputTables) { JobDetails jobDetails; try { - jobDetails = JobDetails.readJobDetailsFile(jobConf, hmsDbTableName); + jobDetails = JobDetails.readJobDetailsFile(conf, hmsDbTableName); } catch (Exception e) { // TO-DO: should we abort the job? 
LOG.warn("JobDetails not found for table {}, skip it", hmsDbTableName); continue; } - commit(jobConf, jobDetails); + OutputCommitterUtils.commitJob(conf, jobDetails); } - super.commitJob(jobContext); } @Override public void abortJob(JobContext jobContext, int status) throws IOException { - JobConf jobConf = jobContext.getJobConf(); - Set outputTables = getOutputTables(jobConf); - LOG.info("aborting job {} with output tables {}", jobContext.getJobID(), outputTables); - for (String hmsDbTableName : outputTables) { - JobDetails jobDetails; - try { - jobDetails = JobDetails.readJobDetailsFile(jobConf, hmsDbTableName); - } catch (Exception e) { - LOG.warn("JobDetails not found for table {}, skip it", hmsDbTableName); - continue; - } - HiveBigQueryConfig opts = HiveBigQueryConfig.from(jobConf, jobDetails.getTableProperties()); - String writeMethod = opts.getWriteMethod(); - if (writeMethod.equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) { - DirectOutputCommitter.abortJob(jobConf, jobDetails); - } - JobUtils.deleteJobTempOutput(jobConf, jobDetails); - } + OutputCommitterUtils.abortJob(jobContext.getJobConf()); super.abortJob(jobContext, status); } @@ -120,9 +83,4 @@ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { // Do nothing } - - private Set getOutputTables(JobConf jobConf) { - String outputTables = jobConf.get(HiveBigQueryConfig.OUTPUT_TABLES_KEY); - return Sets.newHashSet(HiveBigQueryConfig.TABLE_NAME_SPLITTER.split(outputTables)); - } } diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/BigQueryOutputFormat.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/BigQueryOutputFormat.java index 8a70b581..873501fc 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/BigQueryOutputFormat.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/BigQueryOutputFormat.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.util.Progressable; +/** Output format compatible with the old "mapred" Hadoop API. */ public class BigQueryOutputFormat implements OutputFormat, HiveOutputFormat { diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputCommitter.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputCommitter.java new file mode 100644 index 00000000..9f7a98cd --- /dev/null +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputCommitter.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.hive.bigquery.connector.output; + +import java.io.IOException; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** Output committer compatible with the new "mapreduce" Hadoop API. */ +public class MapReduceOutputCommitter extends OutputCommitter { + + @Override + public void commitJob(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException { + BigQueryOutputCommitter.commitJob(jobContext.getConfiguration()); + super.commitJob(jobContext); + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { + OutputCommitterUtils.abortJob(jobContext.getConfiguration()); + super.abortJob(jobContext, state); + } + + @Override + public void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException { + // Do nothing + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + // Do nothing + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { + // Do nothing + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { + // Do nothing + } +} diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputFormat.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputFormat.java new file mode 100644 index 00000000..f0068ca0 --- /dev/null +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/MapReduceOutputFormat.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.hive.bigquery.connector.output; + +import java.io.IOException; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** Output format compatible with the new "mapreduce" Hadoop API. This is used by Spark SQL. */ +public class MapReduceOutputFormat extends FileOutputFormat { + + private MapReduceOutputCommitter committer; + + @Override + public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException { + if (committer == null) { + committer = new MapReduceOutputCommitter(); + } + return committer; + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + // Note: Spark and Hive use `BigQueryOutputFormat.getRecordWriter()` instead. 
+ return null; + } +} diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java new file mode 100644 index 00000000..8cb8a880 --- /dev/null +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java @@ -0,0 +1,76 @@ +/* + * Copyright 2023 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.hive.bigquery.connector.output; + +import com.google.cloud.hive.bigquery.connector.JobDetails; +import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; +import com.google.cloud.hive.bigquery.connector.output.direct.DirectOutputCommitter; +import com.google.cloud.hive.bigquery.connector.output.indirect.IndirectOutputCommitter; +import com.google.cloud.hive.bigquery.connector.utils.JobUtils; +import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OutputCommitterUtils { + private static final Logger LOG = LoggerFactory.getLogger(OutputCommitterUtils.class); + + public static void commitJob(Configuration conf, JobDetails jobDetails) throws IOException { + HiveBigQueryConfig opts = HiveBigQueryConfig.from(conf, jobDetails.getTableProperties()); + String writeMethod = opts.getWriteMethod(); + // Pick the appropriate OutputCommitter (direct or indirect) based on the + // configured write method + try { + if (writeMethod.equals(HiveBigQueryConfig.WRITE_METHOD_INDIRECT)) { + IndirectOutputCommitter.commitJob(conf, jobDetails); + } else { + DirectOutputCommitter.commitJob(conf, jobDetails); + } + } finally { + JobUtils.cleanNotFail( + () -> JobUtils.deleteJobTempOutput(conf, jobDetails), + JobUtils.CleanMessage.DELETE_JOB_TEMPORARY_DIRECTORY); + } + } + + public static void abortJob(Configuration conf) throws IOException { + Set outputTables = getOutputTables(conf); + LOG.info("aborting job {} with output tables {}", HiveUtils.getQueryId(conf), outputTables); + for (String hmsDbTableName : outputTables) { + JobDetails jobDetails; + try { + jobDetails = JobDetails.readJobDetailsFile(conf, hmsDbTableName); + } catch (Exception e) { + LOG.warn("JobDetails not found for table {}, skip it", hmsDbTableName); + continue; + } + HiveBigQueryConfig opts = HiveBigQueryConfig.from(conf, jobDetails.getTableProperties()); + String writeMethod = opts.getWriteMethod(); + if (writeMethod.equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) { + DirectOutputCommitter.abortJob(conf, jobDetails); + } + JobUtils.deleteJobTempOutput(conf, jobDetails); + } + } + + public static Set getOutputTables(Configuration conf) { + String outputTables = 
conf.get(HiveBigQueryConfig.OUTPUT_TABLES_KEY); + return Sets.newHashSet(HiveBigQueryConfig.TABLE_NAME_SPLITTER.split(outputTables)); + } +} diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectRecordWriter.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectRecordWriter.java index 81c0f802..e5a234fd 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectRecordWriter.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectRecordWriter.java @@ -22,11 +22,11 @@ import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; import com.google.cloud.hive.bigquery.connector.output.WriterRegistry; import com.google.cloud.hive.bigquery.connector.utils.JobUtils; -import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.UUID; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -36,7 +36,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TaskAttemptID; /** * Writes records to a given BQ stream. Each task runs its own instance of this writer class, i.e. @@ -49,17 +48,19 @@ public class DirectRecordWriter JobConf jobConf; JobDetails jobDetails; - TaskAttemptID taskAttemptID; BigQueryDirectDataWriterHelper streamWriter; StructObjectInspector rowObjectInspector; Descriptors.Descriptor descriptor; - final String writerId; + final Path streamRefFile; public DirectRecordWriter(JobConf jobConf, JobDetails jobDetails) { this.jobConf = jobConf; - this.taskAttemptID = HiveUtils.taskAttemptIDWrapper(jobConf); - this.writerId = WriterRegistry.getWriterId(); this.jobDetails = jobDetails; + String taskAttemptID = UUID.randomUUID().toString(); + String writerId = WriterRegistry.getWriterId(); + streamRefFile = + JobUtils.getTaskWriterOutputFile( + jobDetails, taskAttemptID, writerId, HiveBigQueryConfig.STREAM_FILE_EXTENSION); this.rowObjectInspector = BigQuerySerDe.getRowObjectInspector(jobDetails.getTableProperties()); try { descriptor = ProtoSchemaConverter.toDescriptor(this.rowObjectInspector); @@ -97,9 +98,6 @@ public void close(boolean abort) throws IOException { // Create a stream reference file that contains the stream name, so we can retrieve // it later at the end of the job to commit all streams. 
streamWriter.finalizeStream(); - Path streamRefFile = - JobUtils.getTaskWriterOutputFile( - jobDetails, taskAttemptID, writerId, HiveBigQueryConfig.STREAM_FILE_EXTENSION); FSDataOutputStream outputStream = streamRefFile.getFileSystem(jobConf).create(streamRefFile); outputStream.write(streamWriter.getWriteStreamName().getBytes(StandardCharsets.UTF_8)); outputStream.close(); diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectAvroRecordWriter.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectAvroRecordWriter.java index 0e6086d9..e8d2a7b6 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectAvroRecordWriter.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectAvroRecordWriter.java @@ -21,8 +21,8 @@ import com.google.cloud.hive.bigquery.connector.output.WriterRegistry; import com.google.cloud.hive.bigquery.connector.utils.JobUtils; import com.google.cloud.hive.bigquery.connector.utils.avro.AvroUtils; -import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils; import java.io.IOException; +import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; @@ -34,7 +34,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TaskAttemptID; /** * Writes records to an Avro file in GCS. Each task runs its own instance of this writer class, i.e. @@ -50,15 +49,14 @@ public class IndirectAvroRecordWriter StructObjectInspector rowObjectInspector; Schema avroSchema; final DataFileWriter dataFileWriter; - final String writerId; public IndirectAvroRecordWriter(JobConf jobConf, JobDetails jobDetails) { this.jobConf = jobConf; this.jobDetails = jobDetails; this.rowObjectInspector = BigQuerySerDe.getRowObjectInspector(jobDetails.getTableProperties()); this.avroSchema = jobDetails.getAvroSchema(); - this.writerId = WriterRegistry.getWriterId(); - TaskAttemptID taskAttemptID = HiveUtils.taskAttemptIDWrapper(jobConf); + String taskAttemptID = UUID.randomUUID().toString(); + String writerId = WriterRegistry.getWriterId(); Path filePath = JobUtils.getTaskWriterOutputFile( jobDetails, taskAttemptID, writerId, HiveBigQueryConfig.LOAD_FILE_EXTENSION); diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/JobUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/JobUtils.java index a72e2cd1..06194c87 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/JobUtils.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/JobUtils.java @@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.TaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,15 +116,12 @@ public static Path getQueryTempOutputPath(Configuration conf, HiveBigQueryConfig * @return Fully Qualified temporary table path on GCS */ public static Path getTaskWriterOutputFile( - JobDetails jobDetails, TaskAttemptID taskAttemptID, String writerId, String suffix) { + 
JobDetails jobDetails, String taskAttemptID, String writerId, String suffix) { return new Path( jobDetails.getJobTempOutputPath(), String.format( "%s_%s_%s.%s", - getTableIdPrefix(jobDetails.getTableId()), - taskAttemptID.getTaskID(), - writerId, - suffix)); + getTableIdPrefix(jobDetails.getTableId()), taskAttemptID, writerId, suffix)); } /** Return the name prefix for the temp file. */ diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java index 23377518..842bff3f 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java @@ -16,13 +16,9 @@ package com.google.cloud.hive.bigquery.connector.utils.hive; import java.util.Map; -import java.util.Objects; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapreduce.JobID; /** * Helper class that looks up details about a task ID and Tez Vertex ID. This is useful to create @@ -45,9 +41,20 @@ public static boolean isExternalTable(Table table) { return "TRUE".equalsIgnoreCase(params.get("EXTERNAL")); } - /** Returns the ID of the Hive query as set by Hive in the configuration. */ + public static boolean isSparkJob(Configuration conf) { + return conf.get("spark.app.id", "").length() != 0; + } + public static String getQueryId(Configuration conf) { - return HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); + String hiveQueryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID, ""); + if (!hiveQueryId.equals("")) { + return "hive-query-id-" + hiveQueryId; + } + String sparkAppId = conf.get("spark.app.id", ""); + if (!sparkAppId.equals("")) { + return "spark-app-id-" + sparkAppId; + } + return "no-query-id"; } public static String getDbTableName(Table table) { @@ -59,48 +66,4 @@ public static boolean enableCommitterInTez(Configuration conf) { return (tezCommitter != null && tezCommitter.equals("org.apache.tez.mapreduce.committer.MROutputCommitter")); } - - public static TaskAttemptID taskAttemptIDWrapper(JobConf jobConf) { - return new TaskAttemptIDWrapper( - TaskAttemptID.forName(jobConf.get("mapred.task.id")), jobConf.get("hive.tez.vertex.index")); - } - - private static JobID getJobIDWithVertexAppended(JobID jobID, String vertexId) { - if (vertexId != null && !vertexId.isEmpty()) { - return new JobID(jobID.getJtIdentifier() + vertexId, jobID.getId()); - } else { - return jobID; - } - } - - private static class TaskAttemptIDWrapper extends TaskAttemptID { - - TaskAttemptIDWrapper(TaskAttemptID taskAttemptID, String vertexId) { - super( - getJobIDWithVertexAppended(taskAttemptID.getJobID(), vertexId).getJtIdentifier(), - taskAttemptID.getJobID().getId(), - taskAttemptID.getTaskType(), - taskAttemptID.getTaskID().getId(), - taskAttemptID.getId()); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof TaskAttemptIDWrapper)) { - return false; - } - TaskAttemptIDWrapper other = (TaskAttemptIDWrapper) obj; - return (getId() == other.getId() - && getTaskID().getId() == other.getTaskID().getId() - && 
Objects.equals(getJobID(), other.getJobID())); - } - - @Override - public int hashCode() { - return Objects.hash(getId(), getTaskID().getId(), getJobID()); - } - } } diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ReadIntegrationTestsBase.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ReadIntegrationTestsBase.java index 5ecbef85..fe4b170d 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ReadIntegrationTestsBase.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ReadIntegrationTestsBase.java @@ -96,10 +96,9 @@ public void testWhereClause(String engine, String readDataFormat) { // --------------------------------------------------------------------------------------------------- /** Test the `SELECT` statement with explicit columns (i.e. not `SELECT *`) */ - @ParameterizedTest - @MethodSource(EXECUTION_ENGINE_READ_FORMAT) - public void testSelectExplicitColumns(String engine, String readDataFormat) { - initHive(engine, readDataFormat); + @Test + public void testSelectExplicitColumns() { + initHive(); createExternalTable(TEST_TABLE_NAME, HIVE_TEST_TABLE_DDL, BIGQUERY_TEST_TABLE_DDL); // Insert data into BQ using the BQ SDK runBqQuery( diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java index a83d4af1..1c2ce2c2 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java @@ -25,7 +25,6 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.TaskAttemptID; import org.junit.jupiter.api.Test; public class JobUtilsTest { @@ -51,10 +50,10 @@ public void testGetWorkDir() { conf.set("hive.query.id", "query123"); conf.set("hadoop.tmp.dir", "/tmp"); Path path = JobUtils.getQueryWorkDir(conf); - assertEquals("/tmp/bq-hive-query123", path.toString()); + assertEquals("/tmp/bq-hive-hive-query-id-query123", path.toString()); conf.set("bq.work.dir.parent.path", "/my/workdir"); path = JobUtils.getQueryWorkDir(conf); - assertEquals("/my/workdir/bq-hive-query123", path.toString()); + assertEquals("/my/workdir/bq-hive-hive-query-id-query123", path.toString()); } @Test @@ -65,7 +64,8 @@ public void testGetJobDetailsFilePath() { String hmsDbTable = "default.mytable"; Path jobDetailsFilePath = JobUtils.getJobDetailsFilePath(conf, hmsDbTable); assertEquals( - "/tmp/bq-hive-query123/default.mytable/job-details.json", jobDetailsFilePath.toString()); + "/tmp/bq-hive-hive-query-id-query123/default.mytable/job-details.json", + jobDetailsFilePath.toString()); } @Test @@ -75,11 +75,11 @@ public void testGetTaskWriterOutputFile() { jobDetails.setJobTempOutputPath(new Path(tmp)); TableId tableId = TableId.of("myproject", "mydataset", "mytable"); jobDetails.setTableId(tableId); - TaskAttemptID taskAttemptID = new TaskAttemptID(); + String taskAttemptID = "abcd1234"; String writerId = WriterRegistry.getWriterId(); Path path = JobUtils.getTaskWriterOutputFile(jobDetails, taskAttemptID, writerId, "json"); String pattern = - "^" + Pattern.quote(tmp) 
+ "/myproject_mydataset_mytable_task__0000_r_000000_w\\d+\\.json$"; + "^" + Pattern.quote(tmp) + "/myproject_mydataset_mytable_abcd1234_w\\d+\\.json$"; assertThat(path.toString(), matchesPattern(pattern)); } } diff --git a/hive-bigquery-parent/pom.xml b/hive-bigquery-parent/pom.xml index 118300a5..e34cea2f 100644 --- a/hive-bigquery-parent/pom.xml +++ b/hive-bigquery-parent/pom.xml @@ -69,10 +69,14 @@ 6.1.0 2.12.7.1 5.10.0 + 1.10.0 1.9.2 3.21.12 3.4.1 1.1.3 + 2.5.2 + 1.7.36 + 3.0.0 2.14.0 @@ -109,19 +113,11 @@ org.slf4j slf4j-api - 1.7.36 provided org.conscrypt conscrypt-openjdk-uber - 2.5.2 - provided - - - junit - junit - test org.junit.platform @@ -131,13 +127,11 @@ org.junit.jupiter junit-jupiter - ${junit-jupiter.version} test org.junit.platform junit-platform-launcher - ${junit-platform-launcher.version} test @@ -212,11 +206,6 @@ pom import - - com.lmax - disruptor - 3.4.2 - io.grpc grpc-bom @@ -225,24 +214,27 @@ import - junit - junit - 4.13.2 + org.junit.platform + junit-platform-engine + ${junit-platform-engine.version} + test org.junit.platform - junit-platform-engine - 1.10.0 + junit-platform-launcher + ${junit-platform-launcher.version} + test org.junit.jupiter junit-jupiter ${junit-jupiter.version} + test org.glassfish javax.el - 3.0.0 + ${javax.el.version} io.github.hiverunner @@ -294,6 +286,10 @@ jackson-annotations com.fasterxml.jackson.core + + org.junit.vintage + junit-vintage-engine + @@ -306,8 +302,24 @@ com.google.guava guava + + junit + junit + + + org.conscrypt + conscrypt-openjdk-uber + ${conscrypt-openjdk-uber.version} + provided + + + org.slf4j + slf4j-api + ${slf4j-api.version} + provided + @@ -388,6 +400,7 @@ maven-surefire-plugin ${maven-surefire-plugin.version} + 0 **/*IntegrationTests.java **/*AcceptanceTest.java @@ -540,7 +553,56 @@ false false - **/*IntegrationTests.java + **/integration/*Tests.java + + + + + integration-test + + integration-test + + + + verify + + verify + + + + + + + + + sparksql-integration + + + ${project.groupId} + shaded-deps-sparksql + ${project.version} + shaded + test + + + io.netty + * + + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven-failsafe-plugin.version} + + 1 + false + false + + **/SparkSQLIntegrationTests.java diff --git a/hive-v2 b/hive-v2 new file mode 160000 index 00000000..851217fa --- /dev/null +++ b/hive-v2 @@ -0,0 +1 @@ +Subproject commit 851217faacc43f62f4162d88d3f3c3e2b895ad30 diff --git a/pom.xml b/pom.xml index c1e84b66..695b0a89 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,17 @@ + + + + + + hive2-generic + + shaded-deps-hive2.3.9-hadoop2.10.2 + hive-2-bigquery-connector + + hive2.3.6-hadoop2.7.0 @@ -79,22 +90,20 @@ hive-2-bigquery-connector + + + - hive3.1.2-hadoop2.10.2 + + + hive3-generic shaded-deps-hive3.1.2-hadoop2.10.2 hive-3-bigquery-connector - hive2-generic - - shaded-deps-hive2.3.9-hadoop2.10.2 - hive-2-bigquery-connector - - - - hive3-generic + hive3.1.2-hadoop2.10.2 shaded-deps-hive3.1.2-hadoop2.10.2 hive-3-bigquery-connector @@ -109,13 +118,20 @@ dataproc21 - - true - shaded-deps-dataproc21 hive-3-bigquery-connector + + + + + sparksql + + shaded-deps-sparksql + + + diff --git a/shaded-sparksql/pom.xml b/shaded-deps-sparksql/pom.xml similarity index 97% rename from shaded-sparksql/pom.xml rename to shaded-deps-sparksql/pom.xml index 1bc8dac0..70ae7e06 100644 --- a/shaded-sparksql/pom.xml +++ b/shaded-deps-sparksql/pom.xml @@ -25,7 +25,7 @@ ../hive-bigquery-parent - shaded-sparksql + shaded-deps-sparksql - org.sonatype.plugins nexus-staging-maven-plugin 
${nexus-staging-maven-plugin.version} + true