From beed389c13674a0cd495e64e66d95416b07d7fd5 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 16 Nov 2023 22:22:47 +0900 Subject: [PATCH] Support reading json type in BigQuery --- docs/src/main/sphinx/connector/bigquery.md | 3 +++ .../BigQueryArrowToPageConverter.java | 6 ++++-- .../bigquery/BigQueryPageSourceProvider.java | 11 +++++++++- .../bigquery/BigQueryQueryPageSource.java | 7 +++++-- .../BigQueryStorageArrowPageSource.java | 3 ++- .../BigQueryStorageAvroPageSource.java | 8 +++++++- .../plugin/bigquery/BigQueryTypeManager.java | 20 +++++++++++++++++++ .../bigquery/BaseBigQueryConnectorTest.java | 1 + .../bigquery/BaseBigQueryTypeMapping.java | 11 ++++++++++ 9 files changed, 63 insertions(+), 7 deletions(-) diff --git a/docs/src/main/sphinx/connector/bigquery.md b/docs/src/main/sphinx/connector/bigquery.md index 96db6e68c40a..2bae6d8a7a99 100644 --- a/docs/src/main/sphinx/connector/bigquery.md +++ b/docs/src/main/sphinx/connector/bigquery.md @@ -212,6 +212,9 @@ to the following table: - In [Well-known text (WKT)](https://wikipedia.org/wiki/Well-known_text_representation_of_geometry) format +* - `JSON` + - `JSON` + - * - `ARRAY` - `ARRAY` - diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java index fbfd8ff664d3..6b3b9450afd8 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java @@ -82,14 +82,16 @@ public class BigQueryArrowToPageConverter implements AutoCloseable { + private final BigQueryTypeManager typeManager; private final VectorSchemaRoot root; private final VectorLoader loader; private final BufferAllocator allocator; private final List columnTypes; private final List columnNames; - public BigQueryArrowToPageConverter(BufferAllocator allocator, Schema schema, List columns) + public BigQueryArrowToPageConverter(BigQueryTypeManager typeManager, BufferAllocator allocator, Schema schema, List columns) { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.allocator = requireNonNull(allocator, "allocator is null"); this.columnTypes = requireNonNull(columns, "columns is null").stream() .map(BigQueryColumnHandle::getTrinoType) @@ -192,7 +194,7 @@ private void writeVectorValues(BlockBuilder output, FieldVector vector, Consumer private void writeSlice(BlockBuilder output, Type type, FieldVector vector, int index) { - if (type instanceof VarcharType) { + if (type instanceof VarcharType || typeManager.isJsonType(type)) { byte[] slice = ((VarCharVector) vector).get(index); type.writeSlice(output, wrappedBuffer(slice)); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index b9a37f951a62..51b5acfe84e0 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -38,14 +38,20 @@ public class BigQueryPageSourceProvider private final BigQueryClientFactory bigQueryClientFactory; private final BigQueryReadClientFactory bigQueryReadClientFactory; + private final BigQueryTypeManager typeManager; private final int maxReadRowsRetries; private final boolean arrowSerializationEnabled; @Inject - public BigQueryPageSourceProvider(BigQueryClientFactory bigQueryClientFactory, BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryConfig config) + public BigQueryPageSourceProvider( + BigQueryClientFactory bigQueryClientFactory, + BigQueryReadClientFactory bigQueryReadClientFactory, + BigQueryTypeManager typeManager, + BigQueryConfig config) { this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory is null"); this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.maxReadRowsRetries = config.getMaxReadRowsRetries(); this.arrowSerializationEnabled = config.isArrowSerializationEnabled(); } @@ -94,6 +100,7 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, Bi { if (arrowSerializationEnabled) { return new BigQueryStorageArrowPageSource( + typeManager, bigQueryReadClientFactory.create(session), maxReadRowsRetries, split, @@ -101,6 +108,7 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, Bi } return new BigQueryStorageAvroPageSource( bigQueryReadClientFactory.create(session), + typeManager, maxReadRowsRetries, split, columnHandles); @@ -110,6 +118,7 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQ { return new BigQueryQueryPageSource( session, + typeManager, bigQueryClientFactory.create(session), table, columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()), diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 9504a370bbe2..315bab7086ec 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -74,6 +74,7 @@ public class BigQueryQueryPageSource .optionalEnd() .toFormatter(); + private final BigQueryTypeManager typeManager; private final List columnNames; private final List columnTypes; private final PageBuilder pageBuilder; @@ -83,6 +84,7 @@ public class BigQueryQueryPageSource public BigQueryQueryPageSource( ConnectorSession session, + BigQueryTypeManager typeManager, BigQueryClient client, BigQueryTableHandle table, List columnNames, @@ -94,6 +96,7 @@ public BigQueryQueryPageSource( requireNonNull(columnNames, "columnNames is null"); requireNonNull(columnTypes, "columnTypes is null"); requireNonNull(filter, "filter is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes sizes don't match"); this.columnNames = ImmutableList.copyOf(columnNames); this.columnTypes = ImmutableList.copyOf(columnTypes); @@ -235,9 +238,9 @@ else if (type instanceof RowType rowType) { } } - private static void writeSlice(BlockBuilder output, Type type, FieldValue value) + private void writeSlice(BlockBuilder output, Type type, FieldValue value) { - if (type instanceof VarcharType) { + if (type instanceof VarcharType || typeManager.isJsonType(type)) { type.writeSlice(output, utf8Slice(value.getStringValue())); } else if (type instanceof VarbinaryType) { diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java index 919cfcf94f51..b230a3146870 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java @@ -58,6 +58,7 @@ public class BigQueryStorageArrowPageSource private final PageBuilder pageBuilder; public BigQueryStorageArrowPageSource( + BigQueryTypeManager typeManager, BigQueryReadClient bigQueryReadClient, int maxReadRowsRetries, BigQuerySplit split, @@ -70,7 +71,7 @@ public BigQueryStorageArrowPageSource( log.debug("Starting to read from %s", split.getStreamName()); responses = new ReadRowsHelper(bigQueryReadClient, split.getStreamName(), maxReadRowsRetries).readRows(); this.streamBufferAllocator = allocator.newChildAllocator(split.getStreamName(), 1024, Long.MAX_VALUE); - this.bigQueryArrowToPageConverter = new BigQueryArrowToPageConverter(streamBufferAllocator, schema, columns); + this.bigQueryArrowToPageConverter = new BigQueryArrowToPageConverter(typeManager, streamBufferAllocator, schema, columns); this.pageBuilder = new PageBuilder(columns.stream() .map(BigQueryColumnHandle::getTrinoType) .collect(toImmutableList())); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java index 41572ec1a2a7..8f17854adb10 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java @@ -83,6 +83,7 @@ public class BigQueryStorageAvroPageSource private static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter(); private final BigQueryReadClient bigQueryReadClient; + private final BigQueryTypeManager typeManager; private final BigQuerySplit split; private final List columnNames; private final List columnTypes; @@ -92,11 +93,13 @@ public class BigQueryStorageAvroPageSource public BigQueryStorageAvroPageSource( BigQueryReadClient bigQueryReadClient, + BigQueryTypeManager typeManager, int maxReadRowsRetries, BigQuerySplit split, List columns) { this.bigQueryReadClient = requireNonNull(bigQueryReadClient, "bigQueryReadClient is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.split = requireNonNull(split, "split is null"); this.readBytes = new AtomicLong(); requireNonNull(columns, "columns is null"); @@ -220,7 +223,7 @@ else if (type instanceof RowType rowType) { } } - private static void writeSlice(BlockBuilder output, Type type, Object value) + private void writeSlice(BlockBuilder output, Type type, Object value) { if (type instanceof VarcharType) { type.writeSlice(output, utf8Slice(((Utf8) value).toString())); @@ -233,6 +236,9 @@ else if (type instanceof VarbinaryType) { output.appendNull(); } } + else if (typeManager.isJsonType(type)) { + type.writeSlice(output, utf8Slice(((Utf8) value).toString())); + } else { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature()); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java index e90ff84c091c..bfd4ab41e185 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java @@ -19,6 +19,7 @@ import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; import io.airlift.slice.Slice; import io.trino.spi.TrinoException; import io.trino.spi.type.ArrayType; @@ -38,6 +39,8 @@ import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.TinyintType; import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import io.trino.spi.type.TypeSignature; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import jakarta.annotation.Nullable; @@ -62,6 +65,7 @@ import static io.trino.plugin.bigquery.BigQueryMetadata.DEFAULT_NUMERIC_TYPE_SCALE; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.DecimalType.createDecimalType; +import static io.trino.spi.type.StandardTypes.JSON; import static io.trino.spi.type.TimeWithTimeZoneType.DEFAULT_PRECISION; import static io.trino.spi.type.TimeWithTimeZoneType.createTimeWithTimeZoneType; import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey; @@ -78,6 +82,7 @@ import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.time.ZoneOffset.UTC; +import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; public final class BigQueryTypeManager @@ -97,6 +102,14 @@ public final class BigQueryTypeManager private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("''HH:mm:ss.SSSSSS''"); private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSSSSS").withZone(UTC); + private final Type jsonType; + + @Inject + public BigQueryTypeManager(TypeManager typeManager) + { + jsonType = requireNonNull(typeManager, "typeManager is null").getType(new TypeSignature(JSON)); + } + private RowType.Field toRawTypeField(String name, Field field) { Type trinoType = convertToTrinoType(field).orElseThrow(() -> new IllegalArgumentException("Unsupported column " + field)).type(); @@ -353,6 +366,8 @@ private Optional convertToTrinoType(Field field) return Optional.of(new ColumnMapping(TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS, true)); case GEOGRAPHY: return Optional.of(new ColumnMapping(VarcharType.VARCHAR, false)); + case JSON: + return Optional.of(new ColumnMapping(jsonType, false)); case STRUCT: // create the row FieldList subTypes = field.getSubFields(); @@ -402,6 +417,11 @@ public boolean isSupportedType(Field field) return toTrinoType(field).isPresent(); } + public boolean isJsonType(Type type) + { + return type.equals(jsonType); + } + private static Field.Mode getMode(Field field) { return firstNonNull(field.getMode(), Field.Mode.NULLABLE); diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java index 493ec15d9b66..cb92f5e55d5a 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java @@ -135,6 +135,7 @@ public void testPredicatePushdown() testPredicatePushdown("DATETIME '2018-04-01 02:13:55.123'", "TIMESTAMP '2018-04-01 02:13:55.123'", true); testPredicatePushdown("ST_GeogPoint(0, 0)", "'POINT(0 0)'", false); + testPredicatePushdown("JSON '{\"age\": 30}'", "JSON '{\"age\": 30}'", false); testPredicatePushdown("[true]", "ARRAY[true]", false); testPredicatePushdown("STRUCT('nested' AS x)", "ROW('nested')", false); } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java index d2130132351d..c3760d95ed7a 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java @@ -45,6 +45,7 @@ import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.type.JsonType.JSON; import static java.lang.String.format; import static java.time.ZoneOffset.UTC; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -678,6 +679,16 @@ public void testGeography() .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.geography")); } + @Test + public void testJson() + { + SqlDataTypeTest.create() + .addRoundTrip("JSON", "JSON '{\"name\": \"Alice\", \"age\": 30}'", JSON, "JSON '{\"name\": \"Alice\", \"age\": 30}'") + .addRoundTrip("JSON", "NULL", JSON, "CAST(NULL AS JSON)") + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.json")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.json")); + } + @Test public void testArray() {