Skip to content

Commit

Permalink
add file_copy example
Browse files Browse the repository at this point in the history
  • Loading branch information
kirkshoop committed Sep 16, 2021
1 parent b7d5b21 commit 2f16450
Showing 1 changed file with 338 additions and 0 deletions.
338 changes: 338 additions & 0 deletions examples/file_copy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <unifex/config.hpp>

#if !UNIFEX_NO_LIBURING

#include <unifex/scheduler_concepts.hpp>
#include <unifex/scope_guard.hpp>
#include <unifex/inplace_stop_token.hpp>
#include <unifex/async_manual_reset_event.hpp>
#include <unifex/static_thread_pool.hpp>
#include <unifex/linux/io_uring_context.hpp>
#include <unifex/async_scope.hpp>
#include <unifex/defer.hpp>
#include <unifex/let_value_with.hpp>
#include <unifex/let_error.hpp>
#include <unifex/let_done.hpp>
#include <unifex/sequence.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/finally.hpp>
#include <unifex/stop_when.hpp>
#include <unifex/then.hpp>
#include <unifex/just.hpp>
#include <unifex/just_from.hpp>
#include <unifex/just_done.hpp>
#include <unifex/just_error.hpp>
#include <unifex/repeat_effect_until.hpp>

#include <cstdio>

#include <chrono>
#include <charconv>
#include <string>
#include <string_view>
#include <vector>
#include <thread>
#include <filesystem>

using namespace unifex;
using namespace unifex::linuxos;
using namespace std::chrono;
using namespace std::chrono_literals;
using namespace std::string_view_literals;
namespace fs = std::filesystem;

inline constexpr size_t bufferSize = 64000;
inline constexpr int highInFlightMarker = 500;
inline constexpr int lowInFlightMarker = 400;

// This could be made generic across any scheduler that supports the
// async_write_only_file() CPO.

auto copy_file(io_uring_context::scheduler s, const fs::path& from, const fs::path& to) {
return let_value_with(
[s, from, to]() {
return std::make_tuple(
open_file_read_only(s, from),
open_file_write_only(s, to));
},
[
index = 0,
buffer = std::vector<char>{},
pending = span{(char*)nullptr, 0}
](auto& files) mutable {
auto& [from, to] = files;
buffer.resize(bufferSize);
buffer.resize(buffer.capacity());
return defer([&](){
return sequence(
async_read_some_at(
from,
index,
as_writable_bytes(span{buffer.data(), buffer.size()}))
| then([&](ssize_t bytesRead) {
pending = span{buffer.data(), size_t(bytesRead)};
if(bytesRead == 0) {
// signal complete
pending = span{(char*)nullptr, 0};
}
}),
defer([&](){
return async_write_some_at(
to,
index,
as_bytes(pending))
| then([&](ssize_t bytesWritten) {
index += bytesWritten;
pending = span{pending.begin() + bytesWritten, pending.size() - bytesWritten};
});
})
| repeat_effect_until([&](){return pending.size() == 0;}));
})
| repeat_effect_until([&]{return !pending.begin();});
});
}

