Skip to content

Commit

Permalink
[CH]Daily Update Clickhouse Version (20241118) (#7968)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
Manually rebase the ClickHouse repo

How was this patch tested?
unit tests

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
liuneng1994 authored Nov 26, 2024
1 parent efd2cbd commit 8d8ee3d
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 31 deletions.
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20241111
CH_COMMIT=3f7e46d4e9e
CH_BRANCH=rebase_ch/20241118
CH_COMMIT=7f22fe487c88d3b988ea82a5c34882da23ea6289
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,36 @@ bool isMetaDataFile(const std::string & path)
return !path.ends_with("bin");
}

/// Wraps `data_buffer_` so that it can be handed out as a WriteBufferFromFileBase.
/// The base class is initialised with the size and start address of the memory that
/// `data_buffer_` currently exposes through buffer(), so the wrapper presumably writes
/// straight into that memory rather than into a buffer of its own.
/// `data_buffer_` is dereferenced in the initializer list and therefore must not be null.
/// NOTE(review): the trailing `0` is presumably the alignment argument of
/// WriteBufferFromFileBase - confirm against its constructor.
TemporaryWriteBufferWrapper::TemporaryWriteBufferWrapper(
const String & file_name_, const std::shared_ptr<DB::TemporaryDataBuffer> & data_buffer_)
: WriteBufferFromFileBase(data_buffer_->buffer().size(), data_buffer_->buffer().begin(), 0)
, file_name(file_name_)
, data_buffer(data_buffer_)
{
}
void TemporaryWriteBufferWrapper::preFinalize()
{
next();
}

void TemporaryWriteBufferWrapper::finalizeImpl()
{
next();
data_buffer->finalizeImpl();
}

/// Cancellation is delegated entirely to the wrapped data buffer's cancelImpl().
void TemporaryWriteBufferWrapper::cancelImpl() noexcept
{
    auto & wrapped = *data_buffer;
    wrapped.cancelImpl();
}

/// Hands the bytes written through this wrapper over to the wrapped data buffer.
/// The order of the three steps matters and is kept as-is:
///   1. copy this wrapper's write position into the wrapped buffer,
///   2. call next() on the wrapped buffer,
///   3. re-point this wrapper's working buffer at whatever region (and offset)
///      the wrapped buffer exposes afterwards.
void TemporaryWriteBufferWrapper::nextImpl()
{
    auto & wrapped = *data_buffer;

    wrapped.position() = position();
    wrapped.next();

    BufferBase::set(wrapped.buffer().begin(), wrapped.buffer().size(), wrapped.offset());
}

void CompactObjectStorageDiskTransaction::commit()
{
auto metadata_tx = disk.getMetadataStorage()->createTransaction();
Expand All @@ -52,9 +82,9 @@ void CompactObjectStorageDiskTransaction::commit()
[&](auto & item)
{
DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first);
DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath());
auto read = item.second->read();
int file_size = 0;
while (int count = read.readBig(buffer.data(), buffer.size()))
while (int count = read->readBig(buffer.data(), buffer.size()))
{
file_size += count;
out.write(buffer.data(), count);
Expand Down Expand Up @@ -98,12 +128,13 @@ std::unique_ptr<DB::WriteBufferFromFileBase> CompactObjectStorageDiskTransaction
"Don't support write file in different dirs, path {}, prefix path: {}",
path,
prefix_path);
auto tmp = std::make_shared<DB::TemporaryFileOnDisk>(tmp_data);
auto tmp = std::make_shared<DB::TemporaryDataBuffer>(tmp_data.get());
files.emplace_back(path, tmp);
auto tx = disk.getMetadataStorage()->createTransaction();
tx->createDirectoryRecursive(std::filesystem::path(path).parent_path());
tx->createEmptyMetadataFile(path);
tx->commit();
return std::make_unique<DB::WriteBufferFromFile>(tmp->getAbsolutePath(), buf_size);

