Flink: table supplier in writer operator for optional table refresh #8555

Merged: 45 commits merged into apache:master on Sep 21, 2023

Conversation


@bryanck bryanck commented Sep 12, 2023

Currently, when a Flink sink job is started, write operators are serialized with an initial table instance and that instance is used for the lifetime of the job. There are cases where a table needs to be reloaded at runtime by the subtasks. For example, if a REST catalog returns expiring credentials in the table load config response, those credentials cannot later be refreshed and will eventually expire. Also, if a task restarts, the original credentials created with the job will be used and they may have long expired.

Longer term, the ability to reload a table could be used to support schema evolution, though that is not addressed in this PR; the table schema and partition spec from the initial table instance are still used for now.

This PR updates the initialization of the Flink sink so that a table supplier is provided. This supplier can implement reload/refresh logic as needed. The initial supplier implementation created here simply reloads the table from the Iceberg catalog at a given interval. Also, for the time being, the supplier is mainly used to get refreshed FileIO instances from the reloaded table, and the original schema is still always used in the writers.
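For illustration, a minimal sketch of what an interval-based reloading supplier might look like (the class name and details here are hypothetical, not necessarily what this PR adds; it assumes TableLoader's open()/isOpen()/loadTable() methods):

import java.io.Serializable;
import java.time.Duration;
import java.util.function.Supplier;

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Hypothetical sketch: reload the table from the catalog once the configured
// interval has elapsed; otherwise keep serving the cached instance.
class ReloadingTableSupplier implements Supplier<Table>, Serializable {

  private final TableLoader tableLoader;
  private final long reloadIntervalMs;
  // Transient: after serialization to a subtask, the first get() reloads.
  private transient Table table;
  private transient long nextReloadTimeMs;

  ReloadingTableSupplier(Table initialTable, TableLoader tableLoader, Duration reloadInterval) {
    this.table = initialTable;
    this.tableLoader = tableLoader;
    this.reloadIntervalMs = reloadInterval.toMillis();
    this.nextReloadTimeMs = System.currentTimeMillis() + reloadIntervalMs;
  }

  @Override
  public Table get() {
    if (table == null || System.currentTimeMillis() >= nextReloadTimeMs) {
      if (!tableLoader.isOpen()) {
        tableLoader.open();
      }
      // A freshly loaded table carries new metadata and, importantly for this
      // PR, a FileIO built from up-to-date credentials.
      table = tableLoader.loadTable();
      nextReloadTimeMs = System.currentTimeMillis() + reloadIntervalMs;
    }
    return table;
  }
}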

This initial supplier implementation puts additional load on the Iceberg catalog, so the reload option is turned off by default. Different table suppliers could be implemented in the future that use a centralized mechanism for obtaining refreshed table instances, to cut down on the extra catalog load. Different options were explored, such as using a broadcast or the token delegation manager, but each had limitations or complexities, so it was felt that a better first step would be to introduce the abstraction that allows for table reload, along with a simple initial implementation.

@bryanck bryanck force-pushed the flink-table-supplier branch from b95e86d to 41a27f4 on September 13, 2023 at 01:20

pvary commented Sep 13, 2023

I am very concerned about this simplified solution. We should consider:

  • Concurrency issues, when the table metadata has been changed between reloads. Maybe we should only refresh the table metadata during checkpoints, so we can localize and handle changes in one place?
  • Load issues, as every TaskManager with a writer task connects to the catalog, and does so periodically. This will become costly very soon. I would prioritize a centralized solution.
  • The current default implementation does not solve the schema evolution problem, since that would mean on-demand table reload instead of periodic reload.

If the main goal here is to refresh the credentials, then maybe a proxy above the accessor could be a better solution, like https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java, where we get a new Kerberos token when the previous one has expired.
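(For illustration only, a proxy along those lines might look roughly like the following hypothetical sketch; RefreshingFileIO and the expiry check are invented names, not code from this PR or from Hive.)

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

import org.apache.iceberg.io.FileIO;

// Hypothetical sketch of the proxy idea: wrap a FileIO, and when a call fails
// due to expired credentials, rebuild the delegate and retry the call once.
class RefreshingFileIO implements InvocationHandler {

  private final Supplier<FileIO> freshIoSupplier; // e.g. reload the table and return table.io()
  private FileIO delegate;

  private RefreshingFileIO(Supplier<FileIO> freshIoSupplier) {
    this.freshIoSupplier = freshIoSupplier;
    this.delegate = freshIoSupplier.get();
  }

  static FileIO wrap(Supplier<FileIO> freshIoSupplier) {
    return (FileIO)
        Proxy.newProxyInstance(
            FileIO.class.getClassLoader(),
            new Class<?>[] {FileIO.class},
            new RefreshingFileIO(freshIoSupplier));
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    try {
      return method.invoke(delegate, args);
    } catch (InvocationTargetException e) {
      if (isCredentialExpiry(e.getCause())) {
        delegate = freshIoSupplier.get(); // refresh and retry once
        return method.invoke(delegate, args);
      }
      throw e.getCause();
    }
  }

  private static boolean isCredentialExpiry(Throwable t) {
    // Placeholder check: a real implementation would inspect the exception
    // thrown by the underlying storage client for an auth/expiry signal.
    return t instanceof org.apache.iceberg.exceptions.NotAuthorizedException;
  }
}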

What do you think?

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.awaitility.Awaitility;
import org.junit.Test;
Contributor

this should be a JUnit5 test

Contributor Author

@bryanck bryanck Sep 13, 2023

The Flink tests are still using org.junit.Test, so I kept it consistent with those.

Contributor

For new test classes being added, we actually want to use JUnit 5, as otherwise this will just cause more work later on in the migration process.

You might need to add the below snippet to the build file (if it isn't already defined for the Flink project):

test {
    useJUnitPlatform()
}

Contributor Author

OK thanks, I made this change.


bryanck commented Sep 13, 2023

> I am very concerned about this simplified solution. We should consider:
>
>   • Concurrency issues, when the table metadata has been changed between reloads. Maybe we should only refresh the table metadata during checkpoints, so we can localize and handle changes in one place?
>   • Load issues, as every TaskManager with a writer task connects to the catalog, and does so periodically. This will become costly very soon. I would prioritize a centralized solution.
>   • The current default implementation does not solve the schema evolution problem, since that would mean on-demand table reload instead of periodic reload.
>
> If the main goal here is to refresh the credentials, then maybe a proxy above the accessor could be a better solution, like https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java, where we get a new Kerberos token when the previous one has expired.
>
> What do you think?

To address your points:

  • This PR was updated so the committer will now only reload the table at checkpoints. The committer was already updating the table metadata as part of the Iceberg commit and upon restart. Writers will always use the original table metadata and only use the reloaded table to get an updated FileIO, though the hope is that the writers will use the reloaded metadata in the future (see the sketch after this list).
  • This PR was designed so that a different, more optimal table supplier solution could be plugged in later. There were limitations with the options we explored for centralized table reload, such as using the new token delegation framework or using a broadcast (I'd be happy to discuss details if you're interested). We went with this initial solution as a starting point. This feature is disabled by default and marked as experimental, so it should only be used in cases where it is known that a job will not overburden the catalog with load requests.
  • This PR was not meant to solve schema evolution problems, but rather to take a step towards that long-term goal.
  • The short-term goal for our use case was to refresh credentials, so we initially explored adding a FileIO-specific refresh. However, the long-term goal is to support table reload in a running Flink job to support schema evolution. We felt that adding the table refresh capability would be a better step towards reaching that goal, and it could also be used to solve the particular issue we are having.
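For illustration, the writer-side behavior described in the first point might look roughly like this hypothetical sketch (the class and field names are invented, not the PR's actual code):

import java.util.function.Supplier;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;

// Hypothetical sketch: the schema and spec are captured once from the initial
// table, while FileIO is always taken from the (possibly reloaded) supplied
// table, so refreshed credentials ride along with it.
class WriterState {
  private final Schema schema;        // frozen at job start
  private final PartitionSpec spec;   // frozen at job start
  private final Supplier<Table> tableSupplier;

  WriterState(Supplier<Table> tableSupplier) {
    Table initialTable = tableSupplier.get();
    this.schema = initialTable.schema();
    this.spec = initialTable.spec();
    this.tableSupplier = tableSupplier;
  }

  FileIO io() {
    return tableSupplier.get().io();
  }
}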

cc @danielcweeks @rdblue in case you have additional perspective or feedback...

@github-actions github-actions bot added the build label Sep 13, 2023
@stevenzwu
Contributor

> @stevenzwu I reverted things back to use a Supplier<Table> instead of a TableLoader. The only open comment is whether to remove isOpen() from the TableLoader interface; currently it is still being used in the new table supplier.

Thanks. I think we are very close now. Left a few minor comments.


bryanck commented Sep 20, 2023

> Thanks. I think we are very close now. Left a few minor comments.

Thanks for your patience. I made the changes, except for refreshing in the supplier's get(). I tend to agree with Peter that it should be refreshed only during the checkpoint, but I'm happy to change it back.


bryanck commented Sep 20, 2023

@stevenzwu We'd also need this to be set when using Flink SQL. I was considering removing FlinkSink.tableRefreshInterval() and using a configuration instead. Can you advise on what the best approach would be for this?


bryanck commented Sep 20, 2023

I made the change to move the refresh interval config to a write property and removed the config method in FlinkSink. This allows the option to be used with Flink SQL as well. Apologies for the last-minute change.
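For illustration, a usage sketch under the assumption that the new write property can be set through the sink builder's generic set() method; the key "table-refresh-interval" is illustrative, so check FlinkWriteOptions for the actual name:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

class SinkExample {
  // Hypothetical usage: enable periodic table refresh via a write property
  // instead of a dedicated builder method, so Flink SQL can set it too.
  static void buildSink(DataStream<RowData> dataStream, TableLoader tableLoader) {
    FlinkSink.forRowData(dataStream)
        .tableLoader(tableLoader)
        .set("table-refresh-interval", "5m") // key name is illustrative
        .append();
  }
}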

Contributor

@stevenzwu stevenzwu left a comment

@bryanck moving to write conf makes sense to me. Thanks!

Contributor

@stevenzwu stevenzwu left a comment

LGTM. It seems there is no objection to exposing the config as a duration. The current implementation of refreshing is fine with me; although I have a slight preference for self-refreshing, it seems more people favor the explicit refresh methods. This is an internal implementation detail and not a blocker for this PR.

@danielcweeks danielcweeks merged commit d213b16 into apache:master Sep 21, 2023
47 checks passed
@bryanck bryanck deleted the flink-table-supplier branch September 21, 2023 18:51
bryanck added a commit to bryanck/iceberg that referenced this pull request Sep 21, 2023

bryanck commented Sep 21, 2023

Thanks @stevenzwu and @pvary for taking the time on this, and for the feedback and guidance.


bryanck commented Sep 21, 2023

(also thanks to @nastra and @amogh-jahagirdar for the reviews)


pvary commented Sep 22, 2023

@bryanck: Thanks for reaching out and for discussing the possible ways forward.
