From 6d8c9f457d1506a123831b3237b4a022a06d909d Mon Sep 17 00:00:00 2001 From: usha-nemani-jdas <57137791+usha-nemani-jdas@users.noreply.github.com> Date: Wed, 26 Aug 2020 13:46:41 +0530 Subject: [PATCH] Added docs for ktk cube functionality (#334) * Added docs for ktk cube functionality * Added glossary file, updated all cube related files with minor corrections * Removed downpinning of Sphinx, as the issue is resolved in 3.2.1 release --- conda-test-requirements.txt | 2 +- docs/guide/cube/command_line_features.rst | 151 ++++++++ docs/guide/cube/cube_design_features.rst | 243 ++++++++++++ docs/guide/cube/examples.rst | 428 ++++++++++++++++++++++ docs/guide/cube/glossary.rst | 79 ++++ docs/guide/cube/kartothek_cubes.rst | 23 ++ docs/guide/cube/query_system.rst | 294 +++++++++++++++ docs/index.rst | 14 +- test-requirements.txt | 2 +- 9 files changed, 1233 insertions(+), 3 deletions(-) create mode 100644 docs/guide/cube/command_line_features.rst create mode 100644 docs/guide/cube/cube_design_features.rst create mode 100644 docs/guide/cube/examples.rst create mode 100644 docs/guide/cube/glossary.rst create mode 100644 docs/guide/cube/kartothek_cubes.rst create mode 100644 docs/guide/cube/query_system.rst diff --git a/conda-test-requirements.txt b/conda-test-requirements.txt index bb04ba7a..d593d2bb 100644 --- a/conda-test-requirements.txt +++ b/conda-test-requirements.txt @@ -15,7 +15,7 @@ setuptools_scm asv # Documentation -sphinx<3.2.0 # https://github.com/sphinx-doc/sphinx/issues/8096 +sphinx sphinx_rtd_theme sphinx-click IPython diff --git a/docs/guide/cube/command_line_features.rst b/docs/guide/cube/command_line_features.rst new file mode 100644 index 00000000..2d39e5c2 --- /dev/null +++ b/docs/guide/cube/command_line_features.rst @@ -0,0 +1,151 @@ + +Command Line Features +--------------------- + +.. raw:: html + + + +Kartothek Cube also features a command line interface (CLI) for some cube operations. To use it, create a ``skv.yml`` file that +describes `storefact`_ stores: + +.. code-block:: yaml + + dataset: + type: hfs + path: path/to/data + +Now use the ``kartothek_cube`` command to gather certain cube information: + +.. code-block:: bash + + kartothek_cube geodata info + +.. raw:: html + +
+   Infos
+   UUID Prefix:        geodata
+   Dimension Columns:
+     - city: string
+     - day: timestamp[ns]
+   Partition Columns:
+     - country: string
+   Index Columns:
+    
+   Seed Dataset:      seed
+    
+   Dataset: latlong
+   Partition Keys:
+     - country: string
+   Partitions: 4
+   Metadata:
+     {
+       "creation_time": "2019-10-01T12:11:38.263496",
+       "klee_dimension_columns": [
+         "city",
+         "day"
+       ],
+       "klee_is_seed": false,
+       "klee_partition_columns": [
+         "country"
+       ]
+     }
+   Dimension Columns:
+     - city: string
+   Payload Columns:
+     - latitude: double
+     - longitude: double
+    
+   Dataset: seed
+   Partition Keys:
+     - country: string
+   Partitions: 3
+   Metadata:
+     {
+       "creation_time": "2019-10-01T12:11:38.206653",
+       "klee_dimension_columns": [
+         "city",
+         "day"
+       ],
+       "klee_is_seed": true,
+       "klee_partition_columns": [
+         "country"
+       ]
+     }
+   Dimension Columns:
+     - city: string
+     - day: timestamp[ns]
+   Payload Columns:
+     - avg_temp: int64
+    
+   Dataset: time
+   Partitions: 1
+   Metadata:
+     {
+       "creation_time": "2019-10-01T12:11:41.734913",
+       "klee_dimension_columns": [
+         "city",
+         "day"
+       ],
+       "klee_is_seed": false,
+       "klee_partition_columns": [
+         "country"
+       ]
+     }
+   Dimension Columns:
+     - day: timestamp[ns]
+   Payload Columns:
+     - month: int64
+     - weekday: int64
+     - year: int64
+   
+
+Some information is not available from the schema information alone and requires a full cube scan:
+
+.. code-block:: bash
+
+    kartothek_cube geodata stats
+
+.. raw:: html
+
+   [########################################] | 100% Completed |  0.1s
+   latlong
+   blobsize:  5,690
+   files:  4
+   partitions:  4
+   rows:  4
+    
+   seed
+   blobsize:  4,589
+   files:  3
+   partitions:  3
+   rows:  8
+    
+   time
+   blobsize:  3,958
+   files:  1
+   partitions:  1
+   rows:  366
+    
+   __total__
+   blobsize:  14,237
+   files:  8
+   
+
+
+Use ``kartothek_cube --help`` to get a list of all commands, or see :mod:`kartothek_cube.cli`.
+
+.. _storefact: https://github.com/blue-yonder/storefact
+
diff --git a/docs/guide/cube/cube_design_features.rst b/docs/guide/cube/cube_design_features.rst
new file mode 100644
index 00000000..c83ac7e0
--- /dev/null
+++ b/docs/guide/cube/cube_design_features.rst
@@ -0,0 +1,243 @@
+Cube Design Features
+--------------------
+.. contents:: Table of Contents
+
+This section discusses different design aspects of the Kartothek Cube functionality. Note that many decisions here are
+heavily bound to the `Kartothek` format and the features it provides. Also, Kartothek Cube must support the usage
+patterns we have seen and provide a clear API (in technical and semantic terms). In particular, it is not an
+all-purpose solution and we do NOT want to support all possible use cases.
+
+
+Multi-Dataset VS Multi-Table
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+When mapping multiple parts (tables or datasets) to `Kartothek`, we have the following options:
+
+a) *Multi-Table:* As used by Klee1, use a single dataset w/ multiple tables.
+b) **Multi-Dataset:** Use multiple datasets, each w/ a single table.
+
+We use multiple `Kartothek` datasets instead of multiple tables (like Klee1 did) for the following reasons:
+
+- `Kartothek` discourages unbalanced tables, e.g. when Kartothek Cube writes a seed table and an enrich step is only
+  done for a part of the data (e.g. the present and future). This worked for Klee1 because we used a much simpler
+  format (based on Kartothek V2) and ignored the recommendation of the `Kartothek` main author.
+- OSS tools like `Dask.DataFrame`_ do not support any form of multi-table datasets and the current `Kartothek` format
+  is just a workaround to make multi-table datasets look like multiple regular datasets.
+- The `Kartothek` developers want to deprecate multi-table `Kartothek` datasets in the future.
+- `Kartothek` needs a central metadata file for performance reasons (listing blob containers w/ massive amounts of
+  blobs is very slow) which is shared between tables. So running 2 tasks that create 2 completely unrelated tables
+  would still result in write conflicts. Using separate datasets solves this issue.
+- Using multiple datasets allows users to copy, back up, and delete them separately.
+- Index structures are bound to datasets, which feels more consistent than the former solution where you did not know
+  which table was meant by an index entry.
+
+Implicit Existence VS Explicit Cube Metadata
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+There are two ways a cube could be created out of `Kartothek` datasets:
+
+a) **UUID Namespace:** specify a UUID prefix and base the cube on the existence of datasets (if a dataset w/ a given
+   prefix exists, it is part of the cube)
+b) *Cube Metadata File:* write another metadata file, similar to what `Kartothek` is doing, but like a "dataset of
+   datasets"
+
+By using another metadata file, we would introduce a potential write conflict, one that was solved by using multiple
+datasets instead of multiple tables. Also, it would make copying, deleting, and backing up datasets in an independent
+fashion more difficult.
+
+Using a UUID prefix and implicit existence does not have these issues, but requires us to list the store content for
+dataset discovery.
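+A minimal sketch of such prefix-based discovery, assuming a simplekv-style store that supports ``iter_keys`` with a
+prefix and the metadata key layout shown in the examples (e.g. ``geodata++seed.by-dataset-metadata.json``); the helper
+below is illustrative only and not part of the kartothek API:
+
+.. code-block:: python
+
+    def discover_ktk_cube_dataset_ids(store, uuid_prefix):
+        """Illustrative only: find the dataset IDs of a cube by scanning metadata keys."""
+        prefix = uuid_prefix + "++"
+        suffix = ".by-dataset-metadata.json"
+        return sorted(
+            key[len(prefix):-len(suffix)]
+            for key in store.iter_keys(prefix)  # prefix listing, no full store scan
+            if key.endswith(suffix)
+        )
+
+    # e.g. discover_ktk_cube_dataset_ids(store, "geodata") -> ["latlong", "seed"]
+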
+While listing all store keys can be really slow, listing w/ a path separator is really quick, especially if the number
+of "folders" and "top-level files" is small, which is the case for `Kartothek` V4 datasets. Also, it allows us to load
+external datasets (like ones produced by other systems) as cubes and therefore simplifies interoperability.
+
+.. important::
+    Since all datasets that match a given prefix are part of the cube, they all must be valid in terms of the `Format`_.
+
+Explicit Physical Partitions
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+There are multiple ways to store data in the blob store:
+
+a) *Single, Large Parquet Files:* Just one giant `Parquet`_ file for everything.
+b) **Explicit Physical Partitions:** Split the data into partitions described by dedicated columns.
+c) *Automatic Partitioning:* Leave the load-balancing entirely to kartothek_cube.
+
+We decided on explicit physical partitions since we have seen that this data model works well for our current data
+flow. It allows quick and efficient re-partitioning to support row-based, group-based, and timeseries-based data
+processing steps, while keeping the technical complexity rather low (compared to an automatic + dynamic partitioning).
+It also maps well to multiple backends we plan to use.
+
+Using a single, large `Parquet`_ file would scale on the read path due to predicate pushdown and well-chosen chunk
+sizes, but would either require a write path that supports writing individual chunks (currently not available and
+unlikely to be supported anytime soon) or one fat node at the end of every write pipeline, which is inefficient and
+even impossible w/ our current architecture.
+
+In a future version of Klee, there may be a way to get the data automatically partitioned, maybe even w/ some
+feedback-loop based approach (submit tasks, watch memory/CPU consumption, adjust, re-submit).
+
+.. important::
+    Note that while physical partitions are explicit, their semantic impact should be small. They are an optimization
+    and may speed up load and store operations, but cubes w/ different partition columns built out of the same data
+    should behave the same (except that some query features like re-partitioning may differ due to missing indices).
+
+Update Granularity
+~~~~~~~~~~~~~~~~~~
+We are aware of two ways to design the update granularity:
+
+a) **Partition-wise:** Entire partitions can overwrite old physical partitions. Deletion operations are partition-wise.
+b) *Row-wise:* The entire cube behaves like one large, virtual DataFrame and the user handles rows. Physical partitions
+   are just an optimization.
+
+While the row-wise approach has the nice property that the physical partitioning is just an optimization, it is complex
+to implement and a major performance problem, especially when many transactions were written to the cube. This is due
+to the fact that the query planner cannot inspect the cube cells of each Parquet file without reading it and therefore
+either needs a very expensive planning phase with loads of IO operations or cannot prune data early, leading to massive
+IO in the execution phase. So we decided on partition-wise IO, which turned out to be quite simple to implement.
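+
+A partition-wise update can be sketched with the eager API shown in the examples: replacing a physical partition means
+removing the old partition and appending the corrected data. This is only a sketch; the ``geodata`` cube, the store,
+and the corrected ``df_weather_fixed`` DataFrame are assumed to exist.
+
+.. code-block:: python
+
+    from kartothek.core.cube.conditions import C
+    from kartothek.io.eager_cube import append_to_cube, remove_partitions
+
+    # Drop the old physical partition for country "DE" from the seed dataset ...
+    remove_partitions(
+        cube=cube,
+        store=store,
+        ktk_cube_dataset_ids=["seed"],
+        conditions=(C("country") == "DE"),
+    )
+
+    # ... and write the corrected rows back. Both steps operate on whole
+    # physical partitions, never on individual rows.
+    append_to_cube(data=df_weather_fixed, store=store, cube=cube)
+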
+Column Namespacing
+~~~~~~~~~~~~~~~~~~
+There are multiple options regarding the mapping of dataset columns to DataFrame columns:
+
+a) **No Mapping:** do not change column names but prohibit collisions of payload columns (i.e. columns that are neither
+   dimension nor partition columns)
+b) *Namespace all Columns:* e.g. dimension and partition columns have no prefix, but payload columns have the form
+   ``<dataset ID>__<column name>``
+c) *Namespace on Demand:* only prefix in case of a name collision, similar to what `DataFrame.merge`_ is doing.
+
+Since Kartothek Cube is intended to be used in production, "Namespace on Demand" is not an option since it may result
+in hard-to-debug runtime errors. "Namespace all Columns" is a very stable option, but would require every part of our
+data processing pipelines to know which dataset produces which column. Currently, this is transparent and columns can
+be moved from one stage to another w/o requiring larger code changes. We would like to keep this nice behavior, so we
+went for "No Mapping".
+
+Seed-Based Join System
+~~~~~~~~~~~~~~~~~~~~~~
+When data is stored in multiple parts (tables or datasets), the question is how to expose it to the user during read
+operations:
+
+a) *Separate DataFrames:* Conditions and group-by operations are applied to the individual parts and no join operations
+   are performed by kartothek_cube.
+b) **Seed-Based Join:** Mark a single part as seed which provides the groundtruth regarding cells (i.e. unique dimension
+   entries) in the cube; all other parts are just additional columns.
+c) *Fully Configurable Join Order:* Leave it to the user to configure the join order (this was done in an early version
+   of Klee1).
+
+Separate DataFrames would give the user full control, but would also force them to write loads of boilerplate code,
+likely resulting in another framework on top of kartothek_cube. This would contradict any `KISS`_ approach we try to
+take here. Also, it makes reasoning about conditions and partition-by parameters more difficult since it is not always
+clear how these effects cross-influence different parts of the cube.
+
+A fully configurable join order was tried in Klee1, but it turned out that many users do not want to care about the
+consequences of this complex beast. Also, it makes predicate pushdown and efficient index operations more difficult to
+implement, especially since the core of Kartothek Cube is based on `Pandas`_, which lacks proper NULL-handling.
+
+Finally, we decided on a seed-based system some time ago in Klee1 and our users are happy and know what to expect. It
+is also easy to teach, straightforward to implement and test, and it matches the semantics of our data processing
+pipelines (groundtruth from an initial external source, subsequent enrichments w/ additional columns on top of it).
+
+.. important::
+    There are two variants of the seed-based system:
+
+    a) *Enforced:* Cells in non-seed datasets must be present in the seed dataset. This is enforced during write
+       operations.
+    b) **Lazy:** The seed semantics are only enforced during queries.
+
+    We decided on the lazy approach, since it better supports independent copies and backups of datasets and also
+    simplifies some of our processing pipelines (e.g. geolocation data can blindly be fetched for too many locations
+    and dates).
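+
+Conceptually, the seed-based join behaves like a pandas left join of every non-seed part onto the seed part along the
+shared dimension (and partition) columns. The following plain-pandas snippet is only an illustration of these
+semantics, not how kartothek_cube is implemented internally:
+
+.. code-block:: python
+
+    import pandas as pd
+
+    seed = pd.DataFrame(
+        {"city": ["Dresden", "Hamburg"], "day": ["2018-01-01", "2018-01-01"], "avg_temp": [8, 6]}
+    )
+    latlong = pd.DataFrame({"city": ["Hamburg", "Tokyo"], "latitude": [53.55, 35.65]})
+
+    # The seed part defines which cells exist: "Tokyo" (only present in latlong)
+    # is dropped, while "Dresden" (missing in latlong) gets NaN payload values.
+    result = seed.merge(latlong, on=["city"], how="left")
+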
+Format
+~~~~~~
+This section describes how `Kartothek` must be structured to be consumed by kartothek_cube.
+
+Cube
+````
+An abstract cube is described by the following attributes:
+
+- **UUID Prefix:** A common prefix for UUIDs of all datasets that are part of the cube.
+- **Dimension Columns:** Which orthogonal dimensions form the cube. Hence, every cell described by these columns is
+  unique.
+- **Partition Columns:** Columns that describe how data is partitioned when written to the blob store. These columns
+  will form the `Kartothek` ``partition_on`` attribute.
+- **Seed Dataset:** Which dataset forms the ground truth regarding the set of cells in the cube.
+- **Index Columns:** Which non-dimension and non-partition columns should also be indexed.
+
+Datasets
+````````
+
+All `Kartothek` datasets that are part of a cube must fulfill the following criteria:
+
+- **Kartothek Dataset UUID:** ``'<uuid_prefix>++<dataset ID>'``. E.g. for a cube called ``'my_cube'`` and a dataset
+  called ``'weather'``, the UUID that is used in `Kartothek` is ``'my_cube++weather'``.
+- **Metadata Version:** 4
+- **Metadata Storage Format:** `JSON`_ (`MessagePack`_ can be read as well)
+- **DataFrame Serialization Format:** `Parquet`_ with `ZSTD`_ compression (other compressions can be read as well)
+- **Kartothek Tables:** Only a single one called ``'table'`` (same as ``SINGLE_TABLE`` in `Kartothek`)
+- **Partition Keys:**
+
+  - **Seed Dataset:** ``<partition columns>``.
+  - **Other Datasets:** Can be anything.
+
+- **Partition Labels:** The user has no ability to set the partition labels; instead, the default `Kartothek` `UUID4`_
+  generation mechanism is used.
+
+Indices
+```````
+
+The following index structures must be present (additional indices will be ignored):
+
+- **Partition Indices:** According to the partition keys described above.
+- **Explicit Secondary Indices:** For all index columns. For the seed dataset also for all dimension columns. Additional
+  indices may exist and can be used by the query engine.
+
+Metadata
+````````
+Kartothek Cube allows the user to specify per-dataset metadata. Furthermore, the following entries are added by default
+to every dataset:
+
+- ``'klee_is_seed'``: boolean entry to mark the seed dataset
+- ``'klee_dimension_columns'``: list of :term:`Dimension Columns`
+- ``'klee_partition_columns'``: list of :term:`Partition Columns`
+
+DataFrame Normalization
+~~~~~~~~~~~~~~~~~~~~~~~
+
+On top of what `Kartothek` is doing, the following properties of preserved DataFrames will be ensured (see the sketch
+at the end of this page):
+
+- all column names are unicode strings (``str``); that especially excludes integer-based column names
+- DataFrame indices are range indices starting at 0 with a step size of 1; this is equivalent to
+  `DataFrame.reset_index`_
+- values are sorted by dimension columns (if present) in the order given by the cube specification.
+
+Documentation
+~~~~~~~~~~~~~
+Examples in docstrings, README and specs should use real-world column names (like ``'COUNTRY_CODE'``).
+
+Examples in pytest should use simplified column names:
+
+- dimension columns: ``'x'``, ``'y'``, ...
+- partition columns: ``'p'``, ``'q'``, ...
+- index columns: ``'i1'``, ``'i2'``, ...
+- payload columns: ``'v1'``, ``'v2'``, ...
+
+CLI examples are produced using ``kartothek_cube --color=always ... | terminal-to-html > out.html`` with
+`terminal-to-html`_ and are wrapped into the following code snippet:
+
+.. code-block:: rst
+
+   .. raw:: html
+
+      <pre class="term-container">
+      ...
+      </pre>
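+
+To make the DataFrame normalization rules above concrete, here is a small illustration using the simplified pytest
+column names (``x`` as dimension column, ``p`` as partition column, ``v1`` as payload column). It only sketches the
+guaranteed properties and is not the internal implementation:
+
+.. code-block:: python
+
+    import pandas as pd
+
+    df = pd.DataFrame({"x": [2, 1], "p": [0, 0], "v1": [20, 10]}, index=[7, 3])
+
+    # all column names are unicode strings
+    assert all(isinstance(col, str) for col in df.columns)
+
+    # values are sorted by the dimension columns and the index is reset to a
+    # RangeIndex starting at 0 with step size 1
+    normalized = df.sort_values("x").reset_index(drop=True)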
+ +.. _DataFrame.merge: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.merge.html?highlight=merge#pandas.DataFrame.merge +.. _DataFrame.reset_index: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.reset_index.html?highlight=reset_index#pandas.DataFrame.reset_index +.. _Dask.DataFrame: https://docs.dask.org/en/latest/dataframe.html +.. _JSON: https://json.org/ +.. _KISS: https://en.wikipedia.org/wiki/KISS_principle +.. _MessagePack: https://msgpack.org/ +.. _Pandas: https://pandas.pydata.org/ +.. _Parquet: https://parquet.apache.org/ +.. _terminal-to-html: https://github.com/buildkite/terminal-to-html +.. _UUID4: https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_(random) +.. _ZSTD: https://github.com/facebook/zstd diff --git a/docs/guide/cube/examples.rst b/docs/guide/cube/examples.rst new file mode 100644 index 00000000..0dad2f5e --- /dev/null +++ b/docs/guide/cube/examples.rst @@ -0,0 +1,428 @@ +Examples +-------- +.. contents:: Table of Contents + +This is a quick walk through the basic functionality of Kartothek Cubes. + +First, we want to create a cube for geodata: + +>>> from kartothek.core.cube.cube import Cube +>>> cube = Cube( +... uuid_prefix="geodata", +... dimension_columns=["city", "day"], +... partition_columns=["country"], +... ) + +Apart from an abstract cube definition, we need a `simplekv`_-based storage backend: + +>>> from functools import partial +>>> import tempfile +>>> import storefact +>>> store_location = tempfile.mkdtemp() +>>> store_factory = partial( +... storefact.get_store_from_url, +... "hfs://" + store_location, +... ) +>>> store = store_factory() + +Some generic setups of libraries: + +>>> import pandas as pd +>>> pd.set_option("display.max_rows", 40) +>>> pd.set_option("display.width", None) +>>> pd.set_option('display.max_columns', None) +>>> pd.set_option('display.expand_frame_repr', False) + +Build +````` + +Kartothek cube should be initially filled with the following information: + +>>> from io import StringIO +>>> import pandas as pd +>>> df_weather = pd.read_csv( +... filepath_or_buffer=StringIO(""" +... avg_temp city country day +... 6 Hamburg DE 2018-01-01 +... 5 Hamburg DE 2018-01-02 +... 8 Dresden DE 2018-01-01 +... 4 Dresden DE 2018-01-02 +... 6 London UK 2018-01-01 +... 8 London UK 2018-01-02 +... """.strip()), +... delim_whitespace=True, +... parse_dates=["day"], +... ) + +We use the simple :py:mod:`kartothek.io.eager_cube` backend to store the data: + +>>> from kartothek.io.eager_cube import build_cube +>>> datasets_build = build_cube( +... data=df_weather, +... store=store, +... cube=cube, +... ) + +We just have preserved a single `Kartothek` dataset: + +>>> print(", ".join(sorted(datasets_build.keys()))) +seed +>>> ds_seed = datasets_build["seed"].load_all_indices(store) +>>> print(ds_seed.uuid) +geodata++seed +>>> print(", ".join(sorted(ds_seed.indices))) +city, country, day + +Finally, let's have a quick look at the store content. Note that we cut out UUIDs and timestamps here for brevity. + +>>> import re +>>> def print_filetree(s, prefix=""): +... entries = [] +... for k in sorted(s.keys(prefix)): +... k = re.sub("[a-z0-9]{32}", "", k) +... k = re.sub("[0-9]{4}-[0-9]{2}-[0-9]{2}((%20)|(T))[0-9]{2}%3A[0-9]{2}%3A[0-9]+.[0-9]{6}", "", k) +... entries.append(k) +... 
print("\n".join(sorted(entries))) +>>> print_filetree(store) +geodata++seed.by-dataset-metadata.json +geodata++seed/indices/city/.by-dataset-index.parquet +geodata++seed/indices/day/.by-dataset-index.parquet +geodata++seed/table/_common_metadata +geodata++seed/table/country=DE/.parquet +geodata++seed/table/country=UK/.parquet + +Extend +`````` +Now let's say we would also like to have longitude and latitude data in our cube. + +>>> from kartothek.io.eager_cube import extend_cube +>>> df_location = pd.read_csv( +... filepath_or_buffer=StringIO(""" +... city country latitude longitude +... Hamburg DE 53.551086 9.993682 +... Dresden DE 51.050407 13.737262 +... London UK 51.509865 -0.118092 +... Tokyo JP 35.652832 139.839478 +... """.strip()), +... delim_whitespace=True, +... ) + +.. hint:: + Obviously, this data does not change over time. As long as the data spans at least a single dimensions and describes + all partition columns, you are free to use projected data for non-seed datasets. + +>>> datasets_extend = extend_cube( +... data={"latlong": df_location}, +... store=store, +... cube=cube, +... ) + +This results in an extra dataset: + +>>> print(", ".join(sorted(datasets_extend.keys()))) +latlong +>>> ds_latlong = datasets_extend["latlong"].load_all_indices(store) +>>> print(ds_latlong.uuid) +geodata++latlong +>>> print(", ".join(sorted(ds_latlong.indices))) +country + +Note that for the second dataset, no indices for ``'city'`` and ``'day'`` exists. These are only created for the seed +dataset, since that datasets forms the groundtruth about which city-day entries are part of the cube. + +.. hint:: + Since the seed dataset forms the groundtruth regarding cells in the cube, additional data in other datasets are + ignored. So in this case, ``'Tokyo'`` will be store to the cube but will cut out during queries. + +If you look at the file tree, you can see that the second dataset is completely separated. This is useful to copy/backup +parts of the cube: + +>>> print_filetree(store) +geodata++latlong.by-dataset-metadata.json +geodata++latlong/table/_common_metadata +geodata++latlong/table/country=DE/.parquet +geodata++latlong/table/country=JP/.parquet +geodata++latlong/table/country=UK/.parquet +geodata++seed.by-dataset-metadata.json +geodata++seed/indices/city/.by-dataset-index.parquet +geodata++seed/indices/day/.by-dataset-index.parquet +geodata++seed/table/_common_metadata +geodata++seed/table/country=DE/.parquet +geodata++seed/table/country=UK/.parquet + +Query +````` +Now the whole beauty of Kartothek Cube does not come from storing multiple datasets, but especially from retrieving the data in a +very comfortable way. It is possible to treat the entire cube as a single, large DataFrame: + +>>> from kartothek.io.eager_cube import query_cube +>>> query_cube( +... cube=cube, +... store=store, +... )[0] + avg_temp city country day latitude longitude +0 8 Dresden DE 2018-01-01 51.050407 13.737262 +1 4 Dresden DE 2018-01-02 51.050407 13.737262 +2 6 Hamburg DE 2018-01-01 53.551086 9.993682 +3 5 Hamburg DE 2018-01-02 53.551086 9.993682 +4 6 London UK 2018-01-01 51.509865 -0.118092 +5 8 London UK 2018-01-02 51.509865 -0.118092 + +As you can see, we get a list of results back. This is because Kartothek Cube naturally supports partition-by semantic, which is +more helpful for distributed backends like `Distributed`_ or `Yamal`_: + +>>> dfs = query_cube( +... cube=cube, +... store=store, +... partition_by="country", +... 
) +>>> dfs[0] + avg_temp city country day latitude longitude +0 8 Dresden DE 2018-01-01 51.050407 13.737262 +1 4 Dresden DE 2018-01-02 51.050407 13.737262 +2 6 Hamburg DE 2018-01-01 53.551086 9.993682 +3 5 Hamburg DE 2018-01-02 53.551086 9.993682 +>>> dfs[1] + avg_temp city country day latitude longitude +0 6 London UK 2018-01-01 51.509865 -0.118092 +1 8 London UK 2018-01-02 51.509865 -0.118092 + +The query system also supports selection and projection: + +>>> from kartothek.core.cube.conditions import C +>>> query_cube( +... cube=cube, +... store=store, +... payload_columns=["avg_temp"], +... conditions=( +... (C("country") == "DE") & +... C("latitude").in_interval(50., 52.) & +... C("longitude").in_interval(13., 14.) +... ), +... )[0] + avg_temp city country day +0 8 Dresden DE 2018-01-01 +1 4 Dresden DE 2018-01-02 + +Transform +````````` +Query and Extend can be combined to build powerful transformation pipelines. To better illustrate this we will use +`Dask.Bag`_ for that example. + +.. important:: + Since `Dask`_ operations can also be executed in subprocesses, multiple threads, or even on other machines using + `Distributed`_, Kartothek Cube requires the user to pass a :term:`Store Factory` instead of a store. This ensures that no file + handles, TCP connections, or other non-transportable objects are shared. + +>>> from kartothek.io.dask.bag_cube import ( +... extend_cube_from_bag, +... query_cube_bag, +... ) +>>> def transform(df): +... df["avg_temp_country_min"] = df["avg_temp"].min() +... return { +... "transformed": df.loc[ +... :, +... [ +... "avg_temp_country_min", +... "city", +... "country", +... "day", +... ] +... ], +... } +>>> transformed = query_cube_bag( +... cube=cube, +... store=store_factory, +... partition_by="day", +... ).map(transform) +>>> datasets_transformed = extend_cube_from_bag( +... data=transformed, +... store=store_factory, +... cube=cube, +... ktk_cube_dataset_ids=["transformed"], +... ).compute() +>>> query_cube( +... cube=cube, +... store=store, +... payload_columns=[ +... "avg_temp", +... "avg_temp_country_min", +... ], +... )[0] + avg_temp avg_temp_country_min city country day +0 8 6 Dresden DE 2018-01-01 +1 4 4 Dresden DE 2018-01-02 +2 6 6 Hamburg DE 2018-01-01 +3 5 4 Hamburg DE 2018-01-02 +4 6 6 London UK 2018-01-01 +5 8 4 London UK 2018-01-02 + +Notice that the ``partition_by`` argument does not have to match the cube :term:`Partition Columns` to work. You may use +any indexed column. Keep in mind that fine-grained partitioning can have drawbacks though, namely large scheduling +overhead and many blob files which can make reading the data inefficient: + +>>> print_filetree(store, "geodata++transformed") +geodata++transformed.by-dataset-metadata.json +geodata++transformed/table/_common_metadata +geodata++transformed/table/country=DE/.parquet +geodata++transformed/table/country=DE/.parquet +geodata++transformed/table/country=UK/.parquet +geodata++transformed/table/country=UK/.parquet + + +Append +`````` +New rows can be added to the cube using an append operation: + +>>> from kartothek.io.eager_cube import append_to_cube +>>> df_weather2 = pd.read_csv( +... filepath_or_buffer=StringIO(""" +... avg_temp city country day +... 20 Santiago CL 2018-01-01 +... 22 Santiago CL 2018-01-02 +... """.strip()), +... delim_whitespace=True, +... parse_dates=["day"], +... ) +>>> datasets_appended = append_to_cube( +... data=df_weather2, +... store=store, +... cube=cube, +... 
) +>>> print_filetree(store, "geodata++seed") +geodata++seed.by-dataset-metadata.json +geodata++seed/indices/city/.by-dataset-index.parquet +geodata++seed/indices/city/.by-dataset-index.parquet +geodata++seed/indices/day/.by-dataset-index.parquet +geodata++seed/indices/day/.by-dataset-index.parquet +geodata++seed/table/_common_metadata +geodata++seed/table/country=CL/.parquet +geodata++seed/table/country=DE/.parquet +geodata++seed/table/country=UK/.parquet + +Notice that the indices where updated automatically. + +>>> query_cube( +... cube=cube, +... store=store, +... )[0] + avg_temp avg_temp_country_min city country day latitude longitude +0 8 6.0 Dresden DE 2018-01-01 51.050407 13.737262 +1 4 4.0 Dresden DE 2018-01-02 51.050407 13.737262 +2 6 6.0 Hamburg DE 2018-01-01 53.551086 9.993682 +3 5 4.0 Hamburg DE 2018-01-02 53.551086 9.993682 +4 6 6.0 London UK 2018-01-01 51.509865 -0.118092 +5 8 4.0 London UK 2018-01-02 51.509865 -0.118092 +6 20 NaN Santiago CL 2018-01-01 NaN NaN +7 22 NaN Santiago CL 2018-01-02 NaN NaN + +Remove +`````` +You can remove entire partitions from the cube using the remove operation: + +>>> from kartothek.io.eager_cube import remove_partitions +>>> datasets_after_removal = remove_partitions( +... cube=cube, +... store=store, +... ktk_cube_dataset_ids=["latlong"], +... conditions=(C("country") == "UK"), +... ) +>>> query_cube( +... cube=cube, +... store=store, +... )[0] + avg_temp avg_temp_country_min city country day latitude longitude +0 8 6.0 Dresden DE 2018-01-01 51.050407 13.737262 +1 4 4.0 Dresden DE 2018-01-02 51.050407 13.737262 +2 6 6.0 Hamburg DE 2018-01-01 53.551086 9.993682 +3 5 4.0 Hamburg DE 2018-01-02 53.551086 9.993682 +4 6 6.0 London UK 2018-01-01 NaN NaN +5 8 4.0 London UK 2018-01-02 NaN NaN +6 20 NaN Santiago CL 2018-01-01 NaN NaN +7 22 NaN Santiago CL 2018-01-02 NaN NaN + +Delete +`````` +You can also delete entire datasets (or the entire cube): + +>>> from kartothek.io.eager_cube import delete_cube +>>> datasets_still_in_cube = delete_cube( +... cube=cube, +... store=store, +... datasets=["transformed"], +... ) +>>> query_cube( +... cube=cube, +... store=store, +... )[0] + avg_temp city country day latitude longitude +0 8 Dresden DE 2018-01-01 51.050407 13.737262 +1 4 Dresden DE 2018-01-02 51.050407 13.737262 +2 6 Hamburg DE 2018-01-01 53.551086 9.993682 +3 5 Hamburg DE 2018-01-02 53.551086 9.993682 +4 6 London UK 2018-01-01 NaN NaN +5 8 London UK 2018-01-02 NaN NaN +6 20 Santiago CL 2018-01-01 NaN NaN +7 22 Santiago CL 2018-01-02 NaN NaN + +Dimensionality and Partitioning +``````````````````````````````` +Sometimes, you have data that only exists in a projection of the cube, like the ``latlong`` data from the `Extend`_ +section. For non-seed datasets, you can just leave out :term:`Dimension Columns`, as long as at least a single +:term:`Dimension Column` remains. + +Sometimes, you may find that the standard partitioning does not match the data really well, so for non-seed datasets, you can change the partitioning: + +- **leave out partition columns:** especially helpful when the dataset is really small or data only exists on a specific + projection that does lead to partitioning (e.g. the ``day`` dimension from the example cube) +- **additional partition columns:** when the dataset has many and/or very memory-intense columns + +.. important:: + + Although other partitionings than the cube :term:`Partition Columns` can be specified, it is strongly adviced to not + diverge too much from these for performance reasons. 
+ +>>> df_time = pd.DataFrame({ +... "day": pd.date_range( +... start="2018-01-01", +... end="2019-01-01", +... freq="D", +... ), +... }) +>>> df_time["weekday"] = df_time.day.dt.weekday +>>> df_time["month"] = df_time.day.dt.month +>>> df_time["year"] = df_time.day.dt.year +>>> datasets_time = extend_cube( +... data={"time": df_time}, +... store=store, +... cube=cube, +... partition_on={"time": []}, +... ) +>>> print_filetree(store, "geodata++time") +geodata++time.by-dataset-metadata.json +geodata++time/table/.parquet +geodata++time/table/_common_metadata +>>> query_cube( +... cube=cube, +... store=store, +... )[0] + avg_temp city country day latitude longitude month weekday year +0 8 Dresden DE 2018-01-01 51.050407 13.737262 1 0 2018 +1 4 Dresden DE 2018-01-02 51.050407 13.737262 1 1 2018 +2 6 Hamburg DE 2018-01-01 53.551086 9.993682 1 0 2018 +3 5 Hamburg DE 2018-01-02 53.551086 9.993682 1 1 2018 +4 6 London UK 2018-01-01 NaN NaN 1 0 2018 +5 8 London UK 2018-01-02 NaN NaN 1 1 2018 +6 20 Santiago CL 2018-01-01 NaN NaN 1 0 2018 +7 22 Santiago CL 2018-01-02 NaN NaN 1 1 2018 + + +.. _Distributed: https://distributed.readthedocs.io/ +.. _DataFrame.merge: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.merge.html?highlight=merge#pandas.DataFrame.merge +.. _DataFrame.reset_index: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.reset_index.html?highlight=reset_index#pandas.DataFrame.reset_index +.. _Dask: https://docs.dask.org/ +.. _Dask.Bag: https://docs.dask.org/en/latest/bag.html +.. _Dask.DataFrame: https://docs.dask.org/en/latest/dataframe.html +.. _simplekv: https://simplekv.readthedocs.io/ +.. _Yamal: https://software.blue-yonder.org/DynamicPricing/generic/yamal/latest/+doc/index.html \ No newline at end of file diff --git a/docs/guide/cube/glossary.rst b/docs/guide/cube/glossary.rst new file mode 100644 index 00000000..3f6133a5 --- /dev/null +++ b/docs/guide/cube/glossary.rst @@ -0,0 +1,79 @@ +Glossary +-------- + +.. glossary:: + + Build + Process of creating a new cube. + + Cell + A unique combination of :term:`Dimension` values. Will result in a single row in input and output DataFrames. + + Cube + A combination of multiple datasets that model an `Data Cubes`_-like construct. The core data structure of kartothek cube. + + Dataset ID + The ID of a dataset that belongs to the cube w/o any :term:`Uuid Prefix`. + + Dimension + Part of the address for a certain cube :term:`Cell`. Usually refered as :term:`Dimension Column`. Different + dimension should describe orthogonal attributes. + + Dimension Column + DataFrame column that contains values for a certain :term:`Dimension`. + + Dimension Columns + Ordered list of all :term:`Dimension Column` for a :term:`Cube`. + + Extend + Process of adding new datasets to an existing cube. + + Index Column + Column for which additional index structures are build. + + Kartothek Dataset UUID + Name that makes a dataset unique in a store, includes :term:`Uuid Prefix` and :term:`Dataset ID` as + ``++``. + + Logical Partition + Partition that was created by ``partition_by`` arguments to the :term:`Query`. + + Physical Partition + A single chunk of data that is stored to the blob store. May contain multiple `Parquet`_ files. + + Partition Column + DataFrame column that contains one part that makes a :term:`Physical Partition`. + + Partition Columns + Ordered list of all :term:`Partition Column` for a :term:`Cube`. 
+
+    Projection
+        Process of dimension reduction of a cube (like a 3D object projects a shadow on the wall). Only works if the
+        involved payload only exists in the subdimensional space, since no automatic aggregation is supported.
+
+    Seed
+        Dataset that provides the groundtruth about which :term:`Cell` entries are part of a :term:`Cube`.
+
+    Store Factory
+        A callable that does not take any arguments and creates a new `simplekv`_ store when being called. Its type is
+        ``Callable[[], simplekv.KeyValueStore]``.
+
+    Query
+        A request for data from the cube, including things like "payload columns", "conditions", and more.
+
+    Query Execution
+        Process of reading out data from a :term:`Cube`, aka the execution of a :term:`Query`.
+
+    Query Intention
+        The actual intention of a :term:`Query`, e.g.:
+
+        - if the user queries "all columns", the intention includes the concrete set of columns
+        - if the user does not specify the dimension columns, it should use the cube dimension columns (aka "no
+          :term:`Projection`")
+
+    Uuid Prefix
+        Common prefix for all datasets that belong to a :term:`Cube`.
+
+.. _Data Cubes: https://en.wikipedia.org/wiki/Data_cube
+.. _Parquet: https://parquet.apache.org/
+.. _simplekv: https://simplekv.readthedocs.io/
\ No newline at end of file
diff --git a/docs/guide/cube/kartothek_cubes.rst b/docs/guide/cube/kartothek_cubes.rst
new file mode 100644
index 00000000..984a86bb
--- /dev/null
+++ b/docs/guide/cube/kartothek_cubes.rst
@@ -0,0 +1,23 @@
+===============
+Kartothek Cubes
+===============
+
+Imagine a typical machine learning workflow, which might look like this:
+
+ - First, we get some input data, or source data. In the context of Kartothek cubes, we will refer to this source data as the seed data or seed dataset.
+ - On this seed dataset, we might want to train a model that generates predictions.
+ - Based on these predictions, we might want to generate reports and calculate KPIs.
+ - Last but not least, we might want to create some dashboards showing plots of the aggregated KPIs as well as the underlying input data.
+
+What we need for this workflow is not a table-like view on our data, but a single (virtual) view on everything that we generated in these different steps.
+
+Kartothek Cubes deal with multiple `Kartothek` datasets loosely modeled after `Data Cubes`_.
+
+One cube is made up of multiple `Kartothek` datasets. User-facing APIs mostly consume and provide `Pandas`_ DataFrames.
+
+Cubes offer an interface to query all of the data without performing complex join operations manually each time.
+Because kartothek offers a view on our cube similar to a large virtual pandas DataFrame, querying the whole dataset is very comfortable.
+
+
+.. _Data Cubes: https://en.wikipedia.org/wiki/Data_cube
+.. _Pandas: https://pandas.pydata.org/
\ No newline at end of file
diff --git a/docs/guide/cube/query_system.rst b/docs/guide/cube/query_system.rst
new file mode 100644
index 00000000..0acf64e0
--- /dev/null
+++ b/docs/guide/cube/query_system.rst
@@ -0,0 +1,294 @@
+
+Query System
+------------
+.. contents:: Table of Contents
+
+Kartothek views the whole cube as a large, virtual DataFrame. The seed dataset provides the groundtruth regarding rows;
+all other datasets are joined via a left join. The user should not see that data is partitioned via
+:term:`Partition Columns` or split along datasets.
+
+.. important::
+    It is a common misconception that Kartothek is able to join arbitrary datasets or implements a complete join
+    system like SQL. This is NOT the case!
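+
+For illustration, a query against this virtual DataFrame is expressed via column selections, conditions, and an
+optional ``partition_by``, never via explicit joins. A minimal sketch, reusing the ``geodata`` cube and store from the
+examples section:
+
+.. code-block:: python
+
+    from kartothek.core.cube.conditions import C
+    from kartothek.io.eager_cube import query_cube
+
+    # kartothek_cube decides internally which datasets, partitions, and columns
+    # have to be read to answer this request.
+    dfs = query_cube(
+        cube=cube,
+        store=store,
+        payload_columns=["avg_temp", "latitude", "longitude"],
+        conditions=(C("country") == "DE") & C("latitude").in_interval(50.0, 52.0),
+        partition_by="country",
+    )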
+ +This section explain some technical details around this mechanism. + +Per-Dataset Partitions +`````````````````````` +First of all, all partition files for all datasets are gathered. Every partition file is represented by a unique label. +For every dataset, index data for the Primary Indices (aka partition columns) will be loaded and joined w/ the labels:: + + P | Q | __ktk_cube_labels_seed + ===+===+========================= + 0 | 0 | P=0/Q=0/.parquet + 0 | 1 | P=0/Q=1/.parquet + 0 | 1 | P=0/Q=1/.parquet + 1 | 0 | P=1/Q=0/.parquet + 1 | 0 | P=1/Q=1/.parquet + + +Also, pre-conditions are applied during that step. These are conditions that can be evaluated based on index data +(Partition Indices, Explicit Secondary Indices for dimension columns as well as index columns):: + + condition = (I1 > 1) + + join index information: + + P | Q | I1 | __ktk_cube_labels_seed + ===+===+====+========================= + 0 | 0 | 1 | P=0/Q=0/.parquet + 0 | 1 | 2 | P=0/Q=1/.parquet + 0 | 1 | 3 | P=0/Q=1/.parquet + 1 | 0 | 4 | P=1/Q=0/.parquet + 1 | 0 | 5 | P=1/Q=1/.parquet + + filter: + + P | Q | I1 | __ktk_cube_labels_seed + ===+===+====+========================= + 0 | 1 | 2 | P=0/Q=1/.parquet + 0 | 1 | 3 | P=0/Q=1/.parquet + 1 | 0 | 4 | P=1/Q=0/.parquet + 1 | 0 | 5 | P=1/Q=0/.parquet + + remove index information: + + P | Q | __ktk_cube_labels_seed + ===+===+========================= + 0 | 1 | P=0/Q=1/.parquet + 0 | 1 | P=0/Q=1/.parquet + 1 | 0 | P=1/Q=0/.parquet + 1 | 0 | P=1/Q=1/.parquet + + +Now, partition-by data is added (if not already present):: + + partition-by = I2 + + P | Q | I2 | __ktk_cube_labels_seed + ===+===+====+========================= + 0 | 1 | 1 | P=0/Q=1/.parquet + 0 | 1 | 1 | P=0/Q=1/.parquet + 1 | 0 | 1 | P=1/Q=0/.parquet + 1 | 0 | 2 | P=1/Q=0/.parquet + +Finally, rows w/ identical partition information (physical and partition-by) are compactified:: + + P | Q | I2 | __ktk_cube_labels_seed + ===+===+====+================================================== + 0 | 1 | 1 | P=0/Q=1/.parquet, P=0/Q=1/.parquet + 1 | 0 | 1 | P=1/Q=0/.parquet + 1 | 0 | 2 | P=1/Q=0/.parquet + + +Alignment +````````` +After data is prepared for every dataset, they are aligned using their physical partitions. Partitions that are present +in non-seed datasets but are missing from the seed dataset are dropped:: + + inputs: + + P | Q | I2 | __ktk_cube_labels_seed + ===+===+====+================================================== + 0 | 1 | 1 | P=0/Q=1/.parquet, P=0/Q=1/.parquet + 1 | 0 | 1 | P=1/Q=0/.parquet + 1 | 0 | 2 | P=1/Q=0/.parquet + + P | Q | __ktk_cube_labels_enrich + ===+===+================================================== + 0 | 0 | P=0/Q=1/.parquet + 0 | 1 | P=0/Q=1/.parquet + 1 | 0 | P=1/Q=0/.parquet, P=0/Q=1/.parquet + 9 | 0 | P=9/Q=0/.parquet + + + output: + + P | Q | I2 | __ktk_cube_labels_seed | __ktk_cube_labels_enrich + ===+===+====+==================================================+================================================== + 0 | 1 | 1 | P=0/Q=1/.parquet, P=0/Q=1/.parquet | P=0/Q=1/.parquet + 1 | 0 | 1 | P=1/Q=0/.parquet | P=1/Q=0/.parquet, P=0/Q=1/.parquet + 1 | 0 | 2 | P=1/Q=0/.parquet | P=1/Q=0/.parquet, P=0/Q=1/.parquet + + +In case pre-conditions got applied to any non-seed dataset or partition-by columns that are neither a +:term:`Partition Column` nor :term:`Dimension Column`, the resulting join will be an inner join. This may result in +removing potential partitions early. 
+ +Re-grouping +``````````` +Now, the DataFrame is grouped by partition-by:: + + partition-by: I2 + + group 1: + + P | Q | I2 | __ktk_cube_labels_seed | __ktk_cube_labels_enrich + ===+===+====+==================================================+================================================== + 0 | 1 | 1 | P=0/Q=1/.parquet, P=0/Q=1/.parquet | P=0/Q=1/.parquet + 1 | 0 | 1 | P=1/Q=0/.parquet | P=1/Q=0/.parquet, P=0/Q=1/.parquet + + group 2: + + P | Q | I2 | __ktk_cube_labels_seed | __ktk_cube_labels_enrich + ===+===+====+==================================================+================================================== + 1 | 0 | 2 | P=1/Q=0/.parquet | P=1/Q=0/.parquet, P=0/Q=1/.parquet + +Intra-Partition Joins +````````````````````` +This section explains how DataFrames within a partition within a group are joined. + +A simple explanation of the join logic would be: "The coordinates (cube cells) are taken from the seed dataset, all +other information is add via a left join." + +Because the user is able to add conditions to the query and because we want to utilize predicate pushdown in a very +efficient way, we define another term: **restricted dataset**. These are datasets which contain +non-:term:`Dimension Column` and non-:term:`Partition Column` to which users wishes to apply restrictions (via +conditions or via partition-by). Because these restrictions always need to apply, we can evaluate them pre-join and +execute an inner join with the seed dataset. + +Examples +```````` +The following sub-sections illustrate this system in multiple steps. + + +Example 1 (Join Semantics) +~~~~~~~~~~~~~~~~~~~~~~~~~~ +Here, a rather standard example is shown with explanations why data is kept or not:: + + columns = [P, PRED] + condition = (OK == true) & (SCHED == true) + + Seed | Conditions | Enrichments + db_data | data_checks | schedule | predictions + =========+=============+=============+============= + P=1 | P=1 | P=1 | P=1 <-- included, trivial case + | OK=true | SCHED=true | PRED=0.23 + ---------+-------------+-------------+------------- + P=2 | P=2 | P=2 | P=2 <-- excluded, because OK=false + | OK=false | SCHED=true | PRED=0.12 + ---------+-------------+-------------+------------- + P=3 | P=3 | P=3 | P=3 <-- excluded, because SCHED=false + | OK=true | SCHED=false | PRED=0.13 + ---------+-------------+-------------+------------- + | P=4 | P=4 | P=4 <-- excluded, seed is missing + | OK=true | SCHED=true | PRED=0.03 where does this data even come from?! 
+ ---------+-------------+-------------+------------- + P=5 | P=5 | P=5 | <-- included, even though PRED is missing + | OK=true | SCHED=true | + ---------+-------------+-------------+------------- + P=6 | P=6 | | P=6 <-- excluded, SCHED is missing + | OK=true | | PRED=0.01 + + ^ ^ ^ ^ + | | | | + +---------+-------------+ | + | | + inner join | + tmp1 = db_data <-> data_checks on P | + out = tmp1 <-> schedule on P | + (but order actually doesn't matter) | + ^ | + | | + +-----------------+---------+ + | + left join + | + v + + P | PRED + ===+====== + 1 | 0.23 + 5 | NaN + + +Example 2 (Outer Join) +~~~~~~~~~~~~~~~~~~~~~~ +Now, we have a P-L cube, with all datasets except of ``schedule`` having P-L dimensionality:: + + columns = [P, L, PRED] + condition = (OK == true) & (SCHED == true) + + Seed | Conditions | Enrichments + db_data | data_checks | schedule | predictions + =========+=============+=============+============= + P=1 | P=1 | P=1 | P=1 <-- included, trivial case + L=1 | L=1 | | L=1 + | OK=true | SCHED=true | PRED=0.23 + ---------+-------------+ +------------- + P=1 | P=1 | | P=1 <-- excluded, because OK=false + L=2 | L=2 | | L=2 + | OK=false | | PRED=0.12 + ---------+-------------+-------------+------------- + P=2 | P=2 | P=2 | P=2 <-+ excluded, because SCHED=false + L=1 | L=1 | | L=1 | + | OK=true | SCHED=false | PRED=0.13 | + ---------+-------------+ +------------- | + P=2 | P=2 | | P=2 <-+ + L=2 | L=2 | | L=2 + | OK=true | | PRED=0.13 + + ^ ^ ^ ^ + | | | | + +---------+-------------+ | + | | + inner join | + tmp1 = db_data <-> data_checks on P,L | + out = tmp1 <-> schedule on P | + (but order actually doesn't matter) | + ^ | + | | + +-----------------+---------+ + | + left join + | + v + + P | L | PRED + ===+===+====== + 1 | 1 | 0.23 + + +Example 3 (Projection) +~~~~~~~~~~~~~~~~~~~~~~ +This shows how the seed dataset can be used to also produce sub-dimensional / projected results:: + + columns = [P, AVG] + condition = (SCHED == true) + + Seed | Conditions | Enrichments + db_data | schedule | agg + =========+=============+============= + P=1 | P=1 | P=1 <-- included, trivial case + L=? | | + | SCHED=true | AVG=10.2 + ---------+-------------+------------- + P=2 | P=2 | P=2 <-- excluded, because SCHED=false + L=? | | + | SCHED=false | AVG=1.34 + + ^ ^ ^ + | | | + | +---+ | + | | | + project to P | | + | | | + +---------+---+ | + | +---------+ + inner join | + out = db_data <-> schedule on P | + ^ | + | | + +-----------------+-----+ + | + left join + | + v + + P | AVG + ===+======= + 1 | 10.2 + +Final Concat +~~~~~~~~~~~~ +After DataFrames for all partitions in a group are joined, they are concatenated in order of :term:`Partition Columns`. diff --git a/docs/index.rst b/docs/index.rst index 83ae3a78..f290300f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -30,7 +30,6 @@ partitions and selects subsets of data transparently. To get started, have a look at our :doc:`guide/getting_started` guide, head to the description of the :doc:`spec/index` or head straight to the API documentation :doc:`api`. - What is a (real) Kartothek? --------------------------- @@ -56,6 +55,19 @@ A Kartothek (or more modern: Zettelkasten/Katalogkasten) is a tool to organize Dask indexing Examples +.. toctree:: + :maxdepth: 2 + :caption: Cube Functionality + :hidden: + + Kartothek Cubes + Examples + Query System + Command Line Features + Cube Design Features + Glossary + + .. 
toctree:: :maxdepth: 2 :caption: Background diff --git a/test-requirements.txt b/test-requirements.txt index 244c3a86..f3445adc 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -15,7 +15,7 @@ setuptools-scm asv # Documentation -sphinx<3.2.0 # https://github.com/sphinx-doc/sphinx/issues/8096 +sphinx sphinx_rtd_theme sphinx-click IPython