Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

#include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel_common.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/partial_compute_kernel.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/finalize_compute_kernel.hpp"
#include "oneapi/dal/backend/interop/common.hpp"
#include "oneapi/dal/backend/interop/error_converter.hpp"
#include "oneapi/dal/backend/interop/table_conversion.hpp"

#include "oneapi/dal/backend/primitives/utils.hpp"

#include "oneapi/dal/table/row_accessor.hpp"

namespace oneapi::dal::covariance::backend {
Expand All @@ -30,13 +34,83 @@ using dal::backend::context_cpu;
using descriptor_t = detail::descriptor_base<task::compute>;
using parameters_t = detail::compute_parameters<task::compute>;

namespace be = dal::backend;
namespace pr = be::primitives;
namespace daal_covariance = daal::algorithms::covariance;
namespace interop = dal::backend::interop;

template <typename Float, daal::CpuType Cpu>
using daal_covariance_kernel_t = daal_covariance::internal::
CovarianceDenseBatchKernel<Float, daal_covariance::Method::defaultDense, Cpu>;

/// Adds `sign / nobs * sums * sums^T` to the crossproduct matrix in place.
/// The DAAL online kernel stores centered crossproducts
/// (cp = X^T*X - sums*sums^T/nobs), so the SPMD path calls this with
/// sign = +1 to un-center before allreduce and sign = -1 to re-center
/// with the global statistics afterwards.
///
/// @param cp               Row-major `component_count x component_count` matrix
/// @param sums             The `component_count` column sums
/// @param nobs             Number of observations the sums were accumulated over
/// @param component_count  Number of features (columns) in the data
/// @param sign             +1 to add the rank-1 term, -1 to subtract it
template <typename Float>
static void apply_mean_correction(Float* cp,
                                  const Float* sums,
                                  Float nobs,
                                  std::int64_t component_count,
                                  Float sign) {
    // A rank that holds zero rows has nobs == 0 and all-zero sums. Dividing
    // by zero would give inf, and 0 * inf == NaN would poison the matrix on
    // the subsequent allreduce, so the (exactly zero) correction is skipped.
    if (!(nobs > Float(0))) {
        return;
    }
    const Float scale = sign / nobs;
    for (std::int64_t i = 0; i < component_count; ++i) {
        for (std::int64_t j = 0; j < component_count; ++j) {
            cp[i * component_count + j] += scale * sums[i] * sums[j];
        }
    }
}

/// Distributed (SPMD) covariance on CPU: each rank computes DAAL partial
/// results over its local block of rows, the raw moments (nobs, sums,
/// crossproduct) are allreduced across all ranks, and the aggregate is
/// finalized into the covariance result on every rank.
///
/// @param ctx    CPU context carrying the SPMD communicator
/// @param desc   Descriptor with algorithm options (e.g. assume_centered)
/// @param params Performance tuning parameters (not used on this path)
/// @param data   This rank's local block of the input data
template <typename Float, typename Task>
static compute_result<Task> call_daal_spmd_kernel(const context_cpu& ctx,
                                                  const detail::descriptor_base<Task>& desc,
                                                  const detail::compute_parameters<Task>& params,
                                                  const table& data) {
    auto& comm = ctx.get_communicator();
    const std::int64_t component_count = data.get_column_count();

    // Compute partial results locally on this rank's data.
    partial_compute_input<Task> partial_input(data);
    auto partial_result =
        partial_compute_kernel_cpu<Float, method::by_default, Task>{}(ctx, desc, partial_input);

    // Extract partial results as mutable arrays.
    // NOTE(review): dal::array::wrap does not take ownership -- the *_nd
    // ndarrays own the memory and must outlive every use of the wrapping
    // arrays (including the finalize call below); they do, since all of
    // them live to the end of this scope.
    auto nobs_nd = pr::table2ndarray<Float>(partial_result.get_partial_n_rows());
    auto sums_nd = pr::table2ndarray<Float>(partial_result.get_partial_sum());
    auto crossproduct_nd = pr::table2ndarray<Float>(partial_result.get_partial_crossproduct());

    auto nobs_ary = dal::array<Float>::wrap(nobs_nd.get_mutable_data(), nobs_nd.get_count());
    auto sums_ary = dal::array<Float>::wrap(sums_nd.get_mutable_data(), sums_nd.get_count());
    auto crossproduct_ary =
        dal::array<Float>::wrap(crossproduct_nd.get_mutable_data(), crossproduct_nd.get_count());

    // The DAAL online kernel stores centered crossproducts:
    //   cp = X^T*X - sums*sums^T/nobs
    // A plain allreduce of centered crossproducts would be incorrect because
    // each rank centers around its local mean. Un-center before the
    // allreduce, then re-center with the global statistics afterwards.
    if (!desc.get_assume_centered()) {
        apply_mean_correction(crossproduct_ary.get_mutable_data(),
                              sums_ary.get_data(),
                              *nobs_ary.get_data(),
                              component_count,
                              Float(1));
    }

    // Allreduce raw crossproduct, sums, and nobs across all ranks.
    comm.allreduce(nobs_ary).wait();
    comm.allreduce(sums_ary).wait();
    comm.allreduce(crossproduct_ary).wait();

    // Re-center with the global mean.
    if (!desc.get_assume_centered()) {
        apply_mean_correction(crossproduct_ary.get_mutable_data(),
                              sums_ary.get_data(),
                              *nobs_ary.get_data(),
                              component_count,
                              Float(-1));
    }

    // Reconstruct the aggregated partial result and finalize it.
    partial_compute_result<Task> aggregated;
    aggregated.set_partial_n_rows(homogen_table::wrap(nobs_ary, 1, 1));
    aggregated.set_partial_sum(homogen_table::wrap(sums_ary, 1, component_count));
    aggregated.set_partial_crossproduct(
        homogen_table::wrap(crossproduct_ary, component_count, component_count));

    return finalize_compute_kernel_cpu<Float, method::by_default, Task>{}(ctx, desc, aggregated);
}

template <typename Float, typename Task>
static compute_result<Task> call_daal_kernel(const context_cpu& ctx,
const detail::descriptor_base<Task>& desc,
Expand Down Expand Up @@ -121,6 +195,9 @@ static compute_result<Task> compute(const context_cpu& ctx,
const detail::descriptor_base<Task>& desc,
const detail::compute_parameters<Task>& params,
const compute_input<Task>& input) {
if (ctx.get_communicator().get_rank_count() > 1) {
return call_daal_spmd_kernel<Float, Task>(ctx, desc, params, input.get_data());
}
return call_daal_kernel<Float, Task>(ctx, desc, params, input.get_data());
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/oneapi/dal/algo/covariance/compute_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class compute_input : public base {
/// property value
compute_input(const table& data);

virtual ~compute_input() = default;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for this change?
I see that it breaks the ABI, but were there any problems caused by the lack of a destructor definition in this class and in the partial_compute_input class?


/// An $n \\times p$ table with the training data, where each row stores one
/// feature vector.
/// @remark default = table{}
Expand Down Expand Up @@ -249,6 +251,8 @@ class partial_compute_input : protected compute_input<Task> {

partial_compute_input(const partial_compute_result<Task>& prev, const table& data);

virtual ~partial_compute_input() = default;

const table& get_data() const {
return compute_input<Task>::get_data();
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct compute_ops_dispatcher<Policy, Float, Method, Task> {
/// Selects performance/tuning parameters for the covariance compute
/// operation by dispatching to the CPU parameter-selection kernel.
compute_parameters<Task> select_parameters(const Policy& ctx,
                                           const descriptor_base<Task>& desc,
                                           const compute_input<Task>& input) const {
    using dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_UNIVERSAL_SPMD_CPU(
        parameters::compute_parameters_cpu<Float, Method, Task>)>;
    dispatcher_t dispatcher{};
    return dispatcher(ctx, desc, input);
}
Expand All @@ -51,7 +51,7 @@ struct compute_ops_dispatcher<Policy, Float, Method, Task> {
const descriptor_base<Task>& desc,
const compute_parameters<Task>& params,
const compute_input<Task>& input) const {
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_SINGLE_NODE_CPU(
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_UNIVERSAL_SPMD_CPU(
backend::compute_kernel_cpu<Float, Method, Task>)>;
return kernel_dispatcher_t()(ctx, desc, params, input);
}
Expand Down
1 change: 0 additions & 1 deletion cpp/oneapi/dal/algo/covariance/test/spmd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ TEMPLATE_LIST_TEST_M(covariance_spmd_test,
"covariance common flow",
"[covariance][integration][spmd]",
covariance_types) {
SKIP_IF(this->get_policy().is_cpu());
SKIP_IF(this->not_float64_friendly());

using Float = std::tuple_element_t<0, TestType>;
Expand Down
Loading
Loading