diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 351f72f52365..e3a8c0d710cb 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -35,7 +35,6 @@
 #include "arrow/type.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
-#include "arrow/util/unreachable.h"
 #include "arrow/util/vector.h"
 #include "arrow/visit_type_inline.h"
 
@@ -286,204 +285,11 @@ Result<std::shared_ptr<StructArray>> RecordBatch::ToStructArray() const {
                            /*offset=*/0);
 }
 
-template <typename Out>
-struct ConvertColumnsToTensorVisitor {
-  Out*& out_values;
-  const ArrayData& in_data;
-
-  template <typename T>
-  Status Visit(const T&) {
-    if constexpr (is_numeric(T::type_id)) {
-      using In = typename T::c_type;
-      auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
-
-      if (in_data.null_count == 0) {
-        if constexpr (std::is_same_v<In, Out>) {
-          memcpy(out_values, in_values.data(), in_values.size_bytes());
-          out_values += in_values.size();
-        } else {
-          for (In in_value : in_values) {
-            *out_values++ = static_cast<Out>(in_value);
-          }
-        }
-      } else {
-        for (int64_t i = 0; i < in_data.length; ++i) {
-          *out_values++ =
-              in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
-        }
-      }
-      return Status::OK();
-    }
-    Unreachable();
-  }
-};
-
-template <typename Out>
-struct ConvertColumnsToTensorRowMajorVisitor {
-  Out*& out_values;
-  const ArrayData& in_data;
-  int num_cols;
-  int col_idx;
-
-  template <typename T>
-  Status Visit(const T&) {
-    if constexpr (is_numeric(T::type_id)) {
-      using In = typename T::c_type;
-      auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
-
-      if (in_data.null_count == 0) {
-        for (int64_t i = 0; i < in_data.length; ++i) {
-          out_values[i * num_cols + col_idx] = static_cast<Out>(in_values[i]);
-        }
-      } else {
-        for (int64_t i = 0; i < in_data.length; ++i) {
-          out_values[i * num_cols + col_idx] =
-              in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
-        }
-      }
-      return Status::OK();
-    }
-    Unreachable();
-  }
-};
-
-template <typename DataType>
-inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out,
-                                   bool row_major) {
-  using CType = typename arrow::TypeTraits<DataType>::CType;
-  auto* out_values = reinterpret_cast<CType*>(out);
-
-  int i = 0;
-  for (const auto& column : batch.columns()) {
-    if (row_major) {
-      ConvertColumnsToTensorRowMajorVisitor<CType> visitor{out_values, *column->data(),
-                                                           batch.num_columns(), i++};
-      DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
-    } else {
-      ConvertColumnsToTensorVisitor<CType> visitor{out_values, *column->data()};
-      DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
-    }
-  }
-}
-
 Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan, bool row_major,
                                                       MemoryPool* pool) const {
-  if (num_columns() == 0) {
-    return Status::TypeError(
-        "Conversion to Tensor for RecordBatches without columns/schema is not "
-        "supported.");
-  }
-  // Check for no validity bitmap of each field
-  // if null_to_nan conversion is set to false
-  for (int i = 0; i < num_columns(); ++i) {
-    if (column(i)->null_count() > 0 && !null_to_nan) {
-      return Status::TypeError(
-          "Can only convert a RecordBatch with no nulls. Set null_to_nan to true to "
-          "convert nulls to NaN");
-    }
-  }
-
-  // Check for supported data types and merge fields
-  // to get the resulting uniform data type
-  if (!is_integer(column(0)->type()->id()) && !is_floating(column(0)->type()->id())) {
-    return Status::TypeError("DataType is not supported: ",
-                             column(0)->type()->ToString());
-  }
-  std::shared_ptr<Field> result_field = schema_->field(0);
-  std::shared_ptr<DataType> result_type = result_field->type();
-
-  Field::MergeOptions options;
-  options.promote_integer_to_float = true;
-  options.promote_integer_sign = true;
-  options.promote_numeric_width = true;
-
-  if (num_columns() > 1) {
-    for (int i = 1; i < num_columns(); ++i) {
-      if (!is_numeric(column(i)->type()->id())) {
-        return Status::TypeError("DataType is not supported: ",
-                                 column(i)->type()->ToString());
-      }
-
-      // Casting of float16 is not supported, throw an error in this case
-      if ((column(i)->type()->id() == Type::HALF_FLOAT ||
-           result_field->type()->id() == Type::HALF_FLOAT) &&
-          column(i)->type()->id() != result_field->type()->id()) {
-        return Status::NotImplemented("Casting from or to halffloat is not supported.");
-      }
-
-      ARROW_ASSIGN_OR_RAISE(
-          result_field, result_field->MergeWith(
-                            schema_->field(i)->WithName(result_field->name()), options));
-    }
-    result_type = result_field->type();
-  }
-
-  // Check if result_type is signed or unsigned integer and null_to_nan is set to true
-  // Then all columns should be promoted to float type
-  if (is_integer(result_type->id()) && null_to_nan) {
-    ARROW_ASSIGN_OR_RAISE(
-        result_field,
-        result_field->MergeWith(field(result_field->name(), float32()), options));
-    result_type = result_field->type();
-  }
-
-  // Allocate memory
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Buffer> result,
-      AllocateBuffer(result_type->bit_width() * num_columns() * num_rows(), pool));
-  // Copy data
-  switch (result_type->id()) {
-    case Type::UINT8:
-      ConvertColumnsToTensor<UInt8Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::UINT16:
-    case Type::HALF_FLOAT:
-      ConvertColumnsToTensor<UInt16Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::UINT32:
-      ConvertColumnsToTensor<UInt32Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::UINT64:
-      ConvertColumnsToTensor<UInt64Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::INT8:
-      ConvertColumnsToTensor<Int8Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::INT16:
-      ConvertColumnsToTensor<Int16Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::INT32:
-      ConvertColumnsToTensor<Int32Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::INT64:
-      ConvertColumnsToTensor<Int64Type>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::FLOAT:
-      ConvertColumnsToTensor<FloatType>(*this, result->mutable_data(), row_major);
-      break;
-    case Type::DOUBLE:
-      ConvertColumnsToTensor<DoubleType>(*this, result->mutable_data(), row_major);
-      break;
-    default:
-      return Status::TypeError("DataType is not supported: ", result_type->ToString());
-  }
-
-  // Construct Tensor object
-  const auto& fixed_width_type =
-      internal::checked_cast<const FixedWidthType&>(*result_type);
-  std::vector<int64_t> shape = {num_rows(), num_columns()};
-  std::vector<int64_t> strides;
   std::shared_ptr<Tensor> tensor;
