Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions cpp/oneapi/dal/algo/pca/backend/cpu/finalize_train_kernel_cov.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,56 @@ static train_result<Task> call_daal_kernel_finalize_train(const context_cpu& ctx
const auto daal_nobs = interop::convert_to_daal_table<Float>(input.get_partial_n_rows());

daal_cov::Parameter daal_parameter;
daal_parameter.outputMatrixType = daal_cov::correlationMatrix;

if (desc.get_normalization_mode() == normalization::mean_center) {
/// mean_center: compute covariance matrix; variances are on the diagonal.
daal_parameter.outputMatrixType = daal_cov::covarianceMatrix;
interop::status_to_exception(
interop::call_daal_kernel_finalize_compute<Float, daal_cov_kernel_t>(
ctx,
daal_nobs.get(),
daal_crossproduct.get(),
daal_sums.get(),
daal_cor_matrix.get(),
daal_means.get(),
&daal_parameter,
&hp));

/// Extract variances from the covariance diagonal.
/// copyVarianceFromCovarianceTable is protected in PCACorrelationBase,
/// so we read the diagonal directly from the raw array.
const Float* cov = arr_cor_matrix.get_data();
Float* vars = arr_vars.get_mutable_data();
for (std::int64_t i = 0; i < column_count; ++i) {
vars[i] = cov[i * column_count + i];
}
}
else {
/// zscore: variances are cp[i,i] / (nobs - 1) — readable directly from
/// the crossproduct input before DAAL converts it to a correlation matrix.
/// This avoids a second O(N²) finalizeCompute call.
{
const auto cp =
row_accessor<const Float>(input.get_partial_crossproduct()).pull({ 0, -1 });
const Float inv_nm1 = Float(1) / (row_count - 1);
Float* vars = arr_vars.get_mutable_data();
for (std::int64_t i = 0; i < column_count; ++i) {
vars[i] = cp[i * column_count + i] * inv_nm1;
}
}

daal_parameter.outputMatrixType = daal_cov::correlationMatrix;
interop::status_to_exception(
interop::call_daal_kernel_finalize_compute<Float, daal_cov_kernel_t>(
ctx,
daal_nobs.get(),
daal_crossproduct.get(),
daal_sums.get(),
daal_cor_matrix.get(),
daal_means.get(),
&daal_parameter,
&hp));
}
interop::status_to_exception(
interop::call_daal_kernel_finalize_compute<Float, daal_cov_kernel_t>(
ctx,
daal_nobs.get(),
daal_crossproduct.get(),
daal_sums.get(),
daal_cor_matrix.get(),
daal_means.get(),
&daal_parameter,
&hp));

interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) {
return daal_pca_cor_kernel_t<
Expand All @@ -131,13 +166,6 @@ static train_result<Task> call_daal_kernel_finalize_train(const context_cpu& ctx
.computeSingularValues(*daal_eigenvalues, *daal_singular_values, row_count);
}));

interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) {
return daal_pca_cor_kernel_t<
Float,
dal::backend::interop::to_daal_cpu_type<decltype(cpu)>::value>()
.computeVariancesFromCov(*daal_cor_matrix, *daal_variances);
}));

interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) {
return daal_pca_cor_kernel_t<
Float,
Expand Down
97 changes: 97 additions & 0 deletions cpp/oneapi/dal/algo/pca/backend/cpu/train_kernel_cov.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
#include "oneapi/dal/algo/pca/backend/common.hpp"
#include "oneapi/dal/algo/pca/backend/cpu/train_kernel.hpp"
#include "oneapi/dal/algo/pca/backend/cpu/train_kernel_common.hpp"
#include "oneapi/dal/algo/pca/backend/cpu/partial_train_kernel.hpp"
#include "oneapi/dal/algo/pca/backend/cpu/finalize_train_kernel.hpp"

#include "oneapi/dal/backend/interop/common.hpp"
#include "oneapi/dal/backend/interop/error_converter.hpp"
#include "oneapi/dal/backend/interop/table_conversion.hpp"
#include "oneapi/dal/backend/primitives/ndarray.hpp"
#include "oneapi/dal/backend/primitives/utils.hpp"
#include "oneapi/dal/table/row_accessor.hpp"

namespace oneapi::dal::pca::backend {
Expand All @@ -43,6 +47,8 @@ using descriptor_t = detail::descriptor_base<task_t>;
namespace daal_pca = daal::algorithms::pca;
namespace daal_cov = daal::algorithms::covariance;
namespace interop = dal::backend::interop;
namespace be = dal::backend;
namespace pr = be::primitives;

template <typename Float, daal::internal::CpuType Cpu>
using daal_pca_cor_kernel_t = daal_pca::internal::PCACorrelationKernel<daal::batch, Float, Cpu>;
Expand Down Expand Up @@ -142,11 +148,102 @@ static result_t call_daal_kernel(const context_cpu& ctx,
return result;
}

template <typename Float>
static result_t call_daal_spmd_kernel(const context_cpu& ctx,
                                      const descriptor_t& desc,
                                      const detail::train_parameters<task_t>& params,
                                      const table& data) {
    /// Distributed (SPMD) PCA training via the covariance method.
    ///
    /// Each rank computes local partial statistics (row count, column sums,
    /// centred crossproduct), the partials are combined across ranks with
    /// allreduce, and the single-node finalize kernel is run on the
    /// aggregated result.
    ///
    /// @param ctx    CPU execution context carrying the SPMD communicator
    /// @param desc   PCA descriptor (component count, normalization mode, ...)
    /// @param params Training parameters (not consumed on this path)
    /// @param data   This rank's local slice of the input data
    /// @return       Train result computed from the globally aggregated statistics
    auto& comm = ctx.get_communicator();

    /// Compute partial crossproduct, sums, and nobs on each rank
    partial_train_input<task_t> partial_input({}, data);
    auto partial_result =
        pca::backend::partial_train_kernel_cpu<Float, method::cov, task_t>{}(ctx,
                                                                             desc,
                                                                             partial_input);

    /// Get local partial results as arrays for collective allreduce.
    /// The row count is converted to Float (table2ndarray<Float>) so all
    /// three buffers go through the same typed allreduce.
    const auto& nobs_local = partial_result.get_partial_n_rows();
    const auto& crossproduct_local = partial_result.get_partial_crossproduct();
    const auto& sums_local = partial_result.get_partial_sum();

    auto nobs_nd = pr::table2ndarray<Float>(nobs_local);
    auto crossproduct_nd = pr::table2ndarray<Float>(crossproduct_local);
    auto sums_nd = pr::table2ndarray<Float>(sums_local);

    /// The dal::array wrappers view the ndarray memory without owning it;
    /// the ndarrays above stay alive for every use of the wrappers below.
    auto nobs_ary = dal::array<Float>::wrap(nobs_nd.get_mutable_data(), nobs_nd.get_count());
    auto crossproduct_ary =
        dal::array<Float>::wrap(crossproduct_nd.get_mutable_data(), crossproduct_nd.get_count());
    auto sums_ary = dal::array<Float>::wrap(sums_nd.get_mutable_data(), sums_nd.get_count());

    const std::int64_t column_count = crossproduct_local.get_column_count();

    /// The DAAL covariance online kernel stores a CENTRED crossproduct:
    ///     cp = Σ(x - mean_local)^T (x - mean_local)
    /// Simply allreducing centred crossproducts from different ranks does
    /// NOT produce the globally-centred crossproduct. Convert each rank's
    /// centred cp to the raw second-moment matrix before reduction:
    ///     raw = cp_local + S_local * S_local^T / n_local
    {
        auto* cp_data = crossproduct_ary.get_mutable_data();
        const auto* s_data = sums_ary.get_data();
        const Float n_local = nobs_ary.get_data()[0];
        /// Empty local partition: sums and cp are zero, so centred == raw
        /// and no correction is needed (also avoids division by zero).
        if (n_local > Float(0)) {
            const Float inv_n = Float(1) / n_local;
            for (std::int64_t i = 0; i < column_count; ++i) {
                for (std::int64_t j = 0; j < column_count; ++j) {
                    cp_data[i * column_count + j] += s_data[i] * s_data[j] * inv_n;
                }
            }
        }
    }

    /// Collectively reduce (sum) the raw second-moment matrices, the sums
    /// and the row counts across all ranks
    comm.allreduce(nobs_ary).wait();
    comm.allreduce(crossproduct_ary).wait();
    comm.allreduce(sums_ary).wait();

    /// Re-centre using the global sums and nobs:
    ///     cp_global = raw_global - S_global * S_global^T / n_global
    {
        auto* cp_data = crossproduct_ary.get_mutable_data();
        const auto* s_data = sums_ary.get_data();
        const Float n_total = nobs_ary.get_data()[0];
        if (n_total > Float(0)) {
            const Float inv_n = Float(1) / n_total;
            for (std::int64_t i = 0; i < column_count; ++i) {
                for (std::int64_t j = 0; j < column_count; ++j) {
                    cp_data[i * column_count + j] -= s_data[i] * s_data[j] * inv_n;
                }
            }
        }
    }

    /// Repackage the reduced buffers as tables, following the partial-result
    /// shape convention: nobs is 1x1, crossproduct is p x p, sums is 1 x p.
    auto nobs_table = homogen_table::wrap(nobs_ary, 1, 1);
    auto crossproduct_table = homogen_table::wrap(crossproduct_ary, column_count, column_count);
    auto sums_table = homogen_table::wrap(sums_ary, 1, column_count);

    /// Build aggregated partial result and finalize
    partial_train_result<task_t> aggregated;
    aggregated.set_partial_n_rows(nobs_table);
    aggregated.set_partial_crossproduct(crossproduct_table);
    aggregated.set_partial_sum(sums_table);

    /// Use a local (non-SPMD) context so the finalize kernel does NOT
    /// dispatch to the SPMD path — the data is already aggregated.
    const context_cpu local_ctx;
    return pca::backend::finalize_train_kernel_cpu<Float, method::cov, task_t>{}(local_ctx,
                                                                                 desc,
                                                                                 aggregated);
}

template <typename Float>
static result_t train(const context_cpu& ctx,
                      const descriptor_t& desc,
                      const detail::train_parameters<task_t>& params,
                      const input_t& input) {
    /// Route training to the distributed path when more than one rank is
    /// present in the communicator; otherwise use the single-node kernel.
    const bool is_distributed = ctx.get_communicator().get_rank_count() > 1;
    const auto& data = input.get_data();
    return is_distributed ? call_daal_spmd_kernel<Float>(ctx, desc, params, data)
                          : call_daal_kernel<Float>(ctx, desc, params, data);
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/oneapi/dal/algo/pca/detail/train_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct train_ops_dispatcher<Policy, Float, Method, Task> {
const descriptor_base<Task>& desc,
const train_input<Task>& input) const {
using kernel_dispatcher_t = dal::backend::kernel_dispatcher< //
KERNEL_SINGLE_NODE_CPU(parameters::train_parameters_cpu<Float, Method, Task>)>;
KERNEL_UNIVERSAL_SPMD_CPU(parameters::train_parameters_cpu<Float, Method, Task>)>;
return kernel_dispatcher_t{}(ctx, desc, input);
}

Expand All @@ -52,7 +52,7 @@ struct train_ops_dispatcher<Policy, Float, Method, Task> {
const train_parameters<Task>& params,
const train_input<Task>& input) const {
using kernel_dispatcher_t = dal::backend::kernel_dispatcher< //
KERNEL_SINGLE_NODE_CPU(backend::train_kernel_cpu<Float, Method, Task>)>;
KERNEL_UNIVERSAL_SPMD_CPU(backend::train_kernel_cpu<Float, Method, Task>)>;
return kernel_dispatcher_t{}(ctx, desc, params, input);
}
};
Expand Down
1 change: 0 additions & 1 deletion cpp/oneapi/dal/algo/pca/test/spmd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class pca_spmd_test : public pca_test<TestType, pca_spmd_test<TestType>> {
using pca_types = COMBINE_TYPES((float, double), (pca::method::cov));

TEMPLATE_LIST_TEST_M(pca_spmd_test, "pca common flow", "[pca][integration][spmd]", pca_types) {
SKIP_IF(this->get_policy().is_cpu());
SKIP_IF(this->not_float64_friendly());

const te::dataframe data =
Expand Down
65 changes: 65 additions & 0 deletions samples/oneapi/cpp/ccl/sources/pca_distr_ccl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*******************************************************************************
* Copyright contributors to the oneDAL project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

#include <iomanip>
#include <iostream>

#include "oneapi/dal/algo/pca.hpp"
#include "oneapi/dal/io/csv.hpp"
#include "oneapi/dal/spmd/ccl/communicator.hpp"

#include "utils.hpp"

namespace dal = oneapi::dal;
namespace pca = dal::pca;

/// Distributed PCA training sample (CCL backend): every rank reads the same
/// CSV file, takes its own row-slice, and trains collectively through the
/// SPMD communicator. Rank 0 prints the resulting eigen-decomposition.
void run() {
    const auto data_file_name = get_data_path("data/pca_normalized.csv");

    const auto data = dal::read<dal::table>(dal::csv::data_source{ data_file_name });

    auto comm = dal::preview::spmd::make_communicator<dal::preview::spmd::backend::ccl>();
    const auto rank_id = comm.get_rank();
    const auto rank_count = comm.get_rank_count();

    /// One chunk per rank; this rank trains on chunk [rank_id].
    auto input_vec = split_table_by_rows<float>(data, rank_count);

    const auto pca_desc = pca::descriptor<float>{};

    const auto result = dal::preview::train(comm, pca_desc, input_vec[rank_id]);

    /// Print on the root rank only to avoid interleaved output.
    /// Reuse the cached rank_id instead of querying the communicator again.
    if (rank_id == 0) {
        std::cout << "Eigenvectors:\n" << result.get_eigenvectors() << std::endl;

        std::cout << "Eigenvalues:\n" << result.get_eigenvalues() << std::endl;
    }
}

/// Sample entry point: initializes oneCCL and MPI, runs the distributed PCA
/// flow, and guarantees MPI_Finalize is reached even when the sample fails.
int main(int argc, char const* argv[]) {
    ccl::init();
    int status = MPI_Init(nullptr, nullptr);
    if (status != MPI_SUCCESS) {
        throw std::runtime_error{ "Problem occurred during MPI init" };
    }

    /// An exception escaping run() (e.g. the CSV file is missing) must not
    /// skip MPI_Finalize — report it and fall through to finalization so the
    /// other ranks are not left waiting.
    int exit_code = 0;
    try {
        run();
    }
    catch (const std::exception& e) {
        std::cerr << "Sample failed: " << e.what() << std::endl;
        exit_code = 1;
    }

    status = MPI_Finalize();
    if (status != MPI_SUCCESS) {
        throw std::runtime_error{ "Problem occurred during MPI finalize" };
    }
    return exit_code;
}
64 changes: 64 additions & 0 deletions samples/oneapi/cpp/mpi/sources/pca_distr_mpi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*******************************************************************************
* Copyright contributors to the oneDAL project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

#include <iomanip>
#include <iostream>

#include "oneapi/dal/algo/pca.hpp"
#include "oneapi/dal/io/csv.hpp"
#include "oneapi/dal/spmd/mpi/communicator.hpp"

#include "utils.hpp"

namespace dal = oneapi::dal;
namespace pca = dal::pca;

/// Distributed PCA training sample (MPI backend): every rank reads the same
/// CSV file, takes its own row-slice, and trains collectively through the
/// SPMD communicator. Rank 0 prints the resulting eigen-decomposition.
void run() {
    const auto data_file_name = get_data_path("data/pca_normalized.csv");

    const auto data = dal::read<dal::table>(dal::csv::data_source{ data_file_name });

    auto comm = dal::preview::spmd::make_communicator<dal::preview::spmd::backend::mpi>();
    const auto rank_id = comm.get_rank();
    const auto rank_count = comm.get_rank_count();

    /// One chunk per rank; this rank trains on chunk [rank_id].
    auto input_vec = split_table_by_rows<float>(data, rank_count);

    const auto pca_desc = pca::descriptor<float>{};

    const auto result = dal::preview::train(comm, pca_desc, input_vec[rank_id]);

    /// Print on the root rank only to avoid interleaved output.
    /// Reuse the cached rank_id instead of querying the communicator again.
    if (rank_id == 0) {
        std::cout << "Eigenvectors:\n" << result.get_eigenvectors() << std::endl;

        std::cout << "Eigenvalues:\n" << result.get_eigenvalues() << std::endl;
    }
}

/// Sample entry point: initializes MPI, runs the distributed PCA flow, and
/// guarantees MPI_Finalize is reached even when the sample fails.
int main(int argc, char const* argv[]) {
    int status = MPI_Init(nullptr, nullptr);
    if (status != MPI_SUCCESS) {
        throw std::runtime_error{ "Problem occurred during MPI init" };
    }

    /// An exception escaping run() (e.g. the CSV file is missing) must not
    /// skip MPI_Finalize — report it and fall through to finalization so the
    /// other ranks are not left waiting.
    int exit_code = 0;
    try {
        run();
    }
    catch (const std::exception& e) {
        std::cerr << "Sample failed: " << e.what() << std::endl;
        exit_code = 1;
    }

    status = MPI_Finalize();
    if (status != MPI_SUCCESS) {
        throw std::runtime_error{ "Problem occurred during MPI finalize" };
    }
    return exit_code;
}
Loading