Skip to content

Commit

Permalink
Support reading json type in BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Nov 17, 2023
1 parent 6f79755 commit beed389
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 7 deletions.
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ to the following table:
- In [Well-known text
(WKT)](https://wikipedia.org/wiki/Well-known_text_representation_of_geometry)
format
* - `JSON`
- `JSON`
-
* - `ARRAY`
- `ARRAY`
-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@
public class BigQueryArrowToPageConverter
implements AutoCloseable
{
private final BigQueryTypeManager typeManager;
private final VectorSchemaRoot root;
private final VectorLoader loader;
private final BufferAllocator allocator;
private final List<Type> columnTypes;
private final List<String> columnNames;

public BigQueryArrowToPageConverter(BufferAllocator allocator, Schema schema, List<BigQueryColumnHandle> columns)
public BigQueryArrowToPageConverter(BigQueryTypeManager typeManager, BufferAllocator allocator, Schema schema, List<BigQueryColumnHandle> columns)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.allocator = requireNonNull(allocator, "allocator is null");
this.columnTypes = requireNonNull(columns, "columns is null").stream()
.map(BigQueryColumnHandle::getTrinoType)
Expand Down Expand Up @@ -192,7 +194,7 @@ private void writeVectorValues(BlockBuilder output, FieldVector vector, Consumer

private void writeSlice(BlockBuilder output, Type type, FieldVector vector, int index)
{
if (type instanceof VarcharType) {
if (type instanceof VarcharType || typeManager.isJsonType(type)) {
byte[] slice = ((VarCharVector) vector).get(index);
type.writeSlice(output, wrappedBuffer(slice));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@ public class BigQueryPageSourceProvider

private final BigQueryClientFactory bigQueryClientFactory;
private final BigQueryReadClientFactory bigQueryReadClientFactory;
private final BigQueryTypeManager typeManager;
private final int maxReadRowsRetries;
private final boolean arrowSerializationEnabled;

@Inject
public BigQueryPageSourceProvider(BigQueryClientFactory bigQueryClientFactory, BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryConfig config)
public BigQueryPageSourceProvider(
BigQueryClientFactory bigQueryClientFactory,
BigQueryReadClientFactory bigQueryReadClientFactory,
BigQueryTypeManager typeManager,
BigQueryConfig config)
{
this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory is null");
this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.maxReadRowsRetries = config.getMaxReadRowsRetries();
this.arrowSerializationEnabled = config.isArrowSerializationEnabled();
}
Expand Down Expand Up @@ -94,13 +100,15 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, Bi
{
if (arrowSerializationEnabled) {
return new BigQueryStorageArrowPageSource(
typeManager,
bigQueryReadClientFactory.create(session),
maxReadRowsRetries,
split,
columnHandles);
}
return new BigQueryStorageAvroPageSource(
bigQueryReadClientFactory.create(session),
typeManager,
maxReadRowsRetries,
split,
columnHandles);
Expand All @@ -110,6 +118,7 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQ
{
return new BigQueryQueryPageSource(
session,
typeManager,
bigQueryClientFactory.create(session),
table,
columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class BigQueryQueryPageSource
.optionalEnd()
.toFormatter();

private final BigQueryTypeManager typeManager;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final PageBuilder pageBuilder;
Expand All @@ -83,6 +84,7 @@ public class BigQueryQueryPageSource

public BigQueryQueryPageSource(
ConnectorSession session,
BigQueryTypeManager typeManager,
BigQueryClient client,
BigQueryTableHandle table,
List<String> columnNames,
Expand All @@ -94,6 +96,7 @@ public BigQueryQueryPageSource(
requireNonNull(columnNames, "columnNames is null");
requireNonNull(columnTypes, "columnTypes is null");
requireNonNull(filter, "filter is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes sizes don't match");
this.columnNames = ImmutableList.copyOf(columnNames);
this.columnTypes = ImmutableList.copyOf(columnTypes);
Expand Down Expand Up @@ -235,9 +238,9 @@ else if (type instanceof RowType rowType) {
}
}

private static void writeSlice(BlockBuilder output, Type type, FieldValue value)
private void writeSlice(BlockBuilder output, Type type, FieldValue value)
{
if (type instanceof VarcharType) {
if (type instanceof VarcharType || typeManager.isJsonType(type)) {
type.writeSlice(output, utf8Slice(value.getStringValue()));
}
else if (type instanceof VarbinaryType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class BigQueryStorageArrowPageSource
private final PageBuilder pageBuilder;

public BigQueryStorageArrowPageSource(
BigQueryTypeManager typeManager,
BigQueryReadClient bigQueryReadClient,
int maxReadRowsRetries,
BigQuerySplit split,
Expand All @@ -70,7 +71,7 @@ public BigQueryStorageArrowPageSource(
log.debug("Starting to read from %s", split.getStreamName());
responses = new ReadRowsHelper(bigQueryReadClient, split.getStreamName(), maxReadRowsRetries).readRows();
this.streamBufferAllocator = allocator.newChildAllocator(split.getStreamName(), 1024, Long.MAX_VALUE);
this.bigQueryArrowToPageConverter = new BigQueryArrowToPageConverter(streamBufferAllocator, schema, columns);
this.bigQueryArrowToPageConverter = new BigQueryArrowToPageConverter(typeManager, streamBufferAllocator, schema, columns);
this.pageBuilder = new PageBuilder(columns.stream()
.map(BigQueryColumnHandle::getTrinoType)
.collect(toImmutableList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class BigQueryStorageAvroPageSource
private static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter();

private final BigQueryReadClient bigQueryReadClient;
private final BigQueryTypeManager typeManager;
private final BigQuerySplit split;
private final List<String> columnNames;
private final List<Type> columnTypes;
Expand All @@ -92,11 +93,13 @@ public class BigQueryStorageAvroPageSource

public BigQueryStorageAvroPageSource(
BigQueryReadClient bigQueryReadClient,
BigQueryTypeManager typeManager,
int maxReadRowsRetries,
BigQuerySplit split,
List<BigQueryColumnHandle> columns)
{
this.bigQueryReadClient = requireNonNull(bigQueryReadClient, "bigQueryReadClient is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.split = requireNonNull(split, "split is null");
this.readBytes = new AtomicLong();
requireNonNull(columns, "columns is null");
Expand Down Expand Up @@ -220,7 +223,7 @@ else if (type instanceof RowType rowType) {
}
}

private static void writeSlice(BlockBuilder output, Type type, Object value)
private void writeSlice(BlockBuilder output, Type type, Object value)
{
if (type instanceof VarcharType) {
type.writeSlice(output, utf8Slice(((Utf8) value).toString()));
Expand All @@ -233,6 +236,9 @@ else if (type instanceof VarbinaryType) {
output.appendNull();
}
}
else if (typeManager.isJsonType(type)) {
type.writeSlice(output, utf8Slice(((Utf8) value).toString()));
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.spi.TrinoException;
import io.trino.spi.type.ArrayType;
Expand All @@ -38,6 +39,8 @@
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import jakarta.annotation.Nullable;
Expand All @@ -62,6 +65,7 @@
import static io.trino.plugin.bigquery.BigQueryMetadata.DEFAULT_NUMERIC_TYPE_SCALE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.DecimalType.createDecimalType;
import static io.trino.spi.type.StandardTypes.JSON;
import static io.trino.spi.type.TimeWithTimeZoneType.DEFAULT_PRECISION;
import static io.trino.spi.type.TimeWithTimeZoneType.createTimeWithTimeZoneType;
import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey;
Expand All @@ -78,6 +82,7 @@
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public final class BigQueryTypeManager
Expand All @@ -97,6 +102,14 @@ public final class BigQueryTypeManager
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("''HH:mm:ss.SSSSSS''");
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSSSSS").withZone(UTC);

private final Type jsonType;

@Inject
public BigQueryTypeManager(TypeManager typeManager)
{
jsonType = requireNonNull(typeManager, "typeManager is null").getType(new TypeSignature(JSON));
}

private RowType.Field toRawTypeField(String name, Field field)
{
Type trinoType = convertToTrinoType(field).orElseThrow(() -> new IllegalArgumentException("Unsupported column " + field)).type();
Expand Down Expand Up @@ -353,6 +366,8 @@ private Optional<ColumnMapping> convertToTrinoType(Field field)
return Optional.of(new ColumnMapping(TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS, true));
case GEOGRAPHY:
return Optional.of(new ColumnMapping(VarcharType.VARCHAR, false));
case JSON:
return Optional.of(new ColumnMapping(jsonType, false));
case STRUCT:
// create the row
FieldList subTypes = field.getSubFields();
Expand Down Expand Up @@ -402,6 +417,11 @@ public boolean isSupportedType(Field field)
return toTrinoType(field).isPresent();
}

public boolean isJsonType(Type type)
{
return type.equals(jsonType);
}

private static Field.Mode getMode(Field field)
{
return firstNonNull(field.getMode(), Field.Mode.NULLABLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void testPredicatePushdown()
testPredicatePushdown("DATETIME '2018-04-01 02:13:55.123'", "TIMESTAMP '2018-04-01 02:13:55.123'", true);

testPredicatePushdown("ST_GeogPoint(0, 0)", "'POINT(0 0)'", false);
testPredicatePushdown("JSON '{\"age\": 30}'", "JSON '{\"age\": 30}'", false);
testPredicatePushdown("[true]", "ARRAY[true]", false);
testPredicatePushdown("STRUCT('nested' AS x)", "ROW('nested')", false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.type.JsonType.JSON;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -678,6 +679,16 @@ public void testGeography()
.execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.geography"));
}

@Test
public void testJson()
{
SqlDataTypeTest.create()
.addRoundTrip("JSON", "JSON '{\"name\": \"Alice\", \"age\": 30}'", JSON, "JSON '{\"name\": \"Alice\", \"age\": 30}'")
.addRoundTrip("JSON", "NULL", JSON, "CAST(NULL AS JSON)")
.execute(getQueryRunner(), bigqueryCreateAndInsert("test.json"))
.execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.json"));
}

@Test
public void testArray()
{
Expand Down

0 comments on commit beed389

Please sign in to comment.