From c9d984e812c8ae93f6a2d9f87e1efa9d1a4ca603 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 25 Jun 2024 13:34:51 +0900 Subject: [PATCH] Extend BaseSystemTable in Iceberg refs table --- .../io/trino/plugin/iceberg/RefsTable.java | 79 ++++++------------- 1 file changed, 26 insertions(+), 53 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java index b08b6d3b7f7c..ae938a82a0a8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java @@ -15,79 +15,52 @@ import com.google.common.collect.ImmutableList; import io.trino.plugin.iceberg.util.PageListBuilder; -import io.trino.spi.Page; import io.trino.spi.connector.ColumnMetadata; -import io.trino.spi.connector.ConnectorPageSource; -import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableMetadata; -import io.trino.spi.connector.ConnectorTransactionHandle; -import io.trino.spi.connector.FixedPageSource; import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.SystemTable; -import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.TimeZoneKey; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import java.util.List; +import java.util.Map; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.MetadataTableType.REFS; public class RefsTable - implements SystemTable + extends BaseSystemTable { - private final ConnectorTableMetadata tableMetadata; - private final Table icebergTable; + private static final List COLUMNS = ImmutableList.builder() + .add(new ColumnMetadata("name", VARCHAR)) + .add(new ColumnMetadata("type", VARCHAR)) + .add(new ColumnMetadata("snapshot_id", BIGINT)) + .add(new ColumnMetadata("max_reference_age_in_ms", BIGINT)) + .add(new ColumnMetadata("min_snapshots_to_keep", INTEGER)) + .add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT)) + .build(); public RefsTable(SchemaTableName tableName, Table icebergTable) { - this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); - - this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), - ImmutableList.builder() - .add(new ColumnMetadata("name", VARCHAR)) - .add(new ColumnMetadata("type", VARCHAR)) - .add(new ColumnMetadata("snapshot_id", BIGINT)) - .add(new ColumnMetadata("max_reference_age_in_ms", BIGINT)) - .add(new ColumnMetadata("min_snapshots_to_keep", INTEGER)) - .add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT)) - .build()); - } - - @Override - public Distribution getDistribution() - { - return Distribution.SINGLE_COORDINATOR; - } - - @Override - public ConnectorTableMetadata getTableMetadata() - { - return tableMetadata; + super( + requireNonNull(icebergTable, "icebergTable is null"), + new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMNS), + REFS); } @Override - public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) + protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) { - return new FixedPageSource(buildPages(tableMetadata, icebergTable)); - } - - private static List buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable) - { - PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); - - icebergTable.refs().forEach((refName, ref) -> { - pagesBuilder.beginRow(); - pagesBuilder.appendVarchar(refName); - pagesBuilder.appendVarchar(ref.isBranch() ? "BRANCH" : "TAG"); - pagesBuilder.appendBigint(ref.snapshotId()); - pagesBuilder.appendBigint(ref.maxRefAgeMs()); - pagesBuilder.appendInteger(ref.minSnapshotsToKeep()); - pagesBuilder.appendBigint(ref.maxSnapshotAgeMs()); - pagesBuilder.endRow(); - }); - - return pagesBuilder.build(); + pagesBuilder.beginRow(); + pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get("name"), String.class)); + pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get("type"), String.class)); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("snapshot_id"), Long.class)); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("max_reference_age_in_ms"), Long.class)); + pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get("min_snapshots_to_keep"), Integer.class)); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("max_snapshot_age_in_ms"), Long.class)); + pagesBuilder.endRow(); } }