From d4822dccbf6a0546fbf97ae4bc65d64023c5468f Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 2 Jun 2020 12:44:45 +0200 Subject: [PATCH 1/5] network client: properly await when sending buffered messages --- cowait/network/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cowait/network/client.py b/cowait/network/client.py index e2e3ac23..e86d7ca3 100644 --- a/cowait/network/client.py +++ b/cowait/network/client.py @@ -45,7 +45,7 @@ async def _connect(self, url: str, token: str) -> None: # send buffered messages for msg in self.buffer: - self.send(msg) + await self.send(msg) self.buffer = [] # client loop From 40f9107ec84c5b739cdbb8cc9e27a6a0fd27e1c8 Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 2 Jun 2020 12:46:27 +0200 Subject: [PATCH 2/5] event emitter: emit events in reverse order --- cowait/utils/emitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cowait/utils/emitter.py b/cowait/utils/emitter.py index c9ad9256..90e404cf 100644 --- a/cowait/utils/emitter.py +++ b/cowait/utils/emitter.py @@ -5,7 +5,7 @@ def __init__(self): def on(self, type: str, callback: callable) -> None: try: - self.callbacks[type].append(callback) + self.callbacks[type].insert(0, callback) except KeyError: self.callbacks[type] = [callback] From 0a8732d1dfc6dfbe0f0fc70dd5ae5ffa35ac401d Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 2 Jun 2020 12:47:47 +0200 Subject: [PATCH 3/5] add tests for failing tasks & subtasks --- cowait/engine/test_docker.py | 33 +++++++++++++++++++++++++++++++ cowait/test/tasks/utility_task.py | 17 +++++++++++++--- cowait/worker/executor.py | 1 + 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/cowait/engine/test_docker.py b/cowait/engine/test_docker.py index e7e32253..33d22e0b 100644 --- a/cowait/engine/test_docker.py +++ b/cowait/engine/test_docker.py @@ -116,3 +116,36 @@ def test_docker_child_task(): children = dp.find_child_containers(task.id) assert len(children) == 0 + + +def test_docker_task_error(): + dp = DockerProvider() + + task = dp.spawn(TaskDefinition( + name=TEST_TASK, + image=TEST_IMAGE, + inputs={'error': True}, + )) + + container = dp.docker.containers.get(task.container.id) + assert task.container == container + + result = container.wait() + assert result['StatusCode'] != 0 + + +def test_docker_child_error(): + dp = DockerProvider() + + task = dp.spawn(TaskDefinition( + name=TEST_TASK, + image=TEST_IMAGE, + inputs={'child_error': True}, + )) + + container = dp.docker.containers.get(task.container.id) + assert task.container == container + + # child error should cause the parent to fail + result = container.wait() + assert result['StatusCode'] != 0 diff --git a/cowait/test/tasks/utility_task.py b/cowait/test/tasks/utility_task.py index 73fbfe15..1589ee8e 100644 --- a/cowait/test/tasks/utility_task.py +++ b/cowait/test/tasks/utility_task.py @@ -13,7 +13,13 @@ def __init__(self, taskdef, cluster, node): # store taskdef for later comparison self.taskdef = taskdef - async def run(self, child: bool = False, **inputs): + async def run( + self, + child: bool = False, + error: bool = False, + child_error: bool = False, + **inputs + ): # dump task information to stdout print(json.dumps({ 'taskdef': self.taskdef.serialize(), @@ -21,9 +27,14 @@ async def run(self, child: bool = False, **inputs): })) # create a child - if child: + if child or child_error: print('spawn child') - await self.spawn('cowait.test.tasks.utility_task', inputs={'child': False}) + await self.spawn( + 'cowait.test.tasks.utility_task', + inputs={'error': child_error}) + + if error: + raise RuntimeError('Caused test error') # run forever if inputs.get('forever', False): diff --git a/cowait/worker/executor.py b/cowait/worker/executor.py index d60378c0..03763133 100644 --- a/cowait/worker/executor.py +++ b/cowait/worker/executor.py @@ -88,6 +88,7 @@ async def execute(cluster: ClusterProvider, taskdef: TaskDefinition) -> None: await node.parent.send_fail( f'Caught exception in {taskdef.id}:\n' f'{e.error}') + raise e except Exception as e: # capture local errors From d850c9ce0eb7dd359352cccb9198172326f3131e Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 2 Jun 2020 13:16:46 +0200 Subject: [PATCH 4/5] linter: set max line length to 100 --- .flake8 | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .flake8 diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..09a7856d --- /dev/null +++ b/.flake8 @@ -0,0 +1,8 @@ +[flake8] +max-line-length = 100 +exclude = + __pycache__, + .git, + build, + cloud, + dist From 97e5de3f1f9d6ae3385ade47f7337589159a2a0a Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 2 Jun 2020 13:32:10 +0200 Subject: [PATCH 5/5] await orphaned tasks --- cowait/tasks/remote_task.py | 4 ++++ cowait/worker/executor.py | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cowait/tasks/remote_task.py b/cowait/tasks/remote_task.py index c3d804ed..a7aad96b 100644 --- a/cowait/tasks/remote_task.py +++ b/cowait/tasks/remote_task.py @@ -22,6 +22,10 @@ def __init__(self, taskdef: TaskDefinition, cluster): def __await__(self): return self.awaitable.__await__() + @property + def done(self): + return self.future.done() + def destroy(self): self.cluster.destroy(self.id) diff --git a/cowait/worker/executor.py b/cowait/worker/executor.py index 03763133..3fe92f36 100644 --- a/cowait/worker/executor.py +++ b/cowait/worker/executor.py @@ -70,6 +70,12 @@ async def execute(cluster: ClusterProvider, taskdef: TaskDefinition) -> None: # execute task result = await task.run(**inputs) + # wait for dangling tasks + orphans = filter(lambda child: not child.done, task.subtasks.values()) + for orphan in orphans: + print('~~ waiting for orphaned task', orphan.id) + await orphan + # after hook await task.after(inputs) @@ -86,7 +92,7 @@ async def execute(cluster: ClusterProvider, taskdef: TaskDefinition) -> None: except TaskError as e: # pass subtask errors upstream await node.parent.send_fail( - f'Caught exception in {taskdef.id}:\n' + f'Caught exception in subtask {taskdef.id}:\n' f'{e.error}') raise e