From c1125c60791d9a1cbd11cb2e8c946db2fcb41b83 Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Wed, 1 Nov 2023 15:42:51 -0400 Subject: [PATCH 1/3] Add wrapper for catalog merge --- src/lsdb/catalog/catalog.py | 51 ++++++++++++ tests/lsdb/catalog/test_merge.py | 139 +++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 tests/lsdb/catalog/test_merge.py diff --git a/src/lsdb/catalog/catalog.py b/src/lsdb/catalog/catalog.py index d50616cb..8fed7367 100644 --- a/src/lsdb/catalog/catalog.py +++ b/src/lsdb/catalog/catalog.py @@ -216,3 +216,54 @@ def cone_search(self, ra: float, dec: float, radius: float): cone_search_ddf = cast(dd.DataFrame, cone_search_ddf) ddf_partition_map = {pixel: i for i, pixel in enumerate(pixels_in_cone)} return Catalog(cone_search_ddf, ddf_partition_map, filtered_hc_structure) + + def merge( + self, + other: Catalog, + how: str = "inner", + on: str | List | None = None, + left_on: str | List | None = None, + right_on: str | List | None = None, + left_index: bool = False, + right_index: bool = False, + suffixes: Tuple[str, str] | None = None, + ) -> dd.DataFrame: + """Performs the merge of two catalog Dataframes + + Args: + other (Catalog): The right catalog to merge with. + how (str): How to handle the merge of the two catalogs. + One of {'left', 'right', 'outer', 'inner'}, defaults to 'inner'. + on (str | List): Column or index names to join on. Defaults to the + intersection of columns in both Dataframes if on is None and not + merging on indexes. + left_on (str | List): Column to join on the left Dataframe. Lists are + supported if their length is one. + right_on (str | List): Column to join on the right Dataframe. Lists are + supported if their length is one. + left_index (bool): Use the index of the left Dataframe as the join key. + Defaults to False. + right_index (bool): Use the index of the right Dataframe as the join key. + Defaults to False. 
+ suffixes (Tuple[str, str]): A pair of suffixes to be appended to the + end of each column name when they are joined. Defaults to using the + name of the catalog for the suffix. + + Returns: + A new Dask Dataframe containing the data points that result from the merge + of the two catalogs. + """ + if suffixes is None: + suffixes = (f"_{self.name}", f"_{other.name}") + if len(suffixes) != 2: + raise ValueError("`suffixes` must be a tuple with two strings") + return self._ddf.merge( + other._ddf, + how=how, + on=on, + left_on=left_on, + right_on=right_on, + left_index=left_index, + right_index=right_index, + suffixes=suffixes, + ) diff --git a/tests/lsdb/catalog/test_merge.py b/tests/lsdb/catalog/test_merge.py new file mode 100644 index 00000000..aec531e5 --- /dev/null +++ b/tests/lsdb/catalog/test_merge.py @@ -0,0 +1,139 @@ +import dask.dataframe as dd +import numpy as np +import pandas as pd +import pytest + + +def test_catalog_merge_invalid_suffixes(small_sky_catalog, small_sky_order1_catalog): + with pytest.raises(ValueError, match="`suffixes` must be a tuple with two strings"): + small_sky_catalog.merge( + small_sky_order1_catalog, how="inner", on="id", suffixes=("_left", "_middle", "_right") + ) + + +def test_catalog_merge_no_suffixes(small_sky_catalog, small_sky_order1_catalog): + on = "id" + how = "inner" + + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on) + assert isinstance(merged_ddf, dd.DataFrame) + + # Columns in the merged dataframe have the catalog name as suffix + non_join_columns_left = small_sky_catalog._ddf.columns.drop(on) + non_join_columns_right = small_sky_order1_catalog._ddf.columns.drop(on) + intersected_cols = list(set(non_join_columns_left) & set(non_join_columns_right)) + + suffixes = [f"_{small_sky_catalog.name}", f"_{small_sky_order1_catalog.name}"] + + for column in intersected_cols: + for suffix in suffixes: + assert f"{column}{suffix}" in merged_ddf.columns + + +def 
test_catalog_inner_merge(small_sky_catalog, small_sky_order1_catalog): + on = "id" + how = "inner" + suffixes = ("_left", "_right") + + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) + assert isinstance(merged_ddf, dd.DataFrame) + + merged_df = merged_ddf.compute() + left_df = small_sky_catalog._ddf.compute() + right_df = small_sky_order1_catalog._ddf.compute() + + # The join column matches the intersection of values on both dataframes + on_intersected = pd.Series(list(set(left_df[on]) & set(right_df[on]))) + assert_series_are_equal(merged_df[on], on_intersected) + + # The remaining columns come from the original dataframes + non_join_columns_df = merged_df.drop(on, axis=1) + assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) + + +def test_catalog_outer_merge(small_sky_catalog, small_sky_order1_catalog): + on = "id" + how = "outer" + suffixes = ("_left", "_right") + + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) + assert isinstance(merged_ddf, dd.DataFrame) + + merged_df = merged_ddf.compute() + left_df = small_sky_catalog._ddf.compute() + right_df = small_sky_order1_catalog._ddf.compute() + + # The join column matches the whole set of values on both dataframes + on_joined = pd.concat([left_df[on], right_df[on]]) + assert_series_are_equal(merged_df[on], on_joined) + + # The remaining columns come from the original dataframes + non_join_columns_df = merged_df.drop(on, axis=1) + assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) + + +def test_catalog_left_merge(small_sky_catalog, small_sky_order1_catalog): + on = "id" + how = "left" + suffixes = ("_left", "_right") + + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) + assert isinstance(merged_ddf, dd.DataFrame) + + merged_df = merged_ddf.compute() + left_df = 
small_sky_catalog.compute() + right_df = small_sky_order1_catalog._ddf.compute() + + # The join column matches the values on the left dataframe + assert_series_are_equal(merged_df[on], left_df[on]) + + # The remaining columns come from the original dataframes + non_join_columns_df = merged_df.drop(on, axis=1) + assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) + + +def test_catalog_right_merge(small_sky_catalog, small_sky_order1_catalog): + on = "id" + how = "right" + suffixes = ("_left", "_right") + + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) + assert isinstance(merged_ddf, dd.DataFrame) + + merged_df = merged_ddf.compute() + left_df = small_sky_catalog._ddf.compute() + right_df = small_sky_order1_catalog._ddf.compute() + + # The join column matches the values on the right dataframe + assert_series_are_equal(merged_df[on], right_df[on]) + + # The remaining columns come from the original dataframes + non_join_columns_df = merged_df.drop(on, axis=1) + assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) + + +def assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes): + """Ensures the columns of a merged dataframe have the expected provenience. If a column has + a suffix, the original dataframes had the same named column. 
If the column name has no suffix, + it is present in one of the dataframes, but not in both.""" + _left, _right = suffixes + for col_name, _ in non_join_columns_df.items(): + if col_name.endswith(_left): + original_col_name = col_name[: -len(_left)] + assert_series_are_equal(non_join_columns_df[col_name], left_df[original_col_name]) + elif col_name.endswith(_right): + original_col_name = col_name[: -len(_right)] + assert_series_are_equal(non_join_columns_df[col_name], right_df[original_col_name]) + elif col_name in left_df.columns: + assert col_name not in right_df.columns + assert_series_are_equal(non_join_columns_df[col_name], left_df[col_name]) + else: + assert col_name in right_df.columns and col_name not in left_df.columns + assert_series_are_equal(non_join_columns_df[col_name], right_df[col_name]) + + +def assert_series_are_equal(series_1, series_2): + """Checks if a pandas series is equal to another (in value), ignoring duplicates.""" + sorted_unique_1 = np.sort(series_1.drop_duplicates().to_numpy(), axis=0) + sorted_unique_2 = np.sort(series_2.drop_duplicates().to_numpy(), axis=0) + assert np.array_equal(sorted_unique_1, sorted_unique_2) From 6b014a25eabef5e01f1edd055cc69d1af41c2080 Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Wed, 1 Nov 2023 15:52:32 -0400 Subject: [PATCH 2/3] Add link to pandas merge docs --- src/lsdb/catalog/catalog.py | 3 +++ tests/lsdb/catalog/test_merge.py | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/lsdb/catalog/catalog.py b/src/lsdb/catalog/catalog.py index 8fed7367..2b548818 100644 --- a/src/lsdb/catalog/catalog.py +++ b/src/lsdb/catalog/catalog.py @@ -230,6 +230,9 @@ def merge( ) -> dd.DataFrame: """Performs the merge of two catalog Dataframes + More information about pandas merge is available + `here <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html>`__. + Args: other (Catalog): The right catalog to merge with. how (str): How to handle the merge of the two catalogs. 
diff --git a/tests/lsdb/catalog/test_merge.py b/tests/lsdb/catalog/test_merge.py index aec531e5..4ddf1ecd 100644 --- a/tests/lsdb/catalog/test_merge.py +++ b/tests/lsdb/catalog/test_merge.py @@ -44,7 +44,7 @@ def test_catalog_inner_merge(small_sky_catalog, small_sky_order1_catalog): # The join column matches the intersection of values on both dataframes on_intersected = pd.Series(list(set(left_df[on]) & set(right_df[on]))) - assert_series_are_equal(merged_df[on], on_intersected) + assert_series_match(merged_df[on], on_intersected) # The remaining columns come from the original dataframes non_join_columns_df = merged_df.drop(on, axis=1) @@ -65,7 +65,7 @@ def test_catalog_outer_merge(small_sky_catalog, small_sky_order1_catalog): # The join column matches the whole set of values on both dataframes on_joined = pd.concat([left_df[on], right_df[on]]) - assert_series_are_equal(merged_df[on], on_joined) + assert_series_match(merged_df[on], on_joined) # The remaining columns come from the original dataframes non_join_columns_df = merged_df.drop(on, axis=1) @@ -85,7 +85,7 @@ def test_catalog_left_merge(small_sky_catalog, small_sky_order1_catalog): right_df = small_sky_order1_catalog._ddf.compute() # The join column matches the values on the left dataframe - assert_series_are_equal(merged_df[on], left_df[on]) + assert_series_match(merged_df[on], left_df[on]) # The remaining columns come from the original dataframes non_join_columns_df = merged_df.drop(on, axis=1) @@ -105,7 +105,7 @@ def test_catalog_right_merge(small_sky_catalog, small_sky_order1_catalog): right_df = small_sky_order1_catalog._ddf.compute() # The join column matches the values on the right dataframe - assert_series_are_equal(merged_df[on], right_df[on]) + assert_series_match(merged_df[on], right_df[on]) # The remaining columns come from the original dataframes non_join_columns_df = merged_df.drop(on, axis=1) @@ -120,20 +120,20 @@ def assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, righ 
for col_name, _ in non_join_columns_df.items(): if col_name.endswith(_left): original_col_name = col_name[: -len(_left)] - assert_series_are_equal(non_join_columns_df[col_name], left_df[original_col_name]) + assert_series_match(non_join_columns_df[col_name], left_df[original_col_name]) elif col_name.endswith(_right): original_col_name = col_name[: -len(_right)] - assert_series_are_equal(non_join_columns_df[col_name], right_df[original_col_name]) + assert_series_match(non_join_columns_df[col_name], right_df[original_col_name]) elif col_name in left_df.columns: assert col_name not in right_df.columns - assert_series_are_equal(non_join_columns_df[col_name], left_df[col_name]) + assert_series_match(non_join_columns_df[col_name], left_df[col_name]) else: assert col_name in right_df.columns and col_name not in left_df.columns - assert_series_are_equal(non_join_columns_df[col_name], right_df[col_name]) + assert_series_match(non_join_columns_df[col_name], right_df[col_name]) -def assert_series_are_equal(series_1, series_2): - """Checks if a pandas series is equal to another (in value), ignoring duplicates.""" +def assert_series_match(series_1, series_2): + """Checks if a pandas series matches another in value, ignoring duplicates.""" sorted_unique_1 = np.sort(series_1.drop_duplicates().to_numpy(), axis=0) sorted_unique_2 = np.sort(series_2.drop_duplicates().to_numpy(), axis=0) assert np.array_equal(sorted_unique_1, sorted_unique_2) From c0ac6426d19ca9e93dcaf00cb84e957a624f0d2a Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Thu, 2 Nov 2023 09:41:41 -0400 Subject: [PATCH 3/3] Simplify catalog merge tests --- tests/lsdb/catalog/test_merge.py | 179 ++++++++++--------------------- 1 file changed, 59 insertions(+), 120 deletions(-) diff --git a/tests/lsdb/catalog/test_merge.py b/tests/lsdb/catalog/test_merge.py index 4ddf1ecd..17e81799 100644 --- a/tests/lsdb/catalog/test_merge.py +++ b/tests/lsdb/catalog/test_merge.py @@ -1,9 +1,62 @@ import dask.dataframe as dd -import 
numpy as np import pandas as pd import pytest +@pytest.mark.parametrize("how", ["left", "right", "inner", "outer"]) +def test_catalog_merge_on_indices(small_sky_catalog, small_sky_order1_catalog, how): + kwargs = { + "how": how, + "left_index": True, + "right_index": True, + "suffixes": ("_left", "_right") + } + # Setting the object "id" for index on both catalogs + small_sky_catalog._ddf = small_sky_catalog._ddf.set_index("id") + small_sky_order1_catalog._ddf = small_sky_order1_catalog._ddf.set_index("id") + # The wrapper outputs the same result as the underlying pandas merge + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, **kwargs) + assert isinstance(merged_ddf, dd.DataFrame) + expected_df = small_sky_catalog._ddf.merge(small_sky_order1_catalog._ddf, **kwargs) + pd.testing.assert_frame_equal(expected_df.compute(), merged_ddf.compute()) + + +@pytest.mark.parametrize("how", ["left", "right", "inner", "outer"]) +def test_catalog_merge_on_columns(small_sky_catalog, small_sky_order1_catalog, how): + kwargs = { + "how": how, + "on": "id", + "suffixes": ("_left", "_right") + } + # Make sure none of the test catalogs have "id" for index + small_sky_catalog._ddf = small_sky_catalog._ddf.reset_index() + small_sky_order1_catalog._ddf = small_sky_order1_catalog._ddf.reset_index() + # The wrapper outputs the same result as the underlying pandas merge + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, **kwargs) + assert isinstance(merged_ddf, dd.DataFrame) + expected_df = small_sky_catalog._ddf.merge(small_sky_order1_catalog._ddf, **kwargs) + pd.testing.assert_frame_equal(expected_df.compute(), merged_ddf.compute()) + + +@pytest.mark.parametrize("how", ["left", "right", "inner", "outer"]) +def test_catalog_merge_on_index_and_column(small_sky_catalog, small_sky_order1_catalog, how): + kwargs = { + "how": how, + "left_index": True, + "right_on": "id", + "suffixes": ("_left", "_right") + } + # Setting the object "id" for index on the left catalog + 
small_sky_catalog._ddf = small_sky_catalog._ddf.set_index("id") + # Make sure the right catalog does not have "id" for index + small_sky_order1_catalog._ddf = small_sky_order1_catalog._ddf.reset_index() + # The wrapper outputs the same result as the underlying pandas merge + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, **kwargs) + assert isinstance(merged_ddf, dd.DataFrame) + expected_df = small_sky_catalog._ddf.merge(small_sky_order1_catalog._ddf, **kwargs) + pd.testing.assert_frame_equal(expected_df.compute(), merged_ddf.compute()) + + def test_catalog_merge_invalid_suffixes(small_sky_catalog, small_sky_order1_catalog): with pytest.raises(ValueError, match="`suffixes` must be a tuple with two strings"): small_sky_catalog.merge( @@ -12,128 +65,14 @@ def test_catalog_merge_invalid_suffixes(small_sky_catalog, small_sky_order1_cata def test_catalog_merge_no_suffixes(small_sky_catalog, small_sky_order1_catalog): - on = "id" - how = "inner" - - merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on) + merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how="inner", on="id") assert isinstance(merged_ddf, dd.DataFrame) - - # Columns in the merged dataframe have the catalog name as suffix - non_join_columns_left = small_sky_catalog._ddf.columns.drop(on) - non_join_columns_right = small_sky_order1_catalog._ddf.columns.drop(on) + # Get the columns with the same name in both catalogs + non_join_columns_left = small_sky_catalog._ddf.columns.drop("id") + non_join_columns_right = small_sky_order1_catalog._ddf.columns.drop("id") intersected_cols = list(set(non_join_columns_left) & set(non_join_columns_right)) - + # The suffixes of these columns in the dataframe include the catalog names suffixes = [f"_{small_sky_catalog.name}", f"_{small_sky_order1_catalog.name}"] - for column in intersected_cols: for suffix in suffixes: assert f"{column}{suffix}" in merged_ddf.columns - - -def test_catalog_inner_merge(small_sky_catalog, 
small_sky_order1_catalog): - on = "id" - how = "inner" - suffixes = ("_left", "_right") - - merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) - assert isinstance(merged_ddf, dd.DataFrame) - - merged_df = merged_ddf.compute() - left_df = small_sky_catalog._ddf.compute() - right_df = small_sky_order1_catalog._ddf.compute() - - # The join column matches the intersection of values on both dataframes - on_intersected = pd.Series(list(set(left_df[on]) & set(right_df[on]))) - assert_series_match(merged_df[on], on_intersected) - - # The remaining columns come from the original dataframes - non_join_columns_df = merged_df.drop(on, axis=1) - assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) - - -def test_catalog_outer_merge(small_sky_catalog, small_sky_order1_catalog): - on = "id" - how = "outer" - suffixes = ("_left", "_right") - - merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) - assert isinstance(merged_ddf, dd.DataFrame) - - merged_df = merged_ddf.compute() - left_df = small_sky_catalog._ddf.compute() - right_df = small_sky_order1_catalog._ddf.compute() - - # The join column matches the whole set of values on both dataframes - on_joined = pd.concat([left_df[on], right_df[on]]) - assert_series_match(merged_df[on], on_joined) - - # The remaining columns come from the original dataframes - non_join_columns_df = merged_df.drop(on, axis=1) - assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) - - -def test_catalog_left_merge(small_sky_catalog, small_sky_order1_catalog): - on = "id" - how = "left" - suffixes = ("_left", "_right") - - merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) - assert isinstance(merged_ddf, dd.DataFrame) - - merged_df = merged_ddf.compute() - left_df = small_sky_catalog.compute() - right_df = 
small_sky_order1_catalog._ddf.compute() - - # The join column matches the values on the left dataframe - assert_series_match(merged_df[on], left_df[on]) - - # The remaining columns come from the original dataframes - non_join_columns_df = merged_df.drop(on, axis=1) - assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) - - -def test_catalog_right_merge(small_sky_catalog, small_sky_order1_catalog): - on = "id" - how = "right" - suffixes = ("_left", "_right") - - merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how=how, on=on, suffixes=suffixes) - assert isinstance(merged_ddf, dd.DataFrame) - - merged_df = merged_ddf.compute() - left_df = small_sky_catalog._ddf.compute() - right_df = small_sky_order1_catalog._ddf.compute() - - # The join column matches the values on the right dataframe - assert_series_match(merged_df[on], right_df[on]) - - # The remaining columns come from the original dataframes - non_join_columns_df = merged_df.drop(on, axis=1) - assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes) - - -def assert_other_columns_in_parent_dataframes(non_join_columns_df, left_df, right_df, suffixes): - """Ensures the columns of a merged dataframe have the expected provenience. If a column has - a suffix, the original dataframes had the same named column. 
If the column name has no suffix, - it is present in one of the dataframes, but not in both.""" - _left, _right = suffixes - for col_name, _ in non_join_columns_df.items(): - if col_name.endswith(_left): - original_col_name = col_name[: -len(_left)] - assert_series_match(non_join_columns_df[col_name], left_df[original_col_name]) - elif col_name.endswith(_right): - original_col_name = col_name[: -len(_right)] - assert_series_match(non_join_columns_df[col_name], right_df[original_col_name]) - elif col_name in left_df.columns: - assert col_name not in right_df.columns - assert_series_match(non_join_columns_df[col_name], left_df[col_name]) - else: - assert col_name in right_df.columns and col_name not in left_df.columns - assert_series_match(non_join_columns_df[col_name], right_df[col_name]) - - -def assert_series_match(series_1, series_2): - """Checks if a pandas series matches another in value, ignoring duplicates.""" - sorted_unique_1 = np.sort(series_1.drop_duplicates().to_numpy(), axis=0) - sorted_unique_2 = np.sort(series_2.drop_duplicates().to_numpy(), axis=0) - assert np.array_equal(sorted_unique_1, sorted_unique_2)