diff --git a/experimental/ukernel/benchmarks/bench_transport.cc b/experimental/ukernel/benchmarks/bench_transport.cc index 67ebec5c8..a0ea7bce0 100644 --- a/experimental/ukernel/benchmarks/bench_transport.cc +++ b/experimental/ukernel/benchmarks/bench_transport.cc @@ -25,6 +25,7 @@ static constexpr int kIpcThroughputWindow = 8; static constexpr int kUcclThroughputWindow = 4; static constexpr int kTcpThroughputWindow = 1; static constexpr uint64_t kBenchNamedMrGeneration = 1; +static constexpr uint64_t kBenchIpcBindingVersion = 1; static PreferredTransport parse_transport(char const* value) { if (strcmp(value, "auto") == 0) return PreferredTransport::Auto; @@ -204,7 +205,9 @@ static bool exchange_remote_recv_mrs(Communicator& comm, int peer_rank, static uint32_t remote_recv_slot_id(PeerTransportKind kind, std::vector const& remote_recv_mrs, int slot) { - if (kind != PeerTransportKind::Uccl) return 0; + if (kind != PeerTransportKind::Uccl && kind != PeerTransportKind::Ipc) { + return 0; + } return remote_recv_mrs.at(static_cast(slot)).id; } @@ -212,9 +215,44 @@ static std::optional remote_recv_slice( PeerTransportKind kind, std::vector const& remote_recv_mrs, int slot) { uint32_t remote_id = remote_recv_slot_id(kind, remote_recv_mrs, slot); if (remote_id == 0) return std::nullopt; + if (kind == PeerTransportKind::Ipc) { + // This benchmark keeps the recv slots stable for the communicator lifetime. + // Publish a fixed versioned IPC buffer view up front so sends can exercise + // the by-mem_id direct path instead of the request-level handshake. + return RemoteSlice{remote_id, 0, {}, kBenchIpcBindingVersion}; + } return RemoteSlice{remote_id, 0}; } +static bool publish_ipc_recv_buffers(Communicator& comm, int peer_rank, + PeerTransportKind kind, + std::vector const& local_recv_mrs, + std::vector const& recv_slots, + size_t msg_size) { + if (kind != PeerTransportKind::Ipc) return true; + for (size_t i = 0; i < local_recv_mrs.size(); ++i) { + if (!comm.notify_ipc_buffer(peer_rank, local_recv_mrs[i].id, recv_slots[i], + msg_size, kBenchIpcBindingVersion)) { + return false; + } + } + return true; +} + +static bool prefetch_remote_ipc_recv_buffers( + Communicator& comm, int peer_rank, PeerTransportKind kind, + std::vector const& remote_recv_mrs) { + if (kind != PeerTransportKind::Ipc) return true; + for (auto const& remote_mr : remote_recv_mrs) { + if (remote_mr.id == 0) return false; + if (!comm.wait_ipc_buffer(peer_rank, remote_mr.id, + kBenchIpcBindingVersion)) { + return false; + } + } + return true; +} + static unsigned submit_send(Communicator& comm, int peer_rank, uint32_t local_send_mr_id, size_t msg_size, PeerTransportKind kind, @@ -329,7 +367,15 @@ void run_sender(int gpu_id, int rank, int peer_rank, int world_size, local_send_mr.id, throughput_window); std::vector remote_recv_mrs; - if (transport_kind == PeerTransportKind::Uccl) { + if (transport_kind == PeerTransportKind::Ipc || + transport_kind == PeerTransportKind::Uccl) { + if (!publish_ipc_recv_buffers(comm, peer_rank, transport_kind, + local_recv_mrs, recv_slots, msg_size)) { + fprintf(stderr, "[Sender %d] Failed to publish IPC receive buffers\n", + rank); + cleanup(); + return; + } if (!exchange_remote_recv_mrs(comm, peer_rank, local_recv_mrs, remote_recv_mrs, true)) { fprintf(stderr, "[Sender %d] Failed to exchange remote receive MRs\n", @@ -337,6 +383,13 @@ void run_sender(int gpu_id, int rank, int peer_rank, int world_size, cleanup(); return; } + if (!prefetch_remote_ipc_recv_buffers(comm, peer_rank, transport_kind, + remote_recv_mrs)) { + fprintf(stderr, "[Sender %d] Failed to prefetch remote IPC buffers\n", + rank); + cleanup(); + return; + } } printf("[Sender %d] Remote memory info received\n", rank); @@ -602,7 +655,15 @@ void run_receiver(int gpu_id, int rank, int peer_rank, int world_size, local_send_mr.id, throughput_window); std::vector remote_recv_mrs; - if (transport_kind == PeerTransportKind::Uccl) { + if (transport_kind == PeerTransportKind::Ipc || + transport_kind == PeerTransportKind::Uccl) { + if (!publish_ipc_recv_buffers(comm, peer_rank, transport_kind, + local_recv_mrs, recv_slots, msg_size)) { + fprintf(stderr, "[Receiver %d] Failed to publish IPC receive buffers\n", + rank); + cleanup(); + return; + } if (!exchange_remote_recv_mrs(comm, peer_rank, local_recv_mrs, remote_recv_mrs, false)) { fprintf(stderr, "[Receiver %d] Failed to exchange remote receive MRs\n", @@ -610,6 +671,13 @@ void run_receiver(int gpu_id, int rank, int peer_rank, int world_size, cleanup(); return; } + if (!prefetch_remote_ipc_recv_buffers(comm, peer_rank, transport_kind, + remote_recv_mrs)) { + fprintf(stderr, "[Receiver %d] Failed to prefetch remote IPC buffers\n", + rank); + cleanup(); + return; + } } printf("[Receiver %d] Remote memory info received\n", rank); diff --git a/experimental/ukernel/py/bench_p2p.py b/experimental/ukernel/py/bench_p2p.py index 76c9dbcec..a55e497ee 100644 --- a/experimental/ukernel/py/bench_p2p.py +++ b/experimental/ukernel/py/bench_p2p.py @@ -23,30 +23,73 @@ def bench_p2p_ukernel(comm, peer, size_bytes, warmup, iters): n = size_bytes // 4 # float32 send_buf = torch.empty(n, device="cuda", dtype=torch.float32) recv_buf = torch.empty(n, device="cuda", dtype=torch.float32) - comm.pin_tensor(send_buf) - comm.pin_tensor(recv_buf) + send_mr_id = comm.pin_tensor(send_buf) + recv_mr_id = comm.pin_tensor(recv_buf) rank = comm.rank + use_direct_ipc = comm.peer_transport(peer) == "ipc" and comm.same_host(peer) + remote_recv_mr_id = 0 + ipc_binding_version = 1 + if use_direct_ipc: + remote_recv_mr_id = _exchange_peer_i64(int(recv_mr_id), rank) + if not comm.notify_ipc_tensor( + peer, recv_mr_id, recv_buf, 0, size_bytes, ipc_binding_version + ): + raise RuntimeError( + f"notify_ipc_tensor(peer={peer}, mr={recv_mr_id}) failed" + ) + if not comm.wait_ipc_buffer(peer, remote_recv_mr_id, ipc_binding_version): + raise RuntimeError( + f"wait_ipc_buffer(peer={peer}, mr={remote_recv_mr_id}) failed" + ) + dist.barrier() + + def _send(): + if use_direct_ipc: + comm.send_direct( + peer, send_buf, remote_recv_mr_id, ipc_binding_version, 0, size_bytes, 0 + ) + else: + comm.send(peer, send_buf) + # Server (rank 0) recv first, client (rank 1) send first if rank == 0: for _ in range(warmup): - comm.recv(peer, recv_buf) - comm.send(peer, send_buf) # echo back + if use_direct_ipc: + req = comm.irecv(peer, recv_buf) + comm.wait_finish(req) + else: + comm.recv(peer, recv_buf) + _send() # echo back torch.cuda.synchronize() t0 = time.perf_counter() for _ in range(iters): - comm.recv(peer, recv_buf) - comm.send(peer, send_buf) + if use_direct_ipc: + req = comm.irecv(peer, recv_buf) + comm.wait_finish(req) + else: + comm.recv(peer, recv_buf) + _send() torch.cuda.synchronize() else: for _ in range(warmup): - comm.send(peer, send_buf) - comm.recv(peer, recv_buf) + if use_direct_ipc: + req = comm.irecv(peer, recv_buf) + _send() + if use_direct_ipc: + comm.wait_finish(req) + else: + comm.recv(peer, recv_buf) torch.cuda.synchronize() t0 = time.perf_counter() for _ in range(iters): - comm.send(peer, send_buf) - comm.recv(peer, recv_buf) + if use_direct_ipc: + req = comm.irecv(peer, recv_buf) + _send() + if use_direct_ipc: + comm.wait_finish(req) + else: + comm.recv(peer, recv_buf) torch.cuda.synchronize() elapsed = time.perf_counter() - t0 comm.unpin_tensor(send_buf) @@ -137,6 +180,15 @@ def _exchange_local_gpu_idx(local_gpu_idx: int, rank: int) -> int: return remote_gpu_idx +def _exchange_peer_i64(local_value: int, rank: int) -> int: + if rank == 0: + _send_i64(local_value, dst=1) + return _recv_i64(src=1) + remote_value = _recv_i64(src=0) + _send_i64(local_value, dst=0) + return remote_value + + def _create_uccl_endpoint(local_gpu_idx: int, rank: int): # Support multiple p2p python binding signatures. for num_cpus in (rank, 0): diff --git a/experimental/ukernel/py/setup.py b/experimental/ukernel/py/setup.py index fe76d8707..5b2061f5a 100644 --- a/experimental/ukernel/py/setup.py +++ b/experimental/ukernel/py/setup.py @@ -2,7 +2,8 @@ from pathlib import Path from setuptools import setup -from torch.utils.cpp_extension import BuildExtension, CUDAExtension +import torch +from torch.utils.cpp_extension import BuildExtension, CUDAExtension, CppExtension try: import nanobind @@ -16,11 +17,17 @@ ROOT = Path(__file__).resolve().parent.parent UCCL_ROOT = ROOT.parent.parent CUDA_HOME = Path("/usr/local/cuda") +ROCM_HOME = Path(os.environ.get("ROCM_HOME", "/opt/rocm")) +USE_ROCM = getattr(torch.version, "hip", None) is not None +BUILD_CCL_ON_ROCM = os.environ.get("UKERNEL_BUILD_CCL_ON_ROCM", "0") == "1" +RDMACM_SO = Path("/usr/lib/x86_64-linux-gnu/librdmacm.so.1") GDRCOPY_INCLUDE_DIR = Path( os.environ.get("GDRCOPY_INCLUDE_DIR", "/usr/local/include") ) GDRCOPY_LIBDIR = os.environ.get("GDRCOPY_LIBDIR", "").strip() -RDMA_STATIC = UCCL_ROOT / "collective" / "rdma" / "librdma.a" +RDMA_STATIC = ( + UCCL_ROOT / "collective" / "rdma" / ("librdma_hip.a" if USE_ROCM else "librdma.a") +) NANOBIND_ROOT = Path(nanobind.__file__).resolve().parent @@ -84,134 +91,119 @@ def rel(path: Path) -> str: rel(UCCL_ROOT / "collective" / "rdma"), rel(UCCL_ROOT / "include"), str(GDRCOPY_INCLUDE_DIR), - str(CUDA_HOME / "include"), ] +if USE_ROCM: + include_dirs.append(str(ROCM_HOME / "include")) +else: + include_dirs.append(str(CUDA_HOME / "include")) -library_dirs = [ - str(CUDA_HOME / "lib64"), - "/usr/local/lib", - "/usr/lib", - "/usr/lib64", -] -runtime_library_dirs = [str(CUDA_HOME / "lib64")] +library_dirs = ["/usr/local/lib", "/usr/lib", "/usr/lib64", "/usr/lib/x86_64-linux-gnu"] +runtime_library_dirs = [] +if USE_ROCM: + library_dirs.append(str(ROCM_HOME / "lib")) + runtime_library_dirs.append(str(ROCM_HOME / "lib")) +else: + library_dirs.append(str(CUDA_HOME / "lib64")) + runtime_library_dirs.append(str(CUDA_HOME / "lib64")) if GDRCOPY_LIBDIR: library_dirs.append(GDRCOPY_LIBDIR) runtime_library_dirs.append(str(Path(GDRCOPY_LIBDIR).resolve())) -ext = CUDAExtension( +common_cxx_args = [ + "-O3", + "-std=c++17", + "-Wall", + "-Wno-unused-function", + "-Wno-sign-compare", + "-Wno-reorder", + "-Wno-unused-variable", + "-Wno-unused-label", + "-Wno-unused-but-set-variable", + "-Wno-stringop-overread", + "-Wno-narrowing", + "-pthread", + "-fPIC", + "-DUKERNEL_ENABLE_TMA=0", +] +if USE_ROCM: + common_cxx_args.extend(["-D__HIP_PLATFORM_AMD__", "-DUSE_ROCM=1", "-DHIPBLAS_V2"]) + +cuda_nvcc_args = [ + "-O3", + "-std=c++20", + "--expt-extended-lambda", + "--expt-relaxed-constexpr", + "-DKITTENS_HOPPER", + "-DUKERNEL_ENABLE_TMA=0", + "-gencode", + "arch=compute_80,code=sm_80", + "-gencode", + "arch=compute_86,code=sm_86", + "-gencode", + "arch=compute_89,code=sm_89", +] + +ExtensionCls = CppExtension if USE_ROCM else CUDAExtension + +common_libraries = [ + "gflags", + "z", + "ibverbs", + "nl-3", + "nl-route-3", + "pthread", + "numa", +] +if USE_ROCM: + common_libraries.extend(["amdhip64", "elf", "dl"]) +else: + common_libraries.append("rdmacm") + common_libraries.extend(["cudart", "cuda", "gdrapi"]) + +extra_link_args = [] +if USE_ROCM and RDMACM_SO.exists(): + extra_link_args.append(str(RDMACM_SO.resolve())) + +ext = ExtensionCls( name="ukernel_ccl._C", sources=sources, include_dirs=include_dirs, - extra_compile_args={ - "cxx": [ - "-O3", - "-std=c++17", - "-Wall", - "-Wno-unused-function", - "-Wno-sign-compare", - "-Wno-reorder", - "-Wno-unused-variable", - "-Wno-unused-label", - "-Wno-unused-but-set-variable", - "-Wno-stringop-overread", - "-Wno-narrowing", - "-pthread", - "-fPIC", - "-DUKERNEL_ENABLE_TMA=0", - ], - "nvcc": [ - "-O3", - "-std=c++20", - "--expt-extended-lambda", - "--expt-relaxed-constexpr", - "-DKITTENS_HOPPER", - "-DUKERNEL_ENABLE_TMA=0", - "-gencode", - "arch=compute_80,code=sm_80", - "-gencode", - "arch=compute_86,code=sm_86", - "-gencode", - "arch=compute_89,code=sm_89", - ], + extra_compile_args=common_cxx_args if USE_ROCM else { + "cxx": common_cxx_args, + "nvcc": cuda_nvcc_args, }, library_dirs=library_dirs, - libraries=[ - "cudart", - "cuda", - "gflags", - "z", - "ibverbs", - "nl-3", - "nl-route-3", - "pthread", - "rdmacm", - "gdrapi", - "numa", - ], + libraries=common_libraries, extra_objects=[str(RDMA_STATIC.resolve())], + extra_link_args=extra_link_args, runtime_library_dirs=runtime_library_dirs, ) -p2p_ext = CUDAExtension( +p2p_ext = ExtensionCls( name="ukernel_p2p._C", sources=p2p_sources, include_dirs=include_dirs, - extra_compile_args={ - "cxx": [ - "-O3", - "-std=c++17", - "-Wall", - "-Wno-unused-function", - "-Wno-sign-compare", - "-Wno-reorder", - "-Wno-unused-variable", - "-Wno-unused-label", - "-Wno-unused-but-set-variable", - "-Wno-stringop-overread", - "-Wno-narrowing", - "-pthread", - "-fPIC", - "-DUKERNEL_ENABLE_TMA=0", - ], - "nvcc": [ - "-O3", - "-std=c++20", - "--expt-extended-lambda", - "--expt-relaxed-constexpr", - "-DKITTENS_HOPPER", - "-DUKERNEL_ENABLE_TMA=0", - "-gencode", - "arch=compute_80,code=sm_80", - "-gencode", - "arch=compute_86,code=sm_86", - "-gencode", - "arch=compute_89,code=sm_89", - ], + extra_compile_args=common_cxx_args if USE_ROCM else { + "cxx": common_cxx_args, + "nvcc": cuda_nvcc_args, }, library_dirs=library_dirs, - libraries=[ - "cudart", - "cuda", - "gflags", - "z", - "ibverbs", - "nl-3", - "nl-route-3", - "pthread", - "rdmacm", - "gdrapi", - "numa", - ], + libraries=common_libraries, extra_objects=[str(RDMA_STATIC.resolve())], + extra_link_args=extra_link_args, runtime_library_dirs=runtime_library_dirs, ) +ext_modules = [p2p_ext] +if not USE_ROCM or BUILD_CCL_ON_ROCM: + ext_modules.insert(0, ext) + setup( name="ukernel-ccl", version="0.1.0", packages=["ukernel_ccl", "ukernel_p2p"], package_dir={"ukernel_ccl": "ukernel_ccl", "ukernel_p2p": "ukernel_p2p"}, - ext_modules=[ext, p2p_ext], + ext_modules=ext_modules, cmdclass={"build_ext": BuildExtension}, ) diff --git a/experimental/ukernel/py/ukernel_p2p.cpp b/experimental/ukernel/py/ukernel_p2p.cpp index 382af88d7..1aef056dd 100644 --- a/experimental/ukernel/py/ukernel_p2p.cpp +++ b/experimental/ukernel/py/ukernel_p2p.cpp @@ -199,6 +199,80 @@ class Communicator { return req; } + uint64_t isend_direct(int peer_rank, nb::handle tensor, uint32_t remote_mr_id, + uint64_t binding_version = 1, size_t offset = 0, + size_t len = 0, size_t remote_offset = 0) { + if (remote_mr_id == 0) { + throw std::invalid_argument( + "isend_direct requires non-zero remote_mr_id"); + } + torch::Tensor t = tensor_from_python(tensor, "tensor"); + if (!t.is_cuda()) { + throw std::invalid_argument("isend_direct requires a CUDA tensor"); + } + if (!t.is_contiguous()) { + throw std::invalid_argument("isend_direct requires a contiguous tensor"); + } + size_t elem_bytes = static_cast(t.element_size()); + size_t total_bytes = static_cast(t.numel()) * elem_bytes; + if (len == 0) len = total_bytes; + if (offset + len > total_bytes) { + throw std::invalid_argument( + "isend_direct offset+len exceeds tensor size"); + } + auto pinned_mr = find_pinned_mr_id(t.data_ptr(), total_bytes); + uint32_t mr_id = pinned_mr.has_value() + ? *pinned_mr + : comm_->reg_mr(t.data_ptr(), total_bytes).id; + uint64_t req = comm_->isend( + peer_rank, UKernel::Transport::LocalSlice{mr_id, offset, len}, + UKernel::Transport::RemoteSlice{ + remote_mr_id, remote_offset, {}, binding_version}); + if (req == 0) { + if (!pinned_mr.has_value()) { + comm_->dereg_mr(t.data_ptr()); + } + return 0; + } + track_request(req, std::move(t), + pinned_mr.has_value() ? nullptr : t.data_ptr()); + return req; + } + + bool notify_ipc_tensor(int peer_rank, uint32_t ipc_id, nb::handle tensor, + size_t offset = 0, size_t len = 0, + uint64_t binding_version = 0) { + torch::Tensor t = tensor_from_python(tensor, "tensor"); + if (!t.is_cuda()) { + throw std::invalid_argument("notify_ipc_tensor requires a CUDA tensor"); + } + if (!t.is_contiguous()) { + throw std::invalid_argument( + "notify_ipc_tensor requires a contiguous tensor"); + } + size_t elem_bytes = static_cast(t.element_size()); + size_t total_bytes = static_cast(t.numel()) * elem_bytes; + if (offset > total_bytes) { + throw std::invalid_argument( + "notify_ipc_tensor offset exceeds tensor size"); + } + if (len == 0) len = total_bytes - offset; + if (offset + len > total_bytes) { + throw std::invalid_argument( + "notify_ipc_tensor offset+len exceeds tensor size"); + } + void* ptr = + reinterpret_cast(reinterpret_cast(t.data_ptr()) + + static_cast(offset)); + return comm_->notify_ipc_buffer(peer_rank, ipc_id, ptr, len, + binding_version); + } + + bool wait_ipc_buffer(int peer_rank, uint32_t ipc_id, + uint64_t binding_version = 0) { + return comm_->wait_ipc_buffer(peer_rank, ipc_id, binding_version); + } + bool poll(uint64_t req) { try { bool done = comm_->poll(static_cast(req)); @@ -259,6 +333,14 @@ class Communicator { wait_finish(req); } + void send_direct(int peer_rank, nb::handle tensor, uint32_t remote_mr_id, + uint64_t binding_version = 1, size_t offset = 0, + size_t len = 0, size_t remote_offset = 0) { + uint64_t req = isend_direct(peer_rank, tensor, remote_mr_id, + binding_version, offset, len, remote_offset); + wait_finish(req); + } + void recv(int peer_rank, nb::handle tensor) { uint64_t req = irecv(peer_rank, tensor); wait_finish(req); @@ -332,8 +414,19 @@ NB_MODULE(TORCH_EXTENSION_NAME, m) { .def("unpin_tensor", &Communicator::unpin_tensor, nb::arg("tensor")) .def("isend", &Communicator::isend, nb::arg("peer_rank"), nb::arg("tensor"), nb::arg("offset") = 0, nb::arg("len") = 0) + .def("isend_direct", &Communicator::isend_direct, nb::arg("peer_rank"), + nb::arg("tensor"), nb::arg("remote_mr_id"), + nb::arg("binding_version") = 1, nb::arg("offset") = 0, + nb::arg("len") = 0, nb::arg("remote_offset") = 0) .def("irecv", &Communicator::irecv, nb::arg("peer_rank"), nb::arg("tensor"), nb::arg("offset") = 0, nb::arg("len") = 0) + .def("notify_ipc_tensor", &Communicator::notify_ipc_tensor, + nb::arg("peer_rank"), nb::arg("ipc_id"), nb::arg("tensor"), + nb::arg("offset") = 0, nb::arg("len") = 0, + nb::arg("binding_version") = 0) + .def("wait_ipc_buffer", &Communicator::wait_ipc_buffer, + nb::arg("peer_rank"), nb::arg("ipc_id"), + nb::arg("binding_version") = 0) .def("poll", &Communicator::poll, nb::arg("req")) .def("release", &Communicator::release, nb::arg("req")) .def("wait_finish", &Communicator::wait_finish, nb::arg("req")) @@ -343,6 +436,10 @@ NB_MODULE(TORCH_EXTENSION_NAME, m) { nb::arg("peer_rank")) .def("same_host", &Communicator::same_host, nb::arg("peer_rank")) .def("send", &Communicator::send, nb::arg("peer_rank"), nb::arg("tensor")) + .def("send_direct", &Communicator::send_direct, nb::arg("peer_rank"), + nb::arg("tensor"), nb::arg("remote_mr_id"), + nb::arg("binding_version") = 1, nb::arg("offset") = 0, + nb::arg("len") = 0, nb::arg("remote_offset") = 0) .def("recv", &Communicator::recv, nb::arg("peer_rank"), nb::arg("tensor")); } diff --git a/experimental/ukernel/src/transport/adapter/ipc_adapter.cc b/experimental/ukernel/src/transport/adapter/ipc_adapter.cc index 84d88c0af..98b10a3ca 100644 --- a/experimental/ukernel/src/transport/adapter/ipc_adapter.cc +++ b/experimental/ukernel/src/transport/adapter/ipc_adapter.cc @@ -5,6 +5,10 @@ #include #include #include +#include +#include +#include +#include namespace UKernel { namespace Transport { @@ -15,6 +19,7 @@ constexpr int kIpcControlPollTimeoutMs = 50; constexpr size_t kTaskRingSize = 1024; constexpr size_t kIpcSizePerEngine = 1ul << 20; constexpr int kIpcControlTimeoutMs = 50000; +constexpr int kWorkerIdleSpinIters = 256; bool ipc_force_relay_enabled() { char const* env = std::getenv("UHM_IPC_FORCE_RELAY"); @@ -41,16 +46,75 @@ bool enqueue_one_request_id(jring_t* ring, unsigned elem, return !stop.load(std::memory_order_acquire); } +void wait_for_worker_work(std::atomic const& stop, + std::atomic const& pending, std::mutex& cv_mu, + std::condition_variable& cv) { + for (int i = 0; i < kWorkerIdleSpinIters; ++i) { + if (stop.load(std::memory_order_relaxed) || + pending.load(std::memory_order_relaxed) > 0) { + return; + } + std::this_thread::yield(); + } + + std::unique_lock lk(cv_mu); + cv.wait(lk, [&] { + return stop.load(std::memory_order_relaxed) || + pending.load(std::memory_order_relaxed) > 0; + }); +} + +// POSIX SHM helpers for per-peer done_seq channel. +// Lower rank creates, higher rank opens. One file per (lo,hi) pair. + +static IpcChannelPair* create_ipc_channel_pair(std::string const& name) { + shm_unlink(name.c_str()); + int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0600); + if (fd < 0) return nullptr; + if (ftruncate(fd, sizeof(IpcChannelPair)) < 0) { + close(fd); + shm_unlink(name.c_str()); + return nullptr; + } + void* ptr = mmap(nullptr, sizeof(IpcChannelPair), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + close(fd); + if (ptr == MAP_FAILED) { + shm_unlink(name.c_str()); + return nullptr; + } + return new (ptr) IpcChannelPair{}; +} + +static IpcChannelPair* open_ipc_channel_pair(std::string const& name, + int timeout_ms) { + auto deadline = + std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + while (true) { + int fd = shm_open(name.c_str(), O_RDWR, 0); + if (fd >= 0) { + void* ptr = mmap(nullptr, sizeof(IpcChannelPair), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + close(fd); + if (ptr != MAP_FAILED) return static_cast(ptr); + } + if (std::chrono::steady_clock::now() >= deadline) return nullptr; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } +} + } // namespace IpcAdapter::IpcAdapter(Communicator* comm, std::string ring_namespace, int self_local_id) - : next_match_seq_per_peer_(comm->world_size(), + : ring_namespace_(ring_namespace), + next_match_seq_per_peer_(comm->world_size(), std::array{1, 1}), shm_control_(std::make_shared( comm->rank(), comm->world_size(), std::move(ring_namespace), self_local_id)), comm_(comm) { + done_channels_.resize(comm->world_size()); send_task_ring_ = UKernel::Transport::create_ring(sizeof(unsigned), kTaskRingSize); recv_task_ring_ = @@ -76,10 +140,14 @@ void IpcAdapter::shutdown() { if (!shutdown_started_.compare_exchange_strong(expected, true)) return; stop_.store(true, std::memory_order_release); - cv_.notify_all(); + send_cv_.notify_all(); + recv_cv_.notify_all(); if (send_thread_.joinable()) send_thread_.join(); if (recv_thread_.joinable()) recv_thread_.join(); + for (int peer_rank = 0; peer_rank < comm_->world_size(); ++peer_rank) { + if (peer_rank != comm_->rank()) teardown_done_channel(peer_rank); + } if (shm_control_ != nullptr) { for (int peer_rank = 0; peer_rank < comm_->world_size(); ++peer_rank) { if (peer_rank == comm_->rank()) continue; @@ -108,6 +176,95 @@ void IpcAdapter::shutdown() { } } +bool IpcAdapter::setup_done_channel(int peer_rank) { + if (peer_rank < 0 || peer_rank >= (int)done_channels_.size()) return false; + auto& ch = done_channels_[peer_rank]; + if (ch.shm_base) return true; + + int self = comm_->rank(); + int lo = std::min(self, peer_rank); + int hi = std::max(self, peer_rank); + ch.is_creator = (self == lo); + ch.shm_name = "/" + ring_namespace_ + "_ipc_" + std::to_string(lo) + "_" + + std::to_string(hi); + + IpcChannelPair* pair = nullptr; + if (ch.is_creator) { + pair = create_ipc_channel_pair(ch.shm_name); + if (!pair) { + fprintf(stderr, "[IPC] failed to create channel SHM %s\n", + ch.shm_name.c_str()); + return false; + } + } else { + pair = open_ipc_channel_pair(ch.shm_name, /*timeout_ms=*/10000); + if (!pair) { + fprintf(stderr, "[IPC] failed to open channel SHM %s\n", + ch.shm_name.c_str()); + return false; + } + } + ch.shm_base = pair; + // Lower rank owns done_seq_lo_to_hi (its sends); higher rank owns hi_to_lo. + if (ch.is_creator) { + ch.local_ptr = &pair->done_seq_lo_to_hi; + ch.remote_ptr = &pair->done_seq_hi_to_lo; + // Creator's pointers are set now, but the fast path is NOT active until + // wait_done_channel_peer_ready() confirms the opener has mapped this SHM. + // Until then local_ptr is non-null but send_one must not use it — that + // gate is enforced in wait_done_channel_peer_ready (which clears the + // pointers on timeout). + } else { + ch.local_ptr = &pair->done_seq_hi_to_lo; + ch.remote_ptr = &pair->done_seq_lo_to_hi; + // Signal to the creator that we have successfully mapped the SHM. + pair->opener_ready.store(1, std::memory_order_release); + } + return true; +} + +void IpcAdapter::teardown_done_channel(int peer_rank) { + if (peer_rank < 0 || peer_rank >= (int)done_channels_.size()) return; + auto& ch = done_channels_[peer_rank]; + if (ch.shm_base) { + munmap(ch.shm_base, sizeof(IpcChannelPair)); + ch.shm_base = nullptr; + ch.local_ptr = nullptr; + ch.remote_ptr = nullptr; + } + if (ch.is_creator && !ch.shm_name.empty()) shm_unlink(ch.shm_name.c_str()); + ch.shm_name.clear(); + ch.is_creator = false; +} + +// Called by the creator (lower rank) after connect_to() returns. +// Polls the opener_ready flag that the higher rank writes when it maps the SHM. +// If the opener does not confirm within the timeout the done_seq channel is +// torn down on BOTH sides (creator tears down locally; opener's remote_ptr +// becomes stale but it will find done_seq.load() == 0 and keep retrying until +// it detects the channel is gone or falls back via the SHM ring). +// Result: both sides end up with local_ptr == nullptr → consistent fallback. +void IpcAdapter::wait_done_channel_peer_ready(int peer_rank) { + if (peer_rank < 0 || peer_rank >= (int)done_channels_.size()) return; + auto& ch = done_channels_[peer_rank]; + if (!ch.shm_base || !ch.is_creator) return; + + constexpr int kReadyTimeoutMs = 5000; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::milliseconds(kReadyTimeoutMs); + while (ch.shm_base->opener_ready.load(std::memory_order_acquire) == 0) { + if (std::chrono::steady_clock::now() >= deadline) { + fprintf(stderr, + "[IPC] done_seq opener did not signal ready in %dms for %s; " + "disabling done_seq fast path\n", + kReadyTimeoutMs, ch.shm_name.c_str()); + teardown_done_channel(peer_rank); + return; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} + bool IpcAdapter::connect_to(int rank) { return shm_control_ != nullptr && shm_control_->connect_to(rank, 30000); } @@ -123,10 +280,29 @@ bool IpcAdapter::ensure_peer(PeerConnectSpec const& spec) { std::holds_alternative(spec.detail))) { return false; } - if (spec.type == PeerConnectType::Connect) { - return connect_to(spec.peer_rank); + int peer_rank = spec.peer_rank; + + // Lower rank creates the done_seq SHM *before* the connection handshake so + // the higher rank can open it by name right after the handshake completes. + bool is_lower = (comm_->rank() < peer_rank); + if (is_lower) setup_done_channel(peer_rank); + + bool ok = (spec.type == PeerConnectType::Connect) ? connect_to(peer_rank) + : accept_from(peer_rank); + if (!ok) return false; + + if (!is_lower) { + // Higher rank: open the SHM created by lower rank. On success, this also + // writes opener_ready = 1 so the lower rank can proceed past its poll. + setup_done_channel(peer_rank); + } else { + // Lower rank: wait for the opener to confirm it has mapped the SHM before + // allowing send_one to use the done_seq fast path. If the opener never + // signals (e.g., name mismatch, OS error), we tear down our side too so + // both peers stay on the same (fallback) control path. + wait_done_channel_peer_ready(peer_rank); } - return accept_from(spec.peer_rank); + return true; } void IpcAdapter::set_peer_local_id(int peer_rank, int local_id) { @@ -136,6 +312,7 @@ void IpcAdapter::set_peer_local_id(int peer_rank, int local_id) { } void IpcAdapter::close_peer(int peer_rank) { + teardown_done_channel(peer_rank); if (shm_control_ != nullptr) { shm_control_->close_peer(peer_rank); } @@ -263,14 +440,15 @@ bool IpcAdapter::enqueue_request(unsigned request_id, IpcReqType type) { return false; } pending_send_.fetch_add(1, std::memory_order_relaxed); + send_cv_.notify_one(); } else { if (recv_task_ring_ == nullptr || !enqueue_one_request_id(recv_task_ring_, request_id, stop_)) { return false; } pending_recv_.fetch_add(1, std::memory_order_relaxed); + recv_cv_.notify_one(); } - cv_.notify_all(); return true; } @@ -292,6 +470,7 @@ unsigned IpcAdapter::send_async(int peer_rank, void* local_ptr, size_t len, remote_slice.offset = remote_offset; if (remote_hint.has_value()) { remote_slice.write = remote_hint->write; + remote_slice.binding_version = remote_hint->binding_version; } unsigned request_id = 0; IpcRequestSlot* req = try_acquire_request_slot(&request_id); @@ -523,8 +702,13 @@ bool IpcAdapter::send_one(IpcRequestSlot* creq) { } if (can_use_direct_peer) { copy_to_remote(cached_dst, cached_remote_gpu); - if (!shm_control_->send_ack(to_rank, creq->match_seq, - kAckStatusOkDirect)) { + // Signal completion via done_seq atomic (fast path). + // Fall back to SHM ring ack only if channel not set up. + auto& done_ch = done_channels_[to_rank]; + if (done_ch.local_ptr) { + done_ch.local_ptr->store(creq->match_seq, std::memory_order_release); + } else if (!shm_control_->send_ack(to_rank, creq->match_seq, + kAckStatusOkDirect)) { std::cerr << "[ERROR] send direct cached ack(" << to_rank << ") failed for req " << creq->id << " match_seq " << creq->match_seq << std::endl; @@ -602,7 +786,11 @@ bool IpcAdapter::send_one(IpcRequestSlot* creq) { reinterpret_cast(reinterpret_cast(base) + got.offset); copy_to_remote(dst_ptr, static_cast(got.remote_gpu_idx_)); - if (!shm_control_->send_ack(to_rank, seq, kAckStatusOkDirect)) { + // Signal via done_seq (same as direct path), fallback to SHM ring ack. + auto& done_ch = done_channels_[to_rank]; + if (done_ch.local_ptr) { + done_ch.local_ptr->store(seq, std::memory_order_release); + } else if (!shm_control_->send_ack(to_rank, seq, kAckStatusOkDirect)) { std::cerr << "[ERROR] send_ack(" << to_rank << ") failed for req " << creq->id << " match_seq " << creq->match_seq << std::endl; return false; @@ -641,12 +829,13 @@ bool IpcAdapter::recv_one(IpcRequestSlot* creq) { }; // Phase 1: receiver waits for either: - // 1) direct completion ACK from sender; or - // 2) sender's bounce relay notification (ipc_cache/use_bounce_buffer=1); or - // 3) sender's ipc_cache_req asking receiver to advertise destination handle. + // 1) done_seq >= match_seq → direct-path copy complete (fast atomic spin); + // or 2) ipc_cache_req from sender → cold path, must export IPC handle; or 3) + // ipc_cache(use_bounce_buffer=1) → relay path. auto phase1_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(kIpcControlTimeoutMs); bool got_cache_req = false; + auto& done_ch = done_channels_[from_rank]; auto handle_relay_cache = [&](IpcCacheWire const& relay_cache, uint64_t relay_seq) -> bool { if (relay_seq != creq->match_seq) { @@ -683,11 +872,18 @@ bool IpcAdapter::recv_one(IpcRequestSlot* creq) { return true; }; while (!stop_.load(std::memory_order_acquire)) { - uint32_t status = 0; - int ack_result = wait_sender_ack(/*timeout_ms=*/0, &status); - if (ack_result < 0) return false; - if (ack_result > 0) { - return true; + // Fast path: spin on done_seq atomic (no sleep, no SHM ring message). + if (done_ch.remote_ptr) { + if (done_ch.remote_ptr->load(std::memory_order_acquire) >= + creq->match_seq) { + return true; + } + } else { + // Fallback when done_seq channel is unavailable: poll SHM ring ack. + uint32_t status = 0; + int ack_result = wait_sender_ack(/*timeout_ms=*/0, &status); + if (ack_result < 0) return false; + if (ack_result > 0) return true; } uint64_t req_seq = 0; if (shm_control_->recv_ipc_cache_req(from_rank, &req_seq, @@ -707,7 +903,7 @@ bool IpcAdapter::recv_one(IpcRequestSlot* creq) { << creq->id << " match_seq " << creq->match_seq << std::endl; return false; } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::yield(); // no sleep — yield to other threads only } if (stop_.load(std::memory_order_acquire) || !got_cache_req) return false; @@ -717,13 +913,16 @@ bool IpcAdapter::recv_one(IpcRequestSlot* creq) { transfer_info.is_send = 0; transfer_info.remote_gpu_idx_ = comm_->local_gpu_idx(); void* actual_dst = creq->data(); - - void* base = nullptr; - size_t base_size = 0; - GPU_RT_CHECK(gpuMemGetAddressRange(&base, &base_size, actual_dst)); - GPU_RT_CHECK(gpuIpcGetMemHandle(&transfer_info.handle, base)); - transfer_info.offset = reinterpret_cast(actual_dst) - - reinterpret_cast(base); + IPCItem exported = + comm_->export_local_ipc_buffer(actual_dst, creq->size_bytes); + if (!exported.valid) { + std::cerr << "[ERROR] export_local_ipc_buffer failed for req " << creq->id + << " match_seq " << creq->match_seq << std::endl; + return false; + } + transfer_info.handle = exported.handle; + transfer_info.offset = + reinterpret_cast(actual_dst) - exported.base_addr; if (!shm_control_->send_ipc_cache(from_rank, creq->match_seq, transfer_info)) { @@ -732,16 +931,22 @@ bool IpcAdapter::recv_one(IpcRequestSlot* creq) { return false; } - // Phase 2: after advertising IPC cache, sender may either: - // 1) copy directly and send ACK; or - // 2) fall back to bounce relay and send ipc_cache(use_bounce_buffer=1). + // Phase 2: sender has the IPC handle; wait for it to finish the copy. + // Sender signals via done_seq (same channel as direct path). auto final_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(kIpcControlTimeoutMs); while (!stop_.load(std::memory_order_acquire)) { - uint32_t status = 0; - int ack_result = wait_sender_ack(/*timeout_ms=*/0, &status); - if (ack_result < 0) return false; - if (ack_result > 0) return true; + if (done_ch.remote_ptr) { + if (done_ch.remote_ptr->load(std::memory_order_acquire) >= + creq->match_seq) { + return true; + } + } else { + uint32_t status = 0; + int ack_result = wait_sender_ack(/*timeout_ms=*/0, &status); + if (ack_result < 0) return false; + if (ack_result > 0) return true; + } IpcCacheWire relay_cache{}; uint64_t relay_seq = 0; @@ -756,7 +961,7 @@ bool IpcAdapter::recv_one(IpcRequestSlot* creq) { << std::endl; return false; } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::yield(); // no sleep } return false; } @@ -772,13 +977,7 @@ void IpcAdapter::complete_task(IpcRequestSlot* req, bool ok) { void IpcAdapter::send_thread_func() { while (!stop_.load(std::memory_order_relaxed)) { - { - std::unique_lock lk(cv_mu_); - cv_.wait(lk, [&] { - return stop_.load(std::memory_order_relaxed) || - pending_send_.load(std::memory_order_relaxed) > 0; - }); - } + wait_for_worker_work(stop_, pending_send_, send_cv_mu_, send_cv_); if (stop_.load(std::memory_order_relaxed)) break; unsigned request_id = 0; @@ -804,13 +1003,7 @@ void IpcAdapter::send_thread_func() { void IpcAdapter::recv_thread_func() { while (!stop_.load(std::memory_order_relaxed)) { - { - std::unique_lock lk(cv_mu_); - cv_.wait(lk, [&] { - return stop_.load(std::memory_order_relaxed) || - pending_recv_.load(std::memory_order_relaxed) > 0; - }); - } + wait_for_worker_work(stop_, pending_recv_, recv_cv_mu_, recv_cv_); if (stop_.load(std::memory_order_relaxed)) break; unsigned request_id = 0; diff --git a/experimental/ukernel/src/transport/adapter/ipc_adapter.h b/experimental/ukernel/src/transport/adapter/ipc_adapter.h index 401d755d0..c93c80de2 100644 --- a/experimental/ukernel/src/transport/adapter/ipc_adapter.h +++ b/experimental/ukernel/src/transport/adapter/ipc_adapter.h @@ -22,6 +22,17 @@ namespace Transport { class Communicator; +// Per-peer-pair POSIX SHM layout: two cache-line-aligned done_seq slots, +// one per direction (lo→hi and hi→lo), plus an opener_ready handshake flag. +// The opener (higher rank) writes opener_ready = 1 after mmap; the creator +// (lower rank) polls it after connect_to returns to ensure both sides are +// live before activating the done_seq fast path. +struct IpcChannelPair { + alignas(64) std::atomic done_seq_lo_to_hi{0}; + alignas(64) std::atomic done_seq_hi_to_lo{0}; + alignas(64) std::atomic opener_ready{0}; +}; + class IpcAdapter final : public TransportAdapter { public: // Lifecycle. @@ -134,6 +145,16 @@ class IpcAdapter final : public TransportAdapter { IpcRequestSlot* resolve_request_slot_const(unsigned request_id) const; void release_request_slot(unsigned request_id); + struct PeerDoneChannel { + IpcChannelPair* shm_base = nullptr; + std::atomic* local_ptr = + nullptr; // we write after send completes + std::atomic* remote_ptr = + nullptr; // we read to detect peer send done + std::string shm_name; + bool is_creator = false; + }; + // Task queue / worker execution. bool enqueue_request(unsigned request_id, IpcReqType type); bool send_one(IpcRequestSlot* creq); @@ -141,6 +162,9 @@ class IpcAdapter final : public TransportAdapter { void send_thread_func(); void recv_thread_func(); void complete_task(IpcRequestSlot* req, bool ok); + bool setup_done_channel(int peer_rank); + void teardown_done_channel(int peer_rank); + void wait_done_channel_peer_ready(int peer_rank); jring_t* send_task_ring_; jring_t* recv_task_ring_; @@ -148,12 +172,15 @@ class IpcAdapter final : public TransportAdapter { std::atomic shutdown_started_{false}; std::thread send_thread_; std::thread recv_thread_; - std::mutex cv_mu_; - std::condition_variable cv_; + std::mutex send_cv_mu_; + std::condition_variable send_cv_; + std::mutex recv_cv_mu_; + std::condition_variable recv_cv_; std::atomic pending_send_{0}; std::atomic pending_recv_{0}; std::vector ipc_streams_; + std::string ring_namespace_; std::mutex match_seq_mu_; // Two directed-edge counters per peer: // dir=0 -> low-rank to high-rank, dir=1 -> high-rank to low-rank. @@ -163,6 +190,7 @@ class IpcAdapter final : public TransportAdapter { std::shared_ptr shm_control_; Communicator* comm_; + std::vector done_channels_; }; } // namespace Transport diff --git a/experimental/ukernel/src/transport/communicator.cc b/experimental/ukernel/src/transport/communicator.cc index 5eff65e8f..6743ccc56 100644 --- a/experimental/ukernel/src/transport/communicator.cc +++ b/experimental/ukernel/src/transport/communicator.cc @@ -1267,11 +1267,37 @@ unsigned Communicator::irecv(int rank, LocalSlice dst) { void* ipc_base_ptr = reinterpret_cast(static_cast(local_mr.address)); size_t ipc_bytes = static_cast(local_mr.length); - if (!notify_ipc_buffer(rank, dst.mem_id, ipc_base_ptr, ipc_bytes)) { - std::cerr << "[WARN] Communicator " << global_rank_ - << " failed to publish IPC buffer metadata for rank " << rank - << ", ipc_id=" << dst.mem_id - << "; sender may fallback to ipc_cache handshake" << std::endl; + uintptr_t ipc_base_addr = reinterpret_cast(ipc_base_ptr); + bool should_publish = true; + uint64_t publish_binding_version = 0; + { + std::lock_guard lk(ipc_gen_mu_); + auto& published = local_ipc_published_buffers_[rank][dst.mem_id]; + if (published.valid && published.base_addr == ipc_base_addr && + published.bytes == ipc_bytes) { + should_publish = false; + publish_binding_version = published.binding_version; + } else { + publish_binding_version = + ++local_ipc_binding_versions_[rank][dst.mem_id]; + } + } + if (should_publish) { + if (!notify_ipc_buffer(rank, dst.mem_id, ipc_base_ptr, ipc_bytes, + publish_binding_version)) { + std::cerr << "[WARN] Communicator " << global_rank_ + << " failed to publish IPC buffer metadata for rank " << rank + << ", ipc_id=" << dst.mem_id + << "; sender may fallback to ipc_cache handshake" + << std::endl; + } else { + std::lock_guard lk(ipc_gen_mu_); + auto& published = local_ipc_published_buffers_[rank][dst.mem_id]; + published.base_addr = ipc_base_addr; + published.bytes = ipc_bytes; + published.binding_version = publish_binding_version; + published.valid = true; + } } } @@ -1480,6 +1506,35 @@ void Communicator::release(unsigned const req) { } bool Communicator::wait_finish(unsigned const req) { + if (req == 0) return false; + if (progress_started_.load(std::memory_order_acquire)) { + while (true) { + TrackedRequest* slot = resolve_request_slot(req); + if (slot == nullptr) return true; + if (slot->kind != PeerTransportKind::Ipc) break; + auto state = slot->state.load(std::memory_order_acquire); + if (state == TrackedRequest::SlotState::InFlight) { + std::this_thread::yield(); + continue; + } + if (state == TrackedRequest::SlotState::Completed || + state == TrackedRequest::SlotState::Failed) { + TrackedRequest snapshot{}; + if (try_release_request_slot(req, &snapshot)) { + bool failed = (state == TrackedRequest::SlotState::Failed); + cleanup_tracked_request(snapshot); + return !failed; + } + // CAS lost: another releaser (e.g. destructor cleanup) claimed the + // slot. The slot is transitioning to Free — retry until nullptr. + std::this_thread::yield(); + continue; + } + // Releasing is a transient state between Completed/Failed and Free. + // Yield and retry so the slot can finish transitioning. + std::this_thread::yield(); + } + } return wait_finish(std::vector{req}); } @@ -1590,13 +1645,7 @@ bool Communicator::notify_ipc_buffer(int remote_rank, uint32_t ipc_id, info.binding_version = binding_version; info.valid = false; if (local_buf != nullptr && len != 0) { - int original_device = -1; - GPU_RT_CHECK(gpuGetDevice(&original_device)); - auto restore = UKernel::Transport::finally( - [&]() { GPU_RT_CHECK(gpuSetDevice(original_device)); }); - GPU_RT_CHECK(gpuSetDevice(local_gpu_idx_)); - auto exported = - ipc_manager_.create_local_ipc(local_buf, len, local_gpu_idx_); + auto exported = export_local_ipc_buffer(local_buf, len); if (!exported.valid) return false; info.handle = exported.handle; @@ -1615,6 +1664,17 @@ bool Communicator::notify_ipc_buffer(int remote_rank, uint32_t ipc_id, return exchanger_client_->publish(versioned_key, info); } +IPCItem Communicator::export_local_ipc_buffer(void* local_buf, size_t len) { + if (local_buf == nullptr || len == 0) return {}; + + int original_device = -1; + GPU_RT_CHECK(gpuGetDevice(&original_device)); + auto restore = UKernel::Transport::finally( + [&]() { GPU_RT_CHECK(gpuSetDevice(original_device)); }); + GPU_RT_CHECK(gpuSetDevice(local_gpu_idx_)); + return ipc_manager_.create_local_ipc(local_buf, len, local_gpu_idx_); +} + bool Communicator::wait_ipc_buffer(int remote_rank, uint32_t ipc_id, uint64_t expected_binding_version) { if (!exchanger_client_ || !exchanger_client_->valid()) return false; @@ -1636,7 +1696,11 @@ bool Communicator::wait_ipc_buffer(int remote_rank, uint32_t ipc_id, state.device_idx = info.device_idx; state.valid = info.valid; state.ipc_id = ipc_id; - return ipc_manager_.register_remote_ipc(remote_rank, state); + bool ok = ipc_manager_.register_remote_ipc(remote_rank, state); + if (ok) { + try_open_remote_ipc_buffer(remote_rank, state); + } + return ok; } } auto const deadline = std::chrono::steady_clock::now() + @@ -1667,7 +1731,11 @@ bool Communicator::wait_ipc_buffer(int remote_rank, uint32_t ipc_id, state.device_idx = info.device_idx; state.valid = info.valid; state.ipc_id = ipc_id; - return ipc_manager_.register_remote_ipc(remote_rank, state); + bool ok = ipc_manager_.register_remote_ipc(remote_rank, state); + if (ok && expected_binding_version != 0) { + try_open_remote_ipc_buffer(remote_rank, state); + } + return ok; } bool Communicator::fetch_ipc_buffer(int remote_rank, uint32_t ipc_id, @@ -1690,7 +1758,11 @@ bool Communicator::fetch_ipc_buffer(int remote_rank, uint32_t ipc_id, return false; } state.ipc_id = ipc_id; - return ipc_manager_.register_remote_ipc(remote_rank, state); + bool ok = ipc_manager_.register_remote_ipc(remote_rank, state); + if (ok && expected_binding_version != 0) { + try_open_remote_ipc_buffer(remote_rank, state); + } + return ok; } } if (!exchanger_client_->fetch( @@ -1710,7 +1782,32 @@ bool Communicator::fetch_ipc_buffer(int remote_rank, uint32_t ipc_id, return false; } state.ipc_id = ipc_id; - return ipc_manager_.register_remote_ipc(remote_rank, state); + bool ok = ipc_manager_.register_remote_ipc(remote_rank, state); + if (ok && expected_binding_version != 0) { + try_open_remote_ipc_buffer(remote_rank, state); + } + return ok; +} + +void Communicator::try_open_remote_ipc_buffer(int remote_rank, + IPCItem const& state) { + if (!state.valid || state.direct_ptr != nullptr) return; + + int original_device = -1; + GPU_RT_CHECK(gpuGetDevice(&original_device)); + auto restore = UKernel::Transport::finally( + [&]() { GPU_RT_CHECK(gpuSetDevice(original_device)); }); + GPU_RT_CHECK(gpuSetDevice(local_gpu_idx_)); + + void* direct_ptr = nullptr; + gpuError_t open_err = gpuIpcOpenMemHandle(&direct_ptr, state.handle, + gpuIpcMemLazyEnablePeerAccess); + if (open_err != gpuSuccess || direct_ptr == nullptr) return; + + IPCItem opened = state; + opened.direct_ptr = direct_ptr; + opened.ipc_id = state.ipc_id; + (void)ipc_manager_.register_remote_ipc(remote_rank, opened); } bool Communicator::has_fresh_remote_ipc_buffer( diff --git a/experimental/ukernel/src/transport/communicator.h b/experimental/ukernel/src/transport/communicator.h index d616dbdc2..03feed184 100644 --- a/experimental/ukernel/src/transport/communicator.h +++ b/experimental/ukernel/src/transport/communicator.h @@ -78,6 +78,7 @@ class Communicator { bool resolve_ipc_buffer_pointer(int remote_rank, uint32_t ipc_id, size_t offset, size_t bytes, void** out_ptr, int* out_device_idx); + IPCItem export_local_ipc_buffer(void* local_buf, size_t len); bool register_remote_ipc_cache(int remote_rank, gpuIpcMemHandle_t handle, IPCItem const& ipc); @@ -158,6 +159,7 @@ class Communicator { bool has_fresh_remote_ipc_buffer(int remote_rank, uint32_t ipc_id, uint64_t expected_binding_version) const; void invalidate_remote_ipc_buffer(int remote_rank, uint32_t ipc_id); + void try_open_remote_ipc_buffer(int remote_rank, IPCItem const& state); void cleanup_tracked_request(TrackedRequest& tracked); bool complete_host_bounce_recv(TrackedRequest& tracked, bool blocking); SHMManager& require_shm_manager(char const* caller); @@ -225,6 +227,13 @@ class Communicator { }; std::vector> notify_targets_; + struct PublishedIpcBuffer { + uintptr_t base_addr = 0; + size_t bytes = 0; + uint64_t binding_version = 0; + bool valid = false; + }; + // UCCL registration cache. mutable std::mutex uccl_reg_mu_; std::unordered_set uccl_direct_reg_failed_mrs_; @@ -234,6 +243,8 @@ class Communicator { mutable std::mutex ipc_gen_mu_; std::unordered_map> local_ipc_binding_versions_; + std::unordered_map> + local_ipc_published_buffers_; }; } // namespace Transport diff --git a/experimental/ukernel/src/transport/memory/ipc_manager.cc b/experimental/ukernel/src/transport/memory/ipc_manager.cc index afd1998b2..f8bbb8b84 100644 --- a/experimental/ukernel/src/transport/memory/ipc_manager.cc +++ b/experimental/ukernel/src/transport/memory/ipc_manager.cc @@ -12,6 +12,15 @@ bool IPCManager::has_handle(gpuIpcMemHandle_t const& h) { return false; } +bool IPCManager::local_cache_contains(IPCItem const& item, + uintptr_t ptr_addr) noexcept { + if (!item.valid || item.base_addr == 0 || item.allocation_size == 0) { + return false; + } + return ptr_addr >= item.base_addr && + (ptr_addr - item.base_addr) < item.allocation_size; +} + void IPCManager::close_local_import_if_open(IPCItem& item) { if (item.is_local || item.direct_ptr == nullptr || item.device_idx < 0) { return; @@ -41,9 +50,21 @@ bool IPCManager::register_remote_ipc(int rank, IPCItem const& ipc) { IPCItem IPCManager::create_local_ipc(void* ptr, size_t len, int device_idx) { if (ptr == nullptr || len == 0) return {}; + uintptr_t ptr_addr = reinterpret_cast(ptr); void* base = nullptr; size_t base_size = 0; + { + std::lock_guard lk(local_mu_); + for (auto const& kv : local_ipc_cache_) { + if (!local_cache_contains(kv.second, ptr_addr)) continue; + IPCItem hit = kv.second; + hit.base_offset = ptr_addr - hit.base_addr; + hit.bytes = len; + return hit; + } + } + GPU_RT_CHECK(gpuMemGetAddressRange(&base, &base_size, ptr)); uintptr_t key = reinterpret_cast(base); @@ -51,23 +72,31 @@ IPCItem IPCManager::create_local_ipc(void* ptr, size_t len, int device_idx) { std::lock_guard lk(local_mu_); auto it = local_ipc_cache_.find(key); if (it != local_ipc_cache_.end()) { - return it->second; + IPCItem out = it->second; + out.base_offset = ptr_addr - out.base_addr; + out.bytes = len; + if (out.allocation_size == 0) out.allocation_size = base_size; + return out; } } IPCItem created{}; GPU_RT_CHECK(gpuIpcGetMemHandle(&created.handle, base)); created.base_addr = key; - created.base_offset = reinterpret_cast(ptr) - key; + created.base_offset = ptr_addr - key; created.bytes = len; + created.allocation_size = base_size; created.device_idx = device_idx; created.is_local = true; created.valid = true; std::lock_guard lk(local_mu_); auto [it, inserted] = local_ipc_cache_.emplace(key, created); - if (!inserted) return it->second; - return created; + IPCItem out = inserted ? created : it->second; + out.base_offset = ptr_addr - out.base_addr; + out.bytes = len; + if (out.allocation_size == 0) out.allocation_size = base_size; + return out; } IPCItem IPCManager::get_ipc(int rank, gpuIpcMemHandle_t handle) const { @@ -90,16 +119,19 @@ IPCItem IPCManager::get_ipc(int rank, uint32_t ipc_id) const { IPCItem IPCManager::get_ipc(void* local_ptr) const { if (local_ptr == nullptr) return {}; - void* base = nullptr; - size_t base_size = 0; - GPU_RT_CHECK(gpuMemGetAddressRange(&base, &base_size, local_ptr)); - (void)base_size; - uintptr_t key = reinterpret_cast(base); + uintptr_t ptr_addr = reinterpret_cast(local_ptr); std::lock_guard lk(local_mu_); - auto it = local_ipc_cache_.find(key); - if (it == local_ipc_cache_.end()) return {}; - return it->second; + for (auto const& kv : local_ipc_cache_) { + if (!local_cache_contains(kv.second, ptr_addr)) continue; + IPCItem hit = kv.second; + hit.base_offset = ptr_addr - hit.base_addr; + if (hit.allocation_size >= hit.base_offset) { + hit.bytes = hit.allocation_size - hit.base_offset; + } + return hit; + } + return {}; } bool IPCManager::delete_ipc(int rank, gpuIpcMemHandle_t handle) { @@ -143,23 +175,17 @@ bool IPCManager::delete_ipc(int rank, uint32_t ipc_id) { bool IPCManager::delete_ipc(void* local_ptr) { if (local_ptr == nullptr) return false; - uintptr_t key = reinterpret_cast(local_ptr); + uintptr_t ptr_addr = reinterpret_cast(local_ptr); { std::lock_guard lk(local_mu_); - auto it = local_ipc_cache_.find(key); - if (it != local_ipc_cache_.end()) { + for (auto it = local_ipc_cache_.begin(); it != local_ipc_cache_.end(); + ++it) { + if (!local_cache_contains(it->second, ptr_addr)) continue; local_ipc_cache_.erase(it); return true; } } - - void* base = nullptr; - size_t base_size = 0; - GPU_RT_CHECK(gpuMemGetAddressRange(&base, &base_size, local_ptr)); - (void)base_size; - key = reinterpret_cast(base); - std::lock_guard lk(local_mu_); - return local_ipc_cache_.erase(key) > 0; + return false; } void IPCManager::delete_ipc(int rank) { diff --git a/experimental/ukernel/src/transport/memory/ipc_manager.h b/experimental/ukernel/src/transport/memory/ipc_manager.h index 9a30f9cae..3212f4c8a 100644 --- a/experimental/ukernel/src/transport/memory/ipc_manager.h +++ b/experimental/ukernel/src/transport/memory/ipc_manager.h @@ -17,6 +17,7 @@ struct IPCItem { uintptr_t base_addr = 0; uintptr_t base_offset = 0; size_t bytes = 0; + size_t allocation_size = 0; int device_idx = -1; uint64_t binding_version = 0; uint32_t ipc_id = 0; @@ -65,6 +66,8 @@ class IPCManager { } static bool has_handle(gpuIpcMemHandle_t const& h); + static bool local_cache_contains(IPCItem const& item, + uintptr_t ptr_addr) noexcept; // Close local imported mapping (gpuIpcOpenMemHandle result) if present. // This only affects this process's VA mapping, not remote process memory. static void close_local_import_if_open(IPCItem& item);