From 532fc529504cdb6fd340df0038f2a7e006e7f333 Mon Sep 17 00:00:00 2001 From: laochonlam Date: Sun, 22 Feb 2026 00:55:31 +0000 Subject: [PATCH 01/19] [EP] Add batched low-latency RDMA send path Add batched RDMA send handling and related low-latency path updates, including dispatch-side token-count signaling and receiver-side immediate data handling for batched sends. --- ep/bench/buffer.py | 34 +++++++++ ep/include/ep_config.hpp | 12 ++++ ep/include/uccl_ibgda.cuh | 10 ++- ep/src/internode_ll.cu | 141 +++++++++++++++++++++++++++++++------- ep/src/rdma.cpp | 9 ++- 5 files changed, 179 insertions(+), 27 deletions(-) diff --git a/ep/bench/buffer.py b/ep/bench/buffer.py index b49a528f9..e2e5c65a1 100644 --- a/ep/bench/buffer.py +++ b/ep/bench/buffer.py @@ -281,6 +281,24 @@ def low_latency_dispatch( hidden=x.shape[1], num_experts=num_experts, ) + + # DEBUG: Print data before CUDA kernel launch + if os.environ.get("DEBUG_DISPATCH", "0") == "1": + print(f"\n{'='*60}", flush=True) + print(f"[DEBUG] low_latency_dispatch - BEFORE CUDA kernel", flush=True) + print(f"{'='*60}", flush=True) + print(f" rank: {self.rank}", flush=True) + print(f" x.shape: {x.shape}, dtype: {x.dtype}", flush=True) + print(f" x.data_ptr: {hex(x.data_ptr())}", flush=True) + print(f" x[:3, :8]:\n{x[:3, :8]}", flush=True) + print(f" topk_idx.shape: {topk_idx.shape}, dtype: {topk_idx.dtype}", flush=True) + print(f" topk_idx[:5]:\n{topk_idx[:5]}", flush=True) + print(f" num_max_dispatch_tokens_per_rank: {num_max_dispatch_tokens_per_rank}", flush=True) + print(f" num_experts: {num_experts}", flush=True) + print(f" use_fp8: {use_fp8}, round_scale: {round_scale}, use_ue8m0: {use_ue8m0}", flush=True) + print(f" async_finish: {async_finish}, return_recv_hook: {return_recv_hook}", flush=True) + print(f"{'='*60}\n", flush=True) + ( packed_recv_x, packed_recv_x_scales, @@ -309,6 +327,22 @@ def low_latency_dispatch( x.size(1), num_experts, ) + + # DEBUG: Print data after CUDA kernel returns + if os.environ.get("DEBUG_DISPATCH", "0") == "1": + import torch + torch.cuda.synchronize() # Wait for kernel to complete + print(f"\n{'='*60}", flush=True) + print(f"[DEBUG] low_latency_dispatch - AFTER CUDA kernel", flush=True) + print(f"{'='*60}", flush=True) + print(f" packed_recv_x.shape: {packed_recv_x.shape}, dtype: {packed_recv_x.dtype}", flush=True) + print(f" packed_recv_count: {packed_recv_count}", flush=True) + print(f" packed_recv_src_info.shape: {packed_recv_src_info.shape}", flush=True) + print(f" packed_recv_layout_range.shape: {packed_recv_layout_range.shape}", flush=True) + if packed_recv_x_scales is not None: + print(f" packed_recv_x_scales.shape: {packed_recv_x_scales.shape}", flush=True) + print(f"{'='*60}\n", flush=True) + tensors_to_record = ( x, topk_idx, diff --git a/ep/include/ep_config.hpp b/ep/include/ep_config.hpp index 451b3d27b..082fcdf29 100644 --- a/ep/include/ep_config.hpp +++ b/ep/include/ep_config.hpp @@ -196,8 +196,20 @@ struct LowLatencyLayout { size_t num_bytes_per_combine_msg = hidden * sizeof(nv_bfloat16); // Send buffer +#ifdef LAM_DEV + // Lam: Buffer layout for batched RDMA sends: + // ┌─────────────────────────────────┬──────────────────────────────────────────────┐ + // │ Temp buffer (offset 0) │ RDMA batch buffer (offset num_max_tokens) │ + // │ rdma_x[token_idx] │ rdma_x[num_max_tokens + expert*max + slot] │ + // │ Size: num_max_tokens * msg_size │ Size: num_experts * num_max_tokens * msg_size│ + // └─────────────────────────────────┴──────────────────────────────────────────────┘ + // Flow: FP8 cast -> 
temp buffer -> copy to rdma_batch_buffer -> batch RDMA send + size_t dispatch_send_buffer_bytes = + (num_experts + 1) * num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg; +#else size_t dispatch_send_buffer_bytes = num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg; +#endif size_t combine_send_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg; diff --git a/ep/include/uccl_ibgda.cuh b/ep/include/uccl_ibgda.cuh index 41b48d1cd..753a5e63f 100644 --- a/ep/include/uccl_ibgda.cuh +++ b/ep/include/uccl_ibgda.cuh @@ -29,7 +29,7 @@ __device__ __forceinline__ void nvshmemi_ibgda_put_nbi_warp( int expert_idx, int lane_id, int message_idx, uint64_t const* d2h_channel_addrs, int num_d2h_channel_addrs, bool is_combine, int low_latency_buffer_idx = 0, uint64_t atomic_offset = 0, - uint64_t atomic_val = 0) { + uint64_t atomic_val = 0, int num_tokens = 1) { // NOTE(MaoZiming): different from the nvshmemi_ibgda_put_nbi_warp in // ibgda_device.cuh, we don't do warp-cooperation. if (lane_id != 0) return; @@ -67,6 +67,10 @@ __device__ __forceinline__ void nvshmemi_ibgda_put_nbi_warp( cmd.atomic_val = atomic_val; } else { cmd.expert_idx = expert_idx; + // Low-latency WRITE: use atomic_val byte for num_tokens (1..255). + cmd.atomic_val = (num_tokens <= 0 || num_tokens > 255) + ? 1 + : static_cast(num_tokens); } h->atomic_set_and_commit(cmd, &slot); } @@ -115,6 +119,10 @@ __device__ __forceinline__ void nvshmemi_ibgda_put_nbi_warp( cmd.atomic_val = atomic_val; } else { cmd.expert_idx = expert_idx; + // Low-latency WRITE: use atomic_val byte for num_tokens (1..255). + cmd.atomic_val = (num_tokens <= 0 || num_tokens > 255) + ? 1 + : static_cast(num_tokens); } h->atomic_set_and_commit(cmd, &slot); break; diff --git a/ep/src/internode_ll.cu b/ep/src/internode_ll.cu index e8c352af5..2a57e20ca 100644 --- a/ep/src/internode_ll.cu +++ b/ep/src/internode_ll.cu @@ -12,6 +12,9 @@ namespace cg = cooperative_groups; namespace uccl { namespace internode_ll { +// Lam: Global lock for debug printing (ensures printf calls don't interleave) +__device__ int g_print_lock = 0; + #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) constexpr int kNumMaxWarpGroups = 16; #else @@ -53,6 +56,7 @@ __global__ __launch_bounds__(1024, 1) void dispatch( int64_t* dispatch_wait_recv_cost_stats, void* rdma_recv_x, int* rdma_recv_count, void* rdma_x, void const* x, int64_t const* topk_idx, int* atomic_counter_per_expert, int* atomic_finish_counter_per_expert, + int* atomic_send_counter_per_expert, int* next_clean, int64_t* next_clean_second, int num_next_clean_int, int num_tokens, int num_max_dispatch_tokens_per_rank, int num_topk, int num_experts, int rank, int num_ranks, int num_warp_groups, @@ -63,6 +67,11 @@ __global__ __launch_bounds__(1024, 1) void dispatch( void* atomic_buffer_ptr = nullptr, int64_t* rdma_recv_count_internode = nullptr, int* grid_sync_barrier_ptr = nullptr) { +// #ifdef LAM_DEV + // if (blockIdx.x == 0 && threadIdx.x == 0) { + // printf("[LAM_DEV] dispatch called\n"); + // } +// #endif auto const sm_id = static_cast(blockIdx.x); auto const thread_id = static_cast(threadIdx.x); auto const warp_id = thread_id / WARP_SIZE, lane_id = get_lane_id(); @@ -98,6 +107,11 @@ __global__ __launch_bounds__(1024, 1) void dispatch( // Expert counts __shared__ int shared_num_tokens_sent_per_expert[kNumMaxWarpGroups]; + // Lam: Send slots for each topk destination (for batched send buffer layout) + constexpr int kNumMaxTopK = 9; + __shared__ int shared_send_slots[kNumMaxTopK]; 
+ __shared__ int shared_dst_experts[kNumMaxTopK]; + #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) // initialize barrier amd::barrier_init(1); @@ -136,6 +150,20 @@ __global__ __launch_bounds__(1024, 1) void dispatch( : -1; thread_id == 0 ? (*rdma_x_src_idx = token_idx) : 0; + // Lam: Allocate send slots for each topk destination + // Each warp (warp_id < num_topk) allocates a slot for its destination expert + if (warp_id < num_topk && lane_id == 0) { + shared_dst_experts[warp_id] = dst_expert_idx; + if (dst_expert_idx >= 0) { + shared_send_slots[warp_id] = + atomicAdd(atomic_send_counter_per_expert + dst_expert_idx, 1); + } else { + shared_send_slots[warp_id] = -1; + } + } + // Sync to make shared_send_slots visible to all threads + sync_barrier_1((num_warps - 1) * WARP_SIZE); + // FP8 cast #pragma unroll for (int i = thread_id; i < hidden_bf16_int4; i += num_threads) { @@ -221,15 +249,15 @@ __global__ __launch_bounds__(1024, 1) void dispatch( dst_rank, max_nvl_peers, 0) : 0; if (dst_p2p_ptr == 0) { - __threadfence_system(); - uccl::nvshmemi_ibgda_put_nbi_warp( - dst_ptr - reinterpret_cast(rdma_buffer_ptr), - src_ptr - reinterpret_cast(rdma_buffer_ptr), - num_bytes_per_msg, dst_rank, - /*warp_id=*/dst_expert_local_idx, // NOTE(Yang): for selecting - // rb. - lane_id, slot_idx, d2h_channel_addrs, num_d2h_channel_addrs, - false, low_latency_buffer_idx); + // Lam: IBGDA -> copy temp to rdma_batch_buffer, batch send later + auto const lam_slot = shared_send_slots[warp_id]; + auto const batch_buf_offset = num_max_dispatch_tokens_per_rank * num_bytes_per_msg; + auto const batch_buf_ptr = static_cast(rdma_x) + batch_buf_offset + + (dst_expert_idx * num_max_dispatch_tokens_per_rank + lam_slot) * num_bytes_per_msg; + auto const* src_int4_ptr = reinterpret_cast(rdma_x_src_idx); + auto* batch_buf_int4_ptr = reinterpret_cast(batch_buf_ptr); + UNROLLED_WARP_COPY(8, lane_id, num_int4_per_msg, batch_buf_int4_ptr, + src_int4_ptr, ld_nc_global, st_na_global); } else { // Intra-node: use direct memory copy via IPC auto const* src_int4_ptr = reinterpret_cast(src_ptr); @@ -288,6 +316,78 @@ __global__ __launch_bounds__(1024, 1) void dispatch( } } __syncthreads(); + + // Lam: Grid-wide sync before batch send phase. + // __syncthreads() only syncs within a single thread block (SM). + // The token loop distributes tokens round-robin across SMs + // (token_idx = sm_id, stepping by num_sms). When num_tokens < num_sms, + // most SMs skip the token loop and pass __syncthreads() immediately, + // while the SMs processing tokens are still writing to the batch buffer + // and incrementing atomic_send_counter_per_expert. + // Without grid sync, the batch send phase can read a stale/partial + // counter and send fewer tokens than actually produced, causing the + // receiver to hang waiting for data that never arrives. 
+#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) + amd::grid_sync(grid_sync_barrier_ptr, num_sms); +#else + cg::this_grid().sync(); +#endif + + // Lam: Batch RDMA send phase - send entire expert buffer in ONE IBGDA call + // Each warp group handles one expert (only first sub_warp does the send) + if (responsible_expert_idx < num_experts && sub_warp_id == 0) { + auto const dst_rank = responsible_expert_idx / num_local_experts; + auto const dst_expert_local_idx = responsible_expert_idx % num_local_experts; + + // Check if this destination is inter-node (needs IBGDA batch send) + // IPC destinations were already sent in the token loop + auto const test_dst_ptr = reinterpret_cast(rdma_recv_x); + auto const dst_p2p_ptr = + ipc_rdma_base_ptrs + ? uccl::get_ipc_p2p_ptr(test_dst_ptr, ipc_rdma_base_ptrs, rank, + dst_rank, max_nvl_peers, 0) + : 0; + + if (dst_p2p_ptr == 0) { + // Inter-node: batch send ALL tokens for this expert in ONE call + auto const num_tokens_to_send = + atomic_send_counter_per_expert[responsible_expert_idx]; + + if (num_tokens_to_send > 0) { + auto const batch_buf_offset = + num_max_dispatch_tokens_per_rank * num_bytes_per_msg; + // Source: start of this expert's batch buffer (contiguous) + auto const batch_buf_ptr = + static_cast(rdma_x) + batch_buf_offset + + responsible_expert_idx * num_max_dispatch_tokens_per_rank * + num_bytes_per_msg; + auto const src_ptr = reinterpret_cast(batch_buf_ptr); + // Destination: start of this expert's recv buffer on remote rank + auto const dst_ptr = + reinterpret_cast(rdma_recv_x) + + dst_expert_local_idx * num_ranks * + num_max_dispatch_tokens_per_rank * num_bytes_per_msg + + rank * num_max_dispatch_tokens_per_rank * num_bytes_per_msg; + // Total bytes: all tokens for this expert + auto const total_bytes = num_tokens_to_send * num_bytes_per_msg; + + __threadfence_system(); + + uccl::nvshmemi_ibgda_put_nbi_warp( + dst_ptr - reinterpret_cast(rdma_buffer_ptr), + src_ptr - reinterpret_cast(rdma_buffer_ptr), + total_bytes, dst_rank, + /*warp_id=*/dst_expert_local_idx, // NOTE(Yang): for selecting rb. + lane_id, /*slot=*/0, d2h_channel_addrs, num_d2h_channel_addrs, + false, low_latency_buffer_idx, 0, 0, num_tokens_to_send); + + } + } + // IPC: already sent in the token loop, nothing to do here + } + + __threadfence_system(); // Ensure batch sends are visible before count sends + // Issue count sends if (responsible_expert_idx < num_experts and sub_warp_id == 0 and lane_id == 0) { @@ -329,10 +429,12 @@ __global__ __launch_bounds__(1024, 1) void dispatch( st_release_sys_global(reinterpret_cast(dst_p2p_ptr), -num_tokens_sent - 1); } + // Clean workspace for next use atomic_counter_per_expert[responsible_expert_idx] = 0; atomic_finish_counter_per_expert[responsible_expert_idx] = 0; + atomic_send_counter_per_expert[responsible_expert_idx] = 0; // Clean `packed_recv_count` if (dst_rank == 0) packed_recv_count[dst_expert_local_idx] = 0; } @@ -440,13 +542,6 @@ LOW_LATENCY_DISPATCH_RECV: num_recv_tokens_internode != 0 ? -num_recv_tokens_internode - 1 : 0; num_recv_tokens_ipc = num_recv_tokens_ipc != 0 ? 
-num_recv_tokens_ipc - 1 : 0; - // printf( - // "num_recv_tokens_internode: %d, num_recv_tokens_ipc: %d, src_rank:" - // "%d, rank: %d, max_nvl_peers: %d, responsible_expert_idx: %d," - // "num_experts: %d, num_local_experts: %d\n", - // num_recv_tokens_internode, num_recv_tokens_ipc, src_rank, rank, - // max_nvl_peers, responsible_expert_idx, num_experts, - // num_local_experts); num_recv_tokens = num_recv_tokens_internode + num_recv_tokens_ipc; recv_token_begin_idx = atomicAdd(packed_recv_count + local_expert_idx, num_recv_tokens); @@ -522,8 +617,6 @@ LOW_LATENCY_DISPATCH_RECV: } } } - // if (blockIdx.x == 0 && threadIdx.x == 0) - // printf("[dispatch] RECV finished\n"); } } @@ -556,8 +649,10 @@ void dispatch(void* packed_recv_x, void* packed_recv_x_scales, auto atomic_counter_per_expert = static_cast(workspace); auto atomic_finish_counter_per_expert = atomic_counter_per_expert + num_experts; - auto grid_sync_barrier_ptr = atomic_finish_counter_per_expert + num_experts; - EP_HOST_ASSERT((num_experts * 2 + 1) * sizeof(int) <= NUM_WORKSPACE_BYTES); + auto atomic_send_counter_per_expert = + atomic_finish_counter_per_expert + num_experts; + auto grid_sync_barrier_ptr = atomic_send_counter_per_expert + num_experts; + EP_HOST_ASSERT((num_experts * 3 + 1) * sizeof(int) <= NUM_WORKSPACE_BYTES); // FP8 checks if (use_ue8m0) @@ -575,6 +670,7 @@ void dispatch(void* packed_recv_x, void* packed_recv_x_scales, cumulative_local_expert_recv_stats, dispatch_wait_recv_cost_stats, \ rdma_recv_x, rdma_recv_count, rdma_x, x, topk_idx, \ atomic_counter_per_expert, atomic_finish_counter_per_expert, \ + atomic_send_counter_per_expert, \ next_clean, next_clean_second, num_next_clean_int, num_tokens, \ num_max_dispatch_tokens_per_rank, num_topk, num_experts, rank, \ num_ranks, num_warp_groups, num_warps_per_group, round_scale, phases, \ @@ -946,8 +1042,6 @@ __global__ __launch_bounds__(1024, 1) void combine( // Receiving phase LOW_LATENCY_COMBINE_RECV: if ((phases & LOW_LATENCY_RECV_PHASE) == 0) { - // if (blockIdx.x == 0 && threadIdx.x == 0) - // printf("[combine] SEND finished\n"); return; } // Wait all ranks to arrive @@ -1060,8 +1154,6 @@ LOW_LATENCY_COMBINE_RECV: token_idx * hidden_bf16_int4)[hidden_idx] = combined_int4; } - // if (blockIdx.x == 0 && threadIdx.x == 0) - // printf("[combine] RECV finished\n"); } #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) @@ -1103,7 +1195,6 @@ void combine(void* combined_x, void* rdma_recv_x, int* rdma_recv_flag, constexpr int kNumTMABytesPerWarp = 12 * (512 + 16); int const smem_size = kNumTMABytesPerWarp * num_warps; - // printf("Combine launched\n"); #define COMBINE_LAUNCH_CASE(hidden) \ { \ diff --git a/ep/src/rdma.cpp b/ep/src/rdma.cpp index cfa24d96c..f28bfb4aa 100644 --- a/ep/src/rdma.cpp +++ b/ep/src/rdma.cpp @@ -1307,9 +1307,16 @@ static void post_rdma_async_batched_fast_mode( get_low_latency(cmd.cmd_type)}; #endif #ifdef USE_RECEIVER_BARRIER + // Lam: Low-latency: num_tokens from cmd.atomic_val (GPU sets it); else 1. + // Use atomic_val whenever GPU set it (non-zero), not only when + // get_low_latency(cmd_type); dispatch can also set atomic_val. + uint32_t num_tokens_imm = + cmd.atomic_val ? static_cast(cmd.atomic_val) : 1u; + // get_is_combine(cmd.cmd_type) ? printf("Receiving this combine imm? num_tokens_imm: %d, cmd.atomic_val: %d\n", num_tokens_imm, cmd.atomic_val) : printf("Receiving this dispatch imm? 
num_tokens_imm: %d, cmd.atomic_val: %d\n", num_tokens_imm, cmd.atomic_val); + // fflush(stdout); uint32_t imm = WriteImm::Pack(get_is_combine(cmd.cmd_type), get_low_latency(cmd.cmd_type), - cmd.expert_idx, 1, my_rank) + cmd.expert_idx, num_tokens_imm, my_rank) .GetImmData(); ibv_wr_rdma_write_imm(qpx, ctx->remote_rkey, remote_addr, htonl(imm)); #else From 2cbed00192061f1db3924be2f59f0bae30499468 Mon Sep 17 00:00:00 2001 From: laochonlam Date: Sun, 22 Feb 2026 07:25:05 +0000 Subject: [PATCH 02/19] Cleanup code and comments --- ep/bench/buffer.py | 34 ---------------------------------- ep/include/ep_config.hpp | 20 ++++++++------------ ep/include/uccl_ibgda.cuh | 10 ++++------ ep/src/internode_ll.cu | 20 ++++++++++++++------ ep/src/rdma.cpp | 5 ----- 5 files changed, 26 insertions(+), 63 deletions(-) diff --git a/ep/bench/buffer.py b/ep/bench/buffer.py index b4afc3f23..14ef8cc34 100644 --- a/ep/bench/buffer.py +++ b/ep/bench/buffer.py @@ -284,24 +284,6 @@ def low_latency_dispatch( hidden=x.shape[1], num_experts=num_experts, ) - - # DEBUG: Print data before CUDA kernel launch - if os.environ.get("DEBUG_DISPATCH", "0") == "1": - print(f"\n{'='*60}", flush=True) - print(f"[DEBUG] low_latency_dispatch - BEFORE CUDA kernel", flush=True) - print(f"{'='*60}", flush=True) - print(f" rank: {self.rank}", flush=True) - print(f" x.shape: {x.shape}, dtype: {x.dtype}", flush=True) - print(f" x.data_ptr: {hex(x.data_ptr())}", flush=True) - print(f" x[:3, :8]:\n{x[:3, :8]}", flush=True) - print(f" topk_idx.shape: {topk_idx.shape}, dtype: {topk_idx.dtype}", flush=True) - print(f" topk_idx[:5]:\n{topk_idx[:5]}", flush=True) - print(f" num_max_dispatch_tokens_per_rank: {num_max_dispatch_tokens_per_rank}", flush=True) - print(f" num_experts: {num_experts}", flush=True) - print(f" use_fp8: {use_fp8}, round_scale: {round_scale}, use_ue8m0: {use_ue8m0}", flush=True) - print(f" async_finish: {async_finish}, return_recv_hook: {return_recv_hook}", flush=True) - print(f"{'='*60}\n", flush=True) - ( packed_recv_x, packed_recv_x_scales, @@ -330,22 +312,6 @@ def low_latency_dispatch( x.size(1), num_experts, ) - - # DEBUG: Print data after CUDA kernel returns - if os.environ.get("DEBUG_DISPATCH", "0") == "1": - import torch - torch.cuda.synchronize() # Wait for kernel to complete - print(f"\n{'='*60}", flush=True) - print(f"[DEBUG] low_latency_dispatch - AFTER CUDA kernel", flush=True) - print(f"{'='*60}", flush=True) - print(f" packed_recv_x.shape: {packed_recv_x.shape}, dtype: {packed_recv_x.dtype}", flush=True) - print(f" packed_recv_count: {packed_recv_count}", flush=True) - print(f" packed_recv_src_info.shape: {packed_recv_src_info.shape}", flush=True) - print(f" packed_recv_layout_range.shape: {packed_recv_layout_range.shape}", flush=True) - if packed_recv_x_scales is not None: - print(f" packed_recv_x_scales.shape: {packed_recv_x_scales.shape}", flush=True) - print(f"{'='*60}\n", flush=True) - tensors_to_record = ( x, topk_idx, diff --git a/ep/include/ep_config.hpp b/ep/include/ep_config.hpp index 082fcdf29..e4e96bcbc 100644 --- a/ep/include/ep_config.hpp +++ b/ep/include/ep_config.hpp @@ -196,20 +196,16 @@ struct LowLatencyLayout { size_t num_bytes_per_combine_msg = hidden * sizeof(nv_bfloat16); // Send buffer -#ifdef LAM_DEV - // Lam: Buffer layout for batched RDMA sends: - // ┌─────────────────────────────────┬──────────────────────────────────────────────┐ - // │ Temp buffer (offset 0) │ RDMA batch buffer (offset num_max_tokens) │ - // │ rdma_x[token_idx] │ rdma_x[num_max_tokens + expert*max + slot] │ - // │ 
Size: num_max_tokens * msg_size │ Size: num_experts * num_max_tokens * msg_size│ - // └─────────────────────────────────┴──────────────────────────────────────────────┘ - // Flow: FP8 cast -> temp buffer -> copy to rdma_batch_buffer -> batch RDMA send + // Buffer layout for RDMA sends, used by the batched RDMA-send path in the dispatch-LL kernel. + // ┌──────────────────────────────────────────┬──────────────────────────────────────────────────────────┐ + // │ Temp buffer (offset 0) │ Per-expert RDMA batch buffer (offset num_max_token) │ + // │ rdma_x[token_idx] │ rdma_x[num_max_token + expert * num_max_token + slot] │ + // │ Size: num_max_token * msg_size │ Size: num_experts * num_max_token * msg_size │ + // └──────────────────────────────────────────┴──────────────────────────────────────────────────────────┘ + // Flow: (optional FP8 cast) -> temp buffer -> copy to per-expert batch buffer -> batched RDMA send + // TODO: Support per-GPU destination batching in this path. size_t dispatch_send_buffer_bytes = (num_experts + 1) * num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg; -#else - size_t dispatch_send_buffer_bytes = - num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg; -#endif size_t combine_send_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg; diff --git a/ep/include/uccl_ibgda.cuh b/ep/include/uccl_ibgda.cuh index 753a5e63f..424fddec3 100644 --- a/ep/include/uccl_ibgda.cuh +++ b/ep/include/uccl_ibgda.cuh @@ -68,9 +68,8 @@ __device__ __forceinline__ void nvshmemi_ibgda_put_nbi_warp( } else { cmd.expert_idx = expert_idx; // Low-latency WRITE: use atomic_val byte for num_tokens (1..255). - cmd.atomic_val = (num_tokens <= 0 || num_tokens > 255) - ? 1 - : static_cast(num_tokens); + EP_DEVICE_ASSERT(num_tokens > 0 && num_tokens <= 255); + cmd.atomic_val = static_cast(num_tokens); } h->atomic_set_and_commit(cmd, &slot); } @@ -120,9 +119,8 @@ __device__ __forceinline__ void nvshmemi_ibgda_put_nbi_warp( } else { cmd.expert_idx = expert_idx; // Low-latency WRITE: use atomic_val byte for num_tokens (1..255). - cmd.atomic_val = (num_tokens <= 0 || num_tokens > 255) - ? 1 - : static_cast(num_tokens); + EP_DEVICE_ASSERT(num_tokens > 0 && num_tokens <= 255); + cmd.atomic_val = static_cast(num_tokens); } h->atomic_set_and_commit(cmd, &slot); break; diff --git a/ep/src/internode_ll.cu b/ep/src/internode_ll.cu index 728e2b844..303883916 100644 --- a/ep/src/internode_ll.cu +++ b/ep/src/internode_ll.cu @@ -67,11 +67,6 @@ __global__ __launch_bounds__(1024, 1) void dispatch( void* atomic_buffer_ptr = nullptr, int64_t* rdma_recv_count_internode = nullptr, int* grid_sync_barrier_ptr = nullptr) { -// #ifdef LAM_DEV - // if (blockIdx.x == 0 && threadIdx.x == 0) { - // printf("[LAM_DEV] dispatch called\n"); - // } -// #endif auto const sm_id = static_cast(blockIdx.x); auto const thread_id = static_cast(threadIdx.x); auto const warp_id = thread_id / WARP_SIZE, lane_id = get_lane_id(); @@ -443,6 +438,8 @@ __global__ __launch_bounds__(1024, 1) void dispatch( // Receiving phase LOW_LATENCY_DISPATCH_RECV: if ((phases & LOW_LATENCY_RECV_PHASE) == 0) { + // if (blockIdx.x == 0 && threadIdx.x == 0) + // printf("[combine] SEND finished\n"); return; } @@ -542,6 +539,13 @@ LOW_LATENCY_DISPATCH_RECV: num_recv_tokens_internode != 0 ? -num_recv_tokens_internode - 1 : 0; num_recv_tokens_ipc = num_recv_tokens_ipc != 0 ? 
-num_recv_tokens_ipc - 1 : 0; + // printf( + // "num_recv_tokens_internode: %d, num_recv_tokens_ipc: %d, src_rank:" + // "%d, rank: %d, max_nvl_peers: %d, responsible_expert_idx: %d," + // "num_experts: %d, num_local_experts: %d\n", + // num_recv_tokens_internode, num_recv_tokens_ipc, src_rank, rank, + // max_nvl_peers, responsible_expert_idx, num_experts, + // num_local_experts); num_recv_tokens = num_recv_tokens_internode + num_recv_tokens_ipc; recv_token_begin_idx = atomicAdd(packed_recv_count + local_expert_idx, num_recv_tokens); @@ -617,6 +621,8 @@ LOW_LATENCY_DISPATCH_RECV: } } } + // if (blockIdx.x == 0 && threadIdx.x == 0) + // printf("[dispatch] RECV finished\n"); } } @@ -1153,7 +1159,9 @@ LOW_LATENCY_COMBINE_RECV: (static_cast(combined_x) + token_idx * hidden_bf16_int4)[hidden_idx] = combined_int4; } - + + // if (blockIdx.x == 0 && threadIdx.x == 0) + // printf("[combine] RECV finished\n"); } #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) diff --git a/ep/src/rdma.cpp b/ep/src/rdma.cpp index 4f42072e6..101388796 100644 --- a/ep/src/rdma.cpp +++ b/ep/src/rdma.cpp @@ -1399,13 +1399,8 @@ static void post_rdma_async_batched_fast_mode( get_low_latency(cmd.cmd_type)}; #endif #ifdef USE_RECEIVER_BARRIER - // Lam: Low-latency: num_tokens from cmd.atomic_val (GPU sets it); else 1. - // Use atomic_val whenever GPU set it (non-zero), not only when - // get_low_latency(cmd_type); dispatch can also set atomic_val. uint32_t num_tokens_imm = cmd.atomic_val ? static_cast(cmd.atomic_val) : 1u; - // get_is_combine(cmd.cmd_type) ? printf("Receiving this combine imm? num_tokens_imm: %d, cmd.atomic_val: %d\n", num_tokens_imm, cmd.atomic_val) : printf("Receiving this dispatch imm? num_tokens_imm: %d, cmd.atomic_val: %d\n", num_tokens_imm, cmd.atomic_val); - // fflush(stdout); uint32_t imm = WriteImm::Pack(get_is_combine(cmd.cmd_type), get_low_latency(cmd.cmd_type), cmd.expert_idx, num_tokens_imm, my_rank) From 38a54df357c2c841e54b2a49eb33d070eae4f856 Mon Sep 17 00:00:00 2001 From: laochonlam Date: Sun, 22 Feb 2026 07:30:05 +0000 Subject: [PATCH 03/19] Cleanup code and comments --- ep/src/internode_ll.cu | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ep/src/internode_ll.cu b/ep/src/internode_ll.cu index 303883916..9b04b737d 100644 --- a/ep/src/internode_ll.cu +++ b/ep/src/internode_ll.cu @@ -1048,6 +1048,8 @@ __global__ __launch_bounds__(1024, 1) void combine( // Receiving phase LOW_LATENCY_COMBINE_RECV: if ((phases & LOW_LATENCY_RECV_PHASE) == 0) { + // if (blockIdx.x == 0 && threadIdx.x == 0) + // printf("[combine] SEND finished\n"); return; } // Wait all ranks to arrive @@ -1159,9 +1161,9 @@ LOW_LATENCY_COMBINE_RECV: (static_cast(combined_x) + token_idx * hidden_bf16_int4)[hidden_idx] = combined_int4; } - - // if (blockIdx.x == 0 && threadIdx.x == 0) - // printf("[combine] RECV finished\n"); + + // if (blockIdx.x == 0 && threadIdx.x == 0) + // printf("[combine] RECV finished\n"); } #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) From 247af5695f55634b6644972d5839023d82cb66b0 Mon Sep 17 00:00:00 2001 From: laochonlam Date: Sun, 22 Feb 2026 07:45:47 +0000 Subject: [PATCH 04/19] Cleanup code and comments --- ep/include/ep_config.hpp | 11 ++-- ep/src/internode_ll.cu | 107 ++++++++++++++++++--------------------- 2 files changed, 56 insertions(+), 62 deletions(-) diff --git a/ep/include/ep_config.hpp b/ep/include/ep_config.hpp index e4e96bcbc..958fef041 100644 --- a/ep/include/ep_config.hpp +++ b/ep/include/ep_config.hpp @@ -196,16 +196,19 @@ struct 
LowLatencyLayout { size_t num_bytes_per_combine_msg = hidden * sizeof(nv_bfloat16); // Send buffer - // Buffer layout for RDMA sends, used by the batched RDMA-send path in the dispatch-LL kernel. + // Buffer layout for RDMA sends, used by the batched RDMA-send path in the + // dispatch-LL kernel. // ┌──────────────────────────────────────────┬──────────────────────────────────────────────────────────┐ // │ Temp buffer (offset 0) │ Per-expert RDMA batch buffer (offset num_max_token) │ // │ rdma_x[token_idx] │ rdma_x[num_max_token + expert * num_max_token + slot] │ // │ Size: num_max_token * msg_size │ Size: num_experts * num_max_token * msg_size │ // └──────────────────────────────────────────┴──────────────────────────────────────────────────────────┘ - // Flow: (optional FP8 cast) -> temp buffer -> copy to per-expert batch buffer -> batched RDMA send + // Flow: (optional FP8 cast) -> temp buffer -> copy to per-expert batch + // buffer -> batched RDMA send // TODO: Support per-GPU destination batching in this path. - size_t dispatch_send_buffer_bytes = - (num_experts + 1) * num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg; + size_t dispatch_send_buffer_bytes = (num_experts + 1) * + num_max_dispatch_tokens_per_rank * + num_bytes_per_dispatch_msg; size_t combine_send_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg; diff --git a/ep/src/internode_ll.cu b/ep/src/internode_ll.cu index 9b04b737d..981ab9014 100644 --- a/ep/src/internode_ll.cu +++ b/ep/src/internode_ll.cu @@ -12,9 +12,6 @@ namespace cg = cooperative_groups; namespace uccl { namespace internode_ll { -// Lam: Global lock for debug printing (ensures printf calls don't interleave) -__device__ int g_print_lock = 0; - #if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) constexpr int kNumMaxWarpGroups = 16; #else @@ -56,13 +53,12 @@ __global__ __launch_bounds__(1024, 1) void dispatch( int64_t* dispatch_wait_recv_cost_stats, void* rdma_recv_x, int* rdma_recv_count, void* rdma_x, void const* x, int64_t const* topk_idx, int* atomic_counter_per_expert, int* atomic_finish_counter_per_expert, - int* atomic_send_counter_per_expert, - int* next_clean, int64_t* next_clean_second, int num_next_clean_int, - int num_tokens, int num_max_dispatch_tokens_per_rank, int num_topk, - int num_experts, int rank, int num_ranks, int num_warp_groups, - int num_warps_per_group, bool round_scale, int phases, - uint64_t const* d2h_channel_addrs, int num_d2h_channel_addrs, - int max_nvl_peers, int low_latency_buffer_idx, + int* atomic_send_counter_per_expert, int* next_clean, + int64_t* next_clean_second, int num_next_clean_int, int num_tokens, + int num_max_dispatch_tokens_per_rank, int num_topk, int num_experts, + int rank, int num_ranks, int num_warp_groups, int num_warps_per_group, + bool round_scale, int phases, uint64_t const* d2h_channel_addrs, + int num_d2h_channel_addrs, int max_nvl_peers, int low_latency_buffer_idx, void** ipc_rdma_base_ptrs = nullptr, void* rdma_buffer_ptr = nullptr, void* atomic_buffer_ptr = nullptr, int64_t* rdma_recv_count_internode = nullptr, @@ -102,7 +98,7 @@ __global__ __launch_bounds__(1024, 1) void dispatch( // Expert counts __shared__ int shared_num_tokens_sent_per_expert[kNumMaxWarpGroups]; - // Lam: Send slots for each topk destination (for batched send buffer layout) + // Global counter slots used for batching sends to each top-k destination. 
constexpr int kNumMaxTopK = 9; __shared__ int shared_send_slots[kNumMaxTopK]; __shared__ int shared_dst_experts[kNumMaxTopK]; @@ -145,8 +141,9 @@ __global__ __launch_bounds__(1024, 1) void dispatch( : -1; thread_id == 0 ? (*rdma_x_src_idx = token_idx) : 0; - // Lam: Allocate send slots for each topk destination - // Each warp (warp_id < num_topk) allocates a slot for its destination expert + // Allocate per-expert send slots for top-k destinations. + // Each warp (warp_id < num_topk) reserves one slot for its destination + // expert. if (warp_id < num_topk && lane_id == 0) { shared_dst_experts[warp_id] = dst_expert_idx; if (dst_expert_idx >= 0) { @@ -244,12 +241,19 @@ __global__ __launch_bounds__(1024, 1) void dispatch( dst_rank, max_nvl_peers, 0) : 0; if (dst_p2p_ptr == 0) { - // Lam: IBGDA -> copy temp to rdma_batch_buffer, batch send later - auto const lam_slot = shared_send_slots[warp_id]; - auto const batch_buf_offset = num_max_dispatch_tokens_per_rank * num_bytes_per_msg; - auto const batch_buf_ptr = static_cast(rdma_x) + batch_buf_offset + - (dst_expert_idx * num_max_dispatch_tokens_per_rank + lam_slot) * num_bytes_per_msg; - auto const* src_int4_ptr = reinterpret_cast(rdma_x_src_idx); + // For inter-node send path, copy temp data to the per-expert batch + // buffer, then issue a batched RDMA send. + // TODO: This has an extra temp->per-expert copy in the FP8 path. + // FP8 output is written to the temp buffer first, then copied here. + auto const slot_idx = shared_send_slots[warp_id]; + auto const batch_buf_offset = + num_max_dispatch_tokens_per_rank * num_bytes_per_msg; + auto const batch_buf_ptr = + static_cast(rdma_x) + batch_buf_offset + + (dst_expert_idx * num_max_dispatch_tokens_per_rank + slot_idx) * + num_bytes_per_msg; + auto const* src_int4_ptr = + reinterpret_cast(rdma_x_src_idx); auto* batch_buf_int4_ptr = reinterpret_cast(batch_buf_ptr); UNROLLED_WARP_COPY(8, lane_id, num_int4_per_msg, batch_buf_int4_ptr, src_int4_ptr, ld_nc_global, st_na_global); @@ -312,27 +316,19 @@ __global__ __launch_bounds__(1024, 1) void dispatch( } __syncthreads(); - // Lam: Grid-wide sync before batch send phase. - // __syncthreads() only syncs within a single thread block (SM). - // The token loop distributes tokens round-robin across SMs - // (token_idx = sm_id, stepping by num_sms). When num_tokens < num_sms, - // most SMs skip the token loop and pass __syncthreads() immediately, - // while the SMs processing tokens are still writing to the batch buffer - // and incrementing atomic_send_counter_per_expert. - // Without grid sync, the batch send phase can read a stale/partial - // counter and send fewer tokens than actually produced, causing the - // receiver to hang waiting for data that never arrives. + // Grid-wide sync before batch-send. 
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__) amd::grid_sync(grid_sync_barrier_ptr, num_sms); #else cg::this_grid().sync(); #endif - // Lam: Batch RDMA send phase - send entire expert buffer in ONE IBGDA call + // Batch RDMA send phase - send entire expert buffer in ONE IBGDA call // Each warp group handles one expert (only first sub_warp does the send) if (responsible_expert_idx < num_experts && sub_warp_id == 0) { auto const dst_rank = responsible_expert_idx / num_local_experts; - auto const dst_expert_local_idx = responsible_expert_idx % num_local_experts; + auto const dst_expert_local_idx = + responsible_expert_idx % num_local_experts; // Check if this destination is inter-node (needs IBGDA batch send) // IPC destinations were already sent in the token loop @@ -370,12 +366,11 @@ __global__ __launch_bounds__(1024, 1) void dispatch( uccl::nvshmemi_ibgda_put_nbi_warp( dst_ptr - reinterpret_cast(rdma_buffer_ptr), - src_ptr - reinterpret_cast(rdma_buffer_ptr), - total_bytes, dst_rank, + src_ptr - reinterpret_cast(rdma_buffer_ptr), total_bytes, + dst_rank, /*warp_id=*/dst_expert_local_idx, // NOTE(Yang): for selecting rb. lane_id, /*slot=*/0, d2h_channel_addrs, num_d2h_channel_addrs, false, low_latency_buffer_idx, 0, 0, num_tokens_to_send); - } } // IPC: already sent in the token loop, nothing to do here @@ -424,11 +419,9 @@ __global__ __launch_bounds__(1024, 1) void dispatch( st_release_sys_global(reinterpret_cast(dst_p2p_ptr), -num_tokens_sent - 1); } - // Clean workspace for next use atomic_counter_per_expert[responsible_expert_idx] = 0; atomic_finish_counter_per_expert[responsible_expert_idx] = 0; - atomic_send_counter_per_expert[responsible_expert_idx] = 0; // Clean `packed_recv_count` if (dst_rank == 0) packed_recv_count[dst_expert_local_idx] = 0; @@ -438,8 +431,6 @@ __global__ __launch_bounds__(1024, 1) void dispatch( // Receiving phase LOW_LATENCY_DISPATCH_RECV: if ((phases & LOW_LATENCY_RECV_PHASE) == 0) { - // if (blockIdx.x == 0 && threadIdx.x == 0) - // printf("[combine] SEND finished\n"); return; } @@ -664,26 +655,26 @@ void dispatch(void* packed_recv_x, void* packed_recv_x_scales, if (use_ue8m0) EP_HOST_ASSERT(round_scale and "UE8M0 SF requires `round_scale=True`"); -#define DISPATCH_LAUNCH_CASE(hidden) \ - { \ - auto dispatch_func = dispatch
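
Reviewer note (not part of the committed patches above): the pointer arithmetic and the immediate-data token-count encoding that the batched low-latency send path relies on, as quoted in the diffs, can be summarized in the standalone C++ sketch below. It only restates the layout documented in the `LowLatencyLayout` comment and the `cmd.atomic_val` / `WriteImm::Pack` usage; the helper names (`batch_slot_offset`, `recv_region_offset`, `tokens_to_imm_byte`, `tokens_from_imm_byte`) are hypothetical and do not exist in the repository.

    // Standalone sketch, mirroring the arithmetic in internode_ll.cu and rdma.cpp.
    // Helper names are illustrative only.
    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    // Sender side: byte offset inside rdma_x where token `slot` bound for
    // `dst_expert_idx` is staged before the single batched RDMA WRITE.
    // Layout: [temp region of num_max_tokens msgs][per-expert batch regions].
    inline std::size_t batch_slot_offset(std::size_t msg_bytes, int num_max_tokens,
                                         int dst_expert_idx, int slot) {
      std::size_t temp_region =
          static_cast<std::size_t>(num_max_tokens) * msg_bytes;
      return temp_region +
             (static_cast<std::size_t>(dst_expert_idx) * num_max_tokens + slot) *
                 msg_bytes;
    }

    // Receiver side: byte offset inside rdma_recv_x of the region holding all
    // tokens sent by `src_rank` to local expert `dst_expert_local_idx`.
    inline std::size_t recv_region_offset(std::size_t msg_bytes, int num_max_tokens,
                                          int num_ranks, int dst_expert_local_idx,
                                          int src_rank) {
      return (static_cast<std::size_t>(dst_expert_local_idx) * num_ranks +
              src_rank) *
             num_max_tokens * msg_bytes;
    }

    // The token count rides in the single byte carried as cmd.atomic_val on the
    // GPU and unpacked as num_tokens_imm in rdma.cpp; a zero byte means a legacy
    // single-token send, so the receiver falls back to 1.
    inline std::uint8_t tokens_to_imm_byte(int num_tokens) {
      assert(num_tokens > 0 && num_tokens <= 255);
      return static_cast<std::uint8_t>(num_tokens);
    }

    inline std::uint32_t tokens_from_imm_byte(std::uint8_t atomic_val) {
      return atomic_val ? atomic_val : 1u;
    }

The 1..255 bound in the device-side assert follows from this single byte being the only space reserved for the token count in the WRITE-with-immediate payload.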