Skip to content

Commit

Permalink
Merge pull request #98 from backtick-se/stability_fixes
Browse files Browse the repository at this point in the history
Await orphaned tasks, task failure tests & agent error storage fix
  • Loading branch information
johanhenriksson authored Jun 2, 2020
2 parents 2e9597e + 97e5de3 commit 0616c00
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 6 deletions.
8 changes: 8 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[flake8]
max-line-length = 100
exclude =
__pycache__,
.git,
build,
cloud,
dist
33 changes: 33 additions & 0 deletions cowait/engine/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,36 @@ def test_docker_child_task():

children = dp.find_child_containers(task.id)
assert len(children) == 0


def test_docker_task_error():
dp = DockerProvider()

task = dp.spawn(TaskDefinition(
name=TEST_TASK,
image=TEST_IMAGE,
inputs={'error': True},
))

container = dp.docker.containers.get(task.container.id)
assert task.container == container

result = container.wait()
assert result['StatusCode'] != 0


def test_docker_child_error():
dp = DockerProvider()

task = dp.spawn(TaskDefinition(
name=TEST_TASK,
image=TEST_IMAGE,
inputs={'child_error': True},
))

container = dp.docker.containers.get(task.container.id)
assert task.container == container

# child error should cause the parent to fail
result = container.wait()
assert result['StatusCode'] != 0
2 changes: 1 addition & 1 deletion cowait/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def _connect(self, url: str, token: str) -> None:

# send buffered messages
for msg in self.buffer:
self.send(msg)
await self.send(msg)
self.buffer = []

# client loop
Expand Down
4 changes: 4 additions & 0 deletions cowait/tasks/remote_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def __init__(self, taskdef: TaskDefinition, cluster):
def __await__(self):
return self.awaitable.__await__()

@property
def done(self):
return self.future.done()

def destroy(self):
self.cluster.destroy(self.id)

Expand Down
17 changes: 14 additions & 3 deletions cowait/test/tasks/utility_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,28 @@ def __init__(self, taskdef, cluster, node):
# store taskdef for later comparison
self.taskdef = taskdef

async def run(self, child: bool = False, **inputs):
async def run(
self,
child: bool = False,
error: bool = False,
child_error: bool = False,
**inputs
):
# dump task information to stdout
print(json.dumps({
'taskdef': self.taskdef.serialize(),
'env': dict(os.environ),
}))

# create a child
if child:
if child or child_error:
print('spawn child')
await self.spawn('cowait.test.tasks.utility_task', inputs={'child': False})
await self.spawn(
'cowait.test.tasks.utility_task',
inputs={'error': child_error})

if error:
raise RuntimeError('Caused test error')

# run forever
if inputs.get('forever', False):
Expand Down
2 changes: 1 addition & 1 deletion cowait/utils/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def __init__(self):

def on(self, type: str, callback: callable) -> None:
try:
self.callbacks[type].append(callback)
self.callbacks[type].insert(0, callback)
except KeyError:
self.callbacks[type] = [callback]

Expand Down
9 changes: 8 additions & 1 deletion cowait/worker/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ async def execute(cluster: ClusterProvider, taskdef: TaskDefinition) -> None:
# execute task
result = await task.run(**inputs)

# wait for dangling tasks
orphans = filter(lambda child: not child.done, task.subtasks.values())
for orphan in orphans:
print('~~ waiting for orphaned task', orphan.id)
await orphan

# after hook
await task.after(inputs)

Expand All @@ -86,8 +92,9 @@ async def execute(cluster: ClusterProvider, taskdef: TaskDefinition) -> None:
except TaskError as e:
# pass subtask errors upstream
await node.parent.send_fail(
f'Caught exception in {taskdef.id}:\n'
f'Caught exception in subtask {taskdef.id}:\n'
f'{e.error}')
raise e

except Exception as e:
# capture local errors
Expand Down

0 comments on commit 0616c00

Please sign in to comment.