Skip to content
This repository has been archived by the owner on Sep 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #54 from jpatton-USGS/heartbeat-bug
Browse files Browse the repository at this point in the history
fixed broker heartbeat bug
  • Loading branch information
wyeck-usgs authored Apr 11, 2019
2 parents e626742 + 3dafe5e commit 45391c0
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ deploy:
github_token: $GITHUB_TOKEN # Set in travis-ci.org dashboard
target_branch: gh-pages
keep-history: true
local_dir: /dist/doc
local_dir: ${TRAVIS_BUILD_DIR}/dist/doc
on:
branch: master

Expand Down
27 changes: 27 additions & 0 deletions .vscode/c_cpp_properties.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"configurations": [
{
"name": "Mac",
"includePath": [
"${workspaceFolder}/**",
"${workspaceFolder}/lib/SuperEasyJSON/*",
"${workspaceFolder}/lib/spdlog/*",
"${workspaceFolder}/util/include/*",
"${workspaceFolder}/input/include/*",
"${workspaceFolder}/output/include/*",
"${workspaceFolder}/process/include/*",
"${workspaceFolder}/parse/include/*",
"${workspaceFolder}/glasscore/glasslib/include/*",
"${workspaceFolder}/glasscore/traveltime/include/*"
],
"defines": [],
"macFrameworkPath": [
"/System/Library/Frameworks",
"/Library/Frameworks"
],
"compilerPath": "/usr/bin/clang",
"intelliSenseMode": "clang-x64"
}
],
"version": 4
}
2 changes: 1 addition & 1 deletion cmake/version.cmake
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# version.cmake - a CMake script that defines the overall project version
set (PROJECT_VERSION_MAJOR 1)
set (PROJECT_VERSION_MINOR 0)
set (PROJECT_VERSION_PATCH 3)
set (PROJECT_VERSION_PATCH 4)
82 changes: 47 additions & 35 deletions glass-broker-app/brokerOutput/brokerOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ brokerOutput::brokerOutput()
glass3::util::Logger::log("debug",
"brokerOutput::brokerOutput(): Construction.");

m_OutputProducer = NULL;
m_StationRequestProducer = NULL;
m_StationRequestTopic = NULL;

// init config to defaults and allocate
Expand All @@ -39,7 +39,7 @@ brokerOutput::brokerOutput(const std::shared_ptr<json::Object> &config)
glass3::util::Logger::log(
"debug", "brokerOutput::brokerOutput(): Advanced Construction.");

m_OutputProducer = NULL;
m_StationRequestProducer = NULL;
m_StationRequestTopic = NULL;

// init config to defaults and allocate
Expand All @@ -62,8 +62,8 @@ brokerOutput::~brokerOutput() {
}
m_vOutputTopics.clear();

