Skip to content

Commit

Permalink
[FLINK-36823][Checkpointing] Drain state requests after the user func…
Browse files Browse the repository at this point in the history
…tion perform snapshotState
  • Loading branch information
Zakelly committed Dec 2, 2024
1 parent ea74da0 commit 97f4ac4
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ RecordContext getCurrentProcessingContext() {
return currentProcessingContext;
}

public void drainStateRequests() {
asyncExecutionController.drainInflightRecords(0);
}

@Override
public void finish() throws Exception {
super.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), userFunction);
// Drain state requests in case the user function modifies the state.
drainStateRequests();
}

@Override
Expand Down

0 comments on commit 97f4ac4

Please sign in to comment.