From 60bd0a8e001e764a7000f5836b9fad6b6e3c5ea1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Tue, 28 May 2024 00:10:05 +0200 Subject: [PATCH 01/12] Added initial version of cuda sample --- CMakeLists.txt | 13 +++ examples/cuda_vector_add.cu | 165 ++++++++++++++++++++++++++++++++++++ 2 files changed, 178 insertions(+) create mode 100644 examples/cuda_vector_add.cu diff --git a/CMakeLists.txt b/CMakeLists.txt index d88a3d27..81b3f853 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -251,6 +251,19 @@ install( install(FILES cppuddle-config.cmake DESTINATION ${CMAKE_INSTALL_PREFIX}/lib/cmake/CPPuddle/) install(EXPORT CPPuddle NAMESPACE CPPuddle:: DESTINATION ${CMAKE_INSTALL_PREFIX}/lib/cmake/CPPuddle/) +#Boost::boost Boost::program_options HPX::hpx Kokkos::kokkos HPXKokkos::hpx_kokkos buffer_manager stream_manager +if (CPPUDDLE_WITH_TESTS) + if (CPPUDDLE_WITH_CUDA) + add_hpx_executable( + cuda_vector_add + DEPENDENCIES + Boost::boost Boost::program_options HPX::hpx buffer_manager stream_manager + COMPONENT_DEPENDENCIES iostreams + SOURCES + examples/cuda_vector_add.cu + ) + endif() +endif() #------------------------------------------------------------------------------------------------------------ # Define cmake targets for all tests/example executables diff --git a/examples/cuda_vector_add.cu b/examples/cuda_vector_add.cu new file mode 100644 index 00000000..5c0a1c2b --- /dev/null +++ b/examples/cuda_vector_add.cu @@ -0,0 +1,165 @@ +#include +#include +#include +#include +#include +#include + + +#include +#include +#include +#include + +#include +#include +#include + +using float_t = float; +using device_executor_t = hpx::cuda::experimental::cuda_executor; + +constexpr size_t vector_size = 102400; +constexpr size_t entries_per_task = 1024; +constexpr size_t number_tasks = vector_size / entries_per_task; +constexpr size_t number_repetitions = 20; +constexpr size_t max_queue_length = 5; +constexpr size_t number_executors = 32; +constexpr size_t gpu_id = 0; + +static_assert(vector_size % entries_per_task == 0); + + +__global__ void kernel_add(const float_t *input_a, const float_t *input_b, float_t *output_c) { + const int index = blockIdx.x * blockDim.x + threadIdx.x; + output_c[index] = input_a[index] + input_b[index]; +} + +int hpx_main(int argc, char *argv[]) { + // HPX and CPPuddle Setup for executor (polling + pool init) + // =========================================== 0.a Init HPX CUDA polling + hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; + hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); + // 0.b Init CPPuddle executor pool + cppuddle::executor_recycling::executor_pool::init_executor_pool< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + gpu_id, number_executors, gpu_id, true); + hpx::cout << "Init done!" << std::endl << std::endl; + + + // Launch tasks + // Note: Repetitions may be out of order since they do not depend on each other in this toy sample + hpx::cout << "Start launching tasks..." << std::endl; + std::vector> repetition_futs(number_repetitions); + for (size_t repetition = 0; repetition < number_repetitions; repetition++) { + std::vector> futs(number_tasks); + for (size_t task_id = 0; task_id < number_tasks; task_id++) { + futs[task_id] = hpx::async([task_id]() { + + // Inner Task Setup to launch the CUDA kernels: + // =========================================== + + // 1. 
Create required per task host-side buffers + std::vector< + float_t, + cppuddle::memory_recycling::recycle_allocator_cuda_host> + host_a(entries_per_task); + std::vector< + float_t, + cppuddle::memory_recycling::recycle_allocator_cuda_host> + host_b(entries_per_task); + std::vector< + float_t, + cppuddle::memory_recycling::recycle_allocator_cuda_host> + host_c(entries_per_task); + + // 3. Host-side preprocessing (usually: communication, here fill dummy + // input) + std::fill(host_a.begin(), host_a.end(), 1.0); + std::fill(host_b.begin(), host_b.end(), 2.0); + + // 3. Check GPU utiliation + bool device_executor_available = + cppuddle::executor_recycling::executor_pool::interface_available< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl< + device_executor_t>>(max_queue_length, gpu_id); + + //4. Run Kernel on either CPU or GPU + if (!device_executor_available) { + // 4a. Launch CPU Fallback Version + for (size_t entry_id = 0; entry_id < entries_per_task; entry_id++) { + host_c[entry_id] = host_a[entry_id] + host_b[entry_id]; + } + } else { + // 4b. Create per_task device-side buffers and draw executor + cppuddle::executor_recycling::executor_interface< + device_executor_t, cppuddle::executor_recycling:: + round_robin_pool_impl> + executor(gpu_id); + cppuddle::memory_recycling::cuda_device_buffer + device_a(entries_per_task); + cppuddle::memory_recycling::cuda_device_buffer + device_b(entries_per_task); + cppuddle::memory_recycling::cuda_device_buffer + device_c(entries_per_task); + + // 4c. Launch data transfers and kernel + hpx::apply( + static_cast(executor), + cudaMemcpyAsync, device_a.device_side_buffer, host_a.data(), + entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); + hpx::apply( + static_cast(executor), + cudaMemcpyAsync, device_b.device_side_buffer, host_b.data(), + entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); + void *args[] = {&device_a.device_side_buffer, + &device_b.device_side_buffer, + &device_c.device_side_buffer}; + hpx::apply( + static_cast(executor), + cudaLaunchKernel, kernel_add, + entries_per_task / 128, 128, args, 0); + auto fut = hpx::async( + static_cast(executor), + cudaMemcpyAsync, host_c.data(), device_c.device_side_buffer, + entries_per_task * sizeof(float_t), cudaMemcpyDeviceToHost); + fut.get(); // Allow worker thread to jump away until the kernel and + // data-transfers are done + } + + // 5. Host-side postprocessing (usually: communication, here: check + // correctness) + if (!std::all_of(host_c.begin(), host_c.end(), + [](float_t i) { return i == 1.0 + 2.0; })) { + std::cerr << "Task " << task_id << " contained wrong results!!" + << std::endl; + } + + // Inner Task Done! + // =========================================== + }); + } + // Schedule output task to run once a repetition is done + auto repetition_finished = hpx::when_all(futs); + repetition_futs.emplace_back(repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + })); + } + hpx::cout << "All tasks launched asynchronously!" << std::endl << std::endl; + // Schedule output task to run once all other tasks are done + auto all_done_fut = hpx::when_all(repetition_futs).then([](auto &&fut) { + hpx::cout << "All tasks are done!" << std::endl << std::endl; + }); + all_done_fut.get(); + + hpx::cuda::experimental::detail::unregister_polling(hpx::resource::get_thread_pool(0)); + hpx::cout << "Finalizing..." 
<< std::endl; + return hpx::finalize(); +} + +int main(int argc, char *argv[]) { + hpx::init_params p; + p.cfg = {"hpx.commandline.allow_unknown=1"}; + return hpx::init(argc, argv, p); +} From b3d8d6774833602f78aa42b0f6e4a595d0dbca62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Tue, 28 May 2024 00:21:10 +0200 Subject: [PATCH 02/12] Adding counters to CUDA sample --- examples/cuda_vector_add.cu | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/examples/cuda_vector_add.cu b/examples/cuda_vector_add.cu index 5c0a1c2b..c0378d7a 100644 --- a/examples/cuda_vector_add.cu +++ b/examples/cuda_vector_add.cu @@ -23,7 +23,7 @@ constexpr size_t entries_per_task = 1024; constexpr size_t number_tasks = vector_size / entries_per_task; constexpr size_t number_repetitions = 20; constexpr size_t max_queue_length = 5; -constexpr size_t number_executors = 32; +constexpr size_t number_executors = 1; constexpr size_t gpu_id = 0; static_assert(vector_size % entries_per_task == 0); @@ -46,6 +46,8 @@ int hpx_main(int argc, char *argv[]) { gpu_id, number_executors, gpu_id, true); hpx::cout << "Init done!" << std::endl << std::endl; + std::atomic number_cpu_kernel_launches = 0; + std::atomic number_gpu_kernel_launches = 0; // Launch tasks // Note: Repetitions may be out of order since they do not depend on each other in this toy sample @@ -54,8 +56,8 @@ int hpx_main(int argc, char *argv[]) { for (size_t repetition = 0; repetition < number_repetitions; repetition++) { std::vector> futs(number_tasks); for (size_t task_id = 0; task_id < number_tasks; task_id++) { - futs[task_id] = hpx::async([task_id]() { - + futs[task_id] = hpx::async([task_id, &number_cpu_kernel_launches, + &number_gpu_kernel_launches]() { // Inner Task Setup to launch the CUDA kernels: // =========================================== @@ -73,7 +75,7 @@ int hpx_main(int argc, char *argv[]) { cppuddle::memory_recycling::recycle_allocator_cuda_host> host_c(entries_per_task); - // 3. Host-side preprocessing (usually: communication, here fill dummy + // 2. Host-side preprocessing (usually: communication, here fill dummy // input) std::fill(host_a.begin(), host_a.end(), 1.0); std::fill(host_b.begin(), host_b.end(), 2.0); @@ -88,11 +90,13 @@ int hpx_main(int argc, char *argv[]) { //4. Run Kernel on either CPU or GPU if (!device_executor_available) { // 4a. Launch CPU Fallback Version + number_cpu_kernel_launches++; for (size_t entry_id = 0; entry_id < entries_per_task; entry_id++) { host_c[entry_id] = host_a[entry_id] + host_b[entry_id]; } } else { // 4b. Create per_task device-side buffers and draw executor + number_gpu_kernel_launches++; cppuddle::executor_recycling::executor_interface< device_executor_t, cppuddle::executor_recycling:: round_robin_pool_impl> @@ -148,9 +152,16 @@ int hpx_main(int argc, char *argv[]) { } hpx::cout << "All tasks launched asynchronously!" << std::endl << std::endl; // Schedule output task to run once all other tasks are done - auto all_done_fut = hpx::when_all(repetition_futs).then([](auto &&fut) { - hpx::cout << "All tasks are done!" << std::endl << std::endl; - }); + auto all_done_fut = + hpx::when_all(repetition_futs) + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done!" 
<< std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl << std::endl; + }); all_done_fut.get(); hpx::cuda::experimental::detail::unregister_polling(hpx::resource::get_thread_pool(0)); From d4e026cbd08d913f87b85a2b3a8e22204c3653cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Tue, 28 May 2024 01:39:24 +0200 Subject: [PATCH 03/12] Add in-order flag --- examples/cuda_vector_add.cu | 112 +++++++++++++++++++++++++++++++----- 1 file changed, 97 insertions(+), 15 deletions(-) diff --git a/examples/cuda_vector_add.cu b/examples/cuda_vector_add.cu index c0378d7a..db39c67d 100644 --- a/examples/cuda_vector_add.cu +++ b/examples/cuda_vector_add.cu @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -25,6 +26,7 @@ constexpr size_t number_repetitions = 20; constexpr size_t max_queue_length = 5; constexpr size_t number_executors = 1; constexpr size_t gpu_id = 0; +constexpr bool in_order_repetitions = true; static_assert(vector_size % entries_per_task == 0); @@ -35,6 +37,47 @@ __global__ void kernel_add(const float_t *input_a, const float_t *input_b, float } int hpx_main(int argc, char *argv[]) { + + /* try { */ + /* boost::program_options::options_description desc{"Options"}; */ + /* desc.add_options()("help", "Help screen")( */ + /* "elements_per_task", */ + /* boost::program_options::value(&array_size) */ + /* ->default_value(5000000), */ + /* "Size of the buffers")( */ + /* "tasks_per_repetition", */ + /* boost::program_options::value(&number_futures) */ + /* ->default_value(64), */ + /* "Sets the number of futures to be (potentially) executed in parallel")( */ + /* "number_repetitions", */ + /* boost::program_options::value(&passes)->default_value(200), */ + /* "Sets the number of repetitions")( */ + /* "outputfile", */ + /* boost::program_options::value(&filename)->default_value( */ + /* ""), */ + /* "Redirect stdout/stderr to this file"); */ + + /* boost::program_options::variables_map vm; */ + /* boost::program_options::parsed_options options = */ + /* parse_command_line(argc, argv, desc); */ + /* boost::program_options::store(options, vm); */ + /* boost::program_options::notify(vm); */ + + /* if (vm.count("help") == 0u) { */ + /* std::cout << "Running with parameters:" << std::endl */ + /* << " --arraysize = " << array_size << std::endl */ + /* << " --futures = " << number_futures << std::endl */ + /* << " --passes = " << passes << std::endl */ + /* << " --hpx:threads = " << hpx::get_os_thread_count() */ + /* << std::endl; */ + /* } else { */ + /* std::cout << desc << std::endl; */ + /* return hpx::finalize(); */ + /* } */ + /* } catch (const boost::program_options::error &ex) { */ + /* std::cerr << "CLI argument problem found: " << ex.what() << '\n'; */ + /* } */ + // HPX and CPPuddle Setup for executor (polling + pool init) // =========================================== 0.a Init HPX CUDA polling hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; @@ -52,12 +95,15 @@ int hpx_main(int argc, char *argv[]) { // Launch tasks // Note: Repetitions may be out of order since they do not depend on each other in this toy sample hpx::cout << "Start launching tasks..." 
<< std::endl; + + hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); std::vector> repetition_futs(number_repetitions); for (size_t repetition = 0; repetition < number_repetitions; repetition++) { std::vector> futs(number_tasks); for (size_t task_id = 0; task_id < number_tasks; task_id++) { - futs[task_id] = hpx::async([task_id, &number_cpu_kernel_launches, - &number_gpu_kernel_launches]() { + auto gpu_task_lambda = [](const auto task_id, + auto &number_cpu_kernel_launches, + auto &number_gpu_kernel_launches) { // Inner Task Setup to launch the CUDA kernels: // =========================================== @@ -142,27 +188,63 @@ int hpx_main(int argc, char *argv[]) { // Inner Task Done! // =========================================== - }); + }; + if (in_order_repetitions) { + futs[task_id] = previous_iteration_fut.then([task_id, &number_cpu_kernel_launches, + &number_gpu_kernel_launches, gpu_task_lambda](auto && fut) { + gpu_task_lambda(task_id, number_cpu_kernel_launches, + number_gpu_kernel_launches); + }); + } else { + futs[task_id] = + hpx::async([task_id, &number_cpu_kernel_launches, + &number_gpu_kernel_launches, gpu_task_lambda]() { + gpu_task_lambda(task_id, number_cpu_kernel_launches, + number_gpu_kernel_launches); + }); + } } // Schedule output task to run once a repetition is done auto repetition_finished = hpx::when_all(futs); - repetition_futs.emplace_back(repetition_finished.then([repetition](auto &&fut) { + if (in_order_repetitions) { + previous_iteration_fut = + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + }); + } else { + repetition_futs.emplace_back( + repetition_finished.then([repetition](auto &&fut) { hpx::cout << "Repetition " << repetition << " done!" << std::endl; })); + } } hpx::cout << "All tasks launched asynchronously!" << std::endl << std::endl; // Schedule output task to run once all other tasks are done - auto all_done_fut = - hpx::when_all(repetition_futs) - .then([&number_cpu_kernel_launches, - &number_gpu_kernel_launches](auto &&fut) { - hpx::cout << "All tasks are done!" << std::endl; - hpx::cout << " => " << number_gpu_kernel_launches - << " kernels were run on the GPU" << std::endl; - hpx::cout << " => " << number_cpu_kernel_launches - << " kernels were using the CPU fallback" << std::endl << std::endl; - }); - all_done_fut.get(); + if (in_order_repetitions) { + previous_iteration_fut + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [in-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }) + .get(); + } else { + hpx::when_all(repetition_futs) + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }) + .get(); + } hpx::cuda::experimental::detail::unregister_polling(hpx::resource::get_thread_pool(0)); hpx::cout << "Finalizing..." 
<< std::endl; From 1337e34b3e95f3dd23b961febdd8aed46ffe4922 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Tue, 28 May 2024 15:02:49 +0200 Subject: [PATCH 04/12] Add cli args --- examples/cuda_vector_add.cu | 112 ++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 50 deletions(-) diff --git a/examples/cuda_vector_add.cu b/examples/cuda_vector_add.cu index db39c67d..b0b10f1e 100644 --- a/examples/cuda_vector_add.cu +++ b/examples/cuda_vector_add.cu @@ -16,19 +16,17 @@ #include #include +// Compile-time options using float_t = float; using device_executor_t = hpx::cuda::experimental::cuda_executor; -constexpr size_t vector_size = 102400; -constexpr size_t entries_per_task = 1024; -constexpr size_t number_tasks = vector_size / entries_per_task; -constexpr size_t number_repetitions = 20; -constexpr size_t max_queue_length = 5; -constexpr size_t number_executors = 1; -constexpr size_t gpu_id = 0; -constexpr bool in_order_repetitions = true; - -static_assert(vector_size % entries_per_task == 0); +// Runtime options +size_t entries_per_task = 1024; +size_t number_tasks = 100; +size_t number_repetitions = 20; +size_t max_queue_length = 5; +size_t number_executors = 1; +size_t gpu_id = 0; __global__ void kernel_add(const float_t *input_a, const float_t *input_b, float_t *output_c) { @@ -38,48 +36,62 @@ __global__ void kernel_add(const float_t *input_a, const float_t *input_b, float int hpx_main(int argc, char *argv[]) { - /* try { */ - /* boost::program_options::options_description desc{"Options"}; */ - /* desc.add_options()("help", "Help screen")( */ - /* "elements_per_task", */ - /* boost::program_options::value(&array_size) */ - /* ->default_value(5000000), */ - /* "Size of the buffers")( */ - /* "tasks_per_repetition", */ - /* boost::program_options::value(&number_futures) */ - /* ->default_value(64), */ - /* "Sets the number of futures to be (potentially) executed in parallel")( */ - /* "number_repetitions", */ - /* boost::program_options::value(&passes)->default_value(200), */ - /* "Sets the number of repetitions")( */ - /* "outputfile", */ - /* boost::program_options::value(&filename)->default_value( */ - /* ""), */ - /* "Redirect stdout/stderr to this file"); */ + // Process example options + bool in_order_repetitions = false; + try { + boost::program_options::options_description desc{"Options"}; + desc.add_options()("help", "Help screen")( + "elements_per_task", + boost::program_options::value(&entries_per_task) + ->default_value(1024), + "Number of elements added per task (corresponds to the number of CUDA workitems used per kernel)")( + "tasks_per_repetition", + boost::program_options::value(&number_tasks) + ->default_value(100), + "Number of tasks per repetition")( + "in_order_repetitions", + boost::program_options::value(&in_order_repetitions) + ->default_value(false), + "Execute repetitions in-order")( + "number_repetitions", + boost::program_options::value(&number_repetitions)->default_value(20), + "Sets the number of repetitions")( + "number_executors", + boost::program_options::value(&number_executors)->default_value(32), + "Number of GPU executors in the pool")( + "max_queue_length_per_executor", + boost::program_options::value(&max_queue_length)->default_value(5), + "Maximum numbers of kernels queued per GPU executor"); - /* boost::program_options::variables_map vm; */ - /* boost::program_options::parsed_options options = */ - /* parse_command_line(argc, argv, desc); */ - /* boost::program_options::store(options, vm); */ - /* 
boost::program_options::notify(vm); */ + boost::program_options::variables_map vm; + boost::program_options::parsed_options options = + parse_command_line(argc, argv, desc); + boost::program_options::store(options, vm); + boost::program_options::notify(vm); - /* if (vm.count("help") == 0u) { */ - /* std::cout << "Running with parameters:" << std::endl */ - /* << " --arraysize = " << array_size << std::endl */ - /* << " --futures = " << number_futures << std::endl */ - /* << " --passes = " << passes << std::endl */ - /* << " --hpx:threads = " << hpx::get_os_thread_count() */ - /* << std::endl; */ - /* } else { */ - /* std::cout << desc << std::endl; */ - /* return hpx::finalize(); */ - /* } */ - /* } catch (const boost::program_options::error &ex) { */ - /* std::cerr << "CLI argument problem found: " << ex.what() << '\n'; */ - /* } */ + std::cout << "CPPuddle Recycling Sample (Vector-Add / CUDA edition)" << std::endl; + std::cout << "=====================================================" << std::endl; + if (vm.count("help") == 0u) { + hpx::cout << "Running with parameters:" << std::endl + << " --elements_per_task = " << entries_per_task << std::endl + << " --tasks_per_repetition = " << number_tasks << std::endl + << " --number_repetitions = " << number_repetitions << std::endl + << " --in_order_repetitions = " << in_order_repetitions << std::endl + << " --number_executors = " << number_executors << std::endl + << " --max_queue_length_per_executor = " << max_queue_length << std::endl + << " --hpx:threads = " << hpx::get_os_thread_count() + << std::endl << std::endl; + } else { + std::cout << desc << std::endl; + return hpx::finalize(); + } + } catch (const boost::program_options::error &ex) { + std::cerr << "CLI argument problem found: " << ex.what() << '\n'; + } // HPX and CPPuddle Setup for executor (polling + pool init) - // =========================================== 0.a Init HPX CUDA polling + // =========================================== + // 0.a Init HPX CUDA polling hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); // 0.b Init CPPuddle executor pool @@ -93,9 +105,7 @@ int hpx_main(int argc, char *argv[]) { std::atomic number_gpu_kernel_launches = 0; // Launch tasks - // Note: Repetitions may be out of order since they do not depend on each other in this toy sample hpx::cout << "Start launching tasks..." << std::endl; - hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); std::vector> repetition_futs(number_repetitions); for (size_t repetition = 0; repetition < number_repetitions; repetition++) { @@ -189,6 +199,8 @@ int hpx_main(int argc, char *argv[]) { // Inner Task Done! 
// =========================================== }; + + // Schedule task either in order (one repetition after another) or out of order if (in_order_repetitions) { futs[task_id] = previous_iteration_fut.then([task_id, &number_cpu_kernel_launches, &number_gpu_kernel_launches, gpu_task_lambda](auto && fut) { From 33097e33004846224388b9b692e1f5ea1842c898 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Tue, 28 May 2024 17:01:26 +0200 Subject: [PATCH 05/12] Separate into parts and add timer --- examples/cuda_vector_add.cu | 459 ++++++++++++++++++++++-------------- 1 file changed, 281 insertions(+), 178 deletions(-) diff --git a/examples/cuda_vector_add.cu b/examples/cuda_vector_add.cu index b0b10f1e..81de15ad 100644 --- a/examples/cuda_vector_add.cu +++ b/examples/cuda_vector_add.cu @@ -1,4 +1,10 @@ +// Copyright (c) 2024 Gregor Daiß +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + #include +#include #include #include #include @@ -20,31 +26,242 @@ using float_t = float; using device_executor_t = hpx::cuda::experimental::cuda_executor; -// Runtime options -size_t entries_per_task = 1024; -size_t number_tasks = 100; -size_t number_repetitions = 20; -size_t max_queue_length = 5; -size_t number_executors = 1; -size_t gpu_id = 0; +/** \file This example shows how to use HPX + CPPuddle with GPU-accelerated + * applications. Particulary we focus on how to use a) recycled pinned host + * memory, b) recycled device memory, c) the executor pool, d) the HPX-CUDA + * futures and the basic CPU/GPU load balancing based on executor usage in an + * HPX application. To demonstrate these features we just use the simplest of + * kernels: a vector add, that is repeated over a multitude of tasks (with + * varying, artifical dependencies inbetween). So while the compute kernel is + * basic, we still get to see how the CPPuddle/HPX features may be used. + * + * The example has three parts: First the GPU part, then the HPX task graph + * management and lastly the remaining initialization/boilerplate code + */ +//================================================================================================= +// PART I: The GPU kernel and how to launch it with CPPuddle + HPX whilst avoid +// any CPU/GPU barriers +//================================================================================================= +/** Just some example CUDA kernel. For simplicity it just adds two vectors. */ __global__ void kernel_add(const float_t *input_a, const float_t *input_b, float_t *output_c) { const int index = blockIdx.x * blockDim.x + threadIdx.x; output_c[index] = input_a[index] + input_b[index]; } -int hpx_main(int argc, char *argv[]) { +/** Method that demonstrates how one might launch a CUDA kernel with HPX and + * CPPuddle recycled memory/executor! By using CPPuddle allocators to avoid + * allocating GPU memory and HPX futures to track the status of the + * kernel/memory transfers, this method is expected to be non-blocking both on + * the launching CPU thread and on the GPU (non malloc barriers). Hence, this + * launch method is suitable to quickly launch a multitude of GPU kernels if + * required. + * + * This method uses the following features: + * - Recycled pinned host memory. + * - Recycled device memory. + * - Draws GPU executor from the CPPuddle executor pool. + * - CPU-GPU load balancing based on the number of GPU executors and their queue length. 
+ * - Asynchronous data-transfers and lauching of the kernel. + * - HPX futures to suspend the HPX task until kernel and data-transfers are done. + * - Includes (sample) pre- and post-processing. */ +void launch_gpu_kernel_task(const size_t task_id, const size_t entries_per_task, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // 1. Create required per task host-side buffers using CPPuddle recycled + // pinned memory + std::vector> + host_a(entries_per_task); + std::vector> + host_b(entries_per_task); + std::vector> + host_c(entries_per_task); - // Process example options - bool in_order_repetitions = false; + // 2. Host-side preprocessing (usually: communication, here fill dummy input) + std::fill(host_a.begin(), host_a.end(), 1.0); + std::fill(host_b.begin(), host_b.end(), 2.0); + + // 3. Check GPU utilization - Method will return true if there is an executor + // in the pool that does currently not exceed its queue limit (tracked by + // RAII, no CUDA API calls involved) + bool device_executor_available = + cppuddle::executor_recycling::executor_pool::interface_available< + device_executor_t, cppuddle::executor_recycling:: + round_robin_pool_impl>( + max_queue_length, gpu_id); + + // 4. Run Kernel on either CPU or GPU + if (!device_executor_available) { + // 4a. Launch CPU Fallback Version + number_cpu_kernel_launches++; + for (size_t entry_id = 0; entry_id < entries_per_task; entry_id++) { + host_c[entry_id] = host_a[entry_id] + host_b[entry_id]; + } + } else { + // 4b. Create per_task device-side buffers (using recylced device memory) + // and draw GPU executor from CPPuddle executor pool + number_gpu_kernel_launches++; + cppuddle::executor_recycling::executor_interface< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl> + executor(gpu_id); // Wrapper that draws executor from the pool + cppuddle::memory_recycling::cuda_device_buffer device_a( + entries_per_task); + cppuddle::memory_recycling::cuda_device_buffer device_b( + entries_per_task); + cppuddle::memory_recycling::cuda_device_buffer device_c( + entries_per_task); + + // 4c. Launch data transfers and kernel + hpx::apply(static_cast(executor), cudaMemcpyAsync, + device_a.device_side_buffer, host_a.data(), + entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); + hpx::apply(static_cast(executor), cudaMemcpyAsync, + device_b.device_side_buffer, host_b.data(), + entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); + void *args[] = {&device_a.device_side_buffer, &device_b.device_side_buffer, + &device_c.device_side_buffer}; + hpx::apply(static_cast(executor), + cudaLaunchKernel, kernel_add, + entries_per_task / 128, 128, args, 0); + auto fut = + hpx::async(static_cast(executor), cudaMemcpyAsync, + host_c.data(), device_c.device_side_buffer, + entries_per_task * sizeof(float_t), cudaMemcpyDeviceToHost); + fut.get(); // Allow worker thread to jump away until the kernel and + // data-transfers are done + } + + // 5. Host-side postprocessing (usually: communication, here: check + // correctness) + if (!std::all_of(host_c.begin(), host_c.end(), + [](float_t i) { return i == 1.0 + 2.0; })) { + std::cerr << "Task " << task_id << " contained wrong results!!" 
+ << std::endl; + } +} + +//================================================================================================= +// PART II: How to build the dependency graph with HPX and the GPU launches +//================================================================================================= + +/** This methods demonstrates how one might build the HPX task graph + * asynchronously, using the launch_gpu_kernel_task method to launch the GPU + * kernels inside the tasks. To illustrate how one can chain together tasks, we + * support two modes for building the task tree: One keeps the dependencies + * between the repetitions (keeping them in order) and one does not and allows + * to interleave repetitions. */ +hpx::future +build_task_graph(const size_t number_repetitions, const size_t number_tasks, + const size_t entries_per_task, const bool in_order_repetitions, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // Launch tasks + hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); + std::vector> repetition_futs(number_repetitions); + for (size_t repetition = 0; repetition < number_repetitions; repetition++) { + std::vector> futs(number_tasks); + for (size_t task_id = 0; task_id < number_tasks; task_id++) { + // Schedule task either in order (one repetition after another) or out of order + if (in_order_repetitions) { + futs[task_id] = previous_iteration_fut.then( + [task_id, entries_per_task, max_queue_length, gpu_id, + &number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + launch_gpu_kernel_task( + task_id, entries_per_task, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + }); + } else { + futs[task_id] = hpx::async([task_id, entries_per_task, max_queue_length, + gpu_id, &number_cpu_kernel_launches, + &number_gpu_kernel_launches]() { + launch_gpu_kernel_task(task_id, entries_per_task, max_queue_length, + gpu_id, number_cpu_kernel_launches, + number_gpu_kernel_launches); + }); + } + } + // Schedule output task to run once each repetition is done + auto repetition_finished = hpx::when_all(futs); + if (in_order_repetitions) { + previous_iteration_fut = + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + }); + } else { + repetition_futs.emplace_back( + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + })); + } + } + // Schedule final output task to run once all other tasks are done and return future + if (in_order_repetitions) { + return previous_iteration_fut + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [in-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } else { + return hpx::when_all(repetition_futs) + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! 
[out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } +} + +//================================================================================================= +// PART III: Initialization / Boilerplate and Main +//================================================================================================= + +/** HPX uses either callbacks or event polling to implement its CUDA futures. + * Polling usually has the superior performance, however, it requires that the + * polling is initialized at startup (or at least before the CUDA futures are + * used). The CPPuddle executor pool also needs initialzing as we need to set it + * to a specified number of executors (which CPPuddle cannot know having the + * number_executors parameter). We will use the round_robin_pool_impl for + * simplicity. A priority_pool_impl is also available. + */ +void init_executor_pool_and_polling(const size_t number_executors, const size_t gpu_id) { + assert(gpu_id == 0); // MultiGPU not used in this example + hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); + cppuddle::executor_recycling::executor_pool::init_executor_pool< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + gpu_id, number_executors, gpu_id, true); +} + +/// Processes the CLI options via boost program_options to configure the example +bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, + size_t &number_tasks, bool &in_order_repetitions, + size_t &number_repetitions, size_t &number_executors, + size_t &max_queue_length) { try { boost::program_options::options_description desc{"Options"}; desc.add_options()("help", "Help screen")( "elements_per_task", boost::program_options::value(&entries_per_task) ->default_value(1024), - "Number of elements added per task (corresponds to the number of CUDA workitems used per kernel)")( + "Number of elements added per task (corresponds to the number of CUDA " + "workitems used per kernel)")( "tasks_per_repetition", boost::program_options::value(&number_tasks) ->default_value(100), @@ -54,13 +271,16 @@ int hpx_main(int argc, char *argv[]) { ->default_value(false), "Execute repetitions in-order")( "number_repetitions", - boost::program_options::value(&number_repetitions)->default_value(20), + boost::program_options::value(&number_repetitions) + ->default_value(20), "Sets the number of repetitions")( "number_executors", - boost::program_options::value(&number_executors)->default_value(32), + boost::program_options::value(&number_executors) + ->default_value(32), "Number of GPU executors in the pool")( "max_queue_length_per_executor", - boost::program_options::value(&max_queue_length)->default_value(5), + boost::program_options::value(&max_queue_length) + ->default_value(5), "Maximum numbers of kernels queued per GPU executor"); boost::program_options::variables_map vm; @@ -69,6 +289,11 @@ int hpx_main(int argc, char *argv[]) { boost::program_options::store(options, vm); boost::program_options::notify(vm); + if (entries_per_task % 128 != 0) { + std::cerr << "ERROR: --entries_per_task needs to be divisble by 128." 
<< std::endl; + return false; + } + std::cout << "CPPuddle Recycling Sample (Vector-Add / CUDA edition)" << std::endl; std::cout << "=====================================================" << std::endl; if (vm.count("help") == 0u) { @@ -83,183 +308,61 @@ int hpx_main(int argc, char *argv[]) { << std::endl << std::endl; } else { std::cout << desc << std::endl; - return hpx::finalize(); + return false; } } catch (const boost::program_options::error &ex) { std::cerr << "CLI argument problem found: " << ex.what() << '\n'; + return false; } + return true; +} - // HPX and CPPuddle Setup for executor (polling + pool init) - // =========================================== - // 0.a Init HPX CUDA polling - hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; - hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); - // 0.b Init CPPuddle executor pool - cppuddle::executor_recycling::executor_pool::init_executor_pool< - device_executor_t, - cppuddle::executor_recycling::round_robin_pool_impl>( - gpu_id, number_executors, gpu_id, true); - hpx::cout << "Init done!" << std::endl << std::endl; - +int hpx_main(int argc, char *argv[]) { + // Launch counters std::atomic number_cpu_kernel_launches = 0; std::atomic number_gpu_kernel_launches = 0; - // Launch tasks - hpx::cout << "Start launching tasks..." << std::endl; - hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); - std::vector> repetition_futs(number_repetitions); - for (size_t repetition = 0; repetition < number_repetitions; repetition++) { - std::vector> futs(number_tasks); - for (size_t task_id = 0; task_id < number_tasks; task_id++) { - auto gpu_task_lambda = [](const auto task_id, - auto &number_cpu_kernel_launches, - auto &number_gpu_kernel_launches) { - // Inner Task Setup to launch the CUDA kernels: - // =========================================== - - // 1. Create required per task host-side buffers - std::vector< - float_t, - cppuddle::memory_recycling::recycle_allocator_cuda_host> - host_a(entries_per_task); - std::vector< - float_t, - cppuddle::memory_recycling::recycle_allocator_cuda_host> - host_b(entries_per_task); - std::vector< - float_t, - cppuddle::memory_recycling::recycle_allocator_cuda_host> - host_c(entries_per_task); - - // 2. Host-side preprocessing (usually: communication, here fill dummy - // input) - std::fill(host_a.begin(), host_a.end(), 1.0); - std::fill(host_b.begin(), host_b.end(), 2.0); - - // 3. Check GPU utiliation - bool device_executor_available = - cppuddle::executor_recycling::executor_pool::interface_available< - device_executor_t, - cppuddle::executor_recycling::round_robin_pool_impl< - device_executor_t>>(max_queue_length, gpu_id); - - //4. Run Kernel on either CPU or GPU - if (!device_executor_available) { - // 4a. Launch CPU Fallback Version - number_cpu_kernel_launches++; - for (size_t entry_id = 0; entry_id < entries_per_task; entry_id++) { - host_c[entry_id] = host_a[entry_id] + host_b[entry_id]; - } - } else { - // 4b. Create per_task device-side buffers and draw executor - number_gpu_kernel_launches++; - cppuddle::executor_recycling::executor_interface< - device_executor_t, cppuddle::executor_recycling:: - round_robin_pool_impl> - executor(gpu_id); - cppuddle::memory_recycling::cuda_device_buffer - device_a(entries_per_task); - cppuddle::memory_recycling::cuda_device_buffer - device_b(entries_per_task); - cppuddle::memory_recycling::cuda_device_buffer - device_c(entries_per_task); - - // 4c. 
Launch data transfers and kernel - hpx::apply( - static_cast(executor), - cudaMemcpyAsync, device_a.device_side_buffer, host_a.data(), - entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); - hpx::apply( - static_cast(executor), - cudaMemcpyAsync, device_b.device_side_buffer, host_b.data(), - entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); - void *args[] = {&device_a.device_side_buffer, - &device_b.device_side_buffer, - &device_c.device_side_buffer}; - hpx::apply( - static_cast(executor), - cudaLaunchKernel, kernel_add, - entries_per_task / 128, 128, args, 0); - auto fut = hpx::async( - static_cast(executor), - cudaMemcpyAsync, host_c.data(), device_c.device_side_buffer, - entries_per_task * sizeof(float_t), cudaMemcpyDeviceToHost); - fut.get(); // Allow worker thread to jump away until the kernel and - // data-transfers are done - } + // Runtime options + size_t entries_per_task = 1024; + size_t number_tasks = 100; + size_t number_repetitions = 20; + bool in_order_repetitions = false; + size_t max_queue_length = 5; + size_t number_executors = 1; + size_t gpu_id = 0; + if(!process_cli_options(argc, argv, entries_per_task, number_tasks, + in_order_repetitions, number_repetitions, + number_executors, max_queue_length)) { + return hpx::finalize(); // problem with CLI parameters detected -> exiting.. + } - // 5. Host-side postprocessing (usually: communication, here: check - // correctness) - if (!std::all_of(host_c.begin(), host_c.end(), - [](float_t i) { return i == 1.0 + 2.0; })) { - std::cerr << "Task " << task_id << " contained wrong results!!" - << std::endl; - } + // Init HPX CUDA polling + executor pool + hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; + init_executor_pool_and_polling(number_executors, gpu_id); + hpx::cout << "Init done!" << std::endl << std::endl; - // Inner Task Done! - // =========================================== - }; - // Schedule task either in order (one repetition after another) or out of order - if (in_order_repetitions) { - futs[task_id] = previous_iteration_fut.then([task_id, &number_cpu_kernel_launches, - &number_gpu_kernel_launches, gpu_task_lambda](auto && fut) { - gpu_task_lambda(task_id, number_cpu_kernel_launches, - number_gpu_kernel_launches); - }); - } else { - futs[task_id] = - hpx::async([task_id, &number_cpu_kernel_launches, - &number_gpu_kernel_launches, gpu_task_lambda]() { - gpu_task_lambda(task_id, number_cpu_kernel_launches, - number_gpu_kernel_launches); - }); - } - } - // Schedule output task to run once a repetition is done - auto repetition_finished = hpx::when_all(futs); - if (in_order_repetitions) { - previous_iteration_fut = - repetition_finished.then([repetition](auto &&fut) { - hpx::cout << "Repetition " << repetition << " done!" << std::endl; - }); - } else { - repetition_futs.emplace_back( - repetition_finished.then([repetition](auto &&fut) { - hpx::cout << "Repetition " << repetition << " done!" << std::endl; - })); - } - } - hpx::cout << "All tasks launched asynchronously!" << std::endl << std::endl; - // Schedule output task to run once all other tasks are done - if (in_order_repetitions) { - previous_iteration_fut - .then([&number_cpu_kernel_launches, - &number_gpu_kernel_launches](auto &&fut) { - hpx::cout << "All tasks are done! 
[in-order repetitions version]" << std::endl; - hpx::cout << " => " << number_gpu_kernel_launches - << " kernels were run on the GPU" << std::endl; - hpx::cout << " => " << number_cpu_kernel_launches - << " kernels were using the CPU fallback" << std::endl - << std::endl; - }) - .get(); - } else { - hpx::when_all(repetition_futs) - .then([&number_cpu_kernel_launches, - &number_gpu_kernel_launches](auto &&fut) { - hpx::cout << "All tasks are done! [out-of-order repetitions version]" << std::endl; - hpx::cout << " => " << number_gpu_kernel_launches - << " kernels were run on the GPU" << std::endl; - hpx::cout << " => " << number_cpu_kernel_launches - << " kernels were using the CPU fallback" << std::endl - << std::endl; - }) - .get(); - } + // Build task graph / Launch all tasks + auto start = std::chrono::high_resolution_clock::now(); + hpx::cout << "Start launching tasks..." << std::endl; + auto all_tasks_done_fut = + build_task_graph(number_repetitions, number_tasks, entries_per_task, + in_order_repetitions, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + hpx::cout << "All tasks launched asynchronously!" << std::endl; + // Only continue once all tasks are done! + all_tasks_done_fut.get(); + auto elapsed = std::chrono::high_resolution_clock::now() - start; + long long microseconds = + std::chrono::duration_cast(elapsed).count(); + hpx::cout << "Launching and running all tasks took " << microseconds + << " microseconds!" << std::endl + << std::endl; - hpx::cuda::experimental::detail::unregister_polling(hpx::resource::get_thread_pool(0)); + // Finalize HPX (CPPuddle finalizes automatically) hpx::cout << "Finalizing..." << std::endl; + hpx::cuda::experimental::detail::unregister_polling( + hpx::resource::get_thread_pool(0)); return hpx::finalize(); } From f09c6aaf14d201cdaca16a9cdf2835cde6d264f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Thu, 30 May 2024 00:19:47 +0200 Subject: [PATCH 06/12] Add kokkos recycling example --- CMakeLists.txt | 11 + examples/cuda_vector_add.cu | 50 +- examples/recycling-with-hpx-kokkos.cpp | 477 ++++++++++++++++++ .../util/recycling_kokkos_view.hpp | 4 + 4 files changed, 523 insertions(+), 19 deletions(-) create mode 100644 examples/recycling-with-hpx-kokkos.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 81b3f853..1a9a3dd1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -262,6 +262,17 @@ if (CPPUDDLE_WITH_TESTS) SOURCES examples/cuda_vector_add.cu ) + + if (CPPUDDLE_WITH_KOKKOS) + add_hpx_executable( + recycling-with-hpx-kokkos + DEPENDENCIES + Boost::boost Boost::program_options Kokkos::kokkos HPXKokkos::hpx_kokkos HPX::hpx buffer_manager stream_manager + COMPONENT_DEPENDENCIES iostreams + SOURCES + examples/recycling-with-hpx-kokkos.cpp + ) + endif() endif() endif() #------------------------------------------------------------------------------------------------------------ diff --git a/examples/cuda_vector_add.cu b/examples/cuda_vector_add.cu index 81de15ad..81670172 100644 --- a/examples/cuda_vector_add.cu +++ b/examples/cuda_vector_add.cu @@ -3,6 +3,10 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// Developer TODOs regarding CPPuddle usability: +// TODO(daissgr) Simplify specifying an executor pool (at least when using the +// default round_robin_pool_impl). 
The current way seems awfully verbose + #include #include #include @@ -13,6 +17,7 @@ #include +#include #include #include #include @@ -22,28 +27,31 @@ #include #include -// Compile-time options -using float_t = float; -using device_executor_t = hpx::cuda::experimental::cuda_executor; /** \file This example shows how to use HPX + CPPuddle with GPU-accelerated * applications. Particulary we focus on how to use a) recycled pinned host * memory, b) recycled device memory, c) the executor pool, d) the HPX-CUDA * futures and the basic CPU/GPU load balancing based on executor usage in an * HPX application. To demonstrate these features we just use the simplest of - * kernels: a vector add, that is repeated over a multitude of tasks (with + * kernels: a vector addition that is repeated over a multitude of tasks (with * varying, artifical dependencies inbetween). So while the compute kernel is - * basic, we still get to see how the CPPuddle/HPX features may be used. + * basic, we still get to see how the CPPuddle/HPX features may be used with + * it. * * The example has three parts: First the GPU part, then the HPX task graph * management and lastly the remaining initialization/boilerplate code */ //================================================================================================= -// PART I: The GPU kernel and how to launch it with CPPuddle + HPX whilst avoid +// PART I: The (CUDA) GPU kernel and how to launch it with CPPuddle + HPX whilst avoid // any CPU/GPU barriers //================================================================================================= +// Compile-time options: float type... +using float_t = float; +// ... and we will use the HPX CUDA executor inside the executor pool later on +using device_executor_t = hpx::cuda::experimental::cuda_executor; + /** Just some example CUDA kernel. For simplicity it just adds two vectors. */ __global__ void kernel_add(const float_t *input_a, const float_t *input_b, float_t *output_c) { const int index = blockIdx.x * blockDim.x + threadIdx.x; @@ -97,15 +105,15 @@ void launch_gpu_kernel_task(const size_t task_id, const size_t entries_per_task, // 4. Run Kernel on either CPU or GPU if (!device_executor_available) { - // 4a. Launch CPU Fallback Version number_cpu_kernel_launches++; + // 4a. Launch CPU Fallback Version for (size_t entry_id = 0; entry_id < entries_per_task; entry_id++) { host_c[entry_id] = host_a[entry_id] + host_b[entry_id]; } } else { + number_gpu_kernel_launches++; // 4b. Create per_task device-side buffers (using recylced device memory) // and draw GPU executor from CPPuddle executor pool - number_gpu_kernel_launches++; cppuddle::executor_recycling::executor_interface< device_executor_t, cppuddle::executor_recycling::round_robin_pool_impl> @@ -236,23 +244,23 @@ build_task_graph(const size_t number_repetitions, const size_t number_tasks, * Polling usually has the superior performance, however, it requires that the * polling is initialized at startup (or at least before the CUDA futures are * used). The CPPuddle executor pool also needs initialzing as we need to set it - * to a specified number of executors (which CPPuddle cannot know having the - * number_executors parameter). We will use the round_robin_pool_impl for + * to a specified number of executors (which CPPuddle cannot know without the + * number_gpu_executors parameter). We will use the round_robin_pool_impl for * simplicity. A priority_pool_impl is also available. 
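 *
 * As a rough sketch (assuming priority_pool_impl exposes the same template
 * interface as the round_robin_pool_impl used below), switching pool types
 * would only change the template argument of the init call:
 *
 *   cppuddle::executor_recycling::executor_pool::init_executor_pool<
 *       device_executor_t,
 *       cppuddle::executor_recycling::priority_pool_impl<device_executor_t>>(
 *       gpu_id, number_gpu_executors, gpu_id, true);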
*/ -void init_executor_pool_and_polling(const size_t number_executors, const size_t gpu_id) { +void init_executor_pool_and_polling(const size_t number_gpu_executors, const size_t gpu_id) { assert(gpu_id == 0); // MultiGPU not used in this example hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); cppuddle::executor_recycling::executor_pool::init_executor_pool< device_executor_t, cppuddle::executor_recycling::round_robin_pool_impl>( - gpu_id, number_executors, gpu_id, true); + gpu_id, number_gpu_executors, gpu_id, true); } /// Processes the CLI options via boost program_options to configure the example bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, size_t &number_tasks, bool &in_order_repetitions, - size_t &number_repetitions, size_t &number_executors, + size_t &number_repetitions, size_t &number_gpu_executors, size_t &max_queue_length) { try { boost::program_options::options_description desc{"Options"}; @@ -274,8 +282,8 @@ bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, boost::program_options::value(&number_repetitions) ->default_value(20), "Sets the number of repetitions")( - "number_executors", - boost::program_options::value(&number_executors) + "number_gpu_executors", + boost::program_options::value(&number_gpu_executors) ->default_value(32), "Number of GPU executors in the pool")( "max_queue_length_per_executor", @@ -302,7 +310,7 @@ bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, << " --tasks_per_repetition = " << number_tasks << std::endl << " --number_repetitions = " << number_repetitions << std::endl << " --in_order_repetitions = " << in_order_repetitions << std::endl - << " --number_executors = " << number_executors << std::endl + << " --number_gpu_executors = " << number_gpu_executors << std::endl << " --max_queue_length_per_executor = " << max_queue_length << std::endl << " --hpx:threads = " << hpx::get_os_thread_count() << std::endl << std::endl; @@ -328,17 +336,17 @@ int hpx_main(int argc, char *argv[]) { size_t number_repetitions = 20; bool in_order_repetitions = false; size_t max_queue_length = 5; - size_t number_executors = 1; + size_t number_gpu_executors = 1; size_t gpu_id = 0; if(!process_cli_options(argc, argv, entries_per_task, number_tasks, in_order_repetitions, number_repetitions, - number_executors, max_queue_length)) { + number_gpu_executors, max_queue_length)) { return hpx::finalize(); // problem with CLI parameters detected -> exiting.. } // Init HPX CUDA polling + executor pool hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; - init_executor_pool_and_polling(number_executors, gpu_id); + init_executor_pool_and_polling(number_gpu_executors, gpu_id); hpx::cout << "Init done!" << std::endl << std::endl; @@ -361,6 +369,10 @@ int hpx_main(int argc, char *argv[]) { // Finalize HPX (CPPuddle finalizes automatically) hpx::cout << "Finalizing..." << std::endl; + // Deallocates all CPPuddle everything and prevent further usage. Technically + // not required as long as static variables with CPPuddle-managed memory are + // not used, however, it does not hurt either. 
+ cppuddle::memory_recycling::finalize(); hpx::cuda::experimental::detail::unregister_polling( hpx::resource::get_thread_pool(0)); return hpx::finalize(); diff --git a/examples/recycling-with-hpx-kokkos.cpp b/examples/recycling-with-hpx-kokkos.cpp new file mode 100644 index 00000000..250faba8 --- /dev/null +++ b/examples/recycling-with-hpx-kokkos.cpp @@ -0,0 +1,477 @@ +// Copyright (c) 2024 Gregor Daiß +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// Developer TODOs regarding CPPuddle usability: +// TODO(daissgr) Improve type accessiblity (user should not worry about the +// activated Kokkos backend like belew to pick the correct view types +// TODO(daissgr) Add unified CPPuddle finalize that also cleans up all executor +// pool (and avoids having to use the cleanup methds of the individual pools + +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + + +/** \file This example shows how to use HPX + Kokkos + CPPuddle with GPU-accelerated + * applications. The example is extremly similary to its CUDA counterpart, however, uses + * Kokkos for implementation to showcase the required boilerplate and offered features. + * Particulary we focus on how to use a) recycled pinned host + * memory, b) recycled device memory, c) the executor pool, d) the HPX-Kokkos + * futures and the basic CPU/GPU load balancing based on executor usage in an + * HPX application. To demonstrate these features we just use the simplest of + * kernels: a vector add, that is repeated over a multitude of tasks (with + * varying, artifical dependencies inbetween). So while the compute kernel is + * basic, we still get to see how the CPPuddle/HPX features may be used. + * + * The example has three parts: First the GPU part, then the HPX task graph + * management and lastly the remaining initialization/boilerplate code + */ + +//================================================================================================= +// PART I: The Kokkos kernel and how to launch it with CPPuddle + HPX whilst avoid +// any CPU/GPU barriers +//================================================================================================= + +// Define types: A lot of this can be done automatically, however, here we want to show the manual +// approach (as using different types/ifdefs can allow us to specialize kernels for specific hardware +// if required. +// +using float_t = float; +// Use correct device exeuction space and memory spaces depending on the activated device +// execution space +#ifdef KOKKOS_ENABLE_CUDA +// Pick executor type +using device_executor_t = hpx::kokkos::cuda_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_host; +#elif KOKKOS_ENABLE_HIP +// Pick executor type +using device_executor_t = hpx::kokkos::hip_executor; +// Define Kokkos View types to be used! 
Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_host; +#elif KOKKOS_ENABLE_SYCL +// Pick executor type +using device_executor_t = hpx::kokkos::sycl_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_host; +#else +#error "Example assumes both a host and a device Kokkos execution space are available" +#endif +// Plug together the defined Kokkos views with the recycling CPPuddle allocators +// This yields a new type that can be used just like a normal Kokkos View but gets its memory from +// CPPuddle. +using recycling_device_view_t = + cppuddle::memory_recycling::recycling_view; +using recycling_host_view_t = + cppuddle::memory_recycling::recycling_view; + +// Run host kernels on HPX execution space: +using host_executor_t = hpx::kokkos::hpx_executor; +// Serial executor can actually work well, too when interleaving multiple Kokkos kernels to +// achieve multicore usage. However, be aware that this only works for Kokkos kernels that are not +// using team policies / scratch memory (those use a global barrier across all Serial execution +// spaces): +// using host_executor_t = hpx::kokkos::serial_executor; + +// The actual compute kernel: Simply defines a exeuction policy with the given executor and runs the +// kernel with a Kokkos::parallel_for +template +void kernel_add(executor_t &executor, const size_t entries_per_task, + const view_t &input_a, const view_t &input_b, view_t &output_c) +{ + // Define exeuction policy + auto execution_policy = Kokkos::Experimental::require( + Kokkos::RangePolicy( + executor.instance(), 0, entries_per_task), + Kokkos::Experimental::WorkItemProperty::HintLightWeight); + + // Run Kernel with execution policy (and give it some name ideally) + Kokkos::parallel_for( + "sample vector add kernel", execution_policy, + KOKKOS_LAMBDA(size_t index) { + output_c[index] = input_a[index] + input_b[index]; + }); +} + +/** Method that demonstrates how one might launch a Kokkos kernel with HPX and + * CPPuddle recycled memory/executors! By using CPPuddle allocators to avoid + * allocating GPU memory and HPX futures to track the status of the + * kernel/memory transfers, this method is expected to be non-blocking both on + * the launching CPU thread and on the GPU (non malloc barriers). Hence, this + * launch method is suitable to quickly launch a multitude of GPU kernels if + * required. + * + * This method uses the following features: + * - Recycled pinned host memory. + * - Recycled device memory. + * - Draws GPU executor from the CPPuddle executor pool. + * - CPU-GPU load balancing based on the number of GPU executors and their queue length. + * - Asynchronous data-transfers and lauching of the kernel. + * - HPX futures to suspend the HPX task until kernel and data-transfers are done. + * - Includes (sample) pre- and post-processing. 
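+ *
+ * A minimal usage sketch (illustrative only; it simply mirrors how
+ * build_task_graph below schedules this method from within an HPX task):
+ *
+ *   futs[task_id] = hpx::async([task_id, entries_per_task, max_queue_length,
+ *                               gpu_id, &number_cpu_kernel_launches,
+ *                               &number_gpu_kernel_launches]() {
+ *     launch_gpu_kernel_task(task_id, entries_per_task, max_queue_length,
+ *                            gpu_id, number_cpu_kernel_launches,
+ *                            number_gpu_kernel_launches);
+ *   });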
*/ +void launch_gpu_kernel_task(const size_t task_id, const size_t entries_per_task, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // 1. Create recycled Kokkos host views + recycling_host_view_t host_a(entries_per_task); + recycling_host_view_t host_b(entries_per_task); + recycling_host_view_t host_c(entries_per_task); + + // 2. Host-side preprocessing (usually: communication, here fill dummy input) + for (size_t i = 0; i < entries_per_task; i++) { + host_a[i] = 1.0; + host_b[i] = 2.0; + } + + // 3. Check GPU utilization - Method will return true if there is an executor + // in the pool that does currently not exceed its queue limit (tracked by + // RAII, no CUDA/HIP/SYCL API calls involved) + bool device_executor_available = + cppuddle::executor_recycling::executor_pool::interface_available< + device_executor_t, cppuddle::executor_recycling:: + round_robin_pool_impl>( + max_queue_length, gpu_id); + + // 4. Run Kernel on either CPU or GPU + if (!device_executor_available) { + // 4a. Launch CPU Fallback Version + number_cpu_kernel_launches++; + // Draw host executor + cppuddle::executor_recycling::executor_interface< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl> + executor(gpu_id); // Wrapper that draws executor from the pool + + // Launch + kernel_add(static_cast(executor), entries_per_task, host_a, host_b, host_c); + + // Sync kernel + static_cast(executor).instance().fence(); + + } else { + number_gpu_kernel_launches++; + // 4b. Create per_task device-side views (using recylced device memory) + // and draw GPU executor from CPPuddle executor pool + // Draw host device + cppuddle::executor_recycling::executor_interface< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl> + executor(gpu_id); // Wrapper that draws executor from the pool + + recycling_device_view_t device_a(entries_per_task); + recycling_device_view_t device_b(entries_per_task); + recycling_device_view_t device_c(entries_per_task); + + // 4c. Launch data transfers and kernel + Kokkos::deep_copy(executor.interface.instance(), device_a, host_a); + Kokkos::deep_copy(executor.interface.instance(), device_b, host_b); + + kernel_add(static_cast(executor), entries_per_task, + device_a, device_b, device_c); + + auto transfer_fut = hpx::kokkos::deep_copy_async( + executor.interface.instance(), host_c, device_c); + transfer_fut.get(); + + // 5. Host-side postprocessing (usually: communication, here: check + // correctness) + for (size_t i = 0; i < entries_per_task; i++) { + if (host_c[i] != 1.0 + 2.0) { + std::cerr << "Task " << task_id << " contained wrong results!!" + << std::endl; + break; + } + } + } + + +} + +//================================================================================================= +// PART II: How to build the dependency graph with HPX and the GPU launches +//================================================================================================= + +/** This methods demonstrates how one might build the HPX task graph + * asynchronously, using the launch_gpu_kernel_task method to launch the GPU + * kernels inside the tasks. To illustrate how one can chain together tasks, we + * support two modes for building the task tree: One keeps the dependencies + * between the repetitions (keeping them in order) and one does not and allows + * to interleave repetitions. 
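+ *
+ * Sketch of the two chaining patterns used below (simplified from the code
+ * in this method; captures omitted):
+ *
+ *   // in-order: the next repetition only starts once the previous one is done
+ *   previous_iteration_fut = hpx::when_all(futs).then([](auto &&fut) { ... });
+ *
+ *   // out-of-order: repetitions run independently and are only joined at the end
+ *   repetition_futs.emplace_back(hpx::when_all(futs).then([](auto &&fut) { ... }));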
*/ +hpx::future +build_task_graph(const size_t number_repetitions, const size_t number_tasks, + const size_t entries_per_task, const bool in_order_repetitions, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // Launch tasks + hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); + std::vector> repetition_futs(number_repetitions); + for (size_t repetition = 0; repetition < number_repetitions; repetition++) { + std::vector> futs(number_tasks); + for (size_t task_id = 0; task_id < number_tasks; task_id++) { + // Schedule task either in order (one repetition after another) or out of order + if (in_order_repetitions) { + futs[task_id] = previous_iteration_fut.then( + [task_id, entries_per_task, max_queue_length, gpu_id, + &number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + launch_gpu_kernel_task( + task_id, entries_per_task, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + }); + } else { + futs[task_id] = hpx::async([task_id, entries_per_task, max_queue_length, + gpu_id, &number_cpu_kernel_launches, + &number_gpu_kernel_launches]() { + launch_gpu_kernel_task(task_id, entries_per_task, max_queue_length, + gpu_id, number_cpu_kernel_launches, + number_gpu_kernel_launches); + }); + } + } + // Schedule output task to run once each repetition is done + auto repetition_finished = hpx::when_all(futs); + if (in_order_repetitions) { + previous_iteration_fut = + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + }); + } else { + repetition_futs.emplace_back( + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + })); + } + } + // Schedule final output task to run once all other tasks are done and return future + if (in_order_repetitions) { + return previous_iteration_fut + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [in-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } else { + return hpx::when_all(repetition_futs) + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } +} + +//================================================================================================= +// PART III: Initialization / Boilerplate and Main +//================================================================================================= + +/** HPX uses either callbacks or event polling to implement its HPX-Kokkos futures. + * Polling usually has the superior performance, however, it requires that the + * polling is initialized at startup (or at least before the HPX-Kokkos futures are + * used). 
The CPPuddle executor pool also needs initialzing as we need to set it + * to a specified number of executors (which CPPuddle cannot know without the + * number_gpu_executors parameter). We will use the round_robin_pool_impl for + * simplicity. A priority_pool_impl is also available. + */ +void init_executor_pool_and_polling(const size_t number_gpu_executors, + const size_t number_cpu_executors, + const size_t gpu_id) { + assert(gpu_id == 0); // MultiGPU not used in this example + // Init polling + hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); + // Init device executors + cppuddle::executor_recycling::executor_pool::init_executor_pool< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + gpu_id, number_gpu_executors, hpx::kokkos::execution_space_mode::independent); + /* // Init host executors (fixed to 256) */ + cppuddle::executor_recycling::executor_pool::init_all_executor_pools< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + number_cpu_executors, hpx::kokkos::execution_space_mode::independent); +} + +/// Processes the CLI options via boost program_options to configure the example +bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, + size_t &number_tasks, bool &in_order_repetitions, + size_t &number_repetitions, size_t &number_gpu_executors, + size_t &max_queue_length) { + try { + boost::program_options::options_description desc{"Options"}; + desc.add_options()("help", "Help screen")( + "elements_per_task", + boost::program_options::value(&entries_per_task) + ->default_value(1024), + "Number of elements added per task (corresponds to the number of CUDA " + "workitems used per kernel)")( + "tasks_per_repetition", + boost::program_options::value(&number_tasks) + ->default_value(100), + "Number of tasks per repetition")( + "in_order_repetitions", + boost::program_options::value(&in_order_repetitions) + ->default_value(false), + "Execute repetitions in-order")( + "number_repetitions", + boost::program_options::value(&number_repetitions) + ->default_value(20), + "Sets the number of repetitions")( + "number_gpu_executors", + boost::program_options::value(&number_gpu_executors) + ->default_value(32), + "Number of GPU executors in the pool")( + "max_queue_length_per_executor", + boost::program_options::value(&max_queue_length) + ->default_value(5), + "Maximum numbers of kernels queued per GPU executor"); + + boost::program_options::variables_map vm; + boost::program_options::parsed_options options = + parse_command_line(argc, argv, desc); + boost::program_options::store(options, vm); + boost::program_options::notify(vm); + + if (entries_per_task % 128 != 0) { + std::cerr << "ERROR: --entries_per_task needs to be divisble by 128." 
<< std::endl; + return false; + } + + std::cout << "CPPuddle Recycling Sample (Vector-Add / CUDA edition)" << std::endl; + std::cout << "=====================================================" << std::endl; + if (vm.count("help") == 0u) { + hpx::cout << "Running with parameters:" << std::endl + << " --elements_per_task = " << entries_per_task << std::endl + << " --tasks_per_repetition = " << number_tasks << std::endl + << " --number_repetitions = " << number_repetitions << std::endl + << " --in_order_repetitions = " << in_order_repetitions << std::endl + << " --number_gpu_executors = " << number_gpu_executors << std::endl + << " --max_queue_length_per_executor = " << max_queue_length << std::endl + << " --hpx:threads = " << hpx::get_os_thread_count() + << std::endl << std::endl; + } else { + std::cout << desc << std::endl; + return false; + } + } catch (const boost::program_options::error &ex) { + std::cerr << "CLI argument problem found: " << ex.what() << '\n'; + return false; + } + return true; +} + +int hpx_main(int argc, char *argv[]) { + // Init Kokkos + Kokkos::initialize(); + // Init/Finalize Kokkos alternative using RAII: + /* hpx::kokkos::ScopeGuard g(argc, argv); */ + // Launch counters + std::atomic number_cpu_kernel_launches = 0; + std::atomic number_gpu_kernel_launches = 0; + + // Runtime options + size_t entries_per_task = 1024; + size_t number_tasks = 100; + size_t number_repetitions = 20; + bool in_order_repetitions = false; + size_t max_queue_length = 5; + size_t number_gpu_executors = 1; + size_t number_cpu_executors = 128; + size_t gpu_id = 0; + if(!process_cli_options(argc, argv, entries_per_task, number_tasks, + in_order_repetitions, number_repetitions, + number_gpu_executors, max_queue_length)) { + return hpx::finalize(); // problem with CLI parameters detected -> exiting.. + } + + // Init HPX CUDA polling + executor pool + hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; + init_executor_pool_and_polling(number_gpu_executors, number_cpu_executors, + gpu_id); + hpx::cout << "Init done!" << std::endl << std::endl; + + // Build task graph / Launch all tasks + auto start = std::chrono::high_resolution_clock::now(); + hpx::cout << "Start launching tasks..." << std::endl; + auto all_tasks_done_fut = + build_task_graph(number_repetitions, number_tasks, entries_per_task, + in_order_repetitions, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + hpx::cout << "All tasks launched asynchronously!" << std::endl; + // Only continue once all tasks are done! + all_tasks_done_fut.get(); + auto elapsed = std::chrono::high_resolution_clock::now() - start; + long long microseconds = + std::chrono::duration_cast(elapsed).count(); + hpx::cout << "Launching and running all tasks took " << microseconds + << " microseconds!" << std::endl + << std::endl; + + // Finalize HPX (CPPuddle finalizes automatically) + hpx::cout << "Finalizing..." 
<< std::endl; + hpx::cuda::experimental::detail::unregister_polling( + hpx::resource::get_thread_pool(0)); + + // Cleanup (executor_pool cleanup required to deallocate all Kokkos execution + // spaces before Kokkos finalize is called) + cppuddle::executor_recycling::executor_pool::cleanup< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::executor_recycling::executor_pool::cleanup< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::memory_recycling::finalize(); + Kokkos::finalize(); // only required if hpx-kokkos Scope Guard is not used + return hpx::finalize(); +} + +int main(int argc, char *argv[]) { + hpx::init_params p; + p.cfg = {"hpx.commandline.allow_unknown=1"}; + return hpx::init(argc, argv, p); +} diff --git a/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp b/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp index 97626ebb..2696e705 100644 --- a/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp +++ b/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp @@ -35,6 +35,8 @@ struct view_deleter { /// Kokkos View that automatically uses a recycling allocator using /// alloc_type as an underlying allocator. Must be passed an existing allocator object /// (which should be an allocator_slice from the kernel aggregation functionality) +/** Requires the underlying Kokkos View (kokkos_type) to be a View using the + * MemoryUnmanaged trait! */ template class aggregated_recycling_view : public kokkos_type { private: @@ -95,6 +97,8 @@ class aggregated_recycling_view : public kokkos_type { /// Kokkos View that automatically uses a recycling allocator using /// alloc_type as an underlying allocator +/** Requires the underlying Kokkos View (kokkos_type) to be a View using the + * MemoryUnmanaged trait! */ template class recycling_view : public kokkos_type { private: From a12fb023424724577ac7a2b1ea8cc53cb3071600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Thu, 30 May 2024 22:48:14 +0200 Subject: [PATCH 07/12] Add inital aggregation example --- CMakeLists.txt | 8 + .../kernel-aggregation-with-hpx-kokkos.cpp | 510 ++++++++++++++++++ examples/recycling-with-hpx-kokkos.cpp | 23 +- 3 files changed, 529 insertions(+), 12 deletions(-) create mode 100644 examples/kernel-aggregation-with-hpx-kokkos.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a9a3dd1..fb305106 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -272,6 +272,14 @@ if (CPPUDDLE_WITH_TESTS) SOURCES examples/recycling-with-hpx-kokkos.cpp ) + add_hpx_executable( + kernel-aggregation-with-hpx-kokkos + DEPENDENCIES + Boost::boost Boost::program_options Kokkos::kokkos HPXKokkos::hpx_kokkos HPX::hpx buffer_manager stream_manager + COMPONENT_DEPENDENCIES iostreams + SOURCES + examples/kernel-aggregation-with-hpx-kokkos.cpp + ) endif() endif() endif() diff --git a/examples/kernel-aggregation-with-hpx-kokkos.cpp b/examples/kernel-aggregation-with-hpx-kokkos.cpp new file mode 100644 index 00000000..d7b4ecbf --- /dev/null +++ b/examples/kernel-aggregation-with-hpx-kokkos.cpp @@ -0,0 +1,510 @@ +// Copyright (c) 2024 Gregor Daiß +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// Developer TODOs regarding CPPuddle usability: +// TODO(daissgr) Improve type accessiblity (user should not worry about the +// activated Kokkos backend like belew to pick the correct view types +// TODO(daissgr) Add unified CPPuddle finalize that also cleans up all executor +// pool (and avoids having to use the cleanup methds of the individual pools + +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include + + +/** \file This example shows how to use HPX + Kokkos + CPPuddle with GPU-accelerated + * applications. The example is extremly similary to its CUDA counterpart, however, uses + * Kokkos for implementation to showcase the required boilerplate and offered features. + * Particulary we focus on how to use a) recycled pinned host + * memory, b) recycled device memory, c) the executor pool, d) the HPX-Kokkos + * futures and the basic CPU/GPU load balancing based on executor usage in an + * HPX application. To demonstrate these features we just use the simplest of + * kernels: a vector add, that is repeated over a multitude of tasks (with + * varying, artifical dependencies inbetween). So while the compute kernel is + * basic, we still get to see how the CPPuddle/HPX features may be used. + * + * The example has three parts: First the GPU part, then the HPX task graph + * management and lastly the remaining initialization/boilerplate code + */ + +//================================================================================================= +// PART I: The Kokkos kernel and how to launch it with CPPuddle + HPX whilst avoid +// any CPU/GPU barriers +//================================================================================================= + +// Define types: A lot of this can be done automatically, however, here we want to show the manual +// approach (as using different types/ifdefs can allow us to specialize kernels for specific hardware +// if required. +// +using float_t = float; +// Use correct device exeuction space and memory spaces depending on the activated device +// execution space +#ifdef KOKKOS_ENABLE_CUDA +#include +// Pick executor type +using device_executor_t = hpx::kokkos::cuda_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_host; +#elif KOKKOS_ENABLE_HIP +#include +// Pick executor type +using device_executor_t = hpx::kokkos::hip_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_host; +#elif KOKKOS_ENABLE_SYCL +#include +// Pick executor type +using device_executor_t = hpx::kokkos::sycl_executor; +// Define Kokkos View types to be used! 
Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_host; +#else +#error "Example assumes both a host and a device Kokkos execution space are available" +#endif +// Plug together the defined Kokkos views with the recycling CPPuddle allocators +// This yields a new type that can be used just like a normal Kokkos View but gets its memory from +// CPPuddle. +using recycling_device_view_t = + cppuddle::memory_recycling::recycling_view; +using recycling_host_view_t = + cppuddle::memory_recycling::recycling_view; +using aggregated_device_view_t = + cppuddle::memory_recycling::aggregated_recycling_view< + kokkos_device_view_t, + cppuddle::kernel_aggregation::allocator_slice< + float_t, device_allocator_t, device_executor_t>, + float_t>; +using aggregated_host_view_t = + cppuddle::memory_recycling::aggregated_recycling_view< + kokkos_host_view_t, + cppuddle::kernel_aggregation::allocator_slice, + float_t>; + +// Run host kernels on HPX execution space: +using host_executor_t = hpx::kokkos::hpx_executor; +// Serial executor can actually work well, too when interleaving multiple Kokkos kernels to +// achieve multicore usage. However, be aware that this only works for Kokkos kernels that are not +// using team policies / scratch memory (those use a global barrier across all Serial execution +// spaces): +// using host_executor_t = hpx::kokkos::serial_executor; + +// The actual compute kernel: Simply defines a exeuction policy with the given executor and runs the +// kernel with a Kokkos::parallel_for +template +void kernel_add(executor_t &executor, const size_t entries_per_task, + const view_t &input_a, const view_t &input_b, view_t &output_c) +{ + // Define exeuction policy + auto execution_policy = Kokkos::Experimental::require( + Kokkos::RangePolicy( + executor.instance(), 0, entries_per_task), + Kokkos::Experimental::WorkItemProperty::HintLightWeight); + + // Run Kernel with execution policy (and give it some name ideally) + Kokkos::parallel_for( + "sample vector add kernel", execution_policy, + KOKKOS_LAMBDA(size_t index) { + output_c[index] = input_a[index] + input_b[index]; + }); +} + +/** Method that demonstrates how one might launch a Kokkos kernel with HPX and + * CPPuddle recycled memory/executors! By using CPPuddle allocators to avoid + * allocating GPU memory and HPX futures to track the status of the + * kernel/memory transfers, this method is expected to be non-blocking both on + * the launching CPU thread and on the GPU (non malloc barriers). Hence, this + * launch method is suitable to quickly launch a multitude of GPU kernels if + * required. + * + * This method uses the following features: + * - Recycled pinned host memory. + * - Recycled device memory. + * - Draws GPU executor from the CPPuddle executor pool. + * - CPU-GPU load balancing based on the number of GPU executors and their queue length. + * - Asynchronous data-transfers and lauching of the kernel. + * - HPX futures to suspend the HPX task until kernel and data-transfers are done. + * - Includes (sample) pre- and post-processing. 
*/ +void launch_gpu_kernel_task( + const size_t task_id, const size_t entries_per_task, + const size_t max_queue_length, const size_t gpu_id, + const size_t max_kernels_fused, + std::atomic &number_aggregated_kernel_launches, + std::atomic &number_kernel_launches) { + // 3. Check GPU utilization - Method will return true if there is an + // executor in the pool that does currently not exceed its queue limit + // (tracked by RAII, no CUDA/HIP/SYCL API calls involved) + bool device_executor_available = + cppuddle::executor_recycling::executor_pool::interface_available< + device_executor_t, cppuddle::executor_recycling:: + round_robin_pool_impl>( + max_queue_length, gpu_id); + + static const char aggregation_region_name[] = "vector_add_aggregation"; + auto kernel_done_future = cppuddle::kernel_aggregation::aggregation_region< + aggregation_region_name, device_executor_t, + void>(max_kernels_fused, [&](auto slice_id, auto number_slices, + auto &aggregation_executor) { + cppuddle::kernel_aggregation::allocator_slice + alloc_host = aggregation_executor + .template make_allocator(); + assert(number_slices >= 1); + assert(number_slices < max_kernels_fused); + + if (aggregation_executor.sync_aggregation_slices()) { + // Only executed once per team + number_aggregated_kernel_launches++; + } + // Executed by each team member + number_kernel_launches++; + // 1. Create recycled Kokkos host views + aggregated_host_view_t host_a(alloc_host, entries_per_task * max_kernels_fused); + aggregated_host_view_t host_b(alloc_host, entries_per_task * max_kernels_fused); + aggregated_host_view_t host_c(alloc_host, entries_per_task * max_kernels_fused); + + auto [host_a_slice, host_b_slice, host_c_slice] = + cppuddle::kernel_aggregation::map_views_to_slice( + aggregation_executor, host_a, host_b, host_c); + + // 2. Host-side preprocessing (usually: communication, here fill dummy + // input) + for (size_t i = 0; i < entries_per_task; i++) { + host_a_slice[i] = 1.0; + host_b_slice[i] = 2.0; + } + + cppuddle::kernel_aggregation::allocator_slice + alloc_device = aggregation_executor + .template make_allocator(); + aggregated_device_view_t device_a(alloc_device, entries_per_task * max_kernels_fused); + aggregated_device_view_t device_b(alloc_device, entries_per_task * max_kernels_fused); + aggregated_device_view_t device_c(alloc_device, entries_per_task * max_kernels_fused); + + // 4c. Launch data transfers and kernel + cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor, + device_a, host_a); + cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor, + device_b, host_b); + + if (aggregation_executor.sync_aggregation_slices()) { + kernel_add(aggregation_executor.get_underlying_executor(), + entries_per_task, device_a, device_b, device_c); + } + + auto transfer_fut = + cppuddle::kernel_aggregation::aggregrated_deep_copy_async( + aggregation_executor, host_c, device_c); + transfer_fut.get(); + + // 5. Host-side postprocessing (usually: communication, here: check + // correctness) + for (size_t i = 0; i < entries_per_task; i++) { + if (host_c[i] != 1.0 + 2.0) { + std::cerr << "Task " << task_id << " contained wrong results!!" 
+ << std::endl; + break; + } + } + }); + kernel_done_future.get(); +} + +//================================================================================================= +// PART II: How to build the dependency graph with HPX and the GPU launches +//================================================================================================= + +/** This methods demonstrates how one might build the HPX task graph + * asynchronously, using the launch_gpu_kernel_task method to launch the GPU + * kernels inside the tasks. To illustrate how one can chain together tasks, we + * support two modes for building the task tree: One keeps the dependencies + * between the repetitions (keeping them in order) and one does not and allows + * to interleave repetitions. */ +hpx::future +build_task_graph(const size_t number_repetitions, const size_t number_tasks, + const size_t entries_per_task, const bool in_order_repetitions, + const size_t max_queue_length, const size_t gpu_id, + const size_t max_kernels_fused, + std::atomic &number_aggregated_kernel_launches, + std::atomic &number_kernel_launches) { + // Launch tasks + hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); + std::vector> repetition_futs(number_repetitions); + for (size_t repetition = 0; repetition < number_repetitions; repetition++) { + std::vector> futs(number_tasks); + for (size_t task_id = 0; task_id < number_tasks; task_id++) { + // Schedule task either in order (one repetition after another) or out of order + if (in_order_repetitions) { + futs[task_id] = previous_iteration_fut.then( + [task_id, entries_per_task, max_queue_length, gpu_id, + max_kernels_fused, &number_aggregated_kernel_launches, + &number_kernel_launches](auto &&fut) { + launch_gpu_kernel_task( + task_id, entries_per_task, max_queue_length, gpu_id, + max_kernels_fused, number_aggregated_kernel_launches, + number_kernel_launches); + }); + } else { + futs[task_id] = hpx::async([task_id, entries_per_task, max_queue_length, + gpu_id, max_kernels_fused, + &number_aggregated_kernel_launches, + &number_kernel_launches]() { + launch_gpu_kernel_task(task_id, entries_per_task, max_queue_length, + gpu_id, max_kernels_fused, + number_aggregated_kernel_launches, + number_kernel_launches); + }); + } + } + // Schedule output task to run once each repetition is done + auto repetition_finished = hpx::when_all(futs); + if (in_order_repetitions) { + previous_iteration_fut = + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + }); + } else { + repetition_futs.emplace_back( + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + })); + } + } + // Schedule final output task to run once all other tasks are done and return future + if (in_order_repetitions) { + return previous_iteration_fut + .then([&number_aggregated_kernel_launches, + &number_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_kernel_launches + << " were scheduled (before kernel fusion)" << std::endl; + hpx::cout << " => " << number_aggregated_kernel_launches + << " fused kernels were launched" << std::endl + << std::endl; + }); + } else { + return hpx::when_all(repetition_futs) + .then([&number_aggregated_kernel_launches, + &number_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! 
[out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_kernel_launches + << " were scheduled (before kernel fusion)" << std::endl; + hpx::cout << " => " << number_aggregated_kernel_launches + << " fused kernels were launched" << std::endl + << std::endl; + }); + } +} + +//================================================================================================= +// PART III: Initialization / Boilerplate and Main +//================================================================================================= + +/** HPX uses either callbacks or event polling to implement its HPX-Kokkos futures. + * Polling usually has the superior performance, however, it requires that the + * polling is initialized at startup (or at least before the HPX-Kokkos futures are + * used). The CPPuddle executor pool also needs initialzing as we need to set it + * to a specified number of executors (which CPPuddle cannot know without the + * number_gpu_executors parameter). We will use the round_robin_pool_impl for + * simplicity. A priority_pool_impl is also available. + */ +void init_executor_pool_and_polling(const size_t number_gpu_executors, + const size_t number_cpu_executors, + const size_t gpu_id) { + assert(gpu_id == 0); // MultiGPU not used in this example + // Init polling + hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); + // Init device executors + cppuddle::executor_recycling::executor_pool::init_executor_pool< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + gpu_id, number_gpu_executors, hpx::kokkos::execution_space_mode::independent); + /* // Init host executors (fixed to 256) */ + cppuddle::executor_recycling::executor_pool::init_all_executor_pools< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + number_cpu_executors, hpx::kokkos::execution_space_mode::independent); +} + +/// Processes the CLI options via boost program_options to configure the example +bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, + size_t &number_tasks, bool &in_order_repetitions, + size_t &number_repetitions, size_t &number_gpu_executors, + size_t &max_queue_length, size_t &max_kernels_fused) { + try { + boost::program_options::options_description desc{"Options"}; + desc.add_options()("help", "Help screen")( + "elements_per_task", + boost::program_options::value(&entries_per_task) + ->default_value(1024), + "Number of elements added per task (corresponds to the number of CUDA " + "workitems used per kernel)")( + "tasks_per_repetition", + boost::program_options::value(&number_tasks) + ->default_value(100), + "Number of tasks per repetition")( + "in_order_repetitions", + boost::program_options::value(&in_order_repetitions) + ->default_value(false), + "Execute repetitions in-order")( + "number_repetitions", + boost::program_options::value(&number_repetitions) + ->default_value(20), + "Sets the number of repetitions")( + "number_gpu_executors", + boost::program_options::value(&number_gpu_executors) + ->default_value(32), + "Number of GPU executors in the pool")( + "max_queue_length_per_executor", + boost::program_options::value(&max_queue_length) + ->default_value(5), + "Maximum numbers of kernels queued per GPU executor")( + "max_kernels_fused", + boost::program_options::value(&max_kernels_fused) + ->default_value(4), + "The maximum amount of kernels being fused together (keep below 128 ideally)"); + + boost::program_options::variables_map vm; + 
boost::program_options::parsed_options options = + parse_command_line(argc, argv, desc); + boost::program_options::store(options, vm); + boost::program_options::notify(vm); + + if (entries_per_task % 128 != 0) { + std::cerr << "ERROR: --entries_per_task needs to be divisble by 128." << std::endl; + return false; + } + + std::cout << "CPPuddle Recycling Sample (Vector-Add / CUDA edition)" << std::endl; + std::cout << "=====================================================" << std::endl; + if (vm.count("help") == 0u) { + hpx::cout << "Running with parameters:" << std::endl + << " --elements_per_task = " << entries_per_task << std::endl + << " --tasks_per_repetition = " << number_tasks << std::endl + << " --number_repetitions = " << number_repetitions << std::endl + << " --in_order_repetitions = " << in_order_repetitions << std::endl + << " --number_gpu_executors = " << number_gpu_executors << std::endl + << " --max_queue_length_per_executor = " << max_queue_length << std::endl + << " --max_kernels_fused = " << max_kernels_fused << std::endl + << " --hpx:threads = " << hpx::get_os_thread_count() + << std::endl << std::endl; + } else { + std::cout << desc << std::endl; + return false; + } + } catch (const boost::program_options::error &ex) { + std::cerr << "CLI argument problem found: " << ex.what() << '\n'; + return false; + } + return true; +} + +int hpx_main(int argc, char *argv[]) { + // Init Kokkos + Kokkos::initialize(); + // Init/Finalize Kokkos alternative using RAII: + /* hpx::kokkos::ScopeGuard g(argc, argv); */ + // Launch counters + std::atomic number_aggregated_kernel_launches = 0; + std::atomic number_kernel_launches = 0; + + // Runtime options + size_t entries_per_task = 1024; + size_t number_tasks = 100; + size_t number_repetitions = 20; + bool in_order_repetitions = false; + size_t max_queue_length = 5; + size_t number_gpu_executors = 1; + size_t number_cpu_executors = 128; + size_t max_kernels_fused = 4; + size_t gpu_id = 0; + if(!process_cli_options(argc, argv, entries_per_task, number_tasks, + in_order_repetitions, number_repetitions, + number_gpu_executors, max_queue_length, max_kernels_fused)) { + return hpx::finalize(); // problem with CLI parameters detected -> exiting.. + } + + // Init HPX CUDA polling + executor pool + hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; + init_executor_pool_and_polling(number_gpu_executors, number_cpu_executors, + gpu_id); + hpx::cout << "Init done!" << std::endl << std::endl; + + // Build task graph / Launch all tasks + auto start = std::chrono::high_resolution_clock::now(); + hpx::cout << "Start launching tasks..." << std::endl; + auto all_tasks_done_fut = + build_task_graph(number_repetitions, number_tasks, entries_per_task, + in_order_repetitions, max_queue_length, gpu_id, max_kernels_fused, + number_aggregated_kernel_launches, number_kernel_launches); + hpx::cout << "All tasks launched asynchronously!" << std::endl; + // Only continue once all tasks are done! + all_tasks_done_fut.get(); + auto elapsed = std::chrono::high_resolution_clock::now() - start; + long long microseconds = + std::chrono::duration_cast(elapsed).count(); + hpx::cout << "Launching and running all tasks took " << microseconds + << " microseconds!" << std::endl + << std::endl; + + // Finalize HPX (CPPuddle finalizes automatically) + hpx::cout << "Finalizing..." 
<< std::endl; + hpx::cuda::experimental::detail::unregister_polling( + hpx::resource::get_thread_pool(0)); + + // Cleanup (executor_pool cleanup required to deallocate all Kokkos execution + // spaces before Kokkos finalize is called) + cppuddle::executor_recycling::executor_pool::cleanup< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::executor_recycling::executor_pool::cleanup< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::memory_recycling::finalize(); + Kokkos::finalize(); // only required if hpx-kokkos Scope Guard is not used + return hpx::finalize(); +} + +int main(int argc, char *argv[]) { + hpx::init_params p; + p.cfg = {"hpx.commandline.allow_unknown=1"}; + return hpx::init(argc, argv, p); +} diff --git a/examples/recycling-with-hpx-kokkos.cpp b/examples/recycling-with-hpx-kokkos.cpp index 250faba8..fde4fa64 100644 --- a/examples/recycling-with-hpx-kokkos.cpp +++ b/examples/recycling-with-hpx-kokkos.cpp @@ -25,8 +25,6 @@ #include #include -#include -#include #include #include @@ -63,6 +61,7 @@ using float_t = float; // Use correct device exeuction space and memory spaces depending on the activated device // execution space #ifdef KOKKOS_ENABLE_CUDA +#include // Pick executor type using device_executor_t = hpx::kokkos::cuda_executor; // Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling @@ -72,6 +71,7 @@ using kokkos_host_view_t = Kokkos::View; using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_host; #elif KOKKOS_ENABLE_HIP +#include // Pick executor type using device_executor_t = hpx::kokkos::hip_executor; // Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling @@ -81,6 +81,7 @@ using kokkos_host_view_t = Kokkos::View; using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_host; #elif KOKKOS_ENABLE_SYCL +#include // Pick executor type using device_executor_t = hpx::kokkos::sycl_executor; // Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling @@ -210,19 +211,17 @@ void launch_gpu_kernel_task(const size_t task_id, const size_t entries_per_task, auto transfer_fut = hpx::kokkos::deep_copy_async( executor.interface.instance(), host_c, device_c); transfer_fut.get(); + } - // 5. Host-side postprocessing (usually: communication, here: check - // correctness) - for (size_t i = 0; i < entries_per_task; i++) { - if (host_c[i] != 1.0 + 2.0) { - std::cerr << "Task " << task_id << " contained wrong results!!" - << std::endl; - break; - } + // 5. Host-side postprocessing (usually: communication, here: check + // correctness) + for (size_t i = 0; i < entries_per_task; i++) { + if (host_c[i] != 1.0 + 2.0) { + std::cerr << "Task " << task_id << " contained wrong results!!" 
+ << std::endl; + break; } } - - } //================================================================================================= From 751adac43f450b6f35687844ce011df023ca1932 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Fri, 31 May 2024 02:06:38 +0200 Subject: [PATCH 08/12] Fix and simplify aggregation example --- .../kernel-aggregation-with-hpx-kokkos.cpp | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/examples/kernel-aggregation-with-hpx-kokkos.cpp b/examples/kernel-aggregation-with-hpx-kokkos.cpp index d7b4ecbf..51ffef92 100644 --- a/examples/kernel-aggregation-with-hpx-kokkos.cpp +++ b/examples/kernel-aggregation-with-hpx-kokkos.cpp @@ -130,19 +130,26 @@ using host_executor_t = hpx::kokkos::hpx_executor; // kernel with a Kokkos::parallel_for template void kernel_add(executor_t &executor, const size_t entries_per_task, - const view_t &input_a, const view_t &input_b, view_t &output_c) -{ + const size_t number_slices, const size_t max_kernels_fused, + const view_t &input_a, const view_t &input_b, + view_t &output_c) { // Define exeuction policy auto execution_policy = Kokkos::Experimental::require( Kokkos::RangePolicy( - executor.instance(), 0, entries_per_task), + executor.instance(), 0, entries_per_task * number_slices), Kokkos::Experimental::WorkItemProperty::HintLightWeight); // Run Kernel with execution policy (and give it some name ideally) Kokkos::parallel_for( "sample vector add kernel", execution_policy, KOKKOS_LAMBDA(size_t index) { - output_c[index] = input_a[index] + input_b[index]; + const size_t slice_id = index / entries_per_task; + const size_t entry_index = index % entries_per_task; + auto [a_slice, b_slice, c_slice] = + cppuddle::kernel_aggregation::map_views_to_slice(slice_id, max_kernels_fused, input_a, + input_b, output_c); + + c_slice[entry_index] = a_slice[entry_index] + b_slice[entry_index]; }); } @@ -182,10 +189,7 @@ void launch_gpu_kernel_task( aggregation_region_name, device_executor_t, void>(max_kernels_fused, [&](auto slice_id, auto number_slices, auto &aggregation_executor) { - cppuddle::kernel_aggregation::allocator_slice - alloc_host = aggregation_executor - .template make_allocator(); + auto alloc_host = aggregation_executor. template make_allocator(); assert(number_slices >= 1); assert(number_slices < max_kernels_fused); @@ -211,11 +215,9 @@ void launch_gpu_kernel_task( host_b_slice[i] = 2.0; } - cppuddle::kernel_aggregation::allocator_slice - alloc_device = aggregation_executor - .template make_allocator(); - aggregated_device_view_t device_a(alloc_device, entries_per_task * max_kernels_fused); + auto alloc_device = aggregation_executor. 
template make_allocator(); + aggregated_device_view_t device_a(alloc_device, + entries_per_task * max_kernels_fused); aggregated_device_view_t device_b(alloc_device, entries_per_task * max_kernels_fused); aggregated_device_view_t device_c(alloc_device, entries_per_task * max_kernels_fused); @@ -225,20 +227,20 @@ void launch_gpu_kernel_task( cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor, device_b, host_b); - if (aggregation_executor.sync_aggregation_slices()) { + if (aggregation_executor.sync_aggregation_slices()) { // Only launch one per team kernel_add(aggregation_executor.get_underlying_executor(), - entries_per_task, device_a, device_b, device_c); + entries_per_task, number_slices, max_kernels_fused, device_a, device_b, device_c); } auto transfer_fut = - cppuddle::kernel_aggregation::aggregrated_deep_copy_async( - aggregation_executor, host_c, device_c); + cppuddle::kernel_aggregation::aggregrated_deep_copy_async< + device_executor_t>(aggregation_executor, host_c, device_c); transfer_fut.get(); // 5. Host-side postprocessing (usually: communication, here: check // correctness) for (size_t i = 0; i < entries_per_task; i++) { - if (host_c[i] != 1.0 + 2.0) { + if (host_c_slice[i] != 1.0 + 2.0) { std::cerr << "Task " << task_id << " contained wrong results!!" << std::endl; break; From 703dc23564a787858e7023a44efe44f405a7118d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Dai=C3=9F?= Date: Sat, 1 Jun 2024 01:57:49 +0200 Subject: [PATCH 09/12] Add more descriptions --- .../kernel-aggregation-with-hpx-kokkos.cpp | 127 ++++++++++++------ 1 file changed, 83 insertions(+), 44 deletions(-) diff --git a/examples/kernel-aggregation-with-hpx-kokkos.cpp b/examples/kernel-aggregation-with-hpx-kokkos.cpp index 51ffef92..d734f8fb 100644 --- a/examples/kernel-aggregation-with-hpx-kokkos.cpp +++ b/examples/kernel-aggregation-with-hpx-kokkos.cpp @@ -35,19 +35,14 @@ #include #include - -/** \file This example shows how to use HPX + Kokkos + CPPuddle with GPU-accelerated - * applications. The example is extremly similary to its CUDA counterpart, however, uses - * Kokkos for implementation to showcase the required boilerplate and offered features. - * Particulary we focus on how to use a) recycled pinned host - * memory, b) recycled device memory, c) the executor pool, d) the HPX-Kokkos - * futures and the basic CPU/GPU load balancing based on executor usage in an - * HPX application. To demonstrate these features we just use the simplest of - * kernels: a vector add, that is repeated over a multitude of tasks (with - * varying, artifical dependencies inbetween). So while the compute kernel is - * basic, we still get to see how the CPPuddle/HPX features may be used. +/** \file Work aggregation example using Kokkos with HPX and CPPuddle. Like the + * other examples, we are still using a mere vector-add kernel for simplicity, + * allowing us to focus on how the aggregation actually works. Notably, it works + * similarly for much more complicated application (see Octo-Tiger for an + * example of this) * - * The example has three parts: First the GPU part, then the HPX task graph + * The example has three parts: First the GPU part (using the kernel + * aggregation feature with both host and device code), then the HPX task graph * management and lastly the remaining initialization/boilerplate code */ @@ -61,6 +56,8 @@ // if required. 
// using float_t = float; +// TODO(daissgr) No reason not to have this in a cppuddle header defining a generic type +// // Use correct device exeuction space and memory spaces depending on the activated device // execution space #ifdef KOKKOS_ENABLE_CUDA @@ -140,35 +137,33 @@ void kernel_add(executor_t &executor, const size_t entries_per_task, Kokkos::Experimental::WorkItemProperty::HintLightWeight); // Run Kernel with execution policy (and give it some name ideally) + // NOTE: Since this kernel may be launched in a way where we combine multiple + // kernel launches into one kernel, it contains another index: the slice ID + // (telling us which kernel a workitem belongs to in case multiple kernels + // were fused. For this simpple kernel there is no difference compute-wise for + // each workitem, however this is not always the case (for instance when + // fusing together multiple reduce kernels), hence we show how to simply + // obtain the slice ID if required here: Kokkos::parallel_for( "sample vector add kernel", execution_policy, KOKKOS_LAMBDA(size_t index) { + // Get slice id (kernel ID in case multiple kernels were fused) const size_t slice_id = index / entries_per_task; const size_t entry_index = index % entries_per_task; + // Obtain correct subviews for current team member (slice) auto [a_slice, b_slice, c_slice] = cppuddle::kernel_aggregation::map_views_to_slice(slice_id, max_kernels_fused, input_a, input_b, output_c); + // Run the actual compute kernel on the mapped subviews c_slice[entry_index] = a_slice[entry_index] + b_slice[entry_index]; }); } -/** Method that demonstrates how one might launch a Kokkos kernel with HPX and - * CPPuddle recycled memory/executors! By using CPPuddle allocators to avoid - * allocating GPU memory and HPX futures to track the status of the - * kernel/memory transfers, this method is expected to be non-blocking both on - * the launching CPU thread and on the GPU (non malloc barriers). Hence, this - * launch method is suitable to quickly launch a multitude of GPU kernels if - * required. - * - * This method uses the following features: - * - Recycled pinned host memory. - * - Recycled device memory. - * - Draws GPU executor from the CPPuddle executor pool. - * - CPU-GPU load balancing based on the number of GPU executors and their queue length. - * - Asynchronous data-transfers and lauching of the kernel. - * - HPX futures to suspend the HPX task until kernel and data-transfers are done. - * - Includes (sample) pre- and post-processing. */ +/** Method that shows how the CPPuddle kernel aggregation feature may be used + * to + * define aggregation regions where teams of tasks can form on-the-fly to + * facilitate a single larger GPU kernel launch than they would individually. */ void launch_gpu_kernel_task( const size_t task_id, const size_t entries_per_task, const size_t max_queue_length, const size_t gpu_id, @@ -184,61 +179,105 @@ void launch_gpu_kernel_task( round_robin_pool_impl>( max_queue_length, gpu_id); + // Defines an aggregation region + // ----------------------------- + // The lambda within will be executed a team of threads (between 1 and + // max_kernels_fused). The "threads" correspond to the HPX tasks that + // originally hit this aggregation region. How many of the tasks are fused + // together to this team depends on the utilization of the underlying + // ressource (GPU stream in this case). If it is not busy, a task will + // immediately get to work on lauching its own kernel (teamsize 1). 
If the + // ressource is busy however, the current task will be combined with other + // tasks arriving and launch as an aggregated/fused task once either + // max_kernels_fused tasks have arrived or the ressource becomes available. + // + // Note that the worker threads are never blocked during this. If a task is + // waiting to be combined with other tasks it will simply be suspended by HPX static const char aggregation_region_name[] = "vector_add_aggregation"; auto kernel_done_future = cppuddle::kernel_aggregation::aggregation_region< aggregation_region_name, device_executor_t, void>(max_kernels_fused, [&](auto slice_id, auto number_slices, - auto &aggregation_executor) { - auto alloc_host = aggregation_executor. template make_allocator(); - assert(number_slices >= 1); - assert(number_slices < max_kernels_fused); - + auto &aggregation_executor) { + // Within the aggregation region we have multiple extra parameters + // available: + // --> slice_id: The team ID of the current thread + // --> number_slices: Thn number of participating threads (1 <= number_slices <= max_kernels_fused) + // --> aggregation executor: This special-purpose executor + // enables the communication between the team members. It does this three + // ways: 1) function calls wrapped by it are only called once (by the last + // team member encountering the call, the previous members visiting just + // signals their readiness), 2) by providing an allocator that creates data + // structures that are shared between the threads (they are only created + // once by the first thread visiting the allocation, subsequent team members + // use the same allocation) and 3) by providing some primitives that allow + // us to conditionally execute some commands only by the final team members + // (just as if we would have wrapped them). + + // Demonstrate how to execute something once per team (by the last team thread visiting)... if (aggregation_executor.sync_aggregation_slices()) { - // Only executed once per team - number_aggregated_kernel_launches++; + // Only executed once per team + number_aggregated_kernel_launches++; } - // Executed by each team member + // ..and how to execute something for each team members number_kernel_launches++; - // 1. Create recycled Kokkos host views + + + // 1. Create recycled Kokkos host views + // 1a) obtain aggregated host allocator from aggregation executor + auto alloc_host = aggregation_executor. template make_allocator(); + // 1b) create aggregated views (shared by all team members, hence larger than entries_per_task) aggregated_host_view_t host_a(alloc_host, entries_per_task * max_kernels_fused); aggregated_host_view_t host_b(alloc_host, entries_per_task * max_kernels_fused); aggregated_host_view_t host_c(alloc_host, entries_per_task * max_kernels_fused); - + // 1c) use aggregation_executor with to obtain subviews that just map to the current team member auto [host_a_slice, host_b_slice, host_c_slice] = cppuddle::kernel_aggregation::map_views_to_slice( aggregation_executor, host_a, host_b, host_c); - // 2. Host-side preprocessing (usually: communication, here fill dummy - // input) + // 2. Use per-teammember subview for host-side preprocessing (usually: + // communication, here fill dummy input) for (size_t i = 0; i < entries_per_task; i++) { host_a_slice[i] = 1.0; host_b_slice[i] = 2.0; } + // 3 Create subviews + // 3a) obtain aggregated device allocator from aggregation executor auto alloc_device = aggregation_executor. 
template make_allocator(); + // 3b) create aggregated views (shared by all team members, hence larger than entries_per_task) aggregated_device_view_t device_a(alloc_device, entries_per_task * max_kernels_fused); aggregated_device_view_t device_b(alloc_device, entries_per_task * max_kernels_fused); aggregated_device_view_t device_c(alloc_device, entries_per_task * max_kernels_fused); - // 4c. Launch data transfers and kernel + // 4. Launch data transfers and kernel. Only executed once per team (by + // using the aggregated views and the aggregated_deep_copy wrapper that is + // only executed by the last thread. Alternatively one could have used the + // sync_aggregation_slices for an if branch again. cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor, device_a, host_a); cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor, device_b, host_b); - if (aggregation_executor.sync_aggregation_slices()) { // Only launch one per team + // 5. Launch the kernel. We could have wrapepd this with the aggregation + // executor but this time we use the sync_aggregation if branch again to + // only launch it once + if (aggregation_executor + .sync_aggregation_slices()) { // Only launch one per team kernel_add(aggregation_executor.get_underlying_executor(), entries_per_task, number_slices, max_kernels_fused, device_a, device_b, device_c); } + // 6. Future of the last data copy will be ready once the data transfers for the entire team will be done auto transfer_fut = cppuddle::kernel_aggregation::aggregrated_deep_copy_async< device_executor_t>(aggregation_executor, host_c, device_c); transfer_fut.get(); - // 5. Host-side postprocessing (usually: communication, here: check - // correctness) + // 7. Now each team member has gotten its results. Hence we can proceed + // with + // some example post processing (usually: communication, here checking + // correctness)) for (size_t i = 0; i < entries_per_task; i++) { if (host_c_slice[i] != 1.0 + 2.0) { std::cerr << "Task " << task_id << " contained wrong results!!" 
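Distilled down, the aggregation region introduced by this patch follows the skeleton below. This is a simplified sketch only: the region name and the work inside the lambda are placeholders, the buffer handling and correctness checks from the full example are omitted, and the type aliases (device_executor_t, max_kernels_fused, ...) are the ones defined in the example above.

    static const char my_region_name[] = "my_aggregation_region";
    auto kernel_done_future = cppuddle::kernel_aggregation::aggregation_region<
        my_region_name, device_executor_t, void>(
        max_kernels_fused,
        [&](auto slice_id, auto number_slices, auto &aggregation_executor) {
          // Executed by every team member (each of the fused tasks),
          // e.g. per-slice host-side pre-processing via map_views_to_slice...

          // ...whereas anything guarded like this runs only once per team,
          // triggered by the last member reaching the call:
          if (aggregation_executor.sync_aggregation_slices()) {
            // launch the fused kernel / aggregated data transfers here
          }
        });
    kernel_done_future.get(); // suspends the HPX task, not the worker thread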
From d21465221dc464e4afc5693f5ccdbd553fc4f85a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gregor=20Dai=C3=9F?=
Date: Sat, 1 Jun 2024 17:16:35 +0200
Subject: [PATCH 10/12] Fix naming

---
 CMakeLists.txt                                              | 4 ++--
 examples/kernel-aggregation-with-hpx-kokkos.cpp             | 2 +-
 examples/{cuda_vector_add.cu => recycling-with-hpx-cuda.cu} | 0
 examples/recycling-with-hpx-kokkos.cpp                      | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
 rename examples/{cuda_vector_add.cu => recycling-with-hpx-cuda.cu} (100%)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index fb305106..24be243c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -255,12 +255,12 @@ install(EXPORT CPPuddle NAMESPACE CPPuddle:: DESTINATION ${CMAKE_INSTALL_PREFIX}
 if (CPPUDDLE_WITH_TESTS)
   if (CPPUDDLE_WITH_CUDA)
     add_hpx_executable(
-      cuda_vector_add
+      recycling-with-hpx-cuda
       DEPENDENCIES
       Boost::boost Boost::program_options HPX::hpx buffer_manager stream_manager
       COMPONENT_DEPENDENCIES iostreams
       SOURCES
-      examples/cuda_vector_add.cu
+      examples/recycling-with-hpx-cuda.cu
     )
 
     if (CPPUDDLE_WITH_KOKKOS)
diff --git a/examples/kernel-aggregation-with-hpx-kokkos.cpp b/examples/kernel-aggregation-with-hpx-kokkos.cpp
index d734f8fb..a0695b7b 100644
--- a/examples/kernel-aggregation-with-hpx-kokkos.cpp
+++ b/examples/kernel-aggregation-with-hpx-kokkos.cpp
@@ -454,7 +454,7 @@ bool process_cli_options(int argc, char *argv[], size_t &entries_per_task,
     return false;
   }
 
-  std::cout << "CPPuddle Recycling Sample (Vector-Add / CUDA edition)" << std::endl;
+  std::cout << "CPPuddle Aggregation Sample (Vector-Add / Kokkos edition)" << std::endl;
   std::cout << "=====================================================" << std::endl;
   if (vm.count("help") == 0u) {
     hpx::cout << "Running with parameters:" << std::endl
diff --git a/examples/cuda_vector_add.cu b/examples/recycling-with-hpx-cuda.cu
similarity index 100%
rename from examples/cuda_vector_add.cu
rename to examples/recycling-with-hpx-cuda.cu
diff --git a/examples/recycling-with-hpx-kokkos.cpp b/examples/recycling-with-hpx-kokkos.cpp
index fde4fa64..1a3db086 100644
--- a/examples/recycling-with-hpx-kokkos.cpp
+++ b/examples/recycling-with-hpx-kokkos.cpp
@@ -381,7 +381,7 @@ bool process_cli_options(int argc, char *argv[], size_t &entries_per_task,
     return false;
   }
 
-  std::cout << "CPPuddle Recycling Sample (Vector-Add / CUDA edition)" << std::endl;
+  std::cout << "CPPuddle Recycling Sample (Vector-Add / Kokkos edition)" << std::endl;
   std::cout << "=====================================================" << std::endl;
   if (vm.count("help") == 0u) {
     hpx::cout << "Running with parameters:" << std::endl

From 924e1440298d0be13f43486e1389b5362bc53926 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gregor=20Dai=C3=9F?=
Date: Sat, 1 Jun 2024 20:34:18 +0200
Subject: [PATCH 11/12] Add more triggers for github actions tests

---
 .github/workflows/cmake.yml | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml
index a6fd9f4f..d328b002 100644
--- a/.github/workflows/cmake.yml
+++ b/.github/workflows/cmake.yml
@@ -1,6 +1,11 @@
 name: ctest
 
-on: [push]
+on:
+  push:
+  pull request: # build on PRs -- asks maintainers for approval before running on external PRs
+    branches:
+      - master
+  workflow_dispatch: # add manual trigger for the workflow
 
 #env:
 #  BUILD_TYPE: Release

From 01da04232325357d072335255a890bd51027279b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gregor=20Dai=C3=9F?=
Date: Sat, 1 Jun 2024 20:37:17 +0200
Subject: [PATCH 12/12] Fix typo..
---
 .github/workflows/cmake.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml
index d328b002..20c15d89 100644
--- a/.github/workflows/cmake.yml
+++ b/.github/workflows/cmake.yml
@@ -2,7 +2,7 @@ name: ctest
 
 on:
   push:
-  pull request: # build on PRs -- asks maintainers for approval before running on external PRs
+  pull_request: # build on PRs -- asks maintainers for approval before running on external PRs
     branches:
       - master
   workflow_dispatch: # add manual trigger for the workflow