Skip to content

Commit

Permalink
Adding run-local code
Browse files Browse the repository at this point in the history
  • Loading branch information
wvangeit committed Apr 9, 2024
1 parent 274f7ba commit 704122d
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 35 deletions.
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

SHELL = /bin/sh
.DEFAULT_GOAL := help
MAKEFLAGS += -j2

export DOCKER_IMAGE_NAME ?= osparc-map
export DOCKER_IMAGE_TAG ?= 0.0.5
Expand Down Expand Up @@ -34,11 +35,20 @@ clean:
build: clean compose-spec ## build docker image
docker-compose build

.PHONY: run-local
run-local: ## runs image with local configuration
validation-clean:
sudo rm -rf validation-tmp
cp -r validation validation-tmp
chmod -R 770 validation-tmp

validation_client_run: validation-clean
pip install osparc-filecomms
VALIDATION_CLIENT_INPUT_PATH=validation-tmp/outputs/output_1 VALIDATION_CLIENT_OUTPUT_PATH=validation-tmp/inputs/input_2 python validation-client/client.py

docker_compose: validation-clean
docker-compose --file docker-compose-local.yml up

.PHONY: run-local
run-local: validation_client_run docker_compose ## runs image with local configuration

.PHONY: publish-local
publish-local: ## push to local throw away registry to test integration
Expand Down
4 changes: 3 additions & 1 deletion docker_scripts/map.bash
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!/bin/bash

pip install -r /docker/requirements.txt
python3 /docker/map.py
echo "Starting map python code"
python3 /docker/map.py
echo "Closing map python code"
86 changes: 56 additions & 30 deletions docker_scripts/map.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
from osparc_filecomms import handshakers
import osparc_client.models.file
import osparc_client
import osparc
import pathos
import contextlib
import http.server
import json
import logging
import os
import pathlib as pl
import socketserver
import tempfile
import threading
import time
import uuid
import contextlib
import tempfile
import zipfile
import http.server
import socketserver

import pathlib as pl
import logging
import threading
import osparc
import osparc_client
import osparc_client.models.file
import pathos
from osparc_filecomms import handshakers

logging.basicConfig(level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s")
logging.basicConfig(
level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s"
)
logger = logging.getLogger(__name__)

HTTP_PORT = 8888
Expand All @@ -40,17 +42,21 @@ class HTTPHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, directory=http_dir_path)

pyrunner = MapRunner(input_path, output_path, polling_interval=POLLING_INTERVAL)
maprunner = MapRunner(
input_path, output_path, polling_interval=POLLING_INTERVAL
)

try:
logger.info(
f"Starting http server at port {HTTP_PORT} and serving path {http_dir_path}"
)
httpd = socketserver.TCPServer(("", HTTP_PORT), HTTPHandler)
threading.Thread(target=httpd.serve_forever).start()
pyrunner.setup()
pyrunner.start()
pyrunner.teardown()
with socketserver.TCPServer(("", HTTP_PORT), HTTPHandler) as httpd:
httpd_thread = threading.Thread(target=httpd.serve_forever)
httpd_thread.start()
maprunner.setup()
maprunner.start()
maprunner.teardown()
httpd.shutdown()
except Exception as err: # pylint: disable=broad-except
logger.error(f"{err} . Stopping %s", exc_info=True)

Expand Down Expand Up @@ -79,7 +85,9 @@ def __init__(self, input_path, output_path, polling_interval=1):
self.input_tasks_path = self.input_tasks_dir_path / "input_tasks.json"

self.output_tasks_dir_path = self.output_path / "output_1"
self.output_tasks_path = self.output_tasks_dir_path / "output_tasks.json"
self.output_tasks_path = (
self.output_tasks_dir_path / "output_tasks.json"
)

if self.output_tasks_path.exists():
self.output_tasks_path.unlink()
Expand Down Expand Up @@ -136,7 +144,8 @@ def start(self):
):
if waiter % 10 == 0:
logger.info(
f"Waiting for {INPUT_PARAMETERS_KEY} " "to exist in key_values..."
f"Waiting for {INPUT_PARAMETERS_KEY} "
"to exist in key_values..."
)
key_values = json.loads(self.key_values_path.read_text())
time.sleep(self.polling_interval)
Expand All @@ -153,7 +162,9 @@ def start(self):
waiter = 0
while not self.input_tasks_path.exists():
if waiter % 10 == 0:
logger.info(f"Waiting for input file at {self.input_tasks_path}...")
logger.info(
f"Waiting for input file at {self.input_tasks_path}..."
)
time.sleep(self.polling_interval)
waiter += 1

