diff --git a/ecal/core/CMakeLists.txt b/ecal/core/CMakeLists.txt index 9ae6fc5691..6f5ca5e701 100644 --- a/ecal/core/CMakeLists.txt +++ b/ecal/core/CMakeLists.txt @@ -153,6 +153,7 @@ set(ecal_config_src src/config/configuration.cpp src/config/ecal_path_processing.cpp src/config/ecal_path_processing.h + src/config/shm_mutex.cpp src/types/ecal_custom_data_types.cpp ) if (ECAL_CORE_CONFIGURATION) @@ -803,8 +804,7 @@ set_target_properties(core VISIBILITY_INLINES_HIDDEN ON ) -target_compile_features(core INTERFACE cxx_std_14) -target_compile_features(core PRIVATE cxx_std_17) +target_compile_features(core PUBLIC cxx_std_17) set_property(TARGET core PROPERTY FOLDER core) endmacro() diff --git a/ecal/core/cfg/CMakeLists.txt b/ecal/core/cfg/CMakeLists.txt index 3a14094db3..911c96187d 100644 --- a/ecal/core/cfg/CMakeLists.txt +++ b/ecal/core/cfg/CMakeLists.txt @@ -29,9 +29,6 @@ set(ECAL_GENERATED_YAML_PATH ${ECAL_YAML_PATH} CACHE INTERNAL "Path to the gener add_executable(${PROJECT_NAME} ${ECAL_CORE_PROJECT_ROOT}/core/cfg/generate_configuration_yaml.cpp - ${ECAL_CORE_PROJECT_ROOT}/core/src/config/configuration_writer.cpp - ${ECAL_CORE_PROJECT_ROOT}/core/src/config/default_configuration.cpp - ${ECAL_CORE_PROJECT_ROOT}/core/src/config/ecal_path_processing.cpp ) # Set the output paths for the generated YAML files @@ -57,17 +54,10 @@ add_custom_target(run_${PROJECT_NAME} ALL # Ensure the executable is built before running the custom target add_dependencies(run_${PROJECT_NAME} ${PROJECT_NAME}) -# Specify include directories for the executable -target_include_directories(${PROJECT_NAME} PRIVATE - ${ECAL_CORE_PROJECT_ROOT}/core/src - ${ECAL_CORE_PROJECT_ROOT}/core/include -) - target_link_libraries(${PROJECT_NAME} PRIVATE $<$,$>>:dl> - eCAL::core - eCAL::ecal-utils + ecal_core_private ) # Select the correct ecal config directory on Linux and Windows diff --git a/ecal/core/include/ecal/config/transport_layer.h b/ecal/core/include/ecal/config/transport_layer.h index 3b5b851cd0..b8e184b575 100644 --- a/ecal/core/include/ecal/config/transport_layer.h +++ b/ecal/core/include/ecal/config/transport_layer.h @@ -27,6 +27,10 @@ #include #include +#include +#include + + namespace eCAL { namespace TransportLayer @@ -79,10 +83,30 @@ namespace eCAL }; } + namespace SHM + { + enum class eMutexType : std::uint8_t + { + mutex, + recoverable_mutex + }; + + ECAL_API eMutexType DefaultMutexType(); + ECAL_API std::string_view ToString(eMutexType mutex_type); + ECAL_API std::optional FromString(std::string_view mutex_type_string); + + + struct Configuration + { + eMutexType mutex_type { DefaultMutexType() }; + }; + } + struct Configuration { UDP::Configuration udp; TCP::Configuration tcp; + SHM::Configuration shm; }; } -} \ No newline at end of file +} diff --git a/ecal/core/src/config/configuration_reader.cpp b/ecal/core/src/config/configuration_reader.cpp index 91b030d2db..d167060322 100644 --- a/ecal/core/src/config/configuration_reader.cpp +++ b/ecal/core/src/config/configuration_reader.cpp @@ -1,4 +1,5 @@ #include "configuration_reader.h" +#include "configuration_to_yaml.h" namespace { diff --git a/ecal/core/src/config/configuration_reader.h b/ecal/core/src/config/configuration_reader.h index c92abb8978..80e4bf92a8 100644 --- a/ecal/core/src/config/configuration_reader.h +++ b/ecal/core/src/config/configuration_reader.h @@ -30,8 +30,6 @@ #endif #include -#include "configuration_to_yaml.h" - #include #include #include diff --git a/ecal/core/src/config/configuration_to_yaml.cpp b/ecal/core/src/config/configuration_to_yaml.cpp index 921eaa56f9..2267145e88 100644 --- a/ecal/core/src/config/configuration_to_yaml.cpp +++ b/ecal/core/src/config/configuration_to_yaml.cpp @@ -3,6 +3,21 @@ // utility functions for yaml node handling namespace YAML { + std::string ShmMutexTypeToString(const eCAL::TransportLayer::SHM::eMutexType type) + { + using namespace eCAL::TransportLayer::SHM; + return ToString(type).data(); + } + + void ParseShmMutexType(const std::string& text, eCAL::TransportLayer::SHM::eMutexType& out) + { + using namespace eCAL::TransportLayer::SHM; + if (auto mutex_type = FromString(text); mutex_type.has_value()) + { + out = mutex_type.value(); + } + } + eCAL::Logging::Filter ParseLogLevel(const std::vector& filter_) { // create excluding filter list @@ -281,12 +296,29 @@ namespace YAML AssignValue(config_.local, node_, "local"); return true; } + + Node convert::encode(const eCAL::TransportLayer::SHM::Configuration& config_) + { + Node node; + node["mutex_type"] = ShmMutexTypeToString(config_.mutex_type); + return node; + } + + bool convert::decode(const Node& node_, eCAL::TransportLayer::SHM::Configuration& config_) + { + if (node_["mutex_type"]) + { + ParseShmMutexType(node_["mutex_type"].as(), config_.mutex_type); + } + return true; + } Node convert::encode(const eCAL::TransportLayer::Configuration& config_) { Node node; node["udp"] = config_.udp; node["tcp"] = config_.tcp; + node["shm"] = config_.shm; return node; } @@ -295,6 +327,7 @@ namespace YAML { AssignValue(config_.udp, node_, "udp"); AssignValue(config_.tcp, node_, "tcp"); + AssignValue(config_.shm, node_, "shm"); return true; } @@ -680,6 +713,7 @@ namespace YAML Node convert::encode(const eCAL::Configuration& config_) { Node node; + node["transport_layer"] = config_.transport_layer; node["publisher"] = config_.publisher; node["subscriber"] = config_.subscriber; node["registration"] = config_.registration; @@ -707,4 +741,4 @@ namespace YAML return true; } -} \ No newline at end of file +} diff --git a/ecal/core/src/config/configuration_to_yaml.h b/ecal/core/src/config/configuration_to_yaml.h index 012c3e2a0d..20de241fec 100644 --- a/ecal/core/src/config/configuration_to_yaml.h +++ b/ecal/core/src/config/configuration_to_yaml.h @@ -137,6 +137,14 @@ namespace YAML static bool decode(const Node& node_, eCAL::TransportLayer::Configuration& config_); }; + template<> + struct convert + { + static Node encode(const eCAL::TransportLayer::SHM::Configuration& config_); + + static bool decode(const Node& node_, eCAL::TransportLayer::SHM::Configuration& config_); + }; + /* ___ __ ___ __ @@ -359,4 +367,4 @@ namespace YAML }; } -#endif // CONFIGURATION_TO_YAML_H \ No newline at end of file +#endif // CONFIGURATION_TO_YAML_H diff --git a/ecal/core/src/config/default_configuration.cpp b/ecal/core/src/config/default_configuration.cpp index 0de9e9c7ed..6909ed7ac9 100644 --- a/ecal/core/src/config/default_configuration.cpp +++ b/ecal/core/src/config/default_configuration.cpp @@ -1,6 +1,7 @@ #include "default_configuration.h" #include "ecal/config.h" +#include "config/configuration_to_yaml.h" #include @@ -138,6 +139,12 @@ namespace { return std::string("\"") + ip_.Get() + std::string("\""); } + + std::string quoteString(const eCAL::TransportLayer::SHM::eMutexType type_) + { + using namespace eCAL::TransportLayer::SHM; + return std::string("\"") + ToString(type_).data() + std::string("\""); + } } namespace eCAL @@ -248,6 +255,11 @@ namespace eCAL ss << R"( # Reconnection attemps the session will try to reconnect in case of an issue)" << "\n"; ss << R"( max_reconnections: )" << config_.transport_layer.tcp.max_reconnections << "\n"; ss << R"()" << "\n"; + ss << R"( shm:)" << "\n"; + ss << R"( # Named mutex mode for shared-memory synchronization)" << "\n"; + ss << R"( # Options: "mutex", "recoverable_mutex")" << "\n"; + ss << R"( mutex_type: )" << quoteString(config_.transport_layer.shm.mutex_type) << "\n"; + ss << R"()" << "\n"; ss << R"()" << "\n"; ss << R"(# Publisher specific base settings)" << "\n"; ss << R"(publisher:)" << "\n"; @@ -256,6 +268,8 @@ namespace eCAL ss << R"( shm:)" << "\n"; ss << R"( # Enable layer)" << "\n"; ss << R"( enable: )" << config_.publisher.layer.shm.enable << "\n"; + ss << R"( # Optional override. If omitted, inherits transport_layer.shm.mutex_type)" << "\n"; + ss << R"( # mutex_type: "recoverable_mutex")" << "\n"; ss << R"( # Enable zero copy shared memory transport mode)" << "\n"; ss << R"( zero_copy_mode: )" << config_.publisher.layer.shm.zero_copy_mode << "\n"; ss << R"( # Force connected subscribers to send acknowledge event after processing the message.)" << "\n"; @@ -276,7 +290,7 @@ namespace eCAL ss << R"( # Base configuration for TCP publisher)" << "\n"; ss << R"( tcp:)" << "\n"; ss << R"( # Enable layer)" << "\n"; - ss << R"( enable: )" << config_.publisher.layer.shm.enable << "\n"; + ss << R"( enable: )" << config_.publisher.layer.tcp.enable << "\n"; ss << R"()" << "\n"; ss << R"( # Priority list for layer usage in local mode (Default: SHM > UDP > TCP))" << "\n"; ss << R"( priority_local: )" << quoteString(config_.publisher.layer_priority_local) << "\n"; @@ -291,6 +305,8 @@ namespace eCAL ss << R"( shm:)" << "\n"; ss << R"( # Enable layer)" << "\n"; ss << R"( enable: )" << config_.subscriber.layer.shm.enable << "\n"; + ss << R"( # Optional override. If omitted, inherits transport_layer.shm.mutex_type)" << "\n"; + ss << R"( # mutex_type: "recoverable_mutex")" << "\n"; ss << R"()" << "\n"; ss << R"( # Base configuration for UDP subscriber)" << "\n"; ss << R"( udp:)" << "\n"; @@ -402,4 +418,4 @@ namespace eCAL return ss; } } -} \ No newline at end of file +} diff --git a/ecal/core/src/config/shm_mutex.cpp b/ecal/core/src/config/shm_mutex.cpp new file mode 100644 index 0000000000..99752ceebf --- /dev/null +++ b/ecal/core/src/config/shm_mutex.cpp @@ -0,0 +1,53 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2026 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include +#include "io/mtx/shm_mutex_resolution.h" + +namespace eCAL::TransportLayer::SHM +{ + eMutexType DefaultMutexType() + { + return detail::DefaultSemanticMutexType(); + } + + std::string_view ToString(eMutexType mutex_type) + { + switch (mutex_type) + { + case eMutexType::mutex: + return "mutex"; + case eMutexType::recoverable_mutex: + return "recoverable_mutex"; + default: + return "mutex"; + } + } + + std::optional FromString(std::string_view mutex_type_string) + { + if (mutex_type_string == "mutex") + return eMutexType::mutex; + + if (mutex_type_string == "recoverable_mutex") + return eMutexType::recoverable_mutex; + + return std::nullopt; + } +} diff --git a/ecal/core/src/io/mtx/ecal_named_mutex.cpp b/ecal/core/src/io/mtx/ecal_named_mutex.cpp index 163b1d27ce..f8262493ca 100644 --- a/ecal/core/src/io/mtx/ecal_named_mutex.cpp +++ b/ecal/core/src/io/mtx/ecal_named_mutex.cpp @@ -22,11 +22,13 @@ **/ #include +#include #include #include #include #include "ecal_named_mutex.h" +#include "shm_mutex_resolution.h" #ifdef ECAL_OS_LINUX #include "linux/ecal_named_mutex_impl.h" @@ -39,13 +41,11 @@ #include "win32/ecal_named_mutex_impl.h" #endif -#include - namespace eCAL { - CNamedMutex::CNamedMutex(const std::string& name_, bool recoverable_) : CNamedMutex() + CNamedMutex::CNamedMutex(const std::string& name_, const detail::eResolvedMutexType mutex_type_, bool recoverable_) : CNamedMutex() { - Create(name_, recoverable_); + Create(name_, mutex_type_, recoverable_); } CNamedMutex::CNamedMutex() @@ -69,24 +69,29 @@ namespace eCAL return *this; } - bool CNamedMutex::Create(const std::string& name_, bool recoverable_) + bool CNamedMutex::Create(const std::string& name_, const detail::eResolvedMutexType mutex_type_, bool recoverable_) { + switch (mutex_type_) + { + case detail::eResolvedMutexType::pthread_mutex: #ifdef ECAL_OS_LINUX -#if !defined(ECAL_USE_CLOCKLOCK_MUTEX) && defined(ECAL_HAS_ROBUST_MUTEX) - if(recoverable_) - m_impl = std::make_unique(name_, true); - else - m_impl = std::make_unique(name_, false); -#elif defined(ECAL_USE_CLOCKLOCK_MUTEX) && defined(ECAL_HAS_CLOCKLOCK_MUTEX) - m_impl = std::make_unique(name_, recoverable_); -#else - m_impl = std::make_unique(name_, recoverable_); + m_impl = std::make_unique(name_, recoverable_); #endif + break; + case detail::eResolvedMutexType::pthread_robust_mutex: +#if defined(ECAL_OS_LINUX) && (defined(ECAL_HAS_ROBUST_MUTEX) || defined(ECAL_HAS_CLOCKLOCK_MUTEX)) + m_impl = std::make_unique(name_, recoverable_); #endif - + break; + case detail::eResolvedMutexType::winapi_mutex: #ifdef ECAL_OS_WINDOWS - m_impl = std::make_unique(name_, recoverable_); + m_impl = std::make_unique(name_, recoverable_); #endif + break; + default: + break; + } + return IsCreated(); } @@ -130,4 +135,3 @@ namespace eCAL m_impl->Unlock(); } } - diff --git a/ecal/core/src/io/mtx/ecal_named_mutex.h b/ecal/core/src/io/mtx/ecal_named_mutex.h index 626c9f07b3..cb25c98e22 100644 --- a/ecal/core/src/io/mtx/ecal_named_mutex.h +++ b/ecal/core/src/io/mtx/ecal_named_mutex.h @@ -27,6 +27,8 @@ #include #include +#include "shm_mutex_resolution.h" + namespace eCAL { class CNamedMutexImplBase; @@ -34,7 +36,7 @@ namespace eCAL class CNamedMutex { public: - explicit CNamedMutex(const std::string& name_, bool recoverable_ = false); + explicit CNamedMutex(const std::string& name_, const detail::eResolvedMutexType mutex_type_, bool recoverable_ = false); CNamedMutex(); ~CNamedMutex(); @@ -43,7 +45,7 @@ namespace eCAL CNamedMutex(CNamedMutex&& named_mutex) noexcept; CNamedMutex& operator=(CNamedMutex&& named_mutex) noexcept; - bool Create(const std::string& name_, bool recoverable_ = false); + bool Create(const std::string& name_, detail::eResolvedMutexType mutex_type_, bool recoverable_ = false); void Destroy(); bool IsCreated() const; @@ -59,4 +61,4 @@ namespace eCAL private: std::unique_ptr m_impl; }; -} \ No newline at end of file +} diff --git a/ecal/core/src/io/mtx/shm_mutex_resolution.h b/ecal/core/src/io/mtx/shm_mutex_resolution.h new file mode 100644 index 0000000000..86627a2654 --- /dev/null +++ b/ecal/core/src/io/mtx/shm_mutex_resolution.h @@ -0,0 +1,101 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2026 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#pragma once + +#include +#include +#include +#include + +namespace eCAL::detail +{ + enum class eResolvedMutexType : std::uint8_t + { + winapi_mutex, + pthread_mutex, + pthread_robust_mutex + }; + + template + struct shm_mutex_traits; + + template<> + struct shm_mutex_traits + { + static constexpr bool supported = true; +#ifdef ECAL_OS_WINDOWS + static constexpr eResolvedMutexType resolved = eResolvedMutexType::winapi_mutex; +#else + static constexpr eResolvedMutexType resolved = eResolvedMutexType::pthread_mutex; +#endif + }; + + template<> + struct shm_mutex_traits + { +#ifdef ECAL_OS_WINDOWS + static constexpr bool supported = true; + static constexpr eResolvedMutexType resolved = eResolvedMutexType::winapi_mutex; +#elif defined(ECAL_HAS_ROBUST_MUTEX) || defined(ECAL_HAS_CLOCKLOCK_MUTEX) + static constexpr bool supported = true; + static constexpr eResolvedMutexType resolved = eResolvedMutexType::pthread_robust_mutex; +#else + static constexpr bool supported = false; + static constexpr eResolvedMutexType resolved = eResolvedMutexType::pthread_mutex; +#endif + }; + + inline bool IsSupported(const eCAL::TransportLayer::SHM::eMutexType type) + { + switch (type) + { + case eCAL::TransportLayer::SHM::eMutexType::mutex: + return shm_mutex_traits::supported; + case eCAL::TransportLayer::SHM::eMutexType::recoverable_mutex: + return shm_mutex_traits::supported; + default: + return false; + } + } + + inline eCAL::TransportLayer::SHM::eMutexType DefaultSemanticMutexType() + { + return IsSupported(eCAL::TransportLayer::SHM::eMutexType::recoverable_mutex) + ? eCAL::TransportLayer::SHM::eMutexType::recoverable_mutex + : eCAL::TransportLayer::SHM::eMutexType::mutex; + } + + inline eResolvedMutexType Resolve(const eCAL::TransportLayer::SHM::eMutexType type) + { + switch (type) + { + case eCAL::TransportLayer::SHM::eMutexType::mutex: + return shm_mutex_traits::resolved; + case eCAL::TransportLayer::SHM::eMutexType::recoverable_mutex: + if (!shm_mutex_traits::supported) + { + throw std::runtime_error("recoverable_mutex is not supported by this build"); + } + return shm_mutex_traits::resolved; + default: + throw std::runtime_error("Unknown SHM mutex type"); + } + } +} diff --git a/ecal/core/src/io/shm/ecal_memfile.cpp b/ecal/core/src/io/shm/ecal_memfile.cpp index bbe88a1800..4e2c273de9 100644 --- a/ecal/core/src/io/shm/ecal_memfile.cpp +++ b/ecal/core/src/io/shm/ecal_memfile.cpp @@ -56,12 +56,13 @@ namespace eCAL Destroy(false); } - bool CMemoryFile::Create(const char* name_, const bool create_, const size_t len_, bool auto_sanitizing_) + bool CMemoryFile::Create(const char* name_, const bool create_, detail::eResolvedMutexType mutex_type_, const size_t len_, bool auto_sanitizing_) { assert((create_ && len_ > 0) || (!create_ && len_ == 0)); assert((auto_sanitizing_ && create_) || !auto_sanitizing_); m_auto_sanitizing = auto_sanitizing_; + m_memfile_mutex_type = mutex_type_; // do we have to recreate the file ? if ((m_name != name_) @@ -98,7 +99,7 @@ namespace eCAL // create mutex // for performance reasons only apply consistency check if it is explicitly set - if(!m_memfile_mutex.Create(name_, m_auto_sanitizing)) + if(!m_memfile_mutex.Create(name_, m_memfile_mutex_type, m_auto_sanitizing)) { #ifndef NDEBUG printf("Could not create memory file mutex: %s.\n", name_); diff --git a/ecal/core/src/io/shm/ecal_memfile.h b/ecal/core/src/io/shm/ecal_memfile.h index 4df3280b74..832657879a 100644 --- a/ecal/core/src/io/shm/ecal_memfile.h +++ b/ecal/core/src/io/shm/ecal_memfile.h @@ -33,6 +33,7 @@ #include "ecal_memfile_info.h" #include "io/mtx/ecal_named_mutex.h" +#include "io/mtx/shm_mutex_resolution.h" namespace eCAL { @@ -63,7 +64,7 @@ namespace eCAL * * @return true if it succeeds, false if it fails. **/ - bool Create(const char* name_, bool create_, size_t len_ = 0, bool auto_sanitizing_ = false); + bool Create(const char* name_, bool create_, detail::eResolvedMutexType mutex_type_, size_t len_ = 0, bool auto_sanitizing_ = false); /** * @brief Delete the associated memory file from system. @@ -225,6 +226,7 @@ namespace eCAL SInternalHeader m_header; std::shared_ptr m_memfile_info; CNamedMutex m_memfile_mutex; + detail::eResolvedMutexType m_memfile_mutex_type { detail::Resolve(eCAL::TransportLayer::SHM::DefaultMutexType()) }; private: CMemoryFile(const CMemoryFile&); // prevent copy-construction diff --git a/ecal/core/src/io/shm/ecal_memfile_pool.cpp b/ecal/core/src/io/shm/ecal_memfile_pool.cpp index efef7d2beb..b5097456ce 100644 --- a/ecal/core/src/io/shm/ecal_memfile_pool.cpp +++ b/ecal/core/src/io/shm/ecal_memfile_pool.cpp @@ -58,7 +58,7 @@ namespace eCAL Destroy(); } - bool CMemFileObserver::Create(const std::string& memfile_name_, const std::string& memfile_event_) + bool CMemFileObserver::Create(const std::string& memfile_name_, const std::string& memfile_event_, detail::eResolvedMutexType mutex_type_) { if (m_created) return false; @@ -67,7 +67,7 @@ namespace eCAL gOpenNamedEvent(&m_event_ack, memfile_event_ + "_ack", false); // create memory file access - m_memfile.Create(memfile_name_.c_str(), false); + m_memfile.Create(memfile_name_.c_str(), false, mutex_type_); m_created = true; @@ -370,7 +370,7 @@ namespace eCAL m_created = false; } - bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_) + bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, detail::eResolvedMutexType mutex_type_, int timeout_observation_ms, const MemFileDataCallbackT& callback_) { if(!m_created) return(false); if(memfile_name_.empty()) return(false); @@ -402,7 +402,7 @@ namespace eCAL else { auto observer = std::make_shared(m_memfile_map); - observer->Create(memfile_name_, memfile_event_); + observer->Create(memfile_name_, memfile_event_, mutex_type_); observer->Start(timeout_observation_ms, callback_); m_observer_pool[memfile_name_] = observer; #ifndef NDEBUG diff --git a/ecal/core/src/io/shm/ecal_memfile_pool.h b/ecal/core/src/io/shm/ecal_memfile_pool.h index d84c5745e5..c409d68291 100644 --- a/ecal/core/src/io/shm/ecal_memfile_pool.h +++ b/ecal/core/src/io/shm/ecal_memfile_pool.h @@ -59,7 +59,7 @@ namespace eCAL CMemFileObserver(CMemFileObserver&& rhs) = delete; CMemFileObserver& operator=(CMemFileObserver&& rhs) = delete; - bool Create(const std::string& memfile_name_, const std::string& memfile_event_); + bool Create(const std::string& memfile_name_, const std::string& memfile_event_, detail::eResolvedMutexType mutex_type_); bool Destroy(); bool Start(int timeout_, const MemFileDataCallbackT& callback_); @@ -98,7 +98,7 @@ namespace eCAL void Start(); void Stop(); - bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_); + bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, detail::eResolvedMutexType mutex_type_, int timeout_observation_ms, const MemFileDataCallbackT& callback_); protected: void CleanupPoolThread(); diff --git a/ecal/core/src/io/shm/ecal_memfile_sync.cpp b/ecal/core/src/io/shm/ecal_memfile_sync.cpp index 478c364355..995a430bbe 100644 --- a/ecal/core/src/io/shm/ecal_memfile_sync.cpp +++ b/ecal/core/src/io/shm/ecal_memfile_sync.cpp @@ -265,7 +265,7 @@ namespace eCAL if (memfile_size < m_attr.min_size) memfile_size = m_attr.min_size; // create the memory file - if (!m_memfile.Create(m_memfile_name.c_str(), true, memfile_size)) + if (!m_memfile.Create(m_memfile_name.c_str(), true, m_attr.mutex_type, memfile_size)) { Logging::Log(Logging::log_level_error, std::string("CSyncMemoryFile::Create FAILED : ") + m_memfile_name); return false; diff --git a/ecal/core/src/io/shm/ecal_memfile_sync.h b/ecal/core/src/io/shm/ecal_memfile_sync.h index f9995cd655..d26ea5c439 100644 --- a/ecal/core/src/io/shm/ecal_memfile_sync.h +++ b/ecal/core/src/io/shm/ecal_memfile_sync.h @@ -31,6 +31,7 @@ #include "readwrite/ecal_writer_data.h" #include "ecal_eventhandle.h" #include "ecal_memfile.h" +#include "io/mtx/shm_mutex_resolution.h" #include #include @@ -40,6 +41,7 @@ namespace eCAL { struct SSyncMemoryFileAttr { + detail::eResolvedMutexType mutex_type; size_t min_size; //!< memory file minimum size [Bytes] size_t reserve; //!< dynamic file size reserve before recreating memory file if payload size changes [%] int64_t timeout_open_ms; //!< timeout to open a memory file using mutex lock [ms] diff --git a/ecal/core/src/pubsub/config/builder/reader_attribute_builder.cpp b/ecal/core/src/pubsub/config/builder/reader_attribute_builder.cpp index 43dba6564a..f9212c9534 100644 --- a/ecal/core/src/pubsub/config/builder/reader_attribute_builder.cpp +++ b/ecal/core/src/pubsub/config/builder/reader_attribute_builder.cpp @@ -64,7 +64,7 @@ namespace eCAL attributes.tcp.max_reconnection_attempts = transport_layer_config.tcp.max_reconnections; attributes.shm.enable = subscriber_config.layer.shm.enable; - + return attributes; } -} \ No newline at end of file +} diff --git a/ecal/core/src/pubsub/config/builder/writer_attribute_builder.cpp b/ecal/core/src/pubsub/config/builder/writer_attribute_builder.cpp index 24a0a19613..e0501754ef 100644 --- a/ecal/core/src/pubsub/config/builder/writer_attribute_builder.cpp +++ b/ecal/core/src/pubsub/config/builder/writer_attribute_builder.cpp @@ -76,4 +76,4 @@ namespace eCAL return attributes; } -} \ No newline at end of file +} diff --git a/ecal/core/src/readwrite/config/attributes/reader_attributes.h b/ecal/core/src/readwrite/config/attributes/reader_attributes.h index 155ec99222..f3649fe31e 100644 --- a/ecal/core/src/readwrite/config/attributes/reader_attributes.h +++ b/ecal/core/src/readwrite/config/attributes/reader_attributes.h @@ -22,6 +22,7 @@ #include #include #include +#include "io/mtx/shm_mutex_resolution.h" namespace eCAL { @@ -46,6 +47,7 @@ namespace eCAL struct SSHMAttributes { bool enable; + detail::eResolvedMutexType mutex_type; }; struct SAttributes @@ -69,4 +71,4 @@ namespace eCAL bool share_topic_description; }; } -} \ No newline at end of file +} diff --git a/ecal/core/src/readwrite/config/attributes/writer_attributes.h b/ecal/core/src/readwrite/config/attributes/writer_attributes.h index 224126c961..22c4c26dff 100644 --- a/ecal/core/src/readwrite/config/attributes/writer_attributes.h +++ b/ecal/core/src/readwrite/config/attributes/writer_attributes.h @@ -25,6 +25,7 @@ #include #include +#include "io/mtx/shm_mutex_resolution.h" namespace eCAL { @@ -49,6 +50,7 @@ namespace eCAL struct SSHMAttributes { bool enable; + detail::eResolvedMutexType mutex_type; bool zero_copy_mode; unsigned int acknowledge_timeout_ms; unsigned int memfile_buffer_count; @@ -79,4 +81,4 @@ namespace eCAL SSHMAttributes shm; }; } -} \ No newline at end of file +} diff --git a/ecal/core/src/readwrite/config/builder/shm_attribute_builder.cpp b/ecal/core/src/readwrite/config/builder/shm_attribute_builder.cpp index f822810157..6e48e33be8 100644 --- a/ecal/core/src/readwrite/config/builder/shm_attribute_builder.cpp +++ b/ecal/core/src/readwrite/config/builder/shm_attribute_builder.cpp @@ -27,6 +27,7 @@ namespace eCAL { SHM::SAttributes attributes; + attributes.mutex_type = attr_.shm.mutex_type; attributes.process_id = attr_.process_id; attributes.registration_timeout_ms = attr_.registration_timeout_ms; @@ -40,6 +41,7 @@ namespace eCAL { SHM::SAttributes attributes; + attributes.mutex_type = attr_.shm.mutex_type; attributes.acknowledge_timeout_ms = attr_.shm.acknowledge_timeout_ms; attributes.memfile_buffer_count = attr_.shm.memfile_buffer_count; attributes.memfile_reserve_percent = attr_.shm.memfile_reserve_percent; @@ -51,4 +53,4 @@ namespace eCAL return attributes; } } -} \ No newline at end of file +} diff --git a/ecal/core/src/readwrite/shm/config/attributes/reader_shm_attributes.h b/ecal/core/src/readwrite/shm/config/attributes/reader_shm_attributes.h index 59784568ca..0e9700992c 100644 --- a/ecal/core/src/readwrite/shm/config/attributes/reader_shm_attributes.h +++ b/ecal/core/src/readwrite/shm/config/attributes/reader_shm_attributes.h @@ -19,6 +19,8 @@ #pragma once +#include "io/mtx/shm_mutex_resolution.h" + #include namespace eCAL @@ -29,9 +31,10 @@ namespace eCAL { struct SAttributes { + detail::eResolvedMutexType mutex_type; int process_id; unsigned int registration_timeout_ms; }; } } -} \ No newline at end of file +} diff --git a/ecal/core/src/readwrite/shm/config/attributes/writer_shm_attributes.h b/ecal/core/src/readwrite/shm/config/attributes/writer_shm_attributes.h index 3359a84375..b1ed095c2e 100644 --- a/ecal/core/src/readwrite/shm/config/attributes/writer_shm_attributes.h +++ b/ecal/core/src/readwrite/shm/config/attributes/writer_shm_attributes.h @@ -19,6 +19,8 @@ #pragma once +#include "io/mtx/shm_mutex_resolution.h" + #include namespace eCAL @@ -29,6 +31,7 @@ namespace eCAL { struct SAttributes { + detail::eResolvedMutexType mutex_type; unsigned int acknowledge_timeout_ms; unsigned int memfile_buffer_count; unsigned int memfile_min_size_bytes; @@ -39,4 +42,4 @@ namespace eCAL }; } } -} \ No newline at end of file +} diff --git a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp index 0933db175a..61142508c9 100644 --- a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp @@ -69,7 +69,7 @@ namespace eCAL { return OnNewShmFileContent(topic_info, buf_, len_, id_, clock_, time_, hash_); }; - m_memfile_thread_pool->ObserveFile(memfile_name, memfile_event, m_attributes.registration_timeout_ms, data_callback); + m_memfile_thread_pool->ObserveFile(memfile_name, memfile_event, m_attributes.mutex_type, m_attributes.registration_timeout_ms, data_callback); } } } diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp index e9e6e9515c..f9bf4609f0 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp @@ -175,6 +175,7 @@ namespace eCAL // prepare memfile attributes SSyncMemoryFileAttr memory_file_attr = {}; + memory_file_attr.mutex_type = m_attributes.mutex_type; memory_file_attr.min_size = m_attributes.memfile_min_size_bytes; memory_file_attr.reserve = m_attributes.memfile_reserve_percent; memory_file_attr.timeout_open_ms = PUB_MEMFILE_OPEN_TO; diff --git a/ecal/core/src/registration/shm/ecal_memfile_broadcast.cpp b/ecal/core/src/registration/shm/ecal_memfile_broadcast.cpp index 7297157fb7..8d2a24b2ec 100644 --- a/ecal/core/src/registration/shm/ecal_memfile_broadcast.cpp +++ b/ecal/core/src/registration/shm/ecal_memfile_broadcast.cpp @@ -77,7 +77,7 @@ namespace eCAL const auto presumably_memfile_size = RelocatableCircularQueue::PresumablyOccupiedMemorySize(m_attributes.queue_size) + sizeof(SMemfileBroadcastHeader); - if (!m_broadcast_memfile->Create(m_attributes.domain.c_str(), true, presumably_memfile_size, true)) + if (!m_broadcast_memfile->Create(m_attributes.domain.c_str(), true, detail::Resolve(eCAL::GetConfiguration().transport_layer.shm.mutex_type), presumably_memfile_size, true)) { #ifndef NDEBUG std::cerr << "Unable to access broadcast memory file." << std::endl; diff --git a/ecal/core/src/registration/shm/ecal_memfile_broadcast_reader.cpp b/ecal/core/src/registration/shm/ecal_memfile_broadcast_reader.cpp index d9cf1ec6ca..87697fe762 100644 --- a/ecal/core/src/registration/shm/ecal_memfile_broadcast_reader.cpp +++ b/ecal/core/src/registration/shm/ecal_memfile_broadcast_reader.cpp @@ -79,7 +79,7 @@ namespace eCAL if (is_new_payload_memfile && broadcast_event->type != eMemfileBroadcastEventType::EVENT_REMOVED) { if(!memfile_broadcast_payload.payload_memfile->Create( - BuildPayloadMemfileName(m_memfile_broadcast->GetName(), event_id).c_str(), false, 0)) + BuildPayloadMemfileName(m_memfile_broadcast->GetName(), event_id).c_str(), false, detail::Resolve(eCAL::GetConfiguration().transport_layer.shm.mutex_type), 0)) { #ifndef NDEBUG std::cerr << "Error opening payload memory file" << '\n'; diff --git a/ecal/core/src/registration/shm/ecal_memfile_broadcast_writer.cpp b/ecal/core/src/registration/shm/ecal_memfile_broadcast_writer.cpp index 3b9f3856a7..09aa112b51 100644 --- a/ecal/core/src/registration/shm/ecal_memfile_broadcast_writer.cpp +++ b/ecal/core/src/registration/shm/ecal_memfile_broadcast_writer.cpp @@ -42,7 +42,7 @@ namespace eCAL m_memfile_broadcast = memfile_broadcast; m_event_id = CreateEventId(); m_payload_memfile = std::make_unique(m_memfile_map); - if (!m_payload_memfile->Create(BuildPayloadMemfileName(m_memfile_broadcast->GetName(), m_event_id).c_str(), true, 1024)) + if (!m_payload_memfile->Create(BuildPayloadMemfileName(m_memfile_broadcast->GetName(), m_event_id).c_str(), true, detail::Resolve(eCAL::GetConfiguration().transport_layer.shm.mutex_type), 1024)) { #ifndef NDEBUG std::cerr << "Unable to create payload memfile" << '\n'; @@ -64,7 +64,7 @@ namespace eCAL { auto payload_memfile = std::make_unique(m_memfile_map); const auto event_id = CreateEventId(); - if (!payload_memfile->Create(BuildPayloadMemfileName(m_memfile_broadcast->GetName(), event_id).c_str(), true, size * 2)) + if (!payload_memfile->Create(BuildPayloadMemfileName(m_memfile_broadcast->GetName(), event_id).c_str(), true, detail::Resolve(eCAL::GetConfiguration().transport_layer.shm.mutex_type), size * 2)) { #ifndef NDEBUG std::cerr << "Unable to create new payload memory file" << '\n'; diff --git a/ecal/tests/cpp/config_test/src/ini_file.h b/ecal/tests/cpp/config_test/src/ini_file.h index 0c24fd5e76..d924a03716 100644 --- a/ecal/tests/cpp/config_test/src/ini_file.h +++ b/ecal/tests/cpp/config_test/src/ini_file.h @@ -95,11 +95,8 @@ const std::string ini_file_as_string_yaml = R"(# _____ _ _ # Reconnection attemps the session will try to reconnect in case of an issue max_reconnections: 7 - shm: - # Default memory file size for new publisher - memfile_min_size_bytes: 4096 - # Dynamic file size reserve before recreating memory file if topic size changes - memfile_reserve_percent: 50 + shm: + mutex_type: "recoverable_mutex" # Publisher specific base settings @@ -109,6 +106,7 @@ const std::string ini_file_as_string_yaml = R"(# _____ _ _ shm: # Enable layer enable: false + mutex_type: "mutex" # Enable zero copy shared memory transport mode zero_copy_mode: false # Force connected subscribers to send acknowledge event after processing the message. @@ -222,4 +220,4 @@ const std::string ini_file_as_string_yaml = R"(# _____ _ _ # UDP port: 14001 -)"; \ No newline at end of file +)"; diff --git a/ecal/tests/cpp/config_test/src/yaml_processing_test.cpp b/ecal/tests/cpp/config_test/src/yaml_processing_test.cpp index 3c4acb7f0d..e6c44de072 100644 --- a/ecal/tests/cpp/config_test/src/yaml_processing_test.cpp +++ b/ecal/tests/cpp/config_test/src/yaml_processing_test.cpp @@ -32,6 +32,7 @@ #endif #include "config/default_configuration.h" #include "ecal_def.h" +#include "io/mtx/shm_mutex_resolution.h" #ifdef ECAL_CORE_CONFIGURATION TEST(core_cpp_config_yaml /*unused*/, yaml_processing_comparison /*unused*/) @@ -67,6 +68,7 @@ TEST(core_cpp_config_yaml /*unused*/, yaml_processing_comparison /*unused*/) config.transport_layer.tcp.number_executor_reader = 9; config.transport_layer.tcp.number_executor_writer = 10; config.transport_layer.tcp.max_reconnections = 11; + config.transport_layer.shm.mutex_type = eCAL::TransportLayer::SHM::eMutexType::mutex; config.publisher.layer.shm.enable = false; config.publisher.layer.shm.zero_copy_mode = true; @@ -130,6 +132,7 @@ TEST(core_cpp_config_yaml /*unused*/, yaml_processing_comparison /*unused*/) EXPECT_EQ(config.transport_layer.tcp.number_executor_reader, config_from_yaml.transport_layer.tcp.number_executor_reader); EXPECT_EQ(config.transport_layer.tcp.number_executor_writer, config_from_yaml.transport_layer.tcp.number_executor_writer); EXPECT_EQ(config.transport_layer.tcp.max_reconnections, config_from_yaml.transport_layer.tcp.max_reconnections); + EXPECT_EQ(config.transport_layer.shm.mutex_type, config_from_yaml.transport_layer.shm.mutex_type); EXPECT_EQ(config.publisher.layer.shm.enable, config_from_yaml.publisher.layer.shm.enable); EXPECT_EQ(config.publisher.layer.shm.zero_copy_mode, config_from_yaml.publisher.layer.shm.zero_copy_mode); EXPECT_EQ(config.publisher.layer.shm.acknowledge_timeout_ms, config_from_yaml.publisher.layer.shm.acknowledge_timeout_ms); diff --git a/ecal/tests/cpp/io_memfile_test/CMakeLists.txt b/ecal/tests/cpp/io_memfile_test/CMakeLists.txt index b5ba9e3fb9..fdf20070a3 100644 --- a/ecal/tests/cpp/io_memfile_test/CMakeLists.txt +++ b/ecal/tests/cpp/io_memfile_test/CMakeLists.txt @@ -25,6 +25,7 @@ set(memfile_test_src src/memfile_test.cpp src/memfile_naming_test.cpp src/named_mutex_test.cpp + ${ECAL_CORE_PROJECT_ROOT}/core/src/config/shm_mutex.cpp ${ECAL_CORE_PROJECT_ROOT}/core/src/io/mtx/ecal_named_mutex.cpp ${ECAL_CORE_PROJECT_ROOT}/core/src/io/shm/ecal_memfile.cpp ${ECAL_CORE_PROJECT_ROOT}/core/src/io/shm/ecal_memfile_db.cpp diff --git a/ecal/tests/cpp/io_memfile_test/src/memfile_test.cpp b/ecal/tests/cpp/io_memfile_test/src/memfile_test.cpp index a06f8cf5dc..5ba1a0010d 100644 --- a/ecal/tests/cpp/io_memfile_test/src/memfile_test.cpp +++ b/ecal/tests/cpp/io_memfile_test/src/memfile_test.cpp @@ -20,6 +20,7 @@ #include "io/shm/ecal_memfile.h" #include "io/shm/ecal_memfile_db.h" +#include "io/mtx/shm_mutex_resolution.h" #include #include @@ -32,6 +33,11 @@ #include #include +namespace +{ + const auto default_mutex_type = eCAL::detail::Resolve(eCAL::TransportLayer::SHM::DefaultMutexType()); +} + namespace eCAL { std::shared_ptr g_memfile_map() @@ -62,7 +68,7 @@ TEST(core_cpp_core, MemFile_ReadWrite) EXPECT_EQ(false, mem_file.IsCreated()); // create memory file - EXPECT_EQ(true, mem_file.Create(memfile_name.c_str(), true, slen)); + EXPECT_EQ(true, mem_file.Create(memfile_name.c_str(), true, default_mutex_type, slen)); // check creation state EXPECT_EQ(true, mem_file.IsCreated()); @@ -159,7 +165,7 @@ TEST(core_cpp_core, MemFile_Perf) size_t slen = send_s.size(); // create memory file - EXPECT_EQ(true, mem_file.Create(memfile_name.c_str(), true, slen)); + EXPECT_EQ(true, mem_file.Create(memfile_name.c_str(), true, default_mutex_type, slen)); // start time auto start = std::chrono::high_resolution_clock::now(); @@ -215,7 +221,7 @@ TEST(core_cpp_core, MemFile_Concurrency) const size_t runs(10); // create memory file - EXPECT_EQ(true, mem_file.Create(memfile_name.c_str(), true, buflen)); + EXPECT_EQ(true, mem_file.Create(memfile_name.c_str(), true, default_mutex_type, buflen)); // producer thread auto num_writes(0); diff --git a/ecal/tests/cpp/io_memfile_test/src/named_mutex_test.cpp b/ecal/tests/cpp/io_memfile_test/src/named_mutex_test.cpp index 2c66e16731..0afebc188f 100644 --- a/ecal/tests/cpp/io_memfile_test/src/named_mutex_test.cpp +++ b/ecal/tests/cpp/io_memfile_test/src/named_mutex_test.cpp @@ -53,12 +53,14 @@ namespace return name; } + + const auto default_mutex_type = eCAL::detail::Resolve(eCAL::TransportLayer::SHM::DefaultMutexType()); } TEST(core_cpp_io, MutexLockUnlock) { const std::string mutex_name = RandomMutexName20(); - eCAL::CNamedMutex mutex(mutex_name); + eCAL::CNamedMutex mutex(mutex_name, default_mutex_type); EXPECT_EQ(true, mutex.Lock(0)); mutex.Unlock(); } @@ -83,7 +85,7 @@ void MutexParallelCreateTest(bool robust) for (int i = 0; i < runs; ++i) { barrier.wait(); - eCAL::CNamedMutex mutex(mutex_name + "_" + std::to_string(i), robust); + eCAL::CNamedMutex mutex(mutex_name + "_" + std::to_string(i), default_mutex_type); if (mutex.IsCreated()){++number_times_created;} barrier.wait(); if (mutex.Lock(100)) @@ -135,7 +137,7 @@ void MutexParallelLockUnlock(bool robust) auto mutex = [&barrier, &mutex_name, runs, robust, &lock_count]() { barrier.wait(); - eCAL::CNamedMutex mutex(mutex_name, robust); + eCAL::CNamedMutex mutex(mutex_name, default_mutex_type); for (int i = 1; i <= runs; ++i) { barrier.wait();