Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
run: |
git submodule update --init \
thirdparty/asio/asio \
thirdparty/concurrentqueue/concurrentqueue \
thirdparty/benchmark/benchmark \
thirdparty/ecaludp/ecaludp \
thirdparty/fineftp/fineftp-server \
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build-ubuntu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ jobs:
run: |
git submodule update --init \
thirdparty/asio/asio \
thirdparty/concurrentqueue/concurrentqueue \
thirdparty/benchmark/benchmark \
thirdparty/ecaludp/ecaludp \
thirdparty/fineftp/fineftp-server \
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@
[submodule "thirdparty/protozero/protozero"]
path = thirdparty/protozero/protozero
url = https://github.com/mapbox/protozero.git
[submodule "thirdparty/concurrentqueue/concurrentqueue"]
path = thirdparty/concurrentqueue/concurrentqueue
url = https://github.com/cameron314/concurrentqueue.git
62 changes: 62 additions & 0 deletions LICENSES/LICENSES_all/concurrentqueue/LICENSE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
This license file applies to everything in this repository except that which
is explicitly annotated as being written by other authors, i.e. the Boost
queue (included in the benchmarks for comparison), Intel's TBB library (ditto),
dlib::pipe (ditto),
the CDSChecker tool (used for verification), the Relacy model checker (ditto),
and Jeff Preshing's semaphore implementation (used in the blocking queue) which
has a zlib license (embedded in lightweightsempahore.h).

---

Simplified BSD License:

Copyright (c) 2013-2016, Cameron Desrochers.
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

- Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

---

I have also chosen to dual-license under the Boost Software License as an alternative to
the Simplified BSD license above:

Boost Software License - Version 1.0 - August 17th, 2003

Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:

The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
10 changes: 10 additions & 0 deletions NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ eCAL makes use of the following external components:
- Git Submodule `/thirdparty/benchmark/benchmark`
- Testing

### concurrentqueue

- **License**: BSD-2-Clause
- **Copyright**: Copyright (c) 2013-2016, Cameron Desrochers.
- **Repository**: https://github.com/cameron314/concurrentqueue
- **Included in**:
- Git Submodule `/thirdparty/concurrentqueue/concurrentqueue`
- Windows builds
- Linux builds

### Cap'n Proto

- **License**: MIT
Expand Down
2 changes: 2 additions & 0 deletions app/rec/rec_client_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
find_package(Threads REQUIRED)
find_package(Protobuf REQUIRED)
find_package(spdlog REQUIRED)
find_package(concurrentqueue REQUIRED)

if (ECAL_USE_CURL)
find_package(CURL REQUIRED)
Expand Down Expand Up @@ -106,6 +107,7 @@ target_link_libraries(${PROJECT_NAME}
Threads::Threads
eCAL::ecal-utils
EcalParser
concurrentqueue::concurrentqueue
)

if(ECAL_USE_CURL)
Expand Down
80 changes: 62 additions & 18 deletions app/rec/rec_client_core/src/ecal_rec_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "rec_client_core/topic_info.h"
#include "rec_client_core/upload_config.h"

