Skip to content

Commit

Permalink
Implement file manager metrics
Browse files Browse the repository at this point in the history
Implement basic file management metrics [0].
README.md file updated with new metrics (OAM section).

Benchmark script was updated with new transformation
items to dump log information (current microseconds).
For that, microseconds for timestamp source is required [1].
UT modified to accept new transform source.

Also,
- General prefix is removed from source types and GeneralUnique
  is renamed to Recvseq, like the source name in the schema.

[0] #55
[1] #56
  • Loading branch information
testillano committed Jul 19, 2022
1 parent 8dbe2bd commit e784cd0
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 49 deletions.
29 changes: 19 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ Defines the response behavior for an incoming request matching some basic condit
"properties": {
"source": {
"type": "string",
"pattern": "^request\\.(uri(\\.(path$|param\\..+))?|body(\\..+)?|header\\..+)$|^response\\.body(\\..+)?$|^eraser$|^math\\..*|^random\\.[-+]{0,1}[0-9]+\\.[-+]{0,1}[0-9]+$|^randomset\\..+|^timestamp\\.[m|n]{0,1}s$|^strftime\\..+|^recvseq$|^(var|globalVar|event)\\..+|^(value)\\..*|^inState$"
"pattern": "^request\\.(uri(\\.(path$|param\\..+))?|body(\\..+)?|header\\..+)$|^response\\.body(\\..+)?$|^eraser$|^math\\..*|^random\\.[-+]{0,1}[0-9]+\\.[-+]{0,1}[0-9]+$|^randomset\\..+|^timestamp\\.[m|u|n]{0,1}s$|^strftime\\..+|^recvseq$|^(var|globalVar|event)\\..+|^(value)\\..*|^inState$"
},
"target": {
"type": "string",
Expand Down Expand Up @@ -1606,7 +1606,7 @@ The **source** of information is classified after parsing the following possible
- randomset.`<value1>|..|<valueN>`: random string value between pipe-separated labels provided. This source specification **admits variables substitution**.
- timestamp.`<unit>`: UNIX epoch time in `s` (seconds), `ms` (milliseconds) or `ns` (nanoseconds).
- timestamp.`<unit>`: UNIX epoch time in `s` (seconds), `ms` (milliseconds), `us` (microseconds) or `ns` (nanoseconds).
- strftime.`<format>`: current date/time formatted by [strftime](https://www.cplusplus.com/reference/ctime/strftime/). This source format **admits variables substitution**.
Expand Down Expand Up @@ -2473,14 +2473,23 @@ MockHttp2Server_observed_requests_total{method="DELETE"} 0
MockHttp2Server_observed_requests_total{method="PUT"} 0
MockHttp2Server_observed_requests_total{method="GET"} 0
MockHttp2Server_observed_requests_total{method="POST"} 100000
# HELP h2agent_observed_requests_total Http2 total requests observed in h2agent
# TYPE h2agent_observed_requests_total counter
h2agent_observed_requests_total{result="unprovisioned"} 0
h2agent_observed_requests_total{result="processed"} 100000
# HELP h2agent_purged_contexts_total Total contexts purged in h2agent
# TYPE h2agent_purged_contexts_total counter
h2agent_purged_contexts_total{result="failed"} 0
h2agent_purged_contexts_total{result="successful"} 0
# HELP ServerData_observed_requests_total Http2 total requests observed in h2agent server
# TYPE ServerData_observed_requests_total counter
ServerData_observed_requests_total{result="unprovisioned"} 0
ServerData_observed_requests_total{result="processed"} 100000
# HELP ServerData_purged_contexts_total Total contexts purged in h2agent server
# TYPE ServerData_purged_contexts_total counter
ServerData_purged_contexts_total{result="failed"} 0
ServerData_purged_contexts_total{result="successful"} 0
# HELP FileSystem_observed_operations_total H2agent file system operations
# TYPE FileSystem_observed_operations_total counter
FileSystem_observed_operations_total{operation="open",success="false"} 0
FileSystem_observed_operations_total{operation="instantClose"} 0
FileSystem_observed_operations_total{operation="delayedClose"} 100000
FileSystem_observed_operations_total{operation="write"} 100000
FileSystem_observed_operations_total{operation="close"} 1
FileSystem_observed_operations_total{operation="empty"} 0
FileSystem_observed_operations_total{operation="open"} 1
# HELP AdminHttp2Server_responses_delay_seconds_gauge Http2 message responses delay gauge (seconds) in AdminHttp2Server
# TYPE AdminHttp2Server_responses_delay_seconds_gauge gauge
AdminHttp2Server_responses_delay_seconds_gauge 7.2e-05
Expand Down
4 changes: 2 additions & 2 deletions ct/src/transform/no_filter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def test_031_replaceVariablesAtValueAndTransferToResponseBodyStringPath(admin_se


@pytest.mark.transform
def test_032_replaceVariablesAtGeneralStrftimeAndTransferToResponseBodyStringPath(admin_server_provision, h2ac_traffic):
def test_032_replaceVariablesAtStrftimeAndTransferToResponseBodyStringPath(admin_server_provision, h2ac_traffic):

# Provision
admin_server_provision(string2dict(TRANSFORM_FOO_BAR_PROVISION_TEMPLATE, id=1, queryp='', source="strftime.Now it's %I:%M%p and var1 is @{var1}.", target="response.body.string./result"))
Expand All @@ -428,7 +428,7 @@ def test_032_replaceVariablesAtGeneralStrftimeAndTransferToResponseBodyStringPat


@pytest.mark.transform
def test_033_replaceVariablesAtGeneralRandomsetAndTransferToResponseBodyStringPath(admin_server_provision, h2ac_traffic):
def test_033_replaceVariablesAtRandomsetAndTransferToResponseBodyStringPath(admin_server_provision, h2ac_traffic):

# Provision
admin_server_provision(string2dict(TRANSFORM_FOO_BAR_PROVISION_TEMPLATE, id=1, queryp='', source="randomset.@{var1}|@{var2}", target="response.body.string./result"))
Expand Down
4 changes: 2 additions & 2 deletions src/http2/MyTrafficHttp2Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ void MyTrafficHttp2Server::enableMyMetrics(ert::metrics::Metrics *metrics) {
metrics_ = metrics;

if (metrics_) {
ert::metrics::counter_family_ref_t cf = metrics->addCounterFamily(std::string("h2agent_observed_requests_total"), "Http2 total requests observed in h2agent");
ert::metrics::counter_family_ref_t cf = metrics->addCounterFamily(std::string("ServerData_observed_requests_total"), "Http2 total requests observed in h2agent server");

observed_requests_processed_counter_ = &(cf.Add({{"result", "processed"}}));
observed_requests_unprovisioned_counter_ = &(cf.Add({{"result", "unprovisioned"}}));

ert::metrics::counter_family_ref_t cf2 = metrics->addCounterFamily(std::string("h2agent_purged_contexts_total"), "Total contexts purged in h2agent");
ert::metrics::counter_family_ref_t cf2 = metrics->addCounterFamily(std::string("ServerData_purged_contexts_total"), "Total contexts purged in h2agent server");

purged_contexts_successful_counter_ = &(cf2.Add({{"result", "successful"}}));
purged_contexts_failed_counter_ = &(cf2.Add({{"result", "failed"}}));
Expand Down
2 changes: 1 addition & 1 deletion src/jsonSchema/AdminSchemas.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ const nlohmann::json server_provision = R"(
"properties": {
"source": {
"type": "string",
"pattern": "^request\\.(uri(\\.(path$|param\\..+))?|body(\\..+)?|header\\..+)$|^response\\.body(\\..+)?$|^eraser$|^math\\..*|^random\\.[-+]{0,1}[0-9]+\\.[-+]{0,1}[0-9]+$|^randomset\\..+|^timestamp\\.[m|n]{0,1}s$|^strftime\\..+|^recvseq$|^(var|globalVar|event)\\..+|^(value)\\..*|^inState$"
"pattern": "^request\\.(uri(\\.(path$|param\\..+))?|body(\\..+)?|header\\..+)$|^response\\.body(\\..+)?$|^eraser$|^math\\..*|^random\\.[-+]{0,1}[0-9]+\\.[-+]{0,1}[0-9]+$|^randomset\\..+|^timestamp\\.[m|u|n]{0,1}s$|^strftime\\..+|^recvseq$|^(var|globalVar|event)\\..+|^(value)\\..*|^inState$"
},
"target": {
"type": "string",
Expand Down
5 changes: 4 additions & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ int main(int argc, char* argv[])
// Traces
ert::tracing::Logger::initialize(progname); // initialize logger (before possible myExit() execution):

// General resources: timer IO service, configuration, global variables and file manager:
// General resources: timer IO service, configuration and global variables and file manager:
myTimersIoService = new boost::asio::io_service();
myConfiguration = new h2agent::model::Configuration();
myGlobalVariable = new h2agent::model::GlobalVariable();
Expand Down Expand Up @@ -722,6 +722,9 @@ int main(int argc, char* argv[])
}
}

// FileManager/SafeFile metrics
myFileManager->enableMetrics(p_metrics);

// Admin server
myAdminHttp2Server = new h2agent::http2::MyAdminHttp2Server(2); // 2 nghttp2 server thread
myAdminHttp2Server->enableMetrics(p_metrics);
Expand Down
15 changes: 9 additions & 6 deletions src/model/AdminServerProvision.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,25 +184,28 @@ bool AdminServerProvision::processSources(std::shared_ptr<Transformation> transf
parser.compile(expressionString, expression);
sourceVault.setFloat(expression.value());
}
else if (transformation->getSourceType() == Transformation::SourceType::GeneralRandom) {
else if (transformation->getSourceType() == Transformation::SourceType::Random) {
int range = transformation->getSourceI2() - transformation->getSourceI1() + 1;
sourceVault.setInteger(transformation->getSourceI1() + (rand() % range));
}
else if (transformation->getSourceType() == Transformation::SourceType::GeneralRandomSet) {
else if (transformation->getSourceType() == Transformation::SourceType::RandomSet) {
sourceVault.setStringReplacingVariables(transformation->getSourceTokenized()[rand () % transformation->getSourceTokenized().size()], variables); // replace variables if they exist
}
else if (transformation->getSourceType() == Transformation::SourceType::GeneralTimestamp) {
else if (transformation->getSourceType() == Transformation::SourceType::Timestamp) {
if (transformation->getSource() == "s") {
sourceVault.setInteger(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count());
}
else if (transformation->getSource() == "ms") {
sourceVault.setInteger(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
}
else if (transformation->getSource() == "us") {
sourceVault.setInteger(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
}
else if (transformation->getSource() == "ns") {
sourceVault.setInteger(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
}
}
else if (transformation->getSourceType() == Transformation::SourceType::GeneralStrftime) {
else if (transformation->getSourceType() == Transformation::SourceType::Strftime) {
std::time_t unixTime = 0;
std::time (&unixTime);
char buffer[100] = {0};
Expand All @@ -214,7 +217,7 @@ bool AdminServerProvision::processSources(std::shared_ptr<Transformation> transf

sourceVault.setStringReplacingVariables(std::string(buffer), variables); // replace variables if they exist
}
else if (transformation->getSourceType() == Transformation::SourceType::GeneralUnique) {
else if (transformation->getSourceType() == Transformation::SourceType::Recvseq) {
sourceVault.setUnsigned(generalUniqueServerSequence);
}
else if (transformation->getSourceType() == Transformation::SourceType::SVar) {
Expand Down Expand Up @@ -829,7 +832,7 @@ void AdminServerProvision::transform( const std::string &requestUri,

LOGDEBUG(ert::tracing::Logger::debug(ert::tracing::Logger::asString("Processing transformation item: %s", transformation->asString().c_str()), ERT_FILE_LOCATION));

// SOURCES: RequestUri, RequestUriPath, RequestUriParam, RequestBody, ResponseBody, RequestHeader, Eraser, Math, GeneralRandom, GeneralTimestamp, GeneralStrftime, GeneralUnique, SVar, SGvar, Value, Event, InState
// SOURCES: RequestUri, RequestUriPath, RequestUriParam, RequestBody, ResponseBody, RequestHeader, Eraser, Math, Random, Timestamp, Strftime, Recvseq, SVar, SGvar, Value, Event, InState
if (!processSources(transformation, sourceVault, variables, requestUri, requestUriPath, requestQueryParametersMap, requestBodyJsonOrString, requestBodyJson, requestBody, requestHeaders, eraser, generalUniqueServerSequence))
continue;

Expand Down
6 changes: 3 additions & 3 deletions src/model/FileManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void FileManager::write(const std::string &path, const std::string &data, bool t
std::ios_base::openmode mode = std::ofstream::out | std::ios_base::app; // for text files
if (!textOrBinary) mode |= std::ios::binary;

safeFile = std::make_shared<SafeFile>(path, io_service_, closeDelayUs, mode);
safeFile = std::make_shared<SafeFile>(path, io_service_, metrics_, closeDelayUs, mode);
add(path, safeFile);
}

Expand All @@ -75,7 +75,7 @@ std::string FileManager::read(bool &success, const std::string &path, bool textO
else {
if (!textOrBinary) mode |= std::ios::binary;

safeFile = std::make_shared<SafeFile>(path, io_service_, 0, mode);
safeFile = std::make_shared<SafeFile>(path, io_service_, metrics_, 0, mode);
add(path, safeFile);
}

Expand All @@ -91,7 +91,7 @@ void FileManager::empty(const std::string &path) {
safeFile = it->second;
}
else {
safeFile = std::make_shared<SafeFile>(path, io_service_);
safeFile = std::make_shared<SafeFile>(path, io_service_, metrics_);
add(path, safeFile);
}

Expand Down
28 changes: 25 additions & 3 deletions src/model/FileManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ SOFTWARE.
#include <SafeFile.hpp>


namespace ert
{
namespace metrics
{
class Metrics;
}
}

namespace h2agent
{
namespace model
Expand All @@ -54,18 +62,32 @@ class FileManager : public Map<std::string, std::shared_ptr<SafeFile>>
mutable mutex_t rw_mutex_{};
boost::asio::io_service *io_service_{};

// metrics (will be passed to SafeFile):
ert::metrics::Metrics *metrics_{};

public:
/**
* File manager class
*
* A timers IO service is needed to schedule delayed close operations.
* You could provide 'nullptr' if you never schedule close operations (@see write).
* @param timersIoService timers IO service needed to schedule delayed close operations.
* If you never schedule close operations (@see write) it may be 'nullptr'.
* @param metrics underlaying reference for SafeFile in order to compute prometheus metrics
* about I/O operations. It may be 'nullptr' if no metrics are enabled.
*
* @see SafeFile
*/
FileManager(boost::asio::io_service *timersIoService) : io_service_(timersIoService) {;}
FileManager(boost::asio::io_service *timersIoService = nullptr, ert::metrics::Metrics *metrics = nullptr) : io_service_(timersIoService), metrics_(metrics) {;}
~FileManager() = default;

/**
* Set metrics reference
*
* @param metrics Optional metrics object to compute counters
*/
void enableMetrics(ert::metrics::Metrics *metrics) {
metrics_ = metrics;
}

/**
* Write file
*
Expand Down
35 changes: 34 additions & 1 deletion src/model/SafeFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,27 @@ std::atomic<int> SafeFile::CurrentOpenedFiles(0);
std::mutex SafeFile::MutexOpenedFiles;
std::condition_variable SafeFile::OpenedFilesCV;

SafeFile::SafeFile (const std::string& path, boost::asio::io_service *timersIoService, unsigned int closeDelayUs, std::ios_base::openmode mode):
SafeFile::SafeFile (const std::string& path, boost::asio::io_service *timersIoService, ert::metrics::Metrics *metrics, unsigned int closeDelayUs, std::ios_base::openmode mode):
path_(path),
io_service_(timersIoService),
metrics_(metrics),
close_delay_us_(closeDelayUs),
opened_(false),
timer_(nullptr)
{
max_open_files_ = sysconf(_SC_OPEN_MAX /* 1024 probably */) - 10 /* margin just in case the process open other files */;

if (metrics_) {
ert::metrics::counter_family_ref_t cf = metrics->addCounterFamily("FileSystem_observed_operations_total", "H2agent file system operations");
observed_open_operation_counter_ = &(cf.Add({{"operation", "open"}}));
observed_close_operation_counter_ = &(cf.Add({{"operation", "close"}}));
observed_write_operation_counter_ = &(cf.Add({{"operation", "write"}}));
observed_empty_operation_counter_ = &(cf.Add({{"operation", "empty"}}));
observed_delayed_close_operation_counter_ = &(cf.Add({{"operation", "delayedClose"}}));
observed_instant_close_operation_counter_ = &(cf.Add({{"operation", "instantClose"}}));
observed_error_open_operation_counter_ = &(cf.Add({{"success", "false"}, {"operation", "open"}}));
}

open(mode);
}

Expand All @@ -67,6 +80,9 @@ SafeFile::~SafeFile() {
void SafeFile::delayedClose() {
LOGDEBUG(ert::tracing::Logger::debug(ert::tracing::Logger::asString("Close delay is: %lu", close_delay_us_), ERT_FILE_LOCATION));

// metrics
if (metrics_) observed_delayed_close_operation_counter_->Increment();

//if (!io_service_) return; // protection
if (!timer_) timer_ = new boost::asio::deadline_timer(*io_service_, boost::posix_time::microseconds(close_delay_us_));
timer_->cancel();
Expand All @@ -93,9 +109,13 @@ bool SafeFile::open(std::ios_base::openmode mode) {
opened_ = true;
CurrentOpenedFiles++;
LOGDEBUG(ert::tracing::Logger::debug(ert::tracing::Logger::asString("'%s' opened for writting (currently opened: %d)", path_.c_str(), CurrentOpenedFiles.load()), ERT_FILE_LOCATION));
// metrics
if (metrics_) observed_open_operation_counter_->Increment();
}
else {
LOGWARNING(ert::tracing::Logger::warning(ert::tracing::Logger::asString("Failed open to write operation for '%s'", path_.c_str()), ERT_FILE_LOCATION));
// metrics
if (metrics_) observed_error_open_operation_counter_->Increment();
return false;
}
//lock.unlock();
Expand All @@ -111,6 +131,8 @@ void SafeFile::close() {
opened_ = false;
CurrentOpenedFiles--;
LOGDEBUG(ert::tracing::Logger::debug(ert::tracing::Logger::asString("'%s' closed (currently opened: %d)", path_.c_str(), CurrentOpenedFiles.load()), ERT_FILE_LOCATION));
// metrics
if (metrics_) observed_close_operation_counter_->Increment();

lock.unlock();
OpenedFilesCV.notify_one();
Expand All @@ -120,6 +142,8 @@ void SafeFile::empty() {
close();
open(std::ofstream::out | std::ofstream::trunc);
close();
// metrics
if (metrics_) observed_empty_operation_counter_->Increment();
}

std::string SafeFile::read(bool &success, std::ios_base::openmode mode) {
Expand All @@ -138,6 +162,9 @@ std::string SafeFile::read(bool &success, std::ios_base::openmode mode) {
// close the file after reading it:
close();

// metrics (not needed by h2agent)
//if (metrics_) ?->Increment();

return result;
}

Expand Down Expand Up @@ -175,12 +202,18 @@ void SafeFile::write (const std::string& data) {
file_.write(data.c_str(), data.size());
LOGDEBUG(ert::tracing::Logger::debug(ert::tracing::Logger::asString("Data written into '%s'", path_.c_str()), ERT_FILE_LOCATION));

// metrics
if (metrics_) observed_write_operation_counter_->Increment();

// Close file:
if (io_service_ && close_delay_us_ != 0) {
delayedClose();
}
else {
close();

// metrics
if (metrics_) observed_instant_close_operation_counter_->Increment();
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/model/SafeFile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ SOFTWARE.
#include <boost/date_time/posix_time/posix_time.hpp>
#include <nlohmann/json.hpp>

#include <ert/metrics/Metrics.hpp>


namespace h2agent
{
Expand All @@ -70,6 +72,16 @@ class SafeFile {
unsigned int close_delay_us_;
boost::asio::io_service *io_service_{};

ert::metrics::Metrics *metrics_{};

ert::metrics::counter_t *observed_open_operation_counter_{};
ert::metrics::counter_t *observed_close_operation_counter_{};
ert::metrics::counter_t *observed_write_operation_counter_{};
ert::metrics::counter_t *observed_empty_operation_counter_{};
ert::metrics::counter_t *observed_delayed_close_operation_counter_{};
ert::metrics::counter_t *observed_instant_close_operation_counter_{};
ert::metrics::counter_t *observed_error_open_operation_counter_{};

void delayedClose();

public:
Expand All @@ -82,6 +94,8 @@ class SafeFile {
* operations with the intention to reduce overhead in some scenarios. By default
* it is not used (if not provided in constructor), so delay is not performed
* regardless the close delay configured.
* @param metrics underlaying reference for SafeFile in order to compute prometheus metrics
* about I/O operations. It may be 'nullptr' if no metrics are enabled.
* @param closeDelayUs delay after last write operation, to close the file. By default
* it is configured to 1 second, something appropiate to log over long term files.
* Zero value means that no planned close is scheduled, so the file is opened,
Expand All @@ -96,6 +110,7 @@ class SafeFile {
*/
SafeFile (const std::string& path,
boost::asio::io_service *timersIoService = nullptr,
ert::metrics::Metrics *metrics = nullptr,
unsigned int closeDelayUs = 1000000 /* 1 second */,
std::ios_base::openmode mode = std::ofstream::out | std::ios_base::app);

Expand Down
Loading

0 comments on commit e784cd0

Please sign in to comment.