87 changes: 86 additions & 1 deletion docs/design.md
@@ -182,9 +182,12 @@ The format of a liveliness token for a node is:
The format of a liveliness token for a publisher, subscription, service server, or service client:

```text
@ros2_lv/<domain_id>/<session_id>/<node_id>/<entity_id>/<entity_kind>/<mangled_enclave>/<mangled_namespace>/<node_name>/<mangled_qualified_name>/<type_name>/<type_hash>/<qos>
@ros2_lv/<domain_id>/<session_id>/<node_id>/<entity_id>/<entity_kind>/<mangled_enclave>/<mangled_namespace>/<node_name>/<mangled_qualified_name>/<type_name>/<type_hash>/<qos>[/<backends>]
```

The trailing `<backends>` component is **optional**: it is only present for buffer-aware (i.e. `rosidl::Buffer`-carrying) publishers and subscribers and is omitted entirely for plain ROS message types.
See the [Buffer-aware pub/sub](#buffer-aware-pub-sub) section below for details.

Where:

* A mangled name is the name with each `/` character replaced by `%`
@@ -206,6 +209,11 @@ Where:
* `<type_name>` - The topic or service type name, as declared in DDS (e.g.: `std_msgs::msg::dds_::String_`) when encoded in CDR
* `<type_hash>` - The topic or service type hash, as defined in [REP-2016](https://github.com/ros-infrastructure/rep/pull/381/files) and generated by [`rosidl`](https://github.com/ros2/rosidl/tree/rolling/rosidl_generator_type_description)
* `<qos>` - The entity QoS encoded in a compact format (see the `qos_to_keyexpr` function)
* `<backends>` - *(Optional, buffer-aware entities only)* A semicolon-separated list of `<backend_name>:<backend_metadata>` pairs prefixed with the literal `backends:`.
Backend names are sorted lexicographically so the same set always serializes identically.
The `<backend_name>` and `<backend_metadata>` fields are escaped to avoid collisions with the `:`, `;`, and `/` separators.
For example, a publisher that supports the `cpu` and `cuda` backends advertises `backends:cpu:;cuda:<cuda-metadata>`.
This component is consumed by the [Buffer-aware pub/sub](#buffer-aware-pub-sub) discovery flow described below.
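
The serialization rules for `<backends>` can be sketched as follows. This is a minimal illustration, not the code used by `rmw_zenoh_cpp`: the helper names `escape_field` and `encode_backends` are hypothetical, and the percent-style escaping is an assumption, since this section only states that the separator characters are escaped and does not say how.

```cpp
#include <cassert>
#include <cstdio>
#include <map>
#include <string>

// Hypothetical escaping scheme (an assumption): percent-encode the three
// separators and '%' itself so that field contents cannot be mistaken for
// ':' , ';' or '/' in the key expression.
std::string escape_field(const std::string & in)
{
  std::string out;
  for (unsigned char c : in) {
    if (c == ':' || c == ';' || c == '/' || c == '%') {
      char buf[4];
      std::snprintf(buf, sizeof(buf), "%%%02X", static_cast<unsigned>(c));
      out += buf;
    } else {
      out += static_cast<char>(c);
    }
  }
  return out;
}

// Builds "backends:<name>:<metadata>;<name>:<metadata>...".
// std::map iterates in key order, so the backend names come out sorted
// lexicographically and the same set always serializes identically.
std::string encode_backends(const std::map<std::string, std::string> & backends)
{
  std::string out = "backends:";
  bool first = true;
  for (const auto & entry : backends) {
    if (!first) {
      out += ';';
    }
    first = false;
    out += escape_field(entry.first) + ":" + escape_field(entry.second);
  }
  return out;
}
```

With a `cpu` entry that has empty metadata and a `cuda` entry whose metadata is `dev0`, `encode_backends` returns `backends:cpu:;cuda:dev0` regardless of insertion order.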

During context initialization, `rmw_zenoh_cpp` calls `Session::liveliness_get` to get an initial view of the entire graph from other nodes in the system.
From then on, when entities enter and leave the system, `rmw_zenoh_cpp` will get new liveliness tokens that it can use to update its internal cache.
@@ -510,6 +518,83 @@ When a new service server is created, a liveliness token of type `SS` is sent ou
* Query::reply
* Query::ReplyOptions

## Buffer-aware pub/sub

`rmw_zenoh_cpp` supports messages containing [`rosidl::Buffer<T>`](https://discourse.openrobotics.org/t/working-prototype-of-native-buffers-accelerated-memory-transport/52399) fields, which describe data that may live outside of host memory (e.g. on a GPU).
For these messages, the RMW transports a small **buffer descriptor** (a handle that identifies the underlying memory region) instead of always copying the bytes through the host.
This preserves full bit-compatibility with publishers and subscribers that only handle regular CPU bytes.

### Backends

Each `rosidl::Buffer` field is bound to a *backend* (e.g. `cpu`, `cuda`).
At RMW context init time, `rmw_zenoh_cpp` calls `rosidl_buffer_backend_registry::initialize_buffer_backends()` to load all installed backend plugins into a per-context `BufferBackendContext`, and tears them down again on context shutdown.
Every buffer-aware publisher implicitly supports the `cpu` backend, so it can always fall back to plain CPU serialization for any subscriber.

Subscribers can opt in to specific backends via the `acceptable_buffer_backends` field of `rmw_subscription_options_t`:

* `nullptr`, empty, or `"cpu"` -- CPU-only subscriber (advertises `backends:cpu:` in its liveliness token).
* `"any"` -- accept all installed backends.
* A comma-separated list of backend names -- accept only the listed backends (intersected with what is installed).
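
The three cases above can be sketched as a small resolver. The function name `resolve_acceptable_backends` is hypothetical, and the sketch assumes that the caller supplies the set of installed backends, that this set already contains `cpu`, and that list entries carry no surrounding whitespace. None of this is confirmed by the source.

```cpp
#include <cassert>
#include <set>
#include <sstream>
#include <string>

// Maps the option string to the set of backends a subscriber accepts.
// `installed` is assumed to already contain "cpu".
std::set<std::string> resolve_acceptable_backends(
  const char * option, const std::set<std::string> & installed)
{
  const std::string value = (option == nullptr) ? "" : option;
  if (value.empty() || value == "cpu") {
    return {"cpu"};  // CPU-only subscriber.
  }
  if (value == "any") {
    return installed;  // Accept every installed backend.
  }
  // Comma-separated list: keep only the names that are actually installed.
  std::set<std::string> accepted;
  std::istringstream stream(value);
  std::string name;
  while (std::getline(stream, name, ',')) {
    if (installed.count(name) != 0) {
      accepted.insert(name);
    }
  }
  return accepted;
}
```

For instance, with `cpu` and `cuda` installed, the value `"cuda,demo"` resolves to just `cuda`, because the hypothetical `demo` backend is not installed.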

### Discovery

When a buffer-aware publisher and subscriber appear in the graph, they discover each other through the existing graph cache plus two new discovery callbacks (`register_subscriber_discovery_callback` and `register_publisher_discovery_callback`).
The callbacks are invoked **outside** of the graph cache mutex to avoid re-entrant deadlocks when they create per-endpoint Zenoh entities.

The `<backends>` component of the liveliness token (see [Graph Cache](#graph-cache)) carries each endpoint's advertised backend set so that peers can negotiate which backend to use without any extra discovery traffic.
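
The "invoke outside the mutex" rule can be illustrated with a common pattern: snapshot the callbacks while holding the lock, release it, then call them. The class `MiniGraphCache`, the callback signature, and `subscriber_count` below are hypothetical stand-ins for illustration only, not the actual `GraphCache` interface.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class MiniGraphCache
{
public:
  // Hypothetical signature: the callback receives the discovered GID as hex.
  using DiscoveryCallback = std::function<void (const std::string &)>;

  void register_subscriber_discovery_callback(DiscoveryCallback cb)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    callbacks_.push_back(std::move(cb));
  }

  // Called when a liveliness token for a new subscriber arrives.
  void on_subscriber_token(const std::string & gid_hex)
  {
    std::vector<DiscoveryCallback> to_invoke;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      known_subscribers_.push_back(gid_hex);
      to_invoke = callbacks_;  // Snapshot while the lock is held.
    }
    // The lock is released here, so a callback may safely call back into
    // the cache (for example while creating per-endpoint entities).
    for (const auto & cb : to_invoke) {
      cb(gid_hex);
    }
  }

  std::size_t subscriber_count() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return known_subscribers_.size();
  }

private:
  mutable std::mutex mutex_;
  std::vector<DiscoveryCallback> callbacks_;
  std::vector<std::string> known_subscribers_;
};
```

If the callbacks were invoked inside the locked scope instead, a callback that calls `subscriber_count()` would try to lock the same non-recursive mutex again, which is the re-entrancy problem the text describes.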

### Topic key expressions

Buffer-aware peers communicate over additional Zenoh key expressions derived from the standard topic key expression:

| Suffix | Direction | Description |
| --- | --- | --- |
| *(none)* | both | Standard CDR-serialized payload. Always declared by every endpoint so a buffer-aware endpoint can still talk to a legacy (non-buffer) peer. |
| `/_buf_cpu` | pub -> sub | Shared CPU-group channel. A single Zenoh publisher/subscriber pair carries serialized CPU payloads for **all** CPU-only buffer-aware subscribers on the topic. Mirrors `rmw_fastrtps_cpp`'s eager CPU `DataWriter`. |
| `/_buf/<sub_gid_hex>` | pub -> sub | Per-subscriber accelerated channel. Each buffer-aware subscriber that advertises non-CPU backends owns a single shared accelerated Zenoh subscriber keyed by its GID. Every matching publisher writes the negotiated backend descriptor to this key. |

For example, with `ROS_DOMAIN_ID=0`, a buffer-aware `chatter` topic carrying GPU images may use the following keys:

```text
0/chatter/sensor_msgs::msg::dds_::Image_/RIHS01_<hash> # base CDR fallback
0/chatter/sensor_msgs::msg::dds_::Image_/RIHS01_<hash>/_buf_cpu # shared CPU-group channel
0/chatter/sensor_msgs::msg::dds_::Image_/RIHS01_<hash>/_buf/<gid> # per-subscriber accelerated channel
```

### Publish path

`PublisherData::publish()` chooses a path based on the matched subscribers (mirroring `rmw_fastrtps_cpp`):

1. **All matched subscribers are buffer-aware** -- skip the standard path and call `publish_buffer_aware()`, which:
   * publishes a single CPU serialization to `/_buf_cpu` if there are any CPU-only buffer-aware subscribers, and
   * publishes a per-subscriber, endpoint-aware serialization (containing only the buffer descriptor for non-CPU backends) to each `/_buf/<sub_gid_hex>` key.
2. **At least one legacy (non-buffer-aware) subscriber is matched** -- skip the buffer channels entirely and fall through to the standard base-key publish path so every subscriber, buffer-aware or not, receives a fully serialized message via the base key.
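
The path selection above can be sketched as a function that returns the key expressions a single publish call would write to. `MatchedSubscriber` and `select_publish_keys` are hypothetical names, not types from `rmw_zenoh_cpp`. The text does not cover the case where no subscriber is matched, so the sketch falls back to the base key there as an assumption.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical summary of one matched subscriber.
struct MatchedSubscriber
{
  std::string gid_hex;
  bool buffer_aware;
  bool cpu_only;  // Only meaningful when buffer_aware is true.
};

std::vector<std::string> select_publish_keys(
  const std::string & base_key, const std::vector<MatchedSubscriber> & matched)
{
  // Assumption: with no matched subscriber, use the standard base key.
  bool use_base_key = matched.empty();
  bool any_cpu_only = false;
  for (const auto & sub : matched) {
    if (!sub.buffer_aware) {
      use_base_key = true;  // One legacy subscriber forces the base path.
    } else if (sub.cpu_only) {
      any_cpu_only = true;
    }
  }
  if (use_base_key) {
    return {base_key};
  }

  std::vector<std::string> keys;
  if (any_cpu_only) {
    // One shared serialization for all CPU-only buffer-aware subscribers.
    keys.push_back(base_key + "/_buf_cpu");
  }
  for (const auto & sub : matched) {
    if (!sub.cpu_only) {
      // One descriptor-carrying message per accelerated subscriber.
      keys.push_back(base_key + "/_buf/" + sub.gid_hex);
    }
  }
  return keys;
}
```

Two CPU-only subscribers and one accelerated subscriber therefore yield two writes: one to `/_buf_cpu` and one to the accelerated subscriber's `/_buf/<sub_gid_hex>` key. Adding a single legacy subscriber collapses the result to the base key alone.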

### Subscribe path

`SubscriptionData` always declares a Zenoh subscriber on the base key for the legacy fallback.
If the subscriber is buffer-aware, it additionally:

* declares a Zenoh subscriber on `/_buf_cpu` if it is CPU-only (so it shares the CPU channel with all other CPU-only buffer-aware subscribers on the topic), or
* declares a single accelerated Zenoh subscriber on `/_buf/<my_gid_hex>` if it accepts at least one non-CPU backend.
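
The declaration rules above can be sketched as a function that lists the key expressions one subscription would subscribe to. The name `subscriber_keys` and its parameters are hypothetical. The sketch assumes the accepted backend set has already been resolved from `acceptable_buffer_backends`.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

std::vector<std::string> subscriber_keys(
  const std::string & base_key,
  bool buffer_aware,
  const std::set<std::string> & accepted_backends,
  const std::string & my_gid_hex)
{
  std::vector<std::string> keys;
  keys.push_back(base_key);  // Always declared for the legacy fallback.
  if (!buffer_aware) {
    return keys;
  }

  bool accepts_non_cpu = false;
  for (const auto & name : accepted_backends) {
    if (name != "cpu") {
      accepts_non_cpu = true;
    }
  }
  // Exactly one extra channel: accelerated if any non-CPU backend is
  // accepted, otherwise the shared CPU-group channel.
  keys.push_back(
    accepts_non_cpu ? base_key + "/_buf/" + my_gid_hex : base_key + "/_buf_cpu");
  return keys;
}
```

A buffer-aware subscription thus ends up with exactly two keys, while a legacy subscription keeps only the base key.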

Incoming samples carry the originating endpoint info (via the `Message` struct's `EndpointInfoStorage`) so that the FastCDR-based deserialization layer can dispatch to the correct backend's `from_descriptor_with_endpoint()` to reconstruct the buffer in place.

### Related RMW APIs

* `rmw_subscription_options_t::acceptable_buffer_backends`
* `rosidl::Buffer<T>`
* `rosidl_buffer_backend_registry::initialize_buffer_backends`
* `rosidl_buffer_backend_registry::shutdown_buffer_backends`
* `PublisherData::publish_buffer_aware`
* `SubscriptionData::on_publisher_discovered`
* `GraphCache::register_publisher_discovery_callback`
* `GraphCache::register_subscriber_discovery_callback`

### Related Zenoh APIs

* `Session::declare_advanced_publisher`
* `Session::declare_advanced_subscriber`

## Quality of Service

The ROS 2 RMW layer defines quite a few Quality of Service settings that are largely derived from DDS.
8 changes: 8 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
@@ -19,6 +19,7 @@ find_package(fastcdr CONFIG REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rosidl_buffer_backend_registry REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
@@ -27,6 +28,7 @@ find_package(tracetools REQUIRED)
find_package(zenoh_cpp_vendor REQUIRED)

add_library(rmw_zenoh_cpp SHARED
  src/detail/buffer_backend_loader.cpp
  src/detail/attachment_helpers.cpp
  src/detail/cdr.cpp
  src/detail/event.cpp
@@ -68,6 +70,7 @@ target_link_libraries(rmw_zenoh_cpp
  fastcdr
  rcpputils::rcpputils
  rcutils::rcutils
  rosidl_buffer_backend_registry::rosidl_buffer_backend_registry
  rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
  rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
  rmw::rmw
@@ -77,6 +80,11 @@ target_link_libraries(rmw_zenoh_cpp

configure_rmw_library(rmw_zenoh_cpp)

target_include_directories(rmw_zenoh_cpp
  PRIVATE
    ${CMAKE_CURRENT_SOURCE_DIR}/src
)

target_include_directories(rmw_zenoh_cpp PUBLIC
  $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
  $<INSTALL_INTERFACE:include>
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/package.xml
@@ -22,6 +22,7 @@
<depend>fastcdr</depend>
<depend>rcpputils</depend>
<depend>rcutils</depend>
<depend>rosidl_buffer_backend_registry</depend>
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
39 changes: 39 additions & 0 deletions rmw_zenoh_cpp/src/detail/buffer_backend_context.hpp
@@ -0,0 +1,39 @@
// Copyright 2026 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__BUFFER_BACKEND_CONTEXT_HPP_
#define DETAIL__BUFFER_BACKEND_CONTEXT_HPP_

#include <memory>
#include <string>
#include <unordered_map>

#include "rosidl_buffer_backend/buffer_backend.hpp"
#include "rosidl_buffer_backend_registry/buffer_backend_registry.hpp"
#include "rosidl_typesupport_fastrtps_cpp/buffer_serialization.hpp"

namespace rmw_zenoh_cpp
{

/// Per-rmw_context bundle of buffer backend state.
struct BufferBackendContext
{
  rosidl_typesupport_fastrtps_cpp::BufferSerializationContext serialization_context;
  std::unique_ptr<rosidl_buffer_backend_registry::BufferBackendRegistry> registry;
  std::unordered_map<std::string, std::shared_ptr<rosidl::BufferBackend>> backend_instances;
};

} // namespace rmw_zenoh_cpp

#endif // DETAIL__BUFFER_BACKEND_CONTEXT_HPP_
191 changes: 191 additions & 0 deletions rmw_zenoh_cpp/src/detail/buffer_backend_loader.cpp
@@ -0,0 +1,191 @@
// Copyright 2026 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "buffer_backend_loader.hpp"

#include <memory>
#include <cstring>
#include <string>
#include <stdexcept>
#include <utility>
#include <vector>

#include "rcutils/error_handling.h"
#include "rcutils/logging_macros.h"
#include "rosidl_buffer_backend_registry/buffer_backend_registry.hpp"
#include "rosidl_typesupport_fastrtps_cpp/identifier.hpp"
#include "rosidl_typesupport_fastrtps_cpp/message_type_support_decl.hpp"
#include "rosidl_typesupport_fastrtps_cpp/buffer_serialization.hpp"
#include "rosidl_runtime_c/message_type_support_struct.h"

namespace rmw_zenoh_cpp
{

static const char * kLoggerName = "rmw_zenoh_cpp.buffer_backend_loader";

void initialize_buffer_backends(BufferBackendContext & context)
{
  context.registry = std::make_unique<rosidl_buffer_backend_registry::BufferBackendRegistry>();
  auto & registry = *context.registry;

  auto & backend_ops = context.serialization_context.descriptor_ops;
  auto & serializers = context.serialization_context.descriptor_serializers;
  std::vector<std::string> backend_names = registry.get_backend_names();
  RCUTILS_LOG_DEBUG_NAMED(
    kLoggerName, "Buffer backends: found %zu backend(s)", backend_names.size());

  for (const std::string & backend_name : backend_names) {
    std::shared_ptr<rosidl::BufferBackend> backend =
      registry.create_backend_instance(backend_name);
    if (!backend) {
      RCUTILS_LOG_ERROR_NAMED(
        kLoggerName, "Backend '%s' pointer is null", backend_name.c_str());
      continue;
    }

    std::string backend_type = backend->get_backend_type();
    RCUTILS_LOG_DEBUG_NAMED(
      kLoggerName, "Processing backend '%s' (type: %s)",
      backend_name.c_str(), backend_type.c_str());

    context.backend_instances[backend_type] = backend;

    rosidl::BufferDescriptorOps ops;

    // Capture a weak_ptr so the lambdas (which live inside
    // descriptor_ops/descriptor_serializers) do not extend the backend's
    // lifetime past shutdown_buffer_backends(). The backend's only
    // owning reference is held in context.backend_instances and is
    // released explicitly during shutdown.
    std::weak_ptr<rosidl::BufferBackend> backend_wp = backend;
    ops.create_descriptor_with_endpoint = [backend_wp](
      const void * impl,
      const rmw_topic_endpoint_info_t & endpoint_info) -> std::shared_ptr<void> {
        std::shared_ptr<rosidl::BufferBackend> backend_sp = backend_wp.lock();
        if (!backend_sp) {
          throw std::runtime_error(
            "Buffer backend instance has been destroyed; "
            "create_descriptor_with_endpoint called after shutdown");
        }
        return backend_sp->create_descriptor_with_endpoint(impl, endpoint_info);
      };
    ops.from_descriptor_with_endpoint = [backend_wp](
      const void * descriptor,
      const rmw_topic_endpoint_info_t & endpoint_info)
      -> std::unique_ptr<void, void (*)(void *)> {
        std::shared_ptr<rosidl::BufferBackend> backend_sp = backend_wp.lock();
        if (!backend_sp) {
          throw std::runtime_error(
            "Buffer backend instance has been destroyed; "
            "from_descriptor_with_endpoint called after shutdown");
        }
        return backend_sp->from_descriptor_with_endpoint(descriptor, endpoint_info);
      };

    backend_ops[backend_type] = ops;

    const rosidl_message_type_support_t * descriptor_ts =
      backend->get_descriptor_type_support();
    if (!descriptor_ts) {
      RCUTILS_LOG_ERROR_NAMED(
        kLoggerName,
        " Backend '%s' returned null descriptor type support",
        backend_type.c_str());
      continue;
    }
    const rosidl_message_type_support_t * fastrtps_descriptor_ts = get_message_typesupport_handle(
      descriptor_ts, rosidl_typesupport_fastrtps_cpp::typesupport_identifier);
    if (!fastrtps_descriptor_ts) {
      RCUTILS_LOG_ERROR_NAMED(
        kLoggerName,
        " Backend '%s' descriptor type support could not be resolved to "
        "rosidl_typesupport_fastrtps_cpp",
        backend_type.c_str());
      rcutils_reset_error();
      continue;
    }

    const auto * callbacks = static_cast<const message_type_support_callbacks_t *>(
      fastrtps_descriptor_ts->data);
    if (!callbacks) {
      RCUTILS_LOG_ERROR_NAMED(
        kLoggerName,
        " Backend '%s' descriptor callbacks are null",
        backend_type.c_str());
      continue;
    }

    rosidl_typesupport_fastrtps_cpp::BufferDescriptorSerializers desc_ser;
    desc_ser.serialize = [callbacks](
      eprosima::fastcdr::Cdr & cdr,
      const std::shared_ptr<void> & desc_ptr,
      const rmw_topic_endpoint_info_t & endpoint_info,
      const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & serialization_context)
      {
        if (!desc_ptr) {
          throw std::runtime_error("Descriptor pointer is null");
        }
        if (callbacks->cdr_serialize_with_endpoint) {
          callbacks->cdr_serialize_with_endpoint(
            desc_ptr.get(), cdr, endpoint_info, serialization_context);
        } else {
          callbacks->cdr_serialize(desc_ptr.get(), cdr);
        }
      };
    desc_ser.deserialize = [callbacks, backend_wp](
      eprosima::fastcdr::Cdr & cdr,
      const rmw_topic_endpoint_info_t & endpoint_info,
      const rosidl_typesupport_fastrtps_cpp::BufferSerializationContext & serialization_context)
      -> std::shared_ptr<void>
      {
        std::shared_ptr<rosidl::BufferBackend> backend_sp = backend_wp.lock();
        if (!backend_sp) {
          throw std::runtime_error(
            "Buffer backend instance has been destroyed; "
            "descriptor deserialize called after shutdown");
        }
        std::shared_ptr<void> desc = backend_sp->create_empty_descriptor();
        if (!desc) {
          throw std::runtime_error("Backend returned null descriptor instance");
        }
        if (callbacks->cdr_deserialize_with_endpoint) {
          callbacks->cdr_deserialize_with_endpoint(
            cdr, desc.get(), endpoint_info, serialization_context);
        } else {
          callbacks->cdr_deserialize(cdr, desc.get());
        }
        return desc;
      };
    serializers[backend_type] = std::move(desc_ser);

    RCUTILS_LOG_DEBUG_NAMED(
      kLoggerName, " FastCDR descriptor serializers registered for '%s'",
      backend_type.c_str());
  }
}

void shutdown_buffer_backends(BufferBackendContext & context)
{
  try {
    context.serialization_context.descriptor_ops.clear();
    context.serialization_context.descriptor_serializers.clear();
    context.backend_instances.clear();
    context.registry.reset();
  } catch (const std::exception & e) {
    RCUTILS_LOG_ERROR_NAMED(
      kLoggerName, "Error during buffer backend shutdown: %s", e.what());
  }
}

} // namespace rmw_zenoh_cpp