Recently I have been running some benchmarks on TFT with a variety of datasets. One thing I have noticed is that `CacheableCombineAccumulate` has a scaling issue, particularly with `tft.bucketize` and the `QuantilesCombiner`.
As a concrete example, I ran an experiment on a dataset of 20 billion rows and around 1000 dense scalar columns. I added just one analyzer, `tft.bucketize(num_buckets=100, elementwise=True)`, and applied it to the 1000 columns concatenated together.
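For reference, a minimal sketch of the kind of `preprocessing_fn` used in the experiment; the feature names (`col_0` ... `col_999`) are placeholders rather than the real column names:

```python
import tensorflow as tf
import tensorflow_transform as tft

NUM_COLUMNS = 1000  # illustrative; the real dataset has ~1000 dense scalar columns

def preprocessing_fn(inputs):
  # Concatenate the dense scalar columns into a single [batch, 1000] tensor
  # and bucketize each column independently (elementwise=True).
  stacked = tf.concat(
      [tf.reshape(inputs['col_%d' % i], [-1, 1]) for i in range(NUM_COLUMNS)],
      axis=1)
  bucketized = tft.bucketize(stacked, num_buckets=100, elementwise=True)
  return {'bucketized': bucketized}
```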
Dataflow job ID: 2020-06-12_11_25_12-8283309558685381372
Elapsed time: 21 hr 57 min
Total vCPU time: 11,455.279 vCPU hr
Around 19 of the 22 wall-clock hours were spent on the `InitialCombineGlobally` step.
To verify that `InitialCombineGlobally` was the bottleneck, I forked `_IntermediateAccumulateCombineImpl` and added `.with_fanout(1000)` to its `beam.CombineGlobally`.
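The change itself is one line in the fork; a self-contained Beam sketch of what `.with_fanout()` does (with a trivial `CombineFn` standing in for the wrapped `QuantilesCombiner`, not the actual TFT code) looks like this:

```python
import apache_beam as beam

class _SumCombineFn(beam.CombineFn):
  """Trivial stand-in for the combiner that TFT wraps."""
  def create_accumulator(self):
    return 0
  def add_input(self, accumulator, element):
    return accumulator + element
  def merge_accumulators(self, accumulators):
    return sum(accumulators)
  def extract_output(self, accumulator):
    return accumulator

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(range(1000))
      # with_fanout(1000) adds an intermediate combine stage over 1000
      # synthetic keys before the final merge, so accumulation is no longer
      # funneled through a handful of workers.
      | beam.CombineGlobally(_SumCombineFn()).with_fanout(1000)
      | beam.Map(print))
```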
Dataflow job ID: 2020-06-22_13_45_28-14720730178676069772
Elapsed time: 2 hr 43 min
Total vCPU time: 6,638.944 vCPU hr
It would be nice if we could annotate our analyzers/combiners with a desired fanout size so that we have control over this issue when it arises (a hypothetical sketch follows below). Also, while the experiment above may look like a perversely chosen combination of dataset and transformation that is unlikely to occur in practice, the workflow is quite similar to real-world problems I am working on, so having this fixed would be very helpful.
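Purely as an illustration of the request, something along these lines; `combiner_fanout` does not exist in TFT today and is only a hypothetical name for the annotation:

```python
bucketized = tft.bucketize(
    stacked,                 # the concatenated [batch, 1000] tensor from above
    num_buckets=100,
    elementwise=True,
    combiner_fanout=1000)    # hypothetical knob forwarded to Beam's .with_fanout()
```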