Skip to content

Commit

Permalink
[IO] Add Redis Connector
Browse files Browse the repository at this point in the history
Ref #287.
  • Loading branch information
gypleon committed Jun 28, 2017
1 parent d0566fa commit de7e539
Show file tree
Hide file tree
Showing 21 changed files with 2,942 additions and 1 deletion.
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ if(MONGOCLIENT_FOUND)
list(APPEND HUSKY_EXTERNAL_DEFINITION ${MONGOCLIENT_DEFINITION})
endif(MONGOCLIENT_FOUND)

# Redis
if(REDISCLIENT_FOUND)
list(APPEND HUSKY_EXTERNAL_INCLUDE ${REDISCLIENT_INCLUDE_DIR})
list(APPEND HUSKY_EXTERNAL_LIB ${REDISCLIENT_LIBRARY})
list(APPEND HUSKY_EXTERNAL_DEFINITION ${REDISCLIENT_DEFINITION})
endif(REDISCLIENT_FOUND)

# Thrift
if(THRIFT_FOUND)
list(APPEND HUSKY_EXTERNAL_INCLUDE ${THRIFT_INCLUDE_DIR})
Expand Down
59 changes: 59 additions & 0 deletions cmake/dep.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,65 @@ if(WITHOUT_MONGODB)
unset(MONGOCLIENT_FOUND)
message(STATUS "Not using MongoClient due to WITHOUT_MONGODB option")
endif(WITHOUT_MONGODB)

### Redis ###

