Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import pkg_resources


# -- Project information -----------------------------------------------------

project = "pipit"
Expand Down
1 change: 0 additions & 1 deletion docs/examples/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pipit as pp


if __name__ == "__main__":
# Use pipit's ``from_csv`` API to read in traces in CSV format.
# The result is stored into pipit's Trace data structure.
Expand Down
1 change: 0 additions & 1 deletion docs/examples/hpctoolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pipit as pp


if __name__ == "__main__":
# Path to HPCToolkit traces
dirname = "../../pipit/tests/data/ping-pong-hpctoolkit"
Expand Down
1 change: 0 additions & 1 deletion docs/examples/nsight.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pipit as pp


if __name__ == "__main__":
# Path to Nsight traces
filename = "../../pipit/tests/data/nbody-nvtx/trace.csv"
Expand Down
1 change: 0 additions & 1 deletion docs/examples/otf2_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pipit as pp


if __name__ == "__main__":
# Path to OTF2 traces
dirname = "../../pipit/tests/data/ping-pong-otf2"
Expand Down
1 change: 0 additions & 1 deletion docs/examples/projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pipit as pp


if __name__ == "__main__":
# Path to OTF2 traces
dirname = "../../pipit/tests/data/ping-pong-projections"
Expand Down
15 changes: 9 additions & 6 deletions pipit/readers/nsight_sqlite_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,18 @@ def read(self) -> pipit.trace.Trace:
how="inner",
)
)
# Convert to numpy otherwise the index messes stuff up
# TODO: can get rid of the apply if we use an Arrow ListDtype for children
# globally
children = calls_that_launch["index_y"].apply(lambda x: [x])
# Convert to numpy otherwise the index messes stuff up
trace_df.loc[calls_that_launch["index_x"].to_numpy(), "_children"] = (
children.to_numpy()
)

# index_x can appear multiple times in calls_that_launch and we need to add all
# the index_y values to the _children list
# 1) Group by index_x → collect all index_y into a Python list
child_map = calls_that_launch.groupby("index_x")["index_y"].apply(list)
trace_df.loc[child_map.index, "_children"] = child_map.to_numpy()

trace_df.loc[calls_that_launch["index_y"].to_numpy(), "_parent"] = (
calls_that_launch["index_x"].to_numpy()
)

trace.events = trace_df

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

return trace
189 changes: 181 additions & 8 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import numpy as np
import pandas as pd
import copy
import re
from pipit.util.cct import create_cct


Expand Down Expand Up @@ -277,14 +279,19 @@ def _match_caller_callee_by_level(filtered_df):
enter_leave_df = self.events.loc[enter_leave_mask]

# add dummy values for depth/parent/children
# (otherwise loc won't insert the values)
self.events["_depth"] = 0
self.events["_parent"] = None
self.events["_children"] = None
self.events.loc[enter_leave_mask] = enter_leave_df.groupby(
self.parallelism_levels, group_keys=False, dropna=False

dummy = enter_leave_df.groupby(
self.parallelism_levels, group_keys=False, dropna=False, observed=False
).apply(_match_caller_callee_by_level)

# ensure proper indexing alignment on insert
self.events.loc[enter_leave_mask, "_depth"] = dummy["_depth"]
self.events.loc[enter_leave_mask, "_parent"] = dummy["_parent"]
self.events.loc[enter_leave_mask, "_children"] = dummy["_children"]

self.events = self.events.astype({"_depth": "Int32", "_parent": "Int32"})
self.events = self.events.astype({"_depth": "category", "_parent": "category"})

Expand Down Expand Up @@ -638,7 +645,7 @@ def calc_idle_time(events):
return idle_time

return (
self.events.groupby(self.parallelism_levels, dropna=False)
self.events.groupby(self.parallelism_levels, dropna=False, observed=False)
.apply(
calc_idle_time,
)
Expand Down Expand Up @@ -683,7 +690,7 @@ def time_profile(self, num_bins=50, normalized=False):
self.calc_inc_metrics(["Timestamp (ns)"])

# Filter by Enter rows
events = self.events[self.events["Event Type"] == "Enter"].copy(deep=False)
events = self.events[self.events["Event Type"] == "Enter"]
names = events["Name"].unique().tolist()

# Create equal-sized bins
Expand All @@ -706,7 +713,7 @@ def calc_exc_time_in_bin(events):
}

# start out with exc times being a copy of inc times
exc_times = list(events["inc_time_in_bin"].copy(deep=False))
exc_times = list(events["inc_time_in_bin"])

# filter to events that have children
filtered_df = events.loc[events["_children"].notnull()]
Expand Down Expand Up @@ -741,7 +748,7 @@ def calc_exc_time_in_bin(events):
in_bin = events[
(events["_matching_timestamp"] > start)
& (events["Timestamp (ns)"] < end)
].copy(deep=False)
]

# Calculate inc_time_in_bin for each function
# Case 1 - Function starts in bin
Expand Down Expand Up @@ -776,7 +783,7 @@ def calc_exc_time_in_bin(events):
calc_exc_time_in_bin(in_bin)

# Sum across all processes
agg = in_bin.groupby("Name")["exc_time_in_bin"].sum()
agg = in_bin.groupby("Name", observed=False)["exc_time_in_bin"].sum()
profile.append(agg.to_dict())