-
-  if (row_major) {
-    ARROW_RETURN_NOT_OK(
-        internal::ComputeRowMajorStrides(fixed_width_type, shape, &strides));
-  } else {
-    ARROW_RETURN_NOT_OK(
-        internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
-  }
-  ARROW_ASSIGN_OR_RAISE(tensor,
-                        Tensor::Make(result_type, std::move(result), shape, strides));
+  ARROW_RETURN_NOT_OK(
+      internal::RecordBatchToTensor(*this, null_to_nan, row_major, pool, &tensor));
   return tensor;
 }
 
diff --git a/cpp/src/arrow/tensor.cc b/cpp/src/arrow/tensor.cc
index 77ccedbde15c..b47f1a1075b3 100644
--- a/cpp/src/arrow/tensor.cc
+++ b/cpp/src/arrow/tensor.cc
@@ -18,6 +18,7 @@
 #include "arrow/tensor.h"
 
 #include <algorithm>
+#include <cmath>
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
@@ -27,12 +28,14 @@
 #include <string>
 #include <vector>
 
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/int_util_overflow.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/unreachable.h"
 #include "arrow/visit_type_inline.h"
 
 namespace arrow {
@@ -220,6 +223,209 @@ Status ValidateTensorParameters(const std::shared_ptr<DataType>& type,
   return Status::OK();
 }
 
+template <typename Out>
+struct ConvertColumnsToTensorVisitor {
+  Out*& out_values;
+  const ArrayData& in_data;
+
+  template <typename T>
+  Status Visit(const T&) {
+    if constexpr (is_numeric(T::type_id)) {
+      using In = typename T::c_type;
+      auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
+
+      if (in_data.null_count == 0) {
+        if constexpr (std::is_same_v<In, Out>) {
+          memcpy(out_values, in_values.data(), in_values.size_bytes());
+          out_values += in_values.size();
+        } else {
+          for (In in_value : in_values) {
+            *out_values++ = static_cast<Out>(in_value);
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < in_data.length; ++i) {
+          *out_values++ =
+              in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
+        }
+      }
+      return Status::OK();
+    }
+    Unreachable();
+  }
+};
+
+template <typename Out>
+struct ConvertColumnsToTensorRowMajorVisitor {
+  Out*& out_values;
+  const ArrayData& in_data;
+  int num_cols;
+  int col_idx;
+
+  template <typename T>
+  Status Visit(const T&) {
+    if constexpr (is_numeric(T::type_id)) {
+      using In = typename T::c_type;
+      auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
+
+      if (in_data.null_count == 0) {
+        for (int64_t i = 0; i < in_data.length; ++i) {
+          out_values[i * num_cols + col_idx] = static_cast<Out>(in_values[i]);
+        }
+      } else {
+        for (int64_t i = 0; i < in_data.length; ++i) {
+          out_values[i * num_cols + col_idx] =
+              in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
+        }
+      }
+      return Status::OK();
+    }
+    Unreachable();
+  }
+};
+
+template <typename DataType>
+inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out,
+                                   bool row_major) {
+  using CType = typename arrow::TypeTraits<DataType>::CType;
+  auto* out_values = reinterpret_cast<CType*>(out);
+
+  int i = 0;
+  for (const auto& column : batch.columns()) {
+    if (row_major) {
+      ConvertColumnsToTensorRowMajorVisitor<CType> visitor{out_values, *column->data(),
+                                                           batch.num_columns(), i++};
+      DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
+    } else {
+      ConvertColumnsToTensorVisitor<CType> visitor{out_values, *column->data()};
+      DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
+    }
+  }
+}
+
+Status RecordBatchToTensor(const RecordBatch& batch, bool null_to_nan, bool row_major,
+                           MemoryPool* pool, std::shared_ptr<Tensor>* tensor) {
+  if (batch.num_columns() == 0) {
+    return Status::TypeError(
+        "Conversion to Tensor for RecordBatches without columns/schema is not "
+        "supported.");
+  }
+  // Check for no validity bitmap of each field
+  // if null_to_nan conversion is set to false
+  for (int i = 0; i < batch.num_columns(); ++i) {
+    if (batch.column(i)->null_count() > 0 && !null_to_nan) {
+      return Status::TypeError(
+          "Can only convert a RecordBatch with no nulls. Set null_to_nan to true to "
+          "convert nulls to NaN");
+    }
+  }
+
+  // Check for supported data types and merge fields
+  // to get the resulting uniform data type
+  if (!is_integer(batch.column(0)->type()->id()) &&
+      !is_floating(batch.column(0)->type()->id())) {
+    return Status::TypeError("DataType is not supported: ",
+                             batch.column(0)->type()->ToString());
+  }
+  std::shared_ptr<Field> result_field = batch.schema()->field(0);
+  std::shared_ptr<DataType> result_type = result_field->type();
+
+  Field::MergeOptions options;
+  options.promote_integer_to_float = true;
+  options.promote_integer_sign = true;
+  options.promote_numeric_width = true;
+
+  if (batch.num_columns() > 1) {
+    for (int i = 1; i < batch.num_columns(); ++i) {
+      if (!is_numeric(batch.column(i)->type()->id())) {
+        return Status::TypeError("DataType is not supported: ",
+                                 batch.column(i)->type()->ToString());
+      }
+
+      // Casting of float16 is not supported, throw an error in this case
+      if ((batch.column(i)->type()->id() == Type::HALF_FLOAT ||
+           result_field->type()->id() == Type::HALF_FLOAT) &&
+          batch.column(i)->type()->id() != result_field->type()->id()) {
+        return Status::NotImplemented("Casting from or to halffloat is not supported.");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(
+          result_field,
+          result_field->MergeWith(
+              batch.schema()->field(i)->WithName(result_field->name()), options));
+    }
+    result_type = result_field->type();
+  }
+
+  // Check if result_type is signed or unsigned integer and null_to_nan is set to true
+  // Then all columns should be promoted to float type
+  if (is_integer(result_type->id()) && null_to_nan) {
+    ARROW_ASSIGN_OR_RAISE(
+        result_field,
+        result_field->MergeWith(field(result_field->name(), float32()), options));
+    result_type = result_field->type();
+  }
+
+  // Allocate memory
+  ARROW_ASSIGN_OR_RAISE(
+      std::shared_ptr<Buffer> result,
+      AllocateBuffer(result_type->bit_width() * batch.num_columns() * batch.num_rows(),
+                     pool));
+  // Copy data
+  switch (result_type->id()) {
+    case Type::UINT8:
+      ConvertColumnsToTensor<UInt8Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::UINT16:
+    case Type::HALF_FLOAT:
+      ConvertColumnsToTensor<UInt16Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::UINT32:
+      ConvertColumnsToTensor<UInt32Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::UINT64:
+      ConvertColumnsToTensor<UInt64Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::INT8:
+      ConvertColumnsToTensor<Int8Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::INT16:
+      ConvertColumnsToTensor<Int16Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::INT32:
+      ConvertColumnsToTensor<Int32Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::INT64:
+      ConvertColumnsToTensor<Int64Type>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::FLOAT:
+      ConvertColumnsToTensor<FloatType>(batch, result->mutable_data(), row_major);
+      break;
+    case Type::DOUBLE:
+      ConvertColumnsToTensor<DoubleType>(batch, result->mutable_data(), row_major);
+      break;
+    default:
+      return Status::TypeError("DataType is not supported: ", result_type->ToString());
+  }
+
+  // Construct Tensor object
+  const auto& fixed_width_type =
+      internal::checked_cast<const FixedWidthType&>(*result_type);
+  std::vector<int64_t> shape = {batch.num_rows(), batch.num_columns()};
+  std::vector<int64_t> strides;
+
+  if (row_major) {
+    ARROW_RETURN_NOT_OK(
+        internal::ComputeRowMajorStrides(fixed_width_type, shape, &strides));
+  } else {
+    ARROW_RETURN_NOT_OK(
+        internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
+  }
+  ARROW_ASSIGN_OR_RAISE(*tensor,
+                        Tensor::Make(result_type, std::move(result), shape, strides));
+  return Status::OK();
+}
+
 }  // namespace internal
 
 /// Constructor with strides and dimension names
diff --git a/cpp/src/arrow/tensor.h b/cpp/src/arrow/tensor.h
index ff6f3735f919..dd3a21fae495 100644
--- a/cpp/src/arrow/tensor.h
+++ b/cpp/src/arrow/tensor.h
@@ -77,6 +77,10 @@ Status ValidateTensorParameters(const std::shared_ptr<DataType>& type,
                                 const std::vector<int64_t>& strides,
                                 const std::vector<std::string>& dim_names);
 
+ARROW_EXPORT
+Status RecordBatchToTensor(const RecordBatch& batch, bool null_to_nan, bool row_major,
+                           MemoryPool* pool, std::shared_ptr<Tensor>* tensor);
+
 }  // namespace internal
 
 class ARROW_EXPORT Tensor {