-
Notifications
You must be signed in to change notification settings - Fork 174
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[DOCS] Add notebooks used for pydata global 2023 presentation (#1703)
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
- Loading branch information
Showing
2 changed files
with
586 additions
and
0 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,285 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import daft" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Reading Parquet Files\n", | ||
"\n", | ||
"## Read all data in Parquet files" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"df = daft.read_parquet(\n", | ||
" \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n", | ||
")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"%%time\n", | ||
"\n", | ||
"df.collect()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Read one column only in Parquet files" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"df = daft.read_parquet(\n", | ||
" \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n", | ||
")\n", | ||
"df = df.select(\"L_ORDERKEY\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"%%time\n", | ||
"\n", | ||
"df.collect()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Read filtered data from Parquet files" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"df = daft.read_parquet(\n", | ||
" \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n", | ||
")\n", | ||
"df = df.where(df[\"L_ORDERKEY\"] < 100)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"%%time\n", | ||
"\n", | ||
"df.collect()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Reading **many small files**\n", | ||
"\n", | ||
"## Listing Files\n", | ||
"\n", | ||
"Let's compare naive listing with the Python boto3 library with Daft's s3 listing capabilities.\n", | ||
"\n", | ||
"Listing many small files (100k++) is a notoriously expensive operation in S3, but Daft provides an extremely efficient solution for common cases with hierarchical file namespaces." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"%%time\n", | ||
"\n", | ||
"import boto3\n", | ||
"client = boto3.client(\"s3\")\n", | ||
"kwargs = {\"Bucket\": \"daft-public-datasets\", \"Prefix\": \"tpch-lineitem/10k-1mb-csv-files\"}\n", | ||
"response = client.list_objects_v2(**kwargs)\n", | ||
"data = response[\"Contents\"]\n", | ||
"token = response.get(\"NextContinuationToken\")\n", | ||
"\n", | ||
"while token is not None:\n", | ||
" if token is not None:\n", | ||
" kwargs[\"ContinuationToken\"] = token\n", | ||
" response = client.list_objects_v2(**kwargs)\n", | ||
" data.extend(response[\"Contents\"])\n", | ||
" token = response.get(\"NextContinuationToken\")\n", | ||
" \n", | ||
"print(f\"Retrieved {len(data)} results.\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"%%time\n", | ||
"\n", | ||
"df = daft.from_glob_path(\"s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv\")\n", | ||
"df.collect()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Reading 10K small 1MB CSV files" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"df = daft.read_csv(\"s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"print(df.num_partitions())" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"%%time\n", | ||
"\n", | ||
"df.collect()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Downloading data from URLs\n", | ||
"\n", | ||
"In many unstructured/complex data workloads, you will often have URLs in your table pointing out to some external data. Daft is extremely fast at downloading this data. Much faster than anything I've ever managed to build using boto3 and Python." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"df = daft.from_glob_path(\"s3://daft-public-data/open-images/validation-images/**.jpg\")\n", | ||
"\n", | ||
"IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(max_connections=64))\n", | ||
"df = df.with_column(\"data\", df[\"path\"].url.download(io_config=IO_CONFIG))" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"%%time\n", | ||
"\n", | ||
"df.collect()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Running a full TPC-H Query\n", | ||
"\n", | ||
"Let's see what this profiling looks like for running a TPC-H query on " | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import daft\n", | ||
"import os\n", | ||
"\n", | ||
"PARQUET_FOLDER = \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/\"\n", | ||
"def get_df(table_name: str) -> daft.DataFrame:\n", | ||
" return daft.read_parquet(os.path.join(PARQUET_FOLDER, table_name, \"*.parquet\"))" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from daft import col\n", | ||
"import datetime\n", | ||
"\n", | ||
"lineitem = get_df(\"lineitem\")\n", | ||
"\n", | ||
"discounted_price = col(\"L_EXTENDEDPRICE\") * (1 - col(\"L_DISCOUNT\"))\n", | ||
"taxed_discounted_price = discounted_price * (1 + col(\"L_TAX\"))\n", | ||
"df = (\n", | ||
" lineitem.where(col(\"L_SHIPDATE\") <= datetime.date(1998, 9, 2))\n", | ||
" .groupby(col(\"L_RETURNFLAG\"), col(\"L_LINESTATUS\"))\n", | ||
" .agg(\n", | ||
" [\n", | ||
" (col(\"L_QUANTITY\").alias(\"sum_qty\"), \"sum\"),\n", | ||
" (col(\"L_EXTENDEDPRICE\").alias(\"sum_base_price\"), \"sum\"),\n", | ||
" (discounted_price.alias(\"sum_disc_price\"), \"sum\"),\n", | ||
" (taxed_discounted_price.alias(\"sum_charge\"), \"sum\"),\n", | ||
" (col(\"L_QUANTITY\").alias(\"avg_qty\"), \"mean\"),\n", | ||
" (col(\"L_EXTENDEDPRICE\").alias(\"avg_price\"), \"mean\"),\n", | ||
" (col(\"L_DISCOUNT\").alias(\"avg_disc\"), \"mean\"),\n", | ||
" (col(\"L_QUANTITY\").alias(\"count_order\"), \"count\"),\n", | ||
" ]\n", | ||
" )\n", | ||
" .sort([\"L_RETURNFLAG\", \"L_LINESTATUS\"])\n", | ||
")" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"language_info": { | ||
"name": "python" | ||
}, | ||
"orig_nbformat": 4 | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |