From a4c06deec3db902488145a488b5b8f53650087dc Mon Sep 17 00:00:00 2001 From: Alexandr-Solovev Date: Wed, 7 Jan 2026 05:19:47 -0800 Subject: [PATCH 1/8] init commit for replace threader_for --- .../src/externals/core_threading_win_dll.cpp | 12 ++------- cpp/daal/src/threading/threading.cpp | 24 +---------------- cpp/daal/src/threading/threading.h | 26 ++----------------- .../backend/cpu/apply_weights_cpu.cpp | 2 +- cpp/oneapi/dal/backend/interop/threading.cpp | 10 ++----- cpp/oneapi/dal/detail/threading.hpp | 19 +++----------- .../io/csv/detail/read_graph_kernel_impl.hpp | 26 +++++++++---------- .../backend/convert/copy_convert_impl_cpu.cpp | 2 +- 8 files changed, 26 insertions(+), 95 deletions(-) diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index 174d47977d7..a6152544845 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -97,8 +97,7 @@ FARPROC load_daal_thr_func(const char * ordinal) typedef void * (*_threaded_malloc_t)(const size_t, const size_t); typedef void (*_threaded_free_t)(void *); -typedef void (*_daal_threader_for_t)(int, int, const void *, daal::functype); -typedef void (*_daal_threader_for_int64_t)(int64_t, const void *, daal::functype_int64); +typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype); typedef void (*_daal_threader_for_int32ptr_t)(const int *, const int *, const void *, daal::functype_int32ptr); typedef void (*_daal_threader_for_simple_t)(int, int, const void *, daal::functype); typedef void (*_daal_static_threader_for_t)(size_t, const void *, daal::functype_static); @@ -179,7 +178,7 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) _threaded_free_ptr(ptr); } -DAAL_EXPORT void _daal_threader_for(int n, int threads_request, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t threads_request, const void * a, daal::functype 
func) { load_daal_thr_dll(); static _daal_threader_for_t _daal_threader_for_ptr = (_daal_threader_for_t)load_daal_thr_func("_daal_threader_for"); @@ -202,13 +201,6 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, _daal_threader_for_int32ptr_ptr(begin, end, a, func); } -DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::functype_int64 func) -{ - load_daal_thr_dll(); - static _daal_threader_for_int64_t _daal_threader_for_int64_ptr = (_daal_threader_for_int64_t)load_daal_thr_func("_daal_threader_for_int64"); - _daal_threader_for_int64_ptr(n, a, func); -} - DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func) { load_daal_thr_dll(); diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index cd409c30bb7..65d68834742 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -102,29 +102,7 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalCo return 1; } -DAAL_EXPORT void _daal_threader_for(int n, int reserved, const void * a, daal::functype func) -{ - if (daal::threader_env()->getNumberOfThreads() > 1) - { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { - int i; - for (i = r.begin(); i < r.end(); i++) - { - func(i, a); - } - }); - } - else - { - int i; - for (i = 0; i < n; i++) - { - func(i, a); - } - } -} - -DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::functype_int64 func) +DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) { diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index f0e23ed8777..39dc325ea1d 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -87,10 +87,9 @@ extern "C" { DAAL_EXPORT int _daal_threader_get_max_threads(); 
DAAL_EXPORT int _daal_threader_get_current_thread_index(); - DAAL_EXPORT void _daal_threader_for(int n, int threads_request, const void * a, daal::functype func); + DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t threads_request, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_reduce(const size_t n, const size_t grainSize, daal::Reducer & reducer); DAAL_EXPORT void _daal_static_threader_reduce(const size_t n, const size_t grainSize, daal::Reducer & reducer); - DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::functype_int64 func); DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func); DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func); @@ -266,34 +265,13 @@ inline void threader_func_break(int i, bool & needBreak, const void * a) /// @param[in] reserved Parameter reserved for the future. Currently unused. /// @param[in] func Callable object that defines the loop body. template -inline void threader_for(int n, int reserved, const F & func) +inline void threader_for(int64_t n, int64_t reserved, const F & func) { const void * a = static_cast(&func); _daal_threader_for(n, reserved, a, threader_func); } -/// Pass a function to be executed in a for loop to the threading layer. -/// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`. -/// The default scheduling of the threading layer is used to assign -/// the iterations of the loop to threads. -/// The iterations of the loop should be logically independent. -/// Data dependencies between the iterations are allowed, but may requre the use -/// of synchronization primitives. -/// -/// @tparam F Callable object of type `[/* captures */](int64_t i) -> void`, -/// where `i` is the loop's iteration index, `0 <= i < n`. 
-/// -/// @param[in] n Number of iterations in the for loop. -/// @param[in] func Callable object that defines the loop body. -template -inline void threader_for_int64(int64_t n, const F & func) -{ - const void * a = static_cast(&func); - - _daal_threader_for_int64(n, a, threader_func); -} - /// Pass a function to be executed in a for loop to the threading layer. /// The maximal number of iterations in the loop is 2^31 - 1. /// diff --git a/cpp/oneapi/dal/algo/basic_statistics/backend/cpu/apply_weights_cpu.cpp b/cpp/oneapi/dal/algo/basic_statistics/backend/cpu/apply_weights_cpu.cpp index a6c51aeae1a..84d7e9c6faa 100644 --- a/cpp/oneapi/dal/algo/basic_statistics/backend/cpu/apply_weights_cpu.cpp +++ b/cpp/oneapi/dal/algo/basic_statistics/backend/cpu/apply_weights_cpu.cpp @@ -74,7 +74,7 @@ void apply_weights(const pr::ndview& weights, pr::ndview& sa const bk::uniform_blocking blocking(r_count, threading_block); const auto block_count = blocking.get_block_count(); - de::threader_for_int64(block_count, [&](std::int64_t b) -> void { + de::threader_for(block_count, block_count, [&](std::int64_t b) -> void { const auto f_row = blocking.get_block_start_index(b); const auto l_row = blocking.get_block_end_index(b); diff --git a/cpp/oneapi/dal/backend/interop/threading.cpp b/cpp/oneapi/dal/backend/interop/threading.cpp index 01281665e12..13d972e9c64 100644 --- a/cpp/oneapi/dal/backend/interop/threading.cpp +++ b/cpp/oneapi/dal/backend/interop/threading.cpp @@ -25,19 +25,13 @@ ONEDAL_EXPORT int _onedal_threader_get_current_thread_index() { return _daal_threader_get_current_thread_index(); } -ONEDAL_EXPORT void _onedal_threader_for(std::int32_t n, - std::int32_t threads_request, +ONEDAL_EXPORT void _onedal_threader_for(std::int64_t n, + std::int64_t threads_request, const void *a, oneapi::dal::preview::functype func) { _daal_threader_for(n, threads_request, a, static_cast(func)); } -ONEDAL_EXPORT void _onedal_threader_for_int64(std::int64_t n, - const void *a, - 
oneapi::dal::preview::functype_int64 func) { - _daal_threader_for_int64(n, a, static_cast(func)); -} - ONEDAL_EXPORT void _onedal_threader_for_simple(std::int32_t n, std::int32_t threads_request, const void *a, diff --git a/cpp/oneapi/dal/detail/threading.hpp b/cpp/oneapi/dal/detail/threading.hpp index 22101caf805..e34037f9433 100644 --- a/cpp/oneapi/dal/detail/threading.hpp +++ b/cpp/oneapi/dal/detail/threading.hpp @@ -55,15 +55,11 @@ ONEDAL_EXPORT int _onedal_threader_get_max_threads(); ONEDAL_EXPORT int _onedal_threader_get_current_thread_index(); -ONEDAL_EXPORT void _onedal_threader_for(std::int32_t n, - std::int32_t threads_request, +ONEDAL_EXPORT void _onedal_threader_for(std::int64_t n, + std::int64_t threads_request, const void *a, oneapi::dal::preview::functype func); -ONEDAL_EXPORT void _onedal_threader_for_int64(std::int64_t n, - const void *a, - oneapi::dal::preview::functype_int64 func); - ONEDAL_EXPORT void _onedal_threader_for_simple(std::int32_t n, std::int32_t threads_request, const void *a, @@ -155,21 +151,14 @@ inline void threader_func_blocked_size(std::size_t f, std::size_t l, const void } template -inline ONEDAL_EXPORT void threader_for(std::int32_t n, - std::int32_t threads_request, +inline ONEDAL_EXPORT void threader_for(std::int64_t n, + std::int64_t threads_request, const F &lambda) { const void *a = static_cast(&lambda); _onedal_threader_for(n, threads_request, a, threader_func); } -template -inline ONEDAL_EXPORT void threader_for_int64(std::int64_t n, const F &lambda) { - const void *a = static_cast(&lambda); - - _onedal_threader_for_int64(n, a, threader_func_int64); -} - template inline ONEDAL_EXPORT void threader_for_simple(std::int32_t n, std::int32_t threads_request, diff --git a/cpp/oneapi/dal/io/csv/detail/read_graph_kernel_impl.hpp b/cpp/oneapi/dal/io/csv/detail/read_graph_kernel_impl.hpp index 38586074c3d..32ed7484ab5 100644 --- a/cpp/oneapi/dal/io/csv/detail/read_graph_kernel_impl.hpp +++ 
b/cpp/oneapi/dal/io/csv/detail/read_graph_kernel_impl.hpp @@ -157,7 +157,7 @@ template struct collect_degrees_from_edge_list { template auto operator()(const EdgeList &edges, AtomicType *degrees_cv) { - dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) { + dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) { ++degrees_cv[std::get<0>(edges[u])]; ++degrees_cv[std::get<1>(edges[u])]; }); @@ -168,7 +168,7 @@ template struct collect_degrees_from_edge_list { template auto operator()(const EdgeList &edges, AtomicType *degrees_cv) { - dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) { + dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) { ++degrees_cv[std::get<0>(edges[u])]; }); } @@ -227,7 +227,7 @@ ONEDAL_EXPORT std::int64_t compute_prefix_sum( template void fill_from_atomics(Index *arr, AtomicIndex *atomic_arr, std::int64_t elements_count) { - dal::detail::threader_for_int64(elements_count, [&](std::int64_t n) { + dal::detail::threader_for(elements_count, elements_count, [&](std::int64_t n) { arr[n] = atomic_arr[n].load(); }); } @@ -241,7 +241,7 @@ struct fill_unfiltered_neighs { auto operator()(const edge_list &edges, AtomicEdge *rows_vec_atomic, Vertex *unfiltered_neighs) { - dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) { + dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) { unfiltered_neighs[++rows_vec_atomic[edges[u].first] - 1] = edges[u].second; unfiltered_neighs[++rows_vec_atomic[edges[u].second] - 1] = edges[u].first; }); @@ -251,7 +251,7 @@ struct fill_unfiltered_neighs { auto operator()(const weighted_edge_list &edges, AtomicEdge *rows_vec_atomic, std::pair *unfiltered_neighs_vals) { - dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) { + dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) { unfiltered_neighs_vals[++rows_vec_atomic[std::get<0>(edges[u])] - 1] = std::make_pair(std::get<1>(edges[u]), 
std::get<2>(edges[u])); unfiltered_neighs_vals[++rows_vec_atomic[std::get<1>(edges[u])] - 1] = @@ -266,7 +266,7 @@ struct fill_unfiltered_neighs { auto operator()(const edge_list &edges, AtomicEdge *rows_vec_atomic, Vertex *unfiltered_neighs) { - dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) { + dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) { unfiltered_neighs[++rows_vec_atomic[edges[u].first] - 1] = edges[u].second; }); } @@ -275,7 +275,7 @@ struct fill_unfiltered_neighs { auto operator()(const weighted_edge_list &edges, AtomicEdge *rows_vec_atomic, std::pair *unfiltered_neighs_vals) { - dal::detail::threader_for_int64(edges.size(), [&](std::int64_t u) { + dal::detail::threader_for(edges.size(), edges.size(), [&](std::int64_t u) { unfiltered_neighs_vals[++rows_vec_atomic[std::get<0>(edges[u])] - 1] = std::make_pair(std::get<1>(edges[u]), std::get<2>(edges[u])); }); @@ -289,7 +289,7 @@ void fill_filtered_neighs(const EdgeIndex *unfiltered_offsets, const EdgeIndex *filtered_offsets, VertexIndex *filtered_neighs, std::int64_t vertex_count) { - dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) { + dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) { auto u_neighs = filtered_neighs + filtered_offsets[u]; auto u_neighs_unf = unfiltered_neighs + unfiltered_offsets[u]; for (VertexIndex i = 0; i < filtered_degrees[u]; i++) { @@ -306,7 +306,7 @@ void fill_filtered_neighs(const Edge *unfiltered_offsets, Vertex *filtered_neighs, Weight *filtered_vals, std::int64_t vertex_count) { - dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) { + dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) { auto u_neighs = filtered_neighs + filtered_offsets[u]; auto u_neighs_vals = filtered_vals + filtered_offsets[u]; auto u_neighs_unf = unfiltered_neighs_vals + unfiltered_offsets[u]; @@ -332,7 +332,7 @@ void filter_neighbors_and_fill_new_degrees(VertexIndex 
*unfiltered_neighs, VertexIndex *new_degrees, std::int64_t vertex_count) { //removing self-loops, multiple edges from graph, and make neighbors in CSR sorted - dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) { + dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) { auto start_p = unfiltered_neighs + unfiltered_offsets[u]; auto end_p = unfiltered_neighs + unfiltered_offsets[u + 1]; @@ -350,7 +350,7 @@ void filter_neighbors_and_fill_new_degrees(std::pair *unfiltered Vertex *new_degrees, std::int64_t vertex_count) { //removing self-loops, multiple edges from graph, and make neighbors in CSR sorted - dal::detail::threader_for_int64(vertex_count, [&](std::int64_t u) { + dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) { auto start_p = unfiltered_neighs_vals + unfiltered_offsets[u]; auto end_p = unfiltered_neighs_vals + unfiltered_offsets[u + 1]; @@ -478,7 +478,7 @@ void convert_to_csr_impl(const edge_list::vertex_ty auto edge_offsets_tup = ra_vertex_edge.template allocate_array(vertex_count + 1); auto rows_vertex = std::get<1>(edge_offsets_tup); - dal::detail::threader_for_int64(vertex_count + 1, [&](std::int64_t u) { + dal::detail::threader_for(vertex_count + 1, vertex_count + 1, [&](std::int64_t u) { rows_vertex[u] = static_cast(edge_offsets_data[u]); }); @@ -601,7 +601,7 @@ void convert_to_csr_impl( auto edge_offsets_tup = ra_vertex_edge.template allocate_array(vertex_count + 1); auto rows_vertex = std::get<1>(edge_offsets_tup); - dal::detail::threader_for_int64(vertex_count + 1, [&](std::int64_t u) { + dal::detail::threader_for(vertex_count + 1, vertex_count + 1, [&](std::int64_t u) { rows_vertex[u] = static_cast(edge_offsets_data[u]); }); diff --git a/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp b/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp index 35984b346bf..f252c2150d4 100644 --- a/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp +++ 
b/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp @@ -167,7 +167,7 @@ void copy_convert(const detail::host_policy& policy, const std::int64_t row_count = shape.first; const std::int64_t col_count = shape.second; - detail::threader_for_int64(row_count, [&](std::int64_t i) -> void { + detail::threader_for(row_count, row_count, [&](std::int64_t i) -> void { auto* out_raw_ptr = out_ptrs[i]; const auto* inp_raw_ptr = inp_ptrs[i]; From 4ba268e96d775a74d1a44b62b8b0a8bd558d19b7 Mon Sep 17 00:00:00 2001 From: Solovev Date: Thu, 29 Jan 2026 16:19:08 +0100 Subject: [PATCH 2/8] fix abi --- .github/.abignore | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/.abignore b/.github/.abignore index bf72b88d5a7..e4f31058500 100644 --- a/.github/.abignore +++ b/.github/.abignore @@ -14,6 +14,15 @@ ; limitations under the License. ; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;; DAAL THREADING LAYER DESELECTIONS ;;;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +; deselect threading layer functions + +[suppress_function] +symbol_name = _daal_threader_for +drop = yes + ;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;; DAAL DESELECTIONS ;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;; From c0f983e0d667907de336d836e365e9b339bc976a Mon Sep 17 00:00:00 2001 From: Solovev Date: Fri, 30 Jan 2026 09:54:33 +0100 Subject: [PATCH 3/8] minor fix --- .github/.abignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/.abignore b/.github/.abignore index e4f31058500..a6e9952ce5f 100644 --- a/.github/.abignore +++ b/.github/.abignore @@ -20,7 +20,7 @@ ; deselect threading layer functions [suppress_function] -symbol_name = _daal_threader_for +symbol_name = _daal_threader_for_int64 drop = yes ;;;;;;;;;;;;;;;;;;;;;;;;;;; From aaf4c6a7bdbf6927452a3859298d9668d143f2a2 Mon Sep 17 00:00:00 2001 From: Alexandr-Solovev Date: Fri, 6 Feb 2026 05:10:38 -0800 Subject: [PATCH 4/8] fixes for int64 --- .../src/externals/core_threading_win_dll.cpp | 44 +++--- 
cpp/daal/src/threading/threading.cpp | 32 ++--- cpp/daal/src/threading/threading.h | 131 ++++++++++++++++-- cpp/oneapi/dal/backend/interop/threading.cpp | 10 +- cpp/oneapi/dal/detail/threading.hpp | 22 ++- 5 files changed, 170 insertions(+), 69 deletions(-) diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index a6152544845..d6a7ee9d3d3 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -97,15 +97,16 @@ FARPROC load_daal_thr_func(const char * ordinal) typedef void * (*_threaded_malloc_t)(const size_t, const size_t); typedef void (*_threaded_free_t)(void *); -typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype); -typedef void (*_daal_threader_for_int32ptr_t)(const int *, const int *, const void *, daal::functype_int32ptr); -typedef void (*_daal_threader_for_simple_t)(int, int, const void *, daal::functype); -typedef void (*_daal_static_threader_for_t)(size_t, const void *, daal::functype_static); -typedef void (*_daal_threader_for_blocked_t)(int, int, const void *, daal::functype2); -typedef void (*_daal_threader_for_blocked_size_t)(size_t, size_t, const void *, daal::functype_blocked_size); typedef int (*_daal_threader_get_max_threads_t)(void); typedef int (*_daal_threader_get_current_thread_index_t)(void); -typedef void (*_daal_threader_for_break_t)(int, int, const void *, daal::functype_break); +typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype); +typedef void (*_daal_threader_for_simple_t)(int64_t, int64_t, const void *, daal::functype); +typedef void (*_daal_threader_for_int32ptr_t)(const int *, const int *, const void *, daal::functype_int64ptr); +typedef void (*_daal_static_threader_for_t)(int64_t, const void *, daal::functype_static); +typedef void (*_daal_threader_for_blocked_t)(int64_t, int64_t, const void *, daal::functype2); +typedef void 
(*_daal_threader_for_blocked_size_t)(size_t, size_t, const void *, daal::functype_blocked_size); +typedef void (*_daal_threader_for_optional_t)(int64_t, int64_t, const void *, daal::functype); +typedef void (*_daal_threader_for_break_t)(int64_t, int64_t, const void *, daal::functype_break); typedef int64_t (*_daal_parallel_reduce_int32_int64_t)(int32_t, int64_t, const void *, daal::loop_functype_int32_int64, const void *, daal::reduction_functype_int64); @@ -178,21 +179,21 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) _threaded_free_ptr(ptr); } -DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t threads_request, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func) { load_daal_thr_dll(); static _daal_threader_for_t _daal_threader_for_ptr = (_daal_threader_for_t)load_daal_thr_func("_daal_threader_for"); - _daal_threader_for_ptr(n, threads_request, a, func); + _daal_threader_for_ptr(n, reserved, a, func); } -DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func) { load_daal_thr_dll(); static _daal_threader_for_simple_t _daal_threader_for_simple_ptr = (_daal_threader_for_simple_t)load_daal_thr_func("_daal_threader_for_simple"); - _daal_threader_for_simple_ptr(n, threads_request, a, func); + _daal_threader_for_simple_ptr(n, reserved, a, func); } -DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func) +DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int64ptr func) { load_daal_thr_dll(); @@ -201,7 +202,7 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, _daal_threader_for_int32ptr_ptr(begin, end, a, func); } -DAAL_EXPORT void 
_daal_static_threader_for(size_t n, const void * a, daal::functype_static func) +DAAL_EXPORT void _daal_static_threader_for(int64_t n, const void * a, daal::functype_static func) { load_daal_thr_dll(); static _daal_static_threader_for_t _daal_static_threader_for_ptr = (_daal_static_threader_for_t)load_daal_thr_func("_daal_static_threader_for"); @@ -247,15 +248,15 @@ DAAL_EXPORT void _daal_parallel_sort_pair_fp64_uint64(daal::IdxValType * _daal_parallel_sort_pair_fp64_uint64_ptr(begin_ptr, end_ptr); } -DAAL_EXPORT void _daal_threader_for_blocked(int n, int threads_request, const void * a, daal::functype2 func) +DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const void * a, daal::functype2 func) { load_daal_thr_dll(); static _daal_threader_for_blocked_t _daal_threader_for_blocked_ptr = (_daal_threader_for_blocked_t)load_daal_thr_func("_daal_threader_for_blocked"); - _daal_threader_for_blocked_ptr(n, threads_request, a, func); + _daal_threader_for_blocked_ptr(n, reserved, a, func); } -DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func) +DAAL_EXPORT void _daal_threader_for_blocked_size(int64_t n, int64_t block, const void * a, daal::functype_blocked_size func) { load_daal_thr_dll(); static _daal_threader_for_blocked_size_t _daal_threader_for_blocked_size_ptr = @@ -263,11 +264,12 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v _daal_threader_for_blocked_size_ptr(n, block, a, func); } -DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func) { load_daal_thr_dll(); - static _daal_threader_for_t _daal_threader_for_optional_ptr = (_daal_threader_for_t)load_daal_thr_func("_daal_threader_for_optional"); - _daal_threader_for_optional_ptr(n, threads_request, a, func); + static 
_daal_threader_for_optional_t _daal_threader_for_optional_ptr = + (_daal_threader_for_optional_t)load_daal_thr_func("_daal_threader_for_optional"); + _daal_threader_for_optional_ptr(n, reserved, a, func); } DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, const void * a, daal::loop_functype_int32_int64 loop_func, @@ -298,11 +300,11 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32ptr_int64_simple(const int32_t * return _daal_parallel_reduce_int32ptr_int64_simple_ptr(begin, end, init, a, loop_func, b, reduction_func); } -DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void * a, daal::functype_break func) +DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t reserved, const void * a, daal::functype_break func) { load_daal_thr_dll(); static _daal_threader_for_break_t _daal_threader_for_break_ptr = (_daal_threader_for_break_t)load_daal_thr_func("_daal_threader_for_break"); - _daal_threader_for_break_ptr(n, threads_request, a, func); + _daal_threader_for_break_ptr(n, reserved, a, func); } DAAL_EXPORT int _daal_threader_get_max_threads() diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 32c1cb56c11..16d820d7013 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -152,14 +152,14 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v } } -DAAL_EXPORT void _daal_threader_for_simple(int n, int reserved, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) { tbb::parallel_for( - tbb::blocked_range(0, n, 1), - [&](tbb::blocked_range r) { - int i; + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + int64_t i; for (i = r.begin(); i < r.end(); i++) { func(i, a); @@ -169,7 +169,7 @@ DAAL_EXPORT void _daal_threader_for_simple(int n, int 
reserved, const void * a, } else { - int i; + int64_t i; for (i = 0; i < n; i++) { func(i, a); @@ -310,11 +310,11 @@ DAAL_PARALLEL_SORT_IMPL(daal::IdxValType, pair_fp64_uint64) #undef DAAL_PARALLEL_SORT_IMPL -DAAL_EXPORT void _daal_threader_for_blocked(int n, int reserved, const void * a, daal::functype2 func) +DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const void * a, daal::functype2 func) { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { func(r.begin(), r.end() - r.begin(), a); }); + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { func(r.begin(), r.end() - r.begin(), a); }); } else { @@ -322,13 +322,13 @@ DAAL_EXPORT void _daal_threader_for_blocked(int n, int reserved, const void * a, } } -DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) { if (_daal_is_in_parallel()) { - int i; + int64_t i; for (i = 0; i < n; i++) { func(i, a); @@ -336,24 +336,24 @@ DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const v } else { - _daal_threader_for(n, threads_request, a, func); + _daal_threader_for(n, reserved, a, func); } } else { - _daal_threader_for(n, threads_request, a, func); + _daal_threader_for(n, reserved, a, func); } } -DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void * a, daal::functype_break func) +DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t reserved, const void * a, daal::functype_break func) { if (daal::threader_env()->getNumberOfThreads() > 1) { tbb::task_group_context context; tbb::parallel_for( - tbb::blocked_range(0, n, 1), - [&](tbb::blocked_range r) { - int i; + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + 
int64_t i; for (i = r.begin(); i < r.end(); ++i) { bool needBreak = false; @@ -365,7 +365,7 @@ DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void } else { - int i; + int64_t i; for (i = 0; i < n; ++i) { bool needBreak = false; diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 39dc325ea1d..fd9b95daf1b 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -87,16 +87,16 @@ extern "C" { DAAL_EXPORT int _daal_threader_get_max_threads(); DAAL_EXPORT int _daal_threader_get_current_thread_index(); - DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t threads_request, const void * a, daal::functype func); - DAAL_EXPORT void _daal_threader_reduce(const size_t n, const size_t grainSize, daal::Reducer & reducer); - DAAL_EXPORT void _daal_static_threader_reduce(const size_t n, const size_t grainSize, daal::Reducer & reducer); - DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func); + DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func); + DAAL_EXPORT void _daal_threader_reduce(const int64_t n, const int64_t grainSize, daal::Reducer & reducer); + DAAL_EXPORT void _daal_static_threader_reduce(const int64_t n, const int64_t grainSize, daal::Reducer & reducer); + DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func); - DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func); - DAAL_EXPORT void _daal_threader_for_blocked(int n, int threads_request, const void * a, daal::functype2 func); + DAAL_EXPORT void _daal_static_threader_for(int64_t n, const void * a, daal::functype_static func); + DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t 
reserved, const void * a, daal::functype2 func); DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func); - DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const void * a, daal::functype func); - DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void * a, daal::functype_break func); + DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func); + DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t reserved, const void * a, daal::functype_break func); DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, const void * a, daal::loop_functype_int32_int64 loop_func, const void * b, daal::reduction_functype_int64 reduction_func); @@ -252,7 +252,7 @@ inline void threader_func_break(int i, bool & needBreak, const void * a) } /// Pass a function to be executed in a for loop to the threading layer. -/// The maximal number of iterations in the loop is `2^31 - 1 (INT32_MAX)`. +/// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`. /// The default scheduling of the threading layer is used to assign /// the iterations of the loop to threads. /// Data dependencies between the iterations are allowed, but may requre the use @@ -293,13 +293,36 @@ inline void threader_for(int64_t n, int64_t reserved, const F & func) /// @param[in] reserved Parameter reserved for the future. Currently unused. /// @param[in] func Callable object that defines iteration's body. template -inline void threader_for_simple(int n, int reserved, const F & func) +inline void threader_for_simple(int64_t n, int64_t reserved, const F & func) { const void * a = static_cast(&func); _daal_threader_for_simple(n, reserved, a, threader_func); } +/// Pass a function to be executed in a for loop over a range of int pointers +/// to the threading layer. 
+/// +/// The iteration space of the loop is defined by the half-open range +/// `[begin, end)`. Each iteration corresponds to a single pointer value +/// within this range. +/// +/// The specifics of this loop compared to `threader_for` and +/// `threader_for_simple` is that the iteration variable is not an index, +/// but a pointer to `int`. The threading layer always chunks the iteration +/// space with chunk size 1, so each pointer in the range represents an +/// independent iteration unit. +/// +/// Data dependencies between the iterations are allowed, but may require +/// the use of synchronization primitives. +/// +/// @tparam F Callable object of type +/// `[/* captures */](const int * i) -> void`, +/// where `i` iterates over all values in `[begin, end)`. +/// +/// @param[in] begin Pointer to the first element of the iteration range. +/// @param[in] end Pointer past the last element of the iteration range. +/// @param[in] func Callable object that defines iteration's body. template inline void threader_for_int32ptr(const int * begin, const int * end, const F & func) { @@ -360,27 +383,105 @@ inline void static_threader_for(size_t n, const F & func) /// @param[in] func Callable object that processes the block of loop's iterations /// `[beginRange, endRange)`. template -inline void threader_for_blocked(int n, int reserved, const F & func) +inline void threader_for_blocked(int64_t n, int64_t reserved, const F & func) { const void * a = static_cast(&func); _daal_threader_for_blocked(n, reserved, a, threader_func_b); } +/// Pass a function to be executed in a for loop to the threading layer. +/// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`. +/// The default scheduling of the threading layer is used to assign +/// the iterations of the loop to threads. 
+/// +/// @tparam F Callable object of type `[/* captures */](int beginRange, int endRange) -> void` +/// where +/// `beginRange` is the starting index of the loop iterations block to be +/// processed by a thread, `0 <= beginRange < n`; +/// `endRange` is the index after the end of the loop's iterations block to be +/// processed by a thread, `beginRange < endRange <= n`; +/// +/// @param[in] n Number of iterations in the for loop. +/// @param[in] reserved Parameter reserved for the future. Currently unused. +/// @param[in] func Callable object that processes the block of loop's iterations +/// `[beginRange, endRange)`. template -inline void threader_for_optional(int n, int threads_request, const F & func) +inline void threader_for_blocked_size(size_t n, size_t reserved, const F & func) { const void * a = static_cast(&func); - _daal_threader_for_optional(n, threads_request, a, threader_func); + _daal_threader_for_blocked_size(n, reserved, a, threader_func_b); } +/// Pass a function to be executed in a for loop to the threading layer, +/// with optional parallelization. +/// +/// The maximal number of iterations in the loop is `2^31 - 1 (INT32_MAX)`. +/// The default scheduling of the threading layer is used to assign +/// the iterations of the loop to threads. +/// +/// The specifics of this loop comparing to `threader_for` is that +/// the threading layer avoids creating nested parallel regions. +/// If the call is made from within an already running parallel region, +/// the loop is executed sequentially in the current thread. +/// Otherwise, the loop may be executed in parallel according to +/// the threading backend policy. +/// +/// This behavior allows safe usage of this function inside code that +/// may already be running under a parallel context, preventing +/// oversubscription and excessive thread creation. +/// +/// @tparam F Callable object of type `[/* captures */](int i) -> void`, +/// where `i` is the loop's iteration index, `0 <= i < n`. 
+/// +/// @param[in] n Number of iterations in the for loop. +/// @param[in] reserved Parameter reserved for the future. Currently unused. +/// @param[in] func Callable object that defines iteration's body. +template +inline void threader_for_optional(int64_t n, int64_t reserved, const F & func) +{ + const void * a = static_cast(&func); + + _daal_threader_for_optional(n, reserved, a, threader_func); +} + +/// Pass a function to be executed in a for loop to the threading layer, +/// with support for early termination ("break"). +/// +/// The maximal number of iterations in the loop is `2^31 - 1 (INT32_MAX)`. +/// The iteration space is `[0, n)`. The threading layer chunks the +/// iteration space with chunk size 1. +/// +/// The specifics of this loop comparing to `threader_for` is that the +/// callable object may request early termination of the loop. +/// If any iteration sets the `needBreak` flag to `true`, the threading +/// layer attempts to cancel remaining iterations. +/// +/// In case of oneTBB threading backend this is implemented using +/// `tbb::task_group_context::cancel_group_execution()`. Already running +/// iterations may still complete, but no new iterations will be started +/// after the cancellation request. +/// +/// In the single-threaded case, the loop is stopped immediately, +/// behaving like a regular `break` statement. +/// +/// @tparam F Callable object of type +/// `[/* captures */](int i, bool & needBreak) -> void`, +/// where: +/// `i` is the loop's iteration index, `0 <= i < n`; +/// `needBreak` may be set to `true` to request early exit. +/// +/// @param[in] n Number of iterations in the for loop. +/// @param[in] reserved Parameter reserved for the future. Currently unused. +/// @param[in] func Callable object that defines iteration's body +/// and may request loop termination. 
template -inline void threader_for_break(int n, int threads_request, const F & func) +inline void threader_for_break(int64_t n, int64_t reserved, const F & func) { const void * a = static_cast(&func); - _daal_threader_for_break(n, threads_request, a, threader_func_break); + _daal_threader_for_break(n, reserved, a, threader_func_break); } template diff --git a/cpp/oneapi/dal/backend/interop/threading.cpp b/cpp/oneapi/dal/backend/interop/threading.cpp index 13d972e9c64..9a3625489be 100644 --- a/cpp/oneapi/dal/backend/interop/threading.cpp +++ b/cpp/oneapi/dal/backend/interop/threading.cpp @@ -26,17 +26,17 @@ ONEDAL_EXPORT int _onedal_threader_get_current_thread_index() { } ONEDAL_EXPORT void _onedal_threader_for(std::int64_t n, - std::int64_t threads_request, + std::int64_t reserved, const void *a, oneapi::dal::preview::functype func) { - _daal_threader_for(n, threads_request, a, static_cast(func)); + _daal_threader_for(n, reserved, a, static_cast(func)); } -ONEDAL_EXPORT void _onedal_threader_for_simple(std::int32_t n, - std::int32_t threads_request, +ONEDAL_EXPORT void _onedal_threader_for_simple(std::int64_t n, + std::int64_t reserved, const void *a, oneapi::dal::preview::functype func) { - _daal_threader_for_simple(n, threads_request, a, static_cast(func)); + _daal_threader_for_simple(n, reserved, a, static_cast(func)); } ONEDAL_EXPORT void _onedal_threader_for_int32ptr(const std::int32_t *begin, diff --git a/cpp/oneapi/dal/detail/threading.hpp b/cpp/oneapi/dal/detail/threading.hpp index e34037f9433..a97518644ed 100644 --- a/cpp/oneapi/dal/detail/threading.hpp +++ b/cpp/oneapi/dal/detail/threading.hpp @@ -56,12 +56,12 @@ ONEDAL_EXPORT int _onedal_threader_get_max_threads(); ONEDAL_EXPORT int _onedal_threader_get_current_thread_index(); ONEDAL_EXPORT void _onedal_threader_for(std::int64_t n, - std::int64_t threads_request, + std::int64_t reserved, const void *a, oneapi::dal::preview::functype func); -ONEDAL_EXPORT void _onedal_threader_for_simple(std::int32_t n, - 
std::int32_t threads_request, +ONEDAL_EXPORT void _onedal_threader_for_simple(std::int64_t n, + std::int64_t reserved, const void *a, oneapi::dal::preview::functype func); @@ -151,21 +151,19 @@ inline void threader_func_blocked_size(std::size_t f, std::size_t l, const void } template -inline ONEDAL_EXPORT void threader_for(std::int64_t n, - std::int64_t threads_request, - const F &lambda) { +inline ONEDAL_EXPORT void threader_for(std::int64_t n, std::int64_t reserved, const F &lambda) { const void *a = static_cast(&lambda); - _onedal_threader_for(n, threads_request, a, threader_func); + _onedal_threader_for(n, reserved, a, threader_func); } template -inline ONEDAL_EXPORT void threader_for_simple(std::int32_t n, - std::int32_t threads_request, +inline ONEDAL_EXPORT void threader_for_simple(std::int64_t n, + std::int64_t reserved, const F &lambda) { const void *a = static_cast(&lambda); - _onedal_threader_for_simple(n, threads_request, a, threader_func); + _onedal_threader_for_simple(n, reserved, a, threader_func); } template @@ -178,8 +176,8 @@ inline ONEDAL_EXPORT void threader_for_int32ptr(const std::int32_t *begin, } template -inline ONEDAL_EXPORT void threader_for_blocked_size(std::size_t count, - std::size_t block, +inline ONEDAL_EXPORT void threader_for_blocked_size(std::int64_t count, + std::int64_t block, const F &lambda) { const void *a = static_cast(&lambda); From ea5cbfe170cc817abf9f7cfb43cb959d0e9e6244 Mon Sep 17 00:00:00 2001 From: Alexandr-Solovev Date: Mon, 9 Feb 2026 01:21:40 -0800 Subject: [PATCH 5/8] init refactoring wip --- .../src/externals/core_threading_win_dll.cpp | 24 ++++------- cpp/daal/src/threading/threading.cpp | 41 +++++++------------ cpp/daal/src/threading/threading.h | 24 +++++------ .../vertex_partitioning_default_kernel.hpp | 2 +- cpp/oneapi/dal/backend/interop/threading.cpp | 22 ++++------ cpp/oneapi/dal/detail/threading.hpp | 22 +++++----- 6 files changed, 53 insertions(+), 82 deletions(-) diff --git 
a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index d6a7ee9d3d3..74e21c1cff8 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -99,12 +99,12 @@ typedef void (*_threaded_free_t)(void *); typedef int (*_daal_threader_get_max_threads_t)(void); typedef int (*_daal_threader_get_current_thread_index_t)(void); + typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype); -typedef void (*_daal_threader_for_simple_t)(int64_t, int64_t, const void *, daal::functype); -typedef void (*_daal_threader_for_int32ptr_t)(const int *, const int *, const void *, daal::functype_int64ptr); typedef void (*_daal_static_threader_for_t)(int64_t, const void *, daal::functype_static); +typedef void (*_daal_threader_for_simple_t)(int64_t, int64_t, const void *, daal::functype); +typedef void (*_daal_threader_for_int64ptr_t)(const int64_t *, const int64_t *, const void *, daal::functype_int64ptr); typedef void (*_daal_threader_for_blocked_t)(int64_t, int64_t, const void *, daal::functype2); -typedef void (*_daal_threader_for_blocked_size_t)(size_t, size_t, const void *, daal::functype_blocked_size); typedef void (*_daal_threader_for_optional_t)(int64_t, int64_t, const void *, daal::functype); typedef void (*_daal_threader_for_break_t)(int64_t, int64_t, const void *, daal::functype_break); @@ -193,13 +193,13 @@ DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const vo _daal_threader_for_simple_ptr(n, reserved, a, func); } -DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int64ptr func) +DAAL_EXPORT void _daal_threader_for_int64ptr(const int64_t * begin, const int64_t * end, const void * a, daal::functype_int64ptr func) { load_daal_thr_dll(); - static _daal_threader_for_int32ptr_t _daal_threader_for_int32ptr_ptr = - 
(_daal_threader_for_int32ptr_t)load_daal_thr_func("_daal_threader_for_int32ptr"); - _daal_threader_for_int32ptr_ptr(begin, end, a, func); + static _daal_threader_for_int64ptr_t _daal_threader_for_int64ptr_ptr = + (_daal_threader_for_int64ptr_t)load_daal_thr_func("_daal_threader_for_int64ptr"); + _daal_threader_for_int64ptr_ptr(begin, end, a, func); } DAAL_EXPORT void _daal_static_threader_for(int64_t n, const void * a, daal::functype_static func) @@ -248,7 +248,7 @@ DAAL_EXPORT void _daal_parallel_sort_pair_fp64_uint64(daal::IdxValType * _daal_parallel_sort_pair_fp64_uint64_ptr(begin_ptr, end_ptr); } -DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const void * a, daal::functype2 func) +DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const void * a, daal::functype_blocked func) { load_daal_thr_dll(); static _daal_threader_for_blocked_t _daal_threader_for_blocked_ptr = @@ -256,14 +256,6 @@ DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const v _daal_threader_for_blocked_ptr(n, reserved, a, func); } -DAAL_EXPORT void _daal_threader_for_blocked_size(int64_t n, int64_t block, const void * a, daal::functype_blocked_size func) -{ - load_daal_thr_dll(); - static _daal_threader_for_blocked_size_t _daal_threader_for_blocked_size_ptr = - (_daal_threader_for_blocked_size_t)load_daal_thr_func("_daal_threader_for_blocked_size"); - _daal_threader_for_blocked_size_ptr(n, block, a, func); -} - DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func) { load_daal_thr_dll(); diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 16d820d7013..bfe17bf8bf4 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -139,19 +139,6 @@ DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, } } -DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t 
block, const void * a, daal::functype_blocked_size func) -{ - if (daal::threader_env()->getNumberOfThreads() > 1) - { - tbb::parallel_for(tbb::blocked_range(0ul, n, block), - [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); - } - else - { - func(0ul, n, a); - } -} - DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) @@ -177,12 +164,12 @@ DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const vo } } -DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func) +DAAL_EXPORT void _daal_threader_for_int64ptr(const int64_t * begin, const int64_t * end, const void * a, daal::functype_int64ptr func) { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(begin, end, 1), [&](tbb::blocked_range r) { - const int * i; + tbb::parallel_for(tbb::blocked_range(begin, end, 1), [&](tbb::blocked_range r) { + const int64_t * i; for (i = r.begin(); i != r.end(); i++) { func(i, a); @@ -191,7 +178,7 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, } else { - const int * i; + const int64_t * i; for (i = begin; i != end; ++i) { func(i, a); @@ -253,21 +240,21 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32ptr_int64_simple(const int32_t * } } -DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func) +DAAL_EXPORT void _daal_static_threader_for(int64_t n, const void * a, daal::functype_static func) { - const size_t nthreads = std::min(daal::threader_env()->getNumberOfThreads(), static_cast(_daal_threader_get_max_threads())); + const int64_t nthreads = std::min(daal::threader_env()->getNumberOfThreads(), static_cast(_daal_threader_get_max_threads())); if (nthreads > 1) { - const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); + const int64_t 
nblocks_per_thread = n / nthreads + !!(n % nthreads); tbb::parallel_for( - tbb::blocked_range(0, nthreads, 1), - [&](tbb::blocked_range r) { - const size_t tid = r.begin(); - const size_t begin = tid * nblocks_per_thread; - const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const int64_t tid = r.begin(); + const int64_t begin = tid * nblocks_per_thread; + const int64_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; - for (size_t i = begin; i < end; ++i) + for (int64_t i = begin; i < end; ++i) { func(i, tid, a); } @@ -276,7 +263,7 @@ DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::funct } else { - for (size_t i = 0; i < n; i++) + for (int64_t i = 0; i < n; i++) { func(i, 0, a); } diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index fd9b95daf1b..5464153827d 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -40,15 +40,13 @@ struct IdxValType bool operator>(const IdxValType & o) const { return o.value == value ? 
index > o.index : value > o.value; } bool operator<=(const IdxValType & o) const { return value < o.value || (value == o.value && index == o.index); } }; -typedef void (*functype)(int i, const void * a); -typedef void (*functype_int64)(int64_t i, const void * a); -typedef void (*functype_int32ptr)(const int * i, const void * a); -typedef void (*functype_static)(size_t i, size_t tid, const void * a); -typedef void (*functype2)(int i, int n, const void * a); -typedef void (*functype_blocked_size)(size_t first, size_t last, const void * a); +typedef void (*functype)(int64_t i, const void * a); +typedef void (*functype_int64ptr)(const int64_t * i, const void * a); +typedef void (*functype_static)(int64_t i, size_t tid, const void * a); +typedef void (*functype2)(int64_t i, int64_t n, const void * a); typedef void * (*tls_functype)(const void * a); typedef void (*tls_reduce_functype)(void * p, const void * a); -typedef void (*functype_break)(int i, bool & needBreak, const void * a); +typedef void (*functype_break)(int64_t i, bool & needBreak, const void * a); typedef int64_t (*loop_functype_int32_int64)(int32_t start_idx_reduce, int32_t end_idx_reduce, int64_t value_for_reduce, const void * a); typedef int64_t (*loop_functype_int32ptr_int64)(const int32_t * start_idx_reduce, const int32_t * end_idx_reduce, int64_t value_for_reduce, const void * a); @@ -87,14 +85,14 @@ extern "C" { DAAL_EXPORT int _daal_threader_get_max_threads(); DAAL_EXPORT int _daal_threader_get_current_thread_index(); + DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_reduce(const int64_t n, const int64_t grainSize, daal::Reducer & reducer); DAAL_EXPORT void _daal_static_threader_reduce(const int64_t n, const int64_t grainSize, daal::Reducer & reducer); DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func); - DAAL_EXPORT void _daal_threader_for_int32ptr(const 
int * begin, const int * end, const void * a, daal::functype_int32ptr func); + DAAL_EXPORT void _daal_threader_for_int64ptr(const int64_t * begin, const int64_t * end, const void * a, daal::functype_int64ptr func); DAAL_EXPORT void _daal_static_threader_for(int64_t n, const void * a, daal::functype_static func); DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const void * a, daal::functype2 func); - DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func); DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t reserved, const void * a, daal::functype_break func); @@ -224,28 +222,28 @@ inline size_t setNumberOfThreads(const size_t numThreads, void ** globalControl) } template -inline void threader_func(int i, const void * a) +inline void threader_func(int64_t i, const void * a) { const F & func = *static_cast(a); func(i); } template -inline void static_threader_func(size_t i, size_t tid, const void * a) +inline void static_threader_func(int64_t i, size_t tid, const void * a) { const F & func = *static_cast(a); func(i, tid); } template -inline void threader_func_b(int i0, int in, const void * a) +inline void threader_func_b(int64_t i0, int64_t in, const void * a) { const F & func = *static_cast(a); func(i0, in); } template -inline void threader_func_break(int i, bool & needBreak, const void * a) +inline void threader_func_break(int64_t i, bool & needBreak, const void * a) { const F & func = *static_cast(a); func(i, needBreak); diff --git a/cpp/oneapi/dal/algo/connected_components/backend/cpu/vertex_partitioning_default_kernel.hpp b/cpp/oneapi/dal/algo/connected_components/backend/cpu/vertex_partitioning_default_kernel.hpp index 497e98427e2..919fd406789 100644 --- a/cpp/oneapi/dal/algo/connected_components/backend/cpu/vertex_partitioning_default_kernel.hpp +++ 
b/cpp/oneapi/dal/algo/connected_components/backend/cpu/vertex_partitioning_default_kernel.hpp @@ -99,7 +99,7 @@ std::int32_t most_frequent_element(const std::atomic *components, std::int32_t *root_sample_counts = allocate(vertex_allocator, vertex_count); - dal::detail::threader_for(vertex_count, vertex_count, [&](std::int32_t u) { + dal::detail::threader_for(vertex_count, vertex_count, [&](std::int64_t u) { root_sample_counts[u] = 0; }); diff --git a/cpp/oneapi/dal/backend/interop/threading.cpp b/cpp/oneapi/dal/backend/interop/threading.cpp index 9a3625489be..166b9e192fa 100644 --- a/cpp/oneapi/dal/backend/interop/threading.cpp +++ b/cpp/oneapi/dal/backend/interop/threading.cpp @@ -39,22 +39,18 @@ ONEDAL_EXPORT void _onedal_threader_for_simple(std::int64_t n, _daal_threader_for_simple(n, reserved, a, static_cast(func)); } -ONEDAL_EXPORT void _onedal_threader_for_int32ptr(const std::int32_t *begin, - const std::int32_t *end, +ONEDAL_EXPORT void _onedal_threader_for_int64ptr(const std::int64_t *begin, + const std::int64_t *end, const void *a, - oneapi::dal::preview::functype_int32ptr func) { - _daal_threader_for_int32ptr(begin, end, a, static_cast(func)); + oneapi::dal::preview::functype_int64ptr func) { + _daal_threader_for_int64ptr(begin, end, a, static_cast(func)); } -ONEDAL_EXPORT void _onedal_threader_for_blocked_size( - std::size_t count, - std::size_t block, - const void *a, - oneapi::dal::preview::functype_blocked_size func) { - _daal_threader_for_blocked_size(count, - block, - a, - static_cast(func)); +ONEDAL_EXPORT void _onedal_threader_for_blocked(std::int64_t count, + std::int64_t block, + const void *a, + oneapi::dal::preview::functype2 func) { + _daal_threader_for_blocked(count, block, a, static_cast(func)); } ONEDAL_EXPORT std::int64_t _onedal_parallel_reduce_int32_int64( diff --git a/cpp/oneapi/dal/detail/threading.hpp b/cpp/oneapi/dal/detail/threading.hpp index a97518644ed..ed9ae59ba5f 100644 --- a/cpp/oneapi/dal/detail/threading.hpp +++ 
b/cpp/oneapi/dal/detail/threading.hpp @@ -26,10 +26,9 @@ #endif namespace oneapi::dal::preview { -typedef void (*functype)(std::int32_t i, const void *a); -typedef void (*functype_int64)(std::int64_t i, const void *a); -typedef void (*functype_int32ptr)(const std::int32_t *i, const void *a); -typedef void (*functype_blocked_size)(std::size_t, std::size_t, const void *); +typedef void (*functype)(std::int64_t i, const void *a); +typedef void (*functype_int64ptr)(const std::int64_t *i, const void *a); +typedef void (*functype2)(std::int64_t, std::int64_t, const void *); typedef void *(*tls_functype)(const void *a); typedef void (*tls_reduce_functype)(void *p, const void *a); @@ -65,16 +64,15 @@ ONEDAL_EXPORT void _onedal_threader_for_simple(std::int64_t n, const void *a, oneapi::dal::preview::functype func); -ONEDAL_EXPORT void _onedal_threader_for_int32ptr(const std::int32_t *begin, - const std::int32_t *end, +ONEDAL_EXPORT void _onedal_threader_for_int64ptr(const std::int64_t *begin, + const std::int64_t *end, const void *a, - oneapi::dal::preview::functype_int32ptr func); + oneapi::dal::preview::functype_int64ptr func); -ONEDAL_EXPORT void _onedal_threader_for_blocked_size( - std::size_t count, - std::size_t block, - const void *a, - oneapi::dal::preview::functype_blocked_size func); +ONEDAL_EXPORT void _onedal_threader_for_blocked(std::int64_t count, + std::int64_t block, + const void *a, + oneapi::dal::preview::functype2 func); ONEDAL_EXPORT std::int64_t _onedal_parallel_reduce_int32_int64( std::int32_t n, From 31b9cf0ebb404e75f7b30ad918efe75a4dcd739c Mon Sep 17 00:00:00 2001 From: Alexandr-Solovev Date: Mon, 16 Feb 2026 00:26:24 -0800 Subject: [PATCH 6/8] fixes --- .../src/externals/core_threading_win_dll.cpp | 22 +++---- cpp/daal/src/threading/threading.h | 4 +- .../cpu/vertex_ranking_default_kernel.hpp | 66 +++++++++---------- cpp/oneapi/dal/detail/threading.hpp | 30 ++++----- .../backend/convert/copy_convert_impl_cpu.cpp | 31 ++++----- 5 files changed, 75 
insertions(+), 78 deletions(-) diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index 74e21c1cff8..119e5076255 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -101,7 +101,7 @@ typedef int (*_daal_threader_get_max_threads_t)(void); typedef int (*_daal_threader_get_current_thread_index_t)(void); typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype); -typedef void (*_daal_static_threader_for_t)(int64_t, const void *, daal::functype_static); +typedef void (*_daal_static_threader_for_t)(size_t, const void *, daal::functype_static); typedef void (*_daal_threader_for_simple_t)(int64_t, int64_t, const void *, daal::functype); typedef void (*_daal_threader_for_int64ptr_t)(const int64_t *, const int64_t *, const void *, daal::functype_int64ptr); typedef void (*_daal_threader_for_blocked_t)(int64_t, int64_t, const void *, daal::functype2); @@ -179,18 +179,18 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) _threaded_free_ptr(ptr); } -DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t grain_size, const void * a, daal::functype func) { load_daal_thr_dll(); static _daal_threader_for_t _daal_threader_for_ptr = (_daal_threader_for_t)load_daal_thr_func("_daal_threader_for"); - _daal_threader_for_ptr(n, reserved, a, func); + _daal_threader_for_ptr(n, grain_size, a, func); } -DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t grain_size, const void * a, daal::functype func) { load_daal_thr_dll(); static _daal_threader_for_simple_t _daal_threader_for_simple_ptr = (_daal_threader_for_simple_t)load_daal_thr_func("_daal_threader_for_simple"); - _daal_threader_for_simple_ptr(n, reserved, a, 
func); + _daal_threader_for_simple_ptr(n, grain_size, a, func); } DAAL_EXPORT void _daal_threader_for_int64ptr(const int64_t * begin, const int64_t * end, const void * a, daal::functype_int64ptr func) @@ -248,20 +248,20 @@ DAAL_EXPORT void _daal_parallel_sort_pair_fp64_uint64(daal::IdxValType * _daal_parallel_sort_pair_fp64_uint64_ptr(begin_ptr, end_ptr); } -DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const void * a, daal::functype_blocked func) +DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t grain_size, const void * a, daal::functype_blocked func) { load_daal_thr_dll(); static _daal_threader_for_blocked_t _daal_threader_for_blocked_ptr = (_daal_threader_for_blocked_t)load_daal_thr_func("_daal_threader_for_blocked"); - _daal_threader_for_blocked_ptr(n, reserved, a, func); + _daal_threader_for_blocked_ptr(n, grain_size, a, func); } -DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t grain_size, const void * a, daal::functype func) { load_daal_thr_dll(); static _daal_threader_for_optional_t _daal_threader_for_optional_ptr = (_daal_threader_for_optional_t)load_daal_thr_func("_daal_threader_for_optional"); - _daal_threader_for_optional_ptr(n, reserved, a, func); + _daal_threader_for_optional_ptr(n, grain_size, a, func); } DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, const void * a, daal::loop_functype_int32_int64 loop_func, @@ -292,11 +292,11 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32ptr_int64_simple(const int32_t * return _daal_parallel_reduce_int32ptr_int64_simple_ptr(begin, end, init, a, loop_func, b, reduction_func); } -DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t reserved, const void * a, daal::functype_break func) +DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t grain_size, const void * a, daal::functype_break func) { 
load_daal_thr_dll(); static _daal_threader_for_break_t _daal_threader_for_break_ptr = (_daal_threader_for_break_t)load_daal_thr_func("_daal_threader_for_break"); - _daal_threader_for_break_ptr(n, reserved, a, func); + _daal_threader_for_break_ptr(n, grain_size, a, func); } DAAL_EXPORT int _daal_threader_get_max_threads() diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 5464153827d..d9571264ba3 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -405,11 +405,11 @@ inline void threader_for_blocked(int64_t n, int64_t reserved, const F & func) /// @param[in] func Callable object that processes the block of loop's iterations /// `[beginRange, endRange)`. template -inline void threader_for_blocked_size(size_t n, size_t reserved, const F & func) +inline void threader_for_blocked(size_t n, size_t reserved, const F & func) { const void * a = static_cast(&func); - _daal_threader_for_blocked_size(n, reserved, a, threader_func_b); + _daal_threader_for_blocked(n, reserved, a, threader_func_b); } /// Pass a function to be executed in a for loop to the threading layer, diff --git a/cpp/oneapi/dal/algo/triangle_counting/backend/cpu/vertex_ranking_default_kernel.hpp b/cpp/oneapi/dal/algo/triangle_counting/backend/cpu/vertex_ranking_default_kernel.hpp index 1b65d0a352a..1b84251ccec 100644 --- a/cpp/oneapi/dal/algo/triangle_counting/backend/cpu/vertex_ranking_default_kernel.hpp +++ b/cpp/oneapi/dal/algo/triangle_counting/backend/cpu/vertex_ranking_default_kernel.hpp @@ -32,7 +32,7 @@ template array triangle_counting_local(const dal::preview::detail::topology& t, std::int64_t* triangles_local) { const auto vertex_count = t.get_vertex_count(); - std::int32_t average_degree = t.get_edge_count() / vertex_count; + //std::int32_t average_degree = t.get_edge_count() / vertex_count; int thread_cnt = dal::detail::threader_get_max_threads(); dal::detail::threader_for(thread_cnt * vertex_count, @@ -41,8 +41,8 @@ 
array triangle_counting_local(const dal::preview::detail::topology triangles_local[u] = 0; }); - const std::int32_t average_degree_sparsity_boundary = 4; - if (average_degree < average_degree_sparsity_boundary) { + //const std::int32_t average_degree_sparsity_boundary = 4; + if (true) { dal::detail::threader_for(vertex_count, vertex_count, [&](std::int32_t u) { for (auto v_ = t.get_vertex_neighbors_begin(u); v_ != t.get_vertex_neighbors_end(u); ++v_) { @@ -72,40 +72,40 @@ array triangle_counting_local(const dal::preview::detail::topology }); } else { //average_degree >= average_degree_sparsity_boundary - dal::detail::threader_for_simple(vertex_count, vertex_count, [&](std::int32_t u) { - if (t.get_vertex_degree(u) >= 2) - dal::detail::threader_for_int32ptr( - t.get_vertex_neighbors_begin(u), - t.get_vertex_neighbors_end(u), - [&](const std::int32_t* v_) { - std::int32_t v = *v_; - if (v <= u) { - const std::int32_t u_degree = t.get_vertex_degree(u); - const std::int32_t* v_neighbors_begin = t.get_vertex_neighbors_begin(v); - const std::int32_t v_degree = t.get_vertex_degree(v); - std::int32_t new_v_degree; + // dal::detail::threader_for_simple(vertex_count, vertex_count, [&](std::int32_t u) { + // if (t.get_vertex_degree(u) >= 2) + // dal::detail::threader_for_int64ptr( + // static_cast(static_cast(t.get_vertex_neighbors_begin(u))), + // static_cast(static_cast(t.get_vertex_neighbors_end(u))), + // [&](const std::int32_t* v_) { + // std::int32_t v = *v_; + // if (v <= u) { + // const std::int32_t u_degree = t.get_vertex_degree(u); + // const std::int32_t* v_neighbors_begin = t.get_vertex_neighbors_begin(v); + // const std::int32_t v_degree = t.get_vertex_degree(v); + // std::int32_t new_v_degree; - for (new_v_degree = 0; (new_v_degree < v_degree) && - (v_neighbors_begin[new_v_degree] <= v); - new_v_degree++) - ; + // for (new_v_degree = 0; (new_v_degree < v_degree) && + // (v_neighbors_begin[new_v_degree] <= v); + // new_v_degree++) + // ; - int thread_id = 
dal::detail::threader_get_current_thread_index(); - std::int64_t indx = - (std::int64_t)thread_id * (std::int64_t)vertex_count; + // int thread_id = dal::detail::threader_get_current_thread_index(); + // std::int64_t indx = + // (std::int64_t)thread_id * (std::int64_t)vertex_count; - auto tc = intersection_local_tc{}(t.get_vertex_neighbors_begin(u), - t.get_vertex_neighbors_begin(v), - u_degree, - new_v_degree, - triangles_local + indx, - vertex_count); + // auto tc = intersection_local_tc{}(t.get_vertex_neighbors_begin(u), + // t.get_vertex_neighbors_begin(v), + // u_degree, + // new_v_degree, + // triangles_local + indx, + // vertex_count); - triangles_local[indx + u] += tc; - triangles_local[indx + v] += tc; - } - }); - }); + // triangles_local[indx + u] += tc; + // triangles_local[indx + v] += tc; + // } + // }); + // }); } auto arr_triangles = array::empty(vertex_count); diff --git a/cpp/oneapi/dal/detail/threading.hpp b/cpp/oneapi/dal/detail/threading.hpp index ed9ae59ba5f..1a8135be787 100644 --- a/cpp/oneapi/dal/detail/threading.hpp +++ b/cpp/oneapi/dal/detail/threading.hpp @@ -124,26 +124,26 @@ inline int threader_get_current_thread_index() { return _onedal_threader_get_current_thread_index(); } -template -inline void threader_func(std::int32_t i, const void *a) { - const F &lambda = *static_cast(a); - lambda(i); -} +// template +// inline void threader_func(std::int32_t i, const void *a) { +// const F &lambda = *static_cast(a); +// lambda(i); +// } template -inline void threader_func_int64(std::int64_t i, const void *a) { +inline void threader_func(std::int64_t i, const void *a) { const F &lambda = *static_cast(a); lambda(i); } template -inline void threader_func_int32ptr(const std::int32_t *i, const void *a) { +inline void threader_func_int64ptr(const std::int64_t *i, const void *a) { const F &lambda = *static_cast(a); lambda(i); } template -inline void threader_func_blocked_size(std::size_t f, std::size_t l, const void *a) { +inline void 
threader_func_blocked(std::int64_t f, std::int64_t l, const void *a) { const F &lambda = *static_cast(a); lambda(f, l); } @@ -165,21 +165,21 @@ inline ONEDAL_EXPORT void threader_for_simple(std::int64_t n, } template -inline ONEDAL_EXPORT void threader_for_int32ptr(const std::int32_t *begin, - const std::int32_t *end, +inline ONEDAL_EXPORT void threader_for_int64ptr(const std::int64_t *begin, + const std::int64_t *end, const F &lambda) { const void *a = static_cast(&lambda); - _onedal_threader_for_int32ptr(begin, end, a, threader_func_int32ptr); + _onedal_threader_for_int64ptr(begin, end, a, threader_func_int64ptr); } template -inline ONEDAL_EXPORT void threader_for_blocked_size(std::int64_t count, - std::int64_t block, - const F &lambda) { +inline ONEDAL_EXPORT void threader_for_blocked(std::int64_t count, + std::int64_t block, + const F &lambda) { const void *a = static_cast(&lambda); - _onedal_threader_for_blocked_size(count, block, a, threader_func_blocked_size); + _onedal_threader_for_blocked(count, block, a, threader_func_blocked); } template diff --git a/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp b/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp index f252c2150d4..ea525f365aa 100644 --- a/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp +++ b/cpp/oneapi/dal/table/backend/convert/copy_convert_impl_cpu.cpp @@ -136,23 +136,20 @@ void copy_convert(const detail::host_policy& policy, auto block_size = propose_block_size(policy); const auto block_size_s = detail::integral_cast(block_size); - detail::threader_for_blocked_size( - count_s, - block_size_s, - [=](std::size_t f, std::size_t l) -> void { - const auto first = detail::integral_cast(f); - const auto last = detail::integral_cast(l); - const std::int64_t count = last - first; - - const InpType* const off_inp_ptr = inp_ptr + inp_str * first; - OutType* const off_out_ptr = out_ptr + out_str * first; - - copy_converter_impl::run(off_out_ptr, - out_str, - 
off_inp_ptr, - inp_str, - count); - }); + detail::threader_for_blocked(count_s, block_size_s, [=](std::size_t f, std::size_t l) -> void { + const auto first = detail::integral_cast(f); + const auto last = detail::integral_cast(l); + const std::int64_t count = last - first; + + const InpType* const off_inp_ptr = inp_ptr + inp_str * first; + OutType* const off_out_ptr = out_ptr + out_str * first; + + copy_converter_impl::run(off_out_ptr, + out_str, + off_inp_ptr, + inp_str, + count); + }); } template From 0ec337a03cb621109a9067d3f6250645f53c2ebd Mon Sep 17 00:00:00 2001 From: Alexandr-Solovev Date: Mon, 16 Feb 2026 05:14:22 -0800 Subject: [PATCH 7/8] minor fixes --- .../src/externals/core_threading_win_dll.cpp | 3 ++- cpp/daal/src/threading/threading.cpp | 18 +++++++++--------- cpp/daal/src/threading/threading.h | 4 ++-- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index 119e5076255..3df5eac8594 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -100,8 +100,9 @@ typedef void (*_threaded_free_t)(void *); typedef int (*_daal_threader_get_max_threads_t)(void); typedef int (*_daal_threader_get_current_thread_index_t)(void); +typedef void (*_daal_static_threader_for_t)(int64_t, const void *, daal::functype_static); + typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype); -typedef void (*_daal_static_threader_for_t)(size_t, const void *, daal::functype_static); typedef void (*_daal_threader_for_simple_t)(int64_t, int64_t, const void *, daal::functype); typedef void (*_daal_threader_for_int64ptr_t)(const int64_t *, const int64_t *, const void *, daal::functype_int64ptr); typedef void (*_daal_threader_for_blocked_t)(int64_t, int64_t, const void *, daal::functype2); diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp 
index bfe17bf8bf4..5a5f84b6a96 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -117,11 +117,11 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalCo return 1; } -DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t grain_size, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { + tbb::parallel_for(tbb::blocked_range(0, n, grain_size), [&](tbb::blocked_range r) { int64_t i; for (i = r.begin(); i < r.end(); i++) { @@ -139,12 +139,12 @@ DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, } } -DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t grain_size, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) { tbb::parallel_for( - tbb::blocked_range(0, n, 1), + tbb::blocked_range(0, n, grain_size), [&](tbb::blocked_range r) { int64_t i; for (i = r.begin(); i < r.end(); i++) @@ -297,7 +297,7 @@ DAAL_PARALLEL_SORT_IMPL(daal::IdxValType, pair_fp64_uint64) #undef DAAL_PARALLEL_SORT_IMPL -DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const void * a, daal::functype2 func) +DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t grain_size, const void * a, daal::functype2 func) { if (daal::threader_env()->getNumberOfThreads() > 1) { @@ -309,7 +309,7 @@ DAAL_EXPORT void _daal_threader_for_blocked(int64_t n, int64_t reserved, const v } } -DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func) +DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t grain_size, const void * a, daal::functype func) { if 
(daal::threader_env()->getNumberOfThreads() > 1) { @@ -323,16 +323,16 @@ DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const } else { - _daal_threader_for(n, reserved, a, func); + _daal_threader_for(n, grain_size, a, func); } } else { - _daal_threader_for(n, reserved, a, func); + _daal_threader_for(n, grain_size, a, func); } } -DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t reserved, const void * a, daal::functype_break func) +DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t grain_size, const void * a, daal::functype_break func) { if (daal::threader_env()->getNumberOfThreads() > 1) { diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index d9571264ba3..6883c76b345 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -88,7 +88,6 @@ extern "C" DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t reserved, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_reduce(const int64_t n, const int64_t grainSize, daal::Reducer & reducer); - DAAL_EXPORT void _daal_static_threader_reduce(const int64_t n, const int64_t grainSize, daal::Reducer & reducer); DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t reserved, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_int64ptr(const int64_t * begin, const int64_t * end, const void * a, daal::functype_int64ptr func); DAAL_EXPORT void _daal_static_threader_for(int64_t n, const void * a, daal::functype_static func); @@ -96,6 +95,7 @@ extern "C" DAAL_EXPORT void _daal_threader_for_optional(int64_t n, int64_t reserved, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_break(int64_t n, int64_t reserved, const void * a, daal::functype_break func); + DAAL_EXPORT void _daal_static_threader_reduce(const size_t n, const size_t grainSize, daal::Reducer & reducer); DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, 
const void * a, daal::loop_functype_int32_int64 loop_func, const void * b, daal::reduction_functype_int64 reduction_func); DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64_simple(int32_t n, int64_t init, const void * a, daal::loop_functype_int32_int64 loop_func, @@ -357,7 +357,7 @@ inline void threader_for_int32ptr(const int * begin, const int * end, const F & /// @param[in] n Number of iterations in the for loop. /// @param[in] func Callable object that defines iteration's body. template -inline void static_threader_for(size_t n, const F & func) +inline void static_threader_for(int64_t n, const F & func) { const void * a = static_cast(&func); From bffdf4e4c32d94951add05ca3986058a032b4158 Mon Sep 17 00:00:00 2001 From: Alexandr-Solovev Date: Fri, 20 Feb 2026 03:43:14 -0800 Subject: [PATCH 8/8] fixes --- .ci/scripts/abi_check.sh | 1 + .github/.abignore | 9 ------ .../src/externals/core_threading_win_dll.cpp | 28 ++++++++++++++++++- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/.ci/scripts/abi_check.sh b/.ci/scripts/abi_check.sh index e92c15dad86..c954d0a6093 100755 --- a/.ci/scripts/abi_check.sh +++ b/.ci/scripts/abi_check.sh @@ -34,6 +34,7 @@ fi for i in "${solibs[@]}" do name=$(basename $i) + if [[ "$name" == *"libonedal_thread"* ]]; then continue; fi echo "======== ${name} ========" abidiff --suppr .github/.abignore $i $release_dir/$name retVal=$? diff --git a/.github/.abignore b/.github/.abignore index a2c14cb3827..5cbd47f3fcc 100644 --- a/.github/.abignore +++ b/.github/.abignore @@ -14,15 +14,6 @@ ; limitations under the License. 
; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;;;; DAAL THREADING LAYER DESELECTIONS ;;;; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -; deselect threading layer functions - -[suppress_function] -symbol_name = _daal_threader_for_int64 -drop = yes - ;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;; DAAL DESELECTIONS ;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index 3df5eac8594..ffb07ff2a88 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -101,13 +101,25 @@ typedef int (*_daal_threader_get_max_threads_t)(void); typedef int (*_daal_threader_get_current_thread_index_t)(void); typedef void (*_daal_static_threader_for_t)(int64_t, const void *, daal::functype_static); +typedef void (*_daal_static_threader_for_size_t)(size_t, const void *, daal::functype_static_size_t); typedef void (*_daal_threader_for_t)(int64_t, int64_t, const void *, daal::functype); +typedef void (*_daal_threader_for_int32_t)(int, int, const void *, daal::functype_int32); + typedef void (*_daal_threader_for_simple_t)(int64_t, int64_t, const void *, daal::functype); -typedef void (*_daal_threader_for_int64ptr_t)(const int64_t *, const int64_t *, const void *, daal::functype_int64ptr); +typedef void (*_daal_threader_for_simple_int32_t)(int, int, const void *, daal::functype_int32); + +typedef void (*_daal_threader_for_ptr_t)(const int64_t *, const int64_t *, const void *, daal::functype_int64ptr); +typedef void (*_daal_threader_for_int32ptr_t)(const int64_t *, const int64_t *, const void *, daal::functype_int32ptr); + typedef void (*_daal_threader_for_blocked_t)(int64_t, int64_t, const void *, daal::functype2); +typedef void (*_daal_threader_for_blocked_int32_t)(int, int, const void *, daal::functype2_int32); + typedef void (*_daal_threader_for_optional_t)(int64_t, int64_t, const void *, daal::functype); +typedef void 
(*_daal_threader_for_optional_int32_t)(int, int, const void *, daal::functype_int32); + typedef void (*_daal_threader_for_break_t)(int64_t, int64_t, const void *, daal::functype_break); +typedef void (*_daal_threader_for_break_int32_t)(int, int, const void *, daal::functype_break_int32); typedef int64_t (*_daal_parallel_reduce_int32_int64_t)(int32_t, int64_t, const void *, daal::loop_functype_int32_int64, const void *, daal::reduction_functype_int64); @@ -187,6 +199,13 @@ DAAL_EXPORT void _daal_threader_for(int64_t n, int64_t grain_size, const void * _daal_threader_for_ptr(n, grain_size, a, func); } +DAAL_EXPORT void _daal_threader_for_int32(int n, int grain_size, const void * a, daal::functype func) +{ + load_daal_thr_dll(); + static _daal_threader_for_t _daal_threader_for_ptr = (_daal_threader_for_t)load_daal_thr_func("_daal_threader_for_int32"); + _daal_threader_for_ptr(n, grain_size, a, func); +} + DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t grain_size, const void * a, daal::functype func) { load_daal_thr_dll(); @@ -194,6 +213,13 @@ DAAL_EXPORT void _daal_threader_for_simple(int64_t n, int64_t grain_size, const _daal_threader_for_simple_ptr(n, grain_size, a, func); } +DAAL_EXPORT void _daal_threader_for_simple_int32(int n, int grain_size, const void * a, daal::functype func) +{ + load_daal_thr_dll(); + static _daal_threader_for_simple_t _daal_threader_for_simple_ptr = (_daal_threader_for_simple_t)load_daal_thr_func("_daal_threader_for_simple"); + _daal_threader_for_simple_ptr(n, grain_size, a, func); +} + DAAL_EXPORT void _daal_threader_for_int64ptr(const int64_t * begin, const int64_t * end, const void * a, daal::functype_int64ptr func) { load_daal_thr_dll();