forked from facebookincubator/velox
-
Notifications
You must be signed in to change notification settings - Fork 49
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add RowsStreamingWindowBuild to avoid OOM in Window operator
- Loading branch information
Showing
20 changed files
with
554 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "velox/exec/RowsStreamingWindowBuild.h" | ||
#include "velox/exec/RowsStreamingWindowPartition.h" | ||
|
||
namespace facebook::velox::exec { | ||
|
||
RowsStreamingWindowBuild::RowsStreamingWindowBuild( | ||
const std::shared_ptr<const core::WindowNode>& windowNode, | ||
velox::memory::MemoryPool* pool, | ||
const common::SpillConfig* spillConfig, | ||
tsan_atomic<bool>* nonReclaimableSection) | ||
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} | ||
|
||
void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) { | ||
if (windowPartitions_.size() <= inputCurrentPartition_) { | ||
windowPartitions_.push_back(std::make_shared<RowsStreamingWindowPartition>( | ||
data_.get(), | ||
folly::Range<char**>(nullptr, nullptr), | ||
inversedInputChannels_, | ||
sortKeyInfo_)); | ||
} | ||
|
||
windowPartitions_[inputCurrentPartition_]->addNewRows(inputRows_); | ||
|
||
if (isFinished) { | ||
windowPartitions_[inputCurrentPartition_]->setInputRowsFinished(); | ||
inputCurrentPartition_++; | ||
} | ||
|
||
inputRows_.clear(); | ||
} | ||
|
||
void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { | ||
for (auto i = 0; i < inputChannels_.size(); ++i) { | ||
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); | ||
} | ||
|
||
for (auto row = 0; row < input->size(); ++row) { | ||
char* newRow = data_->newRow(); | ||
|
||
for (auto col = 0; col < input->childrenSize(); ++col) { | ||
data_->store(decodedInputVectors_[col], row, newRow, col); | ||
} | ||
|
||
if (previousRow_ != nullptr && | ||
compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { | ||
buildNextInputOrPartition(true); | ||
} | ||
|
||
// Wait for the peers to be ready in single partition; these peers are the | ||
// rows that have identical values in the ORDER BY clause. | ||
if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_ && | ||
compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) { | ||
buildNextInputOrPartition(false); | ||
} | ||
|
||
inputRows_.push_back(newRow); | ||
previousRow_ = newRow; | ||
} | ||
} | ||
|
||
void RowsStreamingWindowBuild::noMoreInput() { | ||
buildNextInputOrPartition(true); | ||
} | ||
|
||
std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() { | ||
if (outputCurrentPartition_ > 0) { | ||
windowPartitions_[outputCurrentPartition_].reset(); | ||
} | ||
|
||
return windowPartitions_[++outputCurrentPartition_]; | ||
} | ||
|
||
bool RowsStreamingWindowBuild::hasNextPartition() { | ||
return windowPartitions_.size() > 0 && | ||
outputCurrentPartition_ <= int(windowPartitions_.size() - 2); | ||
} | ||
|
||
} // namespace facebook::velox::exec |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#pragma once | ||
|
||
#include "velox/exec/WindowBuild.h" | ||
|
||
namespace facebook::velox::exec { | ||
|
||
/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of | ||
/// processing window functions as rows arrive within a single partition, | ||
/// without the need to wait for the entire partition to be ready. This approach | ||
/// can significantly reduce memory usage, especially when a single partition | ||
/// contains a large amount of data. It is particularly suited for optimizing | ||
/// rank and row_number functions, as well as aggregate window functions with a | ||
/// default frame. | ||
class RowsStreamingWindowBuild : public WindowBuild { | ||
public: | ||
RowsStreamingWindowBuild( | ||
const std::shared_ptr<const core::WindowNode>& windowNode, | ||
velox::memory::MemoryPool* pool, | ||
const common::SpillConfig* spillConfig, | ||
tsan_atomic<bool>* nonReclaimableSection); | ||
|
||
void addInput(RowVectorPtr input) override; | ||
|
||
void spill() override { | ||
VELOX_UNREACHABLE(); | ||
} | ||
|
||
std::optional<common::SpillStats> spilledStats() const override { | ||
return std::nullopt; | ||
} | ||
|
||
void noMoreInput() override; | ||
|
||
bool hasNextPartition() override; | ||
|
||
std::shared_ptr<WindowPartition> nextPartition() override; | ||
|
||
bool needsInput() override { | ||
// No partitions are available or the currentPartition is the last available | ||
// one, so can consume input rows. | ||
return windowPartitions_.size() == 0 || | ||
outputCurrentPartition_ == windowPartitions_.size() - 1; | ||
} | ||
|
||
private: | ||
void buildNextInputOrPartition(bool isFinished); | ||
|
||
// Holds input rows within the current partition. | ||
std::vector<char*> inputRows_; | ||
|
||
// Used to compare rows based on partitionKeys. | ||
char* previousRow_ = nullptr; | ||
|
||
// Current partition being output. Used to return the WidnowPartitions. | ||
vector_size_t outputCurrentPartition_ = -1; | ||
|
||
// Current partition when adding input. Used to construct WindowPartitions. | ||
vector_size_t inputCurrentPartition_ = 0; | ||
|
||
// Holds all the WindowPartitions. | ||
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_; | ||
}; | ||
|
||
} // namespace facebook::velox::exec |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#include "velox/exec/RowsStreamingWindowPartition.h" | ||
|
||
namespace facebook::velox::exec { | ||
|
||
RowsStreamingWindowPartition::RowsStreamingWindowPartition( | ||
RowContainer* data, | ||
const folly::Range<char**>& rows, | ||
const std::vector<column_index_t>& inputMapping, | ||
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo) | ||
: WindowPartition(data, rows, inputMapping, sortKeyInfo) { | ||
partitionStartRows_.push_back(0); | ||
} | ||
|
||
void RowsStreamingWindowPartition::addNewRows(std::vector<char*> rows) { | ||
partitionStartRows_.push_back(partitionStartRows_.back() + rows.size()); | ||
|
||
sortedRows_.insert(sortedRows_.end(), rows.begin(), rows.end()); | ||
} | ||
|
||
bool RowsStreamingWindowPartition::buildNextRows() { | ||
if (currentPartition_ >= int(partitionStartRows_.size() - 2)) | ||
return false; | ||
|
||
currentPartition_++; | ||
|
||
// Erase previous rows in current partition. | ||
if (currentPartition_ > 0) { | ||
auto numPreviousPartitionRows = partitionStartRows_[currentPartition_] - | ||
partitionStartRows_[currentPartition_ - 1]; | ||
data_->eraseRows( | ||
folly::Range<char**>(sortedRows_.data(), numPreviousPartitionRows)); | ||
sortedRows_.erase( | ||
sortedRows_.begin(), sortedRows_.begin() + numPreviousPartitionRows); | ||
} | ||
|
||
auto partitionSize = partitionStartRows_[currentPartition_ + 1] - | ||
partitionStartRows_[currentPartition_]; | ||
|
||
partition_ = folly::Range(sortedRows_.data(), partitionSize); | ||
return true; | ||
} | ||
|
||
} // namespace facebook::velox::exec |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#pragma once | ||
|
||
#include "velox/exec/RowContainer.h" | ||
#include "velox/exec/WindowPartition.h" | ||
|
||
namespace facebook::velox::exec { | ||
|
||
/// RowsStreamingWindowPartition is to facilitate RowsStreamingWindowBuild by | ||
/// processing rows within WindowPartition in a streaming manner. | ||
class RowsStreamingWindowPartition : public WindowPartition { | ||
public: | ||
RowsStreamingWindowPartition( | ||
RowContainer* data, | ||
const folly::Range<char**>& rows, | ||
const std::vector<column_index_t>& inputMapping, | ||
const std::vector<std::pair<column_index_t, core::SortOrder>>& | ||
sortKeyInfo); | ||
|
||
// Returns the number of rows in the current partial window partition, | ||
// including the offset within the full partition. | ||
vector_size_t numRows() const override { | ||
if (currentPartition_ == -1) { | ||
return 0; | ||
} else { | ||
return partition_.size() + partitionStartRows_[currentPartition_]; | ||
} | ||
} | ||
|
||
// Returns the starting offset of the current partial window partition within | ||
// the full partition. | ||
vector_size_t offsetInPartition() const override { | ||
return partitionStartRows_[currentPartition_]; | ||
} | ||
|
||
// Indicates support for row-level streaming processing. | ||
bool supportRowLevelStreaming() const override { | ||
return true; | ||
} | ||
|
||
// Sets the flag indicating that all input rows have been processed on the | ||
// producer side. | ||
void setInputRowsFinished() override { | ||
inputRowsFinished_ = true; | ||
} | ||
|
||
// Adds new rows to the partition using a streaming approach on the producer | ||
// side. | ||
void addNewRows(std::vector<char*> rows) override; | ||
|
||
// Builds the next set of available rows on the consumer side. | ||
bool buildNextRows() override; | ||
|
||
// Determines if the current partition is complete and then proceed to the | ||
// next partition. | ||
bool processFinished() const override { | ||
return ( | ||
inputRowsFinished_ && | ||
currentPartition_ == partitionStartRows_.size() - 2); | ||
} | ||
|
||
private: | ||
// Indicates whether all input rows have been added to sortedRows_ | ||
bool inputRowsFinished_ = false; | ||
|
||
// Stores new rows added to the WindowPartition. | ||
std::vector<char*> sortedRows_; | ||
|
||
// Indices of the start row (in sortedRows_) of each partitial partition. | ||
std::vector<vector_size_t> partitionStartRows_; | ||
|
||
// Current partial partition being output. | ||
vector_size_t currentPartition_ = -1; | ||
}; | ||
} // namespace facebook::velox::exec |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.