Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,40 @@ class IntraProcessManager
std::vector<uint64_t> take_ownership_subscriptions;
};

/// Hash function for rmw_gid_t to enable use in unordered_map
struct rmw_gid_hash
{
std::size_t operator()(const rmw_gid_t & gid) const noexcept
{
// Using the FNV-1a hash algorithm on the gid data
constexpr std::size_t FNV_prime = 1099511628211u;
std::size_t result = 14695981039346656037u;

for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
result ^= gid.data[i];
result *= FNV_prime;
}
return result;
}
};

/// Equality comparison for rmw_gid_t to enable use in unordered_map
struct rmw_gid_equal
{
bool operator()(const rmw_gid_t & lhs, const rmw_gid_t & rhs) const noexcept
{
// Compare implementation identifier first for fast rejection
if (lhs.implementation_identifier != rhs.implementation_identifier) {
return false;
}
// Compare the data bytes
return std::equal(
std::begin(lhs.data),
std::end(lhs.data),
std::begin(rhs.data));
}
};

using SubscriptionMap =
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;

Expand All @@ -398,6 +432,9 @@ class IntraProcessManager
using PublisherToSubscriptionIdsMap =
std::unordered_map<uint64_t, SplittedSubscriptions>;

using GidToPublisherIdMap =
std::unordered_map<rmw_gid_t, uint64_t, rmw_gid_hash, rmw_gid_equal>;

RCLCPP_PUBLIC
static
uint64_t
Expand Down Expand Up @@ -640,6 +677,7 @@ class IntraProcessManager
SubscriptionMap subscriptions_;
PublisherMap publishers_;
PublisherBufferMap publisher_buffers_;
GidToPublisherIdMap gid_to_pub_id_;

mutable std::shared_timed_mutex mutex_;
};
Expand Down
34 changes: 25 additions & 9 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ IntraProcessManager::add_publisher(
}
}

// Add GID to publisher ID mapping for fast lookups
gid_to_pub_id_[publisher->get_gid()] = pub_id;

// Initialize the subscriptions storage for this publisher.
pub_to_subs_[pub_id] = SplittedSubscriptions();

Expand Down Expand Up @@ -98,6 +101,15 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);

// Remove GID to publisher ID mapping
auto pub_it = publishers_.find(intra_process_publisher_id);
if (pub_it != publishers_.end()) {
auto publisher = pub_it->second.lock();
if (publisher) {
gid_to_pub_id_.erase(publisher->get_gid());
}
}

publishers_.erase(intra_process_publisher_id);
publisher_buffers_.erase(intra_process_publisher_id);
pub_to_subs_.erase(intra_process_publisher_id);
Expand All @@ -108,16 +120,20 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{
std::shared_lock<std::shared_timed_mutex> lock(mutex_);

for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.lock();
if (!publisher) {
continue;
}
if (*publisher.get() == id) {
return true;
}
// Use O(1) hash map lookup instead of O(n) iteration
auto it = gid_to_pub_id_.find(*id);
if (it == gid_to_pub_id_.end()) {
return false;
}
Comment thread
fujitatomoya marked this conversation as resolved.
Outdated

// Verify the publisher still exists
auto pub_it = publishers_.find(it->second);
Comment thread
fujitatomoya marked this conversation as resolved.
Outdated
if (pub_it == publishers_.end()) {
return false;
}
return false;

auto publisher = pub_it->second.lock();
return publisher != nullptr;
Comment thread
fujitatomoya marked this conversation as resolved.
}

size_t
Expand Down
16 changes: 15 additions & 1 deletion rclcpp/test/rclcpp/test_intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,14 @@ class PublisherBase
explicit PublisherBase(const std::string & topic, const rclcpp::QoS & qos)
: topic_name(topic),
qos_profile(qos)
{}
{
// Initialize a mock GID with unique data based on this pointer
gid_.implementation_identifier = "mock_rmw";
auto ptr_value = reinterpret_cast<std::uintptr_t>(this);
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
gid_.data[i] = static_cast<uint8_t>((ptr_value >> (i * 8)) & 0xFF);
}
}

virtual ~PublisherBase()
{}
Expand Down Expand Up @@ -192,6 +199,12 @@ class PublisherBase
return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal;
}

const rmw_gid_t &
get_gid() const
{
return gid_;
}

bool
operator==([[maybe_unused]] const rmw_gid_t & gid) const
{
Expand All @@ -210,6 +223,7 @@ class PublisherBase
private:
std::string topic_name;
rclcpp::QoS qos_profile;
rmw_gid_t gid_;
};

template<typename T, typename Alloc = std::allocator<void>>
Expand Down