diff --git a/docs/design.md b/docs/design.md index f3b12799..55b0cadc 100644 --- a/docs/design.md +++ b/docs/design.md @@ -182,9 +182,12 @@ The format of a liveliness token for a node is: The format of a liveliness token for a publisher, subscription, service server, or service client: ```text -@ros2_lv//////////// +@ros2_lv////////////[/] ``` +The trailing `` component is **optional**: it is only present for buffer-aware (i.e. `rosidl::Buffer`-carrying) publishers and subscribers and is omitted entirely for plain ROS message types. +See the [Buffer-aware pub/sub](#buffer-aware-pub-sub) section below for details. + Where: * A mangled name is the name with each `/` characters replaced by `%` @@ -206,6 +209,11 @@ Where: * `` - The topic or service type name, as declared in DDS (e.g.: `std_msgs::msg::dds_::String_`) when encoded in CDR * `` - the topic or service type hash, as defined in [REP-2016](https://github.com/ros-infrastructure/rep/pull/381/files) and generated by [`rosidl`](https://github.com/ros2/rosidl/tree/rolling/rosidl_generator_type_description) * `` - The entity QoS encoded in a compact format (see the `qos_to_keyexpr` function) +* `` - *(Optional, buffer-aware entities only)* A semicolon-separated list of `:` pairs prefixed with the literal `backends:`. +Backend names are sorted lexicographically so the same set always serializes identically. +The `` and `` fields are escaped to avoid collisions with the `:`, `;`, and `/` separators. +For example, a publisher that supports the `cpu` and `cuda` backends advertises `backends:cpu:;cuda:`. +This component is consumed by the [Buffer-aware pub/sub](#buffer-aware-pub-sub) discovery flow described below. During context initialization, `rmw_zenoh_cpp` calls `Session::liveliness_get` to get an initial view of the entire graph from other nodes in the system. From then on, when entities enter and leave the system, `rmw_zenoh_cpp` will get new liveliness tokens that it can use to update its internal cache. @@ -510,6 +518,83 @@ When a new service server is created, a liveliness token of type `SS` is sent ou * Query::reply * Query::ReplyOptions +## Buffer-aware pub/sub + +`rmw_zenoh_cpp` supports messages containing [`rosidl::Buffer`](https://discourse.openrobotics.org/t/working-prototype-of-native-buffers-accelerated-memory-transport/52399) fields, which describe data that may live outside of host memory (e.g. on a GPU). +For these messages, the RMW transports a small **buffer descriptor** (a handle that identifies the underlying memory region) instead of always copying the bytes through the host, while preserving full bit-compatibility with publishers and subscribers that only know how to consume regular CPU bytes. + +### Backends + +Each `rosidl::Buffer` field is bound to a *backend* (e.g. `cpu`, `cuda`). +At RMW context init time, `rmw_zenoh_cpp` calls `rosidl_buffer_backend_registry::initialize_buffer_backends()` to load all installed backend plugins into a per-context `BufferBackendContext`, and tears them down again on context shutdown. +Every buffer-aware publisher always implicitly supports the `cpu` backend, so a buffer-aware publisher can always fall back to plain CPU serialization for any subscriber. + +Subscribers can opt in to specific backends via the `acceptable_buffer_backends` `rmw_subscription_options_t` field: + +* `nullptr`, empty, or `"cpu"` -- CPU-only subscriber (advertises `backends:cpu:` in its liveliness token). +* `"any"` -- accept all installed backends. +* A comma-separated list of backend names -- accept only the listed backends (intersected with what is installed). + +### Discovery + +When a buffer-aware publisher and subscriber appear in the graph, they discover each other through the existing graph cache plus two new discovery callbacks (`register_subscriber_discovery_callback` and `register_publisher_discovery_callback`). +The callbacks are invoked **outside** of the graph cache mutex to avoid re-entrant deadlocks when they create per-endpoint Zenoh entities. + +The `` component of the liveliness token (see [Graph Cache](#graph-cache)) carries each endpoint's advertised backend set so that peers can negotiate which backend to use without any extra discovery traffic. + +### Topic key expressions + +Buffer-aware peers communicate over additional Zenoh key expressions derived from the standard topic key expression: + +| Suffix | Direction | Description | +| --- | --- | --- | +| *(none)* | both | Standard CDR-serialized payload. Always declared by every endpoint so a buffer-aware endpoint can still talk to a legacy (non-buffer) peer. | +| `/_buf_cpu` | pub -> sub | Shared CPU-group channel. A single Zenoh publisher/subscriber pair carries serialized CPU payloads for **all** CPU-only buffer-aware subscribers on the topic. Mirrors `rmw_fastrtps_cpp`'s eager CPU `DataWriter`. | +| `/_buf/` | pub -> sub | Per-subscriber accelerated channel. Each buffer-aware subscriber that advertises non-CPU backends owns a single shared accelerated Zenoh subscriber keyed by its GID. Every matching publisher writes the negotiated backend descriptor to this key. | + +For example, with `ROS_DOMAIN_ID=0`, a buffer-aware `chatter` topic carrying GPU images may use the following keys: + +```text +0/chatter/sensor_msgs::msg::dds_::Image_/RIHS01_ # base CDR fallback +0/chatter/sensor_msgs::msg::dds_::Image_/RIHS01_/_buf_cpu # shared CPU-group channel +0/chatter/sensor_msgs::msg::dds_::Image_/RIHS01_/_buf/ # per-subscriber accelerated channel +``` + +### Publish path + +`PublisherData::publish()` chooses a path based on the matched subscribers (mirroring `rmw_fastrtps_cpp`): + +1. **All matched subscribers are buffer-aware** -- skip the standard path and call `publish_buffer_aware()`, which: + * publishes a single CPU serialization to `/_buf_cpu` if there are any CPU-only buffer-aware subscribers, and + * publishes a per-subscriber, endpoint-aware serialization (containing only the buffer descriptor for non-CPU backends) to each `/_buf/` key. +2. **At least one legacy (non-buffer-aware) subscriber is matched** -- skip the buffer channels entirely and fall through to the standard base-key publish path so every subscriber, buffer-aware or not, receives a fully serialized message via the base key. + +### Subscribe path + +`SubscriptionData` always declares a Zenoh subscriber on the base key for the legacy fallback. +If the subscriber is buffer-aware it additionally: + +* declares a Zenoh subscriber on `/_buf_cpu` if it is CPU-only (so it shares the CPU channel with all other CPU-only buffer-aware subscribers on the topic), or +* declares a single accelerated Zenoh subscriber on `/_buf/` if it accepts at least one non-CPU backend. + +Incoming samples carry the originating endpoint info (via the `Message` struct's `EndpointInfoStorage`) so that the FastCDR-based deserialization layer can dispatch to the correct backend's `from_descriptor_with_endpoint()` to reconstruct the buffer in place. + +### Related RMW APIs + +* `rmw_subscription_options_t::acceptable_buffer_backends` +* `rosidl::Buffer` +* `rosidl_buffer_backend_registry::initialize_buffer_backends` +* `rosidl_buffer_backend_registry::shutdown_buffer_backends` +* `PublisherData::publish_buffer_aware` +* `SubscriptionData::on_publisher_discovered` +* `GraphCache::register_publisher_discovery_callback` +* `GraphCache::register_subscriber_discovery_callback` + +### Related Zenoh APIs + +* `Session::declare_advanced_publisher` +* `Session::declare_advanced_subscriber` + ## Quality of Service The ROS 2 RMW layer defines quite a few Quality of Service settings that are largely derived from DDS. diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index c1e23dbf..a3bee635 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -19,6 +19,7 @@ find_package(fastcdr CONFIG REQUIRED) find_package(nlohmann_json REQUIRED) find_package(rcpputils REQUIRED) find_package(rcutils REQUIRED) +find_package(rosidl_buffer_backend_registry REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) @@ -27,6 +28,7 @@ find_package(tracetools REQUIRED) find_package(zenoh_cpp_vendor REQUIRED) add_library(rmw_zenoh_cpp SHARED + src/detail/buffer_backend_loader.cpp src/detail/attachment_helpers.cpp src/detail/cdr.cpp src/detail/event.cpp @@ -68,6 +70,7 @@ target_link_libraries(rmw_zenoh_cpp fastcdr rcpputils::rcpputils rcutils::rcutils + rosidl_buffer_backend_registry::rosidl_buffer_backend_registry rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw @@ -77,6 +80,11 @@ target_link_libraries(rmw_zenoh_cpp configure_rmw_library(rmw_zenoh_cpp) +target_include_directories(rmw_zenoh_cpp + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/src +) + target_include_directories(rmw_zenoh_cpp PUBLIC $ $ diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index c772b1db..54f330d5 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -22,6 +22,7 @@ fastcdr rcpputils rcutils + rosidl_buffer_backend_registry rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw diff --git a/rmw_zenoh_cpp/src/detail/buffer_backend_context.hpp b/rmw_zenoh_cpp/src/detail/buffer_backend_context.hpp new file mode 100644 index 00000000..3e0367f1 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_backend_context.hpp @@ -0,0 +1,39 @@ +// Copyright 2026 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_BACKEND_CONTEXT_HPP_ +#define DETAIL__BUFFER_BACKEND_CONTEXT_HPP_ + +#include +#include +#include + +#include "rosidl_buffer_backend/buffer_backend.hpp" +#include "rosidl_buffer_backend_registry/buffer_backend_registry.hpp" +#include "rosidl_typesupport_fastrtps_cpp/buffer_serialization.hpp" + +namespace rmw_zenoh_cpp +{ + +/// Per-rmw_context bundle of buffer backend state. +struct BufferBackendContext +{ + rosidl_typesupport_fastrtps_cpp::BufferSerializationContext serialization_context; + std::unique_ptr registry; + std::unordered_map> backend_instances; +}; + +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__BUFFER_BACKEND_CONTEXT_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/buffer_backend_loader.cpp b/rmw_zenoh_cpp/src/detail/buffer_backend_loader.cpp new file mode 100644 index 00000000..396da35e --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_backend_loader.cpp @@ -0,0 +1,191 @@ +// Copyright 2026 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "buffer_backend_loader.hpp" + +#include +#include +#include +#include +#include +#include + +#include "rcutils/error_handling.h" +#include "rcutils/logging_macros.h" +#include "rosidl_buffer_backend_registry/buffer_backend_registry.hpp" +#include "rosidl_typesupport_fastrtps_cpp/identifier.hpp" +#include "rosidl_typesupport_fastrtps_cpp/message_type_support_decl.hpp" +#include "rosidl_typesupport_fastrtps_cpp/buffer_serialization.hpp" +#include "rosidl_runtime_c/message_type_support_struct.h" + +namespace rmw_zenoh_cpp +{ + +static const char * kLoggerName = "rmw_zenoh_cpp.buffer_backend_loader"; + +void initialize_buffer_backends(BufferBackendContext & context) +{ + context.registry = std::make_unique(); + auto & registry = *context.registry; + + auto & backend_ops = context.serialization_context.descriptor_ops; + auto & serializers = context.serialization_context.descriptor_serializers; + std::vector backend_names = registry.get_backend_names(); + RCUTILS_LOG_DEBUG_NAMED( + kLoggerName, "Buffer backends: found %zu backend(s)", backend_names.size()); + + for (const std::string & backend_name : backend_names) { + std::shared_ptr backend = + registry.create_backend_instance(backend_name); + if (!backend) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerName, "Backend '%s' pointer is null", backend_name.c_str()); + continue; + } + + std::string backend_type = backend->get_backend_type(); + RCUTILS_LOG_DEBUG_NAMED( + kLoggerName, "Processing backend '%s' (type: %s)", + backend_name.c_str(), backend_type.c_str()); + + context.backend_instances[backend_type] = backend; + + rosidl::BufferDescriptorOps ops; + + // Capture a weak_ptr so the lambdas (which live inside + // descriptor_ops/descriptor_serializers) do not extend the backend's + // lifetime past shutdown_buffer_backends(). The backend's only + // owning reference is held in context.backend_instances and is + // released explicitly during shutdown. + std::weak_ptr backend_wp = backend; + ops.create_descriptor_with_endpoint = [backend_wp]( + const void * impl, + const rmw_topic_endpoint_info_t & endpoint_info) -> std::shared_ptr { + std::shared_ptr backend_sp = backend_wp.lock(); + if (!backend_sp) { + throw std::runtime_error( + "Buffer backend instance has been destroyed; " + "create_descriptor_with_endpoint called after shutdown"); + } + return backend_sp->create_descriptor_with_endpoint(impl, endpoint_info); + }; + ops.from_descriptor_with_endpoint = [backend_wp]( + const void * descriptor, + const rmw_topic_endpoint_info_t & endpoint_info) + -> std::unique_ptr { + std::shared_ptr backend_sp = backend_wp.lock(); + if (!backend_sp) { + throw std::runtime_error( + "Buffer backend instance has been destroyed; " + "from_descriptor_with_endpoint called after shutdown"); + } + return backend_sp->from_descriptor_with_endpoint(descriptor, endpoint_info); + }; + + backend_ops[backend_type] = ops; + + const rosidl_message_type_support_t * descriptor_ts = + backend->get_descriptor_type_support(); + if (!descriptor_ts) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerName, + " Backend '%s' returned null descriptor type support", + backend_type.c_str()); + continue; + } + const rosidl_message_type_support_t * fastrtps_descriptor_ts = get_message_typesupport_handle( + descriptor_ts, rosidl_typesupport_fastrtps_cpp::typesupport_identifier); + if (!fastrtps_descriptor_ts) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerName, + " Backend '%s' descriptor type support could not be resolved to " + "rosidl_typesupport_fastrtps_cpp", + backend_type.c_str()); + rcutils_reset_error(); + continue; + } + + const auto * callbacks = static_cast( + fastrtps_descriptor_ts->data); + if (!callbacks) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerName, + " Backend '%s' descriptor callbacks are null", + backend_type.c_str()); + continue; + } + + rosidl_typesupport_fastrtps_cpp::BufferDescriptorSerializers desc_ser; + desc_ser.serialize = [callbacks]( + eprosima::fastcdr::Cdr & cdr, + const std::shared_ptr & desc_ptr, + const rmw_topic_endpoint_info_t & endpoint_info, + const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & serialization_context) + { + if (!desc_ptr) { + throw std::runtime_error("Descriptor pointer is null"); + } + if (callbacks->cdr_serialize_with_endpoint) { + callbacks->cdr_serialize_with_endpoint( + desc_ptr.get(), cdr, endpoint_info, serialization_context); + } else { + callbacks->cdr_serialize(desc_ptr.get(), cdr); + } + }; + desc_ser.deserialize = [callbacks, backend_wp]( + eprosima::fastcdr::Cdr & cdr, + const rmw_topic_endpoint_info_t & endpoint_info, + const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & serialization_context) + -> std::shared_ptr + { + std::shared_ptr backend_sp = backend_wp.lock(); + if (!backend_sp) { + throw std::runtime_error( + "Buffer backend instance has been destroyed; " + "descriptor deserialize called after shutdown"); + } + std::shared_ptr desc = backend_sp->create_empty_descriptor(); + if (!desc) { + throw std::runtime_error("Backend returned null descriptor instance"); + } + if (callbacks->cdr_deserialize_with_endpoint) { + callbacks->cdr_deserialize_with_endpoint( + cdr, desc.get(), endpoint_info, serialization_context); + } else { + callbacks->cdr_deserialize(cdr, desc.get()); + } + return desc; + }; + serializers[backend_type] = std::move(desc_ser); + + RCUTILS_LOG_DEBUG_NAMED( + kLoggerName, " FastCDR descriptor serializers registered for '%s'", + backend_type.c_str()); + } +} + +void shutdown_buffer_backends(BufferBackendContext & context) +{ + try { + context.serialization_context.descriptor_ops.clear(); + context.serialization_context.descriptor_serializers.clear(); + context.backend_instances.clear(); + context.registry.reset(); + } catch (const std::exception & e) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerName, "Error during buffer backend shutdown: %s", e.what()); + } +} + +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/buffer_backend_loader.hpp b/rmw_zenoh_cpp/src/detail/buffer_backend_loader.hpp new file mode 100644 index 00000000..5c95ceed --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_backend_loader.hpp @@ -0,0 +1,33 @@ +// Copyright 2026 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_BACKEND_LOADER_HPP_ +#define DETAIL__BUFFER_BACKEND_LOADER_HPP_ + +#include "buffer_backend_context.hpp" + +namespace rmw_zenoh_cpp +{ + +/// Load buffer backend plugins and register them with FastCDR serialization. +/// Populates an RMW-context-local serialization map so multiple contexts in the +/// same process do not share mutable global descriptor state. +void initialize_buffer_backends(BufferBackendContext & context); + +/// Clear context-local serialization maps. +void shutdown_buffer_backends(BufferBackendContext & context); + +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__BUFFER_BACKEND_LOADER_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/buffer_endpoint_helpers.hpp b/rmw_zenoh_cpp/src/detail/buffer_endpoint_helpers.hpp new file mode 100644 index 00000000..e989b6eb --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_endpoint_helpers.hpp @@ -0,0 +1,123 @@ +// Copyright 2026 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_ENDPOINT_HELPERS_HPP_ +#define DETAIL__BUFFER_ENDPOINT_HELPERS_HPP_ + +#include +#include +#include +#include +#include +#include +#include + +#include "rmw/types.h" + +#include "liveliness_utils.hpp" + +namespace rmw_zenoh_cpp +{ +namespace buffer_endpoint_helpers +{ + +/// Convert an `rmw_gid_t` to its lowercase hex string representation. +inline std::string gid_to_hex(const rmw_gid_t & gid) +{ + std::ostringstream out; + out << std::hex << std::setfill('0'); + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + out << std::setw(2) << static_cast(gid.data[i]); + } + return out.str(); +} + +/// Convert a raw GID byte array to its lowercase hex string representation. +inline std::string gid_array_to_hex( + const std::array & gid_array) +{ + std::ostringstream out; + out << std::hex << std::setfill('0'); + for (size_t i = 0; i < gid_array.size(); ++i) { + out << std::setw(2) << static_cast(gid_array[i]); + } + return out.str(); +} + +/// Build the shared CPU-group endpoint key for a base topic key expression. +/// +/// All buffer-aware publishers and CPU-only subscribers on a topic share a +/// single Zenoh channel under this suffix. +inline std::string make_cpu_group_key(const std::string & base_key) +{ + return base_key + "/_buf_cpu"; +} + +/// Build the per-subscriber accelerated buffer key for a base topic key. +/// +/// Each buffer-aware subscriber owns one accelerated channel keyed by its +/// GID. Every matching publisher writes to this key, so a subscriber only +/// needs a single accelerated Zenoh subscriber regardless of the number of +/// matched publishers. +inline std::string make_accelerated_key( + const std::string & base_key, + const std::string & sub_gid_hex) +{ + return base_key + "/_buf/" + sub_gid_hex; +} + +/// Convenience helper: accelerated key from a publisher-side `rmw_gid_t`. +inline std::string make_accelerated_key( + const std::string & base_key, + const rmw_gid_t & sub_gid) +{ + return make_accelerated_key(base_key, gid_to_hex(sub_gid)); +} + +/// Stringify a liveliness EntityType for log messages. +inline const char * entity_type_to_string(liveliness::EntityType type) +{ + switch (type) { + case liveliness::EntityType::Node: + return "Node"; + case liveliness::EntityType::Publisher: + return "Publisher"; + case liveliness::EntityType::Subscription: + return "Subscription"; + case liveliness::EntityType::Service: + return "Service"; + case liveliness::EntityType::Client: + return "Client"; + default: + return "Unknown"; + } +} + +/// True iff the only advertised backend type is "cpu". +inline bool is_cpu_only_backend_types(const std::vector & backend_types) +{ + return backend_types.size() == 1 && backend_types.front() == "cpu"; +} + +/// True iff the only advertised backend in the metadata map is "cpu". +inline bool is_cpu_only_backend_metadata( + const std::unordered_map & backend_metadata) +{ + return backend_metadata.size() == 1 && backend_metadata.count("cpu") == 1; +} + +} // namespace buffer_endpoint_helpers +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__BUFFER_ENDPOINT_HELPERS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/endpoint_info.hpp b/rmw_zenoh_cpp/src/detail/endpoint_info.hpp new file mode 100644 index 00000000..2a285518 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/endpoint_info.hpp @@ -0,0 +1,71 @@ +// Copyright 2026 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__ENDPOINT_INFO_HPP_ +#define DETAIL__ENDPOINT_INFO_HPP_ + +#include +#include + +#include "liveliness_utils.hpp" + +#include "rmw/types.h" +#include "rmw/topic_endpoint_info.h" +#include "rosidl_runtime_c/type_hash.h" + +namespace rmw_zenoh_cpp +{ + +/// Owning storage for endpoint info, keeping strings alive for the +/// non-owning const char * pointers inside rmw_topic_endpoint_info_t. +struct EndpointInfoStorage +{ + rmw_topic_endpoint_info_t info{}; + std::string node_name; + std::string node_namespace; + std::string topic_type; +}; + +/// Build an EndpointInfoStorage from a liveliness Entity. +/// This is a shared helper that replaces the duplicate lambdas previously +/// defined in rmw_publisher_data.cpp and rmw_subscription_data.cpp. +inline EndpointInfoStorage build_endpoint_info_from_entity( + const liveliness::Entity & entity, + rmw_endpoint_type_t endpoint_type) +{ + EndpointInfoStorage storage; + storage.node_name = entity.node_name(); + storage.node_namespace = entity.node_namespace(); + auto topic_info = entity.topic_info(); + if (topic_info.has_value()) { + storage.topic_type = topic_info->type_; + storage.info.qos_profile = topic_info->qos_; + storage.info.topic_type_hash = rosidl_get_zero_initialized_type_hash(); + (void)rosidl_parse_type_hash_string( + topic_info->type_hash_.c_str(), + &storage.info.topic_type_hash); + } + storage.info.node_name = storage.node_name.c_str(); + storage.info.node_namespace = storage.node_namespace.c_str(); + storage.info.topic_type = storage.topic_type.c_str(); + storage.info.endpoint_type = endpoint_type; + + auto gid_array = entity.copy_gid(); + std::memcpy(storage.info.endpoint_gid, gid_array.data(), RMW_GID_STORAGE_SIZE); + return storage; +} + +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__ENDPOINT_INFO_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 58d0abce..d65e1e22 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -91,16 +91,18 @@ std::shared_ptr GraphCache::make_graph_node(const Entity & entity) co } ///============================================================================= -void GraphCache::update_topic_maps_for_put( +std::vector +GraphCache::update_topic_maps_for_put( GraphNodePtr graph_node, liveliness::ConstEntityPtr entity) { if (entity->type() == EntityType::Node) { // Nothing to update for a node entity. - return; + return {}; } // First update the topic map within the node. + // These calls never fire discovery callbacks (report_events = false). if (entity->type() == EntityType::Publisher) { update_topic_map_for_put(graph_node->pubs_, entity); } else if (entity->type() == EntityType::Subscription) { @@ -112,19 +114,20 @@ void GraphCache::update_topic_maps_for_put( } // Then update the variables tracking topics across the graph. - // We invoke update_topic_map_for_put() with report_events set to true for - // pub/sub. + // Only pub/sub calls collect discovery callbacks (report_events = true); + // return them to the caller for invocation outside graph_mutex_. if (entity->type() == EntityType::Publisher || entity->type() == EntityType::Subscription) { - update_topic_map_for_put(this->graph_topics_, entity, true); - } else { - update_topic_map_for_put(this->graph_services_, entity); + return update_topic_map_for_put(this->graph_topics_, entity, true); } + update_topic_map_for_put(this->graph_services_, entity); + return {}; } ///============================================================================= -void GraphCache::update_topic_map_for_put( +std::vector +GraphCache::update_topic_map_for_put( GraphNode::TopicMap & topic_map, liveliness::ConstEntityPtr entity, bool report_events) @@ -136,7 +139,7 @@ void GraphCache::update_topic_map_for_put( "rmw_zenoh_cpp", "update_topic_map_for_put() called for non-node entity without valid TopicInfo. " "Report this."); - return; + return {}; } // For the sake of reusing data structures and lookup functions, we treat publishers and @@ -155,7 +158,7 @@ void GraphCache::update_topic_map_for_put( topic_map.insert(std::make_pair(graph_topic_data->info_.name_, std::move(topic_data_map))); // We do not need to check for events since this is the first time and entiry for this topic // was added to the topic map. - return; + return {}; } // The topic exists in the topic_map so we check if the type also exists. GraphNode::TopicTypeMap::iterator topic_type_map_it = topic_map_it->second.find( @@ -167,7 +170,7 @@ void GraphCache::update_topic_map_for_put( graph_topic_data->info_.type_, std::move(topic_qos_map))); // TODO(Yadunund) Check for and report an *_INCOMPATIBLE_TYPE events. - return; + return {}; } // The topic type already exists. if (report_events) { @@ -195,6 +198,38 @@ void GraphCache::update_topic_map_for_put( existing_graph_topic->subs_.insert(entity); } } + + // Collect discovery callbacks under discovery_mutex_ and return them to the + // caller for invocation *outside* graph_mutex_. + // + // Lock ordering invariant: graph_mutex_ → discovery_mutex_. + // parse_put holds graph_mutex_ for its entire body; register_*_callback + // also acquires graph_mutex_ before discovery_mutex_, so the order is + // consistent and ABBA-free. + // + // Callbacks must NOT be invoked while graph_mutex_ is held: any callback + // that re-enters a GraphCache method would deadlock (std::mutex is + // non-recursive). Callers are responsible for invoking the returned + // callbacks only after releasing graph_mutex_. + std::vector callbacks_to_invoke; + { + std::lock_guard disc_lock(discovery_mutex_); + const std::string & topic_name = graph_topic_data->info_.name_; + if (is_pub) { + if (publisher_discovery_callbacks_.count(topic_name)) { + for (const auto & [gid_hash, cb] : publisher_discovery_callbacks_[topic_name]) { + callbacks_to_invoke.push_back(cb); + } + } + } else { + if (subscriber_discovery_callbacks_.count(topic_name)) { + for (const auto & [gid_hash, cb] : subscriber_discovery_callbacks_[topic_name]) { + callbacks_to_invoke.push_back(cb); + } + } + } + } + return callbacks_to_invoke; } ///============================================================================= @@ -327,8 +362,11 @@ void GraphCache::parse_put( return; } - // Lock the graph mutex before accessing the graph. - std::lock_guard lock(graph_mutex_); + // Use unique_lock so we can release graph_mutex_ before invoking discovery + // callbacks; callbacks must not run under graph_mutex_ (non-recursive mutex). + std::unique_lock lock(graph_mutex_); + + std::vector pending_callbacks; // If the namespace did not exist, create it and add the node to the graph and return. NamespaceMap::iterator ns_it = graph_.find(entity->node_namespace()); @@ -341,50 +379,58 @@ void GraphCache::parse_put( NodeMap node_map = { {entity->node_name(), node}}; graph_.emplace(std::make_pair(entity->node_namespace(), std::move(node_map))); - update_topic_maps_for_put(node, entity); + pending_callbacks = update_topic_maps_for_put(node, entity); total_nodes_in_graph_ += 1; - return; + } else { + // Add the node to the namespace if it did not exist and return. + // Case 1: First time a node with this name is added to the namespace. + // Case 2: There are one or more nodes with the same name but the entity could + // represent a node with the same name but a unique id which would make it a + // new addition to the graph. + std::pair range = ns_it->second.equal_range( + entity->node_name()); + NodeMap::iterator node_it = std::find_if( + range.first, range.second, + [entity](const std::pair & node_it) + { + // Match nodes if their zenoh session and node ids match. + return entity->zid() == node_it.second->zid_ && entity->nid() == node_it.second->nid_; + }); + if (node_it == range.second) { + // Either the first time a node with this name is added or with an existing + // name but unique id. + GraphNodePtr node = make_graph_node(*entity); + if (node == nullptr) { + // Error handled. + return; + } + NodeMap::iterator insertion_it = + ns_it->second.insert(std::make_pair(entity->node_name(), node)); + pending_callbacks = update_topic_maps_for_put(node, entity); + total_nodes_in_graph_ += 1; + if (insertion_it == ns_it->second.end()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to add a new node /%s to an " + "existing namespace %s in the graph. Report this bug.", + entity->node_name().c_str(), + entity->node_namespace().c_str()); + } + } else { + // Otherwise, the entity represents a node that already exists in the graph. + // Update topic info if required below. + pending_callbacks = update_topic_maps_for_put(node_it->second, entity); + } } - // Add the node to the namespace if it did not exist and return. - // Case 1: First time a node with this name is added to the namespace. - // Case 2: There are one or more nodes with the same name but the entity could - // represent a node with the same name but a unique id which would make it a - // new addition to the graph. - std::pair range = ns_it->second.equal_range( - entity->node_name()); - NodeMap::iterator node_it = std::find_if( - range.first, range.second, - [entity](const std::pair & node_it) - { - // Match nodes if their zenoh session and node ids match. - return entity->zid() == node_it.second->zid_ && entity->nid() == node_it.second->nid_; - }); - if (node_it == range.second) { - // Either the first time a node with this name is added or with an existing - // name but unique id. - GraphNodePtr node = make_graph_node(*entity); - if (node == nullptr) { - // Error handled. - return; - } - NodeMap::iterator insertion_it = - ns_it->second.insert(std::make_pair(entity->node_name(), node)); - update_topic_maps_for_put(node, entity); - total_nodes_in_graph_ += 1; - if (insertion_it == ns_it->second.end()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to add a new node /%s to an " - "existing namespace %s in the graph. Report this bug.", - entity->node_name().c_str(), - entity->node_namespace().c_str()); - } - return; + // Release graph_mutex_ before invoking discovery callbacks. + // Callbacks may create per-endpoint Zenoh publishers/subscribers; they must + // not hold graph_mutex_ to avoid potential re-entrant deadlock. + lock.unlock(); + + for (const auto & cb : pending_callbacks) { + cb(*entity); } - // Otherwise, the entity represents a node that already exists in the graph. - // Update topic info if required below. - update_topic_maps_for_put(node_it->second, entity); } ///============================================================================= @@ -1327,6 +1373,96 @@ void GraphCache::remove_qos_event_callbacks(std::size_t entity_gid_hash) event_callbacks_.erase(entity_gid_hash); } +///============================================================================= +void GraphCache::register_subscriber_discovery_callback( + const std::string & topic_name, + std::size_t publisher_gid_hash, + EntityDiscoveryCallback callback) +{ + // Collect existing entities under lock, then invoke callbacks outside + // to avoid ABBA deadlock between graph_mutex_ and discovery_mutex_. + // Lock ordering: graph_mutex_ first, then discovery_mutex_ (matches parse_put). + std::vector entities_to_notify; + { + std::lock_guard graph_lock(graph_mutex_); + std::lock_guard disc_lock(discovery_mutex_); + subscriber_discovery_callbacks_[topic_name][publisher_gid_hash] = callback; + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[GraphCache] Registered subscriber discovery callback for publisher on topic: '%s'", + topic_name.c_str()); + + for (const auto & [topic, type_map] : graph_topics_) { + if (topic == topic_name) { + for (const auto & [type, qos_map] : type_map) { + for (const auto & [qos, topic_data] : qos_map) { + for (const auto & entity_ptr : topic_data->subs_) { + entities_to_notify.push_back(entity_ptr); + } + } + } + } + } + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[GraphCache] Found %zu existing subscriber(s) for topic '%s'", + entities_to_notify.size(), topic_name.c_str()); + } + + for (const auto & entity_ptr : entities_to_notify) { + callback(*entity_ptr); + } +} + +///============================================================================= +void GraphCache::register_publisher_discovery_callback( + const std::string & topic_name, + std::size_t subscriber_gid_hash, + EntityDiscoveryCallback callback) +{ + // Same collect-then-invoke pattern as register_subscriber_discovery_callback. + std::vector entities_to_notify; + { + std::lock_guard graph_lock(graph_mutex_); + std::lock_guard disc_lock(discovery_mutex_); + publisher_discovery_callbacks_[topic_name][subscriber_gid_hash] = callback; + + for (const auto & [topic, type_map] : graph_topics_) { + if (topic == topic_name) { + for (const auto & [type, qos_map] : type_map) { + for (const auto & [qos, topic_data] : qos_map) { + for (const auto & entity_ptr : topic_data->pubs_) { + entities_to_notify.push_back(entity_ptr); + } + } + } + } + } + } + + for (const auto & entity_ptr : entities_to_notify) { + callback(*entity_ptr); + } +} + +///============================================================================= +void GraphCache::unregister_discovery_callbacks(std::size_t gid_hash) +{ + std::lock_guard lock(discovery_mutex_); + + // Remove from all topics in subscriber discovery callbacks + for (auto & [topic_name, callbacks] : subscriber_discovery_callbacks_) { + callbacks.erase(gid_hash); + } + + // Remove from all topics in publisher discovery callbacks + for (auto & [topic_name, callbacks] : publisher_discovery_callbacks_) { + callbacks.erase(gid_hash); + } +} + ///============================================================================= bool GraphCache::is_entity_local(const liveliness::Entity & entity) const { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index c1e2d121..cf02f768 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -105,6 +105,10 @@ class GraphCache final /// event is detected. using GraphCacheEventCallback = std::function; + /// @brief Signature for a function that will be invoked when an entity is discovered. + /// Used for Buffer-aware publishers/subscribers to dynamically create endpoints. + using EntityDiscoveryCallback = std::function; + /// @brief Constructor /// @param id The id of the zenoh session that is building the graph cache. /// This is used to infer which entities originated from the current session @@ -191,6 +195,30 @@ class GraphCache final /// Remove all qos event callbacks for an entity. void remove_qos_event_callbacks(std::size_t entity_gid_hash); + /// Register a callback for when a subscriber is discovered on a topic + /// (for Buffer-aware publishers). + /// @param topic_name The topic name to monitor + /// @param publisher_gid_hash The gid hash of the publisher registering the callback + /// @param callback The callback function to invoke when a subscriber is discovered + void register_subscriber_discovery_callback( + const std::string & topic_name, + std::size_t publisher_gid_hash, + EntityDiscoveryCallback callback); + + /// Register a callback for when a publisher is discovered on a topic + /// (for Buffer-aware subscribers). + /// @param topic_name The topic name to monitor + /// @param subscriber_gid_hash The gid hash of the subscriber registering the callback + /// @param callback The callback function to invoke when a publisher is discovered + void register_publisher_discovery_callback( + const std::string & topic_name, + std::size_t subscriber_gid_hash, + EntityDiscoveryCallback callback); + + /// Unregister all discovery callbacks for an entity. + /// @param gid_hash The gid hash of the entity + void unregister_discovery_callbacks(std::size_t gid_hash); + /// Returns true if the entity is a publisher or client. False otherwise. static bool is_entity_pub(const liveliness::Entity & entity); @@ -200,11 +228,16 @@ class GraphCache final std::shared_ptr make_graph_node(const liveliness::Entity & entity) const; // Helper function to update TopicMap within the node the cache for the entire graph. - void update_topic_maps_for_put( + // Returns any discovery callbacks that should be invoked by the caller + // *outside* graph_mutex_ to prevent re-entrant deadlock. + std::vector update_topic_maps_for_put( GraphNodePtr graph_node, liveliness::ConstEntityPtr entity); - void update_topic_map_for_put( + // Returns discovery callbacks to invoke; never invokes them directly. + // Callers must hold graph_mutex_ when calling, and must invoke the returned + // callbacks *after* releasing graph_mutex_. + std::vector update_topic_map_for_put( GraphNode::TopicMap & topic_map, liveliness::ConstEntityPtr entity, bool report_events = false); @@ -292,6 +325,15 @@ class GraphCache final std::unordered_map> unregistered_event_changes_; std::mutex events_mutex_; + // Discovery callbacks for Buffer-aware endpoints + // Map: (topic_name, publisher_gid_hash) -> callback for subscriber discovery + std::unordered_map> subscriber_discovery_callbacks_; + // Map: (topic_name, subscriber_gid_hash) -> callback for publisher discovery + std::unordered_map> publisher_discovery_callbacks_; + std::mutex discovery_mutex_; + // Mutex to lock before modifying the members above. mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 149948b8..9754b0f1 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -14,6 +14,7 @@ #include "liveliness_utils.hpp" +#include #include #include #include @@ -78,11 +79,13 @@ TopicInfo::TopicInfo( std::string name, std::string type, std::string type_hash, - rmw_qos_profile_t qos) + rmw_qos_profile_t qos, + std::optional> backend_metadata) : name_(std::move(name)), type_(std::move(type)), type_hash_(std::move(type_hash)), - qos_(std::move(qos)) + qos_(std::move(qos)), + backend_metadata_(std::move(backend_metadata)) { topic_keyexpr_ = std::to_string(domain_id); topic_keyexpr_ += "/"; @@ -96,6 +99,65 @@ TopicInfo::TopicInfo( ///============================================================================= namespace { +std::string escape_backend_field(const std::string & input) +{ + std::string out; + out.reserve(input.size()); + for (char c : input) { + switch (c) { + case '%': + out += "%25"; + break; + case ';': + out += "%3B"; + break; + case ':': + out += "%3A"; + break; + case '/': + out += "%2F"; + break; + default: + out += c; + break; + } + } + return out; +} + +int hex_value(char c) +{ + if (c >= '0' && c <= '9') { + return c - '0'; + } + if (c >= 'A' && c <= 'F') { + return 10 + (c - 'A'); + } + if (c >= 'a' && c <= 'f') { + return 10 + (c - 'a'); + } + return -1; +} + +std::string unescape_backend_field(const std::string & input) +{ + std::string out; + out.reserve(input.size()); + for (size_t i = 0; i < input.size(); ++i) { + if (input[i] == '%' && i + 2 < input.size()) { + int hi = hex_value(input[i + 1]); + int lo = hex_value(input[i + 2]); + if (hi >= 0 && lo >= 0) { + char decoded = static_cast((hi << 4) | lo); + out += decoded; + i += 2; + continue; + } + } + out += input[i]; + } + return out; +} /// Enum of liveliness key-expression components. enum KeyexprIndex { @@ -111,12 +173,18 @@ enum KeyexprIndex TopicName, TopicType, TopicTypeHash, - TopicQoS + TopicQoS, + Backends // Optional: only present for Buffer message types }; // Every keyexpression will have components upto node name. #define KEYEXPR_INDEX_MIN KeyexprIndex::NodeName -#define KEYEXPR_INDEX_MAX KeyexprIndex::TopicQoS +// Last mandatory key index for a non-Node entity. Anything past this index +// (currently just `Backends`) is optional and may legitimately be missing +// from a liveliness token. Bump this when promoting a field from optional +// to mandatory. +#define MANDATORY_KEYEXPR_INDEX_MAX KeyexprIndex::TopicQoS +#define KEYEXPR_INDEX_MAX KeyexprIndex::Backends /// The admin space used to prefix the liveliness tokens. static const char ADMIN_SPACE[] = "@ros2_lv"; @@ -466,6 +534,30 @@ Entity::Entity( keyexpr_parts[KeyexprIndex::TopicType] = mangle_name(topic_info.type_); keyexpr_parts[KeyexprIndex::TopicTypeHash] = mangle_name(topic_info.type_hash_); keyexpr_parts[KeyexprIndex::TopicQoS] = qos_to_keyexpr(topic_info.qos_); + + // Add backend metadata if present (only for Buffer message types) + if (topic_info.backend_metadata_.has_value() && !topic_info.backend_metadata_.value().empty()) { + const auto & backend_metadata = topic_info.backend_metadata_.value(); + std::vector backend_names; + backend_names.reserve(backend_metadata.size()); + for (const auto & pair : backend_metadata) { + backend_names.push_back(pair.first); + } + std::sort(backend_names.begin(), backend_names.end()); + + std::string backends_str = "backends:"; + for (size_t i = 0; i < backend_names.size(); ++i) { + const auto & name = backend_names[i]; + const auto & metadata = backend_metadata.at(name); + backends_str += escape_backend_field(name); + backends_str += ":"; + backends_str += escape_backend_field(metadata); + if (i + 1 < backend_names.size()) { + backends_str += ";"; + } + } + keyexpr_parts[KeyexprIndex::Backends] = backends_str; + } } for (std::size_t i = 0; i < KEYEXPR_INDEX_MAX + 1; ++i) { @@ -586,7 +678,12 @@ std::shared_ptr Entity::make(const std::string & keyexpr) // Populate topic_info if we have a token for an entity other than a node. if (entity_type != EntityType::Node) { - if (parts.size() < KEYEXPR_INDEX_MAX + 1) { + // Mandatory key-expression components for non-Node entities run from + // index 0 through MANDATORY_KEYEXPR_INDEX_MAX (= TopicQoS). Anything + // beyond that (currently just the optional Backends component) may + // legitimately be missing, so we validate against the mandatory bound + // only. + if (parts.size() < MANDATORY_KEYEXPR_INDEX_MAX + 1) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received liveliness token for non-node entity without required parameters."); @@ -599,12 +696,50 @@ std::shared_ptr Entity::make(const std::string & keyexpr) "Received liveliness token with invalid qos keyexpr"); return nullptr; } + + // Parse optional backends field (only present for Buffer message types) + std::optional> backend_metadata = std::nullopt; + if (parts.size() > KeyexprIndex::TopicQoS + 1 && + !parts[KeyexprIndex::Backends].empty() && + parts[KeyexprIndex::Backends].rfind("backends:", 0) == 0) + { + // Parse backend list: "backends:cuda:metadata;cpu:" -> map + std::string backends_str = parts[KeyexprIndex::Backends].substr(9); // Skip "backends:" + if (!backends_str.empty()) { + std::unordered_map parsed; + size_t start = 0; + while (start <= backends_str.size()) { + size_t sep = backends_str.find(';', start); + std::string entry = backends_str.substr( + start, sep == std::string::npos ? std::string::npos : sep - start); + if (!entry.empty()) { + size_t colon = entry.find(':'); + std::string name = entry.substr(0, colon); + std::string metadata = (colon == std::string::npos) ? "" : entry.substr(colon + 1); + name = unescape_backend_field(name); + metadata = unescape_backend_field(metadata); + if (!name.empty()) { + parsed[name] = metadata; + } + } + if (sep == std::string::npos) { + break; + } + start = sep + 1; + } + if (!parsed.empty()) { + backend_metadata = std::move(parsed); + } + } + } + topic_info = TopicInfo{ domain_id, demangle_name(std::move(parts[KeyexprIndex::TopicName])), demangle_name(std::move(parts[KeyexprIndex::TopicType])), demangle_name(std::move(parts[KeyexprIndex::TopicTypeHash])), - std::move(qos.value()) + std::move(qos.value()), + std::move(backend_metadata) }; } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 9ab8398f..deb1441f 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -18,10 +18,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include @@ -55,13 +57,16 @@ struct TopicInfo std::string type_hash_; std::string topic_keyexpr_; rmw_qos_profile_t qos_; + // Backend metadata for Buffer message types (backend -> metadata string) + std::optional> backend_metadata_; TopicInfo( std::size_t domain_id, std::string name, std::string type, std::string type_hash, - rmw_qos_profile_t qos); + rmw_qos_profile_t qos, + std::optional> backend_metadata = std::nullopt); }; ///============================================================================= @@ -272,4 +277,20 @@ struct equal_to }; } // namespace std +///============================================================================= +namespace rmw_zenoh_cpp +{ +/// Helper function to convert Entity GID to rmw_gid_t +inline rmw_gid_t entity_gid_to_rmw_gid( + const liveliness::Entity & entity, + const char * implementation_identifier) +{ + rmw_gid_t gid; + gid.implementation_identifier = implementation_identifier; + auto gid_array = entity.copy_gid(); + memcpy(gid.data, gid_array.data(), RMW_GID_STORAGE_SIZE); + return gid; +} +} // namespace rmw_zenoh_cpp + #endif // DETAIL__LIVELINESS_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/logging_macros.hpp b/rmw_zenoh_cpp/src/detail/logging_macros.hpp index 81201856..c8503e7a 100644 --- a/rmw_zenoh_cpp/src/detail/logging_macros.hpp +++ b/rmw_zenoh_cpp/src/detail/logging_macros.hpp @@ -33,5 +33,10 @@ RCUTILS_LOG_SEVERITY_INFO, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_WARN_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ RCUTILS_LOG_SEVERITY_WARN, __func__, __FILE__, __LINE__, __VA_ARGS__);} +// For added rosidl_buffer relevant logging, to be removed when rosidl_buffer is finalized +#define RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ + RCUTILS_LOG_SEVERITY_DEBUG, __func__, __FILE__, __LINE__, __VA_ARGS__);} +#define RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ + RCUTILS_LOG_SEVERITY_ERROR, __func__, __FILE__, __LINE__, __VA_ARGS__);} #endif // DETAIL__LOGGING_MACROS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 5c64982b..2f9c6498 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -29,6 +29,8 @@ #include +#include "buffer_backend_context.hpp" +#include "buffer_backend_loader.hpp" #include "graph_cache.hpp" #include "guard_condition.hpp" #include "identifier.hpp" @@ -91,6 +93,10 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this return RMW_RET_ERROR; } + if (buffer_backend_context_) { + rmw_zenoh_cpp::shutdown_buffer_backends(*buffer_backend_context_); + } + // Explicitly close the session before releasing our shared_ptr reference. // This calls wait_callbacks() internally (since zenoh commit e5db0ce), which waits // for all in-flight callbacks to finish. We call close() here while node-level entities @@ -151,6 +157,11 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this return serialization_buffer_pool_; } + rmw_zenoh_cpp::BufferBackendContext * buffer_backend_context() + { + return buffer_backend_context_.get(); + } + bool create_node_data( const rmw_node_t * const node, const std::string & ns, @@ -471,6 +482,10 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this // Initialize the serialization buffer pool. serialization_buffer_pool_ = std::make_shared(); + + // Initialize the buffer backend context. + buffer_backend_context_ = std::make_unique(); + rmw_zenoh_cpp::initialize_buffer_backends(*buffer_backend_context_); } void init() @@ -546,6 +561,8 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this std::unordered_map> nodes_; zenoh::KeyExpr liveliness_keyexpr_; + + std::unique_ptr buffer_backend_context_; }; ///============================================================================= @@ -622,6 +639,12 @@ std::shared_ptr rmw_context_impl_s::serialization_buf return data_->serialization_buffer_pool(); } +///============================================================================= +rmw_zenoh_cpp::BufferBackendContext * rmw_context_impl_s::buffer_backend_context() +{ + return data_->buffer_backend_context(); +} + ///============================================================================= bool rmw_context_impl_s::create_node_data( const rmw_node_t * const node, diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 92e937eb..cd7f5a2e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -22,6 +22,7 @@ #include +#include "buffer_backend_context.hpp" #include "graph_cache.hpp" #include "rmw_node_data.hpp" #include "zenoh_utils.hpp" @@ -90,6 +91,9 @@ struct rmw_context_impl_s final /// Delete the NodeData for a given rmw_node_t if present. void delete_node_data(const rmw_node_t * const node); + /// Return a pointer to the per-context BufferBackendContext. + rmw_zenoh_cpp::BufferBackendContext * buffer_backend_context(); + // Forward declaration class Data; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index a7262bbd..60442765 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -18,9 +18,12 @@ #include #include +#include +#include #include #include #include +#include #include #include #include @@ -28,6 +31,11 @@ #include #include "cdr.hpp" + +#include "rosidl_buffer_backend_registry/backend_utils.hpp" +#include "buffer_backend_context.hpp" +#include "buffer_endpoint_helpers.hpp" +#include "identifier.hpp" #include "rmw_context_impl_s.hpp" #include "message_type_support.hpp" #include "logging_macros.hpp" @@ -39,11 +47,22 @@ #include "rmw/error_handling.h" #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "rosidl_runtime_c/type_hash.h" + +#include "rosidl_typesupport_fastrtps_cpp/message_type_support.h" #include "tracetools/tracetools.h" namespace rmw_zenoh_cpp { + +using buffer_endpoint_helpers::entity_type_to_string; +using buffer_endpoint_helpers::gid_array_to_hex; +using buffer_endpoint_helpers::gid_to_hex; +using buffer_endpoint_helpers::is_cpu_only_backend_metadata; +using buffer_endpoint_helpers::make_accelerated_key; +using buffer_endpoint_helpers::make_cpu_group_key; + // Period (ms) of heartbeats sent for detection of lost samples // by a RELIABLE + TRANSIENT_LOCAL Publisher #define SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD 500 @@ -73,6 +92,28 @@ std::shared_ptr PublisherData::make( auto callbacks = static_cast(type_support->data); auto message_type_support = std::make_unique(callbacks); + bool has_buffer_fields = callbacks->has_buffer_fields; + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED("rmw_zenoh_cpp", + "[PublisherData::make] Creating publisher for topic '%s', " + "type name '%s', has_buffer_fields: '%d'", + topic_name.c_str(), message_type_support->get_name(), has_buffer_fields); + + // Get installed backends metadata if message type has Buffer fields + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + std::unordered_map backend_metadata; + if (has_buffer_fields) { + auto * backend_ctx = context_impl->buffer_backend_context(); + if (backend_ctx) { + backend_metadata = rosidl_buffer_backend_registry::get_all_backend_metadata( + backend_ctx->backend_instances); + } + // CPU serialization is always implicitly supported by buffer-aware publishers. + if (backend_metadata.find("cpu") == backend_metadata.end()) { + backend_metadata["cpu"] = ""; + } + } + // Convert the type hash to a string so that it can be included in // the keyexpr. char * type_hash_c_str = nullptr; @@ -101,7 +142,8 @@ std::shared_ptr PublisherData::make( topic_name, message_type_support->get_name(), type_hash_c_str, - adapted_qos_profile} + adapted_qos_profile, + has_buffer_fields ? std::make_optional(backend_metadata) : std::nullopt} ); if (entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -170,7 +212,7 @@ std::shared_ptr PublisherData::make( return nullptr; } - return std::shared_ptr( + auto pub_data = std::shared_ptr( new PublisherData{ rmw_publisher, node, @@ -179,8 +221,59 @@ std::shared_ptr PublisherData::make( std::move(adv_pub), std::move(token), type_support->data, - std::move(message_type_support) + std::move(message_type_support), + has_buffer_fields, + backend_metadata }); + + if (has_buffer_fields) { + pub_data->local_endpoint_info_ = + build_endpoint_info_from_entity(*pub_data->entity_, RMW_ENDPOINT_PUBLISHER); + + auto * backend_ctx = context_impl->buffer_backend_context(); + if (backend_ctx) { + rosidl_buffer_backend_registry::notify_endpoint_created( + backend_ctx->backend_instances, pub_data->local_endpoint_info_.info); + } + + // Eagerly create the CPU-group publisher endpoint. All CPU-only + // subscribers share this single channel instead of requiring individual + // peer-to-peer endpoints -- mirrors rmw_fastrtps_cpp's eager CPU + // DataWriter creation. + const std::string cpu_group_key = make_cpu_group_key( + pub_data->entity_->topic_info()->topic_keyexpr_); + auto cpu_endpoint = pub_data->create_publisher_endpoint(cpu_group_key); + if (!cpu_endpoint) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Failed to eagerly create CPU-group publisher endpoint for topic '%s'", + topic_name.c_str()); + return nullptr; + } + pub_data->endpoints_[cpu_group_key] = std::move(cpu_endpoint); + } + + // Register discovery callback for Buffer-aware publishers + if (has_buffer_fields) { + pub_data->graph_cache_ = context_impl->graph_cache(); + if (pub_data->graph_cache_ != nullptr) { + std::weak_ptr weak_pub_data = pub_data; + pub_data->graph_cache_->register_subscriber_discovery_callback( + topic_name, + pub_data->gid_hash(), + [weak_pub_data](const liveliness::Entity & entity) { + if (auto pd = weak_pub_data.lock()) { + pd->on_subscriber_discovered(entity); + } + }); + + // Manually add this local publisher to the graph cache so local subscribers can discover it + // Liveliness events from the same session don't trigger graph updates automatically + pub_data->graph_cache_->parse_put(pub_data->entity_->liveliness_keyexpr(), false); + } + } + + return pub_data; } ///============================================================================= @@ -192,7 +285,9 @@ PublisherData::PublisherData( zenoh::ext::AdvancedPublisher pub, zenoh::LivelinessToken token, const void * type_support_impl, - std::unique_ptr type_support) + std::unique_ptr type_support, + bool is_buffer_aware, + std::unordered_map backend_metadata) : rmw_publisher_(rmw_publisher), rmw_node_(rmw_node), entity_(std::move(entity)), @@ -202,9 +297,228 @@ PublisherData::PublisherData( type_support_impl_(type_support_impl), type_support_(std::move(type_support)), sequence_number_(1), - is_shutdown_(false) + is_shutdown_(false), + is_buffer_aware_(is_buffer_aware), + backend_metadata_(std::move(backend_metadata)) { events_mgr_ = std::make_shared(); + + // For simple publishers, create a single base endpoint + if (!is_buffer_aware_) { + auto base_endpoint = std::make_shared(); + base_endpoint->key = entity_->topic_info()->topic_keyexpr_; + base_endpoint->pub = std::optional(std::move(pub_)); + endpoints_[base_endpoint->key] = base_endpoint; + } + // For buffer-aware publishers, endpoints are created dynamically on subscriber discovery +} + +///============================================================================= +// Helper function for buffer-aware publishing +rmw_ret_t PublisherData::publish_buffer_aware( + const void * ros_message, + ShmContext * shm) +{ + (void)shm; // SHM not currently used for buffer-aware publishing + + rmw_context_impl_t * ctx_impl = static_cast(rmw_node_->context->impl); + auto * backend_ctx = ctx_impl->buffer_backend_context(); + if (!backend_ctx) { + RMW_SET_ERROR_MSG("Buffer-aware publish missing buffer backend context"); + return RMW_RET_ERROR; + } + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Publisher] Publishing buffer-aware message for topic '%s' (message type: %s)", + entity_->topic_info()->name_.c_str(), entity_->topic_info()->type_.c_str()); + + // For buffer-aware publishers, route to different endpoints based on discovered subscribers + if (discovered_subscribers_.empty()) { + // No subscribers yet, skip publish + return RMW_RET_OK; + } + + // Clear message caches + for (auto & [key, ep] : endpoints_) { + ep->cached_message.reset(); + } + + const std::string cpu_group_key = + make_cpu_group_key(entity_->topic_info()->topic_keyexpr_); + + // Serialize and publish to each endpoint + rmw_ret_t ret = RMW_RET_OK; + + auto cpu_endpoint_it = endpoints_.find(cpu_group_key); + if (cpu_endpoint_it != endpoints_.end() && + cpu_endpoint_it->second && + !cpu_endpoint_it->second->target_subscribers.empty()) + { + auto cpu_endpoint = cpu_endpoint_it->second; + const size_t max_data_length = type_support_->get_estimated_serialized_size( + ros_message, type_support_impl_); + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + void * data = allocator->allocate(max_data_length, allocator->state); + if (!data) { + RMW_SET_ERROR_MSG("failed to allocate CPU-group serialization buffer"); + return RMW_RET_BAD_ALLOC; + } + + auto always_free_data = rcpputils::make_scope_exit( + [data, allocator]() { + allocator->deallocate(data, allocator->state); + }); + + uint8_t * msg_bytes = static_cast(data); + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); + rmw_zenoh_cpp::Cdr ser(fastbuffer); + if (!type_support_->serialize_ros_message( + ros_message, + ser.get_cdr(), + type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not serialize ROS message for CPU-group publish"); + return RMW_RET_ERROR; + } + + const size_t data_length = ser.get_serialized_data_length(); + auto deleter = [data, allocator](uint8_t *) { + allocator->deallocate(data, allocator->state); + }; + auto payload = zenoh::Bytes(msg_bytes, data_length, deleter); + always_free_data.cancel(); + + int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); + auto gid = entity_->copy_gid(); + auto attachment_data = rmw_zenoh_cpp::AttachmentData( + sequence_number_++, source_timestamp, gid); + auto attachment_bytes = attachment_data.serialize_to_zbytes(); + + zenoh::ext::AdvancedPublisher::PutOptions options = + zenoh::ext::AdvancedPublisher::PutOptions::create_default(); + options.put_options.attachment = std::move(attachment_bytes); + + zenoh::ZResult result; + if (cpu_endpoint->pub.has_value()) { + cpu_endpoint->pub.value().put(std::move(payload), std::move(options), &result); + if (result != Z_OK) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Failed to publish to CPU-group endpoint '%s'", cpu_group_key.c_str()); + ret = RMW_RET_ERROR; + } + } + } + + size_t iteration = 0; + for (auto & sub : discovered_subscribers_) { + if (sub.uses_cpu_group) { + continue; + } + + iteration++; + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Publisher] Processing message for subscriber endpoint %zu/%zu with key='%s'", + iteration, discovered_subscribers_.size(), sub.endpoint_key.c_str()); + + auto endpoint_it = endpoints_.find(sub.endpoint_key); + if (endpoint_it == endpoints_.end() || !endpoint_it->second) { + continue; + } + auto endpoint = endpoint_it->second; + + // The generated typesupport estimate includes the CDR encapsulation and + // the bounded descriptor size used by endpoint-aware Buffer serialization. + const size_t max_data_length = type_support_->get_estimated_serialized_size( + ros_message, type_support_impl_); + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + void * data = allocator->allocate(max_data_length, allocator->state); + if (!data) { + RMW_SET_ERROR_MSG("failed to allocate serialization buffer"); + return RMW_RET_BAD_ALLOC; + } + + auto always_free_data = rcpputils::make_scope_exit( + [data, allocator]() { + allocator->deallocate(data, allocator->state); + }); + + uint8_t * msg_bytes = static_cast(data); + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); + rmw_zenoh_cpp::Cdr ser(fastbuffer); + + bool ok = type_support_->serialize_ros_message_with_endpoint( + ros_message, ser.get_cdr(), type_support_impl_, sub.endpoint_info.info, + backend_ctx->serialization_context); + if (!ok) { + RMW_SET_ERROR_MSG("could not serialize ROS message with endpoint awareness"); + return RMW_RET_ERROR; + } + + size_t data_length = ser.get_serialized_data_length(); + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Publisher] Serialization complete, actual size: %zu bytes (allocated: %zu, usage: %.1f%%)", + data_length, max_data_length, (data_length * 100.0) / max_data_length); + + // Sanity check: ensure we didn't overflow + if (data_length > max_data_length) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[Publisher] CRITICAL: Serialized size %zu exceeds allocated buffer %zu!", + data_length, max_data_length); + return RMW_RET_ERROR; + } + + // Publish to this endpoint + // Use deleter to manage memory since Zenoh takes ownership + auto deleter = [data, allocator](uint8_t *) { + allocator->deallocate(data, allocator->state); + }; + auto payload = zenoh::Bytes(msg_bytes, data_length, deleter); + always_free_data.cancel(); // Zenoh now owns the memory + + // Create attachment AFTER serialization + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Publisher] Creating message attachment data"); + + int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns(); + auto gid = entity_->copy_gid(); + + auto attachment_data = rmw_zenoh_cpp::AttachmentData( + sequence_number_++, source_timestamp, gid); + + auto attachment_bytes = attachment_data.serialize_to_zbytes(); + + zenoh::ext::AdvancedPublisher::PutOptions options = + zenoh::ext::AdvancedPublisher::PutOptions::create_default(); + // Set attachment directly without std::make_optional (same as non-buffer-aware path) + options.put_options.attachment = std::move(attachment_bytes); + + zenoh::ZResult result; + if (endpoint->pub.has_value()) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Publisher] Calling Zenoh put"); + + endpoint->pub.value().put(std::move(payload), std::move(options), &result); + + if (result != Z_OK) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Failed to publish to endpoint '%s'", sub.endpoint_key.c_str()); + ret = RMW_RET_ERROR; + } + } + } + + return ret; } ///============================================================================= @@ -218,6 +532,27 @@ rmw_ret_t PublisherData::publish( return RMW_RET_ERROR; } + // Buffer-aware publishing strategy (mirrors rmw_fastrtps): + // - If ALL matched subscribers are buffer-aware, publish only to + // per-subscriber endpoint keys (accelerated path). + // - If ANY legacy (non-buffer-aware) subscriber exists, skip buffer + // channels entirely and fall through to the standard base-key publish. + // Buffer-aware subscribers also have a base-key subscription so they + // will still receive the message via standard deserialization. + if (is_buffer_aware_) { + size_t total_matched = 0; + if (graph_cache_) { + graph_cache_->publisher_count_matched_subscriptions( + entity_->topic_info().value(), &total_matched); + } + if (total_matched <= discovered_subscribers_.size()) { + return publish_buffer_aware(ros_message, shm); + } + // Legacy subscribers present -- fall through to standard serialization + // on the base key so every subscriber (buffer-aware and legacy) receives + // the message. + } + // Serialize data. size_t max_data_length = type_support_->get_estimated_serialized_size( ros_message, @@ -469,6 +804,202 @@ std::shared_ptr PublisherData::events_mgr() const return events_mgr_; } +///============================================================================= +void PublisherData::on_subscriber_discovered(const liveliness::Entity & entity) +{ + if (entity.type() != liveliness::EntityType::Subscription) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Publisher] Ignoring discovered entity type=%s node='%s' ns='%s'", + entity_type_to_string(entity.type()), + entity.node_name().c_str(), + entity.node_namespace().c_str()); + return; + } + + auto topic_info_opt = entity.topic_info(); + if (!topic_info_opt.has_value()) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Discovered subscriber without topic info on Buffer topic"); + return; + } + + std::unordered_map sub_backend_metadata; + if (topic_info_opt->backend_metadata_.has_value()) { + sub_backend_metadata = topic_info_opt->backend_metadata_.value(); + } + const bool use_cpu_group = is_cpu_only_backend_metadata(sub_backend_metadata); + + auto gid = entity_gid_to_rmw_gid(entity, rmw_zenoh_identifier); + const auto entity_gid_array = entity.copy_gid(); + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Publisher] Discovered subscriber entity keyexpr='%s', " + "topic='%s', entity.type='%s', node='%s', ns='%s', " + "zid='%s', gid='%s', entity_gid='%s'", + entity.liveliness_keyexpr().c_str(), + topic_info_opt->name_.c_str(), + entity_type_to_string(entity.type()), + entity.node_name().c_str(), + entity.node_namespace().c_str(), + entity.zid().c_str(), + gid_to_hex(gid).c_str(), + gid_array_to_hex(entity_gid_array).c_str()); + + auto sub_endpoint_info = build_endpoint_info_from_entity(entity, RMW_ENDPOINT_SUBSCRIPTION); + + // Phase 1: collect state under lock, check duplicates, mark pending + std::string full_key; + std::vector existing_endpoints; + std::unordered_map>> backend_endpoint_groups; + bool need_create_endpoint = false; + { + std::lock_guard lock(mutex_); + if (!is_buffer_aware_ || is_shutdown_) { + return; + } + + for (const auto & existing : discovered_subscribers_) { + if (memcmp(existing.gid.data, gid.data, RMW_GID_STORAGE_SIZE) == 0) { + return; + } + } + + existing_endpoints.reserve(1 + discovered_subscribers_.size()); + existing_endpoints.push_back(local_endpoint_info_.info); + for (const auto & existing : discovered_subscribers_) { + existing_endpoints.push_back(existing.endpoint_info.info); + } + + for (const auto & existing : discovered_subscribers_) { + backend_endpoint_groups.insert(existing.backend_groups.begin(), + existing.backend_groups.end()); + } + + full_key = use_cpu_group ? + make_cpu_group_key(entity_->topic_info()->topic_keyexpr_) : + make_accelerated_key(entity_->topic_info()->topic_keyexpr_, gid); + + need_create_endpoint = (endpoints_.find(full_key) == endpoints_.end()); + if (need_create_endpoint) { + if (!pending_endpoints_.insert(full_key).second) { + return; + } + } + } + + // Phase 2: external operations without lock + std::unordered_map>> backend_groups; + { + if (!use_cpu_group) { + rmw_context_impl_t * ctx_impl = static_cast(rmw_node_->context->impl); + auto * backend_ctx = ctx_impl->buffer_backend_context(); + if (backend_ctx) { + (void)rosidl_buffer_backend_registry::notify_endpoint_discovered( + backend_ctx->backend_instances, + sub_endpoint_info.info, existing_endpoints, backend_endpoint_groups, + sub_backend_metadata); + } + } + } + + std::shared_ptr new_endpoint; + if (need_create_endpoint) { + new_endpoint = create_publisher_endpoint(full_key); + if (!new_endpoint) { + std::lock_guard lock(mutex_); + pending_endpoints_.erase(full_key); + return; + } + } + + // Phase 3: store results under lock + { + std::lock_guard lock(mutex_); + if (new_endpoint) { + endpoints_[full_key] = new_endpoint; + pending_endpoints_.erase(full_key); + } + + SubscriberInfo sub_info; + sub_info.gid = gid; + sub_info.endpoint_key = full_key; + sub_info.endpoint_info = std::move(sub_endpoint_info); + sub_info.uses_cpu_group = use_cpu_group; + sub_info.backend_metadata = sub_backend_metadata; + sub_info.backend_groups = std::move(backend_groups); + discovered_subscribers_.push_back(std::move(sub_info)); + + if (endpoints_.count(full_key)) { + endpoints_[full_key]->target_subscribers.push_back(gid); + } + } +} + +///============================================================================= +std::shared_ptr PublisherData::get_or_create_endpoint( + const std::string & full_key) +{ + auto it = endpoints_.find(full_key); + if (it != endpoints_.end()) { + return it->second; + } + + auto endpoint = create_publisher_endpoint(full_key); + if (endpoint) { + endpoints_[full_key] = endpoint; + } + return endpoint; +} + +///============================================================================= +std::shared_ptr PublisherData::create_publisher_endpoint( + const std::string & full_key) +{ + auto endpoint = std::make_shared(); + endpoint->key = full_key; + + zenoh::KeyExpr pub_ke(full_key); + + auto qos_profile = entity_->topic_info()->qos_; + + using AdvancedPublisherOptions = zenoh::ext::SessionExt::AdvancedPublisherOptions; + auto adv_pub_opts = AdvancedPublisherOptions::create_default(); + + if (qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + adv_pub_opts.publisher_detection = true; + adv_pub_opts.cache = AdvancedPublisherOptions::CacheOptions::create_default(); + adv_pub_opts.cache->max_samples = qos_profile.depth; + } + + auto pub_opts = zenoh::Session::PublisherOptions::create_default(); + pub_opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + if (qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + pub_opts.reliability = Z_RELIABILITY_RELIABLE; + if (qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { + pub_opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } + } else { + pub_opts.reliability = Z_RELIABILITY_BEST_EFFORT; + } + adv_pub_opts.publisher_options = pub_opts; + + zenoh::ZResult result; + auto pub = sess_->ext().declare_advanced_publisher( + pub_ke, std::move(adv_pub_opts), &result); + + if (result != Z_OK) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Failed to create dynamic endpoint for key: %s", full_key.c_str()); + return nullptr; + } + + endpoint->pub = std::optional(std::move(pub)); + return endpoint; +} + ///============================================================================= PublisherData::~PublisherData() { @@ -485,12 +1016,35 @@ PublisherData::~PublisherData() ///============================================================================= rmw_ret_t PublisherData::shutdown() { - std::lock_guard lock(mutex_); - if (is_shutdown_) { - return RMW_RET_OK; + std::unordered_map> endpoints_to_destroy; + bool was_buffer_aware = false; + { + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + is_shutdown_ = true; + was_buffer_aware = is_buffer_aware_; + if (was_buffer_aware) { + endpoints_to_destroy = std::move(endpoints_); + endpoints_.clear(); + } + } + + if (was_buffer_aware) { + zenoh::ZResult result; + for (auto & [key, endpoint] : endpoints_to_destroy) { + if (endpoint->pub.has_value()) { + std::move(endpoint->pub.value()).undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Failed to undeclare endpoint with key '%s'", key.c_str()); + } + } + } } - // Unregister this publisher from the ROS graph. zenoh::ZResult result; std::move(token_).value().undeclare(&result); if (result != Z_OK) { @@ -500,9 +1054,14 @@ rmw_ret_t PublisherData::shutdown() entity_->topic_info().value().name_.c_str()); return RMW_RET_ERROR; } + + // For buffer-aware publishers, pub_ is the base publisher used for + // fallback/standard serialization and was never moved. + // For non-buffer-aware publishers, pub_ was moved into the base endpoint + // but still needs to be undeclared. std::move(pub_).undeclare(&result); if (result != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to undeclare the publisher for topic '%s'", entity_->topic_info().value().name_.c_str()); @@ -510,7 +1069,6 @@ rmw_ret_t PublisherData::shutdown() } sess_.reset(); - is_shutdown_ = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 5dfe0098..90bc56bf 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -21,11 +21,16 @@ #include #include #include +#include #include +#include +#include #include +#include "endpoint_info.hpp" #include "event.hpp" +#include "graph_cache.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" #include "type_support_common.hpp" @@ -91,6 +96,25 @@ class PublisherData final ~PublisherData(); private: + // Structures for Buffer-aware publishers + struct SubscriberInfo + { + rmw_gid_t gid; + std::string endpoint_key; + EndpointInfoStorage endpoint_info; + bool uses_cpu_group{false}; + std::unordered_map backend_metadata; + std::unordered_map>> backend_groups; + }; + + struct PublisherEndpoint + { + std::string key; + std::optional pub; + std::vector target_subscribers; + std::optional> cached_message; + }; + // Constructor. PublisherData( const rmw_publisher_t * const rmw_publisher, @@ -100,7 +124,25 @@ class PublisherData final zenoh::ext::AdvancedPublisher pub, zenoh::LivelinessToken token, const void * type_support_impl, - std::unique_ptr type_support); + std::unique_ptr type_support, + bool is_buffer_aware, + std::unordered_map backend_metadata); + + // Discovery callback for Buffer-aware publishers + void on_subscriber_discovered(const liveliness::Entity & entity); + + // Get or create an endpoint for a specific full key (caller must hold mutex_) + std::shared_ptr get_or_create_endpoint( + const std::string & full_key); + + // Create a Zenoh publisher endpoint (does not require mutex_) + std::shared_ptr create_publisher_endpoint( + const std::string & full_key); + + // Buffer-aware publish helper + rmw_ret_t publish_buffer_aware( + const void * ros_message, + ShmContext * shm); // Internal mutex. mutable std::mutex mutex_; @@ -112,7 +154,7 @@ class PublisherData final std::shared_ptr entity_; // A shared session. std::shared_ptr sess_; - // An owned AdvancedPublisher. + // An owned AdvancedPublisher (for simple publishers, this is the only one). zenoh::ext::AdvancedPublisher pub_; // Liveliness token for the publisher. std::optional token_; @@ -123,6 +165,17 @@ class PublisherData final size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + + // Buffer-aware publisher fields + bool is_buffer_aware_; + std::unordered_map backend_metadata_; + // For simple publishers: endpoints_ contains only base endpoint + // For buffer-aware: multiple endpoints based on discovered subscribers + std::unordered_map> endpoints_; + std::set pending_endpoints_; + std::vector discovered_subscribers_; + EndpointInfoStorage local_endpoint_info_; + std::shared_ptr graph_cache_; }; using PublisherDataPtr = std::shared_ptr; using PublisherDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index db7a1bfa..4126ebc5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -18,35 +18,55 @@ #include #include +#include +#include #include #include #include +#include #include #include #include #include "attachment_helpers.hpp" #include "cdr.hpp" + +#include "rosidl_buffer_backend_registry/backend_utils.hpp" +#include "buffer_backend_context.hpp" +#include "buffer_endpoint_helpers.hpp" #include "identifier.hpp" #include "rmw_context_impl_s.hpp" #include "message_type_support.hpp" #include "logging_macros.hpp" #include "qos.hpp" +#include "liveliness_utils.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "rosidl_runtime_c/type_hash.h" + +#include "rosidl_typesupport_fastrtps_cpp/message_type_support.h" namespace rmw_zenoh_cpp { + +using buffer_endpoint_helpers::entity_type_to_string; +using buffer_endpoint_helpers::gid_array_to_hex; +using buffer_endpoint_helpers::gid_to_hex; +using buffer_endpoint_helpers::is_cpu_only_backend_types; +using buffer_endpoint_helpers::make_accelerated_key; +using buffer_endpoint_helpers::make_cpu_group_key; ///============================================================================= SubscriptionData::Message::Message( const zenoh::Bytes & p, uint64_t recv_ts, - AttachmentData && attachment_) -: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) + AttachmentData && attachment_, + std::optional endpoint_info_) +: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)), + endpoint_info(std::move(endpoint_info_)) { } @@ -76,6 +96,125 @@ std::shared_ptr SubscriptionData::make( auto callbacks = static_cast(type_support->data); auto message_type_support = std::make_unique(callbacks); + // CREATION-TIME DECISION: Check if message type has Buffer fields + bool has_buffer_fields = callbacks->has_buffer_fields; + bool is_buffer_aware = has_buffer_fields; + + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[SubscriptionData::make] Topic: %s, has_buffer_fields: %d, is_buffer_aware: %d", + topic_name.c_str(), has_buffer_fields, is_buffer_aware); + + // Query and filter installed backends based on acceptable_buffer_backends option + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + std::optional> backend_types = std::nullopt; + std::vector my_backend_types; + if (is_buffer_aware) { + auto * backend_ctx = context_impl->buffer_backend_context(); + std::vector all_installed; + std::unordered_map all_backend_metadata; + if (backend_ctx) { + for (const auto & [name, _] : backend_ctx->backend_instances) { + all_installed.push_back(name); + } + all_backend_metadata = rosidl_buffer_backend_registry::get_all_backend_metadata( + backend_ctx->backend_instances); + } + + const char * requested = sub_options.acceptable_buffer_backends; + + // Parse comma-separated list (if provided and non-empty) + std::vector requested_list; + if (requested != nullptr && strlen(requested) > 0) { + std::istringstream stream(requested); + std::string token; + while (std::getline(stream, token, ',')) { + size_t start = token.find_first_not_of(" \t"); + size_t end = token.find_last_not_of(" \t"); + if (start != std::string::npos) { + requested_list.push_back(token.substr(start, end - start + 1)); + } + } + } + + // "any": accept all installed backends + bool use_all = false; + for (const auto & name : requested_list) { + if (name == "any") { + use_all = true; + break; + } + } + + // NULL, empty, or only "cpu" entries: CPU-only (backward compat default) + bool cpu_only = !use_all && (requested_list.empty() || + std::all_of(requested_list.begin(), requested_list.end(), + [](const std::string & n) {return n == "cpu";})); + + if (use_all) { + my_backend_types = all_installed; + backend_types = all_backend_metadata; + + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Creating Buffer-aware subscription for topic %s with %zu backends (all installed)", + topic_name.c_str(), my_backend_types.size()); + } else if (cpu_only) { + // CPU-only: advertise "cpu" as the only supported backend so the + // subscription stays on the buffer-aware per-endpoint route and + // passes the backends_compatible check with CPU-only publishers. + my_backend_types.push_back("cpu"); + backend_types = std::unordered_map{{"cpu", ""}}; + + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Creating CPU-only Buffer subscription for topic %s " + "(acceptable_buffer_backends='%s')", + topic_name.c_str(), requested ? requested : "(null)"); + } else { + // Validate: every requested non-CPU backend must be installed + for (const auto & name : requested_list) { + if (name == "cpu") { + continue; + } + if (std::find(all_installed.begin(), all_installed.end(), name) == + all_installed.end()) + { + std::string available_str; + for (size_t i = 0; i < all_installed.size(); ++i) { + if (i > 0) {available_str += ", ";} + available_str += all_installed[i]; + } + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Buffer backend '%s' specified in acceptable_buffer_backends " + "is not installed. Available backends: %s", + name.c_str(), available_str.c_str()); + return nullptr; + } + } + + // Filter: only include requested backends and collect their backend metadata + std::unordered_map filtered_backend_metadata; + for (const auto & name : requested_list) { + if (name == "cpu") { + continue; + } + my_backend_types.push_back(name); + auto meta_it = all_backend_metadata.find(name); + if (meta_it != all_backend_metadata.end()) { + filtered_backend_metadata[name] = meta_it->second; + } + } + backend_types = filtered_backend_metadata; + + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Creating subscription for topic %s with %zu filtered backends " + "(from acceptable_buffer_backends='%s')", + topic_name.c_str(), my_backend_types.size(), requested); + } + } + // Convert the type hash to a string so that it can be included in the keyexpr. char * type_hash_c_str = nullptr; rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( @@ -105,7 +244,8 @@ std::shared_ptr SubscriptionData::make( topic_name, message_type_support->get_name(), type_hash_c_str, - adapted_qos_profile} + adapted_qos_profile, + backend_types} // Include backends only if Buffer message type ); if (entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -123,14 +263,49 @@ std::shared_ptr SubscriptionData::make( std::move(session), type_support->data, std::move(message_type_support), - sub_options + sub_options, + is_buffer_aware, + my_backend_types }); + if (is_buffer_aware) { + sub_data->local_endpoint_info_ = + build_endpoint_info_from_entity(*sub_data->entity_, RMW_ENDPOINT_SUBSCRIPTION); + + auto * backend_ctx = context_impl->buffer_backend_context(); + if (backend_ctx) { + rosidl_buffer_backend_registry::notify_endpoint_created( + backend_ctx->backend_instances, sub_data->local_endpoint_info_.info); + } + } + if (!sub_data->init()) { // init() already set the error return nullptr; } + // Register discovery callback for Buffer-aware subscribers + if (is_buffer_aware && graph_cache != nullptr) { + std::weak_ptr weak_sub_data = sub_data; + if (!is_cpu_only_backend_types(my_backend_types)) { + graph_cache->register_publisher_discovery_callback( + topic_name, + sub_data->gid_hash(), + [weak_sub_data](const liveliness::Entity & entity) { + if (auto sd = weak_sub_data.lock()) { + sd->on_publisher_discovered(entity); + } + }); + } + + // Manually add this local subscriber to the graph cache so local publishers can discover it + // Liveliness events from the same session don't trigger graph updates automatically + graph_cache->parse_put(sub_data->entity_->liveliness_keyexpr(), false); + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Manually added local subscriber to graph cache for discovery"); + } + return sub_data; } @@ -142,7 +317,9 @@ SubscriptionData::SubscriptionData( std::shared_ptr session, const void * type_support_impl, std::unique_ptr type_support, - rmw_subscription_options_t sub_options) + rmw_subscription_options_t sub_options, + bool is_buffer_aware, + std::vector my_backend_types) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), @@ -154,7 +331,9 @@ SubscriptionData::SubscriptionData( reception_sn_(0), wait_set_data_(nullptr), is_shutdown_(false), - initialized_(false) + initialized_(false), + is_buffer_aware_(is_buffer_aware), + my_backend_types_(std::move(my_backend_types)) { events_mgr_ = std::make_shared(); } @@ -207,44 +386,85 @@ bool SubscriptionData::init() } } - std::weak_ptr data_wp = shared_from_this(); - auto on_sample = [data_wp](const zenoh::Sample & sample) { - auto sub_data = data_wp.lock(); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "SubscriberCallback triggered over %s.", - std::string(sample.get_keyexpr().as_string_view()).c_str() - ); - return; + // Always create the base key subscription. For non-buffer-aware messages + // this is the only subscription. For buffer-aware messages this acts as a + // fallback so the subscriber can still receive standard-serialized data from + // legacy publishers or when the publisher detects mixed (legacy + buffer) + // subscribers and falls back to base-key-only publishing. + // Additional buffer-aware subscriptions are either created dynamically in + // on_publisher_discovered() for non-CPU backends or bound to the shared + // CPU channel below for CPU-only subscriptions. + { + std::weak_ptr data_wp = shared_from_this(); + auto on_sample = [data_wp](const zenoh::Sample & sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "SubscriberCallback triggered over %s.", + std::string(sample.get_keyexpr().as_string_view()).c_str() + ); + return; + } + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment for topic '%s'", + std::string(sample.get_keyexpr().as_string_view()).c_str()) + return; + } + auto attachment_value = attachment.value(); + + AttachmentData attachment_data(attachment_value); + sub_data->add_new_message( + std::make_unique( + sample.get_payload(), + get_system_time_in_ns(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }; + sub_ = context_impl->session()->ext().declare_advanced_subscriber( + sub_ke, + std::move(on_sample), + zenoh::closures::none, + std::move(adv_sub_opts), + &result); + if (result != Z_OK) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return false; + } + } + + if (is_buffer_aware_) { + if (is_cpu_only_backend_types(my_backend_types_)) { + auto cpu_endpoint = create_subscription_endpoint( + make_cpu_group_key(entity_->topic_info()->topic_keyexpr_), + std::nullopt, + false); + if (!cpu_endpoint) { + RMW_SET_ERROR_MSG("unable to create zenoh CPU subscription"); + return false; } - auto attachment = sample.get_attachment(); - if (!attachment.has_value()) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment for topic '%s'", - std::string(sample.get_keyexpr().as_string_view()).c_str()) - return; + sub_endpoints_[cpu_endpoint->key] = cpu_endpoint; + } else { + // Eagerly create a single shared accelerated channel keyed by this + // subscriber's GID. All buffer-aware publishers that match this + // subscriber publish to this key, so one zenoh subscriber serves all + // of them. Mirrors rmw_fastrtps_cpp's "single shared accelerated + // DataReader per subscriber" design. + rmw_gid_t local_gid = rmw_zenoh_cpp::entity_gid_to_rmw_gid( + *entity_, rmw_zenoh_cpp::rmw_zenoh_identifier); + const std::string accel_key = make_accelerated_key( + entity_->topic_info()->topic_keyexpr_, gid_to_hex(local_gid)); + auto accel_endpoint = create_subscription_endpoint( + accel_key, std::nullopt, true); + if (!accel_endpoint) { + RMW_SET_ERROR_MSG("unable to create zenoh accelerated subscription"); + return false; } - auto attachment_value = attachment.value(); - - AttachmentData attachment_data(attachment_value); - sub_data->add_new_message( - std::make_unique( - sample.get_payload(), - get_system_time_in_ns(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }; - sub_ = context_impl->session()->ext().declare_advanced_subscriber( - sub_ke, - std::move(on_sample), - zenoh::closures::none, - std::move(adv_sub_opts), - &result); - if (result != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return false; + sub_endpoints_[accel_endpoint->key] = accel_endpoint; + } } // Publish to the graph that a new subscription is in town. @@ -262,6 +482,19 @@ bool SubscriptionData::init() initialized_ = true; + if (is_buffer_aware_) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Initialized buffer-aware subscription, " + "base key: '%s'", + entity_->topic_info()->topic_keyexpr_.c_str()); + } else { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Initialized simple subscription on key: '%s'", + entity_->topic_info()->topic_keyexpr_.c_str()); + } + return true; } @@ -309,19 +542,328 @@ SubscriptionData::~SubscriptionData() } } +///============================================================================= +void SubscriptionData::on_publisher_discovered(const liveliness::Entity & entity) +{ + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] on_publisher_discovered callback triggered!"); + + if (entity.type() != liveliness::EntityType::Publisher) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Ignoring discovered entity type=%s node='%s' ns='%s'", + entity_type_to_string(entity.type()), + entity.node_name().c_str(), + entity.node_namespace().c_str()); + return; + } + + const auto & topic_info = entity.topic_info(); + if (!topic_info.has_value()) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Discovered publisher without topic info on Buffer topic"); + return; + } + + std::unordered_map pub_backend_metadata; + std::vector pub_backends; + if (topic_info->backend_metadata_.has_value() && !topic_info->backend_metadata_->empty()) { + pub_backend_metadata = topic_info->backend_metadata_.value(); + pub_backends.reserve(pub_backend_metadata.size() + 1); + for (const auto & pair : pub_backend_metadata) { + pub_backends.push_back(pair.first); + } + } + // CPU serialization is always implicitly supported by all buffer-aware publishers + pub_backends.push_back("cpu"); + + if (!rosidl_buffer_backend_registry::backends_compatible( + my_backend_types_, pub_backends)) + { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Discovered publisher with incompatible backends, skipping"); + return; + } + + rmw_gid_t pub_gid = rmw_zenoh_cpp::entity_gid_to_rmw_gid(entity, + rmw_zenoh_cpp::rmw_zenoh_identifier); + + auto pub_endpoint_info = build_endpoint_info_from_entity(entity, RMW_ENDPOINT_PUBLISHER); + if (is_cpu_only_backend_types(my_backend_types_)) { + return; + } + + // Phase 1: collect state under lock, check duplicates. + // A single shared accelerated endpoint was already created in init(); + // here we only need to register the discovered publisher's metadata so + // take_one_message / the accelerated channel callback can resolve the + // sending publisher's endpoint_info by GID lookup. + std::vector existing_endpoints; + std::unordered_map>> backend_endpoint_groups; + { + std::lock_guard lock(mutex_); + if (!is_buffer_aware_ || is_shutdown_) { + return; + } + + for (const auto & existing : discovered_publishers_) { + if (memcmp(existing.gid.data, pub_gid.data, RMW_GID_STORAGE_SIZE) == 0) { + return; + } + } + + existing_endpoints.reserve(1 + discovered_publishers_.size()); + existing_endpoints.push_back(local_endpoint_info_.info); + for (const auto & existing : discovered_publishers_) { + existing_endpoints.push_back(existing.endpoint_info.info); + } + + for (const auto & existing : discovered_publishers_) { + backend_endpoint_groups.insert(existing.backend_groups.begin(), + existing.backend_groups.end()); + } + } + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Discovered publisher entity keyexpr='%s', " + "topic='%s', entity.type='%s', node='%s', ns='%s', " + "zid='%s', gid='%s', entity_gid='%s'", + entity.liveliness_keyexpr().c_str(), + entity.topic_info().has_value() ? entity.topic_info()->name_.c_str() : "", + entity_type_to_string(entity.type()), + entity.node_name().c_str(), + entity.node_namespace().c_str(), + entity.zid().c_str(), + gid_to_hex(pub_gid).c_str(), + gid_array_to_hex(entity.copy_gid()).c_str()); + + // Phase 2: notify backend of the discovered publisher without holding the lock. + std::unordered_map>> backend_groups; + { + rmw_context_impl_t * ctx_impl = static_cast(rmw_node_->context->impl); + auto * backend_ctx = ctx_impl->buffer_backend_context(); + if (backend_ctx) { + (void)rosidl_buffer_backend_registry::notify_endpoint_discovered( + backend_ctx->backend_instances, + pub_endpoint_info.info, existing_endpoints, backend_endpoint_groups, + pub_backend_metadata); + } + } + + // Phase 3: record the publisher's metadata under the lock. The shared + // accelerated endpoint (created in init()) already receives messages from + // any buffer-aware publisher that matches this subscriber; we do not + // need to create a per-publisher zenoh subscription here. + { + std::lock_guard lock(mutex_); + PublisherInfo pub_info; + pub_info.gid = pub_gid; + pub_info.endpoint_key.clear(); + pub_info.endpoint_info = std::move(pub_endpoint_info); + pub_info.backend_metadata = pub_backend_metadata; + pub_info.backend_groups = std::move(backend_groups); + discovered_publishers_.push_back(std::move(pub_info)); + } +} + +///============================================================================= +std::optional SubscriptionData::lookup_publisher_endpoint_info( + const std::array & publisher_gid) const +{ + std::lock_guard lock(mutex_); + for (const auto & pub_info : discovered_publishers_) { + if (std::memcmp(pub_info.gid.data, publisher_gid.data(), RMW_GID_STORAGE_SIZE) == 0) { + return pub_info.endpoint_info; + } + } + return std::nullopt; +} + +///============================================================================= +void SubscriptionData::create_subscription_for_key( + const std::string & key, + std::optional publisher_info) +{ + auto endpoint = create_subscription_endpoint(key, publisher_info, false); + if (endpoint) { + sub_endpoints_[key] = endpoint; + } +} + +///============================================================================= +std::shared_ptr +SubscriptionData::create_subscription_endpoint( + const std::string & key, + std::optional publisher_info, + bool is_accelerated) +{ + zenoh::ZResult result; + zenoh::KeyExpr sub_ke(key, true, &result); + if (result != Z_OK) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create zenoh keyexpr for key: %s", key.c_str()); + return nullptr; + } + + rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); + + using AdvancedSubscriberOptions = zenoh::ext::SessionExt::AdvancedSubscriberOptions; + using RecoveryOptions = AdvancedSubscriberOptions::RecoveryOptions; + auto adv_sub_opts = AdvancedSubscriberOptions::create_default(); + + if (sub_options_.ignore_local_publications) { + adv_sub_opts.subscriber_options.allowed_origin = ZC_LOCALITY_REMOTE; + } + + if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + adv_sub_opts.subscriber_detection = true; + adv_sub_opts.query_timeout_ms = std::numeric_limits::max(); + adv_sub_opts.history = AdvancedSubscriberOptions::HistoryOptions::create_default(); + adv_sub_opts.history->detect_late_publishers = true; + adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth; + if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + adv_sub_opts.recovery = AdvancedSubscriberOptions::RecoveryOptions{}; + adv_sub_opts.recovery->last_sample_miss_detection = RecoveryOptions::Heartbeat{}; + } + } + + if (publisher_info.has_value()) { + rmw_gid_t publisher_gid = {}; + std::memcpy(publisher_gid.data, publisher_info->info.endpoint_gid, RMW_GID_STORAGE_SIZE); + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Creating endpoint-aware subscription for key='%s' (publisher gid=%s)", + key.c_str(), + gid_to_hex(publisher_gid).c_str()); + } else { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Creating CPU-group subscription for key='%s'", + key.c_str()); + } + + auto endpoint = std::make_shared(); + endpoint->key = key; + endpoint->publisher_info = std::move(publisher_info); + endpoint->is_accelerated = is_accelerated; + + std::weak_ptr data_wp = shared_from_this(); + auto ep = endpoint; + auto on_sample = [data_wp, ep, key](const zenoh::Sample & sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + return; + } + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Received sample on key='%s' (sample key='%s')", + key.c_str(), + std::string(sample.get_keyexpr().as_string_view()).c_str()); + + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment for topic '%s'", + std::string(sample.get_keyexpr().as_string_view()).c_str()); + return; + } + + AttachmentData attachment_data(attachment.value()); + + // For the shared accelerated channel, resolve the sending publisher's + // endpoint info by looking up its GID from the attachment. This lets + // take_one_message use endpoint-aware deserialization without needing + // a per-publisher zenoh subscriber. + std::optional resolved_endpoint_info = ep->publisher_info; + if (ep->is_accelerated) { + resolved_endpoint_info = sub_data->lookup_publisher_endpoint_info( + attachment_data.copy_gid()); + if (!resolved_endpoint_info.has_value()) { + // Publisher liveliness discovery may not have caught up yet. Fall + // back to a minimal endpoint_info populated with only the GID so + // endpoint-aware deserialization still runs (matches the behavior + // of rmw_fastrtps_cpp's buffer-aware take path). + EndpointInfoStorage minimal; + minimal.info = rmw_get_zero_initialized_topic_endpoint_info(); + minimal.info.endpoint_type = RMW_ENDPOINT_PUBLISHER; + const auto gid_bytes = attachment_data.copy_gid(); + std::memcpy( + minimal.info.endpoint_gid, gid_bytes.data(), RMW_GID_STORAGE_SIZE); + resolved_endpoint_info = std::move(minimal); + } + } + + sub_data->add_new_message( + std::make_unique( + sample.get_payload(), + get_system_time_in_ns(), + std::move(attachment_data), + std::move(resolved_endpoint_info)), + std::string(sample.get_keyexpr().as_string_view())); + }; + + auto sub = context_impl->session()->ext().declare_advanced_subscriber( + sub_ke, + std::move(on_sample), + zenoh::closures::none, + std::move(adv_sub_opts), + &result); + + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create zenoh subscription for key: %s", key.c_str()); + return nullptr; + } + + endpoint->sub = std::optional>(std::move(sub)); + + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] Created buffer-aware subscription for key: '%s'", + key.c_str()); + + return endpoint; +} + ///============================================================================= rmw_ret_t SubscriptionData::shutdown() { rmw_ret_t ret = RMW_RET_OK; - std::lock_guard lock(mutex_); - if (is_shutdown_ || !initialized_) { - return ret; + std::unordered_map> endpoints_to_destroy; + bool was_buffer_aware = false; + std::optional> base_sub; + { + std::lock_guard lock(mutex_); + if (is_shutdown_ || !initialized_) { + return ret; + } + is_shutdown_ = true; + initialized_ = false; + was_buffer_aware = is_buffer_aware_; + if (was_buffer_aware) { + endpoints_to_destroy = std::move(sub_endpoints_); + sub_endpoints_.clear(); + } + if (sub_.has_value()) { + base_sub = std::move(sub_); + sub_.reset(); + } } - // Remove any event callbacks registered to this subscription. graph_cache_->remove_qos_event_callbacks(entity_->gid_hash()); - // Unregister this subscription from the ROS graph. + if (was_buffer_aware) { + graph_cache_->unregister_discovery_callbacks(entity_->gid_hash()); + } + zenoh::ZResult result; std::move(token_).value().undeclare(&result); if (result != Z_OK) { @@ -332,8 +874,21 @@ rmw_ret_t SubscriptionData::shutdown() return RMW_RET_ERROR; } - if (sub_.has_value()) { - std::move(sub_.value()).undeclare(&result); + for (auto & [key, endpoint] : endpoints_to_destroy) { + if (endpoint->sub.has_value()) { + std::move(endpoint->sub.value()).undeclare(&result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare subscription for key '%s'", + key.c_str()); + ret = RMW_RET_ERROR; + } + } + } + + if (base_sub.has_value()) { + std::move(base_sub.value()).undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -344,8 +899,6 @@ rmw_ret_t SubscriptionData::shutdown() } sess_.reset(); - is_shutdown_ = true; - initialized_ = false; return ret; } @@ -388,6 +941,14 @@ rmw_ret_t SubscriptionData::take_one_message( *taken = false; std::lock_guard lock(mutex_); + if (entity_ && entity_->topic_info().has_value()) { + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] take_one_message topic='%s' is_buffer_aware=%d queue_size=%zu", + entity_->topic_info().value().name_.c_str(), + is_buffer_aware_, + message_queue_.size()); + } if (is_shutdown_ || message_queue_.empty()) { // This tells rcl that the check for a new message was done, but no messages have come in yet. return RMW_RET_OK; @@ -395,7 +956,7 @@ rmw_ret_t SubscriptionData::take_one_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - auto & payload_data = msg_data->payload; + const Payload & payload_data = msg_data->payload; if (payload_data.empty()) { RMW_ZENOH_LOG_DEBUG_NAMED( @@ -403,18 +964,70 @@ rmw_ret_t SubscriptionData::take_one_message( "SubscriptionData not able to get slice data"); return RMW_RET_ERROR; } + // Object that manages the raw buffer + // FastCDR needs extra space for internal operations during deserialization + // Allocate a larger buffer and copy the payload data + // TODO(wjwwood): Use actual serialized message size instead of conservative estimate + size_t buffer_size = + payload_data.size() * 4 + 65536; // 4x + 64KB safety margin + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + void * buffer_data = allocator->allocate(buffer_size, allocator->state); + if (buffer_data == nullptr) { + RMW_SET_ERROR_MSG("failed to allocate deserialization buffer"); + return RMW_RET_ERROR; + } + auto cleanup_buffer = rcpputils::make_scope_exit( + [allocator, buffer_data]() { + allocator->deallocate(buffer_data, allocator->state); + }); + + // Copy payload data to the larger buffer + std::memcpy(buffer_data, payload_data.data(), payload_data.size()); + + // FastCDR needs to know the actual data size, not the buffer size eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload_data.data())), - payload_data.size()); + reinterpret_cast(buffer_data), + payload_data.size()); // Use actual payload size, not allocated buffer size - // Object that serializes the data + // Object that deserializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_message, - type_support_impl_)) - { + + bool deserialize_success = false; + + try { + if (msg_data->endpoint_info.has_value()) { + // Message arrived on a per-publisher endpoint key -- use endpoint-aware + // deserialization so vendor-backed buffer descriptors are resolved. + rmw_context_impl_t * ctx_impl = + static_cast(rmw_node_->context->impl); + auto * backend_ctx = ctx_impl->buffer_backend_context(); + if (!backend_ctx) { + RMW_SET_ERROR_MSG("Buffer-aware deserialize missing buffer backend context"); + return RMW_RET_ERROR; + } + deserialize_success = type_support_->deserialize_ros_message_with_endpoint( + deser.get_cdr(), + ros_message, + type_support_impl_, + msg_data->endpoint_info->info, + backend_ctx->serialization_context); + } else { + // Message arrived on the base key (legacy / standard serialization). + deserialize_success = type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_message, + type_support_impl_); + } + } catch (const std::exception & e) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[Subscription] EXCEPTION during deserialization: %s", e.what()); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Deserialization exception: %s", e.what()); + return RMW_RET_ERROR; + } + + if (!deserialize_success) { RMW_SET_ERROR_MSG("could not deserialize ROS message"); return RMW_RET_ERROR; } @@ -453,7 +1066,7 @@ rmw_ret_t SubscriptionData::take_serialized_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - auto & payload_data = msg_data->payload; + const Payload & payload_data = msg_data->payload; if (payload_data.empty()) { RMW_ZENOH_LOG_DEBUG_NAMED( @@ -471,7 +1084,7 @@ rmw_ret_t SubscriptionData::take_serialized_message( serialized_message->buffer_length = payload_data.size(); memcpy( serialized_message->buffer, - reinterpret_cast(const_cast(payload_data.data())), + payload_data.data(), payload_data.size()); *taken = true; @@ -495,12 +1108,19 @@ rmw_ret_t SubscriptionData::take_serialized_message( ///============================================================================= void SubscriptionData::add_new_message( - std::unique_ptr msg, const std::string & topic_name) + std::unique_ptr msg, + const std::string & topic_name) { std::lock_guard lock(mutex_); if (is_shutdown_) { return; } + RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[Subscription] add_new_message topic='%s' is_buffer_aware=%d payload_size=%zu", + topic_name.c_str(), + is_buffer_aware_, + msg->payload.size()); const rmw_qos_profile_t adapted_qos_profile = entity_->topic_info().value().qos_; if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && message_queue_.size() >= adapted_qos_profile.depth) diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 517a13ea..4c57932f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -23,17 +23,21 @@ #include #include #include +#include #include #include -#include +#include + +#include #include +#include "attachment_helpers.hpp" +#include "endpoint_info.hpp" #include "event.hpp" #include "graph_cache.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" -#include "attachment_helpers.hpp" #include "type_support_common.hpp" #include "zenoh_utils.hpp" @@ -50,16 +54,16 @@ class SubscriptionData final : public std::enable_shared_from_this endpoint_info_ = std::nullopt); Payload payload; uint64_t recv_timestamp; AttachmentData attachment; + std::optional endpoint_info; }; // Make a shared_ptr of SubscriptionData. @@ -69,7 +73,7 @@ class SubscriptionData final : public std::enable_shared_from_this events_mgr() const; - // Shutdown this SubscriptionData. - rmw_ret_t shutdown(); - - // Check if this SubscriptionData is shutdown. - bool is_shutdown() const; - // Add a new message to the queue. - void add_new_message(std::unique_ptr msg, const std::string & topic_name); - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); + void add_new_message( + std::unique_ptr msg, + const std::string & topic_name); + // Take a ROS message. rmw_ret_t take_one_message( void * ros_message, rmw_message_info_t * message_info, bool * taken); + // Take a serialized ROS message. rmw_ret_t take_serialized_message( rmw_serialized_message_t * serialized_message, bool * taken, @@ -114,12 +112,46 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache() const; + // Shutdown this SubscriptionData. + rmw_ret_t shutdown(); + + // Check if this SubscriptionData is shutdown. + bool is_shutdown() const; + // Destructor. ~SubscriptionData(); private: + struct PublisherInfo + { + rmw_gid_t gid{}; + std::string endpoint_key; + EndpointInfoStorage endpoint_info; + std::unordered_map backend_metadata; + std::unordered_map>> backend_groups; + }; + + struct SubscriptionEndpoint + { + std::string key; + std::optional publisher_info; + std::optional> sub; + // When true, this endpoint is a shared accelerated channel keyed by the + // local subscriber's GID; any buffer-aware publisher matched to this + // subscriber writes to it. The message callback must resolve the sending + // publisher's endpoint_info by GID lookup against discovered_publishers_. + bool is_accelerated{false}; + }; + + // Constructor. SubscriptionData( const rmw_node_t * rmw_node, std::shared_ptr graph_cache, @@ -127,43 +159,74 @@ class SubscriptionData final : public std::enable_shared_from_this session, const void * type_support_impl, std::unique_ptr type_support, - rmw_subscription_options_t sub_options); + rmw_subscription_options_t sub_options, + bool is_buffer_aware, + std::vector my_backend_types); + // Initialize the subscription. bool init(); + // Discovery callback for Buffer-aware subscriptions. + void on_publisher_discovered(const liveliness::Entity & entity); + + // Create a subscription endpoint for a specific key (caller must hold mutex_). + void create_subscription_for_key( + const std::string & key, + std::optional publisher_info); + + // Create a Zenoh subscription endpoint (does not require mutex_). + std::shared_ptr create_subscription_endpoint( + const std::string & key, + std::optional publisher_info, + bool is_accelerated); + + // Look up a discovered publisher's endpoint info by GID. Used by the + // shared accelerated channel to attach endpoint-aware context to each + // received message. Thread-safe: acquires mutex_ internally. + std::optional lookup_publisher_endpoint_info( + const std::array & publisher_gid) const; + // Internal mutex. mutable std::mutex mutex_; // The parent node. const rmw_node_t * rmw_node_; - // The graph cache. + // Graph cache for discovery. std::shared_ptr graph_cache_; // The Entity generated for the subscription. std::shared_ptr entity_; - // A shared session + // A shared session. std::shared_ptr sess_; - // An owned advanced subscriber. + // An owned AdvancedSubscriber for non-Buffer-aware subscriptions. std::optional> sub_; // Liveliness token for the subscription. std::optional token_; - // Type support fields + // Type support fields. const void * type_support_impl_; std::unique_ptr type_support_; - // Subscription options. rmw_subscription_options_t sub_options_; + std::shared_ptr events_mgr_; + // Message queue. std::deque> message_queue_; - // Map GID of a subscription to the sequence number of the message it published. + // Map of publisher gid hash to last known sequence number. std::unordered_map last_known_published_msg_; // Per-subscriber reception sequence number counter, incremented on every take. std::atomic reception_sn_; // Wait set data. rmw_wait_set_data_t * wait_set_data_; - // Callback managers. + // Data callback manager. DataCallbackManager data_callback_mgr_; - std::shared_ptr events_mgr_; // Shutdown flag. bool is_shutdown_; // Whether the object has ever successfully been initialized. bool initialized_; + + // Buffer-aware subscription fields + bool is_buffer_aware_; + std::vector my_backend_types_; + EndpointInfoStorage local_endpoint_info_; + std::vector discovered_publishers_; + std::unordered_map> sub_endpoints_; + std::set pending_sub_endpoints_; }; using SubscriptionDataPtr = std::shared_ptr; using SubscriptionDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/type_support.cpp b/rmw_zenoh_cpp/src/detail/type_support.cpp index fc462975..00e70fb0 100644 --- a/rmw_zenoh_cpp/src/detail/type_support.cpp +++ b/rmw_zenoh_cpp/src/detail/type_support.cpp @@ -140,4 +140,72 @@ bool TypeSupport::deserialize_ros_message( return true; } + +///============================================================================= +bool TypeSupport::serialize_ros_message_with_endpoint( + const void * ros_message, + eprosima::fastcdr::Cdr & ser, + const void * impl, + const rmw_topic_endpoint_info_t & endpoint_info, + const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & + serialization_context) const +{ + assert(ros_message); + assert(impl); + + ser.serialize_encapsulation(); + + if (has_data_) { + auto callbacks = static_cast(impl); + + if (callbacks->cdr_serialize_with_endpoint) { + return callbacks->cdr_serialize_with_endpoint( + ros_message, ser, endpoint_info, serialization_context); + } else { + return callbacks->cdr_serialize(ros_message, ser); + } + } + + ser << (uint8_t)0; + return true; +} + +///============================================================================= +bool TypeSupport::deserialize_ros_message_with_endpoint( + eprosima::fastcdr::Cdr & deser, + void * ros_message, + const void * impl, + const rmw_topic_endpoint_info_t & endpoint_info, + const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & + serialization_context) const +{ + assert(ros_message); + assert(impl); + + try { + deser.read_encapsulation(); + + if (has_data_) { + auto callbacks = static_cast(impl); + + if (callbacks->cdr_deserialize_with_endpoint) { + return callbacks->cdr_deserialize_with_endpoint( + deser, ros_message, endpoint_info, serialization_context); + } else { + return callbacks->cdr_deserialize(deser, ros_message); + } + } + + uint8_t dump = 0; + deser >> dump; + (void)dump; + } catch (const eprosima::fastcdr::exception::Exception & e) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Fast CDR exception deserializing message of type %s. %s", + get_name(), e.what()); + return false; + } + + return true; +} } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/type_support.hpp b/rmw_zenoh_cpp/src/detail/type_support.hpp index c7b2be52..622be2c2 100644 --- a/rmw_zenoh_cpp/src/detail/type_support.hpp +++ b/rmw_zenoh_cpp/src/detail/type_support.hpp @@ -23,7 +23,9 @@ #include "fastcdr/Cdr.h" +#include "rosidl_typesupport_fastrtps_cpp/buffer_serialization.hpp" #include "rosidl_typesupport_fastrtps_cpp/message_type_support.h" +#include "rmw/topic_endpoint_info.h" namespace rmw_zenoh_cpp { @@ -60,6 +62,22 @@ class TypeSupport bool deserialize_ros_message( eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const; + bool serialize_ros_message_with_endpoint( + const void * ros_message, + eprosima::fastcdr::Cdr & ser, + const void * impl, + const rmw_topic_endpoint_info_t & endpoint_info, + const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & + serialization_context) const; + + bool deserialize_ros_message_with_endpoint( + eprosima::fastcdr::Cdr & deser, + void * ros_message, + const void * impl, + const rmw_topic_endpoint_info_t & endpoint_info, + const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & + serialization_context) const; + virtual ~TypeSupport() {} protected: