diff --git a/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp b/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp index 9d07b49750e7..11d3c98767c8 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp @@ -265,7 +265,7 @@ namespace hpx::execution::experimental { // TODO: receiver is moved before the visit, but // the invoke inside the visit may throw. r.op_state.predecessor_error - .template emplace( + .template emplace>( HPX_FORWARD(Error, error)); hpx::visit( diff --git a/libs/core/execution/include/hpx/execution/algorithms/split.hpp b/libs/core/execution/include/hpx/execution/algorithms/split.hpp index 440b894d5d58..849fd6503a8c 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/split.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/split.hpp @@ -283,6 +283,19 @@ namespace hpx::execution::experimental { } }; + // schedule_completion dispatches a stored continuation to + // the correct execution context. The base implementation fires + // the continuation inline (no scheduler attached). Subclasses + // override this to reroute through a specific scheduler, + // ensuring P2300 get_completion_scheduler guarantees hold for + // late-arriving subscribers (i.e. when predecessor_done is + // already true at the time add_continuation is called). + virtual void schedule_completion( + continuation_type&& continuation) + { + continuation(); + } + virtual void set_predecessor_done() { predecessor_done = true; @@ -325,7 +338,7 @@ namespace hpx::execution::experimental { if (!continuations.empty()) { - for (auto const& continuation : continuations) + for (auto& continuation : continuations) { continuation(); } @@ -345,13 +358,18 @@ namespace hpx::execution::experimental { // If we read predecessor_done here it means that one of // set_error/set_stopped/set_value has been called and // values/errors have been stored into the shared state. - // We can trigger the continuation directly. - // TODO: Should this preserve the scheduler? It does not - // if we call set_* inline. - hpx::visit( - done_error_value_visitor{ - HPX_FORWARD(Receiver, receiver)}, - v); + // We dispatch the completion through schedule_completion + // so that any attached scheduler is honoured, satisfying + // the P2300 get_completion_scheduler contract for late + // subscribers. + schedule_completion([this, + receiver = HPX_FORWARD(Receiver, + receiver)]() mutable { + hpx::visit( + done_error_value_visitor{ + HPX_MOVE(receiver)}, + v); + }); } else { @@ -364,13 +382,18 @@ namespace hpx::execution::experimental { { // By the time the lock has been taken, // predecessor_done might already be true and we can - // release the lock early and call the continuation - // directly again. + // release the lock early and dispatch through + // schedule_completion to honour the scheduler. l.unlock(); - hpx::visit( - done_error_value_visitor{ - HPX_FORWARD(Receiver, receiver)}, - v); + schedule_completion( + [this, + receiver = HPX_FORWARD( + Receiver, receiver)]() mutable { + hpx::visit( + done_error_value_visitor{ + HPX_MOVE(receiver)}, + v); + }); } else { @@ -380,7 +403,8 @@ namespace hpx::execution::experimental { // other threads may also try to add continuations // to the vector and the vector is not threadsafe in // itself. The continuation will be called later - // when set_error/set_stopped/set_value is called. + // when set_error/set_stopped/set_value is called + // (via set_predecessor_done). continuations.emplace_back( [this, receiver = HPX_FORWARD( @@ -454,16 +478,158 @@ namespace hpx::execution::experimental { } }; + // shared_state_scheduler wraps a generic Scheduler and overrides + // schedule_completion so that late-arriving subscribers receive + // their completion signal dispatched on the scheduler's execution + // context, preserving the P2300 get_completion_scheduler contract. + // + // Note: this is intentionally separate from shared_state_run_loop + // to avoid adding a run_loop dependency for general schedulers. + template + struct shared_state_scheduler : shared_state + { + HPX_NO_UNIQUE_ADDRESS std::decay_t sched; + + using continuation_type = + typename shared_state::continuation_type; + using base_alloc_type = typename shared_state::allocator_type; + + struct schedule_op_holder; + + struct schedule_receiver + { + hpx::intrusive_ptr holder; + + friend void tag_invoke( + set_value_t, schedule_receiver&& r) noexcept + { + auto h = HPX_MOVE(r.holder); + HPX_MOVE(h->cont)(); + } + + template + [[noreturn]] friend void tag_invoke( + set_error_t, schedule_receiver&&, Error&&) noexcept + { + std::terminate(); + } + + friend void tag_invoke( + set_stopped_t, schedule_receiver&& r) noexcept + { + auto h = HPX_MOVE(r.holder); + } + + friend empty_env tag_invoke( + get_env_t, schedule_receiver const&) noexcept + { + return {}; + } + }; + + using schedule_sender_type = hpx::util::invoke_result_t< + hpx::execution::experimental::schedule_t, + std::decay_t&>; + using op_state_type = + connect_result_t; + + struct schedule_op_holder + { + using holder_alloc_type = + typename std::allocator_traits:: + template rebind_alloc; + + continuation_type cont; + hpx::util::atomic_count ref_count{0}; + HPX_NO_UNIQUE_ADDRESS holder_alloc_type alloc; + op_state_type op_state; + + schedule_op_holder(continuation_type&& c, + std::decay_t& s, holder_alloc_type const& a) + : cont(HPX_MOVE(c)) + , alloc(a) + , op_state(hpx::execution::experimental::connect( + hpx::execution::experimental::schedule(s), + schedule_receiver{ + hpx::intrusive_ptr(this)})) + { + } + + friend void intrusive_ptr_add_ref( + schedule_op_holder* p) noexcept + { + p->ref_count.increment(); + } + + friend void intrusive_ptr_release( + schedule_op_holder* p) noexcept + { + if (p->ref_count.decrement() == 0) + { + std::atomic_thread_fence(std::memory_order_acquire); + holder_alloc_type a(p->alloc); + std::allocator_traits::destroy( + a, p); + std::allocator_traits< + holder_alloc_type>::deallocate(a, p, 1); + } + } + }; + + // clang-format off + template >> + )> + // clang-format on + shared_state_scheduler(Sender_&& sender, + typename shared_state::allocator_type const& alloc, + std::decay_t scheduler_) + : shared_state(HPX_FORWARD(Sender_, sender), alloc) + , sched(HPX_MOVE(scheduler_)) + { + } + + ~shared_state_scheduler() override = default; + + void schedule_completion( + continuation_type&& continuation) override + { + using holder_alloc_type = + typename schedule_op_holder::holder_alloc_type; + using holder_alloc_traits = + std::allocator_traits; + using holder_unique_ptr = + std::unique_ptr>; + + holder_alloc_type holder_alloc(this->alloc); + holder_unique_ptr p( + holder_alloc_traits::allocate(holder_alloc, 1), + hpx::util::allocator_deleter{ + holder_alloc}); + holder_alloc_traits::construct(holder_alloc, p.get(), + HPX_MOVE(continuation), sched, holder_alloc); + + hpx::intrusive_ptr owner(p.release()); + hpx::execution::experimental::start(owner->op_state); + } + }; + hpx::intrusive_ptr state; - template + template > || + std::is_same_v, + run_loop_scheduler>>> split_sender(Sender_&& sender, Allocator const& allocator, Scheduler_&& scheduler = Scheduler_{}) : scheduler(HPX_FORWARD(Scheduler_, scheduler)) { - using allocator_type = Allocator; using other_allocator = typename std::allocator_traits< - allocator_type>::template rebind_alloc; + Allocator>::template rebind_alloc; using allocator_traits = std::allocator_traits; using unique_ptr = std::unique_ptr>; @@ -476,9 +642,37 @@ namespace hpx::execution::experimental { alloc, p.get(), HPX_FORWARD(Sender_, sender), allocator); state = p.release(); - // Eager submission means that we start the predecessor - // operation state already when creating the sender. We don't - // wait for another receiver to be connected. + if constexpr (Type == submission_type::eager) + { + state->start(); + } + } + + template > && + !std::is_same_v, + run_loop_scheduler>, + int> = 0> + split_sender( + Sender_&& sender, Allocator const& allocator, Sched_&& sched) + : scheduler(HPX_FORWARD(Sched_, sched)) + { + using sched_shared_state = + shared_state_scheduler>; + using other_allocator = typename std::allocator_traits< + Allocator>::template rebind_alloc; + using allocator_traits = std::allocator_traits; + using unique_ptr = std::unique_ptr>; + + other_allocator alloc(allocator); + unique_ptr p(allocator_traits::allocate(alloc, 1), + hpx::util::allocator_deleter{alloc}); + + allocator_traits::construct(alloc, p.get(), + HPX_FORWARD(Sender_, sender), allocator, scheduler); + state = p.release(); + if constexpr (Type == submission_type::eager) { state->start(); @@ -490,10 +684,8 @@ namespace hpx::execution::experimental { run_loop_scheduler const& sched) : scheduler(sched) { - using allocator_type = Allocator; - using other_allocator = - typename std::allocator_traits:: - template rebind_alloc; + using other_allocator = typename std::allocator_traits< + Allocator>::template rebind_alloc; using allocator_traits = std::allocator_traits; using unique_ptr = std::unique_ptr>; @@ -507,9 +699,6 @@ namespace hpx::execution::experimental { sched.get_run_loop()); state = p.release(); - // Eager submission means that we start the predecessor - // operation state already when creating the sender. We don't - // wait for another receiver to be connected. if constexpr (Type == submission_type::eager) { state->start(); @@ -635,6 +824,32 @@ namespace hpx::execution::experimental { HPX_FORWARD(Sender, sender), allocator, sched}; } + // Scheduler-aware split for generic (non-run_loop) schedulers. + // Dispatches completions for late-arriving subscribers through the + // provided scheduler, satisfying the P2300 get_completion_scheduler + // contract. This overload is selected when passing a scheduler + // explicitly: tag_invoke(split, my_scheduler, sender, allocator). + // clang-format off + template , + HPX_CONCEPT_REQUIRES_( + hpx::execution::experimental::is_scheduler_v && + !std::is_same_v, + hpx::execution::experimental::run_loop_scheduler> && + hpx::execution::experimental::is_sender_v && + hpx::traits::is_allocator_v + )> + // clang-format on + friend constexpr HPX_FORCEINLINE auto tag_invoke(split_t, + Scheduler&& scheduler, Sender&& sender, + Allocator const& allocator = {}) + { + return detail::split_sender>{ + HPX_FORWARD(Sender, sender), allocator, + HPX_FORWARD(Scheduler, scheduler)}; + } + // clang-format off template , diff --git a/libs/core/execution/tests/unit/CMakeLists.txt b/libs/core/execution/tests/unit/CMakeLists.txt index 0e3d8fbe1361..de8afeea6601 100644 --- a/libs/core/execution/tests/unit/CMakeLists.txt +++ b/libs/core/execution/tests/unit/CMakeLists.txt @@ -17,6 +17,7 @@ set(tests algorithm_let_value algorithm_run_loop algorithm_split + algorithm_split_scheduler algorithm_start_detached algorithm_sync_wait algorithm_sync_wait_with_variant diff --git a/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp new file mode 100644 index 000000000000..9bbe0157bf09 --- /dev/null +++ b/libs/core/execution/tests/unit/algorithm_split_scheduler.cpp @@ -0,0 +1,199 @@ +// Copyright (c) 2026 Arpit Khandelwal +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// Regression test for P2300 split scheduler preservation. +// +// When execution::split(scheduler, sender) is used and the predecessor sender +// has already completed by the time a new receiver connects (i.e. +// predecessor_done == true), the downstream completion must be dispatched +// through the scheduler's execution context, not fired inline on the thread +// that calls add_continuation. +// +// Before the fix, shared_state::add_continuation() had: +// // TODO: Should this preserve the scheduler? It does not if we call +// // set_* inline. +// hpx::visit(done_error_value_visitor{...}, v); +// +// After the fix, add_continuation() calls schedule_completion() which routes +// through the attached scheduler for scheduler-aware shared states, or fires +// inline (existing behaviour) when no scheduler is attached. + +#include +#include +#include +#include + +#include "algorithm_test_utils.hpp" + +#include +#include +#include +#include +#include + +namespace ex = hpx::execution::experimental; +namespace tt = hpx::this_thread::experimental; + +int hpx_main() +{ + // ----------------------------------------------------------------------- + // Test 1: basic split, no scheduler — values round-trip correctly. + // Regression guard: no behavioural change for the scheduler-free path. + // ----------------------------------------------------------------------- + { + auto s = ex::split(ex::just(std::make_tuple(42, std::string("hello")))); + + std::atomic count{0}; + + // First subscriber (predecessor not yet complete when connecting) + tt::sync_wait(ex::then(s, [&](auto const& tuple_val) { + auto const& [x, msg] = tuple_val; + HPX_TEST_EQ(x, 42); + HPX_TEST_EQ(msg, std::string("hello")); + ++count; + })); + + // Second subscriber (predecessor_done == true) + tt::sync_wait(ex::then(s, [&](auto const& tuple_val) { + auto const& [x, msg] = tuple_val; + HPX_TEST_EQ(x, 42); + HPX_TEST_EQ(msg, std::string("hello")); + ++count; + })); + + HPX_TEST_EQ(count.load(), 2); + } + + // ----------------------------------------------------------------------- + // Test 2: split with a custom scheduler using a scheduler-aware sender. + // + // This test uses `ex::split(ex::transfer(ex::just(100), sched))`. + // `ex::transfer` attaches `sched` as the completion scheduler of the + // predecessor sender, and `ex::split` must preserve that scheduler when a + // late subscriber connects after the predecessor has already completed. + // Late-arriving subscribers must get their completion on the custom scheduler, + // not inline on this thread. + // ----------------------------------------------------------------------- + { + std::atomic schedule_called{false}; + std::atomic execute_called{false}; + std::atomic tag_invoke_overload_called{false}; + example_scheduler sched{ + schedule_called, execute_called, tag_invoke_overload_called}; + + // `ex::transfer(ex::just(100), sched)` gives a sender whose completion + // scheduler is `sched`, so `ex::split` picks up the scheduler + // automatically. + auto shared_s = ex::split(ex::transfer(ex::just(100), sched)); + + std::atomic call_count{0}; + + // First subscriber — drains, making predecessor_done = true. + tt::sync_wait(ex::then(shared_s, [&](int v) { + HPX_TEST_EQ(v, 100); + ++call_count; + })); + HPX_TEST_EQ(call_count.load(), 1); + + schedule_called = false; + + // Second subscriber — predecessor_done is now true. + // Before the fix: fires inline on THIS thread. + // After the fix: dispatched through the custom scheduler. + tt::sync_wait(ex::then(shared_s, [&](int v) { + HPX_TEST_EQ(v, 100); + ++call_count; + })); + HPX_TEST_EQ(call_count.load(), 2); + HPX_TEST(schedule_called.load()); + } + + // ----------------------------------------------------------------------- + // Test 3: explicit scheduler overload: + // tag_invoke(split, scheduler, sender, allocator) + // This directly exercises the new split_t tag_invoke and the + // shared_state_scheduler constructor. + // ----------------------------------------------------------------------- + { + ex::thread_pool_scheduler sched{}; + + // Use tag_invoke directly to exercise the new overload. + auto shared_s = + hpx::functional::tag_invoke(ex::split, sched, ex::just(7)); + + std::atomic sum{0}; + + // Drain so predecessor_done becomes true. + tt::sync_wait(ex::then(shared_s, [&](int v) { sum += v; })); + HPX_TEST_EQ(sum.load(), 7); + + // Multiple late subscribers: all must receive the value. + constexpr int N = 4; + std::vector threads; + threads.reserve(N); + for (int i = 0; i < N; ++i) + { + threads.emplace_back([&] { + tt::sync_wait(ex::then(shared_s, [&](int v) { sum += v; })); + }); + } + for (auto& t : threads) + t.join(); + + HPX_TEST_EQ(sum.load(), (N + 1) * 7); + } + + // ----------------------------------------------------------------------- + // Test 4: ensure_started (eager submission_type) is unaffected. + // ----------------------------------------------------------------------- + { + std::atomic called{false}; + + auto s = ex::ensure_started(ex::just(std::string("eager"))); + + auto result = tt::sync_wait( + ex::then(HPX_MOVE(s), [&](std::string const& v) -> std::string { + HPX_TEST_EQ(v, std::string("eager")); + called = true; + return v; + })); + + HPX_TEST(result.has_value()); + HPX_TEST_EQ(hpx::get<0>(*result), std::string("eager")); + HPX_TEST(called.load()); + } + + // ----------------------------------------------------------------------- + // Test 5: error propagation through split — both subscribers see error. + // ----------------------------------------------------------------------- + { + auto s = ex::split(error_sender{}); + + std::atomic error_count{0}; + + // First subscriber + tt::sync_wait(ex::let_error(s, [&](std::exception_ptr) { + ++error_count; + return ex::just(); + })); + // Second subscriber (predecessor already done with error) + tt::sync_wait(ex::let_error(s, [&](std::exception_ptr) { + ++error_count; + return ex::just(); + })); + + HPX_TEST_EQ(error_count.load(), 2); + } + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0, + "HPX main exited with non-zero status"); + return hpx::util::report_errors(); +}