Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions include/ur_client_library/comm/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class URProducer : public IProducer<T>
continue;
}

if (stream_.closed())
if (stream_.closed() || stream_.stopRequested())
return false;

if (on_reconnect_cb_)
Expand All @@ -85,9 +85,23 @@ class URProducer : public IProducer<T>
}

URCL_LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count());
std::this_thread::sleep_for(timeout_);
// Sleep in small slices so the producer can be stopped (running_ == false)
// or woken by a stream close (e.g. from a destructor calling stop()) within
// ~100 ms, instead of blocking for the full (exponentially growing) timeout.
// Without this, a thread joining the pipeline at teardown could block for up
// to 120 s while this thread sleeps here.
const auto sleep_slice = std::chrono::milliseconds(100);
const auto sleep_total = std::chrono::duration_cast<std::chrono::milliseconds>(timeout_);
for (auto slept = std::chrono::milliseconds(0);
slept < sleep_total && running_ && !stream_.closed() && !stream_.stopRequested(); slept += sleep_slice)
{
std::this_thread::sleep_for(sleep_slice);
}

if (!running_ || stream_.closed() || stream_.stopRequested())
return false;

if (stream_.connect())
if (stream_.reconnect())
continue;

auto next = timeout_ * 2;
Expand Down
32 changes: 29 additions & 3 deletions include/ur_client_library/comm/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,32 @@ class URStream : public TCPSocket
bool connect(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
{
return TCPSocket::setup(host_, port_, max_num_tries, reconnection_time);
return TCPSocket::connect(host_, port_, max_num_tries, reconnection_time);
}

/*!
* \brief Disconnects from the configured socket.
* \brief Re-establishes the connection after an unexpected drop, without clearing a deliberate
* disconnect(). Used by the automatic reconnect path.
*
* \param max_num_tries Maximum number of connection attempts before failing. Unlimited when 0.
* \param reconnection_time time in between connection attempts to the server
*
* \returns True on success, false if it could not reconnect or a deliberate disconnect() is in
* effect
*/
bool reconnect(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
{
return TCPSocket::reconnect(host_, port_, max_num_tries, reconnection_time);
}

/*!
* \brief Deliberately disconnects from the configured socket, leaving it ready to connect again.
*/
void disconnect()
{
URCL_LOG_DEBUG("Disconnecting from %s:%d", host_.c_str(), port_);
TCPSocket::close();
TCPSocket::disconnect();
}

/*!
Expand All @@ -83,6 +99,16 @@ class URStream : public TCPSocket
return getState() == SocketState::Closed;
}

/*!
* \brief Returns whether a deliberate disconnect() is in progress or in effect (the socket will
* not auto-reconnect until connect() is called again).
*/
bool stopRequested()
{
const SocketState s = getState();
return s == SocketState::Disconnecting || s == SocketState::Disconnected;
}

/*!
* \brief Reads a full UR package out of a socket. For this, it looks into the package and reads
* the byte length from the socket directly. It returns as soon as all bytes for the package are
Expand Down
4 changes: 0 additions & 4 deletions include/ur_client_library/comm/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,6 @@ class TCPServer
std::atomic<socket_t> listen_fd_;
int port_;

socket_t maxfd_;
fd_set masterfds_;
fd_set tempfds_;

uint32_t max_clients_allowed_;
std::vector<socket_t> client_fds_;
std::mutex clients_mutex_;
Expand Down
78 changes: 74 additions & 4 deletions include/ur_client_library/comm/tcp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ namespace comm
*/
enum class SocketState
{
Invalid, ///< Socket is initialized or setup failed
Connected, ///< Socket is connected and ready to use
Disconnected, ///< Socket is disconnected and cannot be used
Closed ///< Connection to socket got closed
Invalid, ///< Socket is initialized but was never connected
Connecting, ///< A first-time connect() attempt is in progress
Connected, ///< Socket is connected and ready to use
LostConnection, ///< Connection dropped unexpectedly; auto-reconnect is expected to pick it up
Reconnecting, ///< An automatic reconnect attempt (after a drop) is in progress
Disconnecting, ///< A deliberate disconnect() is in progress
Disconnected, ///< Deliberately disconnected; will NOT auto-reconnect until connect() is called
Closed ///< Neutral low-level close (clearable by a subsequent (re)connect)
};

/*!
Expand All @@ -55,6 +59,27 @@ class TCPSocket

void setupOptions();

// True while a deliberate disconnect() is in progress or has completed (the "deliberate-stop
// set"). The connect/retry machinery checks this to abort, and never overwrites these states,
// so a teardown disconnect() that races a reconnect attempt is observed reliably.
bool isStopRequested() const
{
const SocketState s = state_.load();
return s == SocketState::Disconnecting || s == SocketState::Disconnected;
}

// Atomically moves state_ to `desired`, unless a deliberate disconnect() (Disconnecting or
// Disconnected) is in effect. Returns true if the state was set, false if a deliberate stop is
// active (in which case state_ is left untouched). This is how the connect/retry machinery
// updates its in-progress state without ever clobbering a teardown signal.
bool setStateUnlessStopRequested(SocketState desired);

// Performs an interruptible, non-blocking connect on an already-created socket.
// Polls in short slices so that a concurrent disconnect() aborts the attempt
// promptly on all platforms (POSIX close() of a blocked connect() is reliable,
// Winsock's is not). Restores blocking mode on success.
bool openInterruptible(socket_t socket_fd, struct sockaddr* address, size_t address_len);

protected:
static bool open(socket_t socket_fd, struct sockaddr* address, size_t address_len)
{
Expand All @@ -64,6 +89,18 @@ class TCPSocket
bool setup(const std::string& host, const int port, const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME);

/*!
* \brief Re-establishes a connection after an unexpected drop, without clearing a deliberate
* disconnect().
*
* Used by the automatic reconnect path (e.g. the producer loop). If a deliberate disconnect()
* is in progress or has completed, this returns false immediately instead of reconnecting, so a
* concurrent teardown is never undone. Otherwise behaves like setup() but marks the socket as
* Reconnecting while the attempt is in progress.
*/
bool reconnect(const std::string& host, const int port, const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME);

std::unique_ptr<timeval> recv_timeout_;

public:
Expand Down Expand Up @@ -132,8 +169,41 @@ class TCPSocket
*/
bool write(const uint8_t* buf, const size_t buf_len, size_t& written);

/*!
* \brief Establishes a connection to the configured host/port.
*
* This is the explicit (re)connect entry point. It clears any prior deliberate disconnect()
* (moving the socket to Connecting) and then attempts to connect, retrying up to max_num_tries
* times (unlimited when 0). Call this on the controlling thread; the automatic reconnect path
* uses the internal reconnect() instead.
*
* \param host Host to connect to
* \param port Port to connect to
* \param max_num_tries Maximum number of connection attempts before failing. Unlimited when 0.
* \param reconnection_time Time between connection attempts
*
* \returns True on success, false if the connection could not be established or was aborted by
* a concurrent disconnect()
*/
bool connect(const std::string& host, const int port, const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME);

/*!
* \brief Deliberately disconnects the socket and leaves it ready to connect() again.
*
* Moves the socket into the deliberate-stop set (Disconnecting then Disconnected) and closes the
* underlying file descriptor. Any connect/reconnect attempt currently in progress (blocked in a
* connect or sleeping between attempts) aborts promptly, and the automatic reconnect path will
* not reconnect until connect() is called again. Use this at teardown (e.g. from a destructor)
* before joining a reconnect thread.
*/
void disconnect();

/*!
* \brief Closes the connection to the socket.
*
* Neutral low-level close. Unlike disconnect(), it does not prevent a subsequent automatic
* reconnect, and it never downgrades a deliberate disconnect() that is already in effect.
*/
void close();

Expand Down
89 changes: 34 additions & 55 deletions src/comm/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@

#include <sstream>
#include <cstring>
#include <vector>
#include "ur_client_library/comm/socket_t.h"
#include <fcntl.h>
#ifndef _WIN32
# include <poll.h>
#endif

namespace urcl
{
namespace comm
{
TCPServer::TCPServer(const int port, const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
: port_(port), maxfd_(0), max_clients_allowed_(0)
: port_(port), max_clients_allowed_(0)
{
#ifdef _WIN32
WSAData data;
Expand Down Expand Up @@ -74,9 +78,6 @@ void TCPServer::init()
ur_setsockopt(listen_fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(int));

URCL_LOG_DEBUG("Created socket with FD %d", (int)listen_fd_);

FD_ZERO(&masterfds_);
FD_ZERO(&tempfds_);
}

void TCPServer::shutdown()
Expand Down Expand Up @@ -179,9 +180,6 @@ void TCPServer::bind(const size_t max_num_tries, const std::chrono::milliseconds
} while (err == -1 && (connection_counter <= max_num_tries || max_num_tries == 0));

URCL_LOG_DEBUG("Bound %d:%d to FD %d", server_addr.sin_addr.s_addr, port_, (int)listen_fd_);

FD_SET(listen_fd_, &masterfds_);
maxfd_ = listen_fd_;
}

void TCPServer::startListen()
Expand Down Expand Up @@ -220,33 +218,13 @@ void TCPServer::handleConnect()
return;
}

#ifdef _WIN32
bool set_size_exceeded = client_fds_.size() >= FD_SETSIZE - 1; // -1 because listen_fd_ also occupies one
// slot in masterfds_
#else
bool set_size_exceeded = client_fd >= FD_SETSIZE; // On Unix-like systems, the client FD itself must be less than
// FD_SETSIZE, otherwise it cannot be added to the fd_set.
#endif

if (set_size_exceeded)
{
URCL_LOG_ERROR("Accepted client FD %d exceeds FD_SETSIZE (%d). Closing connection.", (int)client_fd, FD_SETSIZE);
ur_close(client_fd);
return;
}

bool accepted = false;

{
std::lock_guard<std::mutex> lk(clients_mutex_);
if (client_fds_.size() < max_clients_allowed_ || max_clients_allowed_ == 0)
{
client_fds_.push_back(client_fd);
FD_SET(client_fd, &masterfds_);
if (client_fd > maxfd_)
{
maxfd_ = client_fd;
}
accepted = true;
}
else
Expand All @@ -268,27 +246,40 @@ void TCPServer::handleConnect()

void TCPServer::spin()
{
tempfds_ = masterfds_;

timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
// Build the poll set fresh each iteration from the listen socket plus all currently connected
// clients. poll() is used on both platforms (WSAPoll() on Windows) because it has no
// FD_SETSIZE limit on file descriptor numbers, unlike select(). This matters when the hosting
// process holds many file descriptors (e.g. a JVM), pushing socket FDs past FD_SETSIZE (1024).
std::vector<struct pollfd> pollfds;
pollfds.push_back({ static_cast<socket_t>(listen_fd_), POLLIN, 0 });
{
std::lock_guard<std::mutex> lk(clients_mutex_);
for (const auto& client_fd : client_fds_)
{
pollfds.push_back({ client_fd, POLLIN, 0 });
}
}

// blocks until activity on any socket from tempfds
int sel = select(static_cast<int>(maxfd_ + 1), &tempfds_, NULL, NULL, &timeout);
if (sel < 0)
// Block for up to 1 s waiting for activity on any socket. A shutdown wakes this immediately by
// connecting to the listen socket (see shutdown()).
#ifdef _WIN32
int ready = ::WSAPoll(pollfds.data(), static_cast<ULONG>(pollfds.size()), 1000);
#else
int ready = ::poll(pollfds.data(), pollfds.size(), 1000);
#endif
if (ready < 0)
{
URCL_LOG_ERROR("select() failed. Shutting down socket event handler.");
URCL_LOG_ERROR("poll() failed. Shutting down socket event handler.");
keep_running_ = false;
return;
}

if (!keep_running_ || sel == 0)
if (!keep_running_ || ready == 0)
{
return;
}

if (FD_ISSET(listen_fd_, &tempfds_))
if (pollfds[0].revents & POLLIN)
{
URCL_LOG_DEBUG("Activity on listen FD %d", (int)listen_fd_);
handleConnect();
Expand All @@ -297,15 +288,13 @@ void TCPServer::spin()
std::vector<socket_t> disconnected_clients;
std::vector<socket_t> client_fds_with_activity;

// pollfds[0] is the listen socket; client entries start at index 1.
for (size_t i = 1; i < pollfds.size(); ++i)
{
std::lock_guard<std::mutex> lk(clients_mutex_);
for (const auto& client_fd : client_fds_)
if (pollfds[i].revents & (POLLIN | POLLHUP | POLLERR))
{
if (FD_ISSET(client_fd, &tempfds_))
{
URCL_LOG_DEBUG("Activity on client FD %d", (int)client_fd);
client_fds_with_activity.push_back(client_fd);
}
URCL_LOG_DEBUG("Activity on client FD %d", (int)pollfds[i].fd);
client_fds_with_activity.push_back(static_cast<socket_t>(pollfds[i].fd));
}
}
// We handle client activity outside the clients_mutex_ lock to avoid holding it during potentially slow I/O and
Expand All @@ -331,7 +320,6 @@ void TCPServer::handleDisconnect(const socket_t fd)
{
std::lock_guard<std::mutex> lk(clients_mutex_);
ur_close(fd);
FD_CLR(fd, &masterfds_);

for (size_t i = 0; i < client_fds_.size(); ++i)
{
Expand All @@ -341,15 +329,6 @@ void TCPServer::handleDisconnect(const socket_t fd)
break;
}
}

maxfd_ = listen_fd_;
for (const auto& client_fd : client_fds_)
{
if (client_fd > maxfd_)
{
maxfd_ = client_fd;
}
}
}

{
Expand Down
Loading
Loading