
Allow URLs and stores to be passed explicitly (#345)
* Allow URLs and stores to be passed explicitly

* Add doc section for KV interface

* Use CamelCase for store types

* Define StoreInput using StoreFactory
fjetter authored Sep 24, 2020
1 parent cbbed47 commit 555a01f
Showing 42 changed files with 433 additions and 321 deletions.
4 changes: 2 additions & 2 deletions CHANGES.rst
@@ -2,13 +2,13 @@
Changelog
=========

Version 3.14.x (unreleased)
Version 3.15.0 (unreleased)
===========================

Improvements
^^^^^^^^^^^^
* Reduce memory consumption during index write.

* Allow `simplekv` stores and `storefact` URLs to be passed explicitly as input for the `store` arguments

Version 3.14.0 (2020-08-27)
===========================
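
Taken together with the documentation updates below, the relaxed ``store`` argument means a storefact URL, a store factory, or a simplekv store instance should all be usable interchangeably. A minimal sketch, assuming a local ``hfs://`` store and a dataset ``a_unique_dataset_identifier`` that has already been written with the eager API shown later in this diff:

    from functools import partial
    from tempfile import TemporaryDirectory

    from storefact import get_store_from_url
    from kartothek.io.eager import read_table

    dataset_dir = TemporaryDirectory()
    store_url = f"hfs://{dataset_dir.name}"                 # storefact URL string
    store_factory = partial(get_store_from_url, store_url)  # factory callable, as before
    store = get_store_from_url(store_url)                   # simplekv store instance

    # After this change, each of these forms should be accepted for `store`:
    # read_table("a_unique_dataset_identifier", store_url, table="table")
    # read_table("a_unique_dataset_identifier", store_factory, table="table")
    # read_table("a_unique_dataset_identifier", store, table="table")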
3 changes: 1 addition & 2 deletions docs/guide/cube/examples.rst
@@ -169,7 +169,7 @@ very comfortable way. It is possible to treat the entire cube as a single, large
5 8 London UK 2018-01-02 51.509865 -0.118092

As you can see, we get a list of results back. This is because Kartothek Cube naturally supports partition-by semantic, which is
more helpful for distributed backends like `Distributed`_ or `Yamal`_:
more helpful for distributed backends like `Distributed`_:

>>> dfs = query_cube(
... cube=cube,
@@ -425,4 +425,3 @@ geodata++time/table/_common_metadata
.. _Dask.Bag: https://docs.dask.org/en/latest/bag.html
.. _Dask.DataFrame: https://docs.dask.org/en/latest/dataframe.html
.. _simplekv: https://simplekv.readthedocs.io/
.. _Yamal: https://software.blue-yonder.org/DynamicPricing/generic/yamal/latest/+doc/index.html
8 changes: 3 additions & 5 deletions docs/guide/dask_indexing.rst
@@ -10,15 +10,13 @@ Calculating a dask index is usually a very expensive operation which requires da
import numpy as np
import pandas as pd
from functools import partial
from tempfile import TemporaryDirectory
from storefact import get_store_from_url
from kartothek.io.eager import store_dataframes_as_dataset
dataset_dir = TemporaryDirectory()
store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")
store_url = f"hfs://{dataset_dir.name}"
df = pd.DataFrame(
{
@@ -51,10 +49,10 @@ Calculating a dask index is usually a very expensive operation which requires da
ddf_indexed.reset_index(),
table="table",
dataset_uuid="dataset_ddf_with_index",
store=store_factory,
store=store_url,
partition_on="B",
).compute()
read_dataset_as_ddf(
dataset_uuid=dm.uuid, store=store_factory, dask_index_on="B", table="table"
dataset_uuid=dm.uuid, store=store_url, dask_index_on="B", table="table"
)
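
If a callable is still preferred, for instance when mirroring the previous version of this page, a factory built from the same URL should behave equivalently for distributed execution, since every worker can then open its own store connection. A small sketch, assuming the ``store_url`` defined above:

    from functools import partial
    from storefact import get_store_from_url

    # Each dask worker calls the factory and gets its own simplekv store.
    store_factory = partial(get_store_from_url, store_url)
    store_factory()  # -> a simplekv store backed by the local hfs:// path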
22 changes: 9 additions & 13 deletions docs/guide/examples.rst
@@ -5,14 +5,12 @@ Setup a store

.. ipython:: python
from storefact import get_store_from_url
from functools import partial
from tempfile import TemporaryDirectory
# You can, of course, also directly use S3, ABS or anything else
# supported by :mod:`storefact`
dataset_dir = TemporaryDirectory()
store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")
store_url = f"hfs://{dataset_dir.name}"
.. ipython:: python
@@ -34,14 +32,12 @@ Setup a store
}
store_dataframes_as_dataset(
store=store_factory, dataset_uuid=dataset_uuid, dfs=[df], metadata=metadata
store=store_url, dataset_uuid=dataset_uuid, dfs=[df], metadata=metadata
)
# Load your data
# By default the single dataframe is stored in the 'core' table
df_from_store = read_table(
store=store_factory, dataset_uuid=dataset_uuid, table="table"
)
df_from_store = read_table(store=store_url, dataset_uuid=dataset_uuid, table="table")
df_from_store
@@ -77,7 +73,7 @@ Write
#  which refers to the created dataset
dataset = store_dataframes_as_dataset(
dfs=input_list_of_partitions,
store=store_factory,
store=store_url,
dataset_uuid="MyFirstDataset",
metadata={"dataset": "metadata"}, #  This is optional dataset metadata
metadata_version=4,
@@ -95,7 +91,7 @@ Read
#  Create the pipeline with a minimal set of configs
list_of_partitions = read_dataset_as_dataframes(
dataset_uuid="MyFirstDataset", store=store_factory
dataset_uuid="MyFirstDataset", store=store_url
)
# In case you were using the dataset created in the Write example
@@ -139,7 +135,7 @@ Write
#  which refers to the created dataset
dataset = store_dataframes_as_dataset__iter(
input_list_of_partitions,
store=store_factory,
store=store_url,
dataset_uuid="MyFirstDatasetIter",
metadata={"dataset": "metadata"}, #  This is optional dataset metadata
metadata_version=4,
@@ -156,7 +152,7 @@ Read
#  Create the pipeline with a minimal set of configs
list_of_partitions = read_dataset_as_dataframes__iterator(
dataset_uuid="MyFirstDatasetIter", store=store_factory
dataset_uuid="MyFirstDatasetIter", store=store_url
)
# the iter backend returns a generator object. In our case we want to look at
# all partitions at once
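
Since the iter backend hands back a generator, "looking at all partitions at once" is just a matter of draining it; a tiny sketch, assuming the ``list_of_partitions`` generator created above:

    # Materialize the generator to inspect every partition in one go.
    all_partitions = list(list_of_partitions)
    len(all_partitions)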
@@ -203,7 +199,7 @@ Write
# show the generated task graph.
task = store_delayed_as_dataset(
input_list_of_partitions,
store=store_factory,
store=store_url,
dataset_uuid="MyFirstDatasetDask",
metadata={"dataset": "metadata"}, #  This is optional dataset metadata
metadata_version=4,
@@ -225,6 +221,6 @@ Read
import pandas as pd
from kartothek.io.dask.delayed import read_dataset_as_delayed
tasks = read_dataset_as_delayed(dataset_uuid="MyFirstDatasetDask", store=store_factory)
tasks = read_dataset_as_delayed(dataset_uuid="MyFirstDatasetDask", store=store_url)
tasks
dask.compute(tasks)
23 changes: 7 additions & 16 deletions docs/guide/getting_started.rst
@@ -63,7 +63,7 @@ what follows is the filepath).
dataset_dir = TemporaryDirectory()
store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")
store_url = f"hfs://{dataset_dir.name}"
.. admonition:: Storage locations

@@ -74,13 +74,6 @@ what follows is the filepath).
- ``hazure``: AzureBlockBlobStorage
- ``hs3``: BotoStore (Amazon S3)
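
Whichever prefix is used, the object behind the URL is a plain ``simplekv`` key-value store; a minimal sketch of that interface, assuming a throwaway local ``hfs://`` store:

    from tempfile import TemporaryDirectory
    from storefact import get_store_from_url

    dataset_dir = TemporaryDirectory()
    store = get_store_from_url(f"hfs://{dataset_dir.name}")

    # simplekv stores expose a small key/value API on top of the backing storage.
    store.put("example-key", b"example-value")
    store.get("example-key")
    list(store.keys())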

.. admonition:: Store factories

The reason ``store_factory`` is defined as a ``partial`` callable with the store
information as arguments is because, when using distributed computing backends in
Kartothek, the connections of the store cannot be safely transferred between
processes and thus we pass storage information to workers as a factory function.

Interface
---------

@@ -109,7 +102,7 @@ to store the ``DataFrame`` ``df`` that we already have in memory.
df.dtypes.equals(another_df.dtypes) # both have the same schema
dm = store_dataframes_as_dataset(
store_factory, "a_unique_dataset_identifier", [df, another_df]
store_url, "a_unique_dataset_identifier", [df, another_df]
)
@@ -187,7 +180,7 @@ For example,

@verbatim
In [24]: store_dataframes_as_dataset(
....: store_factory,
....: store_url,
....: "will_not_work",
....: [df, df2],
....: )
@@ -231,7 +224,7 @@ default table name ``table`` and generates a UUID for the partition name.
},
]
dm = store_dataframes_as_dataset(store_factory, dataset_uuid="two-tables", dfs=dfs)
dm = store_dataframes_as_dataset(store_url, dataset_uuid="two-tables", dfs=dfs)
dm.tables
@@ -246,7 +239,7 @@ table of the dataset as a pandas DataFrame.
from kartothek.io.eager import read_table
read_table("a_unique_dataset_identifier", store_factory, table="table")
read_table("a_unique_dataset_identifier", store_url, table="table")
We can also read a dataframe iteratively, using
@@ -259,7 +252,7 @@ represent the `tables` of the dataset. For example,
from kartothek.io.iter import read_dataset_as_dataframes__iterator
for partition_index, df_dict in enumerate(
read_dataset_as_dataframes__iterator(dataset_uuid="two-tables", store=store_factory)
read_dataset_as_dataframes__iterator(dataset_uuid="two-tables", store=store_url)
):
print(f"Partition #{partition_index}")
for table_name, table_df in df_dict.items():
@@ -284,9 +277,7 @@ function but returns a collection of ``dask.delayed`` objects.
.. ipython:: python
# Read only values table `core-table` where `f` < 2.5
read_table(
"two-tables", store_factory, table="core-table", predicates=[[("f", "<", 2.5)]]
)
read_table("two-tables", store_url, table="core-table", predicates=[[("f", "<", 2.5)]])
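
The ``predicates`` argument is in disjunctive normal form: the outer list ORs together inner lists whose ``(column, operator, value)`` tuples are ANDed. A hedged sketch, reusing column ``f`` and a hypothetical column ``g``:

    # (f < 2.5 AND g == "foo") OR (f >= 10) -- `g` is illustrative only
    predicates = [
        [("f", "<", 2.5), ("g", "==", "foo")],
        [("f", ">=", 10)],
    ]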
For a deeper dive into Kartothek you can take a look at
35 changes: 16 additions & 19 deletions docs/guide/mutating_datasets.rst
@@ -25,7 +25,7 @@ some data there with Kartothek.
dataset_dir = TemporaryDirectory()
store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")
store_url = f"hfs://{dataset_dir.name}"
df = pd.DataFrame(
{
@@ -46,7 +46,7 @@ some data there with Kartothek.
.. ipython:: python
dm = store_dataframes_as_dataset(
store=store_factory, dataset_uuid="partitioned_dataset", dfs=[df], partition_on="B"
store=store_url, dataset_uuid="partitioned_dataset", dfs=[df], partition_on="B"
)
sorted(dm.partitions.keys())
@@ -77,9 +77,7 @@ Now, we create ``another_df`` with the same schema as our initial dataframe
}
)
dm = update_dataset_from_dataframes(
[another_df], store=store_factory, dataset_uuid=dm.uuid
)
dm = update_dataset_from_dataframes([another_df], store=store_url, dataset_uuid=dm.uuid)
sorted(dm.partitions.keys())
@@ -93,7 +91,7 @@ previous contents.
from kartothek.io.eager import read_table
updated_df = read_table(dataset_uuid=dm.uuid, store=store_factory, table="table")
updated_df = read_table(dataset_uuid=dm.uuid, store=store_url, table="table")
updated_df
@@ -118,7 +116,7 @@ To illustrate this point better, let's first create a dataset with two tables:
df2
dm_two_tables = store_dataframes_as_dataset(
store_factory, "two_tables", dfs=[{"data": {"table1": df, "table2": df2}}]
store_url, "two_tables", dfs=[{"data": {"table1": df, "table2": df2}}]
)
dm_two_tables.tables
sorted(dm_two_tables.partitions.keys())
@@ -150,7 +148,7 @@ with new data for ``table1`` and ``table2``:
dm_two_tables = update_dataset_from_dataframes(
{"data": {"table1": another_df, "table2": another_df2}},
store=store_factory,
store=store_url,
dataset_uuid=dm_two_tables.uuid,
)
dm_two_tables.tables
@@ -169,7 +167,7 @@ Trying to update only a subset of tables throws a ``ValueError``:
....: "table2": another_df2
....: }
....: },
....: store=store_factory,
....: store=store_url,
....: dataset_uuid=dm_two_tables.uuid
....: )
....:
@@ -190,7 +188,7 @@ To do so we use the ``delete_scope`` keyword argument as shown in the example below
dm = update_dataset_from_dataframes(
None,
store=store_factory,
store=store_url,
dataset_uuid=dm.uuid,
partition_on="B",
delete_scope=[{"B": pd.Timestamp("20130102")}],
Expand Down Expand Up @@ -227,7 +225,7 @@ list but have to be specified instead as individual dictionaries, i.e.
duplicate_df.F = "bar"
dm = store_dataframes_as_dataset(
store_factory,
store_url,
"another_partitioned_dataset",
[df, duplicate_df],
partition_on=["E", "F"],
Expand All @@ -239,7 +237,7 @@ list but have to be specified instead as individual dictionaries, i.e.
dm = update_dataset_from_dataframes(
None,
store=store_factory,
store=store_url,
dataset_uuid=dm.uuid,
partition_on=["E", "F"],
delete_scope=[{"E": "train", "F": "foo"}, {"E": "test", "F": "bar"}],
Expand All @@ -260,9 +258,7 @@ with one update:
df # Column B includes 2 values for '2013-01-02' and another 2 for '2013-01-03'
dm = store_dataframes_as_dataset(
store_factory, "replace_partition", [df], partition_on="B"
)
dm = store_dataframes_as_dataset(store_url, "replace_partition", [df], partition_on="B")
sorted(dm.partitions.keys()) # two partitions, one for each value of `B`
modified_df = another_df.copy()
Expand All @@ -273,7 +269,7 @@ with one update:
[
modified_df
], # specify dataframe which has 'new' data for partition to be replaced
store=store_factory,
store=store_url,
dataset_uuid=dm.uuid,
partition_on="B", # don't forget to specify the partitioning column
delete_scope=[
Expand All @@ -282,7 +278,7 @@ with one update:
)
sorted(dm.partitions.keys())
read_table(dm.uuid, store_factory, table="table")
read_table(dm.uuid, store_url, table="table")
As can be seen in the example above, the resultant dataframe from :func:`~kartothek.io.eager.read_table`
@@ -317,12 +313,13 @@ When garbage collection is called, the files are removed.
.. ipython:: python
from kartothek.io.eager import garbage_collect_dataset
from storefact import get_store_from_url
store = store_factory()
store = get_store_from_url(store_url)
files_before = set(store.keys())
garbage_collect_dataset(store=store_factory, dataset_uuid=dm.uuid)
garbage_collect_dataset(store=store, dataset_uuid=dm.uuid)
files_before.difference(store.keys()) # Show files removed
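
Given the point of this commit, the URL form is presumably accepted here as well, which would avoid materializing the store by hand; a hedged one-liner, assuming the same ``store_url`` and dataset as above:

    # Assumption: `garbage_collect_dataset` also takes the URL string for `store`.
    garbage_collect_dataset(store=store_url, dataset_uuid=dm.uuid)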