diff --git a/app/rec/rec_client_core/include/rec_client_core/topic_info.h b/app/rec/rec_client_core/include/rec_client_core/topic_info.h index 19e5f4986b..1f1970acf6 100644 --- a/app/rec/rec_client_core/include/rec_client_core/topic_info.h +++ b/app/rec/rec_client_core/include/rec_client_core/topic_info.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2025 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ #include #include #include +#include namespace eCAL { @@ -75,5 +76,7 @@ namespace eCAL std::map> publishers_; }; + + using TopicInfoMap = std::map; } } \ No newline at end of file diff --git a/app/rec/rec_client_core/src/ecal_rec_impl.cpp b/app/rec/rec_client_core/src/ecal_rec_impl.cpp index da219b6a88..f06bf9728f 100644 --- a/app/rec/rec_client_core/src/ecal_rec_impl.cpp +++ b/app/rec/rec_client_core/src/ecal_rec_impl.cpp @@ -721,12 +721,12 @@ namespace eCAL return subscribed_topics; } - void EcalRecImpl::EcalMessageReceived(const eCAL::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_) + void EcalRecImpl::EcalMessageReceived(const eCAL::STopicId& topic_id_, const eCAL::SDataTypeInformation& datatype_info_, const eCAL::SReceiveCallbackData& callback_data_) { auto ecal_receive_time = eCAL::Time::ecal_clock::now(); auto system_receive_time = std::chrono::steady_clock::now(); - std::shared_ptr frame = std::make_shared(&data_, topic_id_.topic_name, ecal_receive_time, system_receive_time); + std::shared_ptr frame = std::make_shared(callback_data_, topic_id_, ecal_receive_time, system_receive_time); // Add to the pre-buffer (it is thread-safe by using a mutex internally) pre_buffer_.push_back(frame); @@ -743,7 +743,7 @@ namespace eCAL { // Add to the subscriber statistics const std::lock_guard subscriber_statistics_lock(subscriber_throughput_mutex_); - subscriber_throughput_statistics_.AddFrame(data_.buffer_size); + subscriber_throughput_statistics_.AddFrame(callback_data_.buffer_size); } } @@ -760,7 +760,7 @@ namespace eCAL pre_buffer_.remove_old_frames(); } - void EcalRecImpl::SetTopicInfo(const std::map& topic_info_map) + void EcalRecImpl::SetTopicInfo(const TopicInfoMap& topic_info_map) { // Create subscribers for new topics if necessary { @@ -801,7 +801,7 @@ namespace eCAL RemoveOldSubscribers_NoLock(filtered_topic_set); } - std::set EcalRecImpl::FilterAvailableTopics_NoLock(const std::map& topic_info_map) const + std::set EcalRecImpl::FilterAvailableTopics_NoLock(const TopicInfoMap& topic_info_map) const { std::set topic_set; @@ -815,13 +815,13 @@ namespace eCAL // Evaluate the record mode (All / Blacklist / Whitelist) if ((record_mode_ == RecordMode::Blacklist) - && (listed_topics_.find(topic_info.first) != listed_topics_.end())) + && (listed_topics_.find(topic_info.first.topic_name) != listed_topics_.end())) { // The topic is blacklisted continue; } else if ((record_mode_ == RecordMode::Whitelist) - && (listed_topics_.find(topic_info.first) == listed_topics_.end())) + && (listed_topics_.find(topic_info.first.topic_name) == listed_topics_.end())) { // The topic is not whitelisted continue; @@ -847,7 +847,7 @@ namespace eCAL } // Add the topic to the filtered set if we haven't found any reason not to do that :) - topic_set.emplace(topic_info.first); + topic_set.emplace(topic_info.first.topic_name); } return topic_set; @@ -867,7 +867,7 @@ namespace eCAL info_ = { false, "Error creating eCAL subsribers" }; continue; } - subscriber->SetReceiveCallback(std::bind(&EcalRecImpl::EcalMessageReceived, this, std::placeholders::_1, std::placeholders::_3)); + subscriber->SetReceiveCallback(std::bind(&EcalRecImpl::EcalMessageReceived, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); subscriber_map_.emplace(topic, std::move(subscriber)); } } diff --git a/app/rec/rec_client_core/src/ecal_rec_impl.h b/app/rec/rec_client_core/src/ecal_rec_impl.h index d73382a159..966bbc03a7 100644 --- a/app/rec/rec_client_core/src/ecal_rec_impl.h +++ b/app/rec/rec_client_core/src/ecal_rec_impl.h @@ -125,7 +125,7 @@ namespace eCAL std::set GetSubscribedTopics() const; - void EcalMessageReceived(const eCAL::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_); + void EcalMessageReceived(const eCAL::STopicId& topic_id_, const eCAL::SDataTypeInformation& datatype_info_, const eCAL::SReceiveCallbackData& data_); Throughput GetSubscriberThroughput() const; @@ -134,7 +134,7 @@ namespace eCAL ////////////////////////////////////// void GarbageCollect(); - void SetTopicInfo(const std::map& topic_info_map); + void SetTopicInfo(const TopicInfoMap& topic_info_map); ////////////////////////////////////////////////////////////////////////////// //// Private functions //// @@ -142,7 +142,7 @@ namespace eCAL private: void UpdateAndCleanSubscribers(); - std::set FilterAvailableTopics_NoLock(const std::map& topic_info_map) const; + std::set FilterAvailableTopics_NoLock(const TopicInfoMap& topic_info_map) const; void CreateNewSubscribers_NoLock(const std::set& topic_set); void RemoveOldSubscribers_NoLock(const std::set& topic_set); diff --git a/app/rec/rec_client_core/src/frame.h b/app/rec/rec_client_core/src/frame.h index d7f1862722..1499e8b682 100644 --- a/app/rec/rec_client_core/src/frame.h +++ b/app/rec/rec_client_core/src/frame.h @@ -32,16 +32,15 @@ namespace eCAL class Frame { public: - Frame(const eCAL::SReceiveCallbackData* const callback_data, const std::string& topic_name, const eCAL::Time::ecal_clock::time_point receive_time, std::chrono::steady_clock::time_point system_receive_time) - : ecal_publish_time_(std::chrono::duration_cast(std::chrono::microseconds(callback_data->send_timestamp))) + Frame(const eCAL::SReceiveCallbackData& callback_data, const STopicId& topic_id, const eCAL::Time::ecal_clock::time_point receive_time, std::chrono::steady_clock::time_point system_receive_time) + : ecal_publish_time_(std::chrono::duration_cast(std::chrono::microseconds(callback_data.send_timestamp))) , ecal_receive_time_(receive_time) , system_receive_time_(system_receive_time) - , topic_name_(topic_name) - , clock_(callback_data->send_clock) - , id_(0) // TODO: We don't receive ids any more. We shoud probably adapt the frame class here. + , topic_id_(topic_id) + , clock_(callback_data.send_clock) { - data_.reserve(callback_data->buffer_size); - data_.assign((char*)callback_data->buffer, (char*)callback_data->buffer + callback_data->buffer_size); + data_.reserve(callback_data.buffer_size); + data_.assign((char*)callback_data.buffer, (char*)callback_data.buffer + callback_data.buffer_size); } Frame() @@ -50,16 +49,14 @@ namespace eCAL , ecal_receive_time_(eCAL::Time::ecal_clock::time_point(eCAL::Time::ecal_clock::duration(0))) , system_receive_time_(std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(0))) , clock_(0) - , id_(0) {} std::vector data_; eCAL::Time::ecal_clock::time_point ecal_publish_time_; eCAL::Time::ecal_clock::time_point ecal_receive_time_; std::chrono::steady_clock::time_point system_receive_time_; - std::string topic_name_; + STopicId topic_id_; long long clock_; - long long id_; }; } } \ No newline at end of file diff --git a/app/rec/rec_client_core/src/job/hdf5_writer_thread.cpp b/app/rec/rec_client_core/src/job/hdf5_writer_thread.cpp index 51bbe52525..a1f4e73246 100644 --- a/app/rec/rec_client_core/src/job/hdf5_writer_thread.cpp +++ b/app/rec/rec_client_core/src/job/hdf5_writer_thread.cpp @@ -22,7 +22,7 @@ #include #include -#include +#include #include #include "ThreadingUtils/InterruptibleThread.h" @@ -51,7 +51,7 @@ namespace eCAL // Constructor & Destructor /////////////////////////////// - Hdf5WriterThread::Hdf5WriterThread(const JobConfig& job_config, const std::map& initial_topic_info_map, const std::deque>& initial_frame_buffer) + Hdf5WriterThread::Hdf5WriterThread(const JobConfig& job_config, const TopicInfoMap& initial_topic_info_map, const std::deque>& initial_frame_buffer) : InterruptibleThread () , job_config_ (job_config) , frame_buffer_ (initial_frame_buffer) @@ -69,7 +69,7 @@ namespace eCAL total_size_bytes_ += frame->data_.size(); } - hdf5_writer_ = std::make_unique(); + hdf5_writer_ = std::make_unique(); } Hdf5WriterThread::~Hdf5WriterThread() @@ -111,7 +111,7 @@ namespace eCAL } } - void Hdf5WriterThread::SetTopicInfo(std::map topic_info_map) + void Hdf5WriterThread::SetTopicInfo(TopicInfoMap topic_info_map) { std::unique_lock input_lock(input_mutex_); @@ -157,7 +157,7 @@ namespace eCAL // Topic info to write to the HDF5 file bool set_topic_info_map = false; - std::map topic_info_map_to_set; + TopicInfoMap topic_info_map_to_set; { // Lock the input mutex @@ -198,8 +198,10 @@ namespace eCAL for (const auto& topic : topic_info_map_to_set) { + // convert from eCAL core types to eCAL measurement types before writing to the measurement. + eh5::SChannel channel{ topic.first.topic_name, topic.first.topic_id.entity_id }; eCAL::experimental::measurement::base::DataTypeInformation const topic_info{ topic.second.tinfo_.name, topic.second.tinfo_.encoding, topic.second.tinfo_.descriptor }; - hdf5_writer_->SetChannelDataTypeInformation(topic.first, topic_info); + hdf5_writer_->SetChannelDataTypeInformation(channel, topic_info); } } else if (frame) @@ -210,14 +212,17 @@ namespace eCAL break; // Write Frame element to HDF5 + eh5::SWriteEntry entry; + // in hdf5, ids are integers, however SEntityIds are not, so we need to convert. + entry.channel = eh5::SChannel(frame->topic_id_.topic_name, frame->topic_id_.topic_id.entity_id); + entry.data = frame->data_.data(); + entry.size = frame->data_.size(); + entry.snd_timestamp = std::chrono::duration_cast(frame->ecal_publish_time_.time_since_epoch()).count(); + entry.rcv_timestamp = std::chrono::duration_cast(frame->ecal_receive_time_.time_since_epoch()).count(); + entry.clock = frame->clock_; + if (hdf5_writer_->AddEntryToFile( - frame->data_.data(), - frame->data_.size(), - std::chrono::duration_cast(frame->ecal_publish_time_.time_since_epoch()).count(), - std::chrono::duration_cast(frame->ecal_receive_time_.time_since_epoch()).count(), - frame->topic_name_, - frame->id_, - frame->clock_ + entry )) { const std::lock_guard throughput_statistics_lock(throughput_statistics_mutex_); @@ -306,7 +311,7 @@ namespace eCAL #endif // NDEBUG std::unique_lock hdf5_writer_lock(hdf5_writer_mutex_); - if (hdf5_writer_->Open(hdf5_dir, eCAL::eh5::v2::eAccessType::CREATE)) + if (hdf5_writer_->Open(hdf5_dir, eCAL::eh5::eAccessType::CREATE)) { #ifndef NDEBUG EcalRecLogger::Instance()->debug("Hdf5WriterThread::Open(): Successfully opened HDF5-Writer with path \"" + hdf5_dir + "\""); diff --git a/app/rec/rec_client_core/src/job/hdf5_writer_thread.h b/app/rec/rec_client_core/src/job/hdf5_writer_thread.h index d1de93a673..32ce2677d6 100644 --- a/app/rec/rec_client_core/src/job/hdf5_writer_thread.h +++ b/app/rec/rec_client_core/src/job/hdf5_writer_thread.h @@ -50,7 +50,7 @@ namespace eCAL // Constructor & Destructor /////////////////////////////// public: - Hdf5WriterThread(const JobConfig& job_config, const std::map& initial_topic_info_map = {}, const std::deque>& initial_frame_buffer = {}); + Hdf5WriterThread(const JobConfig& job_config, const TopicInfoMap& initial_topic_info_map = {}, const std::deque>& initial_frame_buffer = {}); ~Hdf5WriterThread(); @@ -62,7 +62,7 @@ namespace eCAL bool AddFrame(const std::shared_ptr& frame); - void SetTopicInfo(std::map topic_info_map); // CALL BY VALUE (-> copy) IS INTENDED! + void SetTopicInfo(TopicInfoMap topic_info_map); // CALL BY VALUE (-> copy) IS INTENDED! void Flush(); @@ -100,13 +100,13 @@ namespace eCAL size_t written_frames_count_; std::chrono::steady_clock::time_point first_written_frame_timestamp_; std::chrono::steady_clock::time_point last_written_frame_timestamp_; - std::map new_topic_info_map_; /**< The new topic info map that shall be set to the HDF5 writer */ + TopicInfoMap new_topic_info_map_; /**< The new topic info map that shall be set to the HDF5 writer */ bool new_topic_info_map_available_; /**< Telling that a new topic info map has been set from the outside. */ mutable RecHdf5JobStatus last_status_; - mutable std::mutex hdf5_writer_mutex_; - std::unique_ptr hdf5_writer_; + mutable std::mutex hdf5_writer_mutex_; + std::unique_ptr hdf5_writer_; std::atomic flushing_; diff --git a/app/rec/rec_client_core/src/job/record_job.cpp b/app/rec/rec_client_core/src/job/record_job.cpp index ddbfc84c36..851dacc41a 100644 --- a/app/rec/rec_client_core/src/job/record_job.cpp +++ b/app/rec/rec_client_core/src/job/record_job.cpp @@ -283,7 +283,7 @@ namespace eCAL return true; } - bool RecordJob::StartRecording(const std::map& initial_topic_info_map, const std::deque>& initial_frame_buffer) + bool RecordJob::StartRecording(const TopicInfoMap& initial_topic_info_map, const std::deque>& initial_frame_buffer) { std::unique_lock lock(job_mutex_); @@ -317,7 +317,7 @@ namespace eCAL } - bool RecordJob::SaveBuffer(const std::map& topic_info_map, const std::deque>& frame_buffer) + bool RecordJob::SaveBuffer(const TopicInfoMap& topic_info_map, const std::deque>& frame_buffer) { std::unique_lock lock(job_mutex_); @@ -344,7 +344,7 @@ namespace eCAL return hdf5_writer_thread_->AddFrame(frame); } - void RecordJob::SetTopicInfo(const std::map& topic_info_map) + void RecordJob::SetTopicInfo(const TopicInfoMap& topic_info_map) { std::shared_lock lock(job_mutex_); if ((main_recorder_state_ != JobState::Recording) || !hdf5_writer_thread_) diff --git a/app/rec/rec_client_core/src/job/record_job.h b/app/rec/rec_client_core/src/job/record_job.h index 70d1175fa8..5d7bc3396b 100644 --- a/app/rec/rec_client_core/src/job/record_job.h +++ b/app/rec/rec_client_core/src/job/record_job.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2025 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,12 +64,12 @@ namespace eCAL /////////////////////////////////////////////// public: bool InitializeMeasurementDirectory(); - bool StartRecording(const std::map& initial_topic_info_map, const std::deque>& initial_frame_buffer); + bool StartRecording(const TopicInfoMap& initial_topic_info_map, const std::deque>& initial_frame_buffer); bool StopRecording (); - bool SaveBuffer (const std::map& topic_info_map, const std::deque>& frame_buffer); + bool SaveBuffer (const TopicInfoMap& topic_info_map, const std::deque>& frame_buffer); bool AddFrame(const std::shared_ptr& frame); - void SetTopicInfo(const std::map& topic_info_map); + void SetTopicInfo(const TopicInfoMap& topic_info_map); eCAL::rec::Error Upload(const UploadConfig& upload_config); diff --git a/app/rec/rec_client_core/src/monitoring_thread.cpp b/app/rec/rec_client_core/src/monitoring_thread.cpp index 53efd33ff7..38a8b0cc15 100644 --- a/app/rec/rec_client_core/src/monitoring_thread.cpp +++ b/app/rec/rec_client_core/src/monitoring_thread.cpp @@ -23,16 +23,6 @@ #include -#ifdef _MSC_VER -#pragma warning(push) -#pragma warning(disable: 4100 4127 4146 4505 4800 4189 4592) // disable proto warnings -#endif -#include -#include -#ifdef _MSC_VER -#pragma warning(pop) -#endif - #include "rec_client_core/ecal_rec_logger.h" #include "ecal_rec_impl.h" @@ -50,156 +40,26 @@ namespace eCAL void MonitoringThread::Loop() { - std::string monitoring_string; - eCAL::pb::Monitoring monitoring_pb; + TopicInfoMap topic_info_snapshot; + std::set publisher_ids; + std::ignore = eCAL::Registration::GetPublisherIDs(publisher_ids); - if (eCAL::Monitoring::GetMonitoring(monitoring_string)) + for (const auto& publisher_id : publisher_ids) { - monitoring_pb.Clear(); - monitoring_pb.ParseFromString(monitoring_string); - - - { - std::lock_guard monitoring_lock(monitoring_mutex_); - - // Clear publisher lists of all topics - for (auto& topic_info : topic_info_map_) - { - topic_info.second.publishers_.clear(); - } - - // Collect all descriptors - std::map>> channel_descriptor_map; // ChannelName -> {Type, Descriptor} - std::map> type_descriptor_map; // Type -> Descriptor (used for topics that we know the type of, but have no other information available) - - static const int DESCRIPTION_AVAILABLE_QUALITYBIT = 0x1 << 3; // Having a descriptor at all is the most important thing - static const int INFO_COMES_FROM_CORRECT_TOPIC_QUALITYBIT = 0x1 << 2; // The information comes from the current topic (and has not been borrowed from another topic) - static const int INFO_COMES_FROM_PUBLISHER_QUALITYBIT = 0x1 << 1; // A descriptor coming from the publisher is better than one from a subsriber, as we assume that the publisher knows best what he is publishing - static const int TYPE_AVAILABLE_QUALITYBIT = 0x1 << 0; // Having information about the type's name available is nice but not that important to us. - - for (const auto& topic : monitoring_pb.topics()) - { - // Lookup the topic map entry - auto topic_info_map_it = topic_info_map_.find(topic.topic_name()); - if (topic_info_map_it == topic_info_map_.end()) - { - // Create a new topic entry - topic_info_map_.emplace(topic.topic_name(), eCAL::rec::TopicInfo("", "", "")); - topic_info_map_it = topic_info_map_.find(topic.topic_name()); - } - - // Create combined encoding:type type (to be fully compatible to old behavior) - std::string combined_enc_type = eCAL::Util::CombinedTopicEncodingAndType(topic.datatype_information().encoding(), topic.datatype_information().name()); - - // Evaluate the quality of the current descriptor information - int this_topic_info_quality = 0; - - if (!topic.datatype_information().descriptor_information().empty()) - { - this_topic_info_quality |= DESCRIPTION_AVAILABLE_QUALITYBIT; - } - - this_topic_info_quality |= INFO_COMES_FROM_CORRECT_TOPIC_QUALITYBIT; - - if (EcalUtils::String::Icompare(topic.direction(), "publisher")) - { - this_topic_info_quality |= INFO_COMES_FROM_PUBLISHER_QUALITYBIT; - - // Also update the publisher list - auto existing_publisher_it = topic_info_map_it->second.publishers_.find(topic.host_name()); - if (existing_publisher_it != topic_info_map_it->second.publishers_.end()) - { - existing_publisher_it->second.emplace(topic.unit_name()); - } - else - { - topic_info_map_it->second.publishers_.emplace(topic.host_name(), std::set{topic.unit_name()}); - } - } - - if (!combined_enc_type.empty()) - { - this_topic_info_quality |= TYPE_AVAILABLE_QUALITYBIT; - } - - // Update the channel_descriptor_map - { - auto channel_descriptor_map_it = channel_descriptor_map.find(topic.topic_name()); - if (channel_descriptor_map_it == channel_descriptor_map.end()) - { - // Save the new descriptor - channel_descriptor_map.emplace(topic.topic_name(), std::make_pair(this_topic_info_quality, std::make_pair(combined_enc_type, topic.datatype_information().descriptor_information()))); - } - else - { - if(channel_descriptor_map_it->second.first < this_topic_info_quality) - { - // If the old descriptor has a lower quality than the current descriptor, we may overwrite it! - channel_descriptor_map_it->second = std::make_pair(this_topic_info_quality, std::make_pair(combined_enc_type, topic.datatype_information().descriptor_information())); - } - } - } - - // Update the type_descriptor_map (can of course only work if we have the type information available) - if (!combined_enc_type.empty()) - { - int quality_for_other_channels = (this_topic_info_quality & ~INFO_COMES_FROM_CORRECT_TOPIC_QUALITYBIT); - - auto type_descriptor_map_it = type_descriptor_map.find(combined_enc_type); - if (type_descriptor_map_it == type_descriptor_map.end()) - { - // Save the new descriptor - type_descriptor_map.emplace(combined_enc_type, std::make_pair(quality_for_other_channels, topic.datatype_information().descriptor_information())); - } - else - { - if(type_descriptor_map_it->second.first < quality_for_other_channels) - { - // If the old descriptor has a lower quality than the current descriptor, we may overwrite it! - type_descriptor_map_it->second = std::make_pair(quality_for_other_channels, topic.datatype_information().descriptor_information()); - } - } - } - } - - // Update the type/descriptor information of the topic_info_map_ - for (auto& topic_info_map_entry : topic_info_map_) - { - auto channel_descriptor_entry_it = channel_descriptor_map.find(topic_info_map_entry.first); - if ((channel_descriptor_entry_it != channel_descriptor_map.end()) - && (channel_descriptor_entry_it->second.first >= topic_info_map_entry.second.description_quality_)) - { - topic_info_map_entry.second.SetLegacyType(channel_descriptor_entry_it->second.second.first); - topic_info_map_entry.second.tinfo_.descriptor = channel_descriptor_entry_it->second.second.second; - topic_info_map_entry.second.description_quality_ = channel_descriptor_entry_it->second.first; - } - - if (!topic_info_map_entry.second.GetLegacyType().empty()) - { - auto type_descriptor_entry_it = type_descriptor_map.find(topic_info_map_entry.second.GetLegacyType()); - if ((type_descriptor_entry_it != type_descriptor_map.end()) - && (type_descriptor_entry_it->second.first >= topic_info_map_entry.second.description_quality_)) - { - topic_info_map_entry.second.tinfo_.descriptor = type_descriptor_entry_it->second.second; - topic_info_map_entry.second.description_quality_ = type_descriptor_entry_it->second.first; - } - } - } - } - - recorder_.SetTopicInfo(topic_info_map_); - } - else - { - EcalRecLogger::Instance()->debug("eCAL::Monitoring::GetMonitoring - failure"); + SDataTypeInformation datatype_info; + eCAL::Registration::GetPublisherInfo(publisher_id, datatype_info); + topic_info_snapshot.emplace(std::make_pair( publisher_id , std::move(datatype_info))); } + + std::lock_guard monitoring_lock(monitoring_mutex_); + topic_info_map_ = std::move(topic_info_snapshot); } - TopicInfo MonitoringThread::GetTopicInfo(const std::string& topic_name) const + TopicInfo MonitoringThread::GetTopicInfo(const STopicId& topic) const { std::lock_guard monitoring_lock(monitoring_mutex_); - auto topic_info_map_it = topic_info_map_.find(topic_name); + auto topic_info_map_it = topic_info_map_.find(topic); if (topic_info_map_it != topic_info_map_.end()) { return topic_info_map_it->second; @@ -210,7 +70,7 @@ namespace eCAL } } - std::map MonitoringThread::GetTopicInfoMap() const + TopicInfoMap MonitoringThread::GetTopicInfoMap() const { std::lock_guard monitoring_lock(monitoring_mutex_); return topic_info_map_; diff --git a/app/rec/rec_client_core/src/monitoring_thread.h b/app/rec/rec_client_core/src/monitoring_thread.h index 8d5bd83d88..d1f0a4f2f5 100644 --- a/app/rec/rec_client_core/src/monitoring_thread.h +++ b/app/rec/rec_client_core/src/monitoring_thread.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2025 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ #include #include - +#include #include "rec_client_core/topic_info.h" namespace eCAL @@ -37,18 +37,18 @@ namespace eCAL MonitoringThread(EcalRecImpl& recorder); ~MonitoringThread(); - TopicInfo GetTopicInfo(const std::string& topic_name) const; + TopicInfo GetTopicInfo(const STopicId& topic) const; - std::map GetTopicInfoMap() const; + TopicInfoMap GetTopicInfoMap() const; protected: void Loop() override; private: - EcalRecImpl& recorder_; + EcalRecImpl& recorder_; mutable std::mutex monitoring_mutex_; - std::map topic_info_map_; + TopicInfoMap topic_info_map_; }; } } \ No newline at end of file diff --git a/app/rec/rec_gui/src/widgets/topic_widget/topic_list_model.cpp b/app/rec/rec_gui/src/widgets/topic_widget/topic_list_model.cpp index 073f66b24e..d39268b4c3 100644 --- a/app/rec/rec_gui/src/widgets/topic_widget/topic_list_model.cpp +++ b/app/rec/rec_gui/src/widgets/topic_widget/topic_list_model.cpp @@ -265,7 +265,7 @@ void TopicListModel::reset(const std::mapname.c_str(); // TODO this will print the first, maybe we need more here? topic_info.publishers_ = topic.second.publishers_; topic_info.listed_ = (listed_topics.find(topic.first) != listed_topics.end()); topic_info.in_topic_info_map_ = true; diff --git a/app/rec/rec_server_core/include/rec_server_core/rec_server_types.h b/app/rec/rec_server_core/include/rec_server_core/rec_server_types.h index 3c25babc1c..52d94f1992 100644 --- a/app/rec/rec_server_core/include/rec_server_core/rec_server_types.h +++ b/app/rec/rec_server_core/include/rec_server_core/rec_server_types.h @@ -36,21 +36,32 @@ namespace eCAL { namespace rec_server { + using TopicName_T = std::string; + using Hostname_T = std::string; + using UnitName_T = std::string; + using DataFrequency_T = double; + using ProcessID_T = int32_t; + using IsEcalRecClientRunning_T = bool; + struct TopicInfo { - TopicInfo(const std::string& type) - : type_(type) - {} + TopicInfo() + { + } - std::string type_; ///< Type of the topic (e.g. the protobuf-type) - std::map> publishers_; ///< {hostname: [publisher_names]} - std::map, double> rec_subscribers_; ///< {(hostname, process_id): data_frequency} - }; + void AddTypeInfo(const eCAL::SDataTypeInformation& data_type_info) + { + type_.insert(data_type_info); + } - typedef std::map TopicInfoMap_T; - typedef std::map HostsRunningEcalRec_T; - typedef std::function PostUpdateCallback_T; + std::set type_; + std::map> publishers_; + std::map, DataFrequency_T> rec_subscribers_; + }; - typedef std::map> RecorderStatusMap_T; + using TopicInfoMap_T = std::map; + using HostsRunningEcalRec_T = std::map; + using PostUpdateCallback_T = std::function; + using RecorderStatusMap_T = std::map>; } } diff --git a/app/rec/rec_server_core/src/monitoring_thread.cpp b/app/rec/rec_server_core/src/monitoring_thread.cpp index 45b602a2a8..963a194f06 100644 --- a/app/rec/rec_server_core/src/monitoring_thread.cpp +++ b/app/rec/rec_server_core/src/monitoring_thread.cpp @@ -23,16 +23,6 @@ #include -#ifdef _MSC_VER -#pragma warning(push) -#pragma warning(disable: 4100 4127 4146 4505 4800 4189 4592) // disable proto warnings -#endif -#include -#include -#ifdef _MSC_VER -#pragma warning(pop) -#endif - #include #include @@ -82,14 +72,10 @@ namespace eCAL void MonitoringThread::Loop() { - std::string monitoring_string; - eCAL::pb::Monitoring monitoring_pb; + eCAL::Monitoring::SMonitoring monitoring; - if (eCAL::Monitoring::GetMonitoring(monitoring_string)) + if (eCAL::Monitoring::GetMonitoring(monitoring)) { - monitoring_pb.Clear(); - monitoring_pb.ParseFromString(monitoring_string); - auto running_enabled_rec_clients = get_running_enabled_rec_clients_function_(); { @@ -104,9 +90,9 @@ namespace eCAL topic_info_map_.clear(); // Re-create host list - for (const auto& process : monitoring_pb.processes()) + for (const auto& process : monitoring.processes) { - std::string host_name = process.host_name(); + std::string host_name = process.host_name; auto existing_host_it = hosts_running_ecal_rec_.find(host_name); // Add host if it didn't exist already @@ -116,57 +102,54 @@ namespace eCAL } // set whether this host has an eCAL Rec client - if (process.unit_name() == "eCALRecClient") + if (process.unit_name == "eCALRecClient") { existing_host_it->second = true; } } + auto add_type_info = [this](auto topic) -> auto + { + auto topic_info_map_it = topic_info_map_.find(topic.topic_name); + if (topic_info_map_it == topic_info_map_.end()) + { + // Create a new topic entry + auto emplace_result = topic_info_map_.emplace(topic.topic_name, eCAL::rec_server::TopicInfo()); + topic_info_map_it = emplace_result.first; + } + topic_info_map_it->second.AddTypeInfo(topic.datatype_information); + return topic_info_map_it; + }; + // Update topic list - for (const auto& topic : monitoring_pb.topics()) + for (const auto& publisher : monitoring.publishers) { - // Create combined encoding:type type (to be fully compatible to old behavior) - std::string combined_enc_type = eCAL::Util::CombinedTopicEncodingAndType(topic.datatype_information().encoding(), topic.datatype_information().name()); + auto topic_info_map_it = add_type_info(publisher); - auto topic_info_map_it = topic_info_map_.find(topic.topic_name()); - if (topic_info_map_it != topic_info_map_.end()) + // Set the topic publisher + auto existing_publisher_it = topic_info_map_it->second.publishers_.find(publisher.host_name); + if (existing_publisher_it != topic_info_map_it->second.publishers_.end()) { - // Only update the values if there are information available - if (!combined_enc_type.empty() || !topic.datatype_information().name().empty()) - { - topic_info_map_it->second.type_ = combined_enc_type; - } + existing_publisher_it->second.emplace(publisher.unit_name); } else { - // Create a new topic entry - topic_info_map_.emplace(topic.topic_name(), eCAL::rec_server::TopicInfo(combined_enc_type)); - topic_info_map_it = topic_info_map_.find(topic.topic_name()); + topic_info_map_it->second.publishers_.emplace(publisher.host_name, std::set{publisher.unit_name}); } + } - // Set the topic publisher - if (EcalUtils::String::Icompare(topic.direction(), "publisher")) - { - auto existing_publisher_it = topic_info_map_it->second.publishers_.find(topic.host_name()); - if (existing_publisher_it != topic_info_map_it->second.publishers_.end()) - { - existing_publisher_it->second.emplace(topic.unit_name()); - } - else - { - topic_info_map_it->second.publishers_.emplace(topic.host_name(), std::set{topic.unit_name()}); - } - } + for (const auto& subscriber : monitoring.subscribers) + { + auto topic_info_map_it = add_type_info(subscriber); // Set the subscribing eCAL Rec instances - if (((topic.unit_name() == "eCALRecClient") || (topic.unit_name() == "eCALRecGUI")) - && EcalUtils::String::Icompare(topic.direction(), "subscriber")) + if ((subscriber.unit_name == "eCALRecClient") || (subscriber.unit_name == "eCALRecGUI")) { - auto running_enabled_rec_client_it = running_enabled_rec_clients.find(topic.host_name()); + auto running_enabled_rec_client_it = running_enabled_rec_clients.find(subscriber.host_name); if ((running_enabled_rec_client_it != running_enabled_rec_clients.end() - && (running_enabled_rec_client_it->second == topic.process_id()))) + && (running_enabled_rec_client_it->second == subscriber.process_id))) { - topic_info_map_it->second.rec_subscribers_[{topic.host_name(), topic.process_id()}] = (static_cast(topic.data_frequency()) / 1000.0); + topic_info_map_it->second.rec_subscribers_[{subscriber.host_name, subscriber.process_id}] = (static_cast(subscriber.data_frequency) / 1000.0); } } } diff --git a/app/rec/rec_server_core/src/monitoring_thread.h b/app/rec/rec_server_core/src/monitoring_thread.h index b7fee75988..2ef7705e74 100644 --- a/app/rec/rec_server_core/src/monitoring_thread.h +++ b/app/rec/rec_server_core/src/monitoring_thread.h @@ -48,8 +48,8 @@ namespace eCAL public: void SetPostUpdateCallbacks(const std::vector& post_update_callbacks); - std::map GetTopicInfoMap() const; - std::map GetHostsRunningEcalRec() const; + TopicInfoMap_T GetTopicInfoMap() const; + HostsRunningEcalRec_T GetHostsRunningEcalRec() const; ///////////////////////////////////////// // Interruptible Thread overrides