Skip to content

Commit

Permalink
draft-713: Move CsvDumper and TempFile to new module roc_dbgio
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv committed Jul 28, 2024
1 parent c3f3f9a commit 68403ac
Show file tree
Hide file tree
Showing 33 changed files with 114 additions and 106 deletions.
1 change: 1 addition & 0 deletions SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ env['ROC_SOVER'] = '.'.join(env['ROC_VERSION'].split('.')[:2])
env['ROC_MODULES'] = [
'roc_core',
'roc_status',
'roc_dbgio',
'roc_address',
'roc_packet',
'roc_audio',
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_audio/feedback_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ FeedbackMonitor::FeedbackMonitor(IFrameWriter& writer,
const FeedbackConfig& feedback_config,
const LatencyConfig& latency_config,
const SampleSpec& sample_spec,
core::CsvDumper* dumper)
dbgio::CsvDumper* dumper)
: tuner_(latency_config, sample_spec, dumper)
, use_packetizer_(false)
, has_feedback_(false)
Expand Down
3 changes: 2 additions & 1 deletion src/internal_modules/roc_audio/feedback_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "roc_core/noncopyable.h"
#include "roc_core/rate_limiter.h"
#include "roc_core/time.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_packet/ilink_meter.h"

namespace roc {
Expand Down Expand Up @@ -69,7 +70,7 @@ class FeedbackMonitor : public IFrameWriter, public core::NonCopyable<> {
const FeedbackConfig& feedback_config,
const LatencyConfig& latency_config,
const SampleSpec& sample_spec,
core::CsvDumper* dumper);
dbgio::CsvDumper* dumper);

//! Check if the object was successfully constructed.
status::StatusCode init_status() const;
Expand Down
4 changes: 2 additions & 2 deletions src/internal_modules/roc_audio/freq_estimator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ double dot_prod(const double* coeff,

FreqEstimator::FreqEstimator(FreqEstimatorProfile profile,
packet::stream_timestamp_t target_latency,
core::CsvDumper* dumper)
dbgio::CsvDumper* dumper)
: config_(make_config(profile))
, target_(target_latency)
, dec1_ind_(0)
Expand Down Expand Up @@ -202,7 +202,7 @@ double FreqEstimator::run_controller_(double current) {
}

void FreqEstimator::dump_(double filtered) {
core::CsvEntry e;
dbgio::CsvEntry e;
e.type = 'f';
e.n_fields = 5;
e.fields[0] = core::timestamp(core::ClockUnix);
Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_audio/freq_estimator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include "roc_audio/freq_estimator_decim.h"
#include "roc_audio/sample.h"
#include "roc_core/csv_dumper.h"
#include "roc_core/noncopyable.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_packet/units.h"

namespace roc {
Expand Down Expand Up @@ -84,7 +84,7 @@ class FreqEstimator : public core::NonCopyable<> {
//! - @p target_latency defines latency we want to archive.
FreqEstimator(FreqEstimatorProfile profile,
packet::stream_timestamp_t target_latency,
roc::core::CsvDumper* dumper);
dbgio::CsvDumper* dumper);

//! Get current frequecy coefficient.
float freq_coeff() const;
Expand Down Expand Up @@ -125,7 +125,7 @@ class FreqEstimator : public core::NonCopyable<> {
// Last time when FreqEstimator was out of range.
core::nanoseconds_t last_unstable_time_;

core::CsvDumper* dumper_;
dbgio::CsvDumper* dumper_;
};

} // namespace audio
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader,
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
const SampleSpec& frame_sample_spec,
core::CsvDumper* dumper)
dbgio::CsvDumper* dumper)
: tuner_(config, frame_sample_spec, dumper)
, frame_reader_(frame_reader)
, incoming_queue_(incoming_queue)
Expand Down
4 changes: 2 additions & 2 deletions src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#include "roc_audio/resampler_reader.h"
#include "roc_audio/sample_spec.h"
#include "roc_core/attributes.h"
#include "roc_core/csv_dumper.h"
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/time.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_fec/block_reader.h"
#include "roc_packet/sorted_queue.h"
#include "roc_packet/units.h"
Expand Down Expand Up @@ -70,7 +70,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
const LatencyConfig& config,
const SampleSpec& packet_sample_spec,
const SampleSpec& frame_sample_spec,
core::CsvDumper* dumper);
dbgio::CsvDumper* dumper);

//! Check if the object was successfully constructed.
status::StatusCode init_status() const;
Expand Down
4 changes: 2 additions & 2 deletions src/internal_modules/roc_audio/latency_tuner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ LatencyConfig::calc_latency_tolerance(const core::nanoseconds_t latency,

LatencyTuner::LatencyTuner(const LatencyConfig& config,
const SampleSpec& sample_spec,
core::CsvDumper* dumper)
dbgio::CsvDumper* dumper)
: stream_pos_(0)
, scale_interval_(0)
, scale_pos_(0)
Expand Down Expand Up @@ -439,7 +439,7 @@ void LatencyTuner::write_metrics(const LatencyMetrics& latency_metrics,
}

if (dumper_) {
core::CsvEntry e;
dbgio::CsvEntry e;
e.type = 't';
e.n_fields = 3;
e.fields[0] = core::timestamp(core::ClockUnix);
Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_audio/latency_tuner.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

#include "roc_audio/freq_estimator.h"
#include "roc_audio/sample_spec.h"
#include "roc_core/csv_dumper.h"
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/time.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_packet/ilink_meter.h"
#include "roc_packet/units.h"
#include "roc_status/status_code.h"
Expand Down Expand Up @@ -251,7 +251,7 @@ class LatencyTuner : public core::NonCopyable<> {
//! Initialize.
LatencyTuner(const LatencyConfig& config,
const SampleSpec& sample_spec,
core::CsvDumper* dumper);
dbgio::CsvDumper* dumper);

//! Check if the object was successfully constructed.
status::StatusCode init_status() const;
Expand Down Expand Up @@ -350,7 +350,7 @@ class LatencyTuner : public core::NonCopyable<> {

core::RateLimiter last_lat_limiter_;

core::CsvDumper* dumper_;
dbgio::CsvDumper* dumper_;

status::StatusCode init_status_;
void try_decrease_latency_(const core::nanoseconds_t estimate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "roc_core/csv_dumper.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_core/errno_to_str.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"

namespace roc {
namespace core {
namespace dbgio {

CsvDumper::CsvDumper(const CsvConfig& config, IArena& arena)
CsvDumper::CsvDumper(const CsvConfig& config, core::IArena& arena)
: config_(config)
, open_flag_(false)
, stop_flag_(false)
Expand All @@ -32,7 +32,7 @@ CsvDumper::~CsvDumper() {
}

status::StatusCode CsvDumper::open() {
Mutex::Lock lock(open_mutex_);
core::Mutex::Lock lock(open_mutex_);

if (open_flag_) {
roc_panic("csv dumper: open() already called");
Expand All @@ -52,7 +52,7 @@ status::StatusCode CsvDumper::open() {
}

void CsvDumper::close() {
Mutex::Lock lock(open_mutex_);
core::Mutex::Lock lock(open_mutex_);

stop_flag_ = true;
write_sem_.post();
Expand Down Expand Up @@ -121,13 +121,14 @@ void CsvDumper::run() {
roc_log(LogDebug, "csv dumper: exiting background thread");
}

RateLimiter& CsvDumper::limiter_(char type) {
core::RateLimiter& CsvDumper::limiter_(char type) {
roc_panic_if(!isalnum(type));

const size_t idx = (size_t)type;

if (!rate_lims_[idx]) {
rate_lims_[idx].reset(new (rate_lims_[idx]) RateLimiter(config_.max_interval));
rate_lims_[idx].reset(new (rate_lims_[idx])
core::RateLimiter(config_.max_interval));
}

return *rate_lims_[idx];
Expand All @@ -139,7 +140,7 @@ bool CsvDumper::open_(const char* path) {
file_ = fopen(path, "w");
if (!file_) {
roc_log(LogError, "csv dumper: failed to open output file \"%s\": %s", path,
errno_to_str().c_str());
core::errno_to_str().c_str());
return false;
}

Expand All @@ -150,7 +151,7 @@ void CsvDumper::close_() {
if (file_) {
if (fclose(file_) != 0) {
roc_log(LogError, "csv dumper: failed to close output file: %s",
errno_to_str().c_str());
core::errno_to_str().c_str());
}
file_ = NULL;
}
Expand Down Expand Up @@ -180,12 +181,12 @@ bool CsvDumper::dump_(const CsvEntry& entry) {

if (fprintf(file_, "%s\n", line) < 0) {
roc_log(LogError, "csv dumper: failed to write output file: %s",
errno_to_str().c_str());
core::errno_to_str().c_str());
return false;
}

return true;
}

} // namespace core
} // namespace dbgio
} // namespace roc
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/csv_dumper.h
//! @file roc_dbgio/csv_dumper.h
//! @brief Asynchronous CSV dumper.

#ifndef ROC_CORE_CSV_DUMPER_H_
#define ROC_CORE_CSV_DUMPER_H_
#ifndef ROC_DBGIO_CSV_DUMPER_H_
#define ROC_DBGIO_CSV_DUMPER_H_

#include "roc_core/atomic.h"
#include "roc_core/mutex.h"
Expand All @@ -24,7 +24,7 @@
#include "roc_status/status_code.h"

namespace roc {
namespace core {
namespace dbgio {

//! Maximum number of fields in CSV entry.
static const size_t Csv_MaxFields = 10;
Expand Down Expand Up @@ -55,22 +55,22 @@ struct CsvConfig {
//! Maximum allowed interval between subsequent entries of same type.
//! If zero, there is no limit.
//! If non-zero, each entry type is rate-limited according to this.
nanoseconds_t max_interval;
core::nanoseconds_t max_interval;

CsvConfig()
: dump_file(NULL)
, max_queued(1000)
, max_interval(Millisecond) {
, max_interval(core::Millisecond) {
}
};

//! Asynchronous CSV dumper.
//! Writes entries to CSV file from background thread.
//! Recommended to be used from a single thread.
class CsvDumper : private Thread {
class CsvDumper : private core::Thread {
public:
//! Initialize.
CsvDumper(const CsvConfig& config, IArena& arena);
CsvDumper(const CsvConfig& config, core::IArena& arena);
~CsvDumper();

//! Open file and start background thread.
Expand All @@ -92,27 +92,27 @@ class CsvDumper : private Thread {
private:
virtual void run();

RateLimiter& limiter_(char type);
core::RateLimiter& limiter_(char type);

bool open_(const char* path);
void close_();
bool dump_(const CsvEntry& entry);

const CsvConfig config_;

Mutex open_mutex_;
Atomic<int> open_flag_;
Atomic<int> stop_flag_;
core::Mutex open_mutex_;
core::Atomic<int> open_flag_;
core::Atomic<int> stop_flag_;
FILE* file_;

Mutex write_mutex_;
Semaphore write_sem_;
SpscRingBuffer<CsvEntry> ringbuf_;
core::Mutex write_mutex_;
core::Semaphore write_sem_;
core::SpscRingBuffer<CsvEntry> ringbuf_;

Optional<RateLimiter> rate_lims_[128];
core::Optional<core::RateLimiter> rate_lims_[128];
};

} // namespace core
} // namespace dbgio
} // namespace roc

#endif // ROC_CORE_CSV_DUMPER_H_
#endif // ROC_DBGIO_CSV_DUMPER_H_
Loading

0 comments on commit 68403ac

Please sign in to comment.