From 9e198fb8edc01863b5a78c0ad013cbf7c84c3b03 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 11 Jun 2026 13:31:26 +0000 Subject: [PATCH 1/9] fix(TCPSocket): make between-attempt sleep interruptible via close() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TCPSocket::setup() slept for the full reconnection_time between consecutive failed connect() attempts with no way to be woken early. When RTDEClient's reconnect thread was in this sleep, ~RTDEClient() would block indefinitely in reconnecting_thread_.join() even though stop_reconnection_ had been set. Replace the monolithic sleep_for(reconnection_time) with a 100 ms-sliced loop that exits as soon as state_ transitions to SocketState::Closed. URStream::disconnect() (called by RTDEClient::disconnect()) already calls TCPSocket::close() which sets state_ = Closed, so no new API is needed. Also reset state_ to Invalid at the top of setup() (after the Connected guard) so that a Closed state left by a previous disconnect() cannot prematurely terminate the sliced sleep on a brand-new connection attempt. Co-authored-by: Rune Søe-Knudsen --- src/comm/tcp_socket.cpp | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/comm/tcp_socket.cpp b/src/comm/tcp_socket.cpp index 6af840e0d..60268acdf 100644 --- a/src/comm/tcp_socket.cpp +++ b/src/comm/tcp_socket.cpp @@ -88,6 +88,12 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ if (state_ == SocketState::Connected) return false; + // Clear any pre-existing Closed/Disconnected/Invalid state so that the + // between-attempt sleep below (which exits early when state becomes Closed) + // can only be short-circuited by a concurrent external close() call, not by + // a Closed state left over from a previous disconnect(). + state_ = SocketState::Invalid; + URCL_LOG_DEBUG("Setting up connection: %s:%d", host.c_str(), port); // gethostbyname() is deprecated so use getadderinfo() as described in: @@ -141,7 +147,19 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ << std::chrono::duration_cast>(reconnection_time_resolved).count() << " seconds."; URCL_LOG_ERROR("%s", ss.str().c_str()); - std::this_thread::sleep_for(reconnection_time_resolved); + // Sleep in short slices so that an external close() (e.g. from ~RTDEClient calling + // disconnect() before joining the reconnect thread) can interrupt the wait promptly. + const auto sleep_slice = std::chrono::milliseconds(100); + for (auto slept = std::chrono::milliseconds(0); + slept < reconnection_time_resolved && state_ != SocketState::Closed; + slept += sleep_slice) + { + std::this_thread::sleep_for(sleep_slice); + } + if (state_ == SocketState::Closed) + { + return false; + } } } } From bd625946831f0e03725383e29b65262dfebbf636 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 11 Jun 2026 13:31:39 +0000 Subject: [PATCH 2/9] fix(RTDEClient): disconnect before joining reconnect thread; always disconnect stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related fixes for RTDEClient teardown: 1. ~RTDEClient() now calls disconnect() before joining reconnecting_thread_. Previously the join came first, so the reconnect thread could be sleeping inside TCPSocket::setup()'s between-attempt sleep when ~RTDEClient() ran, causing the join to block until the sleep completed (up to reconnection_timeout, default 10 s, and indefinitely with max_connection_attempts=0). Moving disconnect() before join() triggers the new sliced-sleep exit path in TCPSocket::setup() so the thread wakes within 100 ms. 2. RTDEClient::disconnect() now calls stream_.disconnect() and writer_.stop() unconditionally, regardless of client_state_. Previously, a failed negotiateProtocolVersion() would reset client_state_ to UNINITIALIZED, causing the conditional guard to skip stream_.disconnect(). The stream was then left in SocketState::Connected, and the next init() retry's TCPSocket::setup() would return false immediately (due to the Connected early-return guard), throwing 'Failed to connect to robot' even though the server was reachable. TCPSocket::close() and RTDEWriter::stop() are both idempotent, so removing the guard is safe. Co-authored-by: Rune Søe-Knudsen --- src/rtde/rtde_client.cpp | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index bdac1a733..d6e8bc040 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -87,11 +87,14 @@ RTDEClient::~RTDEClient() { prod_->setReconnectionCallback(nullptr); stop_reconnection_ = true; + // Disconnect before joining the reconnect thread so that any sleep inside + // TCPSocket::setup() (which checks for SocketState::Closed) wakes promptly + // instead of blocking the destructor for the full reconnection_timeout. + disconnect(); if (reconnecting_thread_.joinable()) { reconnecting_thread_.join(); } - disconnect(); } bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::milliseconds reconnection_timeout, @@ -508,11 +511,13 @@ bool RTDEClient::setupInputs() void RTDEClient::disconnect() { - if (client_state_ > ClientState::UNINITIALIZED) - { - stream_.disconnect(); - writer_.stop(); - } + // Disconnect unconditionally: TCPSocket::close() (called by stream_.disconnect()) + // and RTDEWriter::stop() are both idempotent. Guarding on client_state_ left + // the stream in SocketState::Connected after a failed negotiateProtocolVersion() + // (which resets client_state_ to UNINITIALIZED), causing the subsequent + // TCPSocket::setup() call to return false immediately due to the Connected check. + stream_.disconnect(); + writer_.stop(); client_state_ = ClientState::UNINITIALIZED; prod_->stopProducer(); stopBackgroundRead(); From 209b5f51df3ce9b95fead64a16208dcc262bc83a Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 11 Jun 2026 13:31:50 +0000 Subject: [PATCH 3/9] test: add regression tests for reconnect-thread blocking destructor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TCPSocketTest.setup_interruptible_by_close (test_tcp_socket.cpp): Unit test that runs without INTEGRATION_TESTS. Starts TCPSocket::setup() with max_num_tries=0 and a 5 s reconnection_timeout against a non-listening port, then calls close() from the main thread and asserts the background thread joins within 2 s. Directly exercises the interruptible-sleep fix. RTDEClientTest.destructor_not_blocked_by_stuck_reconnect_thread (test_rtde_client.cpp): Integration-level test using the existing RTDEServer fake. Initialises an RTDEClient with reconnection_timeout=5 s, drops the fake server to trigger the reconnect thread, then asserts ~RTDEClient() completes in < 2 s. The test skips gracefully when the fake server cannot complete the RTDE handshake within the socket read timeout (environment-dependent timing); in that case TCPSocketTest.setup_interruptible_by_close provides full coverage of the underlying fix. Co-authored-by: Rune Søe-Knudsen --- tests/test_rtde_client.cpp | 74 ++++++++++++++++++++++++++++++++++++++ tests/test_tcp_socket.cpp | 40 +++++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/tests/test_rtde_client.cpp b/tests/test_rtde_client.cpp index becdf1325..3b7f25183 100644 --- a/tests/test_rtde_client.cpp +++ b/tests/test_rtde_client.cpp @@ -812,6 +812,80 @@ TEST_F(RTDEClientTest, test_initialization) EXPECT_GE(std::chrono::duration_cast(elapsed).count(), 20); } +// Regression test for the bug where ~RTDEClient() could block indefinitely when +// the reconnect thread was sleeping inside TCPSocket::setup()'s between-attempt +// sleep. Fixed by: (1) calling disconnect() before joining reconnecting_thread_ +// in ~RTDEClient(), and (2) making TCPSocket::setup()'s sleep interruptible by +// checking for SocketState::Closed every 100 ms. +// +// See also TCPSocketTest.setup_interruptible_by_close in test_tcp_socket.cpp +// for a lower-level unit test of the same fix that runs without INTEGRATION_TESTS. +TEST_F(RTDEClientTest, destructor_not_blocked_by_stuck_reconnect_thread) +{ + // Use a large reconnection timeout so that the blocking window is clearly + // observable if the fix is absent (5 s sleep > 2 s assertion threshold). + const std::chrono::milliseconds large_reconnect_timeout(5000); + + auto fake_rtde_server = std::make_unique(g_FAKE_RTDE_PORT); + // Skip the bootup-timestamp check inside isRobotBooted(). + fake_rtde_server->setStartTime(std::chrono::steady_clock::now() - std::chrono::seconds(52)); + + client_.reset(new rtde_interface::RTDEClient("localhost", notifier_, resources_output_recipe_, + resources_input_recipe_, 100, false, g_FAKE_RTDE_PORT)); + // Attempt init up to 10 times with a short between-attempt sleep to ensure + // the RTDE handshake succeeds even in environments where the fake server's + // response arrives slightly after the 1-second socket read timeout. + bool initialized = false; + for (int attempt = 0; attempt < 10 && !initialized; ++attempt) + { + try + { + // max_connection_attempts=0 (unlimited): TCPSocket::setup() sleeps + // large_reconnect_timeout between every failed connect attempt once the + // server is gone. Use a short initialization_timeout for fast retries. + client_->init(0, large_reconnect_timeout, 1, std::chrono::milliseconds(50)); + initialized = true; + } + catch (const UrException&) + { + // Recreate the client on each retry to start from a clean state. + client_.reset(new rtde_interface::RTDEClient("localhost", notifier_, resources_output_recipe_, + resources_input_recipe_, 100, false, g_FAKE_RTDE_PORT)); + } + } + if (!initialized) + { + GTEST_SKIP() << "Could not initialize RTDEClient with the fake server after 10 attempts; " + "this test requires a reliably responding RTDE server. " + "The TCPSocket-level regression test (TCPSocketTest.setup_interruptible_by_close) " + "verifies the underlying fix without a robot."; + } + + // start(true) arms the reconnect callback via the background read thread. + client_->start(true); + + // Drop the server — the background read thread detects the connection loss, + // calls reconnectCallback(), which launches reconnecting_thread_. That thread + // enters setupCommunication() -> TCPSocket::setup() and begins sleeping + // large_reconnect_timeout between retry attempts. + fake_rtde_server.reset(); + + // Give the reconnect thread time to reach the sleep inside TCPSocket::setup(). + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // The destructor must return quickly: disconnect() sets SocketState::Closed, + // waking the sliced sleep, so the join completes in well under 2 s. + // Without the fix this would block for >= large_reconnect_timeout (5 s). + const auto t0 = std::chrono::steady_clock::now(); + client_.reset(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) + << "RTDEClient destructor blocked for " + << std::chrono::duration_cast(elapsed).count() + << " ms — reconnect thread was not woken by disconnect()"; +} + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); diff --git a/tests/test_tcp_socket.cpp b/tests/test_tcp_socket.cpp index bb6bd317a..5bccaf235 100644 --- a/tests/test_tcp_socket.cpp +++ b/tests/test_tcp_socket.cpp @@ -293,6 +293,46 @@ TEST_F(TCPSocketTest, connect_non_running_robot) EXPECT_LT(elapsed, std::chrono::milliseconds(7500)); } +// Regression test for the bug where TCPSocket::setup() could block the caller +// indefinitely when the sleep between retry attempts was not interruptible. +// +// Fixed by replacing the monolithic sleep_for(reconnection_time) in setup() with +// a 100ms-sliced loop that exits early when state_ transitions to Closed (set by +// a concurrent close() call, e.g. from ~RTDEClient calling disconnect() before +// joining the reconnect thread). +TEST_F(TCPSocketTest, setup_interruptible_by_close) +{ + // Use a port with no listener so every connect attempt fails immediately, + // sending setup() into the between-attempt sleep. + const int unused_port = 12322; + const std::chrono::milliseconds large_reconnect_timeout(5000); + + Client client(unused_port, "127.0.0.1"); + + // Run setup() with unlimited retries in a background thread. + std::thread setup_thread([&client, &large_reconnect_timeout]() { + // max_num_tries=0 (unlimited) → setup() sleeps large_reconnect_timeout after + // every failed connect attempt and never exits on its own. + client.setup(0, large_reconnect_timeout); + }); + + // Give the thread time to reach the between-attempt sleep inside setup(). + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // close() sets state_ = Closed; the sliced sleep in setup() detects this + // within 100 ms and setup() returns, allowing the thread to finish. + const auto t0 = std::chrono::steady_clock::now(); + client.close(); + + setup_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + // Without the fix, elapsed would be >= large_reconnect_timeout (5 s). + EXPECT_LT(elapsed, std::chrono::seconds(2)) + << "TCPSocket::setup() was not interrupted by close() within 2 s; " + "the between-attempt sleep is not interruptible"; +} + TEST_F(TCPSocketTest, test_deprecated_reconnection_time_interface) { URCL_SILENCE_DEPRECATED_BEGIN From 657a1bc7bfa1681fa6ce8522dde3d725ff1e6e9d Mon Sep 17 00:00:00 2001 From: urrsk <41109954+urrsk@users.noreply.github.com> Date: Fri, 12 Jun 2026 16:26:03 +0200 Subject: [PATCH 4/9] fix(PrimaryClient): close stream before joining reconnect thread; interruptible producer backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends the RTDEClient teardown fix to the PrimaryClient pipeline, which is the path used by UrDriver. ~PrimaryClient() could block indefinitely (up to the unbounded reconnect timeout) when the robot dropped the primary connection at teardown time. Root cause: on connection loss TCPSocket::read() leaves the socket in SocketState::Disconnected, so URProducer::tryGetImpl() enters its reconnect loop (sleep backoff + stream_.connect() with unlimited retries). ~PrimaryClient() and PrimaryClient::stop() joined the producer thread (pipeline_->stop()) without first closing the stream, so the join blocked for the full reconnect duration. Two related fixes: 1. ~PrimaryClient() and PrimaryClient::stop() now call stream_.close() BEFORE pipeline_->stop(). stream_.close() sets SocketState::Closed, waking a producer stuck in its reconnect path so the join returns promptly. Previously stop() closed the stream after the join (too late) and the destructor never closed it. 2. URProducer::tryGetImpl()'s reconnect backoff slept sleep_for(timeout_) (growing up to 120 s) with no cancellation point. It now sleeps in 100 ms slices and bails out as soon as running_ becomes false or the stream is closed, mirroring the TCPSocket::setup() interruptible-sleep fix. test: add PrimaryClientReconnectTest.destructor_not_blocked_by_stuck_reconnect_thread (test_primary_client_reconnect.cpp). The PrimaryClient counterpart of RTDEClientTest.destructor_not_blocked_by_stuck_reconnect_thread, using the in-process FakePrimaryServer so it runs without a robot (unlike the INTEGRATION_TESTS-gated primary_client_test_headless). It starts a client against the fake server, drops the server to drive the producer into its reconnect loop, then asserts ~PrimaryClient() completes in < 2 s. Verified to hang without the fix and pass in ~1.5 s with it. Co-authored-by: Rune Søe-Knudsen --- include/ur_client_library/comm/producer.h | 16 +++- src/primary/primary_client.cpp | 10 +- tests/CMakeLists.txt | 8 ++ tests/test_primary_client_reconnect.cpp | 109 ++++++++++++++++++++++ 4 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 tests/test_primary_client_reconnect.cpp diff --git a/include/ur_client_library/comm/producer.h b/include/ur_client_library/comm/producer.h index e509d9017..4f56b8119 100644 --- a/include/ur_client_library/comm/producer.h +++ b/include/ur_client_library/comm/producer.h @@ -85,7 +85,21 @@ class URProducer : public IProducer } URCL_LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count()); - std::this_thread::sleep_for(timeout_); + // Sleep in small slices so the producer can be stopped (running_ == false) + // or woken by a stream close (e.g. from a destructor calling stop()) within + // ~100 ms, instead of blocking for the full (exponentially growing) timeout. + // Without this, a thread joining the pipeline at teardown could block for up + // to 120 s while this thread sleeps here. + const auto sleep_slice = std::chrono::milliseconds(100); + const auto sleep_total = std::chrono::duration_cast(timeout_); + for (auto slept = std::chrono::milliseconds(0); slept < sleep_total && running_ && !stream_.closed(); + slept += sleep_slice) + { + std::this_thread::sleep_for(sleep_slice); + } + + if (!running_ || stream_.closed()) + return false; if (stream_.connect()) continue; diff --git a/src/primary/primary_client.cpp b/src/primary/primary_client.cpp index a413c648a..e14fbec46 100644 --- a/src/primary/primary_client.cpp +++ b/src/primary/primary_client.cpp @@ -67,6 +67,12 @@ PrimaryClient::PrimaryClient(const std::string& robot_ip, [[maybe_unused]] comm: PrimaryClient::~PrimaryClient() { URCL_LOG_INFO("Stopping primary client pipeline"); + // Close the stream BEFORE stopping (joining) the pipeline. The pipeline's + // producer thread may be sleeping inside its reconnect backoff or blocked in + // TCPSocket::setup(); closing the stream sets SocketState::Closed, which wakes + // both paths so pipeline_->stop()'s join returns promptly instead of blocking + // until the (potentially unbounded) reconnect timeout expires. + stream_.close(); pipeline_->stop(); } @@ -79,8 +85,10 @@ void PrimaryClient::start(const size_t max_num_tries, const std::chrono::millise void PrimaryClient::stop() { - pipeline_->stop(); + // Close the stream before joining the pipeline so a producer thread stuck in + // its reconnect path is woken and the join returns promptly (see ~PrimaryClient). stream_.close(); + pipeline_->stop(); } void PrimaryClient::addPrimaryConsumer(std::shared_ptr> primary_consumer) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 160f41622..a1a5eac4c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -152,6 +152,14 @@ target_link_libraries(fake_primary_server_tests PRIVATE ur_client_library::urcl gtest_add_tests(TARGET fake_primary_server_tests ) +# Robot-free regression test for ~PrimaryClient() not blocking on a stuck +# reconnect thread. Uses the in-process FakePrimaryServer so it runs without a +# robot (unlike the INTEGRATION_TESTS-gated primary_client_test_headless). +add_executable(primary_client_reconnect_tests test_primary_client_reconnect.cpp fake_primary_server.cpp) +target_link_libraries(primary_client_reconnect_tests PRIVATE ur_client_library::urcl GTest::gtest_main) +gtest_add_tests(TARGET primary_client_reconnect_tests +) + add_executable(rtde_data_package_tests test_rtde_data_package.cpp) diff --git a/tests/test_primary_client_reconnect.cpp b/tests/test_primary_client_reconnect.cpp new file mode 100644 index 000000000..3a06d66d8 --- /dev/null +++ b/tests/test_primary_client_reconnect.cpp @@ -0,0 +1,109 @@ +// -- BEGIN LICENSE BLOCK ---------------------------------------------- +// Copyright 2026 Universal Robots A/S +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the {copyright_holder} nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// -- END LICENSE BLOCK ------------------------------------------------ + +#include + +#include +#include +#include + +#include + +#include "fake_primary_server.h" + +using namespace urcl; + +// Regression test for ~PrimaryClient() blocking indefinitely when the pipeline's +// producer thread is stuck in its reconnect loop at teardown time. +// +// This is the PrimaryClient counterpart of +// RTDEClientTest.destructor_not_blocked_by_stuck_reconnect_thread (test_rtde_client.cpp). +// +// Root cause: when the robot drops the primary connection, TCPSocket::read() +// returns false and leaves the socket in SocketState::Disconnected. URProducer's +// tryGetImpl() then enters its reconnect path: it sleeps an (exponentially +// growing) backoff and calls stream_.connect(), which retries with no upper +// bound (max_num_tries == 0), sleeping reconnection_time between attempts. If +// ~PrimaryClient() simply called pipeline_->stop() (which joins the producer +// thread) without first closing the stream, the join would block for the full +// reconnect duration — effectively forever for an unreachable robot. +// +// Fix (two parts): +// 1. ~PrimaryClient()/PrimaryClient::stop() close the stream BEFORE joining the +// pipeline. stream_.close() sets SocketState::Closed. +// 2. URProducer's reconnect backoff sleeps in 100 ms slices and bails out as +// soon as the stream is closed (or the producer is stopped), and +// TCPSocket::setup()'s between-attempt sleep is likewise interruptible by +// SocketState::Closed. +// Together these wake the producer within ~100 ms of the destructor closing the +// stream, so the join — and therefore the destructor — returns promptly. +// +// Unlike test_primary_client.cpp's robot-dependent fixtures, this test uses the +// in-process FakePrimaryServer, so it runs in the normal (non-INTEGRATION_TESTS) +// build and needs no robot. +TEST(PrimaryClientReconnectTest, destructor_not_blocked_by_stuck_reconnect_thread) +{ + comm::INotifier notifier; + + auto server = std::make_unique(primary_interface::UR_PRIMARY_PORT); + auto client = std::make_unique("127.0.0.1", notifier); + + // Unlimited reconnect attempts with a large reconnection time: if the fix is + // absent, the producer's reconnect path keeps the destructor blocked. + const std::chrono::milliseconds large_reconnect_timeout(5000); + ASSERT_NO_THROW(client->start(/*max_num_tries=*/0, large_reconnect_timeout)); + ASSERT_TRUE(server->waitForClient()) << "PrimaryClient never connected to the fake server"; + + // Drop the server. The producer's read() fails, the socket transitions to + // SocketState::Disconnected, and the producer enters its reconnect loop. + server.reset(); + + // Give the producer time to detect the drop and reach its reconnect sleep + // (initial backoff is 1 s, after which it sleeps inside TCPSocket::setup()). + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); + + // The destructor must return quickly: it closes the stream + // (SocketState::Closed), waking the producer, so the pipeline join completes in + // well under 2 s. Without the fix this blocks for at least the reconnect + // timeout (and indefinitely with unlimited retries against a dead port). + const auto t0 = std::chrono::steady_clock::now(); + client.reset(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) + << "~PrimaryClient() blocked for " << std::chrono::duration_cast(elapsed).count() + << " ms — the producer reconnect thread was not woken by the stream close"; +} + +int main(int argc, char* argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 5b22c970eccde910a59ace9973469ae5cd213bae Mon Sep 17 00:00:00 2001 From: urrsk <41109954+urrsk@users.noreply.github.com> Date: Mon, 15 Jun 2026 15:02:11 +0200 Subject: [PATCH 5/9] fix(TCPSocket): make connection attempts cancellable to unblock teardown Replace the state-based, sleep-polling reconnect interruption with a dedicated, sticky cancellation flag (stop_requested_) plus requestStop()/clearStop(). The flag is orthogonal to SocketState, so setup()'s internal state resets can no longer race away a teardown signal -- the lost-wakeup that hung ~PrimaryClient() (and the Windows CI test) indefinitely. setup() now connects with a non-blocking socket polled in short slices (poll()/WSAPoll()), so a connect attempt against a genuinely unreachable host is interruptible too -- not just the between-attempt back-off. This makes the destructors return promptly instead of blocking for the reconnection timeout (or forever with unlimited attempts). - TCPSocket: add stop_requested_ + requestStop()/clearStop(); interruptible, non-blocking openInterruptible(); honor the flag in setup()'s connect loop and back-off. - PrimaryClient/RTDEClient: call requestStop() before joining the reconnect thread; clear the flag on (re)start (URProducer::setupProducer, RTDEClient::init). - tests: drive setup() interruption via requestStop(); add a non-routable-address blocking-connect regression test; guard the teardown tests with a watchdog; add CTest TIMEOUT properties so a hang fails fast instead of timing out the job. --- include/ur_client_library/comm/producer.h | 3 + include/ur_client_library/comm/tcp_socket.h | 34 ++++ src/comm/tcp_socket.cpp | 172 ++++++++++++++++++-- src/primary/primary_client.cpp | 17 +- src/rtde/rtde_client.cpp | 12 +- tests/CMakeLists.txt | 10 ++ tests/test_primary_client_reconnect.cpp | 43 +++-- tests/test_rtde_client.cpp | 40 +++-- tests/test_tcp_socket.cpp | 57 +++++-- 9 files changed, 326 insertions(+), 62 deletions(-) diff --git a/include/ur_client_library/comm/producer.h b/include/ur_client_library/comm/producer.h index 4f56b8119..7fe0e634d 100644 --- a/include/ur_client_library/comm/producer.h +++ b/include/ur_client_library/comm/producer.h @@ -133,6 +133,9 @@ class URProducer : public IProducer void setupProducer(const size_t max_num_tries = 0, const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) override { + // Clear any cancellation request left over from a previous teardown so the stream can be + // (re)used. Safe here because this runs on the controlling thread before the producer loop. + stream_.clearStop(); timeval tv; tv.tv_sec = 1; tv.tv_usec = 0; diff --git a/include/ur_client_library/comm/tcp_socket.h b/include/ur_client_library/comm/tcp_socket.h index 5a1d468c5..d92f82dd4 100644 --- a/include/ur_client_library/comm/tcp_socket.h +++ b/include/ur_client_library/comm/tcp_socket.h @@ -53,8 +53,22 @@ class TCPSocket std::chrono::milliseconds reconnection_time_; bool reconnection_time_modified_deprecated_ = false; + // Cancellation token used to abort a connection attempt that is currently in + // progress (either waiting inside connect() or sleeping between attempts). + // This is intentionally orthogonal to state_: a SocketState::Closed left over + // from a deliberate close()+connect() reconnect must NOT abort a fresh + // attempt, whereas requestStop() (used at teardown) must abort reliably and + // cannot be clobbered by setup()'s internal state_ resets. + std::atomic stop_requested_{ false }; + void setupOptions(); + // Performs an interruptible, non-blocking connect on an already-created socket. + // Polls in short slices so that a concurrent requestStop() aborts the attempt + // promptly on all platforms (POSIX close() of a blocked connect() is reliable, + // Winsock's is not). Restores blocking mode on success. + bool openInterruptible(socket_t socket_fd, struct sockaddr* address, size_t address_len); + protected: static bool open(socket_t socket_fd, struct sockaddr* address, size_t address_len) { @@ -137,6 +151,26 @@ class TCPSocket */ void close(); + /*! + * \brief Requests that any in-progress (or future) connection attempt be aborted. + * + * Sets a sticky cancellation flag and closes the socket. A reconnect thread that is + * waiting inside setup() (either in connect() or in the between-attempt back-off) returns + * promptly instead of blocking until the reconnection timeout expires. Use this at teardown + * (e.g. from a destructor) before joining a reconnect thread. The flag stays set until + * clearStop() is called, so it cannot be lost by a racing internal state reset. + */ + void requestStop(); + + /*! + * \brief Clears the cancellation flag set by requestStop(). + * + * Must be called on a controlled (re)start before attempting to connect again, so a socket + * that was previously stopped can be reused. Never call this concurrently with a reconnect + * thread. + */ + void clearStop(); + /*! * \brief Setup Receive timeout used for this socket. * diff --git a/src/comm/tcp_socket.cpp b/src/comm/tcp_socket.cpp index 60268acdf..9878daffb 100644 --- a/src/comm/tcp_socket.cpp +++ b/src/comm/tcp_socket.cpp @@ -27,7 +27,9 @@ #ifndef _WIN32 # include +# include # include +# include #endif #include "ur_client_library/log.h" @@ -37,6 +39,67 @@ namespace urcl { namespace comm { +namespace +{ +// Time slice used while waiting for a non-blocking connect to resolve. Kept short so a +// concurrent requestStop() aborts the wait promptly even if closing the socket does not +// itself wake the wait (as is the case on Windows). +constexpr int CONNECT_POLL_SLICE_MS = 100; + +// Toggle the blocking mode of a socket. Returns true on success. +bool setSocketBlocking(socket_t socket_fd, bool blocking) +{ +#ifdef _WIN32 + u_long mode = blocking ? 0 : 1; + return ::ioctlsocket(socket_fd, FIONBIO, &mode) == 0; +#else + int flags = ::fcntl(socket_fd, F_GETFL, 0); + if (flags < 0) + { + return false; + } + flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK); + return ::fcntl(socket_fd, F_SETFL, flags) == 0; +#endif +} + +// True if the last connect() call indicated that the connection is being established +// asynchronously (the expected result for a non-blocking socket). +bool connectInProgress() +{ +#ifdef _WIN32 + return ::WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EINPROGRESS; +#endif +} + +// Waits up to timeout_ms for the socket to become writable (connect resolved). +// Returns >0 if the socket is ready/has an event, 0 on timeout, <0 on error. +// +// poll() is used on both platforms (WSAPoll() on Windows). It avoids select()'s FD_SETSIZE +// limitation entirely and keeps a single mental model. On Windows this relies on the WSAPoll +// connect-failure fix introduced in Windows 10 version 2004 / Windows Server 2019: a failed +// non-blocking connect is reported as (POLLHUP | POLLERR | POLLWRNORM). The caller treats any +// returned event as "connect resolved" and consults SO_ERROR for the actual outcome, so it does +// not depend on which particular revents flag is set. +int waitForSocketWritable(socket_t socket_fd, int timeout_ms) +{ +#ifdef _WIN32 + WSAPOLLFD pfd; + pfd.fd = socket_fd; + pfd.events = POLLWRNORM; // == POLLOUT + pfd.revents = 0; + return ::WSAPoll(&pfd, 1, timeout_ms); +#else + struct pollfd pfd; + pfd.fd = socket_fd; + pfd.events = POLLOUT; + pfd.revents = 0; + return ::poll(&pfd, 1, timeout_ms); +#endif +} +} // namespace TCPSocket::TCPSocket() : socket_fd_(INVALID_SOCKET), state_(SocketState::Invalid), reconnection_time_(std::chrono::seconds(10)) { @@ -72,6 +135,66 @@ void TCPSocket::setupOptions() } } +bool TCPSocket::openInterruptible(socket_t socket_fd, struct sockaddr* address, size_t address_len) +{ + if (!setSocketBlocking(socket_fd, false)) + { + return false; + } + + int connect_res = ::connect(socket_fd, address, static_cast(address_len)); + bool connected = false; + if (connect_res == 0) + { + // Connected immediately (common for loopback). + connected = true; + } + else if (connectInProgress()) + { + // Poll in short slices until the connect resolves, the OS connect timeout expires, + // or a concurrent requestStop() asks us to abort. + while (true) + { + if (stop_requested_) + { + return false; + } + int ready = waitForSocketWritable(socket_fd, CONNECT_POLL_SLICE_MS); + if (ready < 0) + { + // poll() error (e.g. the fd was closed by requestStop()). + return false; + } + if (ready == 0) + { + // Timeout slice elapsed without the connect resolving: re-check stop and keep waiting. + continue; + } + // The socket reported an event: query SO_ERROR to find out whether the connect succeeded. + int so_error = 0; + socklen_t len = sizeof(so_error); + if (::getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast(&so_error), &len) < 0) + { + return false; + } + connected = (so_error == 0); + break; + } + } + else + { + // Immediate, permanent failure (e.g. connection refused). + connected = false; + } + + if (connected && !setSocketBlocking(socket_fd, true)) + { + // Could not restore blocking mode; treat the connection as failed. + return false; + } + return connected; +} + bool TCPSocket::setup(const std::string& host, const int port, const size_t max_num_tries, const std::chrono::milliseconds reconnection_time) { @@ -85,13 +208,17 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ reconnection_time_resolved = reconnection_time_; } + // Honor a pending stop before doing anything else (e.g. a connect attempt issued just after + // requestStop()). Checked before touching state_ so the cancellation cannot be lost. + if (stop_requested_) + return false; + if (state_ == SocketState::Connected) return false; - // Clear any pre-existing Closed/Disconnected/Invalid state so that the - // between-attempt sleep below (which exits early when state becomes Closed) - // can only be short-circuited by a concurrent external close() call, not by - // a Closed state left over from a previous disconnect(). + // Clear any leftover Closed/Disconnected state from a previous (deliberate) disconnect() so + // it does not interfere with this fresh attempt. Cancellation is tracked via stop_requested_, + // not via state_, so this reset can no longer erase a teardown signal. state_ = SocketState::Invalid; URCL_LOG_DEBUG("Setting up connection: %s:%d", host.c_str(), port); @@ -112,6 +239,9 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ bool connected = false; while (!connected) { + if (stop_requested_) + return false; + if (getaddrinfo(host_name, service.c_str(), &hints, &result) != 0) { URCL_LOG_ERROR("Failed to get address for %s:%d", host.c_str(), port); @@ -122,11 +252,17 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ { socket_fd_ = ::socket(p->ai_family, p->ai_socktype, p->ai_protocol); - if (socket_fd_ != -1 && open(socket_fd_, p->ai_addr, p->ai_addrlen)) + if (socket_fd_ != -1 && openInterruptible(socket_fd_, p->ai_addr, p->ai_addrlen)) { connected = true; break; } + + if (stop_requested_) + { + freeaddrinfo(result); + return false; + } } freeaddrinfo(result); @@ -134,6 +270,10 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ if (!connected) { state_ = SocketState::Invalid; + if (stop_requested_) + { + return false; + } if (++connect_counter >= max_num_tries && max_num_tries > 0) { URCL_LOG_ERROR("Failed to establish connection for %s:%d after %d tries", host.c_str(), port, max_num_tries); @@ -147,16 +287,15 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ << std::chrono::duration_cast>(reconnection_time_resolved).count() << " seconds."; URCL_LOG_ERROR("%s", ss.str().c_str()); - // Sleep in short slices so that an external close() (e.g. from ~RTDEClient calling - // disconnect() before joining the reconnect thread) can interrupt the wait promptly. + // Sleep in short slices so that a concurrent requestStop() (e.g. from ~RTDEClient or + // ~PrimaryClient before joining the reconnect thread) can interrupt the back-off promptly. const auto sleep_slice = std::chrono::milliseconds(100); - for (auto slept = std::chrono::milliseconds(0); - slept < reconnection_time_resolved && state_ != SocketState::Closed; + for (auto slept = std::chrono::milliseconds(0); slept < reconnection_time_resolved && !stop_requested_; slept += sleep_slice) { std::this_thread::sleep_for(sleep_slice); } - if (state_ == SocketState::Closed) + if (stop_requested_) { return false; } @@ -179,6 +318,19 @@ void TCPSocket::close() } } +void TCPSocket::requestStop() +{ + // Set the flag before closing so that a reconnect thread observing the closed socket is + // guaranteed to also see the cancellation request (and therefore stop instead of retrying). + stop_requested_ = true; + close(); +} + +void TCPSocket::clearStop() +{ + stop_requested_ = false; +} + std::string TCPSocket::getIP() const { sockaddr_in name; diff --git a/src/primary/primary_client.cpp b/src/primary/primary_client.cpp index e14fbec46..7af6f28bb 100644 --- a/src/primary/primary_client.cpp +++ b/src/primary/primary_client.cpp @@ -67,12 +67,12 @@ PrimaryClient::PrimaryClient(const std::string& robot_ip, [[maybe_unused]] comm: PrimaryClient::~PrimaryClient() { URCL_LOG_INFO("Stopping primary client pipeline"); - // Close the stream BEFORE stopping (joining) the pipeline. The pipeline's + // Request a stop on the stream BEFORE stopping (joining) the pipeline. The pipeline's // producer thread may be sleeping inside its reconnect backoff or blocked in - // TCPSocket::setup(); closing the stream sets SocketState::Closed, which wakes - // both paths so pipeline_->stop()'s join returns promptly instead of blocking - // until the (potentially unbounded) reconnect timeout expires. - stream_.close(); + // TCPSocket::setup(); requestStop() sets the cancellation flag and closes the socket so + // both paths abort within one poll slice and pipeline_->stop()'s join returns promptly + // instead of blocking until the (potentially unbounded) reconnect timeout expires. + stream_.requestStop(); pipeline_->stop(); } @@ -85,9 +85,10 @@ void PrimaryClient::start(const size_t max_num_tries, const std::chrono::millise void PrimaryClient::stop() { - // Close the stream before joining the pipeline so a producer thread stuck in - // its reconnect path is woken and the join returns promptly (see ~PrimaryClient). - stream_.close(); + // Request a stop on the stream before joining the pipeline so a producer thread stuck in + // its reconnect path is aborted and the join returns promptly (see ~PrimaryClient). A + // subsequent start() clears the flag again via URProducer::setupProducer(). + stream_.requestStop(); pipeline_->stop(); } diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index d6e8bc040..b947b3051 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -87,9 +87,11 @@ RTDEClient::~RTDEClient() { prod_->setReconnectionCallback(nullptr); stop_reconnection_ = true; - // Disconnect before joining the reconnect thread so that any sleep inside - // TCPSocket::setup() (which checks for SocketState::Closed) wakes promptly - // instead of blocking the destructor for the full reconnection_timeout. + // Request a stop on the stream before joining the reconnect thread. requestStop() sets the + // cancellation flag and closes the socket so that any connect attempt or back-off sleep inside + // TCPSocket::setup() aborts within one poll slice, instead of blocking the destructor for the + // full reconnection_timeout (or indefinitely with max_connection_attempts == 0). + stream_.requestStop(); disconnect(); if (reconnecting_thread_.joinable()) { @@ -115,6 +117,10 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m max_initialization_attempts_ = max_initialization_attempts; initialization_timeout_ = initialization_timeout; + // Clear any cancellation request from a previous teardown so this (re)initialization can + // connect. Runs on the controlling thread before the reconnect thread can be started. + stream_.clearStop(); + prod_->setReconnectionCallback(nullptr); unsigned int attempts = 0; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a1a5eac4c..2ceb8a601 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -39,6 +39,9 @@ if (INTEGRATION_TESTS) gtest_add_tests(TARGET rtde_tests WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} ) + # Bound this teardown regression test so a hang fails CI fast instead of timing out the job. + set_tests_properties(RTDEClientTest.destructor_not_blocked_by_stuck_reconnect_thread + PROPERTIES TIMEOUT 60) if (CHECK_RTDE_DOCS_RECIPE) find_package(Python3 COMPONENTS Interpreter REQUIRED) add_custom_target(generate_outputs ALL COMMAND ${Python3_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/resources/generate_rtde_outputs.py) @@ -159,6 +162,9 @@ add_executable(primary_client_reconnect_tests test_primary_client_reconnect.cpp target_link_libraries(primary_client_reconnect_tests PRIVATE ur_client_library::urcl GTest::gtest_main) gtest_add_tests(TARGET primary_client_reconnect_tests ) +# Bound this teardown regression test so a hang fails CI in ~1 min instead of timing out the job. +set_tests_properties(PrimaryClientReconnectTest.destructor_not_blocked_by_stuck_reconnect_thread + PROPERTIES TIMEOUT 60) @@ -256,6 +262,10 @@ add_executable(tcp_socket_tests test_tcp_socket.cpp) target_link_libraries(tcp_socket_tests PRIVATE ur_client_library::urcl GTest::gtest_main) gtest_add_tests(TARGET tcp_socket_tests ) +# Bound the interruptible-setup regression tests so a hang fails CI fast instead of timing out the job. +set_tests_properties(TCPSocketTest.setup_interruptible_by_close + TCPSocketTest.setup_interruptible_during_blocking_connect + PROPERTIES TIMEOUT 60) add_executable(stream_tests test_stream.cpp) target_link_libraries(stream_tests PRIVATE ur_client_library::urcl GTest::gtest_main) diff --git a/tests/test_primary_client_reconnect.cpp b/tests/test_primary_client_reconnect.cpp index 3a06d66d8..862759946 100644 --- a/tests/test_primary_client_reconnect.cpp +++ b/tests/test_primary_client_reconnect.cpp @@ -31,6 +31,7 @@ #include #include +#include #include #include @@ -56,14 +57,16 @@ using namespace urcl; // reconnect duration — effectively forever for an unreachable robot. // // Fix (two parts): -// 1. ~PrimaryClient()/PrimaryClient::stop() close the stream BEFORE joining the -// pipeline. stream_.close() sets SocketState::Closed. -// 2. URProducer's reconnect backoff sleeps in 100 ms slices and bails out as -// soon as the stream is closed (or the producer is stopped), and -// TCPSocket::setup()'s between-attempt sleep is likewise interruptible by -// SocketState::Closed. -// Together these wake the producer within ~100 ms of the destructor closing the -// stream, so the join — and therefore the destructor — returns promptly. +// 1. ~PrimaryClient()/PrimaryClient::stop() call stream_.requestStop() BEFORE +// joining the pipeline. requestStop() sets a sticky cancellation flag and +// closes the socket. +// 2. TCPSocket::setup() honors that flag both during its (non-blocking) connect +// attempt and during the between-attempt wait, so it aborts within ~100 ms +// regardless of platform (unlike a signal carried only by the socket state, +// which setup()'s internal state resets could race away — the bug that hung +// Windows CI). +// Together these abort the producer within ~100 ms of the destructor, so the join — +// and therefore the destructor — returns promptly. // // Unlike test_primary_client.cpp's robot-dependent fixtures, this test uses the // in-process FakePrimaryServer, so it runs in the normal (non-INTEGRATION_TESTS) @@ -89,17 +92,29 @@ TEST(PrimaryClientReconnectTest, destructor_not_blocked_by_stuck_reconnect_threa // (initial backoff is 1 s, after which it sleeps inside TCPSocket::setup()). std::this_thread::sleep_for(std::chrono::milliseconds(1500)); - // The destructor must return quickly: it closes the stream - // (SocketState::Closed), waking the producer, so the pipeline join completes in - // well under 2 s. Without the fix this blocks for at least the reconnect - // timeout (and indefinitely with unlimited retries against a dead port). + // The destructor must return quickly: requestStop() aborts the producer's connect + // attempt/back-off, so the pipeline join completes well under 2 s. Without the fix + // this blocks for at least the reconnect timeout (and indefinitely with unlimited + // retries against a dead port). Run the destructor on a worker with a watchdog so a + // regression fails fast with a clear message instead of hanging the test binary (the + // CTest TIMEOUT then reaps it). + std::packaged_task teardown([&client]() { client.reset(); }); + auto teardown_future = teardown.get_future(); + std::thread teardown_thread(std::move(teardown)); + const auto t0 = std::chrono::steady_clock::now(); - client.reset(); + if (teardown_future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) + { + teardown_thread.detach(); + FAIL() << "~PrimaryClient() did not return within 5 s — the producer reconnect thread was not aborted by " + "requestStop()"; + } + teardown_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; EXPECT_LT(elapsed, std::chrono::seconds(2)) << "~PrimaryClient() blocked for " << std::chrono::duration_cast(elapsed).count() - << " ms — the producer reconnect thread was not woken by the stream close"; + << " ms — the producer reconnect thread was not aborted by requestStop()"; } int main(int argc, char* argv[]) diff --git a/tests/test_rtde_client.cpp b/tests/test_rtde_client.cpp index 3b7f25183..cab143727 100644 --- a/tests/test_rtde_client.cpp +++ b/tests/test_rtde_client.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -813,13 +814,14 @@ TEST_F(RTDEClientTest, test_initialization) } // Regression test for the bug where ~RTDEClient() could block indefinitely when -// the reconnect thread was sleeping inside TCPSocket::setup()'s between-attempt -// sleep. Fixed by: (1) calling disconnect() before joining reconnecting_thread_ -// in ~RTDEClient(), and (2) making TCPSocket::setup()'s sleep interruptible by -// checking for SocketState::Closed every 100 ms. +// the reconnect thread was stuck inside TCPSocket::setup(). Fixed by: (1) calling +// stream_.requestStop() (followed by disconnect()) before joining reconnecting_thread_ +// in ~RTDEClient(), and (2) making TCPSocket::setup() abort on the sticky stop flag, +// both during the (non-blocking) connect attempt and during the between-attempt wait. // -// See also TCPSocketTest.setup_interruptible_by_close in test_tcp_socket.cpp -// for a lower-level unit test of the same fix that runs without INTEGRATION_TESTS. +// See also TCPSocketTest.setup_interruptible_by_close and +// TCPSocketTest.setup_interruptible_during_blocking_connect in test_tcp_socket.cpp +// for lower-level unit tests of the same fix that run without INTEGRATION_TESTS. TEST_F(RTDEClientTest, destructor_not_blocked_by_stuck_reconnect_thread) { // Use a large reconnection timeout so that the blocking window is clearly @@ -870,20 +872,30 @@ TEST_F(RTDEClientTest, destructor_not_blocked_by_stuck_reconnect_thread) // large_reconnect_timeout between retry attempts. fake_rtde_server.reset(); - // Give the reconnect thread time to reach the sleep inside TCPSocket::setup(). + // Give the reconnect thread time to reach the wait inside TCPSocket::setup(). std::this_thread::sleep_for(std::chrono::milliseconds(500)); - // The destructor must return quickly: disconnect() sets SocketState::Closed, - // waking the sliced sleep, so the join completes in well under 2 s. - // Without the fix this would block for >= large_reconnect_timeout (5 s). + // The destructor must return quickly: requestStop() aborts setup()'s connect/wait, + // so the join completes in well under 2 s. Without the fix this would block for + // >= large_reconnect_timeout (5 s), or forever with unlimited attempts. + // Run the destructor on a worker with a watchdog so a regression fails fast with a + // clear message instead of hanging the test binary (the CTest TIMEOUT then reaps it). + std::packaged_task teardown([this]() { client_.reset(); }); + auto teardown_future = teardown.get_future(); + std::thread teardown_thread(std::move(teardown)); + const auto t0 = std::chrono::steady_clock::now(); - client_.reset(); + if (teardown_future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) + { + teardown_thread.detach(); + FAIL() << "~RTDEClient() did not return within 5 s — reconnect thread was not aborted by requestStop()"; + } + teardown_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; EXPECT_LT(elapsed, std::chrono::seconds(2)) - << "RTDEClient destructor blocked for " - << std::chrono::duration_cast(elapsed).count() - << " ms — reconnect thread was not woken by disconnect()"; + << "RTDEClient destructor blocked for " << std::chrono::duration_cast(elapsed).count() + << " ms — reconnect thread was not aborted by requestStop()"; } int main(int argc, char* argv[]) diff --git a/tests/test_tcp_socket.cpp b/tests/test_tcp_socket.cpp index 5bccaf235..29baba6be 100644 --- a/tests/test_tcp_socket.cpp +++ b/tests/test_tcp_socket.cpp @@ -294,16 +294,18 @@ TEST_F(TCPSocketTest, connect_non_running_robot) } // Regression test for the bug where TCPSocket::setup() could block the caller -// indefinitely when the sleep between retry attempts was not interruptible. +// indefinitely when the wait between retry attempts was not interruptible. // -// Fixed by replacing the monolithic sleep_for(reconnection_time) in setup() with -// a 100ms-sliced loop that exits early when state_ transitions to Closed (set by -// a concurrent close() call, e.g. from ~RTDEClient calling disconnect() before -// joining the reconnect thread). +// setup() is interrupted by requestStop(), which sets a sticky cancellation flag +// and closes the socket. The flag (rather than the transient SocketState::Closed) +// is what makes this race-free: setup() resets state_ internally on every attempt, +// so a teardown signal carried only by the socket state could be lost, whereas the +// dedicated stop flag cannot. This is the path used by ~RTDEClient()/~PrimaryClient() +// before joining their reconnect threads. TEST_F(TCPSocketTest, setup_interruptible_by_close) { // Use a port with no listener so every connect attempt fails immediately, - // sending setup() into the between-attempt sleep. + // sending setup() into the between-attempt wait. const int unused_port = 12322; const std::chrono::milliseconds large_reconnect_timeout(5000); @@ -311,26 +313,55 @@ TEST_F(TCPSocketTest, setup_interruptible_by_close) // Run setup() with unlimited retries in a background thread. std::thread setup_thread([&client, &large_reconnect_timeout]() { - // max_num_tries=0 (unlimited) → setup() sleeps large_reconnect_timeout after + // max_num_tries=0 (unlimited) → setup() waits large_reconnect_timeout after // every failed connect attempt and never exits on its own. client.setup(0, large_reconnect_timeout); }); - // Give the thread time to reach the between-attempt sleep inside setup(). + // Give the thread time to reach the between-attempt wait inside setup(). std::this_thread::sleep_for(std::chrono::milliseconds(300)); - // close() sets state_ = Closed; the sliced sleep in setup() detects this + // requestStop() sets the cancellation flag; the sliced wait in setup() detects it // within 100 ms and setup() returns, allowing the thread to finish. const auto t0 = std::chrono::steady_clock::now(); - client.close(); + client.requestStop(); setup_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; // Without the fix, elapsed would be >= large_reconnect_timeout (5 s). - EXPECT_LT(elapsed, std::chrono::seconds(2)) - << "TCPSocket::setup() was not interrupted by close() within 2 s; " - "the between-attempt sleep is not interruptible"; + EXPECT_LT(elapsed, std::chrono::seconds(2)) << "TCPSocket::setup() was not interrupted by requestStop() within 2 s; " + "the between-attempt wait is not interruptible"; +} + +// Regression test for issue #368: a reconnect thread blocked inside connect() to a +// genuinely unreachable host (no SYN-ACK, no RST) must still be abortable. The +// previous fix only made the between-attempt *sleep* interruptible, not the connect +// itself, so this case could block for the full OS connect timeout. setup() now uses +// a non-blocking connect polled in short slices, so requestStop() aborts it promptly. +TEST_F(TCPSocketTest, setup_interruptible_during_blocking_connect) +{ + // 10.255.255.1 is in a private range and is (almost) never routable, so connect() + // hangs in SYN retransmit rather than failing fast like a refused localhost port. + const int unused_port = 12323; + const std::chrono::milliseconds large_reconnect_timeout(5000); + + Client client(unused_port, "10.255.255.1"); + + std::thread setup_thread([&client, &large_reconnect_timeout]() { client.setup(0, large_reconnect_timeout); }); + + // Give the thread time to enter the (blocking) connect attempt. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + const auto t0 = std::chrono::steady_clock::now(); + client.requestStop(); + + setup_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) << "TCPSocket::setup() was not interrupted while blocked in connect() " + "within 2 s; " + "the connect attempt is not interruptible"; } TEST_F(TCPSocketTest, test_deprecated_reconnection_time_interface) From b94027390eecfa03ac33e419328dd213deeb01c5 Mon Sep 17 00:00:00 2001 From: urrsk <41109954+urrsk@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:37:37 +0200 Subject: [PATCH 6/9] fix(PrimaryClient): clear sticky stop flag before deliberate reconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit reconnectStream() did stream_.close() + stream_.connect() without clearing the sticky stop_requested_ cancellation flag introduced in 5b22c97 ("make connection attempts cancellable to unblock teardown"). PrimaryClient::stop()/~PrimaryClient() set the flag via stream_.requestStop() so a producer thread stuck in TCPSocket::setup() aborts promptly at teardown. The flag is sticky and only cleared on (re)start (URProducer::setupProducer, RTDEClient::init). A deliberate reconnect via reconnectStream() never cleared it, so after stopPrimaryClientCommunication() any resendRobotProgram() call failed: TCPSocket::setup() bailed out at the up-front stop_requested_ check and logged "Failed to reconnect primary stream!". This regressed UrDriverTest.send_robot_program_retry_on_failure on every integration runner. Clear the flag in reconnectStream() before connecting, mirroring URProducer::setupProducer(); a deliberate reconnect is a restart and must not honor a stale teardown cancellation. Co-authored-by: Rune Søe-Knudsen --- src/primary/primary_client.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/primary/primary_client.cpp b/src/primary/primary_client.cpp index 7af6f28bb..9be0e8af2 100644 --- a/src/primary/primary_client.cpp +++ b/src/primary/primary_client.cpp @@ -551,6 +551,10 @@ bool PrimaryClient::reconnectStream() { URCL_LOG_DEBUG("Closing primary stream..."); stream_.close(); + // A deliberate reconnect must clear any sticky cancellation left by a prior stop()/teardown + // (stream_.requestStop()), otherwise TCPSocket::setup() aborts immediately and the reconnect + // fails. Mirrors URProducer::setupProducer() which clears the flag before (re)connecting. + stream_.clearStop(); if (stream_.connect()) { URCL_LOG_DEBUG("Primary stream connected"); From e9c46b361f245fda10c28b0ce5c297f27d1e3c86 Mon Sep 17 00:00:00 2001 From: urrsk <41109954+urrsk@users.noreply.github.com> Date: Mon, 15 Jun 2026 20:17:04 +0200 Subject: [PATCH 7/9] fix(TCPServer): use poll() instead of select() to remove FD_SETSIZE limit select() cannot watch file descriptors whose number is >= FD_SETSIZE (1024 on glibc). When the hosting process holds many descriptors (e.g. a JVM such as MATLAB's), accepted socket FDs exceed that limit; the previous code either rejected the connection (the set_size_exceeded guard added for select()) or risked the "bit out of range" fd_set crash. Replace the select()/fd_set machinery in TCPServer with poll() (WSAPoll() on Windows), which has no FD_SETSIZE limitation. The pollfd set is rebuilt each spin() from the listen socket plus client_fds_ (already tracked), so the masterfds_/tempfds_/maxfd_ members and the FD_SET/FD_CLR/FD_ZERO/FD_ISSET and set_size_exceeded code are removed entirely. tests: add two TCPServer regression tests - services_client_with_high_fd_number (POSIX): consumes low FDs so the accepted client FD exceeds FD_SETSIZE, then asserts connect, bidirectional data and disconnect all work. Fails on select(), passes on poll(). - receives_from_many_concurrent_clients: many clients send simultaneously; asserts the server observes activity on every client FD (guards the poll() revents loop). --- include/ur_client_library/comm/tcp_server.h | 4 - src/comm/tcp_server.cpp | 89 +++++-------- tests/test_tcp_server.cpp | 134 ++++++++++++++++++++ 3 files changed, 168 insertions(+), 59 deletions(-) diff --git a/include/ur_client_library/comm/tcp_server.h b/include/ur_client_library/comm/tcp_server.h index 465e79073..e1faa0d21 100644 --- a/include/ur_client_library/comm/tcp_server.h +++ b/include/ur_client_library/comm/tcp_server.h @@ -223,10 +223,6 @@ class TCPServer std::atomic listen_fd_; int port_; - socket_t maxfd_; - fd_set masterfds_; - fd_set tempfds_; - uint32_t max_clients_allowed_; std::vector client_fds_; std::mutex clients_mutex_; diff --git a/src/comm/tcp_server.cpp b/src/comm/tcp_server.cpp index 3bb5559d5..55ba827ab 100644 --- a/src/comm/tcp_server.cpp +++ b/src/comm/tcp_server.cpp @@ -34,15 +34,19 @@ #include #include +#include #include "ur_client_library/comm/socket_t.h" #include +#ifndef _WIN32 +# include +#endif namespace urcl { namespace comm { TCPServer::TCPServer(const int port, const size_t max_num_tries, const std::chrono::milliseconds reconnection_time) - : port_(port), maxfd_(0), max_clients_allowed_(0) + : port_(port), max_clients_allowed_(0) { #ifdef _WIN32 WSAData data; @@ -74,9 +78,6 @@ void TCPServer::init() ur_setsockopt(listen_fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(int)); URCL_LOG_DEBUG("Created socket with FD %d", (int)listen_fd_); - - FD_ZERO(&masterfds_); - FD_ZERO(&tempfds_); } void TCPServer::shutdown() @@ -179,9 +180,6 @@ void TCPServer::bind(const size_t max_num_tries, const std::chrono::milliseconds } while (err == -1 && (connection_counter <= max_num_tries || max_num_tries == 0)); URCL_LOG_DEBUG("Bound %d:%d to FD %d", server_addr.sin_addr.s_addr, port_, (int)listen_fd_); - - FD_SET(listen_fd_, &masterfds_); - maxfd_ = listen_fd_; } void TCPServer::startListen() @@ -220,21 +218,6 @@ void TCPServer::handleConnect() return; } -#ifdef _WIN32 - bool set_size_exceeded = client_fds_.size() >= FD_SETSIZE - 1; // -1 because listen_fd_ also occupies one - // slot in masterfds_ -#else - bool set_size_exceeded = client_fd >= FD_SETSIZE; // On Unix-like systems, the client FD itself must be less than - // FD_SETSIZE, otherwise it cannot be added to the fd_set. -#endif - - if (set_size_exceeded) - { - URCL_LOG_ERROR("Accepted client FD %d exceeds FD_SETSIZE (%d). Closing connection.", (int)client_fd, FD_SETSIZE); - ur_close(client_fd); - return; - } - bool accepted = false; { @@ -242,11 +225,6 @@ void TCPServer::handleConnect() if (client_fds_.size() < max_clients_allowed_ || max_clients_allowed_ == 0) { client_fds_.push_back(client_fd); - FD_SET(client_fd, &masterfds_); - if (client_fd > maxfd_) - { - maxfd_ = client_fd; - } accepted = true; } else @@ -268,27 +246,40 @@ void TCPServer::handleConnect() void TCPServer::spin() { - tempfds_ = masterfds_; - - timeval timeout; - timeout.tv_sec = 1; - timeout.tv_usec = 0; + // Build the poll set fresh each iteration from the listen socket plus all currently connected + // clients. poll() is used on both platforms (WSAPoll() on Windows) because it has no + // FD_SETSIZE limit on file descriptor numbers, unlike select(). This matters when the hosting + // process holds many file descriptors (e.g. a JVM), pushing socket FDs past FD_SETSIZE (1024). + std::vector pollfds; + pollfds.push_back({ static_cast(listen_fd_), POLLIN, 0 }); + { + std::lock_guard lk(clients_mutex_); + for (const auto& client_fd : client_fds_) + { + pollfds.push_back({ client_fd, POLLIN, 0 }); + } + } - // blocks until activity on any socket from tempfds - int sel = select(static_cast(maxfd_ + 1), &tempfds_, NULL, NULL, &timeout); - if (sel < 0) + // Block for up to 1 s waiting for activity on any socket. A shutdown wakes this immediately by + // connecting to the listen socket (see shutdown()). +#ifdef _WIN32 + int ready = ::WSAPoll(pollfds.data(), static_cast(pollfds.size()), 1000); +#else + int ready = ::poll(pollfds.data(), pollfds.size(), 1000); +#endif + if (ready < 0) { - URCL_LOG_ERROR("select() failed. Shutting down socket event handler."); + URCL_LOG_ERROR("poll() failed. Shutting down socket event handler."); keep_running_ = false; return; } - if (!keep_running_ || sel == 0) + if (!keep_running_ || ready == 0) { return; } - if (FD_ISSET(listen_fd_, &tempfds_)) + if (pollfds[0].revents & POLLIN) { URCL_LOG_DEBUG("Activity on listen FD %d", (int)listen_fd_); handleConnect(); @@ -297,15 +288,13 @@ void TCPServer::spin() std::vector disconnected_clients; std::vector client_fds_with_activity; + // pollfds[0] is the listen socket; client entries start at index 1. + for (size_t i = 1; i < pollfds.size(); ++i) { - std::lock_guard lk(clients_mutex_); - for (const auto& client_fd : client_fds_) + if (pollfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { - if (FD_ISSET(client_fd, &tempfds_)) - { - URCL_LOG_DEBUG("Activity on client FD %d", (int)client_fd); - client_fds_with_activity.push_back(client_fd); - } + URCL_LOG_DEBUG("Activity on client FD %d", (int)pollfds[i].fd); + client_fds_with_activity.push_back(static_cast(pollfds[i].fd)); } } // We handle client activity outside the clients_mutex_ lock to avoid holding it during potentially slow I/O and @@ -331,7 +320,6 @@ void TCPServer::handleDisconnect(const socket_t fd) { std::lock_guard lk(clients_mutex_); ur_close(fd); - FD_CLR(fd, &masterfds_); for (size_t i = 0; i < client_fds_.size(); ++i) { @@ -341,15 +329,6 @@ void TCPServer::handleDisconnect(const socket_t fd) break; } } - - maxfd_ = listen_fd_; - for (const auto& client_fd : client_fds_) - { - if (client_fd > maxfd_) - { - maxfd_ = client_fd; - } - } } { diff --git a/tests/test_tcp_server.cpp b/tests/test_tcp_server.cpp index 08c8c4fb8..60412826a 100644 --- a/tests/test_tcp_server.cpp +++ b/tests/test_tcp_server.cpp @@ -34,8 +34,14 @@ #include #include #include +#include #include #include +#ifndef _WIN32 +# include +# include +# include +#endif #include "test_utils.h" #include @@ -497,6 +503,134 @@ TEST_F(TCPServerTest, shutdown_during_active_writes) writer.join(); } +// Verifies that the server receives data from many clients that all send simultaneously. This +// exercises the poll() revents loop across many client file descriptors and guards against +// missed read events when several sockets are readable at once. +TEST_F(TCPServerTest, receives_from_many_concurrent_clients) +{ + comm::TCPServer server(0); + + std::mutex mtx; + std::condition_variable cv; + std::atomic message_count{ 0 }; + + server.setMessageCallback([&](const socket_t, char*, int) { + message_count.fetch_add(1); + std::lock_guard lk(mtx); + cv.notify_all(); + }); + server.start(); + +#ifdef _WIN32 + // Windows allows a maximum of 64 sockets per process by default. + constexpr int num_clients = 50; +#else + constexpr int num_clients = 100; +#endif + + std::vector> clients; + for (int i = 0; i < num_clients; ++i) + { + clients.push_back(std::make_unique(server.getPort())); + } + + // Every client sends a single message concurrently. + std::vector senders; + for (auto& client : clients) + { + senders.emplace_back([&client]() { client->send("ping\n"); }); + } + for (auto& t : senders) + { + t.join(); + } + + // The server's poll() loop must observe activity on every client FD and deliver all messages. + std::unique_lock lk(mtx); + EXPECT_TRUE(cv.wait_for(lk, std::chrono::seconds(5), [&]() { return message_count.load() >= num_clients; })); + EXPECT_EQ(message_count.load(), num_clients); +} + +#ifndef _WIN32 +// Regression test for the FD_SETSIZE limitation of select(): a client whose accepted socket file +// descriptor number is >= FD_SETSIZE (1024) must still be serviced normally. This is the exact +// scenario that occurs when the hosting process (e.g. a JVM) holds many file descriptors. The old +// select()-based implementation rejected/crashed on such descriptors; poll() handles them. +TEST_F(TCPServerTest, services_client_with_high_fd_number) +{ + // Make sure we are allowed to open more than FD_SETSIZE descriptors; raise the soft limit if + // needed and skip the test if the hard limit does not allow it. + struct rlimit rl; + ASSERT_EQ(getrlimit(RLIMIT_NOFILE, &rl), 0); + const rlim_t needed = static_cast(FD_SETSIZE) + 64; + if (rl.rlim_cur < needed) + { + rl.rlim_cur = std::min(needed, rl.rlim_max); + if (setrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur < needed) + { + GTEST_SKIP() << "Cannot raise RLIMIT_NOFILE above FD_SETSIZE; skipping high-fd test."; + } + } + + // Consume the low-numbered descriptors so that subsequently created sockets are assigned fd + // numbers beyond FD_SETSIZE. + std::vector fd_hogs; + while (true) + { + int fd = ::open("/dev/null", O_RDONLY); + if (fd < 0) + { + break; + } + fd_hogs.push_back(fd); + if (fd > static_cast(FD_SETSIZE) + 8) + { + break; + } + } + const bool pushed_past_limit = !fd_hogs.empty() && fd_hogs.back() > static_cast(FD_SETSIZE); + if (!pushed_past_limit) + { + for (int fd : fd_hogs) + { + ::close(fd); + } + GTEST_SKIP() << "Could not allocate descriptors beyond FD_SETSIZE; skipping."; + } + + TestableTcpServer server(port_); + server.start(); + + Client client(port_); + EXPECT_TRUE(server.waitForConnectionCallback(2000)); + + // The server-side accepted client FD should exceed FD_SETSIZE -- the case that breaks select(). + auto client_fds = server.getClientFDs(); + ASSERT_FALSE(client_fds.empty()); + EXPECT_GT(client_fds.back(), static_cast(FD_SETSIZE)); + + // Data must flow both ways on the high-numbered descriptor. + const std::string message = "high fd message\n"; + client.send(message); + EXPECT_TRUE(server.waitForMessageCallback(2000)); + EXPECT_EQ(server.getReceivedMessage(), message); + + size_t written; + const auto* data = reinterpret_cast(message.c_str()); + ASSERT_TRUE(server.write(data, message.size(), written)); + EXPECT_EQ(client.recv(), message); + + // Disconnect must also be detected on the high-numbered descriptor. + client.close(); + EXPECT_TRUE(server.waitForDisconnectionCallback(2000)); + + for (int fd : fd_hogs) + { + ::close(fd); + } +} +#endif + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); From 763e77afb1fa00a58654a7bd4aa91b1267d65767 Mon Sep 17 00:00:00 2001 From: urrsk <41109954+urrsk@users.noreply.github.com> Date: Tue, 16 Jun 2026 07:44:17 +0200 Subject: [PATCH 8/9] test(PrimaryClient): add regression test for stop() teardown during stuck reconnect Covers the second symptom of issue #368: PrimaryClient::stop() (the implementation behind UrDriver::stopPrimaryClientCommunication()) must return promptly when the producer thread is stuck in its reconnect loop against an unreachable robot, instead of blocking on the pipeline join. Also asserts the stop()/start() restart path reconnects, verifying the sticky cancellation flag is cleared via clearStop() on (re)start. --- tests/test_primary_client_reconnect.cpp | 65 +++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/test_primary_client_reconnect.cpp b/tests/test_primary_client_reconnect.cpp index 862759946..e65b57b31 100644 --- a/tests/test_primary_client_reconnect.cpp +++ b/tests/test_primary_client_reconnect.cpp @@ -117,6 +117,71 @@ TEST(PrimaryClientReconnectTest, destructor_not_blocked_by_stuck_reconnect_threa << " ms — the producer reconnect thread was not aborted by requestStop()"; } +// Regression test for the SECOND symptom reported in issue #368: calling +// PrimaryClient::stop() (the implementation of UrDriver::stopPrimaryClientCommunication()) +// hangs when the producer thread is stuck in its reconnect loop against an unreachable +// robot. +// +// This is distinct from the destructor test above: stop() is a restartable operation, so +// besides asserting that it returns promptly it must also leave the client in a state where +// a subsequent start() can reconnect. That exercises the clearStop() reuse path in +// URProducer::setupProducer()/PrimaryClient::reconnectStream() — a sticky-flag regression +// there would not hang teardown but would silently prevent the client from ever reconnecting +// after a stop(). +TEST(PrimaryClientReconnectTest, stop_not_blocked_by_stuck_reconnect_thread) +{ + comm::INotifier notifier; + + auto server = std::make_unique(primary_interface::UR_PRIMARY_PORT); + auto client = std::make_unique("127.0.0.1", notifier); + + // Unlimited reconnect attempts with a large reconnection time: if the fix is + // absent, the producer's reconnect path keeps stop()'s pipeline join blocked. + const std::chrono::milliseconds large_reconnect_timeout(5000); + ASSERT_NO_THROW(client->start(/*max_num_tries=*/0, large_reconnect_timeout)); + ASSERT_TRUE(server->waitForClient()) << "PrimaryClient never connected to the fake server"; + + // Drop the server. The producer's read() fails, the socket transitions to + // SocketState::Disconnected, and the producer enters its reconnect loop. + server.reset(); + + // Give the producer time to detect the drop and reach its reconnect sleep + // (initial backoff is 1 s, after which it sleeps inside TCPSocket::setup()). + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); + + // stop() must return quickly: requestStop() aborts the producer's connect attempt/back-off, + // so the pipeline join completes well under 2 s. Without the fix this blocks for at least the + // reconnect timeout (and indefinitely with unlimited retries against a dead port). Run it on a + // worker with a watchdog so a regression fails fast with a clear message instead of hanging the + // test binary (the CTest TIMEOUT then reaps it). + std::packaged_task stop_task([&client]() { client->stop(); }); + auto stop_future = stop_task.get_future(); + std::thread stop_thread(std::move(stop_task)); + + const auto t0 = std::chrono::steady_clock::now(); + if (stop_future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) + { + stop_thread.detach(); + FAIL() << "PrimaryClient::stop() did not return within 5 s — the producer reconnect thread was not aborted by " + "requestStop()"; + } + stop_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) + << "PrimaryClient::stop() blocked for " << std::chrono::duration_cast(elapsed).count() + << " ms — the producer reconnect thread was not aborted by requestStop()"; + + // Restart-reuse check: bring up a fresh server and start() again. This must reconnect, + // proving that stop()'s sticky cancellation flag was cleared via clearStop() on restart. + auto server2 = std::make_unique(primary_interface::UR_PRIMARY_PORT); + ASSERT_NO_THROW(client->start(/*max_num_tries=*/0, large_reconnect_timeout)); + EXPECT_TRUE(server2->waitForClient(std::chrono::seconds(3))) << "PrimaryClient did not reconnect after " + "stop()/start() — the cancellation flag set by " + "stop() was not cleared " + "via clearStop() on restart"; +} + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); From 9025946a393ef446e1690bada13b9f538b657da4 Mon Sep 17 00:00:00 2001 From: urrsk <41109954+urrsk@users.noreply.github.com> Date: Wed, 17 Jun 2026 14:34:27 +0200 Subject: [PATCH 9/9] refactor(TCPSocket): express stop via SocketState instead of a boolean Replace the stop_requested_ flag and requestStop()/clearStop() API with an enriched SocketState machine (Connecting/Connected/Reconnecting/LostConnection/ Disconnecting/Disconnected). The public lifecycle is now just connect() and disconnect(): connect() implicitly clears a prior deliberate stop, so callers no longer have to remember a separate clear step. The deliberate-stop set {Disconnecting, Disconnected} is sticky and is never overwritten by the connect/retry loop (setup()) or close(), and is cleared only by an explicit connect(). The in-loop auto-reconnect (reconnect()) never clears it, keeping ~PrimaryClient()/~RTDEClient() teardown race-free (the original #368 fix). The transient-drop state is renamed Disconnected -> LostConnection; SocketState::Disconnected is repurposed for the deliberate stop. Update URStream, URProducer, PrimaryClient and RTDEClient to the new API, and adjust the affected unit tests. --- include/ur_client_library/comm/producer.h | 13 +-- include/ur_client_library/comm/stream.h | 32 ++++- include/ur_client_library/comm/tcp_socket.h | 90 +++++++++----- src/comm/tcp_socket.cpp | 123 ++++++++++++++------ src/primary/primary_client.cpp | 26 ++--- src/rtde/rtde_client.cpp | 24 ++-- tests/test_primary_client_reconnect.cpp | 47 ++++---- tests/test_rtde_client.cpp | 10 +- tests/test_tcp_server.cpp | 2 +- tests/test_tcp_socket.cpp | 29 +++-- 10 files changed, 253 insertions(+), 143 deletions(-) diff --git a/include/ur_client_library/comm/producer.h b/include/ur_client_library/comm/producer.h index 7fe0e634d..725d586a1 100644 --- a/include/ur_client_library/comm/producer.h +++ b/include/ur_client_library/comm/producer.h @@ -74,7 +74,7 @@ class URProducer : public IProducer continue; } - if (stream_.closed()) + if (stream_.closed() || stream_.stopRequested()) return false; if (on_reconnect_cb_) @@ -92,16 +92,16 @@ class URProducer : public IProducer // to 120 s while this thread sleeps here. const auto sleep_slice = std::chrono::milliseconds(100); const auto sleep_total = std::chrono::duration_cast(timeout_); - for (auto slept = std::chrono::milliseconds(0); slept < sleep_total && running_ && !stream_.closed(); - slept += sleep_slice) + for (auto slept = std::chrono::milliseconds(0); + slept < sleep_total && running_ && !stream_.closed() && !stream_.stopRequested(); slept += sleep_slice) { std::this_thread::sleep_for(sleep_slice); } - if (!running_ || stream_.closed()) + if (!running_ || stream_.closed() || stream_.stopRequested()) return false; - if (stream_.connect()) + if (stream_.reconnect()) continue; auto next = timeout_ * 2; @@ -133,9 +133,6 @@ class URProducer : public IProducer void setupProducer(const size_t max_num_tries = 0, const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) override { - // Clear any cancellation request left over from a previous teardown so the stream can be - // (re)used. Safe here because this runs on the controlling thread before the producer loop. - stream_.clearStop(); timeval tv; tv.tv_sec = 1; tv.tv_usec = 0; diff --git a/include/ur_client_library/comm/stream.h b/include/ur_client_library/comm/stream.h index 8a573492e..67e8a13ff 100644 --- a/include/ur_client_library/comm/stream.h +++ b/include/ur_client_library/comm/stream.h @@ -63,16 +63,32 @@ class URStream : public TCPSocket bool connect(const size_t max_num_tries = 0, const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) { - return TCPSocket::setup(host_, port_, max_num_tries, reconnection_time); + return TCPSocket::connect(host_, port_, max_num_tries, reconnection_time); } /*! - * \brief Disconnects from the configured socket. + * \brief Re-establishes the connection after an unexpected drop, without clearing a deliberate + * disconnect(). Used by the automatic reconnect path. + * + * \param max_num_tries Maximum number of connection attempts before failing. Unlimited when 0. + * \param reconnection_time time in between connection attempts to the server + * + * \returns True on success, false if it could not reconnect or a deliberate disconnect() is in + * effect + */ + bool reconnect(const size_t max_num_tries = 0, + const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) + { + return TCPSocket::reconnect(host_, port_, max_num_tries, reconnection_time); + } + + /*! + * \brief Deliberately disconnects from the configured socket, leaving it ready to connect again. */ void disconnect() { URCL_LOG_DEBUG("Disconnecting from %s:%d", host_.c_str(), port_); - TCPSocket::close(); + TCPSocket::disconnect(); } /*! @@ -83,6 +99,16 @@ class URStream : public TCPSocket return getState() == SocketState::Closed; } + /*! + * \brief Returns whether a deliberate disconnect() is in progress or in effect (the socket will + * not auto-reconnect until connect() is called again). + */ + bool stopRequested() + { + const SocketState s = getState(); + return s == SocketState::Disconnecting || s == SocketState::Disconnected; + } + /*! * \brief Reads a full UR package out of a socket. For this, it looks into the package and reads * the byte length from the socket directly. It returns as soon as all bytes for the package are diff --git a/include/ur_client_library/comm/tcp_socket.h b/include/ur_client_library/comm/tcp_socket.h index d92f82dd4..00e88810b 100644 --- a/include/ur_client_library/comm/tcp_socket.h +++ b/include/ur_client_library/comm/tcp_socket.h @@ -36,10 +36,14 @@ namespace comm */ enum class SocketState { - Invalid, ///< Socket is initialized or setup failed - Connected, ///< Socket is connected and ready to use - Disconnected, ///< Socket is disconnected and cannot be used - Closed ///< Connection to socket got closed + Invalid, ///< Socket is initialized but was never connected + Connecting, ///< A first-time connect() attempt is in progress + Connected, ///< Socket is connected and ready to use + LostConnection, ///< Connection dropped unexpectedly; auto-reconnect is expected to pick it up + Reconnecting, ///< An automatic reconnect attempt (after a drop) is in progress + Disconnecting, ///< A deliberate disconnect() is in progress + Disconnected, ///< Deliberately disconnected; will NOT auto-reconnect until connect() is called + Closed ///< Neutral low-level close (clearable by a subsequent (re)connect) }; /*! @@ -53,18 +57,25 @@ class TCPSocket std::chrono::milliseconds reconnection_time_; bool reconnection_time_modified_deprecated_ = false; - // Cancellation token used to abort a connection attempt that is currently in - // progress (either waiting inside connect() or sleeping between attempts). - // This is intentionally orthogonal to state_: a SocketState::Closed left over - // from a deliberate close()+connect() reconnect must NOT abort a fresh - // attempt, whereas requestStop() (used at teardown) must abort reliably and - // cannot be clobbered by setup()'s internal state_ resets. - std::atomic stop_requested_{ false }; - void setupOptions(); + // True while a deliberate disconnect() is in progress or has completed (the "deliberate-stop + // set"). The connect/retry machinery checks this to abort, and never overwrites these states, + // so a teardown disconnect() that races a reconnect attempt is observed reliably. + bool isStopRequested() const + { + const SocketState s = state_.load(); + return s == SocketState::Disconnecting || s == SocketState::Disconnected; + } + + // Atomically moves state_ to `desired`, unless a deliberate disconnect() (Disconnecting or + // Disconnected) is in effect. Returns true if the state was set, false if a deliberate stop is + // active (in which case state_ is left untouched). This is how the connect/retry machinery + // updates its in-progress state without ever clobbering a teardown signal. + bool setStateUnlessStopRequested(SocketState desired); + // Performs an interruptible, non-blocking connect on an already-created socket. - // Polls in short slices so that a concurrent requestStop() aborts the attempt + // Polls in short slices so that a concurrent disconnect() aborts the attempt // promptly on all platforms (POSIX close() of a blocked connect() is reliable, // Winsock's is not). Restores blocking mode on success. bool openInterruptible(socket_t socket_fd, struct sockaddr* address, size_t address_len); @@ -78,6 +89,18 @@ class TCPSocket bool setup(const std::string& host, const int port, const size_t max_num_tries = 0, const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME); + /*! + * \brief Re-establishes a connection after an unexpected drop, without clearing a deliberate + * disconnect(). + * + * Used by the automatic reconnect path (e.g. the producer loop). If a deliberate disconnect() + * is in progress or has completed, this returns false immediately instead of reconnecting, so a + * concurrent teardown is never undone. Otherwise behaves like setup() but marks the socket as + * Reconnecting while the attempt is in progress. + */ + bool reconnect(const std::string& host, const int port, const size_t max_num_tries = 0, + const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME); + std::unique_ptr recv_timeout_; public: @@ -147,29 +170,42 @@ class TCPSocket bool write(const uint8_t* buf, const size_t buf_len, size_t& written); /*! - * \brief Closes the connection to the socket. + * \brief Establishes a connection to the configured host/port. + * + * This is the explicit (re)connect entry point. It clears any prior deliberate disconnect() + * (moving the socket to Connecting) and then attempts to connect, retrying up to max_num_tries + * times (unlimited when 0). Call this on the controlling thread; the automatic reconnect path + * uses the internal reconnect() instead. + * + * \param host Host to connect to + * \param port Port to connect to + * \param max_num_tries Maximum number of connection attempts before failing. Unlimited when 0. + * \param reconnection_time Time between connection attempts + * + * \returns True on success, false if the connection could not be established or was aborted by + * a concurrent disconnect() */ - void close(); + bool connect(const std::string& host, const int port, const size_t max_num_tries = 0, + const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME); /*! - * \brief Requests that any in-progress (or future) connection attempt be aborted. + * \brief Deliberately disconnects the socket and leaves it ready to connect() again. * - * Sets a sticky cancellation flag and closes the socket. A reconnect thread that is - * waiting inside setup() (either in connect() or in the between-attempt back-off) returns - * promptly instead of blocking until the reconnection timeout expires. Use this at teardown - * (e.g. from a destructor) before joining a reconnect thread. The flag stays set until - * clearStop() is called, so it cannot be lost by a racing internal state reset. + * Moves the socket into the deliberate-stop set (Disconnecting then Disconnected) and closes the + * underlying file descriptor. Any connect/reconnect attempt currently in progress (blocked in a + * connect or sleeping between attempts) aborts promptly, and the automatic reconnect path will + * not reconnect until connect() is called again. Use this at teardown (e.g. from a destructor) + * before joining a reconnect thread. */ - void requestStop(); + void disconnect(); /*! - * \brief Clears the cancellation flag set by requestStop(). + * \brief Closes the connection to the socket. * - * Must be called on a controlled (re)start before attempting to connect again, so a socket - * that was previously stopped can be reused. Never call this concurrently with a reconnect - * thread. + * Neutral low-level close. Unlike disconnect(), it does not prevent a subsequent automatic + * reconnect, and it never downgrades a deliberate disconnect() that is already in effect. */ - void clearStop(); + void close(); /*! * \brief Setup Receive timeout used for this socket. diff --git a/src/comm/tcp_socket.cpp b/src/comm/tcp_socket.cpp index 9878daffb..ea37c267c 100644 --- a/src/comm/tcp_socket.cpp +++ b/src/comm/tcp_socket.cpp @@ -42,7 +42,7 @@ namespace comm namespace { // Time slice used while waiting for a non-blocking connect to resolve. Kept short so a -// concurrent requestStop() aborts the wait promptly even if closing the socket does not +// concurrent disconnect() aborts the wait promptly even if closing the socket does not // itself wake the wait (as is the case on Windows). constexpr int CONNECT_POLL_SLICE_MS = 100; @@ -152,17 +152,17 @@ bool TCPSocket::openInterruptible(socket_t socket_fd, struct sockaddr* address, else if (connectInProgress()) { // Poll in short slices until the connect resolves, the OS connect timeout expires, - // or a concurrent requestStop() asks us to abort. + // or a concurrent disconnect() asks us to abort. while (true) { - if (stop_requested_) + if (isStopRequested()) { return false; } int ready = waitForSocketWritable(socket_fd, CONNECT_POLL_SLICE_MS); if (ready < 0) { - // poll() error (e.g. the fd was closed by requestStop()). + // poll() error (e.g. the fd was closed by disconnect()). return false; } if (ready == 0) @@ -208,18 +208,22 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ reconnection_time_resolved = reconnection_time_; } - // Honor a pending stop before doing anything else (e.g. a connect attempt issued just after - // requestStop()). Checked before touching state_ so the cancellation cannot be lost. - if (stop_requested_) + // Honor a deliberate disconnect() before doing anything else. Checked first so the + // cancellation cannot be lost. + if (isStopRequested()) return false; if (state_ == SocketState::Connected) return false; - // Clear any leftover Closed/Disconnected state from a previous (deliberate) disconnect() so - // it does not interfere with this fresh attempt. Cancellation is tracked via stop_requested_, - // not via state_, so this reset can no longer erase a teardown signal. - state_ = SocketState::Invalid; + // The in-progress state we hold while (re)trying. connect() pre-sets Reconnecting via + // reconnect(); everything else (including direct setup() callers) is treated as a first connect. + const SocketState progress = + (state_ == SocketState::Reconnecting) ? SocketState::Reconnecting : SocketState::Connecting; + + // Move to the in-progress state, but never clobber a deliberate disconnect() that races us. + if (!setStateUnlessStopRequested(progress)) + return false; URCL_LOG_DEBUG("Setting up connection: %s:%d", host.c_str(), port); @@ -239,7 +243,7 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ bool connected = false; while (!connected) { - if (stop_requested_) + if (isStopRequested()) return false; if (getaddrinfo(host_name, service.c_str(), &hints, &result) != 0) @@ -258,7 +262,7 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ break; } - if (stop_requested_) + if (isStopRequested()) { freeaddrinfo(result); return false; @@ -269,8 +273,9 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ if (!connected) { - state_ = SocketState::Invalid; - if (stop_requested_) + // Re-assert the in-progress state for the next attempt, but never clobber a deliberate + // disconnect() that raced us. + if (!setStateUnlessStopRequested(progress)) { return false; } @@ -287,15 +292,15 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ << std::chrono::duration_cast>(reconnection_time_resolved).count() << " seconds."; URCL_LOG_ERROR("%s", ss.str().c_str()); - // Sleep in short slices so that a concurrent requestStop() (e.g. from ~RTDEClient or + // Sleep in short slices so that a concurrent disconnect() (e.g. from ~RTDEClient or // ~PrimaryClient before joining the reconnect thread) can interrupt the back-off promptly. const auto sleep_slice = std::chrono::milliseconds(100); - for (auto slept = std::chrono::milliseconds(0); slept < reconnection_time_resolved && !stop_requested_; + for (auto slept = std::chrono::milliseconds(0); slept < reconnection_time_resolved && !isStopRequested(); slept += sleep_slice) { std::this_thread::sleep_for(sleep_slice); } - if (stop_requested_) + if (isStopRequested()) { return false; } @@ -303,32 +308,84 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ } } setupOptions(); - state_ = SocketState::Connected; + // Mark Connected only if no deliberate disconnect() slipped in while we were finishing up; a late + // disconnect() must win so we do not advertise a usable socket that was just torn down. + SocketState expected = progress; + if (!state_.compare_exchange_strong(expected, SocketState::Connected)) + { + close(); + return false; + } URCL_LOG_DEBUG("Connection established for %s:%d", host.c_str(), port); return connected; } +bool TCPSocket::setStateUnlessStopRequested(SocketState desired) +{ + // Lock-free compare-and-swap: move state_ to `desired` unless a deliberate disconnect() + // (Disconnecting/Disconnected) is in effect. This is not an unbounded spin: each iteration + // either succeeds, or observes that another thread changed state_ and re-evaluates. We use + // compare_exchange_strong so there are no spurious retries; the loop can only re-iterate when a + // concurrent writer genuinely changed state_, and the only such writers (a racing disconnect(), + // or a paired close()) make a bounded number of writes, so it terminates promptly. + SocketState cur = state_.load(); + while (cur != SocketState::Disconnecting && cur != SocketState::Disconnected) + { + if (state_.compare_exchange_strong(cur, desired)) + { + return true; // successfully moved to `desired` + } + // CAS failed: `cur` now holds the value another thread wrote; loop to re-check the stop set. + } + return false; // a deliberate disconnect() is in effect; state_ left untouched +} + +bool TCPSocket::connect(const std::string& host, const int port, const size_t max_num_tries, + const std::chrono::milliseconds reconnection_time) +{ + // Explicit (re)connect: clear any prior deliberate disconnect() by moving to Connecting. This is + // the only place the deliberate-stop set is cleared. Safe because connect() runs on the + // controlling thread when no reconnect thread is concurrently active. + state_ = SocketState::Connecting; + return setup(host, port, max_num_tries, reconnection_time); +} + +bool TCPSocket::reconnect(const std::string& host, const int port, const size_t max_num_tries, + const std::chrono::milliseconds reconnection_time) +{ + // Automatic reconnect: mark Reconnecting, but never clear a deliberate disconnect(). If a stop is + // in effect, refuse instead of reconnecting so a concurrent teardown is not undone. + if (!setStateUnlessStopRequested(SocketState::Reconnecting)) + { + return false; + } + return setup(host, port, max_num_tries, reconnection_time); +} + void TCPSocket::close() { if (socket_fd_ >= 0) { - state_ = SocketState::Closed; + // Neutral close: do not downgrade a deliberate disconnect() (Disconnecting/Disconnected) back + // to a reconnectable state. + setStateUnlessStopRequested(SocketState::Closed); ::ur_close(socket_fd_); socket_fd_ = INVALID_SOCKET; } } -void TCPSocket::requestStop() +void TCPSocket::disconnect() { - // Set the flag before closing so that a reconnect thread observing the closed socket is - // guaranteed to also see the cancellation request (and therefore stop instead of retrying). - stop_requested_ = true; - close(); -} - -void TCPSocket::clearStop() -{ - stop_requested_ = false; + // Enter the deliberate-stop set before closing so a reconnect thread observing the closed socket + // is guaranteed to also see the stop (and therefore abort instead of retrying). Closing the fd + // unblocks any in-progress connect/poll; the resting state is Disconnected. + state_ = SocketState::Disconnecting; + if (socket_fd_ >= 0) + { + ::ur_close(socket_fd_); + socket_fd_ = INVALID_SOCKET; + } + state_ = SocketState::Disconnected; } std::string TCPSocket::getIP() const @@ -372,7 +429,7 @@ bool TCPSocket::read(uint8_t* buf, const size_t buf_len, size_t& read) if (res == 0) { - state_ = SocketState::Disconnected; + state_ = SocketState::LostConnection; return false; } else if (res < 0) @@ -382,13 +439,13 @@ bool TCPSocket::read(uint8_t* buf, const size_t buf_len, size_t& read) int code = ::WSAGetLastError(); if (code != WSAETIMEDOUT && code != WSAEWOULDBLOCK) { - state_ = SocketState::Disconnected; + state_ = SocketState::LostConnection; } #else if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { // any permanent error should be detected early - state_ = SocketState::Disconnected; + state_ = SocketState::LostConnection; } #endif return false; diff --git a/src/primary/primary_client.cpp b/src/primary/primary_client.cpp index 9be0e8af2..3a1b6c974 100644 --- a/src/primary/primary_client.cpp +++ b/src/primary/primary_client.cpp @@ -67,12 +67,12 @@ PrimaryClient::PrimaryClient(const std::string& robot_ip, [[maybe_unused]] comm: PrimaryClient::~PrimaryClient() { URCL_LOG_INFO("Stopping primary client pipeline"); - // Request a stop on the stream BEFORE stopping (joining) the pipeline. The pipeline's - // producer thread may be sleeping inside its reconnect backoff or blocked in - // TCPSocket::setup(); requestStop() sets the cancellation flag and closes the socket so - // both paths abort within one poll slice and pipeline_->stop()'s join returns promptly - // instead of blocking until the (potentially unbounded) reconnect timeout expires. - stream_.requestStop(); + // Disconnect the stream BEFORE stopping (joining) the pipeline. The pipeline's producer thread + // may be sleeping inside its reconnect backoff or blocked in TCPSocket::setup(); disconnect() + // moves the socket into the deliberate-stop state and closes it so both paths abort within one + // poll slice and pipeline_->stop()'s join returns promptly instead of blocking until the + // (potentially unbounded) reconnect timeout expires. + stream_.disconnect(); pipeline_->stop(); } @@ -85,10 +85,10 @@ void PrimaryClient::start(const size_t max_num_tries, const std::chrono::millise void PrimaryClient::stop() { - // Request a stop on the stream before joining the pipeline so a producer thread stuck in - // its reconnect path is aborted and the join returns promptly (see ~PrimaryClient). A - // subsequent start() clears the flag again via URProducer::setupProducer(). - stream_.requestStop(); + // Disconnect the stream before joining the pipeline so a producer thread stuck in its reconnect + // path is aborted and the join returns promptly (see ~PrimaryClient). A subsequent start() + // reconnects via URProducer::setupProducer(), whose connect() clears the deliberate-stop state. + stream_.disconnect(); pipeline_->stop(); } @@ -551,10 +551,8 @@ bool PrimaryClient::reconnectStream() { URCL_LOG_DEBUG("Closing primary stream..."); stream_.close(); - // A deliberate reconnect must clear any sticky cancellation left by a prior stop()/teardown - // (stream_.requestStop()), otherwise TCPSocket::setup() aborts immediately and the reconnect - // fails. Mirrors URProducer::setupProducer() which clears the flag before (re)connecting. - stream_.clearStop(); + // connect() is the explicit reconnect entry: it clears any deliberate disconnect() left by a + // prior stop()/teardown before re-establishing, so no separate clear step is needed. if (stream_.connect()) { URCL_LOG_DEBUG("Primary stream connected"); diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index b947b3051..8ca3ba943 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -87,11 +87,11 @@ RTDEClient::~RTDEClient() { prod_->setReconnectionCallback(nullptr); stop_reconnection_ = true; - // Request a stop on the stream before joining the reconnect thread. requestStop() sets the - // cancellation flag and closes the socket so that any connect attempt or back-off sleep inside + // Disconnect the stream before joining the reconnect thread. disconnect() moves the socket into + // the deliberate-stop state and closes it so that any connect attempt or back-off sleep inside // TCPSocket::setup() aborts within one poll slice, instead of blocking the destructor for the // full reconnection_timeout (or indefinitely with max_connection_attempts == 0). - stream_.requestStop(); + stream_.disconnect(); disconnect(); if (reconnecting_thread_.joinable()) { @@ -117,10 +117,6 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m max_initialization_attempts_ = max_initialization_attempts; initialization_timeout_ = initialization_timeout; - // Clear any cancellation request from a previous teardown so this (re)initialization can - // connect. Runs on the controlling thread before the reconnect thread can be started. - stream_.clearStop(); - prod_->setReconnectionCallback(nullptr); unsigned int attempts = 0; @@ -517,12 +513,14 @@ bool RTDEClient::setupInputs() void RTDEClient::disconnect() { - // Disconnect unconditionally: TCPSocket::close() (called by stream_.disconnect()) - // and RTDEWriter::stop() are both idempotent. Guarding on client_state_ left - // the stream in SocketState::Connected after a failed negotiateProtocolVersion() - // (which resets client_state_ to UNINITIALIZED), causing the subsequent - // TCPSocket::setup() call to return false immediately due to the Connected check. - stream_.disconnect(); + // Close unconditionally: TCPSocket::close() and RTDEWriter::stop() are both idempotent. Guarding + // on client_state_ left the stream in SocketState::Connected after a failed + // negotiateProtocolVersion() (which resets client_state_ to UNINITIALIZED), causing the + // subsequent TCPSocket::setup() call to return false immediately due to the Connected check. + // Use close() (neutral) rather than disconnect() here: this runs during normal reconnect cycling, + // so the stream must stay reconnectable. A real teardown sets the deliberate-stop state first (in + // ~RTDEClient), which close() will not downgrade. + stream_.close(); writer_.stop(); client_state_ = ClientState::UNINITIALIZED; prod_->stopProducer(); diff --git a/tests/test_primary_client_reconnect.cpp b/tests/test_primary_client_reconnect.cpp index e65b57b31..1032bc19b 100644 --- a/tests/test_primary_client_reconnect.cpp +++ b/tests/test_primary_client_reconnect.cpp @@ -48,23 +48,22 @@ using namespace urcl; // RTDEClientTest.destructor_not_blocked_by_stuck_reconnect_thread (test_rtde_client.cpp). // // Root cause: when the robot drops the primary connection, TCPSocket::read() -// returns false and leaves the socket in SocketState::Disconnected. URProducer's +// returns false and leaves the socket in SocketState::LostConnection. URProducer's // tryGetImpl() then enters its reconnect path: it sleeps an (exponentially -// growing) backoff and calls stream_.connect(), which retries with no upper +// growing) backoff and calls stream_.reconnect(), which retries with no upper // bound (max_num_tries == 0), sleeping reconnection_time between attempts. If // ~PrimaryClient() simply called pipeline_->stop() (which joins the producer // thread) without first closing the stream, the join would block for the full // reconnect duration — effectively forever for an unreachable robot. // // Fix (two parts): -// 1. ~PrimaryClient()/PrimaryClient::stop() call stream_.requestStop() BEFORE -// joining the pipeline. requestStop() sets a sticky cancellation flag and -// closes the socket. -// 2. TCPSocket::setup() honors that flag both during its (non-blocking) connect +// 1. ~PrimaryClient()/PrimaryClient::stop() call stream_.disconnect() BEFORE +// joining the pipeline. disconnect() moves the socket into the deliberate-stop +// state (Disconnecting/Disconnected) and closes it. +// 2. TCPSocket::setup() honors that state both during its (non-blocking) connect // attempt and during the between-attempt wait, so it aborts within ~100 ms -// regardless of platform (unlike a signal carried only by the socket state, -// which setup()'s internal state resets could race away — the bug that hung -// Windows CI). +// regardless of platform. setup() never overwrites the deliberate-stop state, +// so it cannot be raced away (the bug that hung Windows CI). // Together these abort the producer within ~100 ms of the destructor, so the join — // and therefore the destructor — returns promptly. // @@ -85,14 +84,14 @@ TEST(PrimaryClientReconnectTest, destructor_not_blocked_by_stuck_reconnect_threa ASSERT_TRUE(server->waitForClient()) << "PrimaryClient never connected to the fake server"; // Drop the server. The producer's read() fails, the socket transitions to - // SocketState::Disconnected, and the producer enters its reconnect loop. + // SocketState::LostConnection, and the producer enters its reconnect loop. server.reset(); // Give the producer time to detect the drop and reach its reconnect sleep // (initial backoff is 1 s, after which it sleeps inside TCPSocket::setup()). std::this_thread::sleep_for(std::chrono::milliseconds(1500)); - // The destructor must return quickly: requestStop() aborts the producer's connect + // The destructor must return quickly: disconnect() aborts the producer's connect // attempt/back-off, so the pipeline join completes well under 2 s. Without the fix // this blocks for at least the reconnect timeout (and indefinitely with unlimited // retries against a dead port). Run the destructor on a worker with a watchdog so a @@ -107,14 +106,14 @@ TEST(PrimaryClientReconnectTest, destructor_not_blocked_by_stuck_reconnect_threa { teardown_thread.detach(); FAIL() << "~PrimaryClient() did not return within 5 s — the producer reconnect thread was not aborted by " - "requestStop()"; + "disconnect()"; } teardown_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; EXPECT_LT(elapsed, std::chrono::seconds(2)) << "~PrimaryClient() blocked for " << std::chrono::duration_cast(elapsed).count() - << " ms — the producer reconnect thread was not aborted by requestStop()"; + << " ms — the producer reconnect thread was not aborted by disconnect()"; } // Regression test for the SECOND symptom reported in issue #368: calling @@ -124,10 +123,10 @@ TEST(PrimaryClientReconnectTest, destructor_not_blocked_by_stuck_reconnect_threa // // This is distinct from the destructor test above: stop() is a restartable operation, so // besides asserting that it returns promptly it must also leave the client in a state where -// a subsequent start() can reconnect. That exercises the clearStop() reuse path in -// URProducer::setupProducer()/PrimaryClient::reconnectStream() — a sticky-flag regression -// there would not hang teardown but would silently prevent the client from ever reconnecting -// after a stop(). +// a subsequent start() can reconnect. That exercises the implicit clear of the deliberate-stop +// state by connect() in URProducer::setupProducer()/PrimaryClient::reconnectStream() — a +// regression there would not hang teardown but would silently prevent the client from ever +// reconnecting after a stop(). TEST(PrimaryClientReconnectTest, stop_not_blocked_by_stuck_reconnect_thread) { comm::INotifier notifier; @@ -142,14 +141,14 @@ TEST(PrimaryClientReconnectTest, stop_not_blocked_by_stuck_reconnect_thread) ASSERT_TRUE(server->waitForClient()) << "PrimaryClient never connected to the fake server"; // Drop the server. The producer's read() fails, the socket transitions to - // SocketState::Disconnected, and the producer enters its reconnect loop. + // SocketState::LostConnection, and the producer enters its reconnect loop. server.reset(); // Give the producer time to detect the drop and reach its reconnect sleep // (initial backoff is 1 s, after which it sleeps inside TCPSocket::setup()). std::this_thread::sleep_for(std::chrono::milliseconds(1500)); - // stop() must return quickly: requestStop() aborts the producer's connect attempt/back-off, + // stop() must return quickly: disconnect() aborts the producer's connect attempt/back-off, // so the pipeline join completes well under 2 s. Without the fix this blocks for at least the // reconnect timeout (and indefinitely with unlimited retries against a dead port). Run it on a // worker with a watchdog so a regression fails fast with a clear message instead of hanging the @@ -163,23 +162,23 @@ TEST(PrimaryClientReconnectTest, stop_not_blocked_by_stuck_reconnect_thread) { stop_thread.detach(); FAIL() << "PrimaryClient::stop() did not return within 5 s — the producer reconnect thread was not aborted by " - "requestStop()"; + "disconnect()"; } stop_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; EXPECT_LT(elapsed, std::chrono::seconds(2)) << "PrimaryClient::stop() blocked for " << std::chrono::duration_cast(elapsed).count() - << " ms — the producer reconnect thread was not aborted by requestStop()"; + << " ms — the producer reconnect thread was not aborted by disconnect()"; // Restart-reuse check: bring up a fresh server and start() again. This must reconnect, - // proving that stop()'s sticky cancellation flag was cleared via clearStop() on restart. + // proving that the deliberate-stop state set by stop() was cleared by connect() on restart. auto server2 = std::make_unique(primary_interface::UR_PRIMARY_PORT); ASSERT_NO_THROW(client->start(/*max_num_tries=*/0, large_reconnect_timeout)); EXPECT_TRUE(server2->waitForClient(std::chrono::seconds(3))) << "PrimaryClient did not reconnect after " - "stop()/start() — the cancellation flag set by " + "stop()/start() — the deliberate-stop state set by " "stop() was not cleared " - "via clearStop() on restart"; + "by connect() on restart"; } int main(int argc, char* argv[]) diff --git a/tests/test_rtde_client.cpp b/tests/test_rtde_client.cpp index cab143727..7db708ca0 100644 --- a/tests/test_rtde_client.cpp +++ b/tests/test_rtde_client.cpp @@ -815,8 +815,8 @@ TEST_F(RTDEClientTest, test_initialization) // Regression test for the bug where ~RTDEClient() could block indefinitely when // the reconnect thread was stuck inside TCPSocket::setup(). Fixed by: (1) calling -// stream_.requestStop() (followed by disconnect()) before joining reconnecting_thread_ -// in ~RTDEClient(), and (2) making TCPSocket::setup() abort on the sticky stop flag, +// stream_.disconnect() (followed by RTDEClient::disconnect()) before joining reconnecting_thread_ +// in ~RTDEClient(), and (2) making TCPSocket::setup() abort on the deliberate-stop state, // both during the (non-blocking) connect attempt and during the between-attempt wait. // // See also TCPSocketTest.setup_interruptible_by_close and @@ -875,7 +875,7 @@ TEST_F(RTDEClientTest, destructor_not_blocked_by_stuck_reconnect_thread) // Give the reconnect thread time to reach the wait inside TCPSocket::setup(). std::this_thread::sleep_for(std::chrono::milliseconds(500)); - // The destructor must return quickly: requestStop() aborts setup()'s connect/wait, + // The destructor must return quickly: disconnect() aborts setup()'s connect/wait, // so the join completes in well under 2 s. Without the fix this would block for // >= large_reconnect_timeout (5 s), or forever with unlimited attempts. // Run the destructor on a worker with a watchdog so a regression fails fast with a @@ -888,14 +888,14 @@ TEST_F(RTDEClientTest, destructor_not_blocked_by_stuck_reconnect_thread) if (teardown_future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) { teardown_thread.detach(); - FAIL() << "~RTDEClient() did not return within 5 s — reconnect thread was not aborted by requestStop()"; + FAIL() << "~RTDEClient() did not return within 5 s — reconnect thread was not aborted by disconnect()"; } teardown_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; EXPECT_LT(elapsed, std::chrono::seconds(2)) << "RTDEClient destructor blocked for " << std::chrono::duration_cast(elapsed).count() - << " ms — reconnect thread was not aborted by requestStop()"; + << " ms — reconnect thread was not aborted by disconnect()"; } int main(int argc, char* argv[]) diff --git a/tests/test_tcp_server.cpp b/tests/test_tcp_server.cpp index 60412826a..e136382af 100644 --- a/tests/test_tcp_server.cpp +++ b/tests/test_tcp_server.cpp @@ -275,7 +275,7 @@ TEST_F(TCPServerTest, check_shutting_down_server_while_listening) } EXPECT_FALSE(read_success); // If the read just would have timeouted, the client state would still be connected. - EXPECT_EQ(client.getState(), comm::SocketState::Disconnected); + EXPECT_EQ(client.getState(), comm::SocketState::LostConnection); } TEST_F(TCPServerTest, double_shutdown) diff --git a/tests/test_tcp_socket.cpp b/tests/test_tcp_socket.cpp index 29baba6be..3c03d6da5 100644 --- a/tests/test_tcp_socket.cpp +++ b/tests/test_tcp_socket.cpp @@ -149,8 +149,8 @@ TEST_F(TCPSocketTest, setup_client_before_server) // Make sure that the client has tried to connect to the server, before creating the server std::this_thread::sleep_for(std::chrono::seconds(1)); - // Client state should be invalid as long as the server is not available - comm::SocketState expected_state = comm::SocketState::Invalid; + // Client state should be Connecting while it keeps retrying and the server is not available + comm::SocketState expected_state = comm::SocketState::Connecting; comm::SocketState actual_state = client_->getState(); EXPECT_EQ(toUnderlying(expected_state), toUnderlying(actual_state)); @@ -296,12 +296,11 @@ TEST_F(TCPSocketTest, connect_non_running_robot) // Regression test for the bug where TCPSocket::setup() could block the caller // indefinitely when the wait between retry attempts was not interruptible. // -// setup() is interrupted by requestStop(), which sets a sticky cancellation flag -// and closes the socket. The flag (rather than the transient SocketState::Closed) -// is what makes this race-free: setup() resets state_ internally on every attempt, -// so a teardown signal carried only by the socket state could be lost, whereas the -// dedicated stop flag cannot. This is the path used by ~RTDEClient()/~PrimaryClient() -// before joining their reconnect threads. +// setup() is interrupted by disconnect(), which moves the socket into the deliberate-stop +// state (Disconnecting/Disconnected) and closes it. setup() never overwrites that state and +// aborts as soon as it observes it, so a teardown signal cannot be lost by setup()'s internal +// state updates. This is the path used by ~RTDEClient()/~PrimaryClient() before joining their +// reconnect threads. TEST_F(TCPSocketTest, setup_interruptible_by_close) { // Use a port with no listener so every connect attempt fails immediately, @@ -321,16 +320,16 @@ TEST_F(TCPSocketTest, setup_interruptible_by_close) // Give the thread time to reach the between-attempt wait inside setup(). std::this_thread::sleep_for(std::chrono::milliseconds(300)); - // requestStop() sets the cancellation flag; the sliced wait in setup() detects it - // within 100 ms and setup() returns, allowing the thread to finish. + // disconnect() moves the socket to the deliberate-stop state; the sliced wait in setup() + // detects it within 100 ms and setup() returns, allowing the thread to finish. const auto t0 = std::chrono::steady_clock::now(); - client.requestStop(); + client.disconnect(); setup_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; // Without the fix, elapsed would be >= large_reconnect_timeout (5 s). - EXPECT_LT(elapsed, std::chrono::seconds(2)) << "TCPSocket::setup() was not interrupted by requestStop() within 2 s; " + EXPECT_LT(elapsed, std::chrono::seconds(2)) << "TCPSocket::setup() was not interrupted by disconnect() within 2 s; " "the between-attempt wait is not interruptible"; } @@ -338,7 +337,7 @@ TEST_F(TCPSocketTest, setup_interruptible_by_close) // genuinely unreachable host (no SYN-ACK, no RST) must still be abortable. The // previous fix only made the between-attempt *sleep* interruptible, not the connect // itself, so this case could block for the full OS connect timeout. setup() now uses -// a non-blocking connect polled in short slices, so requestStop() aborts it promptly. +// a non-blocking connect polled in short slices, so disconnect() aborts it promptly. TEST_F(TCPSocketTest, setup_interruptible_during_blocking_connect) { // 10.255.255.1 is in a private range and is (almost) never routable, so connect() @@ -354,7 +353,7 @@ TEST_F(TCPSocketTest, setup_interruptible_during_blocking_connect) std::this_thread::sleep_for(std::chrono::milliseconds(500)); const auto t0 = std::chrono::steady_clock::now(); - client.requestStop(); + client.disconnect(); setup_thread.join(); const auto elapsed = std::chrono::steady_clock::now() - t0; @@ -391,7 +390,7 @@ TEST_F(TCPSocketTest, test_read_on_socket_abruptly_closed) char characters; size_t read_chars = 0; EXPECT_FALSE(client_->read((uint8_t*)&characters, 1, read_chars)); - EXPECT_EQ(client_->getState(), comm::SocketState::Disconnected); + EXPECT_EQ(client_->getState(), comm::SocketState::LostConnection); } int main(int argc, char* argv[])