Skip to content

Commit

Permalink
chore(release): merge release 21.10.next into 21.10.x (#386)
Browse files Browse the repository at this point in the history
* enh(broker): cbd with multiargs and robot tests (#306)

-enh(broker): cbd with multiargs and robot tests
-Validate with tests/broker/command-line.robot
-Update README.md
-Common.py : fix find in log with timeout

REFS: MON-13901

* fix(broker/bam): downtimes on kpi can be more than one

* enh(tests/bam): new tests to reproduce a bug
* chore(doc): CHANGELOG updated and version set to 21.10.3
* chore(tests): README updated

REFS: MON-14091

* fix(broker/engine): grpc api can be changed through configuration. Otherwise it is fixed to localhost (#320)

REFS: MON-13904

* fix(broker/rrd): rebuild fixed in 21.10 (#340)

* Also an issue due to conan fixed, python 3.8 is needed.

REFS: MON-14092

* fix(ci): issues with conan fixed

* fix(ci/scripts): conan bad path

* fix(broker/bam): overlapping downtimes on kpi service are well handled now. (#341)

REFS: MON-14091

* fix(ci/tests): missing dependencies for python38

* fix(broker/lua): lua stream connector accepts empty parameters(21.10) (#359)

REFS: MON-13875

* fix(broker/muxer): poller waits at most 5s to send goodbye to broker before shutdown (#360)

REFS: MON-14511

Co-authored-by: denliA <[email protected]>
Co-authored-by: David Boucher <[email protected]>
Co-authored-by: jean-christophe81 <[email protected]>
  • Loading branch information
4 people authored Sep 27, 2022
1 parent 9764a06 commit 597c800
Show file tree
Hide file tree
Showing 58 changed files with 1,822 additions and 886 deletions.
32 changes: 32 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
# Changelog

## 21.10.3

### centreon-broker

#### Enhancements

*grpc*

The grpc api listens by default on localhost now. And it can be configured
with the Broker configuration file.

#### Fixes

*rrd*

Rebuild of graphs should work better.

*bam*

If a service with two overlapping downtimes is a BA kpi. When the first downtime
is cancelled from the service, it is as if all the downtimes are removed from
the kpi. This new version fixes this issue.

### centreon-engine

#### Enhancements

*grpc*

The grpc api listens by default on localhost now. And it can be configured
with the Engine configuration file.

## 21.10.2

### centreon-broker
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
##
## Copyright 2009-2021 Centreon
## Copyright 2009-2022 Centreon
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,7 @@ endif ()
# Version.
set(COLLECT_MAJOR 21)
set(COLLECT_MINOR 10)
set(COLLECT_PATCH 2)
set(COLLECT_PATCH 3)
set(COLLECT_VERSION "${COLLECT_MAJOR}.${COLLECT_MINOR}.${COLLECT_PATCH}")
add_definitions(-DCENTREON_CONNECTOR_VERSION=\"${COLLECT_VERSION}\")
# add_definitions(-DCENTREON_BROKER_VERSION=\"${COLLECT_VERSION}\")
Expand Down
12 changes: 7 additions & 5 deletions centreon-broker/bam/src/ba.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
** Copyright 2014-2016, 2021 Centreon
** Copyright 2014-2016, 2021-2022 Centreon
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,10 +72,12 @@ ba::ba(uint32_t id,
_host_id(host_id),
_service_id(service_id),
_generate_virtual_status(generate_virtual_status),
_computed_soft_state(source == configuration::ba::state_source_best ?
ba::state::state_critical : ba::state::state_ok),
_computed_hard_state(source == configuration::ba::state_source_best ?
ba::state::state_critical : ba::state::state_ok),
_computed_soft_state(source == configuration::ba::state_source_best
? ba::state::state_critical
: ba::state::state_ok),
_computed_hard_state(source == configuration::ba::state_source_best
? ba::state::state_critical
: ba::state::state_ok),
_num_soft_critical_childs{0.f},
_num_hard_critical_childs{0.f},
_acknowledgement_hard(0.0),
Expand Down
16 changes: 10 additions & 6 deletions centreon-broker/bam/src/kpi_service.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
** Copyright 2014-2015, 2021 Centreon
** Copyright 2014-2015, 2021-2022 Centreon
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -182,7 +182,7 @@ bool kpi_service::is_acknowledged() const {
* @param[out] visitor Object that will receive events.
*/
void kpi_service::service_update(
std::shared_ptr<neb::service_status> const& status,
const std::shared_ptr<neb::service_status>& status,
io::stream* visitor) {
if (status && status->host_id == _host_id &&
status->service_id == _service_id) {
Expand Down Expand Up @@ -252,25 +252,29 @@ void kpi_service::service_update(
* @param[in] dt
* @param[out] visitor Object that will receive events.
*/
void kpi_service::service_update(std::shared_ptr<neb::downtime> const& dt,
void kpi_service::service_update(const std::shared_ptr<neb::downtime>& dt,
io::stream* visitor) {
assert(dt && dt->host_id == _host_id && dt->service_id == _service_id);
// Update information.
_downtimed = dt->was_started && dt->actual_end_time.is_null();
if (_downtime_ids.contains(dt->internal_id) && !dt->was_cancelled) {
bool downtimed = dt->was_started && dt->actual_end_time.is_null();
if (!_downtimed && downtimed)
_downtimed = true;

if (_downtime_ids.contains(dt->internal_id) && dt->deletion_time.is_null()) {
log_v2::bam()->trace("Downtime {} already handled in this kpi service",
dt->internal_id);
return;
}

if (_downtimed) {
if (downtimed) {
log_v2::bam()->trace("adding in kpi service the impacting downtime {}",
dt->internal_id);
_downtime_ids.insert(dt->internal_id);
} else {
log_v2::bam()->trace("removing from kpi service the impacting downtime {}",
dt->internal_id);
_downtime_ids.erase(dt->internal_id);
_downtimed = !_downtime_ids.empty();
}

if (!_event || _event->in_downtime != _downtimed) {
Expand Down
1 change: 1 addition & 0 deletions centreon-broker/bam/test/ba/kpi_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ TEST_F(BamBA, KpiServiceDt) {
std::cout << "service_update 1" << std::endl;
kpis[0]->service_update(dt, _visitor.get());

dt->deletion_time = now + 2 + 10 * i + 5;
dt->actual_end_time = now + 2 + 10 * i + 5;
dt->was_cancelled = true;
std::cout << "service_update 2" << std::endl;
Expand Down
3 changes: 3 additions & 0 deletions centreon-broker/core/inc/com/centreon/broker/config/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace config {
class state {
int _broker_id;
uint16_t _rpc_port;
std::string _listen_address;
std::string _broker_name;
std::string _cache_directory;
std::string _command_file;
Expand Down Expand Up @@ -75,6 +76,8 @@ class state {
int broker_id() const noexcept;
void rpc_port(uint16_t port) noexcept;
uint16_t rpc_port(void) const noexcept;
void listen_address(const std::string& listen_address) noexcept;
const std::string& listen_address() const noexcept;
void broker_name(std::string const& name);
const std::string& broker_name() const noexcept;
void cache_directory(std::string const& dir);
Expand Down
2 changes: 2 additions & 0 deletions centreon-broker/core/inc/com/centreon/broker/io/stream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class stream {
bool validate(std::shared_ptr<io::data> const& d, std::string const& error);
virtual int write(std::shared_ptr<data> const& d) = 0;
const std::string& get_name() const { return _name; }

virtual bool wait_for_all_events_written(unsigned ms_timeout);
};
} // namespace io

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class acceptor : public endpoint {
void set_read_filters(std::unordered_set<uint32_t> const& filters);
void set_retry_interval(time_t retry_interval);
void set_write_filters(std::unordered_set<uint32_t> const& filters);
bool wait_for_all_events_written(unsigned ms_timeout) override;
};
} // namespace processing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class endpoint : public stat_visitable {
virtual void update() {}
virtual void start() = 0;
virtual void exit() = 0;

virtual bool wait_for_all_events_written(unsigned) { return true; }
};
} // namespace processing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class failover : public endpoint {
void set_failover(std::shared_ptr<processing::failover> fo);
void set_retry_interval(time_t retry_interval);
void update() override;
bool wait_for_all_events_written(unsigned ms_timeout) override;

protected:
// From stat_visitable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class feeder : public stat_visitable {
feeder& operator=(const feeder&) = delete;
bool is_finished() const noexcept;
const char* get_state() const;

bool wait_for_all_events_written(unsigned ms_timeout);
};
} // namespace processing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class stat_visitable {
virtual void _forward_statistic(nlohmann::json& tree);

public:
static constexpr unsigned idle_microsec_wait_idle_thread_delay = 100000;

stat_visitable(std::string const& name = std::string());
virtual ~stat_visitable() noexcept = default;
stat_visitable(stat_visitable const& other) = delete;
Expand Down
5 changes: 5 additions & 0 deletions centreon-broker/core/src/config/applier/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ void endpoint::_discard() {
_discarding = true;
log_v2::config()->debug("endpoint applier: destruction");

// wait for failover and feeder to push endloop event
::usleep(processing::stat_visitable::idle_microsec_wait_idle_thread_delay +
100000);
// Exit threads.
{
log_v2::config()->debug("endpoint applier: requesting threads termination");
Expand All @@ -219,6 +222,7 @@ void endpoint::_discard() {
// We begin with feeders
for (auto it = _endpoints.begin(); it != _endpoints.end();) {
if (it->second->is_feeder()) {
it->second->wait_for_all_events_written(5000);
log_v2::config()->trace(
"endpoint applier: send exit signal to endpoint '{}'",
it->second->get_name());
Expand All @@ -244,6 +248,7 @@ void endpoint::_discard() {

// We continue with failovers
for (auto it = _endpoints.begin(); it != _endpoints.end();) {
it->second->wait_for_all_events_written(5000);
log_v2::config()->trace(
"endpoint applier: send exit signal on endpoint '{}'",
it->second->get_name());
Expand Down
25 changes: 21 additions & 4 deletions centreon-broker/core/src/config/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,27 @@ state parser::parse(std::string const& file) {
&json::is_number, &json::get<int>))
;
else if (it.key() == "grpc" && it.value().is_object()) {
if (json_document["centreonBroker"]["grpc"]["rpc_port"].is_number()) {
retval.rpc_port(static_cast<uint16_t>(
json_document["centreonBroker"]["grpc"]["rpc_port"]
.get<int>()));
if (json_document["centreonBroker"]["grpc"].contains("rpc_port")) {
if (json_document["centreonBroker"]["grpc"]["rpc_port"]
.is_number()) {
retval.rpc_port(static_cast<uint16_t>(
json_document["centreonBroker"]["grpc"]["rpc_port"]
.get<int>()));
} else
throw msg_fmt(
"The rpc_port value in the grpc object should be an integer");
}
if (json_document["centreonBroker"]["grpc"].contains(
"listen_address")) {
if (json_document["centreonBroker"]["grpc"]["listen_address"]
.is_string()) {
retval.listen_address(
json_document["centreonBroker"]["grpc"]["listen_address"]
.get<std::string>());
} else
throw msg_fmt(
"The listen_address value in the grpc object should be a "
"string");
}
} else if (get_conf<state>({it.key(), it.value()}, "broker_name",
retval, &state::broker_name,
Expand Down
25 changes: 24 additions & 1 deletion centreon-broker/core/src/config/state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ state::state()
state::state(const state& other)
: _broker_id(other._broker_id),
_rpc_port(other._rpc_port),
_listen_address{other._listen_address},
_broker_name(other._broker_name),
_cache_directory(other._cache_directory),
_command_file(other._command_file),
Expand Down Expand Up @@ -70,6 +71,7 @@ state& state::operator=(state const& other) {
if (this != &other) {
_broker_id = other._broker_id;
_rpc_port = other._rpc_port;
_listen_address = other._listen_address;
_broker_name = other._broker_name;
_cache_directory = other._cache_directory;
_command_file = other._command_file;
Expand All @@ -92,6 +94,7 @@ state& state::operator=(state const& other) {
void state::clear() {
_broker_id = 0;
_rpc_port = 0;
_listen_address.resize(0);
_broker_name.clear();
_cache_directory.clear();
_command_file.clear();
Expand Down Expand Up @@ -360,10 +363,30 @@ std::string const& state::poller_name() const noexcept {
void state::rpc_port(uint16_t port) noexcept {
_rpc_port = port;
}
uint16_t state::rpc_port(void) const noexcept {
uint16_t state::rpc_port() const noexcept {
return _rpc_port;
}

/**
* @brief Force the interface address to listen from for the gRPC API.
*
* @param listen_address An address or a hostname ("127.0.0.1", "localhost",
* ...)
*/
void state::listen_address(const std::string& listen_address) noexcept {
_listen_address = listen_address;
}

/**
* @brief Access to the configured listen address or an empty string if not
* defined. The behavior of broker in the latter is to listen from localhost.
*
* @return The listen address for the gRPC API.
*/
const std::string& state::listen_address() const noexcept {
return _listen_address;
}

state::log& state::log_conf() {
return _log_conf;
}
Expand Down
15 changes: 15 additions & 0 deletions centreon-broker/core/src/io/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,18 @@ bool stream::validate(std::shared_ptr<io::data> const& d,
}
return true;
}

/**
* @brief if it has a substream, it waits until the substream has sent all data
* on the wire
*
* @param ms_timeout
* @return true all data sent
* @return false timeout expires
*/
bool stream::wait_for_all_events_written(unsigned ms_timeout) {
if (_substream) {
return _substream->wait_for_all_events_written(ms_timeout);
}
return true;
}
Loading

0 comments on commit 597c800

Please sign in to comment.