Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,78 @@ def _match_events(self):

self.events = self.events.astype({"_matching_event": "Int32"})

def _match_messages(self):
Comment thread
movsesyanae marked this conversation as resolved.
"""
Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events
"""
if "_matching_event" not in self.events.columns:
self.events["_matching_event"] = None

if "_matching_timestamp" not in self.events.columns:
self.events["_matching_timestamp"] = np.nan

matching_events = list(self.events["_matching_event"])
matching_times = list(self.events["_matching_timestamp"])

Comment thread
movsesyanae marked this conversation as resolved.
Outdated
mpi_events = self.events[

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should rename to comm_events or some equivalent

self.events["Name"].isin(["MpiSend", "MpiRecv", "MpiIsend", "MpiIrecv"])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to check if the event name is in send_events_names + receive_events_names

]

queue = [[] for _ in range(len(self.events["Process"].unique()))]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update queue to be a dict per process, and index the dict per receiver process with sender processes. Also comments should be added to explain this.


df_indices = list(mpi_events.index)
timestamps = list(mpi_events["Timestamp (ns)"])
names = list(mpi_events["Name"])
attrs = list(mpi_events["Attributes"])
processes = list(mpi_events["Process"])

# Iterate through all events
for i in range(len(mpi_events)):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to iterate over send events and receive events separately, so that if a process with a receive event is iterated over, we can confirm that the send event has already been seen and added to the queue.

curr_df_index = df_indices[i]
curr_timestamp = timestamps[i]
curr_name = names[i]
curr_attrs = attrs[i]
curr_process = processes[i]

if curr_name == "MpiSend" or curr_name == "MpiIsend":
# Add current dataframe index, timestmap, and process to stack
if "receiver" in curr_attrs:
queue[curr_attrs["receiver"]].append(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update for new dict queue implementation.

(curr_df_index, curr_timestamp, curr_name, curr_process)
)
elif curr_name == "MpiRecv" or curr_name == "MpiIrecv":
if "sender" in curr_attrs:
send_process = None
i = 0

# we want to iterate through the queue in order
# until we find the corresponding "send" event
while send_process != curr_attrs["sender"] and i < len(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update for new dict queue implementation.

queue[curr_process]
):
send_df_index, send_timestamp, send_name, send_process = queue[
curr_process
][i]
i += 1

if send_process == curr_attrs["sender"] and i <= len(
queue[curr_process]
):
# remove matched event from queue
del queue[curr_process][i - 1]

# Fill in the lists with the matching values if event found
matching_events[send_df_index] = curr_df_index
matching_events[curr_df_index] = send_df_index

matching_times[send_df_index] = curr_timestamp
matching_times[curr_df_index] = send_timestamp

self.events["_matching_event"] = matching_events
self.events["_matching_timestamp"] = matching_times

self.events = self.events.astype({"_matching_event": "Int32"})

def _match_caller_callee(self):
"""Matches callers (parents) to callees (children) and adds three
columns to the dataframe:
Expand Down