From 3dafe5e6a57ff57ae5a85f4dd4b3ada0ea3b383a Mon Sep 17 00:00:00 2001 From: John Patton Date: Tue, 9 Apr 2019 09:25:42 -0600 Subject: [PATCH] fixed broker heartbeat bug --- .travis.yml | 2 +- .vscode/c_cpp_properties.json | 27 ++++++ cmake/version.cmake | 2 +- .../brokerOutput/brokerOutput.cpp | 82 ++++++++++-------- glass-broker-app/brokerOutput/brokerOutput.h | 9 +- glass-broker-app/brokerOutput/outputTopic.cpp | 5 +- neic-glass3.code-workspace | 83 +++++++++++++++++++ 7 files changed, 168 insertions(+), 42 deletions(-) create mode 100644 .vscode/c_cpp_properties.json create mode 100644 neic-glass3.code-workspace diff --git a/.travis.yml b/.travis.yml index 1e626dea..694a0624 100644 --- a/.travis.yml +++ b/.travis.yml @@ -88,7 +88,7 @@ deploy: github_token: $GITHUB_TOKEN # Set in travis-ci.org dashboard target_branch: gh-pages keep-history: true - local_dir: /dist/doc + local_dir: ${TRAVIS_BUILD_DIR}/dist/doc on: branch: master diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 00000000..ba852983 --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,27 @@ +{ + "configurations": [ + { + "name": "Mac", + "includePath": [ + "${workspaceFolder}/**", + "${workspaceFolder}/lib/SuperEasyJSON/*", + "${workspaceFolder}/lib/spdlog/*", + "${workspaceFolder}/util/include/*", + "${workspaceFolder}/input/include/*", + "${workspaceFolder}/output/include/*", + "${workspaceFolder}/process/include/*", + "${workspaceFolder}/parse/include/*", + "${workspaceFolder}/glasscore/glasslib/include/*", + "${workspaceFolder}/glasscore/traveltime/include/*" + ], + "defines": [], + "macFrameworkPath": [ + "/System/Library/Frameworks", + "/Library/Frameworks" + ], + "compilerPath": "/usr/bin/clang", + "intelliSenseMode": "clang-x64" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/cmake/version.cmake b/cmake/version.cmake index 4d6753e0..64122422 100644 --- a/cmake/version.cmake +++ b/cmake/version.cmake @@ -1,4 +1,4 @@ # version.cmake - a CMake script that defines the overall project version set (PROJECT_VERSION_MAJOR 1) set (PROJECT_VERSION_MINOR 0) -set (PROJECT_VERSION_PATCH 3) +set (PROJECT_VERSION_PATCH 4) diff --git a/glass-broker-app/brokerOutput/brokerOutput.cpp b/glass-broker-app/brokerOutput/brokerOutput.cpp index 09e90277..ef192074 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.cpp +++ b/glass-broker-app/brokerOutput/brokerOutput.cpp @@ -26,7 +26,7 @@ brokerOutput::brokerOutput() glass3::util::Logger::log("debug", "brokerOutput::brokerOutput(): Construction."); - m_OutputProducer = NULL; + m_StationRequestProducer = NULL; m_StationRequestTopic = NULL; // init config to defaults and allocate @@ -39,7 +39,7 @@ brokerOutput::brokerOutput(const std::shared_ptr &config) glass3::util::Logger::log( "debug", "brokerOutput::brokerOutput(): Advanced Construction."); - m_OutputProducer = NULL; + m_StationRequestProducer = NULL; m_StationRequestTopic = NULL; // init config to defaults and allocate @@ -62,8 +62,8 @@ brokerOutput::~brokerOutput() { } m_vOutputTopics.clear(); - if(m_OutputProducer != NULL) { - delete(m_OutputProducer); + if(m_StationRequestProducer != NULL) { + delete(m_StationRequestProducer); } if(m_StationRequestTopic != NULL) { @@ -137,6 +137,18 @@ bool brokerOutput::setup(std::shared_ptr config) { + producerConfig + "."); } + // producer heartbeat interval + int brokerHeartbeatInterval = -1; + if (config->HasKey("BrokerHeartbeatInterval")) { + brokerHeartbeatInterval = + (*config)["BrokerHeartbeatInterval"].ToInt(); + + glass3::util::Logger::log( + "info", + "brokerOutput::setup(): Using BrokerHeartbeatInterval: " + + std::to_string(brokerHeartbeatInterval) + "."); + } + // topic config std::string topicConfig = ""; if (!(config->HasKey("HazdevBrokerTopicConfig"))) { @@ -154,32 +166,6 @@ bool brokerOutput::setup(std::shared_ptr config) { + topicConfig + "."); } - // clear out any old producer - if (m_OutputProducer != NULL) { - delete (m_OutputProducer); - } - - // create new producer - m_OutputProducer = new hazdevbroker::Producer(); - - // heartbeat interval - if (config->HasKey("BrokerHeartbeatInterval")) { - int brokerHeartbeatInterval = - (*config)["BrokerHeartbeatInterval"].ToInt(); - m_OutputProducer->setHeartbeatInterval(brokerHeartbeatInterval); - glass3::util::Logger::log( - "info", - "brokerOutput::setup(): Using BrokerHeartbeatInterval: " - + std::to_string(brokerHeartbeatInterval) + "."); - } - - // set up logging - m_OutputProducer->setLogCallback( - std::bind(&brokerOutput::logProducer, this, std::placeholders::_1)); - - // set up producer - m_OutputProducer->setup(producerConfig, topicConfig); - // clear out any old topics for (auto aTopic : m_vOutputTopics) { if (aTopic != NULL) { @@ -203,14 +189,25 @@ bool brokerOutput::setup(std::shared_ptr config) { // parse topics for (auto aTopicConfig : topics) { - // create output topic - outputTopic* newTopic = new outputTopic(m_OutputProducer); + // create producer for the topic + hazdevbroker::Producer * topicProducer = new hazdevbroker::Producer(); + topicProducer->setHeartbeatInterval(brokerHeartbeatInterval); + topicProducer->setLogCallback( + std::bind(&brokerOutput::logProducer, this, std::placeholders::_1)); + topicProducer->setup(producerConfig, topicConfig); + + // create output topic using the producer + outputTopic* newTopic = new outputTopic(topicProducer); // setup output topic if (newTopic->setup(aTopicConfig) == true) { - // add output tpic to list + // add output topic to list m_vOutputTopics.push_back(newTopic); } else { + // failure, cleanup + newTopic->clear(); + delete (newTopic); + glass3::util::Logger::log( "error", "brokerOutput::setup(): Failed set up output topic."); @@ -248,8 +245,23 @@ bool brokerOutput::setup(std::shared_ptr config) { delete (m_StationRequestTopic); } if (stationRequestTopic != "") { + // clear out any old request producer + if (m_StationRequestProducer != NULL) { + delete (m_StationRequestProducer); + } + + // create new request producer + m_StationRequestProducer = new hazdevbroker::Producer(); + + // set up logging + m_StationRequestProducer->setLogCallback( + std::bind(&brokerOutput::logProducer, this, std::placeholders::_1)); + + // set up request producer + m_StationRequestProducer->setup(producerConfig, topicConfig); + // create topic - m_StationRequestTopic = m_OutputProducer->createTopic( + m_StationRequestTopic = m_StationRequestProducer->createTopic( stationRequestTopic); } else { m_StationRequestTopic = NULL; @@ -304,7 +316,7 @@ void brokerOutput::sendOutput(const std::string &type, const std::string &id, if (type == "StationInfoRequest") { // station info requests get their own special topic if (m_StationRequestTopic != NULL) { - m_OutputProducer->sendString(m_StationRequestTopic, message); + m_StationRequestProducer->sendString(m_StationRequestTopic, message); } } else if (type == "StationList") { // station lists are written to disk diff --git a/glass-broker-app/brokerOutput/brokerOutput.h b/glass-broker-app/brokerOutput/brokerOutput.h index feb98ea5..b0125962 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.h +++ b/glass-broker-app/brokerOutput/brokerOutput.h @@ -128,14 +128,15 @@ class brokerOutput : public glass3::output::output { private: /** - * \brief The hazdevbroker producer used to send messages to kafka + * \brief A vector containing the output topics */ - hazdevbroker::Producer * m_OutputProducer; + std::vector m_vOutputTopics; /** - * \brief A vector containing the output topics + * \brief The optional hazdevbroker producer used to send station requests + * to kafka */ - std::vector m_vOutputTopics; + hazdevbroker::Producer * m_StationRequestProducer; /** * \brief The optional station request topic diff --git a/glass-broker-app/brokerOutput/outputTopic.cpp b/glass-broker-app/brokerOutput/outputTopic.cpp index 5eadc330..e4c9bf4d 100644 --- a/glass-broker-app/brokerOutput/outputTopic.cpp +++ b/glass-broker-app/brokerOutput/outputTopic.cpp @@ -158,7 +158,10 @@ void outputTopic::clear() { m_OutputTopic = NULL; } - m_OutputProducer = NULL; + if (m_OutputProducer != NULL) { + delete (m_OutputProducer); + m_OutputProducer = NULL; + } } // ---------------------------------------------------------isInBounds diff --git a/neic-glass3.code-workspace b/neic-glass3.code-workspace new file mode 100644 index 00000000..b3374cc1 --- /dev/null +++ b/neic-glass3.code-workspace @@ -0,0 +1,83 @@ +{ + "folders": [ + { + "path": "." + } + ], + "settings": { + "files.associations": { + "__config": "cpp", + "__nullptr": "cpp", + "cstddef": "cpp", + "exception": "cpp", + "initializer_list": "cpp", + "new": "cpp", + "stdexcept": "cpp", + "type_traits": "cpp", + "typeinfo": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "tuple": "cpp", + "utility": "cpp", + "mutex": "cpp", + "thread": "cpp", + "system_error": "cpp", + "__mutex_base": "cpp", + "fstream": "cpp", + "__bit_reference": "cpp", + "__debug": "cpp", + "__functional_base": "cpp", + "__hash_table": "cpp", + "__locale": "cpp", + "__split_buffer": "cpp", + "__string": "cpp", + "__threading_support": "cpp", + "__tree": "cpp", + "__tuple": "cpp", + "array": "cpp", + "atomic": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "chrono": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "codecvt": "cpp", + "csignal": "cpp", + "cstdarg": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "future": "cpp", + "iomanip": "cpp", + "ios": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "iterator": "cpp", + "limits": "cpp", + "list": "cpp", + "locale": "cpp", + "map": "cpp", + "memory": "cpp", + "numeric": "cpp", + "ostream": "cpp", + "queue": "cpp", + "random": "cpp", + "ratio": "cpp", + "set": "cpp", + "sstream": "cpp", + "stack": "cpp", + "streambuf": "cpp", + "string": "cpp", + "string_view": "cpp", + "unordered_map": "cpp", + "vector": "cpp", + "regex": "cpp" + } + } +} \ No newline at end of file