This repository has been archived by the owner on May 19, 2020. It is now read-only.

Commit

Init numpy caching
fransik committed May 8, 2020
1 parent efe1847 commit cc71dd5
Showing 6 changed files with 413 additions and 144 deletions.
63 changes: 54 additions & 9 deletions README.md
@@ -1,11 +1,11 @@
# Pandas cache
# Table cache

Works by hashing the combinations of arguments of a function call with
the function name to create a unique id of a DataFrame retrieval. If
the function name to create a unique id of a table retrieval. If
the function call is new the original function will be called, and the
resulting DataFrame(s) will be stored in a HDFStore indexed by the
resulting table(s) will be stored in an HDFStore indexed by the
hashed key. Next time the function is called with the same args the
DataFrame(s) will be retrieved from the store instead of executing the
table(s) will be retrieved from the store instead of executing the
function.
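For illustration, a minimal sketch of how such an id can be derived (the `make_key` helper is hypothetical; the library builds its keys along these lines with `hashlib.md5`):

```python
import hashlib
import json


def make_key(func_name: str, **call_args: object) -> str:
    # hypothetical helper: str() every argument, serialize, and hash together with the function name
    str_args = {name: str(value) for name, value in call_args.items()}
    return "df" + hashlib.md5((func_name + json.dumps(str_args)).encode("utf-8")).hexdigest()


# identical calls map to the same key, so cached results can be looked up instead of recomputed
assert make_key("simple_func", a=1, c=True) == make_key("simple_func", a=1, c=True)
```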

The hashing of the arguments is done by first applying str() on the
@@ -23,12 +23,12 @@ database-clients.

The module automatically creates a `cache/data.h5` relative to
`__main__`; to change this, set the environment variable
`PANDAS_CACHE_PATH` to be the desired directory of the `data.h5` file.
`CACHE_PATH` to be the desired directory of the `data.h5` file.
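For instance, the variable can be set before any cached function is called (the directory below is only a placeholder):

```python
import os

# hypothetical cache directory; the data.h5 file is created inside it
os.environ["CACHE_PATH"] = "/tmp/table_cache"
```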

#### Disabling the cache with env-variable

To disable the pandas cache set the environment variable
`DISABLE_PANDAS_CACHE` to `TRUE`.
To disable the cache, set the environment variable
`DISABLE_CACHE` to `TRUE`.

### Usage

@@ -154,8 +154,7 @@ import pandas as pd
@pandas_cache("a", "c")
def simple_func(a, *args, **kwargs):
    sleep(5)
    return pd.DataFrame([[1,2,3], [2,3,4]]), \
        pd.DataFrame([[1,2,3], [2,3,4]]) * 10
    return pd.DataFrame([[1,2,3], [2,3,4]]), pd.DataFrame([[1,2,3], [2,3,4]]) * 10


t0 = datetime.now()
@@ -180,3 +179,49 @@ print(datetime.now() - t0)
1 20 30 40)
0:00:00.019578
```

#### Disabling cache for tests

Caching can be disabled by setting the environment variable `DISABLE_CACHE` to `TRUE`:

```python
from mock import patch
def test_cached_function():
    with patch.dict("os.environ", {"DISABLE_CACHE": "TRUE"}, clear=True):
        assert cached_function() == target
```

#### Numpy caching

```python
from pandas_cacher import numpy_cache
from time import sleep
from datetime import datetime
import numpy as np


@numpy_cache("a", "c")
def simple_func(a, *args, **kwargs):
    sleep(5)
    return np.array([[1, 2, 3], [2, 3, 4]]), np.array([[1, 2, 3], [2, 3, 4]]) * 10


t0 = datetime.now()
print(simple_func(1, b=2, c=True))
print(datetime.now() - t0)

t0 = datetime.now()
print(simple_func(a=1, b=3, c=True))
print(datetime.now() - t0)
```

```commandline
(array([[1, 2, 3],
       [2, 3, 4]]), array([[10, 20, 30],
       [20, 30, 40]]))
0:00:05.009084
(array([[1, 2, 3],
       [2, 3, 4]]), array([[10, 20, 30],
       [20, 30, 40]]))
0:00:00.002000
```
4 changes: 2 additions & 2 deletions pandas_cacher/__init__.py
@@ -1,3 +1,3 @@
from pandas_cacher.pandas_cache import pandas_cache # noqa: F401
from pandas_cacher.pandas_cache import numpy_cache, pandas_cache # noqa: F401

VERSION = "0.1.2"
VERSION = "0.1.3"
218 changes: 130 additions & 88 deletions pandas_cacher/pandas_cache.py
@@ -5,117 +5,159 @@
import os
import pathlib
from collections import defaultdict
from typing import Any, Callable, Tuple, Union
from typing import Any, Callable, Dict, Iterable, Tuple, Type, Union

import h5py
import numpy as np
import pandas as pd

pandas_function = Callable[..., Union[Tuple[pd.DataFrame], pd.DataFrame]]
numpy_function = Callable[..., Union[Tuple[np.ndarray], np.ndarray]]
cached_data_type = Union[Tuple[Any], Any]
cache_able_function = Callable[..., cached_data_type]
store_function = Callable[[str, Callable[..., Any], Tuple[Any], Dict[str, Any]], Any]


def get_path() -> pathlib.Path:
    cache_path = os.environ.get("PANDAS_CACHE_PATH", "")
    cache_path = os.environ.get("CACHE_PATH", "")
    cache_path = pathlib.Path.cwd() if cache_path == "" else pathlib.Path(cache_path)
    cache_path.mkdir(parents=True, exist_ok=True)
    return cache_path


def get_df_hdf(
    key: str, func: pandas_function, f_args: Any, f_kwargs: Any
) -> Union[Tuple[pd.DataFrame], pd.DataFrame]:
    """Retrieves the DataFrames from the HDFStore if the key exists,
    else run the function then store & return the resulting DataFrames.
class StoreClass:
    def __init__(self, file_path: str, mode: str):
        raise NotImplementedError

    Args:
        key: Unique str hash of function call
        func: Wrapped function, should return a DataFrame or tuple of them.
        f_args: Arguments passed along to the function
        f_kwargs: Keyword-Arguments passed along to the function
    def __enter__(self):
        raise NotImplementedError

    Returns: DataFrames that func would originally return.
    def __exit__(self, exc_type, exc_val, exc_tb):
        raise NotImplementedError

    """
    file_path = get_path() / "data.h5"
    mode = "r+" if file_path.exists() else "w"
    with pd.HDFStore(file_path, mode=mode) as store:
        keys = defaultdict(list)
        for s_key in store.keys():
            keys[s_key.split("/")[1]].append(s_key)
        if key in keys.keys():
            dfs = [pd.read_hdf(store, key=key_) for key_ in keys[key]]
            return tuple(dfs) if len(dfs) > 1 else dfs[0]
    df = func(*f_args, **f_kwargs)
    with pd.HDFStore(file_path, mode=mode) as store:
        if isinstance(df, tuple):
            for i, df_ in enumerate(df):
                df_.to_hdf(store, key=f"{key}/df{i}")
        else:
            df.to_hdf(store, key=key)
    return df


# pylint: disable=keyword-arg-before-vararg
def pandas_cache(orig_func: pandas_function = None, *args: str) -> pandas_function:
    """Decorator for caching function calls that return pandas DataFrames.
    def keys(self) -> Iterable:
        raise NotImplementedError

    Args:
        *args: arguments of the function to use as filename
        **kwargs: keyword-arguments of the function to use as filename
    def create_dataset(self, key: str, data: ...) -> None:
        raise NotImplementedError

    def __getitem__(self, key: str) -> ...:
        raise NotImplementedError


class PandasStore(pd.HDFStore):
    def create_dataset(self, key: str, data: pd.DataFrame) -> None:
        data.to_hdf(self, key)

    def __getitem__(self, key: str) -> pd.DataFrame:
        return pd.read_hdf(self, key=key)

    Returns: decorated function

def store_factory(data_storer: Type[StoreClass]) -> Type[store_function]:
    """Factory function for creating storing functions for the cache decorator.
    Args:
        data_storer: class with a context manager, and file_path + mode parameters.
    Returns: function for storing tables
    """
    if isinstance(orig_func, str):
        args = list(args) + [orig_func]
        orig_func = None

    def decorated(func: pandas_function) -> pandas_function:
        """Wrapper of function that returns pandas DataFrames.
    def store_func(
        key: str, func: cache_able_function, f_args: Tuple[Any], f_kwargs: Dict[str, Any],
    ) -> cached_data_type:
"""Retrieves stored data if key exists in stored data if the key is new, retrieves data from
decorated function & stores the result with the given key.
        Args:
            func: function to be wrapped, should return a DataFrame or tuple of them.
            key: unique key used to retrieve/store data
            func: original cached function
            f_args: args to pass to the function
            f_kwargs: kwargs to pass to the function
        Returns: wrapped function
        Returns:
            Data retrieved from the store if existing else from function
        """

        @functools.wraps(func)
        def wrapped(*f_args: ..., **f_kwargs: ...) -> Union[Tuple[pd.DataFrame], pd.DataFrame]:
            """ Hashes function arguments to a unique key, and uses the key
            to store/retrieve DataFrames from the HDFStore.
            Args:
                *f_args: Arguments passed along to the function
                **f_kwargs: Keyword-Arguments passed along to the function
            Returns: DataFrame(s)
            """
            if os.environ.get("DISABLE_PANDAS_CACHE", "FALSE") == "TRUE":
                return func(*f_args, **f_kwargs)
            argspec = inspect.getfullargspec(func)
            defaults = (
                dict(zip(argspec.args[::-1], argspec.defaults[::-1])) if argspec.defaults else {}
            )
            kw_defaults = argspec.kwonlydefaults if argspec.kwonlydefaults else {}
            full_args = {
                **kw_defaults,
                **defaults,
                **f_kwargs,
                **dict(zip(argspec.args, f_args)),
                **{"arglist": f_args[len(argspec.args) :]},
            }
            full_args = full_args if not args else {arg: full_args[arg] for arg in args}
            full_args.pop("self", "")
            full_args = {k: str(v) for k, v in full_args.items()}
            key = (
                "df"
                + hashlib.md5((func.__name__ + json.dumps(full_args)).encode("utf-8")).hexdigest()
            )
            return get_df_hdf(key, func, f_args, f_kwargs)

        return wrapped

    if orig_func:
        return decorated(orig_func)
    return decorated
        file_path = get_path() / "data.h5"
        mode = "r+" if file_path.exists() else "w"
        with data_storer(file_path, mode=mode) as store:
            keys = defaultdict(list)
            for s_key in store.keys():
                s_key_ = s_key.split("-")[0] if "-" in s_key else s_key
                keys[s_key_.strip("/")].append(s_key)
            if key in keys.keys():
                arrays = [store[key_][:] for key_ in keys[key]]
                return tuple(arrays) if len(arrays) > 1 else arrays[0]
        data = func(*f_args, **f_kwargs)
        with data_storer(file_path, mode=mode) as store:
            if isinstance(data, tuple):
                for i, data_ in enumerate(data):
                    store.create_dataset(f"{key}-data{i}", data=data_)
            else:
                store.create_dataset(key, data=data)
        return data

    return store_func


def cache_decorator_factory(table_getter: Type[store_function]) -> Type[cache_able_function]:
    # pylint: disable=keyword-arg-before-vararg
    def cache_decorator(
        orig_func: cache_able_function = None, *args: str
    ) -> Type[cache_able_function]:
        if isinstance(orig_func, str):
            args = list(args) + [orig_func]
            orig_func = None

        def decorated(func: cache_able_function) -> Type[cache_able_function]:
            @functools.wraps(func)
            def wrapped(*f_args: Tuple[Any], **f_kwargs: Dict[str, Any]) -> cached_data_type:
                """Hashes function arguments to a unique key, and uses the key to store/retrieve
                data from the configured store.
                Args:
                    *f_args: Arguments passed along to the function
                    **f_kwargs: Keyword-Arguments passed along to the function
                Returns: Stored data if existing, else result from the function
                """
                if os.environ.get("DISABLE_CACHE", "FALSE") == "TRUE":
                    return func(*f_args, **f_kwargs)
                argspec = inspect.getfullargspec(func)
                defaults = (
                    dict(zip(argspec.args[::-1], argspec.defaults[::-1]))
                    if argspec.defaults
                    else {}
                )
                kw_defaults = argspec.kwonlydefaults if argspec.kwonlydefaults else {}
                full_args = {
                    **kw_defaults,
                    **defaults,
                    **f_kwargs,
                    **dict(zip(argspec.args, f_args)),
                    **{"arglist": f_args[len(argspec.args) :]},
                }
                full_args = full_args if not args else {arg: full_args[arg] for arg in args}
                full_args.pop("self", "")
                full_args = {k: str(v) for k, v in full_args.items()}
                key = (
                    "df"
                    + hashlib.md5(
                        (func.__name__ + json.dumps(full_args)).encode("utf-8")
                    ).hexdigest()
                )
                return table_getter(key, func, f_args, f_kwargs)

            return wrapped

        if orig_func:
            return decorated(orig_func)
        return decorated

    return cache_decorator


pandas_cache = cache_decorator_factory(store_factory(PandasStore))
numpy_cache = cache_decorator_factory(store_factory(h5py.File))
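As an aside, the factory pattern above suggests an extension point: any class matching the `StoreClass` interface (a context manager constructed with `file_path` and `mode` that provides `keys()`, `create_dataset()` and `__getitem__()`) can be passed to `store_factory`. A minimal sketch, assuming a hypothetical gzip-compressed numpy store:

```python
import h5py

from pandas_cacher.pandas_cache import cache_decorator_factory, store_factory


class CompressedNumpyStore(h5py.File):
    """Hypothetical store: like the h5py.File-backed numpy cache, but compresses datasets."""

    def create_dataset(self, key, data=None, **kwargs):
        # store arrays gzip-compressed instead of raw
        return super().create_dataset(key, data=data, compression="gzip", **kwargs)


compressed_numpy_cache = cache_decorator_factory(store_factory(CompressedNumpyStore))
```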