auto copy_files(
io_uring_context::scheduler s,
static_thread_pool::scheduler pool,
const fs::path& from,
const fs::path& to) noexcept
{
using state_t = std::tuple<
fs::recursive_directory_iterator,
unifex::async_scope,
std::atomic<int>,
unifex::async_manual_reset_event>;
return let_value_with(
[from]() -> state_t {
return {};
},
[s, pool, from, to](state_t& state) mutable {
auto& [entry, scope, pending, drain] = state;
entry = fs::recursive_directory_iterator(from);
drain.set();
return sequence(
sequence(
just_from([&]{
if(++pending >= highInFlightMarker && drain.ready()) {
// wait for some files to complete
drain.reset();
}
}),
drain.async_wait(),
just_from([&]{
if (entry != end(entry)) {
if (entry->is_directory()) {
// skip this item
--pending;
++entry;
return;
}

const auto& p = (*entry).path();
// Create path in target, if not existing.
const auto relativeSrc = fs::relative(p, from);
const auto targetParentPath = to / relativeSrc.parent_path();
const auto targetParentFile = targetParentPath / p.filename();

// Copy to the targetParentPath which we just created.
scope.spawn_on(pool, sequence(
just_from([&, targetParentPath]{
fs::create_directories(targetParentPath);
}),
copy_file(s, p, targetParentFile)
| then([&, p, targetParentFile] () noexcept {
printf("%d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str());
if (--pending <= lowInFlightMarker && !drain.ready()) {
// resume file iteration
drain.set();
}
fflush(stdout);
})
| let_error([&, p, targetParentFile](auto e) noexcept {
if constexpr (convertible_to<std::exception_ptr, decltype(e)>) {
try {std::rethrow_exception(e);} catch(const std::exception& ex) {
printf("EXCEPTION: '%s' %d: %s -> %s\n", ex.what(), pending.load(), p.c_str(), targetParentFile.c_str());
} catch(...) {
printf("UNKNOWN EXCEPTION: %d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str());
}
} else if constexpr (convertible_to<std::error_code, decltype(e)>) {
printf("ERRORCODE: '%s' %d: %s -> %s\n", e.message().c_str(), pending.load(), p.c_str(), targetParentFile.c_str());
} else {
printf("UNKNOWN ERROR: %d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str());
}
fflush(stdout);

// keep going
if (--pending <= lowInFlightMarker && !drain.ready()) {
// resume file iteration
drain.set();
}
return just();
})
| let_done([&, p, targetParentFile] () noexcept {
printf("CANCELLED: %d: %s -> %s\n", pending.load(), p.c_str(), targetParentFile.c_str());
if (--pending <= lowInFlightMarker && !drain.ready()) {
// resume file iteration
drain.set();
}
return just_done();
})
));

// advance to next directory entry
++entry;
}
})
)
| repeat_effect_until([&] () noexcept {
return entry == end(entry);
}),
scope.complete())
| let_error([&] (auto&&) noexcept {
return scope.cleanup();
})
| let_done([&] () noexcept {
return scope.cleanup();
});
});
}

auto copy_files(const fs::path& from, const fs::path& to) noexcept
{
for (const auto& dirEntry : fs::recursive_directory_iterator(from))
{
if (dirEntry.is_directory()) { continue; }

const auto& p = dirEntry.path();
// Create path in target, if not existing.
const auto relativeSrc = fs::relative(p, from);
const auto targetParentPath = to / relativeSrc.parent_path();
const auto targetParentFile = targetParentPath / p.filename();

fs::create_directories(targetParentPath);

// Copy to the targetParentPath which we just created.
try {
copy_file(p, targetParentFile, fs::copy_options::overwrite_existing);
printf("%s -> %s\n", p.c_str(), targetParentFile.c_str());
} catch(const std::exception& ex) {
printf("EXCEPTION: '%s' %s -> %s\n", ex.what(), p.c_str(), targetParentFile.c_str());
}
}
}

int main(int argc, char* argv[]) {
fs::path from;
fs::path to;
auto threadCount = std::thread::hardware_concurrency();
bool use_std_copy = false;

int position = 0;
std::vector<std::string_view> args(argv+1, argv+argc);
for (auto arg : args) {
if (arg.find("usestd"sv) == 0) {
use_std_copy = true;
} else if (arg.find("n-of-threads="sv) == 0) {
arg.remove_prefix(13);

auto [ptr, ec] { std::from_chars(arg.data(), arg.data() + arg.size(), threadCount) };

if (ec == std::errc::invalid_argument) {
printf("That isn't a number.\n");
return -1;
} else if (ec == std::errc::result_out_of_range) {
printf("This number is larger than an int.\n");
return -1;
}
} else {
if (position == 0) {
printf("from: -> %s\n", arg.data());
from = arg;
} else if (position == 1){
printf("to: -> %s\n", arg.data());
to = arg;
} else {
printf("error: too many positional arguments!");
return -1;
}
++position;
}
}

io_uring_context ctx;
static_thread_pool poolContext(threadCount);
auto pool = poolContext.get_scheduler();

inplace_stop_source stopSource;
std::thread t{[&] { ctx.run(stopSource.get_token()); }};
scope_guard stopOnExit = [&]() noexcept {
stopSource.request_stop();
t.join();
};

auto scheduler = ctx.get_scheduler();

try {
using double_sec = duration<double>;
auto start = steady_clock::now();
auto finish = steady_clock::now();
if (use_std_copy) {
start = steady_clock::now();
copy_files(from, to);
finish = steady_clock::now();
printf("std filesystem: Copied all the files in %6.6f seconds\n",
duration_cast<double_sec>(finish-start).count());
fflush(stdout);
} else {
sync_wait(sequence(
just_from([&] {
std::printf("copy file\n");
fflush(stdout);
start = steady_clock::now();
}),
copy_files(scheduler, pool, from, to)
| stop_when(scheduler.schedule_at(scheduler.now() + 30s))
| let_done([]{return just_error(std::make_exception_ptr(std::logic_error{"deadlock on copy_files"}));}),
just_from([&] {
finish = steady_clock::now();
std::printf("copy completed\n");
fflush(stdout);
})));
printf("uring: Copied all the files in %6.6f seconds\n",
duration_cast<double_sec>(finish-start).count());
fflush(stdout);
}
} catch (const std::exception& ex) {
std::printf("error: %s\n", ex.what());
fflush(stdout);
}

return 0;
}

#else // UNIFEX_NO_LIBURING

#include <cstdio>
int main() {
printf("liburing support not found\n");
return 0;
}

#endif // UNIFEX_NO_LIBURING

0 comments on commit 2f16450

Please sign in to comment.