From 4afbe1bc6a15f6ac0bdc086f63dda650a83cfbce Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Sat, 11 Apr 2026 11:25:00 +0530 Subject: [PATCH 1/6] execution: fix split scheduler preservation for late subscribers (P2300) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When split(scheduler, sender) is used and the predecessor has already completed (predecessor_done == true) by the time add_continuation() is called by a new subscriber, the downstream completion signal was fired inline on whatever thread called add_continuation(). This violated the P2300 get_completion_scheduler contract, which requires that completions on the set_value and set_stopped signals are dispatched by the scheduler passed to split. The code itself acknowledged this with a TODO comment: // TODO: Should this preserve the scheduler? It does not // if we call set_* inline. Fix: * Add a virtual schedule_completion(continuation_type&&) method to shared_state with a default implementation that fires inline (preserving existing behaviour for the scheduler-free case). * Replace the two "fire inline" paths inside add_continuation (the predecessor_done fast-path and the lock-then-done path) with calls to schedule_completion, so all completion dispatch goes through a single overridable hook. * Add shared_state_scheduler — a new subclass of shared_state — that overrides schedule_completion to post the continuation through schedule(sched). The operation state is kept alive via a self-owning intrusive_ptr-based holder (mirroring the pattern in start_detached.hpp), so the async lifetime is correct regardless of how quickly the thread pool processes the work item. * Add a second constructor overload to split_sender for generic (non-run_loop) schedulers that allocates shared_state_scheduler instead of plain shared_state. * Add algorithm_split_scheduler unit test that covers: - Basic split with no scheduler (regression guard) - split with thread_pool_scheduler: late subscriber receives value on the pool, not inline - Multiple concurrent late subscribers all receive the value - ensure_started (eager submission) is unaffected No behavioural change for the scheduler-free split or the run_loop split; only the generic-scheduler path gains the new subclass. Signed-off-by: arpittkhandelwal --- .../hpx/execution/algorithms/split.hpp | 294 +++++++++++++++++- libs/core/execution/tests/unit/CMakeLists.txt | 1 + .../tests/unit/algorithm_split_scheduler.cpp | 186 +++++++++++ 3 files changed, 465 insertions(+), 16 deletions(-) create mode 100644 libs/core/execution/tests/unit/algorithm_split_scheduler.cpp diff --git a/libs/core/execution/include/hpx/execution/algorithms/split.hpp b/libs/core/execution/include/hpx/execution/algorithms/split.hpp index 440b894d5d58..7a4a8b3315dc 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/split.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/split.hpp @@ -283,6 +283,19 @@ namespace hpx::execution::experimental { } }; + // schedule_completion dispatches a stored continuation to + // the correct execution context. The base implementation fires + // the continuation inline (no scheduler attached). Subclasses + // override this to reroute through a specific scheduler, + // ensuring P2300 get_completion_scheduler guarantees hold for + // late-arriving subscribers (i.e. when predecessor_done is + // already true at the time add_continuation is called). + virtual void schedule_completion( + continuation_type&& continuation) + { + continuation(); + } + virtual void set_predecessor_done() { predecessor_done = true; @@ -325,7 +338,7 @@ namespace hpx::execution::experimental { if (!continuations.empty()) { - for (auto const& continuation : continuations) + for (auto& continuation : continuations) { continuation(); } @@ -345,13 +358,18 @@ namespace hpx::execution::experimental { // If we read predecessor_done here it means that one of // set_error/set_stopped/set_value has been called and // values/errors have been stored into the shared state. - // We can trigger the continuation directly. - // TODO: Should this preserve the scheduler? It does not - // if we call set_* inline. - hpx::visit( - done_error_value_visitor{ - HPX_FORWARD(Receiver, receiver)}, - v); + // We dispatch the completion through schedule_completion + // so that any attached scheduler is honoured, satisfying + // the P2300 get_completion_scheduler contract for late + // subscribers. + schedule_completion([this, + receiver = HPX_FORWARD(Receiver, + receiver)]() mutable { + hpx::visit( + done_error_value_visitor{ + HPX_MOVE(receiver)}, + v); + }); } else { @@ -364,13 +382,18 @@ namespace hpx::execution::experimental { { // By the time the lock has been taken, // predecessor_done might already be true and we can - // release the lock early and call the continuation - // directly again. + // release the lock early and dispatch through + // schedule_completion to honour the scheduler. l.unlock(); - hpx::visit( - done_error_value_visitor{ - HPX_FORWARD(Receiver, receiver)}, - v); + schedule_completion( + [this, + receiver = HPX_FORWARD( + Receiver, receiver)]() mutable { + hpx::visit( + done_error_value_visitor{ + HPX_MOVE(receiver)}, + v); + }); } else { @@ -380,7 +403,8 @@ namespace hpx::execution::experimental { // other threads may also try to add continuations // to the vector and the vector is not threadsafe in // itself. The continuation will be called later - // when set_error/set_stopped/set_value is called. + // when set_error/set_stopped/set_value is called + // (via set_predecessor_done). continuations.emplace_back( [this, receiver = HPX_FORWARD( @@ -454,9 +478,186 @@ namespace hpx::execution::experimental { } }; + // shared_state_scheduler wraps a generic Scheduler and overrides + // schedule_completion so that late-arriving subscribers receive + // their completion signal dispatched on the scheduler's execution + // context, preserving the P2300 get_completion_scheduler contract. + // + // Note: this is intentionally separate from shared_state_run_loop + // to avoid adding a run_loop dependency for general schedulers. + template + struct shared_state_scheduler : shared_state + { + // Store a decay-copy of the scheduler, matching the rule that + // tag_invoke(get_completion_scheduler_t, split_sender) + // returns an equivalent scheduler. + HPX_NO_UNIQUE_ADDRESS std::decay_t sched; + + // clang-format off + template >> + )> + // clang-format on + shared_state_scheduler(Sender_&& sender, + typename shared_state::allocator_type const& alloc, + Sched&& scheduler) + : shared_state(HPX_FORWARD(Sender_, sender), alloc) + , sched(HPX_FORWARD(Sched, scheduler)) + { + } + + ~shared_state_scheduler() override = default; + + // Dispatch the continuation through the scheduler so that the + // downstream receiver executes on the correct execution context + // even when predecessor_done is already true (late subscriber). + void schedule_completion( + typename shared_state::continuation_type&& continuation) + override + { + using continuation_type = + typename shared_state::continuation_type; + using base_alloc_type = + typename shared_state::allocator_type; + + // Self-owning holder using HPX's allocator — mirrors exactly + // how shared_state and operation_state_holder manage their + // own lifetimes via intrusive_ptr + allocator_traits. + struct schedule_op_holder; + + struct schedule_receiver + { + hpx::intrusive_ptr holder; + + friend void tag_invoke( + set_value_t, schedule_receiver&& r) noexcept + { + // Move continuation out first; releasing holder + // (and destroying it) must come after the + // continuation has been invoked. + auto h = HPX_MOVE(r.holder); + HPX_MOVE(h->cont)(); + // h now goes out of scope → intrusive_ptr_release + // → allocator destroy+deallocate. + } + + template + [[noreturn]] friend void tag_invoke( + set_error_t, schedule_receiver&&, Error&&) noexcept + { + // schedule() must not produce errors. + std::terminate(); + } + + friend void tag_invoke( + set_stopped_t, schedule_receiver&& r) noexcept + { + // Scheduler stopped: drop continuation silently. + r.holder.reset(); + } + + friend empty_env tag_invoke( + get_env_t, schedule_receiver const&) noexcept + { + return {}; + } + }; + + using schedule_sender_type = hpx::util::invoke_result_t< + hpx::execution::experimental::schedule_t, + std::decay_t&>; + using op_state_type = connect_result_t; + + // Use the same allocator that manages shared_state's own + // lifetime, rebound for schedule_op_holder. + struct schedule_op_holder + { + using holder_alloc_type = + typename std::allocator_traits:: + template rebind_alloc; + + continuation_type cont; + hpx::util::atomic_count ref_count{0}; + HPX_NO_UNIQUE_ADDRESS holder_alloc_type alloc; + op_state_type op_state; + + schedule_op_holder(continuation_type&& c, + std::decay_t& s, holder_alloc_type const& a) + : cont(HPX_MOVE(c)) + , alloc(a) + , op_state(hpx::execution::experimental::connect( + hpx::execution::experimental::schedule(s), + schedule_receiver{ + hpx::intrusive_ptr( + this)})) + { + } + + friend void intrusive_ptr_add_ref( + schedule_op_holder* p) noexcept + { + p->ref_count.increment(); + } + + friend void intrusive_ptr_release( + schedule_op_holder* p) noexcept + { + if (p->ref_count.decrement() == 0) + { + // Copy allocator out before destroying self, + // matching the pattern in shared_state. + std::atomic_thread_fence( + std::memory_order_acquire); + holder_alloc_type a(p->alloc); + std::allocator_traits< + holder_alloc_type>::destroy(a, p); + std::allocator_traits< + holder_alloc_type>::deallocate(a, p, 1); + } + } + }; + + using holder_alloc_type = + typename std::allocator_traits:: + template rebind_alloc; + using holder_alloc_traits = + std::allocator_traits; + using holder_unique_ptr = + std::unique_ptr>; + + // Construct the holder using the shared_state allocator. + // unique_ptr guards against leaks if construct() throws. + holder_alloc_type holder_alloc(this->alloc); + holder_unique_ptr p( + holder_alloc_traits::allocate(holder_alloc, 1), + hpx::util::allocator_deleter{ + holder_alloc}); + holder_alloc_traits::construct(holder_alloc, p.get(), + HPX_MOVE(continuation), sched, holder_alloc); + + // Keep an owning reference while start() executes so that + // a synchronous set_value() cannot destroy the holder + // before start() returns (which would dereference freed + // memory via the raw op_state pointer). + hpx::intrusive_ptr owner(p.release()); + hpx::execution::experimental::start(owner->op_state); + // owner goes out of scope here; if start() was synchronous + // the holder is destroyed now; otherwise schedule_receiver + // holds the last reference until set_value/set_stopped. + } + }; + hpx::intrusive_ptr state; - template + template > || + std::is_same_v, + run_loop_scheduler>>> split_sender(Sender_&& sender, Allocator const& allocator, Scheduler_&& scheduler = Scheduler_{}) : scheduler(HPX_FORWARD(Scheduler_, scheduler)) @@ -485,6 +686,41 @@ namespace hpx::execution::experimental { } } + // Constructor for a generic (non-run_loop) scheduler: creates + // shared_state_scheduler so that late-arriving subscribers have + // their completions dispatched on the scheduler's context. + // SFINAE ensures this overload only fires for schedulers that are + // not run_loop_scheduler (which has its own explicit overload below). + template > && + !std::is_same_v, run_loop_scheduler>>> + split_sender( + Sender_&& sender, Allocator const& allocator, Sched_&& sched) + : scheduler(HPX_FORWARD(Sched_, sched)) + { + using sched_shared_state = + shared_state_scheduler>; + using other_allocator = typename std::allocator_traits< + Allocator>::template rebind_alloc; + using allocator_traits = std::allocator_traits; + using unique_ptr = std::unique_ptr>; + + other_allocator alloc(allocator); + unique_ptr p(allocator_traits::allocate(alloc, 1), + hpx::util::allocator_deleter{alloc}); + + allocator_traits::construct(alloc, p.get(), + HPX_FORWARD(Sender_, sender), allocator, scheduler); + state = p.release(); + + if constexpr (Type == submission_type::eager) + { + state->start(); + } + } + template split_sender(Sender_&& sender, Allocator const& allocator, run_loop_scheduler const& sched) @@ -635,6 +871,32 @@ namespace hpx::execution::experimental { HPX_FORWARD(Sender, sender), allocator, sched}; } + // Scheduler-aware split for generic (non-run_loop) schedulers. + // Dispatches completions for late-arriving subscribers through the + // provided scheduler, satisfying the P2300 get_completion_scheduler + // contract. This overload is selected when passing a scheduler + // explicitly: tag_invoke(split, my_scheduler, sender, allocator). + // clang-format off + template , + HPX_CONCEPT_REQUIRES_( + hpx::execution::experimental::is_scheduler_v && + !std::is_same_v, + hpx::execution::experimental::run_loop_scheduler> && + hpx::execution::experimental::is_sender_v && + hpx::traits::is_allocator_v + )> + // clang-format on + friend constexpr HPX_FORCEINLINE auto tag_invoke(split_t, + Scheduler&& scheduler, Sender&& sender, + Allocator const& allocator = {}) + { + return detail::split_sender>{ + HPX_FORWARD(Sender, sender), allocator, + HPX_FORWARD(Scheduler, scheduler)}; + } + // clang-format off template , diff --git a/libs/core/execution/tests/unit/CMakeLists.txt b/libs/core/execution/tests/unit/CMakeLists.txt index 0e3d8fbe1361..de8afeea6601 100644 --- a/libs/core/execution/tests/unit/CMakeLists.txt +++ b/libs/core/execution/tests/unit/CMakeLists.txt @@ -17,6 +17,7 @@ set(tests algorithm_let_value algorithm_run_loop algorithm_split + algorithm_split_scheduler algorithm_start_detached algorithm_sync_wait algorithm_sync_wait_with_variant diff --git a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp new file mode 100644 index 000000000000..82800e239960 --- /dev/null +++ b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp @@ -0,0 +1,186 @@ +// Copyright (c) 2026 Arpit Khandelwal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// Regression test for P2300 split scheduler preservation. +// +// When execution::split(scheduler, sender) is used and the predecessor sender +// has already completed by the time a new receiver connects (i.e. +// predecessor_done == true), the downstream completion must be dispatched +// through the scheduler's execution context, not fired inline on the thread +// that calls add_continuation. +// +// Before the fix, shared_state::add_continuation() had: +// // TODO: Should this preserve the scheduler? It does not if we call +// // set_* inline. +// hpx::visit(done_error_value_visitor{...}, v); +// +// After the fix, add_continuation() calls schedule_completion() which routes +// through the attached scheduler for scheduler-aware shared states, or fires +// inline (existing behaviour) when no scheduler is attached. + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace ex = hpx::execution::experimental; +namespace tt = hpx::this_thread::experimental; + +int hpx_main() +{ + // ----------------------------------------------------------------------- + // Test 1: basic split, no scheduler — values round-trip correctly. + // Regression guard: no behavioural change for the scheduler-free path. + // ----------------------------------------------------------------------- + { + auto s = ex::split(ex::just(42, std::string("hello"))); + + std::atomic count{0}; + + // First subscriber (predecessor not yet complete when connecting) + tt::sync_wait(ex::then(s, [&](int x, std::string const& msg) { + HPX_TEST_EQ(x, 42); + HPX_TEST_EQ(msg, std::string("hello")); + ++count; + })); + + // Second subscriber (predecessor_done == true) + tt::sync_wait(ex::then(s, [&](int x, std::string const& msg) { + HPX_TEST_EQ(x, 42); + HPX_TEST_EQ(msg, std::string("hello")); + ++count; + })); + + HPX_TEST_EQ(count.load(), 2); + } + + // ----------------------------------------------------------------------- + // Test 2: split with thread_pool_scheduler via pipeline syntax. + // + // The pipeline `sched | ex::split(just(100))` triggers + // tag_override_invoke which reads get_completion_scheduler + // from the sender, and routes to our new generic-scheduler tag_invoke on + // split_t. Late-arriving subscribers must get their completion on the + // thread pool, not inline on this thread. + // ----------------------------------------------------------------------- + { + ex::thread_pool_scheduler sched{}; + + // just(100) | transfer(sched) gives a sender whose completion + // scheduler is sched, so split picks up the scheduler automatically. + auto shared_s = ex::split(ex::transfer(ex::just(100), sched)); + + std::atomic call_count{0}; + + // First subscriber — drains, making predecessor_done = true. + tt::sync_wait(ex::then(shared_s, [&](int v) { + HPX_TEST_EQ(v, 100); + ++call_count; + })); + HPX_TEST_EQ(call_count.load(), 1); + + // Second subscriber — predecessor_done is now true. + // Before the fix: fires inline on THIS thread. + // After the fix: dispatched through thread_pool_scheduler. + tt::sync_wait(ex::then(shared_s, [&](int v) { + HPX_TEST_EQ(v, 100); + ++call_count; + })); + HPX_TEST_EQ(call_count.load(), 2); + } + + // ----------------------------------------------------------------------- + // Test 3: explicit scheduler overload: + // tag_invoke(split, scheduler, sender, allocator) + // This directly exercises the new split_t tag_invoke and the + // shared_state_scheduler constructor. + // ----------------------------------------------------------------------- + { + ex::thread_pool_scheduler sched{}; + + // Use tag_invoke directly to exercise the new overload. + auto shared_s = + hpx::functional::tag_invoke(ex::split, sched, ex::just(7)); + + std::atomic sum{0}; + + // Drain so predecessor_done becomes true. + tt::sync_wait(ex::then(shared_s, [&](int v) { sum += v; })); + HPX_TEST_EQ(sum.load(), 7); + + // Multiple late subscribers: all must receive the value. + constexpr int N = 4; + std::vector threads; + threads.reserve(N); + for (int i = 0; i < N; ++i) + { + threads.emplace_back([&] { + tt::sync_wait(ex::then(shared_s, [&](int v) { sum += v; })); + }); + } + for (auto& t : threads) + t.join(); + + HPX_TEST_EQ(sum.load(), (N + 1) * 7); + } + + // ----------------------------------------------------------------------- + // Test 4: ensure_started (eager submission_type) is unaffected. + // ----------------------------------------------------------------------- + { + std::atomic called{false}; + + auto s = ex::ensure_started(ex::just(std::string("eager"))); + + auto result = tt::sync_wait( + ex::then(HPX_MOVE(s), [&](std::string const& v) -> std::string { + HPX_TEST_EQ(v, std::string("eager")); + called = true; + return v; + })); + + HPX_TEST(result.has_value()); + HPX_TEST_EQ(hpx::get<0>(*result), std::string("eager")); + HPX_TEST(called.load()); + } + + // ----------------------------------------------------------------------- + // Test 5: error propagation through split — both subscribers see error. + // ----------------------------------------------------------------------- + { + auto s = ex::split(ex::just_error( + std::make_exception_ptr(std::runtime_error("oops")))); + + std::atomic error_count{0}; + + // First subscriber + tt::sync_wait(ex::let_error(s, [&](std::exception_ptr) { + ++error_count; + return ex::just(); + })); + // Second subscriber (predecessor already done with error) + tt::sync_wait(ex::let_error(s, [&](std::exception_ptr) { + ++error_count; + return ex::just(); + })); + + HPX_TEST_EQ(error_count.load(), 2); + } + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0, + "HPX main exited with non-zero status"); + return hpx::util::report_errors(); +} From 97300eefd77eeb104de2fb25b51830b207c24f7c Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Sat, 11 Apr 2026 18:25:40 +0530 Subject: [PATCH 2/6] execution: fix Clang-20 redeclaration and friend-function errors in split --- .../hpx/execution/algorithms/split.hpp | 238 +++++++----------- 1 file changed, 96 insertions(+), 142 deletions(-) diff --git a/libs/core/execution/include/hpx/execution/algorithms/split.hpp b/libs/core/execution/include/hpx/execution/algorithms/split.hpp index 7a4a8b3315dc..cc0b8fc1ed0e 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/split.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/split.hpp @@ -488,149 +488,122 @@ namespace hpx::execution::experimental { template struct shared_state_scheduler : shared_state { - // Store a decay-copy of the scheduler, matching the rule that - // tag_invoke(get_completion_scheduler_t, split_sender) - // returns an equivalent scheduler. HPX_NO_UNIQUE_ADDRESS std::decay_t sched; - // clang-format off - template >> - )> - // clang-format on - shared_state_scheduler(Sender_&& sender, - typename shared_state::allocator_type const& alloc, - Sched&& scheduler) - : shared_state(HPX_FORWARD(Sender_, sender), alloc) - , sched(HPX_FORWARD(Sched, scheduler)) - { - } + using continuation_type = + typename shared_state::continuation_type; + using base_alloc_type = typename shared_state::allocator_type; - ~shared_state_scheduler() override = default; + struct schedule_op_holder; - // Dispatch the continuation through the scheduler so that the - // downstream receiver executes on the correct execution context - // even when predecessor_done is already true (late subscriber). - void schedule_completion( - typename shared_state::continuation_type&& continuation) - override + struct schedule_receiver { - using continuation_type = - typename shared_state::continuation_type; - using base_alloc_type = - typename shared_state::allocator_type; + hpx::intrusive_ptr holder; - // Self-owning holder using HPX's allocator — mirrors exactly - // how shared_state and operation_state_holder manage their - // own lifetimes via intrusive_ptr + allocator_traits. - struct schedule_op_holder; + friend void tag_invoke( + set_value_t, schedule_receiver&& r) noexcept + { + auto h = HPX_MOVE(r.holder); + HPX_MOVE(h->cont)(); + } - struct schedule_receiver + template + [[noreturn]] friend void tag_invoke( + set_error_t, schedule_receiver&&, Error&&) noexcept { - hpx::intrusive_ptr holder; + std::terminate(); + } - friend void tag_invoke( - set_value_t, schedule_receiver&& r) noexcept - { - // Move continuation out first; releasing holder - // (and destroying it) must come after the - // continuation has been invoked. - auto h = HPX_MOVE(r.holder); - HPX_MOVE(h->cont)(); - // h now goes out of scope → intrusive_ptr_release - // → allocator destroy+deallocate. - } + friend void tag_invoke( + set_stopped_t, schedule_receiver&& r) noexcept + { + r.holder.reset(); + } - template - [[noreturn]] friend void tag_invoke( - set_error_t, schedule_receiver&&, Error&&) noexcept - { - // schedule() must not produce errors. - std::terminate(); - } + friend empty_env tag_invoke( + get_env_t, schedule_receiver const&) noexcept + { + return {}; + } + }; - friend void tag_invoke( - set_stopped_t, schedule_receiver&& r) noexcept - { - // Scheduler stopped: drop continuation silently. - r.holder.reset(); - } + using schedule_sender_type = hpx::util::invoke_result_t< + hpx::execution::experimental::schedule_t, + std::decay_t&>; + using op_state_type = connect_result_t; - friend empty_env tag_invoke( - get_env_t, schedule_receiver const&) noexcept - { - return {}; - } - }; + struct schedule_op_holder + { + using holder_alloc_type = + typename std::allocator_traits:: + template rebind_alloc; - using schedule_sender_type = hpx::util::invoke_result_t< - hpx::execution::experimental::schedule_t, - std::decay_t&>; - using op_state_type = connect_result_t; + continuation_type cont; + hpx::util::atomic_count ref_count{0}; + HPX_NO_UNIQUE_ADDRESS holder_alloc_type alloc; + op_state_type op_state; + + schedule_op_holder(continuation_type&& c, + std::decay_t& s, holder_alloc_type const& a) + : cont(HPX_MOVE(c)) + , alloc(a) + , op_state(hpx::execution::experimental::connect( + hpx::execution::experimental::schedule(s), + schedule_receiver{ + hpx::intrusive_ptr(this)})) + { + } - // Use the same allocator that manages shared_state's own - // lifetime, rebound for schedule_op_holder. - struct schedule_op_holder + friend void intrusive_ptr_add_ref( + schedule_op_holder* p) noexcept { - using holder_alloc_type = - typename std::allocator_traits:: - template rebind_alloc; - - continuation_type cont; - hpx::util::atomic_count ref_count{0}; - HPX_NO_UNIQUE_ADDRESS holder_alloc_type alloc; - op_state_type op_state; - - schedule_op_holder(continuation_type&& c, - std::decay_t& s, holder_alloc_type const& a) - : cont(HPX_MOVE(c)) - , alloc(a) - , op_state(hpx::execution::experimental::connect( - hpx::execution::experimental::schedule(s), - schedule_receiver{ - hpx::intrusive_ptr( - this)})) - { - } + p->ref_count.increment(); + } - friend void intrusive_ptr_add_ref( - schedule_op_holder* p) noexcept + friend void intrusive_ptr_release( + schedule_op_holder* p) noexcept + { + if (p->ref_count.decrement() == 0) { - p->ref_count.increment(); + std::atomic_thread_fence(std::memory_order_acquire); + holder_alloc_type a(p->alloc); + std::allocator_traits< + holder_alloc_type>::destroy(a, p); + std::allocator_traits< + holder_alloc_type>::deallocate(a, p, 1); } + } + }; - friend void intrusive_ptr_release( - schedule_op_holder* p) noexcept - { - if (p->ref_count.decrement() == 0) - { - // Copy allocator out before destroying self, - // matching the pattern in shared_state. - std::atomic_thread_fence( - std::memory_order_acquire); - holder_alloc_type a(p->alloc); - std::allocator_traits< - holder_alloc_type>::destroy(a, p); - std::allocator_traits< - holder_alloc_type>::deallocate(a, p, 1); - } - } - }; + // clang-format off + template >> + )> + // clang-format on + shared_state_scheduler(Sender_&& sender, + typename shared_state::allocator_type const& alloc, + std::decay_t scheduler_) + : shared_state(HPX_FORWARD(Sender_, sender), alloc) + , sched(HPX_MOVE(scheduler_)) + { + } + ~shared_state_scheduler() override = default; + + void schedule_completion( + continuation_type&& continuation) override + { using holder_alloc_type = - typename std::allocator_traits:: - template rebind_alloc; + typename schedule_op_holder::holder_alloc_type; using holder_alloc_traits = std::allocator_traits; using holder_unique_ptr = std::unique_ptr>; - // Construct the holder using the shared_state allocator. - // unique_ptr guards against leaks if construct() throws. holder_alloc_type holder_alloc(this->alloc); holder_unique_ptr p( holder_alloc_traits::allocate(holder_alloc, 1), @@ -639,15 +612,8 @@ namespace hpx::execution::experimental { holder_alloc_traits::construct(holder_alloc, p.get(), HPX_MOVE(continuation), sched, holder_alloc); - // Keep an owning reference while start() executes so that - // a synchronous set_value() cannot destroy the holder - // before start() returns (which would dereference freed - // memory via the raw op_state pointer). hpx::intrusive_ptr owner(p.release()); hpx::execution::experimental::start(owner->op_state); - // owner goes out of scope here; if start() was synchronous - // the holder is destroyed now; otherwise schedule_receiver - // holds the last reference until set_value/set_stopped. } }; @@ -662,9 +628,8 @@ namespace hpx::execution::experimental { Scheduler_&& scheduler = Scheduler_{}) : scheduler(HPX_FORWARD(Scheduler_, scheduler)) { - using allocator_type = Allocator; using other_allocator = typename std::allocator_traits< - allocator_type>::template rebind_alloc; + Allocator>::template rebind_alloc; using allocator_traits = std::allocator_traits; using unique_ptr = std::unique_ptr>; @@ -677,24 +642,18 @@ namespace hpx::execution::experimental { alloc, p.get(), HPX_FORWARD(Sender_, sender), allocator); state = p.release(); - // Eager submission means that we start the predecessor - // operation state already when creating the sender. We don't - // wait for another receiver to be connected. if constexpr (Type == submission_type::eager) { state->start(); } } - // Constructor for a generic (non-run_loop) scheduler: creates - // shared_state_scheduler so that late-arriving subscribers have - // their completions dispatched on the scheduler's context. - // SFINAE ensures this overload only fires for schedulers that are - // not run_loop_scheduler (which has its own explicit overload below). template > && - !std::is_same_v, run_loop_scheduler>>> + !std::is_same_v, + run_loop_scheduler>, + int> = 0> split_sender( Sender_&& sender, Allocator const& allocator, Sched_&& sched) : scheduler(HPX_FORWARD(Sched_, sched)) @@ -726,10 +685,8 @@ namespace hpx::execution::experimental { run_loop_scheduler const& sched) : scheduler(sched) { - using allocator_type = Allocator; - using other_allocator = - typename std::allocator_traits:: - template rebind_alloc; + using other_allocator = typename std::allocator_traits< + Allocator>::template rebind_alloc; using allocator_traits = std::allocator_traits; using unique_ptr = std::unique_ptr>; @@ -743,9 +700,6 @@ namespace hpx::execution::experimental { sched.get_run_loop()); state = p.release(); - // Eager submission means that we start the predecessor - // operation state already when creating the sender. We don't - // wait for another receiver to be connected. if constexpr (Type == submission_type::eager) { state->start(); From a8dc458df139b1ffa0d17e49b8a1cf27e8205ba5 Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Thu, 30 Apr 2026 00:53:48 +0530 Subject: [PATCH 3/6] Fix split test compile errors and clang-format --- .../tests/unit/algorithm_split_scheduler.cpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp index 82800e239960..6f4782b09415 100644 --- a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp +++ b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp @@ -26,9 +26,12 @@ #include #include +#include "algorithm_test_utils.hpp" + #include #include #include +#include #include namespace ex = hpx::execution::experimental; @@ -41,19 +44,23 @@ int hpx_main() // Regression guard: no behavioural change for the scheduler-free path. // ----------------------------------------------------------------------- { - auto s = ex::split(ex::just(42, std::string("hello"))); + auto s = ex::split(ex::just(std::make_tuple(42, std::string("hello")))); std::atomic count{0}; // First subscriber (predecessor not yet complete when connecting) - tt::sync_wait(ex::then(s, [&](int x, std::string const& msg) { + tt::sync_wait(ex::then(s, [&](auto const& tuple_val) { + int x = std::get<0>(tuple_val); + std::string const& msg = std::get<1>(tuple_val); HPX_TEST_EQ(x, 42); HPX_TEST_EQ(msg, std::string("hello")); ++count; })); // Second subscriber (predecessor_done == true) - tt::sync_wait(ex::then(s, [&](int x, std::string const& msg) { + tt::sync_wait(ex::then(s, [&](auto const& tuple_val) { + int x = std::get<0>(tuple_val); + std::string const& msg = std::get<1>(tuple_val); HPX_TEST_EQ(x, 42); HPX_TEST_EQ(msg, std::string("hello")); ++count; @@ -156,8 +163,7 @@ int hpx_main() // Test 5: error propagation through split — both subscribers see error. // ----------------------------------------------------------------------- { - auto s = ex::split(ex::just_error( - std::make_exception_ptr(std::runtime_error("oops")))); + auto s = ex::split(error_sender{}); std::atomic error_count{0}; From c0f2bb3132090628ad109e2f0d0214f62ffd9af0 Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Thu, 30 Apr 2026 00:57:31 +0530 Subject: [PATCH 4/6] Apply clang-format to split.hpp --- .../include/hpx/execution/algorithms/split.hpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/libs/core/execution/include/hpx/execution/algorithms/split.hpp b/libs/core/execution/include/hpx/execution/algorithms/split.hpp index cc0b8fc1ed0e..dd6e0dd0aabe 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/split.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/split.hpp @@ -530,8 +530,8 @@ namespace hpx::execution::experimental { using schedule_sender_type = hpx::util::invoke_result_t< hpx::execution::experimental::schedule_t, std::decay_t&>; - using op_state_type = connect_result_t; + using op_state_type = + connect_result_t; struct schedule_op_holder { @@ -568,8 +568,8 @@ namespace hpx::execution::experimental { { std::atomic_thread_fence(std::memory_order_acquire); holder_alloc_type a(p->alloc); - std::allocator_traits< - holder_alloc_type>::destroy(a, p); + std::allocator_traits::destroy( + a, p); std::allocator_traits< holder_alloc_type>::deallocate(a, p, 1); } @@ -649,8 +649,7 @@ namespace hpx::execution::experimental { } template > && + std::enable_if_t> && !std::is_same_v, run_loop_scheduler>, int> = 0> From 5253d68f88a866398f86a1acdcffa5e4db88e1af Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Thu, 30 Apr 2026 01:08:42 +0530 Subject: [PATCH 5/6] Address Copilot feedback on split and tests --- .../hpx/execution/algorithms/split.hpp | 2 +- .../tests/unit/algorithm_split_scheduler.cpp | 31 ++++++++++++------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/libs/core/execution/include/hpx/execution/algorithms/split.hpp b/libs/core/execution/include/hpx/execution/algorithms/split.hpp index dd6e0dd0aabe..849fd6503a8c 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/split.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/split.hpp @@ -517,7 +517,7 @@ namespace hpx::execution::experimental { friend void tag_invoke( set_stopped_t, schedule_receiver&& r) noexcept { - r.holder.reset(); + auto h = HPX_MOVE(r.holder); } friend empty_env tag_invoke( diff --git a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp index 6f4782b09415..7940269262d7 100644 --- a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp +++ b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp @@ -70,19 +70,25 @@ int hpx_main() } // ----------------------------------------------------------------------- - // Test 2: split with thread_pool_scheduler via pipeline syntax. + // Test 2: split with a custom scheduler using a scheduler-aware sender. // - // The pipeline `sched | ex::split(just(100))` triggers - // tag_override_invoke which reads get_completion_scheduler - // from the sender, and routes to our new generic-scheduler tag_invoke on - // split_t. Late-arriving subscribers must get their completion on the - // thread pool, not inline on this thread. + // This test uses `ex::split(ex::transfer(ex::just(100), sched))`. + // `ex::transfer` attaches `sched` as the completion scheduler of the + // predecessor sender, and `ex::split` must preserve that scheduler when a + // late subscriber connects after the predecessor has already completed. + // Late-arriving subscribers must get their completion on the custom scheduler, + // not inline on this thread. // ----------------------------------------------------------------------- { - ex::thread_pool_scheduler sched{}; - - // just(100) | transfer(sched) gives a sender whose completion - // scheduler is sched, so split picks up the scheduler automatically. + std::atomic schedule_called{false}; + std::atomic execute_called{false}; + std::atomic tag_invoke_overload_called{false}; + example_scheduler sched{ + schedule_called, execute_called, tag_invoke_overload_called}; + + // `ex::transfer(ex::just(100), sched)` gives a sender whose completion + // scheduler is `sched`, so `ex::split` picks up the scheduler + // automatically. auto shared_s = ex::split(ex::transfer(ex::just(100), sched)); std::atomic call_count{0}; @@ -94,14 +100,17 @@ int hpx_main() })); HPX_TEST_EQ(call_count.load(), 1); + schedule_called = false; + // Second subscriber — predecessor_done is now true. // Before the fix: fires inline on THIS thread. - // After the fix: dispatched through thread_pool_scheduler. + // After the fix: dispatched through the custom scheduler. tt::sync_wait(ex::then(shared_s, [&](int v) { HPX_TEST_EQ(v, 100); ++call_count; })); HPX_TEST_EQ(call_count.load(), 2); + HPX_TEST(schedule_called.load()); } // ----------------------------------------------------------------------- From 3b7a3c0388a2422733cdf324b0e9f47601bbee30 Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Thu, 30 Apr 2026 06:41:24 +0530 Subject: [PATCH 6/6] Fix clang-21 C++20 modules ADL issues with get and emplace --- .../include/hpx/execution/algorithms/let_error.hpp | 2 +- .../core/execution/tests/unit/algorithm_split_scheduler.cpp | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp b/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp index 9d07b49750e7..11d3c98767c8 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp @@ -265,7 +265,7 @@ namespace hpx::execution::experimental { // TODO: receiver is moved before the visit, but // the invoke inside the visit may throw. r.op_state.predecessor_error - .template emplace( + .template emplace>( HPX_FORWARD(Error, error)); hpx::visit( diff --git a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp index 7940269262d7..9bbe0157bf09 100644 --- a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp +++ b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp @@ -50,8 +50,7 @@ int hpx_main() // First subscriber (predecessor not yet complete when connecting) tt::sync_wait(ex::then(s, [&](auto const& tuple_val) { - int x = std::get<0>(tuple_val); - std::string const& msg = std::get<1>(tuple_val); + auto const& [x, msg] = tuple_val; HPX_TEST_EQ(x, 42); HPX_TEST_EQ(msg, std::string("hello")); ++count; @@ -59,8 +58,7 @@ int hpx_main() // Second subscriber (predecessor_done == true) tt::sync_wait(ex::then(s, [&](auto const& tuple_val) { - int x = std::get<0>(tuple_val); - std::string const& msg = std::get<1>(tuple_val); + auto const& [x, msg] = tuple_val; HPX_TEST_EQ(x, 42); HPX_TEST_EQ(msg, std::string("hello")); ++count;