Skip to content

Commit

Permalink
draft-713: CsvDumper refinements
Browse files Browse the repository at this point in the history
- remove is_valid(), inherit from Thread privately
- add open() and close(), return status code
- fix (de)initialization of dumper in pipeline
- fix dumper config in pipeline
  • Loading branch information
gavv committed Jul 28, 2024
1 parent 09b744e commit c3f3f9a
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 72 deletions.
68 changes: 42 additions & 26 deletions src/internal_modules/roc_core/csv_dumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,56 @@ namespace core {

CsvDumper::CsvDumper(const CsvConfig& config, IArena& arena)
: config_(config)
, ringbuf_(arena, config.max_queued)
, valid_(false) {
if (!config.dump_file || !open_(config.dump_file)) {
return;
, open_flag_(false)
, stop_flag_(false)
, file_(NULL)
, ringbuf_(arena, config.max_queued) {
if (!config.dump_file) {
roc_panic("csv dumper: dump file is null");
}
valid_ = true;
}

CsvDumper::~CsvDumper() {
if (is_joinable()) {
roc_panic("csv dumper: attempt to call destructor"
" before calling stop() and join()");
if (open_flag_ && !stop_flag_) {
roc_panic("csv dumper: close() not called before destructor");
}
}

status::StatusCode CsvDumper::open() {
Mutex::Lock lock(open_mutex_);

if (open_flag_) {
roc_panic("csv dumper: open() already called");
}

open_flag_ = true;

if (!open_(config_.dump_file)) {
return status::StatusErrFile;
}

if (!Thread::start()) {
return status::StatusErrThread;
}

return status::StatusOK;
}

void CsvDumper::close() {
Mutex::Lock lock(open_mutex_);

stop_flag_ = true;
write_sem_.post();

Thread::join();

close_();
}

bool CsvDumper::would_write(char type) {
roc_panic_if(!valid_);
roc_panic_if(!open_flag_);

if (stop_) {
if (stop_flag_) {
return false;
}

Expand All @@ -52,9 +81,9 @@ bool CsvDumper::would_write(char type) {
}

void CsvDumper::write(const CsvEntry& entry) {
roc_panic_if(!valid_);
roc_panic_if(!open_flag_);

if (stop_) {
if (stop_flag_) {
return;
}

Expand All @@ -73,17 +102,10 @@ void CsvDumper::write(const CsvEntry& entry) {
write_sem_.post();
}

void CsvDumper::stop() {
stop_ = true;
write_sem_.post();
}

void CsvDumper::run() {
roc_panic_if(!valid_);

roc_log(LogDebug, "csv dumper: running background thread");

while (!stop_ || !ringbuf_.is_empty()) {
while (!stop_flag_ || !ringbuf_.is_empty()) {
if (ringbuf_.is_empty()) {
write_sem_.wait();
}
Expand All @@ -97,8 +119,6 @@ void CsvDumper::run() {
}

roc_log(LogDebug, "csv dumper: exiting background thread");

close_();
}

RateLimiter& CsvDumper::limiter_(char type) {
Expand Down Expand Up @@ -167,9 +187,5 @@ bool CsvDumper::dump_(const CsvEntry& entry) {
return true;
}

bool CsvDumper::is_valid() const {
return valid_;
}

} // namespace core
} // namespace roc
25 changes: 11 additions & 14 deletions src/internal_modules/roc_core/csv_dumper.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "roc_core/stddefs.h"
#include "roc_core/thread.h"
#include "roc_core/time.h"
#include "roc_status/status_code.h"

namespace roc {
namespace core {
Expand Down Expand Up @@ -66,18 +67,17 @@ struct CsvConfig {
//! Asynchronous CSV dumper.
//! Writes entries to CSV file from background thread.
//! Recommended to be used from a single thread.
class CsvDumper : public Thread {
class CsvDumper : private Thread {
public:
//! Open file.
//! @p path - output file.
//! @p max_interval - maximum number of writes per second for each entry type.
//! Initialize.
CsvDumper(const CsvConfig& config, IArena& arena);

//! Close file.
~CsvDumper();

//! Check if opened without errors.
bool is_valid() const;
//! Open file and start background thread.
ROC_ATTR_NODISCARD status::StatusCode open();

//! Stop background thread and close file.
void close();

//! Check whether write() would enqueue or drop entry.
//! Lock-free operation.
Expand All @@ -89,9 +89,6 @@ class CsvDumper : public Thread {
//! Lock-free operation.
void write(const CsvEntry& entry);

//! Stop background thread.
void stop();

private:
virtual void run();

Expand All @@ -103,16 +100,16 @@ class CsvDumper : public Thread {

const CsvConfig config_;

Mutex open_mutex_;
Atomic<int> open_flag_;
Atomic<int> stop_flag_;
FILE* file_;

Mutex write_mutex_;
Semaphore write_sem_;
SpscRingBuffer<CsvEntry> ringbuf_;

Optional<RateLimiter> rate_lims_[128];

Atomic<int> stop_;
bool valid_;
};

} // namespace core
Expand Down
3 changes: 1 addition & 2 deletions src/internal_modules/roc_pipeline/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ SenderSinkConfig::SenderSinkConfig()
, packet_length(DefaultPacketLength)
, enable_cpu_clock(false)
, enable_auto_cts(false)
, enable_profiling(false)
, enable_interleaving(false)
, dump_file(NULL) {
, enable_profiling(false) {
}

void SenderSinkConfig::deduce_defaults(audio::ProcessorMap& processor_map) {
Expand Down
12 changes: 6 additions & 6 deletions src/internal_modules/roc_pipeline/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ struct SenderSinkConfig {
//! Automatically fill capture timestamps of input frames with invocation time.
bool enable_auto_cts;

//! Profile moving average of frames being written.
bool enable_profiling;

//! Interleave packets.
bool enable_interleaving;

//! File to a dump file in csv format with some run-time metrics.
const char* dump_file;
//! Profile moving average of frames being written.
bool enable_profiling;

// Initialize config.
//! Parameters for a logger in csv format with some run-time metrics.
core::CsvConfig dumper;

//! Initialize config.
SenderSinkConfig();

//! Fill unset values with defaults.
Expand Down
8 changes: 3 additions & 5 deletions src/internal_modules/roc_pipeline/receiver_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ ReceiverSource::ReceiverSource(const ReceiverSourceConfig& source_config,

if (source_config.common.dumper.dump_file) {
dumper_.reset(new (dumper_) core::CsvDumper(source_config.common.dumper, arena));
if (!dumper_->start()) {
init_status_ = status::StatusErrFile;
if ((init_status_ = dumper_->open()) != status::StatusOK) {
return;
}
}
Expand Down Expand Up @@ -85,9 +84,8 @@ ReceiverSource::ReceiverSource(const ReceiverSourceConfig& source_config,
}

ReceiverSource::~ReceiverSource() {
if (dumper_ && dumper_->is_valid() && dumper_->is_joinable()) {
dumper_->stop();
dumper_->join();
if (dumper_) {
dumper_->close();
}
}

Expand Down
21 changes: 13 additions & 8 deletions src/internal_modules/roc_pipeline/sender_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ SenderSink::SenderSink(const SenderSinkConfig& sink_config,
, frame_factory_(frame_pool, frame_buffer_pool)
, arena_(arena)
, frame_writer_(NULL)
, dumper_config_()
, init_status_(status::NoStatus) {
sink_config_.deduce_defaults(processor_map);

if (sink_config_.dumper.dump_file) {
dumper_.reset(new (dumper_) core::CsvDumper(sink_config_.dumper, arena));
if ((init_status_ = dumper_->open()) != status::StatusOK) {
return;
}
}

audio::IFrameWriter* frm_writer = NULL;

{
Expand Down Expand Up @@ -69,17 +75,16 @@ SenderSink::SenderSink(const SenderSinkConfig& sink_config,
frm_writer = profiler_.get();
}

if (sink_config_.dump_file) {
dumper_.reset(new (dumper_) core::CsvDumper(dumper_config_, arena));
if (!dumper_->start()) {
return;
}
}

frame_writer_ = frm_writer;
init_status_ = status::StatusOK;
}

SenderSink::~SenderSink() {
if (dumper_) {
dumper_->close();
}
}

status::StatusCode SenderSink::init_status() const {
return init_status_;
}
Expand Down
7 changes: 4 additions & 3 deletions src/internal_modules/roc_pipeline/sender_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> {
core::IPool& frame_buffer_pool,
core::IArena& arena);

~SenderSink();

//! Check if the pipeline was successfully constructed.
status::StatusCode init_status() const;

Expand Down Expand Up @@ -121,6 +123,8 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> {

StateTracker state_tracker_;

core::Optional<core::CsvDumper> dumper_;

core::Optional<audio::Fanout> fanout_;
core::Optional<audio::ProfilingWriter> profiler_;
core::Optional<audio::PcmMapperWriter> pcm_mapper_;
Expand All @@ -129,9 +133,6 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> {

audio::IFrameWriter* frame_writer_;

const core::CsvConfig dumper_config_;
core::Optional<core::CsvDumper> dumper_;

status::StatusCode init_status_;
};

Expand Down
8 changes: 4 additions & 4 deletions src/tools/roc_recv/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ int main(int argc, char** argv) {

receiver_config.common.enable_profiling = args.profile_flag;

if (args.dump_given) {
receiver_config.common.dumper.dump_file = args.dump_arg;
}

node::ContextConfig context_config;

if (args.max_packet_size_given) {
Expand Down Expand Up @@ -464,10 +468,6 @@ int main(int argc, char** argv) {
}
}

if (args.dump_given) {
receiver_config.common.dumper.dump_file = args.dump_arg;
}

node::Receiver receiver(context, receiver_config);
if (receiver.init_status() != status::StatusOK) {
roc_log(LogError, "can't create receiver node: status=%s",
Expand Down
8 changes: 4 additions & 4 deletions src/tools/roc_send/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ int main(int argc, char** argv) {

sender_config.enable_profiling = args.profile_flag;

if (args.dump_given) {
sender_config.dumper.dump_file = args.dump_arg;
}

node::ContextConfig context_config;

if (args.max_packet_size_given) {
Expand Down Expand Up @@ -442,10 +446,6 @@ int main(int argc, char** argv) {
return 1;
}

if (args.dump_given) {
sender_config.dump_file = args.dump_arg;
}

sndio::Config pump_config;
pump_config.sample_spec = input_source->sample_spec();
pump_config.frame_length = io_config.frame_length;
Expand Down

0 comments on commit c3f3f9a

Please sign in to comment.