#include <array>
#include <chrono>
#include <cstdint>
#include <algorithm>
Expand Down Expand Up @@ -66,6 +67,59 @@ namespace eCAL
{
namespace rec
{
class EcalRecImpl::ReceiveDispatchThread : public InterruptibleLoopThread
Comment thread
KerstinKeller marked this conversation as resolved.
{
public:
ReceiveDispatchThread(EcalRecImpl& ecal_rec_impl_)
: InterruptibleLoopThread(std::chrono::milliseconds(10))
, ecal_rec_impl(ecal_rec_impl_)
, token(ecal_rec_impl_.receive_dispatch_queue_)
{
}

protected:
// forward data from the queue to the actual buffer
void Loop() override
{
constexpr int max_items = 16;
std::array<std::shared_ptr<Frame>, max_items> frames;
size_t no_items = 0;
do
{
no_items = ecal_rec_impl.receive_dispatch_queue_.try_dequeue_bulk(token, frames.data(), frames.size());
assert(no_items < max_items);

for (size_t i = 0; i < no_items; ++i)
{
auto& frame = frames[i];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: do not use array subscript when the index is not an integer constant expression [cppcoreguidelines-pro-bounds-constant-array-index]

            auto& frame = frames[i];
                          ^


// Add to the pre-buffer (it is thread-safe by using a mutex internally)
ecal_rec_impl.pre_buffer_.push_back(frame);

{
// Add to the currently recording job (if there is any)
const std::shared_lock<decltype(ecal_rec_impl.recorder_mutex_)> recorder_lock(ecal_rec_impl.recorder_mutex_);
if (ecal_rec_impl.recording_recorder_job_ != nullptr)
{
ecal_rec_impl.recording_recorder_job_->AddFrame(frame);
}
}

{
// Add to the subscriber statistics
const std::lock_guard<decltype(ecal_rec_impl.subscriber_throughput_mutex_)> subscriber_statistics_lock(ecal_rec_impl.subscriber_throughput_mutex_);
ecal_rec_impl.subscriber_throughput_statistics_.AddFrame(frame->data_.size());
Comment thread
KerstinKeller marked this conversation as resolved.
}
}
} while (no_items > 0);
}

private:
EcalRecImpl& ecal_rec_impl;
moodycamel::ConsumerToken token;
};


EcalRecImpl::EcalRecImpl()
: addon_manager_(std::make_unique<AddonManager>([this](int64_t job_id, const std::string& addon_id, const RecAddonJobStatus& job_status)
{
Expand All @@ -90,11 +144,15 @@ namespace eCAL

monitoring_thread_ = std::make_unique<MonitoringThread>(*this);
monitoring_thread_->Start();

receive_dispatch_thread = std::make_unique<ReceiveDispatchThread>(*this);
receive_dispatch_thread->Start();
}

EcalRecImpl::~EcalRecImpl()
{
DisconnectFromEcal();
receive_dispatch_thread->Interrupt();
receive_dispatch_thread->Join();

// Interrupt monitoring thread
monitoring_thread_->Interrupt();
Expand Down Expand Up @@ -723,28 +781,14 @@ namespace eCAL

void EcalRecImpl::EcalMessageReceived(const eCAL::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_)
{
const thread_local moodycamel::ProducerToken token(receive_dispatch_queue_);

auto ecal_receive_time = eCAL::Time::ecal_clock::now();
auto system_receive_time = std::chrono::steady_clock::now();

std::shared_ptr<Frame> frame = std::make_shared<Frame>(&data_, topic_id_.topic_name, ecal_receive_time, system_receive_time);

// Add to the pre-buffer (it is thread-safe by using a mutex internally)
pre_buffer_.push_back(frame);

{
// Add to the currently recording job (if there is any)
std::shared_lock<decltype(recorder_mutex_)> recorder_lock(recorder_mutex_);
if (recording_recorder_job_ != nullptr)
{
recording_recorder_job_->AddFrame(std::move(frame));
}
}

{
// Add to the subscriber statistics
const std::lock_guard<decltype(subscriber_throughput_mutex_)> subscriber_statistics_lock(subscriber_throughput_mutex_);
subscriber_throughput_statistics_.AddFrame(data_.buffer_size);
}
receive_dispatch_queue_.enqueue(token, std::move(frame));
}

Throughput EcalRecImpl::GetSubscriberThroughput() const
Expand Down
8 changes: 8 additions & 0 deletions app/rec/rec_client_core/src/ecal_rec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include <string>
#include <utility>

#include <blockingconcurrentqueue.h>

#include <rec_client_core/job_config.h>
#include <rec_client_core/record_mode.h>
#include <rec_client_core/state.h>
Expand All @@ -54,6 +56,7 @@ namespace eCAL
class MonitoringThread;
class AddonManager;


struct SubscriberStatisticsEntry
{
std::chrono::steady_clock::time_point start_of_statistics;
Expand All @@ -63,6 +66,9 @@ namespace eCAL

class EcalRecImpl
{
private:
class ReceiveDispatchThread;

public:
EcalRecImpl();
~EcalRecImpl();
Expand Down Expand Up @@ -167,6 +173,8 @@ namespace eCAL
std::unique_ptr<GarbageCollectorTriggerThread> garbage_collector_trigger_thread_; /** frame_buffer_, buffer_writer_threads_, max_pre_buffer_length_ */
std::unique_ptr<MonitoringThread> monitoring_thread_; /** connected_to_ecal_, FilterAvailableTopics_NoLock(hosts_filter_, topic_whitelist_, topic_blacklist_), CreateNewSubscribers_NoLock(subscriber_map_), main_writer_thread_, buffer_writer_threads_ */

std::unique_ptr<ReceiveDispatchThread> receive_dispatch_thread;
moodycamel::BlockingConcurrentQueue<std::shared_ptr<Frame>> receive_dispatch_queue_; /* Lock-free queue to keep callbacks busy for the least amount of time */
// Pre-buffer
FrameBuffer pre_buffer_; /** < Thread-safe framebuffer */

Expand Down
1 change: 1 addition & 0 deletions cmake/submodule_dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set(ecal_submodule_dependency_provider_root_dir ${CMAKE_CURRENT_LIST_DIR})
set(ecal_submodule_dependencies
asio
CMakeFunctions
concurrentqueue
CURL
ecaludp
fineftp
Expand Down
8 changes: 8 additions & 0 deletions doc/rst/license/thirdparty_licenses.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ Some dependencies, like GoogleTest, are not used in our officially distributed b

- Apache-2.0

* - :ref:`concurrentqueue <thirdparty_licenses_concurrentqueue>`

- Copyright (c) 2013-2016, Cameron Desrochers.

- BSD-2-Clause

* - :ref:`Cap'n Proto <thirdparty_licenses_cap_n_proto>`

- Copyright (c) 2013-2017 Sandstorm Development Group, Inc.; Cloudflare, Inc.; and other contributors. Each commit is copyright by its respective author or author's employer.
Expand Down Expand Up @@ -206,6 +212,8 @@ Some dependencies, like GoogleTest, are not used in our officially distributed b

thirdparty_licenses/benchmark

thirdparty_licenses/concurrentqueue

thirdparty_licenses/cap_n_proto

thirdparty_licenses/convert_utf
Expand Down
53 changes: 53 additions & 0 deletions doc/rst/license/thirdparty_licenses/concurrentqueue.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
..
THIS FILE IS AUTO-GENERATED AND SHOULD NOT BE EDITED MANUALLY.

.. include:: /include.txt

.. _thirdparty_licenses_concurrentqueue:

===============================================
concurrentqueue
===============================================

.. list-table::
:widths: 20 80
:header-rows: 0

* - **License**

- BSD-2-Clause

* - **Copyright**

- Copyright (c) 2013-2016, Cameron Desrochers.

* - **Repository**

- https://github.com/cameron314/concurrentqueue

* - **Upstream version** [#upstreamversion]_

- `593df78ec309be7a7b456b3334025ccade1d2d66 <https://github.com/cameron314/concurrentqueue/tree/593df78ec309be7a7b456b3334025ccade1d2d66>`_

* - **Integration**

-

- |fa-github| Git Submodule :file:`/thirdparty/concurrentqueue/concurrentqueue`

- |fa-windows| Windows builds

- |fa-ubuntu| Linux builds

.. [#upstreamversion] *The actual version used for building may differ from the listed dependency version.*
*Especially Linux binaries are often built against system packages, if available.*
*Check build files for further information.*

License Files
=============

:file:`LICENSE.md`
--------------------------------------------------------------------------------

.. literalinclude:: concurrentqueue/LICENSE.md
:language: none
Loading
Loading