diff --git a/echoflow/models/pipeline.py b/echoflow/models/pipeline.py index 7b0df3e..9afc8ff 100644 --- a/echoflow/models/pipeline.py +++ b/echoflow/models/pipeline.py @@ -40,11 +40,9 @@ class Pipeline(BaseModel): Attributes: recipe_name (str): The name of the recipe. - generate_graph (Optional[bool]): Flag to indicate whether to generate a graph. Default is False. stages (List[Stage]): List of stages in the pipeline. """ recipe_name: str - generate_graph: Optional[bool] = False stages: List[Stage] class Recipe(BaseModel): @@ -56,16 +54,12 @@ class Recipe(BaseModel): use_local_dask (bool): Flag to indicate whether to use local Dask. Default is False. n_workers (int): Number of workers to spin up for local cluster. Default is 3 scheduler_address (str): The scheduler address. Default is None. - use_previous_recipe (Optional[bool]): Flag to indicate whether to use a previous recipe. Default is False. - database_path (Optional[str]): The path to the database. Default is an empty string. pipeline (List[Pipeline]): List of pipelines in the recipe. """ active_recipe: str use_local_dask: bool = False n_workers: int = 3 scheduler_address: str = None - use_previous_recipe: Optional[bool] = False - database_path: Optional[str] = '' pipeline: List[Pipeline] diff --git a/echoflow/stages/echoflow.py b/echoflow/stages/echoflow.py index 70172ed..eaf6c81 100644 --- a/echoflow/stages/echoflow.py +++ b/echoflow/stages/echoflow.py @@ -45,6 +45,7 @@ from echoflow.utils.config_utils import get_storage_options, load_block from echoflow.stages.echoflow_trigger import echoflow_trigger +from echoflow.utils.file_utils import format_windows_path def check_internet_connection(host="8.8.8.8", port=53, timeout=5): @@ -101,7 +102,7 @@ def echoflow_create_prefect_profile( set_active=True, ) """ - config_path = os.path.expanduser("~/.prefect/profiles.toml") + config_path = os.path.expanduser(format_windows_path("~/.prefect/profiles.toml")) with open(config_path, "r") as f: config = toml.load(f) @@ -228,16 +229,19 @@ def echoflow_start( print("Pipeline execution result:", result) """ + print("\nChecking Configuration") # Try loading the Prefect config block try: echoflow_config = load_block( name="echoflow-config", type=StorageType.ECHOFLOW) except ValueError as e: - print("No Prefect Cloud Configuration found. Creating Prefect Local named 'echoflow-local'. Please add your prefect cloud ") + print("\nNo Prefect Cloud Configuration found. Creating Prefect Local named 'echoflow-local'. Please add your prefect cloud ") # Add local profile to echoflow config but keep default as active since user might configure using Prefect setup echoflow_create_prefect_profile( name="echoflow-local", set_active=False) + print("\nConfiguration Check Completed") + print("\nChecking Connection to Prefect Server") # Check if program can connect to the Internet. if check_internet_connection() == False: active_profile = get_active_profile() @@ -246,10 +250,12 @@ def echoflow_start( "Please connect to internet or consider switching to a local prefect environment. This can be done by calling load_profile(name_of_local_prefect_profile or 'echoflow-local' if no prefect profile was created) method." ) else: - print("Using a local prefect environment. To go back to your cloud workspace call load_profile() with of your cloud profile.") + print("\nUsing a local prefect environment. To go back to your cloud workspace call load_profile() with of your cloud profile.") if isinstance(storage_options, Block): storage_options = get_storage_options(storage_options=storage_options) + + print("\nStarting the Pipeline") # Call the actual pipeline return echoflow_trigger( dataset_config=dataset_config, diff --git a/echoflow/stages/echoflow_trigger.py b/echoflow/stages/echoflow_trigger.py index 9c1dad6..7ce0a68 100644 --- a/echoflow/stages/echoflow_trigger.py +++ b/echoflow/stages/echoflow_trigger.py @@ -73,7 +73,6 @@ def echoflow_trigger( ) print("Pipeline output:", pipeline_output) """ - print(storage_options) if storage_options is not None: # Check if storage_options is a Block (fsspec storage) and convert it to a dictionary if isinstance(storage_options, dict) and storage_options.get("block_name") is not None: @@ -114,15 +113,17 @@ def echoflow_trigger( else: logging_config_dict = logging_config + print("\n\nDataset Configuration Loaded For This Run") + print("-"*50) print(dataset_config_dict) + print("\n\nPipeline Configuration Loaded For This Run") + print("-"*50) print(pipeline_config_dict) # Do any config checks on config dicts # Should be done in pydantic class check_config(dataset_config_dict, pipeline_config_dict) pipeline = Recipe(**pipeline_config_dict) - dataset = Dataset(**dataset_config_dict) - - + dataset = Dataset(**dataset_config_dict) if options.get('storage_options_override') is not None and options['storage_options_override'] is False: storage_options = {} @@ -161,6 +162,9 @@ def echoflow_trigger( dataset.args.storage_options_dict = storage_options dataset.args.transect.storage_options_dict = storage_options + print("\nInitiliazing Singleton Object") Singleton_Echoflow(log_file=logging_config_dict, pipeline=pipeline, dataset=dataset) + + print("\nReading Configurations") return init_flow(config=dataset, pipeline=pipeline, json_data_path=json_data_path) diff --git a/echoflow/stages/subflows/compute_MVBS.py b/echoflow/stages/subflows/compute_MVBS.py index 459616c..114f808 100644 --- a/echoflow/stages/subflows/compute_MVBS.py +++ b/echoflow/stages/subflows/compute_MVBS.py @@ -137,7 +137,7 @@ def process_compute_MVBS( ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict) xr_d_mvbs = ep.commongrid.compute_MVBS( ds_Sv=ed_list[0], - range_meter_bin=stage.external_params.get( + range_bin=stage.external_params.get( "range_meter_bin"), ping_time_bin=stage.external_params.get("ping_time_bin") ) diff --git a/echoflow/stages/subflows/initialization_flow.py b/echoflow/stages/subflows/initialization_flow.py index f082c7c..84900b3 100644 --- a/echoflow/stages/subflows/initialization_flow.py +++ b/echoflow/stages/subflows/initialization_flow.py @@ -69,6 +69,7 @@ def init_flow( if config.args.raw_json_path is None: total_files = glob_all_files(config=config) file_dicts = parse_raw_paths(all_raw_files=total_files, config=config) + print("Files To Be Processed", total_files) data = club_raw_files( config=config, raw_dicts=file_dicts, @@ -110,6 +111,7 @@ def init_flow( prefect_config_dict["task_runner"] = DaskTaskRunner( address=client.scheduler.address) print(client) + print("Scheduler at : ", client.scheduler.address) function = function.with_options(**prefect_config_dict) print("-"*50) diff --git a/echoflow/tests/AWS/datastore.yaml b/echoflow/tests/AWS/datastore.yaml index 35a1856..e1f5a94 100644 --- a/echoflow/tests/AWS/datastore.yaml +++ b/echoflow/tests/AWS/datastore.yaml @@ -11,7 +11,7 @@ args: anon: true transect: file: ./EK60_SH1707_Shimada.txt - default_transect_num: 2017 + default_transect_num: 2017 json_export: true output: # Output arguments urlpath: diff --git a/echoflow/tests/AWS/pipeline.yaml b/echoflow/tests/AWS/pipeline.yaml index 975f3a5..9cb6211 100644 --- a/echoflow/tests/AWS/pipeline.yaml +++ b/echoflow/tests/AWS/pipeline.yaml @@ -23,5 +23,5 @@ pipeline: options: use_offline: true external_params: - range_meter_bin: 20 + range_meter_bin: 20m ping_time_bin: 20S diff --git a/echoflow/tests/offline/LocalEchoflowDemo.ipynb b/echoflow/tests/offline/LocalEchoflowDemo.ipynb index d790045..1c468f2 100644 --- a/echoflow/tests/offline/LocalEchoflowDemo.ipynb +++ b/echoflow/tests/offline/LocalEchoflowDemo.ipynb @@ -192,7 +192,7 @@ " anon: true\n", " transect:\n", " file: ./EK60_SH1707_Shimada.txt\n", - " default_transect_num: 2017\n", + " default_transect_num: 2017\n", " json_export: true \n", "output: \n", " urlpath: ./echoflow-output\n", @@ -272,7 +272,7 @@ "transect = open('EK60_SH1707_Shimada.txt','w')\n", "i = 0\n", "for f in files:\n", - " if i == 100:\n", + " if i == 10:\n", " break\n", " transect.write(f+\".raw\\n\")\n", " i = i + 1\n", @@ -289,40 +289,519 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "\n", - "--------------------------------------------------\n", "\n", - "Executing stage : name='echoflow_open_raw' module='echoflow.stages.subflows.open_raw' external_params=None options={'save_raw_file': True, 'use_raw_offline': True, 'use_offline': True} prefect_config=None\n", - "[Errno 111] Connection refused\n", + "Checking Configuration\n", + "\n", + "Configuration Check Completed\n", + "Checking Connection to Prefect Server\n", + "\n", + "Starting the Pipeline\n", "\n", - "Completed stage name='echoflow_open_raw' module='echoflow.stages.subflows.open_raw' external_params=None options={'save_raw_file': True, 'use_raw_offline': True, 'use_offline': True} prefect_config=None\n", + "\n", + "Dataset Configuration Loaded For This Run\n", "--------------------------------------------------\n", - "\n", + "{'name': 'Bell_M._Shimada-SH1707-EK60', 'sonar_model': 'EK60', 'raw_regex': '(.*)-?D(?P\\\\w{1,8})-T(?P