Cross GPU Reduce Operator Initial Development #1795

Open
wants to merge 11 commits into base: develop
11 changes: 11 additions & 0 deletions example/ck_tile/18_cross_gpu_reduce/CMakeLists.txt
@@ -0,0 +1,11 @@
find_library(MSCCLPP_LIBRARY mscclpp HINTS /mscclpp/build)
if(NOT MSCCLPP_LIBRARY)
message(FATAL_ERROR "MSCCLPP library not found in /mscclpp/build.")
endif()
find_path(MSCCLPP_INCLUDE_DIR mscclpp/core.hpp HINTS /mscclpp/include)
if(NOT MSCCLPP_INCLUDE_DIR)
message(FATAL_ERROR "MSCCLPP include directory not found in /mscclpp/include.")
endif()
add_executable(example_cross_gpu_reduce cross_gpu_reduce.cpp)
target_include_directories(example_cross_gpu_reduce PRIVATE ${MSCCLPP_INCLUDE_DIR})
target_link_libraries(example_cross_gpu_reduce PRIVATE ${MSCCLPP_LIBRARY})
12 changes: 12 additions & 0 deletions example/ck_tile/18_cross_gpu_reduce/README.md
@@ -0,0 +1,12 @@
# Cross GPU Reduce Communication
This folder contains an example in which different GPUs communicate with each other to complete a reduce operation. It is currently a test operator used to verify and examine the communication between two GPUs.

## build
```
# in the root of ck_tile
mkdir build && cd build
# you can replace <arch> with the appropriate architecture (for example gfx90a or gfx942) or leave it blank
sh ../script/cmake-ck-dev.sh ../ <arch>
make example_cross_gpu_reduce -j
```
This will result in an executable `build/bin/example_cross_gpu_reduce`.
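A possible invocation is sketched below. The argument names (`M`, `N`, `gpu_nums`, `host_gpu`, `pr`, `output_type`, `warmup`, `repeat`) are those read in `cross_gpu_reduce.cpp`; the exact flag syntax, defaults, and any further options come from `create_args()`, which is not shown in this diff, so treat the command only as an illustration.
```
# illustrative only: flag syntax and default values are defined by create_args()
./bin/example_cross_gpu_reduce -M=3840 -N=4096 -gpu_nums=2 -host_gpu=0 -pr=fp16 -output_type=float
```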
350 changes: 350 additions & 0 deletions example/ck_tile/18_cross_gpu_reduce/cross_gpu_reduce.cpp
@@ -0,0 +1,350 @@
// SPDX-License-Identifier: MIT
// Copyright (c) 2024, Advanced Micro Devices, Inc. All rights reserved.
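//
// Cross-GPU reduce example: every non-host ("slave") GPU transfers its local tensor to the
// host GPU, which reduces the received buffers into a single output tensor.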

#include <hip/hip_runtime.h>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include "cross_gpu_reduce.hpp"
#include "ck_tile/host.hpp"
#include "ck_tile/ops/cross_gpu_reduce.hpp"

template <typename InputType, typename OutputType>
struct AllocateAndTransferFunctor
{
// Invoke the memory transfer between GPUs; which kernel is launched depends on whether this device is the host GPU or a slave GPU.
float invoke_transfer(ck_tile::DeviceMem& transfer_buf,
std::vector<ck_tile::DeviceMem>& receive_mem_bufs,
ck_tile::index_t host_gpu,
int device_id,
const ck_tile::ArgParser& arg_parser,
const ck_tile::stream_config& s)
{
ck_tile::index_t M = arg_parser.get_int("M");
ck_tile::index_t N = arg_parser.get_int("N");

constexpr ck_tile::index_t M_Tile = 128;
constexpr ck_tile::index_t N_Tile = 128;

constexpr ck_tile::index_t M_Warp = 2;
constexpr ck_tile::index_t N_Warp = 2;

constexpr ck_tile::index_t M_Warp_Tile = 64;
constexpr ck_tile::index_t N_Warp_Tile = 64;

constexpr int kBlockPerCu = 1;
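// Vector: per-thread vector access sizes along M and N. Together with the block tile
// (M_Tile x N_Tile), the warp layout (M_Warp x N_Warp), and the per-warp tile
// (M_Warp_Tile x N_Warp_Tile) it defines the reduce tile shape below.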
using Vector = ck_tile::sequence<8, 8>;

using ReduceShape = ck_tile::TileReduceShape<ck_tile::sequence<M_Tile, N_Tile>,
ck_tile::sequence<M_Warp, N_Warp>,
ck_tile::sequence<M_Warp_Tile, N_Warp_Tile>,
Vector>;

using ReducePartitioner = ck_tile::CrossReducePartitioner<ReduceShape>;
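// The receive pipeline runs on the host GPU and reduces the buffers received from the
// other GPUs into the output; the send pipeline runs on every non-host GPU and transfers
// that GPU's local buffer to the host.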

using CrossReduceReceivePipelinePolicy = ck_tile::ReduceReceivePipelineDefaultPolicy;

using CrossReduceSendPipelinePolicy = ck_tile::ReduceSendPipelineDefaultPolicy;

using CrossReduceReceivePipeline =
ck_tile::CrossReduceReceivePipelineScaleUp<InputType,
OutputType,
ReduceShape,
CrossReduceReceivePipelinePolicy>;
using CrossReduceSendPipeline = ck_tile::
CrossReduceSendPipelineScaleUp<InputType, ReduceShape, CrossReduceSendPipelinePolicy>;

constexpr ck_tile::index_t kBlockSize = CrossReduceReceivePipeline::BlockSize;

transfer_receive_basic_args args_receive;

args_receive.p_reduce = transfer_buf.GetDeviceBuffer();
args_receive.host_gpu = host_gpu;
args_receive.device_id = static_cast<ck_tile::index_t>(device_id);
args_receive.M = M;
args_receive.N = N;

transfer_send_basic_args args_send;
args_send.p_reduce = transfer_buf.GetDeviceBuffer();
args_send.host_gpu = host_gpu;
args_send.device_id = static_cast<ck_tile::index_t>(device_id);
args_send.M = M;
args_send.N = N;

float ave_time = 0.0;

using SlaveKernel =
ck_tile::ReduceReceiveKernel<ReducePartitioner, CrossReduceReceivePipeline>;
using MasterKernel = ck_tile::ReduceSendKernel<ReducePartitioner, CrossReduceSendPipeline>;
// Launch the receive kernel on the host GPU and the send kernel on every other GPU.
if(static_cast<ck_tile::index_t>(device_id) == host_gpu)
{
// initialize the receive data buffer and global memory location.
std::array<const void*, MaxSendGPUNum> p_receive_list;
for(size_t i = 0; i < receive_mem_bufs.size(); ++i)
{
p_receive_list[i] = receive_mem_bufs[i].GetDeviceBuffer();
}
args_receive.p_receive_list = p_receive_list;
// initialize the output data buffer.
std::string output_type = arg_parser.get_str("output_type");
if(output_type.compare("float") == 0)
{
ck_tile::HostTensor<OutputType> output_host({M, N});
ck_tile::DeviceMem output_buf(output_host.get_element_space_size_in_bytes());
args_receive.p_output = output_buf.GetDeviceBuffer();
auto kargs_slave = SlaveKernel::MakeKargs(args_receive.p_reduce,
args_receive.p_receive_list,
args_receive.p_output,
args_receive.M,
args_receive.N);
const dim3 grids_slave = SlaveKernel::GridSize(M, N);
ave_time = ck_tile::launch_kernel(
s,
ck_tile::make_kernel<kBlockSize, kBlockPerCu>(
SlaveKernel{}, grids_slave, kBlockSize, 0, kargs_slave));
}
else
{
std::cerr << "Currently, we do not support other output data type." << std::endl;
return -1;
}
}
else
{
auto kargs_master =
MasterKernel::MakeKargs(args_send.p_reduce, args_send.M, args_send.N);
const dim3 grids_master = MasterKernel::GridSize(M, N);
ave_time = ck_tile::launch_kernel(
s,
ck_tile::make_kernel<kBlockSize, kBlockPerCu>(
MasterKernel{}, grids_master, kBlockSize, 0, kargs_master));
}

std::string op_name{"Cross GPU Reduce"};
std::cout << "Run" << op_name << "kernel with M =" << M << "N =" << N << " : " << ave_time
<< "ms" << std::endl;

return ave_time;
}

void operator()(int device_id,
ck_tile::HostTensor<InputType>& host_tensor,
ck_tile::DeviceMem& device_mem,
std::vector<ck_tile::DeviceMem>& receive_mem,
ck_tile::index_t host_gpu,
const ck_tile::ArgParser& arg_parser)
{
hipError_t hip_err_set_device = hipSetDevice(device_id);
if(hip_err_set_device != hipSuccess)
{
std::cerr << "Error setting device " << device_id << ": "
<< hipGetErrorString(hip_err_set_device) << std::endl;
return;
}
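// Only the host GPU allocates receive buffers: one per slave GPU, each sized to hold a full transfer tensor.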
if(device_id == host_gpu)
{
for(size_t i = 0; i < receive_mem.size(); ++i)
{
receive_mem[i].Realloc(host_tensor.get_element_space_size_in_bytes());
}
}
// Allocate device memory
device_mem.Realloc(host_tensor.get_element_space_size_in_bytes());
// Transfer data to device
device_mem.ToDevice(host_tensor.data());

int worldSize = arg_parser.get_int("gpu_nums");
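// setupConnection (presumably declared in cross_gpu_reduce.hpp, which is not part of this diff)
// registers this device's transfer buffer and, on the host GPU, the receive buffers, so that
// data can move between the participating GPUs.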
setupConnection(device_id,
static_cast<int>(host_gpu),
static_cast<int>(worldSize),
device_mem.GetDeviceBuffer(),
receive_mem,
host_tensor.get_element_space_size_in_bytes());

int n_warmup = arg_parser.get_int("warmup");
int n_repeat = arg_parser.get_int("repeat");
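// Time the kernels on the default stream, using n_warmup warmup iterations and n_repeat timed iterations.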

invoke_transfer(device_mem,
receive_mem,
host_gpu,
device_id,
arg_parser,
ck_tile::stream_config{nullptr, true, 1, n_warmup, n_repeat});
}
};
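// Orchestrates the example: validates the GPU configuration, enables peer-to-peer access
// between the host GPU and every other GPU, then runs AllocateAndTransferFunctor on one
// thread per GPU.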

template <typename InputType, typename OutputType>
bool run_cross_gpu_reduce(ck_tile::ArgParser arg_parser)
{
ck_tile::index_t gpu_nums = arg_parser.get_int("gpu_nums");
ck_tile::index_t host_gpu = arg_parser.get_int("host_gpu");
ck_tile::index_t transfer_dim1 = arg_parser.get_int("M");
ck_tile::index_t transfer_dim2 = arg_parser.get_int("N");

// Validate arguments
if(gpu_nums < 1)
{
std::cerr << "Invalid number of GPUs specified." << std::endl;
return false;
}
// Examine how many GPUs are present in the system.
int device_count = 0;
hipError_t hip_err_device_count = hipGetDeviceCount(&device_count);
if(hip_err_device_count != hipSuccess)
{
std::cerr << "Error getting device count: " << hipGetErrorString(hip_err_device_count)
<< std::endl;
return false;
}

// Make sure the number of available GPUs is at least the required gpu_nums.
if(device_count < gpu_nums)
{
std::cerr << "The number of available GPUs in the system is less than required. Available GPUs: "
<< device_count << std::endl;
return false;
}

if(host_gpu < 0 || host_gpu >= device_count)
{
std::cerr << "Invalid host GPU index specified. Using GPU 0 as host GPU." << std::endl;
host_gpu = 0;
}

// Make sure that we can open each one of the GPUs.
// Print device properties
for(int i = 0; i < gpu_nums; ++i)
{
hipDeviceProp_t device_prop;
hipError_t hip_err_device_prop = hipGetDeviceProperties(&device_prop, i);
if(hip_err_device_prop != hipSuccess)
{
std::cerr << "Error getting device properties for device " << i << ": "
<< hipGetErrorString(hip_err_device_prop) << std::endl;
return false;
}
std::cout << "GPU " << i << ": " << device_prop.name << std::endl;
}
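// Per-GPU resources: one host tensor and one transfer buffer per GPU, plus one receive
// buffer per slave GPU (allocated later by the host GPU's worker thread).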

std::vector<int> device_list(gpu_nums);
std::vector<ck_tile::HostTensor<InputType>> transfer_tensor_host_list;
transfer_tensor_host_list.reserve(gpu_nums);
std::vector<ck_tile::DeviceMem> transfer_bufs(gpu_nums);
std::vector<ck_tile::DeviceMem> slave_receive_bufs(gpu_nums - 1);
std::vector<std::thread> threads;

AllocateAndTransferFunctor<InputType, OutputType> allocateAndTransfer;

// Initialize host tensors
for(int i = 0; i < gpu_nums; ++i)
{
device_list[i] = i; // Adjust based on available GPUs
std::vector<int> tensor_dims = {transfer_dim1, transfer_dim2};
transfer_tensor_host_list.emplace_back(tensor_dims);
ck_tile::FillUniformDistribution<InputType>{-5.f, 5.f}(transfer_tensor_host_list.back());
// Enable P2P access between GPUs
if(i != host_gpu)
{
int canAccessPeer = 0;
hipError_t err_peer =
hipDeviceCanAccessPeer(&canAccessPeer, device_list[host_gpu], device_list[i]);
if(err_peer != hipSuccess || !canAccessPeer)
{
std::cerr << "P2P not supported between device " << device_list[host_gpu]
<< " and device " << device_list[i] << std::endl;
return false; // Handle error accordingly.
}
else
{
// Enable P2P access from host GPU to device i.
hipError_t hip_err_set_device_host = hipSetDevice(device_list[host_gpu]);
if(hip_err_set_device_host != hipSuccess)
{
std::cerr << "Error setting the host device " << host_gpu << ": "
<< hipGetErrorString(hip_err_set_device_host) << std::endl;
return false;
}
hipError_t err_peer_host = hipDeviceEnablePeerAccess(device_list[i], 0);
if(err_peer_host != hipSuccess && err_peer_host != hipErrorPeerAccessAlreadyEnabled)
{
std::cerr << "Error enabling peer access from host " << device_list[host_gpu]
<< " to device " << device_list[i] << ": "
<< hipGetErrorString(err_peer_host) << std::endl;
return false;
}
// Enable P2P access from device i to host GPU.
hipError_t hip_err_set_device_send = hipSetDevice(device_list[i]);
if(hip_err_set_device_send != hipSuccess)
{
std::cerr << "Error setting the host device " << host_gpu << ": "
<< hipGetErrorString(hip_err_set_device_send) << std::endl;
return -1;
}
hipError_t err_peer_device = hipDeviceEnablePeerAccess(device_list[host_gpu], 0);
if(err_peer_device != hipSuccess &&
err_peer_device != hipErrorPeerAccessAlreadyEnabled)
{
std::cerr << "Error enabling peer access from device " << device_list[i]
<< " to device " << device_list[host_gpu] << ": "
<< hipGetErrorString(err_peer_device) << std::endl;
return false;
}
}
}
}
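// Synchronize every device so the peer-access setup above has completed before the worker threads start.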

for(int i = 0; i < gpu_nums; ++i)
{
hipError_t hip_device_sync_enable = hipSetDevice(device_list[i]);
if(hip_device_sync_enable != hipSuccess)
{
std::cerr << "Error enable the device for synchronization" << std::endl;
return -1;
}
hipError_t hip_device_sync_err = hipDeviceSynchronize();
if(hip_device_sync_err != hipSuccess)
{
std::cerr << "Error in complete the device for synchronization" << std::endl;
return -1;
}
}
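// Launch one worker thread per GPU; each thread allocates its buffers, sets up the
// connection, and launches its kernel via AllocateAndTransferFunctor.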

for(int i = 0; i < gpu_nums; ++i)
{
threads.emplace_back(allocateAndTransfer,
device_list[i],
std::ref(transfer_tensor_host_list[i]),
std::ref(transfer_bufs[i]),
std::ref(slave_receive_bufs),
host_gpu,
arg_parser);
}

// Wait for all threads to complete
for(auto& t : threads)
{
t.join();
}

// No result verification is implemented in this initial example; report success once all worker threads have completed.
bool pass = true;
return pass;
}

int main(int argc, char* argv[])
{
auto [result, arg_parser] = create_args(argc, argv);
if(!result)
return -1;
std::string prec = arg_parser.get_str("pr");
bool run_result = true;
if(prec.compare("fp16") == 0)
{
run_result &= run_cross_gpu_reduce<ck_tile::fp16_t, float>(arg_parser);
}

return run_result ? 0 : 1;
}