Skip to content

Commit

Permalink
for test
Browse files Browse the repository at this point in the history
  • Loading branch information
loneylee committed Dec 1, 2023
1 parent c531abd commit afdf0a3
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 44 deletions.
184 changes: 184 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
#include <filesystem>
#include <memory>
#include <fstream>
#include <iostream>
#include <optional>
#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
Expand Down Expand Up @@ -66,6 +68,11 @@
#include <regex>
#include "CHUtil.h"

#include <Storages/StorageMergeTreeFactory.h>
#include <Common/MergeTreeTool.h>
#include <Parser/TypeParser.h>
#include <Poco/JSON/Parser.h>

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -614,6 +621,181 @@ void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
settings.set("precise_float_parsing", true);
}

/// Parse delta-log style metadata lines for one table directory and register
/// the corresponding MergeTree storage in the StorageMergeTreeFactory.
///
/// @param metadata_lines  JSON lines read from _delta_log/00000000000000000000.json
/// @param table_path      absolute path of the table directory on disk
/// @param root_path       metadata root directory; table_path minus root_path
///                        yields the relative "[database/]table[/bucket]" path
static void load_medadata_impl(std::vector<String> & metadata_lines, String & table_path, String & root_path)
{
    String database = "default";
    Poco::JSON::Parser parser;

    /// Derive database/table names from the path relative to the metadata root:
    ///   "table"          -> default.table
    ///   "db/table"       -> db.table
    ///   "db/table/part"  -> db.table_part
    String table;
    String table_relative_path = table_path.substr(root_path.length());
    Strings path_parts;
    boost::split(path_parts, table_relative_path, boost::is_any_of("/"));

    if (path_parts.size() == 1)
    {
        table = path_parts[0];
    }
    else if (path_parts.size() == 2)
    {
        database = path_parts[0];
        table = path_parts[1];
    }
    else if (path_parts.size() == 3)
    {
        database = path_parts[0];
        table = path_parts[1] + "_" + path_parts[2];
    }
    else
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Unexpected table path '{}' under metadata root '{}': expected 1 to 3 path components",
            table_relative_path,
            root_path);
    }

    std::shared_ptr<DB::StorageInMemoryMetadata> metadata;
    NamesAndTypesList names_and_types_list;
    String sort_column_names_str;

    for (const auto & line : metadata_lines)
    {
        auto info = parser.parse(line).extract<Poco::JSON::Object::Ptr>();

        /// "metaData" entry carries the table schema as a nested JSON string.
        if (auto metaData = info->getObject("metaData"))
        {
            auto schema_object = parser.parse(metaData->getValue<String>("schemaString")).extract<Poco::JSON::Object::Ptr>();
            auto fields = schema_object->get("fields").extract<Poco::JSON::Array::Ptr>();

            for (size_t i = 0; i < fields->size(); ++i)
            {
                auto field = fields->get(i).extract<Poco::JSON::Object::Ptr>();
                auto name = field->getValue<String>("name");
                auto type = TypeParser::getCHTypeByName(field->getValue<String>("type"));

                if (field->getValue<bool>("nullable"))
                    type = makeNullable(type);

                names_and_types_list.push_back(NameAndTypePair(name, type));
            }

            metadata = buildMetaData(names_and_types_list, SerializedPlanParser::global_context);
        }

        /// "commitInfo" entry may carry table properties, e.g. the sort key.
        if (auto commit_info = info->getObject("commitInfo"))
        {
            if (auto operation_parameters = commit_info->getObject("operationParameters"))
            {
                auto table_properties
                    = parser.parse(operation_parameters->getValue<String>("properties")).extract<Poco::JSON::Object::Ptr>();
                sort_column_names_str = table_properties->getValue<String>("sortColumnNames");
            }
        }
    }

    /// Without a schema there is nothing to register.
    if (!metadata)
        return;

    if (!sort_column_names_str.empty())
    {
        metadata->sorting_key
            = KeyDescription::parse(sort_column_names_str, metadata->getColumns(), SerializedPlanParser::global_context);
        metadata->primary_key
            = KeyDescription::parse(sort_column_names_str, metadata->getColumns(), SerializedPlanParser::global_context);
    }

    auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
        StorageID(database, table),
        table_path,
        *metadata,
        false,
        SerializedPlanParser::global_context,
        "",
        MergeTreeData::MergingParams(),
        buildMergeTreeSettings());
    custom_storage_merge_tree->loadDataParts(false, std::nullopt);
    /// instance() yields the process-wide singleton; bind it by reference so
    /// loadStorage registers into the shared factory, not a local copy.
    auto & storage_factory = StorageMergeTreeFactory::instance();
    storage_factory.loadStorage(StorageID(database, table), metadata->getColumns(), custom_storage_merge_tree);
}


