Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.5: Fix Migrate procedure renaming issue for custom catalog #8931

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ default MigrateTable backupTableName(String tableName) {
throw new UnsupportedOperationException("Backup table name cannot be specified");
}

/**
* Sets a destination catalog name to use the catalog for renaming a backup table
*
* @param catalogName - the destination catalog name for backup table rename
* @return this for method chaining
*/
default MigrateTable destCatalogName(String catalogName) {
throw new UnsupportedOperationException("Destination catalog name cannot be specified");
}

/** The action result that contains a summary of the execution. */
interface Result {
/** Returns the number of migrated data files. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,49 @@ public void testMigrateWithBackupTableName() throws IOException {
Assertions.assertThat(spark.catalog().tableExists(dbName + "." + backupTableName)).isTrue();
}

@Test
public void testMigrateWithDestCatalogName() throws IOException {
Assume.assumeTrue(catalogName.equals("spark_catalog"));

spark
.conf()
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog");

String location = temp.newFolder().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
tableName, location);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);

Object result =
scalarSql(
"CALL %s.system.migrate(table => '%s', drop_backup => false, dest_catalog_name => '%s')",
catalogName, tableName, catalogName);
Assertions.assertThat(result).isEqualTo(1L);
Assertions.assertThat(spark.catalog().tableExists(tableName + "_BACKUP_")).isTrue();
}

@Test
public void testMigrateWithDestCatalogNameWithNonExistingCatalog() throws IOException {
Assume.assumeTrue(catalogName.equals("spark_catalog"));

String destCatalogName = "non_existing_catalog";

String location = temp.newFolder().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
tableName, location);

Assertions.assertThatThrownBy(
() -> {
sql(
"CALL %s.system.migrate(table => '%s', dest_catalog_name => '%s')",
catalogName, tableName, destCatalogName);
})
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("The destination catalog %s doesn't exist in SparkSession.", destCatalogName);
}

@Test
public void testMigrateWithInvalidMetricsConfig() throws IOException {
Assume.assumeTrue(catalogName.equals("spark_catalog"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
Expand All @@ -54,9 +55,9 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
private static final Logger LOG = LoggerFactory.getLogger(MigrateTableSparkAction.class);
private static final String BACKUP_SUFFIX = "_BACKUP_";

private final StagingTableCatalog destCatalog;
private final Identifier destTableIdent;

private StagingTableCatalog destCatalog;
private Identifier backupIdent;
private boolean dropBackup = false;

Expand Down Expand Up @@ -108,6 +109,21 @@ public MigrateTableSparkAction backupTableName(String tableName) {
return this;
}

@Override
public MigrateTableSparkAction destCatalogName(String catalogName) {
CatalogManager catalogManager = spark().sessionState().catalogManager();

CatalogPlugin catalogPlugin;
if (catalogManager.isCatalogRegistered(catalogName)) {
catalogPlugin = catalogManager.catalog(catalogName);
} else {
throw new IllegalArgumentException(
String.format("The destination catalog %s doesn't exist in SparkSession.", catalogName));
}
this.destCatalog = checkDestinationCatalog(catalogPlugin);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: earlier we use to use sourceCatalog as destCatalog too, was this a problem ? can you please add more comments as to why sourceCatalog was picked as spark_catalog rather than the glue_catalog ?? since we were calling the migrate procedure from glue_catalog ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @singhpk234.

Yes that was a problem because in the migrate, the source catalog can only be SparkSessionCatalog. Let me elaborate this in addition to what I investigated in #7317 (comment) (if I'm wrong, please correct me).

When running migrate, checkSourceCatalog in BaseTableCreationSparkAction is called along with MigrateTableSparkAction initialization.

The checkSourceCatalog comes from the MigrateTableSparkAction implementation, and the method checks if the sourceCatalog is SparkSessionCatalog or not.

  @Override
  protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
    // currently the import code relies on being able to look up the table in the session catalog
    Preconditions.checkArgument(
        catalog instanceof SparkSessionCatalog,
        "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
        catalog.name(),
        catalog.getClass().getName());

    return (TableCatalog) catalog;
  }

So, the sourceCatalog is necessary to be specified the SparkSessionCatalog. And if SparkSessionCatalog is set and GlueCatalogImpl is set as its implementation, the renameTable operation in the migrate is called via SparkSessionCatalog, GlueCatalogImpl is not used in the renameTable operation, and then the migrate query fails.

For the custom catalog, specifically Glue Data Catalog, as you know, it originally has the renameTable implemenation in Iceberg because the Glue Data Catalog client doesn't support the table rename. I think if it were possible to specify the destCatalog to use the destCatalog impl, it could resolve the restriction of source catalog side impl like Glue Data Catalog.

For reference, I tested the following patterns, but all patterns failed:

Pattern 1 - Set SparkCatalog (glue_catalog) to the sourceTable

SparkSession config:

        val spark: SparkSession = SparkSession.builder
          .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
          .config("spark.sql.catalog.glue_catalog.warehouse", warehouse)
          .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
          .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .getOrCreate()

Migrate query:

spark.sql(
    s"""
        CALL glue_catalog.system.migrate (
            table => 'glue_catalog.db.table'
       )
""")

Error (partial stacktrace):

Exception in thread "main" java.lang.IllegalArgumentException: Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found glue_catalog of class org.apache.iceberg.spark.SparkCatalog as the source catalog.
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:445)
	at org.apache.iceberg.spark.actions.MigrateTableSparkAction.checkSourceCatalog(MigrateTableSparkAction.java:200)
	at org.apache.iceberg.spark.actions.BaseTableCreationSparkAction.<init>(BaseTableCreationSparkAction.java:74)
	at org.apache.iceberg.spark.actions.MigrateTableSparkAction.<init>(MigrateTableSparkAction.java:65)
	at org.apache.iceberg.spark.actions.SparkActions.migrateTable(SparkActions.java:67)
	at org.apache.iceberg.spark.procedures.MigrateTableProcedure.call(MigrateTableProcedure.java:98)
	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
...

Pattern 1' - Set SparkCatalog (glue_catalog) to the sourceTable and specify SparkCatalog for Procedure call

The same result was obtained as "Pattern 1" because the catalog name for procedure call doesn't affect the source/destination catalog.

SparkSession config: omitted. The same configuration was used as "Pattern 1".

Migrate query:

spark.sql(
    s"""
        CALL spark_catalog.system.migrate (
            table => 'glue_catalog.db.table'
       )

Error: omitted. The same error was obtained as "Pattern 1".

Pattern 2 - Set SparkSessionCatalog with GlueCatalogImpl to the sourceTable

SparkSession config:

        val spark: SparkSession = SparkSession.builder
          .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
          .config("spark.sql.catalog.glue_catalog.warehouse", warehouse)
          .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
          .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .getOrCreate()

Migrate query:

spark.sql(
    s"""
        CALL glue_catalog.system.migrate (
            table => 'db.table'
       )
""")

/* Or,
        CALL glue_catalog.system.migrate (
            table => 'spark_catalog.db.table'
       )
*/

Error (partial stacktrace):

Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.UnsupportedOperationException: Table rename is not supported
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:133)
	at org.apache.spark.sql.hive.HiveExternalCatalog.renameTable(HiveExternalCatalog.scala:566)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.renameTable(ExternalCatalogWithListener.scala:110)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.renameTable(SessionCatalog.scala:803)
	at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.renameTable(V2SessionCatalog.scala:216)
	at org.apache.iceberg.spark.SparkSessionCatalog.renameTable(SparkSessionCatalog.java:301)
	at org.apache.iceberg.spark.actions.MigrateTableSparkAction.renameAndBackupSourceTable(MigrateTableSparkAction.java:212)
	at org.apache.iceberg.spark.actions.MigrateTableSparkAction.doExecute(MigrateTableSparkAction.java:123)
	at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
	at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
	at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:132)
	at org.apache.iceberg.spark.actions.MigrateTableSparkAction.execute(MigrateTableSparkAction.java:115)
	at org.apache.iceberg.spark.procedures.MigrateTableProcedure.call(MigrateTableProcedure.java:108)
	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
