Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

#include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/compute_kernel_common.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/partial_compute_kernel.hpp"
#include "oneapi/dal/algo/covariance/backend/cpu/finalize_compute_kernel.hpp"
#include "oneapi/dal/backend/interop/common.hpp"
#include "oneapi/dal/backend/interop/error_converter.hpp"
#include "oneapi/dal/backend/interop/table_conversion.hpp"

#include "oneapi/dal/backend/primitives/utils.hpp"

#include "oneapi/dal/table/row_accessor.hpp"

namespace oneapi::dal::covariance::backend {
Expand All @@ -30,13 +34,83 @@ using dal::backend::context_cpu;
using descriptor_t = detail::descriptor_base<task::compute>;
using parameters_t = detail::compute_parameters<task::compute>;

namespace be = dal::backend;
namespace pr = be::primitives;
namespace daal_covariance = daal::algorithms::covariance;
namespace interop = dal::backend::interop;

template <typename Float, daal::internal::CpuType Cpu>
using daal_covariance_kernel_t = daal_covariance::internal::
CovarianceDenseBatchKernel<Float, daal_covariance::Method::defaultDense, Cpu>;

template <typename Float, typename Task>
static compute_result<Task> call_daal_spmd_kernel(const context_cpu& ctx,
const detail::descriptor_base<Task>& desc,
const detail::compute_parameters<Task>& params,
const table& data) {
auto& comm = ctx.get_communicator();
const std::int64_t component_count = data.get_column_count();

// Compute partial results locally on this rank's data
partial_compute_input<Task> partial_input(data);
auto partial_result =
partial_compute_kernel_cpu<Float, method::by_default, Task>{}(ctx, desc, partial_input);

// Extract partial results as mutable arrays
auto nobs_nd = pr::table2ndarray<Float>(partial_result.get_partial_n_rows());
auto sums_nd = pr::table2ndarray<Float>(partial_result.get_partial_sum());
auto crossproduct_nd = pr::table2ndarray<Float>(partial_result.get_partial_crossproduct());

auto nobs_ary = dal::array<Float>::wrap(nobs_nd.get_mutable_data(), nobs_nd.get_count());
auto sums_ary = dal::array<Float>::wrap(sums_nd.get_mutable_data(), sums_nd.get_count());
auto crossproduct_ary =
dal::array<Float>::wrap(crossproduct_nd.get_mutable_data(), crossproduct_nd.get_count());

// The DAAL online kernel stores centered crossproducts:
// cp = X^T*X - sums*sums^T/nobs
// Simple allreduce of centered crossproducts is incorrect because each
// rank uses its local mean. Un-center before allreduce, then re-center
// with global statistics after.
if (!desc.get_assume_centered()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be simpler to use DAAL's Distributed<step2Master> algorithm for aggregation?
https://github.com/uxlfoundation/oneDAL/blob/main/samples/daal/cpp/mpi/sources/covariance_dense_distributed_mpi.cpp#L106

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting suggestion, will look into this

Float* cp_ptr = crossproduct_ary.get_mutable_data();
const Float* sums_ptr = sums_ary.get_data();
const Float local_nobs = *nobs_ary.get_data();
const Float inv_nobs = Float(1) / local_nobs;
for (std::int64_t i = 0; i < component_count; ++i) {
for (std::int64_t j = 0; j < component_count; ++j) {
cp_ptr[i * component_count + j] += inv_nobs * sums_ptr[i] * sums_ptr[j];
}
}
}

// Allreduce raw crossproduct, sums, and nobs across all ranks
comm.allreduce(nobs_ary).wait();
comm.allreduce(sums_ary).wait();
comm.allreduce(crossproduct_ary).wait();

// Re-center with global statistics
if (!desc.get_assume_centered()) {
Float* cp_ptr = crossproduct_ary.get_mutable_data();
const Float* sums_ptr = sums_ary.get_data();
const Float global_nobs = *nobs_ary.get_data();
const Float inv_nobs = Float(1) / global_nobs;
for (std::int64_t i = 0; i < component_count; ++i) {
for (std::int64_t j = 0; j < component_count; ++j) {
cp_ptr[i * component_count + j] -= inv_nobs * sums_ptr[i] * sums_ptr[j];
}
}
}

// Reconstruct aggregated partial result and finalize
partial_compute_result<Task> aggregated;
aggregated.set_partial_n_rows(homogen_table::wrap(nobs_ary, 1, 1));
aggregated.set_partial_sum(homogen_table::wrap(sums_ary, 1, component_count));
aggregated.set_partial_crossproduct(
homogen_table::wrap(crossproduct_ary, component_count, component_count));

return finalize_compute_kernel_cpu<Float, method::by_default, Task>{}(ctx, desc, aggregated);
}

template <typename Float, typename Task>
static compute_result<Task> call_daal_kernel(const context_cpu& ctx,
const detail::descriptor_base<Task>& desc,
Expand Down Expand Up @@ -121,6 +195,9 @@ static compute_result<Task> compute(const context_cpu& ctx,
const detail::descriptor_base<Task>& desc,
const detail::compute_parameters<Task>& params,
const compute_input<Task>& input) {
if (ctx.get_communicator().get_rank_count() > 1) {
return call_daal_spmd_kernel<Float, Task>(ctx, desc, params, input.get_data());
}
return call_daal_kernel<Float, Task>(ctx, desc, params, input.get_data());
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct compute_ops_dispatcher<Policy, Float, Method, Task> {
compute_parameters<Task> select_parameters(const Policy& ctx,
const descriptor_base<Task>& desc,
const compute_input<Task>& input) const {
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_SINGLE_NODE_CPU(
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_UNIVERSAL_SPMD_CPU(
parameters::compute_parameters_cpu<Float, Method, Task>)>;
return kernel_dispatcher_t{}(ctx, desc, input);
}
Expand All @@ -51,7 +51,7 @@ struct compute_ops_dispatcher<Policy, Float, Method, Task> {
const descriptor_base<Task>& desc,
const compute_parameters<Task>& params,
const compute_input<Task>& input) const {
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_SINGLE_NODE_CPU(
using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_UNIVERSAL_SPMD_CPU(
backend::compute_kernel_cpu<Float, Method, Task>)>;
return kernel_dispatcher_t()(ctx, desc, params, input);
}
Expand Down
1 change: 0 additions & 1 deletion cpp/oneapi/dal/algo/covariance/test/spmd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ TEMPLATE_LIST_TEST_M(covariance_spmd_test,
"covariance common flow",
"[covariance][integration][spmd]",
covariance_types) {
SKIP_IF(this->get_policy().is_cpu());
SKIP_IF(this->not_float64_friendly());

using Float = std::tuple_element_t<0, TestType>;
Expand Down
65 changes: 65 additions & 0 deletions samples/oneapi/cpp/ccl/sources/covariance_distr_ccl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*******************************************************************************
* Copyright contributors to the oneDAL project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

#include <iomanip>
#include <iostream>

#include "oneapi/dal/algo/covariance.hpp"
#include "oneapi/dal/io/csv.hpp"
#include "oneapi/dal/spmd/ccl/communicator.hpp"

#include "utils.hpp"

namespace dal = oneapi::dal;

void run() {
const auto data_file_name = get_data_path("data/covcormoments_dense.csv");

const auto data = dal::read<dal::table>(dal::csv::data_source{ data_file_name });

const auto cov_desc = dal::covariance::descriptor<float>{}.set_result_options(
dal::covariance::result_options::cov_matrix | dal::covariance::result_options::means);

auto comm = dal::preview::spmd::make_communicator<dal::preview::spmd::backend::ccl>();
auto rank_id = comm.get_rank();
auto rank_count = comm.get_rank_count();

auto input_vec = split_table_by_rows<float>(data, rank_count);

const auto result = dal::preview::compute(comm, cov_desc, input_vec[rank_id]);

if (comm.get_rank() == 0) {
std::cout << "Sample covariance:\n" << result.get_cov_matrix() << std::endl;

std::cout << "Means:\n" << result.get_means() << std::endl;
}
}

int main(int argc, char const *argv[]) {
ccl::init();
int status = MPI_Init(nullptr, nullptr);
if (status != MPI_SUCCESS) {
throw std::runtime_error{ "Problem occurred during MPI init" };
}

run();

status = MPI_Finalize();
if (status != MPI_SUCCESS) {
throw std::runtime_error{ "Problem occurred during MPI finalize" };
}
return 0;
}
64 changes: 64 additions & 0 deletions samples/oneapi/cpp/mpi/sources/covariance_distr_mpi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*******************************************************************************
* Copyright contributors to the oneDAL project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

#include <iomanip>
#include <iostream>

#include "oneapi/dal/algo/covariance.hpp"
#include "oneapi/dal/io/csv.hpp"
#include "oneapi/dal/spmd/mpi/communicator.hpp"

#include "utils.hpp"

namespace dal = oneapi::dal;

void run() {
const auto data_file_name = get_data_path("data/covcormoments_dense.csv");

const auto data = dal::read<dal::table>(dal::csv::data_source{ data_file_name });

const auto cov_desc = dal::covariance::descriptor<float>{}.set_result_options(
dal::covariance::result_options::cov_matrix | dal::covariance::result_options::means);

auto comm = dal::preview::spmd::make_communicator<dal::preview::spmd::backend::mpi>();
auto rank_id = comm.get_rank();
auto rank_count = comm.get_rank_count();

auto input_vec = split_table_by_rows<float>(data, rank_count);

const auto result = dal::preview::compute(comm, cov_desc, input_vec[rank_id]);

if (comm.get_rank() == 0) {
std::cout << "Sample covariance:\n" << result.get_cov_matrix() << std::endl;

std::cout << "Means:\n" << result.get_means() << std::endl;
}
}

int main(int argc, char const *argv[]) {
int status = MPI_Init(nullptr, nullptr);
if (status != MPI_SUCCESS) {
throw std::runtime_error{ "Problem occurred during MPI init" };
}

run();

status = MPI_Finalize();
if (status != MPI_SUCCESS) {
throw std::runtime_error{ "Problem occurred during MPI finalize" };
}
return 0;
}
Loading