Core: Fix caching table with metadata table names #11123
base: main
Changes to CachingCatalog:

@@ -32,6 +32,7 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;

@@ -145,22 +146,26 @@ public Table loadTable(TableIdentifier ident) {
     }

     if (MetadataTableUtils.hasMetadataTableName(canonicalized)) {
-      TableIdentifier originTableIdentifier =
-          TableIdentifier.of(canonicalized.namespace().levels());
-      Table originTable = tableCache.get(originTableIdentifier, catalog::loadTable);
-
-      // share TableOperations instance of origin table for all metadata tables, so that metadata
-      // table instances are
-      // also refreshed as well when origin table instance is refreshed.
-      if (originTable instanceof HasTableOperations) {
-        TableOperations ops = ((HasTableOperations) originTable).operations();
-        MetadataTableType type = MetadataTableType.from(canonicalized.name());
-
-        Table metadataTable =
-            MetadataTableUtils.createMetadataTableInstance(
-                ops, catalog.name(), originTableIdentifier, canonicalized, type);
-        tableCache.put(canonicalized, metadataTable);
-        return metadataTable;
-      }
+      try {
+        TableIdentifier originTableIdentifier =
+            TableIdentifier.of(canonicalized.namespace().levels());
+        Table originTable = tableCache.get(originTableIdentifier, catalog::loadTable);
+
+        // share TableOperations instance of origin table for all metadata tables, so that metadata
+        // table instances are
+        // also refreshed as well when origin table instance is refreshed.
+        if (originTable instanceof HasTableOperations) {
+          TableOperations ops = ((HasTableOperations) originTable).operations();
+          MetadataTableType type = MetadataTableType.from(canonicalized.name());
+
+          Table metadataTable =
+              MetadataTableUtils.createMetadataTableInstance(
+                  ops, catalog.name(), originTableIdentifier, canonicalized, type);
+          tableCache.put(canonicalized, metadataTable);
+          return metadataTable;
+        }
+      } catch (NoSuchTableException e) {
+        // it's not a metadata table so ignore exception and load as a normal table
+      }
     }

Review comment on the re-indented "// table instances are" line: nit: I know there was only an indentation change here, but there is an odd line break in this line.
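For context, a minimal sketch of the situation this try/catch handles (not part of the PR; the class name and the db.history identifier are made up for illustration): a regular table whose name happens to match a metadata table type, such as "history".

import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.catalog.TableIdentifier;

public class MetadataNameClashSketch {
  public static void main(String[] args) {
    // A regular table that is simply named "history", i.e. db.history.
    TableIdentifier ident = TableIdentifier.of("db", "history");

    // The check matches on the table name alone, so CachingCatalog.loadTable takes the
    // metadata-table branch even though no metadata table is being requested.
    System.out.println(MetadataTableUtils.hasMetadataTableName(ident)); // true

    // The presumed origin table is derived from the namespace levels, i.e. just "db".
    // catalog.loadTable on it throws NoSuchTableException when no table named "db" exists;
    // the catch block added above swallows that and falls through to a normal load.
    TableIdentifier presumedOrigin = TableIdentifier.of(ident.namespace().levels());
    System.out.println(presumedOrigin); // db
  }
}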
New test file org.apache.iceberg.spark.sql.TestCachingTableWithMetaTableName (all lines added):

@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.sql;

import java.util.List;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;

public class TestCachingTableWithMetaTableName extends CatalogTestBase {

  private static final String TABLE_NAME = "history";

  @Parameter(index = 3)
  private int cacheExpirationInterval;

  @Parameters(
      name = "catalogName = {0}, implementation = {1}, config = {2}, cacheExpirationInterval = {3}")
  protected static Object[][] parameters() {
    return new Object[][] {
      {
        SparkCatalogConfig.SPARK.catalogName(),
        SparkCatalogConfig.SPARK.implementation(),
        ImmutableMap.of(
            "type", "hive",
            "default-namespace", "default",
            "parquet-enabled", "true",
            "cache-enabled", "true"),
        1
      },
      {
        SparkCatalogConfig.SPARK.catalogName(),
        SparkCatalogConfig.SPARK.implementation(),
        ImmutableMap.of(
            "type", "hive",
            "default-namespace", "default",
            "parquet-enabled", "true",
            "cache-enabled", "true"),
        -1
      }
    };
  }

  @BeforeEach
  public void createTables() {
    spark
        .conf()
        .set(
            "spark.sql.catalog.spark_catalog.cache.expiration-interval-ms",
            cacheExpirationInterval);
    sql("CREATE TABLE %s (id bigint, data string, float float) USING iceberg", TABLE_NAME);
  }

  @AfterEach
  public void removeTables() {
    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
  }

  @TestTemplate
  public void testLoadTableAfterCacheExpires() throws InterruptedException {
    Thread.sleep(10); // wait for cache to expire
    sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", TABLE_NAME);
    List<Object[]> expected =
        ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", Float.NaN));

    assertEquals("Should return all expected rows", expected, sql("SELECT * FROM %s", TABLE_NAME));
  }
}

Review thread on the new test class:

I found that there is an existing test suite, TestCachingCatalog. Wouldn't it be cleaner to add test coverage there instead of to the Spark test suite?

That test is to test

I get that, but I think this patch did change the CachingCatalog internals itself, so I don't see the additional value in putting this into a Spark test. TestCachingCatalog should definitely have coverage for behaviours like this. If you feel that an additional Spark test is also beneficial, I don't mind. I might be missing some background, though.
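As a hedged sketch only (not code from the PR and not from TestCachingCatalog), the kind of core-level check discussed above could look roughly like the following; CachingCatalogSketch, verifyMetadataNameClash, and the db.history identifier are made-up names, and the caller is assumed to supply a working Catalog implementation with the "db" namespace available.

import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

public class CachingCatalogSketch {
  // Assumes `underlying` is a fully initialized catalog with the "db" namespace present.
  public static void verifyMetadataNameClash(Catalog underlying) {
    Catalog caching = CachingCatalog.wrap(underlying);

    // A regular table whose name collides with the "history" metadata table type.
    TableIdentifier ident = TableIdentifier.of("db", "history");
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    caching.createTable(ident, schema);

    // Before this fix, the lookup took the metadata-table branch and failed because
    // no origin table named "db" exists; with the fix it falls back to a normal load.
    if (caching.loadTable(ident) == null) {
      throw new IllegalStateException("expected db.history to load as a regular table");
    }
  }
}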
FWIW, as I see it, the RESTSessionCatalog does this the other way around: it first tries to load the table as a regular table, and only if that fails does it try to load it as a metadata table. https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L461
I find that approach cleaner, but let me know what you think.

Do you suggest we call catalog::loadTable first? If so, we might not be able to cache the original table, since the underlying catalog can load the metadata table directly.

What I had in mind was something different:

try {
  return tableCache.get(canonicalized, catalog::loadTable);
} catch (NoSuchTableException e) {
  // Verify it's a metadata table
  // Do whatever we want in case of a metadata table, even cache the original table.
}

What do you think?

The issue is that no NoSuchTableException will ever be thrown on a cache miss, since catalog::loadTable can successfully load the metadata table itself. See, for example, BaseMetastoreCatalog (core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java, lines 45 to 71 at fa00482). In that case we don't cache the original table, which changes the current behavior.
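To illustrate that point, here is a hedged, simplified paraphrase of the fallback being referred to, written from memory rather than copied from the linked lines; the abstract helper methods mirror names used in BaseMetastoreCatalog, but the class itself is an illustration only.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;

abstract class MetadataFallbackSketch {
  abstract boolean isValidIdentifier(TableIdentifier identifier);
  abstract boolean isValidMetadataIdentifier(TableIdentifier identifier);
  abstract TableOperations newTableOps(TableIdentifier identifier);
  abstract Table loadMetadataTable(TableIdentifier identifier);
  abstract Table newBaseTable(TableOperations ops, TableIdentifier identifier);

  Table loadTable(TableIdentifier identifier) {
    if (isValidIdentifier(identifier)) {
      TableOperations ops = newTableOps(identifier);
      if (ops.current() == null) {
        // no regular table with this name; it may still be a metadata table such as db.tbl.history
        return loadMetadataTable(identifier);
      }
      return newBaseTable(ops, identifier);
    } else if (isValidMetadataIdentifier(identifier)) {
      // an identifier like db.tbl.history resolves against the origin table db.tbl,
      // so loadTable returns the metadata table instead of throwing NoSuchTableException
      return loadMetadataTable(identifier);
    }
    throw new NoSuchTableException("Table does not exist: %s", identifier);
  }
}

Because of that second branch, a cache miss on db.tbl.history simply loads the metadata table, so the catch block in the sketch above would never fire.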
Finally I had some time to take a deeper look at this. You are right: with my proposal above we wouldn't run the code that is specific to metadata tables, such as caching the underlying table and sharing the table operations.
I did find a much simpler version of this fix, however, and also wrote some tests in TestCachingCatalog: gaborkaszab@c1eca8b
What do you think? Would you mind if I opened an alternative fix PR?

Please go ahead.

Thanks @manuzhang!
I created this PR for the alternative fix: #11738. I added you as a co-author.
@RussellSpitzer I saw your thumbs-up on one of my comments. Would you mind taking a look at the PR?