Skip to content

Commit

Permalink
Add missing table features when adding timestamp_ntz in Delta
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Dec 11, 2024
1 parent e168874 commit a7e72d4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1252,12 +1252,12 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
}
}
protocolEntry = protocolEntryForTable(tableHandle.getProtocolEntry(), containsTimestampType, tableMetadata.getProperties());
protocolEntry = protocolEntryForTable(tableHandle.getProtocolEntry().minReaderVersion(), tableHandle.getProtocolEntry().minWriterVersion(), containsTimestampType, tableMetadata.getProperties());
statisticsAccess.deleteExtendedStatistics(session, schemaTableName, location);
}
else {
setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
protocolEntry = protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties());
protocolEntry = protocolEntryForTable(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, tableMetadata.getProperties());
}

appendTableEntries(
Expand Down Expand Up @@ -1435,13 +1435,13 @@ public DeltaLakeOutputTableHandle beginCreateTable(
ProtocolEntry protocolEntry;

if (replaceExistingTable) {
protocolEntry = protocolEntryForTable(handle.getProtocolEntry(), containsTimestampType, tableMetadata.getProperties());
protocolEntry = protocolEntryForTable(handle.getProtocolEntry().minReaderVersion(), handle.getProtocolEntry().minWriterVersion(), containsTimestampType, tableMetadata.getProperties());
readVersion = OptionalLong.of(handle.getReadVersion());
}
else {
checkPathContainsNoFiles(session, finalLocation);
setRollback(() -> deleteRecursivelyIfExists(fileSystemFactory.create(session), finalLocation));
protocolEntry = protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties());
protocolEntry = protocolEntryForTable(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, tableMetadata.getProperties());
}

return new DeltaLakeOutputTableHandle(
Expand Down Expand Up @@ -1810,9 +1810,11 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
ProtocolEntry protocolEntry = handle.getProtocolEntry();
checkSupportedWriterVersion(handle);
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry(), protocolEntry);
if (changeDataFeedEnabled(handle.getMetadataEntry(), protocolEntry).orElse(false) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
Optional<Boolean> changeDataFeedEnabled = changeDataFeedEnabled(handle.getMetadataEntry(), protocolEntry);
if (changeDataFeedEnabled.orElse(false) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName()));
}
boolean deletionVectorEnabled = isDeletionVectorEnabled(handle.getMetadataEntry(), protocolEntry);
checkUnsupportedWriterFeatures(protocolEntry);

if (!newColumnMetadata.isNullable()) {
Expand Down Expand Up @@ -1861,7 +1863,12 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
transactionLogWriter,
ADD_COLUMN_OPERATION,
session,
buildProtocolEntryForNewColumn(protocolEntry, newColumnMetadata.getType()),
protocolEntry(
ProtocolEntry.builder(protocolEntry),
containsTimestampType(newColumnMetadata.getType()),
changeDataFeedEnabled,
columnMappingMode,
deletionVectorEnabled),
MetadataEntry.builder(handle.getMetadataEntry())
.setSchemaString(schemaString)
.setConfiguration(configuration));
Expand All @@ -1879,17 +1886,6 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
}
}

private ProtocolEntry buildProtocolEntryForNewColumn(ProtocolEntry protocolEntry, Type type)
{
ProtocolEntry.Builder builder = ProtocolEntry.builder(protocolEntry);

if (containsTimestampType(type)) {
builder.enableTimestampNtz();
}

return builder.build();
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
Expand Down Expand Up @@ -2916,32 +2912,33 @@ private TableSnapshot getSnapshot(ConnectorSession session, DeltaLakeTableHandle
return getSnapshot(session, table.getSchemaTableName(), table.getLocation(), Optional.of(table.getReadVersion()));
}

private ProtocolEntry protocolEntryForNewTable(boolean containsTimestampType, Map<String, Object> properties)
private ProtocolEntry protocolEntryForTable(int readerVersion, int writerVersion, boolean containsTimestampType, Map<String, Object> properties)
{
return protocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, properties);
return protocolEntry(
ProtocolEntry.builder(readerVersion, writerVersion),
containsTimestampType,
getChangeDataFeedEnabled(properties),
getColumnMappingMode(properties),
getDeletionVectorsEnabled(properties));
}

private ProtocolEntry protocolEntryForTable(ProtocolEntry existingProtocolEntry, boolean containsTimestampType, Map<String, Object> properties)
private ProtocolEntry protocolEntry(
ProtocolEntry.Builder protocolEntry,
boolean containsTimestampType,
Optional<Boolean> changeDataFeedEnabled,
ColumnMappingMode columnMappingMode,
boolean deletionVectorsEnabled)
{
return protocolEntry(existingProtocolEntry.minReaderVersion(), existingProtocolEntry.minWriterVersion(), containsTimestampType, properties);
}

private ProtocolEntry protocolEntry(int readerVersion, int writerVersion, boolean containsTimestampType, Map<String, Object> properties)
{
ProtocolEntry.Builder protocolEntry = ProtocolEntry.builder(readerVersion, writerVersion);

Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(properties);
if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) {
protocolEntry.enableChangeDataFeed();
}
ColumnMappingMode columnMappingMode = getColumnMappingMode(properties);
if (columnMappingMode == ID || columnMappingMode == NAME) {
protocolEntry.enableColumnMapping();
}
if (containsTimestampType) {
protocolEntry.enableTimestampNtz();
}
if (getDeletionVectorsEnabled(properties)) {
if (deletionVectorsEnabled) {
protocolEntry.enableDeletionVector();
}
return protocolEntry.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Sets.union;
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
Expand Down Expand Up @@ -4201,6 +4202,39 @@ public void testQueriesWithoutCheckpointFiltering()
assertUpdate("DROP TABLE " + tableName);
}

@Test
void testAddTimestampNtzColumnToCdfEnabledTable()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_timestamp_ntz", "(x int) WITH (change_data_feed_enabled = true)")) {
assertThat(getTableProperties(table.getName()))
.containsExactlyInAnyOrderEntriesOf(ImmutableMap.<String, String>builder()
.put("delta.enableChangeDataFeed", "true")
.put("delta.enableDeletionVectors", "false")
.put("delta.minReaderVersion", "1")
.put("delta.minWriterVersion", "4")
.buildOrThrow());

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN ts TIMESTAMP");

// CDF is enabled in this table. 'delta.feature.changeDataFeed' must be added when updating the table to versions supporting table features
assertThat(getTableProperties(table.getName()))
.containsExactlyInAnyOrderEntriesOf(ImmutableMap.<String, String>builder()
.put("delta.enableChangeDataFeed", "true")
.put("delta.enableDeletionVectors", "false")
.put("delta.feature.changeDataFeed", "supported")
.put("delta.feature.timestampNtz", "supported")
.put("delta.minReaderVersion", "3")
.put("delta.minWriterVersion", "7")
.buildOrThrow());
}
}

private Map<String, String> getTableProperties(String tableName)
{
return computeActual("SELECT key, value FROM \"" + tableName + "$properties\"").getMaterializedRows().stream()
.collect(toImmutableMap(row -> (String) row.getField(0), row -> (String) row.getField(1)));
}

@Test
public void testTypeCoercionOnCreateTable()
{
Expand Down

0 comments on commit a7e72d4

Please sign in to comment.