From 9df9ce040b3d0a6f01f680d8903c01fd97de5ac1 Mon Sep 17 00:00:00 2001 From: Andrzej Kuriata Date: Tue, 24 Mar 2026 08:18:18 -0400 Subject: [PATCH 1/3] Congestion control changes * Moved CC algos to shared location. * In P2P added support for Timely and Swift. Signed-off-by: Andrzej Kuriata --- collective/rdma/eqds.h | 174 +------------------- collective/rdma/pcb.h | 4 +- collective/rdma/timing_wheel.h | 2 +- include/cc/cc_state.h | 104 ++++++++++++ include/cc/eqds.h | 193 +++++++++++++++++++++++ include/cc/link_bandwidth.h | 44 ++++++ {collective/rdma => include/cc}/swift.h | 4 +- {collective/rdma => include/cc}/timely.h | 4 +- p2p/rdma/rdma_connection.h | 25 ++- p2p/rdma/rdma_endpoint.h | 9 +- 10 files changed, 377 insertions(+), 186 deletions(-) create mode 100644 include/cc/cc_state.h create mode 100644 include/cc/eqds.h create mode 100644 include/cc/link_bandwidth.h rename {collective/rdma => include/cc}/swift.h (99%) rename {collective/rdma => include/cc}/timely.h (99%) diff --git a/collective/rdma/eqds.h b/collective/rdma/eqds.h index c01bf9ee9..bf7c89e16 100644 --- a/collective/rdma/eqds.h +++ b/collective/rdma/eqds.h @@ -12,6 +12,7 @@ #include "util/util.h" #include "util_buffpool.h" #include "util_timer.h" +#include #include #include #include @@ -56,179 +57,6 @@ class CreditChunkBuffPool : public BuffPool { ~CreditChunkBuffPool() = default; }; -struct active_item { - struct EQDSCC* eqds_cc; - struct list_head active_link; -}; - -struct idle_item { - struct EQDSCC* eqds_cc; - struct list_head idle_link; -}; - -typedef uint8_t PullQuanta; - -constexpr bool pullno_lt(PullQuanta a, PullQuanta b) { - return static_cast(a - b) < 0; -} -constexpr bool pullno_le(PullQuanta a, PullQuanta b) { - return static_cast(a - b) <= 0; -} -constexpr bool pullno_eq(PullQuanta a, PullQuanta b) { - return static_cast(a - b) == 0; -} -constexpr bool pullno_ge(PullQuanta a, PullQuanta b) { - return static_cast(a - b) >= 0; -} -constexpr bool pullno_gt(PullQuanta a, PullQuanta b) 
{ - return static_cast(a - b) > 0; -} - -#define PULL_QUANTUM 16384 -#define PULL_SHIFT 14 - -static inline uint32_t unquantize(uint8_t pull_quanta) { - return (uint32_t)pull_quanta << PULL_SHIFT; -} - -static inline PullQuanta quantize_floor(uint32_t bytes) { - return bytes >> PULL_SHIFT; -} - -static inline PullQuanta quantize_ceil(uint32_t bytes) { - return (bytes + PULL_QUANTUM - 1) >> PULL_SHIFT; -} - -// Per-QP congestion control state for EQDS. -struct EQDSCC { - static constexpr PullQuanta INIT_PULL_QUANTA = 50; - // static constexpr uint32_t kEQDSMaxCwnd = 1000000; // Bytes - static constexpr uint32_t kEQDSMaxCwnd = 500000; // Bytes - - /********************************************************************/ - /************************ Sender-side states ************************/ - /********************************************************************/ - - // Last received highest credit in PullQuanta. - PullQuanta pull_ = INIT_PULL_QUANTA; - PullQuanta last_sent_pull_target_ = INIT_PULL_QUANTA; - // Receive request credit in PullQuanta, but consume it in bytes - uint32_t credit_pull_ = 0; - uint32_t credit_spec_ = kEQDSMaxCwnd; - bool in_speculating_ = true; - /********************************************************************/ - /*********************** Receiver-side states ***********************/ - /********************************************************************/ - - /***************** Shared between engine and pacer ******************/ - std::atomic highest_pull_target_; - - /*************************** Pacer only *****************************/ - PullQuanta latest_pull_; - struct active_item active_item; - struct idle_item idle_item; - /************************* No modification **************************/ - uint32_t fid_; - struct PacerCreditQPWrapper* pc_qpw_; - - inline uint32_t credit() { return credit_pull_ + credit_spec_; } - - // Called when transmitting a chunk. - // Return true if we can transmit the chunk. 
Otherwise, - // sender should pause sending this message until credit is received. - inline bool spend_credit(uint32_t chunk_size) { - if (credit_pull_ > 0) { - if (credit_pull_ > chunk_size) - credit_pull_ -= chunk_size; - else - credit_pull_ = 0; - return true; - } else if (in_speculating_ && credit_spec_ > 0) { - if (credit_spec_ > chunk_size) - credit_spec_ -= chunk_size; - else - credit_spec_ = 0; - return true; - } - - // let pull target can advance - if (credit_spec_ > chunk_size) - credit_spec_ -= chunk_size; - else - credit_spec_ = 0; - - return false; - } - - // Called when we receiving ACK or pull packet. - inline void stop_speculating() { in_speculating_ = false; } - - PullQuanta compute_pull_target(void* context, uint32_t chunk_size); - - inline bool handle_pull_target(PullQuanta pull_target) { - PullQuanta hpt = highest_pull_target_.load(); - if (pullno_gt(pull_target, hpt)) { - // Only we can increase the pull target. - highest_pull_target_.store(pull_target); - return true; - } - return false; - } - - inline bool handle_pull(PullQuanta pullno) { - if (pullno_gt(pullno, pull_)) { - PullQuanta extra_credit = pullno - pull_; - credit_pull_ += unquantize(extra_credit); - if (credit_pull_ > kEQDSMaxCwnd) { - credit_pull_ = kEQDSMaxCwnd; - } - pull_ = pullno; - return true; - } - return false; - } - - /// Helper functions called by pacer /// - - inline void set_fid(uint32_t fid) { fid_ = fid; } - - inline void set_pacer_credit_qpw(struct PacerCreditQPWrapper* pc_qpw) { - pc_qpw_ = pc_qpw; - } - - inline void init_active_item(void) { - INIT_LIST_HEAD(&active_item.active_link); - active_item.eqds_cc = this; - } - - inline void init_idle_item(void) { - INIT_LIST_HEAD(&idle_item.idle_link); - idle_item.eqds_cc = this; - } - - inline PullQuanta backlog() { - auto hpt = highest_pull_target_.load(); - if (pullno_gt(hpt, latest_pull_)) { - return hpt - latest_pull_; - } else { - return 0; - } - } - - inline bool idle_credit_enough() { - PullQuanta 
idle_cumulate_credit; - auto hpt = highest_pull_target_.load(); - - if (pullno_ge(hpt, latest_pull_)) { - idle_cumulate_credit = 0; - } else { - idle_cumulate_credit = latest_pull_ - hpt; - } - - return idle_cumulate_credit >= quantize_floor(kEQDSMaxCwnd); - } -}; - class EQDSChannel { static constexpr uint32_t kChannelSize = 2048; diff --git a/collective/rdma/pcb.h b/collective/rdma/pcb.h index 305b7e884..647e5371e 100644 --- a/collective/rdma/pcb.h +++ b/collective/rdma/pcb.h @@ -1,11 +1,11 @@ #pragma once #include "eqds.h" -#include "swift.h" -#include "timely.h" #include "timing_wheel.h" #include "util/debug.h" #include "util/util.h" +#include +#include #include #include #include diff --git a/collective/rdma/timing_wheel.h b/collective/rdma/timing_wheel.h index f77e1a0e6..cd5425da2 100644 --- a/collective/rdma/timing_wheel.h +++ b/collective/rdma/timing_wheel.h @@ -10,10 +10,10 @@ #pragma once -#include "timely.h" #include "transport_config.h" #include "util/cb.h" #include "util_timer.h" +#include #include namespace uccl { diff --git a/include/cc/cc_state.h b/include/cc/cc_state.h new file mode 100644 index 000000000..9f645006a --- /dev/null +++ b/include/cc/cc_state.h @@ -0,0 +1,104 @@ +/** + * @file cc_state.h + * @brief Shared congestion control state for RoCE RDMA transports. + * Used by both P2P and EP subsystems. + */ + +#pragma once + +#include "cc/swift.h" +#include "cc/timely.h" +#include "util/timer.h" +#include +#include +#include +#include +#include +#include + +namespace uccl { +namespace cc { + +class CongestionControlState { + public: + enum class Mode : uint8_t { kNone, kTimely, kSwift }; + + CongestionControlState() = default; + + /// Construct with known parameters (e.g. P2P path where link BW is known). + CongestionControlState(Mode mode, double freq_ghz, double link_bw_bps) + : mode_(mode), + timely_(freq_ghz, link_bw_bps), + swift_(freq_ghz, link_bw_bps) {} + + /// Deferred init (e.g. EP path — after RDMA context is available). 
+ void init(Mode mode, double freq_ghz, double link_bw_bps) { + mode_ = mode; + timely_ = timely::TimelyCC(freq_ghz, link_bw_bps); + swift_ = swift::SwiftCC(freq_ghz, link_bw_bps); + } + + /// Parse CC mode from an environment variable (case-insensitive). + static Mode parseMode(char const* env_var) { + auto* env = std::getenv(env_var); + if (env == nullptr) return Mode::kNone; + std::string mode(env); + std::transform(mode.begin(), mode.end(), mode.begin(), + [](unsigned char c) { return std::tolower(c); }); + if (mode == "timely") return Mode::kTimely; + if (mode == "swift") return Mode::kSwift; + return Mode::kNone; + } + + bool enabled() const { return mode_ != Mode::kNone; } + Mode mode() const { return mode_; } + + /// Record send timestamp for a given wr_id. + void recordSendTsc(uint64_t wr_id) { + if (mode_ == Mode::kNone) return; + send_tsc_[wr_id % kTscWindowSize].store(rdtsc(), std::memory_order_release); + } + + /// Update CC state on ACK. Call once per completed WR. + void onAck(uint64_t wr_id, size_t acked_bytes) { + if (mode_ == Mode::kNone) return; + + uint64_t send_tsc = + send_tsc_[wr_id % kTscWindowSize].load(std::memory_order_acquire); + if (send_tsc == 0) return; + + uint64_t now = rdtsc(); + size_t sample_rtt_tsc = now - send_tsc; + if (sample_rtt_tsc == 0) return; + + if (mode_ == Mode::kTimely) { + timely_.update_rate(now, sample_rtt_tsc, ::kEwmaAlpha); + } else if (mode_ == Mode::kSwift) { + double delay_us = to_usec(sample_rtt_tsc, freq_ghz); + uint32_t bytes = acked_bytes > 0 + ? static_cast(acked_bytes) + : static_cast(swift::SwiftCC::kMSS); + swift_.adjust_wnd(delay_us, bytes); + } + send_tsc_[wr_id % kTscWindowSize].store(0, std::memory_order_relaxed); + } + + /// Returns CC-controlled window in bytes, or 0 if CC is disabled. 
+ size_t getWindowBytes() const { + if (mode_ == Mode::kTimely) return timely_.get_wnd(); + if (mode_ == Mode::kSwift) return swift_.get_wnd(); + return 0; + } + + private: + static constexpr size_t kTscWindowSize = 65536; + std::unique_ptr[]> send_tsc_ { + new std::atomic[kTscWindowSize] {} + }; + Mode mode_ = Mode::kNone; + timely::TimelyCC timely_; + swift::SwiftCC swift_; +}; + +} // namespace cc +} // namespace uccl diff --git a/include/cc/eqds.h b/include/cc/eqds.h new file mode 100644 index 000000000..b1b9060bb --- /dev/null +++ b/include/cc/eqds.h @@ -0,0 +1,193 @@ +/** + * @file eqds.h + * @brief EQDS congestion control [NSDI'22] + */ + +#pragma once + +#include "util/list.h" +#include +#include + +namespace uccl { +namespace eqds { + +struct PacerCreditQPWrapper; + +struct EQDSCC; + +struct active_item { + struct EQDSCC* eqds_cc; + struct list_head active_link; +}; + +struct idle_item { + struct EQDSCC* eqds_cc; + struct list_head idle_link; +}; + +typedef uint8_t PullQuanta; + +constexpr bool pullno_lt(PullQuanta a, PullQuanta b) { + return static_cast(a - b) < 0; +} +constexpr bool pullno_le(PullQuanta a, PullQuanta b) { + return static_cast(a - b) <= 0; +} +constexpr bool pullno_eq(PullQuanta a, PullQuanta b) { + return static_cast(a - b) == 0; +} +constexpr bool pullno_ge(PullQuanta a, PullQuanta b) { + return static_cast(a - b) >= 0; +} +constexpr bool pullno_gt(PullQuanta a, PullQuanta b) { + return static_cast(a - b) > 0; +} + +#define PULL_QUANTUM 16384 +#define PULL_SHIFT 14 + +static inline uint32_t unquantize(uint8_t pull_quanta) { + return (uint32_t)pull_quanta << PULL_SHIFT; +} + +static inline PullQuanta quantize_floor(uint32_t bytes) { + return bytes >> PULL_SHIFT; +} + +static inline PullQuanta quantize_ceil(uint32_t bytes) { + return (bytes + PULL_QUANTUM - 1) >> PULL_SHIFT; +} + +// Per-QP congestion control state for EQDS. 
+struct EQDSCC { + static constexpr PullQuanta INIT_PULL_QUANTA = 50; + // static constexpr uint32_t kEQDSMaxCwnd = 1000000; // Bytes + static constexpr uint32_t kEQDSMaxCwnd = 500000; // Bytes + + /********************************************************************/ + /************************ Sender-side states ************************/ + /********************************************************************/ + + // Last received highest credit in PullQuanta. + PullQuanta pull_ = INIT_PULL_QUANTA; + PullQuanta last_sent_pull_target_ = INIT_PULL_QUANTA; + // Receive request credit in PullQuanta, but consume it in bytes + uint32_t credit_pull_ = 0; + uint32_t credit_spec_ = kEQDSMaxCwnd; + bool in_speculating_ = true; + /********************************************************************/ + /*********************** Receiver-side states ***********************/ + /********************************************************************/ + + /***************** Shared between engine and pacer ******************/ + std::atomic highest_pull_target_; + + /*************************** Pacer only *****************************/ + PullQuanta latest_pull_; + struct active_item active_item; + struct idle_item idle_item; + /************************* No modification **************************/ + uint32_t fid_; + struct PacerCreditQPWrapper* pc_qpw_; + + inline uint32_t credit() { return credit_pull_ + credit_spec_; } + + // Called when transmitting a chunk. + // Return true if we can transmit the chunk. Otherwise, + // sender should pause sending this message until credit is received. 
+ inline bool spend_credit(uint32_t chunk_size) { + if (credit_pull_ > 0) { + if (credit_pull_ > chunk_size) + credit_pull_ -= chunk_size; + else + credit_pull_ = 0; + return true; + } else if (in_speculating_ && credit_spec_ > 0) { + if (credit_spec_ > chunk_size) + credit_spec_ -= chunk_size; + else + credit_spec_ = 0; + return true; + } + + // let pull target can advance + if (credit_spec_ > chunk_size) + credit_spec_ -= chunk_size; + else + credit_spec_ = 0; + + return false; + } + + // Called when we receiving ACK or pull packet. + inline void stop_speculating() { in_speculating_ = false; } + + PullQuanta compute_pull_target(void* context, uint32_t chunk_size); + + inline bool handle_pull_target(PullQuanta pull_target) { + PullQuanta hpt = highest_pull_target_.load(); + if (pullno_gt(pull_target, hpt)) { + // Only we can increase the pull target. + highest_pull_target_.store(pull_target); + return true; + } + return false; + } + + inline bool handle_pull(PullQuanta pullno) { + if (pullno_gt(pullno, pull_)) { + PullQuanta extra_credit = pullno - pull_; + credit_pull_ += unquantize(extra_credit); + if (credit_pull_ > kEQDSMaxCwnd) { + credit_pull_ = kEQDSMaxCwnd; + } + pull_ = pullno; + return true; + } + return false; + } + + /// Helper functions called by pacer /// + + inline void set_fid(uint32_t fid) { fid_ = fid; } + + inline void set_pacer_credit_qpw(struct PacerCreditQPWrapper* pc_qpw) { + pc_qpw_ = pc_qpw; + } + + inline void init_active_item(void) { + INIT_LIST_HEAD(&active_item.active_link); + active_item.eqds_cc = this; + } + + inline void init_idle_item(void) { + INIT_LIST_HEAD(&idle_item.idle_link); + idle_item.eqds_cc = this; + } + + inline PullQuanta backlog() { + auto hpt = highest_pull_target_.load(); + if (pullno_gt(hpt, latest_pull_)) { + return hpt - latest_pull_; + } else { + return 0; + } + } + + inline bool idle_credit_enough() { + PullQuanta idle_cumulate_credit; + auto hpt = highest_pull_target_.load(); + + if (pullno_ge(hpt, 
latest_pull_)) { + idle_cumulate_credit = 0; + } else { + idle_cumulate_credit = latest_pull_ - hpt; + } + + return idle_cumulate_credit >= quantize_floor(kEQDSMaxCwnd); + } +}; + +} // namespace eqds +} // namespace uccl diff --git a/include/cc/link_bandwidth.h b/include/cc/link_bandwidth.h new file mode 100644 index 000000000..5ba95d3d0 --- /dev/null +++ b/include/cc/link_bandwidth.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include + +namespace uccl { +namespace cc { + +// Get link bandwidth in bytes/sec. +// Priority: env_var override → ibv_query_port auto-detect → 400 Gbps default. +inline double get_link_bandwidth_bps(ibv_context* ctx, + char const* env_var = nullptr, + uint8_t port_num = 1) { + static constexpr double kDefault = 400.0 * 1e9 / 8.0; + + if (env_var) { + if (auto* val = std::getenv(env_var)) { + double gbps = std::atof(val); + if (gbps > 0) return gbps * 1e9 / 8.0; + } + } + + if (ctx) { + ibv_port_attr attr{}; + if (ibv_query_port(ctx, port_num, &attr) == 0) { + static constexpr int kSpeeds[] = {2500, 5000, 10000, 10000, + 14000, 25000, 50000, 100000}; + static constexpr int kWidths[] = {1, 4, 8, 12, 2}; + auto firstBit = [](int val, int max) { + for (int i = 0; i < max; i++) + if (val & (1 << i)) return i; + return max; + }; + int spd = kSpeeds[firstBit(attr.active_speed, 7)]; + int wid = kWidths[firstBit(attr.active_width, 4)]; + return static_cast(spd) * wid * 1e6 / 8.0; + } + } + + return kDefault; +} + +} // namespace cc +} // namespace uccl diff --git a/collective/rdma/swift.h b/include/cc/swift.h similarity index 99% rename from collective/rdma/swift.h rename to include/cc/swift.h index 269efb9f0..d25a5392a 100644 --- a/collective/rdma/swift.h +++ b/include/cc/swift.h @@ -6,8 +6,8 @@ #pragma once #include "util/latency.h" +#include "util/timer.h" #include "util/util.h" -#include "util_timer.h" #include #include @@ -158,4 +158,4 @@ class SwiftCC { }; } // namespace swift -} // namespace uccl \ No newline at end of file +} // namespace 
uccl diff --git a/collective/rdma/timely.h b/include/cc/timely.h similarity index 99% rename from collective/rdma/timely.h rename to include/cc/timely.h index 782a99ebb..8353c2de1 100644 --- a/collective/rdma/timely.h +++ b/include/cc/timely.h @@ -8,8 +8,8 @@ #pragma once #include "util/latency.h" +#include "util/timer.h" #include "util/util.h" -#include "util_timer.h" #include #include @@ -223,4 +223,4 @@ class TimelyCC { }; } // namespace timely -} // namespace uccl \ No newline at end of file +} // namespace uccl diff --git a/p2p/rdma/rdma_connection.h b/p2p/rdma/rdma_connection.h index 00dddb39f..6562cec61 100644 --- a/p2p/rdma/rdma_connection.h +++ b/p2p/rdma/rdma_connection.h @@ -3,6 +3,8 @@ #include "define.h" #include "rdma_ctrl_channel.h" #include "rdma_data_channel.h" +#include +#include #include class RDMAConnection { @@ -104,11 +106,14 @@ class RDMAConnection { class SendConnection : public RDMAConnection { public: - SendConnection(int numa_node, bool auto_start_polling = true) + SendConnection(int numa_node, bool auto_start_polling = true, + double link_bandwidth_bps = 400.0 * 1e9 / 8.0) : numa_node_(numa_node), running_(false), poll_thread_(nullptr), - auto_start_polling_(auto_start_polling) { + auto_start_polling_(auto_start_polling), + cc_(uccl::cc::CongestionControlState::parseMode("UCCL_P2P_RDMA_CC"), + uccl::freq_ghz, link_bandwidth_bps) { tracker_ = std::make_shared(); request_queue_ = std::make_unique< RingBuffer, kRingCapacity>>(); @@ -160,6 +165,7 @@ class SendConnection : public RDMAConnection { int64_t send(std::shared_ptr req) { int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; + cc_.recordSendTsc(req->wr_id); if (unlikely(request_queue_->push(req) < 0)) { UCCL_LOG(WARN) << "SendConnection: isend request queue is full, wr_id=" << wr_id; @@ -178,6 +184,7 @@ class SendConnection : public RDMAConnection { std::shared_lock lock(ctrl_channel_mutex_); int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); 
req->wr_id = wr_id; + cc_.recordSendTsc(req->wr_id); auto [channel_id, context_id] = selectNextChannelRoundRobin(); if (unlikely(channel_id == 0)) { @@ -200,6 +207,7 @@ class SendConnection : public RDMAConnection { std::shared_lock lock(ctrl_channel_mutex_); int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; + cc_.recordSendTsc(req->wr_id); auto [channel_id, context_id] = selectNextChannelRoundRobin(); if (unlikely(channel_id == 0)) { @@ -256,6 +264,7 @@ class SendConnection : public RDMAConnection { } int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; + cc_.recordSendTsc(req->wr_id); UCCL_LOG(INFO, UCCL_RDMA) << "SendConnection: Processing send request meta: " << meta; processOnceSendRequests(req, meta, index); @@ -273,6 +282,12 @@ class SendConnection : public RDMAConnection { bool auto_start_polling_; int numa_node_ = 0; + uccl::cc::CongestionControlState cc_; + + inline size_t currentInflightLimitBytes() { + return cc_.enabled() ? cc_.getWindowBytes() : kInFlightMaxSizeKB * 1024; + } + // Send a request through the appropriate channel // Returns true on success, false on failure bool postRequestOnChannel(std::shared_ptr req) { @@ -398,9 +413,10 @@ class SendConnection : public RDMAConnection { // } while (has_meta) { std::shared_ptr req; - if (tracker_->getTotalInflightBytes() > kInFlightMaxSizeKB * 1024 || + size_t inflight_limit_bytes = currentInflightLimitBytes(); + if (tracker_->getTotalInflightBytes() > inflight_limit_bytes || !request_queue_->pop(req)) { - if (tracker_->getTotalInflightBytes() > kInFlightMaxSizeKB * 1024) { + if (tracker_->getTotalInflightBytes() > inflight_limit_bytes) { UCCL_LOG(WARN) << "SendConnection: In-flight bytes exceed " "limit,pausing sending." 
<< tracker_->getTotalInflightBytes() @@ -470,6 +486,7 @@ class SendConnection : public RDMAConnection { // << channel_id // << " polled completion: " << cq_data; tracker_->acknowledge(cq_data.wr_id); + cc_.onAck(cq_data.wr_id, cq_data.len); } } } diff --git a/p2p/rdma/rdma_endpoint.h b/p2p/rdma/rdma_endpoint.h index ab86870f9..abc6a3563 100644 --- a/p2p/rdma/rdma_endpoint.h +++ b/p2p/rdma/rdma_endpoint.h @@ -9,6 +9,7 @@ #include "rdma_device.h" #include "util/debug.h" #include "util/net.h" +#include #include #include @@ -755,11 +756,15 @@ class NICEndpoint { } auto numa_node = RdmaDeviceManager::instance().get_numa_node( RdmaDeviceManager::instance().get_best_dev_idx(gpu_index_)[0]); + auto* ctx = + (!contexts_.empty() && contexts_[0]) ? contexts_[0]->getCtx() : nullptr; + double link_bw = + uccl::cc::get_link_bandwidth_bps(ctx, "UCCL_P2P_RDMA_LINK_GBPS"); { std::unique_lock write_lock(send_channel_mutex_); auto [it, inserted] = send_channel_groups_.try_emplace( - rank_id, - std::make_shared(numa_node, auto_start_polling_)); + rank_id, std::make_shared( + numa_node, auto_start_polling_, link_bw)); return it->second; } } From 90680981e2026be9f7695b6504c3d2a3e0840cc3 Mon Sep 17 00:00:00 2001 From: Andrzej Kuriata Date: Thu, 9 Apr 2026 04:06:24 -0400 Subject: [PATCH 2/3] Review fixes. Running polling thread, per chunk CC, tsc capture closer to the send. 
Signed-off-by: Andrzej Kuriata --- p2p/engine.cc | 7 ++++++- p2p/rdma/rdma_connection.h | 43 +++++++++++++++++++++++++++++++------- p2p/rdma/rdma_endpoint.h | 10 ++++++--- p2p/rdma/seq_num.h | 10 +++++++++ 4 files changed, 59 insertions(+), 11 deletions(-) diff --git a/p2p/engine.cc b/p2p/engine.cc index d0e103d42..1cb569d25 100644 --- a/p2p/engine.cc +++ b/p2p/engine.cc @@ -4,6 +4,7 @@ #include "util/pause.h" #include "util/util.h" #include +#include #include #include #include @@ -214,8 +215,12 @@ Endpoint::Endpoint(uint32_t const gpu_idx) : passive_accept_(false) { ep_ = std::make_shared(local_gpu_idx_, 0); numa_node_ = tcp::get_tcp_numa_node_from_iface(); } else { + // Enable the polling thread when congestion control is active. + bool cc_polling = + uccl::cc::CongestionControlState::parseMode("UCCL_P2P_RDMA_CC") != + uccl::cc::CongestionControlState::Mode::kNone; ep_ = std::shared_ptr( - new NICEndpoint(local_gpu_idx_, INVALID_RANK_ID, 0, false)); + new NICEndpoint(local_gpu_idx_, INVALID_RANK_ID, 0, cc_polling)); } std::cout << "Engine initialized for GPU " << local_gpu_idx_ << std::endl; diff --git a/p2p/rdma/rdma_connection.h b/p2p/rdma/rdma_connection.h index 6562cec61..0b26a9fbe 100644 --- a/p2p/rdma/rdma_connection.h +++ b/p2p/rdma/rdma_connection.h @@ -163,9 +163,11 @@ class SendConnection : public RDMAConnection { } int64_t send(std::shared_ptr req) { - int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); + // Allocate seq_num without counting bytes — actual size is registered + // later by the polling thread after popping from the queue, so that + // getTotalInflightBytes() only reflects requests actually being sent. 
+ int64_t wr_id = tracker_->sendPacket(0); req->wr_id = wr_id; - cc_.recordSendTsc(req->wr_id); if (unlikely(request_queue_->push(req) < 0)) { UCCL_LOG(WARN) << "SendConnection: isend request queue is full, wr_id=" << wr_id; @@ -184,7 +186,6 @@ class SendConnection : public RDMAConnection { std::shared_lock lock(ctrl_channel_mutex_); int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; - cc_.recordSendTsc(req->wr_id); auto [channel_id, context_id] = selectNextChannelRoundRobin(); if (unlikely(channel_id == 0)) { @@ -207,7 +208,6 @@ class SendConnection : public RDMAConnection { std::shared_lock lock(ctrl_channel_mutex_); int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; - cc_.recordSendTsc(req->wr_id); auto [channel_id, context_id] = selectNextChannelRoundRobin(); if (unlikely(channel_id == 0)) { @@ -256,6 +256,11 @@ class SendConnection : public RDMAConnection { if (unlikely(ctrl_channel_ == nullptr)) { return -1; } + // Enforce CC window before accepting a new request + size_t inflight_limit_bytes = currentInflightLimitBytes(); + if (tracker_->getTotalInflightBytes() > inflight_limit_bytes) { + return -1; + } SendReqMeta meta; std::shared_lock lock(ctrl_channel_mutex_); int index = ctrl_channel_->getOneSendRequestMeta(meta); @@ -264,7 +269,6 @@ class SendConnection : public RDMAConnection { } int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; - cc_.recordSendTsc(req->wr_id); UCCL_LOG(INFO, UCCL_RDMA) << "SendConnection: Processing send request meta: " << meta; processOnceSendRequests(req, meta, index); @@ -283,6 +287,7 @@ class SendConnection : public RDMAConnection { int numa_node_ = 0; uccl::cc::CongestionControlState cc_; + std::atomic chunk_tsc_counter_{0}; inline size_t currentInflightLimitBytes() { return cc_.enabled() ? 
cc_.getWindowBytes() : kInFlightMaxSizeKB * 1024; @@ -298,7 +303,22 @@ class SendConnection : public RDMAConnection { return false; } + // Per-chunk CC: assign a unique TSC ID and record send timestamp + // close to the actual ibv_post_send. The TSC ID is encoded in the + // upper 32 bits of wr_id; the lower 32 bits keep the message seq + // used by the tracker. We save/restore req->wr_id so that callers + // (e.g. updateExpectedAckCount) still see the original message seq. + int64_t saved_wr_id = req->wr_id; + if (cc_.enabled()) { + uint32_t tsc_id = + chunk_tsc_counter_.fetch_add(1, std::memory_order_relaxed); + req->wr_id = (static_cast(tsc_id) << 32) | + static_cast(req->wr_id); + cc_.recordSendTsc(tsc_id); + } + int64_t send_ret = channel->submitRequest(req); + req->wr_id = saved_wr_id; if (send_ret < 0) { UCCL_LOG(WARN) << "SendConnection: Failed to send on channel_id " << req->channel_id; @@ -424,6 +444,8 @@ class SendConnection : public RDMAConnection { } break; } + // Register actual packet size now that we are about to send it. 
+ tracker_->updatePacketSize(req->wr_id, req->getLocalLen()); index = ctrl_channel_->getOneSendRequestMeta(meta); UCCL_LOG(INFO, UCCL_RDMA) << "SendConnection: Processing send request meta: " << meta; @@ -485,8 +507,15 @@ class SendConnection : public RDMAConnection { // Channel " // << channel_id // << " polled completion: " << cq_data; - tracker_->acknowledge(cq_data.wr_id); - cc_.onAck(cq_data.wr_id, cq_data.len); + if (cc_.enabled()) { + // Decode: low 32 bits = message seq (tracker), high 32 = TSC ID + uint32_t msg_seq = static_cast(cq_data.wr_id); + uint32_t tsc_id = static_cast(cq_data.wr_id >> 32); + tracker_->acknowledge(msg_seq); + cc_.onAck(tsc_id, cq_data.len); + } else { + tracker_->acknowledge(cq_data.wr_id); + } } } } diff --git a/p2p/rdma/rdma_endpoint.h b/p2p/rdma/rdma_endpoint.h index abc6a3563..46dc520d6 100644 --- a/p2p/rdma/rdma_endpoint.h +++ b/p2p/rdma/rdma_endpoint.h @@ -504,9 +504,6 @@ class NICEndpoint { // Manual polling routine for send channels when auto_start_polling_ is false int sendWithoutInnerQueue(std::shared_ptr req) { - if (auto_start_polling_) { - return -1; // Do nothing if auto polling is enabled - } if (!req) { UCCL_LOG(WARN) << "NICEndpoint::sendRoutine - null request"; return -1; @@ -531,6 +528,13 @@ class NICEndpoint { return -1; } + // When the polling thread is active, enqueue via send() so that + // only the polling thread touches QPs (avoids concurrent ibv_post_* + // from two threads). + if (auto_start_polling_) { + return send_group->send(req); + } + return send_group->processSendRequests(req); } diff --git a/p2p/rdma/seq_num.h b/p2p/rdma/seq_num.h index 7d9719dde..cd5c99e31 100644 --- a/p2p/rdma/seq_num.h +++ b/p2p/rdma/seq_num.h @@ -348,6 +348,16 @@ class AtomicBitmapPacketTrackerMultiAck { return total; } + // Update packet size for a seq_num that was registered with size 0. + // Used when the actual size is known later (e.g. after popping from queue). 
+ void updatePacketSize(uint32_t seq_num, size_t packet_size) { + uint32_t base = base_seq_num_.load(std::memory_order_acquire); + if (seq_num < base || seq_num - base >= WINDOW_SIZE) return; + + packet_sizes_[seq_num % WINDOW_SIZE].store(packet_size, + std::memory_order_release); + } + private: void slideWindow() { uint32_t base = base_seq_num_.load(std::memory_order_acquire); From 35af412ac4d3b3c492832aee50dc0c7707ff44ed Mon Sep 17 00:00:00 2001 From: Andrzej Kuriata Date: Tue, 28 Apr 2026 05:43:00 -0400 Subject: [PATCH 3/3] added CC, per chunk, for one-sided ops Signed-off-by: Andrzej Kuriata --- p2p/rdma/rdma_connection.h | 208 +++++++++++++++++++++++++++---------- 1 file changed, 153 insertions(+), 55 deletions(-) diff --git a/p2p/rdma/rdma_connection.h b/p2p/rdma/rdma_connection.h index 0b26a9fbe..b1b661af6 100644 --- a/p2p/rdma/rdma_connection.h +++ b/p2p/rdma/rdma_connection.h @@ -5,6 +5,7 @@ #include "rdma_data_channel.h" #include #include +#include #include class RDMAConnection { @@ -183,6 +184,16 @@ class SendConnection : public RDMAConnection { "SendType::Write"; return -1; } + + // Enforce CC window before posting + if (cc_.enabled()) { + size_t inflight_limit_bytes = currentInflightLimitBytes(); + while (currentInflightBytes() > inflight_limit_bytes) { + std::this_thread::yield(); + inflight_limit_bytes = currentInflightLimitBytes(); + } + } + std::shared_lock lock(ctrl_channel_mutex_); int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; @@ -196,6 +207,14 @@ class SendConnection : public RDMAConnection { req->channel_id = channel_id; postChunkedRequest(req); + // Since postChunkedRequest() is non-blocking — if the CC + // window is exhausted mid-message it saves the remaining chunks + // and returns immediately. + // Draining them here. 
+ while (!drainPendingChunks()) { + std::this_thread::yield(); + } + return wr_id; } @@ -205,6 +224,16 @@ class SendConnection : public RDMAConnection { "SendType::Read"; return -1; } + + // Enforce CC window before posting + if (cc_.enabled()) { + size_t inflight_limit_bytes = currentInflightLimitBytes(); + while (currentInflightBytes() > inflight_limit_bytes) { + std::this_thread::yield(); + inflight_limit_bytes = currentInflightLimitBytes(); + } + } + std::shared_lock lock(ctrl_channel_mutex_); int64_t wr_id = tracker_->sendPacket(req->getLocalLen()); req->wr_id = wr_id; @@ -218,6 +247,11 @@ class SendConnection : public RDMAConnection { req->channel_id = channel_id; postChunkedRequest(req); + // Draining any remaining chunks, as in postWriteOrRead() + while (!drainPendingChunks()) { + std::this_thread::yield(); + } + return wr_id; } @@ -258,7 +292,7 @@ class SendConnection : public RDMAConnection { } // Enforce CC window before accepting a new request size_t inflight_limit_bytes = currentInflightLimitBytes(); - if (tracker_->getTotalInflightBytes() > inflight_limit_bytes) { + if (currentInflightBytes() > inflight_limit_bytes) { return -1; } SendReqMeta meta; @@ -288,11 +322,30 @@ class SendConnection : public RDMAConnection { uccl::cc::CongestionControlState cc_; std::atomic chunk_tsc_counter_{0}; + // Per-chunk inflight byte counter for CC window checks. + // Unlike tracker_->getTotalInflightBytes() which only decreases when ALL + // chunks of a message are acked, this counter decreases on each chunk CQE. + std::atomic cc_inflight_bytes_{0}; + + // Pending chunked request state for per-chunk CC pacing. + struct PendingChunkedState { + std::shared_ptr req; + std::vector chunks; + size_t next_chunk_idx = 0; + int remaining_expected_count = 0; + }; + std::optional pending_chunked_; inline size_t currentInflightLimitBytes() { return cc_.enabled() ? 
cc_.getWindowBytes() : kInFlightMaxSizeKB * 1024; } + // In-flight bytes: per-chunk counter if CC is enabled, else tracker total. + inline size_t currentInflightBytes() { + return cc_.enabled() ? cc_inflight_bytes_.load(std::memory_order_relaxed) + : tracker_->getTotalInflightBytes(); + } + // Send a request through the appropriate channel // Returns true on success, false on failure bool postRequestOnChannel(std::shared_ptr req) { @@ -319,6 +372,10 @@ class SendConnection : public RDMAConnection { int64_t send_ret = channel->submitRequest(req); req->wr_id = saved_wr_id; + if (send_ret >= 0 && cc_.enabled()) { + cc_inflight_bytes_.fetch_add(req->getLocalLen(), + std::memory_order_relaxed); + } if (send_ret < 0) { UCCL_LOG(WARN) << "SendConnection: Failed to send on channel_id " << req->channel_id; @@ -339,6 +396,76 @@ class SendConnection : public RDMAConnection { } } + // Build and post a single chunk from a split message. + bool postSingleChunk(std::shared_ptr const& req, + MessageChunk const& chunk, size_t chunk_index, + size_t total_chunks, size_t num_channels, + int& expected_chunk_count) { + uint32_t chunk_channel_id = + ((req->channel_id - 1 + chunk_index) % num_channels) + 1; + + auto chunk_local_mem = std::make_shared( + static_cast(req->local_mem->addr) + chunk.offset, chunk.size, + req->local_mem->mr_array, req->local_mem->type); + + auto chunk_remote_mem = std::make_shared( + req->remote_mem->addr + chunk.offset, chunk.size, + req->remote_mem->rkey_array, req->remote_mem->type); + + bool is_last_chunk = (chunk_index == total_chunks - 1); + auto chunk_req = std::make_shared( + chunk_local_mem, chunk_remote_mem, req->imm_data, is_last_chunk); + + // Due to compression, the chunk count may differ from the original + // split, so set the expected chunk count for each chunk request. 
+ if (expected_chunk_count > 0) { + if (is_last_chunk && expected_chunk_count > 1) { + chunk_req->imm_data.set_chunk_count(expected_chunk_count); + } else { + chunk_req->imm_data.set_chunk_count(1); + } + expected_chunk_count -= 1; + } + + chunk_req->channel_id = chunk_channel_id; + chunk_req->from_rank_id = req->from_rank_id; + chunk_req->to_rank_id = req->to_rank_id; + chunk_req->wr_id = req->wr_id; + chunk_req->send_type = req->send_type; + + return postRequestOnChannel(chunk_req); + } + + // Post remaining chunks from a previously paused request. + // Returns true if all chunks are sent, false if still CC-blocked. + bool drainPendingChunks() { + if (!pending_chunked_) return true; + + auto& ps = *pending_chunked_; + size_t num_channels = normalChannelCount(); + + while (ps.next_chunk_idx < ps.chunks.size()) { + // Per-chunk CC: check window before each chunk. + if (cc_.enabled()) { + size_t inflight_limit_bytes = currentInflightLimitBytes(); + if (currentInflightBytes() > inflight_limit_bytes) { + return false; // Yield back to polling loop. 
+ } + } + + if (!postSingleChunk(ps.req, ps.chunks[ps.next_chunk_idx], + ps.next_chunk_idx, ps.chunks.size(), num_channels, + ps.remaining_expected_count)) { + UCCL_LOG(WARN) << "SendConnection: Failed to send pending chunk " + << ps.next_chunk_idx; + } + ps.next_chunk_idx++; + } + + pending_chunked_.reset(); + return true; + } + void postChunkedRequest(std::shared_ptr req, int expected_chunk_count = 0) { if (expected_chunk_count == 1) { @@ -363,59 +490,19 @@ class SendConnection : public RDMAConnection { size_t num_channels = normalChannelCount(); for (size_t i = 0; i < chunks.size(); ++i) { - auto const& chunk = chunks[i]; - - // Use different channel for each chunk: round-robin - uint32_t chunk_channel_id = - ((req->channel_id - 1 + i) % num_channels) + 1; - - // Create RegMemBlock for this chunk - auto chunk_local_mem = std::make_shared( - static_cast(req->local_mem->addr) + chunk.offset, chunk.size, - req->local_mem->mr_array, req->local_mem->type); - - // Create RemoteMemInfo for this chunk - auto chunk_remote_mem = std::make_shared( - req->remote_mem->addr + chunk.offset, chunk.size, - req->remote_mem->rkey_array, req->remote_mem->type); - - // Create send request for this chunk - // Only the last chunk needs signaled for completion notification - bool is_last_chunk = (i == chunks.size() - 1); - auto chunk_req = std::make_shared( - chunk_local_mem, chunk_remote_mem, req->imm_data, is_last_chunk); - - // due to the compression, the chunk count may be different from the - // original split, so we need to set the expected chunk count for each - // chunk request - if (expected_chunk_count > 0) { - if (is_last_chunk && expected_chunk_count > 1) { - chunk_req->imm_data.set_chunk_count(expected_chunk_count); - } else { - chunk_req->imm_data.set_chunk_count(1); + // Per-chunk CC: if over budget, save remaining chunks and return. 
+ if (cc_.enabled()) { + size_t inflight_limit_bytes = currentInflightLimitBytes(); + if (currentInflightBytes() > inflight_limit_bytes) { + pending_chunked_ = PendingChunkedState{req, std::move(chunks), i, + expected_chunk_count}; + return; } - expected_chunk_count -= 1; } - chunk_req->channel_id = chunk_channel_id; - chunk_req->from_rank_id = req->from_rank_id; - chunk_req->to_rank_id = req->to_rank_id; - chunk_req->wr_id = req->wr_id; - // Inherit the send type from the original request. - chunk_req->send_type = req->send_type; - // Send the chunk - if (postRequestOnChannel(chunk_req)) { - // UCCL_LOG(INFO, UCCL_RDMA) << "SendConnection: Sent chunk " << i << - // "/" - // << chunks.size() << " (offset: " << chunk.offset - // << ", size: " << chunk.size - // << ", channel_id: " << chunk_channel_id << ")" << - // std::endl; - } else { - UCCL_LOG(WARN) << "SendConnection: Failed to send chunk " << i - << " (offset: " << chunk.offset - << ", size: " << chunk.size - << ", channel_id: " << chunk_channel_id << ")"; + if (!postSingleChunk(req, chunks[i], i, chunks.size(), num_channels, + expected_chunk_count)) { + UCCL_LOG(WARN) << "SendConnection: Failed to send chunk " << i; } } } @@ -424,6 +511,13 @@ class SendConnection : public RDMAConnection { if (unlikely(ctrl_channel_ == nullptr)) { return; } + + // First, try to drain any pending chunks from a previous request + // that was paused due to CC window limits. + if (!drainPendingChunks()) { + return; // Still CC-blocked, don't dequeue new requests. 
+ } + SendReqMeta meta; bool has_meta = false; int index = -1; @@ -434,13 +528,12 @@ class SendConnection : public RDMAConnection { while (has_meta) { std::shared_ptr req; size_t inflight_limit_bytes = currentInflightLimitBytes(); - if (tracker_->getTotalInflightBytes() > inflight_limit_bytes || + if (currentInflightBytes() > inflight_limit_bytes || !request_queue_->pop(req)) { - if (tracker_->getTotalInflightBytes() > inflight_limit_bytes) { + if (currentInflightBytes() > inflight_limit_bytes) { UCCL_LOG(WARN) << "SendConnection: In-flight bytes exceed " "limit,pausing sending." - << tracker_->getTotalInflightBytes() - << " bytes in-flight."; + << currentInflightBytes() << " bytes in-flight."; } break; } @@ -513,6 +606,11 @@ class SendConnection : public RDMAConnection { uint32_t tsc_id = static_cast(cq_data.wr_id >> 32); tracker_->acknowledge(msg_seq); cc_.onAck(tsc_id, cq_data.len); + // Decrease per-chunk inflight counter so CC window checks + // unblock pending chunks without waiting for the whole message. + size_t prev = cc_inflight_bytes_.load(std::memory_order_relaxed); + size_t sub = std::min(prev, static_cast(cq_data.len)); + cc_inflight_bytes_.fetch_sub(sub, std::memory_order_relaxed); } else { tracker_->acknowledge(cq_data.wr_id); }