Skip to content

Commit

Permalink
Merge pull request #42 from cyberway/develop
Browse files Browse the repository at this point in the history
Merge develop to master
  • Loading branch information
afalaleev authored Aug 13, 2019
2 parents f7d9b59 + 2b2df4c commit af76410
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 97 deletions.
23 changes: 12 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
cmake_minimum_required(VERSION 2.8)

project(cyber-notifier)

SET(BOOST_ROOT "~/opt/boost")
FIND_PACKAGE(Boost 1.67 REQUIRED COMPONENTS
system
iostreams)
if(APPLE)
include_directories(/usr/local/include)
link_directories(/usr/local/lib)
Expand All @@ -12,24 +15,20 @@ endif(APPLE)
# Uncomment to have the executable moved to 'build' instead of their respective 'build/xxx' directories
#set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR})

add_definitions(-std=c++14)
add_definitions(-std=c++17)
add_definitions(-DNATS_HAS_STREAMING)
add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY)
#add_definitions(-DNATS_CONN_STATUS_NO_PREFIX)

# Platform specific settings
if(UNIX)
# include(GNUInstallDirs)
if(NOT APPLE)
set(NATS_EXTRA_LIB "rt")
endif()
endif(UNIX)

set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads)

set(outname "cyber-notifier")
if(APPLE)
set(PROTOBUF_LIB "/usr/local/nats.c/pbuf/lib/darwin/libprotobuf-c.a")
else(APPLE)
set(PROTOBUF_LIB "/opt/nats.c-1.8.0/pbuf/lib/linux/libprotobuf-c.so")
set(PROTOBUF_LIB "/opt/nats.c/pbuf/lib/linux/libprotobuf-c.so")
endif(APPLE)

# Build the executable
Expand All @@ -42,3 +41,5 @@ else(APPLE)
target_link_libraries(${outname} nats ${NATS_EXTRA_LIB} ${PROTOBUF_LIB})
endif(APPLE)

target_link_libraries(${outname} Threads::Threads)

5 changes: 3 additions & 2 deletions Docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ ARG branch=master
ARG compiletype=RelWithDebInfo

RUN cd /opt && wget https://github.com/nats-io/cnats/archive/v1.8.0.tar.gz && tar -xzf v1.8.0.tar.gz \
&& ln -s /opt/nats.c-1.8.0 /opt/nats.c \
&& cd nats.c-1.8.0 && mkdir build && cd ./build && cmake -DCMAKE_BUILD_TYPE=$compiletype .. && make && make install

RUN echo "/opt/cnats-1.8.0/pbuf/lib/linux/" > /etc/ld.so.conf.d/protobuf-c.conf && ldconfig

ADD https://api.github.com/repos/GolosChain/cyberway.notifier/git/refs/heads/$branch /etc/version.json
ADD https://api.github.com/repos/cyberway/cyberway.notifier/git/refs/heads/$branch /etc/version.json

RUN cd /opt && git clone -b $branch https://github.com/GolosChain/cyberway.notifier.git --recursive
RUN cd /opt && git clone -b $branch https://github.com/cyberway/cyberway.notifier.git --recursive

RUN cd /opt/cyberway.notifier && mkdir build && cd build \
&& echo "$branch:$(git rev-parse HEAD)" > /etc/cyberway-notifier-version \
Expand Down
257 changes: 173 additions & 84 deletions notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,36 @@
#include <iostream>
#include <memory>
#include <signal.h>
#include <fstream>

static const char* usage =
"-txt text to send (default is 'hello')\n";
#include <map>
#include <tuple>
#include <vector>
#include <mutex>

struct myPubMsgInfo {
const char* payload;
int size;
char ID[30];
};
#include <boost/asio.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>

static volatile bool done = false;
// natsOptions* opts = NULL;
// const char* cluster = "cyberway";
// const char* clientID = "notifier";
// const char* subj = "foo";
// const char* txt = "hello";
const std::string backup_file = "/queue/backup.txt";
const std::string DEFAULT_SOCKET_NAME = "/queue/msg.sock";

boost::asio::io_service io_service;
boost::asio::local::stream_protocol::endpoint ep(DEFAULT_SOCKET_NAME);
boost::asio::local::stream_protocol::socket socket_stream(io_service);

struct message final {
uint64_t index;
std::string subject;
std::string data;
}; // struct message

using message_map = std::map<uint64_t, message>;

message_map msgs_queue;
std::mutex msgs_mutex; // Protect queue from different threads

