Skip to content

Commit

Permalink
Extend BaseSystemTable in Iceberg refs table
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 9, 2024
1 parent cdecbbf commit c9d984e
Showing 1 changed file with 26 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,79 +15,52 @@

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimeZoneKey;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataTableType.REFS;

public class RefsTable
implements SystemTable
extends BaseSystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;
private static final List<ColumnMetadata> COLUMNS = ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("name", VARCHAR))
.add(new ColumnMetadata("type", VARCHAR))
.add(new ColumnMetadata("snapshot_id", BIGINT))
.add(new ColumnMetadata("max_reference_age_in_ms", BIGINT))
.add(new ColumnMetadata("min_snapshots_to_keep", INTEGER))
.add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT))
.build();

public RefsTable(SchemaTableName tableName, Table icebergTable)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("name", VARCHAR))
.add(new ColumnMetadata("type", VARCHAR))
.add(new ColumnMetadata("snapshot_id", BIGINT))
.add(new ColumnMetadata("max_reference_age_in_ms", BIGINT))
.add(new ColumnMetadata("min_snapshots_to_keep", INTEGER))
.add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT))
.build());
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
super(
requireNonNull(icebergTable, "icebergTable is null"),
new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMNS),
REFS);
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

icebergTable.refs().forEach((refName, ref) -> {
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(refName);
pagesBuilder.appendVarchar(ref.isBranch() ? "BRANCH" : "TAG");
pagesBuilder.appendBigint(ref.snapshotId());
pagesBuilder.appendBigint(ref.maxRefAgeMs());
pagesBuilder.appendInteger(ref.minSnapshotsToKeep());
pagesBuilder.appendBigint(ref.maxSnapshotAgeMs());
pagesBuilder.endRow();
});

return pagesBuilder.build();
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get("name"), String.class));
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get("type"), String.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("snapshot_id"), Long.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("max_reference_age_in_ms"), Long.class));
pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get("min_snapshots_to_keep"), Integer.class));
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get("max_snapshot_age_in_ms"), Long.class));
pagesBuilder.endRow();
}
}

0 comments on commit c9d984e

Please sign in to comment.