Merged

22 commits
ee1887d
support ipc on amd
DanielDanyang Apr 9, 2026
01a38d3
Merge branch 'uccl-project:main' into ipc-opt
DanielDanyang Apr 10, 2026
d9be409
support ukernel device bench on amd
DanielDanyang Apr 10, 2026
cd949c1
format
DanielDanyang Apr 10, 2026
8974432
Merge branch 'uccl-project:main' into ipc-opt
DanielDanyang Apr 10, 2026
e39e57c
bug fix
DanielDanyang Apr 10, 2026
ff83f69
format
DanielDanyang Apr 10, 2026
da887e7
cache local ipc exports by allocation
DanielDanyang Apr 10, 2026
e2b6874
use versioned ipc metadata in transport bench
DanielDanyang Apr 10, 2026
c6b92a5
document ukernel ipc tuning progress
DanielDanyang Apr 10, 2026
8b31a70
prefetch remote ipc mappings during setup
DanielDanyang Apr 10, 2026
75ba875
reduce ipc recv poll interval
DanielDanyang Apr 10, 2026
b20f8bd
Merge branch 'uccl-project:main' into ipc-opt
DanielDanyang Apr 10, 2026
f044f03
reduce ipc worker wakeup overhead
DanielDanyang Apr 10, 2026
66c4262
dedupe steady-state ipc recv metadata publish
DanielDanyang Apr 10, 2026
b3d8ae2
fast-path single-request ipc waits
DanielDanyang Apr 11, 2026
5844c89
bug fix and format
DanielDanyang Apr 11, 2026
6dce8ca
fix rocm python p2p extension build
DanielDanyang Apr 13, 2026
68c0be3
enable ipc direct path in python p2p bench
DanielDanyang Apr 11, 2026
252fcee
bug fix and format
DanielDanyang Apr 14, 2026
ccf6a38
stage per-peer done_seq SHM completion with opener handshake
DanielDanyang Apr 15, 2026
ad6072e
Merge branch 'uccl-project:main' into ipc-opt
DanielDanyang Apr 21, 2026
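Taken together, these commits add a versioned IPC buffer-binding path: each rank pins its receive tensor, publishes the resulting MR to the peer under a binding version, waits for (and thereby prefetches) the peer's mapping, and then sends directly against the remote MR id instead of going through the per-request handshake. Below is a minimal sketch of that flow, adapted from the bench_p2p.py changes in this PR. The helper names exchange_mr_id, setup_direct_ipc, and direct_send are illustrative and not part of the PR; the comm methods and their positional arguments are copied from the benchmark code, not from a documented API contract, and the MR-id exchange assumes a CPU-capable torch.distributed backend such as gloo.

import torch
import torch.distributed as dist

IPC_BINDING_VERSION = 1  # both peers must publish and wait on the same version


def exchange_mr_id(local_mr_id: int, peer: int) -> int:
    # Out-of-band exchange of the pinned recv MR id; the benchmark uses its own
    # _exchange_peer_i64 helper, any side channel works.
    out = torch.tensor([local_mr_id], dtype=torch.int64)
    inp = torch.empty(1, dtype=torch.int64)
    if dist.get_rank() < peer:
        dist.send(out, dst=peer)
        dist.recv(inp, src=peer)
    else:
        dist.recv(inp, src=peer)
        dist.send(out, dst=peer)
    return int(inp.item())


def setup_direct_ipc(comm, peer, recv_buf, size_bytes):
    # Pin the recv buffer, publish its versioned IPC binding, and prefetch the
    # peer's mapping; returns the remote MR id, or None if direct IPC is unavailable.
    recv_mr_id = comm.pin_tensor(recv_buf)
    if not (comm.peer_transport(peer) == "ipc" and comm.same_host(peer)):
        return None  # fall back to comm.send()/comm.recv()
    remote_mr_id = exchange_mr_id(int(recv_mr_id), peer)
    assert comm.notify_ipc_tensor(peer, recv_mr_id, recv_buf, 0, size_bytes,
                                  IPC_BINDING_VERSION)
    assert comm.wait_ipc_buffer(peer, remote_mr_id, IPC_BINDING_VERSION)
    dist.barrier()
    return remote_mr_id


def direct_send(comm, peer, send_buf, remote_mr_id, size_bytes):
    # Write straight into the peer's published recv buffer by MR id.
    comm.send_direct(peer, send_buf, remote_mr_id, IPC_BINDING_VERSION, 0,
                     size_bytes, 0)

On the receive side the benchmark pairs this with comm.irecv(peer, recv_buf) followed by comm.wait_finish(req), so only the sending side needs the remote binding.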
74 changes: 71 additions & 3 deletions experimental/ukernel/benchmarks/bench_transport.cc
@@ -25,6 +25,7 @@ static constexpr int kIpcThroughputWindow = 8;
 static constexpr int kUcclThroughputWindow = 4;
 static constexpr int kTcpThroughputWindow = 1;
 static constexpr uint64_t kBenchNamedMrGeneration = 1;
+static constexpr uint64_t kBenchIpcBindingVersion = 1;
 
 static PreferredTransport parse_transport(char const* value) {
   if (strcmp(value, "auto") == 0) return PreferredTransport::Auto;
@@ -204,17 +205,54 @@ static bool exchange_remote_recv_mrs(Communicator& comm, int peer_rank,
 static uint32_t remote_recv_slot_id(PeerTransportKind kind,
                                     std::vector<MR> const& remote_recv_mrs,
                                     int slot) {
-  if (kind != PeerTransportKind::Uccl) return 0;
+  if (kind != PeerTransportKind::Uccl && kind != PeerTransportKind::Ipc) {
+    return 0;
+  }
   return remote_recv_mrs.at(static_cast<size_t>(slot)).id;
 }
 
 static std::optional<RemoteSlice> remote_recv_slice(
     PeerTransportKind kind, std::vector<MR> const& remote_recv_mrs, int slot) {
   uint32_t remote_id = remote_recv_slot_id(kind, remote_recv_mrs, slot);
   if (remote_id == 0) return std::nullopt;
+  if (kind == PeerTransportKind::Ipc) {
+    // This benchmark keeps the recv slots stable for the communicator lifetime.
+    // Publish a fixed versioned IPC buffer view up front so sends can exercise
+    // the by-mem_id direct path instead of the request-level handshake.
+    return RemoteSlice{remote_id, 0, {}, kBenchIpcBindingVersion};
+  }
   return RemoteSlice{remote_id, 0};
 }
 
+static bool publish_ipc_recv_buffers(Communicator& comm, int peer_rank,
+                                     PeerTransportKind kind,
+                                     std::vector<MR> const& local_recv_mrs,
+                                     std::vector<void*> const& recv_slots,
+                                     size_t msg_size) {
+  if (kind != PeerTransportKind::Ipc) return true;
+  for (size_t i = 0; i < local_recv_mrs.size(); ++i) {
+    if (!comm.notify_ipc_buffer(peer_rank, local_recv_mrs[i].id, recv_slots[i],
+                                msg_size, kBenchIpcBindingVersion)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+static bool prefetch_remote_ipc_recv_buffers(
+    Communicator& comm, int peer_rank, PeerTransportKind kind,
+    std::vector<MR> const& remote_recv_mrs) {
+  if (kind != PeerTransportKind::Ipc) return true;
+  for (auto const& remote_mr : remote_recv_mrs) {
+    if (remote_mr.id == 0) return false;
+    if (!comm.wait_ipc_buffer(peer_rank, remote_mr.id,
+                              kBenchIpcBindingVersion)) {
+      return false;
+    }
+  }
+  return true;
+}
+
 static unsigned submit_send(Communicator& comm, int peer_rank,
                             uint32_t local_send_mr_id, size_t msg_size,
                             PeerTransportKind kind,
@@ -329,14 +367,29 @@ void run_sender(int gpu_id, int rank, int peer_rank, int world_size,
                       local_send_mr.id, throughput_window);
 
   std::vector<MR> remote_recv_mrs;
-  if (transport_kind == PeerTransportKind::Uccl) {
+  if (transport_kind == PeerTransportKind::Ipc ||
+      transport_kind == PeerTransportKind::Uccl) {
+    if (!publish_ipc_recv_buffers(comm, peer_rank, transport_kind,
+                                  local_recv_mrs, recv_slots, msg_size)) {
+      fprintf(stderr, "[Sender %d] Failed to publish IPC receive buffers\n",
+              rank);
+      cleanup();
+      return;
+    }
     if (!exchange_remote_recv_mrs(comm, peer_rank, local_recv_mrs,
                                   remote_recv_mrs, true)) {
       fprintf(stderr, "[Sender %d] Failed to exchange remote receive MRs\n",
               rank);
       cleanup();
       return;
     }
+    if (!prefetch_remote_ipc_recv_buffers(comm, peer_rank, transport_kind,
+                                          remote_recv_mrs)) {
+      fprintf(stderr, "[Sender %d] Failed to prefetch remote IPC buffers\n",
+              rank);
+      cleanup();
+      return;
+    }
   }
 
   printf("[Sender %d] Remote memory info received\n", rank);
@@ -602,14 +655,29 @@ void run_receiver(int gpu_id, int rank, int peer_rank, int world_size,
                       local_send_mr.id, throughput_window);
 
   std::vector<MR> remote_recv_mrs;
-  if (transport_kind == PeerTransportKind::Uccl) {
+  if (transport_kind == PeerTransportKind::Ipc ||
+      transport_kind == PeerTransportKind::Uccl) {
+    if (!publish_ipc_recv_buffers(comm, peer_rank, transport_kind,
+                                  local_recv_mrs, recv_slots, msg_size)) {
+      fprintf(stderr, "[Receiver %d] Failed to publish IPC receive buffers\n",
+              rank);
+      cleanup();
+      return;
+    }
     if (!exchange_remote_recv_mrs(comm, peer_rank, local_recv_mrs,
                                   remote_recv_mrs, false)) {
       fprintf(stderr, "[Receiver %d] Failed to exchange remote receive MRs\n",
               rank);
       cleanup();
       return;
     }
+    if (!prefetch_remote_ipc_recv_buffers(comm, peer_rank, transport_kind,
+                                          remote_recv_mrs)) {
+      fprintf(stderr, "[Receiver %d] Failed to prefetch remote IPC buffers\n",
+              rank);
+      cleanup();
+      return;
+    }
   }
 
   printf("[Receiver %d] Remote memory info received\n", rank);
72 changes: 62 additions & 10 deletions experimental/ukernel/py/bench_p2p.py
@@ -23,30 +23,73 @@ def bench_p2p_ukernel(comm, peer, size_bytes, warmup, iters):
     n = size_bytes // 4  # float32
     send_buf = torch.empty(n, device="cuda", dtype=torch.float32)
     recv_buf = torch.empty(n, device="cuda", dtype=torch.float32)
-    comm.pin_tensor(send_buf)
-    comm.pin_tensor(recv_buf)
+    send_mr_id = comm.pin_tensor(send_buf)
+    recv_mr_id = comm.pin_tensor(recv_buf)
 
     rank = comm.rank
+    use_direct_ipc = comm.peer_transport(peer) == "ipc" and comm.same_host(peer)
+    remote_recv_mr_id = 0
+    ipc_binding_version = 1
+    if use_direct_ipc:
+        remote_recv_mr_id = _exchange_peer_i64(int(recv_mr_id), rank)
+        if not comm.notify_ipc_tensor(
+            peer, recv_mr_id, recv_buf, 0, size_bytes, ipc_binding_version
+        ):
+            raise RuntimeError(
+                f"notify_ipc_tensor(peer={peer}, mr={recv_mr_id}) failed"
+            )
+        if not comm.wait_ipc_buffer(peer, remote_recv_mr_id, ipc_binding_version):
+            raise RuntimeError(
+                f"wait_ipc_buffer(peer={peer}, mr={remote_recv_mr_id}) failed"
+            )
+        dist.barrier()
+
+    def _send():
+        if use_direct_ipc:
+            comm.send_direct(
+                peer, send_buf, remote_recv_mr_id, ipc_binding_version, 0, size_bytes, 0
+            )
+        else:
+            comm.send(peer, send_buf)
+
     # Server (rank 0) recv first, client (rank 1) send first
     if rank == 0:
         for _ in range(warmup):
-            comm.recv(peer, recv_buf)
-            comm.send(peer, send_buf)  # echo back
+            if use_direct_ipc:
+                req = comm.irecv(peer, recv_buf)
+                comm.wait_finish(req)
+            else:
+                comm.recv(peer, recv_buf)
+            _send()  # echo back
         torch.cuda.synchronize()
         t0 = time.perf_counter()
        for _ in range(iters):
-            comm.recv(peer, recv_buf)
-            comm.send(peer, send_buf)
+            if use_direct_ipc:
+                req = comm.irecv(peer, recv_buf)
+                comm.wait_finish(req)
+            else:
+                comm.recv(peer, recv_buf)
+            _send()
         torch.cuda.synchronize()
     else:
         for _ in range(warmup):
-            comm.send(peer, send_buf)
-            comm.recv(peer, recv_buf)
+            if use_direct_ipc:
+                req = comm.irecv(peer, recv_buf)
+            _send()
+            if use_direct_ipc:
+                comm.wait_finish(req)
+            else:
+                comm.recv(peer, recv_buf)
         torch.cuda.synchronize()
         t0 = time.perf_counter()
         for _ in range(iters):
-            comm.send(peer, send_buf)
-            comm.recv(peer, recv_buf)
+            if use_direct_ipc:
+                req = comm.irecv(peer, recv_buf)
+            _send()
+            if use_direct_ipc:
+                comm.wait_finish(req)
+            else:
+                comm.recv(peer, recv_buf)
         torch.cuda.synchronize()
     elapsed = time.perf_counter() - t0
     comm.unpin_tensor(send_buf)
@@ -137,6 +180,15 @@ def _exchange_local_gpu_idx(local_gpu_idx: int, rank: int) -> int:
     return remote_gpu_idx
 
 
+def _exchange_peer_i64(local_value: int, rank: int) -> int:
+    if rank == 0:
+        _send_i64(local_value, dst=1)
+        return _recv_i64(src=1)
+    remote_value = _recv_i64(src=0)
+    _send_i64(local_value, dst=0)
+    return remote_value
+
+
 def _create_uccl_endpoint(local_gpu_idx: int, rank: int):
     # Support multiple p2p python binding signatures.
     for num_cpus in (rank, 0):