# Convert to DataFrame
Expand Down Expand Up @@ -896,3 +903,169 @@ def detect_pattern(
patterns.append(match_original)

return patterns

def ann_time_breakdown(
self,
filter_regex: str | list = None,
mapper: dict = None,
):
"""
Time breakdown by NVTX-annotation, using the `_children` pointers. Supports
repeated annotation names by indexing results with each annotation's row-index

Arguments:
- filter_regex: str, list, optional
A string regex or list of regexes to select specific annotations to
include in the breakdown
- mapper: dict, optional = None
A dictionary mapping kernel names to their corresponding groups

Returns:
- pd.DataFrame
Contains per-annotation CPU and GPU time breakdowns and the following:
index = annotation row-index (ann_idx)
columns: ["Name", "gpu_time", "gpu_idle_time"]
"""

# calculate inclusive metrics
if "time.inc" not in self.events.columns:
self.calc_inc_metrics(["Timestamp (ns)"])

# calculate exclusive time if needed
if "time.exc" not in self.events.columns:
self.calc_exc_metrics(["Timestamp (ns)"])

# 1) Select all "Enter" events of type "annotation"
ann_events = self.events[
(self.events["type"] == "annotation")
& (self.events["Event Type"] == "Enter")
]

# 2) Optionally filter by regex on the annotation Name
if filter_regex is not None:
if isinstance(filter_regex, list):
filter_pattern = "|".join(filter_regex)
else:
filter_pattern = filter_regex
ann_events = ann_events[
ann_events["Name"].str.contains(filter_pattern, regex=True)
]

breakdown_columns = {
"gpu_time": 0,
"gpu_idle_time": 0,
}

# Based on the mapper, get the different time columns we need to compute
if mapper is not None:
for key, value in mapper.items():
if value not in breakdown_columns:
breakdown_columns[value] = 0

# Prepare a list for per-annotation records
records = []

# 3) For each annotation row (by index), walk its descendants
for ann_idx, ann_row in ann_events.iterrows():
ann_name = ann_row["Name"]
raw_children = ann_row["_children"]
cpu_time = ann_row["time.inc"]

# If _children is NaN, record zeros immediately
is_list_like = isinstance(raw_children, (list, tuple, np.ndarray))
if (not is_list_like) and pd.isna(raw_children):
records.append(
{
"ann_idx": ann_idx,
"Name": ann_name,
"cpu_time": cpu_time,
**breakdown_columns,
}
)
continue

# Normalize raw_children into a list of ints
if isinstance(raw_children, (int, np.integer)):
child_stack = [int(raw_children)]
else:
# Assume it's already a Python list of ints
child_stack = list(raw_children)

visited = set()
all_timestamps = []
all_matching = []
active_gpu_time = 0
breakdown = copy.deepcopy(breakdown_columns)

# Depth‐first traversal of all descendants
while child_stack:
cid = child_stack.pop()
if cid in visited:
continue
if cid not in self.events.index:
# Skip invalid indices
continue

visited.add(cid)
child_row = self.events.loc[cid]
child_type = child_row["type"]

# Record this row's timestamps (if present)
ts = child_row["Timestamp (ns)"]
mts = child_row["_matching_timestamp"]

if not pd.isna(child_type) and child_type in ("kernel", "comm"):

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"comm" does not exist as a type really. My csv generation code using the v2 api had this hacky logic -
trace.events.loc[(trace.events["Name"].str.contains("nccl")) & (trace.events["type"] == "kernel"), "type"] = "comm"

This is not needed really because the regex can handle what kernels belong to communication

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this should include "cuda_memcpy" type times as well so that those don't get counted as idle time.

if not pd.isna(ts):
all_timestamps.append(ts)
if not pd.isna(mts):
all_matching.append(mts)

# If this row has its own children, push them too
child_children = child_row["_children"]
if isinstance(
child_children, (list, tuple, np.ndarray, int, np.integer)
):
if isinstance(child_children, (int, np.integer)):
child_stack.append(int(child_children))
else:
child_stack.extend(list(child_children))

# If this row is a kernel or comm, accumulate its active GPU time
if not pd.isna(child_row["type"]):
if child_row["type"] in ("kernel", "comm"):
if (not pd.isna(ts)) and (not pd.isna(mts)):
breakdown["gpu_time"] += abs(mts - ts)
active_gpu_time += abs(mts - ts)

if mapper is not None:
# Match the name with the mapper key regex
for key, value in mapper.items():
label = child_row["Name"]
if re.search(key, label):
breakdown[value] += abs(mts - ts)
break

# 4) Compute overall [min, max] window, then idle time
if all_timestamps or all_matching:
overall_min = min(all_timestamps + all_matching)
overall_max = max(all_timestamps + all_matching)
idle_gpu_time = (overall_max - overall_min) - active_gpu_time
idle_gpu_time = max(idle_gpu_time, 0) # clamp ≥ 0
breakdown["gpu_idle_time"] += idle_gpu_time
else:
idle_gpu_time = 0

records.append(
{
"ann_idx": ann_idx,
"Name": ann_name,
"cpu_time": cpu_time,
**breakdown,
}
)

# 5) Build DataFrame, indexed by ann_idx (annotation row-index)
result_df = pd.DataFrame(records).set_index("ann_idx")[
["Name", "cpu_time"] + list(breakdown_columns.keys())
]
return result_df