From 44e7875790bd2c7b9fd2a8c86454c581e215a39c Mon Sep 17 00:00:00 2001 From: manuzhang Date: Fri, 13 Sep 2024 17:49:32 +0800 Subject: [PATCH] Core: Fix caching table with metadata table names --- .../org/apache/iceberg/CachingCatalog.java | 37 ++++---- .../TestCachingTableWithMetaTableName.java | 89 +++++++++++++++++++ 2 files changed, 110 insertions(+), 16 deletions(-) create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCachingTableWithMetaTableName.java diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java index 1043e3e7205c..cc859686a074 100644 --- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java +++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java @@ -32,6 +32,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.slf4j.Logger; @@ -145,22 +146,26 @@ public Table loadTable(TableIdentifier ident) { } if (MetadataTableUtils.hasMetadataTableName(canonicalized)) { - TableIdentifier originTableIdentifier = - TableIdentifier.of(canonicalized.namespace().levels()); - Table originTable = tableCache.get(originTableIdentifier, catalog::loadTable); - - // share TableOperations instance of origin table for all metadata tables, so that metadata - // table instances are - // also refreshed as well when origin table instance is refreshed. - if (originTable instanceof HasTableOperations) { - TableOperations ops = ((HasTableOperations) originTable).operations(); - MetadataTableType type = MetadataTableType.from(canonicalized.name()); - - Table metadataTable = - MetadataTableUtils.createMetadataTableInstance( - ops, catalog.name(), originTableIdentifier, canonicalized, type); - tableCache.put(canonicalized, metadataTable); - return metadataTable; + try { + TableIdentifier originTableIdentifier = + TableIdentifier.of(canonicalized.namespace().levels()); + Table originTable = tableCache.get(originTableIdentifier, catalog::loadTable); + + // share TableOperations instance of origin table for all metadata tables, so that metadata + // table instances are + // also refreshed as well when origin table instance is refreshed. + if (originTable instanceof HasTableOperations) { + TableOperations ops = ((HasTableOperations) originTable).operations(); + MetadataTableType type = MetadataTableType.from(canonicalized.name()); + + Table metadataTable = + MetadataTableUtils.createMetadataTableInstance( + ops, catalog.name(), originTableIdentifier, canonicalized, type); + tableCache.put(canonicalized, metadataTable); + return metadataTable; + } + } catch (NoSuchTableException e) { + // it's not a metadata table so ignore exception and load as a normal table } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCachingTableWithMetaTableName.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCachingTableWithMetaTableName.java new file mode 100644 index 000000000000..5023db5b18c3 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCachingTableWithMetaTableName.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.util.List; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; + +public class TestCachingTableWithMetaTableName extends CatalogTestBase { + private static final String TABLE_NAME = "history"; + + @Parameter(index = 3) + private int cacheExpirationInterval; + + @Parameters( + name = "catalogName = {0}, implementation = {1}, config = {2}, cacheExpirationInterval = {3}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "parquet-enabled", "true", + "cache-enabled", "true"), + 1 + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "parquet-enabled", "true", + "cache-enabled", "true"), + -1 + } + }; + } + + @BeforeEach + public void createTables() { + spark + .conf() + .set( + "spark.sql.catalog.spark_catalog.cache.expiration-interval-ms", + cacheExpirationInterval); + sql("CREATE TABLE %s (id bigint, data string, float float) USING iceberg", TABLE_NAME); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", TABLE_NAME); + } + + @TestTemplate + public void testLoadTableAfterCacheExpires() throws InterruptedException { + Thread.sleep(10); // wait for cache to expire + sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", TABLE_NAME); + List expected = + ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", Float.NaN)); + + assertEquals("Should return all expected rows", expected, sql("SELECT * FROM %s", TABLE_NAME)); + } +}