-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(connect): add repartition support #3382
base: main
Are you sure you want to change the base?
Conversation
e4dadeb
to
9bcb827
Compare
e1aa22e
to
c27e74e
Compare
9bcb827
to
886d014
Compare
c27e74e
to
cf1ba9d
Compare
886d014
to
14d1d50
Compare
cf1ba9d
to
581eb36
Compare
7302bfb
to
62ae066
Compare
581eb36
to
59a7046
Compare
62ae066
to
6e58760
Compare
59a7046
to
54df4fd
Compare
6e58760
to
65d47db
Compare
54df4fd
to
3d8335e
Compare
65d47db
to
670c495
Compare
3d8335e
to
7819f2a
Compare
670c495
to
c57e270
Compare
7819f2a
to
a97b69b
Compare
c57e270
to
7bdd623
Compare
a97b69b
to
5957c06
Compare
7bdd623
to
8fcea97
Compare
5957c06
to
353a6e1
Compare
8fcea97
to
3038c0f
Compare
353a6e1
to
eb6a937
Compare
3038c0f
to
31e8bc8
Compare
f18c35d
to
5e7d068
Compare
b7848ce
to
0de7b03
Compare
5e7d068
to
003929c
Compare
0de7b03
to
3cb7252
Compare
003929c
to
a40e010
Compare
3cb7252
to
21fc2ec
Compare
a40e010
to
9f2e53a
Compare
21fc2ec
to
f2a1799
Compare
9f2e53a
to
538bb93
Compare
f2a1799
to
11fefd5
Compare
538bb93
to
a269fea
Compare
a269fea
to
456be41
Compare
456be41
to
5196ab7
Compare
CodSpeed Performance ReportMerging #3382 will degrade performances by 32.09%Comparing Summary
Benchmarks breakdown
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3382 +/- ##
=======================================
Coverage 77.86% 77.86%
=======================================
Files 719 720 +1
Lines 88459 88488 +29
=======================================
+ Hits 68877 68902 +25
- Misses 19582 19586 +4
|
let shuffle = shuffle.unwrap_or(true); | ||
|
||
if !shuffle { | ||
bail!("Repartitioning without shuffling is not yet supported"); | ||
} | ||
|
||
plan.builder = plan | ||
.builder | ||
.random_shuffle(Some(num_partitions)) | ||
.wrap_err("Failed to apply random_shuffle to logical plan")?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should support non shuffle based repartitions too.
in daft, we do this via hash_repartition
def test_repartition(spark_session): | ||
# Create a simple DataFrame | ||
df = spark_session.range(10) | ||
|
||
# Test repartitioning to 2 partitions | ||
repartitioned = df.repartition(2) | ||
|
||
# Verify data is preserved after repartitioning | ||
original_data = sorted(df.collect()) | ||
repartitioned_data = sorted(repartitioned.collect()) | ||
assert repartitioned_data == original_data, "Data should be preserved after repartitioning" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'd instead just look at the plan and assert that it has a partition node.
ex: (note: this is using native spark, so our explain will be a little different)
d = [{'idx': 1}]
df = spark.createDataFrame(d).repartition(2)
df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
+- Scan ExistingRDD[idx#0L]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess this is fine for now considering we don't have explain yet
7d2d9d6
to
655aec2
Compare
- [ ] the test is not great but idk how to do it better since rdd does not work with spark connect (I think) - [ ] do we want to support non-shuffle repartitioning?
655aec2
to
36eba74
Compare
not work with spark connect (I think)