diff --git a/src/internal_modules/roc_core/csv_dumper.cpp b/src/internal_modules/roc_core/csv_dumper.cpp index 45cda6e8e..ba33ec4ba 100644 --- a/src/internal_modules/roc_core/csv_dumper.cpp +++ b/src/internal_modules/roc_core/csv_dumper.cpp @@ -16,27 +16,56 @@ namespace core { CsvDumper::CsvDumper(const CsvConfig& config, IArena& arena) : config_(config) - , ringbuf_(arena, config.max_queued) - , valid_(false) { - if (!config.dump_file || !open_(config.dump_file)) { - return; + , open_flag_(false) + , stop_flag_(false) + , file_(NULL) + , ringbuf_(arena, config.max_queued) { + if (!config.dump_file) { + roc_panic("csv dumper: dump file is null"); } - valid_ = true; } CsvDumper::~CsvDumper() { - if (is_joinable()) { - roc_panic("csv dumper: attempt to call destructor" - " before calling stop() and join()"); + if (open_flag_ && !stop_flag_) { + roc_panic("csv dumper: close() not called before destructor"); + } +} + +status::StatusCode CsvDumper::open() { + Mutex::Lock lock(open_mutex_); + + if (open_flag_) { + roc_panic("csv dumper: open() already called"); + } + + open_flag_ = true; + + if (!open_(config_.dump_file)) { + return status::StatusErrFile; + } + + if (!Thread::start()) { + return status::StatusErrThread; } + return status::StatusOK; +} + +void CsvDumper::close() { + Mutex::Lock lock(open_mutex_); + + stop_flag_ = true; + write_sem_.post(); + + Thread::join(); + close_(); } bool CsvDumper::would_write(char type) { - roc_panic_if(!valid_); + roc_panic_if(!open_flag_); - if (stop_) { + if (stop_flag_) { return false; } @@ -52,9 +81,9 @@ bool CsvDumper::would_write(char type) { } void CsvDumper::write(const CsvEntry& entry) { - roc_panic_if(!valid_); + roc_panic_if(!open_flag_); - if (stop_) { + if (stop_flag_) { return; } @@ -73,17 +102,10 @@ void CsvDumper::write(const CsvEntry& entry) { write_sem_.post(); } -void CsvDumper::stop() { - stop_ = true; - write_sem_.post(); -} - void CsvDumper::run() { - roc_panic_if(!valid_); - roc_log(LogDebug, "csv dumper: running background thread"); - while (!stop_ || !ringbuf_.is_empty()) { + while (!stop_flag_ || !ringbuf_.is_empty()) { if (ringbuf_.is_empty()) { write_sem_.wait(); } @@ -97,8 +119,6 @@ void CsvDumper::run() { } roc_log(LogDebug, "csv dumper: exiting background thread"); - - close_(); } RateLimiter& CsvDumper::limiter_(char type) { @@ -167,9 +187,5 @@ bool CsvDumper::dump_(const CsvEntry& entry) { return true; } -bool CsvDumper::is_valid() const { - return valid_; -} - } // namespace core } // namespace roc diff --git a/src/internal_modules/roc_core/csv_dumper.h b/src/internal_modules/roc_core/csv_dumper.h index bdc794794..8b781a5ce 100644 --- a/src/internal_modules/roc_core/csv_dumper.h +++ b/src/internal_modules/roc_core/csv_dumper.h @@ -21,6 +21,7 @@ #include "roc_core/stddefs.h" #include "roc_core/thread.h" #include "roc_core/time.h" +#include "roc_status/status_code.h" namespace roc { namespace core { @@ -66,18 +67,17 @@ struct CsvConfig { //! Asynchronous CSV dumper. //! Writes entries to CSV file from background thread. //! Recommended to be used from a single thread. -class CsvDumper : public Thread { +class CsvDumper : private Thread { public: - //! Open file. - //! @p path - output file. - //! @p max_interval - maximum number of writes per second for each entry type. + //! Initialize. CsvDumper(const CsvConfig& config, IArena& arena); - - //! Close file. ~CsvDumper(); - //! Check if opened without errors. - bool is_valid() const; + //! Open file and start background thread. + ROC_ATTR_NODISCARD status::StatusCode open(); + + //! Stop background thread and close file. + void close(); //! Check whether write() would enqueue or drop entry. //! Lock-free operation. @@ -89,9 +89,6 @@ class CsvDumper : public Thread { //! Lock-free operation. void write(const CsvEntry& entry); - //! Stop background thread. - void stop(); - private: virtual void run(); @@ -103,6 +100,9 @@ class CsvDumper : public Thread { const CsvConfig config_; + Mutex open_mutex_; + Atomic open_flag_; + Atomic stop_flag_; FILE* file_; Mutex write_mutex_; @@ -110,9 +110,6 @@ class CsvDumper : public Thread { SpscRingBuffer ringbuf_; Optional rate_lims_[128]; - - Atomic stop_; - bool valid_; }; } // namespace core diff --git a/src/internal_modules/roc_pipeline/config.cpp b/src/internal_modules/roc_pipeline/config.cpp index 71de60b79..ca7ee50c3 100644 --- a/src/internal_modules/roc_pipeline/config.cpp +++ b/src/internal_modules/roc_pipeline/config.cpp @@ -18,9 +18,8 @@ SenderSinkConfig::SenderSinkConfig() , packet_length(DefaultPacketLength) , enable_cpu_clock(false) , enable_auto_cts(false) - , enable_profiling(false) , enable_interleaving(false) - , dump_file(NULL) { + , enable_profiling(false) { } void SenderSinkConfig::deduce_defaults(audio::ProcessorMap& processor_map) { diff --git a/src/internal_modules/roc_pipeline/config.h b/src/internal_modules/roc_pipeline/config.h index 4d58ee48b..b5051f7c6 100644 --- a/src/internal_modules/roc_pipeline/config.h +++ b/src/internal_modules/roc_pipeline/config.h @@ -96,16 +96,16 @@ struct SenderSinkConfig { //! Automatically fill capture timestamps of input frames with invocation time. bool enable_auto_cts; - //! Profile moving average of frames being written. - bool enable_profiling; - //! Interleave packets. bool enable_interleaving; - //! File to a dump file in csv format with some run-time metrics. - const char* dump_file; + //! Profile moving average of frames being written. + bool enable_profiling; - // Initialize config. + //! Parameters for a logger in csv format with some run-time metrics. + core::CsvConfig dumper; + + //! Initialize config. SenderSinkConfig(); //! Fill unset values with defaults. diff --git a/src/internal_modules/roc_pipeline/receiver_source.cpp b/src/internal_modules/roc_pipeline/receiver_source.cpp index 03597e4c0..145ac231c 100644 --- a/src/internal_modules/roc_pipeline/receiver_source.cpp +++ b/src/internal_modules/roc_pipeline/receiver_source.cpp @@ -34,8 +34,7 @@ ReceiverSource::ReceiverSource(const ReceiverSourceConfig& source_config, if (source_config.common.dumper.dump_file) { dumper_.reset(new (dumper_) core::CsvDumper(source_config.common.dumper, arena)); - if (!dumper_->start()) { - init_status_ = status::StatusErrFile; + if ((init_status_ = dumper_->open()) != status::StatusOK) { return; } } @@ -85,9 +84,8 @@ ReceiverSource::ReceiverSource(const ReceiverSourceConfig& source_config, } ReceiverSource::~ReceiverSource() { - if (dumper_ && dumper_->is_valid() && dumper_->is_joinable()) { - dumper_->stop(); - dumper_->join(); + if (dumper_) { + dumper_->close(); } } diff --git a/src/internal_modules/roc_pipeline/sender_sink.cpp b/src/internal_modules/roc_pipeline/sender_sink.cpp index d313ce163..ea80f6bf2 100644 --- a/src/internal_modules/roc_pipeline/sender_sink.cpp +++ b/src/internal_modules/roc_pipeline/sender_sink.cpp @@ -29,10 +29,16 @@ SenderSink::SenderSink(const SenderSinkConfig& sink_config, , frame_factory_(frame_pool, frame_buffer_pool) , arena_(arena) , frame_writer_(NULL) - , dumper_config_() , init_status_(status::NoStatus) { sink_config_.deduce_defaults(processor_map); + if (sink_config_.dumper.dump_file) { + dumper_.reset(new (dumper_) core::CsvDumper(sink_config_.dumper, arena)); + if ((init_status_ = dumper_->open()) != status::StatusOK) { + return; + } + } + audio::IFrameWriter* frm_writer = NULL; { @@ -69,17 +75,16 @@ SenderSink::SenderSink(const SenderSinkConfig& sink_config, frm_writer = profiler_.get(); } - if (sink_config_.dump_file) { - dumper_.reset(new (dumper_) core::CsvDumper(dumper_config_, arena)); - if (!dumper_->start()) { - return; - } - } - frame_writer_ = frm_writer; init_status_ = status::StatusOK; } +SenderSink::~SenderSink() { + if (dumper_) { + dumper_->close(); + } +} + status::StatusCode SenderSink::init_status() const { return init_status_; } diff --git a/src/internal_modules/roc_pipeline/sender_sink.h b/src/internal_modules/roc_pipeline/sender_sink.h index 4ef055e37..3a0d93f48 100644 --- a/src/internal_modules/roc_pipeline/sender_sink.h +++ b/src/internal_modules/roc_pipeline/sender_sink.h @@ -53,6 +53,8 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> { core::IPool& frame_buffer_pool, core::IArena& arena); + ~SenderSink(); + //! Check if the pipeline was successfully constructed. status::StatusCode init_status() const; @@ -121,6 +123,8 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> { StateTracker state_tracker_; + core::Optional dumper_; + core::Optional fanout_; core::Optional profiler_; core::Optional pcm_mapper_; @@ -129,9 +133,6 @@ class SenderSink : public sndio::ISink, public core::NonCopyable<> { audio::IFrameWriter* frame_writer_; - const core::CsvConfig dumper_config_; - core::Optional dumper_; - status::StatusCode init_status_; }; diff --git a/src/tools/roc_recv/main.cpp b/src/tools/roc_recv/main.cpp index d5637a445..339c0c72f 100644 --- a/src/tools/roc_recv/main.cpp +++ b/src/tools/roc_recv/main.cpp @@ -294,6 +294,10 @@ int main(int argc, char** argv) { receiver_config.common.enable_profiling = args.profile_flag; + if (args.dump_given) { + receiver_config.common.dumper.dump_file = args.dump_arg; + } + node::ContextConfig context_config; if (args.max_packet_size_given) { @@ -464,10 +468,6 @@ int main(int argc, char** argv) { } } - if (args.dump_given) { - receiver_config.common.dumper.dump_file = args.dump_arg; - } - node::Receiver receiver(context, receiver_config); if (receiver.init_status() != status::StatusOK) { roc_log(LogError, "can't create receiver node: status=%s", diff --git a/src/tools/roc_send/main.cpp b/src/tools/roc_send/main.cpp index c36b2ba1e..41cb6b918 100644 --- a/src/tools/roc_send/main.cpp +++ b/src/tools/roc_send/main.cpp @@ -232,6 +232,10 @@ int main(int argc, char** argv) { sender_config.enable_profiling = args.profile_flag; + if (args.dump_given) { + sender_config.dumper.dump_file = args.dump_arg; + } + node::ContextConfig context_config; if (args.max_packet_size_given) { @@ -442,10 +446,6 @@ int main(int argc, char** argv) { return 1; } - if (args.dump_given) { - sender_config.dump_file = args.dump_arg; - } - sndio::Config pump_config; pump_config.sample_spec = input_source->sample_spec(); pump_config.frame_length = io_config.frame_length;