Flink: table supplier in writer operator for optional table refresh #8555
Conversation
I am very concerned about this simplified solution. We should consider: if the main goal here is to refresh the credentials, then maybe a proxy solution above the accessor would be better, like https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java, where we get a new Kerberos token when the previous one has expired. What do you think?
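For context, a minimal sketch of that proxy idea, assuming a generic interface plus a reload supplier; the use of SecurityException as the expiry signal is purely illustrative, and this is not what the PR implements:

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

public class RefreshingProxy {
  // Wraps any interface behind a dynamic proxy; if a call fails in a way that looks
  // like expired credentials, recreate the delegate via the loader and retry once.
  @SuppressWarnings("unchecked")
  public static <T> T wrap(Class<T> iface, Supplier<T> loader) {
    Object[] delegate = {loader.get()};
    return (T) Proxy.newProxyInstance(
        iface.getClassLoader(),
        new Class<?>[] {iface},
        (proxy, method, args) -> {
          try {
            return method.invoke(delegate[0], args);
          } catch (InvocationTargetException e) {
            // Treating SecurityException as "credentials expired" is an assumption
            // made only for this sketch.
            if (e.getCause() instanceof SecurityException) {
              delegate[0] = loader.get();
              return method.invoke(delegate[0], args);
            }
            throw e.getCause();
          }
        });
  }
}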
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.awaitility.Awaitility;
import org.junit.Test;
this should be a JUnit5 test
The Flink tests are still using org.junit.Test, so I kept it consistent with those.
For new test classes being added we actually want to use JUnit5, as otherwise this will just cause more work later in the migration process.
You might need to add the snippet below to the build file (if it isn't already defined for the flink project):
test {
  useJUnitPlatform()
}
OK thanks, I made this change.
To address your points:
cc @danielcweeks @rdblue in case you have additional perspective or feedback...
Thanks. I think we are very close now. Left a few minor comments.
Thanks for your patience. I made the changes, except for refreshing in the supplier.
@stevenzwu We'd also need this to be set when using Flink SQL. I was considering removing
I made the change to move the refresh interval config to a write property, and removed the config method in |
@bryanck moving to write conf makes sense to me. Thanks!
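To illustrate, a hedged sketch of how that write property might be set on the sink builder; the option key "table-refresh-interval" and the duration format are assumptions based on this discussion, not confirmed API:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class RefreshIntervalExample {
  static void appendWithRefresh(DataStream<RowData> rows, TableLoader loader) {
    FlinkSink.forRowData(rows)
        .tableLoader(loader)
        // hypothetical write option name; see FlinkWriteOptions for the actual key
        .set("table-refresh-interval", "30min")
        .append();
  }
}

In Flink SQL the same write property could presumably be supplied per query as a table option hint.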
LGTM. It seems there is no objection to exposing the config as a duration. The current implementation of refreshing is fine with me. Although I have a slight preference for self-refreshing, it seems more people favor the explicit refresh methods; this is an internal implementation detail and not a blocker for this PR.
Thanks @stevenzwu and @pvary for taking the time on this, and for the feedback and guidance.
(also thanks to @nastra and @amogh-jahagirdar for the reviews)
@bryanck: Thanks for reaching out, and discussing the possible ways forward.
Currently, when a Flink sink job is started, write operators are serialized with an initial table instance and that instance is used for the lifetime of the job. There are cases where a table needs to be reloaded at runtime by the subtasks. For example, if a REST catalog returns expiring credentials in the table load config response, those credentials cannot later be refreshed and will eventually expire. Also, if a task restarts, the original credentials created with the job will be used and they may have long expired.
Longer term, the ability to reload a table could be used to support schema evolution, though that is not addressed in this PR. For now, the table schema and partition spec from the initial table instance are still used.
This PR updates the initialization of the Flink sink so that a table supplier is provided. This supplier can implement reload/refresh logic as needed. The initial supplier implementation created here simply reloads the table at a given interval from the Iceberg catalog. Also, for the time being, the supplier is mainly used to get refreshed FileIO instances from the reloaded table and the original schema is still always used in the writers.
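As a rough sketch of the supplier described above (names and details are illustrative; the actual supplier added in this PR, CachingTableSupplier, may differ), the idea is roughly:

import java.io.Serializable;
import java.time.Duration;
import java.util.function.Supplier;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

class ReloadingTableSupplierSketch implements Supplier<Table>, Serializable {
  private final TableLoader tableLoader;
  private final long refreshIntervalMs;
  private transient Table table;
  private transient long lastReloadMs;

  ReloadingTableSupplierSketch(TableLoader tableLoader, Duration refreshInterval) {
    this.tableLoader = tableLoader;
    this.refreshIntervalMs = refreshInterval.toMillis();
  }

  @Override
  public Table get() {
    // Lazily load on first use in the task; the cached table is returned until a refresh.
    if (table == null) {
      tableLoader.open();
      table = tableLoader.loadTable();
      lastReloadMs = System.currentTimeMillis();
    }
    return table;
  }

  // Called explicitly by the writer/committer; reloads from the catalog only once the
  // configured interval has elapsed, mainly so a fresh FileIO (e.g. with new credentials)
  // is picked up. The original schema and partition spec continue to be used by writers.
  void refreshTable() {
    long now = System.currentTimeMillis();
    if (table != null && now - lastReloadMs >= refreshIntervalMs) {
      table = tableLoader.loadTable();
      lastReloadMs = now;
    }
  }
}

Whether the reload is driven by an explicit refreshTable() call (as in this sketch) or happens inside get() is exactly the self-refreshing vs. explicit-refresh trade-off discussed in the review above.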
This initial supplier implementation puts additional load on the Iceberg catalog, so by default the reload option is turned off. Different table suppliers could be implemented in the future that use a centralized mechanism for obtaining refreshed table instances to cut down on the extra catalog load. Different options were explored, such as using a broadcast or the token delegation manager, but each had limitations or complexities, so it was felt that a better first step would be to introduce the abstraction to allow for table reload with a simple initial implementation of that.