diff --git a/README_8md.html b/README_8md.html
index 07c746bd..f7f64a63 100644
[HTML head regenerated by the new Doxygen version; title unchanged: "CPPuddle: README.md File Reference"]
[page banner and search-widget markup regenerated; visible strings unchanged: "CPPuddle", "Loading...", "Searching...", "No Matches"]
README.md File Reference
diff --git a/aggregation__executor__pools_8hpp.html b/aggregation__executor__pools_8hpp.html
index d80c4f3c..d4d0dcfd 100644
[HTML head regenerated; title unchanged: "CPPuddle: include/cppuddle/kernel_aggregation/detail/aggregation_executor_pools.hpp File Reference"]
[page banner and search-widget markup regenerated; visible strings unchanged]
aggregation_executor_pools.hpp File Reference
#include "cppuddle/kernel_aggregation/detail/aggregation_executors_and_allocators.hpp"
Include dependency graph for aggregation_executor_pools.hpp: [graph image regenerated]
This graph shows which files directly or indirectly include this file: [graph image regenerated]

Go to the source code of this file.


Classes

class  cppuddle::kernel_aggregation::detail::aggregation_pool< kernelname, Interface, Pool >
 
Namespaces
  namespace cppuddle
  namespace cppuddle::kernel_aggregation
   CPPuddle namespace containing the kernel aggregation functionality.
  namespace cppuddle::kernel_aggregation::detail

Functions
  template<typename aggregation_region_t >
  void cppuddle::kernel_aggregation::detail::init_area_aggregation_pool (const size_t max_slices)
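Example (not part of the generated page): a minimal sketch of how init_area_aggregation_pool might be instantiated. The kernel name and the Executor_t/Pool_t template arguments are illustrative assumptions; only the EAGER-vs-STRICT selection and the 128-executor default are taken from the source listing later in this diff.

// Hypothetical pool alias -- Executor_t and Pool_t are placeholders.
static const char triad_name[] = "triad";
using triad_pool = cppuddle::kernel_aggregation::detail::aggregation_pool<
    triad_name, Executor_t, Pool_t>;

// Sets up 128 aggregation executors per configured GPU; STRICT mode is
// chosen automatically when max_slices == 1, EAGER otherwise.
cppuddle::kernel_aggregation::detail::init_area_aggregation_pool<triad_pool>(
    /*max_slices=*/4);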
diff --git a/aggregation__executor__pools_8hpp__dep__incl.{map,md5,svg} (and _org.svg)
[regenerated Graphviz output; checksum 1fa96408c8bcfc68504851fb10cd7164 -> ab1e4aa438f9f7a41f5250ba5164ba86. "Included by" graph content unchanged: aggregation_executor_pools.hpp -> kernel_aggregation_interface.hpp -> { aggregation_manager.hpp -> kokkos_aggregation_util.hpp, work_aggregation_cpu_triad.cpp, work_aggregation_cuda_triad.cpp, work_aggregation_test.cpp }]
diff --git a/aggregation__executor__pools_8hpp__incl.{map,md5,svg} (and _org.svg)
[regenerated Graphviz output; checksum 10cc96ec0b18a01154ebdf1e6baaf8b4 -> 8826e8104e00cea836d986bf3d2dc8dd. Include graph content unchanged: aggregation_executors_and_allocators.hpp plus its standard-library, HPX, Boost, and CPPuddle dependencies]
diff --git a/aggregation__executor__pools_8hpp_source.html b/aggregation__executor__pools_8hpp_source.html
index b3d1611e..5e8310cd 100644
[HTML head regenerated; title unchanged: "CPPuddle: include/cppuddle/kernel_aggregation/detail/aggregation_executor_pools.hpp Source File"]
[page banner and search-widget markup regenerated; visible strings unchanged]
aggregation_executor_pools.hpp
[removed hunk side: the same 148-line source listing rendered with the previous Doxygen's line markup; superseded verbatim by the re-rendered listing below]
Go to the documentation of this file.
1   // Copyright (c) 2022-2024 Gregor Daiß
2   //
3   // Distributed under the Boost Software License, Version 1.0. (See accompanying
4   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6   #include "cppuddle/kernel_aggregation/detail/aggregation_executors_and_allocators.hpp"
7
8   #ifndef AGGREGATION_EXECUTOR_POOL_HPP
9   #define AGGREGATION_EXECUTOR_POOL_HPP
10
11  namespace cppuddle {
12  namespace kernel_aggregation {
13  namespace detail {
14
15  template <const char *kernelname, class Interface, class Pool>
16  class aggregation_pool {
17  public:
19    template <typename... Ts>
20    static void init(size_t number_of_executors, size_t slices_per_executor,
21                     aggregated_executor_modes mode, size_t num_devices = 1) {
22      if (is_initialized) {
23        throw std::runtime_error(
24            std::string("Trying to initialize cppuddle aggregation pool twice") +
25            " Agg pool name: " + std::string(kernelname));
26      }
27      if (num_devices > cppuddle::max_number_gpus) {
28        throw std::runtime_error(
29            std::string(
30                "Trying to initialize aggregation with more devices than the "
31                "maximum number of GPUs given at compiletime") +
32            " Agg pool name: " + std::string(kernelname));
33      }
34      number_devices = num_devices;
35      for (size_t gpu_id = 0; gpu_id < number_devices; gpu_id++) {
36
37        std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
38        assert(instance()[gpu_id].aggregation_executor_pool.empty());
39        for (int i = 0; i < number_of_executors; i++) {
40          instance()[gpu_id].aggregation_executor_pool.emplace_back(slices_per_executor,
41                                                                    mode, gpu_id);
42        }
43        instance()[gpu_id].slices_per_executor = slices_per_executor;
44        instance()[gpu_id].mode = mode;
45      }
46      is_initialized = true;
47    }
48
50    static decltype(auto) request_executor_slice(void) {
51      if (!is_initialized) {
52        throw std::runtime_error(
53            std::string("ERROR: Trying to use cppuddle aggregation pool without first calling init!\n") +
54            " Agg poolname: " + std::string(kernelname));
55      }
56      const size_t gpu_id = cppuddle::get_device_id(number_devices);
57      /* const size_t gpu_id = 1; */
58      std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
59      assert(!instance()[gpu_id].aggregation_executor_pool.empty());
60      std::optional<hpx::lcos::future<
61          typename aggregated_executor<Interface>::executor_slice>>
62          ret;
63      size_t local_id = (instance()[gpu_id].current_interface) %
64                        instance()[gpu_id].aggregation_executor_pool.size();
65      ret = instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice();
66      // Expected case: current aggregation executor is free
67      if (ret.has_value()) {
68        return ret;
69      }
70      // current interface is bad -> find free one
71      size_t abort_counter = 0;
72      const size_t abort_number = instance()[gpu_id].aggregation_executor_pool.size() + 1;
73      do {
74        local_id = (++(instance()[gpu_id].current_interface)) % // increment interface
75                   instance()[gpu_id].aggregation_executor_pool.size();
76        ret =
77            instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice();
78        if (ret.has_value()) {
79          return ret;
80        }
81        abort_counter++;
82      } while (abort_counter <= abort_number);
83      // Everything's busy -> create new aggregation executor (growing pool) OR
84      // return empty optional
85      if (instance()[gpu_id].growing_pool) {
86        instance()[gpu_id].aggregation_executor_pool.emplace_back(
87            instance()[gpu_id].slices_per_executor, instance()[gpu_id].mode, gpu_id);
88        instance()[gpu_id].current_interface =
89            instance()[gpu_id].aggregation_executor_pool.size() - 1;
90        assert(instance()[gpu_id].aggregation_executor_pool.size() < 20480);
91        ret = instance()[gpu_id]
92                  .aggregation_executor_pool[instance()[gpu_id].current_interface]
93                  .request_executor_slice();
94        assert(ret.has_value()); // fresh executor -- should always have slices
95                                 // available
96      }
97      return ret;
98    }
99
100 private:
101   std::deque<aggregated_executor<Interface>> aggregation_executor_pool;
102   std::atomic<size_t> current_interface{0};
103   size_t slices_per_executor;
104   aggregated_executor_modes mode;
105   bool growing_pool{true};
106
107 private:
110   aggregation_mutex_t pool_mutex;
112   static std::unique_ptr<aggregation_pool[]>& instance(void) {
113     static std::unique_ptr<aggregation_pool[]> pool_instances{
114         std::make_unique<aggregation_pool[]>(cppuddle::max_number_gpus)};
115     return pool_instances;
116   }
117   static inline size_t number_devices = 1;
118   static inline bool is_initialized = false;
119   aggregation_pool() = default;
120
121 public:
122   ~aggregation_pool() = default;
123   // Bunch of constructors we don't need
124   aggregation_pool(aggregation_pool const &other) = delete;
125   aggregation_pool &operator=(aggregation_pool const &other) = delete;
126   aggregation_pool(aggregation_pool &&other) = delete;
127   aggregation_pool &operator=(aggregation_pool &&other) = delete;
128 };
129
130 template <typename aggregation_region_t>
131 void init_area_aggregation_pool(
132     const size_t max_slices) {
133   constexpr size_t number_aggregation_executors = 128;
134   constexpr size_t number_gpus = cppuddle::max_number_gpus;
135   aggregated_executor_modes executor_mode = aggregated_executor_modes::EAGER;
136   if (max_slices == 1) {
137     executor_mode = aggregated_executor_modes::STRICT;
138   }
139   aggregation_region_t::init(
140       number_aggregation_executors, max_slices, executor_mode, number_gpus);
141 }
142
143
144 } // namespace detail
145 } // namespace kernel_aggregation
146 } // namespace cppuddle
147
148 #endif
Slice class - meant as a scope interface to the aggregated executor. Definition aggregation_executors_and_allocators.hpp:420
aggregation_pool: Definition aggregation_executor_pools.hpp:16
aggregation_pool(aggregation_pool &&other)=delete
aggregation_pool & operator=(aggregation_pool const &other)=delete
aggregation_pool & operator=(aggregation_pool &&other)=delete
static decltype(auto) request_executor_slice(void): Will always return a valid executor slice. Definition aggregation_executor_pools.hpp:50
aggregation_pool(aggregation_pool const &other)=delete
static void init(size_t number_of_executors, size_t slices_per_executor, aggregated_executor_modes mode, size_t num_devices=1): interface. Definition aggregation_executor_pools.hpp:20
void init_area_aggregation_pool(const size_t max_slices): Definition aggregation_executor_pools.hpp:131
hpx::mutex aggregation_mutex_t: Definition aggregation_executors_and_allocators.hpp:70
aggregated_executor_modes: Definition aggregation_executors_and_allocators.hpp:383
Definition config.hpp:31
size_t get_device_id(const size_t number_gpus): Uses HPX thread information to determine which GPU should be used. Definition config.hpp:59
constexpr size_t max_number_gpus: Definition config.hpp:52
diff --git a/aggregation__executors__and__allocators_8hpp.html b/aggregation__executors__and__allocators_8hpp.html
index 6de605f3..83ff1c87 100644
[HTML head regenerated; title unchanged: "CPPuddle: include/cppuddle/kernel_aggregation/detail/aggregation_executors_and_allocators.hpp File Reference"]
[page banner and search-widget markup regenerated; visible strings unchanged]
aggregation_executors_and_allocators.hpp File Reference
#include <stdexcept>
@@ -107,19 +114,17 @@
Include dependency graph for aggregation_executors_and_allocators.hpp: [graph image regenerated]
This graph shows which files directly or indirectly include this file: [graph image regenerated]

Go to the source code of this file.

Classes
  class cppuddle::kernel_aggregation::detail::aggregated_function_call< Executor >
   Manages the launch conditions for aggregated function calls type/value-errors. More...
  class cppuddle::kernel_aggregation::detail::allocator_slice< T, Host_Allocator, Executor >
   Declaration since the actual allocator is only defined after the Executors. More...
Namespaces
  namespace cppuddle
  namespace cppuddle::kernel_aggregation
   CPPuddle namespace containing the kernel aggregation functionality.
  namespace cppuddle::kernel_aggregation::detail
  namespace hpx
  namespace hpx::parallel
  namespace hpx::parallel::execution

Typedefs
  using cppuddle::kernel_aggregation::detail::aggregation_mutex_t = hpx::mutex
Enumerations
  enum class cppuddle::kernel_aggregation::detail::aggregated_executor_modes { EAGER = 1, STRICT, ENDLESS }

Functions
  template<typename... Ts >
  std::tuple< Ts... > cppuddle::kernel_aggregation::detail::make_tuple_supporting_references (Ts &&...ts)
   Constructs a tuple with copies (to store temporaries in aggregated function calls) yet also supporting references (on the users own risk...)
  template<typename T >
  std::string cppuddle::kernel_aggregation::detail::print_if_possible (T val)
   Print some specific values that we can, but don't bother for most types (such as vector)
  template<class TupType , size_t... I >
  void cppuddle::kernel_aggregation::detail::print_tuple (const TupType &_tup, std::index_sequence< I... >)
   Helper class for the helper class that prints tuples – do not use this directly.
  template<class... T >
  void cppuddle::kernel_aggregation::detail::print_tuple (const std::tuple< T... > &_tup)
   Helper class for printing tuples (first component should be a function pointer, remaining components the function arguments)
  template<typename Executor , typename F , typename... Ts >
  void cppuddle::kernel_aggregation::detail::exec_post_wrapper (Executor &exec, F &&f, Ts &&...ts)
  template<typename Executor , typename F , typename... Ts >
  hpx::lcos::future< void > cppuddle::kernel_aggregation::detail::exec_async_wrapper (Executor &exec, F &&f, Ts &&...ts)
  template<typename T , typename U , typename Host_Allocator , typename Executor >
  constexpr bool cppuddle::kernel_aggregation::detail::operator== (allocator_slice< T, Host_Allocator, Executor > const &, allocator_slice< U, Host_Allocator, Executor > const &) noexcept
  template<typename T , typename U , typename Host_Allocator , typename Executor >
  constexpr bool cppuddle::kernel_aggregation::detail::operator!= (allocator_slice< T, Host_Allocator, Executor > const &, allocator_slice< U, Host_Allocator, Executor > const &) noexcept
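Example (not part of the generated page): how the debugging helpers listed above fit together. my_kernel_function is a hypothetical function used only for illustration; print_tuple writes via hpx::cout, so this assumes a running HPX runtime.

#include <string>
#include "cppuddle/kernel_aggregation/detail/aggregation_executors_and_allocators.hpp"

void my_kernel_function(int, double) {}  // hypothetical kernel stand-in

void debug_print_example() {
  // Store the call (function pointer first, then arguments) for later comparison:
  auto call_tuple = cppuddle::kernel_aggregation::detail::
      make_tuple_supporting_references(&my_kernel_function, 42, 3.14);
  // print_tuple expects exactly that layout: it prints the function address,
  // then each argument via print_if_possible:
  cppuddle::kernel_aggregation::detail::print_tuple(call_tuple);
  // print_if_possible handles strings, integral/floating-point values, and
  // pointers; anything else yields "cannot print value":
  std::string s =
      cppuddle::kernel_aggregation::detail::print_if_possible(3.14);
}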
diff --git a/aggregation__executors__and__allocators_8hpp__dep__incl.{map,md5,svg} (and _org.svg)
[regenerated Graphviz output; checksum 56f8625eba371d1ca7e9fe592bf63a82 -> d0d01bac1a22da7ab1771e68ee34b8ca. "Included by" graph content unchanged: aggregation_executors_and_allocators.hpp -> { aggregation_executor_pools.hpp, kernel_aggregation_interface.hpp }; kernel_aggregation_interface.hpp -> { aggregation_manager.hpp -> kokkos_aggregation_util.hpp, work_aggregation_cpu_triad.cpp, work_aggregation_cuda_triad.cpp, work_aggregation_test.cpp }]
diff --git a/aggregation__executors__and__allocators_8hpp__incl.{map,md5,svg} (and _org.svg)
[regenerated Graphviz output; checksum f068d0a1f2ba58b52d5237d3db43ba90 -> 94599bd525cafc0b0d7436e98f47bb6e. Include graph content unchanged: the standard-library, HPX, Boost, and CPPuddle headers shown in the source listing below]
diff --git a/aggregation__executors__and__allocators_8hpp_source.html b/aggregation__executors__and__allocators_8hpp_source.html
index fe8beb54..5562585b 100644
[HTML head regenerated; title unchanged: "CPPuddle: include/cppuddle/kernel_aggregation/detail/aggregation_executors_and_allocators.hpp Source File"]
[page banner and search-widget markup regenerated; visible strings unchanged]
aggregation_executors_and_allocators.hpp
-Go to the documentation of this file.
1 // Copyright (c) 2022-2024 Gregor Daiß
-
2 //
-
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
-
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-
5 
-
6 #ifndef AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
-
7 #define AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
-
8 
-
9 #ifndef CPPUDDLE_HAVE_HPX
-
10 #error "Work aggregation allocators/executors require CPPUDDLE_WITH_HPX=ON"
-
11 #endif
-
12 
-
13 #include <stdexcept>
-
14 // When defined, CPPuddle will run more checks
-
15 // about the order of aggregated method calls.
-
16 // Best defined before including this header when needed
-
17 // (hence commented out here)
-
18 //#define DEBUG_AGGREGATION_CALLS 1
-
19 
-
20 #include <stdio.h>
-
21 
-
22 #include <any>
-
23 #include <atomic>
-
24 #include <chrono>
-
25 #include <cstdio>
-
26 #include <iostream>
-
27 #include <memory>
-
28 #include <mutex>
-
29 #include <optional>
-
30 #include <ostream>
-
31 #include <string>
-
32 #include <tuple>
-
33 #include <type_traits>
-
34 #include <typeinfo>
-
35 #include <utility>
-
36 #include <unordered_map>
-
37 
-
38 #include <hpx/futures/future.hpp>
-
39 #include <hpx/hpx_init.hpp>
-
40 #include <hpx/include/async.hpp>
-
41 #include <hpx/include/iostreams.hpp>
-
42 #include <hpx/include/lcos.hpp>
-
43 #include <hpx/lcos/promise.hpp>
-
44 #include <hpx/mutex.hpp>
-
45 
-
46 #if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
-
47 // required for defining type traits using cuda executor as underlying
-
48 // aggregation executors
-
49 #include <hpx/async_cuda/cuda_executor.hpp>
-
50 #endif
-
51 
-
52 #include <boost/core/demangle.hpp>
-
53 #include <boost/format.hpp>
-
54 
55 #include "cppuddle/common/config.hpp"
56 // get direct access to the buffer management
57 #include "cppuddle/memory_recycling/detail/buffer_management.hpp"
58 // get normal access to the executor pools
59 #include "cppuddle/executor_recycling/executor_pools_interface.hpp"
60 
-
61 #ifndef CPPUDDLE_HAVE_HPX_MUTEX
-
62 #pragma message \
-
63  "Work aggregation will use hpx::mutex internally, despite CPPUDDLE_WITH_HPX_MUTEX=OFF"
-
64 #pragma message \
-
65  "Consider using CPPUDDLE_WITH_HPX_MUTEX=ON, to make the rest of CPPuddle also use hpx::mutex"
-
66 #endif
-
67 namespace cppuddle {
-
68 namespace kernel_aggregation {
-
69 namespace detail {
-
70  using aggregation_mutex_t = hpx::mutex;
-
71 
-
72 //===============================================================================
-
73 //===============================================================================
-
74 // Helper functions/classes
-
75 
-
78 template <typename... Ts>
-
79 std::tuple<Ts...> make_tuple_supporting_references(Ts &&...ts) {
-
80  return std::tuple<Ts...>{std::forward<Ts>(ts)...};
-
81 }
-
82 
-
85 template <typename T> std::string print_if_possible(T val) {
-
86  if constexpr (std::is_convertible_v<T, std::string>) {
-
87  return val;
-
88  } else if constexpr (std::is_integral_v<T> || std::is_floating_point_v<T>) {
-
89  return std::to_string(val);
-
90  } else if constexpr (std::is_pointer_v<T>) {
-
91  // Pretty printing pointer sort of only works well with %p
-
92  // TODO Try using std::format as soon as we can move to C++20
-
93  std::unique_ptr<char[]> debug_string(new char[128]());
-
94  snprintf(debug_string.get(), 128, "%p", val);
-
95  return std::string(debug_string.get());
-
96  } else {
-
97  return std::string("cannot print value");
-
98  }
-
99 }
-
100 
-
103 template <class TupType, size_t... I>
-
104 void print_tuple(const TupType &_tup, std::index_sequence<I...>) {
-
105  (..., (hpx::cout << (I == 0 ? "" : ", ")
-
106  << print_if_possible(std::get<I + 1>(_tup))));
-
107 }
-
108 
-
111 template <class... T> void print_tuple(const std::tuple<T...> &_tup) {
-
112  // Use pointer and sprintf as boost::format refused to NOT cast the pointer
-
113  // address to 1...
-
114  // TODO Try using std::format as soon as we can move to C++20
-
115  std::unique_ptr<char[]> debug_string(new char[128]());
-
116  snprintf(debug_string.get(), 128, "Function address: %p -- Arguments: (",
-
117  std::get<0>(_tup));
-
118  hpx::cout << debug_string.get();
-
119  print_tuple(_tup, std::make_index_sequence<sizeof...(T) - 1>());
-
120  hpx::cout << ")";
-
121 }
-
122 
-
123 //===============================================================================
-
124 //===============================================================================
-
125 template <typename Executor, typename F, typename... Ts>
-
126 void exec_post_wrapper(Executor & exec, F &&f, Ts &&...ts) {
-
127  hpx::apply(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
-
128 }
-
129 
-
130 template <typename Executor, typename F, typename... Ts>
-
131 hpx::lcos::future<void> exec_async_wrapper(Executor & exec, F &&f, Ts &&...ts) {
-
132  return hpx::async(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
-
133 }
-
134 
-
137 /** Launch conditions: All slice executors must have called the same function
-
138  * (tracked by future all_slices_ready)
-
139  * AND
-
140  * Previous aggregated_function_call on the same Executor must have been
-
141  * launched (tracked by future stream_future)
-
142  * All function calls received from the slice executors are checked if they
-
143  * match the first one in both types and values (throws exception otherwise)
-
144  */
-
145 
-
146 template <typename Executor> class aggregated_function_call {
-
147 private:
-
148  std::atomic<size_t> slice_counter = 0;
-
149 
-
151  /* hpx::lcos::local::promise<void> slices_ready_promise; */
-
153  /* hpx::lcos::future<void> all_slices_ready = slices_ready_promise.get_future(); */
-
155  const size_t number_slices;
-
156  const bool async_mode;
-
157 
-
158  Executor &underlying_executor;
-
159 
-
160 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
-
161 #pragma message \
-
162  "Building slow work aggregator build with additional runtime checks! Build with NDEBUG defined for fast build..."
-
165  std::any function_tuple;
-
167  std::string debug_type_information;
-
168  aggregation_mutex_t debug_mut;
-
169 #endif
-
170 
-
171  std::vector<hpx::lcos::local::promise<void>> potential_async_promises{};
-
172 
-
173 public:
-
174  aggregated_function_call(const size_t number_slices, bool async_mode, Executor &exec)
-
175  : number_slices(number_slices), async_mode(async_mode), underlying_executor(exec) {
-
176  if (async_mode)
-
177  potential_async_promises.resize(number_slices);
-
178  }
-
179  ~aggregated_function_call(void) {
-
180  // All slices should have done this call
-
181  assert(slice_counter == number_slices);
-
182  // assert(!all_slices_ready.valid());
-
183  }
-
185  bool sync_aggregation_slices(hpx::lcos::future<void> &stream_future) {
-
186  assert(!async_mode);
-
187  assert(potential_async_promises.empty());
-
188  const size_t local_counter = slice_counter++;
-
189  if (local_counter == number_slices - 1) {
-
190  return true;
-
191  }
-
192  else return false;
-
193  }
-
194  template <typename F, typename... Ts>
-
195  void post_when(hpx::lcos::future<void> &stream_future, F &&f, Ts &&...ts) {
-
196 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
-
197  // needed for concurrent access to function_tuple and debug_type_information
-
198  // Not required for normal use
-
199  std::lock_guard<aggregation_mutex_t> guard(debug_mut);
-
200 #endif
-
201  assert(!async_mode);
-
202  assert(potential_async_promises.empty());
-
203  const size_t local_counter = slice_counter++;
-
204 
-
205  if (local_counter == 0) {
-
206 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
-
207  auto tmp_tuple =
-
208  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
-
209  function_tuple = tmp_tuple;
-
210  debug_type_information = typeid(decltype(tmp_tuple)).name();
-
211 #endif
-
212 
-
213  } else {
-
214  //
-
215  // This scope checks if both the type and the values of the current call
-
216  // match the original call To be used in debug build...
-
217  //
-
218 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
-
219  auto comparison_tuple =
-
220  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
-
221  try {
-
222  auto orig_call_tuple =
-
223  std::any_cast<decltype(comparison_tuple)>(function_tuple);
-
224  if (comparison_tuple != orig_call_tuple) {
-
225  throw std::runtime_error(
-
226  "Values of post function arguments (or function "
-
227  "itself) do not match ");
-
228  }
-
229  } catch (const std::bad_any_cast &e) {
-
230  hpx::cout
-
231  << "\nMismatched types error in aggregated post call of executor "
-
232  << ": " << e.what() << "\n";
-
233  hpx::cout << "Expected types:\t\t "
-
234  << boost::core::demangle(debug_type_information.c_str());
-
235  hpx::cout << "\nGot types:\t\t "
-
236  << boost::core::demangle(
-
237  typeid(decltype(comparison_tuple)).name())
-
238  << "\n"
-
239  << std::endl;
-
240  // throw;
-
241  } catch (const std::runtime_error &e) {
-
242  hpx::cout
-
243  << "\nMismatched values error in aggregated post call of executor "
-
244  << ": " << e.what() << std::endl;
-
245  hpx::cout << "Types (matched):\t "
-
246  << boost::core::demangle(debug_type_information.c_str());
-
247  auto orig_call_tuple =
-
248  std::any_cast<decltype(comparison_tuple)>(function_tuple);
-
249  hpx::cout << "\nExpected values:\t ";
-
250  print_tuple(orig_call_tuple);
-
251  hpx::cout << "\nGot values:\t\t ";
-
252  print_tuple(comparison_tuple);
-
253  hpx::cout << std::endl << std::endl;
-
254  // throw;
-
255  }
-
256 #endif
-
257  }
-
258  assert(local_counter < number_slices);
-
259  assert(slice_counter < number_slices + 1);
-
260  // Check exit criteria: Launch function call continuation by setting the
-
261  // slices promise
-
262  if (local_counter == number_slices - 1) {
-
263  exec_post_wrapper<Executor, F, Ts...>(underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
-
264  //slices_ready_promise.set_value();
-
265  }
-
266  }
-
267  template <typename F, typename... Ts>
-
268  hpx::lcos::future<void> async_when(hpx::lcos::future<void> &stream_future,
-
269  F &&f, Ts &&...ts) {
-
270 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
-
271  // needed for concurrent access to function_tuple and debug_type_information
-
272  // Not required for normal use
-
273  std::lock_guard<aggregation_mutex_t> guard(debug_mut);
-
274 #endif
-
275  assert(async_mode);
-
276  assert(!potential_async_promises.empty());
-
277  const size_t local_counter = slice_counter++;
-
278  if (local_counter == 0) {
-
279 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
-
280  auto tmp_tuple =
-
281  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
-
282  function_tuple = tmp_tuple;
-
283  debug_type_information = typeid(decltype(tmp_tuple)).name();
-
284 #endif
-
285  } else {
-
286  //
-
287  // This scope checks if both the type and the values of the current call
-
288  // match the original call To be used in debug build...
-
289  //
-
290 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
-
291  auto comparison_tuple =
-
292  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
-
293  try {
-
294  auto orig_call_tuple =
-
295  std::any_cast<decltype(comparison_tuple)>(function_tuple);
-
296  if (comparison_tuple != orig_call_tuple) {
-
297  throw std::runtime_error(
-
298  "Values of async function arguments (or function "
-
299  "itself) do not match ");
-
300  }
-
301  } catch (const std::bad_any_cast &e) {
-
302  hpx::cout
-
303  << "\nMismatched types error in aggregated async call of executor "
-
304  << ": " << e.what() << "\n";
-
305  hpx::cout << "Expected types:\t\t "
-
306  << boost::core::demangle(debug_type_information.c_str());
-
307  hpx::cout << "\nGot types:\t\t "
-
308  << boost::core::demangle(
-
309  typeid(decltype(comparison_tuple)).name())
-
310  << "\n"
-
311  << std::endl;
-
312  // throw;
-
313  } catch (const std::runtime_error &e) {
-
314  hpx::cout
-
315  << "\nMismatched values error in aggregated async call of executor "
-
316  << ": " << e.what() << std::endl;
-
317  hpx::cout << "Types (matched):\t "
-
318  << boost::core::demangle(debug_type_information.c_str());
-
319  auto orig_call_tuple =
-
320  std::any_cast<decltype(comparison_tuple)>(function_tuple);
-
321  hpx::cout << "\nExpected values:\t ";
-
322  print_tuple(orig_call_tuple);
-
323  hpx::cout << "\nGot values:\t\t ";
-
324  print_tuple(comparison_tuple);
-
325  hpx::cout << std::endl << std::endl;
-
326  // throw;
-
327  }
-
328 #endif
-
329  }
-
330  assert(local_counter < number_slices);
-
331  assert(slice_counter < number_slices + 1);
-
332  assert(potential_async_promises.size() == number_slices);
-
333  hpx::lcos::future<void> ret_fut =
-
334  potential_async_promises[local_counter].get_future();
-
335  if (local_counter == number_slices - 1) {
-
336  /* slices_ready_promise.set_value(); */
-
337  auto fut = exec_async_wrapper<Executor, F, Ts...>(
-
338  underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
-
339  fut.then([this](auto &&fut) {
-
340  for (auto &promise : potential_async_promises) {
-
341  promise.set_value();
-
342  }
-
343  });
-
344  }
-
345  // Check exit criteria: Launch function call continuation by setting the
-
346  // slices promise
-
347  return ret_fut;
-
348  }
-
349  template <typename F, typename... Ts>
-
350  hpx::lcos::shared_future<void> wrap_async(hpx::lcos::future<void> &stream_future,
-
351  F &&f, Ts &&...ts) {
-
352  assert(async_mode);
-
353  assert(!potential_async_promises.empty());
-
354  const size_t local_counter = slice_counter++;
-
355  assert(local_counter < number_slices);
-
356  assert(slice_counter < number_slices + 1);
-
357  assert(potential_async_promises.size() == number_slices);
-
358  hpx::lcos::shared_future<void> ret_fut =
-
359  potential_async_promises[local_counter].get_shared_future();
-
360  if (local_counter == number_slices - 1) {
-
361  auto fut = f(std::forward<Ts>(ts)...);
-
362  fut.then([this](auto &&fut) {
-
363  // TODO just use one promise
-
364  for (auto &promise : potential_async_promises) {
-
365  promise.set_value();
-
366  }
-
367  });
-
368  }
-
369  return ret_fut;
-
370  }
-
371  // We need to be able to copy or no-except move for std::vector..
-
372  aggregated_function_call(const aggregated_function_call &other) = default;
-
373  aggregated_function_call &
-
374  operator=(const aggregated_function_call &other) = default;
-
375  aggregated_function_call(aggregated_function_call &&other) = default;
-
376  aggregated_function_call &
-
377  operator=(aggregated_function_call &&other) = default;
-
378 };
-
379 
-
380 //===============================================================================
-
381 //===============================================================================
-
382 
-
383 enum class aggregated_executor_modes { EAGER = 1, STRICT, ENDLESS };
-
385 template <typename T, typename Host_Allocator, typename Executor>
-
386 class allocator_slice;
-
387 
-
389 /** Executor is not meant to be used directly. Instead it yields multiple
-
390  * executor_slice objects. These serve as interfaces. Slices from the same
-
391  * aggregated_executor are meant to execute the same function calls but on
-
392  * different data (i.e. different tasks)
-
393  */
-
394 template <typename Executor> class aggregated_executor {
-
395 private:
-
396  //===============================================================================
-
397  // Misc private variables:
-
398  //
-
399  std::atomic<bool> slices_exhausted;
-
400 
-
401  std::atomic<bool> executor_slices_alive;
-
402  std::atomic<bool> buffers_in_use;
-
403  std::atomic<size_t> dealloc_counter;
-
404 
-
405  const aggregated_executor_modes mode;
-
406  const size_t max_slices;
-
407  std::atomic<size_t> current_slices;
-
411  std::unique_ptr<cppuddle::executor_recycling::executor_interface<
-
412  Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>>
-
413  executor_wrapper;
-
414 
-
415 public:
-
416  size_t gpu_id;
-
417  // Subclasses
-
418 
-
420  class executor_slice {
-
421  public:
-
422  aggregated_executor<Executor> &parent;
-
423  private:
-
427  size_t launch_counter{0};
-
428  size_t buffer_counter{0};
-
429  bool notify_parent_about_destruction{true};
-
430 
-
431  public:
-
434  const size_t number_slices;
-
435  size_t max_slices;
-
436  size_t id;
-
437  using executor_t = Executor;
-
438  executor_slice(aggregated_executor &parent, const size_t slice_id,
-
439  const size_t number_slices, const size_t max_number_slices)
-
440  : parent(parent), notify_parent_about_destruction(true),
-
441  number_slices(number_slices), id(slice_id), max_slices(max_number_slices) {
-
442  assert(parent.max_slices == max_slices);
-
443  assert(number_slices >= 1);
-
444  assert(number_slices <= max_slices);
-
445  }
-
446  ~executor_slice(void) {
-
447  // Don't notify parent if we moved away from this executor_slice
-
448  if (notify_parent_about_destruction) {
-
449  // Executor should be done by the time of destruction
-
450  // -> check here before notifying parent
-
451 
-
452  assert(parent.max_slices == max_slices);
-
453  assert(number_slices >= 1);
-
454  assert(number_slices <= max_slices);
-
455  // parent still in execution mode?
-
456  assert(parent.slices_exhausted == true);
-
457  // all kernel launches done?
-
458  assert(launch_counter == parent.function_calls.size());
-
459  // Notify parent that this aggregation slice is done
-
460  parent.reduce_usage_counter();
-
461  }
-
462  }
-
463  executor_slice(const executor_slice &other) = delete;
-
464  executor_slice &operator=(const executor_slice &other) = delete;
-
465  executor_slice(executor_slice &&other)
-
466  : parent(other.parent), launch_counter(std::move(other.launch_counter)),
-
467  buffer_counter(std::move(other.buffer_counter)),
-
468  number_slices(std::move(other.number_slices)),
-
469  id(std::move(other.id)), max_slices(std::move(other.max_slices)) {
-
470  other.notify_parent_about_destruction = false;
-
471  }
-
472  executor_slice &operator=(executor_slice &&other) {
-
473  parent = other.parent;
-
474  launch_counter = std::move(other.launch_counter);
-
475  buffer_counter = std::move(other.buffer_counter);
-
476  number_slices = std::move(other.number_slices);
-
477  id = std::move(other.id);
-
478  max_slices = std::move(other.max_slices);
-
479  other.notify_parent_about_destruction = false;
-
480  }
-
481  template <typename T, typename Host_Allocator>
-
482  allocator_slice<T, Host_Allocator, Executor> make_allocator() {
-
483  return allocator_slice<T, Host_Allocator, Executor>(*this);
-
484  }
-
485  bool sync_aggregation_slices() {
-
486  assert(parent.slices_exhausted == true);
-
487  auto ret = parent.sync_aggregation_slices(launch_counter);
-
488  launch_counter++;
-
489  return ret;
-
490  }
-
491  template <typename F, typename... Ts> void post(F &&f, Ts &&...ts) {
-
492  // we should only execute function calls once all slices
-
493  // have been given away (-> Executor Slices start)
-
494  assert(parent.slices_exhausted == true);
-
495  parent.post(launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
-
496  launch_counter++;
-
497  }
-
498  template <typename F, typename... Ts>
-
499  hpx::lcos::future<void> async(F &&f, Ts &&...ts) {
-
500  // we should only execute function calls once all slices
-
501  // have been given away (-> Executor Slices start)
-
502  assert(parent.slices_exhausted == true);
-
503  hpx::lcos::future<void> ret_fut = parent.async(
-
504  launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
-
505  launch_counter++;
-
506  return ret_fut;
-
507  }
-
508 
-
509  // OneWay Execution
-
510  template <typename F, typename... Ts>
-
511  friend decltype(auto) tag_invoke(hpx::parallel::execution::post_t,
-
512  executor_slice& exec, F&& f, Ts&&... ts)
-
513  {
-
514  return exec.post(std::forward<F>(f), std::forward<Ts>(ts)...);
-
515  }
-
516 
-
517  // TwoWay Execution
-
518  template <typename F, typename... Ts>
-
519  friend decltype(auto) tag_invoke(
-
520  hpx::parallel::execution::async_execute_t, executor_slice& exec,
-
521  F&& f, Ts&&... ts)
-
522  {
-
523  return exec.async(
-
524  std::forward<F>(f), std::forward<Ts>(ts)...);
-
525  }
-
526 
-
527  template <typename F, typename... Ts>
-
528  hpx::lcos::shared_future<void> wrap_async(F &&f, Ts &&...ts) {
-
529  // we should only execute function calls once all slices
-
530  // have been given away (-> Executor Slices start)
-
531  assert(parent.slices_exhausted == true);
-
532  hpx::lcos::shared_future<void> ret_fut = parent.wrap_async(
-
533  launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
-
534  launch_counter++;
-
535  return ret_fut;
-
536  }
-
537 
-
540  template <typename T, typename Host_Allocator> T *get(const size_t size) {
-
541  assert(parent.slices_exhausted == true);
-
542  T *aggregated_buffer =
-
543  parent.get<T, Host_Allocator>(size, buffer_counter);
-
544  buffer_counter++;
-
545  assert(buffer_counter > 0);
-
546  return aggregated_buffer;
-
547  }
-
548 
-
549  Executor& get_underlying_executor(void) {
-
550  assert(parent.executor_wrapper);
-
551  return *(parent.executor_wrapper);
-
552  }
-
553  };
-
554 
-
555  // deprecated name...
-
556  using Executor_Slice [[deprecated("Renamed: Use executor_slice instead")]] = executor_slice;
-
557 
-
558  //===============================================================================
-
559 
-
560  hpx::lcos::local::promise<void> slices_full_promise;
-
563  std::vector<hpx::lcos::local::promise<executor_slice>> executor_slices;
-
566  std::deque<aggregated_function_call<Executor>> function_calls;
-
568  aggregation_mutex_t mut;
-
569 
-
572  using buffer_entry_t =
-
573  std::tuple<void*, const size_t, std::atomic<size_t>, bool, const size_t, size_t>;
-
575  std::deque<buffer_entry_t> buffer_allocations;
-
577  std::unordered_map<void*,size_t> buffer_allocations_map;
-
579  aggregation_mutex_t buffer_mut;
-
580  std::atomic<size_t> buffer_counter = 0;
-
581 
-
583  template <typename T, typename Host_Allocator>
-
584  T *get(const size_t size, const size_t slice_alloc_counter) {
-
585  assert(slices_exhausted == true);
-
586  assert(executor_wrapper);
-
587  assert(executor_slices_alive == true);
-
588  // Add aggregated buffer entry in case it hasn't happened yet for this call
-
589  // First: Check if it already has happened
-
590  if (buffer_counter <= slice_alloc_counter) {
-
591  // we might be the first! Lock...
-
592  std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
-
593  // ... and recheck
-
594  if (buffer_counter <= slice_alloc_counter) {
-
595  constexpr bool manage_content_lifetime = false;
-
596  buffers_in_use = true;
-
597 
-
598  // Default location -- useful for GPU builds as we otherwise create way too
-
599  // many different buffers for different aggregation sizes on different GPUs
-
600  /* size_t location_id = gpu_id * instances_per_gpu; */
-
601  // Use integer division to map to buckets 0, 16, 32, ...
-
602  size_t location_id = ((hpx::get_worker_thread_num() % cppuddle::number_instances) / 16) * 16;
-
603 #ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
-
604  if (max_slices == 1) {
-
605  // get preferred location, i.e. the current HPX thread's location
-
606  // Usually handy for CPU builds where we want to use the buffers
-
607  // close to the current CPU core
-
608  /* location_id = (hpx::get_worker_thread_num() / instances_per_gpu) * instances_per_gpu; */
-
609  /* location_id = (gpu_id) * instances_per_gpu; */
-
610  // division makes sure that we always use the same instance to store our gpu buffers.
-
611  }
-
612 #endif
-
613  // Get a shiny new buffer that will be shared between all slices
-
614  // Buffer might be recycled from previous allocations by the
-
615  // buffer_interface...
-
616  T *aggregated_buffer =
-
617  cppuddle::memory_recycling::detail::buffer_interface::get<
-
618  T, Host_Allocator>(size, manage_content_lifetime, location_id,
-
619  gpu_id);
-
620  // Create buffer entry for this buffer
-
621  buffer_allocations.emplace_back(static_cast<void *>(aggregated_buffer),
-
622  size, 1, true, location_id, gpu_id);
-
623 
-
624 #ifndef NDEBUG
-
625  // if previously used, the buffer should no longer be in use
-
626  const auto exists = buffer_allocations_map.count(
-
627  static_cast<void *>(aggregated_buffer));
-
628  if (exists > 0) {
-
629  const auto previous_usage_id =
-
630  buffer_allocations_map[static_cast<void *>(aggregated_buffer)];
-
631  const auto &valid =
-
632  std::get<3>(buffer_allocations[previous_usage_id]);
-
633  assert(!valid);
-
634  }
-
635 #endif
-
636  buffer_allocations_map.insert_or_assign(static_cast<void *>(aggregated_buffer),
-
637  buffer_counter);
-
638 
-
639  assert (buffer_counter == slice_alloc_counter);
-
640  buffer_counter = buffer_allocations.size();
-
641 
-
642  // Return buffer
-
643  return aggregated_buffer;
-
644  }
-
645  }
-
646  assert(buffers_in_use == true);
-
647  assert(std::get<3>(buffer_allocations[slice_alloc_counter])); // valid
-
648  assert(std::get<2>(buffer_allocations[slice_alloc_counter]) >= 1);
-
649 
-
650  // Buffer entry should already exist:
-
651  T *aggregated_buffer = static_cast<T *>(
-
652  std::get<0>(buffer_allocations[slice_alloc_counter]));
-
653  // Error handling: Size is wrong?
-
654  assert(size == std::get<1>(buffer_allocations[slice_alloc_counter]));
-
655  // Notify that one more slice has visited this buffer allocation
-
656  std::get<2>(buffer_allocations[slice_alloc_counter])++;
-
657  return aggregated_buffer;
-
658  }
-
659 
-
661  template <typename T, typename Host_Allocator>
-
662  void mark_unused(T *p, const size_t size) {
-
663  assert(slices_exhausted == true);
-
664  assert(executor_wrapper);
-
665 
-
666  void *ptr_key = static_cast<void*>(p);
-
667  size_t slice_alloc_counter = buffer_allocations_map[p];
-
668 
-
669  assert(slice_alloc_counter < buffer_allocations.size());
-
670  /*auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter, valid] =
-
671  buffer_allocations[slice_alloc_counter];*/
-
672  auto buffer_pointer_void = std::get<0>(buffer_allocations[slice_alloc_counter]);
-
673  const auto buffer_size = std::get<1>(buffer_allocations[slice_alloc_counter]);
-
674  auto &buffer_allocation_counter = std::get<2>(buffer_allocations[slice_alloc_counter]);
-
675  auto &valid = std::get<3>(buffer_allocations[slice_alloc_counter]);
-
676  const auto &location_id = std::get<4>(buffer_allocations[slice_alloc_counter]);
-
677  const auto &gpu_id = std::get<5>(buffer_allocations[slice_alloc_counter]);
-
678  assert(valid);
-
679  T *buffer_pointer = static_cast<T *>(buffer_pointer_void);
-
680 
-
681  assert(buffer_size == size);
-
682  assert(p == buffer_pointer);
-
683  // assert(buffer_pointer == p || buffer_pointer == nullptr);
-
684  // Slice is done with this buffer
-
685  buffer_allocation_counter--;
-
686  // Are all slices done with this buffer?
-
687  if (buffer_allocation_counter == 0) {
-
688  // Yes! "Deallocate" by telling the recycler the buffer is fit for reuse
-
689  std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
-
690  // Only mark unused if another buffer has not done so already (and marked
-
691  // it as invalid)
-
692  if (valid) {
-
693  assert(buffers_in_use == true);
-
694  cppuddle::memory_recycling::detail::buffer_interface::mark_unused<
-
695  T, Host_Allocator>(buffer_pointer, buffer_size, location_id,
-
696  gpu_id);
-
697  // mark buffer as invalid to prevent any other slice from marking the
-
698  // buffer as unused
-
699  valid = false;
-
700 
-
701  const size_t current_deallocs = ++dealloc_counter;
-
702  if (current_deallocs == buffer_counter) {
-
703  std::lock_guard<aggregation_mutex_t> guard(mut);
-
704  buffers_in_use = false;
-
705  if (!executor_slices_alive && !buffers_in_use) {
-
706  slices_exhausted = false;
-
707  // Release executor
-
708  executor_wrapper.reset(nullptr);
-
709  }
-
710  }
-
711  }
-
712  }
-
713  }
-
714 
-
715  //===============================================================================
-
716  // Public Interface
-
717 public:
-
718  hpx::lcos::future<void> current_continuation;
-
719  hpx::lcos::future<void> last_stream_launch_done;
-
720  std::atomic<size_t> overall_launch_counter = 0;
-
721 
-
723  bool sync_aggregation_slices(const size_t slice_launch_counter) {
-
724  std::lock_guard<aggregation_mutex_t> guard(mut);
-
725  assert(slices_exhausted == true);
-
726  assert(executor_wrapper);
-
727  // Add function call object in case it hasn't happened for this launch yet
-
728  if (overall_launch_counter <= slice_launch_counter) {
-
729  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
-
730  if (overall_launch_counter <= slice_launch_counter) {
-
731  function_calls.emplace_back(current_slices, false, *executor_wrapper);
-
732  overall_launch_counter = function_calls.size();
-
733  return function_calls[slice_launch_counter].sync_aggregation_slices(
-
734  last_stream_launch_done);
-
735  }
-
736  }
-
737 
-
738  return function_calls[slice_launch_counter].sync_aggregation_slices(
-
739  last_stream_launch_done);
-
740  }
-
741 
-
743  template <typename F, typename... Ts>
-
744  void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) {
-
745  std::lock_guard<aggregation_mutex_t> guard(mut);
-
746  assert(slices_exhausted == true);
-
747  assert(executor_wrapper);
-
748  // Add function call object in case it hasn't happened for this launch yet
-
749  if (overall_launch_counter <= slice_launch_counter) {
-
750  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
-
751  if (overall_launch_counter <= slice_launch_counter) {
-
752  function_calls.emplace_back(current_slices, false, *executor_wrapper);
-
753  overall_launch_counter = function_calls.size();
-
754  function_calls[slice_launch_counter].post_when(
-
755  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
-
756  return;
-
757  }
-
758  }
-
759 
-
760  function_calls[slice_launch_counter].post_when(
-
761  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
-
762  return;
-
763  }
-
764 
-
766  template <typename F, typename... Ts>
-
767  hpx::lcos::future<void> async(const size_t slice_launch_counter, F &&f,
-
768  Ts &&...ts) {
-
769  std::lock_guard<aggregation_mutex_t> guard(mut);
-
770  assert(slices_exhausted == true);
-
771  assert(executor_wrapper);
-
772  // Add function call object in case it hasn't happened for this launch yet
-
773  if (overall_launch_counter <= slice_launch_counter) {
-
774  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
-
775  if (overall_launch_counter <= slice_launch_counter) {
-
776  function_calls.emplace_back(current_slices, true, *executor_wrapper);
-
777  overall_launch_counter = function_calls.size();
-
778  return function_calls[slice_launch_counter].async_when(
-
779  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
-
780  }
-
781  }
-
782 
-
783  return function_calls[slice_launch_counter].async_when(
-
784  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
-
785  }
-
787  template <typename F, typename... Ts>
-
788  hpx::lcos::shared_future<void> wrap_async(const size_t slice_launch_counter, F &&f,
-
789  Ts &&...ts) {
-
790  std::lock_guard<aggregation_mutex_t> guard(mut);
-
791  assert(slices_exhausted == true);
-
792  assert(executor_wrapper);
-
793  // Add function call object in case it hasn't happened for this launch yet
-
794  if (overall_launch_counter <= slice_launch_counter) {
-
795  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
-
796  if (overall_launch_counter <= slice_launch_counter) {
-
797  function_calls.emplace_back(current_slices, true, *executor_wrapper);
-
798  overall_launch_counter = function_calls.size();
-
799  return function_calls[slice_launch_counter].wrap_async(
-
800  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
-
801  }
-
802  }
-
803 
-
804  return function_calls[slice_launch_counter].wrap_async(
-
805  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
-
806  }
-
807 
-
808  bool slice_available(void) {
-
809  std::lock_guard<aggregation_mutex_t> guard(mut);
-
810  return !slices_exhausted;
-
811  }
-
812 
-
813  std::optional<hpx::lcos::future<executor_slice>> request_executor_slice() {
-
814  std::lock_guard<aggregation_mutex_t> guard(mut);
-
815  if (!slices_exhausted) {
-
816  const size_t local_slice_id = ++current_slices;
-
817  if (local_slice_id == 1) {
-
818  // Cleanup leftovers from last run if any
-
819  // TODO still required? Should be clean here already
-
820  function_calls.clear();
-
821  overall_launch_counter = 0;
-
822  std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
-
823 #ifndef NDEBUG
-
824  for (const auto &buffer_entry : buffer_allocations) {
-
825  const auto &[buffer_pointer_any, buffer_size,
-
826  buffer_allocation_counter, valid, location_id, device_id] =
-
827  buffer_entry;
-
828  assert(!valid);
-
829  }
-
830 #endif
-
831  buffer_allocations.clear();
-
832  buffer_allocations_map.clear();
-
833  buffer_counter = 0;
-
834 
-
835  assert(executor_slices_alive == false);
-
836  assert(buffers_in_use == false);
-
837  executor_slices_alive = true;
-
838  buffers_in_use = false;
-
839  dealloc_counter = 0;
-
840 
-
841  if (mode == aggregated_executor_modes::STRICT ) {
-
842  slices_full_promise = hpx::lcos::local::promise<void>{};
-
843  }
-
844  }
-
845 
-
846  // Create Executor Slice future -- that will be returned later
-
847  hpx::lcos::future<executor_slice> ret_fut;
-
848  if (local_slice_id < max_slices) {
-
849  executor_slices.emplace_back(hpx::lcos::local::promise<executor_slice>{});
-
850  ret_fut =
-
851  executor_slices[local_slice_id - 1].get_future();
-
852  } else {
-
853  launched_slices = current_slices;
-
854  ret_fut = hpx::make_ready_future(executor_slice{*this,
-
855  executor_slices.size(), launched_slices, max_slices});
-
856  }
-
857 
-
858  // Are we the first slice? If yes, add a continuation that sets the
-
859  // executor_slice
-
860  // futures to ready once the launch conditions are met
-
861  if (local_slice_id == 1) {
-
862  // Draw a fresh executor from the pool
-
863  assert(!executor_wrapper);
-
864  cppuddle::executor_recycling::executor_pool::select_device<
-
865  Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
-
866  gpu_id);
-
867  executor_wrapper.reset(
-
868  new cppuddle::executor_recycling::executor_interface<
-
869  Executor,
-
870  cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
-
871  gpu_id));
-
872  // Renew promise that all slices will be ready as the primary launch
-
873  // criteria...
-
874  hpx::lcos::shared_future<void> fut;
-
875  if (mode == aggregated_executor_modes::EAGER ||
-
876  mode == aggregated_executor_modes::ENDLESS) {
-
877  // Fallback launch condition: Launch as soon as the underlying stream
-
878  // is ready
-
879  /* auto slices_full_fut = slices_full_promise.get_future(); */
-
880  cppuddle::executor_recycling::executor_pool::select_device<
-
881  Executor,
-
882  cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(gpu_id);
-
883  auto exec_fut = (*executor_wrapper).get_future();
-
884  /* auto fut = hpx::when_any(exec_fut, slices_full_fut); */
-
885  fut = std::move(exec_fut);
-
886  } else {
-
887  auto slices_full_fut = slices_full_promise.get_shared_future();
-
888  // Just use the slices launch condition
-
889  fut = std::move(slices_full_fut);
-
890  }
-
891  // Launch all executor slices within this continuation
-
892  current_continuation = fut.then([this](auto &&fut) {
-
893  std::lock_guard<aggregation_mutex_t> guard(mut);
-
894  slices_exhausted = true;
-
895  launched_slices = current_slices;
-
896  size_t id = 0;
-
897  for (auto &slice_promise : executor_slices) {
-
898  slice_promise.set_value(
-
899  executor_slice{*this, id, launched_slices, max_slices});
-
900  id++;
-
901  }
-
902  executor_slices.clear();
-
903  });
-
904  }
-
905  if (local_slice_id >= max_slices &&
-
906  mode != aggregated_executor_modes::ENDLESS) {
-
907  slices_exhausted = true; // prevents any more threads from entering
-
908  // before the continuation is launched
-
909  /* launched_slices = current_slices; */
-
910  /* size_t id = 0; */
-
911  /* for (auto &slice_promise : executor_slices) { */
-
912  /* slice_promise.set_value( */
-
913  /* executor_slice{*this, id, launched_slices}); */
-
914  /* id++; */
-
915  /* } */
-
916  /* executor_slices.clear(); */
-
917  if (mode == aggregated_executor_modes::STRICT ) {
-
918  slices_full_promise.set_value(); // Trigger slices launch condition continuation
-
919  }
-
920  // That continuation will set all executor slices handed out so far to ready
-
921  }
-
922  return ret_fut;
-
923  } else {
-
924  // Return empty optional as failure
-
925  return std::optional<hpx::lcos::future<executor_slice>>{};
-
926  }
-
927  }
-
928  size_t launched_slices;
-
929  void reduce_usage_counter(void) {
-
930  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
-
931  assert(slices_exhausted == true);
-
932  assert(executor_wrapper);
-
933  assert(executor_slices_alive == true);
-
934  assert(launched_slices >= 1);
-
935  assert(current_slices >= 0 && current_slices <= launched_slices);
-
936  const size_t local_slice_id = --current_slices;
-
937  // Last slice goes out scope?
-
938  if (local_slice_id == 0) {
-
939  // Mark executor fit for reuse
-
940  std::lock_guard<aggregation_mutex_t> guard(mut);
-
941  executor_slices_alive = false;
-
942  if (!executor_slices_alive && !buffers_in_use) {
-
943  // Release executor
-
944  slices_exhausted = false;
-
945  executor_wrapper.reset(nullptr);
-
946  }
-
947  }
-
948  }
-
949  ~aggregated_executor(void) {
-
950 
-
951  assert(current_slices == 0);
-
952  assert(executor_slices_alive == false);
-
953  assert(buffers_in_use == false);
-
954 
-
955  if (mode != aggregated_executor_modes::STRICT ) {
-
956  slices_full_promise.set_value(); // Trigger slices launch condition continuation
-
957  }
-
958 
-
959  // Cleanup leftovers from last run if any
-
960  function_calls.clear();
-
961  overall_launch_counter = 0;
-
962 #ifndef NDEBUG
-
963  for (const auto &buffer_entry : buffer_allocations) {
-
964  const auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter,
-
965  valid, location_id, device_id] = buffer_entry;
-
966  assert(!valid);
-
967  }
-
968 #endif
-
969  buffer_allocations.clear();
-
970  buffer_allocations_map.clear();
-
971  buffer_counter = 0;
-
972 
-
973  assert(buffer_allocations.empty());
-
974  assert(buffer_allocations_map.empty());
-
975  }
-
976 
-
977  aggregated_executor(const size_t number_slices,
-
978  aggregated_executor_modes mode, const size_t gpu_id = 0)
-
979  : max_slices(number_slices), current_slices(0), slices_exhausted(false),
-
980  dealloc_counter(0), mode(mode), executor_slices_alive(false),
-
981  buffers_in_use(false), gpu_id(gpu_id),
-
982  executor_wrapper(nullptr),
-
983  current_continuation(hpx::make_ready_future()),
-
984  last_stream_launch_done(hpx::make_ready_future()) {}
-
985  // Not meant to be copied or moved
-
986  aggregated_executor(const aggregated_executor &other) = delete;
-
987  aggregated_executor &operator=(const aggregated_executor &other) = delete;
-
988  aggregated_executor(aggregated_executor &&other) = delete;
-
989  aggregated_executor &operator=(aggregated_executor &&other) = delete;
-
990 };
-
991 
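For orientation, a minimal caller-side sketch of the aggregated_executor defined above, not taken from the original source: it assumes a running HPX runtime, a valid underlying Executor type, and a hypothetical shared `agg_exec` instance; the `shared_work` lambda is a placeholder for a real kernel launch. In STRICT mode the returned slice future only becomes ready once all max_slices concurrent callers have requested their slice.

```cpp
// Usage sketch under the assumptions named above.
template <typename Executor>
void run_aggregated(
    cppuddle::kernel_aggregation::detail::aggregated_executor<Executor>
        &agg_exec) {
  // Returns an empty optional once all slices have been handed out
  auto slice_fut = agg_exec.request_executor_slice();
  if (!slice_fut) {
    return; // slices exhausted -- retry or use another aggregated_executor
  }
  auto done = slice_fut->then([](auto &&fut) {
    auto slice = fut.get(); // ready once the launch condition is met
    auto shared_work = [] { /* identical work issued by every slice */ };
    // Every slice must issue the same calls in the same order; only the
    // last-arriving slice triggers the single underlying launch.
    return slice.async(shared_work).get();
  });
  done.get();
}
```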
-
992 template <typename T, typename Host_Allocator, typename Executor>
-
993 class allocator_slice {
-
994 private:
-
995  typename aggregated_executor<Executor>::executor_slice &executor_reference;
-
996  aggregated_executor<Executor> &executor_parent;
-
997 
-
998 public:
-
999  using value_type = T;
-
1000  allocator_slice(
-
1001  typename aggregated_executor<Executor>::executor_slice &executor)
-
1002  : executor_reference(executor), executor_parent(executor.parent) {}
-
1003  template <typename U>
-
1004  explicit allocator_slice(
-
1005  allocator_slice<U, Host_Allocator, Executor> const &) noexcept {}
-
1006  T *allocate(std::size_t n) {
-
1007  T *data = executor_reference.template get<T, Host_Allocator>(n);
-
1008  return data;
-
1009  }
-
1010  void deallocate(T *p, std::size_t n) {
-
1011  /* executor_reference.template mark_unused<T, Host_Allocator>(p, n); */
-
1012  executor_parent.template mark_unused<T, Host_Allocator>(p, n);
-
1013  }
-
1014  template <typename... Args>
-
1015  inline void construct(T *p, Args... args) noexcept {
-
1016  // Do nothing here - we reuse the content of the last owner
-
1017  }
-
1018  void destroy(T *p) {
-
1019  // Do nothing here - Contents will be destroyed when the buffer manager is
-
1020  // destroyed, not before
-
1021  }
-
1022 };
-
1023 template <typename T, typename U, typename Host_Allocator, typename Executor>
-
1024 constexpr bool
-
1025 operator==(allocator_slice<T, Host_Allocator, Executor> const &,
-
1026  allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
-
1027  return false;
-
1028 }
-
1029 template <typename T, typename U, typename Host_Allocator, typename Executor>
-
1030 constexpr bool
-
1031 operator!=(allocator_slice<T, Host_Allocator, Executor> const &,
-
1032  allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
-
1033  return true;
-
1034 }
-
1035 
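A short sketch of how the allocator above is meant to be obtained and used; the element type `double` and the Host_Allocator `std::allocator<double>` are assumptions for illustration only.

```cpp
// Sketch: all slices calling this receive views into one shared, recycled
// buffer; construct()/destroy() are no-ops by design, so contents are reused.
template <typename Executor>
void use_aggregated_allocator(
    typename cppuddle::kernel_aggregation::detail::aggregated_executor<
        Executor>::executor_slice &slice) {
  auto alloc = slice.template make_allocator<double, std::allocator<double>>();
  std::vector<double, decltype(alloc)> data(1024, alloc);
  // ... each slice writes only its own partition of 'data' ...
} // deallocate() marks the buffer reusable once the last slice is done
```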
-
1036 } // namespace detail
-
1037 } // namespace kernel_aggregation
-
1038 } // namespace cppuddle
-
1039 
-
1040 
-
1041 
-
1042 namespace hpx { namespace parallel { namespace execution {
-
1043  // TODO Unfortunately does not work that way! Create trait that works for Executor Slices with
-
1044  // compatible underlying executor types
-
1045  /* template<typename E> */
-
1046  /* struct is_one_way_executor<typename aggregated_executor<E>::executor_slice> */
-
1047  /* : std::true_type */
-
1048  /* {}; */
-
1049  /* template<typename E> */
-
1050  /* struct is_two_way_executor<typename aggregated_executor<E>::executor_slice> */
-
1051  /* : std::true_type */
-
1052  /* {}; */
-
1053 
-
1054 #if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
-
1055  // Workaround for the meantime: Manually create traits for compatible types:
-
1056 template <>
-
1057 struct is_one_way_executor<
-
1058  typename cppuddle::kernel_aggregation::detail::aggregated_executor<
-
1059  hpx::cuda::experimental::cuda_executor>::executor_slice>
-
1060  : std::true_type {};
-
1061 template <>
-
1062 struct is_two_way_executor<
-
1063  typename cppuddle::kernel_aggregation::detail::aggregated_executor<
-
1064  hpx::cuda::experimental::cuda_executor>::executor_slice>
-
1065  : std::true_type {};
-
1066 #endif
-
1067 }}}
-
1068 
-
1069 #endif
+Go to the documentation of this file.
1// Copyright (c) 2022-2024 Gregor Daiß
+
2//
+
3// Distributed under the Boost Software License, Version 1.0. (See accompanying
+
4// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
5
+
6#ifndef AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
+
7#define AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
+
8
+
9#ifndef CPPUDDLE_HAVE_HPX
+
10#error "Work aggregation allocators/executors require CPPUDDLE_WITH_HPX=ON"
+
11#endif
+
12
+
13#include <stdexcept>
+
14// When defined, CPPuddle will run more checks
+
15// about the order of aggregated method calls.
+
16// Best defined before including this header when needed
+
17// (hence commented out here)
+
18//#define DEBUG_AGGREGATION_CALLS 1
+
19
+
20#include <stdio.h>
+
21
+
22#include <any>
+
23#include <atomic>
+
24#include <chrono>
+
25#include <cstdio>
+
26#include <iostream>
+
27#include <memory>
+
28#include <mutex>
+
29#include <optional>
+
30#include <ostream>
+
31#include <string>
+
32#include <tuple>
+
33#include <type_traits>
+
34#include <typeinfo>
+
35#include <utility>
+
36#include <unordered_map>
+
37
+
38#include <hpx/futures/future.hpp>
+
39#include <hpx/hpx_init.hpp>
+
40#include <hpx/include/async.hpp>
+
41#include <hpx/include/iostreams.hpp>
+
42#include <hpx/include/lcos.hpp>
+
43#include <hpx/lcos/promise.hpp>
+
44#include <hpx/mutex.hpp>
+
45
+
46#if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
+
47// required for defining type traits using cuda executor as underlying
+
48// aggregation executors
+
49#include <hpx/async_cuda/cuda_executor.hpp>
+
50#endif
+
51
+
52#include <boost/core/demangle.hpp>
+
53#include <boost/format.hpp>
+
54
+ +
56// get direct access to the buffer management
+ +
58// get normal access to the executor pools
+ +
60
+
61#ifndef CPPUDDLE_HAVE_HPX_MUTEX
+
62#pragma message \
+
63 "Work aggregation will use hpx::mutex internally, despite CPPUDDLE_WITH_HPX_MUTEX=OFF"
+
64#pragma message \
+
65 "Consider using CPPUDDLE_WITH_HPX_MUTEX=ON, to make the rest of CPPuddle also use hpx::mutex"
+
66#endif
+
67namespace cppuddle {
+
68namespace kernel_aggregation {
+
69namespace detail {
+
70 using aggregation_mutex_t = hpx::mutex;
+
71
+
72//===============================================================================
+
73//===============================================================================
+
74// Helper functions/classes
+
75
+
78template <typename... Ts>
+
+
79std::tuple<Ts...> make_tuple_supporting_references(Ts &&...ts) {
+
80 return std::tuple<Ts...>{std::forward<Ts>(ts)...};
+
81}
+
+
82
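Why this helper exists instead of std::make_tuple: make_tuple decays its arguments, whereas forwarding into std::tuple&lt;Ts...&gt; preserves lvalue references, which the debug comparison of aggregated calls relies on. A small standalone illustration (example values assumed):

```cpp
int counter = 0;
auto decayed   = std::make_tuple(counter);                  // std::tuple<int>, holds a copy
auto preserved = make_tuple_supporting_references(counter); // std::tuple<int&>
std::get<0>(preserved) = 42; // writes through to 'counter'
assert(counter == 42);       // <cassert> assumed for this sketch
```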
+
+
85template <typename T> std::string print_if_possible(T val) {
+
86 if constexpr (std::is_convertible_v<T, std::string>) {
+
87 return val;
+
88 } else if constexpr (std::is_integral_v<T> || std::is_floating_point_v<T>) {
+
89 return std::to_string(val);
+
90 } else if constexpr (std::is_pointer_v<T>) {
+
91 // Pretty printing pointer sort of only works well with %p
+
92 // TODO Try using std::format as soon as we can move to C++20
+
93 std::unique_ptr<char[]> debug_string(new char[128]());
+
94 snprintf(debug_string.get(), 128, "%p", val);
+
95 return std::string(debug_string.get());
+
96 } else {
+
97 return std::string("cannot print value");
+
98 }
+
99}
+
+
100
+
103template <class TupType, size_t... I>
+
+
104void print_tuple(const TupType &_tup, std::index_sequence<I...>) {
+
105 (..., (hpx::cout << (I == 0 ? "" : ", ")
+
106 << print_if_possible(std::get<I + 1>(_tup))));
+
107}
+
+
108
+
+
111template <class... T> void print_tuple(const std::tuple<T...> &_tup) {
+
112 // Use pointer and sprintf as boost::format refused to NOT cast the pointer
+
113 // address to 1...
+
114 // TODO Try using std::format as soon as we can move to C++20
+
115 std::unique_ptr<char[]> debug_string(new char[128]());
+
116 snprintf(debug_string.get(), 128, "Function address: %p -- Arguments: (",
+
117 std::get<0>(_tup));
+
118 hpx::cout << debug_string.get();
+
119 print_tuple(_tup, std::make_index_sequence<sizeof...(T) - 1>());
+
120 hpx::cout << ")";
+
121}
+
+
122
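A usage sketch of the two print helpers; the stub function and argument values are assumptions. The first tuple element is interpreted as the function address, the remaining elements as its arguments.

```cpp
void kernel_stub(int, double) {}

void debug_print_example() {
  auto call_tuple = make_tuple_supporting_references(&kernel_stub, 3, 1.5);
  print_tuple(call_tuple);
  // prints e.g.: Function address: 0x55c0... -- Arguments: (3, 1.500000)
}
```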
+
123//===============================================================================
+
124//===============================================================================
+
125template <typename Executor, typename F, typename... Ts>
+
+
126void exec_post_wrapper(Executor & exec, F &&f, Ts &&...ts) {
+
127 hpx::apply(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
+
128}
+
+
129
+
130template <typename Executor, typename F, typename... Ts>
+
+
131hpx::lcos::future<void> exec_async_wrapper(Executor & exec, F &&f, Ts &&...ts) {
+
132 return hpx::async(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
+
133}
+
+
134
+
137/** Launch conditions: All slice executors must have called the same function
+
138 * (tracked by future all_slices_ready),
+
139 * AND
+
140 * the previous aggregated_function_call on the same Executor must have been
+
141 * launched (tracked by future stream_future).
+
142 * All function calls received from the slice executors are checked to ensure
+
143 * they match the first one in both types and values (an exception is thrown otherwise).
+
144 */
+
145
+
+
146template <typename Executor> class aggregated_function_call {
+
147private:
+
148 std::atomic<size_t> slice_counter = 0;
+
149
+
151 /* hpx::lcos::local::promise<void> slices_ready_promise; */
+
153 /* hpx::lcos::future<void> all_slices_ready = slices_ready_promise.get_future(); */
+
155 const size_t number_slices;
+
156 const bool async_mode;
+
157
+
158 Executor &underlying_executor;
+
159
+
160#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
+
161#pragma message \
+
162 "Building slow work aggregator build with additional runtime checks! Build with NDEBUG defined for a fast build..."
+
165 std::any function_tuple;
+
167 std::string debug_type_information;
+
168 aggregation_mutex_t debug_mut;
+
169#endif
+
170
+
171 std::vector<hpx::lcos::local::promise<void>> potential_async_promises{};
+
172
+
173public:
+
+
174 aggregated_function_call(const size_t number_slices, bool async_mode, Executor &exec)
+
175 : number_slices(number_slices), async_mode(async_mode), underlying_executor(exec) {
+
176 if (async_mode)
+
177 potential_async_promises.resize(number_slices);
+
178 }
+
+
+
179 ~aggregated_function_call(void) {
+
180 // All slices should have done this call
+
181 assert(slice_counter == number_slices);
+
182 // assert(!all_slices_ready.valid());
+
183 }
+
+
+
185 bool sync_aggregation_slices(hpx::lcos::future<void> &stream_future) {
+
186 assert(!async_mode);
+
187 assert(potential_async_promises.empty());
+
188 const size_t local_counter = slice_counter++;
+
189 if (local_counter == number_slices - 1) {
+
190 return true;
+
191 }
+
192 else return false;
+
193 }
+
+
194 template <typename F, typename... Ts>
+
+
195 void post_when(hpx::lcos::future<void> &stream_future, F &&f, Ts &&...ts) {
+
196#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
+
197 // needed for concurrent access to function_tuple and debug_type_information
+
198 // Not required for normal use
+
199 std::lock_guard<aggregation_mutex_t> guard(debug_mut);
+
200#endif
+
201 assert(!async_mode);
+
202 assert(potential_async_promises.empty());
+
203 const size_t local_counter = slice_counter++;
+
204
+
205 if (local_counter == 0) {
+
206#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
+
207 auto tmp_tuple =
+
208 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
+
209 function_tuple = tmp_tuple;
+
210 debug_type_information = typeid(decltype(tmp_tuple)).name();
+
211#endif
+
212
+
213 } else {
+
214 //
+
215 // This scope checks if both the type and the values of the current call
+
216 // match the original call. To be used in debug builds...
+
217 //
+
218#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
+
219 auto comparison_tuple =
+
220 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
+
221 try {
+
222 auto orig_call_tuple =
+
223 std::any_cast<decltype(comparison_tuple)>(function_tuple);
+
224 if (comparison_tuple != orig_call_tuple) {
+
225 throw std::runtime_error(
+
226 "Values of post function arguments (or function "
+
227 "itself) do not match ");
+
228 }
+
229 } catch (const std::bad_any_cast &e) {
+
230 hpx::cout
+
231 << "\nMismatched types error in aggregated post call of executor "
+
232 << ": " << e.what() << "\n";
+
233 hpx::cout << "Expected types:\t\t "
+
234 << boost::core::demangle(debug_type_information.c_str());
+
235 hpx::cout << "\nGot types:\t\t "
+
236 << boost::core::demangle(
+
237 typeid(decltype(comparison_tuple)).name())
+
238 << "\n"
+
239 << std::endl;
+
240 // throw;
+
241 } catch (const std::runtime_error &e) {
+
242 hpx::cout
+
243 << "\nMismatched values error in aggregated post call of executor "
+
244 << ": " << e.what() << std::endl;
+
245 hpx::cout << "Types (matched):\t "
+
246 << boost::core::demangle(debug_type_information.c_str());
+
247 auto orig_call_tuple =
+
248 std::any_cast<decltype(comparison_tuple)>(function_tuple);
+
249 hpx::cout << "\nExpected values:\t ";
+
250 print_tuple(orig_call_tuple);
+
251 hpx::cout << "\nGot values:\t\t ";
+
252 print_tuple(comparison_tuple);
+
253 hpx::cout << std::endl << std::endl;
+
254 // throw;
+
255 }
+
256#endif
+
257 }
+
258 assert(local_counter < number_slices);
+
259 assert(slice_counter < number_slices + 1);
+
260 // Check exit criteria: Launch function call continuation by setting the
+
261 // slices promise
+
262 if (local_counter == number_slices - 1) {
+
263 exec_post_wrapper<Executor, F, Ts...>(underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
+
264 //slices_ready_promise.set_value();
+
265 }
+
266 }
+
+
267 template <typename F, typename... Ts>
+
+
268 hpx::lcos::future<void> async_when(hpx::lcos::future<void> &stream_future,
+
269 F &&f, Ts &&...ts) {
+
270#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
+
271 // needed for concurrent access to function_tuple and debug_type_information
+
272 // Not required for normal use
+
273 std::lock_guard<aggregation_mutex_t> guard(debug_mut);
+
274#endif
+
275 assert(async_mode);
+
276 assert(!potential_async_promises.empty());
+
277 const size_t local_counter = slice_counter++;
+
278 if (local_counter == 0) {
+
279#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
+
280 auto tmp_tuple =
+
281 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
+
282 function_tuple = tmp_tuple;
+
283 debug_type_information = typeid(decltype(tmp_tuple)).name();
+
284#endif
+
285 } else {
+
286 //
+
287 // This scope checks if both the type and the values of the current call
+
288 // match the original call. To be used in debug builds...
+
289 //
+
290#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
+
291 auto comparison_tuple =
+
292 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
+
293 try {
+
294 auto orig_call_tuple =
+
295 std::any_cast<decltype(comparison_tuple)>(function_tuple);
+
296 if (comparison_tuple != orig_call_tuple) {
+
297 throw std::runtime_error(
+
298 "Values of async function arguments (or function "
+
299 "itself) do not match ");
+
300 }
+
301 } catch (const std::bad_any_cast &e) {
+
302 hpx::cout
+
303 << "\nMismatched types error in aggregated async call of executor "
+
304 << ": " << e.what() << "\n";
+
305 hpx::cout << "Expected types:\t\t "
+
306 << boost::core::demangle(debug_type_information.c_str());
+
307 hpx::cout << "\nGot types:\t\t "
+
308 << boost::core::demangle(
+
309 typeid(decltype(comparison_tuple)).name())
+
310 << "\n"
+
311 << std::endl;
+
312 // throw;
+
313 } catch (const std::runtime_error &e) {
+
314 hpx::cout
+
315 << "\nMismatched values error in aggregated async call of executor "
+
316 << ": " << e.what() << std::endl;
+
317 hpx::cout << "Types (matched):\t "
+
318 << boost::core::demangle(debug_type_information.c_str());
+
319 auto orig_call_tuple =
+
320 std::any_cast<decltype(comparison_tuple)>(function_tuple);
+
321 hpx::cout << "\nExpected values:\t ";
+
322 print_tuple(orig_call_tuple);
+
323 hpx::cout << "\nGot values:\t\t ";
+
324 print_tuple(comparison_tuple);
+
325 hpx::cout << std::endl << std::endl;
+
326 // throw;
+
327 }
+
328#endif
+
329 }
+
330 assert(local_counter < number_slices);
+
331 assert(slice_counter < number_slices + 1);
+
332 assert(potential_async_promises.size() == number_slices);
+
333 hpx::lcos::future<void> ret_fut =
+
334 potential_async_promises[local_counter].get_future();
+
335 if (local_counter == number_slices - 1) {
+
336 /* slices_ready_promise.set_value(); */
+
337 auto fut = exec_async_wrapper<Executor, F, Ts...>(
+
338 underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
+
339 fut.then([this](auto &&fut) {
+
340 for (auto &promise : potential_async_promises) {
+
341 promise.set_value();
+
342 }
+
343 });
+
344 }
+
345 // Check exit criteria: Launch function call continuation by setting the
+
346 // slices promise
+
347 return ret_fut;
+
348 }
+
+
349 template <typename F, typename... Ts>
+
+
350 hpx::lcos::shared_future<void> wrap_async(hpx::lcos::future<void> &stream_future,
+
351 F &&f, Ts &&...ts) {
+
352 assert(async_mode);
+
353 assert(!potential_async_promises.empty());
+
354 const size_t local_counter = slice_counter++;
+
355 assert(local_counter < number_slices);
+
356 assert(slice_counter < number_slices + 1);
+
357 assert(potential_async_promises.size() == number_slices);
+
358 hpx::lcos::shared_future<void> ret_fut =
+
359 potential_async_promises[local_counter].get_shared_future();
+
360 if (local_counter == number_slices - 1) {
+
361 auto fut = f(std::forward<Ts>(ts)...);
+
362 fut.then([this](auto &&fut) {
+
363 // TODO just use one promise
+
364 for (auto &promise : potential_async_promises) {
+
365 promise.set_value();
+
366 }
+
367 });
+
368 }
+
369 return ret_fut;
+
370 }
+
+
371 // We need to be copyable or noexcept-movable for std::vector...
+
372 aggregated_function_call(const aggregated_function_call &other) = default;
+
373 aggregated_function_call &
+
374 operator=(const aggregated_function_call &other) = default;
+
375 aggregated_function_call(aggregated_function_call &&other) = default;
+
376 aggregated_function_call &
+
377 operator=(aggregated_function_call &&other) = default;
+
378};
+
+
379
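A behavioral sketch of the class above, standalone and with assumed values: with number_slices == 4, the first three async_when calls only obtain futures; the fourth call performs the single underlying launch, whose continuation then fulfills all four promises.

```cpp
template <typename Executor>
void aggregated_call_example(Executor &exec) {
  aggregated_function_call<Executor> call(/*number_slices=*/4,
                                          /*async_mode=*/true, exec);
  hpx::lcos::future<void> stream_fut = hpx::make_ready_future();
  auto work = [] { /* identical work submitted by every slice */ };
  std::vector<hpx::lcos::future<void>> futs; // <vector> assumed included
  for (size_t slice = 0; slice < 4; ++slice) {
    // Only the last iteration actually launches 'work' on exec
    futs.push_back(call.async_when(stream_fut, work));
  }
  hpx::wait_all(futs);
}
```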
+
380//===============================================================================
+
381//===============================================================================
+
382
+
383enum class aggregated_executor_modes { EAGER = 1, STRICT, ENDLESS };
+
385template <typename T, typename Host_Allocator, typename Executor>
+
386class allocator_slice;
+
387
+
389/** This executor is not meant to be used directly. Instead, it yields multiple
+
390 * executor_slice objects that serve as interfaces. Slices from the same
+
391 * aggregated_executor are meant to execute the same function calls, but on
+
392 * different data (i.e. different tasks).
+
393 */
+
+
394template <typename Executor> class aggregated_executor {
+
395private:
+
396 //===============================================================================
+
397 // Misc private variables:
+
398 //
+
399 std::atomic<bool> slices_exhausted;
+
400
+
401 std::atomic<bool> executor_slices_alive;
+
402 std::atomic<bool> buffers_in_use;
+
403 std::atomic<size_t> dealloc_counter;
+
404
+
405 const aggregated_executor_modes mode;
+
406 const size_t max_slices;
+
407 std::atomic<size_t> current_slices;
+
411 std::unique_ptr<cppuddle::executor_recycling::executor_interface<
+
412 Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>>
+
413 executor_wrapper;
+
414
+
415public:
+
416 size_t gpu_id;
+
417 // Subclasses
+
418
+
+
420 class executor_slice {
+
421 public:
+
422 aggregated_executor<Executor> &parent;
+
423 private:
+
427 size_t launch_counter{0};
+
428 size_t buffer_counter{0};
+
429 bool notify_parent_about_destruction{true};
+
430
+
431 public:
+
434 const size_t number_slices;
+
435 size_t max_slices;
+
436 size_t id;
+
437 using executor_t = Executor;
+
+
438 executor_slice(aggregated_executor &parent, const size_t slice_id,
+
439 const size_t number_slices, const size_t max_number_slices)
+
440 : parent(parent), notify_parent_about_destruction(true),
+
441 number_slices(number_slices), id(slice_id), max_slices(max_number_slices) {
+
442 assert(parent.max_slices == max_slices);
+
443 assert(number_slices >= 1);
+
444 assert(number_slices <= max_slices);
+
445 }
+
+
+
446 ~executor_slice(void) {
+
447 // Don't notify parent if we moved away from this executor_slice
+
448 if (notify_parent_about_destruction) {
+
449 // Executor should be done by the time of destruction
+
450 // -> check here before notifying parent
+
451
+
452 assert(parent.max_slices == max_slices);
+
453 assert(number_slices >= 1);
+
454 assert(number_slices <= max_slices);
+
455 // parent still in execution mode?
+
456 assert(parent.slices_exhausted == true);
+
457 // all kernel launches done?
+
458 assert(launch_counter == parent.function_calls.size());
+
459 // Notify parent that this aggregation slice is done
+
460 parent.reduce_usage_counter();
+
461 }
+
462 }
+
+
463 executor_slice(const executor_slice &other) = delete;
+
464 executor_slice &operator=(const executor_slice &other) = delete;
+
+
465 executor_slice(executor_slice &&other)
+
466 : parent(other.parent), launch_counter(std::move(other.launch_counter)),
+
467 buffer_counter(std::move(other.buffer_counter)),
+
468 number_slices(std::move(other.number_slices)),
+
469 id(std::move(other.id)), max_slices(std::move(other.max_slices)) {
+
470 other.notify_parent_about_destruction = false;
+
471 }
+
+
+
472 executor_slice &operator=(executor_slice &&other) {
+
473 parent = other.parent;
+
474 launch_counter = std::move(other.launch_counter);
+
475 buffer_counter = std::move(other.buffer_counter);
+
476 number_slices = std::move(other.number_slices);
+
477 id = std::move(other.id);
+
478 max_slices = std::move(other.max_slices);
+
479 other.notify_parent_about_destruction = false; return *this; // operator= must return *this
+
480 }
+
+
481 template <typename T, typename Host_Allocator>
+
+
482 allocator_slice<T, Host_Allocator, Executor> make_allocator() {
+
483 return allocator_slice<T, Host_Allocator, Executor>(*this);
+
484 }
+
+
+
485 bool sync_aggregation_slices() {
+
486 assert(parent.slices_exhausted == true);
+
487 auto ret = parent.sync_aggregation_slices(launch_counter);
+
488 launch_counter++;
+
489 return ret;
+
490 }
+
+
+
491 template <typename F, typename... Ts> void post(F &&f, Ts &&...ts) {
+
492 // we should only execute function calls once all slices
+
493 // have been given away (-> Executor Slices start)
+
494 assert(parent.slices_exhausted == true);
+
495 parent.post(launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
+
496 launch_counter++;
+
497 }
+
+
498 template <typename F, typename... Ts>
+
+
499 hpx::lcos::future<void> async(F &&f, Ts &&...ts) {
+
500 // we should only execute function calls once all slices
+
501 // have been given away (-> Executor Slices start)
+
502 assert(parent.slices_exhausted == true);
+
503 hpx::lcos::future<void> ret_fut = parent.async(
+
504 launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
+
505 launch_counter++;
+
506 return ret_fut;
+
507 }
+
+
508
+
509 // OneWay Execution
+
510 template <typename F, typename... Ts>
+
+
511 friend decltype(auto) tag_invoke(hpx::parallel::execution::post_t,
+
512 executor_slice& exec, F&& f, Ts&&... ts)
+
513 {
+
514 return exec.post(std::forward<F>(f), std::forward<Ts>(ts)...);
+
515 }
+
+
516
+
517 // TwoWay Execution
+
518 template <typename F, typename... Ts>
+
+
519 friend decltype(auto) tag_invoke(
+
520 hpx::parallel::execution::async_execute_t, executor_slice& exec,
+
521 F&& f, Ts&&... ts)
+
522 {
+
523 return exec.async(
+
524 std::forward<F>(f), std::forward<Ts>(ts)...);
+
525 }
+
+
526
+
527 template <typename F, typename... Ts>
+
+
528 hpx::lcos::shared_future<void> wrap_async(F &&f, Ts &&...ts) {
+
529 // we should only execute function calls once all slices
+
530 // have been given away (-> Executor Slices start)
+
531 assert(parent.slices_exhausted == true);
+
532 hpx::lcos::shared_future<void> ret_fut = parent.wrap_async(
+
533 launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
+
534 launch_counter++;
+
535 return ret_fut;
+
536 }
+
+
537
+
+
540 template <typename T, typename Host_Allocator> T *get(const size_t size) {
+
541 assert(parent.slices_exhausted == true);
+
542 T *aggregated_buffer =
+
543 parent.get<T, Host_Allocator>(size, buffer_counter);
+
544 buffer_counter++;
+
545 assert(buffer_counter > 0);
+
546 return aggregated_buffer;
+
547 }
+
+
548
+
+
549 Executor& get_underlying_executor(void) {
+
550 assert(parent.executor_wrapper);
+
551 return *(parent.executor_wrapper);
+
552 }
+
+
553 };
+
+
554
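Thanks to the tag_invoke overloads above, an executor_slice can be driven through the regular HPX executor customization points. A caller-side sketch (the task lambda is an assumption for illustration):

```cpp
template <typename Executor>
void slice_as_hpx_executor(
    typename aggregated_executor<Executor>::executor_slice &slice) {
  auto task = [] { /* identical work issued by every slice */ };
  // OneWay: dispatches to the parent's aggregated post(...)
  hpx::parallel::execution::post(slice, task);
  // TwoWay: dispatches to the parent's aggregated async(...)
  hpx::lcos::future<void> fut =
      hpx::parallel::execution::async_execute(slice, task);
  fut.get();
}
```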
+
555 // deprecated name...
+
556 using Executor_Slice [[deprecated("Renamed: Use executor_slice instead")]] = executor_slice;
+
557
+
558 //===============================================================================
+
559
+
560 hpx::lcos::local::promise<void> slices_full_promise;
+
563 std::vector<hpx::lcos::local::promise<executor_slice>> executor_slices;
+
566 std::deque<aggregated_function_call<Executor>> function_calls;
+
568 aggregation_mutex_t mut;
+
569
+
572 using buffer_entry_t =
+
573 std::tuple<void*, const size_t, std::atomic<size_t>, bool, const size_t, size_t>;
+
575 std::deque<buffer_entry_t> buffer_allocations;
+
577 std::unordered_map<void*,size_t> buffer_allocations_map;
+
579 aggregation_mutex_t buffer_mut;
+
580 std::atomic<size_t> buffer_counter = 0;
+
581
+
583 template <typename T, typename Host_Allocator>
+
+
584 T *get(const size_t size, const size_t slice_alloc_counter) {
+
585 assert(slices_exhausted == true);
+
586 assert(executor_wrapper);
+
587 assert(executor_slices_alive == true);
+
588 // Add aggregated buffer entry in case it hasn't happened yet for this call
+
589 // First: Check if it already has happened
+
590 if (buffer_counter <= slice_alloc_counter) {
+
591 // we might be the first! Lock...
+
592 std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
+
593 // ... and recheck
+
594 if (buffer_counter <= slice_alloc_counter) {
+
595 constexpr bool manage_content_lifetime = false;
+
596 buffers_in_use = true;
+
597
+
598 // Default location -- useful for GPU builds as we otherwise create way too
+
599 // many different buffers for different aggregation sizes on different GPUs
+
600 /* size_t location_id = gpu_id * instances_per_gpu; */
+
601 // Use integer division to map to buckets 0, 16, 32, ...
+
602 size_t location_id = ((hpx::get_worker_thread_num() % cppuddle::number_instances) / 16) * 16;
+
603#ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
+
604 if (max_slices == 1) {
+
605 // get preferred location, i.e. the current HPX thread's location
+
606 // Usually handy for CPU builds where we want to use the buffers
+
607 // close to the current CPU core
+
608 /* location_id = (hpx::get_worker_thread_num() / instances_per_gpu) * instances_per_gpu; */
+
609 /* location_id = (gpu_id) * instances_per_gpu; */
+
610 // division makes sure that we always use the same instance to store our gpu buffers.
+
611 }
+
612#endif
+
613 // Get a shiny new buffer that will be shared between all slices
+
614 // Buffer might be recycled from previous allocations by the
+
615 // buffer_interface...
+
616 T *aggregated_buffer =
+
617 cppuddle::memory_recycling::detail::buffer_interface::get<
+
618 T, Host_Allocator>(size, manage_content_lifetime, location_id,
+
619 gpu_id);
+
620 // Create buffer entry for this buffer
+
621 buffer_allocations.emplace_back(static_cast<void *>(aggregated_buffer),
+
622 size, 1, true, location_id, gpu_id);
+
623
+
624#ifndef NDEBUG
+
625 // if previously used, the buffer should no longer be in use
+
626 const auto exists = buffer_allocations_map.count(
+
627 static_cast<void *>(aggregated_buffer));
+
628 if (exists > 0) {
+
629 const auto previous_usage_id =
+
630 buffer_allocations_map[static_cast<void *>(aggregated_buffer)];
+
631 const auto &valid =
+
632 std::get<3>(buffer_allocations[previous_usage_id]);
+
633 assert(!valid);
+
634 }
+
635#endif
+
636 buffer_allocations_map.insert_or_assign(static_cast<void *>(aggregated_buffer),
+
637 buffer_counter);
+
638
+
639 assert (buffer_counter == slice_alloc_counter);
+
640 buffer_counter = buffer_allocations.size();
+
641
+
642 // Return buffer
+
643 return aggregated_buffer;
+
644 }
+
645 }
+
646 assert(buffers_in_use == true);
+
647 assert(std::get<3>(buffer_allocations[slice_alloc_counter])); // valid
+
648 assert(std::get<2>(buffer_allocations[slice_alloc_counter]) >= 1);
+
649
+
650 // Buffer entry should already exist:
+
651 T *aggregated_buffer = static_cast<T *>(
+
652 std::get<0>(buffer_allocations[slice_alloc_counter]));
+
653 // Error handling: Size is wrong?
+
654 assert(size == std::get<1>(buffer_allocations[slice_alloc_counter]));
+
655 // Notify that one more slice has visited this buffer allocation
+
656 std::get<2>(buffer_allocations[slice_alloc_counter])++;
+
657 return aggregated_buffer;
+
658 }
+
+
659
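The allocation path in get() above relies on double-checked locking: a cheap atomic check first, then a recheck under the lock so that exactly one slice creates the shared buffer entry while all later slices take the fast path. Distilled into a standalone sketch (all names are assumptions, not from the original source):

```cpp
#include <atomic>
#include <mutex>

std::atomic<size_t> entries_created{0};
std::mutex entries_mutex;

void ensure_entry_exists(const size_t wanted_index) {
  if (entries_created <= wanted_index) {   // fast, unsynchronized check
    std::lock_guard<std::mutex> lock(entries_mutex);
    if (entries_created <= wanted_index) { // recheck while holding the lock
      // ... create the shared entry exactly once ...
      entries_created = wanted_index + 1;
    }
  }
  // From here on, the entry exists and readers need no lock
}
```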
+
661 template <typename T, typename Host_Allocator>
+
+
662 void mark_unused(T *p, const size_t size) {
+
663 assert(slices_exhausted == true);
+
664 assert(executor_wrapper);
+
665
+
666 void *ptr_key = static_cast<void*>(p);
+
667 size_t slice_alloc_counter = buffer_allocations_map[p];
+
668
+
669 assert(slice_alloc_counter < buffer_allocations.size());
+
670 /*auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter, valid] =
+
671 buffer_allocations[slice_alloc_counter];*/
+
672 auto buffer_pointer_void = std::get<0>(buffer_allocations[slice_alloc_counter]);
+
673 const auto buffer_size = std::get<1>(buffer_allocations[slice_alloc_counter]);
+
674 auto &buffer_allocation_counter = std::get<2>(buffer_allocations[slice_alloc_counter]);
+
675 auto &valid = std::get<3>(buffer_allocations[slice_alloc_counter]);
+
676 const auto &location_id = std::get<4>(buffer_allocations[slice_alloc_counter]);
+
677 const auto &gpu_id = std::get<5>(buffer_allocations[slice_alloc_counter]);
+
678 assert(valid);
+
679 T *buffer_pointer = static_cast<T *>(buffer_pointer_void);
+
680
+
681 assert(buffer_size == size);
+
682 assert(p == buffer_pointer);
+
683 // assert(buffer_pointer == p || buffer_pointer == nullptr);
+
684 // Slice is done with this buffer
+
685 buffer_allocation_counter--;
+
686 // Are all slices done with this buffer?
+
687 if (buffer_allocation_counter == 0) {
+
688 // Yes! "Deallocate" by telling the recycler the buffer is fit for reuse
+
689 std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
+
690 // Only mark unused if another buffer has not done so already (and marked
+
691 // it as invalid)
+
692 if (valid) {
+
693 assert(buffers_in_use == true);
+
694 cppuddle::memory_recycling::detail::buffer_interface::mark_unused<
+
695 T, Host_Allocator>(buffer_pointer, buffer_size, location_id,
+
696 gpu_id);
+
697 // mark buffer as invalid to prevent any other slice from marking the
+
698 // buffer as unused
+
699 valid = false;
+
700
+
701 const size_t current_deallocs = ++dealloc_counter;
+
702 if (current_deallocs == buffer_counter) {
+
703 std::lock_guard<aggregation_mutex_t> guard(mut);
+
704 buffers_in_use = false;
+
705 if (!executor_slices_alive && !buffers_in_use) {
+
706 slices_exhausted = false;
+
707 // Release executor
+
708 executor_wrapper.reset(nullptr);
+
709 }
+
710 }
+
711 }
+
712 }
+
713 }
+
+
714
+
715 //===============================================================================
+
716 // Public Interface
+
717public:
+
718 hpx::lcos::future<void> current_continuation;
+
719 hpx::lcos::future<void> last_stream_launch_done;
+
720 std::atomic<size_t> overall_launch_counter = 0;
+
721
+
+
723 bool sync_aggregation_slices(const size_t slice_launch_counter) {
+
724 std::lock_guard<aggregation_mutex_t> guard(mut);
+
725 assert(slices_exhausted == true);
+
726 assert(executor_wrapper);
+
727 // Add function call object in case it hasn't happened for this launch yet
+
728 if (overall_launch_counter <= slice_launch_counter) {
+
729 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
+
730 if (overall_launch_counter <= slice_launch_counter) {
+
731 function_calls.emplace_back(current_slices, false, *executor_wrapper);
+
732 overall_launch_counter = function_calls.size();
+
733 return function_calls[slice_launch_counter].sync_aggregation_slices(
+
734 last_stream_launch_done);
+
735 }
+
736 }
+
737
+
738 return function_calls[slice_launch_counter].sync_aggregation_slices(
+
739 last_stream_launch_done);
+
740 }
+
+
741
+
743 template <typename F, typename... Ts>
+
+
744 void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) {
+
745 std::lock_guard<aggregation_mutex_t> guard(mut);
+
746 assert(slices_exhausted == true);
+
747 assert(executor_wrapper);
+
748 // Add function call object in case it hasn't happened for this launch yet
+
749 if (overall_launch_counter <= slice_launch_counter) {
+
750 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
+
751 if (overall_launch_counter <= slice_launch_counter) {
+
752 function_calls.emplace_back(current_slices, false, *executor_wrapper);
+
753 overall_launch_counter = function_calls.size();
+
754 function_calls[slice_launch_counter].post_when(
+
755 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
+
756 return;
+
757 }
+
758 }
+
759
+
760 function_calls[slice_launch_counter].post_when(
+
761 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
+
762 return;
+
763 }
+
+
764
+
766 template <typename F, typename... Ts>
+
+
767 hpx::lcos::future<void> async(const size_t slice_launch_counter, F &&f,
+
768 Ts &&...ts) {
+
769 std::lock_guard<aggregation_mutex_t> guard(mut);
+
770 assert(slices_exhausted == true);
+
771 assert(executor_wrapper);
+
772 // Add function call object in case it hasn't happened for this launch yet
+
773 if (overall_launch_counter <= slice_launch_counter) {
+
774 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
+
775 if (overall_launch_counter <= slice_launch_counter) {
+
776 function_calls.emplace_back(current_slices, true, *executor_wrapper);
+
777 overall_launch_counter = function_calls.size();
+
778 return function_calls[slice_launch_counter].async_when(
+
779 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
+
780 }
+
781 }
+
782
+
783 return function_calls[slice_launch_counter].async_when(
+
784 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
+
785 }
+
+
787 template <typename F, typename... Ts>
+
+
788 hpx::lcos::shared_future<void> wrap_async(const size_t slice_launch_counter, F &&f,
+
789 Ts &&...ts) {
+
790 std::lock_guard<aggregation_mutex_t> guard(mut);
+
791 assert(slices_exhausted == true);
+
792 assert(executor_wrapper);
+
793 // Add function call object in case it hasn't happened for this launch yet
+
794 if (overall_launch_counter <= slice_launch_counter) {
+
795 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
+
796 if (overall_launch_counter <= slice_launch_counter) {
+
797 function_calls.emplace_back(current_slices, true, *executor_wrapper);
+
798 overall_launch_counter = function_calls.size();
+
799 return function_calls[slice_launch_counter].wrap_async(
+
800 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
+
801 }
+
802 }
+
803
+
804 return function_calls[slice_launch_counter].wrap_async(
+
805 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
+
806 }
+
+
807
+
+
808 bool slice_available(void) {
+
809 std::lock_guard<aggregation_mutex_t> guard(mut);
+
810 return !slices_exhausted;
+
811 }
  std::optional<hpx::lcos::future<executor_slice>> request_executor_slice() {
    std::lock_guard<aggregation_mutex_t> guard(mut);
    if (!slices_exhausted) {
      const size_t local_slice_id = ++current_slices;
      if (local_slice_id == 1) {
        // Clean up leftovers from the last run, if any
        // TODO Still required? Should be clean here already
        function_calls.clear();
        overall_launch_counter = 0;
        std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
#ifndef NDEBUG
        for (const auto &buffer_entry : buffer_allocations) {
          const auto &[buffer_pointer_any, buffer_size,
                       buffer_allocation_counter, valid, location_id,
                       device_id] = buffer_entry;
          assert(!valid);
        }
#endif
        buffer_allocations.clear();
        buffer_allocations_map.clear();
        buffer_counter = 0;

        assert(executor_slices_alive == false);
        assert(buffers_in_use == false);
        executor_slices_alive = true;
        buffers_in_use = false;
        dealloc_counter = 0;

        if (mode == aggregated_executor_modes::STRICT) {
          slices_full_promise = hpx::lcos::local::promise<void>{};
        }
      }

      // Create the executor_slice future that will be returned later
      hpx::lcos::future<executor_slice> ret_fut;
      if (local_slice_id < max_slices) {
        executor_slices.emplace_back(
            hpx::lcos::local::promise<executor_slice>{});
        ret_fut = executor_slices[local_slice_id - 1].get_future();
      } else {
        launched_slices = current_slices;
        ret_fut = hpx::make_ready_future(executor_slice{
            *this, executor_slices.size(), launched_slices, max_slices});
      }

      // Are we the first slice? If yes, add a continuation that sets the
      // executor_slice futures to ready once the launch conditions are met
      if (local_slice_id == 1) {
        // Redraw the executor
        assert(!executor_wrapper);
        cppuddle::executor_recycling::executor_pool::select_device<
            Executor,
            cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
            gpu_id);
        executor_wrapper.reset(
            new cppuddle::executor_recycling::executor_interface<
                Executor,
                cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
                gpu_id));
        // Renew the promise that all slices will be ready as the primary
        // launch criterion...
        hpx::lcos::shared_future<void> fut;
        if (mode == aggregated_executor_modes::EAGER ||
            mode == aggregated_executor_modes::ENDLESS) {
          // Fallback launch condition: launch as soon as the underlying
          // stream is ready
          /* auto slices_full_fut = slices_full_promise.get_future(); */
          cppuddle::executor_recycling::executor_pool::select_device<
              Executor,
              cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
              gpu_id);
          auto exec_fut = (*executor_wrapper).get_future();
          /* auto fut = hpx::when_any(exec_fut, slices_full_fut); */
          fut = std::move(exec_fut);
        } else {
          auto slices_full_fut = slices_full_promise.get_shared_future();
          // Just use the slices launch condition
          fut = std::move(slices_full_fut);
        }
        // Launch all executor slices within this continuation
        current_continuation = fut.then([this](auto &&fut) {
          std::lock_guard<aggregation_mutex_t> guard(mut);
          slices_exhausted = true;
          launched_slices = current_slices;
          size_t id = 0;
          for (auto &slice_promise : executor_slices) {
            slice_promise.set_value(
                executor_slice{*this, id, launched_slices, max_slices});
            id++;
          }
          executor_slices.clear();
        });
      }
      if (local_slice_id >= max_slices &&
          mode != aggregated_executor_modes::ENDLESS) {
        slices_exhausted = true; // prevents any more threads from entering
                                 // before the continuation is launched
        if (mode == aggregated_executor_modes::STRICT) {
          slices_full_promise.set_value(); // trigger the slices launch
                                           // condition continuation
        }
        // That continuation will set all executor slices handed out so far
        // to ready
      }
      return ret_fut;
    } else {
      // Return an empty optional to signal failure
      return std::optional<hpx::lcos::future<executor_slice>>{};
    }
  }
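  // Note (editorial): the launch condition depends on the mode. STRICT waits
  // until all max_slices slices have been requested (via slices_full_promise);
  // EAGER and ENDLESS instead launch as soon as the underlying stream is
  // ready; ENDLESS additionally never marks the executor as exhausted by the
  // slice count alone.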
  size_t launched_slices;

  void reduce_usage_counter(void) {
    /* std::lock_guard<aggregation_mutex_t> guard(mut); */
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    assert(executor_slices_alive == true);
    assert(launched_slices >= 1);
    assert(current_slices >= 0 && current_slices <= launched_slices);
    const size_t local_slice_id = --current_slices;
    // Did the last slice go out of scope?
    if (local_slice_id == 0) {
      // Mark the executor as fit for reuse
      std::lock_guard<aggregation_mutex_t> guard(mut);
      executor_slices_alive = false;
      if (!executor_slices_alive && !buffers_in_use) {
        // Release the executor
        slices_exhausted = false;
        executor_wrapper.reset(nullptr);
      }
    }
  }
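  // Note (editorial): the underlying executor is only released once both
  // liveness flags are cleared: executor_slices_alive (cleared above when the
  // last slice is returned) and buffers_in_use (cleared by the buffer
  // bookkeeping earlier in this class). Whichever side finishes last performs
  // the actual release.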
  ~aggregated_executor(void) {
    assert(current_slices == 0);
    assert(executor_slices_alive == false);
    assert(buffers_in_use == false);

    if (mode != aggregated_executor_modes::STRICT) {
      slices_full_promise.set_value(); // trigger the slices launch condition
                                       // continuation
    }

    // Clean up leftovers from the last run, if any
    function_calls.clear();
    overall_launch_counter = 0;
#ifndef NDEBUG
    for (const auto &buffer_entry : buffer_allocations) {
      const auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter,
                   valid, location_id, device_id] = buffer_entry;
      assert(!valid);
    }
#endif
    buffer_allocations.clear();
    buffer_allocations_map.clear();
    buffer_counter = 0;

    assert(buffer_allocations.empty());
    assert(buffer_allocations_map.empty());
  }

  aggregated_executor(const size_t number_slices,
                      aggregated_executor_modes mode, const size_t gpu_id = 0)
      : max_slices(number_slices), current_slices(0), slices_exhausted(false),
        dealloc_counter(0), mode(mode), executor_slices_alive(false),
        buffers_in_use(false), gpu_id(gpu_id),
        executor_wrapper(nullptr),
        current_continuation(hpx::make_ready_future()),
        last_stream_launch_done(hpx::make_ready_future()) {}

  // Not meant to be copied or moved
  aggregated_executor(const aggregated_executor &other) = delete;
  aggregated_executor &operator=(const aggregated_executor &other) = delete;
  aggregated_executor(aggregated_executor &&other) = delete;
  aggregated_executor &operator=(aggregated_executor &&other) = delete;
};
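For orientation, a brief usage sketch follows. It is editorial and not part of this header: the caller function and the kernel launch are hypothetical, and it assumes an HPX build with CUDA support.

// Hypothetical caller: aggregate up to four launches onto one executor.
using executor_t = hpx::cuda::experimental::cuda_executor;

void example_caller() {
  aggregated_executor<executor_t> agg_exec{
      4, aggregated_executor_modes::EAGER, /*gpu_id=*/0};

  for (int i = 0; i < 4; i++) {
    auto maybe_slice = agg_exec.request_executor_slice();
    if (!maybe_slice)
      break; // exhausted; a pool would hand out a fresh aggregated executor
    auto done = maybe_slice->then([](auto &&fut) {
      auto slice = fut.get();
      // ... launch the aggregated kernel through the slice here ...
    });
  }
}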
template <typename T, typename Host_Allocator, typename Executor>
class allocator_slice {
private:
  typename aggregated_executor<Executor>::executor_slice &executor_reference;
  aggregated_executor<Executor> &executor_parent;

public:
  using value_type = T;

  allocator_slice(
      typename aggregated_executor<Executor>::executor_slice &executor)
      : executor_reference(executor), executor_parent(executor.parent) {}

  template <typename U>
  explicit allocator_slice(
      allocator_slice<U, Host_Allocator, Executor> const &) noexcept {}

  T *allocate(std::size_t n) {
    T *data = executor_reference.template get<T, Host_Allocator>(n);
    return data;
  }

  void deallocate(T *p, std::size_t n) {
    /* executor_reference.template mark_unused<T, Host_Allocator>(p, n); */
    executor_parent.template mark_unused<T, Host_Allocator>(p, n);
  }

  template <typename... Args>
  inline void construct(T *p, Args... args) noexcept {
    // Do nothing here - we reuse the content of the last owner
  }

  void destroy(T *p) {
    // Do nothing here - contents will be destroyed when the buffer manager
    // is destroyed, not before
  }
};

template <typename T, typename U, typename Host_Allocator, typename Executor>
constexpr bool
operator==(allocator_slice<T, Host_Allocator, Executor> const &,
           allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
  return false;
}

template <typename T, typename U, typename Host_Allocator, typename Executor>
constexpr bool
operator!=(allocator_slice<T, Host_Allocator, Executor> const &,
           allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
  return true;
}
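Such an allocator is typically plugged into a standard container. A minimal sketch, again editorial: it assumes std::allocator<float> as the host allocator, the CUDA executor type from the example above, and the <vector> and <algorithm> headers.

// Hypothetical helper: draw a std::vector's storage from aggregated buffers.
using executor_t = hpx::cuda::experimental::cuda_executor;
using float_alloc = allocator_slice<float, std::allocator<float>, executor_t>;

void fill_buffer(aggregated_executor<executor_t>::executor_slice &slice) {
  // construct()/destroy() are no-ops, so elements may still hold the values
  // of the buffer's previous user; initialize before reading.
  std::vector<float, float_alloc> buf(1024, float_alloc{slice});
  std::fill(buf.begin(), buf.end(), 0.0f);
}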
} // namespace detail
} // namespace kernel_aggregation
} // namespace cppuddle
namespace hpx { namespace parallel { namespace execution {
// TODO Unfortunately it does not work this way! Create a trait that works for
// executor slices with compatible underlying executor types
/* template <typename E> */
/* struct is_one_way_executor<typename aggregated_executor<E>::executor_slice> */
/*     : std::true_type {}; */
/* template <typename E> */
/* struct is_two_way_executor<typename aggregated_executor<E>::executor_slice> */
/*     : std::true_type {}; */

#if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
// Workaround for the meantime: manually create the traits for compatible
// types:
template <>
struct is_one_way_executor<
    typename cppuddle::kernel_aggregation::detail::aggregated_executor<
        hpx::cuda::experimental::cuda_executor>::executor_slice>
    : std::true_type {};
template <>
struct is_two_way_executor<
    typename cppuddle::kernel_aggregation::detail::aggregated_executor<
        hpx::cuda::experimental::cuda_executor>::executor_slice>
    : std::true_type {};
#endif
}}}
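These manual specializations are what allow a CUDA executor_slice to be passed to HPX's generic execution interfaces. A compile-time sanity check (editorial; only meaningful in CUDA/HIP builds):

// Editorial check: the slice type is now recognized as a two-way executor.
static_assert(
    hpx::parallel::execution::is_two_way_executor<
        cppuddle::kernel_aggregation::detail::aggregated_executor<
            hpx::cuda::experimental::cuda_executor>::executor_slice>::value,
    "executor_slice should be usable as a two-way executor");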

#endif
diff --git a/aggregation__manager_8hpp.html b/aggregation__manager_8hpp.html
index a839018f..db9ce686 100644
--- a/aggregation__manager_8hpp.html
+++ b/aggregation__manager_8hpp.html