Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 1 addition & 173 deletions collective/rdma/eqds.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "util/util.h"
#include "util_buffpool.h"
#include "util_timer.h"
#include <cc/eqds.h>
#include <infiniband/verbs.h>
#include <atomic>
#include <iomanip>
Expand Down Expand Up @@ -56,179 +57,6 @@ class CreditChunkBuffPool : public BuffPool {
~CreditChunkBuffPool() = default;
};

// List node embedded in every EQDSCC (see the "Pacer only" member section of
// EQDSCC): a back-pointer to the owning state plus an intrusive list link.
// NOTE(review): presumably used to chain the EQDSCC into the pacer's list of
// active flows -- confirm against the pacer code, which is not visible here.
struct active_item {
// Owning EQDSCC; EQDSCC::init_active_item() sets this to `this`.
struct EQDSCC* eqds_cc;
// Intrusive link; EQDSCC::init_active_item() runs INIT_LIST_HEAD() on it.
struct list_head active_link;
};

// List node embedded in every EQDSCC (see the "Pacer only" member section of
// EQDSCC): a back-pointer to the owning state plus an intrusive list link.
// NOTE(review): presumably used to chain the EQDSCC into the pacer's list of
// idle flows -- confirm against the pacer code, which is not visible here.
struct idle_item {
// Owning EQDSCC; EQDSCC::init_idle_item() sets this to `this`.
struct EQDSCC* eqds_cc;
// Intrusive link; EQDSCC::init_idle_item() runs INIT_LIST_HEAD() on it.
struct list_head idle_link;
};

// Credit counter expressed in pull quanta. It is only 8 bits wide, so it wraps
// around; order two values with the pullno_*() helpers below rather than with
// the raw relational operators.
using PullQuanta = uint8_t;

// Wrap-around ("serial number") comparisons: the 8-bit difference a - b is
// reinterpreted as a signed value, so `a` counts as behind `b` when that
// difference is negative and as ahead of `b` when it is positive.

// True when `a` is strictly behind `b`.
constexpr bool pullno_lt(PullQuanta a, PullQuanta b) {
  const int8_t signed_gap = static_cast<int8_t>(a - b);
  return signed_gap < 0;
}

// True when `a` is strictly ahead of `b`.
constexpr bool pullno_gt(PullQuanta a, PullQuanta b) {
  const int8_t signed_gap = static_cast<int8_t>(a - b);
  return signed_gap > 0;
}

// True when `a` is not ahead of `b` (exact complement of pullno_gt).
constexpr bool pullno_le(PullQuanta a, PullQuanta b) {
  return !pullno_gt(a, b);
}

// True when `a` is not behind `b` (exact complement of pullno_lt).
constexpr bool pullno_ge(PullQuanta a, PullQuanta b) {
  return !pullno_lt(a, b);
}

// True when both counters hold the same value; an 8-bit difference is zero
// only for identical inputs.
constexpr bool pullno_eq(PullQuanta a, PullQuanta b) {
  return a == b;
}

// Size of one pull quantum in bytes; equals 1 << PULL_SHIFT.
#define PULL_QUANTUM 16384
// log2(PULL_QUANTUM), used to convert between bytes and quanta by shifting.
#define PULL_SHIFT 14

// Converts a count of pull quanta into bytes. The input is widened to 32 bits
// first, so even the largest 8-bit count cannot overflow.
static inline uint32_t unquantize(uint8_t pull_quanta) {
  const auto quanta = static_cast<uint32_t>(pull_quanta);
  return quanta * PULL_QUANTUM;
}

// Converts a byte count into whole pull quanta, rounding down. The result is
// narrowed to the 8-bit PullQuanta type, so only the low 8 bits of the quantum
// count are kept.
static inline PullQuanta quantize_floor(uint32_t bytes) {
  const uint32_t whole_quanta = bytes >> PULL_SHIFT;
  return static_cast<PullQuanta>(whole_quanta);
}

// Converts a byte count into pull quanta, rounding any partial quantum up.
// The result is narrowed to the 8-bit PullQuanta type, so only the low 8 bits
// of the quantum count are kept.
static inline PullQuanta quantize_ceil(uint32_t bytes) {
  const uint32_t bumped = bytes + (PULL_QUANTUM - 1);
  return static_cast<PullQuanta>(bumped >> PULL_SHIFT);
}

// Per-QP congestion control state for EQDS.
struct EQDSCC {
static constexpr PullQuanta INIT_PULL_QUANTA = 50;
// static constexpr uint32_t kEQDSMaxCwnd = 1000000; // Bytes
static constexpr uint32_t kEQDSMaxCwnd = 500000; // Bytes

/********************************************************************/
/************************ Sender-side states ************************/
/********************************************************************/

// Last received highest credit in PullQuanta.
PullQuanta pull_ = INIT_PULL_QUANTA;
PullQuanta last_sent_pull_target_ = INIT_PULL_QUANTA;
// Receive request credit in PullQuanta, but consume it in bytes
uint32_t credit_pull_ = 0;
uint32_t credit_spec_ = kEQDSMaxCwnd;
bool in_speculating_ = true;
/********************************************************************/
/*********************** Receiver-side states ***********************/
/********************************************************************/

/***************** Shared between engine and pacer ******************/
std::atomic<PullQuanta> highest_pull_target_;

/*************************** Pacer only *****************************/
PullQuanta latest_pull_;
struct active_item active_item;
struct idle_item idle_item;
/************************* No modification **************************/
uint32_t fid_;
struct PacerCreditQPWrapper* pc_qpw_;

inline uint32_t credit() { return credit_pull_ + credit_spec_; }

// Called when transmitting a chunk.
// Return true if we can transmit the chunk. Otherwise,
// sender should pause sending this message until credit is received.
inline bool spend_credit(uint32_t chunk_size) {
if (credit_pull_ > 0) {
if (credit_pull_ > chunk_size)
credit_pull_ -= chunk_size;
else
credit_pull_ = 0;
return true;
} else if (in_speculating_ && credit_spec_ > 0) {
if (credit_spec_ > chunk_size)
credit_spec_ -= chunk_size;
else
credit_spec_ = 0;
return true;
}

// let pull target can advance
if (credit_spec_ > chunk_size)
credit_spec_ -= chunk_size;
else
credit_spec_ = 0;

return false;
}

// Called when we receiving ACK or pull packet.
inline void stop_speculating() { in_speculating_ = false; }

PullQuanta compute_pull_target(void* context, uint32_t chunk_size);

inline bool handle_pull_target(PullQuanta pull_target) {
PullQuanta hpt = highest_pull_target_.load();
if (pullno_gt(pull_target, hpt)) {
// Only we can increase the pull target.
highest_pull_target_.store(pull_target);
return true;
}
return false;
}

inline bool handle_pull(PullQuanta pullno) {
if (pullno_gt(pullno, pull_)) {
PullQuanta extra_credit = pullno - pull_;
credit_pull_ += unquantize(extra_credit);
if (credit_pull_ > kEQDSMaxCwnd) {
credit_pull_ = kEQDSMaxCwnd;
}
pull_ = pullno;
return true;
}
return false;
}

/// Helper functions called by pacer ///

inline void set_fid(uint32_t fid) { fid_ = fid; }

inline void set_pacer_credit_qpw(struct PacerCreditQPWrapper* pc_qpw) {
pc_qpw_ = pc_qpw;
}

inline void init_active_item(void) {
INIT_LIST_HEAD(&active_item.active_link);
active_item.eqds_cc = this;
}

inline void init_idle_item(void) {
INIT_LIST_HEAD(&idle_item.idle_link);
idle_item.eqds_cc = this;
}

inline PullQuanta backlog() {
auto hpt = highest_pull_target_.load();
if (pullno_gt(hpt, latest_pull_)) {
return hpt - latest_pull_;
} else {
return 0;
}
}

inline bool idle_credit_enough() {
PullQuanta idle_cumulate_credit;
auto hpt = highest_pull_target_.load();

if (pullno_ge(hpt, latest_pull_)) {
idle_cumulate_credit = 0;
} else {
idle_cumulate_credit = latest_pull_ - hpt;
}

return idle_cumulate_credit >= quantize_floor(kEQDSMaxCwnd);
}
};

class EQDSChannel {
static constexpr uint32_t kChannelSize = 2048;

Expand Down
4 changes: 2 additions & 2 deletions collective/rdma/pcb.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#pragma once

#include "eqds.h"
#include "swift.h"
#include "timely.h"
#include "timing_wheel.h"
#include "util/debug.h"
#include "util/util.h"
#include <cc/swift.h>
#include <cc/timely.h>
#include <cmath>
#include <cstddef>
#include <cstdint>
Expand Down
2 changes: 1 addition & 1 deletion collective/rdma/timing_wheel.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

#pragma once

#include "timely.h"
#include "transport_config.h"
#include "util/cb.h"
#include "util_timer.h"
#include <cc/timely.h>
#include <queue>

namespace uccl {
Expand Down
104 changes: 104 additions & 0 deletions include/cc/cc_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* @file cc_state.h
* @brief Shared congestion control state for RoCE RDMA transports.
* Used by both P2P and EP subsystems.
*/

#pragma once

#include "cc/swift.h"
#include "cc/timely.h"
#include "util/timer.h"
#include <algorithm>
#include <atomic>
#include <cctype>
#include <cstdlib>
#include <memory>
#include <string>

namespace uccl {
namespace cc {

/// Congestion-control front end that owns one Timely and one Swift controller
/// and forwards ACK samples to whichever one the selected Mode names.
///
/// Send timestamps are kept in a fixed table of kTscWindowSize atomic slots
/// indexed by `wr_id % kTscWindowSize`; a slot value of 0 means "no timestamp
/// recorded". Two wr_ids that differ by a multiple of kTscWindowSize share a
/// slot.
class CongestionControlState {
 public:
  enum class Mode : uint8_t { kNone, kTimely, kSwift };