...
Caused by: java.lang.UnsupportedOperationException: Table rename is not supported
	at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.alterTable(GlueMetastoreClientDelegate.java:539)
	at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.alter_table_with_environmentContext(AWSCatalogMetastoreClient.java:417)
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:648)
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:630)
...
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.sql.hive.client.Shim_v2_1.alterTable(HiveShim.scala:1624)
...
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$renameTable$1(HiveExternalCatalog.scala:588)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:104)

Copy link
Contributor

@singhpk234 singhpk234 Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomtongue do we know why would the icebergCatalog.tableExists(from) fail ? so that we are not using the GlueCatalog implementation (of iceberg) which supports rename and are falling back to the V2SessionCatalog.renameTable which is using GlueMetaStoreClient and doesn't support rename ?

public void renameTable(Identifier from, Identifier to)
throws NoSuchTableException, TableAlreadyExistsException {
// rename is not supported by HadoopCatalog. to avoid UnsupportedOperationException for session
// catalog tables,
// check table existence first to ensure that the table belongs to the Iceberg catalog.
if (icebergCatalog.tableExists(from)) {
icebergCatalog.renameTable(from, to);
} else {
getSessionCatalog().renameTable(from, to);
}
}
@Override

Copy link
Contributor Author

@tomtongue tomtongue Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@singhpk234 thanks for checking my comments.

In this part, the renameTable checks if the source table is Iceberg or not. However, for the migrate procedure, the source table must be spark_catalog (for example, if the glue_catalog is specified, it fails as described above).
So in the migrate query, getSessionCatalog().renameTable(from, to); is called, and the answer for your question below is NOT using the GlueCatalogImpl in Iceberg.

so that we are not using the GlueCatalog implementation (of iceberg) which supports rename and are falling back to the V2SessionCatalog.renameTable which is using GlueMetaStoreClient and doesn't support rename ?

