diff --git a/include/ur_client_library/comm/producer.h b/include/ur_client_library/comm/producer.h index e509d9017..725d586a1 100644 --- a/include/ur_client_library/comm/producer.h +++ b/include/ur_client_library/comm/producer.h @@ -74,7 +74,7 @@ class URProducer : public IProducer continue; } - if (stream_.closed()) + if (stream_.closed() || stream_.stopRequested()) return false; if (on_reconnect_cb_) @@ -85,9 +85,23 @@ class URProducer : public IProducer } URCL_LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count()); - std::this_thread::sleep_for(timeout_); + // Sleep in small slices so the producer can be stopped (running_ == false) + // or woken by a stream close (e.g. from a destructor calling stop()) within + // ~100 ms, instead of blocking for the full (exponentially growing) timeout. + // Without this, a thread joining the pipeline at teardown could block for up + // to 120 s while this thread sleeps here. + const auto sleep_slice = std::chrono::milliseconds(100); + const auto sleep_total = std::chrono::duration_cast(timeout_); + for (auto slept = std::chrono::milliseconds(0); + slept < sleep_total && running_ && !stream_.closed() && !stream_.stopRequested(); slept += sleep_slice) + { + std::this_thread::sleep_for(sleep_slice); + } + + if (!running_ || stream_.closed() || stream_.stopRequested()) + return false; - if (stream_.connect()) + if (stream_.reconnect()) continue; auto next = timeout_ * 2; diff --git a/include/ur_client_library/comm/stream.h b/include/ur_client_library/comm/stream.h index 8a573492e..67e8a13ff 100644 --- a/include/ur_client_library/comm/stream.h +++ b/include/ur_client_library/comm/stream.h @@ -63,16 +63,32 @@ class URStream : public TCPSocket bool connect(const size_t max_num_tries = 0, const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) { - return TCPSocket::setup(host_, port_, max_num_tries, reconnection_time); + return TCPSocket::connect(host_, port_, max_num_tries, 
reconnection_time); } /*! - * \brief Disconnects from the configured socket. + * \brief Re-establishes the connection after an unexpected drop, without clearing a deliberate + * disconnect(). Used by the automatic reconnect path. + * + * \param max_num_tries Maximum number of connection attempts before failing. Unlimited when 0. + * \param reconnection_time time in between connection attempts to the server + * + * \returns True on success, false if it could not reconnect or a deliberate disconnect() is in + * effect + */ + bool reconnect(const size_t max_num_tries = 0, + const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) + { + return TCPSocket::reconnect(host_, port_, max_num_tries, reconnection_time); + } + + /*! + * \brief Deliberately disconnects from the configured socket, leaving it ready to connect again. */ void disconnect() { URCL_LOG_DEBUG("Disconnecting from %s:%d", host_.c_str(), port_); - TCPSocket::close(); + TCPSocket::disconnect(); } /*! @@ -83,6 +99,16 @@ class URStream : public TCPSocket return getState() == SocketState::Closed; } + /*! + * \brief Returns whether a deliberate disconnect() is in progress or in effect (the socket will + * not auto-reconnect until connect() is called again). + */ + bool stopRequested() + { + const SocketState s = getState(); + return s == SocketState::Disconnecting || s == SocketState::Disconnected; + } + /*! * \brief Reads a full UR package out of a socket. For this, it looks into the package and reads * the byte length from the socket directly. 
It returns as soon as all bytes for the package are diff --git a/include/ur_client_library/comm/tcp_server.h b/include/ur_client_library/comm/tcp_server.h index 465e79073..e1faa0d21 100644 --- a/include/ur_client_library/comm/tcp_server.h +++ b/include/ur_client_library/comm/tcp_server.h @@ -223,10 +223,6 @@ class TCPServer std::atomic listen_fd_; int port_; - socket_t maxfd_; - fd_set masterfds_; - fd_set tempfds_; - uint32_t max_clients_allowed_; std::vector client_fds_; std::mutex clients_mutex_; diff --git a/include/ur_client_library/comm/tcp_socket.h b/include/ur_client_library/comm/tcp_socket.h index 5a1d468c5..00e88810b 100644 --- a/include/ur_client_library/comm/tcp_socket.h +++ b/include/ur_client_library/comm/tcp_socket.h @@ -36,10 +36,14 @@ namespace comm */ enum class SocketState { - Invalid, ///< Socket is initialized or setup failed - Connected, ///< Socket is connected and ready to use - Disconnected, ///< Socket is disconnected and cannot be used - Closed ///< Connection to socket got closed + Invalid, ///< Socket is initialized but was never connected + Connecting, ///< A first-time connect() attempt is in progress + Connected, ///< Socket is connected and ready to use + LostConnection, ///< Connection dropped unexpectedly; auto-reconnect is expected to pick it up + Reconnecting, ///< An automatic reconnect attempt (after a drop) is in progress + Disconnecting, ///< A deliberate disconnect() is in progress + Disconnected, ///< Deliberately disconnected; will NOT auto-reconnect until connect() is called + Closed ///< Neutral low-level close (clearable by a subsequent (re)connect) }; /*! @@ -55,6 +59,27 @@ class TCPSocket void setupOptions(); + // True while a deliberate disconnect() is in progress or has completed (the "deliberate-stop + // set"). The connect/retry machinery checks this to abort, and never overwrites these states, + // so a teardown disconnect() that races a reconnect attempt is observed reliably. 
+ bool isStopRequested() const + { + const SocketState s = state_.load(); + return s == SocketState::Disconnecting || s == SocketState::Disconnected; + } + + // Atomically moves state_ to `desired`, unless a deliberate disconnect() (Disconnecting or + // Disconnected) is in effect. Returns true if the state was set, false if a deliberate stop is + // active (in which case state_ is left untouched). This is how the connect/retry machinery + // updates its in-progress state without ever clobbering a teardown signal. + bool setStateUnlessStopRequested(SocketState desired); + + // Performs an interruptible, non-blocking connect on an already-created socket. + // Polls in short slices so that a concurrent disconnect() aborts the attempt + // promptly on all platforms (POSIX close() of a blocked connect() is reliable, + // Winsock's is not). Restores blocking mode on success. + bool openInterruptible(socket_t socket_fd, struct sockaddr* address, size_t address_len); + protected: static bool open(socket_t socket_fd, struct sockaddr* address, size_t address_len) { @@ -64,6 +89,18 @@ class TCPSocket bool setup(const std::string& host, const int port, const size_t max_num_tries = 0, const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME); + /*! + * \brief Re-establishes a connection after an unexpected drop, without clearing a deliberate + * disconnect(). + * + * Used by the automatic reconnect path (e.g. the producer loop). If a deliberate disconnect() + * is in progress or has completed, this returns false immediately instead of reconnecting, so a + * concurrent teardown is never undone. Otherwise behaves like setup() but marks the socket as + * Reconnecting while the attempt is in progress. 
+ */ + bool reconnect(const std::string& host, const int port, const size_t max_num_tries = 0, + const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME); + std::unique_ptr recv_timeout_; public: @@ -132,8 +169,41 @@ class TCPSocket */ bool write(const uint8_t* buf, const size_t buf_len, size_t& written); + /*! + * \brief Establishes a connection to the configured host/port. + * + * This is the explicit (re)connect entry point. It clears any prior deliberate disconnect() + * (moving the socket to Connecting) and then attempts to connect, retrying up to max_num_tries + * times (unlimited when 0). Call this on the controlling thread; the automatic reconnect path + * uses the internal reconnect() instead. + * + * \param host Host to connect to + * \param port Port to connect to + * \param max_num_tries Maximum number of connection attempts before failing. Unlimited when 0. + * \param reconnection_time Time between connection attempts + * + * \returns True on success, false if the connection could not be established or was aborted by + * a concurrent disconnect() + */ + bool connect(const std::string& host, const int port, const size_t max_num_tries = 0, + const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME); + + /*! + * \brief Deliberately disconnects the socket and leaves it ready to connect() again. + * + * Moves the socket into the deliberate-stop set (Disconnecting then Disconnected) and closes the + * underlying file descriptor. Any connect/reconnect attempt currently in progress (blocked in a + * connect or sleeping between attempts) aborts promptly, and the automatic reconnect path will + * not reconnect until connect() is called again. Use this at teardown (e.g. from a destructor) + * before joining a reconnect thread. + */ + void disconnect(); + /*! * \brief Closes the connection to the socket. + * + * Neutral low-level close. 
Unlike disconnect(), it does not prevent a subsequent automatic + * reconnect, and it never downgrades a deliberate disconnect() that is already in effect. */ void close(); diff --git a/src/comm/tcp_server.cpp b/src/comm/tcp_server.cpp index 3bb5559d5..55ba827ab 100644 --- a/src/comm/tcp_server.cpp +++ b/src/comm/tcp_server.cpp @@ -34,15 +34,19 @@ #include #include +#include #include "ur_client_library/comm/socket_t.h" #include +#ifndef _WIN32 +# include +#endif namespace urcl { namespace comm { TCPServer::TCPServer(const int port, const size_t max_num_tries, const std::chrono::milliseconds reconnection_time) - : port_(port), maxfd_(0), max_clients_allowed_(0) + : port_(port), max_clients_allowed_(0) { #ifdef _WIN32 WSAData data; @@ -74,9 +78,6 @@ void TCPServer::init() ur_setsockopt(listen_fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(int)); URCL_LOG_DEBUG("Created socket with FD %d", (int)listen_fd_); - - FD_ZERO(&masterfds_); - FD_ZERO(&tempfds_); } void TCPServer::shutdown() @@ -179,9 +180,6 @@ void TCPServer::bind(const size_t max_num_tries, const std::chrono::milliseconds } while (err == -1 && (connection_counter <= max_num_tries || max_num_tries == 0)); URCL_LOG_DEBUG("Bound %d:%d to FD %d", server_addr.sin_addr.s_addr, port_, (int)listen_fd_); - - FD_SET(listen_fd_, &masterfds_); - maxfd_ = listen_fd_; } void TCPServer::startListen() @@ -220,21 +218,6 @@ void TCPServer::handleConnect() return; } -#ifdef _WIN32 - bool set_size_exceeded = client_fds_.size() >= FD_SETSIZE - 1; // -1 because listen_fd_ also occupies one - // slot in masterfds_ -#else - bool set_size_exceeded = client_fd >= FD_SETSIZE; // On Unix-like systems, the client FD itself must be less than - // FD_SETSIZE, otherwise it cannot be added to the fd_set. -#endif - - if (set_size_exceeded) - { - URCL_LOG_ERROR("Accepted client FD %d exceeds FD_SETSIZE (%d). 
Closing connection.", (int)client_fd, FD_SETSIZE); - ur_close(client_fd); - return; - } - bool accepted = false; { @@ -242,11 +225,6 @@ void TCPServer::handleConnect() if (client_fds_.size() < max_clients_allowed_ || max_clients_allowed_ == 0) { client_fds_.push_back(client_fd); - FD_SET(client_fd, &masterfds_); - if (client_fd > maxfd_) - { - maxfd_ = client_fd; - } accepted = true; } else @@ -268,27 +246,40 @@ void TCPServer::handleConnect() void TCPServer::spin() { - tempfds_ = masterfds_; - - timeval timeout; - timeout.tv_sec = 1; - timeout.tv_usec = 0; + // Build the poll set fresh each iteration from the listen socket plus all currently connected + // clients. poll() is used on both platforms (WSAPoll() on Windows) because it has no + // FD_SETSIZE limit on file descriptor numbers, unlike select(). This matters when the hosting + // process holds many file descriptors (e.g. a JVM), pushing socket FDs past FD_SETSIZE (1024). + std::vector pollfds; + pollfds.push_back({ static_cast(listen_fd_), POLLIN, 0 }); + { + std::lock_guard lk(clients_mutex_); + for (const auto& client_fd : client_fds_) + { + pollfds.push_back({ client_fd, POLLIN, 0 }); + } + } - // blocks until activity on any socket from tempfds - int sel = select(static_cast(maxfd_ + 1), &tempfds_, NULL, NULL, &timeout); - if (sel < 0) + // Block for up to 1 s waiting for activity on any socket. A shutdown wakes this immediately by + // connecting to the listen socket (see shutdown()). +#ifdef _WIN32 + int ready = ::WSAPoll(pollfds.data(), static_cast(pollfds.size()), 1000); +#else + int ready = ::poll(pollfds.data(), pollfds.size(), 1000); +#endif + if (ready < 0) { - URCL_LOG_ERROR("select() failed. Shutting down socket event handler."); + URCL_LOG_ERROR("poll() failed. 
Shutting down socket event handler."); keep_running_ = false; return; } - if (!keep_running_ || sel == 0) + if (!keep_running_ || ready == 0) { return; } - if (FD_ISSET(listen_fd_, &tempfds_)) + if (pollfds[0].revents & POLLIN) { URCL_LOG_DEBUG("Activity on listen FD %d", (int)listen_fd_); handleConnect(); @@ -297,15 +288,13 @@ void TCPServer::spin() std::vector disconnected_clients; std::vector client_fds_with_activity; + // pollfds[0] is the listen socket; client entries start at index 1. + for (size_t i = 1; i < pollfds.size(); ++i) { - std::lock_guard lk(clients_mutex_); - for (const auto& client_fd : client_fds_) + if (pollfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { - if (FD_ISSET(client_fd, &tempfds_)) - { - URCL_LOG_DEBUG("Activity on client FD %d", (int)client_fd); - client_fds_with_activity.push_back(client_fd); - } + URCL_LOG_DEBUG("Activity on client FD %d", (int)pollfds[i].fd); + client_fds_with_activity.push_back(static_cast(pollfds[i].fd)); } } // We handle client activity outside the clients_mutex_ lock to avoid holding it during potentially slow I/O and @@ -331,7 +320,6 @@ void TCPServer::handleDisconnect(const socket_t fd) { std::lock_guard lk(clients_mutex_); ur_close(fd); - FD_CLR(fd, &masterfds_); for (size_t i = 0; i < client_fds_.size(); ++i) { @@ -341,15 +329,6 @@ void TCPServer::handleDisconnect(const socket_t fd) break; } } - - maxfd_ = listen_fd_; - for (const auto& client_fd : client_fds_) - { - if (client_fd > maxfd_) - { - maxfd_ = client_fd; - } - } } { diff --git a/src/comm/tcp_socket.cpp b/src/comm/tcp_socket.cpp index 6af840e0d..ea37c267c 100644 --- a/src/comm/tcp_socket.cpp +++ b/src/comm/tcp_socket.cpp @@ -27,7 +27,9 @@ #ifndef _WIN32 # include +# include # include +# include #endif #include "ur_client_library/log.h" @@ -37,6 +39,67 @@ namespace urcl { namespace comm { +namespace +{ +// Time slice used while waiting for a non-blocking connect to resolve. 
Kept short so a +// concurrent disconnect() aborts the wait promptly even if closing the socket does not +// itself wake the wait (as is the case on Windows). +constexpr int CONNECT_POLL_SLICE_MS = 100; + +// Toggle the blocking mode of a socket. Returns true on success. +bool setSocketBlocking(socket_t socket_fd, bool blocking) +{ +#ifdef _WIN32 + u_long mode = blocking ? 0 : 1; + return ::ioctlsocket(socket_fd, FIONBIO, &mode) == 0; +#else + int flags = ::fcntl(socket_fd, F_GETFL, 0); + if (flags < 0) + { + return false; + } + flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK); + return ::fcntl(socket_fd, F_SETFL, flags) == 0; +#endif +} + +// True if the last connect() call indicated that the connection is being established +// asynchronously (the expected result for a non-blocking socket). +bool connectInProgress() +{ +#ifdef _WIN32 + return ::WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EINPROGRESS; +#endif +} + +// Waits up to timeout_ms for the socket to become writable (connect resolved). +// Returns >0 if the socket is ready/has an event, 0 on timeout, <0 on error. +// +// poll() is used on both platforms (WSAPoll() on Windows). It avoids select()'s FD_SETSIZE +// limitation entirely and keeps a single mental model. On Windows this relies on the WSAPoll +// connect-failure fix introduced in Windows 10 version 2004 / Windows Server 2019: a failed +// non-blocking connect is reported as (POLLHUP | POLLERR | POLLWRNORM). The caller treats any +// returned event as "connect resolved" and consults SO_ERROR for the actual outcome, so it does +// not depend on which particular revents flag is set. 
+int waitForSocketWritable(socket_t socket_fd, int timeout_ms) +{ +#ifdef _WIN32 + WSAPOLLFD pfd; + pfd.fd = socket_fd; + pfd.events = POLLWRNORM; // == POLLOUT + pfd.revents = 0; + return ::WSAPoll(&pfd, 1, timeout_ms); +#else + struct pollfd pfd; + pfd.fd = socket_fd; + pfd.events = POLLOUT; + pfd.revents = 0; + return ::poll(&pfd, 1, timeout_ms); +#endif +} +} // namespace TCPSocket::TCPSocket() : socket_fd_(INVALID_SOCKET), state_(SocketState::Invalid), reconnection_time_(std::chrono::seconds(10)) { @@ -72,6 +135,66 @@ void TCPSocket::setupOptions() } } +bool TCPSocket::openInterruptible(socket_t socket_fd, struct sockaddr* address, size_t address_len) +{ + if (!setSocketBlocking(socket_fd, false)) + { + return false; + } + + int connect_res = ::connect(socket_fd, address, static_cast(address_len)); + bool connected = false; + if (connect_res == 0) + { + // Connected immediately (common for loopback). + connected = true; + } + else if (connectInProgress()) + { + // Poll in short slices until the connect resolves, the OS connect timeout expires, + // or a concurrent disconnect() asks us to abort. + while (true) + { + if (isStopRequested()) + { + return false; + } + int ready = waitForSocketWritable(socket_fd, CONNECT_POLL_SLICE_MS); + if (ready < 0) + { + // poll() error (e.g. the fd was closed by disconnect()). + return false; + } + if (ready == 0) + { + // Timeout slice elapsed without the connect resolving: re-check stop and keep waiting. + continue; + } + // The socket reported an event: query SO_ERROR to find out whether the connect succeeded. + int so_error = 0; + socklen_t len = sizeof(so_error); + if (::getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast(&so_error), &len) < 0) + { + return false; + } + connected = (so_error == 0); + break; + } + } + else + { + // Immediate, permanent failure (e.g. connection refused). 
+ connected = false; + } + + if (connected && !setSocketBlocking(socket_fd, true)) + { + // Could not restore blocking mode; treat the connection as failed. + return false; + } + return connected; +} + bool TCPSocket::setup(const std::string& host, const int port, const size_t max_num_tries, const std::chrono::milliseconds reconnection_time) { @@ -85,9 +208,23 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ reconnection_time_resolved = reconnection_time_; } + // Honor a deliberate disconnect() before doing anything else. Checked first so the + // cancellation cannot be lost. + if (isStopRequested()) + return false; + if (state_ == SocketState::Connected) return false; + // The in-progress state we hold while (re)trying. connect() pre-sets Reconnecting via + // reconnect(); everything else (including direct setup() callers) is treated as a first connect. + const SocketState progress = + (state_ == SocketState::Reconnecting) ? SocketState::Reconnecting : SocketState::Connecting; + + // Move to the in-progress state, but never clobber a deliberate disconnect() that races us. 
+ if (!setStateUnlessStopRequested(progress)) + return false; + URCL_LOG_DEBUG("Setting up connection: %s:%d", host.c_str(), port); // gethostbyname() is deprecated so use getadderinfo() as described in: @@ -106,6 +243,9 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ bool connected = false; while (!connected) { + if (isStopRequested()) + return false; + if (getaddrinfo(host_name, service.c_str(), &hints, &result) != 0) { URCL_LOG_ERROR("Failed to get address for %s:%d", host.c_str(), port); @@ -116,18 +256,29 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ { socket_fd_ = ::socket(p->ai_family, p->ai_socktype, p->ai_protocol); - if (socket_fd_ != -1 && open(socket_fd_, p->ai_addr, p->ai_addrlen)) + if (socket_fd_ != -1 && openInterruptible(socket_fd_, p->ai_addr, p->ai_addrlen)) { connected = true; break; } + + if (isStopRequested()) + { + freeaddrinfo(result); + return false; + } } freeaddrinfo(result); if (!connected) { - state_ = SocketState::Invalid; + // Re-assert the in-progress state for the next attempt, but never clobber a deliberate + // disconnect() that raced us. + if (!setStateUnlessStopRequested(progress)) + { + return false; + } if (++connect_counter >= max_num_tries && max_num_tries > 0) { URCL_LOG_ERROR("Failed to establish connection for %s:%d after %d tries", host.c_str(), port, max_num_tries); @@ -141,24 +292,100 @@ bool TCPSocket::setup(const std::string& host, const int port, const size_t max_ << std::chrono::duration_cast>(reconnection_time_resolved).count() << " seconds."; URCL_LOG_ERROR("%s", ss.str().c_str()); - std::this_thread::sleep_for(reconnection_time_resolved); + // Sleep in short slices so that a concurrent disconnect() (e.g. from ~RTDEClient or + // ~PrimaryClient before joining the reconnect thread) can interrupt the back-off promptly. 
+ const auto sleep_slice = std::chrono::milliseconds(100); + for (auto slept = std::chrono::milliseconds(0); slept < reconnection_time_resolved && !isStopRequested(); + slept += sleep_slice) + { + std::this_thread::sleep_for(sleep_slice); + } + if (isStopRequested()) + { + return false; + } } } } setupOptions(); - state_ = SocketState::Connected; + // Mark Connected only if no deliberate disconnect() slipped in while we were finishing up; a late + // disconnect() must win so we do not advertise a usable socket that was just torn down. + SocketState expected = progress; + if (!state_.compare_exchange_strong(expected, SocketState::Connected)) + { + close(); + return false; + } URCL_LOG_DEBUG("Connection established for %s:%d", host.c_str(), port); return connected; } +bool TCPSocket::setStateUnlessStopRequested(SocketState desired) +{ + // Lock-free compare-and-swap: move state_ to `desired` unless a deliberate disconnect() + // (Disconnecting/Disconnected) is in effect. This is not an unbounded spin: each iteration + // either succeeds, or observes that another thread changed state_ and re-evaluates. We use + // compare_exchange_strong so there are no spurious retries; the loop can only re-iterate when a + // concurrent writer genuinely changed state_, and the only such writers (a racing disconnect(), + // or a paired close()) make a bounded number of writes, so it terminates promptly. + SocketState cur = state_.load(); + while (cur != SocketState::Disconnecting && cur != SocketState::Disconnected) + { + if (state_.compare_exchange_strong(cur, desired)) + { + return true; // successfully moved to `desired` + } + // CAS failed: `cur` now holds the value another thread wrote; loop to re-check the stop set. 
+ } + return false; // a deliberate disconnect() is in effect; state_ left untouched +} + +bool TCPSocket::connect(const std::string& host, const int port, const size_t max_num_tries, + const std::chrono::milliseconds reconnection_time) +{ + // Explicit (re)connect: clear any prior deliberate disconnect() by moving to Connecting. This is + // the only place the deliberate-stop set is cleared. Safe because connect() runs on the + // controlling thread when no reconnect thread is concurrently active. + state_ = SocketState::Connecting; + return setup(host, port, max_num_tries, reconnection_time); +} + +bool TCPSocket::reconnect(const std::string& host, const int port, const size_t max_num_tries, + const std::chrono::milliseconds reconnection_time) +{ + // Automatic reconnect: mark Reconnecting, but never clear a deliberate disconnect(). If a stop is + // in effect, refuse instead of reconnecting so a concurrent teardown is not undone. + if (!setStateUnlessStopRequested(SocketState::Reconnecting)) + { + return false; + } + return setup(host, port, max_num_tries, reconnection_time); +} + void TCPSocket::close() { if (socket_fd_ >= 0) { - state_ = SocketState::Closed; + // Neutral close: do not downgrade a deliberate disconnect() (Disconnecting/Disconnected) back + // to a reconnectable state. + setStateUnlessStopRequested(SocketState::Closed); + ::ur_close(socket_fd_); + socket_fd_ = INVALID_SOCKET; + } +} + +void TCPSocket::disconnect() +{ + // Enter the deliberate-stop set before closing so a reconnect thread observing the closed socket + // is guaranteed to also see the stop (and therefore abort instead of retrying). Closing the fd + // unblocks any in-progress connect/poll; the resting state is Disconnected. 
+ state_ = SocketState::Disconnecting; + if (socket_fd_ >= 0) + { ::ur_close(socket_fd_); socket_fd_ = INVALID_SOCKET; } + state_ = SocketState::Disconnected; } std::string TCPSocket::getIP() const @@ -202,7 +429,7 @@ bool TCPSocket::read(uint8_t* buf, const size_t buf_len, size_t& read) if (res == 0) { - state_ = SocketState::Disconnected; + state_ = SocketState::LostConnection; return false; } else if (res < 0) @@ -212,13 +439,13 @@ bool TCPSocket::read(uint8_t* buf, const size_t buf_len, size_t& read) int code = ::WSAGetLastError(); if (code != WSAETIMEDOUT && code != WSAEWOULDBLOCK) { - state_ = SocketState::Disconnected; + state_ = SocketState::LostConnection; } #else if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { // any permanent error should be detected early - state_ = SocketState::Disconnected; + state_ = SocketState::LostConnection; } #endif return false; diff --git a/src/primary/primary_client.cpp b/src/primary/primary_client.cpp index a413c648a..3a1b6c974 100644 --- a/src/primary/primary_client.cpp +++ b/src/primary/primary_client.cpp @@ -67,6 +67,12 @@ PrimaryClient::PrimaryClient(const std::string& robot_ip, [[maybe_unused]] comm: PrimaryClient::~PrimaryClient() { URCL_LOG_INFO("Stopping primary client pipeline"); + // Disconnect the stream BEFORE stopping (joining) the pipeline. The pipeline's producer thread + // may be sleeping inside its reconnect backoff or blocked in TCPSocket::setup(); disconnect() + // moves the socket into the deliberate-stop state and closes it so both paths abort within one + // poll slice and pipeline_->stop()'s join returns promptly instead of blocking until the + // (potentially unbounded) reconnect timeout expires. 
+ stream_.disconnect(); pipeline_->stop(); } @@ -79,8 +85,11 @@ void PrimaryClient::start(const size_t max_num_tries, const std::chrono::millise void PrimaryClient::stop() { + // Disconnect the stream before joining the pipeline so a producer thread stuck in its reconnect + // path is aborted and the join returns promptly (see ~PrimaryClient). A subsequent start() + // reconnects via URProducer::setupProducer(), whose connect() clears the deliberate-stop state. + stream_.disconnect(); pipeline_->stop(); - stream_.close(); } void PrimaryClient::addPrimaryConsumer(std::shared_ptr> primary_consumer) @@ -542,6 +551,8 @@ bool PrimaryClient::reconnectStream() { URCL_LOG_DEBUG("Closing primary stream..."); stream_.close(); + // connect() is the explicit reconnect entry: it clears any deliberate disconnect() left by a + // prior stop()/teardown before re-establishing, so no separate clear step is needed. if (stream_.connect()) { URCL_LOG_DEBUG("Primary stream connected"); diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index bdac1a733..8ca3ba943 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -87,11 +87,16 @@ RTDEClient::~RTDEClient() { prod_->setReconnectionCallback(nullptr); stop_reconnection_ = true; + // Disconnect the stream before joining the reconnect thread. disconnect() moves the socket into + // the deliberate-stop state and closes it so that any connect attempt or back-off sleep inside + // TCPSocket::setup() aborts within one poll slice, instead of blocking the destructor for the + // full reconnection_timeout (or indefinitely with max_connection_attempts == 0). 
+ stream_.disconnect(); + disconnect(); if (reconnecting_thread_.joinable()) { reconnecting_thread_.join(); } - disconnect(); } bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::milliseconds reconnection_timeout, @@ -508,11 +513,15 @@ bool RTDEClient::setupInputs() void RTDEClient::disconnect() { - if (client_state_ > ClientState::UNINITIALIZED) - { - stream_.disconnect(); - writer_.stop(); - } + // Close unconditionally: TCPSocket::close() and RTDEWriter::stop() are both idempotent. Guarding + // on client_state_ left the stream in SocketState::Connected after a failed + // negotiateProtocolVersion() (which resets client_state_ to UNINITIALIZED), causing the + // subsequent TCPSocket::setup() call to return false immediately due to the Connected check. + // Use close() (neutral) rather than disconnect() here: this runs during normal reconnect cycling, + // so the stream must stay reconnectable. A real teardown sets the deliberate-stop state first (in + // ~RTDEClient), which close() will not downgrade. + stream_.close(); + writer_.stop(); client_state_ = ClientState::UNINITIALIZED; prod_->stopProducer(); stopBackgroundRead(); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 160f41622..2ceb8a601 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -39,6 +39,9 @@ if (INTEGRATION_TESTS) gtest_add_tests(TARGET rtde_tests WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} ) + # Bound this teardown regression test so a hang fails CI fast instead of timing out the job. 
+ set_tests_properties(RTDEClientTest.destructor_not_blocked_by_stuck_reconnect_thread + PROPERTIES TIMEOUT 60) if (CHECK_RTDE_DOCS_RECIPE) find_package(Python3 COMPONENTS Interpreter REQUIRED) add_custom_target(generate_outputs ALL COMMAND ${Python3_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/resources/generate_rtde_outputs.py) @@ -152,6 +155,17 @@ target_link_libraries(fake_primary_server_tests PRIVATE ur_client_library::urcl gtest_add_tests(TARGET fake_primary_server_tests ) +# Robot-free regression test for ~PrimaryClient() not blocking on a stuck +# reconnect thread. Uses the in-process FakePrimaryServer so it runs without a +# robot (unlike the INTEGRATION_TESTS-gated primary_client_test_headless). +add_executable(primary_client_reconnect_tests test_primary_client_reconnect.cpp fake_primary_server.cpp) +target_link_libraries(primary_client_reconnect_tests PRIVATE ur_client_library::urcl GTest::gtest_main) +gtest_add_tests(TARGET primary_client_reconnect_tests +) +# Bound this teardown regression test so a hang fails CI in ~1 min instead of timing out the job. +set_tests_properties(PrimaryClientReconnectTest.destructor_not_blocked_by_stuck_reconnect_thread + PROPERTIES TIMEOUT 60) + add_executable(rtde_data_package_tests test_rtde_data_package.cpp) @@ -248,6 +262,10 @@ add_executable(tcp_socket_tests test_tcp_socket.cpp) target_link_libraries(tcp_socket_tests PRIVATE ur_client_library::urcl GTest::gtest_main) gtest_add_tests(TARGET tcp_socket_tests ) +# Bound the interruptible-setup regression tests so a hang fails CI fast instead of timing out the job. 
+set_tests_properties(TCPSocketTest.setup_interruptible_by_close + TCPSocketTest.setup_interruptible_during_blocking_connect + PROPERTIES TIMEOUT 60) add_executable(stream_tests test_stream.cpp) target_link_libraries(stream_tests PRIVATE ur_client_library::urcl GTest::gtest_main) diff --git a/tests/test_primary_client_reconnect.cpp b/tests/test_primary_client_reconnect.cpp new file mode 100644 index 000000000..1032bc19b --- /dev/null +++ b/tests/test_primary_client_reconnect.cpp @@ -0,0 +1,188 @@ +// -- BEGIN LICENSE BLOCK ---------------------------------------------- +// Copyright 2026 Universal Robots A/S +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the {copyright_holder} nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// -- END LICENSE BLOCK ------------------------------------------------ + +#include + +#include +#include +#include +#include + +#include + +#include "fake_primary_server.h" + +using namespace urcl; + +// Regression test for ~PrimaryClient() blocking indefinitely when the pipeline's +// producer thread is stuck in its reconnect loop at teardown time. +// +// This is the PrimaryClient counterpart of +// RTDEClientTest.destructor_not_blocked_by_stuck_reconnect_thread (test_rtde_client.cpp). +// +// Root cause: when the robot drops the primary connection, TCPSocket::read() +// returns false and leaves the socket in SocketState::LostConnection. URProducer's +// tryGetImpl() then enters its reconnect path: it sleeps an (exponentially +// growing) backoff and calls stream_.reconnect(), which retries with no upper +// bound (max_num_tries == 0), sleeping reconnection_time between attempts. If +// ~PrimaryClient() simply called pipeline_->stop() (which joins the producer +// thread) without first closing the stream, the join would block for the full +// reconnect duration — effectively forever for an unreachable robot. +// +// Fix (two parts): +// 1. ~PrimaryClient()/PrimaryClient::stop() call stream_.disconnect() BEFORE +// joining the pipeline. disconnect() moves the socket into the deliberate-stop +// state (Disconnecting/Disconnected) and closes it. +// 2. 
TCPSocket::setup() honors that state both during its (non-blocking) connect +// attempt and during the between-attempt wait, so it aborts within ~100 ms +// regardless of platform. setup() never overwrites the deliberate-stop state, +// so it cannot be raced away (the bug that hung Windows CI). +// Together these abort the producer within ~100 ms of the destructor, so the join — +// and therefore the destructor — returns promptly. +// +// Unlike test_primary_client.cpp's robot-dependent fixtures, this test uses the +// in-process FakePrimaryServer, so it runs in the normal (non-INTEGRATION_TESTS) +// build and needs no robot. +TEST(PrimaryClientReconnectTest, destructor_not_blocked_by_stuck_reconnect_thread) +{ + comm::INotifier notifier; + + auto server = std::make_unique(primary_interface::UR_PRIMARY_PORT); + auto client = std::make_unique("127.0.0.1", notifier); + + // Unlimited reconnect attempts with a large reconnection time: if the fix is + // absent, the producer's reconnect path keeps the destructor blocked. + const std::chrono::milliseconds large_reconnect_timeout(5000); + ASSERT_NO_THROW(client->start(/*max_num_tries=*/0, large_reconnect_timeout)); + ASSERT_TRUE(server->waitForClient()) << "PrimaryClient never connected to the fake server"; + + // Drop the server. The producer's read() fails, the socket transitions to + // SocketState::LostConnection, and the producer enters its reconnect loop. + server.reset(); + + // Give the producer time to detect the drop and reach its reconnect sleep + // (initial backoff is 1 s, after which it sleeps inside TCPSocket::setup()). + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); + + // The destructor must return quickly: disconnect() aborts the producer's connect + // attempt/back-off, so the pipeline join completes well under 2 s. Without the fix + // this blocks for at least the reconnect timeout (and indefinitely with unlimited + // retries against a dead port). 
Run the destructor on a worker with a watchdog so a + // regression fails fast with a clear message instead of hanging the test binary (the + // CTest TIMEOUT then reaps it). + std::packaged_task teardown([&client]() { client.reset(); }); + auto teardown_future = teardown.get_future(); + std::thread teardown_thread(std::move(teardown)); + + const auto t0 = std::chrono::steady_clock::now(); + if (teardown_future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) + { + teardown_thread.detach(); + FAIL() << "~PrimaryClient() did not return within 5 s — the producer reconnect thread was not aborted by " + "disconnect()"; + } + teardown_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) + << "~PrimaryClient() blocked for " << std::chrono::duration_cast(elapsed).count() + << " ms — the producer reconnect thread was not aborted by disconnect()"; +} + +// Regression test for the SECOND symptom reported in issue #368: calling +// PrimaryClient::stop() (the implementation of UrDriver::stopPrimaryClientCommunication()) +// hangs when the producer thread is stuck in its reconnect loop against an unreachable +// robot. +// +// This is distinct from the destructor test above: stop() is a restartable operation, so +// besides asserting that it returns promptly it must also leave the client in a state where +// a subsequent start() can reconnect. That exercises the implicit clear of the deliberate-stop +// state by connect() in URProducer::setupProducer()/PrimaryClient::reconnectStream() — a +// regression there would not hang teardown but would silently prevent the client from ever +// reconnecting after a stop(). 
+TEST(PrimaryClientReconnectTest, stop_not_blocked_by_stuck_reconnect_thread) +{ + comm::INotifier notifier; + + auto server = std::make_unique(primary_interface::UR_PRIMARY_PORT); + auto client = std::make_unique("127.0.0.1", notifier); + + // Unlimited reconnect attempts with a large reconnection time: if the fix is + // absent, the producer's reconnect path keeps stop()'s pipeline join blocked. + const std::chrono::milliseconds large_reconnect_timeout(5000); + ASSERT_NO_THROW(client->start(/*max_num_tries=*/0, large_reconnect_timeout)); + ASSERT_TRUE(server->waitForClient()) << "PrimaryClient never connected to the fake server"; + + // Drop the server. The producer's read() fails, the socket transitions to + // SocketState::LostConnection, and the producer enters its reconnect loop. + server.reset(); + + // Give the producer time to detect the drop and reach its reconnect sleep + // (initial backoff is 1 s, after which it sleeps inside TCPSocket::setup()). + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); + + // stop() must return quickly: disconnect() aborts the producer's connect attempt/back-off, + // so the pipeline join completes well under 2 s. Without the fix this blocks for at least the + // reconnect timeout (and indefinitely with unlimited retries against a dead port). Run it on a + // worker with a watchdog so a regression fails fast with a clear message instead of hanging the + // test binary (the CTest TIMEOUT then reaps it). 
+ std::packaged_task stop_task([&client]() { client->stop(); }); + auto stop_future = stop_task.get_future(); + std::thread stop_thread(std::move(stop_task)); + + const auto t0 = std::chrono::steady_clock::now(); + if (stop_future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) + { + stop_thread.detach(); + FAIL() << "PrimaryClient::stop() did not return within 5 s — the producer reconnect thread was not aborted by " + "disconnect()"; + } + stop_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) + << "PrimaryClient::stop() blocked for " << std::chrono::duration_cast(elapsed).count() + << " ms — the producer reconnect thread was not aborted by disconnect()"; + + // Restart-reuse check: bring up a fresh server and start() again. This must reconnect, + // proving that the deliberate-stop state set by stop() was cleared by connect() on restart. + auto server2 = std::make_unique(primary_interface::UR_PRIMARY_PORT); + ASSERT_NO_THROW(client->start(/*max_num_tries=*/0, large_reconnect_timeout)); + EXPECT_TRUE(server2->waitForClient(std::chrono::seconds(3))) << "PrimaryClient did not reconnect after " + "stop()/start() — the deliberate-stop state set by " + "stop() was not cleared " + "by connect() on restart"; +} + +int main(int argc, char* argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/test_rtde_client.cpp b/tests/test_rtde_client.cpp index becdf1325..7db708ca0 100644 --- a/tests/test_rtde_client.cpp +++ b/tests/test_rtde_client.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -812,6 +813,91 @@ TEST_F(RTDEClientTest, test_initialization) EXPECT_GE(std::chrono::duration_cast(elapsed).count(), 20); } +// Regression test for the bug where ~RTDEClient() could block indefinitely when +// the reconnect thread was stuck inside TCPSocket::setup(). 
Fixed by: (1) calling +// stream_.disconnect() (followed by RTDEClient::disconnect()) before joining reconnecting_thread_ +// in ~RTDEClient(), and (2) making TCPSocket::setup() abort on the deliberate-stop state, +// both during the (non-blocking) connect attempt and during the between-attempt wait. +// +// See also TCPSocketTest.setup_interruptible_by_close and +// TCPSocketTest.setup_interruptible_during_blocking_connect in test_tcp_socket.cpp +// for lower-level unit tests of the same fix that run without INTEGRATION_TESTS. +TEST_F(RTDEClientTest, destructor_not_blocked_by_stuck_reconnect_thread) +{ + // Use a large reconnection timeout so that the blocking window is clearly + // observable if the fix is absent (5 s sleep > 2 s assertion threshold). + const std::chrono::milliseconds large_reconnect_timeout(5000); + + auto fake_rtde_server = std::make_unique(g_FAKE_RTDE_PORT); + // Skip the bootup-timestamp check inside isRobotBooted(). + fake_rtde_server->setStartTime(std::chrono::steady_clock::now() - std::chrono::seconds(52)); + + client_.reset(new rtde_interface::RTDEClient("localhost", notifier_, resources_output_recipe_, + resources_input_recipe_, 100, false, g_FAKE_RTDE_PORT)); + // Attempt init up to 10 times with a short between-attempt sleep to ensure + // the RTDE handshake succeeds even in environments where the fake server's + // response arrives slightly after the 1-second socket read timeout. + bool initialized = false; + for (int attempt = 0; attempt < 10 && !initialized; ++attempt) + { + try + { + // max_connection_attempts=0 (unlimited): TCPSocket::setup() sleeps + // large_reconnect_timeout between every failed connect attempt once the + // server is gone. Use a short initialization_timeout for fast retries. + client_->init(0, large_reconnect_timeout, 1, std::chrono::milliseconds(50)); + initialized = true; + } + catch (const UrException&) + { + // Recreate the client on each retry to start from a clean state. 
+ client_.reset(new rtde_interface::RTDEClient("localhost", notifier_, resources_output_recipe_, + resources_input_recipe_, 100, false, g_FAKE_RTDE_PORT)); + } + } + if (!initialized) + { + GTEST_SKIP() << "Could not initialize RTDEClient with the fake server after 10 attempts; " + "this test requires a reliably responding RTDE server. " + "The TCPSocket-level regression test (TCPSocketTest.setup_interruptible_by_close) " + "verifies the underlying fix without a robot."; + } + + // start(true) arms the reconnect callback via the background read thread. + client_->start(true); + + // Drop the server — the background read thread detects the connection loss, + // calls reconnectCallback(), which launches reconnecting_thread_. That thread + // enters setupCommunication() -> TCPSocket::setup() and begins sleeping + // large_reconnect_timeout between retry attempts. + fake_rtde_server.reset(); + + // Give the reconnect thread time to reach the wait inside TCPSocket::setup(). + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // The destructor must return quickly: disconnect() aborts setup()'s connect/wait, + // so the join completes in well under 2 s. Without the fix this would block for + // >= large_reconnect_timeout (5 s), or forever with unlimited attempts. + // Run the destructor on a worker with a watchdog so a regression fails fast with a + // clear message instead of hanging the test binary (the CTest TIMEOUT then reaps it). 
+ std::packaged_task teardown([this]() { client_.reset(); }); + auto teardown_future = teardown.get_future(); + std::thread teardown_thread(std::move(teardown)); + + const auto t0 = std::chrono::steady_clock::now(); + if (teardown_future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) + { + teardown_thread.detach(); + FAIL() << "~RTDEClient() did not return within 5 s — reconnect thread was not aborted by disconnect()"; + } + teardown_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) + << "RTDEClient destructor blocked for " << std::chrono::duration_cast(elapsed).count() + << " ms — reconnect thread was not aborted by disconnect()"; +} + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); diff --git a/tests/test_tcp_server.cpp b/tests/test_tcp_server.cpp index 08c8c4fb8..e136382af 100644 --- a/tests/test_tcp_server.cpp +++ b/tests/test_tcp_server.cpp @@ -34,8 +34,14 @@ #include #include #include +#include #include #include +#ifndef _WIN32 +# include +# include +# include +#endif #include "test_utils.h" #include @@ -269,7 +275,7 @@ TEST_F(TCPServerTest, check_shutting_down_server_while_listening) } EXPECT_FALSE(read_success); // If the read just would have timeouted, the client state would still be connected. - EXPECT_EQ(client.getState(), comm::SocketState::Disconnected); + EXPECT_EQ(client.getState(), comm::SocketState::LostConnection); } TEST_F(TCPServerTest, double_shutdown) @@ -497,6 +503,134 @@ TEST_F(TCPServerTest, shutdown_during_active_writes) writer.join(); } +// Verifies that the server receives data from many clients that all send simultaneously. This +// exercises the poll() revents loop across many client file descriptors and guards against +// missed read events when several sockets are readable at once. 
+TEST_F(TCPServerTest, receives_from_many_concurrent_clients) +{ + comm::TCPServer server(0); + + std::mutex mtx; + std::condition_variable cv; + std::atomic message_count{ 0 }; + + server.setMessageCallback([&](const socket_t, char*, int) { + message_count.fetch_add(1); + std::lock_guard lk(mtx); + cv.notify_all(); + }); + server.start(); + +#ifdef _WIN32 + // Stay below Winsock's default FD_SETSIZE of 64 (a select() fd_set limit, not a per-process socket cap). + constexpr int num_clients = 50; +#else + constexpr int num_clients = 100; +#endif + + std::vector> clients; + for (int i = 0; i < num_clients; ++i) + { + clients.push_back(std::make_unique(server.getPort())); + } + + // Every client sends a single message concurrently. + std::vector senders; + for (auto& client : clients) + { + senders.emplace_back([&client]() { client->send("ping\n"); }); + } + for (auto& t : senders) + { + t.join(); + } + + // The server's poll() loop must observe activity on every client FD and deliver all messages. + std::unique_lock lk(mtx); + EXPECT_TRUE(cv.wait_for(lk, std::chrono::seconds(5), [&]() { return message_count.load() >= num_clients; })); + EXPECT_EQ(message_count.load(), num_clients); +} + +#ifndef _WIN32 +// Regression test for the FD_SETSIZE limitation of select(): a client whose accepted socket file +// descriptor number is >= FD_SETSIZE (1024) must still be serviced normally. This is the exact +// scenario that occurs when the hosting process (e.g. a JVM) holds many file descriptors. The old +// select()-based implementation rejected/crashed on such descriptors; poll() handles them. +TEST_F(TCPServerTest, services_client_with_high_fd_number) +{ + // Make sure we are allowed to open more than FD_SETSIZE descriptors; raise the soft limit if + // needed and skip the test if the hard limit does not allow it. 
+ struct rlimit rl; + ASSERT_EQ(getrlimit(RLIMIT_NOFILE, &rl), 0); + const rlim_t needed = static_cast(FD_SETSIZE) + 64; + if (rl.rlim_cur < needed) + { + rl.rlim_cur = std::min(needed, rl.rlim_max); + if (setrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur < needed) + { + GTEST_SKIP() << "Cannot raise RLIMIT_NOFILE above FD_SETSIZE; skipping high-fd test."; + } + } + + // Consume the low-numbered descriptors so that subsequently created sockets are assigned fd + // numbers beyond FD_SETSIZE. + std::vector fd_hogs; + while (true) + { + int fd = ::open("/dev/null", O_RDONLY); + if (fd < 0) + { + break; + } + fd_hogs.push_back(fd); + if (fd > static_cast(FD_SETSIZE) + 8) + { + break; + } + } + const bool pushed_past_limit = !fd_hogs.empty() && fd_hogs.back() > static_cast(FD_SETSIZE); + if (!pushed_past_limit) + { + for (int fd : fd_hogs) + { + ::close(fd); + } + GTEST_SKIP() << "Could not allocate descriptors beyond FD_SETSIZE; skipping."; + } + + TestableTcpServer server(port_); + server.start(); + + Client client(port_); + EXPECT_TRUE(server.waitForConnectionCallback(2000)); + + // The server-side accepted client FD should exceed FD_SETSIZE -- the case that breaks select(). + auto client_fds = server.getClientFDs(); + ASSERT_FALSE(client_fds.empty()); + EXPECT_GT(client_fds.back(), static_cast(FD_SETSIZE)); + + // Data must flow both ways on the high-numbered descriptor. + const std::string message = "high fd message\n"; + client.send(message); + EXPECT_TRUE(server.waitForMessageCallback(2000)); + EXPECT_EQ(server.getReceivedMessage(), message); + + size_t written; + const auto* data = reinterpret_cast(message.c_str()); + ASSERT_TRUE(server.write(data, message.size(), written)); + EXPECT_EQ(client.recv(), message); + + // Disconnect must also be detected on the high-numbered descriptor. 
+ client.close(); + EXPECT_TRUE(server.waitForDisconnectionCallback(2000)); + + for (int fd : fd_hogs) + { + ::close(fd); + } +} +#endif + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); diff --git a/tests/test_tcp_socket.cpp b/tests/test_tcp_socket.cpp index bb6bd317a..3c03d6da5 100644 --- a/tests/test_tcp_socket.cpp +++ b/tests/test_tcp_socket.cpp @@ -149,8 +149,8 @@ TEST_F(TCPSocketTest, setup_client_before_server) // Make sure that the client has tried to connect to the server, before creating the server std::this_thread::sleep_for(std::chrono::seconds(1)); - // Client state should be invalid as long as the server is not available - comm::SocketState expected_state = comm::SocketState::Invalid; + // Client state should be Connecting while it keeps retrying and the server is not available + comm::SocketState expected_state = comm::SocketState::Connecting; comm::SocketState actual_state = client_->getState(); EXPECT_EQ(toUnderlying(expected_state), toUnderlying(actual_state)); @@ -293,6 +293,76 @@ TEST_F(TCPSocketTest, connect_non_running_robot) EXPECT_LT(elapsed, std::chrono::milliseconds(7500)); } +// Regression test for the bug where TCPSocket::setup() could block the caller +// indefinitely when the wait between retry attempts was not interruptible. +// +// setup() is interrupted by disconnect(), which moves the socket into the deliberate-stop +// state (Disconnecting/Disconnected) and closes it. setup() never overwrites that state and +// aborts as soon as it observes it, so a teardown signal cannot be lost by setup()'s internal +// state updates. This is the path used by ~RTDEClient()/~PrimaryClient() before joining their +// reconnect threads. +TEST_F(TCPSocketTest, setup_interruptible_by_close) +{ + // Use a port with no listener so every connect attempt fails immediately, + // sending setup() into the between-attempt wait. 
+ const int unused_port = 12322; + const std::chrono::milliseconds large_reconnect_timeout(5000); + + Client client(unused_port, "127.0.0.1"); + + // Run setup() with unlimited retries in a background thread. + std::thread setup_thread([&client, &large_reconnect_timeout]() { + // max_num_tries=0 (unlimited) → setup() waits large_reconnect_timeout after + // every failed connect attempt and never exits on its own. + client.setup(0, large_reconnect_timeout); + }); + + // Give the thread time to reach the between-attempt wait inside setup(). + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // disconnect() moves the socket to the deliberate-stop state; the sliced wait in setup() + // detects it within 100 ms and setup() returns, allowing the thread to finish. + const auto t0 = std::chrono::steady_clock::now(); + client.disconnect(); + + setup_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + // Without the fix, elapsed would be >= large_reconnect_timeout (5 s). + EXPECT_LT(elapsed, std::chrono::seconds(2)) << "TCPSocket::setup() was not interrupted by disconnect() within 2 s; " + "the between-attempt wait is not interruptible"; +} + +// Regression test for issue #368: a reconnect thread blocked inside connect() to a +// genuinely unreachable host (no SYN-ACK, no RST) must still be abortable. The +// previous fix only made the between-attempt *sleep* interruptible, not the connect +// itself, so this case could block for the full OS connect timeout. setup() now uses +// a non-blocking connect polled in short slices, so disconnect() aborts it promptly. +TEST_F(TCPSocketTest, setup_interruptible_during_blocking_connect) +{ + // 10.255.255.1 is in a private range and is (almost) never routable, so connect() + // hangs in SYN retransmit rather than failing fast like a refused localhost port. 
+ const int unused_port = 12323; + const std::chrono::milliseconds large_reconnect_timeout(5000); + + Client client(unused_port, "10.255.255.1"); + + std::thread setup_thread([&client, &large_reconnect_timeout]() { client.setup(0, large_reconnect_timeout); }); + + // Give the thread time to enter the (blocking) connect attempt. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + const auto t0 = std::chrono::steady_clock::now(); + client.disconnect(); + + setup_thread.join(); + const auto elapsed = std::chrono::steady_clock::now() - t0; + + EXPECT_LT(elapsed, std::chrono::seconds(2)) << "TCPSocket::setup() was not interrupted while blocked in connect() " + "within 2 s; " + "the connect attempt is not interruptible"; +} + TEST_F(TCPSocketTest, test_deprecated_reconnection_time_interface) { URCL_SILENCE_DEPRECATED_BEGIN @@ -320,7 +390,7 @@ TEST_F(TCPSocketTest, test_read_on_socket_abruptly_closed) char characters; size_t read_chars = 0; EXPECT_FALSE(client_->read((uint8_t*)&characters, 1, read_chars)); - EXPECT_EQ(client_->getState(), comm::SocketState::Disconnected); + EXPECT_EQ(client_->getState(), comm::SocketState::LostConnection); } int main(int argc, char* argv[])