diff --git a/test/conformance/conformance_arena_constraints.cpp b/test/conformance/conformance_arena_constraints.cpp index f38d39cdea..e258a52767 100644 --- a/test/conformance/conformance_arena_constraints.cpp +++ b/test/conformance/conformance_arena_constraints.cpp @@ -101,55 +101,70 @@ TEST_CASE("Test create_numa_task_arenas conformance correctness") { } } -struct join_arena_observer : tbb::task_scheduler_observer { - join_arena_observer(tbb::task_arena &ta, int max_workers, int max_external_threads) - : tbb::task_scheduler_observer(ta) - , max_num_workers(max_workers), max_num_external_threads(max_external_threads) - { - observe(true); - } +//! Test that reserved slots parameter makes expected effect on task_arena objects +//! \brief \ref interface \ref error_guessing +TEST_CASE("Test reserved slots argument in create_numa_task_arenas") { + // The testing approach can be described as: + // - For every created NUMA-bound arena there are tasks enqueued into it, which wait on the + // barrier. + // - The barrier waits for a number of comers equal to arena concurrency + 1. + // - Among the comers there are worker and external threads whose numbers are adjusted in + // accordance to the test setup, which includes process affinity mask, NUMA node concurrency, + // cgroup's CPU limits set, number of reserved slots. + // - Test waits for worker threads to join first. Giving them an opportunity to take even the + // reserved slots. + // - Remaining slots are taken by external threads started in a separate thread, which is joined + // once the main external thread checks itself on the barrier without joining the arena. + // - After execution, the expected number of participated worker and external threads is checked. 
+ + struct join_arena_observer : tbb::task_scheduler_observer { + join_arena_observer(tbb::task_arena &ta, int max_workers, int max_external_threads) + : tbb::task_scheduler_observer(ta) + , max_num_workers(max_workers), max_num_external_threads(max_external_threads) + { + observe(true); + } - void on_scheduler_entry(bool is_worker) override { - int current; - int expected_peak; - if (is_worker) { - // TODO: Adapt utils::ConcurrencyTracker for its reuse here and in the else branch below - current = num_workers.fetch_add(1, std::memory_order_relaxed) + 1; - expected_peak = current - 1; - while (current > expected_peak && - !peak_workers.compare_exchange_strong( - expected_peak, current, std::memory_order_relaxed)) {} - - REQUIRE_MESSAGE(current <= max_num_workers, "More than expected worker threads has joined arena"); - } else { - current = num_external_threads.fetch_add(1, std::memory_order_relaxed) + 1; - expected_peak = current - 1; - while (current > expected_peak && - !peak_external_threads.compare_exchange_strong( - expected_peak, current, std::memory_order_relaxed)) {} - REQUIRE_MESSAGE(current <= max_num_external_threads, "More than expected external threads has joined arena"); + void on_scheduler_entry(bool is_worker) override { + int current; + int expected_peak; + if (is_worker) { + // TODO: Adapt utils::ConcurrencyTracker for its reuse here and in the else branch below + current = num_workers.fetch_add(1, std::memory_order_relaxed) + 1; + expected_peak = current - 1; + while (current > expected_peak && + !peak_workers.compare_exchange_strong( + expected_peak, current, std::memory_order_relaxed)) {} + + REQUIRE_MESSAGE(current <= max_num_workers, + "More than expected worker threads has joined arena"); + } else { + current = num_external_threads.fetch_add(1, std::memory_order_relaxed) + 1; + expected_peak = current - 1; + while (current > expected_peak && + !peak_external_threads.compare_exchange_strong( + expected_peak, current, 
std::memory_order_relaxed)) {} + REQUIRE_MESSAGE(current <= max_num_external_threads, + "More than expected external threads has joined arena"); + } } - } - void on_scheduler_exit(bool is_worker) override { - if (is_worker) { - num_workers.fetch_sub(1, std::memory_order_relaxed); - } else { - num_external_threads.fetch_sub(1, std::memory_order_relaxed); - } - } + void on_scheduler_exit(bool is_worker) override { + if (is_worker) { + num_workers.fetch_sub(1, std::memory_order_relaxed); + } else { + num_external_threads.fetch_sub(1, std::memory_order_relaxed); + } + } - const int max_num_workers; - const int max_num_external_threads; - std::atomic_int num_workers{}; - std::atomic_int num_external_threads{}; - std::atomic_int peak_workers{}; - std::atomic_int peak_external_threads{}; -}; + const int max_num_workers; + const int max_num_external_threads; + std::atomic_int num_workers{}; + std::atomic_int num_external_threads{}; + std::atomic_int peak_workers{}; + std::atomic_int peak_external_threads{}; + }; -//! Test that reserved slots parameter makes expected effect on task_arena objects -//! \brief \ref interface \ref error_guessing -TEST_CASE("Test reserved slots argument in create_numa_task_arenas") { system_info::initialize(); std::vector numa_nodes_info = system_info::get_numa_nodes_info(); int expected_numa_concurrency = @@ -157,56 +172,56 @@ TEST_CASE("Test reserved slots argument in create_numa_task_arenas") { [](const index_info &lhs, const index_info &rhs) { return lhs.concurrency < rhs.concurrency; })->concurrency; - const int max_threads = int(utils::get_platform_max_threads()); // respects cgroup - const int max_tbb_workers = max_threads - 1; + + // Having the only NUMA node means that the default number of workers is equal to concurrency of + // that single NUMA node - 1. Thus, for zero reserved slots in an arena bound to that NUMA node, + // workers won't be able to fully saturate it. 
Likewise, there can be other constraints imposed + on the library (e.g., Linux cgroups) that impact the default number of workers instantiated. + So, the test infers the maximum possible number of workers and adjusts the test expectations + accordingly. + const int max_workers = int(utils::get_platform_max_threads()) - 1; + + for (int reserved_slots = 0; reserved_slots <= expected_numa_concurrency; ++reserved_slots) { auto numa_task_arenas = tbb::create_numa_task_arenas({}, reserved_slots); tbb::task_group tg{}; - // Having only NUMA node means that the default total number of workers is equal to - // concurrnecy of the single NUMA - 1. This means that for task_arena with reserved_slots=0 - // worker threads won't be able to fully saturate the arena. - // This flag is set to adjust test expectations accordingly. - bool workers_cannot_fully_occupy_arena = max_tbb_workers < expected_numa_concurrency && reserved_slots == 0; for (auto& ta : numa_task_arenas) { // For the case when task_arena is created with both max_concurrency and reserved_slots // equal to 1 oneTBB creates a special additional worker to execute an "enqueue" task. // That may temporarily increase max_concurrency of task_arena to 2 instead of 1, hence // we read max_concurrency during that enqueued task execution. 
int ta_concurrency; - ta.enqueue([&ta_concurrency] { ta_concurrency = tbb::this_task_arena::max_concurrency(); }, tg); + ta.enqueue( + [&ta_concurrency] { ta_concurrency = tbb::this_task_arena::max_concurrency(); }, tg + ); ta.wait_for(tg); - int max_num_workers = ta_concurrency - - std::min(ta_concurrency, reserved_slots) - - int(workers_cannot_fully_occupy_arena); + const int num_workers = std::min(std::max(0, ta_concurrency - reserved_slots), + max_workers); + const int num_external_threads = ta_concurrency - num_workers; - int max_num_external_threads = std::min(ta_concurrency, reserved_slots); - int num_tasks = ta_concurrency - int(workers_cannot_fully_occupy_arena); + join_arena_observer observer{ta, num_workers, num_external_threads}; - join_arena_observer observer {ta, max_num_workers, max_num_external_threads}; - utils::SpinBarrier barrier{(std::size_t)num_tasks + std::size_t(!reserved_slots)}; - for (int w = 0; w < num_tasks; ++w) { + utils::SpinBarrier barrier{(std::size_t)ta_concurrency + /*for main thread*/1}; + for (int w = 0; w < ta_concurrency; ++w) { // Enqueue a task for each arena slot ta.enqueue([&barrier] { barrier.wait(); }, tg); } - // Waiting a bit to give workers an opportunity to occupy more arena slots than - // are dedicated to workers. Thus, stressing the expectation that workers cannot occupy + // Waiting a bit to give workers an opportunity to occupy more arena slots than are + // dedicated to workers. Thus, stressing the expectation that workers cannot occupy // reserved slots. 
- if (reserved_slots > 0 && max_num_workers > 0) - std::this_thread::sleep_for(std::chrono::milliseconds{1}); + std::this_thread::sleep_for(std::chrono::milliseconds{1}); - utils::NativeParallelFor(reserved_slots, - [&ta, &tg] (int) { ta.wait_for(tg); }); - - if (!reserved_slots) { - barrier.wait(); - } - - REQUIRE(observer.peak_workers.load(std::memory_order_relaxed) == max_num_workers); - REQUIRE(observer.peak_external_threads.load(std::memory_order_relaxed) == max_num_external_threads); + std::thread t([&num_external_threads, &ta, &tg] { + utils::NativeParallelFor(num_external_threads, [&ta, &tg](int) { ta.wait_for(tg); }); + }); + barrier.wait(); + t.join(); observer.observe(false); ta.wait_for(tg); + + REQUIRE(observer.peak_workers == num_workers); + REQUIRE(observer.peak_external_threads == num_external_threads); } } }