Expand Down Expand Up @@ -186,12 +197,16 @@ def start(self):
waiter += 1
else:
input_tasks = input_dict["tasks"]
output_tasks = self.run_tasks(tasks_uuid, input_tasks, n_of_workers)
output_tasks = self.run_tasks(
tasks_uuid, input_tasks, n_of_workers
)
output_tasks_content = json.dumps(
{"uuid": tasks_uuid, "tasks": output_tasks}
)
self.output_tasks_path.write_text(output_tasks_content)
logger.info(f"Finished a set of tasks: {output_tasks_content}")
logger.info(
f"Finished a set of tasks: {output_tasks_content}"
)
last_tasks_uuid = tasks_uuid
waiter = 0
else:
Expand Down Expand Up @@ -220,9 +235,9 @@ def map_func(task):
tmp_input_file_path = tmp_dir_path / param_filename
tmp_input_file_path.write_text(json.dumps(param_value))

input_data_file = osparc.FilesApi(self.api_client).upload_file(
file=tmp_input_file_path
)
input_data_file = osparc.FilesApi(
self.api_client
).upload_file(file=tmp_input_file_path)
job_inputs["values"][param_name] = input_data_file
elif param_type == "file":
file_info = json.loads(param_value)
Expand Down Expand Up @@ -250,7 +265,10 @@ def map_func(task):
study_id=self.template_id, job_id=job.id
)

while job_status.state != "SUCCESS" and job_status.state != "FAILED":
while (
job_status.state != "SUCCESS"
and job_status.state != "FAILED"
):
job_status = self.studies_api.inspect_study_job(
study_id=self.template_id, job_id=job.id
)
Expand All @@ -267,7 +285,9 @@ def map_func(task):

for probe_name, probe_output in results.items():
if probe_name not in output:
raise ValueError(f"Unknown probe in output: {probe_name}")
raise ValueError(
f"Unknown probe in output: {probe_name}"
)
probe_type = output[probe_name]["type"]

if probe_type == "FileJSON":
Expand All @@ -280,7 +300,9 @@ def map_func(task):
file_results_path = zipfile.Path(
zip_file, at=output[probe_name]["filename"]
)
file_results = json.loads(file_results_path.read_text())
file_results = json.loads(
file_results_path.read_text()
)

output[probe_name]["value"] = file_results
elif probe_type == "file":
Expand All @@ -305,6 +327,10 @@ def map_func(task):

return task

if self.template_id == "TEST_UUID":
logger.info("Map in test mode, just returning input")
return input_tasks

logger.info(f"Starting tasks on {n_of_workers} workers")
with pathos.pools.ThreadPool(nodes=n_of_workers) as pool:
output_tasks = list(pool.map(map_func, input_tasks))
Expand Down
68 changes: 68 additions & 0 deletions validation-client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Test client for validation"""

import json
import logging
import os
import pathlib as pl
import time
import uuid

from osparc_filecomms import handshakers

logging.basicConfig(
level=logging.INFO,
format="Validation client: [%(filename)s:%(lineno)d] %(message)s",
)
logger = logging.getLogger(__name__)


def main():
logger.info("Started validation client")

client_uuid = str(uuid.uuid4())
task_uuid = str(uuid.uuid4())
client_input_path = pl.Path(os.environ["VALIDATION_CLIENT_INPUT_PATH"])
client_output_path = pl.Path(os.environ["VALIDATION_CLIENT_OUTPUT_PATH"])
input_tasks_path = client_output_path / "input_tasks.json"
output_tasks_path = client_input_path / "output_tasks.json"

this_dir = pl.Path(__file__).parent

input_tasks_template_path = this_dir / "input_tasks_template.json"
input_tasks = json.loads(input_tasks_template_path.read_text())
input_tasks["uuid"] = task_uuid
input_tasks["caller_uuid"] = client_uuid

handshaker = handshakers.FileHandshaker(
client_uuid,
client_input_path,
client_output_path,
is_initiator=False,
verbose_level=logging.DEBUG,
polling_interval=0.1,
print_polling_interval=100,
)
map_uuid = handshaker.shake()
input_tasks["map_uuid"] = map_uuid

input_tasks_path.write_text(json.dumps(input_tasks))

while not os.path.exists(output_tasks_path):
time.sleep(0.1)

output_tasks = json.loads(output_tasks_path.read_text())

assert output_tasks["uuid"] == task_uuid

logger.info(output_tasks)

stop_command = {
"caller_uuid": client_uuid,
"map_uuid": map_uuid,
"command": "stop",
}
input_tasks_path.write_text(json.dumps(stop_command))


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"uuid": "23423432",
"uuid": "dummy",
"caller_uuid": "dummy",
"command": "run",
"tasks": [
{
Expand Down
2 changes: 1 addition & 1 deletion validation/inputs/key_values.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"input_0": {
"key": "input_0",
"value": "a13d566e-c05b-11ee-95bf-02420a000008"
"value": "TEST_UUID"
},
"input_1": {"key": "input_1", "value": 2},
"input_2": {"key": "input_2", "value": "/tmp/inputs/input_2/"}
Expand Down

0 comments on commit 704122d

Please sign in to comment.