diff --git a/replay/data_preparator.py b/replay/data_preparator.py index 71d508c4..34790005 100644 --- a/replay/data_preparator.py +++ b/replay/data_preparator.py @@ -6,17 +6,20 @@ ``ToNumericFeatureTransformer`` leaves only numerical features by one-hot encoding of some features and deleting the others. """ +import logging import string from typing import Dict, List, Optional from pyspark.ml.feature import StringIndexerModel, IndexToString, StringIndexer from pyspark.sql import DataFrame from pyspark.sql import functions as sf -from pyspark.sql.types import NumericType +from pyspark.sql.types import DoubleType, NumericType from replay.constants import AnyDataFrame from replay.session_handler import State -from replay.utils import convert2spark +from replay.utils import convert2spark, process_timestamp_column + +LOG_COLUMNS = ["user_id", "item_id", "timestamp", "relevance"] class Indexer: # pylint: disable=too-many-instance-attributes @@ -161,64 +164,209 @@ def _reindex(self, df: DataFrame, entity: str): class DataPreparator: - """ - Convert pandas DataFrame to Spark, rename columns and apply indexer. + """Transforms data to a library format: + - read as a spark dataframe/ convert pandas dataframe to spark + - check for nulls + - create relevance/timestamp columns if absent + - convert dates to TimestampType + + Examples: + + Loading log DataFrame + + >>> import pandas as pd + >>> from replay.data_preparator import DataPreparator + >>> + >>> log = pd.DataFrame({"user": [2, 2, 2, 1], + ... "item_id": [1, 2, 3, 3], + ... "rel": [5, 5, 5, 5]} + ... ) + >>> dp = DataPreparator() + >>> correct_log = dp.transform(data=log, + ... columns_mapping={"user_id": "user", + ... "item_id": "item_id", + ... "relevance": "rel"} + ... ) + >>> correct_log.show(2) + +-------+-------+---------+-------------------+ + |user_id|item_id|relevance| timestamp| + +-------+-------+---------+-------------------+ + | 2| 1| 5.0|2099-01-01 00:00:00| + | 2| 2| 5.0|2099-01-01 00:00:00| + +-------+-------+---------+-------------------+ + only showing top 2 rows + + + + Loading user features + + >>> import pandas as pd + >>> from replay.data_preparator import DataPreparator + >>> + >>> log = pd.DataFrame({"user": ["user1", "user1", "user2"], + ... "f0": ["feature1","feature2","feature1"], + ... "f1": ["left","left","center"], + ... "ts": ["2019-01-01","2019-01-01","2019-01-01"]} + ... ) + >>> dp = DataPreparator() + >>> correct_log = dp.transform(data=log, + ... columns_mapping={"user_id": "user"}, + ... ) + >>> correct_log.show(3) + +-------+--------+------+----------+ + |user_id| f0| f1| ts| + +-------+--------+------+----------+ + | user1|feature1| left|2019-01-01| + | user1|feature2| left|2019-01-01| + | user2|feature1|center|2019-01-01| + +-------+--------+------+----------+ + + """ - def __init__(self): - self.indexer = Indexer() + _logger: Optional[logging.Logger] = None - def __call__( - self, - log: AnyDataFrame, - user_features: Optional[AnyDataFrame] = None, - item_features: Optional[AnyDataFrame] = None, - mapping: Optional[Dict] = None, - ) -> tuple: - """ - Convert ids into idxs for provided DataFrames - - :param log: historical log of interactions - ``[user_id, item_id, timestamp, relevance]`` - :param user_features: user features (must have ``user_id``) - :param item_features: item features (must have ``item_id``) - :param mapping: dictionary mapping "default column name: - column name in input DataFrame" - ``user_id`` and ``item_id`` mappings are required, - ``timestamp`` and``relevance`` are optional. 
- :return: three converted DataFrames - """ - log, user_features, item_features = [ - convert2spark(df) for df in [log, user_features, item_features] - ] - log, user_features, item_features = [ - self._rename(df, mapping) - for df in [log, user_features, item_features] - ] - if user_features: - users = log.select("user_id").union( - user_features.select("user_id") - ) - else: - users = log.select("user_id") - if item_features: - items = log.select("item_id").union( - item_features.select("item_id") - ) + @property + def logger(self) -> logging.Logger: + """ + :returns: get library logger + """ + if self._logger is None: + self._logger = logging.getLogger("replay") + return self._logger + + @staticmethod + def read_as_spark_df( + data: Optional[AnyDataFrame] = None, + path: str = None, + format_type: str = None, + **kwargs, + ) -> DataFrame: + """ + Read spark dataframe from file or convert pandas dataframe to spark. + + :param data: DataFrame to process (``path`` or ``data`` should be defined) + :param path: path to data (``path`` or ``data`` should be defined) + :param format_type: file type, one of ``[csv, parquet, json, table]`` + :param kwargs: extra arguments passed to + ``spark.read.<format>(path, **kwargs)`` + :return: spark DataFrame + """ + if data is not None: + dataframe = convert2spark(data) + elif path and format_type: + spark = State().session + if format_type == "csv": + dataframe = spark.read.csv(path, inferSchema=True, **kwargs) + elif format_type == "parquet": + dataframe = spark.read.parquet(path) + elif format_type == "json": + dataframe = spark.read.json(path, **kwargs) + elif format_type == "table": + dataframe = spark.read.table(path) + else: + raise ValueError( + f"Invalid value of format_type='{format_type}'" + ) else: - items = log.select("item_id") - self.indexer.fit(users, items) + raise ValueError("Either data or path parameters must not be None") + return dataframe - log = self.indexer.transform(log) - if user_features: - user_features = self.indexer.transform(user_features) - if item_features: - item_features = self.indexer.transform(item_features) + def check_df( + self, dataframe: DataFrame, columns_mapping: Dict[str, str] + ) -> None: + """ + Check: + - if dataframe is not empty, + - if columns from ``columns_mapping`` are present in dataframe + - warn about nulls in columns from ``columns_mapping`` + - warn about absence of ``timestamp/relevance`` columns for interactions log + - warn about wrong relevance DataType + + :param dataframe: spark DataFrame to process + :param columns_mapping: dictionary mapping "key: column name in input DataFrame". + Possible keys: ``[user_id, item_id, timestamp, relevance]`` + ``columns_mapping`` values specify the nature of the DataFrame: + - if both ``[user_id, item_id]`` are present, + then the dataframe is a log of interactions. + Specify ``timestamp, relevance`` columns in mapping if available. + - if either ``user_id`` or ``item_id`` is present, + then the dataframe is a dataframe of user/item features + """ + if not dataframe.head(1): + raise ValueError("DataFrame is empty") - return log, user_features, item_features + for value in columns_mapping.values(): + if value not in dataframe.columns: + raise ValueError( + f"Column `{value}` stated in mapping is absent in dataframe" + ) + + for column in columns_mapping.values(): + if dataframe.where(sf.col(column).isNull()).count() > 0: + self.logger.info( + "Column `%s` has NULL values.
Handle NULL values before " + "the next data preprocessing/model training steps", + column, + ) + + if ( + "user_id" in columns_mapping.keys() + and "item_id" in columns_mapping.keys() + ): + absent_cols = set(LOG_COLUMNS).difference(columns_mapping.keys()) + if len(absent_cols) > 0: + self.logger.info( + "Columns %s are absent, but may be required for models training. " + "Add them with DataPreparator().add_absent_log_cols", + list(absent_cols), + ) + if "relevance" in columns_mapping.keys(): + if not isinstance( + dataframe.schema[columns_mapping["relevance"]].dataType, + NumericType, + ): + self.logger.info( + "Relevance column `%s` should be numeric, but it is %s", + columns_mapping["relevance"], + dataframe.schema[columns_mapping["relevance"]].dataType, + ) @staticmethod - def _rename(df: DataFrame, mapping: Dict) -> DataFrame: + def add_absent_log_cols( + dataframe: DataFrame, + columns_mapping: Dict[str, str], + default_relevance: float = 1.0, + default_ts: str = "2099-01-01", + ): + """ + Add ``relevance`` and ``timestamp`` columns with default values if + ``relevance`` or ``timestamp`` is absent among mapping keys. + + :param dataframe: interactions log to process + :param columns_mapping: dictionary mapping "key: column name in input DataFrame". + Possible keys: ``[user_id, item_id, timestamp, relevance]`` + :param default_relevance: default value for generated `relevance` column + :param default_ts: str, default value for generated `timestamp` column + :return: spark DataFrame with generated ``timestamp`` and ``relevance`` columns + if absent in original dataframe + """ + absent_cols = set(LOG_COLUMNS).difference(columns_mapping.keys()) + if "relevance" in absent_cols: + dataframe = dataframe.withColumn( + "relevance", sf.lit(default_relevance).cast(DoubleType()) + ) + if "timestamp" in absent_cols: + dataframe = dataframe.withColumn( + "timestamp", sf.to_timestamp(sf.lit(default_ts)) + ) + return dataframe + + @staticmethod + def _rename(df: DataFrame, mapping: Dict) -> Optional[DataFrame]: + """ + rename dataframe columns based on mapping + """ if df is None or mapping is None: return df for out_col, in_col in mapping.items(): @@ -226,14 +374,88 @@ def _rename(df: DataFrame, mapping: Dict) -> DataFrame: df = df.withColumnRenamed(in_col, out_col) return df - def back(self, df: DataFrame) -> DataFrame: + # pylint: disable=too-many-arguments + def transform( + self, + columns_mapping: Dict[str, str], + data: Optional[AnyDataFrame] = None, + path: Optional[str] = None, + format_type: Optional[str] = None, + date_format: Optional[str] = None, + reader_kwargs: Optional[Dict] = None, + ) -> DataFrame: """ - Convert DataFrame to the initial indexes. - - :param df: DataFrame with idxs - :return: DataFrame with ids + Transforms log, user or item features into a Spark DataFrame + ``[user_id, item_id, timestamp, relevance]``, + ``[user_id, *features]``, or ``[item_id, *features]``. + Input is either a file of ``format_type`` + at ``path``, or a ``pandas.DataFrame``, or a ``spark.DataFrame``. + Transform performs: + - dataframe reading/conversion to spark DataFrame format + - check dataframe (nulls, columns_mapping) + - rename columns from mapping to standard names (user_id, item_id, timestamp, relevance) + - for interactions log: create absent columns, + convert ``timestamp`` column to TimestampType and ``relevance`` to DoubleType + + :param columns_mapping: dictionary mapping "key: column name in input DataFrame".
+ Possible keys: ``[user_id, item_id, timestamp, relevance]`` + ``columns_mapping`` values specify the nature of the DataFrame: + - if both ``[user_id, item_id]`` are present, + then the dataframe is a log of interactions. + Specify ``timestamp, relevance`` columns in mapping if present. + - if either ``user_id`` or ``item_id`` is present, + then the dataframe is a dataframe of user/item features + + :param data: DataFrame to process + :param path: path to data + :param format_type: file type, one of ``[csv, parquet, json, table]`` + :param date_format: format for the ``timestamp`` column + :param reader_kwargs: extra arguments passed to + ``spark.read.<format>(path, **reader_kwargs)`` + :return: processed DataFrame """ - return self.indexer.inverse_transform(df) + is_log = False + if ( + "user_id" in columns_mapping.keys() + and "item_id" in columns_mapping.keys() + ): + self.logger.info( + "Columns with ids of both users and items are present in mapping. " + "The dataframe will be treated as an interactions log." + ) + is_log = True + elif ( + "user_id" not in columns_mapping.keys() + and "item_id" not in columns_mapping.keys() + ): + raise ValueError( + "Mapping either for user ids or for item ids is not stated in `columns_mapping`" + ) + else: + self.logger.info( + "Column with ids of users or items is absent in mapping. " + "The dataframe will be treated as a users'/items' features dataframe." + ) + reader_kwargs = {} if reader_kwargs is None else reader_kwargs + dataframe = self.read_as_spark_df( + data=data, path=path, format_type=format_type, **reader_kwargs + ) + self.check_df(dataframe, columns_mapping=columns_mapping) + dataframe = self._rename(df=dataframe, mapping=columns_mapping) + if is_log: + dataframe = self.add_absent_log_cols( + dataframe=dataframe, columns_mapping=columns_mapping + ) + dataframe = dataframe.withColumn( + "relevance", sf.col("relevance").cast(DoubleType()) + ) + dataframe = process_timestamp_column( + dataframe=dataframe, + column_name="timestamp", + date_format=date_format, + ) + + return dataframe class CatFeaturesTransformer: diff --git a/tests/test_data_preparator.py b/tests/test_data_preparator.py index c957f43e..69ff3873 100644 --- a/tests/test_data_preparator.py +++ b/tests/test_data_preparator.py @@ -1,12 +1,9 @@ # pylint: disable=redefined-outer-name, missing-function-docstring, unused-import -from datetime import datetime -from copy import deepcopy -from unittest.mock import Mock - +import logging import pytest import pandas as pd -from pyspark.sql import functions as sf, DataFrame -from pyspark.sql.types import IntegerType +from pyspark.sql import functions as sf +from pyspark.sql.types import TimestampType, StringType from replay.data_preparator import ( DataPreparator, @@ -23,31 +20,97 @@ ) -def test_preparator(): - prep = DataPreparator() - in_df = pd.DataFrame({"user": [4], "item_id": [3]}) - out_df = pd.DataFrame({"user_idx": [0], "item_idx": [0]}) - out_df = convert2spark(out_df) - out_df = out_df.withColumn( - "user_idx", sf.col("user_idx").cast(IntegerType()) - ) - out_df = out_df.withColumn( - "item_idx", sf.col("item_idx").cast(IntegerType()) - ) - res, _, _ = prep(in_df, mapping={"user_id": "user"}) - assert isinstance(res, DataFrame) - assert set(res.columns) == {"user_idx", "item_idx"} - sparkDataFrameEqual(res, out_df) - res = prep.back(res) - assert set(res.columns) == {"user_id", "item_id"} +@pytest.fixture +def data_preparator(): + return DataPreparator() @pytest.fixture -def indexer(): - return Indexer() +def mapping(): + return {
"user_id": "user_idx", + "item_id": "item_idx", + "timestamp": "timestamp", + "relevance": "relevance", + } + + +# checks in read_as_spark_df +def test_read_data_invalid_format(data_preparator): + with pytest.raises(ValueError, match=r"Invalid value of format_type.*"): + data_preparator.read_as_spark_df( + path="/test_path", format_type="blabla" + ) + + with pytest.raises( + ValueError, match="Either data or path parameters must not be None" + ): + data_preparator.read_as_spark_df(format_type="csv") + + +# errors in check_df +def test_check_df_errors(data_preparator, long_log_with_features, mapping): + with pytest.raises(ValueError, match="DataFrame is empty"): + data_preparator.check_df( + dataframe=long_log_with_features.filter(sf.col("user_idx") > 10), + columns_mapping=mapping, + ) + + with pytest.raises( + ValueError, + match="Column `relevance` stated in mapping is absent in dataframe", + ): + col_map = mapping + data_preparator.check_df( + dataframe=long_log_with_features.drop("relevance"), + columns_mapping=col_map, + ) + + +# logging in check_df +def test_read_check_df_logger_msg( + data_preparator, long_log_with_features, mapping, caplog +): + with caplog.at_level(logging.INFO): + mapping.pop("timestamp") + data_preparator.check_df( + dataframe=long_log_with_features.withColumn( + "relevance", + sf.when(sf.col("user_idx") == 1, None).otherwise( + sf.col("relevance").cast(StringType()) + ), + ).drop("timestamp"), + columns_mapping=mapping, + ) + assert ( + "Column `relevance` has NULL values. " + "Handle NULL values before the next data preprocessing/model training steps" + in caplog.text + ) + + assert ( + "Columns ['timestamp'] are absent, but may be required for models training. " + in caplog.text + ) + + assert ( + "Relevance column `relevance` should be numeric, but it is StringType" + in caplog.text + ) + + +def test_generate_cols(data_preparator, long_log_with_features, mapping): + mapping.pop("timestamp") + df = data_preparator.add_absent_log_cols( + dataframe=long_log_with_features.drop("timestamp"), + columns_mapping=mapping, + ) + assert "timestamp" in df.columns + assert isinstance(df.schema["timestamp"].dataType, TimestampType) -def test_indexer(indexer, long_log_with_features): +def test_indexer(long_log_with_features): + indexer = Indexer() df = long_log_with_features.withColumnRenamed("user_idx", "user_id") df = df.withColumnRenamed("item_idx", "item_id") indexer.fit(df, df)