diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp
index 4e32b22351..c792b2b448 100644
--- a/src/core/storage/fileio/read_caching_device.hpp
+++ b/src/core/storage/fileio/read_caching_device.hpp
@@ -19,6 +19,7 @@ namespace turi {
 // private namespace
 namespace {
 const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024;  // 64 MB
+const char* const WRITE_STREAM_SUFFIX = ".write";
 }  // namespace
 
 /*
@@ -208,7 +209,16 @@ class read_caching_device {
         m_filename_to_stop_watch[filename]->start();
       }
     } else {
-      m_contents = std::make_shared<T>(filename, write);
+      // a write stream must be treated differently from a read stream
+      m_contents = std::make_shared<T>(m_filename, write);
+      auto watch_key = m_filename + WRITE_STREAM_SUFFIX;
+      if (m_filename_to_stop_watch.find(watch_key) ==
+          m_filename_to_stop_watch.end()) {
+        m_filename_to_stop_watch[watch_key] =
+            std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
+      }
+      // start the clock immediately to register the thread id into it
+      m_filename_to_stop_watch[watch_key]->start();
     }
 
     m_writing = write;
@@ -232,13 +242,24 @@ class read_caching_device {
       // evict the file size cache
       {
         std::lock_guard<std::mutex> file_size_guard(m_filesize_cache_mutex);
-        auto iter = m_filename_to_stop_watch.find(m_filename);
+        auto iter =
+            m_filename_to_stop_watch.find(m_filename + WRITE_STREAM_SUFFIX);
         if (iter != m_filename_to_stop_watch.end()) {
           // use no throw since this nanny thread may not start clock
           if (iter->second->stop(/* no throw */ true) == 0) {
+            /*
+             * The last part of the upload mostly happens at `close`, by
+             * design of async S3 multipart uploading; unlike a read stream,
+             * where each read is a synchronous operation.
+             */
+            logprogress_stream
+                << "Finished uploading " << sanitize_url(m_filename)
+                << ". Elapsed time "
+                << iter->second->template duration<std::chrono::seconds>()
+                       .count()
+                << " seconds" << std::endl;
             // nobody is holding
             m_filename_to_stop_watch.erase(iter);
-            m_filename_to_filesize_map.erase(m_filename);
           }
         }
       }
@@ -299,22 +320,32 @@ class read_caching_device {
   }
 
   std::streamsize write(const char* strm_ptr, std::streamsize n) {
+    auto watch_key = m_filename + WRITE_STREAM_SUFFIX;
+
     {
       std::lock_guard<std::mutex> file_size_guard(m_filesize_cache_mutex);
-      if (m_filename_to_stop_watch.count(m_filename) == 0)
+      if (m_filename_to_stop_watch.count(watch_key) == 0)
         log_and_throw("write through closed files handle");
-      m_filename_to_stop_watch[m_filename]->start();
+      m_filename_to_stop_watch[watch_key]->start();
     }
     auto bytes_written = get_contents()->write(strm_ptr, n);
     {
+      /*
+       * usually `n` is 4096 (4 KB); it depends on the underlying buffer of
+       * the m_contents implementation. For the S3 implementation, once the
+       * buffer is full, the upload may go out over the network; again, this
+       * really depends on how AWS S3 multipart upload works.
+       *
+       * Besides that, the stream is first written to local disk, which is
+       * why the elapsed time measured here can be very small.
+       */
      std::lock_guard<std::mutex> file_size_guard(m_filesize_cache_mutex);
-      auto& stop_watch = m_filename_to_stop_watch[m_filename];
+      auto& stop_watch = m_filename_to_stop_watch[watch_key];
       if (stop_watch->is_time_to_record()) {
         logprogress_stream
-            << "Finish uploading " << sanitize_url(m_filename)
-            << ". Elapsed time "
+            << "Uploading " << sanitize_url(m_filename) << ". Elapsed time "
             << stop_watch->template duration<std::chrono::seconds>().count()
             << " seconds" << std::endl;
       }
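
For readers following the change: below is a minimal, self-contained sketch of the bookkeeping this diff introduces. Write streams get their own stop-watch map entry keyed by `filename + WRITE_STREAM_SUFFIX`, so a read stream and a write stream on the same URL never share, or prematurely erase, each other's watch. The `StopWatchSec_t` class here is a hypothetical stand-in (the real one lives elsewhere in fileio and also registers thread ids, per the comment in `open()`); only the suffix-keyed map pattern, the 30-second reporting interval, and the open/write/close flow mirror the diff.

```cpp
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>

namespace {
const char* const WRITE_STREAM_SUFFIX = ".write";  // same constant as the diff
}

// Hypothetical stand-in for the StopWatch type referenced in the diff.
class StopWatchSec_t {
 public:
  using clock_t = std::chrono::steady_clock;

  explicit StopWatchSec_t(unsigned report_interval_secs)
      : m_interval(report_interval_secs) {}

  void start() {
    if (!m_running) {
      m_begin = clock_t::now();
      m_last_report = m_begin;
      m_running = true;
    }
  }

  // returns 0 on a clean stop, mirroring `stop(/* no throw */ true) == 0`
  int stop(bool /* no_throw */) {
    if (!m_running) return 1;
    m_end = clock_t::now();
    m_running = false;
    return 0;
  }

  // true at most once per report interval; throttles progress logging
  bool is_time_to_record() {
    auto now = clock_t::now();
    if (now - m_last_report < std::chrono::seconds(m_interval)) return false;
    m_last_report = now;
    return true;
  }

  template <typename Duration>
  Duration duration() const {
    auto end = m_running ? clock_t::now() : m_end;
    return std::chrono::duration_cast<Duration>(end - m_begin);
  }

 private:
  unsigned m_interval;
  bool m_running = false;
  clock_t::time_point m_begin, m_last_report, m_end;
};

int main() {
  std::mutex filesize_cache_mutex;
  std::map<std::string, std::unique_ptr<StopWatchSec_t>> filename_to_stop_watch;
  const std::string filename = "s3://bucket/key";
  const std::string watch_key = filename + WRITE_STREAM_SUFFIX;

  // open-for-write: register the watch under the suffixed key
  {
    std::lock_guard<std::mutex> guard(filesize_cache_mutex);
    if (filename_to_stop_watch.find(watch_key) ==
        filename_to_stop_watch.end()) {
      filename_to_stop_watch[watch_key] =
          std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
    }
    filename_to_stop_watch[watch_key]->start();
  }

  // write path: log progress at most once per interval
  {
    std::lock_guard<std::mutex> guard(filesize_cache_mutex);
    auto& stop_watch = filename_to_stop_watch[watch_key];
    if (stop_watch->is_time_to_record()) {
      std::cout << "Uploading " << filename << ". Elapsed time "
                << stop_watch->duration<std::chrono::seconds>().count()
                << " seconds" << std::endl;
    }
  }

  // close: stop the suffixed watch and report the total elapsed time
  {
    std::lock_guard<std::mutex> guard(filesize_cache_mutex);
    auto iter = filename_to_stop_watch.find(watch_key);
    if (iter != filename_to_stop_watch.end() &&
        iter->second->stop(true) == 0) {
      std::cout << "Finished uploading " << filename << ". Elapsed time "
                << iter->second->duration<std::chrono::seconds>().count()
                << " seconds" << std::endl;
      filename_to_stop_watch.erase(iter);
    }
  }
  return 0;
}
```

Keying by suffix rather than adding a second map is presumably what lets the diff reuse the existing `m_filesize_cache_mutex` critical sections unchanged.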