Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

#include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel_common.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/partial_compute_kernel.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/finalize_compute_kernel.hpp"
#include "oneapi/dal/backend/interop/common.hpp"
#include "oneapi/dal/backend/interop/error_converter.hpp"
#include "oneapi/dal/backend/interop/table_conversion.hpp"

#include "oneapi/dal/backend/primitives/utils.hpp"

#include "oneapi/dal/table/row_accessor.hpp"

namespace oneapi::dal::covariance::backend {
Expand All @@ -30,13 +34,83 @@ using dal::backend::context_cpu;
// Descriptor and compute-parameters types specialized for the compute task.
using descriptor_t = detail::descriptor_base<task::compute>;
using parameters_t = detail::compute_parameters<task::compute>;

// Short namespace aliases for the oneDAL backend, its primitives, the DAAL
// covariance algorithm namespace and the oneDAL <-> DAAL interop layer.
namespace be = dal::backend;
namespace pr = be::primitives;
namespace daal_covariance = daal::algorithms::covariance;
namespace interop = dal::backend::interop;

// DAAL dense batch covariance kernel (defaultDense method), parameterized by
// the floating-point type and the DAAL CPU type.
template <typename Float, daal::CpuType Cpu>
using daal_covariance_kernel_t = daal_covariance::internal::
CovarianceDenseBatchKernel<Float, daal_covariance::Method::defaultDense, Cpu>;

/// Adds `scale * sums[i] * sums[j]` to every entry (i, j) of a row-major
/// `component_count x component_count` crossproduct matrix, in place.
///
/// A positive `scale` of 1/nobs un-centers a centered crossproduct; a negative
/// `scale` of -1/nobs centers a raw one.
template <typename Float>
static void add_scaled_sums_outer_product(Float* crossproduct,
                                          const Float* sums,
                                          std::int64_t component_count,
                                          Float scale) {
    for (std::int64_t i = 0; i < component_count; ++i) {
        // Same evaluation order as `scale * sums[i] * sums[j]`.
        const Float scaled_sum_i = scale * sums[i];
        Float* const row = crossproduct + i * component_count;
        for (std::int64_t j = 0; j < component_count; ++j) {
            row[j] += scaled_sum_i * sums[j];
        }
    }
}

/// Computes covariance on CPU across several ranks.
///
/// Each rank computes partial results (row count, sums, crossproduct) on its
/// local `data`, the partial results are summed across ranks with allreduce,
/// and the aggregated partial result is passed to the finalize kernel.
///
/// @param ctx    CPU context; supplies the communicator used for allreduce.
/// @param desc   Algorithm descriptor; `get_assume_centered()` decides whether
///               the crossproduct is un-centered / re-centered around the
///               allreduce.
/// @param params Compute parameters; not used by this function.
/// @param data   This rank's local data; its column count is the number of
///               components.
/// @return The result of the finalize kernel applied to the aggregated
///         partial result.
template <typename Float, typename Task>
static compute_result<Task> call_daal_spmd_kernel(const context_cpu& ctx,
                                                  const detail::descriptor_base<Task>& desc,
                                                  const detail::compute_parameters<Task>& params,
                                                  const table& data) {
    auto& comm = ctx.get_communicator();
    const std::int64_t component_count = data.get_column_count();

    // Compute partial results locally on this rank's data
    partial_compute_input<Task> partial_input(data);
    auto partial_result =
        partial_compute_kernel_cpu<Float, method::by_default, Task>{}(ctx, desc, partial_input);

    // Extract partial results as mutable arrays. The arrays below only wrap the
    // ndarray storage, so the ndarrays must stay alive until the function returns.
    auto nobs_nd = pr::table2ndarray<Float>(partial_result.get_partial_n_rows());
    auto sums_nd = pr::table2ndarray<Float>(partial_result.get_partial_sum());
    auto crossproduct_nd = pr::table2ndarray<Float>(partial_result.get_partial_crossproduct());

    auto nobs_ary = dal::array<Float>::wrap(nobs_nd.get_mutable_data(), nobs_nd.get_count());
    auto sums_ary = dal::array<Float>::wrap(sums_nd.get_mutable_data(), sums_nd.get_count());
    auto crossproduct_ary =
        dal::array<Float>::wrap(crossproduct_nd.get_mutable_data(), crossproduct_nd.get_count());

    const bool needs_centering = !desc.get_assume_centered();

    // The DAAL online kernel stores centered crossproducts:
    //   cp = X^T*X - sums*sums^T/nobs
    // Simple allreduce of centered crossproducts is incorrect because each
    // rank uses its local mean. Un-center before allreduce, then re-center
    // with global statistics after.
    //
    // NOTE(review): a reviewer suggested aggregating with DAAL's
    // Distributed<step2Master> algorithm instead, see
    // https://github.com/uxlfoundation/oneDAL/blob/main/samples/daal/cpp/mpi/sources/covariance_dense_distributed_mpi.cpp#L106
    // The author intends to look into it.
    if (needs_centering) {
        const Float local_nobs = *nobs_ary.get_data();
        // Guard against a zero observation count: 1/0 is inf and inf * 0 is
        // NaN, which the allreduce would then spread to every rank.
        if (local_nobs > Float(0)) {
            add_scaled_sums_outer_product(crossproduct_ary.get_mutable_data(),
                                          sums_ary.get_data(),
                                          component_count,
                                          Float(1) / local_nobs);
        }
    }

    // Allreduce raw crossproduct, sums, and nobs across all ranks
    comm.allreduce(nobs_ary).wait();
    comm.allreduce(sums_ary).wait();
    comm.allreduce(crossproduct_ary).wait();

    // Re-center with global statistics
    if (needs_centering) {
        const Float global_nobs = *nobs_ary.get_data();
        // Same zero guard as above, now for the global observation count.
        if (global_nobs > Float(0)) {
            add_scaled_sums_outer_product(crossproduct_ary.get_mutable_data(),
                                          sums_ary.get_data(),
                                          component_count,
                                          -(Float(1) / global_nobs));
        }
    }

    // Reconstruct aggregated partial result and finalize
    partial_compute_result<Task> aggregated;
    aggregated.set_partial_n_rows(homogen_table::wrap(nobs_ary, 1, 1));
    aggregated.set_partial_sum(homogen_table::wrap(sums_ary, 1, component_count));
    aggregated.set_partial_crossproduct(
        homogen_table::wrap(crossproduct_ary, component_count, component_count));

    return finalize_compute_kernel_cpu<Float, method::by_default, Task>{}(ctx, desc, aggregated);
}

template <typename Float, typename Task>
static compute_result<Task> call_daal_kernel(const context_cpu& ctx,
const detail::descriptor_base<Task>& desc,
Expand Down Expand Up @@ -121,6 +195,9 @@ static compute_result<Task> compute(const context_cpu& ctx,
const detail::descriptor_base<Task>& desc,
const detail::compute_parameters<Task>& params,
const compute_input<Task>& input) {
if (ctx.get_communicator().get_rank_count() > 1) {
return call_daal_spmd_kernel<Float, Task>(ctx, desc, params, input.get_data());
}
return call_daal_kernel<Float, Task>(ctx, desc, params, input.get_data());
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct compute_ops_dispatcher<Policy, Float, Method, Task> {
compute_parameters<Task> select_parameters(const Policy& ctx,
const descriptor_base<Task>& desc,
const compute_input<Task>& input) const {
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_SINGLE_NODE_CPU(
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_UNIVERSAL_SPMD_CPU(
parameters::compute_parameters_cpu<Float, Method, Task>)>;
return kernel_dispatcher_t{}(ctx, desc, input);
}
Expand All @@ -51,7 +51,7 @@ struct compute_ops_dispatcher<Policy, Float, Method, Task> {
const descriptor_base<Task>& desc,
const compute_parameters<Task>& params,
const compute_input<Task>& input) const {
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_SINGLE_NODE_CPU(
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_UNIVERSAL_SPMD_CPU(
backend::compute_kernel_cpu<Float, Method, Task>)>;
return kernel_dispatcher_t()(ctx, desc, params, input);
}
Expand Down
1 change: 0 additions & 1 deletion cpp/oneapi/dal/algo/covariance/test/spmd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ TEMPLATE_LIST_TEST_M(covariance_spmd_test,
"covariance common flow",
"[covariance][integration][spmd]",
covariance_types) {
SKIP_IF(this->get_policy().is_cpu());
SKIP_IF(this->not_float64_friendly());

using Float = std::tuple_element_t<0, TestType>;
Expand Down
Loading