diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index a7a19c359..47926e204 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -10,6 +10,7 @@ * @jonas-eberle Jonas Eberle * @cehbrecht Carsten Ehbrecht * @idanmiara Idan Miara +* @huard David Huard # Contributor to older versions of PyWPS (< 4.x) diff --git a/tests/processes/__init__.py b/tests/processes/__init__.py index 0b847ace4..5882a9ef1 100644 --- a/tests/processes/__init__.py +++ b/tests/processes/__init__.py @@ -99,3 +99,38 @@ def bbox(request, response): area = request.inputs['area'][0].data response.outputs['extent'].data = area return response + + +class Sleep(Process): + """A long running process, just sleeping.""" + def __init__(self): + inputs = [ + LiteralInput('seconds', title='Seconds', data_type='float') + ] + outputs = [ + LiteralOutput('finished', title='Finished', data_type='boolean') + ] + + super(Sleep, self).__init__( + self._handler, + identifier='sleep', + title='Sleep', + abstract='Wait for specified number of seconds.', + inputs=inputs, + outputs=outputs, + store_supported=True, + status_supported=True + ) + + @staticmethod + def _handler(request, response): + import time + + seconds = request.inputs['seconds'][0].data + step = seconds / 3 + for i in range(3): + response.update_status('Sleep in progress...', i / 3 * 100) + time.sleep(step) + + response.outputs['finished'].data = "True" + return response diff --git a/tests/test_assync.py b/tests/test_assync.py index 2db2eb074..6f6af6d02 100644 --- a/tests/test_assync.py +++ b/tests/test_assync.py @@ -5,66 +5,50 @@ import unittest import time -from pywps import Service, Process, LiteralInput, LiteralOutput +from pywps import Service, configuration from pywps import get_ElementMakerForVersion from pywps.tests import client_for, assert_response_accepted +from .processes import Sleep +from owslib.wps import WPSExecution +from pathlib import Path VERSION = "1.0.0" WPS, OWS = get_ElementMakerForVersion(VERSION) -def create_sleep(): +class ExecuteTest(unittest.TestCase): + def setUp(self) -> None: + self.mode = configuration.CONFIG.get('processing', 'mode') + configuration.CONFIG.set('processing', 'mode', 'distributed') - def sleep(request, response): - seconds = request.inputs['seconds'][0].data - assert isinstance(seconds, float) + def tearDown(self) -> None: + configuration.CONFIG.set('processing', 'mode', self.mode) - step = seconds / 3 - for i in range(3): - # How is status working in version 4 ? - #self.status.set("Waiting...", i * 10) - time.sleep(step) + def test_assync(self): + client = client_for(Service(processes=[Sleep()])) + wps = WPSExecution() - response.outputs['finished'].data = "True" - return response + # Build an asynchronous request (requires specifying outputs and setting the mode). + doc = wps.buildRequest('sleep', + inputs=[('seconds', '.01')], + output=[('finished', None, None)], + mode='async') - return Process(handler=sleep, - identifier='sleep', - title='Sleep', - inputs=[ - LiteralInput('seconds', title='Seconds', data_type='float') - ], - outputs=[ - LiteralOutput('finished', title='Finished', data_type='boolean') - ] - ) + resp = client.post_xml(doc=doc) + wps.parseResponse(resp.xml) + assert_response_accepted(resp) + # Wait for process to complete. The test will fail otherwise, which confirms the process is asynchronous. + time.sleep(.5) -class ExecuteTest(unittest.TestCase): - - def test_assync(self): - client = client_for(Service(processes=[create_sleep()])) - request_doc = WPS.Execute( - OWS.Identifier('sleep'), - WPS.DataInputs( - WPS.Input( - OWS.Identifier('seconds'), - WPS.Data( - WPS.LiteralData( - "0.3" - ) - ) - ) - ), - version="1.0.0" - ) - resp = client.post_xml(doc=request_doc) - assert_response_accepted(resp) + # Parse response to extract the status file path + url = resp.xml.xpath("//@statusLocation")[0] - # TODO: - # . extract the status URL from the response - # . send a status request + # OWSlib only reads from URLs, not local files. So we need to read the response manually. + p = Path(url[6:]) + wps.checkStatus(response=p.read_bytes(), sleepSecs=0) + assert wps.status == 'ProcessSucceeded' def load_tests(loader=None, tests=None, pattern=None):