Commit

[CHORE] Add TPC-H questions 11-22 to benchmarks (currently skipped) (#…
kevinzwang authored Jul 20, 2024
1 parent 876fca8 commit fd67dac
Showing 17 changed files with 821 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/daft-profiling.yml
@@ -62,13 +62,13 @@ jobs:
        env:
          DAFT_DEVELOPER_USE_THREAD_POOL: '0'
        run: |
-         py-spy record --native --function -o tpch-${{github.run_id}}.txt -f speedscope -- python benchmarking/tpch/__main__.py --scale_factor=${{ env.TPCH_SCALE_FACTOR }} --num_parts=${{ env.TPCH_NUM_PARTS }} --skip_warmup || true
+         py-spy record --native --function -o tpch-${{github.run_id}}.txt -f speedscope -- python benchmarking/tpch/__main__.py --scale_factor=${{ env.TPCH_SCALE_FACTOR }} --num_parts=${{ env.TPCH_NUM_PARTS }} --skip_warmup --skip_questions=11,12,13,14,15,16,17,18,19,20,21,22 || true
      - name: Run GIL Profiling on TPCH Benchmark
        env:
          DAFT_DEVELOPER_USE_THREAD_POOL: '0'
        run: |
-         py-spy record --native --function --gil -o tpch-gil-${{github.run_id}}.txt -f speedscope -- python benchmarking/tpch/__main__.py --scale_factor=${{ env.TPCH_SCALE_FACTOR }} --num_parts=${{ env.TPCH_NUM_PARTS }} --skip_warmup || true
+         py-spy record --native --function --gil -o tpch-gil-${{github.run_id}}.txt -f speedscope -- python benchmarking/tpch/__main__.py --scale_factor=${{ env.TPCH_SCALE_FACTOR }} --num_parts=${{ env.TPCH_NUM_PARTS }} --skip_warmup --skip_questions=11,12,13,14,15,16,17,18,19,20,21,22 || true
      - name: Upload Profile
4 changes: 2 additions & 2 deletions benchmarking/tpch/__main__.py
@@ -37,7 +37,7 @@


class MetricsBuilder:
-    NUM_TPCH_QUESTIONS = 10
+    NUM_TPCH_QUESTIONS = 22

    HEADERS = [
        "started_at",
@@ -133,7 +133,7 @@ def run_all_benchmarks(
    daft_context = get_context()
    metrics_builder = MetricsBuilder(daft_context.runner_config.name)

-    for i in range(1, 11):
+    for i in range(1, 23):
        if i in skip_questions:
            logger.warning("Skipping TPC-H q%s", i)
            continue
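
The workflow above now passes --skip_questions with the full list of questions 11 through 22, and the loop only checks i in skip_questions; the parsing of that flag is outside this hunk. A minimal sketch of how such a flag can be turned into that set (the flag name is taken from the workflow, but the parser shown here is an assumption, not the repository's actual CLI wiring):

import argparse

# Hypothetical stand-in for the CLI wiring in benchmarking/tpch/__main__.py;
# only the loop and the --skip_questions flag are visible in this diff.
parser = argparse.ArgumentParser()
parser.add_argument("--skip_questions", type=str, default="")
args = parser.parse_args(["--skip_questions=11,12,13"])

# Comma-separated string -> set of ints, so "i in skip_questions" is a cheap membership check.
skip_questions = {int(s) for s in args.skip_questions.split(",") if s}

for i in range(1, 23):
    if i in skip_questions:
        print(f"Skipping TPC-H q{i}")
        continue
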
346 changes: 345 additions & 1 deletion benchmarking/tpch/answers.py
@@ -3,7 +3,7 @@
import datetime
from typing import Callable

-from daft import DataFrame, col
+from daft import DataFrame, col, lit

GetDFFunc = Callable[[str], DataFrame]

@@ -333,3 +333,347 @@ def decrease(x, y):
)

return daft_df


def q11(get_df: GetDFFunc) -> DataFrame:
partsupp = get_df("partsupp")
supplier = get_df("supplier")
nation = get_df("nation")

var_1 = "GERMANY"
var_2 = 0.0001 / 1

res_1 = (
partsupp.join(supplier, left_on=col("PS_SUPPKEY"), right_on=col("S_SUPPKEY"))
.join(nation, left_on=col("S_NATIONKEY"), right_on=col("N_NATIONKEY"))
.where(col("N_NAME") == var_1)
)

res_2 = res_1.agg((col("PS_SUPPLYCOST") * col("PS_AVAILQTY")).sum().alias("tmp")).select(
col("tmp") * var_2, lit(1).alias("lit")
)

daft_df = (
res_1.groupby("PS_PARTKEY")
.agg(
(col("PS_SUPPLYCOST") * col("PS_AVAILQTY")).sum().alias("value"),
)
.with_column("lit", lit(1))
.join(res_2, on="lit")
.where(col("value") > col("tmp"))
.select(col("PS_PARTKEY"), col("value").round(2))
.sort(col("value"), desc=True)
)

return daft_df
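
Not part of the diff: a plain-Python sketch of the threshold pattern q11 (Important Stock Identification) uses above, where the one-row aggregate res_2 is tagged with lit(1) and joined onto every per-part row so a global scalar can be used in the filter:

# The lit(1) join acts as a cross join that broadcasts the global threshold
# onto each PS_PARTKEY group.
rows = [  # (partkey, supplycost * availqty) for the GERMANY suppliers
    (1, 100.0), (1, 250.0), (2, 10.0), (3, 500.0),
]

value_by_part = {}
for partkey, v in rows:
    value_by_part[partkey] = value_by_part.get(partkey, 0.0) + v

threshold = sum(v for _, v in rows) * (0.0001 / 1)  # res_2's single "tmp" value

result = sorted(
    ((pk, round(v, 2)) for pk, v in value_by_part.items() if v > threshold),
    key=lambda t: t[1],
    reverse=True,
)
print(result)  # parts whose value exceeds the global threshold, largest first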


def q12(get_df: GetDFFunc) -> DataFrame:
orders = get_df("orders")
lineitem = get_df("lineitem")

daft_df = (
orders.join(lineitem, left_on=col("O_ORDERKEY"), right_on=col("L_ORDERKEY"))
.where(
col("L_SHIPMODE").is_in(["MAIL", "SHIP"])
& (col("L_COMMITDATE") < col("L_RECEIPTDATE"))
& (col("L_SHIPDATE") < col("L_COMMITDATE"))
& (col("L_RECEIPTDATE") >= datetime.date(1994, 1, 1))
& (col("L_RECEIPTDATE") < datetime.date(1995, 1, 1))
)
.groupby(col("L_SHIPMODE"))
.agg(
col("O_ORDERPRIORITY").is_in(["1-URGENT", "2-HIGH"]).if_else(1, 0).sum().alias("high_line_count"),
(~col("O_ORDERPRIORITY").is_in(["1-URGENT", "2-HIGH"])).if_else(1, 0).sum().alias("low_line_count"),
)
.sort(col("L_SHIPMODE"))
)

return daft_df
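
Not part of the diff: the high_line_count and low_line_count aggregations in q12 (Shipping Modes and Order Priority) use is_in(...).if_else(1, 0).sum(), that is, a conditional count written as the sum of a 0/1 indicator. A plain-Python analogue:

priorities = ["1-URGENT", "3-MEDIUM", "2-HIGH", "5-LOW"]

high_line_count = sum(1 if p in ("1-URGENT", "2-HIGH") else 0 for p in priorities)
low_line_count = sum(1 if p not in ("1-URGENT", "2-HIGH") else 0 for p in priorities)
print(high_line_count, low_line_count)  # 2 2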


def q13(get_df: GetDFFunc) -> DataFrame:
customers = get_df("customer")
orders = get_df("orders")

daft_df = (
customers.join(
orders.where(~col("O_COMMENT").str.match(".*special.*requests.*")),
left_on="C_CUSTKEY",
right_on="O_CUSTKEY",
how="left",
)
.groupby(col("C_CUSTKEY"))
.agg(col("O_ORDERKEY").count().alias("c_count"))
.sort("C_CUSTKEY")
.groupby("c_count")
.agg(col("c_count").count().alias("custdist"))
.sort(["custdist", "c_count"], desc=[True, True])
)

return daft_df
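
Not part of the diff: q13 (Customer Distribution) relies on the left join keeping customers with no qualifying orders and on count() over O_ORDERKEY skipping the resulting nulls, so those customers land in the c_count == 0 bucket rather than disappearing. A plain-Python analogue:

customers = [1, 2, 3]
orders = {1: [101, 102]}  # customer 1 has two orders; customers 2 and 3 have none

c_count = {c: len(orders.get(c, [])) for c in customers}

custdist = {}
for n in c_count.values():
    custdist[n] = custdist.get(n, 0) + 1

print(sorted(custdist.items(), key=lambda kv: (kv[1], kv[0]), reverse=True))
# [(0, 2), (2, 1)]: two customers with zero orders, one customer with two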


def q14(get_df: GetDFFunc) -> DataFrame:
lineitem = get_df("lineitem")
part = get_df("part")

daft_df = (
lineitem.join(part, left_on=col("L_PARTKEY"), right_on=col("P_PARTKEY"))
.where((col("L_SHIPDATE") >= datetime.date(1995, 9, 1)) & (col("L_SHIPDATE") < datetime.date(1995, 10, 1)))
.agg(
col("P_TYPE")
.str.startswith("PROMO")
.if_else(col("L_EXTENDEDPRICE") * (1 - col("L_DISCOUNT")), 0)
.sum()
.alias("tmp_1"),
(col("L_EXTENDEDPRICE") * (1 - col("L_DISCOUNT"))).sum().alias("tmp_2"),
)
.select(100.00 * (col("tmp_1") / col("tmp_2")).alias("promo_revenue"))
)

return daft_df
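
Not part of the diff: q14 (Promotion Effect) divides a conditional revenue sum by the total revenue of the same month's lineitems and scales the ratio to a percentage. A plain-Python analogue:

rows = [("PROMO BURNISHED COPPER", 100.0, 0.1), ("STANDARD BRUSHED STEEL", 50.0, 0.0)]  # (P_TYPE, price, discount)

tmp_1 = sum(price * (1 - disc) for ptype, price, disc in rows if ptype.startswith("PROMO"))
tmp_2 = sum(price * (1 - disc) for _, price, disc in rows)
print(100.00 * tmp_1 / tmp_2)  # ~64.29, the promo share of revenue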


def q15(get_df: GetDFFunc) -> DataFrame:
lineitem = get_df("lineitem")

revenue = (
lineitem.where(
(col("L_SHIPDATE") >= datetime.date(1996, 1, 1)) & (col("L_SHIPDATE") < datetime.date(1996, 4, 1))
)
.groupby(col("L_SUPPKEY"))
.agg((col("L_EXTENDEDPRICE") * (1 - col("L_DISCOUNT"))).sum().alias("total_revenue"))
.select(col("L_SUPPKEY").alias("supplier_no"), "total_revenue")
)

revenue = revenue.join(revenue.max("total_revenue"), on="total_revenue")

supplier = get_df("supplier")

daft_df = (
supplier.join(revenue, left_on=col("S_SUPPKEY"), right_on=col("supplier_no"))
.select("S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_PHONE", "total_revenue")
.sort("S_SUPPKEY")
)

return daft_df
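
Not part of the diff: the key step in q15 (Top Supplier) is revenue.join(revenue.max("total_revenue"), on="total_revenue"), an equality join of a frame against its own one-row maximum, which keeps exactly the rows that attain the maximum (ties included) without a scalar subquery. A plain-Python analogue:

revenue = {10: 900.0, 20: 1500.0, 30: 1500.0}  # supplier_no -> total_revenue

top = max(revenue.values())
best = sorted(s for s, r in revenue.items() if r == top)
print(best)  # [20, 30]: ties survive, just as the equality join keeps them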


def q16(get_df: GetDFFunc) -> DataFrame:
part = get_df("part")
partsupp = get_df("partsupp")

supplier = get_df("supplier")

suppkeys = supplier.where(col("S_COMMENT").str.match(".*Customer.*Complaints.*")).select(
col("S_SUPPKEY"), col("S_SUPPKEY").alias("PS_SUPPKEY_RIGHT")
)

daft_df = (
part.join(partsupp, left_on=col("P_PARTKEY"), right_on=col("PS_PARTKEY"))
.where(
(col("P_BRAND") != "Brand#45")
& ~col("P_TYPE").str.startswith("MEDIUM POLISHED")
& (col("P_SIZE").is_in([49, 14, 23, 45, 19, 3, 36, 9]))
)
.join(suppkeys, left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="left")
.where(col("PS_SUPPKEY_RIGHT").is_null())
.select("P_BRAND", "P_TYPE", "P_SIZE", "PS_SUPPKEY")
.distinct()
.groupby("P_BRAND", "P_TYPE", "P_SIZE")
.agg(col("PS_SUPPKEY").count().alias("supplier_cnt"))
.sort(["supplier_cnt", "P_BRAND", "P_TYPE", "P_SIZE"], desc=[True, False, False, False])
)

return daft_df
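
Not part of the diff: q16 (Parts/Supplier Relationship) emulates the reference query's NOT IN with an anti-join; the complaint-matching suppliers carry a duplicated key column (PS_SUPPKEY_RIGHT), and after the left join, rows where that column is still null are exactly the ones whose supplier never matched. A plain-Python analogue:

import re

complaint = re.compile(r".*Customer.*Complaints.*")
suppliers = {1: "quick brown fox", 2: "Customer ... Complaints ..."}
bad_suppkeys = {k for k, comment in suppliers.items() if complaint.match(comment)}

partsupp = [(100, 1), (100, 2), (200, 2)]  # (partkey, suppkey)
kept = [(pk, sk) for pk, sk in partsupp if sk not in bad_suppkeys]
print(kept)  # [(100, 1)]: only rows whose supplier never matched the pattern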


def q17(get_df: GetDFFunc) -> DataFrame:
lineitem = get_df("lineitem")
part = get_df("part")

res_1 = part.where((col("P_BRAND") == "Brand#23") & (col("P_CONTAINER") == "MED BOX")).join(
lineitem, left_on="P_PARTKEY", right_on="L_PARTKEY", how="left"
)

daft_df = (
res_1.groupby("P_PARTKEY")
.agg((0.2 * col("L_QUANTITY")).mean().alias("avg_quantity"))
.select(col("P_PARTKEY").alias("key"), col("avg_quantity"))
.join(res_1, left_on="key", right_on="P_PARTKEY")
.where(col("L_QUANTITY") < col("avg_quantity"))
.agg((col("L_EXTENDEDPRICE") / 7.0).sum().alias("avg_yearly"))
)

return daft_df
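
Not part of the diff: q17 (Small-Quantity-Order Revenue) emulates a correlated subquery by computing the per-part threshold (20% of the average quantity) in a groupby, joining it back onto the lineitems, and only then filtering and summing. A plain-Python analogue:

lineitems = [(1, 5.0, 100.0), (1, 50.0, 900.0), (2, 30.0, 300.0)]  # (partkey, qty, extendedprice)

totals, counts = {}, {}
for pk, qty, _ in lineitems:
    totals[pk] = totals.get(pk, 0.0) + 0.2 * qty
    counts[pk] = counts.get(pk, 0) + 1
avg_quantity = {pk: s / counts[pk] for pk, s in totals.items()}  # mean of 0.2 * qty per part

avg_yearly = sum(price / 7.0 for pk, qty, price in lineitems if qty < avg_quantity[pk])
print(avg_yearly)  # 100.0 / 7.0: only the 5-unit line falls under its part's threshold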


def q18(get_df: GetDFFunc) -> DataFrame:
customer = get_df("customer")
orders = get_df("orders")
lineitem = get_df("lineitem")

res_1 = lineitem.groupby("L_ORDERKEY").agg(col("L_QUANTITY").sum().alias("sum_qty")).where(col("sum_qty") > 300)

daft_df = (
orders.join(res_1, left_on=col("O_ORDERKEY"), right_on=col("L_ORDERKEY"))
.join(customer, left_on=col("O_CUSTKEY"), right_on=col("C_CUSTKEY"))
.join(lineitem, left_on=col("O_ORDERKEY"), right_on=col("L_ORDERKEY"))
.groupby("C_NAME", "C_CUSTKEY", "O_ORDERKEY", "O_ORDERDATE", "O_TOTALPRICE")
.agg(col("L_QUANTITY").sum().alias("sum"))
.select("C_NAME", "C_CUSTKEY", "O_ORDERKEY", col("O_ORDERDATE").alias("O_ORDERDAT"), "O_TOTALPRICE", "sum")
.sort(["O_TOTALPRICE", "O_ORDERDAT"], desc=[True, False])
.limit(100)
)

return daft_df


def q19(get_df: GetDFFunc) -> DataFrame:
lineitem = get_df("lineitem")
part = get_df("part")

daft_df = (
lineitem.join(part, left_on=col("L_PARTKEY"), right_on=col("P_PARTKEY"))
.where(
(
(col("P_BRAND") == "Brand#12")
& col("P_CONTAINER").is_in(["SM CASE", "SM BOX", "SM PACK", "SM PKG"])
& (col("L_QUANTITY") >= 1)
& (col("L_QUANTITY") <= 11)
& (col("P_SIZE") >= 1)
& (col("P_SIZE") <= 5)
& col("L_SHIPMODE").is_in(["AIR", "AIR REG"])
& (col("L_SHIPINSTRUCT") == "DELIVER IN PERSON")
)
| (
(col("P_BRAND") == "Brand#23")
& col("P_CONTAINER").is_in(["MED BAG", "MED BOX", "MED PKG", "MED PACK"])
& (col("L_QUANTITY") >= 10)
& (col("L_QUANTITY") <= 20)
& (col("P_SIZE") >= 1)
& (col("P_SIZE") <= 10)
& col("L_SHIPMODE").is_in(["AIR", "AIR REG"])
& (col("L_SHIPINSTRUCT") == "DELIVER IN PERSON")
)
| (
(col("P_BRAND") == "Brand#34")
& col("P_CONTAINER").is_in(["LG CASE", "LG BOX", "LG PACK", "LG PKG"])
& (col("L_QUANTITY") >= 20)
& (col("L_QUANTITY") <= 30)
& (col("P_SIZE") >= 1)
& (col("P_SIZE") <= 15)
& col("L_SHIPMODE").is_in(["AIR", "AIR REG"])
& (col("L_SHIPINSTRUCT") == "DELIVER IN PERSON")
)
)
.agg((col("L_EXTENDEDPRICE") * (1 - col("L_DISCOUNT"))).sum().alias("revenue"))
)

return daft_df


def q20(get_df: GetDFFunc) -> DataFrame:
supplier = get_df("supplier")
nation = get_df("nation")
part = get_df("part")
partsupp = get_df("partsupp")
lineitem = get_df("lineitem")

res_1 = (
lineitem.where(
(col("L_SHIPDATE") >= datetime.date(1994, 1, 1)) & (col("L_SHIPDATE") < datetime.date(1995, 1, 1))
)
.groupby("L_PARTKEY", "L_SUPPKEY")
.agg(((col("L_QUANTITY") * 0.5).sum()).alias("sum_quantity"))
)

res_2 = nation.where(col("N_NAME") == "CANADA")
res_3 = supplier.join(res_2, left_on="S_NATIONKEY", right_on="N_NATIONKEY")

daft_df = (
part.where(col("P_NAME").str.startswith("forest"))
.select("P_PARTKEY")
.distinct()
.join(partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY")
.join(
res_1,
left_on=["PS_SUPPKEY", "P_PARTKEY"],
right_on=["L_SUPPKEY", "L_PARTKEY"],
)
.where(col("PS_AVAILQTY") > col("sum_quantity"))
.select("PS_SUPPKEY")
.distinct()
.join(res_3, left_on="PS_SUPPKEY", right_on="S_SUPPKEY")
.select("S_NAME", "S_ADDRESS")
.sort("S_NAME")
)

return daft_df
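
Not part of the diff: q20 (Potential Part Promotion) keeps suppliers whose available quantity exceeds half of what they shipped of a matching forest part during 1994; (suppkey, partkey) pairs with no 1994 shipments are dropped by the inner join rather than treated as zero. A plain-Python analogue of that availability test:

shipped = {(7, 100): 40.0}               # (suppkey, partkey) -> sum(L_QUANTITY) shipped in 1994
partsupp = [(100, 7, 30), (100, 8, 10)]  # (partkey, suppkey, PS_AVAILQTY)

keep_suppliers = sorted(
    sk for pk, sk, avail in partsupp
    if (sk, pk) in shipped and avail > 0.5 * shipped[(sk, pk)]
)
print(keep_suppliers)  # [7]: supplier 8 has no matching 1994 shipments, so the join drops it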


def q21(get_df: GetDFFunc) -> DataFrame:
supplier = get_df("supplier")
nation = get_df("nation")
lineitem = get_df("lineitem")
orders = get_df("orders")

res_1 = (
lineitem.select("L_SUPPKEY", "L_ORDERKEY")
.distinct()
.groupby("L_ORDERKEY")
.agg(col("L_SUPPKEY").count().alias("nunique_col"))
.where(col("nunique_col") > 1)
.join(lineitem.where(col("L_RECEIPTDATE") > col("L_COMMITDATE")), on="L_ORDERKEY")
)

daft_df = (
res_1.select("L_SUPPKEY", "L_ORDERKEY")
.groupby("L_ORDERKEY")
.agg(col("L_SUPPKEY").count().alias("nunique_col"))
.join(res_1, on="L_ORDERKEY")
.join(supplier, left_on="L_SUPPKEY", right_on="S_SUPPKEY")
.join(nation, left_on="S_NATIONKEY", right_on="N_NATIONKEY")
.join(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
.where((col("nunique_col") == 1) & (col("N_NAME") == "SAUDI ARABIA") & (col("O_ORDERSTATUS") == "F"))
.groupby("S_NAME")
.agg(col("O_ORDERKEY").count().alias("numwait"))
.sort(["numwait", "S_NAME"], desc=[True, False])
.limit(100)
)

return daft_df
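
Not part of the diff: q21 (Suppliers Who Kept Orders Waiting) needs a count of distinct suppliers per order, expressed above as select("L_SUPPKEY", "L_ORDERKEY").distinct().groupby("L_ORDERKEY").count(). A plain-Python analogue of that idiom:

lineitems = [(500, 1), (500, 1), (500, 2), (600, 3)]  # (orderkey, suppkey), with one duplicate pair

per_order = {}
for ok, sk in set(lineitems):          # distinct (orderkey, suppkey) pairs
    per_order[ok] = per_order.get(ok, 0) + 1

multi_supplier_orders = sorted(ok for ok, n in per_order.items() if n > 1)
print(multi_supplier_orders)  # [500]: the duplicate (500, 1) pair counts only once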


def q22(get_df: GetDFFunc) -> DataFrame:
orders = get_df("orders")
customer = get_df("customer")

res_1 = (
customer.with_column("cntrycode", col("C_PHONE").str.left(2))
.where(col("cntrycode").is_in(["13", "31", "23", "29", "30", "18", "17"]))
.select("C_ACCTBAL", "C_CUSTKEY", "cntrycode")
)

res_2 = (
res_1.where(col("C_ACCTBAL") > 0).agg(col("C_ACCTBAL").mean().alias("avg_acctbal")).with_column("lit", lit(1))
)

res_3 = orders.select("O_CUSTKEY")

daft_df = (
res_1.join(res_3, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="left")
.where(col("O_CUSTKEY").is_null())
.with_column("lit", lit(1))
.join(res_2, on="lit")
.where(col("C_ACCTBAL") > col("avg_acctbal"))
.groupby("cntrycode")
.agg(
col("C_ACCTBAL").count().alias("numcust"),
col("C_ACCTBAL").sum().alias("totacctbal"),
)
.sort("cntrycode")
)

return daft_df
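
Not part of the diff: q22 (Global Sales Opportunity) combines two of the patterns above, a left join on O_CUSTKEY plus is_null() to keep only customers with no orders, and a lit(1) join to broadcast the positive-balance average onto every remaining customer. A plain-Python analogue:

customers = [("13", 101, 700.0), ("31", 102, 100.0), ("13", 103, 900.0)]  # (cntrycode, custkey, acctbal)
customers_with_orders = {101}

positive = [bal for _, _, bal in customers if bal > 0]
avg_acctbal = sum(positive) / len(positive)

by_code = {}
for code, custkey, bal in customers:
    if custkey in customers_with_orders or bal <= avg_acctbal:
        continue
    numcust, total = by_code.get(code, (0, 0.0))
    by_code[code] = (numcust + 1, total + bal)

print(sorted(by_code.items()))  # [('13', (1, 900.0))]: numcust and totacctbal per cntrycode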
(Diffs for the remaining 14 changed files were not loaded on this page.)
