From f0581e44cb1acb9a65a57e721e1a66918c64934b Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 28 Nov 2024 10:40:56 +0900 Subject: [PATCH] Add $transactions system table to Delta Lake --- .../plugin/deltalake/DeltaLakeMetadata.java | 6 ++ .../plugin/deltalake/DeltaLakeTableType.java | 1 + .../deltalake/DeltaLakeTransactionsTable.java | 75 +++++++++++++++++++ .../deltalake/TestDeltaLakeSystemTables.java | 27 +++++++ 4 files changed, 109 insertions(+) create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index ca70235b60d8..6edb4db1d57c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -4100,6 +4100,12 @@ private Optional getRawSystemTable(ConnectorSession session, Schema fileSystemFactory, transactionLogAccess, typeManager)); + case TRANSACTIONS -> Optional.of(new DeltaLakeTransactionsTable( + systemTableName, + tableLocation, + fileSystemFactory, + transactionLogAccess, + typeManager)); case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(systemTableName, tableLocation, transactionLogAccess)); case PARTITIONS -> Optional.of(new DeltaLakePartitionsTable(session, systemTableName, tableLocation, transactionLogAccess, typeManager)); }; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java index d1fac99083fa..0b9d3b62d375 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java @@ -17,6 +17,7 @@ public enum DeltaLakeTableType { DATA, HISTORY, + TRANSACTIONS, PROPERTIES, PARTITIONS, } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java new file mode 100644 index 000000000000..b35ce6074618 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableList; +import io.airlift.json.JsonCodec; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; +import io.trino.plugin.deltalake.transactionlog.Transaction; +import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; +import io.trino.plugin.deltalake.util.PageListBuilder; +import io.trino.spi.Page; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TypeManager; +import io.trino.spi.type.TypeSignature; + +import java.util.List; + +import static io.airlift.json.JsonCodec.listJsonCodec; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.StandardTypes.JSON; +import static java.util.Objects.requireNonNull; + +public class DeltaLakeTransactionsTable + extends BaseTransactionsTable +{ + private static final JsonCodec> TRANSACTION_LOG_ENTRIES_CODEC = listJsonCodec(DeltaLakeTransactionLogEntry.class); + + public DeltaLakeTransactionsTable( + SchemaTableName tableName, + String tableLocation, + TrinoFileSystemFactory fileSystemFactory, + TransactionLogAccess transactionLogAccess, + TypeManager typeManager) + { + super( + tableName, + tableLocation, + fileSystemFactory, + transactionLogAccess, + typeManager, + new ConnectorTableMetadata( + requireNonNull(tableName, "tableName is null"), + ImmutableList.builder() + .add(new ColumnMetadata("version", BIGINT)) + .add(new ColumnMetadata("transaction", typeManager.getType(new TypeSignature(JSON)))) + .build())); + } + + @Override + protected List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions) + { + for (Transaction transaction : transactions) { + pagesBuilder.beginRow(); + pagesBuilder.appendBigint(transaction.transactionId()); + pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(transaction.transactionEntries())); + pagesBuilder.endRow(); + } + return pagesBuilder.build(); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java index dfa9def3cbef..7a3fa97a5816 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java @@ -17,6 +17,7 @@ import com.google.common.io.Resources; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; +import io.trino.testing.sql.TestTable; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; @@ -25,6 +26,8 @@ import java.nio.file.Path; import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents; +import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.SELECT_COLUMN; +import static io.trino.testing.TestingAccessControlManager.privilege; import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; @@ -100,6 +103,30 @@ public void testHistoryTable() } } + @Test + void testTransactionsTable() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_transactions", "(col int)")) { + assertThat((String) computeScalar("SELECT transaction FROM \"" + table.getName() + "$transactions\"")) + .contains("commitInfo", "protocol", "metaData"); + } + } + + @Test + void testTransactionsTableAccessControl() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_transactions", "(col int)")) { + // TODO Disallow access to transactions table when the user can't access the base table + assertAccessAllowed( + "SELECT * FROM \"" + table.getName() + "$transactions\"", + privilege(table.getName(), SELECT_COLUMN)); + assertAccessDenied( + "SELECT * FROM \"" + table.getName() + "$transactions\"", + "Cannot select from columns .*", + privilege(table.getName() + "$transactions", SELECT_COLUMN)); + } + } + @Test public void testPropertiesTable() {