Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 88 additions & 73 deletions test/conformance/conformance_arena_constraints.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,112 +101,127 @@ TEST_CASE("Test create_numa_task_arenas conformance correctness") {
}
}

struct join_arena_observer : tbb::task_scheduler_observer {
join_arena_observer(tbb::task_arena &ta, int max_workers, int max_external_threads)
: tbb::task_scheduler_observer(ta)
, max_num_workers(max_workers), max_num_external_threads(max_external_threads)
{
observe(true);
}
//! Test that reserved slots parameter makes expected effect on task_arena objects
//! \brief \ref interface \ref error_guessing
TEST_CASE("Test reserved slots argument in create_numa_task_arenas") {
// The testing approach can be described as:
// - For every created NUMA-bound arena there are tasks enqueued into it, which wait on the
// barrier.
// - The barrier waits for a number of comers equal to arena concurrency + 1.
// - Among the comers there are worker and external threads whose numbers are adjusted in
// accordance with the test setup, which includes the process affinity mask, NUMA node concurrency,
// cgroup CPU limits set, and the number of reserved slots.
// - The test waits for worker threads to join first, giving them an opportunity to take even the
// reserved slots.
// - Remaining slots are taken by external threads started in a separate thread, which is joined
// once the main external thread checks itself on the barrier without joining the arena.
// - After execution, the expected number of participated worker and external threads is checked.

struct join_arena_observer : tbb::task_scheduler_observer {
join_arena_observer(tbb::task_arena &ta, int max_workers, int max_external_threads)
: tbb::task_scheduler_observer(ta)
, max_num_workers(max_workers), max_num_external_threads(max_external_threads)
{
observe(true);
}

void on_scheduler_entry(bool is_worker) override {
int current;
int expected_peak;
if (is_worker) {
// TODO: Adapt utils::ConcurrencyTracker for its reuse here and in the else branch below
current = num_workers.fetch_add(1, std::memory_order_relaxed) + 1;
expected_peak = current - 1;
while (current > expected_peak &&
!peak_workers.compare_exchange_strong(
expected_peak, current, std::memory_order_relaxed)) {}

REQUIRE_MESSAGE(current <= max_num_workers, "More than expected worker threads has joined arena");
} else {
current = num_external_threads.fetch_add(1, std::memory_order_relaxed) + 1;
expected_peak = current - 1;
while (current > expected_peak &&
!peak_external_threads.compare_exchange_strong(
expected_peak, current, std::memory_order_relaxed)) {}
REQUIRE_MESSAGE(current <= max_num_external_threads, "More than expected external threads has joined arena");
void on_scheduler_entry(bool is_worker) override {
int current;
int expected_peak;
if (is_worker) {
// TODO: Adapt utils::ConcurrencyTracker for its reuse here and in the else branch below
current = num_workers.fetch_add(1, std::memory_order_relaxed) + 1;
expected_peak = current - 1;
while (current > expected_peak &&
!peak_workers.compare_exchange_strong(
expected_peak, current, std::memory_order_relaxed)) {}

REQUIRE_MESSAGE(current <= max_num_workers,
"More than expected worker threads has joined arena");
} else {
current = num_external_threads.fetch_add(1, std::memory_order_relaxed) + 1;
expected_peak = current - 1;
while (current > expected_peak &&
!peak_external_threads.compare_exchange_strong(
expected_peak, current, std::memory_order_relaxed)) {}
REQUIRE_MESSAGE(current <= max_num_external_threads,
"More than expected external threads has joined arena");
}
}
}

void on_scheduler_exit(bool is_worker) override {
if (is_worker) {
num_workers.fetch_sub(1, std::memory_order_relaxed);
} else {
num_external_threads.fetch_sub(1, std::memory_order_relaxed);
}
}
void on_scheduler_exit(bool is_worker) override {
if (is_worker) {
num_workers.fetch_sub(1, std::memory_order_relaxed);
} else {
num_external_threads.fetch_sub(1, std::memory_order_relaxed);
}
}

const int max_num_workers;
const int max_num_external_threads;
std::atomic_int num_workers{};
std::atomic_int num_external_threads{};
std::atomic_int peak_workers{};
std::atomic_int peak_external_threads{};
};
const int max_num_workers;
const int max_num_external_threads;
std::atomic_int num_workers{};
std::atomic_int num_external_threads{};
std::atomic_int peak_workers{};
std::atomic_int peak_external_threads{};
};

//! Test that reserved slots parameter makes expected effect on task_arena objects
//! \brief \ref interface \ref error_guessing
TEST_CASE("Test reserved slots argument in create_numa_task_arenas") {
system_info::initialize();
std::vector<index_info> numa_nodes_info = system_info::get_numa_nodes_info();
int expected_numa_concurrency =
std::max_element(numa_nodes_info.begin(), numa_nodes_info.end(),
[](const index_info &lhs, const index_info &rhs) {
return lhs.concurrency < rhs.concurrency;
})->concurrency;
const int max_threads = int(utils::get_platform_max_threads()); // respects cgroup
const int max_tbb_workers = max_threads - 1;

// Having only one NUMA node means that the default number of workers is equal to the concurrency
// of that single NUMA node - 1. Thus, for zero reserved slots in an arena bound to that NUMA node,
// workers won't be able to fully saturate it. Likewise, there can be other constraints imposed
// on the library (e.g., Linux cgroups) that impact the default number of workers instantiated.
// So, the test infers the maximum possible number of workers and adjusts the test expectations
// accordingly.
const int max_workers = int(utils::get_platform_max_threads()) - 1;

for (int reserved_slots = 0; reserved_slots <= expected_numa_concurrency; ++reserved_slots) {
auto numa_task_arenas = tbb::create_numa_task_arenas({}, reserved_slots);
tbb::task_group tg{};
// Having only one NUMA node means that the default total number of workers is equal to the
// concurrency of the single NUMA node - 1. This means that for a task_arena with reserved_slots=0
// worker threads won't be able to fully saturate the arena.
// This flag is set to adjust test expectations accordingly.
bool workers_cannot_fully_occupy_arena = max_tbb_workers < expected_numa_concurrency && reserved_slots == 0;
for (auto& ta : numa_task_arenas) {
// For the case when task_arena is created with both max_concurrency and reserved_slots
// equal to 1 oneTBB creates a special additional worker to execute an "enqueue" task.
// That may temporarily increase max_concurrency of task_arena to 2 instead of 1, hence
// we read max_concurrency during that enqueued task execution.
int ta_concurrency;
ta.enqueue([&ta_concurrency] { ta_concurrency = tbb::this_task_arena::max_concurrency(); }, tg);
ta.enqueue(
[&ta_concurrency] { ta_concurrency = tbb::this_task_arena::max_concurrency(); }, tg
);
ta.wait_for(tg);

int max_num_workers = ta_concurrency -
std::min(ta_concurrency, reserved_slots) -
int(workers_cannot_fully_occupy_arena);
const int num_workers = std::min(std::max(0, ta_concurrency - reserved_slots),
max_workers);
const int num_external_threads = ta_concurrency - num_workers;

int max_num_external_threads = std::min(ta_concurrency, reserved_slots);
int num_tasks = ta_concurrency - int(workers_cannot_fully_occupy_arena);
join_arena_observer observer{ta, num_workers, num_external_threads};

join_arena_observer observer {ta, max_num_workers, max_num_external_threads};
utils::SpinBarrier barrier{(std::size_t)num_tasks + std::size_t(!reserved_slots)};
for (int w = 0; w < num_tasks; ++w) {
utils::SpinBarrier barrier{(std::size_t)ta_concurrency + /*for main thread*/1};
for (int w = 0; w < ta_concurrency; ++w) { // Enqueue a task for each arena slot
ta.enqueue([&barrier] { barrier.wait(); }, tg);
}

// Waiting a bit to give workers an opportunity to occupy more arena slots than
// are dedicated to workers. Thus, stressing the expectation that workers cannot occupy
// Waiting a bit to give workers an opportunity to occupy more arena slots than are
// dedicated to workers. Thus, stressing the expectation that workers cannot occupy
// reserved slots.
if (reserved_slots > 0 && max_num_workers > 0)
std::this_thread::sleep_for(std::chrono::milliseconds{1});
std::this_thread::sleep_for(std::chrono::milliseconds{1});

utils::NativeParallelFor(reserved_slots,
[&ta, &tg] (int) { ta.wait_for(tg); });

if (!reserved_slots) {
barrier.wait();
}

REQUIRE(observer.peak_workers.load(std::memory_order_relaxed) == max_num_workers);
REQUIRE(observer.peak_external_threads.load(std::memory_order_relaxed) == max_num_external_threads);
std::thread t([&num_external_threads, &ta, &tg] {
utils::NativeParallelFor(num_external_threads, [&ta, &tg](int) { ta.wait_for(tg); });
});
Comment on lines +214 to +216
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity,

Suggested change
std::thread t([&num_external_threads, &ta, &tg] {
utils::NativeParallelFor(num_external_threads, [&ta, &tg](int) { ta.wait_for(tg); });
});
std::thread t([num_external_threads, &ta, &tg] {
utils::NativeParallelFor(num_external_threads, [&ta, &tg](int) { ta.wait_for(tg); });
});

barrier.wait();
t.join();

observer.observe(false);
ta.wait_for(tg);

REQUIRE(observer.peak_workers == num_workers);
REQUIRE(observer.peak_external_threads == num_external_threads);
}
}
}
Expand Down
Loading