static void load_medadata(std::string dir, std::string & root_path)
{
if (!fs::exists(dir) || std::filesystem::directory_entry(dir).status().type() != std::filesystem::file_type::directory)
return;

std::unordered_map<String, String> file_map;
std::filesystem::directory_iterator it(dir);

for (auto & child : it)
file_map[child.path().filename()] = child.path().string();

if (file_map.contains("_delta_log"))
{
String metadata_file = file_map["_delta_log"] + "/00000000000000000000.json";
if (!fs::exists(metadata_file) && !std::filesystem::directory_entry(metadata_file).is_regular_file())
{
LOG_WARNING(
&Poco::Logger::get("CHUtil"), "Can not find file '00000000000000000000.json' in_delta_log {}.", file_map["_delta_log"]);
return;
}

std::ifstream fio(metadata_file);
String line;
std::vector<String> lines;
size_t line_num = 0;
while (getline(fio, line))
{
lines.emplace_back(line);
line_num++;

if (line_num > 3)
break;
}

fio.close();

if (lines.size() < 3)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Format ch metadata error. Expected 3 rows but {}", line.size());

if (file_map.contains("format_version.txt"))
{
load_medadata_impl(lines, dir, root_path);
}
else
{
for (std::pair<String, String> p : file_map)
{
if (p.first == "_delta_log")
continue;

load_medadata_impl(lines, p.second, root_path);
}
}
}
else
{
for (std::pair<String, String> p : file_map)
load_medadata(p.second, root_path);
}
}

void BackendInitializerUtil::initMetadata(std::map<std::string, std::string> & backend_conf_map)
{
String key = "spark.gluten.sql.columnar.backend.ch.runtime_config.ch_metadata";
if (backend_conf_map.contains(key))
{
auto metadata_dir = backend_conf_map[key];
if (!metadata_dir.ends_with("/"))
metadata_dir.append("/");

load_medadata(metadata_dir, metadata_dir);

}
}

void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
{
/// Make sure global_context and shared_context are constructed only once.
Expand Down Expand Up @@ -756,6 +938,8 @@ void BackendInitializerUtil::init(std::string * plan)
0, // We don't need any threads one all the parts will be loaded
active_parts_loading_threads);
});

initMetadata(backend_conf_map);
}

void BackendInitializerUtil::updateConfig(DB::ContextMutablePtr context, std::string * plan)
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class BackendInitializerUtil
static void initLoggers(DB::Context::ConfigurationPtr config);
static void initEnvs(DB::Context::ConfigurationPtr config);
static void initSettings(std::map<std::string, std::string> & backend_conf_map, DB::Settings & settings);
static void initMetadata(std::map<std::string, std::string> & backend_conf_map);

static void initContexts(DB::Context::ConfigurationPtr config);
static void initCompiledExpressionCache(DB::Context::ConfigurationPtr config);
Expand Down
8 changes: 7 additions & 1 deletion cpp-ch/local-engine/Common/MergeTreeTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MergeTreeTool.h"

#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>


#include <boost/algorithm/string.hpp>

#include "MergeTreeTool.h"

using namespace DB;

namespace local_engine
Expand All @@ -44,6 +49,7 @@ std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings()
auto settings = std::make_unique<DB::MergeTreeSettings>();
settings->set("min_bytes_for_wide_part", Field(0));
settings->set("min_rows_for_wide_part", Field(0));
settings->set("allow_nullable_key", Field(1));
return settings;
}