As you're mentioning, the V2SessionCatalog.rename falls back to GlueMetastoreClient that doesn't support table rename. And the query always fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this part, the renameTable checks if the source table is Iceberg or not. However, for the migrate procedure, the source table must be spark_catalog (for example, if the glue_catalog is specified, it fails as described above).

I see the source table being non-iceberg table is making the check of rename fail and it defers to V2SessionCatalog.rename, and this step can't be skipped as we want to rename the source to backup table to halt the writes. wondering then adding the support for rename in GlueMetaStore client would be best, but it comes with it's own challenges frankly, how we have implemented rename in GlueCatalog of iceberg is a bit tricky we read the source table, create a destination table and then delete the source table (As glue doesn't have rename API). I am not sure if something goes south will we be able to correctly restore the source table state from the backup.

for ex, i am assuming non-iceberg table would track partition info also in glue which i don't think is correctly being propagated in iceberg GlueCatalog impl, thoughts :

public void renameTable(TableIdentifier from, TableIdentifier to) {

Copy link
Contributor Author

@tomtongue tomtongue Nov 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @singhpk234. Yes, as you're saying, the Iceberg GlueCatalogImpl replicates the "partial" metadata in the rename. So if the source Spark/Hive table is partitioned, the restore process will fail as follows:

23/11/06 09:54:03 INFO MigrateTableSparkAction: Generating Iceberg metadata for db.tbl in s3://bucket/path/tbl/metadata
23/11/06 09:54:03 WARN BaseCatalogToHiveConverter: Hive Exception type not found for AccessDeniedException
23/11/06 09:54:05 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
23/11/06 09:54:06 INFO CodeGenerator: Code generated in 230.388332 ms
23/11/06 09:54:06 INFO CodeGenerator: Code generated in 17.169875 ms
23/11/06 09:54:06 INFO CodeGenerator: Code generated in 18.598328 ms
23/11/06 09:54:07 ERROR MigrateTableSparkAction: Failed to perform the migration, aborting table creation and restoring the original table
23/11/06 09:54:07 INFO MigrateTableSparkAction: Restoring db.tbl from db.tbl_backup
23/11/06 09:54:08 INFO GlueCatalog: created rename destination table db.tbl
23/11/06 09:54:08 INFO GlueCatalog: Successfully dropped table db.tbl_backup from Glue
23/11/06 09:54:08 INFO GlueCatalog: Dropped table: db.tbl_backup
23/11/06 09:54:08 INFO GlueCatalog: Successfully renamed table from db.tbl_backup to garbagedb.iceberg_migrate_w_year_partition
Exception in thread "main" org.apache.iceberg.exceptions.ValidationException: Unable to get partition spec for table: `db`.`tbl_backup`
	at org.apache.iceberg.spark.SparkExceptionUtil.toUncheckedException(SparkExceptionUtil.java:55)
	at org.apache.iceberg.spark.SparkTableUtil.importSparkTable(SparkTableUtil.java:415)
	at org.apache.iceberg.spark.SparkTableUtil.importSparkTable(SparkTableUtil.java:460)
...
Caused by: org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Table tbl_backup is not a partitioned table
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:133)
	at org.apache.spark.sql.hive.HiveExternalCatalog.doListPartitions(HiveExternalCatalog.scala:1308)
	at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1302)
...
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Table tbl_backup is not a partitioned table
	at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2676)
	at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2709)
...

This error was caused by the partition lost in the renamed table.

So as you know, the way to resolve the migrate restriction, supporting the rename for GlueHiveMetastoreClient should be the best.

At least there are people who have tried to migrate from their table into Iceberg on custom catalog like Glue Catalog. But the migrate query cannot be used because of the rename restriction. So let me consider the better way to resolve this issue. If there's no way to resolve this issue, I think we need to ask the GlueHiveMetastoreClient to support the rename operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, at this point GlueHiveMetastoreClient supporting rename ops seems a reasonable solution to me.

return this;
}

@Override
public MigrateTable.Result execute() {
String desc = String.format("Migrating table %s", destTableIdent().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class MigrateTableProcedure extends BaseProcedure {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("properties", STRING_MAP),
ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
ProcedureParameter.optional("backup_table_name", DataTypes.StringType)
ProcedureParameter.optional("backup_table_name", DataTypes.StringType),
ProcedureParameter.optional("dest_catalog_name", DataTypes.StringType),
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -93,6 +94,7 @@ public InternalRow[] call(InternalRow args) {

boolean dropBackup = args.isNullAt(2) ? false : args.getBoolean(2);
String backupTableName = args.isNullAt(3) ? null : args.getString(3);
String destCatalogName = args.isNullAt(4) ? null : args.getString(4);

MigrateTableSparkAction migrateTableSparkAction =
SparkActions.get().migrateTable(tableName).tableProperties(properties);
Expand All @@ -105,6 +107,10 @@ public InternalRow[] call(InternalRow args) {
migrateTableSparkAction = migrateTableSparkAction.backupTableName(backupTableName);
}

if (destCatalogName != null) {
migrateTableSparkAction = migrateTableSparkAction.destCatalogName(destCatalogName);
}

MigrateTable.Result result = migrateTableSparkAction.execute();
return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
}
Expand Down