if(REDISCLIENT_SEARCH_PATH)
find_path(REDISCLIENT_INCLUDE_DIR NAMES hiredis PATHS ${REDISCLIENT_SEARCH_PATH})
find_library(REDISCLIENT_LIBRARY NAMES hiredis PATHS ${REDISCLIENT_SEARCH_PATH})
else(REDISCLIENT_SEARCH_PATH)
find_path(REDISCLIENT_INCLUDE_DIR NAMES hiredis)
find_library(REDISCLIENT_LIBRARY NAMES hiredis)
endif(REDISCLIENT_SEARCH_PATH)
if(REDISCLIENT_INCLUDE_DIR AND REDISCLIENT_LIBRARY)
set(REDISCLIENT_FOUND true)
endif(REDISCLIENT_INCLUDE_DIR AND REDISCLIENT_LIBRARY)
if(REDISCLIENT_FOUND)
set(REDISCLIENT_DEFINITION "-DWITH_REDIS")
message (STATUS "Found Hiredis:")
message (STATUS " (Headers) ${REDISCLIENT_INCLUDE_DIR}")
message (STATUS " (Library) ${REDISCLIENT_LIBRARY}")
message (STATUS " (Definition) ${REDISCLIENT_DEFINITION}")
else(REDISCLIENT_FOUND)
if(WIN32)
message (STATUS "Redis and hiredis are currently not available on win32")
else(WIN32)
message (STATUS "hiredis will be included as a third party:")
include(ExternalProject)
set(THIRDPARTY_DIR ${PROJECT_SOURCE_DIR}/third_party)
if(NOT REDISCLIENT_INCLUDE_DIR OR NOT REDISCLIENT_LIBRARY)
set(REDIS_INSTALL "cp")
ExternalProject_Add(
hiredis
GIT_REPOSITORY "https://github.com/redis/hiredis"
GIT_TAG "v0.13.3"
PREFIX ${THIRDPARTY_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
# TODO: if remove "-pedantic" strict Warnings.
BUILD_COMMAND sed -i "s/ -pedantic//g" ${THIRDPARTY_DIR}/src/hiredis/Makefile COMMAND make dynamic COMMAND make static
BUILD_IN_SOURCE 1
INSTALL_COMMAND mkdir -p ${PROJECT_BINARY_DIR}/include/hiredis/adapters ${PROJECT_BINARY_DIR}/lib COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/hiredis.h ${PROJECT_BINARY_DIR}/include/hiredis/hiredis.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/read.h ${PROJECT_BINARY_DIR}/include/hiredis/read.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/sds.h ${PROJECT_BINARY_DIR}/include/hiredis/sds.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/async.h ${PROJECT_BINARY_DIR}/include/hiredis/async.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/adapters/ae.h ${PROJECT_BINARY_DIR}/include/hiredis/adapters/ae.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/adapters/libev.h ${PROJECT_BINARY_DIR}/include/hiredis/adapters/libev.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/adapters/libevent.h ${PROJECT_BINARY_DIR}/include/hiredis/adapters/libevent.h COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/libhiredis.so ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0.13 COMMAND ln -sf ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0.13 ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0 COMMAND ln -sf ${PROJECT_BINARY_DIR}/lib/libhiredis.so.0 ${PROJECT_BINARY_DIR}/lib/libhiredis.so COMMAND ${REDIS_INSTALL} ${THIRDPARTY_DIR}/src/hiredis/libhiredis.a ${PROJECT_BINARY_DIR}/lib/libhiredis.a
)
list(APPEND external_project_dependencies hiredis)
endif(NOT REDISCLIENT_INCLUDE_DIR OR NOT REDISCLIENT_LIBRARY)
set(REDISCLIENT_INCLUDE_DIR "${PROJECT_BINARY_DIR}/include/hiredis")
if(BUILD_SHARED_LIBRARY)
set(REDISCLIENT_LIBRARY "${PROJECT_BINARY_DIR}/lib/libhiredis.so")
else(BUILD_SHARED_LIBRARY)
set(REDISCLIENT_LIBRARY "${PROJECT_BINARY_DIR}/lib/libhiredis.a")
endif(BUILD_SHARED_LIBRARY)
message (STATUS " (Headers should be) ${REDISCLIENT_INCLUDE_DIR}")
message (STATUS " (Library should be) ${REDISCLIENT_LIBRARY}")
set(REDISCLIENT_FOUND true)
set(REDISCLIENT_DEFINITION "-DWITH_REDIS")
endif(WIN32)
endif(REDISCLIENT_FOUND)
if(WITHOUT_REDIS)
unset(REDISCLIENT_FOUND)
unset(REDISCLIENT_DEFINITION)
message(STATUS "Not using Hiredis due to WITHOUT_REDIS option")
endif(WITHOUT_REDIS)

### RT ###

Expand Down
3 changes: 3 additions & 0 deletions core/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const uint32_t TYPE_KAFKA_REQ = 0xfa091343;
const uint32_t TYPE_KAFKA_END_REQ = 0xfa091344;
const uint32_t TYPE_MONGODB_REQ = 0xfa091388;
const uint32_t TYPE_MONGODB_END_REQ = 0xfa091389;
const uint32_t TYPE_REDIS_REQ = 0xfa0913c8;
const uint32_t TYPE_REDIS_QRY_REQ = 0xfa0913c9;
const uint32_t TYPE_REDIS_END_REQ = 0xfa0913ca;
const uint32_t TYPE_LOCAL_BLK_REQ = 0xfa0e12a2;
const uint32_t TYPE_ORC_BLK_REQ = 0xfa2e32a1;
const uint32_t TYPE_STOP_ASYNC_REQ = 0xf89d74b4;
Expand Down
6 changes: 6 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ target_link_libraries(WordCountFlume ${husky})
target_link_libraries(WordCountFlume ${HUSKY_EXTERNAL_LIB})
husky_default_properties(WordCountFlume)

# WordCountRedis
add_executable(WordCountRedis wc_mr_redis.cpp)
target_link_libraries(WordCountRedis ${husky})
target_link_libraries(WordCountRedis ${HUSKY_EXTERNAL_LIB})
husky_default_properties(WordCountRedis)

# WordCountORC
add_executable(WordCountORC wc_mr_orc.cpp)
target_link_libraries(WordCountORC ${husky})
Expand Down
160 changes: 160 additions & 0 deletions examples/wc_mr_redis.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <set>
#include <string>
#include <utility>
#include <vector>


#include "boost/property_tree/json_parser.hpp"
#include "boost/property_tree/ptree.hpp"
#include "boost/tokenizer.hpp"
#include "hiredis/hiredis.h"

#include "base/serialization.hpp"
#include "core/engine.hpp"
#include "io/input/inputformat_store.hpp"
#include "io/output/redis_outputformat.hpp"
#include "lib/aggregator_factory.hpp"

namespace pt = boost::property_tree;

class Word {
public:
using KeyT = std::string;

Word() = default;
explicit Word(const KeyT& w) : word(w) {}
const KeyT& id() const { return word; }

KeyT word;
int count = 0;
};

bool operator<(const std::pair<int, std::string>& a, const std::pair<int, std::string>& b) {
return a.first == b.first ? a.second < b.second : a.first < b.first;
}

void wc() {
auto& inputformat = husky::io::InputFormatStore::create_redis_inputformat();
inputformat.set_server();

husky::io::RedisOutputFormat outputformat;
outputformat.set_server();

auto& word_list = husky::ObjListStore::create_objlist<Word>();
auto& ch = husky::ChannelStore::create_push_combined_channel<int, husky::SumCombiner<int>>(inputformat, word_list);

auto parse_wc = [&](husky::io::RedisInputFormat::RecordT& record_pair) {
auto datatype = record_pair.first;
switch (datatype) {
case husky::io::RedisInputFormat::RedisDataType::String:
{
pt::ptree reader, content_reader;
std::stringstream jsonstream, content_stream;
jsonstream << record_pair.second;
try {
pt::read_json(jsonstream, reader);
}
catch (pt::json_parser::json_parser_error) {
husky::LOG_E << "json_parser_error: " << record_pair.second;
return;
}

content_stream << reader.begin()->second.get_value<std::string>();
try {
pt::read_json(content_stream, content_reader);
std::string content = content_reader.get<std::string>("content");
boost::char_separator<char> sep(" \t");
boost::tokenizer<boost::char_separator<char>> tok(content, sep);
for (auto& w : tok) {
ch.push(1, w);
}
}
catch (pt::ptree_bad_path) {
husky::LOG_E << "invalid content field";
return;
}
catch (pt::json_parser::json_parser_error) {
husky::LOG_E << "json_parser_error:" << reader.begin()->second.get_value<std::string>();
return;
}
} break;
case husky::io::RedisInputFormat::RedisDataType::List:
case husky::io::RedisInputFormat::RedisDataType::Hash:
break;
case husky::io::RedisInputFormat::RedisDataType::Null:
// waiting for keys
// husky::LOG_I << "waiting more keys";
break;
default:
husky::LOG_E << "undefined data structure";
return;
}
};

husky::load(inputformat, parse_wc);

// Show topk words.
const int kMaxNum = 10;
typedef std::set<std::pair<int, std::string>> TopKPairs;
auto add_to_topk = [](TopKPairs& pairs, const std::pair<int, std::string>& p) {
if (pairs.size() == kMaxNum && *pairs.begin() < p)
pairs.erase(pairs.begin());
if (pairs.size() < kMaxNum)
pairs.insert(p);
};
husky::lib::Aggregator<TopKPairs> unique_topk(
TopKPairs(),
[add_to_topk](TopKPairs& a, const TopKPairs& b) {
for (auto& i : b)
add_to_topk(a, i);
},
[](TopKPairs& a) { a.clear(); },
[add_to_topk](husky::base::BinStream& in, TopKPairs& pairs) {
pairs.clear();
for (size_t n = husky::base::deser<size_t>(in); n--;)
add_to_topk(pairs, husky::base::deser<std::pair<int, std::string>>(in));
},
[](husky::base::BinStream& out, const TopKPairs& pairs) {
out << pairs.size();
for (auto& p : pairs)
out << p;
});

husky::list_execute(word_list, [&ch, &unique_topk, add_to_topk](Word& word) {
unique_topk.update(add_to_topk, std::make_pair(ch.get(word), word.id()));
});

husky::lib::AggregatorFactory::sync();

if (husky::Context::get_global_tid() == 0) {
for (auto& i : unique_topk.get_value())
husky::LOG_I << i.second << " " << i.first;
}

/* Output result to Redis as a Hash table
std::string result_key("WordCountResult");
std::map<std::string, int> result_map;
outputformat.commit(result_key, result_map);
*/
}

int main(int argc, char** argv) {
if (!husky::init_with_args(argc, argv, {"redis_hostname", "redis_port", "redis_keys_pattern"}))
return 1;
husky::run_job(wc);
return 0;
}
5 changes: 5 additions & 0 deletions io/input/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ if(MONGOCLIENT_FOUND)
list(APPEND io-input-src-files ${io-input-mongo-src-files})
endif(MONGOCLIENT_FOUND)

if(REDISCLIENT_FOUND)
file(GLOB io-input-redis-src-files redis_split.cpp redis_inputformat.cpp)
list(APPEND io-input-src-files ${io-input-redis-src-files})
endif(REDISCLIENT_FOUND)

if(THRIFT_FOUND)
file(GLOB io-input-flume-src-files
flume_connector/flume_types.cpp
Expand Down
11 changes: 11 additions & 0 deletions io/input/inputformat_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ MongoDBInputFormat& InputFormatStore::create_mongodb_inputformat() {
}
#endif

#ifdef WITH_REDIS
RedisInputFormat& InputFormatStore::create_redis_inputformat() {
InputFormatMap& inputformat_map = get_inputformat_map();
int id = g_gen_inputformat_id++;
ASSERT_MSG(inputformat_map.find(id) == inputformat_map.end(), "Should not be reached");
auto* redis_input_format = new RedisInputFormat();
inputformat_map.insert({id, redis_input_format});
return *redis_input_format;
}
#endif

ElasticsearchInputFormat& InputFormatStore::create_elasticsearch_inputformat() {
InputFormatMap& inputformat_map = get_inputformat_map();
int id = g_gen_inputformat_id++;
Expand Down
6 changes: 6 additions & 0 deletions io/input/inputformat_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#ifdef WITH_MONGODB
#include "io/input/mongodb_inputformat.hpp"
#endif
#ifdef WITH_REDIS
#include "io/input/redis_inputformat.hpp"
#endif
#include "io/input/elasticsearch_inputformat.hpp"
#include "io/input/separator_inputformat.hpp"
#include "io/input/xml_inputformat.hpp"
Expand Down Expand Up @@ -53,6 +56,9 @@ class InputFormatStore {
#endif
#ifdef WITH_MONGODB
static MongoDBInputFormat& create_mongodb_inputformat();
#endif
#ifdef WITH_REDIS
static RedisInputFormat& create_redis_inputformat();
#endif
static ElasticsearchInputFormat& create_elasticsearch_inputformat();

Expand Down
Loading

0 comments on commit de7e539

Please sign in to comment.