This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

one last segfault for upload
guihao-liang committed May 18, 2020
1 parent 4c18f55 commit e163555
Showing 1 changed file with 39 additions and 8 deletions.
47 changes: 39 additions & 8 deletions src/core/storage/fileio/read_caching_device.hpp
@@ -19,6 +19,7 @@ namespace turi {
 // private namespace
 namespace {
 const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024;  // 64 MB
+const char* const WRITE_STREAM_SUFFIX = ".write";
 }  // namespace

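A note on the new constant: keying the write-stream stop watch by the file name alone would collide with the read-stream entry for the same URL, so the ".write" suffix gives each stream its own slot in the shared map. A minimal sketch of the idea, with StopWatchSec_t stubbed out since its definition is not part of this diff:

    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>

    // Stand-in for StopWatchSec_t, which is not shown in this diff.
    struct StopWatchStub {
      explicit StopWatchStub(unsigned interval_sec) { (void)interval_sec; }
    };

    int main() {
      const char* const WRITE_STREAM_SUFFIX = ".write";
      std::map<std::string, std::unique_ptr<StopWatchStub>> filename_to_stop_watch;
      const std::string url = "s3://bucket/key.csv";

      // read stream keyed by the plain URL, write stream by URL + ".write"
      filename_to_stop_watch[url].reset(new StopWatchStub(30));
      filename_to_stop_watch[url + WRITE_STREAM_SUFFIX].reset(new StopWatchStub(30));

      std::cout << filename_to_stop_watch.size() << " independent entries\n";  // prints 2
      return 0;
    }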
/*
@@ -208,7 +209,16 @@ class read_caching_device {
         m_filename_to_stop_watch[filename]->start();
       }
     } else {
-      m_contents = std::make_shared<T>(filename, write);
+      // a write stream must be treated differently from a read stream
+      m_contents = std::make_shared<T>(m_filename, write);
+      auto watch_key = m_filename + WRITE_STREAM_SUFFIX;
+      if (m_filename_to_stop_watch.find(watch_key) ==
+          m_filename_to_stop_watch.end()) {
+        m_filename_to_stop_watch[watch_key] =
+            std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
+      }
+      // start the clock immediately so the calling thread is registered with it
+      m_filename_to_stop_watch[watch_key]->start();
     }
 
     m_writing = write;
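The "registered with it" comment here and the stop(...) == 0 check in the next hunk suggest a reference-counting contract: start() registers the calling thread, and stop() deregisters it and reports how many holders remain. A hedged sketch of that assumed contract; the real StopWatchSec_t is not shown in this diff, so every member below is an assumption:

    #include <cstddef>
    #include <mutex>
    #include <set>
    #include <stdexcept>
    #include <thread>

    class StopWatchSketch {
     public:
      explicit StopWatchSketch(unsigned interval_sec) : interval_(interval_sec) {}

      void start() {
        std::lock_guard<std::mutex> guard(lock_);
        threads_.insert(std::this_thread::get_id());  // register the caller
      }

      // Returns how many threads remain registered; 0 means the caller was
      // the last holder and shared state can be torn down safely.
      std::size_t stop(bool no_throw = false) {
        std::lock_guard<std::mutex> guard(lock_);
        auto it = threads_.find(std::this_thread::get_id());
        if (it == threads_.end()) {
          if (!no_throw) throw std::logic_error("stop() without start()");
          return threads_.size();
        }
        threads_.erase(it);
        return threads_.size();
      }

     private:
      unsigned interval_;  // mirrors the 30 passed to StopWatchSec_t
      std::set<std::thread::id> threads_;
      std::mutex lock_;
    };

    int main() {
      StopWatchSketch watch(30);
      watch.start();                          // register this thread
      return watch.stop(true) == 0 ? 0 : 1;   // last holder out: 0 remain
    }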
@@ -232,13 +242,24 @@ class read_caching_device {
     // evict the file size cache
     {
       std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
-      auto iter = m_filename_to_stop_watch.find(m_filename);
+      auto iter =
+          m_filename_to_stop_watch.find(m_filename + WRITE_STREAM_SUFFIX);
       if (iter != m_filename_to_stop_watch.end()) {
+        // use no-throw since this nanny thread may never have started the clock
+        if (iter->second->stop(/* no throw */ true) == 0) {
+          /*
+           * The upload of the last part mostly happens at `close`; this is by
+           * design of the async S3 multipart upload. A read stream, in
+           * contrast, performs each read synchronously.
+           */
           logprogress_stream
               << "Finished uploading " << sanitize_url(m_filename)
               << ". Elapsed time "
               << iter->second->template duration<std::chrono::seconds>()
                      .count()
               << " seconds" << std::endl;
+          // no thread is holding it any more; safe to erase
           m_filename_to_stop_watch.erase(iter);
           m_filename_to_filesize_map.erase(m_filename);
+        }
       }
     }
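The comment about the last part uploading at `close` explains why the "Finished uploading" log lives here: full parts can be pushed asynchronously while write() runs, but the final short part and the completion step can only happen at close, so that is where the wall-clock time lands. A sketch of that shape with hypothetical names (upload_part_async, wait_for_all_parts, and complete are illustrative, not the AWS SDK):

    #include <cstddef>
    #include <string>
    #include <vector>

    // Hypothetical multipart writer; names are illustrative, not the AWS SDK.
    class MultipartSketch {
     public:
      explicit MultipartSketch(std::size_t part_size) : part_size_(part_size) {}

      void write(const char* data, std::size_t n) {
        buffer_.append(data, n);
        while (buffer_.size() >= part_size_) {  // full parts go out async
          upload_part_async(buffer_.substr(0, part_size_));
          buffer_.erase(0, part_size_);
        }
      }

      void close() {
        if (!buffer_.empty()) upload_part_async(buffer_);  // final short part
        wait_for_all_parts();  // close() blocks here, hence the
        complete();            // "Finished uploading" log at close time
      }

     private:
      void upload_part_async(std::string part) { parts_.push_back(std::move(part)); }
      void wait_for_all_parts() {}
      void complete() {}

      std::size_t part_size_;
      std::string buffer_;
      std::vector<std::string> parts_;
    };

    int main() {
      MultipartSketch w(8);
      w.write("hello, multipart", 16);  // two full parts uploaded during write
      w.close();                        // remainder flushed and completed here
      return 0;
    }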
@@ -299,22 +320,32 @@ class read_caching_device {
   }
 
   std::streamsize write(const char* strm_ptr, std::streamsize n) {
+    auto watch_key = m_filename + WRITE_STREAM_SUFFIX;
+
     {
       std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
-      if (m_filename_to_stop_watch.count(m_filename) == 0)
+      if (m_filename_to_stop_watch.count(watch_key) == 0)
         log_and_throw("write through closed file handle");
-      m_filename_to_stop_watch[m_filename]->start();
+      m_filename_to_stop_watch[watch_key]->start();
     }
 
     auto bytes_written = get_contents()->write(strm_ptr, n);
 
     {
+      /*
+       * Usually `n` is 4096 bytes (4 KB); the exact value depends on the
+       * buffering of the underlying m_contents implementation. For the S3
+       * implementation, the upload may go out over the network once the
+       * buffer is full; again, that really depends on how AWS S3 multipart
+       * upload works.
+       *
+       * Besides that, the stream is first written to local disk, which is
+       * why the elapsed time measured here can be very small.
+       */
       std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
-      auto& stop_watch = m_filename_to_stop_watch[m_filename];
+      auto& stop_watch = m_filename_to_stop_watch[watch_key];
       if (stop_watch->is_time_to_record()) {
         logprogress_stream
-            << "Finish uploading " << sanitize_url(m_filename)
-            << ". Elapsed time "
+            << "Uploading " << sanitize_url(m_filename) << ". Elapsed time "
             << stop_watch->template duration<std::chrono::seconds>().count()
             << " seconds" << std::endl;
       }
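is_time_to_record() appears to act as a rate limiter, so the hot write path logs progress at most once per interval (presumably the 30 passed to StopWatchSec_t above) rather than on every 4 KB chunk. A small self-contained sketch of that assumed behavior, using a short interval so it fires when run:

    #include <chrono>
    #include <iostream>
    #include <thread>

    // Gate that fires at most once per interval; the real StopWatchSec_t
    // presumably tracks this alongside its elapsed-time bookkeeping.
    class IntervalGate {
     public:
      explicit IntervalGate(std::chrono::milliseconds interval)
          : interval_(interval), last_(std::chrono::steady_clock::now()) {}

      bool is_time_to_record() {
        auto now = std::chrono::steady_clock::now();
        if (now - last_ < interval_) return false;
        last_ = now;
        return true;
      }

     private:
      std::chrono::milliseconds interval_;
      std::chrono::steady_clock::time_point last_;
    };

    int main() {
      IntervalGate gate(std::chrono::milliseconds(100));
      int logged = 0;
      for (int chunk = 0; chunk < 5; ++chunk) {
        // ... write one 4 KB chunk to the buffered stream here ...
        std::this_thread::sleep_for(std::chrono::milliseconds(60));
        if (gate.is_time_to_record()) {
          std::cout << "progress after chunk " << chunk << "\n";
          ++logged;
        }
      }
      return logged > 0 ? 0 : 1;  // fires roughly every other chunk
    }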
