Skip to content

Commit

Permalink
Nimble uses UnitLoader
Browse files Browse the repository at this point in the history
Summary: So we can implement prefetching.

Differential Revision: D56739671
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed May 1, 2024
1 parent 40d0ea5 commit 0bc09ec
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 78 deletions.
264 changes: 189 additions & 75 deletions dwio/nimble/velox/VeloxReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/
#include "dwio/nimble/velox/VeloxReader.h"
#include <velox/dwio/common/OnDemandUnitLoader.h>
#include <velox/dwio/common/UnitLoader.h>
#include <chrono>
#include <cstdint>
#include <optional>
Expand Down Expand Up @@ -74,6 +76,121 @@ std::shared_ptr<const velox::Type> createFlatType(
selectedFeatures.size(), valueType));
}

class NimbleUnit : public velox::dwio::common::LoadUnit {
public:
NimbleUnit(
uint32_t stripeId,
std::shared_ptr<const TabletReader> tabletReader,
std::shared_ptr<const Type> schema,
std::shared_ptr<MetricsLogger> logger,
const std::vector<uint32_t>& offsets)
: tabletReader_{std::move(tabletReader)},
schema_{std::move(schema)},
stripeId_{stripeId},
rowCount_{tabletReader_->stripeRowCount(stripeId)},
logger_{std::move(logger)},
offsets_{offsets} {}

~NimbleUnit() override = default;

// Perform the IO (read)
void load() override;

// Unload the unit to free memory
void unload() override;

// Number of rows in the unit
uint64_t getNumRows() override {
return rowCount_;
}

// Number of bytes that the IO will read
uint64_t getIoSize() override;

std::vector<std::unique_ptr<StreamLoader>> moveStreamLoaders() {
return std::move(streamLoaders_);
}

private:
std::shared_ptr<const TabletReader> tabletReader_;
std::shared_ptr<const Type> schema_;
uint32_t stripeId_;
uint64_t rowCount_;
// Right now each reader is considered its own session if not passed from
// writer option.
std::shared_ptr<MetricsLogger> logger_;
const std::vector<uint32_t>& offsets_;

// Lazy
std::optional<uint64_t> ioSize_;

std::optional<TabletReader::StripeIdentifier> stripeIdentifier_;
// Will be loaded on load() and moved away in moveStreamLoaders()
std::vector<std::unique_ptr<StreamLoader>> streamLoaders_;
};

void NimbleUnit::load() {
try {
StripeLoadMetrics metrics{};
velox::CpuWallTiming timing{};
{
velox::CpuWallTimer timer{timing};
// LoadAll returns all the stream available in a stripe.
// The streams returned might be a subset of the total streams available
// in the file, as the current stripe might have captured/encountered
// less streams than later stripes. In the extreme case, a stripe can
// return zero streams (for example, if all the streams in that stripe
// were contained all nulls).
if (!stripeIdentifier_.has_value()) {
stripeIdentifier_ = tabletReader_->getStripeIdentifier(stripeId_);
}
StreamLabels streamLabels(schema_);
streamLoaders_ = tabletReader_->load(
stripeIdentifier_.value(),
offsets_,
[&streamLabels](offset_size offset) {
return streamLabels.streamLabel(offset);
});
}
for (uint32_t i = 0; i < streamLoaders_.size(); ++i) {
if (streamLoaders_[i]) {
metrics.totalStreamSize += streamLoaders_[i]->getStream().size();
++metrics.streamCount;
}
}
metrics.stripeIndex = stripeId_;
metrics.rowsInStripe = tabletReader_->stripeRowCount(stripeId_);
metrics.cpuUsec = timing.cpuNanos / 1000;
metrics.wallTimeUsec = timing.wallNanos / 1000;
logger_->logStripeLoad(metrics);
} catch (const std::exception& e) {
logger_->logException(LogOperation::StripeLoad, e.what());
throw;
} catch (...) {
logger_->logException(
LogOperation::StripeLoad,
folly::to<std::string>(folly::exceptionStr(std::current_exception())));
throw;
}
}

