diff --git a/dev-docker-compose.yaml b/dev-docker-compose.yaml index ccf21aa..5331647 100644 --- a/dev-docker-compose.yaml +++ b/dev-docker-compose.yaml @@ -42,12 +42,13 @@ services: context: . command: /celery volumes: - - ./volumes/uploads:${UPLOADED_FILES_DIR_PATH} + - ./volumes/uploads:/volumes/uploads env_file: - .env environment: - RABBITMQ_HOST=rabbitmq - POSTGRES_HOST=postgres + - UPLOADED_FILES_DIR_PATH=/volumes/uploads depends_on: - rabbitmq restart: always @@ -58,12 +59,13 @@ services: context: . command: /flower volumes: - - ./volumes/uploads:${UPLOADED_FILES_DIR_PATH} + - ./volumes/uploads:/volumes/uploads env_file: - .env environment: - RABBITMQ_HOST=rabbitmq - POSTGRES_HOST=postgres + - UPLOADED_FILES_DIR_PATH=/volumes/uploads depends_on: - rabbitmq - celery diff --git a/internal/domain/task/__init__.py b/internal/domain/task/__init__.py index 281d711..2e40fd9 100644 --- a/internal/domain/task/__init__.py +++ b/internal/domain/task/__init__.py @@ -1,3 +1,4 @@ from internal.domain.task.entities import FdTask # noqa: F401 from internal.domain.task.entities import AfdTask # noqa: F401 from internal.domain.task.entities import AcTask # noqa: F401 +from internal.domain.task.entities import IndTask # noqa: F401 diff --git a/internal/domain/task/entities/__init__.py b/internal/domain/task/entities/__init__.py index 7639e55..9f13224 100644 --- a/internal/domain/task/entities/__init__.py +++ b/internal/domain/task/entities/__init__.py @@ -3,6 +3,8 @@ from internal.domain.task.entities.fd import FdTask from internal.domain.task.entities.afd import AfdTask from internal.domain.task.entities.ac import AcTask +from internal.domain.task.entities.ind import IndTask +from internal.domain.task.entities.aind import AindTask from internal.domain.task.value_objects import PrimitiveName @@ -26,4 +28,8 @@ def match_task_by_primitive_name(primitive_name: PrimitiveName): return AfdTask() case PrimitiveName.ac: return AcTask() + case PrimitiveName.ind: + return IndTask() + case PrimitiveName.aind: + return AindTask() assert_never(primitive_name) diff --git a/internal/domain/task/entities/aind/__init__.py b/internal/domain/task/entities/aind/__init__.py new file mode 100644 index 0000000..e240fc3 --- /dev/null +++ b/internal/domain/task/entities/aind/__init__.py @@ -0,0 +1 @@ +from internal.domain.task.entities.aind.aind_task import AindTask # noqa: F401 diff --git a/internal/domain/task/entities/aind/aind_task.py b/internal/domain/task/entities/aind/aind_task.py new file mode 100644 index 0000000..385f9c1 --- /dev/null +++ b/internal/domain/task/entities/aind/aind_task.py @@ -0,0 +1,55 @@ +from desbordante.ind import IndAlgorithm +from desbordante.aind.algorithms import Mind, Spider +from internal.domain.task.entities.task import Task +from internal.domain.task.value_objects import PrimitiveName, IncorrectAlgorithmName +from internal.domain.task.value_objects.aind import ( + AindAlgoName, + AindTaskConfig, + AindTaskResult, +) +from internal.domain.task.value_objects.aind import AindAlgoResult, AindModel + + +class AindTask(Task[IndAlgorithm, AindTaskConfig, AindTaskResult]): + """ + Task class for Inclusion Dependency (AIND) profiling. + + This class executes various AIND algorithms and processes the results + into the appropriate format. It implements abstract methods from the Task base class. + + Methods: + - _match_algo_by_name(algo_name: AindAlgoName) -> AindAlgorithm: + Match AIND algorithm by its name. + - _collect_result(algo: AindAlgorithm) -> AindTaskResult: + Process the output of the AIND algorithm and return the result. + """ + + def _collect_result(self, algo: IndAlgorithm) -> AindTaskResult: + """ + Collect and process the AIND result. + + Args: + algo (AindAlgorithm): AIND algorithm to process. + Returns: + AindTaskResult: Processed result containing AINDs. + """ + ainds = algo.get_inds() + algo_result = AindAlgoResult(inds=[AindModel.from_ind(aind) for aind in ainds]) + return AindTaskResult(primitive_name=PrimitiveName.aind, result=algo_result) + + def _match_algo_by_name(self, algo_name: str) -> IndAlgorithm: + """ + Match the inclusion dependency algorithm by name. + + Args: + algo_name (AindAlgoName): Name of the AIND algorithm. + Returns: + AindAlgorithm: The corresponding algorithm instance. + """ + match algo_name: + case AindAlgoName.Mind: + return Mind() + case AindAlgoName.Spider: + return Spider() + case _: + raise IncorrectAlgorithmName(algo_name, "AIND") diff --git a/internal/domain/task/entities/ind/__init__.py b/internal/domain/task/entities/ind/__init__.py new file mode 100644 index 0000000..9026b3e --- /dev/null +++ b/internal/domain/task/entities/ind/__init__.py @@ -0,0 +1 @@ +from internal.domain.task.entities.ind.ind_task import IndTask # noqa: F401 diff --git a/internal/domain/task/entities/ind/ind_task.py b/internal/domain/task/entities/ind/ind_task.py new file mode 100644 index 0000000..b33155b --- /dev/null +++ b/internal/domain/task/entities/ind/ind_task.py @@ -0,0 +1,57 @@ +from desbordante.ind import IndAlgorithm +from desbordante.ind.algorithms import Faida, Mind, Spider +from internal.domain.task.entities.task import Task +from internal.domain.task.value_objects import PrimitiveName, IncorrectAlgorithmName +from internal.domain.task.value_objects.ind import ( + IndAlgoName, + IndTaskConfig, + IndTaskResult, +) +from internal.domain.task.value_objects.ind import IndAlgoResult, IndModel + + +class IndTask(Task[IndAlgorithm, IndTaskConfig, IndTaskResult]): + """ + Task class for Inclusion Dependency (IND) profiling. + + This class executes various IND algorithms and processes the results + into the appropriate format. It implements abstract methods from the Task base class. + + Methods: + - _match_algo_by_name(algo_name: IndAlgoName) -> IndAlgorithm: + Match IND algorithm by its name. + - _collect_result(algo: IndAlgorithm) -> IndTaskResult: + Process the output of the IND algorithm and return the result. + """ + + def _collect_result(self, algo: IndAlgorithm) -> IndTaskResult: + """ + Collect and process the IND result. + + Args: + algo (IndAlgorithm): IND algorithm to process. + Returns: + IndTaskResult: Processed result containing INDs. + """ + inds = algo.get_inds() + algo_result = IndAlgoResult(inds=[IndModel.from_ind(ind) for ind in inds]) + return IndTaskResult(primitive_name=PrimitiveName.ind, result=algo_result) + + def _match_algo_by_name(self, algo_name: str) -> IndAlgorithm: + """ + Match the inclusion dependency algorithm by name. + + Args: + algo_name (IndAlgoName): Name of the IND algorithm. + Returns: + IndAlgorithm: The corresponding algorithm instance. + """ + match algo_name: + case IndAlgoName.Faida: + return Faida() + case IndAlgoName.Mind: + return Mind() + case IndAlgoName.Spider: + return Spider() + case _: + raise IncorrectAlgorithmName(algo_name, "IND") diff --git a/internal/domain/task/entities/task.py b/internal/domain/task/entities/task.py index cd8b00e..9d8e04f 100644 --- a/internal/domain/task/entities/task.py +++ b/internal/domain/task/entities/task.py @@ -60,6 +60,10 @@ def execute(self, table: pandas.DataFrame, task_config: C) -> R: algo_config = task_config.config options = algo_config.model_dump(exclude_unset=True, exclude={"algo_name"}) algo = self._match_algo_by_name(algo_config.algo_name) - algo.load_data(table=table) + # TODO: IND, AIND requires multiple tables + try: + algo.load_data(table=table) + except desbordante.ConfigurationError: + algo.load_data(tables=[table]) algo.execute(**options) return self._collect_result(algo) diff --git a/internal/domain/task/value_objects/__init__.py b/internal/domain/task/value_objects/__init__.py index 36447df..4432e32 100644 --- a/internal/domain/task/value_objects/__init__.py +++ b/internal/domain/task/value_objects/__init__.py @@ -4,6 +4,8 @@ from internal.domain.task.value_objects.afd import AfdTaskConfig, AfdTaskResult from internal.domain.task.value_objects.fd import FdTaskConfig, FdTaskResult from internal.domain.task.value_objects.ac import AcTaskConfig, AcTaskResult +from internal.domain.task.value_objects.ind import IndTaskConfig, IndTaskResult +from internal.domain.task.value_objects.aind import AindTaskConfig, AindTaskResult from internal.domain.task.value_objects.config import TaskConfig # noqa: F401 from internal.domain.task.value_objects.result import TaskResult # noqa: F401 @@ -22,11 +24,11 @@ ) OneOfTaskConfig = Annotated[ - Union[FdTaskConfig, AfdTaskConfig, AcTaskConfig], + Union[FdTaskConfig, AfdTaskConfig, AcTaskConfig, IndTaskConfig, AindTaskConfig], Field(discriminator="primitive_name"), ] OneOfTaskResult = Annotated[ - Union[FdTaskResult, AfdTaskResult, AcTaskResult], + Union[FdTaskResult, AfdTaskResult, AcTaskResult, IndTaskResult, AindTaskResult], Field(discriminator="primitive_name"), ] diff --git a/internal/domain/task/value_objects/aind/__init__.py b/internal/domain/task/value_objects/aind/__init__.py new file mode 100644 index 0000000..7b71263 --- /dev/null +++ b/internal/domain/task/value_objects/aind/__init__.py @@ -0,0 +1,23 @@ +from typing import Literal + +from pydantic import BaseModel + +from internal.domain.task.value_objects.primitive_name import PrimitiveName +from internal.domain.task.value_objects.aind.algo_config import OneOfAindAlgoConfig +from internal.domain.task.value_objects.aind.algo_result import ( # noqa: F401 + AindAlgoResult, + AindModel, +) +from internal.domain.task.value_objects.aind.algo_name import AindAlgoName # noqa: F401 + + +class BaseAindTaskModel(BaseModel): + primitive_name: Literal[PrimitiveName.aind] + + +class AindTaskConfig(BaseAindTaskModel): + config: OneOfAindAlgoConfig + + +class AindTaskResult(BaseAindTaskModel): + result: AindAlgoResult diff --git a/internal/domain/task/value_objects/aind/algo_config.py b/internal/domain/task/value_objects/aind/algo_config.py new file mode 100644 index 0000000..3910b4c --- /dev/null +++ b/internal/domain/task/value_objects/aind/algo_config.py @@ -0,0 +1,35 @@ +from typing import Literal, Annotated +from pydantic import Field +from internal.domain.common import OptionalModel +from internal.domain.task.value_objects.aind.algo_name import AindAlgoName +from internal.domain.task.value_objects.aind.algo_descriptions import descriptions + + +class BaseAindConfig(OptionalModel): + __non_optional_fields__ = { + "algo_name", + } + + +class MindConfig(BaseAindConfig): + algo_name: Literal[AindAlgoName.Mind] + + max_arity: Annotated[int, Field(gt=0, description=descriptions["max_arity"])] + error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])] + + +class SpiderConfig(BaseAindConfig): + algo_name: Literal[AindAlgoName.Spider] + + error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])] + is_null_equal_null: Annotated[ + bool, Field(description=descriptions["is_null_equal_null"]) + ] + threads: Annotated[int, Field(ge=0, description=descriptions["threads"])] + mem_limit: Annotated[int, Field(gt=0, description=descriptions["mem_limit"])] + + +OneOfAindAlgoConfig = Annotated[ + MindConfig | SpiderConfig, + Field(discriminator="algo_name"), +] diff --git a/internal/domain/task/value_objects/aind/algo_descriptions.py b/internal/domain/task/value_objects/aind/algo_descriptions.py new file mode 100644 index 0000000..0833423 --- /dev/null +++ b/internal/domain/task/value_objects/aind/algo_descriptions.py @@ -0,0 +1,11 @@ +descriptions = { + "max_arity": "Maximum arity of the inclusion dependency (IND).", + "sample_size": "Size of table sample for IND profiling.", + "ignore_constant_cols": "Ignore INDs containing columns with only one value for improved performance.", + "hll_accuracy": "HyperLogLog approximation accuracy. Closer to 0 means higher accuracy and memory usage.", + "ignore_null_cols": "Ignore INDs containing columns filled only with NULLs.", + "threads": "Number of threads to use. If 0, use all available threads.", + "error": "Error threshold for approximate IND algorithms.", + "is_null_equal_null": "Specify whether two NULL values should be treated as equal.", + "mem_limit": "Memory limit in MBs for the algorithm.", +} diff --git a/internal/domain/task/value_objects/aind/algo_name.py b/internal/domain/task/value_objects/aind/algo_name.py new file mode 100644 index 0000000..76acf48 --- /dev/null +++ b/internal/domain/task/value_objects/aind/algo_name.py @@ -0,0 +1,6 @@ +from enum import StrEnum, auto + + +class AindAlgoName(StrEnum): + Mind = auto() + Spider = auto() diff --git a/internal/domain/task/value_objects/aind/algo_result.py b/internal/domain/task/value_objects/aind/algo_result.py new file mode 100644 index 0000000..a72a887 --- /dev/null +++ b/internal/domain/task/value_objects/aind/algo_result.py @@ -0,0 +1,5 @@ +from internal.domain.task.value_objects.ind.algo_result import IndAlgoResult, IndModel + + +AindAlgoResult = IndAlgoResult +AindModel = IndModel diff --git a/internal/domain/task/value_objects/ind/__init__.py b/internal/domain/task/value_objects/ind/__init__.py new file mode 100644 index 0000000..f96c915 --- /dev/null +++ b/internal/domain/task/value_objects/ind/__init__.py @@ -0,0 +1,23 @@ +from typing import Literal + +from pydantic import BaseModel + +from internal.domain.task.value_objects.primitive_name import PrimitiveName +from internal.domain.task.value_objects.ind.algo_config import OneOfIndAlgoConfig +from internal.domain.task.value_objects.ind.algo_result import ( # noqa: F401 + IndAlgoResult, + IndModel, +) +from internal.domain.task.value_objects.ind.algo_name import IndAlgoName # noqa: F401 + + +class BaseIndTaskModel(BaseModel): + primitive_name: Literal[PrimitiveName.ind] + + +class IndTaskConfig(BaseIndTaskModel): + config: OneOfIndAlgoConfig + + +class IndTaskResult(BaseIndTaskModel): + result: IndAlgoResult diff --git a/internal/domain/task/value_objects/ind/algo_config.py b/internal/domain/task/value_objects/ind/algo_config.py new file mode 100644 index 0000000..8fa2362 --- /dev/null +++ b/internal/domain/task/value_objects/ind/algo_config.py @@ -0,0 +1,52 @@ +from typing import Literal, Annotated +from pydantic import Field +from internal.domain.common import OptionalModel +from internal.domain.task.value_objects.ind.algo_name import IndAlgoName +from internal.domain.task.value_objects.ind.algo_descriptions import descriptions + + +class BaseIndConfig(OptionalModel): + __non_optional_fields__ = { + "algo_name", + } + + +class FaidaConfig(BaseIndConfig): + algo_name: Literal[IndAlgoName.Faida] + + max_arity: Annotated[int, Field(gt=0, description=descriptions["max_arity"])] + sample_size: Annotated[int, Field(gt=0, description=descriptions["sample_size"])] + ignore_constant_cols: Annotated[ + bool, Field(description=descriptions["ignore_constant_cols"]) + ] + hll_accuracy: Annotated[ + float, Field(gt=0, description=descriptions["hll_accuracy"]) + ] + ignore_null_cols: Annotated[ + bool, Field(description=descriptions["ignore_null_cols"]) + ] + threads: Annotated[int, Field(ge=0, description=descriptions["threads"])] + + +class MindConfig(BaseIndConfig): + algo_name: Literal[IndAlgoName.Mind] + + max_arity: Annotated[int, Field(gt=0, description=descriptions["max_arity"])] + error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])] + + +class SpiderConfig(BaseIndConfig): + algo_name: Literal[IndAlgoName.Spider] + + error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])] + is_null_equal_null: Annotated[ + bool, Field(description=descriptions["is_null_equal_null"]) + ] + threads: Annotated[int, Field(ge=0, description=descriptions["threads"])] + mem_limit: Annotated[int, Field(gt=0, description=descriptions["mem_limit"])] + + +OneOfIndAlgoConfig = Annotated[ + FaidaConfig | MindConfig | SpiderConfig, + Field(discriminator="algo_name"), +] diff --git a/internal/domain/task/value_objects/ind/algo_descriptions.py b/internal/domain/task/value_objects/ind/algo_descriptions.py new file mode 100644 index 0000000..0833423 --- /dev/null +++ b/internal/domain/task/value_objects/ind/algo_descriptions.py @@ -0,0 +1,11 @@ +descriptions = { + "max_arity": "Maximum arity of the inclusion dependency (IND).", + "sample_size": "Size of table sample for IND profiling.", + "ignore_constant_cols": "Ignore INDs containing columns with only one value for improved performance.", + "hll_accuracy": "HyperLogLog approximation accuracy. Closer to 0 means higher accuracy and memory usage.", + "ignore_null_cols": "Ignore INDs containing columns filled only with NULLs.", + "threads": "Number of threads to use. If 0, use all available threads.", + "error": "Error threshold for approximate IND algorithms.", + "is_null_equal_null": "Specify whether two NULL values should be treated as equal.", + "mem_limit": "Memory limit in MBs for the algorithm.", +} diff --git a/internal/domain/task/value_objects/ind/algo_name.py b/internal/domain/task/value_objects/ind/algo_name.py new file mode 100644 index 0000000..6c04b0f --- /dev/null +++ b/internal/domain/task/value_objects/ind/algo_name.py @@ -0,0 +1,7 @@ +from enum import StrEnum, auto + + +class IndAlgoName(StrEnum): + Faida = auto() + Mind = auto() + Spider = auto() diff --git a/internal/domain/task/value_objects/ind/algo_result.py b/internal/domain/task/value_objects/ind/algo_result.py new file mode 100644 index 0000000..8cd11ca --- /dev/null +++ b/internal/domain/task/value_objects/ind/algo_result.py @@ -0,0 +1,15 @@ +from pydantic import BaseModel +from desbordante.ind import IND + + +class IndModel(BaseModel): + @classmethod + def from_ind(cls, ind: IND): + return cls(lhs=ind.get_lhs(), rhs=ind.get_rhs()) + + lhs: tuple[str, ...] + rhs: tuple[str, ...] + + +class IndAlgoResult(BaseModel): + inds: list[IndModel] diff --git a/internal/domain/task/value_objects/primitive_name.py b/internal/domain/task/value_objects/primitive_name.py index f09e8b4..6e1f3c4 100644 --- a/internal/domain/task/value_objects/primitive_name.py +++ b/internal/domain/task/value_objects/primitive_name.py @@ -6,6 +6,8 @@ class PrimitiveName(StrEnum): afd = auto() # ar = auto() ac = auto() + ind = auto() + aind = auto() # fd_verification = auto() # mfd_verification = auto() # statistics = auto()