diff --git a/cpp/oneapi/dal/algo/covariance/backend/cpu/compute_kernel_dense.cpp b/cpp/oneapi/dal/algo/covariance/backend/cpu/compute_kernel_dense.cpp index ac70439f072..99c0b3b2b45 100644 --- a/cpp/oneapi/dal/algo/covariance/backend/cpu/compute_kernel_dense.cpp +++ b/cpp/oneapi/dal/algo/covariance/backend/cpu/compute_kernel_dense.cpp @@ -18,10 +18,14 @@ #include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel.hpp" #include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel_common.hpp" +#include "oneapi/dal/algo/covariance/backend/cpu/partial_compute_kernel.hpp" +#include "oneapi/dal/algo/covariance/backend/cpu/finalize_compute_kernel.hpp" #include "oneapi/dal/backend/interop/common.hpp" #include "oneapi/dal/backend/interop/error_converter.hpp" #include "oneapi/dal/backend/interop/table_conversion.hpp" +#include "oneapi/dal/backend/primitives/utils.hpp" + #include "oneapi/dal/table/row_accessor.hpp" namespace oneapi::dal::covariance::backend { @@ -30,6 +34,8 @@ using dal::backend::context_cpu; using descriptor_t = detail::descriptor_base; using parameters_t = detail::compute_parameters; +namespace be = dal::backend; +namespace pr = be::primitives; namespace daal_covariance = daal::algorithms::covariance; namespace interop = dal::backend::interop; @@ -37,6 +43,74 @@ template using daal_covariance_kernel_t = daal_covariance::internal:: CovarianceDenseBatchKernel; +template +static compute_result call_daal_spmd_kernel(const context_cpu& ctx, + const detail::descriptor_base& desc, + const detail::compute_parameters& params, + const table& data) { + auto& comm = ctx.get_communicator(); + const std::int64_t component_count = data.get_column_count(); + + // Compute partial results locally on this rank's data + partial_compute_input partial_input(data); + auto partial_result = + partial_compute_kernel_cpu{}(ctx, desc, partial_input); + + // Extract partial results as mutable arrays + auto nobs_nd = pr::table2ndarray(partial_result.get_partial_n_rows()); + auto sums_nd = pr::table2ndarray(partial_result.get_partial_sum()); + auto crossproduct_nd = pr::table2ndarray(partial_result.get_partial_crossproduct()); + + auto nobs_ary = dal::array::wrap(nobs_nd.get_mutable_data(), nobs_nd.get_count()); + auto sums_ary = dal::array::wrap(sums_nd.get_mutable_data(), sums_nd.get_count()); + auto crossproduct_ary = + dal::array::wrap(crossproduct_nd.get_mutable_data(), crossproduct_nd.get_count()); + + // The DAAL online kernel stores centered crossproducts: + // cp = X^T*X - sums*sums^T/nobs + // Simple allreduce of centered crossproducts is incorrect because each + // rank uses its local mean. Un-center before allreduce, then re-center + // with global statistics after. + if (!desc.get_assume_centered()) { + Float* cp_ptr = crossproduct_ary.get_mutable_data(); + const Float* sums_ptr = sums_ary.get_data(); + const Float local_nobs = *nobs_ary.get_data(); + const Float inv_nobs = Float(1) / local_nobs; + for (std::int64_t i = 0; i < component_count; ++i) { + for (std::int64_t j = 0; j < component_count; ++j) { + cp_ptr[i * component_count + j] += inv_nobs * sums_ptr[i] * sums_ptr[j]; + } + } + } + + // Allreduce raw crossproduct, sums, and nobs across all ranks + comm.allreduce(nobs_ary).wait(); + comm.allreduce(sums_ary).wait(); + comm.allreduce(crossproduct_ary).wait(); + + // Re-center with global statistics + if (!desc.get_assume_centered()) { + Float* cp_ptr = crossproduct_ary.get_mutable_data(); + const Float* sums_ptr = sums_ary.get_data(); + const Float global_nobs = *nobs_ary.get_data(); + const Float inv_nobs = Float(1) / global_nobs; + for (std::int64_t i = 0; i < component_count; ++i) { + for (std::int64_t j = 0; j < component_count; ++j) { + cp_ptr[i * component_count + j] -= inv_nobs * sums_ptr[i] * sums_ptr[j]; + } + } + } + + // Reconstruct aggregated partial result and finalize + partial_compute_result aggregated; + aggregated.set_partial_n_rows(homogen_table::wrap(nobs_ary, 1, 1)); + aggregated.set_partial_sum(homogen_table::wrap(sums_ary, 1, component_count)); + aggregated.set_partial_crossproduct( + homogen_table::wrap(crossproduct_ary, component_count, component_count)); + + return finalize_compute_kernel_cpu{}(ctx, desc, aggregated); +} + template static compute_result call_daal_kernel(const context_cpu& ctx, const detail::descriptor_base& desc, @@ -121,6 +195,9 @@ static compute_result compute(const context_cpu& ctx, const detail::descriptor_base& desc, const detail::compute_parameters& params, const compute_input& input) { + if (ctx.get_communicator().get_rank_count() > 1) { + return call_daal_spmd_kernel(ctx, desc, params, input.get_data()); + } return call_daal_kernel(ctx, desc, params, input.get_data()); } diff --git a/cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp b/cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp index 429678413ae..6a05385f590 100644 --- a/cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp +++ b/cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp @@ -34,7 +34,7 @@ struct compute_ops_dispatcher { compute_parameters select_parameters(const Policy& ctx, const descriptor_base& desc, const compute_input& input) const { - using kernel_dispatcher_t = dal::backend::kernel_dispatcher)>; return kernel_dispatcher_t{}(ctx, desc, input); } @@ -51,7 +51,7 @@ struct compute_ops_dispatcher { const descriptor_base& desc, const compute_parameters& params, const compute_input& input) const { - using kernel_dispatcher_t = dal::backend::kernel_dispatcher)>; return kernel_dispatcher_t()(ctx, desc, params, input); } diff --git a/cpp/oneapi/dal/algo/covariance/test/spmd.cpp b/cpp/oneapi/dal/algo/covariance/test/spmd.cpp index 458ec9df3fd..b76b7830878 100644 --- a/cpp/oneapi/dal/algo/covariance/test/spmd.cpp +++ b/cpp/oneapi/dal/algo/covariance/test/spmd.cpp @@ -84,7 +84,6 @@ TEMPLATE_LIST_TEST_M(covariance_spmd_test, "covariance common flow", "[covariance][integration][spmd]", covariance_types) { - SKIP_IF(this->get_policy().is_cpu()); SKIP_IF(this->not_float64_friendly()); using Float = std::tuple_element_t<0, TestType>; diff --git a/samples/oneapi/cpp/ccl/sources/covariance_distr_ccl.cpp b/samples/oneapi/cpp/ccl/sources/covariance_distr_ccl.cpp new file mode 100644 index 00000000000..fabd6bdf59a --- /dev/null +++ b/samples/oneapi/cpp/ccl/sources/covariance_distr_ccl.cpp @@ -0,0 +1,65 @@ +/******************************************************************************* +* Copyright contributors to the oneDAL project +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*******************************************************************************/ + +#include +#include + +#include "oneapi/dal/algo/covariance.hpp" +#include "oneapi/dal/io/csv.hpp" +#include "oneapi/dal/spmd/ccl/communicator.hpp" + +#include "utils.hpp" + +namespace dal = oneapi::dal; + +void run() { + const auto data_file_name = get_data_path("data/covcormoments_dense.csv"); + + const auto data = dal::read(dal::csv::data_source{ data_file_name }); + + const auto cov_desc = dal::covariance::descriptor{}.set_result_options( + dal::covariance::result_options::cov_matrix | dal::covariance::result_options::means); + + auto comm = dal::preview::spmd::make_communicator(); + auto rank_id = comm.get_rank(); + auto rank_count = comm.get_rank_count(); + + auto input_vec = split_table_by_rows(data, rank_count); + + const auto result = dal::preview::compute(comm, cov_desc, input_vec[rank_id]); + + if (comm.get_rank() == 0) { + std::cout << "Sample covariance:\n" << result.get_cov_matrix() << std::endl; + + std::cout << "Means:\n" << result.get_means() << std::endl; + } +} + +int main(int argc, char const *argv[]) { + ccl::init(); + int status = MPI_Init(nullptr, nullptr); + if (status != MPI_SUCCESS) { + throw std::runtime_error{ "Problem occurred during MPI init" }; + } + + run(); + + status = MPI_Finalize(); + if (status != MPI_SUCCESS) { + throw std::runtime_error{ "Problem occurred during MPI finalize" }; + } + return 0; +} diff --git a/samples/oneapi/cpp/mpi/sources/covariance_distr_mpi.cpp b/samples/oneapi/cpp/mpi/sources/covariance_distr_mpi.cpp new file mode 100644 index 00000000000..7bd8fed02cf --- /dev/null +++ b/samples/oneapi/cpp/mpi/sources/covariance_distr_mpi.cpp @@ -0,0 +1,64 @@ +/******************************************************************************* +* Copyright contributors to the oneDAL project +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*******************************************************************************/ + +#include +#include + +#include "oneapi/dal/algo/covariance.hpp" +#include "oneapi/dal/io/csv.hpp" +#include "oneapi/dal/spmd/mpi/communicator.hpp" + +#include "utils.hpp" + +namespace dal = oneapi::dal; + +void run() { + const auto data_file_name = get_data_path("data/covcormoments_dense.csv"); + + const auto data = dal::read(dal::csv::data_source{ data_file_name }); + + const auto cov_desc = dal::covariance::descriptor{}.set_result_options( + dal::covariance::result_options::cov_matrix | dal::covariance::result_options::means); + + auto comm = dal::preview::spmd::make_communicator(); + auto rank_id = comm.get_rank(); + auto rank_count = comm.get_rank_count(); + + auto input_vec = split_table_by_rows(data, rank_count); + + const auto result = dal::preview::compute(comm, cov_desc, input_vec[rank_id]); + + if (comm.get_rank() == 0) { + std::cout << "Sample covariance:\n" << result.get_cov_matrix() << std::endl; + + std::cout << "Means:\n" << result.get_means() << std::endl; + } +} + +int main(int argc, char const *argv[]) { + int status = MPI_Init(nullptr, nullptr); + if (status != MPI_SUCCESS) { + throw std::runtime_error{ "Problem occurred during MPI init" }; + } + + run(); + + status = MPI_Finalize(); + if (status != MPI_SUCCESS) { + throw std::runtime_error{ "Problem occurred during MPI finalize" }; + } + return 0; +}