void NimbleUnit::unload() {
streamLoaders_.clear();
stripeIdentifier_.reset();
}

uint64_t NimbleUnit::getIoSize() {
if (ioSize_.has_value()) {
return ioSize_.value();
}
if (!stripeIdentifier_.has_value()) {
stripeIdentifier_ = tabletReader_->getStripeIdentifier(stripeId_);
}
ioSize_ =
tabletReader_->getStreamSizeSum(stripeIdentifier_.value(), offsets_);
return ioSize_.value();
}

} // namespace

const std::vector<std::string>& VeloxReader::preloadedOptionalSections() {
Expand Down Expand Up @@ -118,7 +235,6 @@ VeloxReader::VeloxReader(
tabletReader_{std::move(tabletReader)},
parameters_{std::move(params)},
schema_{loadSchema(*tabletReader_)},
streamLabels_{schema_},
type_{
selector ? selector->getSchema()
: std::dynamic_pointer_cast<const velox::RowType>(
Expand All @@ -128,7 +244,9 @@ VeloxReader::VeloxReader(
? std::make_unique<velox::dwio::common::ExecutorBarrier>(
params.decodingExecutor)
: nullptr},
logger_{parameters_.metricsLogger} {
logger_{
parameters_.metricsLogger ? parameters_.metricsLogger
: std::make_shared<MetricsLogger>()} {
static_assert(std::is_same_v<velox::vector_size_t, int32_t>);

if (!selector) {
Expand Down Expand Up @@ -198,92 +316,60 @@ VeloxReader::VeloxReader(
<< "). Total stripes: " << tabletReader_->stripeCount()
<< ". Total rows: " << tabletReader_->tabletRowCount();

if (!logger_) {
logger_ = std::make_shared<MetricsLogger>();
}
unitLoader_ = getUnitLoader();
}

void VeloxReader::loadStripeIfAny() {
if (nextStripe_ < lastStripe_) {
loadStripe();
loadNextStripe();
}
}

void VeloxReader::loadStripe() {
try {
if (loadedStripe_ != std::numeric_limits<uint32_t>::max() &&
loadedStripe_ == nextStripe_) {
// We are not reloading the current stripe, but we expect all
// decoders/readers to be reset after calling loadStripe(), therefore, we
// need to explicitly reset all decoders and readers.
rootReader_->reset();

rowsRemainingInStripe_ = tabletReader_->stripeRowCount(nextStripe_);
++nextStripe_;
return;
}
void VeloxReader::loadNextStripe() {
if (loadedStripe_.has_value() && loadedStripe_.value() == nextStripe_) {
// We are not reloading the current stripe, but we expect all
// decoders/readers to be reset after calling loadNextStripe(), therefore,
// we need to explicitly reset all decoders and readers.
rootReader_->reset();

rowsRemainingInStripe_ = tabletReader_->stripeRowCount(nextStripe_);
unitLoader_->onSeek(getUnitIndex(loadedStripe_.value()), 0);
++nextStripe_;
return;
}

StripeLoadMetrics metrics{};
velox::CpuWallTiming timing{};
{
velox::CpuWallTimer timer{timing};
// LoadAll returns all the stream available in a stripe.
// The streams returned might be a subset of the total streams available
// in the file, as the current stripe might have captured/encountered less
// streams than later stripes.
// In the extreme case, a stripe can return zero streams (for example, if
// all the streams in that stripe were contained all nulls).
auto stripeIdentifier = tabletReader_->getStripeIdentifier(nextStripe_);
auto streams = tabletReader_->load(
stripeIdentifier, offsets_, [this](offset_size offset) {
return streamLabels_.streamLabel(offset);
});
for (uint32_t i = 0; i < streams.size(); ++i) {
if (!streams[i]) {
// As this stream is not present in current stripe (might be present
// in previous one) we set to nullptr, One of the case is where you
// are projecting more fields in FlatMap than the stripe actually
// has.
decoders_[offsets_[i]] = nullptr;
} else {
metrics.totalStreamSize += streams[i]->getStream().size();
decoders_[offsets_[i]] = std::make_unique<ChunkedStreamDecoder>(
pool_,
std::make_unique<InMemoryChunkedStream>(
pool_, std::move(streams[i])),
*logger_);
++metrics.streamCount;
}
}
rowsRemainingInStripe_ = tabletReader_->stripeRowCount(nextStripe_);
loadedStripe_ = nextStripe_;
++nextStripe_;
rootReader_ = rootFieldReaderFactory_->createReader(decoders_);
}
metrics.stripeIndex = loadedStripe_;
metrics.rowsInStripe = rowsRemainingInStripe_;
metrics.cpuUsec = timing.cpuNanos / 1000;
metrics.wallTimeUsec = timing.wallNanos / 1000;
if (parameters_.blockedOnIoCallback) {
parameters_.blockedOnIoCallback(
std::chrono::nanoseconds{timing.wallNanos});
auto streams = loadStripe(nextStripe_);
for (uint32_t i = 0; i < streams.size(); ++i) {
if (!streams[i]) {
// As this stream is not present in current stripe (might be present
// in previous one) we set to nullptr, One of the case is where you
// are projecting more fields in FlatMap than the stripe actually
// has.
decoders_[offsets_[i]] = nullptr;
} else {
decoders_[offsets_[i]] = std::make_unique<ChunkedStreamDecoder>(
pool_,
std::make_unique<InMemoryChunkedStream>(pool_, std::move(streams[i])),
*logger_);
}
logger_->logStripeLoad(metrics);
} catch (const std::exception& e) {
logger_->logException(LogOperation::StripeLoad, e.what());
throw;
} catch (...) {
logger_->logException(
LogOperation::StripeLoad,
folly::to<std::string>(folly::exceptionStr(std::current_exception())));
throw;
}
loadedStripe_ = nextStripe_++;
rootReader_ = rootFieldReaderFactory_->createReader(decoders_);
}

std::vector<std::unique_ptr<StreamLoader>> VeloxReader::loadStripe(
uint32_t stripe) {
auto& unit = unitLoader_->getLoadedUnit(getUnitIndex(stripe));
auto* nimbleUnit = dynamic_cast<NimbleUnit*>(&unit);
NIMBLE_ASSERT(nimbleUnit, "Should be a NimbleUnit");
rowsRemainingInStripe_ = nimbleUnit->getNumRows();
return nimbleUnit->moveStreamLoaders();
}

bool VeloxReader::next(uint64_t rowCount, velox::VectorPtr& result) {
if (rowsRemainingInStripe_ == 0) {
if (nextStripe_ < lastStripe_) {
loadStripe();
loadNextStripe();
} else {
return false;
}
Expand All @@ -293,6 +379,8 @@ bool VeloxReader::next(uint64_t rowCount, velox::VectorPtr& result) {
if (parameters_.decodingTimeCallback) {
startTime = std::chrono::steady_clock::now();
}
unitLoader_->onRead(
getUnitIndex(loadedStripe_.value()), getCurrentRowInStripe(), rowsToRead);
rootReader_->next(rowsToRead, result, nullptr /*scatterBitmap*/);
if (barrier_) {
// Wait for all reader tasks to complete.
Expand Down Expand Up @@ -354,7 +442,7 @@ uint64_t VeloxReader::seekToRow(uint64_t rowNumber) {
}

auto rowsSkipped = skipStripes(0, rowNumber);
loadStripe();
loadNextStripe();
skipInCurrentStripe(rowNumber - rowsSkipped);
return rowNumber;
}
Expand Down Expand Up @@ -387,7 +475,7 @@ uint64_t VeloxReader::skipRows(uint64_t numberOfRowsToSkip) {
return rowsSkipped;
}

loadStripe();
loadNextStripe();
skipInCurrentStripe(numberOfRowsToSkip - rowsSkipped);
return numberOfRowsToSkip;
}
Expand Down Expand Up @@ -419,9 +507,35 @@ void VeloxReader::skipInCurrentStripe(uint64_t rowsToSkip) {
rowsToSkip <= rowsRemainingInStripe_,
"Not Enough rows to skip in stripe!");
rowsRemainingInStripe_ -= rowsToSkip;
unitLoader_->onSeek(
getUnitIndex(loadedStripe_.value()), getCurrentRowInStripe());
rootReader_->skip(rowsToSkip);
}

VeloxReader::~VeloxReader() = default;

std::unique_ptr<velox::dwio::common::UnitLoader> VeloxReader::getUnitLoader() {
std::vector<std::unique_ptr<velox::dwio::common::LoadUnit>> units;
units.reserve(lastStripe_ - firstStripe_);
for (uint32_t stripe = firstStripe_; stripe < lastStripe_; ++stripe) {
units.push_back(std::make_unique<NimbleUnit>(
stripe, tabletReader_, schema_, logger_, offsets_));
}
if (parameters_.unitLoaderFactory) {
return parameters_.unitLoaderFactory->create(std::move(units));
}
velox::dwio::common::OnDemandUnitLoaderFactory factory(
parameters_.blockedOnIoCallback);
return factory.create(std::move(units));
}

size_t VeloxReader::getUnitIndex(uint32_t stripeIndex) const {
return stripeIndex - firstStripe_;
}

uint64_t VeloxReader::getCurrentRowInStripe() const {
return tabletReader_->stripeRowCount(loadedStripe_.value()) -
rowsRemainingInStripe_;
}

} // namespace facebook::nimble
16 changes: 13 additions & 3 deletions dwio/nimble/velox/VeloxReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ class VeloxReader {

private:
// Loads the next stripe's streams.
void loadStripe();
void loadNextStripe();

std::vector<std::unique_ptr<StreamLoader>> loadStripe(uint32_t stripe);

// True if the file contain zero rows.
bool isEmptyFile() const {
Expand All @@ -152,11 +154,16 @@ class VeloxReader {

static const std::vector<std::string>& preloadedOptionalSections();

std::unique_ptr<velox::dwio::common::UnitLoader> getUnitLoader();

size_t getUnitIndex(uint32_t stripeIndex) const;

uint64_t getCurrentRowInStripe() const;

velox::memory::MemoryPool& pool_;
std::shared_ptr<const TabletReader> tabletReader_;
const VeloxReadParams parameters_;
std::shared_ptr<const Type> schema_;
StreamLabels streamLabels_;
std::shared_ptr<const velox::RowType> type_;
std::vector<uint32_t> offsets_;
folly::F14FastMap<offset_size, std::unique_ptr<Decoder>> decoders_;
Expand All @@ -171,14 +178,17 @@ class VeloxReader {
// Reading state for reader
uint32_t nextStripe_ = 0;
uint64_t rowsRemainingInStripe_ = 0;
uint64_t currentRowInStripe_ = 0;
// stripe currently loaded. Initially state is no stripe loaded (INT_MAX)
uint32_t loadedStripe_ = std::numeric_limits<uint32_t>::max();
std::optional<uint32_t> loadedStripe_;

std::unique_ptr<velox::dwio::common::ExecutorBarrier> barrier_;
// Right now each reader is considered its own session if not passed from
// writer option.
std::shared_ptr<MetricsLogger> logger_;

std::unique_ptr<velox::dwio::common::UnitLoader> unitLoader_;

friend class VeloxReaderHelper;
};

Expand Down

0 comments on commit 0bc09ec

Please sign in to comment.