Skip to content

Commit

Permalink
[FLINK-34516] Use new CheckpointingMode in flink-core in python
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Mar 14, 2024
1 parent 315e2f4 commit 699f00a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flink-python/pyflink/datastream/checkpoint_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_checkpointing_mode(self) -> CheckpointingMode:
:return: The :class:`CheckpointingMode`.
"""
return CheckpointingMode._from_j_checkpointing_mode(
self._j_checkpoint_config.getCheckpointingMode())
self._j_checkpoint_config.getConsistencyMode())

def set_checkpointing_mode(self, checkpointing_mode: CheckpointingMode) -> 'CheckpointConfig':
"""
Expand Down
2 changes: 1 addition & 1 deletion flink-python/pyflink/datastream/checkpointing_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ def _from_j_checkpointing_mode(j_checkpointing_mode) -> 'CheckpointingMode':
def _to_j_checkpointing_mode(self):
gateway = get_gateway()
JCheckpointingMode = \
gateway.jvm.org.apache.flink.streaming.api.CheckpointingMode
gateway.jvm.org.apache.flink.core.execution.CheckpointingMode
return getattr(JCheckpointingMode, self.name)
20 changes: 19 additions & 1 deletion flink-python/pyflink/datastream/stream_execution_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,27 @@ def get_checkpointing_mode(self) -> CheckpointingMode:
Shorthand for get_checkpoint_config().get_checkpointing_mode().
:return: The :class:`~pyflink.datastream.CheckpointingMode`.
.. note:: Deprecated since version 1.20: This method is deprecated and will be removed in
future FLINK major version. Use
`stream_execution_environment.get_checkpointing_consistency_mode` method instead.
"""
warnings.warn("Deprecated since version 1.20: This method is deprecated and will be removed"
" in future FLINK major version. Use"
" `stream_execution_environment.get_checkpointing_consistency_mode`"
" method instead.", DeprecationWarning)
j_checkpointing_mode = self._j_stream_execution_environment.getConsistencyMode()
return CheckpointingMode._from_j_checkpointing_mode(j_checkpointing_mode)

def get_checkpointing_consistency_mode(self) -> CheckpointingMode:
"""
Returns the checkpointing mode (exactly-once vs. at-least-once).
Shorthand for get_checkpoint_config().get_checkpointing_mode().
:return: The :class:`~pyflink.datastream.CheckpointingMode`.
"""
j_checkpointing_mode = self._j_stream_execution_environment.getCheckpointingMode()
j_checkpointing_mode = self._j_stream_execution_environment.getConsistencyMode()
return CheckpointingMode._from_j_checkpointing_mode(j_checkpointing_mode)

def get_state_backend(self) -> StateBackend:
Expand Down

0 comments on commit 699f00a

Please sign in to comment.