Service and flow execution logic
Sohambutala committed Oct 23, 2024
1 parent a8d8ced commit 36d56c8
Showing 7 changed files with 160 additions and 90 deletions.
56 changes: 45 additions & 11 deletions echodataflow/deployment/flow.py
@@ -1,31 +1,65 @@
import json
import os
from typing import Dict, List
from prefect import flow
from fastapi.encoders import jsonable_encoder
from prefect import Task, flow
from prefect_dask import get_dask_client
from echodataflow.models.deployment.stage import Stage
from echodataflow.models.output_model import EchodataflowObject
from echodataflow.utils import log_util
from echodataflow.utils.config_utils import club_raw_files, parse_raw_paths
from echodataflow.utils.function_utils import dynamic_function_call

@flow(name='Sv_flow')
def Sv_flow(stage: Stage):

# call source extraction, which converts source into intermediate data representation
# puts in dict of dict of list of Object
def_metadata: Dict[str, Dict[str, List[EchodataflowObject]]] = stage.source.extract_source(stage.options)
raw_files: List[str] = stage.source.extract_source()

# Group data based on given file(s)
file_dicts = parse_raw_paths(all_raw_files=raw_files, config=stage.source, group=stage.group)

log_util.log(
msg={
"msg": f"Files To Be Processed",
"mod_name": __file__,
"func_name": "Init Flow",
}
)
log_util.log(
msg={
"msg": json.dumps(jsonable_encoder(file_dicts)),
"mod_name": __file__,
"func_name": "Init Flow",
}
)
edf_metadata = club_raw_files(
config=stage.group,
raw_dicts=file_dicts,
raw_url_file=None,
json_storage_options=None,
)

# For each file, call tasks

for file in def_metadata:
# TODO: Logic to determine when and where to call the grouping and super grouping logic
for task in stage.tasks:
for group in edf_metadata:
for fdict in group:

task_fn = dynamic_function_call(task.module, task.name)
file_info = os.path.basename(fdict.get("file_path")).split(".", maxsplit=1)

data = task_fn.submit(task, data, stage.source.storage_options._storage_options_dict)
data_dict = {"source_file_path": fdict.get("file_path"),
"filename": file_info[0],
"file_extension": file_info[-1]}

for task in stage.tasks:


stage.destination.store_result(data=data, engine="zarr")
task_fn = dynamic_function_call(task.module, task.name)
if not isinstance(task_fn, Task):
raise ValueError(f"Task {task.name} is not a valid Task. Annotate tasks with @task decorator")

data_dict = task_fn.submit(task, data_dict, stage.source.storage_options._storage_options_dict)

data_dict = data_dict.result()
stage.destination.store_result(data=data_dict, engine="zarr")

return True
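
For readers following the new execution logic, the sketch below mirrors the shape of Sv_flow: extract raw file paths from the source, group them, build a per-file dict (source_file_path, filename, file_extension), chain the configured tasks as Prefect tasks via submit()/result(), and hand the final dict to the destination. It is a minimal, self-contained illustration; group_raw_files and open_raw_stub are hypothetical stand-ins, not echodataflow helpers.

```python
from typing import Any, Dict, List

from prefect import flow, task


# Hypothetical stand-in for the grouping done by parse_raw_paths/club_raw_files;
# the real helpers group raw files by transect/time window from the Group config.
def group_raw_files(raw_files: List[str]) -> List[List[str]]:
    return [raw_files]  # simplest possible grouping: one group with everything


@task
def open_raw_stub(data: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder task: the real pipeline opens the raw file with echopype here.
    data["data"] = f"echodata({data['filename']})"
    return data


@flow(name="Sv_flow_sketch")
def sv_flow_sketch(raw_files: List[str]) -> bool:
    groups = group_raw_files(raw_files)
    for group in groups:
        for file_path in group:
            name, _, ext = file_path.rsplit("/", 1)[-1].partition(".")
            data_dict: Dict[str, Any] = {
                "source_file_path": file_path,
                "filename": name,
                "file_extension": ext,
            }
            for task_fn in (open_raw_stub,):  # stage.tasks in the real flow
                # submit() returns a PrefectFuture; result() blocks for the value
                data_dict = task_fn.submit(data_dict).result()
            # The real flow now calls stage.destination.store_result(data=data_dict, engine="zarr")
            print("would store:", data_dict["filename"])
    return True


if __name__ == "__main__":
    sv_flow_sketch(["s3://bucket/D20241023-T120000.raw"])
```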
69 changes: 38 additions & 31 deletions echodataflow/deployment/service.py
@@ -1,18 +1,20 @@
from typing import List, Dict, Any, Optional
from distributed import Client
from distributed import Client, LocalCluster
from prefect import flow
from prefect_dask import get_dask_client
from prefect_dask.task_runners import DaskTaskRunner

from echodataflow.aspects.singleton_echodataflow import Singleton_Echodataflow
from echodataflow.models.deployment.deployment import Cluster
from echodataflow.models.deployment.stage import Stage
from echodataflow.utils import log_util
from echodataflow.utils.config_utils import get_prefect_config_dict
from echodataflow.utils.function_utils import dynamic_function_call

from echodataflow.deployment.flow import edf_flow
from echodataflow.deployment.flow import Sv_flow

@flow(name="Echodataflow-Service")
def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]) -> bool:
def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]] = None, cluster: Optional[Cluster] = None) -> bool:
"""
Service for EDF
"""
@@ -25,44 +27,49 @@ def edf_service(stages: List[Stage], edf_logger: Optional[Dict[str, Any]]) -> bool:
Singleton_Echodataflow(log_file=edf_logger)
gea = Singleton_Echodataflow().get_instance()

global_dask = False
close_cluster_allowed = True

if cluster:
global_dask = True
if cluster.address is None:
client = Client(LocalCluster(n_workers=cluster.workers, nanny=True).address)
else:
client = Client(cluster.address)
close_cluster_allowed = False

for stage in stages:

if stage.stage_params:
stage.prefect_config = stage.stage_params.get("prefect_config", None)
stage.options = stage.stage_params.get("options", None)

if stage.module:
master_flow = dynamic_function_call(stage.module, stage.name)
else:
master_flow = edf_flow

prefect_config_dict = get_prefect_config_dict(stage)
if stage.module:
master_flow = dynamic_function_call(stage.module, stage.name)
else:
master_flow = Sv_flow

prefect_config_dict = get_prefect_config_dict(stage)

prefect_config_dict["name"] = stage.name
prefect_config_dict["flow_run_name"] = stage.name

if global_dask:
prefect_config_dict["task_runner"] = DaskTaskRunner(address=client.scheduler.address)

if client is not None:
client.subscribe_topic("echodataflow", lambda event: log_util.log_event(event))

prefect_config_dict["name"] = stage.name
prefect_config_dict["flow_run_name"] = stage.name
if prefect_config_dict.get("task_runner", None):
stage.options["use_dask"] = True

if prefect_config_dict.get("task_runner", None):
stage.options["use_dask"] = True
if client is not None:
client.subscribe_topic("echodataflow", lambda event: log_util.log_event(event))
else:
with get_dask_client() as cl:
cl.subscribe_topic("echodataflow", lambda event: log_util.log_event(event))

# master_flow is expected to store the output to the destination
# Source for the next flows to match with the destination of previous flows if two stages are connected
master_flow.with_options(**prefect_config_dict)(stage)
# master_flow is expected to store the output to the destination
# Source for the next flows to match with the destination of previous flows if two stages are connected
master_flow.with_options(**prefect_config_dict)(stage)

for stage in stages:
if not stage.options.get("save_offline", True):
# cleanup
stage.destination.cleanup()

# close cluster if required
if client:
if client and close_cluster_allowed:
client.close()



return True
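
The main service-level change is cluster ownership: when a cluster address is supplied, edf_service attaches to that scheduler and must not close it; otherwise it starts its own LocalCluster and tears it down at the end. Each stage's flow is then run with a DaskTaskRunner pointed at client.scheduler.address, so all stages share the same workers. A hedged sketch of that ownership decision using only dask.distributed (the helper name and signature are illustrative, not echodataflow API):

```python
from typing import Optional, Tuple

from distributed import Client, LocalCluster


def get_shared_client(address: Optional[str] = None, workers: int = 2) -> Tuple[Client, bool]:
    """Return (client, owns_cluster): connect to an existing scheduler if an
    address is given, otherwise start a LocalCluster that we own and must close.
    Sketch of the close_cluster_allowed logic in edf_service, not the real API."""
    if address is None:
        cluster = LocalCluster(n_workers=workers)
        return Client(cluster.address), True   # we created it, so we may close it
    return Client(address), False              # external cluster: leave it running


if __name__ == "__main__":
    client, owns_cluster = get_shared_client()
    try:
        # Each stage's flow would run here with
        # DaskTaskRunner(address=client.scheduler.address).
        print("scheduler:", client.scheduler.address)
    finally:
        if owns_cluster:
            client.close()
```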
12 changes: 7 additions & 5 deletions echodataflow/models/deployment/source.py
@@ -33,6 +33,7 @@ class Source(BaseModel):
parameters: Optional[Parameters] = Field(None, description="Parameters to apply to the source.")
window_options: Optional[Dict[str, Any]] = Field(None, description="Window options for the source.")
storage_options: Optional[StorageOptions] = Field(None, description="Storage options for the source.")
raw_regex: Optional[str] = Field("(.*)-?D(?P<date>\w{1,8})-T(?P<time>\w{1,6})", description="Raw regex pattern for the source.")

def render_path(self) -> Union[str, Dict[str, List[str]]]:
"""
@@ -74,17 +75,18 @@ def _render_template(self, template_str: str, env: jinja2.Environment) -> str:
return template.render(self.parameters.dict() if self.parameters else {})

def extract_source(self):
path = self.render_path()

from echodataflow.utils.config_utils import glob_all_files

if self.window_options is not None:

# Treat source as a folder and iterate over files to collect and group relevant files
pass
else:
# return single path
return path

pass

total_files = glob_all_files(config=self)
return total_files


class Config:
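
source.py gains a raw_regex default used when parsing raw file names, and extract_source now collects files via glob_all_files when no window_options are set. A small illustration of what the default pattern captures (parse_raw_name is a hypothetical helper, not part of the model):

```python
import re
from typing import Optional, Dict

# Default pattern added as Source.raw_regex; it captures the date and time
# stamps embedded in echosounder raw filenames such as "...-D20241023-T120000.raw".
RAW_REGEX = r"(.*)-?D(?P<date>\w{1,8})-T(?P<time>\w{1,6})"


def parse_raw_name(filename: str) -> Optional[Dict[str, str]]:
    """Illustrative helper (not part of echodataflow): pull out the date/time
    groups that downstream grouping logic can key on."""
    m = re.search(RAW_REGEX, filename)
    if m is None:
        return None
    return {"date": m.group("date"), "time": m.group("time")}


print(parse_raw_name("Summer2024-D20241023-T120000.raw"))
# -> {'date': '20241023', 'time': '120000'}
```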
14 changes: 7 additions & 7 deletions echodataflow/models/deployment/storage_options.py
@@ -35,18 +35,20 @@ class StorageOptions(BaseModel):
block_name: Optional[str] = Field(None, description="The name of the storage block.")
anon: StrictBool = Field(False, description="Whether to use anonymous access. Default is False.")

@property
@functools.lru_cache()
_cached_options: Optional[dict] = None # Cache storage

@property
def _storage_options_dict(self):
"""
Extracts storage options using the handle_storage_options function.
Returns:
dict: A dictionary representation of the storage options.
"""
from echodataflow.utils.filesystem_utils import handle_storage_options
# Pass the StorageOptions instance itself to handle_storage_options to extract the dictionary
return handle_storage_options(self)
if self._cached_options is None:
from echodataflow.utils.filesystem_utils import handle_storage_options
self._cached_options = handle_storage_options(self)
return self._cached_options

# Model-wide validator to ensure logical dependencies between fields
@model_validator(mode='before')
@@ -61,9 +63,7 @@ def validate_storage_options(cls, values):
raise ValueError(f"A block_name must be provided when storage type is set to '{storage_type}'.")

return values



class Config:
use_enum_values = True

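
In storage_options.py the @property wrapped in functools.lru_cache is replaced by an explicit _cached_options attribute, so the storage-options dict is resolved once per instance without lru_cache holding references to the model. A minimal sketch of the same cache-on-first-access pattern on a plain class (not the pydantic model itself):

```python
from typing import Optional


class LazyOptions:
    """Sketch of the pattern that replaced @property + functools.lru_cache,
    which is awkward on instance properties and can keep instances alive."""

    def __init__(self, anon: bool = False):
        self.anon = anon
        self._cached_options: Optional[dict] = None

    @property
    def storage_options_dict(self) -> dict:
        if self._cached_options is None:
            # Expensive resolution (credentials, block lookup, ...) happens once.
            self._cached_options = {"anon": self.anon}
        return self._cached_options


opts = LazyOptions(anon=True)
assert opts.storage_options_dict is opts.storage_options_dict  # same cached dict
```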
27 changes: 15 additions & 12 deletions echodataflow/tasklib/echopype.py
@@ -3,6 +3,7 @@
from echopype.commongrid import compute_MVBS
from echopype.calibrate import compute_Sv
from prefect import task
from prefect_dask import get_dask_client

from echodataflow.models.deployment.stage import Task
from echodataflow.models.deployment.storage_options import StorageOptions
@@ -11,35 +12,37 @@
@task
def edf_open_raw(task: Task, data: Any, storage_options: Optional[Dict[str, Any]] = {}):

with get_dask_client() as dclient:
print(dclient.scheduler.address)

if task.task_params is not None:

# Validate task params if required

# TODO: Validate task params if required
ed = open_raw(
raw_file=data,
raw_file=data.get('source_file_path'),
sonar_model=task.task_params.get('sonar_model', None),
storage_options=storage_options,
)
else:
raise ValueError("task_params are required for edf_open_raw")

return {'data': ed}
data['data'] = ed

return data

@task
def edf_sv(task: Task, data: Dict[str, Any], storage_options: Optional[Dict[str, Any]] = {}):
def edf_Sv(task: Task, data: Dict[str, Any], storage_options: Optional[Dict[str, Any]] = {}):

if task.task_params is not None:

# Validate task params if required

ed = compute_Sv(
Sv=data.get('data'),
sonar_model=task.task_params.get('sonar_model', None),
storage_options=storage_options,
Sv = compute_Sv(
echodata=data.get('data', None)
)
else:
raise ValueError("task_params are required for edf_open_raw")
raise ValueError("task_params are required for compute_Sv")

return {'data': ed, 'output1': ed}
data['data'] = Sv
return data

