diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 1477ca98f..a50d27e28 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -257,6 +257,42 @@ // }, // }, // ], + // /// Overwrite QoS options for messages sent and received from/to the network + // /// This allows more fine grained rules (per network card, etc...) but is + // /// less performant than the publication option above. + // network: [ + // { + // /// Optional Id, has to be unique. + // id: "lo0_en0_qos_overwrite", + // // Optional list of interfaces, if not specified, will be applied to all interfaces. + // interfaces: [ + // "lo0", + // "en0", + // ], + // /// Optional list of link protocols. Transports with at least one of these links will have their qos overwritten. + // /// If absent, the overwrite will be applied to all transports. An empty list is invalid. + // link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"], + // /// List of message types to apply to. + // messages: [ + // "put", // put publications + // "delete" // delete publications + // "query", // get queries + // "reply", // replies to queries + // ], + // /// Optional list of data flows messages will be processed on ("egress" and/or "ingress"). + // /// If absent, the rules will be applied to both flows. + // flows: ["egress", "ingress"], + // key_exprs: ["test/demo"], + // overwrite: { + // /// Optional new priority value, if not specified priority of the messages will stay unchanged. + // priority: "real_time", + // /// Optional new congestion control value, if not specified congestion control of the messages will stay unchanged. + // congestion_control: "block", + // /// Optional new express value, if not specified express flag of the messages will stay unchanged. + // express: true + // }, + // }, + // ], // }, // /// The declarations aggregation strategy. @@ -287,8 +323,11 @@ // /// Optional Id, has to be unique // "id": "wlan0egress", // /// Optional list of network interfaces messages will be processed on, the rest will be passed as is. - // /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any. + // /// If absent, the rules will be applied to all interfaces. An empty list is invalid. // interfaces: [ "wlan0" ], + // /// Optional list of link protocols. Transports with at least one of these links will have their messages filtered. + // /// If absent, the rules will be applied to all transports. An empty list is invalid. + // link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"], // /// Optional list of data flows messages will be processed on ("egress" and/or "ingress"). // /// If absent, the rules will be applied to both flows. // flow: ["ingress", "egress"], @@ -387,6 +426,12 @@ // "id": "subject3", // /// An empty subject combination is a wildcard // }, + // { + // "id": "subject4", + // /// link protocols can also be used to identify transports to filter messages on. + // /// If absent, the rules will be applied to all transports. An empty list is invalid. + // link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"], + // }, // ], // /// The policies list associates rules to subjects // "policies": @@ -401,7 +446,7 @@ // }, // { // "rules": ["rule2"], - // "subjects": ["subject3"], + // "subjects": ["subject3", "subject4"], // }, // ] //}, diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index 8792f287d..31d8105d0 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -265,6 +265,42 @@ // }, // }, // ], + // /// Overwrite QoS options for messages sent and received from/to the network + // /// This allows more fine grained rules (per network card, etc...) but is + // /// less performant than the publication option above. + // network: [ + // { + // /// Optional Id, has to be unique. + // id: "lo0_en0_qos_overwrite", + // // Optional list of interfaces, if not specified, will be applied to all interfaces. + // interfaces: [ + // "lo0", + // "en0", + // ], + // /// Optional list of link protocols. Transports with at least one of these links will have their qos overwritten. + // /// If absent, the overwrite will be applied to all transports. An empty list is invalid. + // link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"], + // /// List of message types to apply to. + // messages: [ + // "put", // put publications + // "delete" // delete publications + // "query", // get queries + // "reply", // replies to queries + // ], + // /// Optional list of data flows messages will be processed on ("egress" and/or "ingress"). + // /// If absent, the rules will be applied to both flows. + // flows: ["egress", "ingress"], + // key_exprs: ["test/demo"], + // overwrite: { + // /// Optional new priority value, if not specified priority of the messages will stay unchanged. + // priority: "real_time", + // /// Optional new congestion control value, if not specified congestion control of the messages will stay unchanged. + // congestion_control: "block", + // /// Optional new express value, if not specified express flag of the messages will stay unchanged. + // express: true + // }, + // }, + // ], // }, // /// The declarations aggregation strategy. @@ -295,8 +331,11 @@ // /// Optional Id, has to be unique // "id": "wlan0egress", // /// Optional list of network interfaces messages will be processed on, the rest will be passed as is. - // /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any. + // /// If absent, the rules will be applied to all interfaces. An empty list is invalid. // interfaces: [ "wlan0" ], + // /// Optional list of link protocols. Transports with at least one of these links will have their messages filtered. + // /// If absent, the rules will be applied to all transports. An empty list is invalid. + // link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"], // /// Optional list of data flows messages will be processed on ("egress" and/or "ingress"). // /// If absent, the rules will be applied to both flows. // flow: ["ingress", "egress"], @@ -395,6 +434,12 @@ // "id": "subject3", // /// An empty subject combination is a wildcard // }, + // { + // "id": "subject4", + // /// link protocols can also be used to identify transports to filter messages on. + // /// If absent, the rules will be applied to all transports. An empty list is invalid. + // link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"], + // }, // ], // /// The policies list associates rules to subjects // "policies": @@ -409,7 +454,7 @@ // }, // { // "rules": ["rule2"], - // "subjects": ["subject3"], + // "subjects": ["subject3", "subject4"], // }, // ] //}, diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index ddd913fce..18e5678f8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -44,6 +44,10 @@ namespace rmw_zenoh_cpp // TODO(yuyuan): SHM, make this configurable #define SHM_BUF_OK_SIZE 2621440 +// Period (ms) of heartbeats sent for detection of lost samples +// by a RELIABLE + TRANSIENT_LOCAL Publisher +#define SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD 500 + ///============================================================================= std::shared_ptr PublisherData::make( std::shared_ptr session, @@ -115,6 +119,14 @@ std::shared_ptr PublisherData::make( adv_pub_opts.publisher_detection = true; adv_pub_opts.cache = AdvancedPublisherOptions::CacheOptions::create_default(); adv_pub_opts.cache->max_samples = adapted_qos_profile.depth; + if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + // If RELIABLE + TRANSIENT_LOCAL activate sample miss detection for subscriber + // to detect missed samples and retrieve those from the Publisher cache. + // HeartbeatSporadic is used to prevent excessive background traffic + adv_pub_opts.sample_miss_detection.emplace().heartbeat = + AdvancedPublisherOptions::SampleMissDetectionOptions::HeartbeatSporadic{ + SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD}; + } } zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index c0a2fa0e6..be541c5db 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -196,6 +196,13 @@ bool SubscriptionData::init() // Enable detection of late joiner publishers and query for their historical data. adv_sub_opts.history->detect_late_publishers = true; adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth; + if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + // Activate recovery of lost samples. + // This requires the Publisher to have sample_miss_detection configured, + // which is the case for a RELIABLE + TRANSIENT_LOCAL Publisher. + adv_sub_opts.recovery.emplace().last_sample_miss_detection = + AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{}; + } } std::weak_ptr data_wp = shared_from_this(); diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index 5b7cf794d..1f62899f7 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -18,28 +18,25 @@ find_package(ament_cmake_vendor_package REQUIRED) set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_udp zenoh/transport_tls") # Set VCS_VERSION to include latest changes from zenoh/zenoh-c/zenoh-cpp to benefit from: -# - Reword SHM warning log about "setting scheduling priority": -# - https://github.com/eclipse-zenoh/zenoh/pull/1778 -# - Performances improvements at launch time: -# - https://github.com/eclipse-zenoh/zenoh/pull/1786 -# - https://github.com/eclipse-zenoh/zenoh/pull/1789 -# - https://github.com/eclipse-zenoh/zenoh/pull/1793 -# - Fixed open timeout -# - https://github.com/eclipse-zenoh/zenoh/pull/1796 -# - Improve ACL behaviour, notably for S-ROS -# - https://github.com/eclipse-zenoh/zenoh/pull/1781 -# - https://github.com/eclipse-zenoh/zenoh/pull/1785 -# - https://github.com/eclipse-zenoh/zenoh/pull/1795 -# - https://github.com/eclipse-zenoh/zenoh/pull/1806 -# - Reduce the number of threads in case of scouting -# - https://github.com/eclipse-zenoh/zenoh-c/pull/937 -# - Namespace prefix support -# - https://github.com/eclipse-zenoh/zenoh/pull/1792 -# - Fix debug mode crash -# - https://github.com/eclipse-zenoh/zenoh-cpp/pull/432 +# - Fix a bug leading to invalid inapropriate "Unable to push non droppable network message" log and transport closure: +# - https://github.com/eclipse-zenoh/zenoh/pull/1855 +# - Fix crash with highly chunked keys: +# - https://github.com/eclipse-zenoh/zenoh/pull/1826 +# - Resolve issue with closing the Session in atexit: +# - https://github.com/eclipse-zenoh/zenoh/pull/1632 +# - Change `Session::close()` implementation so it can be safely waited and awaited in `atexit`` +# - https://github.com/eclipse-zenoh/zenoh/pull/1632 +# - Add QoS overwrite interceptor allowing for instance a Router to be configured to change QoS on the fly +# - https://github.com/eclipse-zenoh/zenoh/pull/1825 +# - Add link protocols as subject to interceptors (access_control, downsampling or qos overwrite): +# - https://github.com/eclipse-zenoh/zenoh/pull/1850 +# - Add new non periodic last sample miss detection mechanism for Advanced Publisher: +# - https://github.com/eclipse-zenoh/zenoh/pull/1861 +# - Improve tracing for better analysis on the system like rmw_zenoh +# - https://github.com/eclipse-zenoh/zenoh/pull/1844 ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION e6a1971139f405f7887bf5bb54f0efe402123032 + VCS_VERSION ffa4bddc947f7ed6c0e3b4546205dd1b73e7df81 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" @@ -50,7 +47,7 @@ ament_export_dependencies(zenohc) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION 8ad67f6c7a9031acd437c8739bbc8ddab0ca8173 + VCS_VERSION 868fdad0e7418e8f8cb96e94c89a3aed05905e63 CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF )