Skip to content

Commit

Permalink
πŸ”– 0.12.0 (#182)
Browse files Browse the repository at this point in the history
* ✨ Add utils.load_pipeline() to load pipeline

* 🚨 Fix linting

* πŸ”– 0.12.0
  • Loading branch information
pwwang authored Oct 19, 2023
1 parent 8eaf661 commit 52782c7
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 100 deletions.
4 changes: 4 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## 0.12.0

- ✨ Add utils.load_pipeline() to load pipeline

## 0.11.1

- Dismiss warning for fillna method for pandas 2.1
Expand Down
116 changes: 116 additions & 0 deletions pipen/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import re
import sys
import importlib
import importlib.util
import logging
import textwrap
import typing
Expand All @@ -20,6 +22,7 @@
Iterable,
List,
Mapping,
Sequence,
Tuple,
Type,
)
Expand Down Expand Up @@ -51,6 +54,7 @@

from .pipen import Pipen
from .proc import Proc
from .procgroup import ProcGroup


class RichHandler(_RichHandler):
Expand Down Expand Up @@ -587,3 +591,115 @@ def is_valid_name(name: str) -> bool:
True if valid, otherwise False
"""
return re.match(r"^[\w.-]+$", name) is not None


def _get_obj_from_spec(spec: str) -> Any:
"""Get the object from a spec like `<module[.submodule]>:name` or
`/path/to/script.py:name`
Args:
spec: The spec
Returns:
The object
Raises:
AttributeError: If name cannot be found in the module
"""
modpath, sep, name = spec.rpartition(":")
if sep != ":":
raise ValueError(
f"Invalid specification: {spec}.\n"
"It must be in the format '<module[.submodule]>:name' or \n"
"'/path/to/spec.py:name'"
)

path = Path(modpath)
if path.is_file():
spec = importlib.util.spec_from_file_location(path.stem, modpath)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
else:
module = importlib.import_module(modpath)

return getattr(module, name)


async def load_pipeline(
obj: str | Type[Proc] | Type[ProcGroup] | Type[Pipen],
cli_args: Sequence[str] = (),
**kwargs: Any,
) -> Pipen:
"""Load a pipeline from a Pipen, Proc or ProcGroup object
It does not only load the Pipen object or convert the Proc/ProcGroup
object to Pipen, but also build the process relationships. So that we
can access `pipeline.procs` and `requires/nexts` of each proc.
To avoid running the pipeline and notify the plugins that this is just
for loading the pipeline, `sys.argv[0]` is set to `@pipen`.
Args:
obj: The Pipen, Proc or ProcGroup object. It can also be a string in
the format of `part1:part2` to load the pipeline, where part1 is
a path to a python file or package directory, and part2 is the name
of the proc, procgroup or pipeline to load.
It should be able to be loaded by `getattr(module, part2)`, where
module is loaded from `part1`.
cli_args: The cli arguments, in case the pipeline or some plugins use
to parse the relationship of the processes.
kwargs: The kwargs to pass to the Pipen constructor
Returns:
The loaded Pipen object
Raises:
TypeError: If obj or loaded obj is not a Pipen, Proc or ProcGroup
object
"""
from .pipen import Pipen
from .proc import Proc
from .procgroup import ProcGroup

if isinstance(obj, str):
obj = _get_obj_from_spec(obj)
if (
not isinstance(obj, type)
or not issubclass(obj, (Pipen, Proc, ProcGroup))
):
raise TypeError(
f"Expected a Pipen, Proc or ProcGroup class, got {type(obj)}"
)

pipeline = obj
if isinstance(obj, type) and issubclass(obj, Proc):
kwargs.setdefault("name", f"{obj.name}Pipeline")
pipeline = Pipen(**kwargs).set_starts(obj)

if isinstance(obj, type) and issubclass(obj, ProcGroup):
pipeline = obj().as_pipen(**kwargs)

if isinstance(obj, type) and issubclass(obj, Pipen):
# Avoid "pipeline" to be used as pipeline name by varname
(pipeline, ) = (obj(**kwargs), )

if not isinstance(pipeline, Pipen):
raise TypeError(
f"Expected a Pipen, Proc or ProcGroup class, got {type(pipeline)}"
)

old_argv = sys.argv
sys.argv = ["@pipen"] + list(cli_args)
try:
# Initialize the pipeline so that the arguments definied by
# other plugins (i.e. pipen-args) to take in place.
pipeline.workdir = Path(pipeline.config.workdir).joinpath(
kwargs.get("name", pipeline.name)
)
await pipeline._init()
pipeline.workdir.mkdir(parents=True, exist_ok=True)
pipeline.build_proc_relationships()
finally:
sys.argv = old_argv

return pipeline
2 changes: 1 addition & 1 deletion pipen/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Provide version of pipen"""

__version__ = "0.11.1"
__version__ = "0.12.0"
Loading

0 comments on commit 52782c7

Please sign in to comment.