Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,15 @@ def __init__(
self.dram_kv_miss_count_stats_name: str = (
f"dram_kv.perf.get.tbe_id{tbe_unique_id}.dram_read_miss_count"
)
self.enrichment_query_count_stats_name: str = (
f"dram_kv.perf.get.tbe_id{tbe_unique_id}.enrichment_query_count"
)
self.enrichment_empty_count_stats_name: str = (
f"dram_kv.perf.get.tbe_id{tbe_unique_id}.enrichment_empty_count"
)
self.enrichment_success_rate_stats_name: str = (
f"dram_kv.perf.get.tbe_id{tbe_unique_id}.enrichment_success_rate_pct"
)
self.l1_hit_rate_stats_name: str = (
f"ssd_tbe.prefetch.tbe_id{tbe_unique_id}.l1_hit_rate_pct"
)
Expand Down Expand Up @@ -1212,6 +1221,9 @@ def __init__(
self.stats_reporter.register_stats(self.dram_kv_hit_rate_stats_name)
self.stats_reporter.register_stats(self.dram_kv_hit_count_stats_name)
self.stats_reporter.register_stats(self.dram_kv_miss_count_stats_name)
self.stats_reporter.register_stats(self.enrichment_query_count_stats_name)
self.stats_reporter.register_stats(self.enrichment_empty_count_stats_name)
self.stats_reporter.register_stats(self.enrichment_success_rate_stats_name)
self.stats_reporter.register_stats(self.l1_hit_rate_stats_name)
for t in self.feature_table_map:
self.stats_reporter.register_stats(
Expand Down Expand Up @@ -4720,6 +4732,35 @@ def _report_dram_kv_perf_stats(self) -> None:
enable_tb_metrics=True,
)

# Enrichment query metrics (indices 38-39)
if len(dram_kv_perf_stats) >= 40:
enrichment_query_count = dram_kv_perf_stats[38]
enrichment_empty_count = dram_kv_perf_stats[39]
stats_reporter.report_data_amount(
iteration_step=self.step,
event_name=self.enrichment_query_count_stats_name,
data_bytes=enrichment_query_count,
enable_tb_metrics=True,
)
stats_reporter.report_data_amount(
iteration_step=self.step,
event_name=self.enrichment_empty_count_stats_name,
data_bytes=enrichment_empty_count,
enable_tb_metrics=True,
)
if enrichment_query_count > 0:
enrichment_success_rate = (
100.0
* (enrichment_query_count - enrichment_empty_count)
/ enrichment_query_count
)
stats_reporter.report_data_amount(
iteration_step=self.step,
event_name=self.enrichment_success_rate_stats_name,
data_bytes=enrichment_success_rate,
enable_tb_metrics=True,
)

def _recording_to_timer(self, timer: AsyncSeriesTimer | None, **kwargs: Any) -> Any:
"""
helper function to call AsyncSeriesTimer, wrap it inside the kernels we want to record
Expand Down
21 changes: 20 additions & 1 deletion fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,16 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
XLOG(INFO) << "[EmbeddingCacheEnrich] " << log_prefix
<< payloads.size() << "/" << unhashed_ids.size()
<< ", latency_ms: " << latency_ms;
enrichment_query_count_.fetch_add(unhashed_ids.size());
if (unhashed_ids.size() >= payloads.size()) {
enrichment_empty_count_.fetch_add(
unhashed_ids.size() - payloads.size());
} else {
XLOG(WARN) << "[EmbeddingCacheEnrich] " << log_prefix
<< "payloads.size() (" << payloads.size()
<< ") > unhashed_ids.size() (" << unhashed_ids.size()
<< ")";
}
if (!payloads.empty()) {
auto result = prepareFn(hashed_ids, unhashed_ids, payloads);
if (result.has_value()) {
Expand Down Expand Up @@ -2049,7 +2059,7 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
std::vector<double> get_dram_kv_perf(
const int64_t step,
const int64_t interval) {
std::vector<double> ret(38, 0); // num metrics
std::vector<double> ret(40, 0); // num metrics
if (step > 0 && step % interval == 0) {
const double d_interval = static_cast<double>(interval);
int reset_val = 0;
Expand Down Expand Up @@ -2125,6 +2135,8 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
auto read_num_counts = read_num_counts_.exchange(reset_val);
auto read_hit_count = read_hit_count_.exchange(reset_val);
auto read_miss_count = read_miss_count_.exchange(reset_val);
auto enrichment_query_count = enrichment_query_count_.exchange(reset_val);
auto enrichment_empty_count = enrichment_empty_count_.exchange(reset_val);

ret[0] = dram_read_total_duration / d_interval;
ret[1] = dram_read_sharding_total_duration / d_interval;
Expand Down Expand Up @@ -2173,6 +2185,9 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {

ret[36] = read_hit_count;
ret[37] = read_miss_count;

ret[38] = enrichment_query_count;
ret[39] = enrichment_empty_count;
}
return ret;
}
Expand Down Expand Up @@ -2436,6 +2451,10 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
std::atomic<int64_t> read_hit_count_{0};
std::atomic<int64_t> read_miss_count_{0};

// Enrichment (laser) query counters: total keys queried and empty results
std::atomic<int64_t> enrichment_query_count_{0};
std::atomic<int64_t> enrichment_empty_count_{0};

bool disable_random_init_;

// Whether raw embedding streaming (RES) is enabled for this cache
Expand Down
Loading