Skip to content

Commit

Permalink
Remove no longer needed backwards compatibility logic (#7567)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #7567

Reviewed By: xiaoxmeng, kgpai

Differential Revision: D51322056

Pulled By: mbasmanova

fbshipit-source-id: 8806f02d948bf45a4f742748b085572868a936dd
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Nov 15, 2023
1 parent 753d1c8 commit badbabc
Show file tree
Hide file tree
Showing 7 changed files with 2 additions and 117 deletions.
31 changes: 0 additions & 31 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,37 +177,6 @@ size_t ByteStream::size() const {
return total + std::max(ranges_.back().position, lastRangeEnd_);
}

size_t ByteStream::remainingSize() const {
if (ranges_.empty()) {
return 0;
}
const auto* lastRange = &ranges_[ranges_.size() - 1];
auto cur = current_;
size_t total{0};
if (cur == lastRange) {
total += (std::max(cur->position, lastRangeEnd_) - cur->position);
} else {
total += cur->size - cur->position;
}

while (++cur <= lastRange) {
total += (cur == lastRange) ? lastRangeEnd_ : cur->size;
}
return total;
}

bool ByteStream::atEnd() const {
if (!current_) {
return false;
}
if (current_->position < current_->size) {
return false;
}

VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back());
return current_ == &ranges_.back();
}

void ByteStream::appendBool(bool value, int32_t count) {
if (count == 1 && current_->size > current_->position) {
bits::setBit(
Expand Down
33 changes: 0 additions & 33 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,33 +214,6 @@ class ByteInputStream {
/// seeking back to start to write a length header.
class ByteStream {
public:
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
/// For input.
ByteStream() : isBits_(false), isReverseBitOrder_(false) {}

void resetInput(std::vector<ByteRange>&& ranges) {
ranges_ = std::move(ranges);
current_ = &ranges_[0];
}

template <typename Char>
void readBytes(Char* data, int32_t size) {
ByteInputStream inputStream(ranges_);
inputStream.seekp(tellp());
inputStream.readBytes(data, size);
seekp(inputStream.tellp());
}

template <typename T>
T read() {
ByteInputStream inputStream(ranges_);
inputStream.seekp(tellp());
auto value = inputStream.read<T>();
seekp(inputStream.tellp());
return value;
}
#endif

/// For output.
ByteStream(
StreamArena* arena,
Expand Down Expand Up @@ -281,12 +254,6 @@ class ByteStream {
/// the last range.
size_t size() const;

/// Returns the remaining size left from current reading position.
size_t remainingSize() const;

/// For input. Returns true if all input has been read.
bool atEnd() const;

int32_t lastRangeEnd() {
updateEnd();
return lastRangeEnd_;
Expand Down
4 changes: 0 additions & 4 deletions velox/dwio/common/FileSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ class WriteFileSink final : public FileSink {
class LocalFileSink : public FileSink {
public:
LocalFileSink(const std::string& name, const Options& options);
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
LocalFileSink(const std::string& name, MetricsLogPtr metricLogger)
: LocalFileSink(name, {.metricLogger = std::move(metricLogger)}) {}
#endif

~LocalFileSink() override {
destroy();
Expand Down
16 changes: 0 additions & 16 deletions velox/exec/ExchangeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ class SerializedPage {
// VectorStreamGroup::read().
ByteInputStream prepareStreamForDeserialize();

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
void prepareStreamForDeserialize(ByteStream* stream) {
stream->resetInput(std::move(ranges_));
}
#endif

std::unique_ptr<folly::IOBuf> getIOBuf() const {
return iobuf_->clone();
}
Expand Down Expand Up @@ -114,16 +108,6 @@ class ExchangeQueue {
std::vector<std::unique_ptr<SerializedPage>>
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
std::unique_ptr<SerializedPage> dequeueLocked(
bool* atEnd,
ContinueFuture* future) {
auto pages = dequeueLocked(1, atEnd, future);
VELOX_CHECK_LE(pages.size(), 1);
return pages.empty() ? nullptr : std::move(pages.front());
}
#endif

/// Returns the total bytes held by SerializedPages in 'this'.
uint64_t totalBytes() const {
return totalBytes_;
Expand Down
9 changes: 0 additions & 9 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,6 @@ class Task : public std::enable_shared_from_this<Task> {
/// nodes require splits and there are not enough of these.
/// @param concurrentSplitGroups In grouped execution, maximum number of
/// splits groups processed concurrently.
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
static void start(
std::shared_ptr<Task> self,
uint32_t maxDrivers,
uint32_t concurrentSplitGroups = 1) {
self->start(maxDrivers, concurrentSplitGroups);
}
#endif

void start(uint32_t maxDrivers, uint32_t concurrentSplitGroups = 1);

/// If this returns true, this Task supports the single-threaded execution API
Expand Down
11 changes: 2 additions & 9 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/connectors/tpch/TpchConnector.h"
#include "velox/duckdb/conversion/DuckParser.h"
#include "velox/exec/Aggregate.h"
Expand All @@ -24,21 +25,13 @@
#include "velox/exec/TableWriter.h"
#include "velox/exec/WindowFunction.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/expression/Expr.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/expression/FunctionCallToSpecialForm.h"
#include "velox/expression/SignatureBinder.h"
#include "velox/parse/Expressions.h"
#include "velox/parse/TypeResolver.h"

#ifndef VELOX_ENABLE_BACKWARD_COMPATIBILITY
#include "velox/connectors/hive/TableHandle.h"
#include "velox/expression/Expr.h"
#else
#include <velox/core/ITypedExpr.h>
#include "velox/common/memory/Memory.h"
#include "velox/parse/ExpressionsParser.h"
#endif

using namespace facebook::velox;
using namespace facebook::velox::connector;
using namespace facebook::velox::connector::hive;
Expand Down
15 changes: 0 additions & 15 deletions velox/vector/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,6 @@ class VectorStreamGroup : public StreamArena {
RowVectorPtr* result,
const VectorSerde::Options* options = nullptr);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
static void read(
ByteStream* source,
velox::memory::MemoryPool* pool,
RowTypePtr type,
RowVectorPtr* result) {
VELOX_CHECK(!source->ranges().empty());
ByteInputStream inputStream(source->ranges());
inputStream.seekp(source->tellp());
read(&inputStream, pool, type, result);

source->seekp(inputStream.tellp());
}
#endif

private:
std::unique_ptr<VectorSerializer> serializer_;
VectorSerde* serde_{nullptr};
Expand Down

0 comments on commit badbabc

Please sign in to comment.