Expand Down
33 changes: 18 additions & 15 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,31 +85,34 @@ MergeTreeRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & re
}
auto names_and_types_list = header.getNamesAndTypesList();
auto storage_factory = StorageMergeTreeFactory::instance();
auto metadata = buildMetaData(names_and_types_list, context);
query_context.metadata = metadata;
// auto metadata = buildMetaData(names_and_types_list, context);


auto storage = storage_factory.getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table),
metadata->getColumns(),
ColumnsDescription(),
[&]() -> CustomStorageMergeTreePtr
{
auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.relative_path,
*metadata,
false,
global_context,
"",
MergeTreeData::MergingParams(),
buildMergeTreeSettings());
custom_storage_merge_tree->loadDataParts(false, std::nullopt);
return custom_storage_merge_tree;
// auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
// StorageID(merge_tree_table.database, merge_tree_table.table),
// merge_tree_table.relative_path,
// *metadata,
// false,
// global_context,
// "",
// MergeTreeData::MergingParams(),
// buildMergeTreeSettings());
// custom_storage_merge_tree->loadDataParts(false, std::nullopt);
// return custom_storage_merge_tree;
return nullptr;
});
auto metadata = storage->getInMemoryMetadataPtr();
query_context.metadata = metadata;

for (const auto & [name, sizes] : storage->getColumnSizes())
column_sizes[name] = sizes.data_compressed;

query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
query_context.custom_storage_merge_tree = storage;
auto query_info = buildQueryInfo(names_and_types_list);

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ static const std::set<std::string> FUNCTION_NEED_KEEP_ARGUMENTS = {"alias"};
/// Per-query state for a MergeTree scan: the snapshot being read, the table
/// metadata it was built from, and the backing storage object.
/// NOTE: the diff extraction kept both the old shared_ptr<StorageInMemoryMetadata>
/// member and its StorageMetadataPtr replacement, producing a duplicate member
/// named `metadata`; only the new declaration is retained here.
struct QueryContext
{
    StorageSnapshotPtr storage_snapshot;
    StorageMetadataPtr metadata;
    std::shared_ptr<CustomStorageMergeTree> custom_storage_merge_tree;
};

Expand Down
23 changes: 18 additions & 5 deletions cpp-ch/local-engine/Parser/TypeParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,31 @@ std::unordered_map<String, String> TypeParser::type_names_mapping
{"ByteType", "Int8"},
{"ShortType", "Int16"},
{"IntegerType", "Int32"},
{"integer", "Int32"},
{"LongType", "Int64"},
{"long", "Int64"},
{"FloatType", "Float32"},
{"DoubleType", "Float64"},
{"double", "Float64"},
{"StringType", "String"},
{"DateType", "Date32"}};
{"string", "String"},
{"DateType", "Date32"},
{"date", "Date32"}
};

/// Map a Spark type name to the corresponding ClickHouse type name.
/// Decimal types are special-cased: "decimal(p,s)" passes through unchanged,
/// "DecimalType(p,s)" becomes "Decimal(p,s)"; everything else is resolved via
/// type_names_mapping.
/// (The stale pre-diff body — an unconditional map lookup ending in a return
/// that made the decimal branches unreachable — has been removed.)
/// @throws DB::Exception(UNKNOWN_TYPE) for unrecognized type names.
String TypeParser::getCHTypeName(const String & spark_type_name)
{
    if (startsWith(spark_type_name, "decimal("))
        return spark_type_name;
    else if (startsWith(spark_type_name, "DecimalType"))
        return "Decimal" + spark_type_name.substr(strlen("DecimalType"));
    else
    {
        auto it = type_names_mapping.find(spark_type_name);
        if (it == type_names_mapping.end())
            throw DB::Exception(DB::ErrorCodes::UNKNOWN_TYPE, "Unsupported substrait type: {}", spark_type_name);
        return it->second;
    }
}

DB::DataTypePtr TypeParser::getCHTypeByName(const String & spark_type_name)
Expand Down
Loading

0 comments on commit afdf0a3

Please sign in to comment.