diff --git a/Makefile b/Makefile index 62fcd6a..0f50002 100755 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ SHELL = /bin/sh .DEFAULT_GOAL := help +MAKEFLAGS += -j2 export DOCKER_IMAGE_NAME ?= osparc-map export DOCKER_IMAGE_TAG ?= 0.0.5 @@ -34,11 +35,20 @@ clean: build: clean compose-spec ## build docker image docker-compose build -.PHONY: run-local -run-local: ## runs image with local configuration +validation-clean: sudo rm -rf validation-tmp cp -r validation validation-tmp + chmod -R 770 validation-tmp + +validation_client_run: validation-clean + pip install osparc-filecomms + VALIDATION_CLIENT_INPUT_PATH=validation-tmp/outputs/output_1 VALIDATION_CLIENT_OUTPUT_PATH=validation-tmp/inputs/input_2 python validation-client/client.py + +docker_compose: validation-clean docker-compose --file docker-compose-local.yml up + +.PHONY: run-local +run-local: validation_client_run docker_compose ## runs image with local configuration .PHONY: publish-local publish-local: ## push to local throw away registry to test integration diff --git a/docker_scripts/map.bash b/docker_scripts/map.bash index 81858ea..f47a654 100755 --- a/docker_scripts/map.bash +++ b/docker_scripts/map.bash @@ -1,4 +1,6 @@ #!/bin/bash pip install -r /docker/requirements.txt -python3 /docker/map.py +echo "Starting map python code" +python3 /docker/map.py +echo "Closing map python code" diff --git a/docker_scripts/map.py b/docker_scripts/map.py index 5bcdbd9..eb23572 100755 --- a/docker_scripts/map.py +++ b/docker_scripts/map.py @@ -1,23 +1,25 @@ -from osparc_filecomms import handshakers -import osparc_client.models.file -import osparc_client -import osparc -import pathos +import contextlib +import http.server import json +import logging import os +import pathlib as pl +import socketserver +import tempfile +import threading import time import uuid -import contextlib -import tempfile import zipfile -import http.server -import socketserver -import pathlib as pl -import logging -import threading +import osparc +import osparc_client +import osparc_client.models.file +import pathos +from osparc_filecomms import handshakers -logging.basicConfig(level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s") +logging.basicConfig( + level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s" +) logger = logging.getLogger(__name__) HTTP_PORT = 8888 @@ -40,17 +42,21 @@ class HTTPHandler(http.server.SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs, directory=http_dir_path) - pyrunner = MapRunner(input_path, output_path, polling_interval=POLLING_INTERVAL) + maprunner = MapRunner( + input_path, output_path, polling_interval=POLLING_INTERVAL + ) try: logger.info( f"Starting http server at port {HTTP_PORT} and serving path {http_dir_path}" ) - httpd = socketserver.TCPServer(("", HTTP_PORT), HTTPHandler) - threading.Thread(target=httpd.serve_forever).start() - pyrunner.setup() - pyrunner.start() - pyrunner.teardown() + with socketserver.TCPServer(("", HTTP_PORT), HTTPHandler) as httpd: + httpd_thread = threading.Thread(target=httpd.serve_forever) + httpd_thread.start() + maprunner.setup() + maprunner.start() + maprunner.teardown() + httpd.shutdown() except Exception as err: # pylint: disable=broad-except logger.error(f"{err} . Stopping %s", exc_info=True) @@ -79,7 +85,9 @@ def __init__(self, input_path, output_path, polling_interval=1): self.input_tasks_path = self.input_tasks_dir_path / "input_tasks.json" self.output_tasks_dir_path = self.output_path / "output_1" - self.output_tasks_path = self.output_tasks_dir_path / "output_tasks.json" + self.output_tasks_path = ( + self.output_tasks_dir_path / "output_tasks.json" + ) if self.output_tasks_path.exists(): self.output_tasks_path.unlink() @@ -136,7 +144,8 @@ def start(self): ): if waiter % 10 == 0: logger.info( - f"Waiting for {INPUT_PARAMETERS_KEY} " "to exist in key_values..." + f"Waiting for {INPUT_PARAMETERS_KEY} " + "to exist in key_values..." ) key_values = json.loads(self.key_values_path.read_text()) time.sleep(self.polling_interval) @@ -153,7 +162,9 @@ def start(self): waiter = 0 while not self.input_tasks_path.exists(): if waiter % 10 == 0: - logger.info(f"Waiting for input file at {self.input_tasks_path}...") + logger.info( + f"Waiting for input file at {self.input_tasks_path}..." + ) time.sleep(self.polling_interval) waiter += 1 @@ -186,12 +197,16 @@ def start(self): waiter += 1 else: input_tasks = input_dict["tasks"] - output_tasks = self.run_tasks(tasks_uuid, input_tasks, n_of_workers) + output_tasks = self.run_tasks( + tasks_uuid, input_tasks, n_of_workers + ) output_tasks_content = json.dumps( {"uuid": tasks_uuid, "tasks": output_tasks} ) self.output_tasks_path.write_text(output_tasks_content) - logger.info(f"Finished a set of tasks: {output_tasks_content}") + logger.info( + f"Finished a set of tasks: {output_tasks_content}" + ) last_tasks_uuid = tasks_uuid waiter = 0 else: @@ -220,9 +235,9 @@ def map_func(task): tmp_input_file_path = tmp_dir_path / param_filename tmp_input_file_path.write_text(json.dumps(param_value)) - input_data_file = osparc.FilesApi(self.api_client).upload_file( - file=tmp_input_file_path - ) + input_data_file = osparc.FilesApi( + self.api_client + ).upload_file(file=tmp_input_file_path) job_inputs["values"][param_name] = input_data_file elif param_type == "file": file_info = json.loads(param_value) @@ -250,7 +265,10 @@ def map_func(task): study_id=self.template_id, job_id=job.id ) - while job_status.state != "SUCCESS" and job_status.state != "FAILED": + while ( + job_status.state != "SUCCESS" + and job_status.state != "FAILED" + ): job_status = self.studies_api.inspect_study_job( study_id=self.template_id, job_id=job.id ) @@ -267,7 +285,9 @@ def map_func(task): for probe_name, probe_output in results.items(): if probe_name not in output: - raise ValueError(f"Unknown probe in output: {probe_name}") + raise ValueError( + f"Unknown probe in output: {probe_name}" + ) probe_type = output[probe_name]["type"] if probe_type == "FileJSON": @@ -280,7 +300,9 @@ def map_func(task): file_results_path = zipfile.Path( zip_file, at=output[probe_name]["filename"] ) - file_results = json.loads(file_results_path.read_text()) + file_results = json.loads( + file_results_path.read_text() + ) output[probe_name]["value"] = file_results elif probe_type == "file": @@ -305,6 +327,10 @@ def map_func(task): return task + if self.template_id == "TEST_UUID": + logger.info("Map in test mode, just returning input") + return input_tasks + logger.info(f"Starting tasks on {n_of_workers} workers") with pathos.pools.ThreadPool(nodes=n_of_workers) as pool: output_tasks = list(pool.map(map_func, input_tasks)) diff --git a/validation-client/client.py b/validation-client/client.py new file mode 100644 index 0000000..3813fe7 --- /dev/null +++ b/validation-client/client.py @@ -0,0 +1,68 @@ +"""Test client for validation""" + +import json +import logging +import os +import pathlib as pl +import time +import uuid + +from osparc_filecomms import handshakers + +logging.basicConfig( + level=logging.INFO, + format="Validation client: [%(filename)s:%(lineno)d] %(message)s", +) +logger = logging.getLogger(__name__) + + +def main(): + logger.info("Started validation client") + + client_uuid = str(uuid.uuid4()) + task_uuid = str(uuid.uuid4()) + client_input_path = pl.Path(os.environ["VALIDATION_CLIENT_INPUT_PATH"]) + client_output_path = pl.Path(os.environ["VALIDATION_CLIENT_OUTPUT_PATH"]) + input_tasks_path = client_output_path / "input_tasks.json" + output_tasks_path = client_input_path / "output_tasks.json" + + this_dir = pl.Path(__file__).parent + + input_tasks_template_path = this_dir / "input_tasks_template.json" + input_tasks = json.loads(input_tasks_template_path.read_text()) + input_tasks["uuid"] = task_uuid + input_tasks["caller_uuid"] = client_uuid + + handshaker = handshakers.FileHandshaker( + client_uuid, + client_input_path, + client_output_path, + is_initiator=False, + verbose_level=logging.DEBUG, + polling_interval=0.1, + print_polling_interval=100, + ) + map_uuid = handshaker.shake() + input_tasks["map_uuid"] = map_uuid + + input_tasks_path.write_text(json.dumps(input_tasks)) + + while not os.path.exists(output_tasks_path): + time.sleep(0.1) + + output_tasks = json.loads(output_tasks_path.read_text()) + + assert output_tasks["uuid"] == task_uuid + + logger.info(output_tasks) + + stop_command = { + "caller_uuid": client_uuid, + "map_uuid": map_uuid, + "command": "stop", + } + input_tasks_path.write_text(json.dumps(stop_command)) + + +if __name__ == "__main__": + main() diff --git a/validation/inputs/input_2/input_tasks.json b/validation-client/input_tasks_template.json similarity index 98% rename from validation/inputs/input_2/input_tasks.json rename to validation-client/input_tasks_template.json index 2b4ed75..07e0137 100755 --- a/validation/inputs/input_2/input_tasks.json +++ b/validation-client/input_tasks_template.json @@ -1,5 +1,6 @@ { - "uuid": "23423432", + "uuid": "dummy", + "caller_uuid": "dummy", "command": "run", "tasks": [ { diff --git a/validation/inputs/key_values.json b/validation/inputs/key_values.json index 3e2fdaa..e2859a9 100755 --- a/validation/inputs/key_values.json +++ b/validation/inputs/key_values.json @@ -1,7 +1,7 @@ { "input_0": { "key": "input_0", - "value": "a13d566e-c05b-11ee-95bf-02420a000008" + "value": "TEST_UUID" }, "input_1": {"key": "input_1", "value": 2}, "input_2": {"key": "input_2", "value": "/tmp/inputs/input_2/"}