return std::make_unique<TemporaryWriteBufferWrapper>(path, tmp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,41 @@ extern const int NOT_IMPLEMENTED;
namespace local_engine
{

class TemporaryWriteBufferWrapper : public DB::WriteBufferFromFileBase
{
public:
TemporaryWriteBufferWrapper(const String & file_name_, const std::shared_ptr<DB::TemporaryDataBuffer> & data_buffer_);

void sync() override { data_buffer->nextImpl(); }

void preFinalize() override;

protected:
void finalizeImpl() override;
void cancelImpl() noexcept override;

private:
void nextImpl() override;

public:
std::string getFileName() const override
{
return file_name;
}

private:
String file_name;
std::shared_ptr<DB::TemporaryDataBuffer> data_buffer;
};

class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
public:
static inline const String PART_DATA_FILE_NAME = "part_data.gluten";
static inline const String PART_META_FILE_NAME = "part_meta.gluten";

explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_)
explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::TemporaryDataOnDiskScopePtr tmp_)
: disk(disk_), tmp_data(tmp_)
{
chassert(!tmp_->isRemote());
}

void commit() override;
Expand Down Expand Up @@ -170,8 +196,8 @@ class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {

private:
DB::IDisk & disk;
DB::DiskPtr tmp_data;
std::vector<std::pair<String, std::shared_ptr<DB::TemporaryFileOnDisk>>> files;
DB::TemporaryDataOnDiskScopePtr tmp_data;
std::vector<std::pair<String, std::shared_ptr<DB::TemporaryDataBuffer>>> files;
String prefix_path = "";
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ using namespace DB;

DiskTransactionPtr GlutenDiskHDFS::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk());
}

void GlutenDiskHDFS::createDirectory(const String & path)
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace local_engine

DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getSharedTempDataOnDisk());
}

std::unique_ptr<DB::ReadBufferFromFileBase> GlutenDiskS3::readFile(
Expand Down
29 changes: 27 additions & 2 deletions cpp-ch/local-engine/Functions/SparkFunctionRoundHalfUp.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,31 @@ class BaseFloatRoundingHalfUpComputation<T, Vectorize::No>
}
};

/// Scalar (non-vectorized) half-up rounding primitives specialised for BFloat16.
/// Rounding itself is carried out in Float32 via std::roundf and the result is
/// converted back to BFloat16; the RoundingMode template argument of apply() is not consulted.
template <>
class BaseFloatRoundingHalfUpComputation<BFloat16, Vectorize::No>
{
public:
    using ScalarType = BFloat16;
    using VectorType = BFloat16;
    static const size_t data_count = 1;

    /// Reads one value from `in`.
    static VectorType load(const ScalarType * in)
    {
        return *in;
    }

    /// Returns the given scalar unchanged (a "vector" holds exactly one element here).
    static VectorType load1(const ScalarType in)
    {
        return in;
    }

    /// Writes `val` to `out` and returns the stored value.
    static VectorType store(ScalarType * out, ScalarType val)
    {
        *out = val;
        return *out;
    }

    static VectorType multiply(VectorType val, VectorType scale)
    {
        return val * scale;
    }

    static VectorType divide(VectorType val, VectorType scale)
    {
        return val / scale;
    }

    /// Widens to Float32, rounds with std::roundf, and narrows back to BFloat16.
    template <RoundingMode mode>
    static VectorType apply(VectorType val)
    {
        const Float32 widened = static_cast<Float32>(val);
        return BFloat16(std::roundf(widened));
    }

    /// Converts the integral scale to BFloat16 (going through Float32).
    static VectorType prepare(size_t scale)
    {
        const auto narrowed = BFloat16(static_cast<Float32>(scale));
        return load1(narrowed);
    }
};


/** Implementation of low-level round-off functions for floating-point values.
*/
Expand Down Expand Up @@ -167,7 +192,7 @@ struct FloatRoundingHalfUpImpl

