-
Notifications
You must be signed in to change notification settings - Fork 187
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
338 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,338 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License Version 2.0 with LLVM Exceptions | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* https://llvm.org/LICENSE.txt | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include <unifex/config.hpp> | ||
|
||
#if !UNIFEX_NO_LIBURING | ||
|
||
#include <unifex/scheduler_concepts.hpp> | ||
#include <unifex/scope_guard.hpp> | ||
#include <unifex/inplace_stop_token.hpp> | ||
#include <unifex/async_manual_reset_event.hpp> | ||
#include <unifex/static_thread_pool.hpp> | ||
#include <unifex/linux/io_uring_context.hpp> | ||
#include <unifex/async_scope.hpp> | ||
#include <unifex/defer.hpp> | ||
#include <unifex/let_value_with.hpp> | ||
#include <unifex/let_error.hpp> | ||
#include <unifex/let_done.hpp> | ||
#include <unifex/sequence.hpp> | ||
#include <unifex/sync_wait.hpp> | ||
#include <unifex/finally.hpp> | ||
#include <unifex/stop_when.hpp> | ||
#include <unifex/then.hpp> | ||
#include <unifex/just.hpp> | ||
#include <unifex/just_from.hpp> | ||
#include <unifex/just_done.hpp> | ||
#include <unifex/just_error.hpp> | ||
#include <unifex/repeat_effect_until.hpp> | ||
|
||
#include <cstdio> | ||
|
||
#include <chrono> | ||
#include <charconv> | ||
#include <string> | ||
#include <string_view> | ||
#include <vector> | ||
#include <thread> | ||
#include <filesystem> | ||
|
||
using namespace unifex; | ||
using namespace unifex::linuxos; | ||
using namespace std::chrono; | ||
using namespace std::chrono_literals; | ||
using namespace std::string_view_literals; | ||
namespace fs = std::filesystem; | ||
|
||
inline constexpr size_t bufferSize = 64000; | ||
inline constexpr int highInFlightMarker = 500; | ||
inline constexpr int lowInFlightMarker = 400; | ||
|
||
// This could be made generic across any scheduler that supports the | ||
// async_write_only_file() CPO. | ||
|
||
auto copy_file(io_uring_context::scheduler s, const fs::path& from, const fs::path& to) { | ||
return let_value_with( | ||
[s, from, to]() { | ||
return std::make_tuple( | ||
open_file_read_only(s, from), | ||
open_file_write_only(s, to)); | ||
}, | ||
[ | ||
index = 0, | ||
buffer = std::vector<char>{}, | ||
pending = span{(char*)nullptr, 0} | ||
](auto& files) mutable { | ||
auto& [from, to] = files; | ||
buffer.resize(bufferSize); | ||
buffer.resize(buffer.capacity()); | ||
return defer([&](){ | ||
return sequence( | ||
async_read_some_at( | ||
from, | ||
index, | ||
as_writable_bytes(span{buffer.data(), buffer.size()})) | ||
| then([&](ssize_t bytesRead) { | ||
pending = span{buffer.data(), size_t(bytesRead)}; | ||
if(bytesRead == 0) { | ||
// signal complete | ||
pending = span{(char*)nullptr, 0}; | ||
} | ||
}), | ||
defer([&](){ | ||
return async_write_some_at( | ||
to, | ||
index, | ||
as_bytes(pending)) | ||
| then([&](ssize_t bytesWritten) { | ||
index += bytesWritten; | ||
pending = span{pending.begin() + bytesWritten, pending.size() - bytesWritten}; | ||
}); | ||
}) | ||
| repeat_effect_until([&](){return pending.size() == 0;})); | ||
}) | ||
| repeat_effect_until([&]{return !pending.begin();}); | ||
}); | ||
} | ||
|
||
auto copy_files( | ||
io_uring_context::scheduler s, | ||
static_thread_pool::scheduler pool, | ||
const fs::path& from, | ||
const fs::path& to) noexcept | ||
{ | ||
using state_t = std::tuple< | ||
fs::recursive_directory_iterator, | ||
unifex::async_scope, | ||
std::atomic<int>, | ||
unifex::async_manual_reset_event>; | ||
return let_value_with( | ||
[from]() -> state_t { | ||
return {}; | ||
}, | ||
[s, pool, from, to](state_t& state) mutable { | ||
auto& [entry, scope, pending, drain] = state; | ||
entry = fs::recursive_directory_iterator(from); | ||
drain.set(); | ||
return sequence( | ||
sequence( | ||
just_from([&]{ | ||
if(++pending >= highInFlightMarker && drain.ready()) { | ||
// wait for some files to complete | ||
drain.reset(); | ||
} | ||
}), | ||
drain.async_wait(), | ||
just_from([&]{ | ||
if (entry != end(entry)) { | ||
if (entry->is_directory()) { | ||
// skip this item | ||
--pending; | ||
++entry; | ||
return; | ||
} | ||
|
||
const auto& p = (*entry).path(); | ||
// Create path in target, if not existing. | ||
const auto relativeSrc = fs::relative(p, from); | ||
const auto targetParentPath = to / relativeSrc.parent_path(); | ||
const auto targetParentFile = targetParentPath / p.filename(); | ||
|
||
// Copy to the targetParentPath which we just created. | ||
scope.spawn_on(pool, sequence( | ||
just_from([&, targetParentPath]{ | ||
fs::create_directories(targetParentPath); | ||
}), | ||
copy_file(s, p, targetParentFile) | ||
| then([&, p, targetParentFile] () noexcept { | ||
printf("%d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str()); | ||
if (--pending <= lowInFlightMarker && !drain.ready()) { | ||
// resume file iteration | ||
drain.set(); | ||
} | ||
fflush(stdout); | ||
}) | ||
| let_error([&, p, targetParentFile](auto e) noexcept { | ||
if constexpr (convertible_to<std::exception_ptr, decltype(e)>) { | ||
try {std::rethrow_exception(e);} catch(const std::exception& ex) { | ||
printf("EXCEPTION: '%s' %d: %s -> %s\n", ex.what(), pending.load(), p.c_str(), targetParentFile.c_str()); | ||
} catch(...) { | ||
printf("UNKNOWN EXCEPTION: %d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str()); | ||
} | ||
} else if constexpr (convertible_to<std::error_code, decltype(e)>) { | ||
printf("ERRORCODE: '%s' %d: %s -> %s\n", e.message().c_str(), pending.load(), p.c_str(), targetParentFile.c_str()); | ||
} else { | ||
printf("UNKNOWN ERROR: %d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str()); | ||
} | ||
fflush(stdout); | ||
|
||
// keep going | ||
if (--pending <= lowInFlightMarker && !drain.ready()) { | ||
// resume file iteration | ||
drain.set(); | ||
} | ||
return just(); | ||
}) | ||
| let_done([&, p, targetParentFile] () noexcept { | ||
printf("CANCELLED: %d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str()); | ||
if (--pending <= lowInFlightMarker && !drain.ready()) { | ||
// resume file iteration | ||
drain.set(); | ||
} | ||
return just_done(); | ||
}) | ||
)); | ||
|
||
// advance to next directory entry | ||
++entry; | ||
} | ||
}) | ||
) | ||
| repeat_effect_until([&] () noexcept { | ||
return entry == end(entry); | ||
}), | ||
scope.complete()) | ||
| let_error([&] (auto&&) noexcept { | ||
return scope.cleanup(); | ||
}) | ||
| let_done([&] () noexcept { | ||
return scope.cleanup(); | ||
}); | ||
}); | ||
} | ||
|
||
auto copy_files(const fs::path& from, const fs::path& to) noexcept | ||
{ | ||
for (const auto& dirEntry : fs::recursive_directory_iterator(from)) | ||
{ | ||
if (dirEntry.is_directory()) { continue; } | ||
|
||
const auto& p = dirEntry.path(); | ||
// Create path in target, if not existing. | ||
const auto relativeSrc = fs::relative(p, from); | ||
const auto targetParentPath = to / relativeSrc.parent_path(); | ||
const auto targetParentFile = targetParentPath / p.filename(); | ||
|
||
fs::create_directories(targetParentPath); | ||
|
||
// Copy to the targetParentPath which we just created. | ||
try { | ||
copy_file(p, targetParentFile, fs::copy_options::overwrite_existing); | ||
printf("%s -> %s\n", p.c_str(), targetParentFile.c_str()); | ||
} catch(const std::exception& ex) { | ||
printf("EXCEPTION: '%s' %s -> %s\n", ex.what(), p.c_str(), targetParentFile.c_str()); | ||
} | ||
} | ||
} | ||
|
||
int main(int argc, char* argv[]) { | ||
fs::path from; | ||
fs::path to; | ||
auto threadCount = std::thread::hardware_concurrency() - 1; | ||
bool use_std_copy = false; | ||
|
||
int position = 0; | ||
std::vector<std::string_view> args(argv+1, argv+argc); | ||
for (auto arg : args) { | ||
if (arg.find("usestd"sv) == 0) { | ||
use_std_copy = true; | ||
} else if (arg.find("n-of-threads="sv) == 0) { | ||
arg.remove_prefix(13); | ||
|
||
auto [ptr, ec] { std::from_chars(arg.data(), arg.data() + arg.size(), threadCount) }; | ||
|
||
if (ec == std::errc::invalid_argument) { | ||
printf("That isn't a number.\n"); | ||
return -1; | ||
} else if (ec == std::errc::result_out_of_range) { | ||
printf("This number is larger than an int.\n"); | ||
return -1; | ||
} | ||
} else { | ||
if (position == 0) { | ||
printf("from: -> %s\n", arg.data()); | ||
from = arg; | ||
} else if (position == 1){ | ||
printf("to: -> %s\n", arg.data()); | ||
to = arg; | ||
} else { | ||
printf("error: too many positional arguments!"); | ||
return -1; | ||
} | ||
++position; | ||
} | ||
} | ||
|
||
io_uring_context ctx; | ||
static_thread_pool poolContext(threadCount); | ||
auto pool = poolContext.get_scheduler(); | ||
|
||
inplace_stop_source stopSource; | ||
std::thread t{[&] { ctx.run(stopSource.get_token()); }}; | ||
scope_guard stopOnExit = [&]() noexcept { | ||
stopSource.request_stop(); | ||
t.join(); | ||
}; | ||
|
||
auto scheduler = ctx.get_scheduler(); | ||
|
||
try { | ||
using double_sec = duration<double>; | ||
auto start = steady_clock::now(); | ||
auto finish = steady_clock::now(); | ||
if (use_std_copy) { | ||
start = steady_clock::now(); | ||
copy_files(from, to); | ||
finish = steady_clock::now(); | ||
printf("std filesystem: Copied all the files in %6.6f seconds\n", | ||
duration_cast<double_sec>(finish-start).count()); | ||
fflush(stdout); | ||
} else { | ||
sync_wait(sequence( | ||
just_from([&] { | ||
std::printf("copy file\n"); | ||
fflush(stdout); | ||
start = steady_clock::now(); | ||
}), | ||
copy_files(scheduler, pool, from, to) | ||
| stop_when(scheduler.schedule_at(scheduler.now() + 30s)) | ||
| let_done([]{return just_error(std::make_exception_ptr(std::logic_error{"deadlock on copy_files"}));}), | ||
just_from([&] { | ||
finish = steady_clock::now(); | ||
std::printf("copy completed\n"); | ||
fflush(stdout); | ||
}))); | ||
printf("uring: Copied all the files in %6.6f seconds\n", | ||
duration_cast<double_sec>(finish-start).count()); | ||
fflush(stdout); | ||
} | ||
} catch (const std::exception& ex) { | ||
std::printf("error: %s\n", ex.what()); | ||
fflush(stdout); | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
#else // UNIFEX_NO_LIBURING | ||
|
||
#include <cstdio> | ||
int main() { | ||
printf("liburing support not found\n"); | ||
return 0; | ||
} | ||
|
||
#endif // UNIFEX_NO_LIBURING |