diff --git a/.gitignore b/.gitignore
index a596e2ddba..5ca03d6271 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,9 @@ CMakeUserPresets.json
 **/CMakeCache.txt
 **/Testing/
 
+# Claude Code
+.claude/
+
 # IDE files
 .idea/
 .vs/
diff --git a/cmake/asio-iocp-fix.cmake b/cmake/asio-iocp-fix.cmake
new file mode 100644
index 0000000000..4aa18bfe42
--- /dev/null
+++ b/cmake/asio-iocp-fix.cmake
@@ -0,0 +1,115 @@
+# Patch for ASIO issue #312: IOCP out-of-resources error reporting.
+# https://github.com/chriskohlhoff/asio/issues/312
+#
+# When PostQueuedCompletionStatus fails due to resource exhaustion, the
+# completion key (overlapped_contains_result) is lost. The operation falls
+# back to an internal queue but do_one() dispatches it with the wrong
+# error code and bytes_transferred, causing undefined behavior / crashes.
+#
+# Fix: store the completion key on the operation object before posting,
+# so it survives the fallback path.
+#
+# Based on MongoDB's patch:
+# https://github.com/mongodb-forks/asio/commit/d03c2e7002131305645374e735a8ece4191f2fc5
+#
+# Tested against: ASIO 1.36.0 (asio-1-36-0 tag)
+# The patch targets specific string patterns in ASIO's IOCP implementation.
+# If ASIO is updated, verify the patterns still match or update this patch.
+
+function(apply_asio_iocp_fix asio_include_dir)
+  set(OP_HEADER "${asio_include_dir}/asio/detail/win_iocp_operation.hpp")
+  set(CTX_IMPL "${asio_include_dir}/asio/detail/impl/win_iocp_io_context.ipp")
+
+  if(NOT EXISTS "${OP_HEADER}" OR NOT EXISTS "${CTX_IMPL}")
+    message(WARNING "ASIO IOCP fix: headers not found at ${asio_include_dir}, skipping patch")
+    return()
+  endif()
+
+  # Check if already patched (look for our added member)
+  file(READ "${OP_HEADER}" op_content)
+  if(op_content MATCHES "completionKey_")
+    message(STATUS "ASIO IOCP fix: already applied")
+    return()
+  endif()
+
+  # Verify we're working with a compatible ASIO version by checking
+  # that the patterns we need to replace actually exist.
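+  #
+  # For reference, the code these patterns target looks roughly like the
+  # following in ASIO 1.36.0 (trimmed, illustrative excerpt, not verbatim
+  # ASIO source):
+  #
+  #   // win_iocp_operation.hpp
+  #   long ready_;
+  #   win_iocp_operation* next_;
+  #
+  #   // win_iocp_io_context.ipp (post_deferred_completion)
+  #   ::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op)
+  #
+  #   // win_iocp_io_context.ipp (on_pending / on_completion)
+  #   if (!::PostQueuedCompletionStatus(iocp_.handle,
+  #       0, overlapped_contains_result, op))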
+  set(patch_compatible TRUE)
+
+  if(NOT op_content MATCHES "long ready_;")
+    message(WARNING "ASIO IOCP fix: 'long ready_' not found in win_iocp_operation.hpp — ASIO version may be incompatible")
+    set(patch_compatible FALSE)
+  endif()
+
+  file(READ "${CTX_IMPL}" ctx_content)
+
+  if(NOT ctx_content MATCHES "PostQueuedCompletionStatus\\(iocp_\\.handle, 0, 0, op\\)")
+    message(WARNING "ASIO IOCP fix: post_deferred_completion pattern not found in win_iocp_io_context.ipp — ASIO version may be incompatible")
+    set(patch_compatible FALSE)
+  endif()
+
+  if(NOT ctx_content MATCHES "overlapped_contains_result, op\\)")
+    message(WARNING "ASIO IOCP fix: on_pending/on_completion pattern not found in win_iocp_io_context.ipp — ASIO version may be incompatible")
+    set(patch_compatible FALSE)
+  endif()
+
+  if(NOT patch_compatible)
+    message(WARNING "ASIO IOCP fix: skipping patch due to incompatible ASIO version")
+    return()
+  endif()
+
+  message(STATUS "Applying ASIO IOCP fix (issue #312)")
+
+  # --- Patch win_iocp_operation.hpp ---
+  # Add completionKey_ member and accessor
+
+  # Add member after "long ready_;"
+  string(REPLACE
+    "long ready_;"
+    "long ready_;\n ULONG_PTR completionKey_;"
+    op_content "${op_content}")
+
+  # Initialize in constructor: find "ready_(0)" and add completionKey_(0)
+  string(REPLACE
+    "ready_(0)"
+    "ready_(0), completionKey_(0)"
+    op_content "${op_content}")
+
+  # Add accessor method before the member block
+  string(REPLACE
+    "win_iocp_operation* next_;"
+    "ULONG_PTR& completionKey() { return completionKey_; }\n\n win_iocp_operation* next_;"
+    op_content "${op_content}")
+
+  # Verify the operation header patch took effect
+  if(NOT op_content MATCHES "completionKey_")
+    message(WARNING "ASIO IOCP fix: failed to patch win_iocp_operation.hpp")
+    return()
+  endif()
+
+  file(WRITE "${OP_HEADER}" "${op_content}")
+
+  # --- Patch win_iocp_io_context.ipp ---
+
+  # Fix post_deferred_completion: pass op->completionKey() instead of 0 for the key
+  string(REPLACE
+    "::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op)"
+    "::PostQueuedCompletionStatus(iocp_.handle, 0, op->completionKey(), op)"
+    ctx_content "${ctx_content}")
+
+  # Fix on_pending/on_completion: store key on op before posting
+  string(REPLACE
+    "if (!::PostQueuedCompletionStatus(iocp_.handle,\n 0, overlapped_contains_result, op))"
+    "op->completionKey() = overlapped_contains_result;\n if (!::PostQueuedCompletionStatus(iocp_.handle,\n 0, op->completionKey(), op))"
+    ctx_content "${ctx_content}")
+
+  # Verify the io_context patch took effect
+  if(NOT ctx_content MATCHES "op->completionKey\\(\\)")
+    message(WARNING "ASIO IOCP fix: failed to patch win_iocp_io_context.ipp")
+    return()
+  endif()
+
+  file(WRITE "${CTX_IMPL}" "${ctx_content}")
+
+  message(STATUS "ASIO IOCP fix applied successfully")
+endfunction()
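Aside: the whole patch rests on one IOCP property: GetQueuedCompletionStatus
hands back exactly the completion key that PostQueuedCompletionStatus supplied.
A minimal standalone Windows sketch of that round-trip, independent of ASIO and
not part of this patch:

   #include <windows.h>
   #include <cstdio>

   int main()
   {
      // A fresh completion port, not bound to any file handle.
      HANDLE iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
      OVERLAPPED ov{};

      // Post with a non-zero key; ASIO posts overlapped_contains_result here.
      const ULONG_PTR posted_key = 1;
      ::PostQueuedCompletionStatus(iocp, 0, posted_key, &ov);

      DWORD bytes = 0;
      ULONG_PTR key = 0;
      LPOVERLAPPED povl = nullptr;
      ::GetQueuedCompletionStatus(iocp, &bytes, &key, &povl, INFINITE);

      // key == posted_key: the key survives the queue. On the failure path
      // patched above, the op fell back to an internal queue and the key was
      // lost, so do_one() misread whether the op already held its result.
      std::printf("posted %llu, dequeued %llu\n",
                  (unsigned long long)posted_key, (unsigned long long)key);
      ::CloseHandle(iocp);
   }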
diff --git a/include/glaze/net/websocket_client.hpp b/include/glaze/net/websocket_client.hpp
index 827102cdb4..1beaf0a433 100644
--- a/include/glaze/net/websocket_client.hpp
+++ b/include/glaze/net/websocket_client.hpp
@@ -223,7 +223,35 @@ namespace glz
          resolver_.reset();
       }
 
-      // Reset socket pointers (sockets already closed via force_close above)
+      // Cancel pending operations on raw sockets before destroying them.
+      // During the connection/handshake phase, the websocket_connection doesn't
+      // exist yet, so force_close() above is a no-op. We must cancel any pending
+      // operations on the raw sockets directly.
+      {
+         asio::error_code ec;
+         if (tcp_socket_) {
+            tcp_socket_->cancel(ec);
+            tcp_socket_->close(ec);
+         }
+#ifdef GLZ_ENABLE_SSL
+         if (ssl_socket_) {
+            ssl_socket_->lowest_layer().cancel(ec);
+            ssl_socket_->lowest_layer().close(ec);
+         }
+#endif
+      }
+
+      // Drain any pending completion handlers (e.g. IOCP cancellation completions)
+      // so they are processed before the io_context or sockets are destroyed.
+      // Safety: handler shared_ptrs (on_error, on_message, etc.) are already cleared
+      // above, so any completion handlers that fire during poll() will find null
+      // handler pointers and skip user callbacks.
+      if (ctx) {
+         ctx->restart();
+         ctx->poll();
+      }
+
+      // Now safe to reset socket pointers - all pending operations have completed
       tcp_socket_.reset();
 #ifdef GLZ_ENABLE_SSL
       ssl_socket_.reset();
@@ -452,6 +480,12 @@ namespace glz
             ws_conn->set_client_mode(true);
             ws_conn->set_max_message_size(self->max_message_size);
 
+            // Set handlers before processing initial data, so that any
+            // frames in the initial data are properly dispatched.
+            if (self->on_message && *self->on_message) ws_conn->on_message(*self->on_message);
+            if (self->on_close && *self->on_close) ws_conn->on_close(*self->on_close);
+            if (self->on_error && *self->on_error) ws_conn->on_error(*self->on_error);
+
             if (response_buf->size() > 0) {
                std::string_view initial_data{
                   static_cast<const char*>(response_buf->data().data()),
@@ -459,10 +493,6 @@ namespace glz
                ws_conn->set_initial_data(initial_data);
             }
 
-            if (self->on_message && *self->on_message) ws_conn->on_message(*self->on_message);
-            if (self->on_close && *self->on_close) ws_conn->on_close(*self->on_close);
-            if (self->on_error && *self->on_error) ws_conn->on_error(*self->on_error);
-
             ws_conn->start_read();
 
             {
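For context, a standalone sketch of the teardown order the destructor now
follows (plain ASIO; safe_teardown and its parameters are illustrative, not
Glaze's actual API):

   #include <asio.hpp>
   #include <memory>

   void safe_teardown(std::shared_ptr<asio::io_context> ctx,
                      std::unique_ptr<asio::ip::tcp::socket>& sock)
   {
      asio::error_code ec;
      if (sock) {
         sock->cancel(ec); // pending async ops complete with operation_aborted
         sock->close(ec);  // errors deliberately ignored during teardown
      }
      if (ctx) {
         ctx->restart(); // clear the stopped flag so poll() can make progress
         ctx->poll();    // run already-queued completions, incl. IOCP cancels
      }
      sock.reset(); // nothing in flight references the socket anymore
   }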
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 796b82df90..b160378201 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -62,6 +62,13 @@ else()
     GIT_SHALLOW TRUE
   )
   FetchContent_MakeAvailable(asio)
+
+  # Apply IOCP fix for issue #312 (Windows crash in op->complete)
+  if(WIN32)
+    include(${CMAKE_CURRENT_SOURCE_DIR}/../cmake/asio-iocp-fix.cmake)
+    apply_asio_iocp_fix("${asio_SOURCE_DIR}/asio/include")
+  endif()
+
   add_library(glz_asio INTERFACE)
   target_include_directories(glz_asio INTERFACE ${asio_SOURCE_DIR}/asio/include)
 else()
diff --git a/tests/networking_tests/websocket_test/CMakeLists.txt b/tests/networking_tests/websocket_test/CMakeLists.txt
index 0718fd06ed..ccb92e00b4 100644
--- a/tests/networking_tests/websocket_test/CMakeLists.txt
+++ b/tests/networking_tests/websocket_test/CMakeLists.txt
@@ -34,6 +34,15 @@ target_link_libraries(${PROJECT_NAME} PRIVATE glz_test_exceptions)
 
 add_test(NAME ${PROJECT_NAME} COMMAND ${PROJECT_NAME})
 
+# WebSocket client lifetime tests (issue #2409)
+project(websocket_client_lifetime_test)
+
+add_executable(${PROJECT_NAME} ${PROJECT_NAME}.cpp)
+
+target_link_libraries(${PROJECT_NAME} PRIVATE glz_test_exceptions)
+
+add_test(NAME ${PROJECT_NAME} COMMAND ${PROJECT_NAME})
+
 # Shared Context Bug Test
 project(shared_context_bug_test)
diff --git a/tests/networking_tests/websocket_test/websocket_client_lifetime_test.cpp b/tests/networking_tests/websocket_test/websocket_client_lifetime_test.cpp
new file mode 100644
index 0000000000..213e0ec979
--- /dev/null
+++ b/tests/networking_tests/websocket_test/websocket_client_lifetime_test.cpp
@@ -0,0 +1,437 @@
+// Glaze Library
+// For the license information refer to glaze.hpp
+
+// Tests for WebSocket client lifetime safety (issue #2409)
+//
+// These tests verify that the websocket_client can be safely destroyed at any
+// point during the connection lifecycle without causing crashes (access violations
+// on Windows IOCP, segfaults on other platforms).
+//
+// The root cause of #2409: cancel_all() only called force_close() on the
+// websocket_connection, but during the connection/handshake phase the connection
+// doesn't exist yet (it's still monostate). The raw sockets (tcp_socket_,
+// ssl_socket_) were destroyed while async operations (async_connect,
+// async_handshake) were still pending, causing IOCP completions to reference
+// freed memory.
+
+#include "glaze/net/websocket_client.hpp"
+
+#include <atomic>
+#include <chrono>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "glaze/net/http_server.hpp"
+#include "glaze/net/websocket_connection.hpp"
+#include "ut/ut.hpp"
+
+using namespace ut;
+using namespace glz;
+
+template <class Predicate>
+bool wait_for_condition(Predicate pred, std::chrono::milliseconds timeout = std::chrono::milliseconds(5000))
+{
+   auto start = std::chrono::steady_clock::now();
+   while (!pred()) {
+      if (std::chrono::steady_clock::now() - start > timeout) {
+         return false;
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+   }
+   return true;
+}
+
+// Helper to run a basic echo server
+void run_echo_server(std::atomic<bool>& server_ready, std::atomic<bool>& should_stop, uint16_t port)
+{
+   http_server server;
+   auto ws_server = std::make_shared<websocket_server>();
+
+   ws_server->on_open([](auto, const request&) {});
+   ws_server->on_message([](auto conn, std::string_view message, ws_opcode opcode) {
+      if (opcode == ws_opcode::text) {
+         std::string echo_msg = "Echo: ";
+         echo_msg.append(message);
+         conn->send_text(echo_msg);
+      }
+      else if (opcode == ws_opcode::binary) {
+         conn->send_binary(message);
+      }
+   });
+   ws_server->on_close([](auto, ws_close_code, std::string_view) {});
+   ws_server->on_error([](auto, std::error_code) {});
+
+   server.websocket("/ws", ws_server);
+
+   try {
+      server.bind(port);
+      server.start();
+      server_ready = true;
+
+      while (!should_stop) {
+         std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
+      server.stop();
+   }
+   catch (const std::exception& e) {
+      std::cerr << "Server Exception: " << e.what() << "\n";
+      server_ready = true;
+   }
+}
+
+// Helper to run a server that sends a message immediately on open
+void run_immediate_send_server(std::atomic<bool>& server_ready, std::atomic<bool>& should_stop, uint16_t port)
+{
+   http_server server;
+   auto ws_server = std::make_shared<websocket_server>();
+
+   ws_server->on_open([](auto conn, const request&) {
+      // Send a message immediately upon connection
+      conn->send_text("welcome");
+   });
+   ws_server->on_message([](auto, std::string_view, ws_opcode) {});
+   ws_server->on_close([](auto, ws_close_code, std::string_view) {});
+   ws_server->on_error([](auto, std::error_code) {});
+
+   server.websocket("/ws", ws_server);
+
+   try {
+      server.bind(port);
+      server.start();
+      server_ready = true;
+
+      while (!should_stop) {
+         std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
+      server.stop();
+   }
+   catch (const std::exception& e) {
+      std::cerr << "Server Exception: " << e.what() << "\n";
+      server_ready = true;
+   }
+}
+
+// ============================================================================
+// Tests for issue #2409: safe destruction during connection lifecycle
+// ============================================================================
+
+suite websocket_client_lifetime_tests = [] {
+   // -----------------------------------------------------------------------
+   // Test: Destroying client while DNS resolve is in progress
+   // Before the fix, the socket could be destroyed while async_connect or
+   // async_resolve was pending, causing use-after-free on IOCP.
+   // -----------------------------------------------------------------------
+   "destroy_during_resolve"_test = [] {
+      for (int i = 0; i < 10; ++i) {
+         // Nothing ever runs the io_context here, so this exercises the
+         // cleanup path in cancel_all() while the resolve is still queued.
+         websocket_client client;
+         client.on_error([](std::error_code) {});
+         client.on_close([](ws_close_code, std::string_view) {});
+
+         client.connect("ws://localhost:19876/ws");
+
+         // Immediately destroy without calling run() - the async_resolve
+         // is queued but not yet started. cancel_all() must safely cancel
+         // the resolver and clean up the socket.
+      }
+      // If we get here without crashing, the test passes.
+      expect(true) << "Destroying client during resolve should not crash";
+   };
+
+   // -----------------------------------------------------------------------
+   // Test: Destroying client during async_connect
+   // Start connecting to a real server, then destroy the client while
+   // async_connect may be in progress.
+   // -----------------------------------------------------------------------
+   "destroy_during_connect"_test = [] {
+      uint16_t port = 18401;
+      std::atomic<bool> server_ready{false};
+      std::atomic<bool> stop_server{false};
+
+      std::thread server_thread(run_echo_server, std::ref(server_ready), std::ref(stop_server), port);
+      expect(wait_for_condition([&] { return server_ready.load(); })) << "Server failed to start";
+
+      for (int i = 0; i < 20; ++i) {
+         websocket_client client;
+         client.on_error([](std::error_code) {});
+         client.on_close([](ws_close_code, std::string_view) {});
+
+         std::string url = "ws://localhost:" + std::to_string(port) + "/ws";
+         client.connect(url);
+
+         // Run the io_context very briefly to potentially get into the
+         // connect phase, then destroy the client.
+         client.context()->run_for(std::chrono::milliseconds(1));
+
+         // Client is destroyed here. cancel_all() must properly cancel
+         // the pending async_connect and drain IOCP completions.
+      }
+
+      stop_server = true;
+      server_thread.join();
+
+      expect(true) << "Destroying client during connect should not crash";
+   };
+
+   // -----------------------------------------------------------------------
+   // Test: Destroying client during HTTP handshake
+   // Let the TCP connection succeed but destroy during the WebSocket
+   // HTTP upgrade handshake.
+   // -----------------------------------------------------------------------
+   "destroy_during_handshake"_test = [] {
+      uint16_t port = 18402;
+      std::atomic<bool> server_ready{false};
+      std::atomic<bool> stop_server{false};
+
+      std::thread server_thread(run_echo_server, std::ref(server_ready), std::ref(stop_server), port);
+      expect(wait_for_condition([&] { return server_ready.load(); })) << "Server failed to start";
+
+      for (int i = 0; i < 20; ++i) {
+         websocket_client client;
+         client.on_error([](std::error_code) {});
+         client.on_close([](ws_close_code, std::string_view) {});
+
+         std::string url = "ws://localhost:" + std::to_string(port) + "/ws";
+         client.connect(url);
+
+         // Run long enough for the TCP connect to succeed but potentially
+         // not long enough for the full HTTP handshake.
+         client.context()->run_for(std::chrono::milliseconds(5));
+
+         // Client is destroyed here during handshake.
+      }
+
+      stop_server = true;
+      server_thread.join();
+
+      expect(true) << "Destroying client during handshake should not crash";
+   };
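+
+   // -----------------------------------------------------------------------
+   // Note: io_context::run_for(d) returns once d elapses, even if handlers
+   // are still queued, e.g.
+   //
+   //    client.context()->run_for(std::chrono::milliseconds(1));
+   //    // async_connect / the HTTP upgrade may still be in flight here
+   //
+   // which is precisely the window in which these tests destroy the client.
+   // The durations used are best-effort; which phase an iteration actually
+   // dies in varies with machine load and scheduling.
+   // -----------------------------------------------------------------------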
+
+   // -----------------------------------------------------------------------
+   // Test: Destroying client during active WebSocket communication
+   // Establish a full connection, start reading, then destroy.
+   // -----------------------------------------------------------------------
+   "destroy_during_active_connection"_test = [] {
+      uint16_t port = 18403;
+      std::atomic<bool> server_ready{false};
+      std::atomic<bool> stop_server{false};
+
+      std::thread server_thread(run_echo_server, std::ref(server_ready), std::ref(stop_server), port);
+      expect(wait_for_condition([&] { return server_ready.load(); })) << "Server failed to start";
+
+      for (int i = 0; i < 10; ++i) {
+         std::atomic<bool> connected{false};
+
+         websocket_client client;
+         client.on_open([&connected]() { connected = true; });
+         client.on_message([](std::string_view, ws_opcode) {});
+         client.on_error([](std::error_code) {});
+         client.on_close([](ws_close_code, std::string_view) {});
+
+         std::string url = "ws://localhost:" + std::to_string(port) + "/ws";
+         client.connect(url);
+
+         // Run until connected or timeout
+         std::thread client_thread([&client]() { client.context()->run(); });
+
+         // Wait a bit for the connection to establish
+         wait_for_condition([&] { return connected.load(); }, std::chrono::milliseconds(2000));
+
+         // Stop and destroy while the connection is active (async_read_some pending)
+         client.context()->stop();
+         client_thread.join();
+
+         // Client destructor runs here with an active connection
+      }
+
+      stop_server = true;
+      server_thread.join();
+
+      expect(true) << "Destroying client during active connection should not crash";
+   };
+
+   // -----------------------------------------------------------------------
+   // Test: Rapid connect/destroy cycles
+   // Stress test: rapidly create, connect, and destroy clients.
+   // This maximizes the chance of hitting IOCP timing windows.
+   // -----------------------------------------------------------------------
+   "rapid_connect_destroy_cycles"_test = [] {
+      uint16_t port = 18404;
+      std::atomic<bool> server_ready{false};
+      std::atomic<bool> stop_server{false};
+
+      std::thread server_thread(run_echo_server, std::ref(server_ready), std::ref(stop_server), port);
+      expect(wait_for_condition([&] { return server_ready.load(); })) << "Server failed to start";
+
+      // Do many rapid connect/destroy cycles
+      for (int i = 0; i < 50; ++i) {
+         auto client = std::make_unique<websocket_client>();
+         client->on_error([](std::error_code) {});
+         client->on_close([](ws_close_code, std::string_view) {});
+
+         std::string url = "ws://localhost:" + std::to_string(port) + "/ws";
+         client->connect(url);
+
+         // Vary the timing to hit different phases of the connection
+         if (i % 3 == 0) {
+            // Destroy immediately (during resolve)
+            client.reset();
+         }
+         else if (i % 3 == 1) {
+            // Run briefly then destroy (during connect/handshake)
+            client->context()->run_for(std::chrono::milliseconds(2));
+            client.reset();
+         }
+         else {
+            // Run longer then destroy (during handshake or active read)
+            client->context()->run_for(std::chrono::milliseconds(10));
+            client.reset();
+         }
+      }
+
+      stop_server = true;
+      server_thread.join();
+
+      expect(true) << "Rapid connect/destroy cycles should not crash";
+   };
+
+   // -----------------------------------------------------------------------
+   // Test: Destroying client that failed to connect
+   // Verify clean shutdown when the connection was refused.
+   // -----------------------------------------------------------------------
+   "destroy_after_connection_failure"_test = [] {
+      for (int i = 0; i < 10; ++i) {
+         websocket_client client;
+         std::atomic<bool> error_received{false};
+
+         client.on_error([&](std::error_code) { error_received = true; });
+         client.on_close([](ws_close_code, std::string_view) {});
+
+         // Connect to a port with no server
+         client.connect("ws://localhost:19877/ws");
+
+         // Run until the error is received or timeout
+         std::thread client_thread([&client]() { client.context()->run(); });
+
+         wait_for_condition([&] { return error_received.load(); }, std::chrono::milliseconds(3000));
+
+         if (!client.context()->stopped()) {
+            client.context()->stop();
+         }
+         client_thread.join();
+
+         // Client destructor runs here - must clean up properly
+      }
+
+      expect(true) << "Destroying client after connection failure should not crash";
+   };
+
+   // -----------------------------------------------------------------------
+   // Test: Multiple clients sharing context, destroyed independently
+   // -----------------------------------------------------------------------
+   "shared_context_destroy_order"_test = [] {
+      uint16_t port = 18405;
+      std::atomic<bool> server_ready{false};
+      std::atomic<bool> stop_server{false};
+
+      std::thread server_thread(run_echo_server, std::ref(server_ready), std::ref(stop_server), port);
+      expect(wait_for_condition([&] { return server_ready.load(); })) << "Server failed to start";
+
+      auto shared_ctx = std::make_shared<asio::io_context>();
+
+      {
+         websocket_client client1(shared_ctx);
+         websocket_client client2(shared_ctx);
+
+         client1.on_error([](std::error_code) {});
+         client1.on_close([](ws_close_code, std::string_view) {});
+         client2.on_error([](std::error_code) {});
+         client2.on_close([](ws_close_code, std::string_view) {});
+
+         std::string url = "ws://localhost:" + std::to_string(port) + "/ws";
+         client1.connect(url);
+         client2.connect(url);
+
+         shared_ctx->run_for(std::chrono::milliseconds(10));
+
+         // client2 destroyed first, then client1 - both must clean up safely
+      }
+
+      // shared_ctx is still alive here
+      expect(true) << "Shared context clients destroyed in order should not crash";
+
+      stop_server = true;
+      server_thread.join();
+   };
+};
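+
+// Why handler ordering matters (illustrative sketch, not Glaze's exact
+// internals): if the server's first frame arrives in the same TCP segment as
+// the "101 Switching Protocols" response, the leftover bytes are handed to
+// the connection as initial data. With the old ordering the frame could be
+// consumed before any handler was attached:
+//
+//    ws_conn->set_initial_data(leftover); // first frame may be processed here
+//    ws_conn->on_message(handler);        // attached after the fact: too late
+//    ws_conn->start_read();
+//
+// The fix in websocket_client.hpp attaches on_message/on_close/on_error
+// before set_initial_data(), so the suite below should observe the frame.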
+
+// ============================================================================
+// Test for initial data handler ordering fix
+// ============================================================================
+
+suite websocket_initial_data_tests = [] {
+   // -----------------------------------------------------------------------
+   // Test: Server sends message immediately on open
+   // Before the fix, handlers were set AFTER set_initial_data(), so if the
+   // server sent data that arrived in the same TCP segment as the HTTP
+   // upgrade response, the on_message handler would miss it.
+   // -----------------------------------------------------------------------
+   "immediate_server_message_received"_test = [] {
+      uint16_t port = 18406;
+      std::atomic<bool> server_ready{false};
+      std::atomic<bool> stop_server{false};
+
+      std::thread server_thread(run_immediate_send_server, std::ref(server_ready), std::ref(stop_server), port);
+      expect(wait_for_condition([&] { return server_ready.load(); })) << "Server failed to start";
+
+      websocket_client client;
+      std::atomic<bool> message_received{false};
+      std::string received_msg;
+      std::mutex msg_mutex;
+
+      client.on_open([]() {});
+
+      client.on_message([&](std::string_view message, ws_opcode) {
+         std::lock_guard lock(msg_mutex);
+         received_msg = std::string(message);
+         message_received = true;
+      });
+
+      client.on_error([](std::error_code ec) {
+         std::cerr << "Client error: " << ec.message() << "\n";
+      });
+
+      client.on_close([&](ws_close_code, std::string_view) { client.context()->stop(); });
+
+      std::string url = "ws://localhost:" + std::to_string(port) + "/ws";
+      client.connect(url);
+
+      std::thread client_thread([&client]() { client.context()->run(); });
+
+      bool got_message = wait_for_condition([&] { return message_received.load(); }, std::chrono::milliseconds(3000));
+
+      // Close and clean up
+      if (got_message) {
+         client.close();
+      }
+
+      if (!client.context()->stopped()) {
+         client.context()->stop();
+      }
+      client_thread.join();
+
+      expect(got_message) << "Should receive message sent immediately by server";
+
+      {
+         std::lock_guard lock(msg_mutex);
+         expect(received_msg == "welcome") << "Message content should be 'welcome'";
+      }
+
+      stop_server = true;
+      server_thread.join();
+   };
+};
+
+int main() {}