From e4044c72d77d3b696f17b9711bd4c7a5b835659d Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Tue, 28 May 2024 10:43:12 -0400 Subject: [PATCH] Remove unused lattice attributes - named_args - named_kwargs - cova_imports - lattice_imports --- covalent/_results_manager/result.py | 33 +-- covalent/_serialize/lattice.py | 43 --- covalent/_shared_files/schemas/lattice.py | 14 +- covalent/_workflow/lattice.py | 12 +- covalent_dispatcher/_cli/cli.py | 14 +- covalent_dispatcher/_cli/migrate.py | 208 --------------- covalent_dispatcher/_cli/service.py | 12 - .../_dal/db_interfaces/lattice_utils.py | 8 - covalent_dispatcher/_dal/importers/lattice.py | 8 - covalent_dispatcher/_db/dispatchdb.py | 33 --- covalent_dispatcher/_db/models.py | 8 +- covalent_dispatcher/_db/upsert.py | 12 - covalent_dispatcher/_db/write_result_to_db.py | 8 - covalent_dispatcher/_service/models.py | 4 - covalent_ui/result_webhook.py | 9 +- .../_cli/cli_test.py | 1 - .../_cli/migrate_test.py | 249 ------------------ .../_dal/lattice_test.py | 3 - .../_db/update_test.py | 13 - .../_db/write_result_to_db_test.py | 16 -- .../workflow/dispatch_source_test.py | 94 ------- .../workflow/lattice_serialization_test.py | 1 - 22 files changed, 19 insertions(+), 784 deletions(-) delete mode 100644 covalent_dispatcher/_cli/migrate.py delete mode 100644 tests/covalent_dispatcher_tests/_cli/migrate_test.py delete mode 100644 tests/covalent_tests/workflow/dispatch_source_test.py diff --git a/covalent/_results_manager/result.py b/covalent/_results_manager/result.py index a42f514a6..8a6e3520b 100644 --- a/covalent/_results_manager/result.py +++ b/covalent/_results_manager/result.py @@ -18,7 +18,7 @@ import os import re from datetime import datetime -from typing import TYPE_CHECKING, Any, Dict, List, Set, Union +from typing import TYPE_CHECKING, Any, Dict, List, Union from .._shared_files import logger from .._shared_files.config import get_config @@ -516,34 +516,3 @@ def _convert_to_electron_result(self) -> Any: """ return self._result - - -def _filter_cova_decorators(function_string: str, cova_imports: Set[str]) -> str: - """ - Given a string representing a function, comment out any Covalent-related decorators. - - Args - function_string: A string representation of a workflow function. - - Returns: - The function string with Covalent-related decorators commented out. - """ - - has_cova_decorator = False - in_decorator = 0 - function_lines = function_string.split("\n") - for i in range(len(function_lines)): - line = function_lines[i].strip() - if in_decorator > 0: - function_lines[i] = f"# {function_lines[i]}" - in_decorator += line.count("(") - in_decorator -= line.count(")") - elif line.startswith("@"): - decorator_name = line.split("@")[1].split(".")[0].split("(")[0] - if decorator_name in cova_imports: - function_lines[i] = f"# {function_lines[i]}" - has_cova_decorator = True - in_decorator += line.count("(") - in_decorator -= line.count(")") - - return "\n".join(function_lines) if has_cova_decorator else function_string diff --git a/covalent/_serialize/lattice.py b/covalent/_serialize/lattice.py index 6fbd1b98c..3d61fcfc1 100644 --- a/covalent/_serialize/lattice.py +++ b/covalent/_serialize/lattice.py @@ -40,10 +40,6 @@ "workflow_function_string": AssetType.TEXT, "doc": AssetType.TEXT, "inputs": AssetType.TRANSPORTABLE, - "named_args": AssetType.TRANSPORTABLE, - "named_kwargs": AssetType.TRANSPORTABLE, - "cova_imports": AssetType.JSONABLE, - "lattice_imports": AssetType.TEXT, "hooks": AssetType.JSONABLE, } @@ -112,33 +108,6 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets: lat.inputs, ASSET_TYPES["inputs"], storage_path, ASSET_FILENAME_MAP["inputs"] ) - # Deprecate - named_args_asset = save_asset( - lat.named_args, - ASSET_TYPES["named_args"], - storage_path, - ASSET_FILENAME_MAP["named_args"], - ) - named_kwargs_asset = save_asset( - lat.named_kwargs, - ASSET_TYPES["named_kwargs"], - storage_path, - ASSET_FILENAME_MAP["named_kwargs"], - ) - cova_imports_asset = save_asset( - lat.cova_imports, - ASSET_TYPES["cova_imports"], - storage_path, - ASSET_FILENAME_MAP["cova_imports"], - ) - lattice_imports_asset = save_asset( - lat.lattice_imports, - ASSET_TYPES["lattice_imports"], - storage_path, - ASSET_FILENAME_MAP["lattice_imports"], - ) - - # NOTE: these are actually JSONable hooks_asset = save_asset( lat.metadata["hooks"], ASSET_TYPES["hooks"], @@ -151,10 +120,6 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets: workflow_function_string=workflow_func_str_asset, doc=docstring_asset, inputs=inputs_asset, - named_args=named_args_asset, - named_kwargs=named_kwargs_asset, - cova_imports=cova_imports_asset, - lattice_imports=lattice_imports_asset, hooks=hooks_asset, ) @@ -166,20 +131,12 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict: ) doc = load_asset(assets.doc, ASSET_TYPES["doc"]) inputs = load_asset(assets.inputs, ASSET_TYPES["inputs"]) - named_args = load_asset(assets.named_args, ASSET_TYPES["named_args"]) - named_kwargs = load_asset(assets.named_kwargs, ASSET_TYPES["named_kwargs"]) - cova_imports = load_asset(assets.cova_imports, ASSET_TYPES["cova_imports"]) - lattice_imports = load_asset(assets.lattice_imports, ASSET_TYPES["lattice_imports"]) hooks = load_asset(assets.hooks, ASSET_TYPES["hooks"]) return { "workflow_function": workflow_function, "workflow_function_string": workflow_function_string, "__doc__": doc, "inputs": inputs, - "named_args": named_args, - "named_kwargs": named_kwargs, - "cova_imports": cova_imports, - "lattice_imports": lattice_imports, "metadata": { "hooks": hooks, }, diff --git a/covalent/_shared_files/schemas/lattice.py b/covalent/_shared_files/schemas/lattice.py index 2fece9c80..783b966ee 100644 --- a/covalent/_shared_files/schemas/lattice.py +++ b/covalent/_shared_files/schemas/lattice.py @@ -39,10 +39,6 @@ "workflow_function_string", "__doc__", "inputs", - "named_args", - "named_kwargs", - "cova_imports", - "lattice_imports", # user dependent assets "hooks", } @@ -83,10 +79,12 @@ class LatticeAssets(BaseModel): workflow_function_string: AssetSchema doc: AssetSchema # __doc__ inputs: AssetSchema - named_args: AssetSchema - named_kwargs: AssetSchema - cova_imports: AssetSchema - lattice_imports: AssetSchema + + # Deprecated + named_args: AssetSchema = AssetSchema(size=0) + named_kwargs: AssetSchema = AssetSchema(size=0) + cova_imports: AssetSchema = AssetSchema(size=0) + lattice_imports: AssetSchema = AssetSchema(size=0) # lattice.metadata hooks: AssetSchema diff --git a/covalent/_workflow/lattice.py b/covalent/_workflow/lattice.py index 84f74f6b1..146b837d9 100644 --- a/covalent/_workflow/lattice.py +++ b/covalent/_workflow/lattice.py @@ -47,7 +47,7 @@ from ..executor import BaseExecutor from ..triggers import BaseTrigger -from .._shared_files.utils import get_imports, get_serialized_function_str +from .._shared_files.utils import get_serialized_function_str consumable_constraints = [] @@ -81,10 +81,7 @@ def __init__( self.__doc__ = self.workflow_function.__doc__ self.post_processing = False self.inputs = None - self.named_args = None - self.named_kwargs = None self.electron_outputs = {} - self.lattice_imports, self.cova_imports = get_imports(self.workflow_function) self.workflow_function = TransportableObject.make_transportable(self.workflow_function) @@ -105,8 +102,6 @@ def serialize_to_json(self) -> str: attributes["transport_graph"] = self.transport_graph.serialize_to_json() attributes["inputs"] = self.inputs.to_dict() - attributes["named_args"] = self.named_args.to_dict() - attributes["named_kwargs"] = self.named_kwargs.to_dict() attributes["electron_outputs"] = {} for node_name, output in self.electron_outputs.items(): @@ -121,8 +116,6 @@ def deserialize_from_json(json_data: str) -> None: for node_name, object_dict in attributes["electron_outputs"].items(): attributes["electron_outputs"][node_name] = TransportableObject.from_dict(object_dict) - attributes["named_kwargs"] = TransportableObject.from_dict(attributes["named_kwargs"]) - attributes["named_args"] = TransportableObject.from_dict(attributes["named_args"]) attributes["inputs"] = TransportableObject.from_dict(attributes["inputs"]) if attributes["transport_graph"]: @@ -209,9 +202,6 @@ def build_graph(self, *args, **kwargs) -> None: new_kwargs = dict(named_kwargs.items()) self.inputs = TransportableObject({"args": args, "kwargs": kwargs}) - self.named_args = TransportableObject(named_args) - self.named_kwargs = TransportableObject(named_kwargs) - self.lattice_imports, self.cova_imports = get_imports(workflow_function) # Set any lattice metadata not explicitly set by the user constraint_names = {"executor", "workflow_executor", "hooks"} diff --git a/covalent_dispatcher/_cli/cli.py b/covalent_dispatcher/_cli/cli.py index f24f24aaf..f352305e6 100644 --- a/covalent_dispatcher/_cli/cli.py +++ b/covalent_dispatcher/_cli/cli.py @@ -25,18 +25,7 @@ from rich.console import Console from .groups import db, deploy -from .service import ( - cluster, - config, - logs, - migrate_legacy_result_object, - print_header, - purge, - restart, - start, - status, - stop, -) +from .service import cluster, config, logs, print_header, purge, restart, start, status, stop # Main entrypoint @@ -73,7 +62,6 @@ def cli(ctx: click.Context, version: bool) -> None: cli.add_command(cluster) cli.add_command(db) cli.add_command(config) -cli.add_command(migrate_legacy_result_object) cli.add_command(deploy) if __name__ == "__main__": diff --git a/covalent_dispatcher/_cli/migrate.py b/covalent_dispatcher/_cli/migrate.py deleted file mode 100644 index 032aafbf0..000000000 --- a/covalent_dispatcher/_cli/migrate.py +++ /dev/null @@ -1,208 +0,0 @@ -# Copyright 2021 Agnostiq Inc. -# -# This file is part of Covalent. -# -# Licensed under the Apache License 2.0 (the "License"). A copy of the -# License may be obtained with this software package or at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Use of this file is prohibited except in compliance with the License. -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Utils for migrating legacy (0.110-era) result object to a modern result object.""" - -import pickle - -from covalent._results_manager import Result -from covalent._shared_files import logger -from covalent._shared_files.defaults import ( - attr_prefix, - electron_dict_prefix, - electron_list_prefix, - generator_prefix, - parameter_prefix, - subscript_prefix, -) -from covalent._shared_files.utils import get_named_params -from covalent._workflow.electron import to_decoded_electron_collection -from covalent._workflow.lattice import Lattice -from covalent._workflow.transport import TransportableObject, _TransportGraph, encode_metadata - -from .._db import update - -app_log = logger.app_log -log_stack_info = logger.log_stack_info - - -def process_node(node: dict) -> dict: - """Convert a node from a 0.110.2-vintage transport graph - - Args: - node: dictionary of node attributes - - Returns: - the converted node attributes - """ - - if "metadata" in node: - node["metadata"] = encode_metadata(node["metadata"]) - if "deps" not in node["metadata"]: - node["metadata"]["deps"] = {} - if "call_before" not in node["metadata"]: - node["metadata"]["call_before"] = [] - if "call_after" not in node["metadata"]: - node["metadata"]["call_after"] = [] - - node_name = node["name"] - - # encode output, remove "attribute_name", strip "attr_prefix" from name - if node_name.startswith(attr_prefix): - node["output"] = TransportableObject.make_transportable(node["output"]) - if "attribute_name" in node: - del node["attribute_name"] - new_node_name = node_name.replace(attr_prefix, "") - node["name"] = new_node_name - - # encode output, remove "key", strip "generator_prefix" from name - elif node_name.startswith(generator_prefix): - node["output"] = TransportableObject.make_transportable(node["output"]) - if "key" in node: - del node["key"] - new_node_name = node_name.replace(generator_prefix, "") - node["name"] = new_node_name - - # encode output, remove "key", strip "subscript_prefix" from name - elif node_name.startswith(subscript_prefix): - node["output"] = TransportableObject.make_transportable(node["output"]) - if "key" in node: - del node["key"] - new_node_name = node_name.replace(subscript_prefix, "") - node["name"] = new_node_name - - # Replace function for collection nodes - elif node_name.startswith(electron_list_prefix) or node_name.startswith(electron_dict_prefix): - node["function"] = TransportableObject(to_decoded_electron_collection) - - # Encode "value" and "output" for parameter nodes - elif node_name.startswith(parameter_prefix): - node["value"] = TransportableObject.make_transportable(node["value"]) - node["output"] = TransportableObject.make_transportable(node["output"]) - - # Function nodes: encode output and sublattice_result - else: - node["output"] = TransportableObject.make_transportable(node["output"]) - if "sublattice_result" in node: - if node["sublattice_result"] is not None: - node["sublattice_result"] = process_result_object(node["sublattice_result"]) - - return node - - -def process_transport_graph(tg: _TransportGraph) -> _TransportGraph: - """Convert a 0.110.2-vintage transport graph to a modern transport graph - - Args: - tg: old Transport Graph - - Returns: - the modernized Transport Graph - """ - tg_new = _TransportGraph() - g = tg.get_internal_graph_copy() - for node_id in g.nodes: - app_log.debug(f"Processing node {node_id}") - process_node(g.nodes[node_id]) - - if tg.lattice_metadata: - tg.lattice_metadata = encode_metadata(tg.lattice_metadata) - - tg_new._graph = g - return tg_new - - -def process_lattice(lattice: Lattice) -> Lattice: - """Convert a "legacy" (0.110.2) Lattice to a modern Lattice - - Args: - lattice: old lattice - - Returns: - the modernized lattice - """ - - workflow_function = lattice.workflow_function - lattice.workflow_function = TransportableObject.make_transportable(workflow_function) - inputs = {"args": lattice.args, "kwargs": lattice.kwargs} - lattice.inputs = TransportableObject(inputs) - - workflow_function = lattice.workflow_function.get_deserialized() - - named_args, named_kwargs = get_named_params(workflow_function, lattice.args, lattice.kwargs) - lattice.named_args = TransportableObject(named_args) - lattice.named_kwargs = TransportableObject(named_kwargs) - - metadata = lattice.metadata - - if "workflow_executor" not in metadata: - metadata["workflow_executor"] = "local" - - metadata = encode_metadata(metadata) - lattice.metadata = metadata - lattice.metadata["deps"] = {} - lattice.metadata["call_before"] = [] - lattice.metadata["call_after"] = [] - - lattice.transport_graph = process_transport_graph(lattice.transport_graph) - lattice.transport_graph.lattice_metadata = lattice.metadata - app_log.debug("Processed transport graph") - - # Delete raw inputs - del lattice.__dict__["args"] - del lattice.__dict__["kwargs"] - - return lattice - - -def process_result_object(result_object: Result) -> Result: - """Convert a "legacy" (0.110.2) Result object to a modern Result object - - Args: - result_object: the old Result object - - Returns: - the modernized result object - """ - - app_log.debug(f"Processing result object for dispatch {result_object.dispatch_id}") - process_lattice(result_object._lattice) - app_log.debug("Processed lattice") - - result_object._result = TransportableObject.make_transportable(result_object._result) - tg = result_object.lattice.transport_graph - for n in tg._graph.nodes: - tg.dirty_nodes.append(n) - - del result_object.__dict__["_inputs"] - return result_object - - -def migrate_pickled_result_object(path: str) -> None: - """Save legacy (0.110.2) result pickle file to a DataStore. - - This first transforms certain legacy properties of the result - object and then persists the result object to the datastore. - - Args: - path: path of the `result.pkl` file - """ - - with open(path, "rb") as f: - result_object = pickle.load(f) - - process_result_object(result_object) - update.persist(result_object) diff --git a/covalent_dispatcher/_cli/service.py b/covalent_dispatcher/_cli/service.py index df2299a8f..73d63da0b 100644 --- a/covalent_dispatcher/_cli/service.py +++ b/covalent_dispatcher/_cli/service.py @@ -56,7 +56,6 @@ from covalent._shared_files.config import ConfigManager, get_config, reload_config, set_config from .._db.datastore import DataStore -from .migrate import migrate_pickled_result_object UI_PIDFILE = get_config("dispatcher.cache_dir") + "/ui.pid" UI_LOGFILE = get_config("user_interface.log_dir") + "/covalent_ui.log" @@ -787,17 +786,6 @@ def logs() -> None: ) -@click.command() -@click.argument("result_pickle_path") -def migrate_legacy_result_object(result_pickle_path) -> None: - """Migrate a legacy result object - - Example: `covalent migrate-legacy-result-object result.pkl` - """ - - migrate_pickled_result_object(result_pickle_path) - - # Cluster CLI handlers (client side wrappers for the async handlers exposed # in the dask cluster process) async def _get_cluster_status(uri: str): diff --git a/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py b/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py index 676d0b68c..9871caefb 100644 --- a/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py +++ b/covalent_dispatcher/_dal/db_interfaces/lattice_utils.py @@ -28,10 +28,6 @@ "name", "doc", "inputs", - "named_args", - "named_kwargs", - "cova_imports", - "lattice_imports", } METADATA_KEYS = lattice.LATTICE_METADATA_KEYS.copy() @@ -68,10 +64,6 @@ "workflow_function_string": "function_string_filename", "doc": "docstring_filename", "inputs": "inputs_filename", - "named_args": "named_args_filename", - "named_kwargs": "named_kwargs_filename", - "cova_imports": "cova_imports_filename", - "lattice_imports": "lattice_imports_filename", "executor_data": "executor_data_filename", "workflow_executor_data": "workflow_executor_data_filename", "hooks": "hooks_filename", diff --git a/covalent_dispatcher/_dal/importers/lattice.py b/covalent_dispatcher/_dal/importers/lattice.py index 9e7f97037..55fa50925 100644 --- a/covalent_dispatcher/_dal/importers/lattice.py +++ b/covalent_dispatcher/_dal/importers/lattice.py @@ -24,16 +24,12 @@ from covalent._shared_files.config import get_config from covalent._shared_files.schemas.lattice import ( - LATTICE_COVA_IMPORTS_FILENAME, LATTICE_DOCSTRING_FILENAME, LATTICE_ERROR_FILENAME, LATTICE_FUNCTION_FILENAME, LATTICE_FUNCTION_STRING_FILENAME, LATTICE_HOOKS_FILENAME, LATTICE_INPUTS_FILENAME, - LATTICE_LATTICE_IMPORTS_FILENAME, - LATTICE_NAMED_ARGS_FILENAME, - LATTICE_NAMED_KWARGS_FILENAME, LATTICE_RESULTS_FILENAME, LATTICE_STORAGE_TYPE, LatticeAssets, @@ -71,12 +67,8 @@ def _get_lattice_meta(lat: LatticeSchema, storage_path) -> dict: "function_string_filename": LATTICE_FUNCTION_STRING_FILENAME, "error_filename": LATTICE_ERROR_FILENAME, "inputs_filename": LATTICE_INPUTS_FILENAME, - "named_args_filename": LATTICE_NAMED_ARGS_FILENAME, - "named_kwargs_filename": LATTICE_NAMED_KWARGS_FILENAME, "results_filename": LATTICE_RESULTS_FILENAME, "hooks_filename": LATTICE_HOOKS_FILENAME, - "cova_imports_filename": LATTICE_COVA_IMPORTS_FILENAME, - "lattice_imports_filename": LATTICE_LATTICE_IMPORTS_FILENAME, } kwargs.update(legacy_kwargs) return kwargs diff --git a/covalent_dispatcher/_db/dispatchdb.py b/covalent_dispatcher/_db/dispatchdb.py index 621022777..e78a02d6d 100644 --- a/covalent_dispatcher/_db/dispatchdb.py +++ b/covalent_dispatcher/_db/dispatchdb.py @@ -20,7 +20,6 @@ from datetime import datetime import networkx as nx -import simplejson import covalent.executor as covalent_executor from covalent._shared_files import logger @@ -125,38 +124,6 @@ def result_encoder(obj): return str(obj) -def encode_result(result_obj): - lattice = result_obj.lattice - - result_string = result_obj.encoded_result.json - if not result_string: - result_string = result_obj.encoded_result.object_string - - named_args = {k: v.object_string for k, v in lattice.named_args.items()} - named_kwargs = {k: v.object_string for k, v in lattice.named_kwargs.items()} - result_dict = { - "dispatch_id": result_obj.dispatch_id, - "status": result_obj.status, - "result": result_string, - "start_time": result_obj.start_time, - "end_time": result_obj.end_time, - "results_dir": result_obj.results_dir, - "error": result_obj.error, - "lattice": { - "function_string": lattice.workflow_function_string, - "doc": lattice.__doc__, - "name": lattice.__name__, - "inputs": encode_dict({**named_args, **named_kwargs}), - "metadata": extract_metadata(lattice.metadata), - }, - "graph": extract_graph(result_obj.lattice.transport_graph._graph), - } - - jsonified_result = simplejson.dumps(result_dict, default=result_encoder, ignore_nan=True) - - return jsonified_result - - class DispatchDB: """ Wrapper for the database of workflows. diff --git a/covalent_dispatcher/_db/models.py b/covalent_dispatcher/_db/models.py index 7e0521c35..e61f725ef 100644 --- a/covalent_dispatcher/_db/models.py +++ b/covalent_dispatcher/_db/models.py @@ -92,10 +92,10 @@ class Lattice(Base): # Name of the file containing the serialized input data inputs_filename = Column(Text) - # Name of the file containing the serialized named args + # DEPRECATED: Name of the file containing the serialized named args named_args_filename = Column(Text) - # Name of the file containing the serialized named kwargs + # DEPRECATED: Name of the file containing the serialized named kwargs named_kwargs_filename = Column(Text) # name of the file containing the serialized output @@ -104,10 +104,10 @@ class Lattice(Base): # Name of the file containing the default electron hooks hooks_filename = Column(Text) - # Name of the file containing the set of cova imports + # DEPRECATED: Name of the file containing the set of cova imports cova_imports_filename = Column(Text) - # Name of the file containing the set of lattice imports + # DEPRECATED: Name of the file containing the set of lattice imports lattice_imports_filename = Column(Text) # Results directory (will be deprecated soon) diff --git a/covalent_dispatcher/_db/upsert.py b/covalent_dispatcher/_db/upsert.py index 3bd7f0ca7..70ef99a45 100644 --- a/covalent_dispatcher/_db/upsert.py +++ b/covalent_dispatcher/_db/upsert.py @@ -57,12 +57,8 @@ LATTICE_DOCSTRING_FILENAME = LATTICE_FILENAMES["doc"] LATTICE_ERROR_FILENAME = LATTICE_FILENAMES["error"] LATTICE_INPUTS_FILENAME = LATTICE_FILENAMES["inputs"] -LATTICE_NAMED_ARGS_FILENAME = LATTICE_FILENAMES["named_args"] -LATTICE_NAMED_KWARGS_FILENAME = LATTICE_FILENAMES["named_kwargs"] LATTICE_RESULTS_FILENAME = LATTICE_FILENAMES["result"] LATTICE_HOOKS_FILENAME = LATTICE_FILENAMES["hooks"] -LATTICE_COVA_IMPORTS_FILENAME = LATTICE_FILENAMES["cova_imports"] -LATTICE_LATTICE_IMPORTS_FILENAME = LATTICE_FILENAMES["lattice_imports"] LATTICE_STORAGE_TYPE = "file" CUSTOM_ASSETS_FIELD = "custom_asset_keys" @@ -108,12 +104,8 @@ def _lattice_data(session: Session, result: Result, electron_id: int = None) -> ("doc", LATTICE_DOCSTRING_FILENAME, result.lattice.__doc__), ("error", LATTICE_ERROR_FILENAME, result.error), ("inputs", LATTICE_INPUTS_FILENAME, result.lattice.inputs), - ("named_args", LATTICE_NAMED_ARGS_FILENAME, result.lattice.named_args), - ("named_kwargs", LATTICE_NAMED_KWARGS_FILENAME, result.lattice.named_kwargs), ("result", LATTICE_RESULTS_FILENAME, result._result), ("hooks", LATTICE_HOOKS_FILENAME, result.lattice.metadata["hooks"]), - ("cova_imports", LATTICE_COVA_IMPORTS_FILENAME, result.lattice.cova_imports), - ("lattice_imports", LATTICE_LATTICE_IMPORTS_FILENAME, result.lattice.lattice_imports), ]: digest, size = local_store.store_file(data_storage_path, filename, data) asset_record_kwargs = { @@ -161,12 +153,8 @@ def _lattice_data(session: Session, result: Result, electron_id: int = None) -> "workflow_executor_data": json.dumps(result.lattice.metadata["workflow_executor_data"]), "error_filename": LATTICE_ERROR_FILENAME, "inputs_filename": LATTICE_INPUTS_FILENAME, - "named_args_filename": LATTICE_NAMED_ARGS_FILENAME, - "named_kwargs_filename": LATTICE_NAMED_KWARGS_FILENAME, "results_filename": LATTICE_RESULTS_FILENAME, "hooks_filename": LATTICE_HOOKS_FILENAME, - "cova_imports_filename": LATTICE_COVA_IMPORTS_FILENAME, - "lattice_imports_filename": LATTICE_LATTICE_IMPORTS_FILENAME, "results_dir": results_dir, "root_dispatch_id": result.root_dispatch_id, "python_version": result.lattice.python_version, diff --git a/covalent_dispatcher/_db/write_result_to_db.py b/covalent_dispatcher/_db/write_result_to_db.py index 9d928c1ec..08da952ca 100644 --- a/covalent_dispatcher/_db/write_result_to_db.py +++ b/covalent_dispatcher/_db/write_result_to_db.py @@ -95,12 +95,8 @@ def transaction_insert_lattices_data( workflow_executor_data: str, error_filename: str, inputs_filename: str, - named_args_filename: str, - named_kwargs_filename: str, results_filename: str, hooks_filename: str, - cova_imports_filename: str, - lattice_imports_filename: str, results_dir: str, root_dispatch_id: str, created_at: dt, @@ -133,12 +129,8 @@ def transaction_insert_lattices_data( workflow_executor_data=workflow_executor_data, error_filename=error_filename, inputs_filename=inputs_filename, - named_args_filename=named_args_filename, - named_kwargs_filename=named_kwargs_filename, results_filename=results_filename, hooks_filename=hooks_filename, - cova_imports_filename=cova_imports_filename, - lattice_imports_filename=lattice_imports_filename, results_dir=results_dir, root_dispatch_id=root_dispatch_id, is_active=True, diff --git a/covalent_dispatcher/_service/models.py b/covalent_dispatcher/_service/models.py index 2d2f7db10..18a33a071 100644 --- a/covalent_dispatcher/_service/models.py +++ b/covalent_dispatcher/_service/models.py @@ -41,11 +41,7 @@ class LatticeAssetKey(str, Enum): workflow_function_string = "workflow_function_string" doc = "doc" inputs = "inputs" - named_args = "named_args" - named_kwargs = "named_kwargs" hooks = "hooks" - cova_imports = "cova_imports" - lattice_imports = "lattice_imports" class ElectronAssetKey(str, Enum): diff --git a/covalent_ui/result_webhook.py b/covalent_ui/result_webhook.py index 3caf03c10..f5d311421 100644 --- a/covalent_ui/result_webhook.py +++ b/covalent_ui/result_webhook.py @@ -22,7 +22,7 @@ import covalent_ui.app as ui_server from covalent._results_manager import Result from covalent._shared_files import logger -from covalent._shared_files.utils import get_ui_url +from covalent._shared_files.utils import get_named_params, get_ui_url from covalent_dispatcher._db.dispatchdb import encode_dict, extract_graph, extract_metadata app_log = logger.app_log @@ -78,8 +78,11 @@ def send_draw_request(lattice) -> None: graph = lattice.transport_graph.get_internal_graph_copy() - named_args = lattice.named_args.get_deserialized() - named_kwargs = lattice.named_kwargs.get_deserialized() + inputs = lattice.inputs.get_deserialized() + fn = lattice.workflow_function.get_deserialized() + args = inputs["args"] + kwargs = inputs["kwargs"] + named_args, named_kwargs = get_named_params(fn, args, kwargs) draw_request = json.dumps( { diff --git a/tests/covalent_dispatcher_tests/_cli/cli_test.py b/tests/covalent_dispatcher_tests/_cli/cli_test.py index a50b083da..aac119712 100644 --- a/tests/covalent_dispatcher_tests/_cli/cli_test.py +++ b/tests/covalent_dispatcher_tests/_cli/cli_test.py @@ -61,7 +61,6 @@ def test_cli_commands(): "db", "deploy", "logs", - "migrate-legacy-result-object", "purge", "restart", "start", diff --git a/tests/covalent_dispatcher_tests/_cli/migrate_test.py b/tests/covalent_dispatcher_tests/_cli/migrate_test.py deleted file mode 100644 index 18289bcca..000000000 --- a/tests/covalent_dispatcher_tests/_cli/migrate_test.py +++ /dev/null @@ -1,249 +0,0 @@ -# Copyright 2021 Agnostiq Inc. -# -# This file is part of Covalent. -# -# Licensed under the Apache License 2.0 (the "License"). A copy of the -# License may be obtained with this software package or at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Use of this file is prohibited except in compliance with the License. -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Testing results_dir migration script""" - -import pickle -from pathlib import Path - -from covalent._results_manager import Result -from covalent._shared_files.defaults import attr_prefix, generator_prefix, subscript_prefix -from covalent._workflow.transport import TransportableObject, _TransportGraph -from covalent_dispatcher._cli.migrate import ( - migrate_pickled_result_object, - process_lattice, - process_node, - process_result_object, - process_transport_graph, - to_decoded_electron_collection, -) - -dispatch_id = "652dc473-fa37-4846-85f3-b314204fd432" -sub_dispatch_id = "c333d0b3-8711-4595-9374-421f5482a592" - -basedir = Path(__file__).parent -sample_results_dir = basedir / Path("sample_results_dir") -result_pkl = sample_results_dir / dispatch_id / "result.pkl" - -# task node 0, parameter node 1 -# attribute node 2 -# sublattice node 3 -# task node 4, generator nodes 5, 6 -# subscript node 7 - - -def get_sample_result_object(): - with open(result_pkl, "rb") as f: - result_object = pickle.load(f) - return result_object - - -def compare_nodes_and_edges(tg_orig: _TransportGraph, tg_new: _TransportGraph): - """Convenience function for comparing a legacy transport graph with a processed one.""" - - # Check metadata - for n in tg_new._graph.nodes: - metadata = tg_new._graph.nodes[n]["metadata"] - assert "deps" in metadata - assert "call_before" in metadata - assert "call_after" in metadata - - # Check other node attributes - task_node = tg_new._graph.nodes[0] - orig_output = tg_orig._graph.nodes[0]["output"] - - assert isinstance(task_node["output"], TransportableObject) - assert task_node["output"].get_deserialized().__dict__ == orig_output.__dict__ - - collection_node = tg_new._graph.nodes[1] - assert ( - collection_node["function"].get_serialized() - == TransportableObject(to_decoded_electron_collection).get_serialized() - ) - - param_node = tg_new._graph.nodes[2] - orig_output = tg_orig._graph.nodes[2]["output"] - orig_value = tg_orig._graph.nodes[2]["value"] - - assert isinstance(param_node["output"], TransportableObject) - assert isinstance(param_node["value"], TransportableObject) - assert param_node["output"].get_deserialized() == orig_output - - param_node = tg_new._graph.nodes[3] - orig_output = tg_orig._graph.nodes[3]["output"] - orig_value = tg_orig._graph.nodes[3]["value"] - - assert isinstance(param_node["output"], TransportableObject) - assert isinstance(param_node["value"], TransportableObject) - assert param_node["output"].get_deserialized() == orig_output - - attr_node = tg_new._graph.nodes[4] - orig_output = tg_orig._graph.nodes[4]["output"] - - assert isinstance(attr_node["output"], TransportableObject) - assert attr_node["output"].get_deserialized() == orig_output - assert "attribute_name" not in attr_node - assert attr_prefix not in attr_node["name"] - - subl_node = tg_new._graph.nodes[5] - orig_output = tg_orig._graph.nodes[5]["output"] - - assert isinstance(subl_node["output"], TransportableObject) - assert isinstance(subl_node["sublattice_result"], Result) - assert subl_node["output"].get_deserialized() == orig_output - - task_node = tg_new._graph.nodes[6] - orig_output = tg_orig._graph.nodes[6]["output"] - - assert isinstance(task_node["output"], TransportableObject) - assert task_node["output"].get_deserialized() == orig_output - - gen_node = tg_new._graph.nodes[7] - orig_output = tg_orig._graph.nodes[7]["output"] - - assert isinstance(gen_node["output"], TransportableObject) - assert gen_node["output"].get_deserialized() == orig_output - assert "key" not in gen_node - assert generator_prefix not in gen_node["name"] - - gen_node = tg_new._graph.nodes[8] - orig_output = tg_orig._graph.nodes[8]["output"] - - assert isinstance(gen_node["output"], TransportableObject) - assert gen_node["output"].get_deserialized() == orig_output - assert "key" not in gen_node - assert generator_prefix not in gen_node["name"] - - subscript_node = tg_new._graph.nodes[9] - orig_output = tg_orig._graph.nodes[9]["output"] - - assert isinstance(subscript_node["output"], TransportableObject) - assert subscript_node["output"].get_deserialized() == orig_output - assert "key" not in subscript_node - assert subscript_prefix not in subscript_node["name"] - - assert tg_orig._graph.edges == tg_new._graph.edges - - -def test_process_legacy_node(): - """Test process_node""" - - ro = get_sample_result_object() - ro_orig = get_sample_result_object() - tg = ro.lattice.transport_graph - tg_orig = ro_orig.lattice.transport_graph - - task_node = tg._graph.nodes[0] - orig_output = tg_orig._graph.nodes[0]["output"] - process_node(task_node) - - param_node = tg._graph.nodes[2] - orig_output = tg_orig._graph.nodes[2]["output"] - orig_value = tg_orig._graph.nodes[2]["value"] - process_node(param_node) - - param_node = tg._graph.nodes[3] - orig_output = tg_orig._graph.nodes[3]["output"] - orig_value = tg_orig._graph.nodes[3]["value"] - process_node(param_node) - - attr_node = tg._graph.nodes[4] - orig_output = tg_orig._graph.nodes[4]["output"] - assert "attribute_name" in attr_node - assert attr_prefix in attr_node["name"] - process_node(attr_node) - - subl_node = tg._graph.nodes[5] - orig_output = tg_orig._graph.nodes[5]["output"] - assert "sublattice_result" in subl_node - process_node(subl_node) - - task_node = tg._graph.nodes[6] - orig_output = tg_orig._graph.nodes[6]["output"] - process_node(task_node) - - gen_node = tg._graph.nodes[7] - orig_output = tg_orig._graph.nodes[7]["output"] - assert "key" in gen_node - assert generator_prefix in gen_node["name"] - process_node(gen_node) - - gen_node = tg._graph.nodes[8] - orig_output = tg_orig._graph.nodes[8]["output"] - assert "key" in gen_node - assert generator_prefix in gen_node["name"] - process_node(gen_node) - - subscript_node = tg._graph.nodes[9] - orig_output = tg_orig._graph.nodes[9]["output"] - assert "key" in subscript_node - assert subscript_prefix in subscript_node["name"] - process_node(subscript_node) - - -def test_process_transport_graph(): - """Test process_transport_graph""" - - ro = get_sample_result_object() - - tg = ro.lattice.transport_graph - tg_new = process_transport_graph(tg) - compare_nodes_and_edges(tg, tg_new) - assert "dirty_nodes" in tg_new.__dict__ - - -def test_process_lattice(): - """Test process_lattice""" - - ro = get_sample_result_object() - ro_orig = get_sample_result_object() - lattice = process_lattice(ro._lattice) - lattice.named_args = lattice.named_args.get_deserialized() - lattice.named_kwargs = lattice.named_kwargs.get_deserialized() - - assert isinstance(lattice.workflow_function, TransportableObject) - assert list(lattice.named_args.keys()) == ["z"] - assert list(lattice.named_kwargs.keys()) == ["zz"] - assert lattice.metadata["executor_data"]["short_name"] == "local" - assert lattice.metadata["workflow_executor"] == "local" - assert lattice.metadata["workflow_executor_data"] == {} - assert lattice.metadata["deps"] == {} - assert lattice.metadata["call_before"] == [] - assert lattice.metadata["call_after"] == [] - - -def test_process_result_object(): - """Test process_result_object""" - - ro = get_sample_result_object() - old_inputs = ro._inputs - ro_new = process_result_object(ro) - inputs = ro_new.inputs.get_deserialized() - assert old_inputs["args"] == inputs["args"] - assert old_inputs["kwargs"] == inputs["kwargs"] - assert isinstance(ro_new._result, TransportableObject) - assert "dirty_nodes" in ro_new.lattice.transport_graph.__dict__ - - -def test_migrate_pickled_result_object(mocker): - """Test migrate_pickled_result_object""" - - mock_process_ro = mocker.patch("covalent_dispatcher._cli.migrate.process_result_object") - mock_persist = mocker.patch("covalent_dispatcher._db.update.persist") - - migrate_pickled_result_object(result_pkl) - mock_process_ro.assert_called_once() - mock_persist.assert_called_once() diff --git a/tests/covalent_dispatcher_tests/_dal/lattice_test.py b/tests/covalent_dispatcher_tests/_dal/lattice_test.py index 7a55ac23f..f0f2d9a1a 100644 --- a/tests/covalent_dispatcher_tests/_dal/lattice_test.py +++ b/tests/covalent_dispatcher_tests/_dal/lattice_test.py @@ -83,9 +83,6 @@ def test_lattice_attributes(test_db, mocker): workflow_function = lat.get_value("workflow_function").get_deserialized() assert workflow_function(42) == 42 - res.lattice.lattice_imports == lat.get_value("lattice_imports") - res.lattice.cova_imports == lat.get_value("cova_imports") - def test_lattice_restricted_attributes(test_db, mocker): res = get_mock_result() diff --git a/tests/covalent_dispatcher_tests/_db/update_test.py b/tests/covalent_dispatcher_tests/_db/update_test.py index 567c83bc9..6e7dfb4c9 100644 --- a/tests/covalent_dispatcher_tests/_db/update_test.py +++ b/tests/covalent_dispatcher_tests/_db/update_test.py @@ -154,19 +154,6 @@ def test_result_persist_workflow_1(test_db, result_1, mocker): assert executor_data["short_name"] == le.short_name() assert executor_data["attributes"] == le.__dict__ - saved_named_args = local_store.load_file( - storage_path=lattice_storage_path, filename=lattice_row.named_args_filename - ) - - saved_named_kwargs = local_store.load_file( - storage_path=lattice_storage_path, filename=lattice_row.named_kwargs_filename - ) - saved_named_args_raw = saved_named_args.get_deserialized() - saved_named_kwargs_raw = saved_named_kwargs.get_deserialized() - - assert saved_named_args_raw == {} - assert saved_named_kwargs_raw == {"a": 1, "b": 2} - # Check that the electron records are as expected assert len(electron_rows) == 6 for electron in electron_rows: diff --git a/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py b/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py index 759dbbe1b..310367df7 100644 --- a/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py +++ b/tests/covalent_dispatcher_tests/_db/write_result_to_db_test.py @@ -59,8 +59,6 @@ WORKFLOW_EXECUTOR_DATA_FILENAME = "workflow_executor_data.pkl" ERROR_FILENAME = "error.txt" INPUTS_FILENAME = "inputs.pkl" -NAMED_ARGS_FILENAME = "named_args.pkl" -NAMED_KWARGS_FILENAME = "named_kwargs.pkl" RESULTS_FILENAME = "results.pkl" VALUE_FILENAME = "value.pkl" STDOUT_FILENAME = "stdout.log" @@ -68,8 +66,6 @@ ERROR_FILENAME = "error.log" TRANSPORT_GRAPH_FILENAME = "transport_graph.pkl" HOOKS_FILENAME = "hooks.pkl" -COVA_IMPORTS_FILENAME = "cova_imports.json" -LATTICE_IMPORTS_FILENAME = "lattice_imports.txt" RESULTS_DIR = "/tmp/results" @@ -126,12 +122,8 @@ def get_lattice_kwargs( workflow_executor_data=json.dumps({}), error_filename=ERROR_FILENAME, inputs_filename=INPUTS_FILENAME, - named_args_filename=NAMED_ARGS_FILENAME, - named_kwargs_filename=NAMED_KWARGS_FILENAME, results_filename=RESULTS_FILENAME, hooks_filename=HOOKS_FILENAME, - cova_imports_filename=COVA_IMPORTS_FILENAME, - lattice_imports_filename=LATTICE_IMPORTS_FILENAME, results_dir=RESULTS_DIR, root_dispatch_id="dispatch_1", created_at=None, @@ -159,12 +151,8 @@ def get_lattice_kwargs( "workflow_executor_data": workflow_executor_data, "error_filename": error_filename, "inputs_filename": inputs_filename, - "named_args_filename": named_args_filename, - "named_kwargs_filename": named_kwargs_filename, "results_filename": results_filename, "hooks_filename": hooks_filename, - "cova_imports_filename": cova_imports_filename, - "lattice_imports_filename": lattice_imports_filename, "results_dir": results_dir, "root_dispatch_id": root_dispatch_id, "created_at": created_at, @@ -286,12 +274,8 @@ def test_insert_lattices_data(test_db, mocker): assert lattice.workflow_executor == "dask" assert lattice.error_filename == ERROR_FILENAME assert lattice.inputs_filename == INPUTS_FILENAME - assert lattice.named_args_filename == NAMED_ARGS_FILENAME - assert lattice.named_kwargs_filename == NAMED_KWARGS_FILENAME assert lattice.results_filename == RESULTS_FILENAME assert lattice.hooks_filename == HOOKS_FILENAME - assert lattice.cova_imports_filename == COVA_IMPORTS_FILENAME - assert lattice.lattice_imports_filename == LATTICE_IMPORTS_FILENAME assert lattice.results_dir == RESULTS_DIR assert lattice.root_dispatch_id == f"dispatch_{i + 1}" assert ( diff --git a/tests/covalent_tests/workflow/dispatch_source_test.py b/tests/covalent_tests/workflow/dispatch_source_test.py deleted file mode 100644 index 94d588b6b..000000000 --- a/tests/covalent_tests/workflow/dispatch_source_test.py +++ /dev/null @@ -1,94 +0,0 @@ -# Copyright 2021 Agnostiq Inc. -# -# This file is part of Covalent. -# -# Licensed under the Apache License 2.0 (the "License"). A copy of the -# License may be obtained with this software package or at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Use of this file is prohibited except in compliance with the License. -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Unit tests for writing the dispatch_source.py file""" - -import pytest - -from covalent._results_manager.result import _filter_cova_decorators - -COVA_IMPORTS = {"covalent", "lattice", "electron", "ct", "cova", "etron"} - - -INPUT1 = "\n".join( - [ - "@covalent.electron(", - ' executor="local"', - ")", - "def identity(x):", - " return x", - "", - "@covalent.electron", - "@covalent.lattice", - "@covalent.electron(", - ' executor="local"', - ")", - "def double(x):", - " return 2*x", - ] -) - -INPUT2 = INPUT1.replace("covalent", "ct") -INPUT3 = INPUT1.replace("covalent", "cova") -INPUT4 = INPUT1.replace("ct.electron", "electron") -INPUT5 = INPUT1.replace("ct.electron", "etron") -INPUT6 = INPUT1.replace("ct.lattice", "lattice") - -OUTPUT1 = "\n".join( - [ - "# @covalent.electron(", - '# executor="local"', - "# )", - "def identity(x):", - " return x", - "", - "# @covalent.electron", - "# @covalent.lattice", - "# @covalent.electron(", - '# executor="local"', - "# )", - "def double(x):", - " return 2*x", - ] -) - -OUTPUT2 = OUTPUT1.replace("covalent", "ct") -OUTPUT3 = OUTPUT1.replace("covalent", "cova") -OUTPUT4 = OUTPUT1.replace("ct.electron", "electron") -OUTPUT5 = OUTPUT1.replace("ct.electron", "etron") -OUTPUT6 = OUTPUT1.replace("ct.lattice", "lattice") - - -@pytest.mark.parametrize( - "input_str, expected_str", - [ - (INPUT1, OUTPUT1), - (INPUT2, OUTPUT2), - (INPUT3, OUTPUT3), - (INPUT4, OUTPUT4), - (INPUT5, OUTPUT5), - (INPUT6, OUTPUT6), - ], -) -def test_filter_cova_decorators( - input_str, - expected_str, -): - """Test the filtering out of Covalent-related decorators.""" - - output_str = _filter_cova_decorators(input_str, COVA_IMPORTS) - - assert output_str == expected_str diff --git a/tests/covalent_tests/workflow/lattice_serialization_test.py b/tests/covalent_tests/workflow/lattice_serialization_test.py index 72d962d0d..4be41091a 100644 --- a/tests/covalent_tests/workflow/lattice_serialization_test.py +++ b/tests/covalent_tests/workflow/lattice_serialization_test.py @@ -55,7 +55,6 @@ def workflow(x): return f(x) workflow.build_graph(5) - workflow.cova_imports = ["dummy_module"] json_workflow = workflow.serialize_to_json()