Skip to content

Commit

Permalink
TRON-2208: Add toggle in tron config to disable retries on LOST k8s j…
Browse files Browse the repository at this point in the history
…obs (#988)

Given that "LOST" means Tron has lost track of a pod it already thought it had started for a job, attempting to retry/start a replacement can be dangerous for non-idempotent jobs. In the current state, these will consume retries, but with some of our EKS migration methods, LOST tasks are more likely. Therefore, we should have a way to temporarily pause retries on these.

## Related Issues
- TRON-2208: Add toggle in Tron config to disable retries on LOST k8s jobs
  • Loading branch information
cuza authored Aug 28, 2024
1 parent d51984e commit 2d17df3
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ tron.egg-info
docs/_build/
.idea
.vscode
.fleet
tron.iml
docs/images/
*.dot
Expand Down
1 change: 1 addition & 0 deletions requirements-dev-minimal.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
asynctest
debugpy
flake8
mock
mypy
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
astroid==2.13.3
asynctest==0.12.0
cfgv==2.0.1
debugpy==1.8.1
dill==0.3.6
distlib==0.3.6
filelock==3.4.1
Expand Down
2 changes: 1 addition & 1 deletion tests/config/config_parse_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def make_mesos_options():


def make_k8s_options():
return schema.ConfigKubernetes(enabled=False, default_volumes=())
return schema.ConfigKubernetes(enabled=False, non_retryable_exit_codes=(), default_volumes=())


def make_action(**kwargs):
Expand Down
2 changes: 1 addition & 1 deletion tests/kubernetes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ def test_handle_event_lost(mock_kubernetes_task):
)
)

assert mock_kubernetes_task.is_unknown
assert mock_kubernetes_task.exit_status == exitcode.EXIT_KUBERNETES_TASK_LOST


def test_create_task_disabled():
Expand Down
2 changes: 2 additions & 0 deletions tron/config/config_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,12 +867,14 @@ class ValidateKubernetes(Validator):
defaults = {
"kubeconfig_path": None,
"enabled": False,
"non_retryable_exit_codes": (),
"default_volumes": (),
}

validators = {
"kubeconfig_path": valid_string,
"enabled": valid_bool,
"non_retryable_exit_codes": build_list_of_type_validator(valid_int, allow_empty=True),
"default_volumes": build_list_of_type_validator(valid_volume, allow_empty=True),
"watcher_kubeconfig_paths": build_list_of_type_validator(valid_string, allow_empty=True),
}
Expand Down
1 change: 1 addition & 0 deletions tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def config_object_factory(name, required=None, optional=None):
optional=[
"kubeconfig_path",
"enabled",
"non_retryable_exit_codes",
"default_volumes",
"watcher_kubeconfig_paths",
],
Expand Down
33 changes: 26 additions & 7 deletions tron/core/actionrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ def fail(self, exit_status=None):

return self._done("fail", exit_status)