if(m_OutputProducer != NULL) {
delete(m_OutputProducer);
if(m_StationRequestProducer != NULL) {
delete(m_StationRequestProducer);
}

if(m_StationRequestTopic != NULL) {
Expand Down Expand Up @@ -137,6 +137,18 @@ bool brokerOutput::setup(std::shared_ptr<const json::Object> config) {
+ producerConfig + ".");
}

// producer heartbeat interval
int brokerHeartbeatInterval = -1;
if (config->HasKey("BrokerHeartbeatInterval")) {
brokerHeartbeatInterval =
(*config)["BrokerHeartbeatInterval"].ToInt();

glass3::util::Logger::log(
"info",
"brokerOutput::setup(): Using BrokerHeartbeatInterval: "
+ std::to_string(brokerHeartbeatInterval) + ".");
}

// topic config
std::string topicConfig = "";
if (!(config->HasKey("HazdevBrokerTopicConfig"))) {
Expand All @@ -154,32 +166,6 @@ bool brokerOutput::setup(std::shared_ptr<const json::Object> config) {
+ topicConfig + ".");
}

// clear out any old producer
if (m_OutputProducer != NULL) {
delete (m_OutputProducer);
}

// create new producer
m_OutputProducer = new hazdevbroker::Producer();

// heartbeat interval
if (config->HasKey("BrokerHeartbeatInterval")) {
int brokerHeartbeatInterval =
(*config)["BrokerHeartbeatInterval"].ToInt();
m_OutputProducer->setHeartbeatInterval(brokerHeartbeatInterval);
glass3::util::Logger::log(
"info",
"brokerOutput::setup(): Using BrokerHeartbeatInterval: "
+ std::to_string(brokerHeartbeatInterval) + ".");
}

// set up logging
m_OutputProducer->setLogCallback(
std::bind(&brokerOutput::logProducer, this, std::placeholders::_1));

// set up producer
m_OutputProducer->setup(producerConfig, topicConfig);

// clear out any old topics
for (auto aTopic : m_vOutputTopics) {
if (aTopic != NULL) {
Expand All @@ -203,14 +189,25 @@ bool brokerOutput::setup(std::shared_ptr<const json::Object> config) {

// parse topics
for (auto aTopicConfig : topics) {
// create output topic
outputTopic* newTopic = new outputTopic(m_OutputProducer);
// create producer for the topic
hazdevbroker::Producer * topicProducer = new hazdevbroker::Producer();
topicProducer->setHeartbeatInterval(brokerHeartbeatInterval);
topicProducer->setLogCallback(
std::bind(&brokerOutput::logProducer, this, std::placeholders::_1));
topicProducer->setup(producerConfig, topicConfig);

// create output topic using the producer
outputTopic* newTopic = new outputTopic(topicProducer);

// setup output topic
if (newTopic->setup(aTopicConfig) == true) {
// add output tpic to list
// add output topic to list
m_vOutputTopics.push_back(newTopic);
} else {
// failure, cleanup
newTopic->clear();
delete (newTopic);

glass3::util::Logger::log(
"error",
"brokerOutput::setup(): Failed set up output topic.");
Expand Down Expand Up @@ -248,8 +245,23 @@ bool brokerOutput::setup(std::shared_ptr<const json::Object> config) {
delete (m_StationRequestTopic);
}
if (stationRequestTopic != "") {
// clear out any old request producer
if (m_StationRequestProducer != NULL) {
delete (m_StationRequestProducer);
}

// create new request producer
m_StationRequestProducer = new hazdevbroker::Producer();

// set up logging
m_StationRequestProducer->setLogCallback(
std::bind(&brokerOutput::logProducer, this, std::placeholders::_1));

// set up request producer
m_StationRequestProducer->setup(producerConfig, topicConfig);

// create topic
m_StationRequestTopic = m_OutputProducer->createTopic(
m_StationRequestTopic = m_StationRequestProducer->createTopic(
stationRequestTopic);
} else {
m_StationRequestTopic = NULL;
Expand Down Expand Up @@ -304,7 +316,7 @@ void brokerOutput::sendOutput(const std::string &type, const std::string &id,
if (type == "StationInfoRequest") {
// station info requests get their own special topic
if (m_StationRequestTopic != NULL) {
m_OutputProducer->sendString(m_StationRequestTopic, message);
m_StationRequestProducer->sendString(m_StationRequestTopic, message);
}
} else if (type == "StationList") {
// station lists are written to disk
Expand Down
9 changes: 5 additions & 4 deletions glass-broker-app/brokerOutput/brokerOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,15 @@ class brokerOutput : public glass3::output::output {

private:
/**
* \brief The hazdevbroker producer used to send messages to kafka
* \brief A vector containing the output topics
*/
hazdevbroker::Producer * m_OutputProducer;
std::vector<glass3::outputTopic*> m_vOutputTopics;

/**
* \brief A vector containing the output topics
* \brief The optional hazdevbroker producer used to send station requests
* to kafka
*/
std::vector<glass3::outputTopic*> m_vOutputTopics;
hazdevbroker::Producer * m_StationRequestProducer;

/**
* \brief The optional station request topic
Expand Down
5 changes: 4 additions & 1 deletion glass-broker-app/brokerOutput/outputTopic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ void outputTopic::clear() {
m_OutputTopic = NULL;
}

m_OutputProducer = NULL;
if (m_OutputProducer != NULL) {
delete (m_OutputProducer);
m_OutputProducer = NULL;
}
}

// ---------------------------------------------------------isInBounds
Expand Down
83 changes: 83 additions & 0 deletions neic-glass3.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{
"folders": [
{
"path": "."
}
],
"settings": {
"files.associations": {
"__config": "cpp",
"__nullptr": "cpp",
"cstddef": "cpp",
"exception": "cpp",
"initializer_list": "cpp",
"new": "cpp",
"stdexcept": "cpp",
"type_traits": "cpp",
"typeinfo": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"tuple": "cpp",
"utility": "cpp",
"mutex": "cpp",
"thread": "cpp",
"system_error": "cpp",
"__mutex_base": "cpp",
"fstream": "cpp",
"__bit_reference": "cpp",
"__debug": "cpp",
"__functional_base": "cpp",
"__hash_table": "cpp",
"__locale": "cpp",
"__split_buffer": "cpp",
"__string": "cpp",
"__threading_support": "cpp",
"__tree": "cpp",
"__tuple": "cpp",
"array": "cpp",
"atomic": "cpp",
"bitset": "cpp",
"cctype": "cpp",
"chrono": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"codecvt": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"future": "cpp",
"iomanip": "cpp",
"ios": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"iterator": "cpp",
"limits": "cpp",
"list": "cpp",
"locale": "cpp",
"map": "cpp",
"memory": "cpp",
"numeric": "cpp",
"ostream": "cpp",
"queue": "cpp",
"random": "cpp",
"ratio": "cpp",
"set": "cpp",
"sstream": "cpp",
"stack": "cpp",
"streambuf": "cpp",
"string": "cpp",
"string_view": "cpp",
"unordered_map": "cpp",
"vector": "cpp",
"regex": "cpp"
}
}
}

0 comments on commit 45391c0

Please sign in to comment.