std::string get_subject(const std::string& data) {
std::string subject;
static const auto start = "{\"msg_channel\":\""; // ok, it's ugly. TODO: ?parse json
Expand All @@ -44,43 +51,104 @@ std::string get_subject(const std::string& data) {
return subject;
}

static void _publish_ack_cb(const char* guid, const char* error, void* closure) {
std::unique_ptr<message> msg(static_cast<message*>(closure));
// TODO: delete object from waiting list, so we can check if some object didn't published for a long time
//std::cout << "#Ack#, " << guid << std::endl;
// myPubMsgInfo* pubMsg = (myPubMsgInfo*)closure;
// printf("Ack handler for message ID=%s Data=%.*s GUID=%s - ", pubMsg->ID, pubMsg->size, pubMsg->payload, guid);
if (error != NULL) {
std::cout << "Error: " << error << std::endl;
done = true; // TODO: locking
message_map fill_backup_msgs() {
message_map queue;

std::ifstream backup(backup_file);
if (!backup.is_open()) {
return queue;
}

if (backup.tellg() == std::ifstream::traits_type::eof()) {
return queue;
}

for (uint64_t i = 0; ; ++i) {
message msg;
if (!std::getline(backup, msg.subject) || !std::getline(backup, msg.data)) {
break;
}
queue.emplace(i, std::move(msg));
}
// free(pubMsg); // This is a good place to free the pubMsg info since we no longer need it
// Notify the main thread that we are done. This is not the proper way and you should use some locking.
// done = true;

backup.close();
return queue;
}

static void sigusr1_handler(int signum) {
print = !print;
}

void create_backup_file() {
std::fstream backup(backup_file, std::ios::out | std::ios::app);
if (backup.is_open()) {
for (auto& obj : msgs_queue) {
backup << obj.second.subject << std::endl;
backup << obj.second.data << std::endl;
}
}
backup.close();
}

static void sig_int_term_handler(int signum) {
if (signum == SIGINT)
std::cout << "Interrupt signal (" << signum << ") received." << std::endl;
else if (signum == SIGTERM)
std::cout << "Termination signal (" << signum << ") received." << std::endl;
if (signum == SIGINT) {
std::cerr << "Interrupt signal (" << signum << ") received." << std::endl;
} else if (signum == SIGTERM) {
std::cerr << "Termination signal (" << signum << ") received." << std::endl;
}
done = true;
}

static void _nats_publish_ack_cb(const char*, const char* error, void* closure) {
std::lock_guard<std::mutex> guard(msgs_mutex);

auto index = (uint64_t)closure;
if (error != NULL) {
std::cerr << "Nats send error: " << error << std::endl;
done = true;
} else {
msgs_queue.erase(index);
}
}

static void _nats_connection_lost_cb(stanConnection*, const char* errTxt, void*) {
std::cerr << "Connection lost: " << errTxt << std::endl;
}

static natsStatus send_nats_message(stanConnection* sc, stanConnOptions* connOpts, const message& msg) {
natsStatus s = NATS_OK;

for (int i = 0; i < 24 * 1000; ++i) {
if (async) {
s = stanConnection_PublishAsync(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size(), _nats_publish_ack_cb, (void*)msg.index);
} else {
s = stanConnection_Publish(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size());
}

if (s == NATS_CONNECTION_CLOSED) {
s = stanConnection_Connect(&sc, cluster, clientID, connOpts);
}

if (s == NATS_TIMEOUT) {
nats_Sleep(50);
continue;
}
break;
}

return s;
}

int main(int argc, char** argv) {
opts = parseArgs(argc, argv, usage);
std::cout << "Sending pipe messages" << std::endl;
opts = parseArgs(argc, argv, "");
std::cerr << "Sending socket messages" << std::endl;

signal(SIGUSR1, sigusr1_handler);
signal(SIGINT, sig_int_term_handler);
signal(SIGINT, sig_int_term_handler);
signal(SIGTERM, sig_int_term_handler);

// Now create STAN Connection Options and set the NATS Options.
stanConnOptions* connOpts;
stanConnOptions* connOpts = nullptr;
natsStatus s = stanConnOptions_Create(&connOpts);
if (s == NATS_OK) {
s = stanConnOptions_SetNATSOptions(connOpts, opts);
Expand All @@ -91,78 +159,99 @@ int main(int argc, char** argv) {
if (s == NATS_OK) {
s = stanConnOptions_SetPubAckWait(connOpts, 120 * 1000 /* ms */);
}
// Create the Connection using the STAN Connection Options
stanConnection* sc;
if (s == NATS_OK) {
s = stanConnOptions_SetConnectionLostHandler(connOpts, _nats_connection_lost_cb, nullptr);
}
stanConnection* sc = nullptr;
if (s == NATS_OK) {
s = stanConnection_Connect(&sc, cluster, clientID, connOpts);
}
// Once the connection is created, we can destroy the options
natsOptions_Destroy(opts);
stanConnOptions_Destroy(connOpts);

bool is_warn = false;
while (s == NATS_OK) {
auto msg = std::make_unique<message>();
std::getline(std::cin, msg->data);
if (done) {
if (msg->data.size()) {
if (!is_warn) {
std::cerr << "WARNING! Pipe hasn't empty." << std::endl;
is_warn = true;
}
} else
break;
auto backup_queue = fill_backup_msgs();
for (const auto& msg: backup_queue) {
s = send_nats_message(sc, connOpts, msg.second);
if (s != NATS_OK) {
return 2;
}
if (std::cin.eof()) {
nats_Sleep(50);
continue;
} else {
if (print) {
std::cout << msg->data << std::endl;
}
}
std::remove(backup_file.c_str());

try {
socket_stream.connect(ep);
if (socket_stream.native_non_blocking()) {
socket_stream.native_non_blocking(false);
}
msg->subject = get_subject(msg->data);

// TODO: create object to check in ack
// TODO: cpp
// myPubMsgInfo* pubMsg = (myPubMsgInfo*)calloc(1, sizeof(myPubMsgInfo));
// if (pubMsg == NULL)
// s = NATS_NO_MEMORY;
// if (s == NATS_OK) {
// pubMsg->payload = txt;
// pubMsg->size = (int)strlen(txt);
// snprintf(pubMsg->ID, sizeof(pubMsg->ID), "%s:%d", "xyz", 234);
// }
// if (s == NATS_OK) {
// s = stanConnection_PublishAsync(sc, subj, pubMsg->payload, pubMsg->size, _pubAckHandler, (void*)pubMsg);
for (int i = 0; i < 24 * 1000; ++i) {
if (async) {
s = stanConnection_PublishAsync(sc, msg->subject.c_str(), msg->data.c_str(), msg->data.size(), _publish_ack_cb, msg.get());
} else {
s = stanConnection_Publish(sc, msg->subject.c_str(), msg->data.c_str(), msg->data.size());
}
if (s == NATS_TIMEOUT) {
nats_Sleep(50);
continue;
}
} catch (const boost::system::system_error &err) {
std::cout << DEFAULT_SOCKET_NAME << std::endl;
std::cerr << "Failed to connect to notifier socket: " << err.what() << std::endl;
return 1;
}

uint64_t msg_index = backup_queue.size();
boost::asio::streambuf socket_buf;
boost::system::error_code error;

for (;;) {
boost::asio::read_until(socket_stream, socket_buf, "\n", error);
if (error) {
// std::cerr << "Receive failed: " << error.message() << std::endl;
// nodeos shutdowns
break;
}

// Note that if this call fails, then we need to free the pubMsg object here since it won't be passed to the ack handler.
if (s == NATS_OK && async) {
msg.release();
message msg;
std::istream data_stream(&socket_buf);
std::getline(data_stream, msg.data);

// try {
// // json validating
// std::stringstream local_stream;
// local_stream << msg.data;
// boost::property_tree::ptree pt;
// boost::property_tree::read_json(local_stream, pt);
// } catch (...) {
// std::cerr << "Data error: " << msg.data << std::endl;
// throw;
// }

if (print) {
std::cout << msg.data << std::endl;
}

msg.index = msg_index++;
msg.subject = get_subject(msg.data);
std::pair<message_map::iterator, bool> result;
{
std::lock_guard<std::mutex> guard(msgs_mutex);
result = msgs_queue.emplace(msg.index, std::move(msg));
}

if (s == NATS_OK) {
s = send_nats_message(sc, connOpts, result.first->second);
if (s != NATS_OK || done) {
std::cerr << "Shutdown" << std::endl;
socket_stream.shutdown(socket_stream.shutdown_both, error);
} else if (!async) {
msgs_queue.erase(result.first->first);
}
}
}

if (s != NATS_OK) {
std::cout << "Error: " << s << " - " << natsStatus_GetText(s) << std::endl;
std::cerr << "Nats error: " << s << " - " << natsStatus_GetText(s) << std::endl;
nats_PrintLastErrorStack(stderr);
}

stanConnOptions_Destroy(connOpts);
stanConnection_Close(sc);
stanConnection_Destroy(sc);
// nats_Sleep(50); // To silence reports of memory still in-use with valgrind.
nats_Sleep(50); // To silence reports of memory still in-use with valgrind.
nats_Close();

if (s != NATS_OK) {
create_backup_file();
}

return 0;
}

0 comments on commit af76410

Please sign in to comment.