Skip to content

Commit

Permalink
[DOCS] Add notebooks used for pydata global 2023 presentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 6, 2023
1 parent a53cd51 commit ded724d
Show file tree
Hide file tree
Showing 2 changed files with 586 additions and 0 deletions.
301 changes: 301 additions & 0 deletions tutorials/intro.ipynb

Large diffs are not rendered by default.

285 changes: 285 additions & 0 deletions tutorials/talks_and_demos/pydata_global_2023.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import daft"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Reading Parquet Files\n",
"\n",
"## Read all data in Parquet files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = daft.read_parquet(\n",
" \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read one column only in Parquet files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = daft.read_parquet(\n",
" \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n",
")\n",
"df = df.select(\"L_ORDERKEY\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read filtered data from Parquet files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = daft.read_parquet(\n",
" \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n",
")\n",
"df = df.where(df[\"L_ORDERKEY\"] < 100)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Reading **many small files**\n",
"\n",
"## Listing Files\n",
"\n",
"Let's compare naive listing with the Python boto3 library with Daft's s3 listing capabilities.\n",
"\n",
"Listing many small files (100k++) is a notoriously expensive operation in S3, but Daft provides an extremely efficient solution for common cases with hierarchical file namespaces."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"import boto3\n",
"client = boto3.client(\"s3\")\n",
"kwargs = {\"Bucket\": \"daft-public-datasets\", \"Prefix\": \"tpch-lineitem/10k-1mb-csv-files\"}\n",
"response = client.list_objects_v2(**kwargs)\n",
"data = response[\"Contents\"]\n",
"token = response.get(\"NextContinuationToken\")\n",
"\n",
"while token is not None:\n",
" if token is not None:\n",
" kwargs[\"ContinuationToken\"] = token\n",
" response = client.list_objects_v2(**kwargs)\n",
" data.extend(response[\"Contents\"])\n",
" token = response.get(\"NextContinuationToken\")\n",
" \n",
"print(f\"Retrieved {len(data)} results.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df = daft.from_glob_path(\"s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv\")\n",
"df.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading 10K small 1MB CSV files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = daft.read_csv(\"s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(df.num_partitions())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Downloading data from URLs\n",
"\n",
"In many unstructured/complex data workloads, you will often have URLs in your table pointing out to some external data. Daft is extremely fast at downloading this data. Much faster than anything I've ever managed to build using boto3 and Python."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = daft.from_glob_path(\"s3://daft-public-data/open-images/validation-images/**.jpg\")\n",
"\n",
"IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(max_connections=64))\n",
"df = df.with_column(\"data\", df[\"path\"].url.download(io_config=IO_CONFIG))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Running a full TPC-H Query\n",
"\n",
"Let's see what this profiling looks like for running a TPC-H query on "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import daft\n",
"import os\n",
"\n",
"PARQUET_FOLDER = \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/\"\n",
"def get_df(table_name: str) -> daft.DataFrame:\n",
" return daft.read_parquet(os.path.join(PARQUET_FOLDER, table_name, \"*.parquet\"))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from daft import col\n",
"import datetime\n",
"\n",
"lineitem = get_df(\"lineitem\")\n",
"\n",
"discounted_price = col(\"L_EXTENDEDPRICE\") * (1 - col(\"L_DISCOUNT\"))\n",
"taxed_discounted_price = discounted_price * (1 + col(\"L_TAX\"))\n",
"df = (\n",
" lineitem.where(col(\"L_SHIPDATE\") <= datetime.date(1998, 9, 2))\n",
" .groupby(col(\"L_RETURNFLAG\"), col(\"L_LINESTATUS\"))\n",
" .agg(\n",
" [\n",
" (col(\"L_QUANTITY\").alias(\"sum_qty\"), \"sum\"),\n",
" (col(\"L_EXTENDEDPRICE\").alias(\"sum_base_price\"), \"sum\"),\n",
" (discounted_price.alias(\"sum_disc_price\"), \"sum\"),\n",
" (taxed_discounted_price.alias(\"sum_charge\"), \"sum\"),\n",
" (col(\"L_QUANTITY\").alias(\"avg_qty\"), \"mean\"),\n",
" (col(\"L_EXTENDEDPRICE\").alias(\"avg_price\"), \"mean\"),\n",
" (col(\"L_DISCOUNT\").alias(\"avg_disc\"), \"mean\"),\n",
" (col(\"L_QUANTITY\").alias(\"count_order\"), \"count\"),\n",
" ]\n",
" )\n",
" .sort([\"L_RETURNFLAG\", \"L_LINESTATUS\"])\n",
")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

0 comments on commit ded724d

Please sign in to comment.