Spark 3.5: Fix Migrate procedure renaming issue for custom catalog #8931
Conversation
catalogName);
  catalogPlugin = catalogManager.currentCatalog();
}
this.destCatalog = checkDestinationCatalog(catalogPlugin);
QQ: earlier we used to use sourceCatalog as destCatalog too. Was this a problem? Can you please add more comments on why sourceCatalog was picked as spark_catalog rather than glue_catalog, since we were calling the migrate procedure from glue_catalog?
Thanks for the review, @singhpk234.
Yes, that was a problem, because in migrate the source catalog can only be SparkSessionCatalog. Let me elaborate on this in addition to what I investigated in #7317 (comment) (if I'm wrong, please correct me).
When running migrate, checkSourceCatalog in BaseTableCreationSparkAction is called during MigrateTableSparkAction initialization. The checkSourceCatalog comes from the MigrateTableSparkAction implementation, and the method checks whether the sourceCatalog is a SparkSessionCatalog:
@Override
protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
// currently the import code relies on being able to look up the table in the session catalog
Preconditions.checkArgument(
catalog instanceof SparkSessionCatalog,
"Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
catalog.name(),
catalog.getClass().getName());
return (TableCatalog) catalog;
}
So, the sourceCatalog must be the SparkSessionCatalog. And if SparkSessionCatalog is configured with GlueCatalogImpl as its implementation, the renameTable operation in migrate is called via SparkSessionCatalog, GlueCatalogImpl is not used in the renameTable operation, and the migrate query fails.
For a custom catalog, specifically the Glue Data Catalog, Iceberg has its own renameTable implementation because the Glue Data Catalog client doesn't support table rename. I think that if it were possible to specify the destCatalog and use the destCatalog implementation, it could lift the restriction coming from the source-catalog-side implementation, such as the Glue Data Catalog.
For reference, I tested the following patterns, but all patterns failed:
Pattern 1 - Set SparkCatalog (glue_catalog) as the sourceTable
SparkSession config:
val spark: SparkSession = SparkSession.builder
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.catalog.glue_catalog.warehouse", warehouse)
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
Migrate query:
spark.sql(
s"""
CALL glue_catalog.system.migrate (
table => 'glue_catalog.db.table'
)
""")
Error (partial stacktrace):
Exception in thread "main" java.lang.IllegalArgumentException: Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found glue_catalog of class org.apache.iceberg.spark.SparkCatalog as the source catalog.
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:445)
at org.apache.iceberg.spark.actions.MigrateTableSparkAction.checkSourceCatalog(MigrateTableSparkAction.java:200)
at org.apache.iceberg.spark.actions.BaseTableCreationSparkAction.<init>(BaseTableCreationSparkAction.java:74)
at org.apache.iceberg.spark.actions.MigrateTableSparkAction.<init>(MigrateTableSparkAction.java:65)
at org.apache.iceberg.spark.actions.SparkActions.migrateTable(SparkActions.java:67)
at org.apache.iceberg.spark.procedures.MigrateTableProcedure.call(MigrateTableProcedure.java:98)
at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
...
Pattern 1' - Set SparkCatalog (glue_catalog) as the sourceTable and specify SparkCatalog for the procedure call
The same result was obtained as in "Pattern 1", because the catalog name in the procedure call doesn't affect the source/destination catalog.
SparkSession config: omitted. The same configuration was used as in "Pattern 1".
Migrate query:
spark.sql(
s"""
CALL spark_catalog.system.migrate (
table => 'glue_catalog.db.table'
)
""")
Error: omitted. The same error was obtained as in "Pattern 1".
Pattern 2 - Set SparkSessionCatalog with GlueCatalogImpl as the sourceTable
SparkSession config:
val spark: SparkSession = SparkSession.builder
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.catalog.glue_catalog.warehouse", warehouse)
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
Migrate query:
spark.sql(
s"""
CALL glue_catalog.system.migrate (
table => 'db.table'
)
""")
/* Or,
CALL glue_catalog.system.migrate (
table => 'spark_catalog.db.table'
)
*/
Error (partial stacktrace):
Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.UnsupportedOperationException: Table rename is not supported
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:133)
at org.apache.spark.sql.hive.HiveExternalCatalog.renameTable(HiveExternalCatalog.scala:566)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.renameTable(ExternalCatalogWithListener.scala:110)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.renameTable(SessionCatalog.scala:803)
at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.renameTable(V2SessionCatalog.scala:216)
at org.apache.iceberg.spark.SparkSessionCatalog.renameTable(SparkSessionCatalog.java:301)
at org.apache.iceberg.spark.actions.MigrateTableSparkAction.renameAndBackupSourceTable(MigrateTableSparkAction.java:212)
at org.apache.iceberg.spark.actions.MigrateTableSparkAction.doExecute(MigrateTableSparkAction.java:123)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:132)
at org.apache.iceberg.spark.actions.MigrateTableSparkAction.execute(MigrateTableSparkAction.java:115)
at org.apache.iceberg.spark.procedures.MigrateTableProcedure.call(MigrateTableProcedure.java:108)
at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
...
Caused by: java.lang.UnsupportedOperationException: Table rename is not supported
at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.alterTable(GlueMetastoreClientDelegate.java:539)
at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.alter_table_with_environmentContext(AWSCatalogMetastoreClient.java:417)
at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:648)
at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:630)
...
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.sql.hive.client.Shim_v2_1.alterTable(HiveShim.scala:1624)
...
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$renameTable$1(HiveExternalCatalog.scala:588)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:104)
@tomtongue do we know why icebergCatalog.tableExists(from) would fail, so that we are not using the GlueCatalog implementation (of Iceberg), which supports rename, and are falling back to V2SessionCatalog.renameTable, which uses GlueMetastoreClient and doesn't support rename?
iceberg/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
Lines 293 to 305 in 445664f
public void renameTable(Identifier from, Identifier to)
    throws NoSuchTableException, TableAlreadyExistsException {
  // rename is not supported by HadoopCatalog. to avoid UnsupportedOperationException for session
  // catalog tables,
  // check table existence first to ensure that the table belongs to the Iceberg catalog.
  if (icebergCatalog.tableExists(from)) {
    icebergCatalog.renameTable(from, to);
  } else {
    getSessionCatalog().renameTable(from, to);
  }
}
@singhpk234 thanks for checking my comments.
In this part, renameTable checks whether the source table is an Iceberg table. However, for the migrate procedure, the source catalog must be spark_catalog (for example, if glue_catalog is specified, it fails as described above).
So in the migrate query, getSessionCatalog().renameTable(from, to); is called, and the answer to your question below is that the GlueCatalogImpl in Iceberg is NOT used.
so that we are not using the GlueCatalog implementation (of iceberg) which supports rename and are falling back to the V2SessionCatalog.renameTable which is using GlueMetaStoreClient and doesn't support rename ?
As you mention, V2SessionCatalog.renameTable falls back to GlueMetastoreClient, which doesn't support table rename, so the query always fails.
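To make the fall-through concrete, here is a minimal, self-contained Java sketch of the dispatch discussed above. The Catalog, IcebergGlueCatalog, and GlueSessionCatalog types are illustrative stand-ins, not the real Iceberg or Spark classes: because the migrate source table is a plain Hive/Spark table, it is not registered in the Iceberg catalog, the tableExists(from) check misses, and the rename lands on the session catalog, which rejects it in the same way GlueMetastoreClientDelegate does in the stack trace above.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal model of the SparkSessionCatalog.renameTable dispatch: try the
// Iceberg catalog first, fall through to the session catalog on a miss.
interface Catalog {
  void renameTable(String from, String to);
}

class IcebergGlueCatalog implements Catalog {
  private final Set<String> tables = new HashSet<>();

  void register(String name) { tables.add(name); }

  boolean tableExists(String name) { return tables.contains(name); }

  @Override
  public void renameTable(String from, String to) {
    tables.remove(from);
    tables.add(to);
  }
}

class GlueSessionCatalog implements Catalog {
  @Override
  public void renameTable(String from, String to) {
    // Mirrors the GlueMetastoreClientDelegate behavior in the stack trace.
    throw new UnsupportedOperationException("Table rename is not supported");
  }
}

public class RenameDispatch {
  public static void main(String[] args) {
    IcebergGlueCatalog iceberg = new IcebergGlueCatalog();
    GlueSessionCatalog session = new GlueSessionCatalog();

    // The migrate source table is a non-Iceberg table, so the existence
    // check in the Iceberg catalog misses.
    String from = "db.table";
    Catalog target = iceberg.tableExists(from) ? iceberg : session;
    try {
      target.renameTable(from, "db.table_backup");
    } catch (UnsupportedOperationException e) {
      System.out.println("rename failed: " + e.getMessage());
    }
  }
}
```

This is why the failure is structural rather than a configuration issue: as long as the source is a non-Iceberg table, the dispatch can never pick the Iceberg-side rename.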
In this part, the renameTable checks if the source table is Iceberg or not. However, for the migrate procedure, the source table must be spark_catalog (for example, if the glue_catalog is specified, it fails as described above).
I see, the source table being a non-Iceberg table makes the rename check fail, and it defers to V2SessionCatalog.renameTable. This step can't be skipped, because we want to rename the source to a backup table to halt the writes. I'm wondering whether adding rename support in the GlueMetastore client would be best, but frankly it comes with its own challenges. The way we have implemented rename in Iceberg's GlueCatalog is a bit tricky: we read the source table, create a destination table, and then delete the source table (as Glue doesn't have a rename API). I am not sure that, if something goes south, we would be able to correctly restore the source table state from the backup.
For example, I am assuming a non-Iceberg table would also track partition info in Glue, which I don't think is correctly propagated in the Iceberg GlueCatalog impl. Thoughts:
public void renameTable(TableIdentifier from, TableIdentifier to) {
Thanks for the review, @singhpk234. Yes, as you say, the Iceberg GlueCatalogImpl replicates only partial metadata in the rename. So if the source Spark/Hive table is partitioned, the restore process fails as follows:
23/11/06 09:54:03 INFO MigrateTableSparkAction: Generating Iceberg metadata for db.tbl in s3://bucket/path/tbl/metadata
23/11/06 09:54:03 WARN BaseCatalogToHiveConverter: Hive Exception type not found for AccessDeniedException
23/11/06 09:54:05 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
23/11/06 09:54:06 INFO CodeGenerator: Code generated in 230.388332 ms
23/11/06 09:54:06 INFO CodeGenerator: Code generated in 17.169875 ms
23/11/06 09:54:06 INFO CodeGenerator: Code generated in 18.598328 ms
23/11/06 09:54:07 ERROR MigrateTableSparkAction: Failed to perform the migration, aborting table creation and restoring the original table
23/11/06 09:54:07 INFO MigrateTableSparkAction: Restoring db.tbl from db.tbl_backup
23/11/06 09:54:08 INFO GlueCatalog: created rename destination table db.tbl
23/11/06 09:54:08 INFO GlueCatalog: Successfully dropped table db.tbl_backup from Glue
23/11/06 09:54:08 INFO GlueCatalog: Dropped table: db.tbl_backup
23/11/06 09:54:08 INFO GlueCatalog: Successfully renamed table from db.tbl_backup to garbagedb.iceberg_migrate_w_year_partition
Exception in thread "main" org.apache.iceberg.exceptions.ValidationException: Unable to get partition spec for table: `db`.`tbl_backup`
at org.apache.iceberg.spark.SparkExceptionUtil.toUncheckedException(SparkExceptionUtil.java:55)
at org.apache.iceberg.spark.SparkTableUtil.importSparkTable(SparkTableUtil.java:415)
at org.apache.iceberg.spark.SparkTableUtil.importSparkTable(SparkTableUtil.java:460)
...
Caused by: org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Table tbl_backup is not a partitioned table
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:133)
at org.apache.spark.sql.hive.HiveExternalCatalog.doListPartitions(HiveExternalCatalog.scala:1308)
at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1302)
...
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Table tbl_backup is not a partitioned table
at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2676)
at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2709)
...
This error was caused by the partition information being lost in the renamed table.
So, as you note, supporting rename in GlueHiveMetastoreClient would be the best way to resolve the migrate restriction.
There are people who have tried to migrate their tables into Iceberg on a custom catalog like the Glue Catalog, but the migrate query cannot be used because of the rename restriction. Let me consider a better way to resolve this issue. If there's no other way, I think we need to ask GlueHiveMetastoreClient to support the rename operation.
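For illustration, here is a small Java sketch of the copy-then-delete rename and the metadata loss described in this thread. FakeGlueStore and TableDef are hypothetical types, not the real GlueCatalog code: since Glue has no rename API, the "rename" recreates the table definition under the new name and drops the old one, and any attribute the copy omits (modeled here as the partition columns) is silently lost, which is what the "not a partitioned table" restore failure above shows.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical table definition; real Glue tables carry far more metadata.
class TableDef {
  final String location;
  final List<String> partitionColumns;

  TableDef(String location, List<String> partitionColumns) {
    this.location = location;
    this.partitionColumns = partitionColumns;
  }
}

class FakeGlueStore {
  private final Map<String, TableDef> tables = new HashMap<>();

  void create(String name, TableDef def) { tables.put(name, def); }

  TableDef get(String name) { return tables.get(name); }

  void drop(String name) { tables.remove(name); }

  // No rename API: emulate rename as create-new + drop-old. The copy is
  // deliberately partial here, dropping partitionColumns, to model the
  // metadata loss discussed above.
  void renameTable(String from, String to) {
    TableDef src = get(from);
    create(to, new TableDef(src.location, List.of()));
    drop(from);
  }
}

public class GlueRenameSketch {
  public static void main(String[] args) {
    FakeGlueStore glue = new FakeGlueStore();
    glue.create("db.tbl", new TableDef("s3://bucket/path/tbl", List.of("year")));
    glue.renameTable("db.tbl", "db.tbl_backup");
    // The renamed table has lost its partition spec, so a later restore
    // that expects a partitioned table fails.
    System.out.println(glue.get("db.tbl_backup").partitionColumns.isEmpty()); // prints "true"
  }
}
```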
sure, at this point GlueHiveMetastoreClient supporting rename ops seems a reasonable solution to me.
"CALL %s.system.migrate(table => '%s', drop_backup => false, dest_catalog_name => '%s')",
catalogName, tableName, destCatalogName);
Assertions.assertThat(result).isEqualTo(1L);
Assertions.assertThat(spark.catalog().tableExists(tableName + "_BACKUP_")).isTrue();
We should check the warning log in this case. I was wondering whether giving an incorrect destCatalogName should be thrown as invalid input.
I believe it's better to throw an exception when a non-existent catalog name is specified, as you're suggesting. So I changed the dest catalog check handling, and also changed the test to follow the dest catalog change. Thanks!
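The fail-fast check being discussed can be sketched as follows. This is a minimal illustration under assumptions, not the actual Iceberg API: resolveDestCatalog and the configured-catalog map are hypothetical names, and the point is simply that an unknown dest catalog name raises IllegalArgumentException instead of being silently ignored with a warning.

```java
import java.util.Map;

public class DestCatalogCheck {
  // Hypothetical helper: look up the dest catalog name among the configured
  // catalogs and fail fast on an unknown name.
  static String resolveDestCatalog(Map<String, String> configuredCatalogs, String destCatalogName) {
    if (!configuredCatalogs.containsKey(destCatalogName)) {
      throw new IllegalArgumentException("Cannot find catalog: " + destCatalogName);
    }
    return configuredCatalogs.get(destCatalogName);
  }

  public static void main(String[] args) {
    Map<String, String> catalogs = Map.of(
        "glue_catalog", "org.apache.iceberg.spark.SparkCatalog",
        "spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog");

    // Known name resolves to its implementation class.
    System.out.println(resolveDestCatalog(catalogs, "glue_catalog"));

    // Unknown name is rejected as invalid input rather than warned about.
    try {
      resolveDestCatalog(catalogs, "typo_catalog");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```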
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
For the fix of #7317. The issue is that the migrate query is not supported for the Glue Data Catalog. To support the migrate query for custom catalog implementations, this commit makes it possible to specify the destination catalog name.
I also think adding support for renaming GlueCatalogHiveClient tables is another way to address this. If there's a better way to resolve the issue, please let me know.
The following partial script is an example of running the migrate query with the Glue Data Catalog: