diff --git a/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py b/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py index 4afede5977..0031f4e0c6 100644 --- a/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py +++ b/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py @@ -1160,6 +1160,15 @@ def __init__( self.dram_kv_miss_count_stats_name: str = ( f"dram_kv.perf.get.tbe_id{tbe_unique_id}.dram_read_miss_count" ) + self.enrichment_query_count_stats_name: str = ( + f"dram_kv.perf.get.tbe_id{tbe_unique_id}.enrichment_query_count" + ) + self.enrichment_empty_count_stats_name: str = ( + f"dram_kv.perf.get.tbe_id{tbe_unique_id}.enrichment_empty_count" + ) + self.enrichment_success_rate_stats_name: str = ( + f"dram_kv.perf.get.tbe_id{tbe_unique_id}.enrichment_success_rate_pct" + ) self.l1_hit_rate_stats_name: str = ( f"ssd_tbe.prefetch.tbe_id{tbe_unique_id}.l1_hit_rate_pct" ) @@ -1212,6 +1221,9 @@ def __init__( self.stats_reporter.register_stats(self.dram_kv_hit_rate_stats_name) self.stats_reporter.register_stats(self.dram_kv_hit_count_stats_name) self.stats_reporter.register_stats(self.dram_kv_miss_count_stats_name) + self.stats_reporter.register_stats(self.enrichment_query_count_stats_name) + self.stats_reporter.register_stats(self.enrichment_empty_count_stats_name) + self.stats_reporter.register_stats(self.enrichment_success_rate_stats_name) self.stats_reporter.register_stats(self.l1_hit_rate_stats_name) for t in self.feature_table_map: self.stats_reporter.register_stats( @@ -4720,6 +4732,35 @@ def _report_dram_kv_perf_stats(self) -> None: enable_tb_metrics=True, ) + # Enrichment query metrics (indices 38-39) + if len(dram_kv_perf_stats) >= 40: + enrichment_query_count = dram_kv_perf_stats[38] + enrichment_empty_count = dram_kv_perf_stats[39] + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name=self.enrichment_query_count_stats_name, + data_bytes=enrichment_query_count, + enable_tb_metrics=True, + ) + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name=self.enrichment_empty_count_stats_name, + data_bytes=enrichment_empty_count, + enable_tb_metrics=True, + ) + if enrichment_query_count > 0: + enrichment_success_rate = ( + 100.0 + * (enrichment_query_count - enrichment_empty_count) + / enrichment_query_count + ) + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name=self.enrichment_success_rate_stats_name, + data_bytes=enrichment_success_rate, + enable_tb_metrics=True, + ) + def _recording_to_timer(self, timer: AsyncSeriesTimer | None, **kwargs: Any) -> Any: """ helper function to call AsyncSeriesTimer, wrap it inside the kernels we want to record diff --git a/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h b/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h index b6d1307784..0530184b5f 100644 --- a/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h +++ b/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h @@ -836,6 +836,16 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { XLOG(INFO) << "[EmbeddingCacheEnrich] " << log_prefix << payloads.size() << "/" << unhashed_ids.size() << ", latency_ms: " << latency_ms; + enrichment_query_count_.fetch_add(unhashed_ids.size()); + if (unhashed_ids.size() >= payloads.size()) { + enrichment_empty_count_.fetch_add( + unhashed_ids.size() - payloads.size()); + } else { + XLOG(WARN) << "[EmbeddingCacheEnrich] " << log_prefix + << "payloads.size() (" << payloads.size() + << ") > unhashed_ids.size() (" << unhashed_ids.size() + << ")"; + } if (!payloads.empty()) { auto result = prepareFn(hashed_ids, unhashed_ids, payloads); if (result.has_value()) { @@ -2049,7 +2059,7 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { std::vector get_dram_kv_perf( const int64_t step, const int64_t interval) { - std::vector ret(38, 0); // num metrics + std::vector ret(40, 0); // num metrics if (step > 0 && step % interval == 0) { const double d_interval = static_cast(interval); int reset_val = 0; @@ -2125,6 +2135,8 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { auto read_num_counts = read_num_counts_.exchange(reset_val); auto read_hit_count = read_hit_count_.exchange(reset_val); auto read_miss_count = read_miss_count_.exchange(reset_val); + auto enrichment_query_count = enrichment_query_count_.exchange(reset_val); + auto enrichment_empty_count = enrichment_empty_count_.exchange(reset_val); ret[0] = dram_read_total_duration / d_interval; ret[1] = dram_read_sharding_total_duration / d_interval; @@ -2173,6 +2185,9 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { ret[36] = read_hit_count; ret[37] = read_miss_count; + + ret[38] = enrichment_query_count; + ret[39] = enrichment_empty_count; } return ret; } @@ -2436,6 +2451,10 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { std::atomic read_hit_count_{0}; std::atomic read_miss_count_{0}; + // Enrichment (laser) query counters: total keys queried and empty results + std::atomic enrichment_query_count_{0}; + std::atomic enrichment_empty_count_{0}; + bool disable_random_init_; // Whether raw embedding streaming (RES) is enabled for this cache