diff --git a/docs/index.rst b/docs/index.rst index 756b809b..12b439ac 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -47,3 +47,4 @@ Notes: Home page API Reference + Notebooks \ No newline at end of file diff --git a/docs/notebooks.rst b/docs/notebooks.rst new file mode 100644 index 00000000..60418880 --- /dev/null +++ b/docs/notebooks.rst @@ -0,0 +1,7 @@ +Notebooks +======================================================================================== + +.. toctree:: + :maxdepth: 1 + + Import catalogs \ No newline at end of file diff --git a/docs/notebooks/import_catalogs.ipynb b/docs/notebooks/import_catalogs.ipynb new file mode 100644 index 00000000..0913afb1 --- /dev/null +++ b/docs/notebooks/import_catalogs.ipynb @@ -0,0 +1,304 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "26685ece166fa3f7", + "metadata": { + "collapsed": false + }, + "source": [ + "# Importing catalogs to HiPSCat format\n", + "\n", + "This notebook presents two ways of importing catalogs to HiPSCat format. The first uses the __lsdb.from_dataframe()__ method, which is helpful for loading smaller catalogs from a single dataframe, while the second uses the __hipscat-import pipeline__." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c2de4ed424644058", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:55.941948Z", + "start_time": "2023-11-10T15:14:55.912372Z" + } + }, + "outputs": [], + "source": [ + "import lsdb\n", + "import os\n", + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6b5f76fe439a32b9", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:55.946920Z", + "start_time": "2023-11-10T15:14:55.920402Z" + } + }, + "outputs": [], + "source": [ + "catalog_name = \"small_sky_order1\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "59e857888ae9ea70", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:55.958272Z", + "start_time": "2023-11-10T15:14:55.924340Z" + } + }, + "outputs": [], + "source": [ + "# Input paths\n", + "test_data_dir = os.path.join(\"../../tests\", \"data\")\n", + "catalog_dir = os.path.join(test_data_dir, catalog_name)\n", + "catalog_csv_path = os.path.join(catalog_dir, f\"{catalog_name}.csv\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "263e7a037919a20b", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:55.958815Z", + "start_time": "2023-11-10T15:14:55.927288Z" + } + }, + "outputs": [], + "source": [ + "# Output paths\n", + "catalog_from_dataframe = f\"{catalog_name}-from_dataframe\"\n", + "catalog_from_importer = f\"{catalog_name}-from_importer\"" + ] + }, + { + "cell_type": "markdown", + "id": "b5e74432b3437e2f", + "metadata": { + "collapsed": false + }, + "source": [ + "## Using lsdb.from_dataframe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "initial_id", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:55.987313Z", + "start_time": "2023-11-10T15:14:55.934107Z" + } + }, + "outputs": [], + "source": [ + "%%time\n", + "\n", + "# Read a simple catalog from its CSV file\n", + "catalog = lsdb.from_dataframe(\n", + " pd.read_csv(catalog_csv_path),\n", + " catalog_name=catalog_from_dataframe,\n", + " catalog_type=\"object\",\n", + " highest_order=5,\n", + " threshold=100\n", + ")\n", + "\n", + "# Save it to disk in HiPSCat format\n", + "catalog.to_hipscat(catalog_from_dataframe)" + ] + }, + { + "cell_type": "markdown", + "id": "bc1cc2a7eac29dba",
"metadata": { + "collapsed": false + }, + "source": [ + "## Using the import pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "46bca7773cf6b261", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:57.130701Z", + "start_time": "2023-11-10T15:14:55.985757Z" + } + }, + "outputs": [], + "source": [ + "# Install hipscat-import\n", + "!pip install hipscat-import --quiet" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d2d1324d21d62c81", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:57.134432Z", + "start_time": "2023-11-10T15:14:57.131884Z" + } + }, + "outputs": [], + "source": [ + "from dask.distributed import Client\n", + "from hipscat_import.catalog.arguments import ImportArguments\n", + "from hipscat_import.pipeline import pipeline_with_client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d9be67178e901acc", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:57.136858Z", + "start_time": "2023-11-10T15:14:57.134937Z" + } + }, + "outputs": [], + "source": [ + "# Create directory if it does not yet exist\n", + "os.makedirs(catalog_from_importer, exist_ok=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "59d1538d8545265", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:14:57.172913Z", + "start_time": "2023-11-10T15:14:57.138071Z" + } + }, + "outputs": [], + "source": [ + "args = ImportArguments(\n", + " id_column=\"id\",\n", + " ra_column=\"ra\",\n", + " dec_column=\"dec\",\n", + " highest_healpix_order=5,\n", + " pixel_threshold=100,\n", + " input_path=catalog_dir,\n", + " input_format=f\"small_sky_order1.csv\",\n", + " output_catalog_name=catalog_from_importer,\n", + " output_path=\".\",\n", + " dask_tmp=\".\",\n", + " overwrite=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "300eb3d3fcbb7b62", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:15:01.266753Z", + "start_time": "2023-11-10T15:14:57.156033Z" + } + }, + "outputs": [], + "source": [ + "%%time\n", + "with Client(local_directory=args.dask_tmp, n_workers=4) as client:\n", + " pipeline_with_client(args, client)" + ] + }, + { + "cell_type": "markdown", + "id": "5cffa1283ddbca39", + "metadata": { + "collapsed": false + }, + "source": [ + "### Load both catalogs and check that they are equivalent" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "620ff4e2b7ae291b", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:15:01.288751Z", + "start_time": "2023-11-10T15:15:01.264910Z" + } + }, + "outputs": [], + "source": [ + "from_dataframe_catalog = lsdb.read_hipscat(catalog_from_dataframe)\n", + "from_dataframe_catalog._ddf" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "575daf21cd4cfcad", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:15:01.313421Z", + "start_time": "2023-11-10T15:15:01.279523Z" + } + }, + "outputs": [], + "source": [ + "from_importer_catalog = lsdb.read_hipscat(catalog_from_importer)\n", + "from_importer_catalog._ddf" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b9d176a8dc303aa0", + "metadata": { + "ExecuteTime": { + "end_time": "2023-11-10T15:15:01.334413Z", + "start_time": "2023-11-10T15:15:01.289748Z" + } + }, + "outputs": [], + "source": [ + "# Verify that pixels are similar\n", + "assert from_dataframe_catalog.get_healpix_pixels() == from_importer_catalog.get_healpix_pixels()\n", + "# Verify 
that resulting dataframes contain the same data\n", + "pd.testing.assert_frame_equal(from_dataframe_catalog.compute().sort_index(), from_importer_catalog.compute().sort_index(), check_dtype=False)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/lsdb/catalog/catalog.py b/src/lsdb/catalog/catalog.py index 23cfa38b..aa3f339f 100644 --- a/src/lsdb/catalog/catalog.py +++ b/src/lsdb/catalog/catalog.py @@ -1,19 +1,19 @@ from __future__ import annotations import dataclasses -from typing import Dict, List, Tuple, Type, cast +from typing import Any, Dict, List, Tuple, Type, Union, cast import dask.dataframe as dd import hipscat as hc from hipscat.pixel_math import HealpixPixel +from lsdb import io from lsdb.catalog.dataset.dataset import Dataset from lsdb.core.cone_search import cone_filter from lsdb.core.crossmatch.abstract_crossmatch_algorithm import AbstractCrossmatchAlgorithm from lsdb.core.crossmatch.crossmatch_algorithms import BuiltInCrossmatchAlgorithm from lsdb.dask.crossmatch_catalog_data import crossmatch_catalog_data - -DaskDFPixelMap = Dict[HealpixPixel, int] +from lsdb.types import DaskDFPixelMap # pylint: disable=R0903, W0212 @@ -25,7 +25,6 @@ class Catalog(Dataset): hc_structure: `hipscat.Catalog` object representing the structure and metadata of the HiPSCat catalog """ - hc_structure: hc.catalog.Catalog def __init__( @@ -247,3 +246,20 @@ def cone_search(self, ra: float, dec: float, radius: float): cone_search_ddf = cast(dd.DataFrame, cone_search_ddf) ddf_partition_map = {pixel: i for i, pixel in enumerate(pixels_in_cone)} return Catalog(cone_search_ddf, ddf_partition_map, filtered_hc_structure) + + def to_hipscat( + self, + base_catalog_path: str, + catalog_name: Union[str, None] = None, + storage_options: Union[Dict[Any, Any], None] = None, + **kwargs + ): + """Saves the catalog to disk in HiPSCat format + + Args: + base_catalog_path (str): Location where catalog is saved to + catalog_name (str): The name of the catalog to be saved + storage_options (dict): Dictionary that contains abstract filesystem credentials + **kwargs: Arguments to pass to the parquet write operations + """ + io.to_hipscat(self, base_catalog_path, catalog_name, storage_options, **kwargs) diff --git a/src/lsdb/dask/crossmatch_catalog_data.py b/src/lsdb/dask/crossmatch_catalog_data.py index a760ad16..4b3631b1 100644 --- a/src/lsdb/dask/crossmatch_catalog_data.py +++ b/src/lsdb/dask/crossmatch_catalog_data.py @@ -14,9 +14,10 @@ from lsdb.core.crossmatch.abstract_crossmatch_algorithm import AbstractCrossmatchAlgorithm from lsdb.core.crossmatch.crossmatch_algorithms import BuiltInCrossmatchAlgorithm from lsdb.core.crossmatch.kdtree_match import KdTreeCrossmatch +from lsdb.types import DaskDFPixelMap if TYPE_CHECKING: - from lsdb.catalog.catalog import Catalog, DaskDFPixelMap + from lsdb.catalog.catalog import Catalog builtin_crossmatch_algorithms = {BuiltInCrossmatchAlgorithm.KD_TREE: KdTreeCrossmatch} diff --git a/src/lsdb/io/__init__.py b/src/lsdb/io/__init__.py new file mode 100644 index 00000000..a6e35065 --- /dev/null +++ b/src/lsdb/io/__init__.py @@ -0,0 +1 @@ +from .to_hipscat import to_hipscat diff --git 
a/src/lsdb/io/to_hipscat.py b/src/lsdb/io/to_hipscat.py new file mode 100644 index 00000000..0c7962e1 --- /dev/null +++ b/src/lsdb/io/to_hipscat.py @@ -0,0 +1,154 @@ +import dataclasses +from copy import copy +from importlib.metadata import version +from typing import Any, Dict, Union + +import hipscat as hc +from hipscat.io import FilePointer +from hipscat.pixel_math import HealpixPixel + +from lsdb.types import HealpixInfo + + +# pylint: disable=W0212 +def to_hipscat( + catalog, + base_catalog_path: str, + catalog_name: Union[str, None] = None, + storage_options: Union[Dict[Any, Any], None] = None, + **kwargs, +): + """Writes a catalog to disk, in HiPSCat format. The output catalog comprises + partition parquet files and respective metadata, as well as JSON files detailing + partition, catalog and provenance info. + + Args: + catalog (Catalog): A catalog to export + base_catalog_path (str): Location where catalog is saved to + catalog_name (str): The name of the output catalog + storage_options (dict): Dictionary that contains abstract filesystem credentials + **kwargs: Arguments to pass to the parquet write operations + """ + # Create base directory + base_catalog_dir_fp = hc.io.get_file_pointer_from_path(base_catalog_path) + hc.io.file_io.make_directory(base_catalog_dir_fp) + # Save partition parquet files + pixel_to_partition_size_map = write_partitions(catalog, base_catalog_dir_fp, storage_options, **kwargs) + # Save parquet metadata + hc.io.write_parquet_metadata(base_catalog_path, storage_options, **kwargs) + # Save partition info + partition_info = _get_partition_info_dict(pixel_to_partition_size_map) + hc.io.write_partition_info(base_catalog_dir_fp, partition_info, storage_options) + # Save catalog info + new_hc_structure = create_modified_catalog_structure( + catalog.hc_structure, + base_catalog_path, + catalog_name if catalog_name else catalog.hc_structure.catalog_name, + total_rows=sum(pi[0] for pi in partition_info.values()), + ) + hc.io.write_catalog_info( + catalog_base_dir=base_catalog_path, + dataset_info=new_hc_structure.catalog_info, + storage_options=storage_options, + ) + # Save provenance info + hc.io.write_metadata.write_provenance_info( + catalog_base_dir=base_catalog_dir_fp, + dataset_info=new_hc_structure.catalog_info, + tool_args=_get_provenance_info(new_hc_structure), + storage_options=storage_options, + ) + + +def write_partitions( + catalog, base_catalog_dir_fp: FilePointer, storage_options: Union[Dict[Any, Any], None] = None, **kwargs +) -> Dict[HealpixPixel, int]: + """Saves catalog partitions as parquet to disk + + Args: + catalog (Catalog): A catalog to export + base_catalog_dir_fp (FilePointer): Path to the base directory of the catalog + storage_options (dict): Dictionary that contains abstract filesystem credentials + **kwargs: Arguments to pass to the parquet write operations + + Returns: + A dictionary mapping each HEALPix pixel to the number of data points in it. 
+ """ + pixel_to_partition_size_map = {} + + for hp_pixel, partition_index in catalog._ddf_pixel_map.items(): + # Create pixel directory if it does not exist + pixel_dir = hc.io.pixel_directory(base_catalog_dir_fp, hp_pixel.order, hp_pixel.pixel) + hc.io.file_io.make_directory(pixel_dir, exist_ok=True, storage_options=storage_options) + # Write parquet file + partition = catalog._ddf.partitions[partition_index].compute() + pixel_path = hc.io.paths.pixel_catalog_file(base_catalog_dir_fp, hp_pixel.order, hp_pixel.pixel) + hc.io.file_io.write_dataframe_to_parquet(partition, pixel_path, storage_options, **kwargs) + pixel_to_partition_size_map[hp_pixel] = len(partition) + + return pixel_to_partition_size_map + + +def _get_partition_info_dict(ddf_points_map: Dict[HealpixPixel, int]) -> Dict[HealpixPixel, HealpixInfo]: + """Creates the partition info dictionary + + Args: + ddf_points_map (Dict[HealpixPix,int]): Dictionary mapping each HealpixPixel + to the respective number of points inside its partition + + Returns: + A partition info dictionary, where the keys are the HEALPix pixels and + the values are pairs where the first element is the number of points + inside the pixel, and the second is the list of destination pixel numbers. + """ + return {pixel: (length, [pixel.pixel]) for pixel, length in ddf_points_map.items()} + + +def create_modified_catalog_structure( + catalog_structure: hc.catalog.Catalog, catalog_base_dir: str, catalog_name: str, **kwargs +) -> hc.catalog.Catalog: + """Creates a modified version of the HiPSCat catalog structure + + Args: + catalog_structure (hc.catalog.Catalog): HiPSCat catalog structure + catalog_base_dir (str): Base location for the catalog + catalog_name (str): The name of the catalog to be saved + **kwargs: The remaining parameters to be updated in the catalog info object + + Returns: + A HiPSCat structure, modified with the parameters provided. + """ + new_hc_structure = copy(catalog_structure) + new_hc_structure.catalog_name = catalog_name + new_hc_structure.catalog_path = catalog_base_dir + new_hc_structure.catalog_base_dir = hc.io.file_io.get_file_pointer_from_path(catalog_base_dir) + new_hc_structure.on_disk = True + new_hc_structure.catalog_info = dataclasses.replace( + new_hc_structure.catalog_info, catalog_name=catalog_name, **kwargs + ) + return new_hc_structure + + +def _get_provenance_info(catalog_structure: hc.catalog.Catalog) -> dict: + """Fill all known information in a dictionary for provenance tracking. + + Returns: + dictionary with all argument_name -> argument_value as key -> value pairs. 
+ """ + catalog_info = catalog_structure.catalog_info + args = { + "catalog_name": catalog_structure.catalog_name, + "output_path": catalog_structure.catalog_path, + "output_catalog_name": catalog_structure.catalog_name, + "catalog_path": catalog_structure.catalog_path, + "epoch": catalog_info.epoch, + "catalog_type": catalog_info.catalog_type, + "ra_column": catalog_info.ra_column, + "dec_column": catalog_info.dec_column, + } + provenance_info = { + "tool_name": "lsdb", + "version": version("lsdb"), + "runtime_args": args, + } + return provenance_info diff --git a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py index a9b3059b..f1cce155 100644 --- a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py +++ b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py @@ -1,5 +1,6 @@ from __future__ import annotations +import dataclasses import math from typing import Dict, List, Tuple @@ -11,14 +12,11 @@ from hipscat.catalog.catalog_info import CatalogInfo from hipscat.pixel_math import HealpixPixel, generate_histogram from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, compute_hipscat_id, healpix_to_hipscat_id -from typing_extensions import TypeAlias -from lsdb.catalog.catalog import Catalog, DaskDFPixelMap +from lsdb.catalog.catalog import Catalog +from lsdb.types import DaskDFPixelMap, HealpixInfo -# Compute pixel map returns a tuple. The first element is -# the number of data points within the HEALPix pixel, the -# second element is the list of pixels it contains. -HealpixInfo: TypeAlias = Tuple[int, List[int]] +pd.options.mode.chained_assignment = None # default='warn' class DataframeCatalogLoader: @@ -31,7 +29,7 @@ def __init__( dataframe: pd.DataFrame, lowest_order: int = 0, highest_order: int = 5, - partition_size: float | None = None, + partition_size: int | None = None, threshold: int | None = None, **kwargs, ) -> None: @@ -41,7 +39,7 @@ def __init__( dataframe (pd.Dataframe): Catalog Pandas Dataframe lowest_order (int): The lowest partition order highest_order (int): The highest partition order - partition_size (float): The desired partition size, in megabytes + partition_size (int): The desired partition size, in number of rows threshold (int): The maximum number of data points per pixel **kwargs: Arguments to pass to the creation of the catalog info """ @@ -51,12 +49,12 @@ def __init__( self.threshold = self._calculate_threshold(partition_size, threshold) self.catalog_info = self._create_catalog_info(**kwargs) - def _calculate_threshold(self, partition_size: float | None = None, threshold: int | None = None) -> int: - """Calculates the number of pixels per HEALPix pixel (threshold) - for the desired partition size. + def _calculate_threshold(self, partition_size: int | None = None, threshold: int | None = None) -> int: + """Calculates the number of pixels per HEALPix pixel (threshold) for the + desired partition size. 
Args: - partition_size (float): The desired partition size, in megabytes + partition_size (int): The desired partition size, in number of rows threshold (int): The maximum number of data points per pixel Returns: @@ -66,10 +64,9 @@ def _calculate_threshold(self, partition_size: float | None = None, threshold: i raise ValueError("Specify only one: threshold or partition_size") if threshold is None: if partition_size is not None: - df_size_bytes = self.dataframe.memory_usage().sum() # Round the number of partitions to the next integer, otherwise the # number of pixels per partition may exceed the threshold - num_partitions = math.ceil(df_size_bytes / (partition_size * (1 << 20))) + num_partitions = math.ceil(len(self.dataframe) / partition_size) threshold = len(self.dataframe.index) // num_partitions else: threshold = DataframeCatalogLoader.DEFAULT_THRESHOLD @@ -99,7 +96,8 @@ def load_catalog(self) -> Catalog: """ self._set_hipscat_index() pixel_map = self._compute_pixel_map() - ddf, ddf_pixel_map = self._generate_dask_df_and_map(pixel_map) + ddf, ddf_pixel_map, total_rows = self._generate_dask_df_and_map(pixel_map) + self.catalog_info = dataclasses.replace(self.catalog_info, total_rows=total_rows) healpix_pixels = list(pixel_map.keys()) hc_structure = hc.catalog.Catalog(self.catalog_info, healpix_pixels) return Catalog(ddf, ddf_pixel_map, hc_structure) @@ -137,7 +135,7 @@ def _compute_pixel_map(self) -> Dict[HealpixPixel, HealpixInfo]: def _generate_dask_df_and_map( self, pixel_map: Dict[HealpixPixel, HealpixInfo] - ) -> Tuple[dd.DataFrame, DaskDFPixelMap]: + ) -> Tuple[dd.DataFrame, DaskDFPixelMap, int]: """Load Dask DataFrame from HEALPix pixel Dataframes and generate a mapping of HEALPix pixels to HEALPix Dataframes @@ -146,8 +144,8 @@ def _generate_dask_df_and_map( HEALPix pixels and respective data information Returns: - Tuple containing the Dask Dataframe and the mapping of - HEALPix pixels to the respective Pandas Dataframes + Tuple containing the Dask Dataframe, the mapping of HEALPix pixels + to the respective Pandas Dataframes and the total number of rows. """ # Dataframes for each destination HEALPix pixel pixel_dfs: List[pd.DataFrame] = [] @@ -156,18 +154,43 @@ def _generate_dask_df_and_map( for hp_pixel_index, hp_pixel_info in enumerate(pixel_map.items()): hp_pixel, (_, pixels) = hp_pixel_info - # Obtain Dataframe for the current HEALPix pixel - pixel_dfs.append(self._get_dataframe_for_healpix(pixels)) ddf_pixel_map[hp_pixel] = hp_pixel_index + # Obtain Dataframe for the current HEALPix pixel + df = self._get_dataframe_for_healpix(pixels) + df = self.append_partition_information_to_dataframe(df, hp_pixel) + # Save current dataframe + pixel_dfs.append(df) # Generate Dask Dataframe with original schema - schema = pd.DataFrame(columns=self.dataframe.columns).astype(self.dataframe.dtypes) - ddf = self._generate_dask_dataframe(pixel_dfs, schema) + schema = pd.DataFrame(columns=pixel_dfs[0].columns).astype(pixel_dfs[0].dtypes) + ddf, total_rows = self._generate_dask_dataframe(pixel_dfs, schema) + + return ddf, ddf_pixel_map, total_rows + + def append_partition_information_to_dataframe(self, dataframe: pd.DataFrame, pixel: HealpixPixel): + """Appends partitioning information to a HEALPix dataframe - return ddf, ddf_pixel_map + Args: + dataframe (pd.Dataframe): A HEALPix's pandas dataframe + pixel (HealpixPixel): The HEALPix pixel for the current partition + + Returns: + The dataframe for a HEALPix, with data points and respective partition information. 
+ """ + ordered_columns = ["Norder", "Dir", "Npix"] + # Generate partition information + dataframe["Norder"] = pixel.order + dataframe["Npix"] = pixel.pixel + dataframe["Dir"] = [int(x / 10_000) * 10_000 for x in dataframe["Npix"]] + # Force new column types to int + dataframe[ordered_columns] = dataframe[ordered_columns].astype(int) + # Reorder the columns to match full path + return dataframe[[col for col in dataframe.columns if col not in ordered_columns] + ordered_columns] @staticmethod - def _generate_dask_dataframe(pixel_dfs: List[pd.DataFrame], schema: pd.DataFrame) -> dd.DataFrame: + def _generate_dask_dataframe( + pixel_dfs: List[pd.DataFrame], schema: pd.DataFrame + ) -> Tuple[dd.DataFrame, int]: """Create the Dask Dataframe from the list of HEALPix pixel Dataframes Args: @@ -175,11 +198,11 @@ def _generate_dask_dataframe(pixel_dfs: List[pd.DataFrame], schema: pd.DataFrame schema (pd.Dataframe): The original Dataframe schema Returns: - The catalog's Dask Dataframe + The catalog's Dask Dataframe and its total number of rows. """ delayed_dfs = [delayed(df) for df in pixel_dfs] ddf = dd.from_delayed(delayed_dfs, meta=schema) - return ddf if isinstance(ddf, dd.DataFrame) else ddf.to_frame() + return ddf if isinstance(ddf, dd.DataFrame) else ddf.to_frame(), len(ddf) def _get_dataframe_for_healpix(self, pixels: List[int]) -> pd.DataFrame: """Computes the Pandas Dataframe containing the data points diff --git a/src/lsdb/loaders/dataframe/from_dataframe.py b/src/lsdb/loaders/dataframe/from_dataframe.py index c099d69d..34667925 100644 --- a/src/lsdb/loaders/dataframe/from_dataframe.py +++ b/src/lsdb/loaders/dataframe/from_dataframe.py @@ -10,7 +10,7 @@ def from_dataframe( dataframe: pd.DataFrame, lowest_order: int = 0, highest_order: int = 5, - partition_size: float | None = None, + partition_size: int | None = None, threshold: int | None = None, **kwargs, ) -> Catalog: @@ -20,7 +20,7 @@ def from_dataframe( dataframe (pd.Dataframe): The catalog Pandas Dataframe lowest_order (int): The lowest partition order highest_order (int): The highest partition order - partition_size (float): The desired partition size, in megabytes + partition_size (int): The desired partition size, in number of rows threshold (int): The maximum number of data points per pixel **kwargs: Arguments to pass to the creation of the catalog info diff --git a/src/lsdb/types.py b/src/lsdb/types.py new file mode 100644 index 00000000..65a544f6 --- /dev/null +++ b/src/lsdb/types.py @@ -0,0 +1,11 @@ +from typing import Dict, List, Tuple + +from hipscat.pixel_math import HealpixPixel +from typing_extensions import TypeAlias + +# Compute pixel map returns a tuple. The first element is +# the number of data points within the HEALPix pixel, the +# second element is the list of pixels it contains. 
+HealpixInfo: TypeAlias = Tuple[int, List[int]] + +DaskDFPixelMap = Dict[HealpixPixel, int] diff --git a/tests/data/small_sky/Norder=0/Dir=0/Npix=11.parquet b/tests/data/small_sky/Norder=0/Dir=0/Npix=11.parquet index b28a398f..d05f7ce2 100644 Binary files a/tests/data/small_sky/Norder=0/Dir=0/Npix=11.parquet and b/tests/data/small_sky/Norder=0/Dir=0/Npix=11.parquet differ diff --git a/tests/data/small_sky/_common_metadata b/tests/data/small_sky/_common_metadata index db74d1f7..d2200672 100644 Binary files a/tests/data/small_sky/_common_metadata and b/tests/data/small_sky/_common_metadata differ diff --git a/tests/data/small_sky/_metadata b/tests/data/small_sky/_metadata index d13f08fc..57556cfd 100644 Binary files a/tests/data/small_sky/_metadata and b/tests/data/small_sky/_metadata differ diff --git a/tests/data/small_sky/catalog_info.json b/tests/data/small_sky/catalog_info.json index 45f7922a..b3fd9a27 100644 --- a/tests/data/small_sky/catalog_info.json +++ b/tests/data/small_sky/catalog_info.json @@ -1,12 +1,8 @@ { "catalog_name": "small_sky", "catalog_type": "object", - "version": "0.0.1", - "generation_date": "2022.12.20", + "total_rows": 131, "epoch": "J2000", - "ra_kw": "ra", - "dec_kw": "dec", - "id_kw": "id", - "total_objects": 131, - "pixel_threshold": 1000000 -} \ No newline at end of file + "ra_column": "ra", + "dec_column": "dec" +} diff --git a/tests/data/small_sky_no_metadata/Norder=0/Dir=0/Npix=11.parquet b/tests/data/small_sky_no_metadata/Norder=0/Dir=0/Npix=11.parquet index b28a398f..d05f7ce2 100644 Binary files a/tests/data/small_sky_no_metadata/Norder=0/Dir=0/Npix=11.parquet and b/tests/data/small_sky_no_metadata/Norder=0/Dir=0/Npix=11.parquet differ diff --git a/tests/data/small_sky_no_metadata/catalog_info.json b/tests/data/small_sky_no_metadata/catalog_info.json index 45f7922a..b3fd9a27 100644 --- a/tests/data/small_sky_no_metadata/catalog_info.json +++ b/tests/data/small_sky_no_metadata/catalog_info.json @@ -1,12 +1,8 @@ { "catalog_name": "small_sky", "catalog_type": "object", - "version": "0.0.1", - "generation_date": "2022.12.20", + "total_rows": 131, "epoch": "J2000", - "ra_kw": "ra", - "dec_kw": "dec", - "id_kw": "id", - "total_objects": 131, - "pixel_threshold": 1000000 -} \ No newline at end of file + "ra_column": "ra", + "dec_column": "dec" +} diff --git a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=44.parquet b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=44.parquet index 30ef36c8..ea9027f2 100644 Binary files a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=44.parquet and b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=44.parquet differ diff --git a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=45.parquet b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=45.parquet index b0af099e..a740a599 100644 Binary files a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=45.parquet and b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=45.parquet differ diff --git a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=46.parquet b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=46.parquet index 7d4ad212..1758728b 100644 Binary files a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=46.parquet and b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=46.parquet differ diff --git a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=47.parquet b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=47.parquet index 01f723de..388c3ca6 100644 Binary files a/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=47.parquet and b/tests/data/small_sky_order1/Norder=1/Dir=0/Npix=47.parquet 
differ diff --git a/tests/data/small_sky_order1/_common_metadata b/tests/data/small_sky_order1/_common_metadata index c91dd855..d2200672 100644 Binary files a/tests/data/small_sky_order1/_common_metadata and b/tests/data/small_sky_order1/_common_metadata differ diff --git a/tests/data/small_sky_order1/_metadata b/tests/data/small_sky_order1/_metadata index 5cc8539c..7354a76d 100644 Binary files a/tests/data/small_sky_order1/_metadata and b/tests/data/small_sky_order1/_metadata differ diff --git a/tests/data/small_sky_order1/catalog_info.json b/tests/data/small_sky_order1/catalog_info.json index 614fa554..a61f882a 100644 --- a/tests/data/small_sky_order1/catalog_info.json +++ b/tests/data/small_sky_order1/catalog_info.json @@ -1,12 +1,8 @@ { "catalog_name": "small_sky_order1", - "catalog_type": "source", - "version": "0.0.0", - "generation_date": "2022.12.21", + "catalog_type": "object", + "total_rows": 131, "epoch": "J2000", - "ra_kw": "ra", - "dec_kw": "dec", - "id_kw": "id", - "total_objects": 131, - "pixel_threshold": 50 -} \ No newline at end of file + "ra_column": "ra", + "dec_column": "dec" +} diff --git a/tests/data/small_sky_order1/small_sky_order1.csv b/tests/data/small_sky_order1/small_sky_order1.csv index b11f8287..1bb66e95 100644 --- a/tests/data/small_sky_order1/small_sky_order1.csv +++ b/tests/data/small_sky_order1/small_sky_order1.csv @@ -1,132 +1,132 @@ id,ra,dec,ra_error,dec_error +700,282.5,-58.5,0,0 +701,299.5,-48.5,0,0 +702,310.5,-27.5,0,0 703,286.5,-69.5,0,0 -707,308.5,-69.5,0,0 -716,305.5,-60.5,0,0 -718,292.5,-60.5,0,0 -723,315.5,-68.5,0,0 -729,299.5,-59.5,0,0 -730,322.5,-61.5,0,0 -733,329.5,-65.5,0,0 -734,348.5,-66.5,0,0 -735,299.5,-65.5,0,0 -736,303.5,-52.5,0,0 -738,345.5,-64.5,0,0 -739,332.5,-57.5,0,0 -747,327.5,-61.5,0,0 -748,296.5,-63.5,0,0 -750,338.5,-67.5,0,0 -758,325.5,-53.5,0,0 -760,320.5,-53.5,0,0 -766,310.5,-63.5,0,0 -768,297.5,-60.5,0,0 -771,348.5,-67.5,0,0 -772,348.5,-64.5,0,0 -775,321.5,-54.5,0,0 -776,344.5,-63.5,0,0 -780,326.5,-52.5,0,0 -787,320.5,-47.5,0,0 -792,320.5,-69.5,0,0 -794,300.5,-66.5,0,0 -795,306.5,-58.5,0,0 -797,308.5,-62.5,0,0 -801,309.5,-50.5,0,0 -804,322.5,-66.5,0,0 -807,303.5,-60.5,0,0 -810,301.5,-59.5,0,0 -811,315.5,-68.5,0,0 -815,283.5,-68.5,0,0 -816,288.5,-69.5,0,0 -817,318.5,-48.5,0,0 -818,300.5,-55.5,0,0 -822,301.5,-54.5,0,0 -826,335.5,-69.5,0,0 -830,306.5,-50.5,0,0 704,326.5,-45.5,0,0 705,335.5,-32.5,0,0 -710,341.5,-39.5,0,0 -719,344.5,-39.5,0,0 -720,344.5,-47.5,0,0 -722,350.5,-58.5,0,0 -724,323.5,-41.5,0,0 -726,341.5,-37.5,0,0 -728,328.5,-47.5,0,0 -731,343.5,-52.5,0,0 -732,337.5,-39.5,0,0 -742,348.5,-45.5,0,0 -744,349.5,-39.5,0,0 -745,337.5,-38.5,0,0 -751,330.5,-44.5,0,0 -757,346.5,-34.5,0,0 -761,329.5,-29.5,0,0 -762,327.5,-51.5,0,0 -779,347.5,-29.5,0,0 -781,330.5,-46.5,0,0 -784,338.5,-40.5,0,0 -786,336.5,-33.5,0,0 -803,336.5,-25.5,0,0 -808,320.5,-40.5,0,0 -812,346.5,-60.5,0,0 -813,349.5,-37.5,0,0 -821,330.5,-52.5,0,0 -823,338.5,-45.5,0,0 -828,330.5,-26.5,0,0 -700,282.5,-58.5,0,0 -701,299.5,-48.5,0,0 706,297.5,-36.5,0,0 +707,308.5,-69.5,0,0 708,307.5,-37.5,0,0 709,294.5,-45.5,0,0 +710,341.5,-39.5,0,0 711,305.5,-49.5,0,0 712,288.5,-49.5,0,0 713,298.5,-41.5,0,0 714,303.5,-37.5,0,0 715,280.5,-35.5,0,0 +716,305.5,-60.5,0,0 717,303.5,-43.5,0,0 +718,292.5,-60.5,0,0 +719,344.5,-39.5,0,0 +720,344.5,-47.5,0,0 +721,314.5,-34.5,0,0 +722,350.5,-58.5,0,0 +723,315.5,-68.5,0,0 +724,323.5,-41.5,0,0 725,308.5,-41.5,0,0 +726,341.5,-37.5,0,0 727,301.5,-44.5,0,0 +728,328.5,-47.5,0,0 +729,299.5,-59.5,0,0 +730,322.5,-61.5,0,0 +731,343.5,-52.5,0,0 
+732,337.5,-39.5,0,0 +733,329.5,-65.5,0,0 +734,348.5,-66.5,0,0 +735,299.5,-65.5,0,0 +736,303.5,-52.5,0,0 +737,316.5,-33.5,0,0 +738,345.5,-64.5,0,0 +739,332.5,-57.5,0,0 740,306.5,-33.5,0,0 741,303.5,-38.5,0,0 +742,348.5,-45.5,0,0 +743,307.5,-25.5,0,0 +744,349.5,-39.5,0,0 +745,337.5,-38.5,0,0 746,283.5,-31.5,0,0 +747,327.5,-61.5,0,0 +748,296.5,-63.5,0,0 749,293.5,-55.5,0,0 +750,338.5,-67.5,0,0 +751,330.5,-44.5,0,0 752,291.5,-34.5,0,0 753,307.5,-45.5,0,0 +754,313.5,-30.5,0,0 755,303.5,-38.5,0,0 +756,319.5,-35.5,0,0 +757,346.5,-34.5,0,0 +758,325.5,-53.5,0,0 759,290.5,-48.5,0,0 +760,320.5,-53.5,0,0 +761,329.5,-29.5,0,0 +762,327.5,-51.5,0,0 763,306.5,-38.5,0,0 764,297.5,-45.5,0,0 765,306.5,-35.5,0,0 +766,310.5,-63.5,0,0 +767,314.5,-29.5,0,0 +768,297.5,-60.5,0,0 769,307.5,-42.5,0,0 770,285.5,-29.5,0,0 +771,348.5,-67.5,0,0 +772,348.5,-64.5,0,0 773,293.5,-50.5,0,0 774,281.5,-54.5,0,0 +775,321.5,-54.5,0,0 +776,344.5,-63.5,0,0 777,307.5,-39.5,0,0 +778,313.5,-36.5,0,0 +779,347.5,-29.5,0,0 +780,326.5,-52.5,0,0 +781,330.5,-46.5,0,0 782,290.5,-39.5,0,0 783,286.5,-42.5,0,0 +784,338.5,-40.5,0,0 785,296.5,-44.5,0,0 +786,336.5,-33.5,0,0 +787,320.5,-47.5,0,0 788,283.5,-61.5,0,0 789,287.5,-45.5,0,0 790,286.5,-35.5,0,0 -793,289.5,-58.5,0,0 -800,299.5,-37.5,0,0 -802,304.5,-49.5,0,0 -805,297.5,-52.5,0,0 -809,283.5,-34.5,0,0 -820,286.5,-46.5,0,0 -827,310.5,-40.5,0,0 -702,310.5,-27.5,0,0 -721,314.5,-34.5,0,0 -737,316.5,-33.5,0,0 -743,307.5,-25.5,0,0 -754,313.5,-30.5,0,0 -756,319.5,-35.5,0,0 -767,314.5,-29.5,0,0 -778,313.5,-36.5,0,0 791,312.5,-28.5,0,0 +792,320.5,-69.5,0,0 +793,289.5,-58.5,0,0 +794,300.5,-66.5,0,0 +795,306.5,-58.5,0,0 796,320.5,-33.5,0,0 +797,308.5,-62.5,0,0 798,316.5,-36.5,0,0 799,313.5,-31.5,0,0 +800,299.5,-37.5,0,0 +801,309.5,-50.5,0,0 +802,304.5,-49.5,0,0 +803,336.5,-25.5,0,0 +804,322.5,-66.5,0,0 +805,297.5,-52.5,0,0 806,312.5,-29.5,0,0 +807,303.5,-60.5,0,0 +808,320.5,-40.5,0,0 +809,283.5,-34.5,0,0 +810,301.5,-59.5,0,0 +811,315.5,-68.5,0,0 +812,346.5,-60.5,0,0 +813,349.5,-37.5,0,0 814,312.5,-33.5,0,0 +815,283.5,-68.5,0,0 +816,288.5,-69.5,0,0 +817,318.5,-48.5,0,0 +818,300.5,-55.5,0,0 819,313.5,-35.5,0,0 +820,286.5,-46.5,0,0 +821,330.5,-52.5,0,0 +822,301.5,-54.5,0,0 +823,338.5,-45.5,0,0 824,305.5,-28.5,0,0 825,315.5,-30.5,0,0 +826,335.5,-69.5,0,0 +827,310.5,-40.5,0,0 +828,330.5,-26.5,0,0 829,314.5,-35.5,0,0 +830,306.5,-50.5,0,0 \ No newline at end of file diff --git a/tests/lsdb/catalog/test_catalog.py b/tests/lsdb/catalog/test_catalog.py index 1cb1acd7..8758a1c6 100644 --- a/tests/lsdb/catalog/test_catalog.py +++ b/tests/lsdb/catalog/test_catalog.py @@ -1,9 +1,13 @@ +import os + import dask.array as da import dask.dataframe as dd import pandas as pd import pytest from hipscat.pixel_math import HealpixPixel +import lsdb + def test_catalog_pixels_equals_hc_catalog_pixels(small_sky_order1_catalog, small_sky_order1_hipscat_catalog): assert ( @@ -106,3 +110,14 @@ def test_assign_with_invalid_arguments(small_sky_order1_catalog): chunks = small_sky_order1_catalog._ddf.npartitions + 1 array = da.random.random(size=10, chunks=chunks) small_sky_order1_catalog.assign(new_column=array) + + +def test_save_catalog(small_sky_catalog, tmp_path): + new_catalog_name = "small_sky" + base_catalog_path = os.path.join(tmp_path, new_catalog_name) + small_sky_catalog.to_hipscat(base_catalog_path, catalog_name=new_catalog_name) + expected_catalog = lsdb.read_hipscat(base_catalog_path) + assert expected_catalog.hc_structure.catalog_name == new_catalog_name + assert expected_catalog.hc_structure.catalog_info == 
small_sky_catalog.hc_structure.catalog_info + assert expected_catalog.get_healpix_pixels() == small_sky_catalog.get_healpix_pixels() + pd.testing.assert_frame_equal(expected_catalog.compute(), small_sky_catalog._ddf.compute()) diff --git a/tests/lsdb/loaders/dataframe/test_from_dataframe.py b/tests/lsdb/loaders/dataframe/test_from_dataframe.py index 893d8458..bada68b1 100644 --- a/tests/lsdb/loaders/dataframe/test_from_dataframe.py +++ b/tests/lsdb/loaders/dataframe/test_from_dataframe.py @@ -33,9 +33,9 @@ def test_from_dataframe(small_sky_order1_df, small_sky_order1_catalog): assert catalog.hc_structure.catalog_info == small_sky_order1_catalog.hc_structure.catalog_info # Dataframes have the same data (column data types may differ) pd.testing.assert_frame_equal( - catalog.compute().reset_index(drop=True), - small_sky_order1_catalog.compute().reset_index(drop=True), - check_column_type=False, + catalog.compute().sort_index(), + small_sky_order1_catalog.compute().sort_index(), + check_dtype=False, ) @@ -107,19 +107,14 @@ def test_from_dataframe_with_non_default_ra_dec_columns(small_sky_order1_df, sma def test_partitions_obey_partition_size(small_sky_order1_df, small_sky_order1_catalog): """Tests that partitions are limited by the specified size""" - # Use 1KB size partitions - partition_size_bytes = 1 << 10 - partition_size_megabytes = partition_size_bytes / (1 << 20) + # Use partitions with 10 rows + partition_size = 10 # Read CSV file for the small sky order 1 catalog - kwargs = get_catalog_kwargs( - small_sky_order1_catalog, partition_size=partition_size_megabytes, threshold=None - ) + kwargs = get_catalog_kwargs(small_sky_order1_catalog, partition_size=partition_size, threshold=None) catalog = lsdb.from_dataframe(small_sky_order1_df, **kwargs) # Calculate size of dataframe per partition - partition_sizes = [ - partition_df.compute().memory_usage().sum() for partition_df in catalog._ddf.partitions - ] - assert all(size <= partition_size_bytes for size in partition_sizes) + partition_sizes = [len(partition_df) for partition_df in catalog._ddf.partitions] + assert all(size <= partition_size for size in partition_sizes) def test_partitions_obey_threshold(small_sky_order1_df, small_sky_order1_catalog):
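
A minimal usage sketch of the round trip this change set enables, mirroring the notebook and test_save_catalog above (the my_catalog.csv input and the output path are illustrative placeholders, not files in this repository):

import pandas as pd

import lsdb

# Build an in-memory catalog from a dataframe; `threshold` caps the number of rows per HEALPix pixel
catalog = lsdb.from_dataframe(
    pd.read_csv("my_catalog.csv"),  # hypothetical CSV with the default "ra"/"dec" columns
    catalog_name="my_catalog",
    catalog_type="object",
    highest_order=5,
    threshold=100,
)

# Write it out in HiPSCat format with the new Catalog.to_hipscat, then read it back
catalog.to_hipscat("my_catalog_hipscat", catalog_name="my_catalog")
round_trip = lsdb.read_hipscat("my_catalog_hipscat")
assert round_trip.get_healpix_pixels() == catalog.get_healpix_pixels()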