From 9a4d5ac48237f31bb399f4c33bf6bcc3905ee272 Mon Sep 17 00:00:00 2001
From: Gianluca Ficarelli <26835404+GianlucaFicarelli@users.noreply.github.com>
Date: Thu, 4 Apr 2024 09:56:14 +0200
Subject: [PATCH] Add df.etl.insert_columns (#9)

Other: automatically shutdown the processes created by joblib and loky
---
 CHANGELOG.rst                | 14 ++++++++++++++
 pyproject.toml               |  1 +
 src/blueetl_core/etl.py      |  7 +++++++
 src/blueetl_core/parallel.py | 28 ++++++++++++++++++----------
 tests/test_etl_dataframe.py  | 21 +++++++++++++++++++++
 5 files changed, 61 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 9cb43f8..75193b2 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,6 +1,20 @@
 Changelog
 =========
 
+Version 0.2.0
+-------------
+
+New Features
+~~~~~~~~~~~~
+
+- Add ``df.etl.insert_columns()`` to simplify the insertion of multiple columns at once in a DataFrame.
+
+Improvements
+~~~~~~~~~~~~
+
+- In ``run_parallel()``, automatically shutdown the processes created by joblib and loky.
+
+
 Version 0.1.8
 -------------
 
diff --git a/pyproject.toml b/pyproject.toml
index 6c5fbed..619febe 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,6 +20,7 @@ classifiers = [
     "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
     "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: 3.12",
     "Topic :: Scientific/Engineering :: Bio-Informatics",
 ]
 dependencies = [
diff --git a/src/blueetl_core/etl.py b/src/blueetl_core/etl.py
index 7af74a0..4b3dbb9 100644
--- a/src/blueetl_core/etl.py
+++ b/src/blueetl_core/etl.py
@@ -305,6 +305,13 @@ def iterdict(self) -> Iterator[tuple[dict, dict]]:
         ):
             yield named_index, dict(zip(columns, value))
 
+    def insert_columns(self, loc: int, columns: list, values: list) -> None:
+        """Insert multiple columns, similar to repeatedly calling DataFrame.insert()."""
+        if len(columns) != len(values):
+            raise ValueError("columns and values must have the same length")
+        for col, val in zip(reversed(columns), reversed(values)):
+            self._obj.insert(loc, col, val)
+
     def _query_list(self, query_list: list[dict[str, Any]]) -> pd.DataFrame:
         """Given a list of query dicts, return the DataFrame filtered by columns and index."""
         return query_frame(self._obj, query_list)
diff --git a/src/blueetl_core/parallel.py b/src/blueetl_core/parallel.py
index 5934c1e..f14eb1e 100644
--- a/src/blueetl_core/parallel.py
+++ b/src/blueetl_core/parallel.py
@@ -8,6 +8,7 @@
 
 import numpy as np
 from joblib import Parallel, delayed
+from joblib.externals.loky import get_reusable_executor
 
 from blueetl_core.constants import (
     BLUEETL_JOBLIB_BACKEND,
@@ -73,6 +74,7 @@ def run_parallel(
     backend: Optional[str] = None,
     verbose: Optional[int] = None,
     base_seed: Optional[int] = None,
+    shutdown_executor: bool = True,
 ) -> list[Any]:
     """Run tasks in parallel.
 
@@ -87,6 +89,7 @@
         verbose: verbosity of joblib. If not specified, use the BLUEETL_JOBLIB_VERBOSE.
         base_seed: initial base seed. If specified, a different seed is added to the task context,
             and passed to each callable object.
+        shutdown_executor: if True and using loky, shutdown the subprocesses before returning.
 
     Returns:
         list of objects returned by the callable objects, in the same order.
@@ -100,15 +103,20 @@ def run_parallel(
     jobs = int(jobs_env) if jobs_env else max((os.cpu_count() or 1) // 2, 1)
     if not backend:
         backend = os.getenv(BLUEETL_JOBLIB_BACKEND)
-    parallel = Parallel(n_jobs=jobs, backend=backend, verbose=verbose)
-    return parallel(
-        delayed(task)(
-            ctx=TaskContext(
-                task_id=i,
-                loglevel=loglevel,
-                seed=None if base_seed is None else base_seed + i,
-                ppid=os.getpid(),
+    try:
+        parallel = Parallel(n_jobs=jobs, backend=backend, verbose=verbose)
+        return parallel(
+            delayed(task)(
+                ctx=TaskContext(
+                    task_id=i,
+                    loglevel=loglevel,
+                    seed=None if base_seed is None else base_seed + i,
+                    ppid=os.getpid(),
+                )
             )
+            for i, task in enumerate(tasks)
         )
-        for i, task in enumerate(tasks)
-    )
+    finally:
+        if shutdown_executor and (not backend or backend == "loky"):
+            # shutdown the pool of processes used by loky
+            get_reusable_executor().shutdown(wait=True)
diff --git a/tests/test_etl_dataframe.py b/tests/test_etl_dataframe.py
index af40953..fd46d6d 100644
--- a/tests/test_etl_dataframe.py
+++ b/tests/test_etl_dataframe.py
@@ -472,6 +472,27 @@ def test_iterdict(dataframe1):
     assert value == {"v0": 0, "v1": 4}
 
 
+def test_insert_columns(dataframe1):
+    columns = ["pre0", "pre1", "pre2"]
+    values = [111, [200, 201, 202, 203], np.nan]
+    original_columns = list(dataframe1.columns)
+
+    dataframe1.etl.insert_columns(loc=0, columns=columns, values=values)
+
+    assert_array_equal(dataframe1.columns, columns + original_columns)
+    assert_array_equal(dataframe1["pre0"], [111] * len(dataframe1))
+    assert_array_equal(dataframe1["pre1"], [200, 201, 202, 203])
+    assert_array_equal(dataframe1["pre2"], [np.nan] * len(dataframe1))
+
+
+def test_insert_columns_raises(dataframe1):
+    columns = ["pre0", "pre1", "pre2"]
+    values = [111]
+
+    with pytest.raises(ValueError, match="columns and values must have the same length"):
+        dataframe1.etl.insert_columns(loc=0, columns=columns, values=values)
+
+
 @pytest.mark.parametrize(
     "params, expected_key, expected_df",
     [
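A minimal usage sketch of the new ``insert_columns()`` accessor method added above, for reviewers. It assumes that importing ``blueetl_core.etl`` registers the ``etl`` DataFrame accessor; the column names and values are made up for illustration::

    import numpy as np
    import pandas as pd

    import blueetl_core.etl  # noqa: F401  # assumed to register the ``.etl`` accessor

    df = pd.DataFrame({"v0": [0, 1], "v1": [2, 3]})

    # Insert three columns starting at position 0 in a single call.
    # Scalars (111, np.nan) are broadcast by DataFrame.insert(), while
    # list-like values must match the length of the DataFrame.
    df.etl.insert_columns(
        loc=0,
        columns=["pre0", "pre1", "pre2"],
        values=[111, [10, 20], np.nan],
    )

    print(df.columns.tolist())  # ['pre0', 'pre1', 'pre2', 'v0', 'v1']

Because the method inserts the reversed ``columns``/``values`` lists at the same ``loc``, the final column order matches the order given in ``columns``.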
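Similarly, a sketch of how the new ``shutdown_executor`` flag of ``run_parallel()`` could be exercised. Only ``backend``, ``verbose``, ``base_seed``, and ``shutdown_executor`` are visible in this diff; the ``tasks`` and ``jobs`` arguments and the shape of the task callables are assumptions based on the docstring and the function body::

    from blueetl_core.parallel import run_parallel

    def make_task(x):
        # Hypothetical task factory: each task is a callable accepting the
        # ``ctx`` keyword, since run_parallel() calls it with ctx=TaskContext(...).
        def _task(ctx):
            return x * x
        return _task

    if __name__ == "__main__":
        results = run_parallel(
            tasks=[make_task(x) for x in range(4)],
            jobs=2,
            base_seed=42,
            # New flag: with the default loky backend, the worker processes
            # are shut down before returning instead of being kept alive.
            shutdown_executor=True,
        )
        print(results)  # [0, 1, 4, 9]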