diff --git a/CMakeLists.txt b/CMakeLists.txt index 33f03cc..ebf4e78 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -65,6 +65,13 @@ if(MONGOCLIENT_FOUND) list(APPEND HUSKY_EXTERNAL_DEFINITION ${MONGOCLIENT_DEFINITION}) endif(MONGOCLIENT_FOUND) +# Redis +if(REDISCLIENT_FOUND) + list(APPEND HUSKY_EXTERNAL_INCLUDE ${REDISCLIENT_INCLUDE_DIR}) + list(APPEND HUSKY_EXTERNAL_LIB ${REDISCLIENT_LIBRARY}) + list(APPEND HUSKY_EXTERNAL_DEFINITION ${REDISCLIENT_DEFINITION}) +endif(REDISCLIENT_FOUND) + # Thrift if(THRIFT_FOUND) list(APPEND HUSKY_EXTERNAL_INCLUDE ${THRIFT_INCLUDE_DIR}) diff --git a/cmake/dep.cmake b/cmake/dep.cmake index 810f915..c586ad2 100644 --- a/cmake/dep.cmake +++ b/cmake/dep.cmake @@ -99,6 +99,65 @@ if(WITHOUT_MONGODB) unset(MONGOCLIENT_FOUND) message(STATUS "Not using MongoClient due to WITHOUT_MONGODB option") endif(WITHOUT_MONGODB) + +### Redis ### + +if(REDISCLIENT_SEARCH_PATH) + find_path(REDISCLIENT_INCLUDE_DIR NAMES hiredis PATHS ${REDISCLIENT_SEARCH_PATH}) + find_library(REDISCLIENT_LIBRARY NAMES hiredis PATHS ${REDISCLIENT_SEARCH_PATH}) +else(REDISCLIENT_SEARCH_PATH) + find_path(REDISCLIENT_INCLUDE_DIR NAMES hiredis) + find_library(REDISCLIENT_LIBRARY NAMES hiredis) +endif(REDISCLIENT_SEARCH_PATH) +if(REDISCLIENT_INCLUDE_DIR AND REDISCLIENT_LIBRARY) + set(REDISCLIENT_FOUND true) +endif(REDISCLIENT_INCLUDE_DIR AND REDISCLIENT_LIBRARY) +if(REDISCLIENT_FOUND) + set(REDISCLIENT_DEFINITION "-DWITH_REDIS") + message (STATUS "Found Hiredis:") + message (STATUS " (Headers) ${REDISCLIENT_INCLUDE_DIR}") + message (STATUS " (Library) ${REDISCLIENT_LIBRARY}") + message (STATUS " (Definition) ${REDISCLIENT_DEFINITION}") +else(REDISCLIENT_FOUND) + if(WIN32) + message (STATUS "Redis and hiredis are currently not available on win32") + else(WIN32) + message (STATUS "hiredis will be included as a third party:") + include(ExternalProject) + set(THIRDPARTY_DIR ${PROJECT_SOURCE_DIR}/third_party) + if(NOT REDISCLIENT_INCLUDE_DIR OR NOT REDISCLIENT_LIBRARY) + set(REDIS_INSTALL "cp") + ExternalProject_Add( + hiredis + GIT_REPOSITORY "https://github.com/redis/hiredis" + GIT_TAG "v0.13.3" + PREFIX ${THIRDPARTY_DIR} + UPDATE_COMMAND "" + CONFIGURE_COMMAND "" + # TODO: if remove "-pedantic" strict Warnings. + BUILD_COMMAND sed -i "s/ -pedantic//g" ${THIRDPARTY_DIR}/src/hiredis/Makefile COMMAND make dynamic COMMAND make static + BUILD_IN_SOURCE 1 + INSTALL_COMMAND mkdir -p ${PROJECT_BINARY_DIR}/include/hiredis/adapters ${PROJECT_BINARY_DIR}/lib COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/hiredis.h ${PROJECT_BINARY_DIR}/include/hiredis/hiredis.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/read.h ${PROJECT_BINARY_DIR}/include/hiredis/read.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/sds.h ${PROJECT_BINARY_DIR}/include/hiredis/sds.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/async.h ${PROJECT_BINARY_DIR}/include/hiredis/async.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/adapters/ae.h ${PROJECT_BINARY_DIR}/include/hiredis/adapters/ae.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/adapters/libev.h ${PROJECT_BINARY_DIR}/include/hiredis/adapters/libev.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/adapters/libevent.h ${PROJECT_BINARY_DIR}/include/hiredis/adapters/libevent.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/libhiredis.so ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0.13 COMMAND ln -sf ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0.13 ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0 COMMAND ln -sf ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0 ${PROJECT_BINARY_DIR}/lib/libhiredis.so COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/libhiredis.a ${PROJECT_BINARY_DIR}/lib/libhiredis.a + ) + list(APPEND external_project_dependencies hiredis) + endif(NOT REDISCLIENT_INCLUDE_DIR OR NOT REDISCLIENT_LIBRARY) + set(REDISCLIENT_INCLUDE_DIR "${PROJECT_BINARY_DIR}/include/hiredis") + if(BUILD_SHARED_LIBRARY) + set(REDISCLIENT_LIBRARY "${PROJECT_BINARY_DIR}/lib/libhiredis.so") + else(BUILD_SHARED_LIBRARY) + set(REDISCLIENT_LIBRARY "${PROJECT_BINARY_DIR}/lib/libhiredis.a") + endif(BUILD_SHARED_LIBRARY) + message (STATUS " (Headers should be) ${REDISCLIENT_INCLUDE_DIR}") + message (STATUS " (Library should be) ${REDISCLIENT_LIBRARY}") + set(REDISCLIENT_FOUND true) + set(REDISCLIENT_DEFINITION "-DWITH_REDIS") + endif(WIN32) +endif(REDISCLIENT_FOUND) +if(WITHOUT_REDIS) + unset(REDISCLIENT_FOUND) + unset(REDISCLIENT_DEFINITION) + message(STATUS "Not using Hiredis due to WITHOUT_REDIS option") +endif(WITHOUT_REDIS) ### RT ### diff --git a/core/constants.hpp b/core/constants.hpp index a6e81f3..05f8f7b 100644 --- a/core/constants.hpp +++ b/core/constants.hpp @@ -72,6 +72,9 @@ const uint32_t TYPE_KAFKA_REQ = 0xfa091343; const uint32_t TYPE_KAFKA_END_REQ = 0xfa091344; const uint32_t TYPE_MONGODB_REQ = 0xfa091388; const uint32_t TYPE_MONGODB_END_REQ = 0xfa091389; +const uint32_t TYPE_REDIS_REQ = 0xfa0913c8; +const uint32_t TYPE_REDIS_QRY_REQ = 0xfa0913c9; +const uint32_t TYPE_REDIS_END_REQ = 0xfa0913ca; const uint32_t TYPE_LOCAL_BLK_REQ = 0xfa0e12a2; const uint32_t TYPE_ORC_BLK_REQ = 0xfa2e32a1; const uint32_t TYPE_STOP_ASYNC_REQ = 0xf89d74b4; diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index a5d46bc..c04d8ef 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -67,6 +67,12 @@ target_link_libraries(WordCountFlume ${husky}) target_link_libraries(WordCountFlume ${HUSKY_EXTERNAL_LIB}) husky_default_properties(WordCountFlume) +# WordCountRedis +add_executable(WordCountRedis wc_mr_redis.cpp) +target_link_libraries(WordCountRedis ${husky}) +target_link_libraries(WordCountRedis ${HUSKY_EXTERNAL_LIB}) +husky_default_properties(WordCountRedis) + # WordCountORC add_executable(WordCountORC wc_mr_orc.cpp) target_link_libraries(WordCountORC ${husky}) diff --git a/examples/wc_mr_redis.cpp b/examples/wc_mr_redis.cpp new file mode 100644 index 0000000..1844eb6 --- /dev/null +++ b/examples/wc_mr_redis.cpp @@ -0,0 +1,160 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + + +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" +#include "boost/tokenizer.hpp" +#include "hiredis/hiredis.h" + +#include "base/serialization.hpp" +#include "core/engine.hpp" +#include "io/input/inputformat_store.hpp" +#include "io/output/redis_outputformat.hpp" +#include "lib/aggregator_factory.hpp" + +namespace pt = boost::property_tree; + +class Word { + public: + using KeyT = std::string; + + Word() = default; + explicit Word(const KeyT& w) : word(w) {} + const KeyT& id() const { return word; } + + KeyT word; + int count = 0; +}; + +bool operator<(const std::pair& a, const std::pair& b) { + return a.first == b.first ? a.second < b.second : a.first < b.first; +} + +void wc() { + auto& inputformat = husky::io::InputFormatStore::create_redis_inputformat(); + inputformat.set_server(); + + husky::io::RedisOutputFormat outputformat; + outputformat.set_server(); + + auto& word_list = husky::ObjListStore::create_objlist(); + auto& ch = husky::ChannelStore::create_push_combined_channel>(inputformat, word_list); + + auto parse_wc = [&](husky::io::RedisInputFormat::RecordT& record_pair) { + auto datatype = record_pair.first; + switch (datatype) { + case husky::io::RedisInputFormat::RedisDataType::String: + { + pt::ptree reader, content_reader; + std::stringstream jsonstream, content_stream; + jsonstream << record_pair.second; + try { + pt::read_json(jsonstream, reader); + } + catch (pt::json_parser::json_parser_error) { + husky::LOG_E << "json_parser_error: " << record_pair.second; + return; + } + + content_stream << reader.begin()->second.get_value(); + try { + pt::read_json(content_stream, content_reader); + std::string content = content_reader.get("content"); + boost::char_separator sep(" \t"); + boost::tokenizer> tok(content, sep); + for (auto& w : tok) { + ch.push(1, w); + } + } + catch (pt::ptree_bad_path) { + husky::LOG_E << "invalid content field"; + return; + } + catch (pt::json_parser::json_parser_error) { + husky::LOG_E << "json_parser_error:" << reader.begin()->second.get_value(); + return; + } + } break; + case husky::io::RedisInputFormat::RedisDataType::List: + case husky::io::RedisInputFormat::RedisDataType::Hash: + break; + case husky::io::RedisInputFormat::RedisDataType::Null: + // waiting for keys + // husky::LOG_I << "waiting more keys"; + break; + default: + husky::LOG_E << "undefined data structure"; + return; + } + }; + + husky::load(inputformat, parse_wc); + + // Show topk words. + const int kMaxNum = 10; + typedef std::set> TopKPairs; + auto add_to_topk = [](TopKPairs& pairs, const std::pair& p) { + if (pairs.size() == kMaxNum && *pairs.begin() < p) + pairs.erase(pairs.begin()); + if (pairs.size() < kMaxNum) + pairs.insert(p); + }; + husky::lib::Aggregator unique_topk( + TopKPairs(), + [add_to_topk](TopKPairs& a, const TopKPairs& b) { + for (auto& i : b) + add_to_topk(a, i); + }, + [](TopKPairs& a) { a.clear(); }, + [add_to_topk](husky::base::BinStream& in, TopKPairs& pairs) { + pairs.clear(); + for (size_t n = husky::base::deser(in); n--;) + add_to_topk(pairs, husky::base::deser>(in)); + }, + [](husky::base::BinStream& out, const TopKPairs& pairs) { + out << pairs.size(); + for (auto& p : pairs) + out << p; + }); + + husky::list_execute(word_list, [&ch, &unique_topk, add_to_topk](Word& word) { + unique_topk.update(add_to_topk, std::make_pair(ch.get(word), word.id())); + }); + + husky::lib::AggregatorFactory::sync(); + + if (husky::Context::get_global_tid() == 0) { + for (auto& i : unique_topk.get_value()) + husky::LOG_I << i.second << " " << i.first; + } + + /* Output result to Redis as a Hash table + std::string result_key("WordCountResult"); + std::map result_map; + outputformat.commit(result_key, result_map); + */ +} + +int main(int argc, char** argv) { + if (!husky::init_with_args(argc, argv, {"redis_hostname", "redis_port", "redis_keys_pattern"})) + return 1; + husky::run_job(wc); + return 0; +} diff --git a/io/input/CMakeLists.txt b/io/input/CMakeLists.txt index 73aac10..98bdf81 100644 --- a/io/input/CMakeLists.txt +++ b/io/input/CMakeLists.txt @@ -40,6 +40,11 @@ if(MONGOCLIENT_FOUND) list(APPEND io-input-src-files ${io-input-mongo-src-files}) endif(MONGOCLIENT_FOUND) +if(REDISCLIENT_FOUND) + file(GLOB io-input-redis-src-files redis_split.cpp redis_inputformat.cpp) + list(APPEND io-input-src-files ${io-input-redis-src-files}) +endif(REDISCLIENT_FOUND) + if(THRIFT_FOUND) file(GLOB io-input-flume-src-files flume_connector/flume_types.cpp diff --git a/io/input/inputformat_store.cpp b/io/input/inputformat_store.cpp index 93ee914..415fb8b 100644 --- a/io/input/inputformat_store.cpp +++ b/io/input/inputformat_store.cpp @@ -111,6 +111,17 @@ MongoDBInputFormat& InputFormatStore::create_mongodb_inputformat() { } #endif +#ifdef WITH_REDIS +RedisInputFormat& InputFormatStore::create_redis_inputformat() { + InputFormatMap& inputformat_map = get_inputformat_map(); + int id = g_gen_inputformat_id++; + ASSERT_MSG(inputformat_map.find(id) == inputformat_map.end(), "Should not be reached"); + auto* redis_input_format = new RedisInputFormat(); + inputformat_map.insert({id, redis_input_format}); + return *redis_input_format; +} +#endif + ElasticsearchInputFormat& InputFormatStore::create_elasticsearch_inputformat() { InputFormatMap& inputformat_map = get_inputformat_map(); int id = g_gen_inputformat_id++; diff --git a/io/input/inputformat_store.hpp b/io/input/inputformat_store.hpp index 2d6d35a..8120680 100644 --- a/io/input/inputformat_store.hpp +++ b/io/input/inputformat_store.hpp @@ -26,6 +26,9 @@ #ifdef WITH_MONGODB #include "io/input/mongodb_inputformat.hpp" #endif +#ifdef WITH_REDIS +#include "io/input/redis_inputformat.hpp" +#endif #include "io/input/elasticsearch_inputformat.hpp" #include "io/input/separator_inputformat.hpp" #include "io/input/xml_inputformat.hpp" @@ -53,6 +56,9 @@ class InputFormatStore { #endif #ifdef WITH_MONGODB static MongoDBInputFormat& create_mongodb_inputformat(); +#endif +#ifdef WITH_REDIS + static RedisInputFormat& create_redis_inputformat(); #endif static ElasticsearchInputFormat& create_elasticsearch_inputformat(); diff --git a/io/input/redis_inputformat.cpp b/io/input/redis_inputformat.cpp new file mode 100644 index 0000000..bf0a829 --- /dev/null +++ b/io/input/redis_inputformat.cpp @@ -0,0 +1,354 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "io/input/redis_inputformat.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "base/serialization.hpp" +#include "core/constants.hpp" +#include "core/context.hpp" +#include "core/coordinator.hpp" +#include "core/network.hpp" + +#include "hiredis/hiredis.h" +#include "boost/property_tree/ptree.hpp" +#include "boost/property_tree/json_parser.hpp" + +namespace husky { +namespace io { + +namespace pt = boost::property_tree; + +enum RedisInputFormatSetUp { + NotSetUp = 0, + ServerSetUp = 1 << 2, + AuthSetUp = 1 << 2, + AllSetUp = ServerSetUp | AuthSetUp, +}; + +RedisInputFormat::RedisInputFormat() { + is_setup_ = RedisInputFormatSetUp::NotSetUp; +} + +RedisInputFormat::~RedisInputFormat() { + records_vector_.clear(); + best_keys_.clear(); + for ( auto& con : cons_ ) { + redisFree(con.second); + } + cons_.clear(); + split_i_id_.clear(); +} + +bool RedisInputFormat::is_setup() const { + return !(is_setup_ ^ RedisInputFormatSetUp::AllSetUp); +} + +void RedisInputFormat::set_server() { + ask_redis_splits_info(); + create_redis_cons(); + is_setup_ |= RedisInputFormatSetUp::ServerSetUp; +} + +void RedisInputFormat::set_auth(const std::string& password) { + need_auth_ = true; + password_ = password; + is_setup_ |= RedisInputFormatSetUp::AuthSetUp; +} + +void RedisInputFormat::reset_auth() { + need_auth_ = false; +} + +void RedisInputFormat::ask_redis_splits_info() { + BinStream question; + question << 1; + BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_REDIS_QRY_REQ); + answer >> splits_; + + split_i_id_.clear(); + for (int split_i=0; split_i < splits_.size(); split_i++) { + split_i_id_.push_back(""); + for (auto& split : splits_) { + if (split.second.get_sn() == split_i) { + split_i_id_[split_i] = split.first; + break; + } + } + } +} + +void RedisInputFormat::create_redis_cons() { + redisReply *reply = NULL; + for (auto& split : splits_) { + std::string proc_ip = parse_host(get_hostname()); + redisContext * c = NULL; + if (!split.second.get_ip().compare(proc_ip)) { + std::string sock_file_path = "/tmp/redis_"; + sock_file_path += std::to_string(split.second.get_port()) + ".sock"; + c = redisConnectUnixWithTimeout(sock_file_path.c_str(), timeout_); + } else { + c = redisConnectWithTimeout(split.second.get_ip().c_str(), split.second.get_port(), timeout_); + } + if (NULL == c || c->err) { + if (c) { + LOG_E << "Connection error: " + std::string(c->errstr); + redisFree(c); + } else { + LOG_E << "Connection error: can't allocate redis context"; + } + return; + } + if (need_auth_) { + reply = redisCmd(c, "AUTH %s", password_.c_str()); + CHECK(reply); + } + cons_[split.second.get_id()] = c; + } + if (reply) { + freeReplyObject(reply); + } +} + +// ask master for a set of best keys, with their location +int RedisInputFormat::ask_best_keys() { + BinStream question; + question << husky::Context::get_global_tid(); + BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_REDIS_REQ); + int task_status = -1; + + answer >> task_status; + answer >> best_keys_; + + return task_status; +} + +std::string RedisInputFormat::parse_host(const std::string& hostname) { + hostent * record = gethostbyname(hostname.c_str()); + if (record == NULL) { + LOG_E << "hostname parse failed: " << hostname; + return "failed"; + } + in_addr * address = (in_addr *)record->h_addr; + std::string ip_address = inet_ntoa(*address); + return ip_address; +} + +// connect Redis split to retrieve RECORDS +void RedisInputFormat::fetch_split_records(int split_i, const std::vector& keys) { + std::string split_id = split_i_id_[split_i]; + RedisSplit split = splits_[split_id]; + redisContext * c = cons_[split_id]; + + if (!split.is_valid()) { + LOG_E << "Redis instance invalid: " << split.get_id(); + return; + } + + // slave read-only, master-slaves load balance + if (split.get_master().compare("-")) { + redisReply * reply; + reply = redisCmd(c, "READONLY"); + if ( strcmp(reply->str, "OK") ) { + LOG_E << "Slave failed to start up read-only"; + freeReplyObject(reply); + return; + } + freeReplyObject(reply); + } + + for (auto& key : keys) { + if (!key.str_.compare("")) { + LOG_E << "empty key <- " << split.get_ip() << ":" << split.get_port(); + continue; + } + redisReply * cur_data = redisCmd(c, "TYPE %s", key.str_.c_str()); + if (!strcmp(cur_data->str, "string")) { + pt::ptree string_js; + cur_data = redisCmd(c, "GET %s", key.str_.c_str()); + switch (cur_data->type) { + case REDIS_REPLY_STRING: + break; + case REDIS_REPLY_NIL: + case REDIS_REPLY_ERROR: + default: + LOG_E << "error reply: " << cur_data->type; + break; + } + std::string value(cur_data->str); + pt::ptree::path_type key_path(key.str_, SEP); + string_js.put(key_path, value); + std::stringstream jsonvalue; + pt::write_json(jsonvalue, string_js); + records_vector_.push_back(std::make_pair(RedisDataType::String, jsonvalue.str())); + } else if (!strcmp(cur_data->str, "list")) { + pt::ptree root; + pt::ptree list_js; + cur_data = redisCmd(c, "LRANGE %s %d %d", key.str_.c_str(), key.start_, key.end_); + for (int j=0; j < cur_data->elements; j++) { + std::string value(cur_data->element[j]->str); + pt::ptree element; + element.put("", value); + list_js.push_back(std::make_pair("", element)); + } + pt::ptree::path_type key_path(key.str_, SEP); + root.add_child(key_path, list_js); + std::stringstream jsonvalue; + pt::write_json(jsonvalue, root); + records_vector_.push_back(std::make_pair(RedisDataType::List, jsonvalue.str())); + } else if (!strcmp(cur_data->str, "hash")) { + pt::ptree root; + pt::ptree hash_js; + cur_data = redisCmd(c, "HGETALL %s", key.str_.c_str()); + std::string field; + std::string value; + for (int j=0; j < cur_data->elements; j+=2) { + field = cur_data->element[j]->str; + value = cur_data->element[j+1]->str; + pt::ptree::path_type field_path(field, SEP); + hash_js.put(field_path, value); + } + pt::ptree::path_type key_path(key.str_, SEP); + root.add_child(key_path, hash_js); + std::stringstream jsonvalue; + pt::write_json(jsonvalue, root); + records_vector_.push_back(std::make_pair(RedisDataType::Hash, jsonvalue.str())); + } else if (!strcmp(cur_data->str, "set")) { + pt::ptree root; + pt::ptree set_js; + int scursor = 0; + do { + cur_data = redisCmd(c, "SSCAN %s %d COUNT %d", key.str_.c_str(), scursor, ITER_STEP); + scursor = atoi(cur_data->element[0]->str); + std::string value; + for (int j=0; j < cur_data->element[1]->elements; j++) { + value = cur_data->element[1]->element[j]->str; + pt::ptree element; + element.put("", value); + // LOG_I << "input SET: " << key.str_ << "-" << value; + set_js.push_back(std::make_pair("", element)); + } + } while (scursor); + pt::ptree::path_type key_path(key.str_, SEP); + root.add_child(key_path, set_js); + std::stringstream jsonvalue; + pt::write_json(jsonvalue, root); + records_vector_.push_back(std::make_pair(RedisDataType::Set, jsonvalue.str())); + } else if (!strcmp(cur_data->str, "zset")) { + pt::ptree root; + pt::ptree zset_js; + int zcursor = 0; + do { + cur_data = redisCmd(c, "ZSCAN %s %d COUNT %d", key.str_.c_str(), zcursor, ITER_STEP); + zcursor = atoi(cur_data->element[0]->str); + std::string value; + std::string score; + for (int j=0; j < cur_data->element[1]->elements; j+=2) { + value = cur_data->element[1]->element[j]->str; + score = cur_data->element[1]->element[j+1]->str; + pt::ptree element; + pt::ptree::path_type value_path(value, SEP); + element.put(value_path, score); + // LOG_I << "input ZSET: " << key.str_ << "-" << value << "-" << score; + zset_js.push_back(std::make_pair("", element)); + } + } while (zcursor); + pt::ptree::path_type key_path(key.str_, SEP); + root.add_child(key_path, zset_js); + std::stringstream jsonvalue; + pt::write_json(jsonvalue, root); + records_vector_.push_back(std::make_pair(RedisDataType::ZSet, jsonvalue.str())); + } else if (nullptr != cur_data->str) { + LOG_E << "Failed to read data: " << std::string(cur_data->str) << " <- " << gen_slot_crc16(key.str_.c_str(), key.str_.length()) << " " << split.get_ip() << ":" << split.get_port() << " <- " << key.str_; + } else { + LOG_E << "Failed to read data: without data"; + } + freeReplyObject(cur_data); + } +} + +void RedisInputFormat::send_end() { + BinStream question; + int tid = husky::Context::get_global_tid(); + int num_fetched = records_vector_.size(); + question << tid; + question << num_fetched; + husky::Context::get_coordinator()->ask_master(question, husky::TYPE_REDIS_END_REQ); + return; +} + +bool RedisInputFormat::next(RecordT& ref) { + if (if_pop_record_) { + records_vector_.pop_back(); + if_pop_record_ = false; + } + + if (records_vector_.empty()) { + if (if_all_assigned_) return false; + int task_status = ask_best_keys(); + for (int split_i=0; split_i < best_keys_.size(); split_i++) { + if (best_keys_[split_i].empty()) continue; + fetch_split_records(split_i, best_keys_[split_i]); + } + if (!records_vector_.empty()) { + send_end(); + ref = records_vector_.back(); + } else { + records_vector_.push_back(std::make_pair(RedisDataType::Null, std::string(""))); + ref = records_vector_.back(); + } + if_pop_record_ = true; + best_keys_.clear(); + switch (task_status) { + case RedisTaskStatus::NoMoreTask: + if_all_assigned_ = true; + break; + case RedisTaskStatus::WaitTasks: break; + case RedisTaskStatus::Abnormal: + LOG_E << "assigner's abnormal action"; + break; + default: + LOG_E << "undefined error when ask tasks"; + break; + } + } else { + ref = records_vector_.back(); + if_pop_record_ = true; + } + + return true; +} + +uint16_t RedisInputFormat::gen_slot_crc16(const char *buf, int len) { + int counter; + uint16_t crc = 0; + for (counter = 0; counter < len; counter++) + crc = (crc << 8) ^ crc16tab_[((crc >> 8) ^ *buf++)&0x00FF]; + return crc % 16384; +} + +} // namespace io +} // namespace husky diff --git a/io/input/redis_inputformat.hpp b/io/input/redis_inputformat.hpp new file mode 100644 index 0000000..0b32d13 --- /dev/null +++ b/io/input/redis_inputformat.hpp @@ -0,0 +1,88 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "hiredis/hiredis.h" + +#include "io/input/inputformat_base.hpp" +#include "io/input/redis_split.hpp" + +#define ITER_STEP 1 +#define SEP '/' + +#define redisCmd(context, ...) static_cast(redisCommand(context, __VA_ARGS__)) + +namespace husky { +namespace io { + +class RedisInputFormat final : public InputFormatBase { +public: + typedef enum RedisDataType { + String, + List, + Hash, + Set, + ZSet, + Null + } RedisDataType; + typedef std::pair RecordT; + +public: + RedisInputFormat(); + virtual ~RedisInputFormat(); + + virtual bool is_setup() const; + virtual bool next(RecordT& ref); + + void set_server(); + void set_auth(const std::string&); + void reset_auth(); + +private: + void ask_redis_splits_info(); + void create_redis_cons(); + int ask_best_keys(); + void fetch_split_records(int split_i, const std::vector& keys); + void send_end(); + + uint16_t gen_slot_crc16(const char *buf, int len); + std::string parse_host(const std::string& hostname); + +private: + std::string ip_; + int port_; + struct timeval timeout_ = {1, 500000}; + bool need_auth_ = false; + std::string password_; + std::map cons_; + std::map splits_; + std::vector split_i_id_; + + int is_setup_ = 0; + + std::vector records_vector_; + bool if_pop_record_ = false; + bool if_all_assigned_ = false; + std::vector > best_keys_; +}; + +} // namespace io +} // namespace husky diff --git a/io/input/redis_split.cpp b/io/input/redis_split.cpp new file mode 100644 index 0000000..6aa8789 --- /dev/null +++ b/io/input/redis_split.cpp @@ -0,0 +1,158 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "io/input/redis_split.hpp" + +#include +#include +#include + +#include "base/log.hpp" +#include "base/serialization.hpp" + +namespace husky { +namespace io { + +// RedisSplit +RedisSplit::RedisSplit() : is_valid_(false), slots_start_(-1), slots_end_(-1) {} + +RedisSplit::RedisSplit(const RedisSplit& other) { + is_valid_ = other.is_valid_; + sn_ = other.sn_; + id_ = other.id_; + ip_ = other.ip_; + master_ = other.master_; + port_ = other.port_; + slots_start_ = other.slots_start_; + slots_end_ = other.slots_end_; +} + +void RedisSplit::set_valid(bool valid) { is_valid_ = valid; } +void RedisSplit::set_sn(int sn) { sn_ = sn; } +void RedisSplit::set_id(const std::string& id) { id_ = id; } +void RedisSplit::set_ip(const std::string& ip) { ip_ = ip; } +void RedisSplit::set_master(const std::string& master) { master_ = master; } + +void RedisSplit::set_port(int port) { port_ = port; } +void RedisSplit::set_sstart(int start) { slots_start_ = start; } +void RedisSplit::set_send(int end) { slots_end_ = end; } + +// RedisSplitGroup +RedisSplitGroup::RedisSplitGroup(RedisSplit master) { + add_member(master.get_id()); +} + +RedisSplitGroup::~RedisSplitGroup() { + priority_.clear(); + sorted_members_.clear(); + members_.clear(); +} + +void RedisSplitGroup::add_member(std::string split_id) { + priority_[split_id] = members_.size(); + members_.push_back(split_id); +} + +void RedisSplitGroup::update_priority() { + assert(members_.size() == priority_.size()); + for ( auto& pr : priority_ ) { + pr.second = (pr.second + 1) % members_.size(); + } +} + +void RedisSplitGroup::sort_members() { + sorted_members_.clear(); + int lowest_priority = members_.size(); + for ( int higher_priority = 0; higher_priority < lowest_priority; higher_priority++ ) { + for ( auto& member : members_ ) { + if ( get_priority(member) == higher_priority ) { + sorted_members_.push_back(member); + // no identical priority + break; + } + } + } +} + +int RedisSplitGroup::get_priority(std::string member_id, bool if_balance) { + int priority = priority_[member_id]; + if (if_balance) { + update_priority(); + } + return priority; +} + +int RedisSplitGroup::get_num_members() { + assert(members_.size() == priority_.size()); + return members_.size(); +} + +const std::vector& RedisSplitGroup::get_sorted_members() { + return sorted_members_; +} + +// BinStream series +// for RedisOutputFormat::redis_masters_info +BinStream& operator<<(BinStream& stream, const RedisSplit& split) { + stream << split.is_valid(); + stream << split.get_sn(); + stream << split.get_id(); + stream << split.get_ip(); + stream << split.get_port(); + stream << split.get_master(); + stream << split.get_sstart(); + stream << split.get_send(); + return stream; +} + +BinStream& operator>>(BinStream& stream, RedisSplit& split) { + bool is_valid; + int sn; + std::string id; + std::string ip; + int port; + std::string master; + int slots_start; + int slots_end; + stream >> is_valid; + stream >> sn; + stream >> id; + stream >> ip; + stream >> port; + stream >> master; + stream >> slots_start; + stream >> slots_end; + split.set_valid(is_valid); + split.set_sn(sn); + split.set_id(id); + split.set_ip(ip); + split.set_port(port); + split.set_master(master); + split.set_sstart(slots_start); + split.set_send(slots_end); + return stream; +} + +BinStream& operator<<(BinStream& stream, const RedisRangeKey& key) { + stream << key.str_ << key.start_ << key.end_; + return stream; +} + +BinStream& operator>>(BinStream& stream, RedisRangeKey& key) { + stream >> key.str_ >> key.start_ >> key.end_; + return stream; +} + +} // namespace io +} // namespace husky diff --git a/io/input/redis_split.hpp b/io/input/redis_split.hpp new file mode 100644 index 0000000..cd43a5d --- /dev/null +++ b/io/input/redis_split.hpp @@ -0,0 +1,172 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include + +#include "base/serialization.hpp" + +namespace husky { +namespace io { + +using base::BinStream; + +struct RedisRangeKey; + +typedef enum RedisTaskStatus { + WaitTasks, + NoMoreTask, + Abnormal +} RedisTaskStatus; + +class RedisSplit { + public: + RedisSplit(); + RedisSplit(const RedisSplit& other); + + // Set-functions + void set_valid(bool valid); + + void set_sn(int sn); + void set_id(const std::string& id); + void set_ip(const std::string& ip); + void set_master(const std::string& master); + + void set_port(int port); + void set_sstart(int start); + void set_send(int end); + + // Get-functions + inline const int get_sn() const { return sn_; } + inline const std::string& get_id() const { return id_; } + inline const std::string& get_ip() const { return ip_; } + inline const std::string& get_master() const { return master_; } + + inline int get_port() const { return port_; } + inline int get_sstart() const { return slots_start_; } + inline int get_send() const { return slots_end_; } + + // Miscellaneous + inline bool is_valid() const { return is_valid_; } + + // For std::map's key + bool operator<(const RedisSplit &rs)const { + return slots_start_ < rs.get_sstart(); + } + + RedisSplit& operator=(const RedisSplit &other) { + is_valid_ = other.is_valid_; + sn_ = other.sn_; + id_ = other.id_; + ip_ = other.ip_; + master_ = other.master_; + port_ = other.port_; + slots_start_ = other.slots_start_; + slots_end_ = other.slots_end_; + return *this; + } + + private: + bool is_valid_; + + int sn_; + std::string id_; + std::string ip_; + // '-' - master, id - slave + std::string master_; + int port_; + int slots_start_; + int slots_end_; +}; + +// for master-slaves load balance +class RedisSplitGroup { + public: + RedisSplitGroup() = default; + explicit RedisSplitGroup(RedisSplit master); + ~RedisSplitGroup(); + + // Set / Add + void add_member(std::string split_id); + void update_priority(); + void sort_members(); + + // Get + const std::vector& get_members() { return members_; } + int get_priority(std::string member_id, bool if_balance = false); + int get_num_members(); + const std::vector& get_sorted_members(); + + private: + std::vector members_; + std::vector sorted_members_; + // the smallest value, the highest priority + std::map priority_; +}; + +// for heavy-keys load balance +struct RedisRangeKey { + std::string str_; + int start_ = 0; + int end_ = -1; +}; + +BinStream& operator<<(BinStream& stream, const RedisSplit& split); +BinStream& operator>>(BinStream& stream, RedisSplit& split); + +BinStream& operator<<(BinStream& stream, const RedisRangeKey& key); +BinStream& operator>>(BinStream& stream, RedisRangeKey& key); + +const uint16_t crc16tab_[256]= { + 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, + 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, + 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, + 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, + 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, + 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, + 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, + 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, + 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, + 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, + 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, + 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, + 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, + 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, + 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, + 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, + 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, + 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, + 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, + 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, + 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, + 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, + 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, + 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, + 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, + 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, + 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, + 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, + 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, + 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, + 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, + 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 +}; + +} // namespace io +} // namespace husky diff --git a/io/output/CMakeLists.txt b/io/output/CMakeLists.txt index d8ac31c..f6fcd67 100644 --- a/io/output/CMakeLists.txt +++ b/io/output/CMakeLists.txt @@ -22,7 +22,10 @@ if(MONGOCLIENT_FOUND) list(APPEND io-output-src-file ${io-output-mongo-src-file}) endif(MONGOCLIENT_FOUND) -husky_cache_variable(io-output-src-file ${io-output-src-file}) +if(REDISCLIENT_FOUND) + file(GLOB io-output-redis-src-file redis_outputformat.cpp) + list(APPEND io-output-src-file ${io-output-redis-src-file}) +endif(REDISCLIENT_FOUND) add_library(output-objs OBJECT ${io-output-src-file}) husky_default_properties(output-objs) diff --git a/io/output/redis_outputformat.cpp b/io/output/redis_outputformat.cpp new file mode 100644 index 0000000..7824c8b --- /dev/null +++ b/io/output/redis_outputformat.cpp @@ -0,0 +1,371 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "io/output/redis_outputformat.hpp" + +#include "base/log.hpp" +#include "core/network.hpp" + +#include +#include + +#define CHECKREPLY(X) if ( !X || X->type == REDIS_REPLY_ERROR ) { LOG_E << "Error"; exit(-1); } + +namespace husky { +namespace io { + +using base::log_msg; + +enum RedisOutputFormatSetUp { + NotSetUp = 0, + ServerSetUp = 1 << 2, + AuthSetUp = 1 << 2, + AllSetUp = ServerSetUp | AuthSetUp, +}; + +RedisOutputFormat::RedisOutputFormat(int number_clients, int flush_buffer_size): number_clients_(number_clients), flush_buffer_size_(flush_buffer_size) { + records_map_.clear(); + cons_.clear(); + splits_.clear(); +} + +RedisOutputFormat::~RedisOutputFormat() { + records_map_.clear(); + for (auto& con : cons_) { + if (con.second.first) { + redisFree(con.second.first); + con.second.first = NULL; + } + } + cons_.clear(); + splits_.clear(); + sorted_split_group_name_.clear(); +} + +void RedisOutputFormat::set_server() { + ask_redis_masters_info(); + create_redis_con_pool(); + is_setup_ |= RedisOutputFormatSetUp::ServerSetUp; +} + +void RedisOutputFormat::set_auth(const std::string& password) { + password_ = password; + need_auth_ = true; + is_setup_ |= RedisOutputFormatSetUp::AuthSetUp; +} + +bool RedisOutputFormat::is_setup() const { + return !(is_setup_ ^ RedisOutputFormatSetUp::AllSetUp); +} + +void RedisOutputFormat::ask_redis_masters_info() { + BinStream question; + question << 0; + BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_REDIS_QRY_REQ); + answer >> splits_; + + num_split_groups_ = splits_.size(); + + sorted_split_group_name_.clear(); + for (auto& split_group : splits_) { + sorted_split_group_name_.push_back(split_group.first); + } + std::sort(sorted_split_group_name_.begin(), sorted_split_group_name_.end(), + [&](std::string& a, std::string& b){ + return splits_[a].get_sstart() < splits_[b].get_sstart(); + }); + num_slots_per_group_ = 16384 / num_split_groups_; +} + +std::string RedisOutputFormat::parse_host(const std::string& hostname) { + hostent * record = gethostbyname(hostname.c_str()); + if (record == NULL) { + LOG_E << "Hostname parse failed:" << hostname; + return "failed"; + } + in_addr * address = (in_addr*)record->h_addr; + std::string ip_address = inet_ntoa(*address); + return ip_address; +} + +void RedisOutputFormat::create_redis_con_pool() { + redisReply *reply = NULL; + for (auto& split : splits_) { + std::string proc_ip = parse_host(get_hostname()); + redisContext * c = NULL; + if (!split.second.get_ip().compare(proc_ip)) { + std::string sock_file_path = "/tmp/redis_"; + sock_file_path += std::to_string(split.second.get_port()) + ".sock"; + c = redisConnectUnixWithTimeout(sock_file_path.c_str(), timeout_); + } else { + c = redisConnectWithTimeout(split.second.get_ip().c_str(), split.second.get_port(), timeout_); + } + if (NULL == c || c->err) { + if (c) { + LOG_E << "Connection error: " + std::string(c->errstr); + redisFree(c); + } else { + LOG_E << "Connection error: can't allocate redis context"; + } + return; + } + if (need_auth_) { + reply = redisCmd(c, "AUTH %s", password_.c_str()); + CHECKREPLY(reply); + } + std::pair con(c, 0); + cons_[split.second.get_id()] = con; + } + if (reply) { + freeReplyObject(reply); + } +} + +int RedisOutputFormat::flush_all() { + int buffer_size = records_bytes_; + + if (records_map_.empty()) + return -1; + + redisContext * c = NULL; + int * c_count = 0; + redisReply *reply = NULL; + + for (auto& record : records_map_) { + std::string key = record.first; + RedisOutputFormat::DataType data_type = record.second.first; + BinStream result_stream = record.second.second; + + uint16_t target_slot = gen_slot_crc16(key.c_str(), key.length()); + int master_i = target_slot / num_slots_per_group_; + master_i = master_i > num_split_groups_-1 ? --master_i : master_i; + std::string master_id = sorted_split_group_name_[master_i]; + if (target_slot < splits_[master_id].get_sstart()) { + master_i--; + } else if ( target_slot > splits_[master_id].get_send() ) { + master_i++; + } + master_id = sorted_split_group_name_[master_i]; + c = cons_[master_id].first; + c_count = &cons_[master_id].second; + + switch (data_type) { + case RedisOutputFormat::DataType::RedisString: + { + std::string result_string; + result_stream >> result_string; + redisAppendCommand(c, "SET %b %b", key.c_str(), (size_t) key.length(), result_string.c_str(), (size_t) result_string.length()); + ++(*c_count); + } + break; + case RedisOutputFormat::DataType::RedisList: + { + int inner_data_type = RedisOutputFormat::InnerDataType::Other; + result_stream >> inner_data_type; + switch (inner_data_type) { + case RedisOutputFormat::InnerDataType::Char: + { + std::vector result_list; + result_stream >> result_list; + for (auto& result : result_list) { + redisCmd(c, "LPUSH %s %c", key.c_str(), result); + } + } + case RedisOutputFormat::InnerDataType::Short: + case RedisOutputFormat::InnerDataType::Int: + case RedisOutputFormat::InnerDataType::Long: + { + std::vector result_list; + result_stream >> result_list; + for (auto& result : result_list) { + redisCmd(c, "LPUSH %s %d", key.c_str(), result); + } + } + break; + case RedisOutputFormat::InnerDataType::Bool: + { + std::vector result_list; + result_stream >> result_list; + for (auto result : result_list) { + redisCmd(c, "LPUSH %s %s", key.c_str(), result ? "true" : "false"); + } + } + break; + case RedisOutputFormat::InnerDataType::Float: + { + std::vector result_list; + result_stream >> result_list; + for (auto& result : result_list) { + redisCmd(c, "LPUSH %s %f", key.c_str(), result); + } + } + break; + case RedisOutputFormat::InnerDataType::Double: + { + std::vector result_list; + result_stream >> result_list; + for (auto& result : result_list) { + redisCmd(c, "LPUSH %s %lf", key.c_str(), result); + } + } + break; + case RedisOutputFormat::InnerDataType::String: + { + std::vector result_list; + result_stream >> result_list; + for (auto& result : result_list) { + redisCmd(c, "LPUSH %s %s", key.c_str(), result.c_str()); + } + } + break; + default: + LOG_E << "undefined inner data type of vector"; + break; + } + } + break; + case RedisOutputFormat::DataType::RedisHash: + { + int inner_data_type = RedisOutputFormat::InnerDataType::Other; + result_stream >> inner_data_type; + switch (inner_data_type) { + case RedisOutputFormat::InnerDataType::Char: + { + std::map result_map; + result_stream >> result_map; + for (auto& result : result_map) { + redisCmd(c, "HSET %s %s %c", key.c_str(), result.first.c_str(), result.second); + } + } + case RedisOutputFormat::InnerDataType::Short: + case RedisOutputFormat::InnerDataType::Int: + case RedisOutputFormat::InnerDataType::Long: + { + std::map result_map; + result_stream >> result_map; + for (auto& result : result_map) { + redisCmd(c, "HSET %s %s %d", key.c_str(), result.first.c_str(), result.second); + } + } + break; + case RedisOutputFormat::InnerDataType::Bool: + { + std::map result_map; + result_stream >> result_map; + for (auto& result : result_map) { + redisCmd(c, "HSET %s %s %s", key.c_str(), result.first.c_str(), result.second ? "true" : "false"); + } + } + break; + case RedisOutputFormat::InnerDataType::Float: + { + std::map result_map; + result_stream >> result_map; + for (auto& result : result_map) { + redisCmd(c, "HSET %s %s %f", key.c_str(), result.first.c_str(), result.second); + } + } + break; + case RedisOutputFormat::InnerDataType::Double: + { + std::map result_map; + result_stream >> result_map; + for (auto& result : result_map) { + redisCmd(c, "HSET %s %s %lf", key.c_str(), result.first.c_str(), result.second); + } + } + break; + case RedisOutputFormat::InnerDataType::String: + { + std::map result_map; + result_stream >> result_map; + for (auto& result : result_map) { + redisCmd(c, "HSET %s %s %s", key.c_str(), result.first.c_str(), result.second.c_str()); + } + } + break; + default: + LOG_E << "undefined inner data type of map"; + break; + } + } + break; + case RedisOutputFormat::DataType::RedisSet: + // redisCmd(c, "SADD %s %s", ); + break; + case RedisOutputFormat::DataType::RedisZSet: + // with score, batch available + // redisCmd("ZADD %s %d %s", ); + break; + default: + LOG_E << "undefined data structure"; + break; + } + } + + + for (auto& con : cons_) { + while (con.second.second-- > 0) { + int r = redisGetReply(con.second.first, (void **) &reply); + if (r == REDIS_ERR) { + LOG_E << "REDIS_ERR"; + exit(-1); + } + // CHECKREPLY(reply); + if (!reply) { + LOG_E << "NULL REPLY"; + } else if (reply->type == REDIS_REPLY_ERROR) { + LOG_E << "REDIS_REPLY_ERROR -> " << reply->str; + LOG_E << "pipeline remained -> " << con.second.second; + ask_redis_masters_info(); + } + } + con.second.second = con.second.second < 0 ? 0 : con.second.second; + } + + + if (reply) { + freeReplyObject(reply); + reply = NULL; + } + + records_map_.clear(); + records_bytes_ = 0; + + return buffer_size; +} + +uint16_t RedisOutputFormat::gen_slot_crc16(const char *buf, int len) { + int counter; + uint16_t crc = 0; + for (counter = 0; counter < len; counter++) + crc = (crc << 8) ^ crc16tab_[((crc >> 8) ^ *buf++)&0x00FF]; + return crc % 16384; +} + +} // namespace io +} // namespace husky diff --git a/io/output/redis_outputformat.hpp b/io/output/redis_outputformat.hpp new file mode 100644 index 0000000..cf56b45 --- /dev/null +++ b/io/output/redis_outputformat.hpp @@ -0,0 +1,207 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +// #include "hiredis/adapters/libevent.h" +#include "hiredis/hiredis.h" + +#include "base/serialization.hpp" +#include "core/constants.hpp" +#include "core/context.hpp" +#include "core/worker_info.hpp" +#include "io/input/redis_split.hpp" +#include "io/output/outputformat_base.hpp" + +#define redisCmd(context, ...) static_cast(redisCommand(context, __VA_ARGS__)) + +namespace husky { +namespace io { + +class RedisOutputFormat final : public OutputFormatBase { +private: + typedef enum DataType { + RedisString, + RedisList, + RedisHash, + RedisSet, + RedisZSet + } DataType; + typedef enum InnerDataType { + Other, + Char, + Short, + Int, + Long, + Bool, + Float, + Double, + String + } InnerDataType; + +public: + explicit RedisOutputFormat(int number_clients = 10, int flush_buffer_size = 102400); + ~RedisOutputFormat(); + virtual bool is_setup() const; + + void set_auth(const std::string& password); + void set_server(); + + inline bool commit(const std::string& key, const std::string& result_string); + template + inline bool commit(const std::string& key, const std::vector& result_list); + template + inline bool commit(const std::string& key, const std::map& result_hash); + int flush_all(); + +private: + void ask_redis_masters_info(); + void create_redis_con_pool(); + + uint16_t gen_slot_crc16(const char *buf, int len); + std::string parse_host(const std::string& hostname); + template + inline int get_template_type(DataT sample); + +private: + int num_split_groups_; + int num_slots_per_group_; + std::vector sorted_split_group_name_; + + bool need_auth_ = false; + std::string password_; + // mixed-type data waited to be flushed + std::map > records_map_; + struct timeval timeout_ = {1, 500000}; + int records_bytes_ = 0; + // number of connections for each redis master + int number_clients_; + int flush_buffer_size_; + std::map > cons_; + std::map splits_; +}; + +// commit template implementation +bool RedisOutputFormat::commit(const std::string& key, const std::string& result_string) { + if (!is_setup()) + return false; + if (result_string.empty()) + return false; + + const RedisOutputFormat::DataType data_type = RedisOutputFormat::DataType::RedisString; + BinStream result_stream; + result_stream << result_string; + + records_map_[key] = std::pair(data_type, result_stream); + records_bytes_ += result_string.length(); + + if (records_bytes_ >= flush_buffer_size_) { + flush_all(); + return true; + } else { + return false; + } +} + +template +bool RedisOutputFormat::commit(const std::string& key, const std::vector& result_list) { + if (!is_setup()) + return false; + if (result_list.empty()) + return false; + + RedisOutputFormat::DataType data_type = RedisOutputFormat::DataType::RedisList; + BinStream result_stream; + int inner_data_type = get_template_type(result_list[0]); + result_stream << inner_data_type; + result_stream << result_list; + const std::string result_stream_buffer = result_stream.to_string(); + + records_map_[key] = std::pair(data_type, result_stream); + records_bytes_ += result_stream_buffer.length(); + + if (records_bytes_ >= flush_buffer_size_) { + flush_all(); + return true; + } else { + return false; + } +} + +template +bool RedisOutputFormat::commit(const std::string& key, const std::map& result_hash) { + if (!is_setup()) + return false; + if (result_hash.empty()) + return false; + + RedisOutputFormat::DataType data_type = RedisOutputFormat::DataType::RedisHash; + BinStream result_stream; + int inner_data_type = get_template_type(result_hash.begin()->second); + result_stream << inner_data_type; + result_stream << result_hash; + const std::string result_stream_buffer = result_stream.to_string(); + + records_map_[key] = std::pair(data_type, result_stream); + records_bytes_ += result_stream_buffer.length(); + + if (records_bytes_ >= flush_buffer_size_) { + flush_all(); + return true; + } else { + return false; + } +} + +template +int RedisOutputFormat::get_template_type(DataT sample) { + const char * sample_type = typeid(sample).name(); + char test_char; + int16_t test_short; + int test_int; + int64_t test_long; + bool test_bool; + float test_float; + double test_double; + std::string test_string; + if (!strcmp(typeid(test_string).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::String; + } else if (!strcmp(typeid(test_short).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::Short; + } else if (!strcmp(typeid(test_int).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::Int; + } else if (!strcmp(typeid(test_long).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::Long; + } else if (!strcmp(typeid(test_bool).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::Bool; + } else if (!strcmp(typeid(test_float).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::Float; + } else if (!strcmp(typeid(test_double).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::Double; + } else if (!strcmp(typeid(test_char).name(), sample_type)) { + return RedisOutputFormat::InnerDataType::Char; + } else { + return RedisOutputFormat::InnerDataType::Other; + } +} + +} // namespace io +} // namespace husky diff --git a/master/CMakeLists.txt b/master/CMakeLists.txt index 899101a..c0fd9fb 100644 --- a/master/CMakeLists.txt +++ b/master/CMakeLists.txt @@ -25,6 +25,10 @@ if(MONGOCLIENT_FOUND) list(APPEND master_plugins mongodb_assigner.cpp) endif(MONGOCLIENT_FOUND) +if(REDISCLIENT_FOUND) + list(APPEND master_plugins redis_assigner.cpp) +endif(REDISCLIENT_FOUND) + if(ORC_FOUND) list(APPEND master_plugins orc_assigner.cpp) endif(ORC_FOUND) diff --git a/master/redis_assigner.cpp b/master/redis_assigner.cpp new file mode 100644 index 0000000..b56f7f6 --- /dev/null +++ b/master/redis_assigner.cpp @@ -0,0 +1,939 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifdef WITH_REDIS + +#include "master/redis_assigner.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include "boost/tokenizer.hpp" +#include "hiredis/hiredis.h" + +#include "core/constants.hpp" +#include "core/context.hpp" +#include "core/zmq_helpers.hpp" +#include "master/master.hpp" + +#define CHECK(X) if ( !X || X->type == REDIS_REPLY_ERROR ) { LOG_E << "Error"; exit(-1); } + +namespace husky { + +static RedisSplitAssigner redis_split_assigner; + +RedisSplitAssigner::RedisSplitAssigner() { + Master::get_instance().register_main_handler(TYPE_REDIS_REQ, + std::bind(&RedisSplitAssigner::master_redis_req_handler, this)); + Master::get_instance().register_main_handler(TYPE_REDIS_QRY_REQ, + std::bind(&RedisSplitAssigner::master_redis_qry_req_handler, this)); + Master::get_instance().register_main_handler(TYPE_REDIS_END_REQ, + std::bind(&RedisSplitAssigner::master_redis_req_end_handler, this)); + Master::get_instance().register_setup_handler(std::bind(&RedisSplitAssigner::master_setup_handler, this)); +} + +bool RedisSplitAssigner::load_parameters() { + if ("" == Context::get_param("redis_hostname") || "" == Context::get_param("redis_port")) { + LOG_E << "Redis required parameters missed:"; + LOG_E << " --redis_hostname HOSTNAME"; + LOG_E << " --redis_port PORT"; + return false; + } else { + ip_ = Context::get_param("redis_hostname"); + port_ = atoi(Context::get_param("redis_port").c_str()); + if (Context::get_param("redis_key_split_threshold").compare("")) { + key_split_size_ = stoi(Context::get_param("redis_key_split_threshold")); + } else { + key_split_size_ = 0; + LOG_I << "split heavy List: \033[1;32m[OFF]\033[0m"; + } + if ((keys_path_ = Context::get_param("redis_keys_file")).compare("")) { + keys_file_.open(keys_path_, std::ios::in); + } else { + LOG_I << "load keys from file: \033[1;32m[OFF]\033[0m"; + } + if (Context::get_param("redis_keys_pattern").compare("")) { + keys_pattern_ = Context::get_param("redis_keys_pattern"); + } else { + keys_pattern_ = ""; + LOG_I << "load keys from pattern: \033[1;32m[OFF]\033[0m"; + } + if (Context::get_param("redis_keys_list").compare("")) { + keys_list_ = Context::get_param("redis_keys_list"); + } else { + keys_list_ = ""; + LOG_I << "load keys from list: \033[1;32m[OFF]\033[0m"; + } + if (Context::get_param("redis_local_latency").compare("")) { + local_served_latency_ = atoi(Context::get_param("redis_local_latency").c_str()); + } else { + local_served_latency_ = 100; + LOG_I << "local latency: \033[1;32m[DEFAULT 100 microseconds]\033[0m"; + } + if (Context::get_param("redis_non_local_latency").compare("")) { + non_local_served_latency_ = atoi(Context::get_param("redis_non_local_latency").c_str()); + } else { + non_local_served_latency_ = 100; + LOG_I << "remote latency: \033[1;32m[DEFAULT 100 microseconds]\033[0m"; + } + return true; + } +} + +void RedisSplitAssigner::master_redis_qry_req_handler() { + auto& master = Master::get_instance(); + auto master_socket = master.get_socket(); + BinStream stream = zmq_recv_binstream(master_socket.get()); + + int query_type = -1; + stream >> query_type; + + stream.clear(); + + std::map redis_splits_info; + switch (query_type) { + case 0: + { + answer_masters_info(redis_splits_info); + break; + } + case 1: + { + answer_splits_info(redis_splits_info); + break; + } + default: + { + LOG_E << "Undefined query type: " << query_type; + break; + } + } + stream << redis_splits_info; + + zmq_sendmore_string(master_socket.get(), master.get_cur_client()); + zmq_sendmore_dummy(master_socket.get()); + zmq_send_binstream(master_socket.get(), stream); +} + +void RedisSplitAssigner::master_redis_req_handler() { + auto& master = Master::get_instance(); + auto master_socket = master.get_socket(); + BinStream stream = zmq_recv_binstream(master_socket.get()); + + int global_tid = -1; + stream >> global_tid; + + stream.clear(); + + // deliver keys to the incoming worker + std::vector > ret; + answer_tid_best_keys(global_tid, ret); + + /* visualize delivered keys + for (int split_i=0; split_i keys = ret[split_i]; + for (auto& key : keys) { + LOG_I << split_i << " " << split.second.get_ip() << ":" << split.second.get_port() << " <- " << gen_slot_crc16(key.str_.c_str(), key.str_.length()) << " " << key.str_; + } + break; + } + } + } + */ + + stream << worker_task_status_[global_tid]; + stream << ret; + + zmq_sendmore_string(master_socket.get(), master.get_cur_client()); + zmq_sendmore_dummy(master_socket.get()); + zmq_send_binstream(master_socket.get(), stream); +} + +void RedisSplitAssigner::master_redis_req_end_handler() { + auto& master = Master::get_instance(); + auto master_socket = master.get_socket(); + BinStream stream = zmq_recv_binstream(master_socket.get()); + int global_tid = -1; + int num_received_keys = -1; + + stream >> global_tid; + stream >> num_received_keys; + receive_end(global_tid, num_received_keys); + + if (if_all_keys_fetched_) { + reset_default_states(); + } + + stream.clear(); + zmq_sendmore_string(master_socket.get(), master.get_cur_client()); + zmq_sendmore_dummy(master_socket.get()); + zmq_send_binstream(master_socket.get(), stream); +} + +void RedisSplitAssigner::master_setup_handler() { + if (!load_parameters()) return; + + seed_ = std::chrono::system_clock::now().time_since_epoch().count(); + srand(seed_); + + create_redis_info(); + create_husky_info(); + create_split_proc_map(); + create_redis_con_pool(); + import_all_pattern_keys(); + create_schedule_thread(); +} + +RedisSplitAssigner::~RedisSplitAssigner() { + splits_.clear(); + split_groups_.clear(); + + all_keys_.clear(); + batch_keys_.clear(); + non_local_served_keys_.clear(); + worker_keys_pools_.clear(); + worker_num_fetched_keys_.clear(); + worker_num_keys_assigned_.clear(); + + split_proc_map_.clear(); + procs_load_.clear(); + keys_latency_map_.clear(); + proc_keys_stat_.clear(); + + if (keys_file_.is_open()) { + keys_file_.close(); + } + + // release Redis connection pool + for (auto& con : cons_) { + if (con.second) { + redisFree(con.second); + } + } + cons_.clear(); + + scheduler_.join(); +} + +void RedisSplitAssigner::set_auth(const std::string& password) { + password_ = password; + need_auth_ = true; +} + +void RedisSplitAssigner::reset_auth() { need_auth_ = false; } + +bool RedisSplitAssigner::refresh_splits_info() { + redisContext *c = NULL; + redisReply *reply = NULL; + c = redisConnectWithTimeout(ip_.c_str(), port_, timeout_); + if (NULL == c || c->err) { + if (c) { + LOG_E << "Connection error: " << std::string(c->errstr); + redisFree(c); + } else { + LOG_E << "Connection error: can't allocate redis context"; + } + return 0; + } + + // to be tested + if (need_auth_) { + reply = redisCmd(c, "AUTH %s", password_.c_str()); + CHECK(reply); + } + + int serial_number = 0; + // get the cluster nodes list + reply = redisCmd(c, "CLUSTER NODES"); + std::istringstream rep_lines(reply->str); + char line_buf[256] = ""; + // parse each redis server + while (rep_lines.getline(line_buf, sizeof(line_buf))) { + RedisSplit split; + split.set_sn(serial_number++); + std::istringstream line(line_buf); + std::vector split_info; + // parse a line of server info + while (line) { + std::string field; + line >> field; + if ("" != field) { + split_info.push_back(field); + } + } + // parse id + split.set_id(split_info[0]); + // parse ip + std::istringstream ip_port(split_info[1]); + ip_port.getline(line_buf, sizeof(line_buf), ':'); + split.set_ip(line_buf); + // parse port + ip_port.getline(line_buf, sizeof(line_buf), ':'); + std::istringstream ports(line_buf); + ports.getline(line_buf, sizeof(line_buf), '@'); + split.set_port(atoi(line_buf)); + // parse role + split.set_master(split_info[3]); + // parse node state + // if healthy + if (split_info[2].substr(split_info[2].size()-4, 4).compare("fail")) { + // if master + if (!split_info[3].compare("-")) { + // slots start + std::istringstream slots(split_info[8]); + slots.getline(line_buf, sizeof(line_buf), '-'); + split.set_sstart(atoi(line_buf)); + // slots end + slots.getline(line_buf, sizeof(line_buf), '-'); + split.set_send(atoi(line_buf)); + slots.getline(line_buf, sizeof(line_buf), '-'); + // init load balance + RedisSplitGroup split_group(split); + split_groups_[split.get_id()] = split_group; + } + split.set_valid(true); + // if down + } else { + split.set_valid(false); + } + splits_[split.get_id()] = split; + } + + // set slaves' slots range for load balancing + for (auto& split : splits_) { + std::string my_master; + if ((my_master = split.second.get_master()).compare("-")) { + split.second.set_sstart(splits_[my_master].get_sstart()); + split.second.set_send(splits_[my_master].get_send()); + split_groups_[my_master].add_member(split.first); + } + } + + // sort split_groups for efficient query + sorted_split_group_name_.clear(); + for ( auto& split_group : split_groups_ ) { + sorted_split_group_name_.push_back(split_group.first); + } + std::sort(sorted_split_group_name_.begin(), sorted_split_group_name_.end(), + [&](std::string& a, std::string& b){ + return splits_[a].get_sstart() < splits_[b].get_sstart(); + }); + num_slots_per_group_ = 16384 / split_groups_.size(); + + if (reply) { + freeReplyObject(reply); + } + if (c) { + redisFree(c); + } +} + +void RedisSplitAssigner::reset_default_states() { + num_keys_amount_ = 0; + num_keys_batched_ = 0; + num_keys_scheduled_ = 0; + num_keys_fetched_ = 0; + cur_start_ = 0; + non_local_offset_ = 0; + + is_dynamic_imported_ = false; + is_pattern_imported_ = false; + is_pattern_batched_ = false; + is_file_imported_ = false; + if_all_keys_scheduled_ = false; + if_all_keys_fetched_ = false; + + refresh_splits_info(); + create_redis_con_pool(); + + all_keys_.clear(); + batch_keys_.clear(); + non_local_served_keys_.clear(); + keys_latency_map_.clear(); + + procs_load_.clear(); + proc_keys_stat_.clear(); + proc_worker_offset_.clear(); + proc_worker_map_.clear(); + + worker_num_keys_assigned_.clear(); + worker_num_fetched_keys_.clear(); + worker_task_status_.clear(); + worker_keys_pools_.clear(); + + create_husky_info(); + + if (if_scheduler_stopped_) { + scheduler_.join(); + if_scheduler_stopped_ = false; + create_schedule_thread(); + } +} + +bool RedisSplitAssigner::create_redis_info() { + refresh_splits_info(); + + for (auto& split_group : split_groups_) { + const std::vector members = split_group.second.get_members(); + LOG_I << "\033[1;32m====================================================\033[0m"; + LOG_I << "redis group id: " << split_group.first; + LOG_I << "slots start: " << std::to_string(splits_[split_group.first].get_sstart()); + LOG_I << "slots end: " << std::to_string(splits_[split_group.first].get_send()); + for (auto& member_id : members) { + LOG_I << "serial number:" << splits_[member_id].get_sn() << " member id:" << member_id << " ip:" << splits_[member_id].get_ip() << " port:" << std::to_string(splits_[member_id].get_port()) << " priority:" << std::to_string(split_group.second.get_priority(member_id)); + } + } + LOG_I << "\033[1;32m====================================================\033[0m"; +} + +void RedisSplitAssigner::answer_tid_best_keys(int global_tid, std::vector >& ret) { + std::unique_lock pools_lock(worker_pools_mutex_); + + int num_answered = 0; + int num_splits = splits_.size(); + int num_remained_keys = 0; + for (int split_i=0; split_i < num_splits; split_i++) { + num_remained_keys += worker_keys_pools_[global_tid][split_i].size(); + std::vector split_keys; + ret.push_back(split_keys); + } + int num_to_be_answered = worker_num_keys_assigned_[global_tid].front() < num_remained_keys ? worker_num_keys_assigned_[global_tid].front() : num_remained_keys; + + if (num_to_be_answered > 0) { + int i = 0; + while (num_answered < num_to_be_answered) { + int split_i = i++ % num_splits; + if (!worker_keys_pools_[global_tid][split_i].empty()) { + ret[split_i].push_back(worker_keys_pools_[global_tid][split_i].back()); + worker_keys_pools_[global_tid][split_i].pop_back(); + num_answered++; + } + } + } + + if (0 == num_remained_keys - num_answered) { + if (is_dynamic_imported_ && is_pattern_batched_ && is_file_imported_ && if_all_keys_scheduled_) { + worker_task_status_[global_tid] = io::RedisTaskStatus::NoMoreTask; + } else if (!is_dynamic_imported_ || !is_pattern_batched_ || !is_file_imported_ || !if_all_keys_scheduled_) { + worker_task_status_[global_tid] = io::RedisTaskStatus::WaitTasks; + } else { + worker_task_status_[global_tid] = io::RedisTaskStatus::Abnormal; + } + } else { + worker_task_status_[global_tid] = io::RedisTaskStatus::WaitTasks; + } +} + +void RedisSplitAssigner::answer_masters_info(std::map& redis_masters_info) { + for (auto& split_group : split_groups_) { + RedisSplit& master = splits_[split_group.first]; + redis_masters_info[master.get_id()] = master; + } +} + +void RedisSplitAssigner::answer_splits_info(std::map& redis_splits_info) { + for (auto& split : splits_) { + redis_splits_info[split.first] = split.second; + } +} + +void RedisSplitAssigner::receive_end(int global_tid, int num_received_keys) { + std::unique_lock pools_lock(worker_pools_mutex_); + worker_num_fetched_keys_[global_tid] += num_received_keys; + num_keys_fetched_ += num_received_keys; + if (worker_num_keys_assigned_[global_tid].front() == num_received_keys) { + worker_num_keys_assigned_[global_tid].pop(); + } else { + LOG_E << "end:worker_" << global_tid << "[" << num_received_keys << "/" << worker_num_keys_assigned_[global_tid].front() << "]"; + worker_num_keys_assigned_[global_tid].pop(); + } + LOG_I << "fetched [" << num_keys_fetched_ << "," << num_keys_scheduled_ << "," << num_keys_batched_ << "/" << num_keys_amount_ << "]"; + if (num_keys_fetched_ == num_keys_amount_) { + if_all_keys_fetched_ = true; + } +} + +uint16_t RedisSplitAssigner::gen_slot_crc16(const char *buf, int len) { + int counter; + uint16_t crc = 0; + for (counter = 0; counter < len; counter++) + crc = (crc << 8) ^ io::crc16tab_[((crc >> 8) ^ *buf++)&0x00FF]; + return crc % 16384; +} + +std::string RedisSplitAssigner::parse_host(const std::string& hostname) { + hostent * record = gethostbyname(hostname.c_str()); + if (record == NULL) { + LOG_E << "Hostname parse failed:" << hostname; + return "failed"; + } + in_addr * address = (in_addr *)record->h_addr; + std::string ip_address = inet_ntoa(*address); + return ip_address; +} + +void RedisSplitAssigner::create_split_proc_map() { + for (auto& split : splits_) { + for (int proc_id=0; proc_id < num_procs_; proc_id++) { + std::string proc_host = work_info_.get_hostname(proc_id); + if (!(parse_host(proc_host)).compare(split.second.get_ip())) { + split_proc_map_[split.first] = proc_id; + break; + } + } + } +} + +void RedisSplitAssigner::create_husky_info() { + work_info_ = Context::get_worker_info(); + num_procs_ = work_info_.get_num_processes(); + num_workers_ = work_info_.get_num_workers(); + batch_size_ = num_workers_ * 10000; + for (int proc_id=0; proc_id < num_procs_; proc_id++) { + // initialize process-level keys statistics + std::vector keys_stat{0, 0}; + proc_keys_stat_.push_back(keys_stat); + // initialize process-level workload + procs_load_.push_back(0); + // mapping processes and workers + int num_local_workers = work_info_.get_num_local_workers(proc_id); + std::vector local_workers; + for (int worker_id=0; worker_id < num_local_workers; worker_id++) { + local_workers.push_back(work_info_.local_to_global_id(proc_id, worker_id)); + } + proc_worker_map_.push_back(local_workers); + // initialize process' worker offset + proc_worker_offset_.push_back(0); + } + for (int worker_id=0; worker_id < num_workers_; worker_id++) { + // create worker-level keys pools + std::vector > best_keys_pool; + for (int split_i=0; split_i < splits_.size(); split_i++) { + std::vector split_keys; + best_keys_pool.push_back(split_keys); + } + worker_keys_pools_.push_back(best_keys_pool); + // create worker-level fetched keys statistics + worker_num_fetched_keys_.push_back(0); + // create worker-level task status + worker_task_status_.push_back(io::RedisTaskStatus::WaitTasks); + // create worker-level batch keys statistics + std::queue worker_num_batch_keys; + worker_num_keys_assigned_.push_back(worker_num_batch_keys); + } +} + +void RedisSplitAssigner::create_redis_con_pool() { + // create Redis connection pool + redisReply *reply = NULL; + for (auto& split_group : split_groups_) { + RedisSplit master = splits_[split_group.first]; + redisContext * c = redisConnectWithTimeout(master.get_ip().c_str(), master.get_port(), timeout_); + if (NULL == c || c->err) { + if (c) { + LOG_E << "Connection error: " + std::string(c->errstr); + redisFree(c); + } else { + LOG_E << "Connection error: can't allocate redis context"; + } + return; + } + + // to be tested + if (need_auth_) { + reply = redisCmd(c, "AUTH %s", password_.c_str()); + } + + cons_[split_group.first] = c; + } + if (reply) { + freeReplyObject(reply); + } +} + +void RedisSplitAssigner::create_schedule_thread() { + scheduler_ = std::thread(&RedisSplitAssigner::load_schedule, this); +} + +void RedisSplitAssigner::load_schedule() { + if (if_streaming_mode_) { + while (true) { + std::unique_lock pools_lock(worker_pools_mutex_, std::defer_lock); + load_batch_keys(); + schedule_batch_keys(pools_lock.mutex()); + } + } else { + while (!is_dynamic_imported_ || !is_pattern_batched_ || !is_file_imported_ || !if_all_keys_scheduled_) { + std::unique_lock pools_lock(worker_pools_mutex_, std::defer_lock); + load_batch_keys(); + schedule_batch_keys(pools_lock.mutex()); + } + } + if_scheduler_stopped_ = true; + LOG_I << "scheduling thread finished."; +} + +void RedisSplitAssigner::import_all_pattern_keys() { + redisReply *reply = NULL; + + if (keys_pattern_.compare("")) { + LOG_I << "keys from pattern [" << keys_pattern_ << "] ..."; + for (auto& split_group : split_groups_) { + RedisSplit master = splits_[split_group.first]; + reply = redisCmd(cons_[master.get_id()], "KEYS %s", keys_pattern_.c_str()); + LOG_I << "[" << reply->elements << "] matched on [" << master.get_ip() << ":" << master.get_port() << "]"; + // no record matches this pattern on this master + if (reply->elements <= 0) { + continue; + } + std::vector split_all_keys; + int i = 0; + for (; i < reply->elements; i++) { + std::string key = std::string(reply->element[i]->str); + split_all_keys.push_back(key); + } + all_keys_.push_back(split_all_keys); + num_keys_amount_ += split_all_keys.size(); + } + LOG_I << "[" << num_keys_amount_ << "] keys from pattern DONE"; + } + + is_pattern_imported_ = true; + + if (reply) { + freeReplyObject(reply); + } +} + +// load a batch of keys +void RedisSplitAssigner::load_batch_keys() { + redisReply *reply = NULL; + + // mode 1: load keys from Redis LIST, a batch at a time + if (!is_dynamic_imported_) { + if (keys_list_.compare("")) { + LOG_I << "keys from list [" << keys_list_ << "] ..."; + if (!if_found_keys_list_) { + uint16_t slot = gen_slot_crc16(keys_list_.c_str(), keys_list_.length()); + int split_group_id = slot / num_slots_per_group_; + split_group_id = split_group_id > sorted_split_group_name_.size()-1 ? --split_group_id : split_group_id; + if (slot < splits_[sorted_split_group_name_[split_group_id]].get_sstart()) { + split_group_id--; + } else if (slot > splits_[sorted_split_group_name_[split_group_id]].get_send()) { + split_group_id++; + } + keys_list_master_ = splits_[sorted_split_group_name_[split_group_id]]; + // check if the list exists + reply = redisCmd(cons_[keys_list_master_.get_id()], "EXISTS %s", keys_list_.c_str()); + if (0 == reply->integer) { + is_dynamic_imported_ = true; + LOG_E << "didn't find keys-list:" << keys_list_; + } else { + if_found_keys_list_ = true; + } + } + if (if_found_keys_list_) { + reply = redisCmd(cons_[keys_list_master_.get_id()], "LRANGE %s %d %d", keys_list_.c_str(), cur_start_, cur_start_ + batch_size_ - 1); + cur_start_ += batch_size_; + for (int i=0; i < reply->elements; i++) { + std::string key = std::string(reply->element[i]->str); + batch_keys_.push_back(key); + } + if (0 == reply->elements) { + is_dynamic_imported_ = true; + LOG_I << "keys from list DONE"; + } + } + } else { + is_dynamic_imported_ = true; + } + } + + // mode 2 (step 1): load keys from pattern, load all keys at once + if (!is_pattern_imported_) { + import_all_pattern_keys(); + } + // mode 2 (step 2): load keys from pattern, generate a batch + int num_remained = num_keys_amount_ - num_keys_batched_; + if (!is_pattern_batched_) { + if (num_keys_batched_ != num_keys_amount_) { + int load_size = num_remained < batch_size_ ? num_remained : batch_size_; + int num_loaded = 0; + int i = 0; + while (num_loaded < load_size) { + std::vector& split_all_keys = all_keys_[i++ % all_keys_.size()]; + if (!split_all_keys.empty()) { + batch_keys_.push_back(split_all_keys.back()); + split_all_keys.pop_back(); + num_loaded++; + } + } + num_keys_batched_ += load_size; + } else { + is_pattern_batched_ = true; + } + } + + // mode 3: load keys from file, a batch at a time + std::string raw_key; + if (!is_file_imported_) { + if (keys_file_.is_open()) { + LOG_I << "keys from file [" << keys_path_ << "] ..."; + keys_file_.seekg(cur_pos_); + // import keys for current batch + for (int num_cur_line = 0; num_cur_line < batch_size_; num_cur_line++) { + std::getline(keys_file_, raw_key); + if (is_file_imported_ = keys_file_.eof()) { + is_file_imported_ = true; + break; + } + // eliminate '\r' + std::string key = raw_key.erase(raw_key.find_last_not_of(" \r\n")+1); + batch_keys_.push_back(key); + } + if (!is_file_imported_) + cur_pos_ = keys_file_.tellg(); + LOG_I << "keys from file DONE"; + } else { + is_file_imported_ = true; + } + } + + if (reply) { + freeReplyObject(reply); + } +} + +uint64_t RedisSplitAssigner::reduce_max_workload(PROC_KEYS_POOLS& proc_new_keys_pools, std::vector& workers_load) { + // update process-level workload + for (int proc_id=0; proc_id < num_procs_; proc_id++) { + procs_load_[proc_id] = proc_keys_stat_[proc_id][0] * local_served_latency_ + proc_keys_stat_[proc_id][1] * non_local_served_latency_; + // generate worker-level workload + for (int worker_id : proc_worker_map_[proc_id]) { + workers_load[worker_id] = procs_load_[proc_id] / proc_worker_map_[proc_id].size(); + } + } + auto minmax_it = std::minmax_element(workers_load.begin(), workers_load.end()); + int max_worker_id = minmax_it.second - workers_load.begin(); + int min_worker_id = minmax_it.first - workers_load.begin(); + int max_proc_id = work_info_.get_process_id(max_worker_id); + int min_proc_id = work_info_.get_process_id(min_worker_id); + int num_workers_max = proc_worker_map_[max_proc_id].size(); + int num_workers_min = proc_worker_map_[min_proc_id].size(); + uint64_t max_load = workers_load[max_worker_id]; + bool if_equal = false; + // transfer non-local/local workload + for (int key_type=1; key_type >= 0; key_type--) { + for (auto& split_keys : proc_new_keys_pools[max_proc_id][key_type]) { + if (if_equal) break; + int num_split_keys = split_keys.second.size(); + for (int i=0; i < num_split_keys; i++) { + const RedisRangeKey& range_key = split_keys.second.back(); + procs_load_[max_proc_id] -= keys_latency_map_[range_key.str_][max_proc_id]; + procs_load_[min_proc_id] += keys_latency_map_[range_key.str_][min_proc_id]; + if (local_served_latency_ == keys_latency_map_[range_key.str_][min_proc_id]) { + proc_new_keys_pools[min_proc_id][0][split_keys.first].push_back(split_keys.second.back()); + proc_keys_stat_[min_proc_id][0]++; + } else { + proc_new_keys_pools[min_proc_id][1][split_keys.first].push_back(split_keys.second.back()); + proc_keys_stat_[min_proc_id][1]++; + } + split_keys.second.pop_back(); + proc_keys_stat_[max_proc_id][key_type]--; + if (procs_load_[max_proc_id]/num_workers_max-non_local_served_latency_ <= procs_load_[min_proc_id]/num_workers_min) { + if_equal = true; + break; + } + } + } + } + + // return estimated reduced time at worker-level + return max_load - procs_load_[max_proc_id]/num_workers_max; +} + +void RedisSplitAssigner::schedule_batch_keys(std::mutex * pools_lock) { + redisReply *reply = NULL; + + // step 1: initialize incoming keys pools + PROC_KEYS_POOLS proc_new_keys_pools; + for (int proc_id=0; proc_id < num_procs_; proc_id++) { + std::map > local_keys_pool; + std::map > non_local_keys_pool; + std::vector > > best_keys_pool{local_keys_pool, non_local_keys_pool}; + proc_new_keys_pools.push_back(best_keys_pool); + } + + // step 2: assign local-served keys to process-level, retain non-local-served keys + for (auto& key : batch_keys_) { + RedisRangeKey range_key; + range_key.str_ = key; + uint16_t slot = gen_slot_crc16(key.c_str(), key.length()); + + int split_group_id = slot / num_slots_per_group_; + split_group_id = split_group_id > sorted_split_group_name_.size()-1 ? --split_group_id : split_group_id; + if (slot < splits_[sorted_split_group_name_[split_group_id]].get_sstart()) { + split_group_id--; + } else if (slot > splits_[sorted_split_group_name_[split_group_id]].get_send()) { + split_group_id++; + } + RedisSplitGroup& split_group = split_groups_[sorted_split_group_name_[split_group_id]]; + split_group.sort_members(); + for (int proc_id = 0; proc_id < num_procs_; proc_id++) { + keys_latency_map_[key].push_back(non_local_served_latency_); + } + std::string selected_split_id = split_group.get_sorted_members()[0]; + std::map::iterator it; + if ((it = split_proc_map_.find(selected_split_id)) != split_proc_map_.end()) { + int proc_id = it->second; + proc_new_keys_pools[proc_id][0][selected_split_id].push_back(range_key); + proc_keys_stat_[proc_id][0]++; + keys_latency_map_[key][proc_id] = local_served_latency_; + } else { + // retain non-local-served keys (this kind of keys rarely exist) + if (key_split_size_) { + RedisSplit master = splits_[sorted_split_group_name_[split_group_id]]; + reply = redisCmd(cons_[master.get_id()], "TYPE %s", key.c_str()); + if (!strcmp(reply->str, "list")) { + reply = redisCmd(cons_[master.get_id()], "LLEN %s", key.c_str()); + int llen = reply->integer; + // heavy list + if (llen >= key_split_size_) { + int range_start = 0; + for (int range_start=0; range_start < llen; range_start+=key_split_size_) { + range_key.start_ = range_start; + range_key.end_ = range_start+key_split_size_-1; + non_local_served_keys_[selected_split_id].push_back(range_key); + } + } else { + non_local_served_keys_[selected_split_id].push_back(range_key); + } + } + } else { + non_local_served_keys_[selected_split_id].push_back(range_key); + } + } + split_group.update_priority(); + } + + // step 3: optimize process-level workload + int reduced_time = 0; + int consumed_time = 0; + int optimize_step = 1; + std::chrono::time_point start; + std::vector workers_load; + for (int worker_id=0; worker_id < num_workers_; worker_id++) { + workers_load.push_back(0); + } + while (true) { + start = std::chrono::system_clock::now(); + reduced_time = 0; + for (int i=0; i < optimize_step; i++) { + reduced_time += reduce_max_workload(proc_new_keys_pools, workers_load); + } + std::chrono::duration interval = std::chrono::system_clock::now() - start; + consumed_time = int(interval.count()); + if (reduced_time <= consumed_time) break; + } + + pools_lock->lock(); + // step 4: distribute local-served keys to workers + std::vector > worker_keys_stat; + for (int worker_id; worker_id < num_workers_; worker_id++) { + std::vector keys_stat{0, 0}; + worker_keys_stat.push_back(keys_stat); + worker_num_keys_assigned_[worker_id].push(0); + } + int local_worker_id = 0; + int worker_id = 0; + for (int proc_id=0; proc_id < num_procs_; proc_id++) { + int num_local_workers = proc_worker_map_[proc_id].size(); + for (int key_type=0; key_type < 2; key_type++) { + for (auto& split_keys : proc_new_keys_pools[proc_id][key_type]) { + int num_keys = split_keys.second.size(); + for (int i=0; i < num_keys; i++) { + local_worker_id = (i + proc_worker_offset_[proc_id]) % num_local_workers; + worker_id = proc_worker_map_[proc_id][local_worker_id]; + worker_keys_pools_[worker_id][splits_[split_keys.first].get_sn()].push_back(split_keys.second.back()); + worker_keys_stat[worker_id][key_type]++; + split_keys.second.pop_back(); + } + proc_worker_offset_[proc_id] = local_worker_id + 1; + } + } + } + + // step 5: distribute non-local-served keys + for (auto& split_keys : non_local_served_keys_) { + int num_keys = split_keys.second.size(); + for (int i=0; i < num_keys; i++) { + worker_id = (i + non_local_offset_) % num_workers_; + worker_keys_pools_[worker_id][splits_[split_keys.first].get_sn()].push_back(split_keys.second.back()); + worker_keys_stat[worker_id][1]++; + proc_keys_stat_[work_info_.get_process_id(worker_id)][1]++; + split_keys.second.pop_back(); + } + non_local_offset_ = worker_id + 1; + } + + // step 6: count worker-level assigned keys + for (int worker_id; worker_id < num_workers_; worker_id++) { + for (int key_type=0; key_type < 2; key_type++) { + worker_num_keys_assigned_[worker_id].back() += worker_keys_stat[worker_id][key_type]; + } + } + + /* visualize worker stat + for (int worker_id; worker_idunlock(); + batch_keys_.clear(); + + if (reply) { + freeReplyObject(reply); + } +} + +} // namespace husky + +#endif diff --git a/master/redis_assigner.hpp b/master/redis_assigner.hpp new file mode 100644 index 0000000..4a6c91f --- /dev/null +++ b/master/redis_assigner.hpp @@ -0,0 +1,167 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifdef WITH_REDIS + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "core/context.hpp" +#include "hiredis/hiredis.h" +#include "io/input/redis_split.hpp" + +#define redisCmd(context, ...) static_cast(redisCommand(context, __VA_ARGS__)) + +namespace husky { + +using io::RedisSplit; +using io::RedisSplitGroup; +using io::RedisRangeKey; + +class RedisSplitAssigner { +public: + RedisSplitAssigner(); + virtual ~RedisSplitAssigner(); + void set_auth(const std::string&); + void reset_auth(); + +private: + // [proc_id][local, non_local]{split_id : [keys]} + typedef std::vector > > > PROC_KEYS_POOLS; + // [worker_id]{split_id : [keys]} + typedef std::vector > > WORKER_KEYS_POOLS; + +private: + bool load_parameters(); + + void master_redis_req_handler(); + void master_redis_qry_req_handler(); + void master_redis_req_end_handler(); + void master_setup_handler(); + + bool create_redis_info(); + bool refresh_splits_info(); + void create_husky_info(); + void create_split_proc_map(); + void create_redis_con_pool(); + void create_schedule_thread(); + void reset_default_states(); + + void answer_tid_best_keys(int global_tid, std::vector >& ret); + void answer_masters_info(std::map& redis_masters_info); + void answer_splits_info(std::map& redis_splits_info); + void receive_end(int global_tid, int num_received_keys); + + void import_all_pattern_keys(); + void load_batch_keys(); + void schedule_batch_keys(std::mutex * pools_lock); + void load_schedule(); + + uint64_t reduce_max_workload(PROC_KEYS_POOLS& proc_new_keys_pools, std::vector& workers_load); + + std::string parse_host(const std::string& hostname); + uint16_t gen_slot_crc16(const char *buf, int len); + +private: + // batch / streaming + std::vector > all_keys_; + std::vector batch_keys_; + int num_keys_amount_ = 0; + int num_keys_batched_ = 0; + int batch_size_; + bool if_streaming_mode_ = false; + std::vector > worker_num_keys_assigned_; + + // stop condition + bool is_dynamic_imported_ = false; + bool is_pattern_batched_ = false; + bool is_file_imported_ = false; + bool if_all_keys_scheduled_ = false; + bool if_all_keys_fetched_ = false; + bool if_scheduler_stopped_ = false; + int num_keys_fetched_ = 0; + std::vector worker_num_fetched_keys_; + std::vector worker_task_status_; + + // husky cluster info + WorkerInfo work_info_; + int num_procs_; + int num_workers_; + std::vector > proc_worker_map_; + + // local keys assignment + int num_slots_per_group_; + std::map splits_; + std::map split_groups_; + std::vector sorted_split_group_name_; + + // non-local/heavy keys assignment + std::map > non_local_served_keys_; + int key_split_size_ = 0; + + // keys from file + std::string keys_path_; + std::ifstream keys_file_; + int cur_pos_ = 0; + + // keys from pattern + bool is_pattern_imported_ = false; + std::string keys_pattern_ = ""; + + // keys from Redis List + std::string keys_list_; + RedisSplit keys_list_master_; + bool if_found_keys_list_ = false; + int cur_start_ = 0; + + // workload balance optimization, in microseconds + int local_served_latency_ = 100; + int non_local_served_latency_ = 100; + std::map > keys_latency_map_; + std::vector procs_load_; + std::vector > proc_keys_stat_; + std::map split_proc_map_; + + // worker-level task assignment + int num_keys_scheduled_ = 0; + WORKER_KEYS_POOLS worker_keys_pools_; + std::mutex worker_pools_mutex_; + std::thread scheduler_; + std::vector proc_worker_offset_; + int non_local_offset_ = 0; + + // miscellaneous + std::string ip_; + int port_; + struct timeval timeout_ = {1, 500000}; + bool need_auth_ = false; + std::string password_; + std::map cons_; + unsigned seed_; +}; + +} // namespace husky + +#endif diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5798331..b2f53f1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -56,6 +56,17 @@ target_link_libraries(TestMongoDBOutputFormat ${husky}) target_link_libraries(TestMongoDBOutputFormat ${HUSKY_EXTERNAL_LIB}) husky_default_properties(TestMongoDBOutputFormat) +# TestRedisOutputFormat +add_executable(TestRedisOutputFormat test-redis-outputformat.cpp) +target_link_libraries(TestRedisOutputFormat ${husky}) +target_link_libraries(TestRedisOutputFormat ${HUSKY_EXTERNAL_LIB}) +husky_default_properties(TestRedisOutputFormat) + +# TestRedisInputOutputFormat +add_executable(TestRedisInputOutputFormat test-redis-input-outputformat.cpp) +target_link_libraries(TestRedisInputOutputFormat ${husky}) +target_link_libraries(TestRedisInputOutputFormat ${HUSKY_EXTERNAL_LIB}) +husky_default_properties(TestRedisInputOutputFormat) # TestMongoToES add_executable(TestMongoToES test-mongo-to-es.cpp) diff --git a/test/test-redis-input-outputformat.cpp b/test/test-redis-input-outputformat.cpp new file mode 100644 index 0000000..cca447a --- /dev/null +++ b/test/test-redis-input-outputformat.cpp @@ -0,0 +1,100 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "core/engine.hpp" +#include "io/input/inputformat_store.hpp" +#include "io/output/redis_outputformat.hpp" + +#include "hiredis/hiredis.h" +#include "boost/property_tree/ptree.hpp" +#include "boost/property_tree/json_parser.hpp" + +namespace pt = boost::property_tree; +namespace hi = husky::io; + +void test() { + auto& inputformat = hi::InputFormatStore::create_redis_inputformat(); + inputformat.set_server(); + // inputformat.set_auth(pwd); + + hi::RedisOutputFormat outputformat; + outputformat.set_server(); + // outputformat.set_auth(pwd); + + std::string sfx("_x"); + + auto read_and_write = [&](hi::RedisInputFormat::RecordT& record_pair) { + auto datatype = record_pair.first; + if (hi::RedisInputFormat::RedisDataType::Null == datatype) { + husky::LOG_I << "waiting for keys"; + } else { + pt::ptree reader; + std::stringstream jsonstream; + jsonstream << record_pair.second; + try { + pt::read_json(jsonstream, reader); + } + catch (pt::json_parser::json_parser_error) { + husky::LOG_E << "json_parser_error:"; + husky::LOG_E << record_pair.second; + return; + } + const auto& key = reader.begin()->first; + switch (datatype) { + case hi::RedisInputFormat::RedisDataType::String: + { + outputformat.commit(key + sfx, reader.begin()->second.get_value()); + } break; + case hi::RedisInputFormat::RedisDataType::Hash: + { + std::map map_data; + for (auto& kv : reader.begin()->second) { + map_data[kv.first] = kv.second.get_value(); + } + outputformat.commit(key + sfx, map_data); + } break; + case hi::RedisInputFormat::RedisDataType::List: + { + // for Redis List, commit() performs as creating and/or appending list elements + std::vector vec_data; + for (auto& kv : reader.begin()->second) { + vec_data.push_back(kv.second.get_value()); + } + outputformat.commit(key + sfx, vec_data); + } break; + default: + husky::LOG_E << "undefined data structure"; + break; + } + } + }; + + husky::load(inputformat, read_and_write); + outputformat.flush_all(); + + husky::LOG_I << "Done"; +} + +int main(int argc, char** argv) { + if (!husky::init_with_args(argc, argv, {"redis_hostname", "redis_port", "redis_keys_pattern"})) + return 1; + husky::run_job(test); + return 0; +} diff --git a/test/test-redis-outputformat.cpp b/test/test-redis-outputformat.cpp new file mode 100644 index 0000000..88db946 --- /dev/null +++ b/test/test-redis-outputformat.cpp @@ -0,0 +1,110 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include "boost/tokenizer.hpp" +#include "mongo/bson/bson.h" +#include "mongo/client/dbclient.h" + +#include "core/engine.hpp" +#include "io/input/inputformat_store.hpp" +#include "io/output/redis_outputformat.hpp" + +#include "boost/property_tree/ptree.hpp" +#include "boost/property_tree/json_parser.hpp" + +namespace pt = boost::property_tree; + +void test() { + std::string server = husky::Context::get_param("mongo_server"); + std::string db = husky::Context::get_param("mongo_db"); + std::string collection = husky::Context::get_param("mongo_collection"); + // std::string user = husky::Context::get_param("mongo_user"); + // std::string pwd = husky::Context::get_param("mongo_pwd"); + + auto& inputformat = husky::io::InputFormatStore::create_mongodb_inputformat(); + inputformat.set_server(server); + inputformat.set_ns(db, collection); + // inputformat.set_auth(user, pwd); + + + husky::io::RedisOutputFormat outputformat; + outputformat.set_server(); + // outputformat.set_auth(pwd); + + const char * field_names[] = {"md5", "title", "url", "id", "content"}; + int length_field_names = sizeof(field_names) / sizeof(field_names[0]); + mongo::BSONElement fields[length_field_names]; + auto read_and_write = [&](std::string& chunk) { + mongo::BSONObj o = mongo::fromjson(chunk); + o.getFields(length_field_names, field_names, fields); + + /* commit string + */ + auto key = fields[0].toString(false, true); + key = key.substr(1, key.size()-2); + pt::ptree json_value; + for (int i=1; i map_data; + std::string value; + for (int i=1; i vec_data; + std::string value; + for (int i=1; i