Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making sure changed number of cores is propagated to executor #6309

Merged
merged 2 commits into from
Aug 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ namespace hpx { namespace ranges {

namespace hpx::ranges {

inline constexpr struct is_sorted_t final
inline constexpr struct is_sorted_t
: hpx::detail::tag_parallel_algorithm<is_sorted_t>
{
private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/async_base/scheduling_properties.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/execution/algorithms/detail/is_negative.hpp>
#include <hpx/execution/algorithms/detail/predicates.hpp>
Expand All @@ -16,6 +17,7 @@
#include <hpx/futures/future.hpp>
#include <hpx/iterator_support/iterator_range.hpp>
#include <hpx/parallel/util/detail/chunk_size_iterator.hpp>
#include <hpx/properties/property.hpp>

#include <algorithm>
#include <cstddef>
Expand Down Expand Up @@ -132,7 +134,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
Expand Down Expand Up @@ -166,6 +168,10 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);

Expand All @@ -175,7 +181,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename Future, typename F1, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, std::vector<Future>& workitems,
get_bulk_iteration_shape(ExPolicy& policy, std::vector<Future>& workitems,
F1&& f1, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
Expand Down Expand Up @@ -241,6 +247,10 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);

Expand All @@ -250,7 +260,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
std::vector<hpx::tuple<IterOrR, std::size_t>>
get_bulk_iteration_shape_variable(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape_variable(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<IterOrR, std::size_t>;
Expand Down Expand Up @@ -308,27 +318,31 @@ namespace hpx::parallel::util::detail {
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}

template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape_variable(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape_variable(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -360,7 +374,7 @@ namespace hpx::parallel::util::detail {
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy, FwdIter begin,
get_bulk_iteration_shape_idx(ExPolicy& policy, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
using iterator =
Expand Down Expand Up @@ -397,6 +411,13 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, 0);
iterator shape_end(last, chunk_size, count, count, 0);

Expand All @@ -407,7 +428,7 @@ namespace hpx::parallel::util::detail {
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy,
get_bulk_iteration_shape_idx(ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
Expand Down Expand Up @@ -475,6 +496,13 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, base_idx);
iterator shape_end(last, chunk_size, count, count, base_idx);

Expand All @@ -484,7 +512,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename FwdIter,
typename Stride = std::size_t>
std::vector<hpx::tuple<FwdIter, std::size_t, std::size_t>>
get_bulk_iteration_shape_idx_variable(ExPolicy&& policy, FwdIter first,
get_bulk_iteration_shape_idx_variable(ExPolicy& policy, FwdIter first,
std::size_t count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<FwdIter, std::size_t, std::size_t>;
Expand Down Expand Up @@ -543,6 +571,10 @@ namespace hpx::parallel::util::detail {
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}
} // namespace hpx::parallel::util::detail
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename FwdIter, typename F>
auto foreach_partition(
ExPolicy&& policy, FwdIter first, std::size_t count, F&& f)
ExPolicy policy, FwdIter first, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -53,16 +53,16 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count);
policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count);
auto&& shape =
detail::get_bulk_iteration_shape_idx(policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -72,7 +72,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count);
policy, inititems, f, first, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand Down
21 changes: 10 additions & 11 deletions libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename IterOrR, typename F>
auto partition(ExPolicy&& policy, IterOrR it_or_r, std::size_t count, F&& f)
auto partition(ExPolicy policy, IterOrR it_or_r, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -57,16 +57,16 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_variable(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
auto&& shape =
detail::get_bulk_iteration_shape(policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -76,7 +76,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), inititems, f, it_or_r, count);
policy, inititems, f, it_or_r, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -88,8 +88,8 @@ namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename FwdIter,
typename Stride, typename F>
auto partition_with_index(ExPolicy&& policy, FwdIter first,
std::size_t count, Stride stride, F&& f)
auto partition_with_index(
ExPolicy policy, FwdIter first, std::size_t count, Stride stride, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -106,7 +106,7 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -115,7 +115,7 @@ namespace hpx::parallel::util::detail {
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -125,8 +125,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count,
stride);
policy, inititems, f, first, count, stride);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand Down
1 change: 1 addition & 0 deletions libs/core/algorithms/tests/regressions/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ set(tests
for_loop_5735
for_loop_with_auto_chunk_size
minimal_findend
num_cores
reduce_3641
scan_different_inits
scan_non_commutative
Expand Down
41 changes: 41 additions & 0 deletions libs/core/algorithms/tests/regressions/num_cores.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2023 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/algorithm.hpp>
#include <hpx/chrono.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

#include <cstddef>

int hpx_main()
{
hpx::execution::experimental::num_cores nc(2);
auto policy = hpx::execution::par.with(nc);

HPX_TEST_EQ(
hpx::parallel::execution::processing_units_count(policy.parameters(),
policy.executor(), hpx::chrono::null_duration, 0),
static_cast<std::size_t>(2));

auto policy2 =
hpx::parallel::execution::with_processing_units_count(policy, 2);
HPX_TEST_EQ(hpx::parallel::execution::processing_units_count(
hpx::execution::par.parameters(), policy2.executor(),
hpx::chrono::null_duration, 0),
static_cast<std::size_t>(2));

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void test_rotate_direct(Policy l, ExPolicy&& policy, IteratorTag)
std::iota(std::begin(c), std::end(c), std::rand());
std::copy(std::begin(c), std::end(c), std::back_inserter(d1));

std::size_t mid_pos = std::rand() % c.size(); //-V104
std::size_t const mid_pos = std::rand() % c.size(); //-V104
base_iterator mid = std::begin(c);
std::advance(mid, mid_pos);

Expand Down Expand Up @@ -88,7 +88,7 @@ void test_rotate(Policy l, ExPolicy&& policy, IteratorTag)
std::iota(std::begin(c), std::end(c), std::rand());
std::copy(std::begin(c), std::end(c), std::back_inserter(d1));

std::size_t mid_pos = std::rand() % c.size(); //-V104
std::size_t const mid_pos = std::rand() % c.size(); //-V104
base_iterator mid = std::begin(c);
std::advance(mid, mid_pos);

Expand Down Expand Up @@ -127,7 +127,7 @@ void test_rotate_async_direct(Policy l, ExPolicy&& p, IteratorTag)
std::iota(std::begin(c), std::end(c), std::rand());
std::copy(std::begin(c), std::end(c), std::back_inserter(d1));

std::size_t mid_pos = std::rand() % c.size(); //-V104
std::size_t const mid_pos = std::rand() % c.size(); //-V104

base_iterator mid = std::begin(c);
std::advance(mid, mid_pos);
Expand Down Expand Up @@ -191,7 +191,7 @@ void rotate_test()

int hpx_main(hpx::program_options::variables_map& vm)
{
unsigned int seed = (unsigned int) std::time(nullptr);
unsigned int seed = static_cast<unsigned int>(std::time(nullptr));
if (vm.count("seed"))
seed = vm["seed"].as<unsigned int>();

Expand Down
Loading