diff --git a/cpp/oneapi/dal/algo/pca/backend/cpu/finalize_train_kernel_cov.cpp b/cpp/oneapi/dal/algo/pca/backend/cpu/finalize_train_kernel_cov.cpp index b9937579ebf..d31ee745122 100644 --- a/cpp/oneapi/dal/algo/pca/backend/cpu/finalize_train_kernel_cov.cpp +++ b/cpp/oneapi/dal/algo/pca/backend/cpu/finalize_train_kernel_cov.cpp @@ -92,21 +92,56 @@ static train_result call_daal_kernel_finalize_train(const context_cpu& ctx const auto daal_nobs = interop::convert_to_daal_table(input.get_partial_n_rows()); daal_cov::Parameter daal_parameter; - daal_parameter.outputMatrixType = daal_cov::correlationMatrix; if (desc.get_normalization_mode() == normalization::mean_center) { + /// mean_center: compute covariance matrix; variances are on the diagonal. daal_parameter.outputMatrixType = daal_cov::covarianceMatrix; + interop::status_to_exception( + interop::call_daal_kernel_finalize_compute( + ctx, + daal_nobs.get(), + daal_crossproduct.get(), + daal_sums.get(), + daal_cor_matrix.get(), + daal_means.get(), + &daal_parameter, + &hp)); + + /// Extract variances from the covariance diagonal. + /// copyVarianceFromCovarianceTable is protected in PCACorrelationBase, + /// so we read the diagonal directly from the raw array. + const Float* cov = arr_cor_matrix.get_data(); + Float* vars = arr_vars.get_mutable_data(); + for (std::int64_t i = 0; i < column_count; ++i) { + vars[i] = cov[i * column_count + i]; + } + } + else { + /// zscore: variances are cp[i,i] / (nobs - 1) — readable directly from + /// the crossproduct input before DAAL converts it to a correlation matrix. + /// This avoids a second O(N²) finalizeCompute call. 
+ { + const auto cp = + row_accessor(input.get_partial_crossproduct()).pull({ 0, -1 }); + const Float inv_nm1 = Float(1) / (row_count - 1); + Float* vars = arr_vars.get_mutable_data(); + for (std::int64_t i = 0; i < column_count; ++i) { + vars[i] = cp[i * column_count + i] * inv_nm1; + } + } + + daal_parameter.outputMatrixType = daal_cov::correlationMatrix; + interop::status_to_exception( + interop::call_daal_kernel_finalize_compute( + ctx, + daal_nobs.get(), + daal_crossproduct.get(), + daal_sums.get(), + daal_cor_matrix.get(), + daal_means.get(), + &daal_parameter, + &hp)); } - interop::status_to_exception( - interop::call_daal_kernel_finalize_compute( - ctx, - daal_nobs.get(), - daal_crossproduct.get(), - daal_sums.get(), - daal_cor_matrix.get(), - daal_means.get(), - &daal_parameter, - &hp)); interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) { return daal_pca_cor_kernel_t< @@ -131,13 +166,6 @@ static train_result call_daal_kernel_finalize_train(const context_cpu& ctx .computeSingularValues(*daal_eigenvalues, *daal_singular_values, row_count); })); - interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) { - return daal_pca_cor_kernel_t< - Float, - dal::backend::interop::to_daal_cpu_type::value>() - .computeVariancesFromCov(*daal_cor_matrix, *daal_variances); - })); - interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) { return daal_pca_cor_kernel_t< Float, diff --git a/cpp/oneapi/dal/algo/pca/backend/cpu/train_kernel_cov.cpp b/cpp/oneapi/dal/algo/pca/backend/cpu/train_kernel_cov.cpp index a44ce2521ef..d6bf9455729 100644 --- a/cpp/oneapi/dal/algo/pca/backend/cpu/train_kernel_cov.cpp +++ b/cpp/oneapi/dal/algo/pca/backend/cpu/train_kernel_cov.cpp @@ -25,10 +25,14 @@ #include "oneapi/dal/algo/pca/backend/common.hpp" #include "oneapi/dal/algo/pca/backend/cpu/train_kernel.hpp" #include "oneapi/dal/algo/pca/backend/cpu/train_kernel_common.hpp" +#include 
"oneapi/dal/algo/pca/backend/cpu/partial_train_kernel.hpp" +#include "oneapi/dal/algo/pca/backend/cpu/finalize_train_kernel.hpp" #include "oneapi/dal/backend/interop/common.hpp" #include "oneapi/dal/backend/interop/error_converter.hpp" #include "oneapi/dal/backend/interop/table_conversion.hpp" +#include "oneapi/dal/backend/primitives/ndarray.hpp" +#include "oneapi/dal/backend/primitives/utils.hpp" #include "oneapi/dal/table/row_accessor.hpp" namespace oneapi::dal::pca::backend { @@ -43,6 +47,8 @@ using descriptor_t = detail::descriptor_base; namespace daal_pca = daal::algorithms::pca; namespace daal_cov = daal::algorithms::covariance; namespace interop = dal::backend::interop; +namespace be = dal::backend; +namespace pr = be::primitives; template using daal_pca_cor_kernel_t = daal_pca::internal::PCACorrelationKernel; @@ -142,11 +148,102 @@ static result_t call_daal_kernel(const context_cpu& ctx, return result; } +template +static result_t call_daal_spmd_kernel(const context_cpu& ctx, + const descriptor_t& desc, + const detail::train_parameters& params, + const table& data) { + auto& comm = ctx.get_communicator(); + + /// Compute partial crossproduct, sums, and nobs on each rank + partial_train_input partial_input({}, data); + auto partial_result = + pca::backend::partial_train_kernel_cpu{}(ctx, + desc, + partial_input); + + /// Get local partial results as arrays for collective allreduce + const auto& nobs_local = partial_result.get_partial_n_rows(); + const auto& crossproduct_local = partial_result.get_partial_crossproduct(); + const auto& sums_local = partial_result.get_partial_sum(); + + auto nobs_nd = pr::table2ndarray(nobs_local); + auto crossproduct_nd = pr::table2ndarray(crossproduct_local); + auto sums_nd = pr::table2ndarray(sums_local); + + auto nobs_ary = dal::array::wrap(nobs_nd.get_mutable_data(), nobs_nd.get_count()); + auto crossproduct_ary = + dal::array::wrap(crossproduct_nd.get_mutable_data(), crossproduct_nd.get_count()); + auto sums_ary = 
dal::array::wrap(sums_nd.get_mutable_data(), sums_nd.get_count()); + + const std::int64_t column_count = crossproduct_local.get_column_count(); + + /// The DAAL covariance online kernel stores a CENTRED crossproduct: + /// cp = Σ(x - mean_local)^T (x - mean_local) + /// Simply allreducing centred crossproducts from different ranks does + /// NOT produce the globally-centred crossproduct. Convert each rank's + /// centred cp to the raw second-moment matrix before reduction: + /// raw = cp_local + S_local * S_local^T / n_local + { + auto* cp_data = crossproduct_ary.get_mutable_data(); + const auto* s_data = sums_ary.get_data(); + const Float n_local = nobs_ary.get_data()[0]; + if (n_local > Float(0)) { + const Float inv_n = Float(1) / n_local; + for (std::int64_t i = 0; i < column_count; ++i) { + for (std::int64_t j = 0; j < column_count; ++j) { + cp_data[i * column_count + j] += s_data[i] * s_data[j] * inv_n; + } + } + } + } + + /// Collectively reduce the raw second-moment matrices + comm.allreduce(nobs_ary).wait(); + comm.allreduce(crossproduct_ary).wait(); + comm.allreduce(sums_ary).wait(); + + /// Re-centre using the global sums and nobs + { + auto* cp_data = crossproduct_ary.get_mutable_data(); + const auto* s_data = sums_ary.get_data(); + const Float n_total = nobs_ary.get_data()[0]; + if (n_total > Float(0)) { + const Float inv_n = Float(1) / n_total; + for (std::int64_t i = 0; i < column_count; ++i) { + for (std::int64_t j = 0; j < column_count; ++j) { + cp_data[i * column_count + j] -= s_data[i] * s_data[j] * inv_n; + } + } + } + } + + auto nobs_table = homogen_table::wrap(nobs_ary, 1, 1); + auto crossproduct_table = homogen_table::wrap(crossproduct_ary, column_count, column_count); + auto sums_table = homogen_table::wrap(sums_ary, 1, column_count); + + /// Build aggregated partial result and finalize + partial_train_result aggregated; + aggregated.set_partial_n_rows(nobs_table); + aggregated.set_partial_crossproduct(crossproduct_table); + 
aggregated.set_partial_sum(sums_table); + + /// Use a local (non-SPMD) context so the finalize kernel does NOT + /// dispatch to the SPMD path — the data is already aggregated. + const context_cpu local_ctx; + return pca::backend::finalize_train_kernel_cpu{}(local_ctx, + desc, + aggregated); +} + template static result_t train(const context_cpu& ctx, const descriptor_t& desc, const detail::train_parameters& params, const input_t& input) { + if (ctx.get_communicator().get_rank_count() > 1) { + return call_daal_spmd_kernel(ctx, desc, params, input.get_data()); + } return call_daal_kernel(ctx, desc, params, input.get_data()); } diff --git a/cpp/oneapi/dal/algo/pca/detail/train_ops.cpp b/cpp/oneapi/dal/algo/pca/detail/train_ops.cpp index b683db6a743..2c4b62938cb 100644 --- a/cpp/oneapi/dal/algo/pca/detail/train_ops.cpp +++ b/cpp/oneapi/dal/algo/pca/detail/train_ops.cpp @@ -35,7 +35,7 @@ struct train_ops_dispatcher { const descriptor_base& desc, const train_input& input) const { using kernel_dispatcher_t = dal::backend::kernel_dispatcher< // - KERNEL_SINGLE_NODE_CPU(parameters::train_parameters_cpu)>; + KERNEL_UNIVERSAL_SPMD_CPU(parameters::train_parameters_cpu)>; return kernel_dispatcher_t{}(ctx, desc, input); } @@ -52,7 +52,7 @@ struct train_ops_dispatcher { const train_parameters& params, const train_input& input) const { using kernel_dispatcher_t = dal::backend::kernel_dispatcher< // - KERNEL_SINGLE_NODE_CPU(backend::train_kernel_cpu)>; + KERNEL_UNIVERSAL_SPMD_CPU(backend::train_kernel_cpu)>; return kernel_dispatcher_t{}(ctx, desc, params, input); } }; diff --git a/cpp/oneapi/dal/algo/pca/test/spmd.cpp b/cpp/oneapi/dal/algo/pca/test/spmd.cpp index f48d72e92b6..59f70947767 100644 --- a/cpp/oneapi/dal/algo/pca/test/spmd.cpp +++ b/cpp/oneapi/dal/algo/pca/test/spmd.cpp @@ -80,7 +80,6 @@ class pca_spmd_test : public pca_test> { using pca_types = COMBINE_TYPES((float, double), (pca::method::cov)); TEMPLATE_LIST_TEST_M(pca_spmd_test, "pca common flow", 
"[pca][integration][spmd]", pca_types) { - SKIP_IF(this->get_policy().is_cpu()); SKIP_IF(this->not_float64_friendly()); const te::dataframe data = diff --git a/samples/oneapi/cpp/ccl/sources/pca_distr_ccl.cpp b/samples/oneapi/cpp/ccl/sources/pca_distr_ccl.cpp new file mode 100644 index 00000000000..03a75d88cf4 --- /dev/null +++ b/samples/oneapi/cpp/ccl/sources/pca_distr_ccl.cpp @@ -0,0 +1,65 @@ +/******************************************************************************* +* Copyright contributors to the oneDAL project +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*******************************************************************************/
+
+#include <iostream>
+#include <mpi.h>
+
+#include "oneapi/dal/algo/pca.hpp"
+#include "oneapi/dal/io/csv.hpp"
+#include "oneapi/dal/spmd/ccl/communicator.hpp"
+
+#include "utils.hpp"
+
+namespace dal = oneapi::dal;
+namespace pca = dal::pca;
+
+void run() {
+    const auto data_file_name = get_data_path("data/pca_normalized.csv");
+
+    const auto data = dal::read<dal::table>(dal::csv::data_source{ data_file_name });
+
+    auto comm = dal::preview::spmd::make_communicator<dal::preview::spmd::device_memory_access::none>();
+    auto rank_id = comm.get_rank();
+    auto rank_count = comm.get_rank_count();
+
+    auto input_vec = split_table_by_rows<float>(data, rank_count);
+
+    const auto pca_desc = pca::descriptor{};
+
+    const auto result = dal::preview::train(comm, pca_desc, input_vec[rank_id]);
+
+    if (comm.get_rank() == 0) {
+        std::cout << "Eigenvectors:\n" << result.get_eigenvectors() << std::endl;
+
+        std::cout << "Eigenvalues:\n" << result.get_eigenvalues() << std::endl;
+    }
+}
+
+int main(int argc, char const* argv[]) {
+    ccl::init();
+    int status = MPI_Init(nullptr, nullptr);
+    if (status != MPI_SUCCESS) {
+        throw std::runtime_error{ "Problem occurred during MPI init" };
+    }
+
+    run();
+
+    status = MPI_Finalize();
+    if (status != MPI_SUCCESS) {
+        throw std::runtime_error{ "Problem occurred during MPI finalize" };
+    }
+    return 0;
+}
diff --git a/samples/oneapi/cpp/mpi/sources/pca_distr_mpi.cpp b/samples/oneapi/cpp/mpi/sources/pca_distr_mpi.cpp
new file mode 100644
index 00000000000..193a189c056
--- /dev/null
+++ b/samples/oneapi/cpp/mpi/sources/pca_distr_mpi.cpp
@@ -0,0 +1,64 @@
+/*******************************************************************************
+* Copyright contributors to the oneDAL project
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*******************************************************************************/
+
+#include <iostream>
+#include <mpi.h>
+
+#include "oneapi/dal/algo/pca.hpp"
+#include "oneapi/dal/io/csv.hpp"
+#include "oneapi/dal/spmd/mpi/communicator.hpp"
+
+#include "utils.hpp"
+
+namespace dal = oneapi::dal;
+namespace pca = dal::pca;
+
+void run() {
+    const auto data_file_name = get_data_path("data/pca_normalized.csv");
+
+    const auto data = dal::read<dal::table>(dal::csv::data_source{ data_file_name });
+
+    auto comm = dal::preview::spmd::make_communicator<dal::preview::spmd::device_memory_access::none>();
+    auto rank_id = comm.get_rank();
+    auto rank_count = comm.get_rank_count();
+
+    auto input_vec = split_table_by_rows<float>(data, rank_count);
+
+    const auto pca_desc = pca::descriptor{};
+
+    const auto result = dal::preview::train(comm, pca_desc, input_vec[rank_id]);
+
+    if (comm.get_rank() == 0) {
+        std::cout << "Eigenvectors:\n" << result.get_eigenvectors() << std::endl;
+
+        std::cout << "Eigenvalues:\n" << result.get_eigenvalues() << std::endl;
+    }
+}
+
+int main(int argc, char const* argv[]) {
+    int status = MPI_Init(nullptr, nullptr);
+    if (status != MPI_SUCCESS) {
+        throw std::runtime_error{ "Problem occurred during MPI init" };
+    }
+
+    run();
+
+    status = MPI_Finalize();
+    if (status != MPI_SUCCESS) {
+        throw std::runtime_error{ "Problem occurred during MPI finalize" };
+    }
+    return 0;
+}