Skip to content

Commit

Permalink
Make asynchronous process test run asynchronously. Fix #656.
Browse files Browse the repository at this point in the history
  • Loading branch information
huard committed May 11, 2022
1 parent 7839f52 commit dca06fb
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 45 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* @jonas-eberle Jonas Eberle
* @cehbrecht Carsten Ehbrecht
* @idanmiara Idan Miara
* @huard David Huard

# Contributor to older versions of PyWPS (< 4.x)

Expand Down
35 changes: 35 additions & 0 deletions tests/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,38 @@ def bbox(request, response):
area = request.inputs['area'][0].data
response.outputs['extent'].data = area
return response


class Sleep(Process):
"""A long running process, just sleeping."""
def __init__(self):
inputs = [
LiteralInput('seconds', title='Seconds', data_type='float')
]
outputs = [
LiteralOutput('finished', title='Finished', data_type='boolean')
]

super(Sleep, self).__init__(
self._handler,
identifier='sleep',
title='Sleep',
abstract='Wait for specified number of seconds.',
inputs=inputs,
outputs=outputs,
store_supported=True,
status_supported=True
)

@staticmethod
def _handler(request, response):
import time

seconds = request.inputs['seconds'][0].data
step = seconds / 3
for i in range(3):
response.update_status('Sleep in progress...', i / 3 * 100)
time.sleep(step)

response.outputs['finished'].data = "True"
return response
74 changes: 29 additions & 45 deletions tests/test_assync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,50 @@

import unittest
import time
from pywps import Service, Process, LiteralInput, LiteralOutput
from pywps import Service, configuration
from pywps import get_ElementMakerForVersion
from pywps.tests import client_for, assert_response_accepted
from .processes import Sleep
from owslib.wps import WPSExecution
from pathlib import Path

VERSION = "1.0.0"

WPS, OWS = get_ElementMakerForVersion(VERSION)


def create_sleep():
class ExecuteTest(unittest.TestCase):
def setUp(self) -> None:
self.mode = configuration.CONFIG.get('processing', 'mode')
configuration.CONFIG.set('processing', 'mode', 'distributed')

def sleep(request, response):
seconds = request.inputs['seconds'][0].data
assert isinstance(seconds, float)
def tearDown(self) -> None:
configuration.CONFIG.set('processing', 'mode', self.mode)

step = seconds / 3
for i in range(3):
# How is status working in version 4 ?
#self.status.set("Waiting...", i * 10)
time.sleep(step)
def test_assync(self):
client = client_for(Service(processes=[Sleep()]))
wps = WPSExecution()

response.outputs['finished'].data = "True"
return response
# Build an asynchronous request (requires specifying outputs and setting the mode).
doc = wps.buildRequest('sleep',
inputs=[('seconds', '.01')],
output=[('finished', None, None)],
mode='async')

return Process(handler=sleep,
identifier='sleep',
title='Sleep',
inputs=[
LiteralInput('seconds', title='Seconds', data_type='float')
],
outputs=[
LiteralOutput('finished', title='Finished', data_type='boolean')
]
)
resp = client.post_xml(doc=doc)
wps.parseResponse(resp.xml)
assert_response_accepted(resp)

# Wait for process to complete. The test will fail otherwise, which confirms the process is asynchronous.
time.sleep(.5)

class ExecuteTest(unittest.TestCase):

def test_assync(self):
client = client_for(Service(processes=[create_sleep()]))
request_doc = WPS.Execute(
OWS.Identifier('sleep'),
WPS.DataInputs(
WPS.Input(
OWS.Identifier('seconds'),
WPS.Data(
WPS.LiteralData(
"0.3"
)
)
)
),
version="1.0.0"
)
resp = client.post_xml(doc=request_doc)
assert_response_accepted(resp)
# Parse response to extract the status file path
url = resp.xml.xpath("//@statusLocation")[0]

# TODO:
# . extract the status URL from the response
# . send a status request
# OWSlib only reads from URLs, not local files. So we need to read the response manually.
p = Path(url[6:])
wps.checkStatus(response=p.read_bytes(), sleepSecs=0)
assert wps.status == 'ProcessSucceeded'


def load_tests(loader=None, tests=None, pattern=None):
Expand Down

0 comments on commit dca06fb

Please sign in to comment.