diff --git a/alab_management/device_view/device.py b/alab_management/device_view/device.py index 7d1862b6..3f7a537e 100644 --- a/alab_management/device_view/device.py +++ b/alab_management/device_view/device.py @@ -176,6 +176,7 @@ def __init__(self, address: str, port: int = 502, *args, **kwargs): raise TypeError("description must be a string") from alab_management.device_view import DeviceView + self.__connected = False self._device_view = DeviceView() self._signalemitter = DeviceSignalEmitter( @@ -613,7 +614,7 @@ def retrieve_signal(self, signal_name, within: datetime.timedelta | None = None) def add_device(device: BaseDevice): """Register a device instance. It is stored in a global dictionary.""" if device.name in _device_registry: - raise KeyError(f"Duplicated device name {device.name}") + raise KeyError(f"Duplicated device name {device.name}.") _device_registry[device.name] = device diff --git a/alab_management/experiment_manager.py b/alab_management/experiment_manager.py index 2b1a6059..9b3578c5 100644 --- a/alab_management/experiment_manager.py +++ b/alab_management/experiment_manager.py @@ -50,7 +50,9 @@ def run(self): }, ) start = time.time() - while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time): + while not self.termination_event.is_set() and ( + self.live_time is None or time.time() - start < self.live_time + ): self._loop() time.sleep(1) @@ -127,6 +129,7 @@ def _handle_pending_experiment(self, experiment: dict[str, Any]): parameters=task["parameters"], samples=samples, task_id=task.get("task_id", None), + commit_hash_or_version=experiment["commit_hash_or_version"], ) ) diff --git a/alab_management/experiment_view/experiment_view.py b/alab_management/experiment_view/experiment_view.py index e7c08671..429f8dfa 100644 --- a/alab_management/experiment_view/experiment_view.py +++ b/alab_management/experiment_view/experiment_view.py @@ -1,7 +1,11 @@ """A wrapper over the ``experiment`` class.""" +import os +import shutil +from copy import copy from datetime import datetime from enum import Enum, auto +from pathlib import Path from typing import Any, cast from bson import ObjectId # type: ignore @@ -71,10 +75,38 @@ def create_experiment(self, experiment: InputExperiment) -> ObjectId: f"was not submitted." ) + # create a new folder in the version control folder inside working directory if it does not exist + from alab_management.config import AlabOSConfig + from alab_management.utils.versioning import get_version + + config = AlabOSConfig() + working_dir = config["general"]["working_dir"] + + dir_to_import_from = copy(working_dir) + dir_to_import_from = ( + Path(dir_to_import_from) + if os.path.isabs(dir_to_import_from) + else config.path.parent / dir_to_import_from + ) + versions_dir = os.listdir(dir_to_import_from / "versions") + current_version = get_version() + if current_version not in versions_dir: + os.mkdir(dir_to_import_from / "versions" / current_version) + # copy all the folders other than versions folder to the new folder using shutil + folders = os.listdir(dir_to_import_from) + for folder in folders: + if folder != "versions": + # copy the folder to the new folder + shutil.copytree( + dir_to_import_from / folder, + dir_to_import_from / "versions" / current_version / folder, + ) + # all good, lets submit the experiment into ALabOS! result = self._experiment_collection.insert_one( { **experiment.dict(), + "commit_hash_or_version": current_version, "submitted_at": datetime.now(), "status": ExperimentStatus.PENDING.name, } diff --git a/alab_management/resource_manager/resource_manager.py b/alab_management/resource_manager/resource_manager.py index dca0b891..cc8c344b 100644 --- a/alab_management/resource_manager/resource_manager.py +++ b/alab_management/resource_manager/resource_manager.py @@ -25,6 +25,7 @@ from alab_management.task_view.task_enums import CancelingProgress, TaskStatus from alab_management.utils.data_objects import DocumentNotUpdatedError, get_collection from alab_management.utils.module_ops import load_definition +from alab_management.utils.versioning import get_version class ResourceManager(RequestMixin): @@ -36,7 +37,7 @@ class ResourceManager(RequestMixin): """ def __init__(self, live_time: float | None = None, termination_event=None): - load_definition() + load_definition(get_version()) self.task_view = TaskView() self.sample_view = SampleView() self.device_view = DeviceView() @@ -51,7 +52,9 @@ def __init__(self, live_time: float | None = None, termination_event=None): def run(self): """Start the loop.""" start = time.time() - while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time): + while not self.termination_event.is_set() and ( + self.live_time is None or time.time() - start < self.live_time + ): self._loop() time.sleep(0.5) diff --git a/alab_management/scripts/cleanup_lab.py b/alab_management/scripts/cleanup_lab.py index 891a7e31..783c4b6f 100644 --- a/alab_management/scripts/cleanup_lab.py +++ b/alab_management/scripts/cleanup_lab.py @@ -5,6 +5,11 @@ be deleted. """ +import os +import shutil +from copy import copy +from pathlib import Path + def cleanup_lab( all_collections: bool = False, @@ -12,15 +17,37 @@ def cleanup_lab( user_confirmation: str = None, sim_mode: bool = True, database_name: str = None, + remove_versions: bool = True, ): - """Drop device, sample_position collection from MongoDB.""" + """ + Drop device, sample_position collection from MongoDB. + Can also drop the whole database if 'all_collections' is true. + Can also remove the versions folder if 'remove_versions' is true. + """ from alab_management.config import AlabOSConfig # type: ignore from alab_management.device_view.device_view import DeviceView from alab_management.sample_view.sample_view import SampleView from alab_management.utils.data_objects import _GetMongoCollection - _GetMongoCollection.init() config = AlabOSConfig() + if remove_versions: + print("Removing versions folder") + working_dir = config["general"]["working_dir"] + dir_to_import_from = copy(working_dir) + dir_to_import_from = ( + Path(dir_to_import_from) + if os.path.isabs(dir_to_import_from) + else config.path.parent / dir_to_import_from + ) + versions_folders = [ + entry + for entry in os.listdir(dir_to_import_from / "versions") + if os.path.isdir(os.path.join(dir_to_import_from / "versions", entry)) + ] + for version_folder in versions_folders: + shutil.rmtree(dir_to_import_from / "versions" / version_folder) + + _GetMongoCollection.init() task_count_new = ( _GetMongoCollection.client.get_database(config["general"]["name"]) .get_collection("tasks") diff --git a/alab_management/scripts/setup_lab.py b/alab_management/scripts/setup_lab.py index 62d7791b..628647d2 100644 --- a/alab_management/scripts/setup_lab.py +++ b/alab_management/scripts/setup_lab.py @@ -3,15 +3,60 @@ and write them to MongoDB, which will make it easier to query. """ +import os +import shutil +from copy import copy +from pathlib import Path + def setup_lab(): """Cleanup the db and then import all the definitions and set up the db.""" + # create a new folder in the version control folder inside working directory if it does not exist + from alab_management.config import AlabOSConfig from alab_management.device_view import DeviceView, get_all_devices from alab_management.sample_view import SampleView from alab_management.sample_view.sample import get_all_standalone_sample_positions from alab_management.utils.module_ops import load_definition + from alab_management.utils.versioning import get_version + + config = AlabOSConfig() + working_dir = config["general"]["working_dir"] + + dir_to_import_from = copy(working_dir) + dir_to_import_from = ( + Path(dir_to_import_from) + if os.path.isabs(dir_to_import_from) + else config.path.parent / dir_to_import_from + ) + versions_dir = os.listdir(dir_to_import_from / "versions") + current_version = get_version() + if current_version not in versions_dir: + os.mkdir(dir_to_import_from / "versions" / current_version) + # copy all the folders and files other than versions folder to the new folder using shutil + folders = [ + entry + for entry in os.listdir(dir_to_import_from) + if os.path.isdir(os.path.join(dir_to_import_from, entry)) + ] + files = [ + entry + for entry in os.listdir(dir_to_import_from) + if os.path.isfile(os.path.join(dir_to_import_from, entry)) + ] + for folder in folders: + if folder != "versions": + # copy the folder to the new folder + shutil.copytree( + dir_to_import_from / folder, + dir_to_import_from / "versions" / current_version / folder, + ) + for file in files: + shutil.copy( + dir_to_import_from / file, + dir_to_import_from / "versions" / current_version / file, + ) - load_definition() + load_definition(current_version) devices = get_all_devices().values() DeviceView().add_devices_to_db() for device_instance in devices: diff --git a/alab_management/task_actor.py b/alab_management/task_actor.py index b481f349..ae1147eb 100644 --- a/alab_management/task_actor.py +++ b/alab_management/task_actor.py @@ -25,9 +25,11 @@ notify_shutdown=True, ) # time limit is set in ms. currently set to 30 days def run_task(task_id_str: str): - """Submit a task. In this system, each task is run in an - independent process, which will try to acquire device and - process samples. This will change the status of the task under the specified id into "RUNNING". + """Submit a task. Each task is run in an independent process, which will try to acquire device and process samples. + A task is created in TaskView with the status "WAITING". + When the task is ready to be run, the task status will be changed to "READY". + Once task is ready, TaskManager will pick up the task and call this function. + This will change the status of the task under the specified id into "RUNNING". If the task is not in "INITIATED" state, it has been picked up by another task actor beforehand, and no action is taken. If an Abort (exception) signal is sent, the task status will be changed to "CANCELLED". @@ -41,7 +43,6 @@ def run_task(task_id_str: str): """ from .lab_view import LabView # pylint: disable=cyclic-import - load_definition() task_view = TaskView() sample_view = SampleView() logger = DBLogger(task_id=None) @@ -49,6 +50,7 @@ def run_task(task_id_str: str): task_id = ObjectId(task_id_str) try: task_entry = task_view.get_task(task_id, encode=True) + load_definition(task_entry["commit_hash_or_version"]) except ValueError: print( f"{datetime.datetime.now()}: No task found with id: {task_id} -- assuming that alabos was aborted without " diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index 11d4058c..08e17ec8 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -13,6 +13,7 @@ from alab_management.task_view import TaskView from alab_management.task_view.task_enums import CancelingProgress, TaskStatus from alab_management.utils.module_ops import load_definition +from alab_management.utils.versioning import get_version class TaskManager: @@ -24,7 +25,7 @@ class TaskManager: """ def __init__(self, live_time: float | None = None, termination_event=None): - load_definition() + load_definition(get_version()) self.task_view = TaskView() self.logger = DBLogger(task_id=None) super().__init__() @@ -35,7 +36,9 @@ def __init__(self, live_time: float | None = None, termination_event=None): def run(self): """Start the loop.""" start = time.time() - while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time): + while not self.termination_event.is_set() and ( + self.live_time is None or time.time() - start < self.live_time + ): self._loop() time.sleep(1) diff --git a/alab_management/task_view/task_view.py b/alab_management/task_view/task_view.py index bf801d53..aece8206 100644 --- a/alab_management/task_view/task_view.py +++ b/alab_management/task_view/task_view.py @@ -30,6 +30,7 @@ def create_task( prev_tasks: ObjectId | list[ObjectId] | None = None, next_tasks: ObjectId | list[ObjectId] | None = None, task_id: ObjectId | None = None, + commit_hash_or_version: str | None = None, ) -> ObjectId: """ Insert a task into the task collection. @@ -69,6 +70,7 @@ def create_task( "next_tasks": next_tasks, "created_at": datetime.now(), "last_updated": datetime.now(), + "commit_hash_or_version": commit_hash_or_version, "message": "", } if isinstance(task_id, ObjectId): diff --git a/alab_management/utils/module_ops.py b/alab_management/utils/module_ops.py index 5d10c483..6ea2f3a2 100644 --- a/alab_management/utils/module_ops.py +++ b/alab_management/utils/module_ops.py @@ -19,9 +19,9 @@ def import_module_from_path(path: str | Path, parent_package: str | None = None) try: module = importlib.import_module(path.name, parent_package) except AttributeError as exception: - exception.args = ( - exception.args[0] - + " Maybe there is some bugs in your definition, please check that." + print( + f"Error occurred while importing module from path: {exception}." + f"Maybe there is some bugs in your definition, please check that." ) raise sys.path.pop(0) @@ -29,8 +29,8 @@ def import_module_from_path(path: str | Path, parent_package: str | None = None) return module -def load_definition(): - """Load device and task definitions from file (specified in config file).""" +def load_definition(commit_hash_or_version: str): + """Load device and task definitions from file (specified in config file) according to the specified commit_hash_or_version.""" from alab_management.config import AlabOSConfig config = AlabOSConfig() @@ -38,9 +38,12 @@ def load_definition(): dir_to_import_from = copy(working_dir) dir_to_import_from = ( - Path(dir_to_import_from) + Path(dir_to_import_from) / "versions" / commit_hash_or_version if os.path.isabs(dir_to_import_from) - else config.path.parent / dir_to_import_from + else config.path.parent + / dir_to_import_from + / "versions" + / commit_hash_or_version ) import_module_from_path(dir_to_import_from) diff --git a/alab_management/utils/versioning.py b/alab_management/utils/versioning.py new file mode 100644 index 00000000..a8d0d4c6 --- /dev/null +++ b/alab_management/utils/versioning.py @@ -0,0 +1,35 @@ +"""This file contains the functions to get the current version of the alabos either by commit hash or arbitrary versioning.""" + +import os +import subprocess +from copy import copy +from pathlib import Path + + +def get_version(): + """Get the current version of the alabos either by git commit_hash system or manual.""" + from alab_management.config import AlabOSConfig + + config = AlabOSConfig() + versioning_style = config["versioning"]["versioning_style"] + working_dir = config["general"]["working_dir"] + + dir_to_import_from = copy(working_dir) + dir_to_import_from = ( + Path(dir_to_import_from) + if os.path.isabs(dir_to_import_from) + else config.path.parent / dir_to_import_from + ) + # get current directory + current_dir = Path(os.getcwd()) + if versioning_style == "manual": + return config["versioning"]["version"] + elif versioning_style == "git": + # change to the directory to import from and get the commit hash + os.chdir(dir_to_import_from) + commit_hash = ( + subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip() + ) + # change back to the previous directory where the script was run + os.chdir(current_dir) + return commit_hash diff --git a/examples/fake_lab/config.toml b/examples/fake_lab/config.toml index ba5e1e51..4e47d22f 100644 --- a/examples/fake_lab/config.toml +++ b/examples/fake_lab/config.toml @@ -17,3 +17,6 @@ password = '' [rabbitmq] host = "localhost" port = 5672 + +[versioning] +versioning_style = "git" \ No newline at end of file diff --git a/tests/fake_lab/config.toml b/tests/fake_lab/config.toml index 307aa652..dff9a005 100644 --- a/tests/fake_lab/config.toml +++ b/tests/fake_lab/config.toml @@ -24,3 +24,6 @@ host = 'localhost' port = 27017 username = '' password = '' + +[versioning] +versioning_style = "git" diff --git a/tests/fake_lab/versions/do not remove this folder as this is used to keep versions definitions b/tests/fake_lab/versions/do not remove this folder as this is used to keep versions definitions new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_device_view.py b/tests/test_device_view.py index 391dfa55..bda66da5 100644 --- a/tests/test_device_view.py +++ b/tests/test_device_view.py @@ -28,6 +28,7 @@ def setUp(self): sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.device_view = DeviceView() diff --git a/tests/test_experiment_manager.py b/tests/test_experiment_manager.py index 5e36564e..57bf0ee4 100644 --- a/tests/test_experiment_manager.py +++ b/tests/test_experiment_manager.py @@ -15,6 +15,7 @@ def setUp(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.experiment_manager = ExperimentManager() @@ -26,6 +27,7 @@ def tearDown(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) def test_handle_pending_experiments(self): diff --git a/tests/test_experiment_view.py b/tests/test_experiment_view.py index bf0c73b3..7c26afac 100644 --- a/tests/test_experiment_view.py +++ b/tests/test_experiment_view.py @@ -9,6 +9,7 @@ ) from alab_management.scripts.cleanup_lab import cleanup_lab from alab_management.scripts.setup_lab import setup_lab +from alab_management.utils.versioning import get_version class TestExperimentView(TestCase): @@ -19,6 +20,7 @@ def setUp(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.experiment_view = ExperimentView() @@ -32,6 +34,7 @@ def tearDown(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) self.experiment_collection.drop() @@ -61,6 +64,7 @@ def test_create_experiment(self): exp_dict = exp_template.dict() exp_dict["_id"] = exp_id exp_dict["status"] = "PENDING" + exp_dict["commit_hash_or_version"] = get_version() exp.pop("submitted_at") diff --git a/tests/test_lab_view.py b/tests/test_lab_view.py index 1ecb1cd5..2c3686f1 100644 --- a/tests/test_lab_view.py +++ b/tests/test_lab_view.py @@ -32,6 +32,7 @@ def setUp(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.device_view = DeviceView() @@ -51,6 +52,7 @@ def tearDown(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) time.sleep(1) diff --git a/tests/test_launch.py b/tests/test_launch.py index 848ea8bb..5222ad51 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -22,6 +22,7 @@ def setUp(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.task_view = TaskView() @@ -50,6 +51,7 @@ def tearDown(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) def test_submit_experiment(self): diff --git a/tests/test_sample_view.py b/tests/test_sample_view.py index 58f23d3f..3fa014a5 100644 --- a/tests/test_sample_view.py +++ b/tests/test_sample_view.py @@ -32,6 +32,7 @@ def setUp(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.sample_view = SampleView() @@ -44,6 +45,7 @@ def tearDown(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) self.sample_view._sample_collection.drop() diff --git a/tests/test_task_manager.py b/tests/test_task_manager.py index 56216805..d6b4c604 100644 --- a/tests/test_task_manager.py +++ b/tests/test_task_manager.py @@ -34,6 +34,7 @@ def setUp(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.devices = get_all_devices() @@ -47,7 +48,7 @@ def setUp(self) -> None: } ) self.resource_requester = ResourceRequester(task_id=fake_task.inserted_id) - self.process = Process(target=launch_resource_manager) + self.process = Process(target=launch_resource_manager(live_time=300)) self.process.daemon = True self.process.start() time.sleep(0.5) diff --git a/tests/test_task_view.py b/tests/test_task_view.py index 785c4c9f..e8e25dc2 100644 --- a/tests/test_task_view.py +++ b/tests/test_task_view.py @@ -15,6 +15,7 @@ def setUp(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) setup_lab() self.task_view = TaskView() @@ -27,6 +28,7 @@ def tearDown(self) -> None: sim_mode=True, database_name="Alab_sim", user_confirmation="y", + remove_versions=True, ) self.task_view._task_collection.drop()