Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Iceberg partitioned writes #2842

Merged
merged 11 commits into from
Sep 20, 2024
Merged

Conversation

kevinzwang
Copy link
Member

@kevinzwang kevinzwang commented Sep 12, 2024

This also changes the behavior of some of the partitioning functions in ways that I would consider as bug fixes. However @samster25 maybe you should take a look at them to make sure their new behavior is correct. The changes:

  • truncate function now renames columns to {}_trunc instead of {}_truncate to match Spark behavior
  • day partitioning now returns an Int32Array instead of a DateArray. I believe the past behavior was there to match pyiceberg, but talking to the pyiceberg team, this actually seems like a bug. I plan on making a PR to pyiceberg to fix this, but I have also moved and fixed the buggy logic to our codebase so that it works with past versions of pyiceberg as well.

@github-actions github-actions bot added the enhancement New feature or request label Sep 12, 2024
Copy link

codspeed-hq bot commented Sep 12, 2024

CodSpeed Performance Report

Merging #2842 will degrade performances by 46.85%

Comparing kevin/iceberg-partitioned-writes (f902eb0) with main (c5b7062)

Summary

⚡ 1 improvements
❌ 2 regressions
✅ 13 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark main kevin/iceberg-partitioned-writes Change
test_count[1 Small File] 18.7 ms 24.1 ms -22.31%
test_explain[100 Small Files] 37.4 ms 33.7 ms +10.98%
test_show[100 Small Files] 56.2 ms 105.7 ms -46.85%

@kevinzwang kevinzwang marked this pull request as ready for review September 12, 2024 23:31
daft/execution/rust_physical_plan_shim.py Outdated Show resolved Hide resolved
src/daft-plan/src/sink_info.rs Show resolved Hide resolved
daft/iceberg/iceberg_write.py Outdated Show resolved Hide resolved
daft/iceberg/iceberg_write.py Outdated Show resolved Hide resolved
daft/iceberg/iceberg_write.py Outdated Show resolved Hide resolved
daft/table/table_io.py Outdated Show resolved Hide resolved
Copy link

codecov bot commented Sep 13, 2024

Codecov Report

Attention: Patch coverage is 84.58781% with 43 lines in your changes missing coverage. Please review.

Project coverage is 66.29%. Comparing base (c5b7062) to head (f902eb0).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
daft/iceberg/iceberg_write.py 75.43% 28 Missing ⚠️
daft/table/table_io.py 95.16% 3 Missing ⚠️
src/daft-plan/src/logical_ops/sink.rs 75.00% 3 Missing ⚠️
src/daft-core/src/series/ops/partitioning.rs 80.00% 2 Missing ⚠️
daft/dataframe/dataframe.py 91.66% 1 Missing ⚠️
daft/execution/execution_step.py 50.00% 1 Missing ⚠️
daft/execution/physical_plan.py 0.00% 1 Missing ⚠️
daft/execution/rust_physical_plan_shim.py 0.00% 1 Missing ⚠️
daft/iceberg/iceberg_scan.py 90.00% 1 Missing ⚠️
daft/table/partitioning.py 97.22% 1 Missing ⚠️
... and 1 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #2842      +/-   ##
==========================================
+ Coverage   66.01%   66.29%   +0.28%     
==========================================
  Files        1001     1003       +2     
  Lines      113467   113616     +149     
==========================================
+ Hits        74906    75325     +419     
+ Misses      38561    38291     -270     
Flag Coverage Δ
66.29% <84.58%> (+0.28%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
daft/expressions/expressions.py 93.37% <ø> (ø)
daft/logical/builder.py 89.61% <100.00%> (ø)
src/daft-core/src/array/ops/truncate.rs 100.00% <100.00%> (ø)
src/daft-plan/src/builder.rs 92.90% <100.00%> (ø)
src/daft-plan/src/sink_info.rs 32.32% <ø> (-0.03%) ⬇️
src/daft-scheduler/src/scheduler.rs 90.83% <100.00%> (-0.03%) ⬇️
daft/dataframe/dataframe.py 86.54% <91.66%> (+0.11%) ⬆️
daft/execution/execution_step.py 91.97% <50.00%> (-0.19%) ⬇️
daft/execution/physical_plan.py 89.30% <0.00%> (-0.14%) ⬇️
daft/execution/rust_physical_plan_shim.py 91.30% <0.00%> (-1.01%) ⬇️
... and 7 more

... and 18 files with indirect coverage changes

@jaychia jaychia removed the request for review from samster25 September 16, 2024 19:20
daft/iceberg/iceberg_scan.py Show resolved Hide resolved
daft/iceberg/iceberg_write.py Show resolved Hide resolved
daft/table/table_io.py Outdated Show resolved Hide resolved
daft/table/table_io.py Outdated Show resolved Hide resolved
daft/table/table_io.py Outdated Show resolved Hide resolved
src/daft-core/src/series/ops/partitioning.rs Outdated Show resolved Hide resolved
src/daft-core/src/series/ops/partitioning.rs Outdated Show resolved Hide resolved
src/daft-plan/src/logical_ops/sink.rs Outdated Show resolved Hide resolved
tests/io/iceberg/test_iceberg_writes.py Show resolved Hide resolved
tests/io/iceberg/test_iceberg_writes.py Outdated Show resolved Hide resolved
daft/table/table_io.py Outdated Show resolved Hide resolved
@kevinzwang kevinzwang requested a review from jaychia September 18, 2024 01:09
daft/table/partitioning.py Show resolved Hide resolved
daft/table/partitioning.py Show resolved Hide resolved
src/daft-plan/src/logical_ops/sink.rs Show resolved Hide resolved
read_back = daft.read_iceberg(table)
assert as_arrow == read_back.to_arrow()
assert all(op == "ADD" for op in as_dict["operation"]), as_dict["operation"]
assert sum(as_dict["rows"]) == 5, as_dict["rows"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, is there an assertion we should make here on the number of files that were written?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@samster25
Copy link
Member

all partitioning functions now preserve the name of the original column. This conforms better to the behavior of other expressions/series ops

This would actually deviate from the behavior from spark and cause incorrect hive style reads! Maybe we can add an alias when writing out partitioning columns

@kevinzwang
Copy link
Member Author

This would actually deviate from the behavior from spark and cause incorrect hive style reads! Maybe we can add an alias when writing out partitioning columns

I changed this back actually. sorry I forgot to update the description. However this is maybe an interesting insight. Should our iceberg partitioned writes write out paths with modified names such as key_bucket_4=1234/0000.parquet? I'll also check pyiceberg and spark behavior

@kevinzwang kevinzwang enabled auto-merge (squash) September 20, 2024 20:34
@kevinzwang kevinzwang merged commit 2c13f17 into main Sep 20, 2024
37 of 38 checks passed
@kevinzwang kevinzwang deleted the kevin/iceberg-partitioned-writes branch September 20, 2024 21:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants