From f18d213ac92603c91a8435820dda9c4c9d8e7fc9 Mon Sep 17 00:00:00 2001 From: Semen Medvedev Date: Wed, 31 Jul 2019 12:54:02 +0700 Subject: [PATCH 1/5] Rename GolosChain to cyberway in links --- Docker/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Docker/Dockerfile b/Docker/Dockerfile index 172cc72..2645695 100644 --- a/Docker/Dockerfile +++ b/Docker/Dockerfile @@ -9,9 +9,9 @@ RUN cd /opt && wget https://github.com/nats-io/cnats/archive/v1.8.0.tar.gz && ta RUN echo "/opt/cnats-1.8.0/pbuf/lib/linux/" > /etc/ld.so.conf.d/protobuf-c.conf && ldconfig -ADD https://api.github.com/repos/GolosChain/cyberway.notifier/git/refs/heads/$branch /etc/version.json +ADD https://api.github.com/repos/cyberway/cyberway.notifier/git/refs/heads/$branch /etc/version.json -RUN cd /opt && git clone -b $branch https://github.com/GolosChain/cyberway.notifier.git --recursive +RUN cd /opt && git clone -b $branch https://github.com/cyberway/cyberway.notifier.git --recursive RUN cd /opt/cyberway.notifier && mkdir build && cd build \ && echo "$branch:$(git rev-parse HEAD)" > /etc/cyberway-notifier-version \ From c15acd4495a1e087213555d38aeb654b826623ad Mon Sep 17 00:00:00 2001 From: Pasha Date: Mon, 5 Aug 2019 10:54:34 +0300 Subject: [PATCH 2/5] Save unsent data #28 (#35) Save unsent data #28 --- notifier.cpp | 168 +++++++++++++++++++++++++++++++++++---------------- options.h | 1 + 2 files changed, 118 insertions(+), 51 deletions(-) diff --git a/notifier.cpp b/notifier.cpp index 734923f..a8cb56c 100644 --- a/notifier.cpp +++ b/notifier.cpp @@ -4,6 +4,10 @@ #include #include #include +#include +#include +#include +#include static const char* usage = "-txt text to send (default is 'hello')\n"; @@ -15,6 +19,7 @@ struct myPubMsgInfo { }; static volatile bool done = false; +const std::string backup_file = "/queue/backup.txt"; // natsOptions* opts = NULL; // const char* cluster = "cyberway"; // const char* clientID = "notifier"; @@ -27,6 +32,12 @@ struct message final { std::string data; }; // struct message + +stanConnOptions* connOpts = nullptr; + +std::vector bad_msgs_queue; +std::map msgs_queue; + std::string get_subject(const std::string& data) { std::string subject; static const auto start = "{\"msg_channel\":\""; // ok, it's ugly. TODO: ?parse json @@ -44,15 +55,38 @@ std::string get_subject(const std::string& data) { return subject; } +bool is_file_empty(std::ifstream& file) { + return file.tellg() == std::ifstream::traits_type::eof(); +} + +void fill_backup_msgs() { + std::ifstream backup(backup_file); + if (backup.is_open()) { + if (is_file_empty(backup)) + return; + + for (auto [i, line_data, line_subject] = std::tuple{0, "", ""}; + std::getline(backup, line_subject), std::getline(backup, line_data); i++) { + msgs_queue.insert({ i, {line_subject, line_data} }); + } + backup.close(); + std::remove(backup_file.c_str()); + } +} + static void _publish_ack_cb(const char* guid, const char* error, void* closure) { - std::unique_ptr msg(static_cast(closure)); // TODO: delete object from waiting list, so we can check if some object didn't published for a long time //std::cout << "#Ack#, " << guid << std::endl; // myPubMsgInfo* pubMsg = (myPubMsgInfo*)closure; // printf("Ack handler for message ID=%s Data=%.*s GUID=%s - ", pubMsg->ID, pubMsg->size, pubMsg->payload, guid); + if (error != NULL) { std::cout << "Error: " << error << std::endl; + bad_msgs_queue.push_back((uint64_t)closure); done = true; // TODO: locking + } else { + auto index = (uint64_t)closure; + msgs_queue.erase(index); } // free(pubMsg); // This is a good place to free the pubMsg info since we no longer need it // Notify the main thread that we are done. This is not the proper way and you should use some locking. @@ -63,6 +97,17 @@ static void sigusr1_handler(int signum) { print = !print; } +void create_backup_file() { + std::fstream backup(backup_file, std::ios::out | std::ios::app); + if (backup.is_open()) { + for (auto& obj : msgs_queue) { + backup << obj.second.subject << std::endl; + backup << obj.second.data << std::endl; + } + } + backup.close(); +} + static void sig_int_term_handler(int signum) { if (signum == SIGINT) std::cout << "Interrupt signal (" << signum << ") received." << std::endl; @@ -71,6 +116,10 @@ static void sig_int_term_handler(int signum) { done = true; } +static void connectionLostCB(stanConnection *sc, const char *errTxt, void *closure) { + std::cout << "Connection lost: " << errTxt << std::endl; +} + int main(int argc, char** argv) { opts = parseArgs(argc, argv, usage); std::cout << "Sending pipe messages" << std::endl; @@ -80,7 +129,7 @@ int main(int argc, char** argv) { signal(SIGTERM, sig_int_term_handler); // Now create STAN Connection Options and set the NATS Options. - stanConnOptions* connOpts; + natsStatus s = stanConnOptions_Create(&connOpts); if (s == NATS_OK) { s = stanConnOptions_SetNATSOptions(connOpts, opts); @@ -91,78 +140,95 @@ int main(int argc, char** argv) { if (s == NATS_OK) { s = stanConnOptions_SetPubAckWait(connOpts, 120 * 1000 /* ms */); } - // Create the Connection using the STAN Connection Options - stanConnection* sc; if (s == NATS_OK) { - s = stanConnection_Connect(&sc, cluster, clientID, connOpts); + s = stanConnOptions_SetConnectionLostHandler(connOpts, connectionLostCB, nullptr); + } + stanConnection* sc = nullptr; + if (s == NATS_OK) { + s = stanConnection_Connect(&sc, cluster, clientID, connOpts); } - // Once the connection is created, we can destroy the options + natsOptions_Destroy(opts); - stanConnOptions_Destroy(connOpts); - bool is_warn = false; - while (s == NATS_OK) { - auto msg = std::make_unique(); - std::getline(std::cin, msg->data); - if (done) { - if (msg->data.size()) { - if (!is_warn) { - std::cerr << "WARNING! Pipe hasn't empty." << std::endl; - is_warn = true; - } - } else - break; - } - if (std::cin.eof()) { - nats_Sleep(50); - continue; - } else { - if (print) { - std::cout << msg->data << std::endl; - } - } - msg->subject = get_subject(msg->data); - - // TODO: create object to check in ack - // TODO: cpp - // myPubMsgInfo* pubMsg = (myPubMsgInfo*)calloc(1, sizeof(myPubMsgInfo)); - // if (pubMsg == NULL) - // s = NATS_NO_MEMORY; - // if (s == NATS_OK) { - // pubMsg->payload = txt; - // pubMsg->size = (int)strlen(txt); - // snprintf(pubMsg->ID, sizeof(pubMsg->ID), "%s:%d", "xyz", 234); - // } - // if (s == NATS_OK) { - // s = stanConnection_PublishAsync(sc, subj, pubMsg->payload, pubMsg->size, _pubAckHandler, (void*)pubMsg); + auto lambda_send_message = [&](void* index, const message& msg) { for (int i = 0; i < 24 * 1000; ++i) { if (async) { - s = stanConnection_PublishAsync(sc, msg->subject.c_str(), msg->data.c_str(), msg->data.size(), _publish_ack_cb, msg.get()); + s = stanConnection_PublishAsync(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size(), _publish_ack_cb, index); } else { - s = stanConnection_Publish(sc, msg->subject.c_str(), msg->data.c_str(), msg->data.size()); + s = stanConnection_Publish(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size()); } + + if (s == NATS_CONNECTION_CLOSED) + s = stanConnection_Connect(&sc, cluster, clientID, connOpts); + if (s == NATS_TIMEOUT) { nats_Sleep(50); continue; } break; } + }; - // Note that if this call fails, then we need to free the pubMsg object here since it won't be passed to the ack handler. - if (s == NATS_OK && async) { - msg.release(); + fill_backup_msgs(); + for (const auto& item : msgs_queue) { + if (s != NATS_OK) + break; + lambda_send_message((void*)item.first, item.second); + } + + bool warn = false; + while (s == NATS_OK) { + while (bad_msgs_queue.size()) { + if (s != NATS_OK) + break; + + const auto& it = msgs_queue.find(bad_msgs_queue.back()); + lambda_send_message((void*)it->first, it->second); + bad_msgs_queue.pop_back(); + } + + std::string data; + std::getline(std::cin, data); + + if (done) { + if (data.size()) { + if (!warn) { + std::cerr << "WARNING! Pipe hasn't empty." << std::endl; + warn = true; + } + } else + break; } + if (std::cin.eof() && !msgs_queue.size()) { + nats_Sleep(50); + continue; + } else if (print) + std::cout << data << std::endl; + + uint64_t index = msgs_queue.size(); + auto [it, status] = msgs_queue.insert({ index, {get_subject(data), data} }); + const auto& msg = it->second; + + lambda_send_message((void*)index, msg); + + // Note that if this call fails, then we need to free the pubMsg object here since it won't be passed to the ack handler. + if (s == NATS_OK && !async) { + msgs_queue.erase(index); + } } + stanConnOptions_Destroy(connOpts); + stanConnection_Close(sc); + stanConnection_Destroy(sc); + nats_Sleep(50); // To silence reports of memory still in-use with valgrind. + nats_Close(); + if (s != NATS_OK) { std::cout << "Error: " << s << " - " << natsStatus_GetText(s) << std::endl; nats_PrintLastErrorStack(stderr); + create_backup_file(); } - stanConnection_Destroy(sc); - // nats_Sleep(50); // To silence reports of memory still in-use with valgrind. - nats_Close(); - return 0; } diff --git a/options.h b/options.h index 4d601ee..a5407f0 100644 --- a/options.h +++ b/options.h @@ -23,6 +23,7 @@ const char* subj = "foo"; const char* txt = "hello"; const char* name = "worker"; int64_t total = 1000000; +const uint32_t limit_check_conn = 10; volatile int64_t count = 0; volatile int64_t dropped = 0; From 5af0bcd59d0c3577923e9c5ee1f4d5f0ef3120c0 Mon Sep 17 00:00:00 2001 From: Pasha Date: Mon, 12 Aug 2019 07:37:35 +0300 Subject: [PATCH 3/5] Implement unix socket #36 (#37) Implement unix socket #36 --- CMakeLists.txt | 23 +++-- Docker/Dockerfile | 1 + notifier.cpp | 252 +++++++++++++++++++++++++--------------------- options.h | 3 +- 4 files changed, 149 insertions(+), 130 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2c18fd8..eca818f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,10 @@ cmake_minimum_required(VERSION 2.8) project(cyber-notifier) - +SET(BOOST_ROOT "~/opt/boost") +FIND_PACKAGE(Boost 1.67 REQUIRED COMPONENTS + system + iostreams) if(APPLE) include_directories(/usr/local/include) link_directories(/usr/local/lib) @@ -12,24 +15,20 @@ endif(APPLE) # Uncomment to have the executable moved to 'build' instead of their respective 'build/xxx' directories #set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}) -add_definitions(-std=c++14) +add_definitions(-std=c++17) add_definitions(-DNATS_HAS_STREAMING) +add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY) #add_definitions(-DNATS_CONN_STATUS_NO_PREFIX) -# Platform specific settings -if(UNIX) -# include(GNUInstallDirs) - if(NOT APPLE) - set(NATS_EXTRA_LIB "rt") - endif() -endif(UNIX) - +set(CMAKE_THREAD_PREFER_PTHREAD TRUE) +set(THREADS_PREFER_PTHREAD_FLAG TRUE) +find_package(Threads) set(outname "cyber-notifier") if(APPLE) set(PROTOBUF_LIB "/usr/local/nats.c/pbuf/lib/darwin/libprotobuf-c.a") else(APPLE) - set(PROTOBUF_LIB "/opt/nats.c-1.8.0/pbuf/lib/linux/libprotobuf-c.so") + set(PROTOBUF_LIB "/opt/nats.c/pbuf/lib/linux/libprotobuf-c.so") endif(APPLE) # Build the executable @@ -42,3 +41,5 @@ else(APPLE) target_link_libraries(${outname} nats ${NATS_EXTRA_LIB} ${PROTOBUF_LIB}) endif(APPLE) +target_link_libraries(${outname} Threads::Threads) + diff --git a/Docker/Dockerfile b/Docker/Dockerfile index 2645695..36cc46b 100644 --- a/Docker/Dockerfile +++ b/Docker/Dockerfile @@ -5,6 +5,7 @@ ARG branch=master ARG compiletype=RelWithDebInfo RUN cd /opt && wget https://github.com/nats-io/cnats/archive/v1.8.0.tar.gz && tar -xzf v1.8.0.tar.gz \ + && ln -s /opt/nats.c-1.8.0 /opt/nats.c \ && cd nats.c-1.8.0 && mkdir build && cd ./build && cmake -DCMAKE_BUILD_TYPE=$compiletype .. && make && make install RUN echo "/opt/cnats-1.8.0/pbuf/lib/linux/" > /etc/ld.so.conf.d/protobuf-c.conf && ldconfig diff --git a/notifier.cpp b/notifier.cpp index a8cb56c..8e53f83 100644 --- a/notifier.cpp +++ b/notifier.cpp @@ -5,38 +5,34 @@ #include #include #include + #include #include #include +#include -static const char* usage = - "-txt text to send (default is 'hello')\n"; - -struct myPubMsgInfo { - const char* payload; - int size; - char ID[30]; -}; +#include +#include +#include static volatile bool done = false; const std::string backup_file = "/queue/backup.txt"; -// natsOptions* opts = NULL; -// const char* cluster = "cyberway"; -// const char* clientID = "notifier"; -// const char* subj = "foo"; -// const char* txt = "hello"; +const std::string DEFAULT_SOCKET_NAME = "/queue/msg.sock"; +boost::asio::io_service io_service; +boost::asio::local::stream_protocol::endpoint ep(DEFAULT_SOCKET_NAME); +boost::asio::local::stream_protocol::socket socket_stream(io_service); struct message final { + uint64_t index; std::string subject; std::string data; }; // struct message +using message_map = std::map; -stanConnOptions* connOpts = nullptr; - -std::vector bad_msgs_queue; -std::map msgs_queue; +message_map msgs_queue; +std::mutex msgs_mutex; // Protect queue from different threads std::string get_subject(const std::string& data) { std::string subject; @@ -55,42 +51,28 @@ std::string get_subject(const std::string& data) { return subject; } -bool is_file_empty(std::ifstream& file) { - return file.tellg() == std::ifstream::traits_type::eof(); -} +message_map fill_backup_msgs() { + message_map queue; -void fill_backup_msgs() { std::ifstream backup(backup_file); - if (backup.is_open()) { - if (is_file_empty(backup)) - return; - - for (auto [i, line_data, line_subject] = std::tuple{0, "", ""}; - std::getline(backup, line_subject), std::getline(backup, line_data); i++) { - msgs_queue.insert({ i, {line_subject, line_data} }); - } - backup.close(); - std::remove(backup_file.c_str()); + if (!backup.is_open()) { + return queue; } -} -static void _publish_ack_cb(const char* guid, const char* error, void* closure) { - // TODO: delete object from waiting list, so we can check if some object didn't published for a long time - //std::cout << "#Ack#, " << guid << std::endl; - // myPubMsgInfo* pubMsg = (myPubMsgInfo*)closure; - // printf("Ack handler for message ID=%s Data=%.*s GUID=%s - ", pubMsg->ID, pubMsg->size, pubMsg->payload, guid); + if (backup.tellg() == std::ifstream::traits_type::eof()) { + return queue; + } - if (error != NULL) { - std::cout << "Error: " << error << std::endl; - bad_msgs_queue.push_back((uint64_t)closure); - done = true; // TODO: locking - } else { - auto index = (uint64_t)closure; - msgs_queue.erase(index); + for (uint64_t i = 0; ; ++i) { + message msg; + if (!std::getline(backup, msg.subject) || !std::getline(backup, msg.data)) { + break; + } + queue.emplace(i, std::move(msg)); } - // free(pubMsg); // This is a good place to free the pubMsg info since we no longer need it - // Notify the main thread that we are done. This is not the proper way and you should use some locking. - // done = true; + + backup.close(); + return queue; } static void sigusr1_handler(int signum) { @@ -109,27 +91,64 @@ void create_backup_file() { } static void sig_int_term_handler(int signum) { - if (signum == SIGINT) - std::cout << "Interrupt signal (" << signum << ") received." << std::endl; - else if (signum == SIGTERM) - std::cout << "Termination signal (" << signum << ") received." << std::endl; + if (signum == SIGINT) { + std::cerr << "Interrupt signal (" << signum << ") received." << std::endl; + } else if (signum == SIGTERM) { + std::cerr << "Termination signal (" << signum << ") received." << std::endl; + } done = true; } -static void connectionLostCB(stanConnection *sc, const char *errTxt, void *closure) { - std::cout << "Connection lost: " << errTxt << std::endl; +static void _nats_publish_ack_cb(const char*, const char* error, void* closure) { + std::lock_guard guard(msgs_mutex); + + auto index = (uint64_t)closure; + if (error != NULL) { + std::cerr << "Nats send error: " << error << std::endl; + done = true; + } else { + msgs_queue.erase(index); + } +} + +static void _nats_connection_lost_cb(stanConnection*, const char* errTxt, void*) { + std::cerr << "Connection lost: " << errTxt << std::endl; +} + +static natsStatus send_nats_message(stanConnection* sc, stanConnOptions* connOpts, const message& msg) { + natsStatus s = NATS_OK; + + for (int i = 0; i < 24 * 1000; ++i) { + if (async) { + s = stanConnection_PublishAsync(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size(), _nats_publish_ack_cb, (void*)msg.index); + } else { + s = stanConnection_Publish(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size()); + } + + if (s == NATS_CONNECTION_CLOSED) { + s = stanConnection_Connect(&sc, cluster, clientID, connOpts); + } + + if (s == NATS_TIMEOUT) { + nats_Sleep(50); + continue; + } + break; + } + + return s; } int main(int argc, char** argv) { - opts = parseArgs(argc, argv, usage); - std::cout << "Sending pipe messages" << std::endl; + opts = parseArgs(argc, argv, ""); + std::cerr << "Sending socket messages" << std::endl; signal(SIGUSR1, sigusr1_handler); - signal(SIGINT, sig_int_term_handler); + signal(SIGINT, sig_int_term_handler); signal(SIGTERM, sig_int_term_handler); // Now create STAN Connection Options and set the NATS Options. - + stanConnOptions* connOpts = nullptr; natsStatus s = stanConnOptions_Create(&connOpts); if (s == NATS_OK) { s = stanConnOptions_SetNATSOptions(connOpts, opts); @@ -141,83 +160,84 @@ int main(int argc, char** argv) { s = stanConnOptions_SetPubAckWait(connOpts, 120 * 1000 /* ms */); } if (s == NATS_OK) { - s = stanConnOptions_SetConnectionLostHandler(connOpts, connectionLostCB, nullptr); + s = stanConnOptions_SetConnectionLostHandler(connOpts, _nats_connection_lost_cb, nullptr); } stanConnection* sc = nullptr; if (s == NATS_OK) { - s = stanConnection_Connect(&sc, cluster, clientID, connOpts); + s = stanConnection_Connect(&sc, cluster, clientID, connOpts); } - natsOptions_Destroy(opts); - auto lambda_send_message = [&](void* index, const message& msg) { - for (int i = 0; i < 24 * 1000; ++i) { - if (async) { - s = stanConnection_PublishAsync(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size(), _publish_ack_cb, index); - } else { - s = stanConnection_Publish(sc, msg.subject.c_str(), msg.data.c_str(), msg.data.size()); - } - - if (s == NATS_CONNECTION_CLOSED) - s = stanConnection_Connect(&sc, cluster, clientID, connOpts); - - if (s == NATS_TIMEOUT) { - nats_Sleep(50); - continue; - } - break; + auto backup_queue = fill_backup_msgs(); + for (const auto& msg: backup_queue) { + s = send_nats_message(sc, connOpts, msg.second); + if (s != NATS_OK) { + return 2; } - }; + } + std::remove(backup_file.c_str()); - fill_backup_msgs(); - for (const auto& item : msgs_queue) { - if (s != NATS_OK) - break; - lambda_send_message((void*)item.first, item.second); + try { + socket_stream.connect(ep); + if (socket_stream.native_non_blocking()) { + socket_stream.native_non_blocking(false); + } + } catch (const boost::system::system_error &err) { + std::cout << DEFAULT_SOCKET_NAME << std::endl; + std::cerr << "Failed to connect to notifier socket: " << err.what() << std::endl; + return 1; } - bool warn = false; - while (s == NATS_OK) { - while (bad_msgs_queue.size()) { - if (s != NATS_OK) - break; + uint64_t msg_index = backup_queue.size(); + boost::asio::streambuf socket_buf; + boost::system::error_code error; - const auto& it = msgs_queue.find(bad_msgs_queue.back()); - lambda_send_message((void*)it->first, it->second); - bad_msgs_queue.pop_back(); + for (;;) { + boost::asio::read_until(socket_stream, socket_buf, "\n", error); + if (error) { + // std::cerr << "Receive failed: " << error.message() << std::endl; + // nodeos shutdowns + break; } - std::string data; - std::getline(std::cin, data); - - if (done) { - if (data.size()) { - if (!warn) { - std::cerr << "WARNING! Pipe hasn't empty." << std::endl; - warn = true; - } - } else - break; + message msg; + std::istream data_stream(&socket_buf); + std::getline(data_stream, msg.data); + + try { + // json validating + std::stringstream local_stream; + local_stream << msg.data; + boost::property_tree::ptree pt; + boost::property_tree::read_json(local_stream, pt); + } catch (...) { + std::cerr << "Data error: " << msg.data << std::endl; + throw; } - if (std::cin.eof() && !msgs_queue.size()) { - nats_Sleep(50); - continue; - } else if (print) - std::cout << data << std::endl; - - uint64_t index = msgs_queue.size(); - auto [it, status] = msgs_queue.insert({ index, {get_subject(data), data} }); - const auto& msg = it->second; - - lambda_send_message((void*)index, msg); + if (print) { + std::cout << msg.data << std::endl; + } - // Note that if this call fails, then we need to free the pubMsg object here since it won't be passed to the ack handler. - if (s == NATS_OK && !async) { - msgs_queue.erase(index); + msg.index = msg_index++; + msg.subject = get_subject(msg.data); + auto result = msgs_queue.emplace(msg.index, std::move(msg)); + if (s == NATS_OK) { + s = send_nats_message(sc, connOpts, result.first->second); + if (s != NATS_OK || done) { + std::cerr << "Shutdown" << std::endl; + socket_stream.shutdown(socket_stream.shutdown_both, error); + } else if (!async) { + msgs_queue.erase(result.first->first); + } } } + if (s != NATS_OK) { + std::cerr << "Nats error: " << s << " - " << natsStatus_GetText(s) << std::endl; + nats_PrintLastErrorStack(stderr); + } + stanConnOptions_Destroy(connOpts); stanConnection_Close(sc); stanConnection_Destroy(sc); @@ -225,8 +245,6 @@ int main(int argc, char** argv) { nats_Close(); if (s != NATS_OK) { - std::cout << "Error: " << s << " - " << natsStatus_GetText(s) << std::endl; - nats_PrintLastErrorStack(stderr); create_backup_file(); } diff --git a/options.h b/options.h index a5407f0..0c0af1e 100644 --- a/options.h +++ b/options.h @@ -23,13 +23,12 @@ const char* subj = "foo"; const char* txt = "hello"; const char* name = "worker"; int64_t total = 1000000; -const uint32_t limit_check_conn = 10; volatile int64_t count = 0; volatile int64_t dropped = 0; int64_t start = 0; volatile int64_t elapsed = 0; -bool print = false; +bool print = true; int64_t timeout = 10000; // 10 seconds. natsOptions* opts = NULL; From 8b9d372f3ff122fd04a8604bca85bb50173b7cff Mon Sep 17 00:00:00 2001 From: Andrew Falaleev Date: Mon, 12 Aug 2019 14:17:22 +0700 Subject: [PATCH 4/5] Add locking of queue. #36 --- notifier.cpp | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/notifier.cpp b/notifier.cpp index 8e53f83..862d2e3 100644 --- a/notifier.cpp +++ b/notifier.cpp @@ -204,16 +204,16 @@ int main(int argc, char** argv) { std::istream data_stream(&socket_buf); std::getline(data_stream, msg.data); - try { - // json validating - std::stringstream local_stream; - local_stream << msg.data; - boost::property_tree::ptree pt; - boost::property_tree::read_json(local_stream, pt); - } catch (...) { - std::cerr << "Data error: " << msg.data << std::endl; - throw; - } +// try { +// // json validating +// std::stringstream local_stream; +// local_stream << msg.data; +// boost::property_tree::ptree pt; +// boost::property_tree::read_json(local_stream, pt); +// } catch (...) { +// std::cerr << "Data error: " << msg.data << std::endl; +// throw; +// } if (print) { std::cout << msg.data << std::endl; @@ -221,7 +221,12 @@ int main(int argc, char** argv) { msg.index = msg_index++; msg.subject = get_subject(msg.data); - auto result = msgs_queue.emplace(msg.index, std::move(msg)); + std::pair result; + { + std::lock_guard guard(msgs_mutex); + result = msgs_queue.emplace(msg.index, std::move(msg)); + } + if (s == NATS_OK) { s = send_nats_message(sc, connOpts, result.first->second); if (s != NATS_OK || done) { From b565cacc8f05a5b510b05059b78ae8ffd6c5f1ac Mon Sep 17 00:00:00 2001 From: Andrew Falaleev Date: Tue, 13 Aug 2019 18:14:18 +0700 Subject: [PATCH 5/5] Disable default printing of messages. --- options.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/options.h b/options.h index 0c0af1e..4d601ee 100644 --- a/options.h +++ b/options.h @@ -28,7 +28,7 @@ volatile int64_t count = 0; volatile int64_t dropped = 0; int64_t start = 0; volatile int64_t elapsed = 0; -bool print = true; +bool print = false; int64_t timeout = 10000; // 10 seconds. natsOptions* opts = NULL;