Commit
Merge pull request #37 from OSOceanAcoustics/main
Dev Merge
Sohambutala authored Jan 10, 2024
2 parents 90a1c39 + 460accd commit da728e1
Showing 247 changed files with 42,356 additions and 1,396 deletions.
6 changes: 0 additions & 6 deletions echoflow/models/pipeline.py
@@ -40,11 +40,9 @@ class Pipeline(BaseModel):
     Attributes:
         recipe_name (str): The name of the recipe.
-        generate_graph (Optional[bool]): Flag to indicate whether to generate a graph. Default is False.
         stages (List[Stage]): List of stages in the pipeline.
     """
     recipe_name: str
-    generate_graph: Optional[bool] = False
     stages: List[Stage]


 class Recipe(BaseModel):
@@ -56,16 +54,12 @@ class Recipe(BaseModel):
         use_local_dask (bool): Flag to indicate whether to use local Dask. Default is False.
         n_workers (int): Number of workers to spin up for local cluster. Default is 3.
         scheduler_address (str): The scheduler address. Default is None.
-        use_previous_recipe (Optional[bool]): Flag to indicate whether to use a previous recipe. Default is False.
-        database_path (Optional[str]): The path to the database. Default is an empty string.
         pipeline (List[Pipeline]): List of pipelines in the recipe.
     """
     active_recipe: str
     use_local_dask: bool = False
     n_workers: int = 3
     scheduler_address: str = None
-    use_previous_recipe: Optional[bool] = False
-    database_path: Optional[str] = ''
     pipeline: List[Pipeline]
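For reference, the slimmed-down models still validate a recipe dictionary the way echoflow_trigger does with Recipe(**pipeline_config_dict). A minimal sketch of that round trip, assuming a hypothetical Stage model with just a name field (the real class carries more):

from typing import List, Optional
from pydantic import BaseModel

class Stage(BaseModel):
    name: str  # assumed minimal shape; echoflow's Stage has more fields

class Pipeline(BaseModel):
    recipe_name: str
    stages: List[Stage]

class Recipe(BaseModel):
    active_recipe: str
    use_local_dask: bool = False
    n_workers: int = 3
    scheduler_address: Optional[str] = None  # the source writes `str = None` (pydantic v1 style)
    pipeline: List[Pipeline]

# Validate a config dict, as echoflow_trigger does below.
recipe = Recipe(
    active_recipe="standard",
    pipeline=[{"recipe_name": "standard", "stages": [{"name": "echodata"}]}],
)
print(recipe.n_workers)  # 3, from the default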


12 changes: 9 additions & 3 deletions echoflow/stages/echoflow.py
@@ -45,6 +45,7 @@
 from echoflow.utils.config_utils import get_storage_options, load_block

 from echoflow.stages.echoflow_trigger import echoflow_trigger
+from echoflow.utils.file_utils import format_windows_path


 def check_internet_connection(host="8.8.8.8", port=53, timeout=5):
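The body of check_internet_connection sits outside this hunk, but given the 8.8.8.8:53 defaults it is presumably the usual socket probe against Google's public DNS; a sketch of that common pattern, not necessarily echoflow's exact code:

import socket

def check_internet_connection(host="8.8.8.8", port=53, timeout=5):
    # A successful TCP connect to a well-known DNS server implies connectivity.
    try:
        socket.setdefaulttimeout(timeout)
        socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
        return True
    except OSError:
        return False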
@@ -101,7 +102,7 @@ def echoflow_create_prefect_profile(
         set_active=True,
     )
     """
-    config_path = os.path.expanduser("~/.prefect/profiles.toml")
+    config_path = os.path.expanduser(format_windows_path("~/.prefect/profiles.toml"))
     with open(config_path, "r") as f:
         config = toml.load(f)
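The surrounding function follows a read-modify-write cycle on Prefect's profiles.toml; a minimal sketch of that pattern, with the table layout assumed from Prefect's documented profile format:

import os
import toml

config_path = os.path.expanduser("~/.prefect/profiles.toml")
with open(config_path, "r") as f:
    config = toml.load(f)

# Register a new profile under the [profiles] table, then write the file back.
config.setdefault("profiles", {})["echoflow-local"] = {}
with open(config_path, "w") as f:
    toml.dump(config, f)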

@@ -228,16 +229,19 @@ def echoflow_start(
         print("Pipeline execution result:", result)
     """

+    print("\nChecking Configuration")
     # Try loading the Prefect config block
     try:
         echoflow_config = load_block(
             name="echoflow-config", type=StorageType.ECHOFLOW)
     except ValueError as e:
-        print("No Prefect Cloud Configuration found. Creating Prefect Local named 'echoflow-local'. Please add your prefect cloud ")
+        print("\nNo Prefect Cloud Configuration found. Creating Prefect Local named 'echoflow-local'. Please add your prefect cloud ")
         # Add local profile to echoflow config but keep default as active since user might configure using Prefect setup
         echoflow_create_prefect_profile(
             name="echoflow-local", set_active=False)
+    print("\nConfiguration Check Completed")

+    print("\nChecking Connection to Prefect Server")
     # Check if program can connect to the Internet.
     if check_internet_connection() == False:
         active_profile = get_active_profile()
@@ -246,10 +250,12 @@ def echoflow_start(
             "Please connect to internet or consider switching to a local prefect environment. This can be done by calling load_profile(name_of_local_prefect_profile or 'echoflow-local' if no prefect profile was created) method."
         )
     else:
-        print("Using a local prefect environment. To go back to your cloud workspace call load_profile(<name>) with <name> of your cloud profile.")
+        print("\nUsing a local prefect environment. To go back to your cloud workspace call load_profile(<name>) with <name> of your cloud profile.")

     if isinstance(storage_options, Block):
         storage_options = get_storage_options(storage_options=storage_options)
+
+    print("\nStarting the Pipeline")
     # Call the actual pipeline
     return echoflow_trigger(
         dataset_config=dataset_config,
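As the printed messages suggest, switching between cloud and local execution is a matter of activating a profile. A hypothetical usage sketch, where the import path for load_profile is an assumption:

from echoflow import load_profile

# Activate the local profile created when no cloud configuration was found.
load_profile("echoflow-local")

# Later, return to a Prefect Cloud workspace profile.
load_profile("my-cloud-profile")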
12 changes: 8 additions & 4 deletions echoflow/stages/echoflow_trigger.py
@@ -73,7 +73,6 @@ def echoflow_trigger(
     )
     print("Pipeline output:", pipeline_output)
     """
-    print(storage_options)
     if storage_options is not None:
         # Check if storage_options is a Block (fsspec storage) and convert it to a dictionary
         if isinstance(storage_options, dict) and storage_options.get("block_name") is not None:
@@ -114,15 +113,17 @@ def echoflow_trigger(
     else:
         logging_config_dict = logging_config

+    print("\n\nDataset Configuration Loaded For This Run")
+    print("-"*50)
     print(dataset_config_dict)
+    print("\n\nPipeline Configuration Loaded For This Run")
+    print("-"*50)
     print(pipeline_config_dict)
-    # Do any config checks on config dicts
-    # Should be done in pydantic class
     check_config(dataset_config_dict, pipeline_config_dict)
     pipeline = Recipe(**pipeline_config_dict)
-    dataset = Dataset(**dataset_config_dict)


     dataset = Dataset(**dataset_config_dict)

     if options.get('storage_options_override') is not None and options['storage_options_override'] is False:
         storage_options = {}
@@ -161,6 +162,9 @@ def echoflow_trigger(
     dataset.args.storage_options_dict = storage_options
     dataset.args.transect.storage_options_dict = storage_options

+    print("\nInitializing Singleton Object")
     Singleton_Echoflow(log_file=logging_config_dict,
                        pipeline=pipeline, dataset=dataset)
+
+    print("\nReading Configurations")
     return init_flow(config=dataset, pipeline=pipeline, json_data_path=json_data_path)
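Singleton_Echoflow is initialized once here so that logging, pipeline, and dataset state can be shared by every subflow. A generic sketch of the singleton pattern it appears to follow, not the actual echoflow class:

class SingletonExample:
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Every construction after the first returns the same instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

a = SingletonExample()
b = SingletonExample()
assert a is b  # both names refer to one shared object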
2 changes: 1 addition & 1 deletion echoflow/stages/subflows/compute_MVBS.py
@@ -137,7 +137,7 @@ def process_compute_MVBS(
         ed_list = get_zarr_list.fn(transect_data=out_data, storage_options=config.output.storage_options_dict)
         xr_d_mvbs = ep.commongrid.compute_MVBS(
             ds_Sv=ed_list[0],
-            range_meter_bin=stage.external_params.get(
+            range_bin=stage.external_params.get(
                 "range_meter_bin"),
             ping_time_bin=stage.external_params.get("ping_time_bin")
         )
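The rename tracks echopype's newer commongrid API (0.8 and later), where range_meter_bin was replaced by a unit-tagged range_bin string; that is also why pipeline.yaml below changes 20 to 20m. A sketch of the call, with the input file name purely illustrative:

import echopype as ep

ed = ep.open_raw("example.raw", sonar_model="EK60")  # hypothetical raw file
ds_Sv = ep.calibrate.compute_Sv(ed)
ds_MVBS = ep.commongrid.compute_MVBS(
    ds_Sv=ds_Sv,
    range_bin="20m",      # bin size along echo range, now a string with units
    ping_time_bin="20S",  # bin size along ping_time, a pandas offset alias
)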
2 changes: 2 additions & 0 deletions echoflow/stages/subflows/initialization_flow.py
@@ -69,6 +69,7 @@ def init_flow(
     if config.args.raw_json_path is None:
         total_files = glob_all_files(config=config)
         file_dicts = parse_raw_paths(all_raw_files=total_files, config=config)
+        print("Files To Be Processed", total_files)
         data = club_raw_files(
             config=config,
             raw_dicts=file_dicts,
@@ -110,6 +111,7 @@ def init_flow(
             prefect_config_dict["task_runner"] = DaskTaskRunner(
                 address=client.scheduler.address)
             print(client)
+            print("Scheduler at : ", client.scheduler.address)

         function = function.with_options(**prefect_config_dict)
         print("-"*50)
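Pointing Prefect's task runner at an already-running Dask scheduler by address is standard prefect-dask usage; a minimal sketch with illustrative cluster parameters:

from dask.distributed import Client, LocalCluster
from prefect_dask import DaskTaskRunner

cluster = LocalCluster(n_workers=3)  # mirrors the Recipe default of 3 workers
client = Client(cluster)
print("Scheduler at : ", client.scheduler.address)

# Hand the live scheduler address to Prefect, as init_flow does above.
task_runner = DaskTaskRunner(address=client.scheduler.address)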
2 changes: 1 addition & 1 deletion echoflow/tests/AWS/datastore.yaml
@@ -11,7 +11,7 @@ args:
     anon: true
   transect:
     file: ./EK60_SH1707_Shimada.txt
-  default_transect_num: 2017
+    default_transect_num: 2017
   json_export: true
 output: # Output arguments
   urlpath: <YOUR-S3-BUCKET>
2 changes: 1 addition & 1 deletion echoflow/tests/AWS/pipeline.yaml
@@ -23,5 +23,5 @@ pipeline:
       options:
         use_offline: true
       external_params:
-        range_meter_bin: 20
+        range_meter_bin: 20m
         ping_time_bin: 20S
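With this change the stage receives the bin size as the string '20m' rather than the integer 20, which is what stage.external_params.get("range_meter_bin") then passes to compute_MVBS above. A quick illustration of how the YAML parses (not echoflow code):

import yaml

snippet = """
external_params:
  range_meter_bin: 20m
  ping_time_bin: 20S
"""
params = yaml.safe_load(snippet)["external_params"]
print(type(params["range_meter_bin"]), params["range_meter_bin"])  # <class 'str'> 20m
print(params["ping_time_bin"])  # 20S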
