From c38401f52300bdf96ef04a251a384a7da6480665 Mon Sep 17 00:00:00 2001 From: Stephen Berry Date: Tue, 24 Mar 2026 09:36:04 -0500 Subject: [PATCH 1/6] websocket optimization and benchmarking --- benchmarks/CMakeLists.txt | 3 + benchmarks/ws_benchmark/CMakeLists.txt | 65 +++ benchmarks/ws_benchmark/ws_benchmark.cpp | 470 +++++++++++++++++++++ include/glaze/net/websocket_connection.hpp | 223 ++++++---- 4 files changed, 685 insertions(+), 76 deletions(-) create mode 100644 benchmarks/ws_benchmark/CMakeLists.txt create mode 100644 benchmarks/ws_benchmark/ws_benchmark.cpp diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 505988c22d..3b181d0277 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -119,3 +119,6 @@ target_compile_options(ordered_map_memory PRIVATE # HTTP server benchmark: Glaze vs Boost.Beast add_subdirectory(http_benchmark) + +# WebSocket benchmark: Glaze vs uWebSockets +add_subdirectory(ws_benchmark) diff --git a/benchmarks/ws_benchmark/CMakeLists.txt b/benchmarks/ws_benchmark/CMakeLists.txt new file mode 100644 index 0000000000..f01158d5ca --- /dev/null +++ b/benchmarks/ws_benchmark/CMakeLists.txt @@ -0,0 +1,65 @@ +project(ws_benchmark) + +# uWebSockets uses epoll/kqueue — not available on Windows +if(WIN32) + message(STATUS "[bench] WebSocket benchmark not supported on Windows — skipping") + return() +endif() + +find_package(Boost QUIET CONFIG) +if(NOT Boost_FOUND) + message(STATUS "[bench] Boost not found — skipping WebSocket benchmark (requires Boost.Beast client)") + return() +endif() + +include(FetchContent) + +# Download uWebSockets and uSockets sources (Makefile projects, no CMakeLists.txt) +FetchContent_Declare(uWebSockets + GIT_REPOSITORY https://github.com/uNetworking/uWebSockets.git + GIT_TAG master + GIT_SHALLOW TRUE +) +FetchContent_Declare(uSockets + GIT_REPOSITORY https://github.com/uNetworking/uSockets.git + GIT_TAG master + GIT_SHALLOW TRUE +) + +set(FETCHCONTENT_QUIET OFF) 
+FetchContent_MakeAvailable(uWebSockets uSockets)
+
+# Build uSockets as static library (no SSL, no libuv — raw kqueue/epoll)
+add_library(uSockets_static STATIC
+    ${usockets_SOURCE_DIR}/src/bsd.c
+    ${usockets_SOURCE_DIR}/src/context.c
+    ${usockets_SOURCE_DIR}/src/loop.c
+    ${usockets_SOURCE_DIR}/src/socket.c
+    ${usockets_SOURCE_DIR}/src/eventing/epoll_kqueue.c
+)
+target_include_directories(uSockets_static PUBLIC ${usockets_SOURCE_DIR}/src)
+target_compile_definitions(uSockets_static PUBLIC LIBUS_NO_SSL)
+target_compile_options(uSockets_static PRIVATE -O3 -w)
+
+# uWebSockets header-only interface
+add_library(uWebSockets_headers INTERFACE)
+target_include_directories(uWebSockets_headers INTERFACE ${uwebsockets_SOURCE_DIR}/src)
+target_link_libraries(uWebSockets_headers INTERFACE uSockets_static)
+target_compile_definitions(uWebSockets_headers INTERFACE UWS_NO_ZLIB)
+
+# Benchmark executable
+add_executable(ws_benchmark ws_benchmark.cpp)
+target_link_libraries(ws_benchmark PRIVATE
+    glaze
+    bencher::bencher
+    uWebSockets_headers
+)
+if(TARGET Boost::headers)
+    target_link_libraries(ws_benchmark PRIVATE Boost::headers)
+elseif(TARGET Boost::boost)
+    target_link_libraries(ws_benchmark PRIVATE Boost::boost)
+endif()
+target_compile_options(ws_benchmark PRIVATE
+    $<$<NOT:$<CXX_COMPILER_ID:MSVC>>:-O3 -march=native>
+    $<$<CXX_COMPILER_ID:MSVC>:/O2>
+)
diff --git a/benchmarks/ws_benchmark/ws_benchmark.cpp b/benchmarks/ws_benchmark/ws_benchmark.cpp
new file mode 100644
index 0000000000..da89012ee3
--- /dev/null
+++ b/benchmarks/ws_benchmark/ws_benchmark.cpp
@@ -0,0 +1,470 @@
+// WebSocket Benchmark: Glaze vs uWebSockets
+//
+// Compares WebSocket echo performance between glz::websocket_server
+// and uWebSockets. Uses Boost.Beast WebSocket client for load generation
+// to avoid client-side bias. Both servers echo identical payloads.
+
+#include <atomic>
+#include <chrono>
+#include <cstdio>
+#include <future>
+#include <thread>
+#include <vector>
+
+#include <boost/asio.hpp>
+#include <boost/beast/core.hpp>
+#include <boost/beast/websocket.hpp>
+
+#include "bencher/bar_chart.hpp"
+#include "bencher/bencher.hpp"
+#include "bencher/diagnostics.hpp"
+#include "bencher/file.hpp"
+
+#include "glaze/glaze.hpp"
+#include "glaze/net/http_server.hpp"
+#include "glaze/net/websocket_connection.hpp"
+
+#include "App.h"
+
+namespace net = boost::asio;
+namespace beast = boost::beast;
+namespace websocket = beast::websocket;
+using tcp = net::ip::tcp;
+
+// ---------------------------------------------------------------------------
+// Test payloads (shared by both servers)
+// ---------------------------------------------------------------------------
+
+struct Message
+{
+   std::string message{};
+};
+
+struct User
+{
+   int64_t id{};
+   std::string name{};
+   std::string email{};
+   int64_t age{};
+   bool active{};
+   int64_t score{};
+};
+
+static std::string small_payload; // 64 bytes
+static std::string medium_payload; // 1KB
+static std::string json_small; // {"message":"Hello, World!"}
+static std::string json_users; // 100 users JSON array
+static std::string large_payload; // 64KB
+
+static void init_payloads()
+{
+   small_payload = std::string(64, 'x');
+   medium_payload = std::string(1024, 'x');
+   large_payload = std::string(65536, 'x');
+
+   Message msg{"Hello, World!"};
+   (void)glz::write_json(msg, json_small);
+
+   auto make_users = [](int64_t count) {
+      std::vector<User> v;
+      v.reserve(count);
+      for (int64_t i = 0; i < count; ++i) {
+         v.push_back(
+            {i, "User " + std::to_string(i), "user" + std::to_string(i) + "@test.com", 20 + (i % 50), (i % 2 == 0),
+             i * 10});
+      }
+      return v;
+   };
+
+   auto users_100 = make_users(100);
+   (void)glz::write_json(users_100, json_users);
+}
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+static void wait_for_server(uint16_t port, int max_tries = 100)
+{
+   for (int i = 0; i < max_tries; ++i) {
+      try {
+         net::io_context io;
+         tcp::socket sock(io);
+         sock.connect({net::ip::make_address("127.0.0.1"), port});
+         sock.close();
+         return;
+      }
+      catch (...) {
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(20));
+   }
+   std::abort();
+}
+
+// Connect and upgrade to WebSocket using Beast
+static websocket::stream<tcp::socket> ws_connect(net::io_context& io, uint16_t port)
+{
+   tcp::resolver resolver(io);
+   auto results = resolver.resolve("127.0.0.1", std::to_string(port));
+
+   websocket::stream<tcp::socket> ws(io);
+   net::connect(ws.next_layer(), results);
+   ws.handshake("127.0.0.1:" + std::to_string(port), "/ws");
+   ws.text(true);
+
+   return ws;
+}
+
+// Send/receive N messages on a single WebSocket connection.
+static uint64_t run_ws_echo_batch(uint16_t port, const std::string& payload, int batch_size)
+{
+   net::io_context io;
+   auto ws = ws_connect(io, port);
+
+   beast::flat_buffer buffer;
+   for (int i = 0; i < batch_size; ++i) {
+      ws.write(net::buffer(payload));
+      buffer.consume(buffer.size());
+      ws.read(buffer);
+   }
+
+   ws.close(websocket::close_code::normal);
+   return uint64_t(batch_size);
+}
+
+// Open a new WebSocket connection, exchange one message, close.
+static uint64_t run_ws_new_conn(uint16_t port, const std::string& payload)
+{
+   net::io_context io;
+   auto ws = ws_connect(io, port);
+
+   ws.write(net::buffer(payload));
+   beast::flat_buffer buffer;
+   ws.read(buffer);
+
+   ws.close(websocket::close_code::normal);
+   return uint64_t(1);
+}
+
+// Run concurrent WebSocket echo across multiple threads.
+static uint64_t run_ws_concurrent(uint16_t port, const std::string& payload, int num_threads, int msgs_per_thread)
+{
+   std::atomic<uint64_t> total{0};
+   std::vector<std::thread> threads;
+   threads.reserve(num_threads);
+
+   for (int t = 0; t < num_threads; ++t) {
+      threads.emplace_back([&] {
+         net::io_context io;
+         auto ws = ws_connect(io, port);
+
+         beast::flat_buffer buffer;
+         for (int i = 0; i < msgs_per_thread; ++i) {
+            ws.write(net::buffer(payload));
+            buffer.consume(buffer.size());
+            ws.read(buffer);
+            total.fetch_add(1, std::memory_order_relaxed);
+         }
+
+         ws.close(websocket::close_code::normal);
+      });
+   }
+
+   for (auto& t : threads) {
+      t.join();
+   }
+   return total.load();
+}
+
+// ---------------------------------------------------------------------------
+// Glaze WebSocket server
+// ---------------------------------------------------------------------------
+
+struct glaze_ws_server
+{
+   glz::http_server<> server{};
+   std::shared_ptr<glz::websocket_server> ws;
+   std::thread thread;
+
+   glaze_ws_server(uint16_t port)
+   {
+      server.on_error([](std::error_code, std::source_location) {});
+
+      ws = std::make_shared<glz::websocket_server>();
+      ws->on_message([](auto conn, std::string_view msg, glz::ws_opcode op) {
+         if (op == glz::ws_opcode::text) {
+            conn->send_text(msg);
+         }
+         else {
+            conn->send_binary(msg);
+         }
+      });
+
+      server.websocket("/ws", ws);
+      server.bind("127.0.0.1", port);
+      thread = std::thread([this]() { server.start(1); });
+      wait_for_server(port);
+   }
+
+   ~glaze_ws_server()
+   {
+      server.stop();
+      if (thread.joinable()) thread.join();
+   }
+};
+
+// ---------------------------------------------------------------------------
+// uWebSockets server
+// ---------------------------------------------------------------------------
+
+struct PerSocketData
+{};
+
+struct uws_server
+{
+   std::thread thread;
+   struct us_listen_socket_t* listen_socket = nullptr;
+   uWS::Loop* loop = nullptr;
+
+   uws_server(uint16_t port)
+   {
+      std::promise<bool> ready;
+      auto future = ready.get_future();
+
+      thread = std::thread([this, port, &ready]() {
+         uWS::App app;
+         loop = uWS::Loop::get();
+
+         app.ws<PerSocketData>(
+               "/ws",
+               {.compression = uWS::DISABLED,
+                .maxPayloadLength = 128 * 1024,
+                .idleTimeout = 120,
+                .message =
+                   [](auto* ws, std::string_view message, uWS::OpCode opCode) { ws->send(message, opCode); }})
+            .listen(port,
+                    [this, &ready](auto* token) {
+                       listen_socket = token;
+                       ready.set_value(token != nullptr);
+                    })
+            .run();
+      });
+
+      if (!future.get()) {
+         throw std::runtime_error("uWebSockets failed to listen");
+      }
+      wait_for_server(port);
+   }
+
+   ~uws_server()
+   {
+      if (loop && listen_socket) {
+         loop->defer([ls = listen_socket]() { us_listen_socket_close(0, ls); });
+      }
+      if (thread.joinable()) thread.join();
+   }
+};
+
+// ---------------------------------------------------------------------------
+// Validation
+// ---------------------------------------------------------------------------
+
+static void validate_servers(uint16_t glaze_port, uint16_t uws_port)
+{
+   const std::string test_msg = "Hello, WebSocket benchmark!";
+
+   auto validate_echo = [&](uint16_t port, const char* name) {
+      net::io_context io;
+      auto ws = ws_connect(io, port);
+
+      ws.write(net::buffer(test_msg));
+      beast::flat_buffer buf;
+      ws.read(buf);
+
+      std::string response = beast::buffers_to_string(buf.data());
+      if (response != test_msg) {
+         std::fprintf(stderr, "VALIDATION FAILED: %s echo mismatch\n Expected: %s\n Got: %s\n", name,
+                      test_msg.c_str(), response.c_str());
+         std::abort();
+      }
+
+      ws.close(websocket::close_code::normal);
+   };
+
+   validate_echo(glaze_port, "Glaze");
+   validate_echo(uws_port, "uWebSockets");
+}
+
+// ---------------------------------------------------------------------------
+// Chart helper
+// ---------------------------------------------------------------------------
+
+static std::string stage_bar_chart(const bencher::stage& stage)
+{
+   std::vector<std::string> names;
+   std::vector<double> data;
+   for (const auto& r : stage.results) {
+      names.push_back(r.name);
+      data.push_back(r.throughput_mb_per_sec);
+   }
+   chart_config cfg;
+   cfg.title = stage.name;
+   cfg.y_axis_label = stage.throughput_units_label;
+   cfg.chart_width = 1000;
+   cfg.chart_height = 700;
+   cfg.margin_bottom = 280;
+   cfg.font_size_bar_label = 16.0;
+   cfg.font_size_value_label = 16.0;
+   cfg.label_rotation = -45.0;
+   return generate_bar_chart_svg(names, data, cfg);
+}
+
+// ---------------------------------------------------------------------------
+// Main
+// ---------------------------------------------------------------------------
+
+int main()
+{
+   init_payloads();
+
+   static constexpr uint16_t glaze_port = 19765;
+   static constexpr uint16_t uws_port = 19766;
+   static constexpr int batch = 1000;
+
+   const unsigned hw_threads = (std::max)(1u, std::thread::hardware_concurrency());
+   const int client_threads = static_cast<int>(hw_threads);
+
+   std::string all_markdown;
+
+   {
+      glaze_ws_server glz_srv(glaze_port);
+      uws_server uws_srv(uws_port);
+
+      validate_servers(glaze_port, uws_port);
+
+      // Echo Small Text (64B)
+      {
+         bencher::stage stage;
+         stage.name = "Echo Small Text 64B (single connection)";
+         stage.throughput_units_label = "msg/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_echo_batch(glaze_port, small_payload, batch); });
+         stage.run("uWebSockets", [&] { return run_ws_echo_batch(uws_port, small_payload, batch); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_echo_small.svg");
+      }
+
+      // Echo Medium Text (1KB)
+      {
+         bencher::stage stage;
+         stage.name = "Echo Medium Text 1KB (single connection)";
+         stage.throughput_units_label = "msg/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_echo_batch(glaze_port, medium_payload, batch); });
+         stage.run("uWebSockets", [&] { return run_ws_echo_batch(uws_port, medium_payload, batch); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_echo_medium.svg");
+      }
+
+      // Echo JSON Small
+      {
+         bencher::stage stage;
+         stage.name = "Echo JSON Small (single connection)";
+         stage.throughput_units_label = "msg/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_echo_batch(glaze_port, json_small, batch); });
+         stage.run("uWebSockets", [&] { return run_ws_echo_batch(uws_port, json_small, batch); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_echo_json_small.svg");
+      }
+
+      // Echo JSON 100 Users
+      {
+         bencher::stage stage;
+         stage.name = "Echo JSON 100 Users (single connection)";
+         stage.throughput_units_label = "msg/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_echo_batch(glaze_port, json_users, batch); });
+         stage.run("uWebSockets", [&] { return run_ws_echo_batch(uws_port, json_users, batch); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_echo_json_users.svg");
+      }
+
+      // Echo Large Text (64KB)
+      {
+         bencher::stage stage;
+         stage.name = "Echo Large Text 64KB (single connection)";
+         stage.throughput_units_label = "msg/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_echo_batch(glaze_port, large_payload, batch); });
+         stage.run("uWebSockets", [&] { return run_ws_echo_batch(uws_port, large_payload, batch); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_echo_large.svg");
+      }
+
+      // Connection Upgrade (new WebSocket per message exchange)
+      {
+         bencher::stage stage;
+         stage.name = "Connection Upgrade (new WebSocket per message)";
+         stage.throughput_units_label = "conn/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_new_conn(glaze_port, small_payload); });
+         stage.run("uWebSockets", [&] { return run_ws_new_conn(uws_port, small_payload); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_connection_upgrade.svg");
+      }
+
+      // Concurrent Echo Small
+      {
+         bencher::stage stage;
+         stage.name = "Concurrent Echo Small (" + std::to_string(client_threads) + " clients x 1000 msg)";
+         stage.throughput_units_label = "msg/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_concurrent(glaze_port, small_payload, client_threads, 1000); });
+         stage.run("uWebSockets", [&] { return run_ws_concurrent(uws_port, small_payload, client_threads, 1000); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_concurrent_small.svg");
+      }
+
+      // Concurrent Echo JSON 100 Users
+      {
+         bencher::stage stage;
+         stage.name =
+            "Concurrent Echo JSON 100 Users (" + std::to_string(client_threads) + " clients x 1000 msg)";
+         stage.throughput_units_label = "msg/s";
+         stage.throughput_units_divisor = 1;
+
+         stage.run("Glaze", [&] { return run_ws_concurrent(glaze_port, json_users, client_threads, 1000); });
+         stage.run("uWebSockets", [&] { return run_ws_concurrent(uws_port, json_users, client_threads, 1000); });
+
+         bencher::print_results(stage);
+         all_markdown += bencher::to_markdown(stage);
+         bencher::save_file(stage_bar_chart(stage), "ws_concurrent_json.svg");
+      }
+   }
+
+   bencher::save_file(all_markdown, "ws_benchmark.md");
+
+   return 0;
+}
diff --git a/include/glaze/net/websocket_connection.hpp b/include/glaze/net/websocket_connection.hpp
index ccaeb6740c..7866174dd3 100644
--- a/include/glaze/net/websocket_connection.hpp
+++ b/include/glaze/net/websocket_connection.hpp
@@ -451,6 +451,7 @@
                    std::weak_ptr server = std::weak_ptr{})
          : socket_(socket), server_(std::move(server))
       {
+         read_buf_.resize(initial_read_buf_size);
          // Try to get remote endpoint, but
don't fail if it's not available yet try { remote_endpoint_ = socket_->lowest_layer().remote_endpoint(); @@ -560,16 +561,31 @@ namespace glz // Inject initial data read during handshake inline void set_initial_data(std::string_view data) { - frame_buffer_.insert(frame_buffer_.end(), data.begin(), data.end()); - size_t consumed = process_frames(frame_buffer_.data(), frame_buffer_.size()); - if (consumed > 0) { - frame_buffer_.erase(frame_buffer_.begin(), frame_buffer_.begin() + consumed); + if (buf_len_ + data.size() > read_buf_.size()) { + read_buf_.resize(buf_len_ + data.size()); } + std::memcpy(read_buf_.data() + buf_len_, data.data(), data.size()); + buf_len_ += data.size(); + + compact_read_buf(process_frames(read_buf_.data(), buf_len_)); } private: + static constexpr size_t initial_read_buf_size = 16384; size_t max_message_size_{1024 * 1024 * 16}; // 16 MB limit + // Shift unconsumed data to the front of read_buf_ after processing frames. + inline void compact_read_buf(size_t consumed) + { + if (consumed > 0) { + size_t remaining = buf_len_ - consumed; + if (remaining > 0) { + std::memmove(read_buf_.data(), read_buf_.data() + consumed, remaining); + } + buf_len_ = remaining; + } + } + inline void perform_handshake(const request& req) { // Validate WebSocket upgrade request @@ -673,20 +689,16 @@ namespace glz return; } - // Add received data to frame buffer - frame_buffer_.insert(frame_buffer_.end(), read_buffer_.begin(), read_buffer_.begin() + bytes_transferred); + buf_len_ += bytes_transferred; // Check buffer size limit (simple DoS protection) - if (frame_buffer_.size() > max_message_size_ + 1024) { // Allow some header overhead + if (buf_len_ > max_message_size_ + 1024) { // Allow some header overhead close(ws_close_code::message_too_big, "Buffer limit exceeded"); return; } - // Process complete frames - size_t consumed = process_frames(frame_buffer_.data(), frame_buffer_.size()); - if (consumed > 0) { - frame_buffer_.erase(frame_buffer_.begin(), 
frame_buffer_.begin() + consumed); - } + // Process complete frames directly from the flat buffer + compact_read_buf(process_frames(read_buf_.data(), buf_len_)); // Continue reading if connection is still open // RFC 6455 Section 7.1.1: The initiator must continue reading to receive the peer's @@ -758,21 +770,40 @@ namespace glz uint8_t* payload_ptr = data + offset + header_size; const auto payload_size = static_cast(payload_length); + // Fused unmask + ASCII detection: unmask payload and simultaneously check + // whether all bytes are ASCII (high bit clear). If all ASCII, we can skip + // the separate UTF-8 validation pass for text frames. + bool all_ascii = false; if (header.mask() && payload_size > 0) { - for (std::size_t i = 0; i < payload_size; ++i) { + uint32_t mask32; + std::memcpy(&mask32, mask_key.data(), 4); + uint64_t mask64 = (static_cast(mask32) << 32) | mask32; + uint64_t high_bits_acc = 0; + size_t i = 0; + for (; i + 8 <= payload_size; i += 8) { + uint64_t chunk; + std::memcpy(&chunk, payload_ptr + i, 8); + chunk ^= mask64; + std::memcpy(payload_ptr + i, &chunk, 8); + high_bits_acc |= chunk; + } + for (; i < payload_size; ++i) { payload_ptr[i] ^= mask_key[i % 4]; + high_bits_acc |= static_cast(payload_ptr[i]); } + all_ascii = !(high_bits_acc & 0x8080808080808080ULL); } // Handle the frame - handle_frame(header.opcode(), payload_ptr, payload_size, header.fin()); + handle_frame(header.opcode(), payload_ptr, payload_size, header.fin(), all_ascii); offset += header_size + static_cast(payload_length); } return offset; } - inline void handle_frame(ws_opcode opcode, const uint8_t* payload, std::size_t length, bool fin) + inline void handle_frame(ws_opcode opcode, const uint8_t* payload, std::size_t length, bool fin, + bool all_ascii) { switch (opcode) { case ws_opcode::text: @@ -790,7 +821,8 @@ namespace glz if (fin) { // Complete single-frame message - deliver directly (zero-copy) if (opcode == ws_opcode::text) { - if (!glz::validate_utf8(payload, 
length)) { + // Skip full UTF-8 validation if all bytes are ASCII (detected during unmask) + if (!all_ascii && !glz::validate_utf8(payload, length)) { close(ws_close_code::invalid_payload, "Invalid UTF-8"); return; } @@ -913,66 +945,71 @@ namespace glz } std::size_t header_size = get_frame_header_size(payload.size(), client_mode_); - auto frame_buffer = std::make_unique>(header_size + payload.size()); - - write_frame_header(opcode, payload.size(), fin, frame_buffer->data(), client_mode_); - std::copy(payload.begin(), payload.end(), frame_buffer->begin() + header_size); - - // Apply masking if in client mode - if (client_mode_ && payload.size() > 0) { - // Get the masking key from the header - std::size_t mask_key_offset = header_size - 4; - const uint8_t* mask_key = frame_buffer->data() + mask_key_offset; - // Mask the payload - for (std::size_t i = 0; i < payload.size(); ++i) { - (*frame_buffer)[header_size + i] ^= mask_key[i % 4]; - } - } + bool fast_path = false; - // Queue the frame and start writing if not already in progress { std::lock_guard lock(write_mutex_); - write_queue_.push_back(std::move(frame_buffer)); if (close_after) { close_after_write_ = true; // Set atomically with queuing } - if (write_in_progress_) { - return; // Another write is in progress, frame will be sent when it completes + if (!write_in_progress_) { + // Fast path: build frame directly in persistent write_buf_ (no allocation) + write_in_progress_ = true; + fast_path = true; + } + else { + // Slow path: allocate and queue (another write is in progress) + std::vector frame_data(header_size + payload.size()); + write_frame_header(opcode, payload.size(), fin, frame_data.data(), client_mode_); + std::memcpy(frame_data.data() + header_size, payload.data(), payload.size()); + if (client_mode_ && payload.size() > 0) { + apply_masking(frame_data.data(), header_size, payload.size()); + } + pending_writes_.push_back(std::move(frame_data)); + return; } - write_in_progress_ = true; } - // Start 
writing the queued frame - do_write(); + // Fast path: build frame in persistent buffer (capacity reused, no allocation). + // write_buf_ is exclusively owned by the writer when write_in_progress_ == true, + // so no lock is needed here. Other threads will queue to pending_writes_ instead. + write_buf_.resize(header_size + payload.size()); + write_frame_header(opcode, payload.size(), fin, write_buf_.data(), client_mode_); + std::memcpy(write_buf_.data() + header_size, payload.data(), payload.size()); + if (client_mode_ && payload.size() > 0) { + apply_masking(write_buf_.data(), header_size, payload.size()); + } + + start_async_write(); } - // Process the write queue - called when a write completes or when starting a new write - inline void do_write() + // Apply XOR masking to payload in a frame buffer (client mode only) + inline void apply_masking(uint8_t* frame_data, size_t header_size, size_t payload_size) { - std::unique_ptr> frame_buffer; - bool should_close_after = false; - - { - std::lock_guard lock(write_mutex_); - if (write_queue_.empty()) { - write_in_progress_ = false; - should_close_after = close_after_write_; - close_after_write_ = false; - } - else { - frame_buffer = std::move(write_queue_.front()); - write_queue_.pop_front(); - } + size_t mask_key_offset = header_size - 4; + const uint8_t* mask_key = frame_data + mask_key_offset; + uint8_t* payload = frame_data + header_size; + uint32_t mask32; + std::memcpy(&mask32, mask_key, 4); + uint64_t mask64 = (static_cast(mask32) << 32) | mask32; + size_t i = 0; + for (; i + 8 <= payload_size; i += 8) { + uint64_t chunk; + std::memcpy(&chunk, payload + i, 8); + chunk ^= mask64; + std::memcpy(payload + i, &chunk, 8); } - - // Handle close-after-write when queue is empty - if (!frame_buffer) { - if (should_close_after) { - schedule_close(); - } - return; + for (; i < payload_size; ++i) { + payload[i] ^= mask_key[i % 4]; } + } + // Start an async write from write_buf_. 
+ // IMPORTANT: write_buf_ must not be modified between this call and the completion handler. + // The async_write references write_buf_.data() directly. Only drain_write_queue (called from + // the completion handler) may touch write_buf_ after the write finishes. + inline void start_async_write() + { // Hold socket_op_mutex_ while setting up the async write operation. // This prevents close() from being called while async_write() is setting up. // ASIO socket operations are NOT thread-safe for concurrent calls. @@ -989,7 +1026,7 @@ namespace glz std::lock_guard lock(write_mutex_); write_in_progress_ = false; had_close_pending = close_after_write_; - write_queue_.clear(); + pending_writes_.clear(); close_after_write_ = false; } if (had_close_pending) { @@ -999,18 +1036,15 @@ namespace glz } auto self = this->shared_from_this(); - // Create buffer view before moving frame_buffer into lambda (C++14 evaluation order safety) - auto buffer_view = asio::buffer(*frame_buffer); - asio::async_write(*socket, buffer_view, - [self, socket, frame_buffer = std::move(frame_buffer)](std::error_code ec, std::size_t) { + asio::async_write(*socket, asio::buffer(write_buf_.data(), write_buf_.size()), + [self, socket](std::error_code ec, size_t) { if (ec) { - // Mark connection as closing before notifying handlers to avoid re-entrant close - // attempts + // Mark as closing before notifying handlers to avoid re-entrant close attempts self->is_closing_ = true; { std::lock_guard lock(self->write_mutex_); self->write_in_progress_ = false; - self->write_queue_.clear(); + self->pending_writes_.clear(); self->close_after_write_ = false; } // Only call error handlers if not already closed (prevents calling handlers @@ -1023,11 +1057,42 @@ namespace glz return; } - // Process next frame in queue - self->do_write(); + // Drain pending writes + self->drain_write_queue(); }); } + // Process queued writes after a write completes + inline void drain_write_queue() + { + bool queue_empty = false; + 
bool should_close_after = false; + + { + std::lock_guard lock(write_mutex_); + if (pending_writes_.empty()) { + write_in_progress_ = false; + should_close_after = close_after_write_; + close_after_write_ = false; + queue_empty = true; + } + else { + // write_buf_ is exclusively owned by the writer when write_in_progress_ == true + write_buf_ = std::move(pending_writes_.front()); + pending_writes_.pop_front(); + } + } + + if (queue_empty) { + if (should_close_after) { + schedule_close(); + } + return; + } + + start_async_write(); + } + // Close the socket after the close frame has been sent (responder side) // RFC 6455 Section 7.1.1: "the server MUST close the connection immediately" // after sending the Close frame response. @@ -1200,8 +1265,8 @@ namespace glz std::shared_ptr socket_; std::weak_ptr server_; - std::array read_buffer_; - std::vector frame_buffer_; + std::vector read_buf_; // Flat read buffer (replaces read_buffer_ + frame_buffer_) + size_t buf_len_{0}; // Valid data in read_buf_[0..buf_len_) std::vector message_buffer_; ws_opcode current_opcode_{ws_opcode::continuation}; bool is_reading_frame_{false}; @@ -1212,9 +1277,10 @@ namespace glz asio::ip::tcp::endpoint remote_endpoint_; bool client_mode_{false}; // For client-side connections (requires masking) - // Write queue for serializing outgoing frames (prevents interleaved writes) + // Write state for serializing outgoing frames (prevents interleaved writes) std::mutex write_mutex_; - std::deque>> write_queue_; + std::vector write_buf_; // Persistent write buffer (capacity reused across messages) + std::deque> pending_writes_; // Queue for concurrent writes bool write_in_progress_{false}; bool close_after_write_{false}; // Close socket after write queue drains @@ -1278,8 +1344,13 @@ namespace glz return; } + // Grow buffer if full (capped at max message size + frame overhead) + if (buf_len_ == read_buf_.size()) { + read_buf_.resize((std::min)(read_buf_.size() * 2, max_message_size_ + 16384)); + } + 
auto self = this->shared_from_this(); - socket->async_read_some(asio::buffer(read_buffer_), + socket->async_read_some(asio::buffer(read_buf_.data() + buf_len_, read_buf_.size() - buf_len_), [self, socket](std::error_code ec, std::size_t bytes_transferred) { self->on_read(ec, bytes_transferred); }); From b4e157002477ceb1d8b6ceba6d732914b0485e8d Mon Sep 17 00:00:00 2001 From: Stephen Berry Date: Tue, 24 Mar 2026 10:49:13 -0500 Subject: [PATCH 2/6] shared receive buffers --- benchmarks/ws_benchmark/ws_benchmark.cpp | 1 + include/glaze/net/http_server.hpp | 52 ++- include/glaze/net/websocket_connection.hpp | 357 ++++++++++++++++++--- 3 files changed, 366 insertions(+), 44 deletions(-) diff --git a/benchmarks/ws_benchmark/ws_benchmark.cpp b/benchmarks/ws_benchmark/ws_benchmark.cpp index da89012ee3..023b199866 100644 --- a/benchmarks/ws_benchmark/ws_benchmark.cpp +++ b/benchmarks/ws_benchmark/ws_benchmark.cpp @@ -202,6 +202,7 @@ struct glaze_ws_server server.websocket("/ws", ws); server.bind("127.0.0.1", port); + server.ws_recv_buffer_size(512 * 1024); // 512KB shared receive buffer per thread thread = std::thread([this]() { server.start(1); }); wait_for_server(port); } diff --git a/include/glaze/net/http_server.hpp b/include/glaze/net/http_server.hpp index f2bf2ad318..0ad06c0ce6 100644 --- a/include/glaze/net/http_server.hpp +++ b/include/glaze/net/http_server.hpp @@ -776,15 +776,29 @@ namespace glz } } + // Allocate shared WebSocket receive buffers if configured + if (ws_recv_buffer_size_ > 0 && actual_threads > 0) { + ws_recv_buffers_ = std::make_shared>>(actual_threads); + for (auto& buf : *ws_recv_buffers_) { + buf.resize(ws_recv_buffer_size_); + } + } + // Start the acceptor do_accept(); // Start worker threads (unless explicitly set to 0) if (io_context && actual_threads > 0) { + // Thread index map: each thread registers its ID → buffer index at startup. 
+ // Thread-safe: each thread writes its own unique entry before io_context::run() + // processes any connections. + ws_thread_indices_ = std::make_shared(); + ws_thread_indices_->reserve(actual_threads); + threads.reserve(actual_threads); for (size_t i = 0; i < actual_threads; ++i) { - threads.emplace_back([this] { + threads.emplace_back([this, i] { + ws_thread_indices_->emplace(std::this_thread::get_id(), i); io_context->run(); - // Don't report errors during shutdown }); } } @@ -1195,6 +1209,31 @@ namespace glz return *this; } + /// @brief Set the shared WebSocket receive buffer size per thread. + /// + /// When enabled (size > 0), all WebSocket connections on a given thread share a + /// single receive buffer, eliminating per-connection allocation and reducing the + /// number of async read operations for large messages. + /// + /// @warning When shared receive buffers are enabled, the string_view passed to + /// on_message is ONLY valid for the duration of the callback. Retaining a reference + /// after the callback returns causes undefined behavior (silent data corruption). + /// Copy the data if you need it later. This is an opt-in performance mode — the + /// default (0) uses per-connection buffers where the view is valid until the next read. + /// + /// @param size Buffer size in bytes. 0 = disabled (default). Non-zero values + /// below 16KB are clamped to 16KB. Recommended: power-of-2 sizes + /// (e.g., 256KB, 512KB). + /// @return Reference to this server for method chaining + inline http_server& ws_recv_buffer_size(size_t size) + { + if (size > 0 && size < 16384) { + size = 16384; + } + ws_recv_buffer_size_ = size; + return *this; + } + /** * @brief Register a hook to be called when a request is received * @@ -1395,6 +1434,12 @@ namespace glz std::unordered_map> websocket_handlers_; std::unordered_map> streaming_handlers_; + // WebSocket shared receive buffers (opt-in via ws_recv_buffer_size()). + // Default 0 = per-connection buffers. 
When > 0, allocates one buffer per thread. + size_t ws_recv_buffer_size_{0}; + std::shared_ptr>> ws_recv_buffers_; + std::shared_ptr ws_thread_indices_; + // Wrapping middleware (executes around handlers) std::vector wrapping_middlewares_; @@ -2203,7 +2248,8 @@ namespace glz // Create WebSocket connection and start it // Uses socket_type which is either tcp::socket (ws://) or ssl::stream (wss://) auto socket_ptr = std::make_shared(std::move(conn->socket)); - auto ws_conn = std::make_shared>(socket_ptr, ws_it->second); + auto ws_conn = std::make_shared>( + socket_ptr, ws_it->second, ws_recv_buffers_, ws_thread_indices_); ws_conn->start(req); } diff --git a/include/glaze/net/websocket_connection.hpp b/include/glaze/net/websocket_connection.hpp index 7866174dd3..531d8639b0 100644 --- a/include/glaze/net/websocket_connection.hpp +++ b/include/glaze/net/websocket_connection.hpp @@ -7,14 +7,17 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include +#include #include #include @@ -256,6 +259,11 @@ namespace glz } } + // Thread ID → buffer index map for per-server shared receive buffers. + // Defined here (not in http_server.hpp) because websocket_connection references it. + // unordered_map for O(1) lookup — the map is built once at startup and read-only after. + using ws_thread_map = std::unordered_map; + // Forward declarations template struct websocket_connection; @@ -448,10 +456,16 @@ namespace glz { public: inline websocket_connection(std::shared_ptr socket, - std::weak_ptr server = std::weak_ptr{}) - : socket_(socket), server_(std::move(server)) + std::weak_ptr server = {}, + std::shared_ptr>> recv_buffers = {}, + std::shared_ptr thread_indices = {}) + : socket_(socket), server_(std::move(server)), + recv_buffers_(std::move(recv_buffers)), thread_indices_(std::move(thread_indices)) { - read_buf_.resize(initial_read_buf_size); + // read_buf_ starts empty — lazily allocated on first use. 
+ // For shared-buffer connections, read_buf_ is only used for spill (rarely). + // For fallback connections, ensure_read_buf_capacity() allocates on first do_start_read. + // Try to get remote endpoint, but don't fail if it's not available yet try { remote_endpoint_ = socket_->lowest_layer().remote_endpoint(); @@ -572,6 +586,17 @@ namespace glz private: static constexpr size_t initial_read_buf_size = 16384; + + // Headroom for the fallback path's buffer growth cap. + static constexpr size_t fallback_buf_headroom = 16384; + + // Spill thrashing fallback threshold (3/4 of shared buffer). + static constexpr size_t spill_fallback_numerator = 3; + static constexpr size_t spill_fallback_denominator = 4; + + // Consecutive zero-spill read cycles before reclaiming oversized spill memory. + static constexpr size_t spill_reclaim_cycles = 16; + size_t max_message_size_{1024 * 1024 * 16}; // 16 MB limit // Shift unconsumed data to the front of read_buf_ after processing frames. @@ -586,6 +611,19 @@ namespace glz } } + // Lazy allocation and growth for per-connection read_buf_. + // Used by both the fallback branch of do_start_read and do_shared_read's fallback. + inline void ensure_read_buf_capacity() + { + if (read_buf_.empty()) { + read_buf_.resize(initial_read_buf_size); + } + if (buf_len_ == read_buf_.size()) { + // (std::min) parenthesized to prevent expansion of Windows min/max macros. + read_buf_.resize((std::min)(read_buf_.size() * 2, max_message_size_ + fallback_buf_headroom)); + } + } + inline void perform_handshake(const request& req) { // Validate WebSocket upgrade request @@ -675,36 +713,84 @@ namespace glz }); } + // Fallback read path: called by async_read_some completion handler (per-connection buffer). 
inline void on_read(std::error_code ec, std::size_t bytes_transferred) { if (ec) { - is_closing_ = true; - // Only call error handlers if not already closed (prevents calling handlers - // after force_close() when user's captured references may be invalid) - if (!closed_.load()) { - dispatch_error(ec); - } - // Close the connection after a read error (connection is likely broken) - do_close(); + on_read_error(ec); return; } buf_len_ += bytes_transferred; - // Check buffer size limit (simple DoS protection) - if (buf_len_ > max_message_size_ + 1024) { // Allow some header overhead - close(ws_close_code::message_too_big, "Buffer limit exceeded"); + // Per-message size protection is inside process_frames (payload_length > max_message_size_). + size_t consumed = 0; + try { + consumed = process_frames(read_buf_.data(), buf_len_); + } + catch (...) { + buf_len_ = 0; + close(ws_close_code::internal_error, "Exception in frame processing"); return; } - // Process complete frames directly from the flat buffer - compact_read_buf(process_frames(read_buf_.data(), buf_len_)); + compact_read_buf(consumed); + + if (!closed_.load() && handshake_complete_) { + start_read(); + } + } + + // Extracted error handler for both read paths. + inline void on_read_error(std::error_code ec) + { + read_pending_ = false; + is_closing_ = true; + if (!closed_.load()) dispatch_error(ec); + do_close(); + } + + // Shared buffer read path: processes data from the shared buffer, saves spill to read_buf_. + inline void on_read_shared(uint8_t* data, size_t total_len) + { + // If process_frames or a user callback throws, the connection is in an indeterminate + // state. Kill the connection to prevent reprocessing already-handled frames. + size_t consumed = 0; + try { + consumed = process_frames(data, total_len); + } + catch (...) 
{ + buf_len_ = 0; + close(ws_close_code::internal_error, "Exception in frame processing"); + return; + } + + // Save unconsumed bytes to read_buf_ (spill) + size_t remaining = total_len - consumed; + if (remaining > 0) { + if (read_buf_.size() < remaining) { + read_buf_.resize(remaining); + } + std::memcpy(read_buf_.data(), data + consumed, remaining); + } + buf_len_ = remaining; + + // Deferred spill reclamation: only free oversized spill memory after + // spill_reclaim_cycles consecutive zero-spill cycles. + // swap() is the guaranteed-release idiom (= {} relies on implementation). + if (buf_len_ == 0) { + if (read_buf_.capacity() > initial_read_buf_size) { + ++zero_spill_count_; + if (zero_spill_count_ >= spill_reclaim_cycles) { + std::vector().swap(read_buf_); + zero_spill_count_ = 0; + } + } + } + else { + zero_spill_count_ = 0; + } - // Continue reading if connection is still open - // RFC 6455 Section 7.1.1: The initiator must continue reading to receive the peer's - // Close frame response before closing the TCP connection. - // Use closed_ (not is_closing_) to allow receiving close response during handshake. - // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.1 if (!closed_.load() && handshake_complete_) { start_read(); } @@ -1265,8 +1351,26 @@ namespace glz std::shared_ptr socket_; std::weak_ptr server_; - std::vector read_buf_; // Flat read buffer (replaces read_buffer_ + frame_buffer_) - size_t buf_len_{0}; // Valid data in read_buf_[0..buf_len_) + + // Shared receive buffer support (server-owned per-thread buffers). + // When recv_buffers_ is non-null, the shared buffer path is used (async_wait + + // synchronous read_some). When null, the fallback path is used (async_read_some + // into per-connection read_buf_). Connections hold a shared_ptr copy, guaranteeing + // the buffers outlive all connections even if the server is destroyed first. 
+ std::shared_ptr>> recv_buffers_; + std::shared_ptr thread_indices_; + bool ssl_has_pending_{false}; // SSL drain loop didn't finish — force re-entry via asio::post + bool read_pending_{false}; // Reentrancy guard for do_start_read (intentionally non-atomic) + size_t zero_spill_count_{0}; // Consecutive zero-spill cycles for deferred reclamation + + // Dual-purpose buffer: + // - Shared path: spill buffer (unconsumed partial-frame bytes). Starts empty. + // - Fallback path: per-connection read buffer. Lazily allocated via ensure_read_buf_capacity(). + std::vector read_buf_; + // Dual-purpose length: + // - Shared path: number of spill bytes in read_buf_ + // - Fallback path: total valid buffered data in read_buf_[0..buf_len_) + size_t buf_len_{0}; std::vector message_buffer_; ws_opcode current_opcode_{ws_opcode::continuation}; bool is_reading_frame_{false}; @@ -1323,37 +1427,208 @@ namespace glz inline void on_error(std::function handler) { client_error_handler_ = std::move(handler); } private: + // Returns the buffer index for the current thread, or nullopt if not found. + // O(1) lookup — the map is built once at startup and read-only after. + inline std::optional find_thread_index() const + { + if (!thread_indices_) return std::nullopt; + auto it = thread_indices_->find(std::this_thread::get_id()); + if (it != thread_indices_->end()) return it->second; + return std::nullopt; + } + inline void do_start_read() { - // Check if already fully closed to avoid operations on closed sockets - // RFC 6455 Section 7.1.1: Use closed_ (not is_closing_) to allow receiving - // close response during handshake. The initiator must continue reading. - // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.1 - if (closed_.load()) { + if (closed_.load()) return; + + // Reentrancy guard: prevents double async_wait/async_read_some if a user + // callback (e.g. on_message handler) triggers start_read() on the same connection. 
+ if (read_pending_) { +#ifndef NDEBUG + std::fprintf(stderr, "[glaze] warning: re-entrant start_read() detected — ignored\n"); +#endif return; } + read_pending_ = true; + + auto thread_idx = find_thread_index(); + bool use_shared = recv_buffers_ && thread_idx.has_value() + && *thread_idx < recv_buffers_->size(); + + // Spill thrashing mitigation: if spill exceeds 3/4 of the shared buffer, + // fall back to direct reads into read_buf_ to avoid O(n) copy-per-cycle. + if (use_shared) { + auto& shared_buf = (*recv_buffers_)[*thread_idx]; + if (buf_len_ > shared_buf.size() / spill_fallback_denominator * spill_fallback_numerator) { + use_shared = false; + } + } - // Hold socket_op_mutex_ while setting up the async read operation. - // This prevents close() from being called while async_read_some() is setting up. - // ASIO socket operations are NOT thread-safe for concurrent calls. + // Hold socket_op_mutex_ unconditionally while setting up the async operation. + // Both paths need it: async_wait needs a valid socket for registration, + // async_read_some needs it for the same reason. std::lock_guard lock(socket_op_mutex_); - - // Re-check under lock since socket may have been closed while waiting for mutex auto socket = socket_; if (!socket || !socket->lowest_layer().is_open()) { + read_pending_ = false; return; } - // Grow buffer if full (capped at max message size + frame overhead) - if (buf_len_ == read_buf_.size()) { - read_buf_.resize((std::min)(read_buf_.size() * 2, max_message_size_ + 16384)); + auto self = this->shared_from_this(); + + if (use_shared) { + if (ssl_has_pending_) { + // SSL layer has residual buffered data — async_wait won't fire. + // Post directly to re-enter do_shared_read immediately. + // ssl_has_pending_ is cleared at the top of do_shared_read (not here). + asio::post(socket->get_executor(), [self, socket]() { + self->do_shared_read(socket); + }); + } + else { + // Shared buffer path: async_wait + synchronous read_some. 
+ // async_wait doesn't touch the buffer — it just waits for readability. + socket->lowest_layer().async_wait(asio::ip::tcp::socket::wait_read, + [self, socket](std::error_code ec) { + if (ec) { + self->on_read_error(ec); + return; + } + self->do_shared_read(socket); + }); + } + } + else { + // Fallback path: async_read_some into per-connection read_buf_. + ensure_read_buf_capacity(); + + socket->async_read_some( + asio::buffer(read_buf_.data() + buf_len_, read_buf_.size() - buf_len_), + [self, socket](std::error_code ec, size_t bytes_transferred) { + if (ec) { + self->on_read_error(ec); + } + else { + self->read_pending_ = false; + self->on_read(ec, bytes_transferred); + } + }); } + } - auto self = this->shared_from_this(); - socket->async_read_some(asio::buffer(read_buf_.data() + buf_len_, read_buf_.size() - buf_len_), - [self, socket](std::error_code ec, std::size_t bytes_transferred) { - self->on_read(ec, bytes_transferred); - }); + // Shared buffer read: synchronous read_some into the per-thread shared buffer. + // Called from both async_wait callback and asio::post (SSL pending). + inline void do_shared_read(std::shared_ptr socket) + { + // Clear ssl_has_pending_ at the point of action, not in the caller. + ssl_has_pending_ = false; + + // Re-check thread index — ASIO may dispatch this callback on any thread + // calling io_context::run(). Fall back to per-connection buffer if unindexed. + auto idx = find_thread_index(); + if (!idx.has_value() || !recv_buffers_ || *idx >= recv_buffers_->size()) { + // async_wait already fired — the socket is readable. Do a synchronous + // read into read_buf_ (per-connection) and process directly. 
+ ensure_read_buf_capacity(); + std::error_code fallback_ec; + size_t n = 0; + { + std::lock_guard lock(socket_op_mutex_); + if (!socket_ || !socket_->lowest_layer().is_open()) { + read_pending_ = false; + return; + } + n = socket->read_some( + asio::buffer(read_buf_.data() + buf_len_, read_buf_.size() - buf_len_), + fallback_ec); + if (!fallback_ec && n == 0) { + fallback_ec = asio::error::eof; + } + } + if (fallback_ec) { + on_read_error(fallback_ec); + } + else { + read_pending_ = false; + on_read(fallback_ec, n); + } + return; + } + + auto& buf = (*recv_buffers_)[*idx]; + + // Copy spill data to shared buffer front (usually 0 bytes). + // Safe outside the lock: the shared buffer is server-owned (refcounted via + // shared_ptr). The ASIO single-threaded callback guarantee means no other + // callback on this thread touches the shared buffer concurrently. + std::error_code read_ec; + size_t offset = buf_len_; + if (offset > 0) { + if (offset > buf.size()) { + read_pending_ = false; + close(ws_close_code::internal_error, "Spill overflow"); + return; + } + std::memcpy(buf.data(), read_buf_.data(), offset); + } + + // Lock only for socket operations — close()/force_close() from another + // thread must not race with read_some(). + { + std::lock_guard lock(socket_op_mutex_); + if (!socket_ || !socket_->lowest_layer().is_open()) { + read_pending_ = false; + return; + } + + auto n = socket->read_some( + asio::buffer(buf.data() + offset, buf.size() - offset), read_ec); + if (!read_ec && n == 0) { + // TCP graceful close (EOF). Synchronous read_some returns 0 with no + // error — unlike async_read_some which surfaces error::eof. Without + // this check, the connection would busy-loop. + read_ec = asio::error::eof; + } + if (!read_ec) { + offset += n; + + // SSL: drain any buffered decrypted data (compile-time elided for plain TCP). + // Capped to minimize mutex hold time. 
If the cap is hit or the buffer fills, + // ssl_has_pending_ forces immediate re-entry on the next cycle via asio::post(). + if constexpr (!std::is_same_v) { + static constexpr size_t max_ssl_drain_iterations = 4; + size_t drain_count = 0; + while (offset < buf.size() && drain_count < max_ssl_drain_iterations) { + std::error_code drain_ec; + auto m = socket->read_some( + asio::buffer(buf.data() + offset, buf.size() - offset), + drain_ec); + if (drain_ec || m == 0) break; + offset += m; + ++drain_count; + } + if (offset >= buf.size() || drain_count >= max_ssl_drain_iterations) { + ssl_has_pending_ = true; + } + } + } + } + // Lock released — safe for user callbacks to call close()/send() + + if (read_ec) { + on_read_error(read_ec); + return; + } + + read_pending_ = false; + on_read_shared(buf.data(), offset); + +#ifndef NDEBUG + // Poison the shared buffer to detect use-after-callback bugs. + // Any retained string_view into the shared buffer will show 0xDE garbage. + // This is valid memory (server-owned, refcounted) — not a use-after-free. + std::memset(buf.data(), 0xDE, offset); +#endif } }; } \ No newline at end of file From 1d8b6094b462441ad27a73c126a99e5351e0c551 Mon Sep 17 00:00:00 2001 From: Stephen Berry Date: Tue, 24 Mar 2026 10:53:45 -0500 Subject: [PATCH 3/6] boost asio ec --- include/glaze/net/websocket_connection.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/glaze/net/websocket_connection.hpp b/include/glaze/net/websocket_connection.hpp index 531d8639b0..693d3e1abc 100644 --- a/include/glaze/net/websocket_connection.hpp +++ b/include/glaze/net/websocket_connection.hpp @@ -1542,7 +1542,7 @@ namespace glz asio::buffer(read_buf_.data() + buf_len_, read_buf_.size() - buf_len_), fallback_ec); if (!fallback_ec && n == 0) { - fallback_ec = asio::error::eof; + fallback_ec = asio::error::make_error_code(asio::error::eof); } } if (fallback_ec) { @@ -1587,7 +1587,7 @@ namespace glz // TCP graceful close (EOF). 
Synchronous read_some returns 0 with no // error — unlike async_read_some which surfaces error::eof. Without // this check, the connection would busy-loop. - read_ec = asio::error::eof; + read_ec = asio::error::make_error_code(asio::error::eof); } if (!read_ec) { offset += n; From 005ddab4634aa7a900b902b6d11224c035b9874a Mon Sep 17 00:00:00 2001 From: Stephen Berry Date: Tue, 24 Mar 2026 11:07:46 -0500 Subject: [PATCH 4/6] Update websocket_connection.hpp --- include/glaze/net/websocket_connection.hpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/include/glaze/net/websocket_connection.hpp b/include/glaze/net/websocket_connection.hpp index 693d3e1abc..cac50cfea4 100644 --- a/include/glaze/net/websocket_connection.hpp +++ b/include/glaze/net/websocket_connection.hpp @@ -1530,7 +1530,7 @@ namespace glz // async_wait already fired — the socket is readable. Do a synchronous // read into read_buf_ (per-connection) and process directly. ensure_read_buf_capacity(); - std::error_code fallback_ec; + asio::error_code fallback_ec; size_t n = 0; { std::lock_guard lock(socket_op_mutex_); @@ -1542,7 +1542,7 @@ namespace glz asio::buffer(read_buf_.data() + buf_len_, read_buf_.size() - buf_len_), fallback_ec); if (!fallback_ec && n == 0) { - fallback_ec = asio::error::make_error_code(asio::error::eof); + fallback_ec = asio::error::eof; } } if (fallback_ec) { @@ -1561,7 +1561,7 @@ namespace glz // Safe outside the lock: the shared buffer is server-owned (refcounted via // shared_ptr). The ASIO single-threaded callback guarantee means no other // callback on this thread touches the shared buffer concurrently. - std::error_code read_ec; + asio::error_code read_ec; size_t offset = buf_len_; if (offset > 0) { if (offset > buf.size()) { @@ -1587,7 +1587,7 @@ namespace glz // TCP graceful close (EOF). Synchronous read_some returns 0 with no // error — unlike async_read_some which surfaces error::eof. Without // this check, the connection would busy-loop. 
- read_ec = asio::error::make_error_code(asio::error::eof); + read_ec = asio::error::eof; } if (!read_ec) { offset += n; @@ -1599,7 +1599,7 @@ namespace glz static constexpr size_t max_ssl_drain_iterations = 4; size_t drain_count = 0; while (offset < buf.size() && drain_count < max_ssl_drain_iterations) { - std::error_code drain_ec; + asio::error_code drain_ec; auto m = socket->read_some( asio::buffer(buf.data() + offset, buf.size() - offset), drain_ec); From 9ede0304e0b47c4b3e436fc5c5e5ea796587ff14 Mon Sep 17 00:00:00 2001 From: Stephen Berry Date: Tue, 24 Mar 2026 14:25:20 -0500 Subject: [PATCH 5/6] threading updates --- benchmarks/ws_benchmark/CMakeLists.txt | 4 +- benchmarks/ws_benchmark/ws_benchmark.cpp | 89 ++++++++++++++++++++-- include/glaze/net/http_server.hpp | 12 ++- include/glaze/net/websocket_connection.hpp | 6 +- 4 files changed, 97 insertions(+), 14 deletions(-) diff --git a/benchmarks/ws_benchmark/CMakeLists.txt b/benchmarks/ws_benchmark/CMakeLists.txt index f01158d5ca..ee6a1b8fa2 100644 --- a/benchmarks/ws_benchmark/CMakeLists.txt +++ b/benchmarks/ws_benchmark/CMakeLists.txt @@ -17,12 +17,12 @@ include(FetchContent) # Download uWebSockets and uSockets sources (Makefile projects, no CMakeLists.txt) FetchContent_Declare(uWebSockets GIT_REPOSITORY https://github.com/uNetworking/uWebSockets.git - GIT_TAG master + GIT_TAG v20.76.0 GIT_SHALLOW TRUE ) FetchContent_Declare(uSockets GIT_REPOSITORY https://github.com/uNetworking/uSockets.git - GIT_TAG master + GIT_TAG v0.8.8 GIT_SHALLOW TRUE ) diff --git a/benchmarks/ws_benchmark/ws_benchmark.cpp b/benchmarks/ws_benchmark/ws_benchmark.cpp index 023b199866..b3575a6fb4 100644 --- a/benchmarks/ws_benchmark/ws_benchmark.cpp +++ b/benchmarks/ws_benchmark/ws_benchmark.cpp @@ -185,8 +185,10 @@ struct glaze_ws_server glz::http_server<> server{}; std::shared_ptr ws; std::thread thread; + int server_threads_{}; - glaze_ws_server(uint16_t port) + glaze_ws_server(uint16_t port, int server_threads = 1) + : 
server_threads_(server_threads) { server.on_error([](std::error_code, std::source_location) {}); @@ -203,7 +205,7 @@ struct glaze_ws_server server.websocket("/ws", ws); server.bind("127.0.0.1", port); server.ws_recv_buffer_size(512 * 1024); // 512KB shared receive buffer per thread - thread = std::thread([this]() { server.start(1); }); + thread = std::thread([this, server_threads]() { server.start(server_threads); }); wait_for_server(port); } @@ -337,8 +339,10 @@ int main() std::string all_markdown; + std::printf("=== Single-threaded servers (1 server thread) ===\n\n"); + { - glaze_ws_server glz_srv(glaze_port); + glaze_ws_server glz_srv(glaze_port, 1); uws_server uws_srv(uws_port); validate_servers(glaze_port, uws_port); @@ -436,7 +440,7 @@ int main() // Concurrent Echo Small { bencher::stage stage; - stage.name = "Concurrent Echo Small (" + std::to_string(client_threads) + " clients x 1000 msg)"; + stage.name = "Concurrent Echo Small (" + std::to_string(client_threads) + " clients, 1 server thread)"; stage.throughput_units_label = "msg/s"; stage.throughput_units_divisor = 1; @@ -452,7 +456,7 @@ int main() { bencher::stage stage; stage.name = - "Concurrent Echo JSON 100 Users (" + std::to_string(client_threads) + " clients x 1000 msg)"; + "Concurrent Echo JSON 100 Users (" + std::to_string(client_threads) + " clients, 1 server thread)"; stage.throughput_units_label = "msg/s"; stage.throughput_units_divisor = 1; @@ -465,6 +469,81 @@ int main() } } + // ----------------------------------------------------------------------- + // Multi-threaded server benchmarks + // Glaze runs with server_threads == hw_threads; uWebSockets is single-threaded + // by design (one event loop). This tests Glaze's multi-threaded scaling. 
+ // ----------------------------------------------------------------------- + + const int server_threads = client_threads; // match server threads to hardware concurrency + + std::printf("\n=== Multi-threaded Glaze server (%d server threads) vs single-threaded uWebSockets ===\n\n", + server_threads); + + { + static constexpr uint16_t glaze_mt_port = 19767; + static constexpr uint16_t uws_mt_port = 19768; + + glaze_ws_server glz_srv(glaze_mt_port, server_threads); + uws_server uws_srv(uws_mt_port); + + validate_servers(glaze_mt_port, uws_mt_port); + + // Multi-threaded Concurrent Echo Small + { + bencher::stage stage; + stage.name = "MT Concurrent Echo Small (" + std::to_string(client_threads) + " clients, Glaze " + + std::to_string(server_threads) + "T)"; + stage.throughput_units_label = "msg/s"; + stage.throughput_units_divisor = 1; + + stage.run("Glaze (" + std::to_string(server_threads) + "T)", + [&] { return run_ws_concurrent(glaze_mt_port, small_payload, client_threads, 1000); }); + stage.run("uWebSockets (1T)", + [&] { return run_ws_concurrent(uws_mt_port, small_payload, client_threads, 1000); }); + + bencher::print_results(stage); + all_markdown += bencher::to_markdown(stage); + bencher::save_file(stage_bar_chart(stage), "ws_mt_concurrent_small.svg"); + } + + // Multi-threaded Concurrent Echo JSON 100 Users + { + bencher::stage stage; + stage.name = "MT Concurrent Echo JSON 100 Users (" + std::to_string(client_threads) + " clients, Glaze " + + std::to_string(server_threads) + "T)"; + stage.throughput_units_label = "msg/s"; + stage.throughput_units_divisor = 1; + + stage.run("Glaze (" + std::to_string(server_threads) + "T)", + [&] { return run_ws_concurrent(glaze_mt_port, json_users, client_threads, 1000); }); + stage.run("uWebSockets (1T)", + [&] { return run_ws_concurrent(uws_mt_port, json_users, client_threads, 1000); }); + + bencher::print_results(stage); + all_markdown += bencher::to_markdown(stage); + bencher::save_file(stage_bar_chart(stage), 
"ws_mt_concurrent_json.svg"); + } + + // Multi-threaded Concurrent Echo Large (64KB) + { + bencher::stage stage; + stage.name = "MT Concurrent Echo Large 64KB (" + std::to_string(client_threads) + " clients, Glaze " + + std::to_string(server_threads) + "T)"; + stage.throughput_units_label = "msg/s"; + stage.throughput_units_divisor = 1; + + stage.run("Glaze (" + std::to_string(server_threads) + "T)", + [&] { return run_ws_concurrent(glaze_mt_port, large_payload, client_threads, 1000); }); + stage.run("uWebSockets (1T)", + [&] { return run_ws_concurrent(uws_mt_port, large_payload, client_threads, 1000); }); + + bencher::print_results(stage); + all_markdown += bencher::to_markdown(stage); + bencher::save_file(stage_bar_chart(stage), "ws_mt_concurrent_large.svg"); + } + } + bencher::save_file(all_markdown, "ws_benchmark.md"); return 0; diff --git a/include/glaze/net/http_server.hpp b/include/glaze/net/http_server.hpp index 0ad06c0ce6..fd91610ebf 100644 --- a/include/glaze/net/http_server.hpp +++ b/include/glaze/net/http_server.hpp @@ -789,15 +789,19 @@ namespace glz // Start worker threads (unless explicitly set to 0) if (io_context && actual_threads > 0) { // Thread index map: each thread registers its ID → buffer index at startup. - // Thread-safe: each thread writes its own unique entry before io_context::run() - // processes any connections. + // A mutex serializes the emplace calls (flat_map is not thread-safe for concurrent writes). + // After all threads have registered, the map is read-only — no synchronization needed. 
ws_thread_indices_ = std::make_shared(); ws_thread_indices_->reserve(actual_threads); + auto thread_map_mutex = std::make_shared(); threads.reserve(actual_threads); for (size_t i = 0; i < actual_threads; ++i) { - threads.emplace_back([this, i] { - ws_thread_indices_->emplace(std::this_thread::get_id(), i); + threads.emplace_back([this, i, thread_map_mutex] { + { + std::lock_guard lock(*thread_map_mutex); + ws_thread_indices_->emplace(std::this_thread::get_id(), i); + } io_context->run(); }); } diff --git a/include/glaze/net/websocket_connection.hpp b/include/glaze/net/websocket_connection.hpp index cac50cfea4..e3d495b935 100644 --- a/include/glaze/net/websocket_connection.hpp +++ b/include/glaze/net/websocket_connection.hpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include "glaze/containers/flat_map.hpp" #include // Optional OpenSSL support - detected at compile time @@ -261,8 +261,8 @@ namespace glz // Thread ID → buffer index map for per-server shared receive buffers. // Defined here (not in http_server.hpp) because websocket_connection references it. - // unordered_map for O(1) lookup — the map is built once at startup and read-only after. - using ws_thread_map = std::unordered_map; + // flat_map for cache-friendly O(log n) lookup — the map is built once at startup and read-only after. 
+ using ws_thread_map = glz::flat_map; // Forward declarations template From 6cfa509fc6f00633de856eb43dd1af85125f4f20 Mon Sep 17 00:00:00 2001 From: Stephen Berry Date: Tue, 24 Mar 2026 14:36:25 -0500 Subject: [PATCH 6/6] Default to faster shared receive buffer --- benchmarks/ws_benchmark/ws_benchmark.cpp | 1 - include/glaze/net/http_server.hpp | 22 +++++++--------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/benchmarks/ws_benchmark/ws_benchmark.cpp b/benchmarks/ws_benchmark/ws_benchmark.cpp index b3575a6fb4..d259b75664 100644 --- a/benchmarks/ws_benchmark/ws_benchmark.cpp +++ b/benchmarks/ws_benchmark/ws_benchmark.cpp @@ -204,7 +204,6 @@ struct glaze_ws_server server.websocket("/ws", ws); server.bind("127.0.0.1", port); - server.ws_recv_buffer_size(512 * 1024); // 512KB shared receive buffer per thread thread = std::thread([this, server_threads]() { server.start(server_threads); }); wait_for_server(port); } diff --git a/include/glaze/net/http_server.hpp b/include/glaze/net/http_server.hpp index fd91610ebf..7186a83497 100644 --- a/include/glaze/net/http_server.hpp +++ b/include/glaze/net/http_server.hpp @@ -1215,19 +1215,11 @@ namespace glz /// @brief Set the shared WebSocket receive buffer size per thread. /// - /// When enabled (size > 0), all WebSocket connections on a given thread share a - /// single receive buffer, eliminating per-connection allocation and reducing the - /// number of async read operations for large messages. + /// All WebSocket connections on a given thread share a single receive buffer, + /// eliminating per-connection allocation. Set to 0 to disable (uses per-connection + /// buffers instead). Non-zero values below 16KB are clamped to 16KB. + /// Recommended: power-of-2 sizes (e.g., 256KB, 512KB). /// - /// @warning When shared receive buffers are enabled, the string_view passed to - /// on_message is ONLY valid for the duration of the callback. 
Retaining a reference - /// after the callback returns causes undefined behavior (silent data corruption). - /// Copy the data if you need it later. This is an opt-in performance mode — the - /// default (0) uses per-connection buffers where the view is valid until the next read. - /// - /// @param size Buffer size in bytes. 0 = disabled (default). Non-zero values - /// below 16KB are clamped to 16KB. Recommended: power-of-2 sizes - /// (e.g., 256KB, 512KB). /// @return Reference to this server for method chaining inline http_server& ws_recv_buffer_size(size_t size) { @@ -1438,9 +1430,9 @@ namespace glz std::unordered_map> websocket_handlers_; std::unordered_map> streaming_handlers_; - // WebSocket shared receive buffers (opt-in via ws_recv_buffer_size()). - // Default 0 = per-connection buffers. When > 0, allocates one buffer per thread. - size_t ws_recv_buffer_size_{0}; + // WebSocket shared receive buffers (one per thread). + // Set to 0 to disable (falls back to per-connection buffers). + size_t ws_recv_buffer_size_{512 * 1024}; std::shared_ptr>> ws_recv_buffers_; std::shared_ptr ws_thread_indices_;