def _exit_unsuccessful(self, exit_status=None, retry_original_command=True) -> Optional[Union[bool, ActionCommand]]:
def _exit_unsuccessful(
self, exit_status=None, retry_original_command=True, non_retryable_exit_codes=[]
) -> Optional[Union[bool, ActionCommand]]:
if self.is_done:
log.info(
f"{self} got exit code {exit_status} but already in terminal " f'state "{self.state}", not retrying',
Expand All @@ -599,13 +601,17 @@ def _exit_unsuccessful(self, exit_status=None, retry_original_command=True) -> O
if self.last_attempt is not None:
self.last_attempt.exit(exit_status)
if self.retries_remaining is not None:
if self.retries_remaining > 0:
self.retries_remaining -= 1
return self.restart(original_command=retry_original_command)
if exit_status in non_retryable_exit_codes:
self.retries_remaining = 0
log.info(f"{self} skipping auto-retries, received non-retryable exit code ({exit_status}).")
else:
log.info(
f"Reached maximum number of retries: {len(self.attempts)}",
)
if self.retries_remaining > 0:
self.retries_remaining -= 1
return self.restart(original_command=retry_original_command)
else:
log.info(
f"Reached maximum number of retries: {len(self.attempts)}",
)
if exit_status is None:
return self._done("fail_unknown", exit_status)
else:
Expand Down Expand Up @@ -1314,6 +1320,19 @@ def kill(self, final: bool = True) -> Optional[str]:

return "\n".join(msgs)

def _exit_unsuccessful(
self, exit_status=None, retry_original_command=True, non_retryable_exit_codes=[]
) -> Optional[Union[bool, ActionCommand]]:

k8s_cluster = KubernetesClusterRepository.get_cluster()
non_retryable_exit_codes = [] if not k8s_cluster else k8s_cluster.non_retryable_exit_codes

return super()._exit_unsuccessful(
exit_status=exit_status,
retry_original_command=retry_original_command,
non_retryable_exit_codes=non_retryable_exit_codes,
)

def handle_action_command_state_change(
self, action_command: ActionCommand, event: str, event_data=None
) -> Optional[Union[bool, ActionCommand]]:
Expand Down
6 changes: 5 additions & 1 deletion tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def handle_event(self, event: Event) -> None:
self.log.warning(f" tronctl skip {self.id}")
self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:")
self.log.warning(f" tronctl fail {self.id}")
self.exited(None)
self.exited(exitcode.EXIT_KUBERNETES_TASK_LOST)
else:
self.log.info(
f"Did not handle unknown kubernetes event type: {event}",
Expand Down Expand Up @@ -281,10 +281,12 @@ def __init__(
default_volumes: Optional[List[ConfigVolume]] = None,
pod_launch_timeout: Optional[int] = None,
watcher_kubeconfig_paths: Optional[List[str]] = None,
non_retryable_exit_codes: Optional[List[int]] = [],
):
# general k8s config
self.kubeconfig_path = kubeconfig_path
self.enabled = enabled
self.non_retryable_exit_codes = non_retryable_exit_codes
self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S
self.watcher_kubeconfig_paths = watcher_kubeconfig_paths or []
Expand Down Expand Up @@ -621,6 +623,7 @@ def recover(self, task: KubernetesTask) -> None:
class KubernetesClusterRepository:
# Kubernetes config
kubernetes_enabled: bool = False
kubernetes_non_retryable_exit_codes: Optional[List[int]] = []
kubeconfig_path: Optional[str] = None
pod_launch_timeout: Optional[int] = None
default_volumes: Optional[List[ConfigVolume]] = None
Expand Down Expand Up @@ -665,6 +668,7 @@ def shutdown(cls) -> None:
def configure(cls, kubernetes_options: ConfigKubernetes) -> None:
cls.kubeconfig_path = kubernetes_options.kubeconfig_path
cls.kubernetes_enabled = kubernetes_options.enabled
cls.kubernetes_non_retryable_exit_codes = kubernetes_options.non_retryable_exit_codes
cls.default_volumes = kubernetes_options.default_volumes
cls.watcher_kubeconfig_paths = kubernetes_options.watcher_kubeconfig_paths

Expand Down
2 changes: 2 additions & 0 deletions tron/utils/exitcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
EXIT_KUBERNETES_ABNORMAL = -9
EXIT_KUBERNETES_SPOT_INTERRUPTION = -10
EXIT_KUBERNETES_NODE_SCALEDOWN = -11
EXIT_KUBERNETES_TASK_LOST = -12

EXIT_REASONS = {
EXIT_INVALID_COMMAND: "Invalid command",
Expand All @@ -23,4 +24,5 @@
EXIT_KUBERNETES_ABNORMAL: "Kubernetes task failed in an unexpected manner",
EXIT_KUBERNETES_SPOT_INTERRUPTION: "Kubernetes task failed due to spot interruption",
EXIT_KUBERNETES_NODE_SCALEDOWN: "Kubernetes task failed due to the autoscaler scaling down a node",
EXIT_KUBERNETES_TASK_LOST: "Tron lost track of a pod it already thought it had started for a job.",
}

0 comments on commit 2d17df3

Please sign in to comment.