  CongestionControlState() = default;

  /// Construct with known parameters (e.g. P2P path where link BW is known).
  CongestionControlState(Mode mode, double freq_ghz, double link_bw_bps)
      : mode_(mode),
        timely_(freq_ghz, link_bw_bps),
        swift_(freq_ghz, link_bw_bps) {}

  /// Deferred init (e.g. EP path — after RDMA context is available).
  /// Replaces the mode and both controllers; recorded send timestamps are
  /// left untouched.
  void init(Mode mode, double freq_ghz, double link_bw_bps) {
    mode_ = mode;
    timely_ = timely::TimelyCC(freq_ghz, link_bw_bps);
    swift_ = swift::SwiftCC(freq_ghz, link_bw_bps);
  }

  /// Parse CC mode from an environment variable (case-insensitive).
  /// @param env_var Name of the environment variable to read.
  /// @return kTimely for "timely", kSwift for "swift", otherwise kNone. kNone
  ///         is also returned when the name is null or the variable is unset.
  static Mode parseMode(char const* env_var) {
    // std::getenv must not be called with a null name.
    if (env_var == nullptr) return Mode::kNone;

    char const* env = std::getenv(env_var);
    if (env == nullptr) return Mode::kNone;

    std::string mode(env);
    // Go through unsigned char: std::tolower is undefined for negative values
    // other than EOF.
    std::transform(mode.begin(), mode.end(), mode.begin(), [](unsigned char c) {
      return static_cast<char>(std::tolower(c));
    });
    if (mode == "timely") return Mode::kTimely;
    if (mode == "swift") return Mode::kSwift;
    return Mode::kNone;
  }

  bool enabled() const { return mode_ != Mode::kNone; }
  Mode mode() const { return mode_; }

  /// Record send timestamp for a given wr_id. No-op when CC is disabled.
  void recordSendTsc(uint64_t wr_id) {
    if (mode_ == Mode::kNone) return;
    send_tsc_[wr_id % kTscWindowSize].store(rdtsc(), std::memory_order_release);
  }

  /// Update CC state on ACK. Call once per completed WR.
  /// @param wr_id       Work-request id previously passed to recordSendTsc().
  /// @param acked_bytes Bytes acknowledged; when 0, Swift is fed
  ///                    swift::SwiftCC::kMSS instead. Not used by Timely.
  ///
  /// The call is ignored when CC is disabled, when no timestamp is recorded
  /// for the slot, or when the measured round trip is not positive.
  void onAck(uint64_t wr_id, size_t acked_bytes) {
    if (mode_ == Mode::kNone) return;

    auto& slot = send_tsc_[wr_id % kTscWindowSize];
    uint64_t const send_tsc = slot.load(std::memory_order_acquire);
    if (send_tsc == 0) return;

    uint64_t const now = rdtsc();
    // A reading at or behind the send timestamp would make the unsigned
    // subtraction below wrap to an enormous bogus RTT; drop such samples.
    if (now <= send_tsc) return;

    size_t const sample_rtt_tsc = now - send_tsc;
    // Only reachable if size_t is narrower than 64 bits and the difference
    // truncates to zero.
    if (sample_rtt_tsc == 0) return;

    if (mode_ == Mode::kTimely) {
      timely_.update_rate(now, sample_rtt_tsc, ::kEwmaAlpha);
    } else if (mode_ == Mode::kSwift) {
      // NOTE(review): `freq_ghz` is not a member of this class and the
      // constructor argument of that name is not stored; this presumably
      // resolves to a namespace-scope variable from util/timer.h -- confirm.
      double delay_us = to_usec(sample_rtt_tsc, freq_ghz);
      uint32_t bytes = acked_bytes > 0
                           ? static_cast<uint32_t>(acked_bytes)
                           : static_cast<uint32_t>(swift::SwiftCC::kMSS);
      swift_.adjust_wnd(delay_us, bytes);
    }
    // Mark the slot as consumed so a repeated ACK for it is ignored.
    slot.store(0, std::memory_order_relaxed);
  }

  /// Returns CC-controlled window in bytes, or 0 if CC is disabled.
  size_t getWindowBytes() const {
    if (mode_ == Mode::kTimely) return timely_.get_wnd();
    if (mode_ == Mode::kSwift) return swift_.get_wnd();
    return 0;
  }

 private:
  static constexpr size_t kTscWindowSize = 65536;
  // Heap-allocated table of send timestamps; make_unique value-initializes
  // the elements, so every slot starts at 0 ("no timestamp").
  std::unique_ptr<std::atomic<uint64_t>[]> send_tsc_ =
      std::make_unique<std::atomic<uint64_t>[]>(kTscWindowSize);
  Mode mode_ = Mode::kNone;
  timely::TimelyCC timely_;
  swift::SwiftCC swift_;
};

} // namespace cc
} // namespace uccl
Loading
Loading