template <Vectorize vectorize =
#ifdef __SSE4_1__
Vectorize::Yes
std::is_same_v<T, BFloat16> ? Vectorize::No : Vectorize::Yes
#else
Vectorize::No
#endif
Expand Down Expand Up @@ -219,7 +244,7 @@ struct DispatcherRoundingHalfUp
{
template <ScaleMode scale_mode>
using FunctionRoundingImpl = std::conditional_t<
std::is_floating_point_v<T>,
std::is_floating_point_v<T> || std::is_same_v<T, BFloat16>,
FloatRoundingHalfUpImpl<T, rounding_mode, scale_mode>,
IntegerRoundingImpl<T, rounding_mode, scale_mode, tie_breaking_mode>>;

Expand Down
25 changes: 15 additions & 10 deletions cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ GraceAggregatingTransform::GraceAggregatingTransform(
, aggregate_columns(params_->params.aggregates_size)
, no_pre_aggregated(no_pre_aggregated_)
, final_output(final_output_)
, tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
, tmp_data_disk(context_->getTempDataOnDisk())
{
output_header = params->getHeader();
auto config = GraceMergingAggregateConfig::loadFromContext(context);
Expand Down Expand Up @@ -302,10 +302,13 @@ void GraceAggregatingTransform::flushBuckets()
flushBucket(i);
}

static size_t flushBlocksInfoDisk(DB::TemporaryFileStream * file_stream, std::list<DB::Block> & blocks)
static size_t flushBlocksInfoDisk(std::optional<DB::TemporaryBlockStreamHolder>& file_stream, std::list<DB::Block> & blocks)
{
size_t flush_bytes = 0;
DB::Blocks tmp_blocks;
if (!file_stream)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "file_stream is empty");
auto & tmp_stream = file_stream.value();
while (!blocks.empty())
{
while (!blocks.empty())
Expand All @@ -322,11 +325,11 @@ static size_t flushBlocksInfoDisk(DB::TemporaryFileStream * file_stream, std::li
flush_bytes += merged_block.bytes();
if (merged_block.rows())
{
file_stream->write(merged_block);
tmp_stream->write(merged_block);
}
}
if (flush_bytes)
file_stream->flush();
tmp_stream->flush();
return flush_bytes;
}

Expand All @@ -338,15 +341,15 @@ size_t GraceAggregatingTransform::flushBucket(size_t bucket_index)
if (!file_stream.original_blocks.empty())
{
if (!file_stream.original_file_stream)
file_stream.original_file_stream = &tmp_data_disk->createStream(header);
file_stream.original_file_stream = DB::TemporaryBlockStreamHolder(header, tmp_data_disk.get());
flush_bytes += flushBlocksInfoDisk(file_stream.original_file_stream, file_stream.original_blocks);
}
if (!file_stream.intermediate_blocks.empty())
{
if (!file_stream.intermediate_file_stream)
{
auto intermediate_header = params->aggregator.getHeader(false);
file_stream.intermediate_file_stream = &tmp_data_disk->createStream(intermediate_header);
file_stream.intermediate_file_stream = DB::TemporaryBlockStreamHolder(header, tmp_data_disk.get());
}
flush_bytes += flushBlocksInfoDisk(file_stream.intermediate_file_stream, file_stream.intermediate_blocks);
}
Expand All @@ -373,17 +376,18 @@ std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::prepareB
if (buffer_file_stream.intermediate_file_stream)
{
buffer_file_stream.intermediate_file_stream->finishWriting();
auto reader = buffer_file_stream.intermediate_file_stream->getReadStream();
while (true)
{
auto block = buffer_file_stream.intermediate_file_stream->read();
auto block = reader->read();
if (!block.rows())
break;
read_bytes += block.bytes();
read_rows += block.rows();
mergeOneBlock(block, false);
block = {};
}
buffer_file_stream.intermediate_file_stream = nullptr;
buffer_file_stream.intermediate_file_stream.reset();
total_read_disk_time += watch.elapsedMilliseconds();
}
if (!buffer_file_stream.intermediate_blocks.empty())
Expand All @@ -398,17 +402,18 @@ std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::prepareB
if (buffer_file_stream.original_file_stream)
{
buffer_file_stream.original_file_stream->finishWriting();
auto reader = buffer_file_stream.original_file_stream->getReadStream();
while (true)
{
auto block = buffer_file_stream.original_file_stream->read();
auto block = reader->read();
if (!block.rows())
break;
read_bytes += block.bytes();
read_rows += block.rows();
mergeOneBlock(block, true);
block = {};
}
buffer_file_stream.original_file_stream = nullptr;
buffer_file_stream.original_file_stream.reset();
total_read_disk_time += watch.elapsedMilliseconds();
}
if (!buffer_file_stream.original_blocks.empty())
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class GraceAggregatingTransform : public DB::IProcessor
DB::Aggregator::AggregateColumns aggregate_columns;
DB::AggregatingTransformParamsPtr params;
DB::ContextPtr context;
DB::TemporaryDataOnDiskPtr tmp_data_disk;
DB::TemporaryDataOnDiskScopePtr tmp_data_disk;
DB::AggregatedDataVariantsPtr current_data_variants = nullptr;
size_t current_bucket_index = 0;

Expand All @@ -83,9 +83,9 @@ class GraceAggregatingTransform : public DB::IProcessor
/// Only be used when there is no pre-aggregated step, store the original input blocks.
std::list<DB::Block> original_blocks;
/// store the intermediate result blocks.
DB::TemporaryFileStream * intermediate_file_stream = nullptr;
std::optional<DB::TemporaryBlockStreamHolder> intermediate_file_stream;
/// Only be used when there is no pre-aggregated step
DB::TemporaryFileStream * original_file_stream = nullptr;
std::optional<DB::TemporaryBlockStreamHolder> original_file_stream;
size_t pending_bytes = 0;
};
std::unordered_map<size_t, BufferFileStream> buckets;
Expand Down
9 changes: 8 additions & 1 deletion cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace DB
namespace Setting
{
extern const SettingsUInt64 max_block_size;
extern const SettingsUInt64 min_joined_block_size_bytes;
}
namespace ErrorCodes
{
Expand Down Expand Up @@ -200,7 +201,13 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB:
{
JoinPtr hash_join = std::make_shared<HashJoin>(table_join, right->getCurrentHeader().cloneEmpty());
QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
left->getCurrentHeader(), right->getCurrentHeader(), hash_join, context->getSettingsRef()[Setting::max_block_size], 1, false);
left->getCurrentHeader(),
right->getCurrentHeader(),
hash_join,
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);
join_step->setStepDescription("CROSS_JOIN");
steps.emplace_back(join_step.get());
std::vector<QueryPlanPtr> plans;
Expand Down
13 changes: 11 additions & 2 deletions cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace Setting
{
extern const SettingsJoinAlgorithm join_algorithm;
extern const SettingsUInt64 max_block_size;
extern const SettingsUInt64 min_joined_block_size_bytes;
}
namespace ErrorCodes
{
Expand Down Expand Up @@ -315,7 +316,13 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
JoinPtr smj_join = std::make_shared<FullSortingMergeJoin>(table_join, right->getCurrentHeader().cloneEmpty(), -1);
MultiEnum<DB::JoinAlgorithm> join_algorithm = context->getSettingsRef()[Setting::join_algorithm];
QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
left->getCurrentHeader(), right->getCurrentHeader(), smj_join, context->getSettingsRef()[Setting::max_block_size], 1, false);
left->getCurrentHeader(),
right->getCurrentHeader(),
smj_join,
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);

join_step->setStepDescription("SORT_MERGE_JOIN");
steps.emplace_back(join_step.get());
Expand Down Expand Up @@ -448,7 +455,7 @@ void JoinRelParser::collectJoinKeys(
table_join.addDisjunct();
const auto & expr = join_rel.expression();
auto & join_clause = table_join.getClauses().back();
std::list<const const substrait::Expression *> expressions_stack;
std::list<const substrait::Expression *> expressions_stack;
expressions_stack.push_back(&expr);
while (!expressions_stack.empty())
{
Expand Down Expand Up @@ -778,6 +785,7 @@ DB::QueryPlanPtr JoinRelParser::buildMultiOnClauseHashJoin(
right_plan->getCurrentHeader(),
hash_join,
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);
join_step->setStepDescription("Multi join on clause hash join");
Expand Down Expand Up @@ -817,6 +825,7 @@ DB::QueryPlanPtr JoinRelParser::buildSingleOnClauseHashJoin(
right_plan->getCurrentHeader(),
hash_join,
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);

Expand Down
Loading

0 comments on commit 8d8ee3d

Please sign in to comment.