diff --git a/c10/cuda/CUDAStream.cpp b/c10/cuda/CUDAStream.cpp index 6d2b1e06fda9b..4c39cc02e7120 100644 --- a/c10/cuda/CUDAStream.cpp +++ b/c10/cuda/CUDAStream.cpp @@ -10,6 +10,43 @@ #include #include +#include +#include +#include +#include +#include + + +#include "execinfo.h" +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + + +static std::string CcaGetEnv(const char* name, const char* default_value) { + auto rtn = std::getenv(name); + if (rtn) { + return rtn; + } + return default_value; +} + +static int dev_idx_to_print = -1; + namespace c10::cuda { namespace { @@ -58,6 +95,9 @@ static c10::once_flag [C10_COMPILE_TIME_MAX_GPUS][kStreamsPerPool]; #endif +static std::array default_streams; +static c10::once_flag default_stream_flags[C10_COMPILE_TIME_MAX_GPUS]; + // Note [HIP Lazy Streams] // ~~~~~~~~~~~~~~~~~~~~~~~ // For ROCm/HIP, each stream is lazily initialized rather than creating all @@ -197,14 +237,201 @@ static void initGlobalStreamState() { : range; } -// Init a single CUDA or HIP stream + + +static std::vector createCustomMask(const std::vector& enabledCUs, int totalCUs) { + std::vector cuMask((totalCUs + 31) / 32, 0); + for (int cu : enabledCUs) { + if (cu >= 0 && cu < totalCUs) { + int wordIndex = cu / 32; + int bitIndex = cu % 32; + cuMask[wordIndex] |= (1UL << bitIndex); + } + } + return cuMask; +} + +// Helper function to parse a single item (either number or start:count like "0:7") +static std::vector parseCustomItem(const std::string& item) { + std::vector cus; + // Check if it's a start:count format (contains ':') + size_t colonPos = item.find(':'); + if (colonPos != std::string::npos && colonPos > 0 && colonPos < item.length() - 1) { + // Parse start:count format: "start:count" + int start = std::stoi(item.substr(0, colonPos)); + int count = std::stoi(item.substr(colonPos + 1)); + + // Ensure count > 0 + if (count > 0) { + for (int i = 0; i < count; i++) { + cus.push_back(start + i); + } + } else { + std::cerr << "Invalid count in custom CU mask: " << item + << " (count must be > 0)" << std::endl; + } + } else { + // Single number + cus.push_back(std::stoi(item)); + } + return cus; +} + +int32_t getTotalCUs() { + hipDeviceProp_t prop; + C10_CUDA_CHECK(cudaGetDeviceProperties(&prop, c10::cuda::current_device())); + std::string arch_name = prop.gcnArchName; + return (arch_name.find("gfx942") != std::string::npos) ? 
304 : 256; +} + +std::vector GetCuMask(int32_t &enable_cu_num, std::string mask_str, bool lower_bits_zero) { + const int32_t totalCUs = getTotalCUs(); + + if (mask_str.substr(0, 7) == "custom=") { + // Parse custom CU list: custom=0:7,32:8 or custom:1,2,5 + std::string cuList = mask_str.substr(7); + std::vector enabledCUs; + std::stringstream ss(cuList); + std::string item; + + enable_cu_num = 0; + + while (std::getline(ss, item, ',')) { + // Trim whitespace + item.erase(0, item.find_first_not_of(" \t")); + item.erase(item.find_last_not_of(" \t") + 1); + + // Parse this item (could be single number or start:count) + std::vector itemCUs = parseCustomItem(item); + enabledCUs.insert(enabledCUs.end(), itemCUs.begin(), itemCUs.end()); + enable_cu_num += itemCUs.size(); + } + return createCustomMask(enabledCUs, totalCUs); + } + + + if (CcaGetEnv("DBGENV_REVERSE_MASK", "0") == "1") { + lower_bits_zero = !lower_bits_zero; + } + + enable_cu_num = std::stoi(mask_str); + constexpr int32_t single_mask_bits = 32; + + constexpr int32_t max_cu_num = 512; + assert(totalCUs <= max_cu_num); + + std::vector mask; + std::bitset bits_mask(0); + int32_t start_idx = lower_bits_zero ? (totalCUs - enable_cu_num) : 0; + int32_t end_idx = lower_bits_zero ? totalCUs : enable_cu_num; + + auto get_mask_bit_index = [=](int index) -> int { + constexpr int se_num = 32; + constexpr int mapping_to_se[se_num] = + {1, 5, 9, 13, 17, 21, 25, 29, 2, 6, 10, 14, 18, 22, 26, 30, 0, 4, 8, 12, 16, 20, 24, 28, 3, 7, 11, 15, 19, 23, 27, 31}; + // {1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21, 22, 25, 26, 29, 30, 0, 3, 4, 7, 8, 11, 12, 15, 16, 19, 20, 23, 24, 27, 28, 31}; + int se = mapping_to_se[index % se_num]; + int rtn = (se % 4) * 8 + (se / 4) + (index / se_num) * se_num; + if (index >= 288) { + rtn = index; + } + return rtn; + }; + + for (int32_t i = start_idx; i < end_idx; i++) { + bits_mask.set(get_mask_bit_index(i)); + } + + for (int i = 0; i < (totalCUs + single_mask_bits - 1) / single_mask_bits; i++) { + auto tmp_mask = bits_mask; + for (int b = single_mask_bits; b < max_cu_num; ++b) { + tmp_mask.reset(b); + } + mask.push_back(static_cast(tmp_mask.to_ulong())); + bits_mask = bits_mask >> single_mask_bits; + } + return mask; +} + +/* + + + */ + +static void initSingleDefaultStream(DeviceIndex device_index); + +static void create_masked_stream(cudaStream_t *stream, const char*cu_num_env, const char*env_default,int device_index, bool lower_bits_zero, int pri, int i = 0) { + const char* create_msg = ""; + std::string env_str = CcaGetEnv(cu_num_env, env_default); + const int32_t totalCUs = getTotalCUs(); + if (env_str == "-1") { + *stream = nullptr; + create_msg = "use_nullptr"; + } else if (env_str == "0") { + C10_CUDA_CHECK(cudaStreamCreateWithPriority(stream, kDefaultFlags, pri)); + create_msg = "priority_stream"; + } else if (env_str == "999") { + TORCH_CHECK(std::string("DBGENV_DEFAULT_COMP_STREAM_CU") != cu_num_env) + c10::call_once(default_stream_flags[device_index], initSingleDefaultStream, device_index); + *stream = default_streams[device_index]; + create_msg = "same_as_default_stream"; + } else { + create_msg = "masked_stream"; + int32_t enable_cu_num = 0; + std::vector mask = GetCuMask(enable_cu_num, env_str, lower_bits_zero); + TORCH_CHECK(enable_cu_num <= totalCUs); + TORCH_CHECK(enable_cu_num > 0); + + if (device_index == 0) { + { + std::ostringstream oss; + oss << "cca_log create_masked_stream"; + for (int m = mask.size() - 1; m >= 0; --m) { + oss << std::hex << " [" << m << "]=" << mask[m]; + } + oss << std::dec << " dev 
" << device_index + << " env " << cu_num_env << " cu_num " << enable_cu_num << " i " << i; + std::fprintf(stderr, "%s\n", oss.str().c_str()); + } + + { + std::ostringstream oss; + oss << "0x"; + for (int m = mask.size() - 1; m >= 0; --m) { + oss << std::hex << std::setfill('0') << std::setw(8) << mask[m]; + } + std::fprintf(stderr, "%s\n", oss.str().c_str()); + } + } + + C10_CUDA_CHECK(hipExtStreamCreateWithCUMask(stream, mask.size(), &mask[0])); + } + if (device_index == 0) { + std::fprintf(stderr, "cca_log create_stream %s %p %s=%s i %d totalCUs %d GetTraceID %d\n", create_msg, (void*)*stream, cu_num_env, std::getenv(cu_num_env), i, totalCUs, GetTraceID(true)); + } +} +// Init a single HIP or HIP stream // See Note [HIP Lazy Streams] static void initSingleStream(int p, DeviceIndex device_index, int i) { CUDAGuard device_guard(device_index); auto& stream = streams[p][device_index][i]; auto pri = -p; // lower number is higher priority - C10_CUDA_CHECK(cudaStreamCreateWithPriority(&stream, kDefaultFlags, pri)); + dev_idx_to_print = std::stoi(CcaGetEnv("DBGENV_DEVIDX_PRINT", "-1")); + + const char *env_name = "DBGENV_DEFAULT_RCCL_STREAM_CU"; + bool lower_bits_zero = true; + + if (i == 1) { + env_name = "DBGENV_2ND_COMP_STREAM_CU"; + lower_bits_zero = false; + } else if (i == 2) { + env_name = "DBGENV_2ND_RCCL_STREAM_CU"; + lower_bits_zero = true; + } + + create_masked_stream(&stream, env_name, "0", device_index, lower_bits_zero, pri, i); + const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace(); if (C10_UNLIKELY(interp)) { (*interp)->trace_gpu_stream_creation( @@ -213,6 +440,16 @@ static void initSingleStream(int p, DeviceIndex device_index, int i) { } } +static void initSingleDefaultStream(DeviceIndex device_index) { + CUDAGuard device_guard(device_index); + auto& stream = default_streams[device_index]; + auto pri = 0; // lower number is higher priority + + dev_idx_to_print = std::stoi(CcaGetEnv("DBGENV_DEVIDX_PRINT", "-1")); + + create_masked_stream(&stream, "DBGENV_DEFAULT_COMP_STREAM_CU", "-1", device_index, false, -1); +} + // Creates the low and high priority stream pools for the specified device // Warning: only call once per device! static void initDeviceStreamState(DeviceIndex device_index) { @@ -238,6 +475,9 @@ static void initCUDAStreamsOnce() { for (const auto i : c10::irange(num_gpus)) { current_streams[i] = makeStreamId(StreamIdType::DEFAULT, 0); } + + // TORCH_WARN("cca_log initCUDAStreamsOnce num_gpus ", (int)num_gpus, " GetTraceID ", GetTraceID()) + } // Helper to verify the GPU index is valid @@ -285,7 +525,9 @@ cudaStream_t CUDAStream::stream() const { ").", " Did you manufacture the StreamId yourself? 
Don't do that; use the", " official API like c10::cuda::getStreamFromPool() to get a new stream."); - return nullptr; + // See Note [HIP Lazy Streams] + c10::call_once(default_stream_flags[device_index], initSingleDefaultStream, device_index); + return default_streams[device_index]; } else if (st.isExt()) { // NOLINTNEXTLINE(performance-no-int-to-ptr) return reinterpret_cast(stream_id); @@ -355,7 +597,17 @@ CUDAStream getDefaultCUDAStream(DeviceIndex device_index) { c10::cuda::SetTargetDevice(); } check_gpu(device_index); - return CUDAStreamForId(device_index, makeStreamId(StreamIdType::DEFAULT, 0)); + auto rtn = CUDAStreamForId(device_index, makeStreamId(StreamIdType::DEFAULT, 0)); + // TORCH_WARN("cca_log getDefaultCUDAStream device_index ", (int)device_index, " stream ", rtn.stream()) + + if (device_index == dev_idx_to_print) { + std::fprintf(stderr, "cca_log getDefaultCUDAStream device_index %d stream %p GetTraceID %d\n", + (int)device_index, + (void*)rtn.stream(), + GetTraceID()); + } + + return rtn; } CUDAStream getCurrentCUDAStream(DeviceIndex device_index) { @@ -365,11 +617,34 @@ CUDAStream getCurrentCUDAStream(DeviceIndex device_index) { c10::cuda::SetTargetDevice(); } check_gpu(device_index); - return CUDAStreamForId(device_index, current_streams[device_index]); + auto rtn = CUDAStreamForId(device_index, current_streams[device_index]); + if (device_index == dev_idx_to_print) { + // TORCH_WARN("cca_log getCurrentCUDAStream device_index ", (int)device_index, " stream ", rtn.stream(), " GetTraceID ", GetTraceID()) + std::fprintf(stderr, "cca_log getCurrentCUDAStream device_index %d stream %p tid %zu GetTraceID %d\n", + (int)device_index, + (void*)rtn.stream(), + std::hash{}(std::this_thread::get_id()), + GetTraceID()); + } + return rtn; } void setCurrentCUDAStream(CUDAStream stream) { initCUDAStreamsOnce(); + if (stream.device_index() == dev_idx_to_print) { + // TORCH_WARN("cca_log setCurrentCUDAStream device_index ", (int)stream.device_index(), + // " from ", CUDAStreamForId(stream.device_index(), current_streams[stream.device_index()]).stream(), + // " to ", stream.stream(), " GetTraceID ", GetTraceID()); + + std::fprintf(stderr, "cca_log setCurrentCUDAStream device_index %d from %p to %p tid %zu GetTraceID %d\n", + (int)stream.device_index(), + (void*)CUDAStreamForId(stream.device_index(), current_streams[stream.device_index()]).stream(), + (void*)stream.stream(), + std::hash{}(std::this_thread::get_id()), + GetTraceID() + ); + } + current_streams[stream.device_index()] = stream.id(); } diff --git a/caffe2/CMakeLists.txt b/caffe2/CMakeLists.txt index 6ab41b6c84793..e07c377fc5e6b 100644 --- a/caffe2/CMakeLists.txt +++ b/caffe2/CMakeLists.txt @@ -1548,6 +1548,11 @@ target_link_libraries(torch_cpu PUBLIC c10) target_link_libraries(torch_cpu PUBLIC ${Caffe2_PUBLIC_DEPENDENCY_LIBS}) target_link_libraries(torch_cpu PRIVATE ${Caffe2_DEPENDENCY_LIBS}) target_link_libraries(torch_cpu PRIVATE ${Caffe2_DEPENDENCY_WHOLE_LINK_LIBS}) + +if(USE_ROCM) + target_link_libraries(torch_cpu PRIVATE c10_hip) +endif() + if(USE_MPI) target_link_libraries(torch_cpu PRIVATE MPI::MPI_CXX) endif() diff --git a/torch/csrc/api/include/torch/nn/parallel/data_parallel.h b/torch/csrc/api/include/torch/nn/parallel/data_parallel.h index 58916c861523c..cda58d8ba09a9 100644 --- a/torch/csrc/api/include/torch/nn/parallel/data_parallel.h +++ b/torch/csrc/api/include/torch/nn/parallel/data_parallel.h @@ -110,6 +110,7 @@ void replicate_grad_edges( auto grad_fn = std::make_shared((*parameter).device()); 
grad_fn->set_next_edges(autograd::collect_next_edges(*parameter)); + // std::fprintf(stderr, "cca_log 113 grad_fn->name %s\n", grad_fn->name().c_str()); for (const auto i : c10::irange(devices.size())) { autograd::set_history(replicas[i]->parameters_[parameter.key()], grad_fn); } @@ -120,6 +121,7 @@ void replicate_grad_edges( auto grad_fn = std::make_shared((*buffer).device()); grad_fn->set_next_edges(autograd::collect_next_edges(*buffer)); + // std::fprintf(stderr, "cca_log 123 grad_fn->name %s\n", grad_fn->name().c_str()); for (const auto i : c10::irange(devices.size())) { autograd::set_history(replicas[i]->buffers_[buffer.key()], grad_fn); } diff --git a/torch/csrc/autograd/VariableTypeManual.cpp b/torch/csrc/autograd/VariableTypeManual.cpp index e270df51221bf..31a7cbe85b3c0 100644 --- a/torch/csrc/autograd/VariableTypeManual.cpp +++ b/torch/csrc/autograd/VariableTypeManual.cpp @@ -135,6 +135,7 @@ Tensor _fw_primal(c10::DispatchKeySet ks, const Tensor& self, int64_t level) { })(); if (grad_fn) { + CCADEBUG(std::fprintf(stderr, "cca_log 137 grad_fn->name %s\n", grad_fn->name().c_str())); set_history(flatten_tensor_args(result), grad_fn); } if (isFwGradDefined(self)) { @@ -180,6 +181,7 @@ Tensor _make_dual( })(); if (grad_fn) { + CCADEBUG(std::fprintf(stderr, "cca_log 183 grad_fn->name %s\n", grad_fn->name().c_str())); set_history(flatten_tensor_args(result), grad_fn); } diff --git a/torch/csrc/autograd/autograd_not_implemented_fallback.cpp b/torch/csrc/autograd/autograd_not_implemented_fallback.cpp index 2b44dd5905e93..21e13c45f6c00 100644 --- a/torch/csrc/autograd/autograd_not_implemented_fallback.cpp +++ b/torch/csrc/autograd/autograd_not_implemented_fallback.cpp @@ -209,6 +209,7 @@ static void basicAutogradNotImplementedFallbackImpl( // but custom operators may have multiple Tensor(a!) returns, // rebase_history assumes single Tensor(a!) return, and in general // custom ops don't have a good in-place story. 
+ CCADEBUG(std::fprintf(stderr, "cca_log basicAutogradNotImplementedFallbackImpl grad_fn->name %s\n", grad_fn->name().c_str())); if (!is_mutable_output) { set_history(t, grad_fn); } @@ -459,6 +460,7 @@ static void autogradNotImplementedFallbackImpl( if (any_requires_grad) { _foreach_tensor( [&](size_t idx_tensor, size_t idx_ret, const at::Tensor& t) { + CCADEBUG(std::fprintf(stderr, "cca_log autogradNotImplementedFallbackImpl grad_fn->name %s\n", grad_fn->name().c_str())); if (isDifferentiableType(t.scalar_type())) { if (is_inplace_output[idx_ret]) { rebase_history(t, grad_fn); diff --git a/torch/csrc/autograd/custom_function.cpp b/torch/csrc/autograd/custom_function.cpp index fb33425e656b5..a220f3c548b4c 100644 --- a/torch/csrc/autograd/custom_function.cpp +++ b/torch/csrc/autograd/custom_function.cpp @@ -279,6 +279,7 @@ static optional_variable_list _process_backward_mode_ad( bool is_modified, bool is_differentiable, bool is_saved_and_setup_context) { + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); if (!is_differentiable) { if (!var.requires_grad()) { if (is_input && !is_modified) { @@ -356,6 +357,8 @@ static optional_variable_list _process_backward_mode_ad( int num_diff_outputs = 0; for (const auto i : c10::irange(num_outputs)) { + + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); // We put a undefined_input placeholder for outputs that are not tensor and // for when the output tensor is not differentiable (see below) if (!raw_outputs[i].has_value()) { @@ -379,6 +382,8 @@ static optional_variable_list _process_backward_mode_ad( bool is_saved_and_setup_context = to_save_if_setup_context.count(out_tensor_impl) > 0; + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); + if (cdata) { uint32_t output_nr = 0; if (!is_differentiable) { @@ -388,6 +393,9 @@ static optional_variable_list _process_backward_mode_ad( } AT_ASSERT(i == output_nr); } + + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); + set_history( var, i, @@ -396,6 +404,8 @@ static optional_variable_list _process_backward_mode_ad( is_differentiable, is_saved_and_setup_context); + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); + // For deprecation cycle. Can be removed after 1.6. In the case where we // detected a view in no grad mode during the forward, only warn the user // (do not change the flag if we return and input that is a view as is). 
See @@ -406,6 +416,7 @@ static optional_variable_list _process_backward_mode_ad( auto diff_view_meta = impl::get_view_autograd_meta(var); diff_view_meta->set_creation_meta(CreationMeta::IN_CUSTOM_FUNCTION); } + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); if (is_differentiable) { ++num_diff_outputs; @@ -449,12 +460,17 @@ optional_variable_list _wrap_outputs( const _jvp_fn_t& jvp_user_function, const std::unordered_set& to_save_if_setup_context, const _view_as_self_fn_t& view_as_self_fn) { + + // std::fprintf(stderr, "cca_log _wrap_outputs 453\n"); + std::unordered_map inputs_mapping; inputs_mapping.reserve(input_vars.size()); for (const auto i : c10::irange(input_vars.size())) { inputs_mapping.emplace(input_vars[i].unsafeGetTensorImpl(), i); } + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); + auto outputs = _process_backward_mode_ad( inputs_mapping, non_differentiable, @@ -464,6 +480,8 @@ optional_variable_list _wrap_outputs( to_save_if_setup_context, view_as_self_fn); + // std::fprintf(stderr, "cca_log %s:%d\n", __FILE__, __LINE__); + // This must happen after the backward processing as we expect the // computations happening here to track backward mode gradients. _process_forward_mode_AD( diff --git a/torch/csrc/autograd/custom_function.h b/torch/csrc/autograd/custom_function.h index 25e88cbf6cfef..81a0dd355493c 100644 --- a/torch/csrc/autograd/custom_function.h +++ b/torch/csrc/autograd/custom_function.h @@ -512,6 +512,7 @@ auto Function::apply(Args&&... args) auto view_as_self_fn = [](const at::Tensor& x) -> at::Tensor { return x.view_as(x); }; +std::fprintf(stderr, "cca_log Function::apply\n"); auto wrapped_outputs = _wrap_outputs( input_vars, diff --git a/torch/csrc/autograd/engine.cpp b/torch/csrc/autograd/engine.cpp index f0024f8f0b070..b29e59bc47661 100644 --- a/torch/csrc/autograd/engine.cpp +++ b/torch/csrc/autograd/engine.cpp @@ -1067,6 +1067,9 @@ void Engine::evaluate_function( const std::shared_ptr& cpu_ready_queue) { // Locally set the current stream to func's associated stream auto opt_parent_stream = (*func).stream(); + + // std::fprintf(stderr, "cca_log Engine::evaluate_function name %s cca_tag %d\n", func->name().c_str(), func->cca_tag() ? 1 : 0); + c10::OptionalStreamGuard parent_stream_guard{opt_parent_stream}; // Ensure that the incoming gradients are ready diff --git a/torch/csrc/autograd/function.h b/torch/csrc/autograd/function.h index fba950bbcec55..ed0e7659fcd89 100644 --- a/torch/csrc/autograd/function.h +++ b/torch/csrc/autograd/function.h @@ -24,6 +24,14 @@ #include #include +#if 0 +#define CCADEBUG(x) x +#else +#define CCADEBUG(x) +#endif +TORCH_API int GetTraceID(bool force_print_trace=false, int skip = 0, int max_frames = 64); +// TORCH_API std::string CcaGetEnv(const char* name, const char* default_value); + namespace torch::autograd { struct Edge; @@ -132,12 +140,16 @@ struct TORCH_API Node : std::enable_shared_from_this { // Store the thread_id of the forward operator. // See NOTE [ Sequence Numbers ] thread_id_ = at::RecordFunction::currentThreadId(); + + cca_debug(); } explicit Node(edge_list&& next_edges = edge_list()) : Node( /*sequence_nr=*/at::sequence_number::get_and_increment(), - std::move(next_edges)) {} + std::move(next_edges)) { + cca_debug(); + } /// Nodes are neither copyable nor moveable. 
Node(const Node& other) = delete; @@ -162,6 +174,8 @@ struct TORCH_API Node : std::enable_shared_from_this { at::ROCmBackwardPassGuard in_backward; #endif + cca_debug(); + auto step_callbacks = at::getStepCallbacksUnlessEmpty(at::RecordScope::BACKWARD_FUNCTION); if (C10_UNLIKELY(step_callbacks.has_value())) { @@ -205,12 +219,14 @@ struct TORCH_API Node : std::enable_shared_from_this { auto meta_shape = MetadataShape{std::in_place_type, shape}; input_metadata_.emplace_back( options, meta_shape, is_tensor_subclass, is_nested); + cca_debug(); return input_nr; } uint32_t add_input_metadata(const at::Tensor& t) noexcept { uint32_t input_nr = input_metadata_.size(); input_metadata_.emplace_back(t); + cca_debug(); return input_nr; } @@ -218,19 +234,23 @@ struct TORCH_API Node : std::enable_shared_from_this { uint32_t add_input_metadata(undefined_input u) noexcept { uint32_t input_nr = input_metadata_.size(); input_metadata_.emplace_back(); + cca_debug(); return input_nr; } uint32_t num_inputs() const noexcept { + cca_debug(); return input_metadata_.size(); } const InputMetadata& input_metadata(size_t index) const { + cca_debug(); return input_metadata_[index]; } // Danger: not thread safe, caller must protect with lock InputMetadata& mutable_input_metadata(size_t index) { + cca_debug(); return input_metadata_[index]; } @@ -248,6 +268,11 @@ struct TORCH_API Node : std::enable_shared_from_this { if (!opt_device_type.has_value()) { return std::nullopt; } + + if (override_stream_) { + return override_stream_; + } + for (const auto& metadata : input_metadata_) { if (metadata.device().type() == opt_device_type.value()) return metadata.stream(); @@ -614,6 +639,20 @@ struct TORCH_API Node : std::enable_shared_from_this { return false; } + private: + bool cca_tag_ = false; + std::optional override_stream_; + void cca_debug() const { + if (this->name() == "FlashAttnFuncBackward") { + // std::fprintf(stderr, "cca_log FlashAttnFuncBackward GetTraceID %d\n", GetTraceID(true)); + } + } + public: + + void set_cca_tag(bool t) { cca_tag_ = t; } + bool cca_tag() const { return cca_tag_; } + void set_override_stream(const c10::Stream& s) { override_stream_ = s; } + protected: /// Performs the `Node`'s actual operation. virtual variable_list apply(variable_list&& inputs) = 0; diff --git a/torch/csrc/autograd/functions/utils.cpp b/torch/csrc/autograd/functions/utils.cpp index c655e4664b8f7..d93e65ec8f603 100644 --- a/torch/csrc/autograd/functions/utils.cpp +++ b/torch/csrc/autograd/functions/utils.cpp @@ -8,6 +8,147 @@ #include #include +#include +#include +#include +#include +#include + + +#include "execinfo.h" +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#if 0 +#define CCA_DEBUG(x) x +#else +#define CCA_DEBUG(x) +#endif + +static std::string CcaGetEnv(const char* name, const char* default_value) { + auto rtn = std::getenv(name); + if (rtn) { + return rtn; + } + return default_value; +} + +static int dev_idx_to_print = -1; + +static std::unordered_map g_stack2id; +static int g_next_id = 1; +static std::mutex g_stack_mu; + +static std::string frame_to_string(void* addr) { + Dl_info info; + if (!(dladdr(addr, &info) && info.dli_fname)) { + char buf[64]; + std::snprintf(buf, sizeof(buf), "%p ??", addr); + return std::string(buf); + } + + const char* so = info.dli_fname; + const char* sym = info.dli_sname ? 
info.dli_sname : "??"; + void* sym_addr = info.dli_saddr; + +#ifdef __GNUG__ + int status = 0; + char* dem = abi::__cxa_demangle(sym, nullptr, nullptr, &status); + const char* pretty = (status == 0 && dem) ? dem : sym; +#else + const char* pretty = sym; + char* dem = nullptr; +#endif + + uintptr_t base = (uintptr_t)info.dli_fbase; + uintptr_t mod_off = base ? ((uintptr_t)addr - base) : 0; + uintptr_t sym_off = sym_addr ? ((uintptr_t)addr - (uintptr_t)sym_addr) : 0; + + std::ostringstream oss; + oss << addr << " " << so + << " mod+0x" << std::hex << mod_off + << " " << pretty << " +0x" << std::hex << sym_off; + +#ifdef __GNUG__ + std::free(dem); +#endif + return oss.str(); +} + +static std::string make_stack_string(int skip = 0, int max_frames = 32) { + void* frames[128]; + int n = backtrace(frames, std::min(max_frames, 128)); + + std::ostringstream oss; + for (int i = skip; i < n; ++i) { + oss << frame_to_string(frames[i]) << "\n"; + } + return oss.str(); +} + +int GetTraceID(bool force_print_trace, int skip, int max_frames) { + CCADEBUG(if(0)) return 0; + if (CcaGetEnv("DBGENV_PRINT_CALLSTACK", "0") == "0" && !force_print_trace) { + return 0; + } + std::string stack = make_stack_string(skip, max_frames); + + int id = 0; + bool first_time = false; + { + std::lock_guard lk(g_stack_mu); + auto it = g_stack2id.find(stack); + if (it == g_stack2id.end()) { + id = g_next_id++; + g_stack2id.emplace(stack, id); + first_time = true; + } else { + id = it->second; + } + } + + if (first_time) { + std::fprintf(stderr, "[GetTraceID %d] new\n%s", id, stack.c_str()); + // } else { + // std::fprintf(stderr, "[STACK_ID %d]\n", id); + } + return id; +} + +namespace torch::autograd::stream_tag { + +thread_local std::vector tag_stack; + +void push() { tag_stack.push_back(true); } +void pop() { if (!tag_stack.empty()) tag_stack.pop_back(); } +bool active() { return !tag_stack.empty() && tag_stack.back(); } + +} // namespace + + +namespace torch::ddp_model2_stream { + +Registry& registry() { + static Registry r; + return r; +} + +} // namespace torch::ddp_model2_stream + + namespace torch::autograd { variable_list wrap_outputs( diff --git a/torch/csrc/autograd/functions/utils.h b/torch/csrc/autograd/functions/utils.h index 2f0027d59fa1f..f96df19a83279 100644 --- a/torch/csrc/autograd/functions/utils.h +++ b/torch/csrc/autograd/functions/utils.h @@ -13,6 +13,54 @@ #include #include + +namespace torch::autograd::stream_tag { +TORCH_API void push(); +TORCH_API void pop(); +TORCH_API bool active(); +} // namespace + +namespace torch::ddp_model2_stream { + +struct HasStream12 { + HasStream12() {} + HasStream12(bool has1, bool has2) : has_stream1(has1), has_stream2(has2) {} + bool has_stream1 = false; + bool has_stream2 = false; +}; + +struct Registry { + std::mutex mu; + + // model2 module identity (python object) + PyObject* model2_module = nullptr; // strong ref + bool enabled = false; + bool start_compute = false; + + // // streams + // c10::Stream bwd_stream; + // c10::Stream rccl_stream; + + int64_t bwd_stream_id = 0; + int64_t bwd_device_index = 0; + int64_t bwd_device_type = 0; + + int64_t rccl_stream_id = 0; + int64_t rccl_device_index = 0; + int64_t rccl_device_type = 0; + + int32_t rccl_cnt = 0; + + // params (for bucket classification) + std::unordered_set model2_param_impls; + + std::unordered_map bucket_tensor_has_stream; +}; + +TORCH_API Registry& registry(); + +} // namespace torch::ddp_model2_stream + namespace torch::autograd { using function_constructor = std::function(edge_list&&)>; @@ -67,6 
+115,22 @@ inline void set_history( const at::Tensor& variable, const std::shared_ptr& grad_fn) { TORCH_CHECK(grad_fn != nullptr); + + if (torch::autograd::stream_tag::active() && !grad_fn->cca_tag()) { + // CCADEBUG(std::fprintf(stderr, "cca_log set_history override_stream grad_fn->name %s GetTraceID %d\n", grad_fn->name().c_str(), GetTraceID(true))); + grad_fn->set_cca_tag(true); + + auto& reg = torch::ddp_model2_stream::registry(); + std::lock_guard g(reg.mu); + auto bwd_stream = c10::Stream::unpack3( + reg.bwd_stream_id, + static_cast(reg.bwd_device_index), + static_cast(reg.bwd_device_type)); + grad_fn->set_override_stream(bwd_stream); + } else { + // CCADEBUG(std::fprintf(stderr, "cca_log set_history not_override_stream grad_fn->name %s GetTraceID %d\n", grad_fn->name().c_str(), GetTraceID(true))); + } + if (variable.defined()) { // If the codegen triggers this, you most likely want to add your newly // added function to the DONT_REQUIRE_DERIVATIVE list in diff --git a/torch/csrc/autograd/python_function.cpp b/torch/csrc/autograd/python_function.cpp index 14591bc1fb4a1..6d3c277f4f0d4 100644 --- a/torch/csrc/autograd/python_function.cpp +++ b/torch/csrc/autograd/python_function.cpp @@ -44,6 +44,8 @@ #include #include +#include + using namespace torch; using namespace torch::autograd; using at::Tensor; @@ -602,6 +604,9 @@ static void _wrap_outputs( PyObject* outputs, bool is_executable, const std::unordered_set& to_save_if_setup_context) { + + // std::fprintf(stderr, "cca_log _wrap_outputs 606\n"); + auto cdata_if_executable = is_executable ? cdata : nullptr; Py_ssize_t num_outputs = PyTuple_GET_SIZE(raw_output); if (is_executable) { @@ -1091,6 +1096,26 @@ PyObject* process_outputs( bool is_executable, torch::jit::Node* node, bool overridden_setup_context) { + + if (cdata) { + // std::fprintf(stderr, "cca_log process_outputs %s\n", cdata->name().c_str()); + + if (torch::autograd::stream_tag::active() && !cdata->cca_tag()) { + // std::fprintf(stderr, "cca_log process_outputs override_stream cdata->name %s GetTraceID %d\n", cdata->name().c_str(), GetTraceID(true)); + cdata->set_cca_tag(true); + + auto& reg = torch::ddp_model2_stream::registry(); + std::lock_guard g(reg.mu); + auto bwd_stream = c10::Stream::unpack3( + reg.bwd_stream_id, + static_cast(reg.bwd_device_index), + static_cast(reg.bwd_device_type)); + cdata->set_override_stream(bwd_stream); + } else { + // std::fprintf(stderr, "cca_log process_outputs not_override_stream cdata->name %s GetTraceID %d\n", cdata->name().c_str(), GetTraceID(true)); + } + } + bool unpack_output = ensure_tuple(raw_output); auto num_outputs = PyTuple_GET_SIZE(raw_output.get()); diff --git a/torch/csrc/autograd/python_function.h b/torch/csrc/autograd/python_function.h index e24399c10aa3f..0d45eb4c216d1 100644 --- a/torch/csrc/autograd/python_function.h +++ b/torch/csrc/autograd/python_function.h @@ -26,7 +26,11 @@ namespace torch::autograd { // Calls to 'apply' are forwarded to the Python method implementation. 
// NOLINTNEXTLINE(cppcoreguidelines-special-member-functions) struct PyNode : public Node { - PyNode(THPObjectPtr obj) : obj(obj.release()) {} + PyNode(THPObjectPtr obj) : obj(obj.release()) { + if (this->name() == "FlashAttnFuncBackward") { + // CCADEBUG(std::fprintf(stderr, "cca_log PyNode FlashAttnFuncBackward GetTraceID %d\n", GetTraceID(true))); + } + } PyObject* to_py_args( const variable_list& inputs, diff --git a/torch/csrc/autograd/python_torch_functions_manual.cpp b/torch/csrc/autograd/python_torch_functions_manual.cpp index 79739b6e459d2..8d26e993b8a56 100644 --- a/torch/csrc/autograd/python_torch_functions_manual.cpp +++ b/torch/csrc/autograd/python_torch_functions_manual.cpp @@ -801,6 +801,7 @@ void initTorchFunctions(PyObject* module) { new torch::autograd::Error( "Cannot backprop through mirrored meta, file a bug in PyTorch"), torch::autograd::deleteNode); + CCADEBUG(std::fprintf(stderr, "cca_log 804 grad_fn->name %s\n", new_grad_fn->name().c_str())); torch::autograd::set_history(dst_, new_grad_fn); } } diff --git a/torch/csrc/cuda/Module.cpp b/torch/csrc/cuda/Module.cpp index 3a8929110e8b4..929bb38d2007d 100644 --- a/torch/csrc/cuda/Module.cpp +++ b/torch/csrc/cuda/Module.cpp @@ -1928,6 +1928,189 @@ static void initCudaMethodBindings(PyObject* module) { }); } +#include +#include +#include + +#include +#include +#include // THPVariable_Unpack +#include + +namespace torch::ddp_model2_stream { +static bool is_stream_module(PyObject* obj) { + auto& r = registry(); + std::lock_guard g(r.mu); + return r.enabled && r.model2_module == obj; +} +static bool p(c10::TensorImpl* impl) { + auto& r = registry(); + std::lock_guard g(r.mu); + return r.enabled && r.model2_param_impls.count(impl) != 0; +} +} // namespace torch::ddp_model2_stream + +static bool collect_param_impls(PyObject* py_module, + std::unordered_set& out_set) { + PyObject* params_obj = PyObject_CallMethod(py_module, "parameters", nullptr); + if (!params_obj) return false; + + PyObject* it = PyObject_GetIter(params_obj); + Py_DECREF(params_obj); + if (!it) return false; + + while (true) { + PyObject* item = PyIter_Next(it); + if (!item) { + if (PyErr_Occurred()) { + Py_DECREF(it); + return false; + } + break; + } + + // item should be a Tensor/Parameter + at::Tensor t = THPVariable_Unpack(item); + out_set.insert(t.unsafeGetTensorImpl()); + + Py_DECREF(item); + } + + Py_DECREF(it); + return true; +} + +PyObject* THCPModule_ddpSetBackwardAllreduceStream_wrap( + PyObject* self, + PyObject* args, + PyObject* kwargs) { + HANDLE_TH_ERRORS + + PyObject* py_module = nullptr; + + int64_t bwd_stream_id = 0; + int64_t bwd_device_index = 0; + int64_t bwd_device_type = 0; + + int64_t rccl_stream_id = 0; + int64_t rccl_device_index = 0; + int64_t rccl_device_type = 0; + + // NOLINTNEXTLINE(modernize-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays) + constexpr const char* kwlist[] = { + "module", + "bwd_stream_id", "bwd_device_index", "bwd_device_type", + "rccl_stream_id", "rccl_device_index", "rccl_device_type", + nullptr}; + + if (!PyArg_ParseTupleAndKeywords( + args, + kwargs, + "O|LLLLLL", + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) + const_cast(kwlist), + &py_module, + &bwd_stream_id, &bwd_device_index, &bwd_device_type, + &rccl_stream_id, &rccl_device_index, &rccl_device_type)) { + return nullptr; + } + + // auto bwd_stream = at::hip::HIPStreamMasqueradingAsCUDA::unpack3( + // bwd_stream_id, + // static_cast(bwd_device_index), + // static_cast(bwd_device_type)); + + // auto rccl_stream = 
at::hip::HIPStreamMasqueradingAsCUDA::unpack3( + // rccl_stream_id, + // static_cast(rccl_device_index), + // static_cast(rccl_device_type)); + + // ---- registry write (pseudo) ---- + // 1) store module PyObject* (INCREF) + // 2) store streams (convert to c10::Stream or store HIPStreamMasqueradingAsCUDA directly) + // 3) collect module.parameters() and store TensorImpl* set + + auto& reg = torch::ddp_model2_stream::registry(); + { + std::lock_guard g(reg.mu); + reg.model2_param_impls.clear(); + if (reg.model2_module) { + Py_DECREF(reg.model2_module); + reg.model2_module = nullptr; + } + } + std::unordered_set tmp; + + if (!collect_param_impls(py_module, tmp)) { + return nullptr; // python exception already set + } + + { + std::lock_guard g(reg.mu); + reg.model2_param_impls = std::move(tmp); + + Py_INCREF(py_module); + reg.model2_module = py_module; + + + reg.bwd_stream_id = bwd_stream_id; + reg.bwd_device_index = bwd_device_index; + reg.bwd_device_type = bwd_device_type; + + reg.rccl_stream_id = rccl_stream_id; + reg.rccl_device_index = rccl_device_index; + reg.rccl_device_type = rccl_device_type; + + reg.enabled = true; + } + + std::fprintf(stderr, "cca_log THCPModule_ddpSetBackwardAllreduceStream_wrap %ld %ld\n", bwd_stream_id, rccl_stream_id); + + Py_RETURN_NONE; + END_HANDLE_TH_ERRORS +} + +static PyObject* THCPModule_autogradIsStreamModule_wrap( + PyObject* self, PyObject* obj) { + if (torch::ddp_model2_stream::is_stream_module(obj)) Py_RETURN_TRUE; + Py_RETURN_FALSE; +} + +PyObject* THCPModule_autogradPushStreamTag_wrap(PyObject* self, PyObject* args) { + HANDLE_TH_ERRORS + torch::autograd::stream_tag::push(); + Py_RETURN_NONE; + END_HANDLE_TH_ERRORS +} + +PyObject* THCPModule_autogradPopStreamTag_wrap(PyObject* self, PyObject* args) { + HANDLE_TH_ERRORS + torch::autograd::stream_tag::pop(); + Py_RETURN_NONE; + END_HANDLE_TH_ERRORS +} + +static std::string CcaGetEnv(const char* name, const char* default_value) { + auto rtn = std::getenv(name); + if (rtn) { + return rtn; + } + return default_value; +} + +PyObject* THCPModule_resetRcclStreamCnt_wrap(PyObject* self, PyObject* args) { + HANDLE_TH_ERRORS + auto& reg = torch::ddp_model2_stream::registry(); + { + std::lock_guard g(reg.mu); + reg.rccl_cnt = std::stoi(CcaGetEnv("DBGENV_2ND_RCCL_STREAM_CNT", "999999999")); + } + reg.start_compute = true; + CCADEBUG(std::fprintf(stderr, "cca_log set rccl_cnt %d\n", reg.rccl_cnt)); + Py_RETURN_NONE; + END_HANDLE_TH_ERRORS +} + // NOLINTNEXTLINE(*-c-arrays*, *-global-variables) static struct PyMethodDef _THCPModule_methods[] = { {"_cuda_init", THCPModule_initExtension, METH_NOARGS, nullptr}, @@ -2160,6 +2343,26 @@ static struct PyMethodDef _THCPModule_methods[] = { THCPModule_cuda_tunableop_get_rotating_buffer_size, METH_NOARGS, nullptr}, + {"_cuda_set_backward_rccl_stream", + castPyCFunctionWithKeywords(THCPModule_ddpSetBackwardAllreduceStream_wrap), + METH_VARARGS | METH_KEYWORDS, + nullptr}, + {"_autograd_is_stream_module", + THCPModule_autogradIsStreamModule_wrap, + METH_O, + nullptr}, + {"_autograd_push_stream_tag", + THCPModule_autogradPushStreamTag_wrap, + METH_NOARGS, + nullptr}, + {"_autograd_pop_stream_tag", + THCPModule_autogradPopStreamTag_wrap, + METH_NOARGS, + nullptr}, + {"_cuda_reset_rccl_stream_cnt", + THCPModule_resetRcclStreamCnt_wrap, + METH_NOARGS, + nullptr}, {nullptr}}; PyMethodDef* THCPModule_methods() { diff --git a/torch/csrc/distributed/autograd/utils.cpp b/torch/csrc/distributed/autograd/utils.cpp index 84ddaa1a5ce07..bbb3c849a7280 100644 --- 
a/torch/csrc/distributed/autograd/utils.cpp +++ b/torch/csrc/distributed/autograd/utils.cpp @@ -62,6 +62,7 @@ ContextPtr addRecvRpcBackward( autogradMetadata, autogradContext, fromWorkerId, deviceMap); for (auto& tensor : tensors) { if (tensor.requires_grad()) { + CCADEBUG(std::fprintf(stderr, "cca_log 65 grad_fn->name %s\n", grad_fn->name().c_str())); torch::autograd::set_history(tensor, grad_fn); } } diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp index dbb25aca96f44..0ef9d578bb358 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp +++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp @@ -34,6 +34,22 @@ #include #include +#include +#include + +namespace torch::ddp_model2_stream { +static bool is_stream_module(PyObject* obj) { + auto& r = registry(); + std::lock_guard g(r.mu); + return r.enabled && r.model2_module == obj; +} +static bool is_stream_param(c10::TensorImpl* impl) { + auto& r = registry(); + std::lock_guard g(r.mu); + return r.enabled && r.model2_param_impls.count(impl) != 0; +} +} // namespace torch::ddp_model2_stream + namespace c10d { constexpr const char* const kNCCLAbortedCommStoreKey = "NCCLABORTEDCOMM"; @@ -3131,6 +3147,7 @@ std::shared_ptr ProcessGroupNCCL::initNCCLComm( // performance using cudaEvent, this should be set. // TODO(kwen2501): is ncclEvents_ used anywhere else? ncclEvents_.emplace(deviceKey, at::cuda::CUDAEvent(cudaEventDisableTiming)); + ncclEvents2_.emplace(deviceKey, at::cuda::CUDAEvent(hipEventDisableTiming)); // Move the NCCL resource to cache auto it = inInitializationCommMap_.find(deviceKey); @@ -3552,6 +3569,8 @@ c10::intrusive_ptr ProcessGroupNCCL::collective( PostProcess post, OpType opType, bool asyncOp, + bool dummy1, + bool dummy2, const char* profilingTitle, bool nanCheck) { // Environment setting by the user may add onto collective call's option @@ -3602,9 +3621,72 @@ c10::intrusive_ptr ProcessGroupNCCL::collective( // otherwise, we use separate ncclStream and let it sync on currentStream auto ncclStream = asyncOp ? 
ncclStreams_.at(key) : at::cuda::getCurrentCUDAStream(device.index()); + + auto& reg = torch::ddp_model2_stream::registry(); + auto curr_stream = at::hip::getCurrentHIPStreamMasqueradingAsCUDA(device.index()); + + int cnt = 0; + { + std::lock_guard g(reg.mu); + cnt = reg.rccl_cnt--; + } + + if (reg.enabled && curr_stream.id() == reg.bwd_stream_id && cnt > 0) { + if (device.index() == 0) { + CCADEBUG(std::fprintf(stderr, "cca_log ProcessGroupNCCL::collective use rccl_stream %d\n", cnt)); + } + ncclStream = at::hip::HIPStreamMasqueradingAsCUDA::unpack3( + reg.rccl_stream_id, + static_cast(reg.rccl_device_index), + static_cast(reg.rccl_device_type)); + + } else { + if (device.index() == 0) { + CCADEBUG(std::fprintf(stderr, "cca_log ProcessGroupNCCL::collective use defualt rccl_stream %d\n", cnt)); + } + } + if (asyncOp) { + bool has_stream1 = false; + bool has_stream2 = false; + { + std::lock_guard g(reg.mu); + + if (reg.bucket_tensor_has_stream.count(inputs[0].unsafeGetTensorImpl()) > 0) { + has_stream1 = reg.bucket_tensor_has_stream[inputs[0].unsafeGetTensorImpl()].has_stream1; + has_stream2 = reg.bucket_tensor_has_stream[inputs[0].unsafeGetTensorImpl()].has_stream2; + } else { + has_stream1 = true; + } + } + // First let NCCL streams wait for input tensors allocation streams - syncStream(device, ncclEvents_[key], ncclStream); + // syncStream(device, ncclEvents_[key], ncclStream); + if (device.index() == 0) { + if (has_stream1 && has_stream2) { + CCADEBUG(std::fprintf(stderr, "cca_log ProcessGroupNCCL::collective wait defualt/bwd stream\n")); + } else if (has_stream1) { + CCADEBUG(std::fprintf(stderr, "cca_log ProcessGroupNCCL::collective wait defualt stream\n")); + } else if (has_stream2) { + CCADEBUG(std::fprintf(stderr, "cca_log ProcessGroupNCCL::collective wait bwd stream\n")); + } else { + CCADEBUG(std::fprintf(stderr, "cca_log ProcessGroupNCCL::collective wait none\n")); + } + } + + if (has_stream1) { + ncclEvents_[key].record(at::hip::getDefaultHIPStreamMasqueradingAsCUDA(device.index())); + ncclEvents_[key].block(ncclStream); + } + + if (has_stream2) { + ncclEvents2_[key].record(at::hip::HIPStreamMasqueradingAsCUDA::unpack3( + reg.bwd_stream_id, + static_cast(reg.bwd_device_index), + static_cast(reg.bwd_device_type)) + ); + ncclEvents2_[key].block(ncclStream); + } } bool enqueue = @@ -4224,6 +4306,8 @@ c10::intrusive_ptr ProcessGroupNCCL::collective( PostProcess post, OpType opType, bool asyncOp, + bool has_stream1, + bool has_stream2, const char* profilingTitle, bool nanCheck) { auto inputs = std::vector{input}; @@ -4236,6 +4320,8 @@ c10::intrusive_ptr ProcessGroupNCCL::collective( post, opType, asyncOp, + has_stream1, + has_stream2, profilingTitle, nanCheck); } @@ -4247,6 +4333,8 @@ c10::intrusive_ptr ProcessGroupNCCL::collective( Fn fn, OpType opType, bool asyncOp, + bool has_stream1, + bool has_stream2, const char* profilingTitle, bool nanCheck) { auto inputs = std::vector{input}; @@ -4261,6 +4349,8 @@ c10::intrusive_ptr ProcessGroupNCCL::collective( c10::intrusive_ptr& work) {}, opType, asyncOp, + has_stream1, + has_stream2, profilingTitle, nanCheck); } @@ -4382,6 +4472,8 @@ c10::intrusive_ptr ProcessGroupNCCL::allreduce_impl( }, OpType::ALLREDUCE, opts.asyncOp, + opts.has_stream1, + opts.has_stream2, profilingTitle); } diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp index f7a3a28caceb3..8377ffde17d49 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp +++ 
b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp @@ -1109,6 +1109,8 @@ class TORCH_API ProcessGroupNCCL : public Backend { Fn fn, OpType opType, bool asyncOp, + bool has_stream1 = false, + bool has_stream2 = false, const char* profilingTitle = nullptr, bool nanCheck = true); @@ -1121,6 +1123,8 @@ class TORCH_API ProcessGroupNCCL : public Backend { PostProcess post, OpType opType, bool asyncOp, + bool has_stream1 = false, + bool has_stream2 = false, const char* profilingTitle = nullptr, bool nanCheck = true); @@ -1133,6 +1137,8 @@ class TORCH_API ProcessGroupNCCL : public Backend { PostProcess post, OpType opType, bool asyncOp, + bool has_stream1 = false, + bool has_stream2 = false, const char* profilingTitle = nullptr, bool nanCheck = true); @@ -1366,6 +1372,7 @@ class TORCH_API ProcessGroupNCCL : public Backend { // The CUDA events used to sync NCCL streams std::unordered_map ncclEvents_; + std::unordered_map ncclEvents2_; // Device Indexes used for all collectives in this group std::set usedDeviceIdxs_; diff --git a/torch/csrc/distributed/c10d/Types.hpp b/torch/csrc/distributed/c10d/Types.hpp index 18db14f5cef04..911d5605e012b 100644 --- a/torch/csrc/distributed/c10d/Types.hpp +++ b/torch/csrc/distributed/c10d/Types.hpp @@ -126,6 +126,8 @@ struct AllreduceOptions { std::chrono::milliseconds timeout = kUnsetTimeout; bool asyncOp = true; std::optional sparseIndices = std::nullopt; + bool has_stream1 = false; + bool has_stream2 = false; }; struct AllreduceCoalescedOptions : AllreduceOptions {}; diff --git a/torch/csrc/distributed/c10d/comm.hpp b/torch/csrc/distributed/c10d/comm.hpp index 599c1709c4df5..b5273c3dd8635 100644 --- a/torch/csrc/distributed/c10d/comm.hpp +++ b/torch/csrc/distributed/c10d/comm.hpp @@ -77,6 +77,9 @@ class TORCH_API GradBucket { return sparse_grad_indices_; } + bool has_stream1 = false; + bool has_stream2 = false; + private: size_t index_; size_t bucket_count_; diff --git a/torch/csrc/distributed/c10d/reducer.cpp b/torch/csrc/distributed/c10d/reducer.cpp index 752e18c8dbf7d..bc5428ae69db0 100644 --- a/torch/csrc/distributed/c10d/reducer.cpp +++ b/torch/csrc/distributed/c10d/reducer.cpp @@ -22,6 +22,85 @@ #include #include +#include + + #ifdef USE_ROCM + #include +// #include + #endif + + +static std::string CcaGetEnv(const char* name, const char* default_value) { + auto rtn = std::getenv(name); + if (rtn) { + return rtn; + } + return default_value; +} + +namespace { + +void debug_mul_out_or_dummy( + at::Tensor& out, + const at::Tensor& grad, + const at::Tensor& scale, + int replica_index, + int variable_index) { + int mode = std::stoi(CcaGetEnv("DBGENV_DUMMY_MUL_MODE", "0")); + + switch (mode) { + default: + case 0: + at::mul_out(out, grad, scale); + break; + case 1: { + at::Tensor tiny = at::zeros({1}, grad.options()); + tiny.add_(1); + break; + } + case 2: + out.copy_(grad); + break; + case 3: + out.zero_(); + break; + case 4: + break; + } +} + +} // anonymous namespace + +namespace torch::ddp_model2_stream { +static bool is_stream_module(PyObject* obj) { + auto& r = registry(); + std::lock_guard g(r.mu); + return r.enabled && r.model2_module == obj; +} +static bool is_stream_param(c10::TensorImpl* impl) { + auto& r = registry(); + std::lock_guard g(r.mu); + return r.enabled && r.model2_param_impls.count(impl) != 0; +} +} // namespace torch::ddp_model2_stream + +#include "execinfo.h" +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + namespace c10d { namespace { @@ 
-127,6 +206,9 @@ Reducer::Reducer( C10_LOG_API_USAGE_ONCE("torch.distributed.ddp.reducer"); TORCH_INTERNAL_ASSERT(!params_.empty(), "Expected at least one parameter."); + CCADEBUG(std::fprintf(stderr, "cca_log Reducer::Reducer bucket_bytes_cap %ld first_bucket_bytes_cap %ld GetTraceID %d\n", + bucket_bytes_cap_, first_bucket_bytes_cap_, GetTraceID(true))); + if (ddp_debug_level_ != c10d::DebugLevel::Off) { LOG(INFO) << "Reducer initialized with bucket_bytes_cap: " << bucket_bytes_cap_ @@ -383,7 +465,119 @@ void Reducer::mark_variable_ready_dense(size_t variable_index) { RECORD_FUNCTION( "torch::distributed::reducer::mul_out", std::vector({bucket_view})) - at::mul_out(bucket_view, grad, wrapped); + + auto& reg = torch::ddp_model2_stream::registry(); + c10::OptionalStreamGuard cca_guard; + bucket.mul_out_moved = false; + if (torch::ddp_model2_stream::is_stream_param(variable.unsafeGetTensorImpl())) { + + // auto a = at::Tensor(123); + + // CCADEBUG(std::fprintf(stderr, "cca_log mul_out on cca_stream variable_index %ld bucket_index %ld %s GetTraceID %d\n", + // variable_index, bucket_index.bucket_index, variable.name().c_str(), GetTraceID())); + + cca_guard.reset_stream(c10::Stream::unpack3( + reg.bwd_stream_id, + static_cast(reg.bwd_device_index), + static_cast(reg.bwd_device_type))); + } else { + +#ifdef USE_ROCM +#define cudaEventExternal 0x08 +#define cudaEventWaitDefault 0x00 + + // if (variable.device().index() == 0) { + // CCADEBUG(std::fprintf(stderr, "cca_log mul_out on cca_stream bucket_index %ld variable_index %ld intra_bucket_index %ld size %d GetTraceID %d\n", + // bucket_index.bucket_index, variable_index, bucket_index.intra_bucket_index, (int)bucket.variables.size(), GetTraceID(true));) + // } + + static std::mutex evt_mutex; + std::lock_guard lock(evt_mutex); + + bool do_centralize_mulout = CcaGetEnv("DBGENV_CENTRALIZE_MULOUT", "0") == "1"; + bool do_mulout_parallel = CcaGetEnv("DBGENV_MULOUT_PARALLEL", "0") == "1"; + int last_bucket_num_elem = std::stoi(CcaGetEnv("DBGENV_LAST_BUCKET_NUM_ELEM", "0")); + + if (last_bucket_num_elem > 0 && last_bucket_num_elem == bucket.variables.size()) { + do_centralize_mulout = false; + do_mulout_parallel = false; + } + + if (reg.start_compute && do_centralize_mulout && bucket_index.intra_bucket_index < (bucket.variables.size() - 1)) { + bucket.defer_mul_out[bucket_index.intra_bucket_index] = Bucket::DeferMulOut(grad, bucket_view); + if (gradient_as_bucket_view_) { + // Let grad point to bucket_view buffer. + grad = bucket_view; + // The grad is modified and need to be written back. 
+ return true; + } + return false; + } + + if (do_mulout_parallel && reg.start_compute) { + auto bwd_stream = c10::Stream::unpack3( + reg.bwd_stream_id, + static_cast(reg.bwd_device_index), + static_cast(c10::DeviceType::HIP)); + + if (CcaGetEnv("DBGENV_DISABLE_MULOUT_WAIT", "0") != "1") { + + unsigned int flag = hipEventDisableTiming; + bool external = (flag & cudaEventExternal) != 0; + // TORCH_CHECK(!external, "External events are disallowed in rocm"); + + int orig_device = -1; + hipGetDevice(&orig_device); + hipSetDevice(variable.device().index()); + + if (bucket.mul_out_evt.count(bucket_index.intra_bucket_index) == 0) { + flag &= ~cudaEventExternal; + // c10::hip::HIPGuardMasqueradingAsCUDA guard(variable.device().index()); + + hipEvent_t evt; + hipEventCreateWithFlags(&evt, flag); + bucket.mul_out_evt.insert(std::make_pair(bucket_index.intra_bucket_index, evt)); + } + + auto &evt = bucket.mul_out_evt.at(bucket_index.intra_bucket_index); + auto default_stream = c10::hip::getDefaultHIPStream(variable.device().index()); + hipEventRecord(evt, default_stream); + + // it is an error to use cudaEventWaitExternal when not doing stream capture + // unsigned int flags = (c10::hip::currentStreamCaptureStatusMayInitCtx() != c10::hip::CaptureStatus::None && external) ? cudaEventWaitExternal : cudaEventWaitDefault; + + unsigned int flags = cudaEventWaitDefault; + hipStreamWaitEvent(c10::hip::HIPStream(bwd_stream), evt, flags); + hipSetDevice(orig_device); + } + cca_guard.reset_stream(bwd_stream); + + if (variable.device().index() == 0) { + CCADEBUG(std::fprintf(stderr, "cca_log mul_out on cca_stream bucket_index %ld variable_index %ld intra_bucket_index %ld size %d GetTraceID %d\n", + bucket_index.bucket_index, variable_index, bucket_index.intra_bucket_index, (int)bucket.variables.size(), GetTraceID(true));) + } + + if (do_centralize_mulout) { + for (int i = 0; i < bucket.variables.size() - 1; ++i) { + assert(bucket.defer_mul_out.count(i) > 0); + + if (bucket.defer_mul_out.count(i) > 0) { + debug_mul_out_or_dummy(bucket.defer_mul_out[i].bucket_view_, bucket.defer_mul_out[i].grad_, wrapped, -1, variable_index); + } + + } + bucket.defer_mul_out.clear(); + bucket.mul_out_moved = true; + } + + } +#endif + + // CCADEBUG(std::fprintf(stderr, "cca_log mul_out on default stream variable_index %ld bucket_index %ld %s GetTraceID %d\n", + // variable_index, bucket_index.bucket_index, variable.name().c_str(), GetTraceID())); + } + debug_mul_out_or_dummy(bucket_view, grad, wrapped, -1, variable_index); + } else { // If DDP is running with create_graph=True, gradients require_grad // themselves in order to compute higher order derivatives. 
However, @@ -973,6 +1167,16 @@ void Reducer::all_reduce_bucket(Bucket& bucket) { bucket.sizes_vec, variables_for_bucket, bucket.sparse_tensor_indices); + + { + auto& reg = torch::ddp_model2_stream::registry(); + std::lock_guard g(reg.mu); + reg.bucket_tensor_has_stream.insert(std::make_pair(tensor.unsafeGetTensorImpl(), torch::ddp_model2_stream::HasStream12(bucket.has_stream1, bucket.has_stream2))); + // if (reg.bwd_device_index == 0) { + // CCADEBUG(std::fprintf(stderr, "cca_log Reducer::all_reduce_bucket tensor %p \n", (void*)tensor.unsafeGetTensorImpl())); + // } + } + bucket.future_work = run_comm_hook(grad_bucket); } @@ -1045,6 +1249,53 @@ void Reducer::mark_bucket_ready(size_t bucket_index) { } auto& bucket = buckets_[next_bucket_]; if (!should_skip_all_reduce_bucket(bucket)) { + + auto& reg = torch::ddp_model2_stream::registry(); + + int v_i = 0; + for (auto& v : bucket.variables) { + if (torch::ddp_model2_stream::is_stream_param(v.unsafeGetTensorImpl())) { + bucket.has_stream2 = true; + // CCADEBUG(std::fprintf(stderr, "cca_log allreduce has_stream2 %d \n", v_i)); + } else { + bucket.has_stream1 = true; + // CCADEBUG(std::fprintf(stderr, "cca_log allreduce has_stream1 %d \n", v_i)); + } + v_i += 1; + } + if (bucket.mul_out_moved) { + bucket.has_stream1 = false; + bucket.has_stream2 = true; + } + + c10::OptionalStreamGuard cca_guard; + if (bucket.has_stream2) { + + // TODO(chien-an): if ..., also need to wait default stream. + // CCADEBUG(std::fprintf(stderr, "cca_log all_reduce_bucket wait cca_stream bucket_index %ld GetTraceID %d\n", + // bucket_index, GetTraceID())); + + cca_guard.reset_stream(c10::Stream::unpack3( + reg.bwd_stream_id, + static_cast(reg.bwd_device_index), + static_cast(reg.bwd_device_type))); + } else { + // CCADEBUG(std::fprintf(stderr, "cca_log all_reduce_bucket wait default stream bucket_index %ld GetTraceID %d\n", + // bucket_index, GetTraceID())); + } + + // if (reg.bwd_device_index == 0) { + // if (bucket.has_stream1 && bucket.has_stream2) { + // CCADEBUG(std::fprintf(stderr, "cca_log Reducer::mark_bucket_ready wait defualt/bwd stream\n")); + // } else if (bucket.has_stream1) { + // CCADEBUG(std::fprintf(stderr, "cca_log Reducer::mark_bucket_ready wait defualt stream\n")); + // } else if (bucket.has_stream2) { + // CCADEBUG(std::fprintf(stderr, "cca_log Reducer::mark_bucket_ready wait bwd stream\n")); + // } else { + // CCADEBUG(std::fprintf(stderr, "cca_log Reducer::mark_bucket_ready wait none\n")); + // } + // } + all_reduce_bucket(bucket); num_buckets_reduced_++; } @@ -1837,6 +2088,17 @@ void Reducer::sync_bucket_indices( } } +std::vector splitStringStream(const std::string& str, char delimiter) { + std::vector result; + std::stringstream ss(str); + std::string token; + + while (std::getline(ss, token, delimiter)) { + result.push_back(token); + } + return result; +} + bool Reducer::rebuild_buckets() { // Ensure reduction for previous backwards pass is finished. 
If user's model // has unused parameters for example, this will raise an error recommending to @@ -1864,11 +2126,33 @@ bool Reducer::rebuild_buckets() { " versus rebuilt params size of: ", rebuilt_param_indices_.size())); std::vector bucket_size_limits; + + + if (CcaGetEnv("DBGENV_FIRST_BUCKET_SIZE", "0") == "0") { + CCADEBUG(std::fprintf(stderr, "cca_log rebuild_buckets first_bucket_bytes_cap %ld\n", first_bucket_bytes_cap_)); bucket_size_limits.push_back(first_bucket_bytes_cap_); + } else { + + auto fist_buckets = splitStringStream(CcaGetEnv("DBGENV_FIRST_BUCKET_SIZE", "0"), ','); + + std::string buckets_str = ""; + int bs_i = 0; + for (auto &bs : fist_buckets) { + size_t bs_bytes = std::stoi(bs) * 1024 * 1024; + CCADEBUG(std::fprintf(stderr, "cca_log rebuild_buckets %d bs_bytes %ld\n", bs_i, bs_bytes)); + bucket_size_limits.push_back(bs_bytes); + ++bs_i; + } + + } + bucket_size_limits.push_back(bucket_bytes_cap_); auto ddp_set_last_bucket_as_small = (getCvarString({"DDP_SET_LAST_BUCKET_CAP"}, "N/A") == "1"); + CCADEBUG(std::fprintf(stderr, "cca_log rebuild_buckets bucket_bytes_cap %ld ddp_set_last_bucket_as_small %d\n", + bucket_bytes_cap_, (int)ddp_set_last_bucket_as_small)); + if (ddp_set_last_bucket_as_small) { // Reverse so that first_bucket_bytes_cap_ (smaller bucket) becomes the last // bucket. We cannot simply pass in {bucket_bytes_cap_, @@ -2206,6 +2490,10 @@ compute_bucket_assignment_by_size( const auto bucket_size_limit = *bucket_size_limit_iterator; bucket.size_limit = bucket_size_limit; if (bucket.size >= bucket_size_limit) { + + if (tensors[0].device().index() == 0) { + CCADEBUG(std::fprintf(stderr, "cca_log compute_bucket_assignment_by_size push bucket.size %ld\n", bucket.size)); + } result.emplace_back(std::move(bucket.indices), bucket.size_limit); bucket = BucketAccumulator(); @@ -2221,7 +2509,60 @@ compute_bucket_assignment_by_size( for (auto& it : buckets) { auto& bucket = it.second; if (!bucket.indices.empty()) { + + if (tensors[0].device().index() == 0) { + CCADEBUG(std::fprintf(stderr, "cca_log compute_bucket_assignment_by_size %d %d\n", + (int)bucket.indices.size(), (int)bucket.size_limit)); + } + + #if 0 + int last_bucket_size = std::stoi(CcaGetEnv("DBGENV_LAST_BUCKET_SIZE", "0")) * 1024 * 1024; + if (last_bucket_size) { + CCADEBUG(std::fprintf(stderr, "cca_log compute_bucket_assignment_by_size last_bucket_size %d\n", last_bucket_size)); + + int last_i = bucket.indices.size() - 1; + int split_i = last_i; + int acc_size = 0; + for (; split_i >= 0; --split_i) { + const auto& tensor = tensors[bucket.indices[split_i]]; + acc_size += tensor.numel() * tensor.element_size(); + if (acc_size > last_bucket_size) { + CCADEBUG(std::fprintf(stderr, "cca_log compute_bucket_assignment_by_size acc_size %d split_i %d last_i %d\n", acc_size, split_i, last_i)); + break; + } + } + + if (split_i > 0) { + std::vector sub(bucket.indices.begin(), bucket.indices.begin() + split_i); + result.emplace_back(std::move(sub), bucket.size_limit); + } + if (split_i != last_i) { + std::vector sub(bucket.indices.begin() + split_i, bucket.indices.end()); + result.emplace_back(std::move(sub), bucket.size_limit); + } + #endif + + + int last_bucket_num_elem = std::stoi(CcaGetEnv("DBGENV_LAST_BUCKET_NUM_ELEM", "0")); + int remain_num = bucket.indices.size(); + if (last_bucket_num_elem && last_bucket_num_elem < remain_num) { + if (tensors[0].device().index() == 0) { + CCADEBUG(std::fprintf(stderr, "cca_log compute_bucket_assignment_by_size last_bucket_num_elem %d\n", last_bucket_num_elem)); + } + if 
(last_bucket_num_elem > 0) { + std::vector sub(bucket.indices.begin(), bucket.indices.begin() + remain_num - last_bucket_num_elem); + result.emplace_back(std::move(sub), bucket.size_limit); + } + if (last_bucket_num_elem != remain_num) { + std::vector sub(bucket.indices.begin() + remain_num - last_bucket_num_elem, bucket.indices.end()); + result.emplace_back(std::move(sub), bucket.size_limit); + } + } else { + if (tensors[0].device().index() == 0) { + CCADEBUG(std::fprintf(stderr, "cca_log compute_bucket_assignment_by_size default\n")); + } result.emplace_back(std::move(bucket.indices), bucket.size_limit); + } } } diff --git a/torch/csrc/distributed/c10d/reducer.hpp b/torch/csrc/distributed/c10d/reducer.hpp index 6707975d38ac1..66d41a637d38b 100644 --- a/torch/csrc/distributed/c10d/reducer.hpp +++ b/torch/csrc/distributed/c10d/reducer.hpp @@ -24,6 +24,10 @@ #include #endif +#ifdef USE_ROCM +#include +#endif + namespace c10d { constexpr int kDefaultFirstBucketBytes = int(1024 * 1024); @@ -343,6 +347,9 @@ class TORCH_API Reducer { // and the buckets are reduced in a predetermined order consistent across // processes. struct Bucket { + bool has_stream1 = false; + bool has_stream2 = false; + bool mul_out_moved = false; // Gradients of the bucket flattened into a 1-dimensional tensor at::Tensor gradients; @@ -398,6 +405,17 @@ class TORCH_API Reducer { // done on different CUDA streams. We record an event for every copy // so that we can synchronize with them prior to kicking off the reduction. // std::vector events; + struct DeferMulOut { + DeferMulOut() {} + DeferMulOut(at::Tensor &grad, at::Tensor &bucket_view) : grad_(grad), bucket_view_(bucket_view) {} + at::Tensor grad_; + at::Tensor bucket_view_; + }; + std::unordered_map defer_mul_out; + +#ifdef USE_ROCM + std::unordered_map mul_out_evt; +#endif }; std::vector buckets_; diff --git a/torch/cuda/__init__.py b/torch/cuda/__init__.py index e9049f036e1e2..f9c4a6dee12d2 100644 --- a/torch/cuda/__init__.py +++ b/torch/cuda/__init__.py @@ -733,6 +733,38 @@ def set_stream(stream: Stream): ) +def debug_set_bwd_rccl_stream( + module, bwd_stream: Stream, rccl_stream: Stream +): + + bwd_stream_id = bwd_stream.stream_id + bwd_device_index = bwd_stream.device_index + bwd_device_type = bwd_stream.device_type + + rccl_stream_id = rccl_stream.stream_id + rccl_device_index = rccl_stream.device_index + rccl_device_type = rccl_stream.device_type + + print("bwd_stream_id ",bwd_stream_id, + " bwd_device_index ",bwd_device_index, + " bwd_device_type ",bwd_device_type, + " rccl_stream_id ",rccl_stream_id, + " rccl_device_index ",rccl_device_index, + " rccl_device_type ",rccl_device_type) + + torch._C._cuda_set_backward_rccl_stream( + module=module, + bwd_stream_id=bwd_stream_id, + bwd_device_index=bwd_device_index, + bwd_device_type=bwd_device_type, + rccl_stream_id=rccl_stream_id, + rccl_device_index=rccl_device_index, + rccl_device_type=rccl_device_type, + ) + +def debug_reset_rccl_stream_cnt(): + torch._C._cuda_reset_rccl_stream_cnt() + def _parse_visible_devices() -> Union[list[int], list[str]]: r"""Parse CUDA_VISIBLE_DEVICES environment variable.""" var = os.getenv("CUDA_VISIBLE_DEVICES") diff --git a/torch/nn/modules/module.py b/torch/nn/modules/module.py index f0c4914782f39..d4e92c4773e2a 100644 --- a/torch/nn/modules/module.py +++ b/torch/nn/modules/module.py @@ -1783,7 +1783,16 @@ def _call_impl(self, *args, **kwargs): if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks or _global_backward_pre_hooks 
or _global_backward_hooks or _global_forward_hooks or _global_forward_pre_hooks): - return forward_call(*args, **kwargs) + # return forward_call(*args, **kwargs) + pushed = False + if torch._C._autograd_is_stream_module(self): + torch._C._autograd_push_stream_tag() + pushed = True + try: + return forward_call(*args, **kwargs) + finally: + if pushed: + torch._C._autograd_pop_stream_tag() result = None called_always_called_hooks = set()
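
Usage note (not part of the diff): the patch adds two Python-facing hooks, `torch.cuda.debug_set_bwd_rccl_stream` and `torch.cuda.debug_reset_rccl_stream_cnt`, plus the `DBGENV_*` environment variables read in `CUDAStream.cpp` and `reducer.cpp`. The sketch below is a minimal, hedged illustration of how these might be wired together on a ROCm build with this patch applied; `build_model`, `submodule_to_isolate`, and `loader` are hypothetical placeholders, and the assumption that freshly created pool streams pick up the `DBGENV_2ND_COMP_STREAM_CU` / `DBGENV_2ND_RCCL_STREAM_CU` masks depends on allocation order in `initSingleStream`.

```python
# Minimal usage sketch (assumptions: ROCm build with this patch applied;
# build_model/submodule_to_isolate/loader are hypothetical placeholders).
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

dist.init_process_group("nccl")
device = torch.device("cuda", dist.get_rank() % torch.cuda.device_count())

model = build_model().to(device)          # hypothetical model factory
model2 = model.submodule_to_isolate       # hypothetical: the "model2" module targeted by the patch
ddp_model = DDP(model, device_ids=[device.index])

# Two extra streams: one for model2's backward compute, one for its RCCL all-reduce.
# On the patched build these come from the lazily initialized pool, whose streams
# may be CU-masked via the DBGENV_* variables.
bwd_stream = torch.cuda.Stream(device=device)
rccl_stream = torch.cuda.Stream(device=device)

# Register the module and the two streams with the patched runtime
# (debug_set_bwd_rccl_stream is added to torch/cuda/__init__.py by this diff).
torch.cuda.debug_set_bwd_rccl_stream(model2, bwd_stream, rccl_stream)

for batch in loader:                      # hypothetical data loader
    # Re-arm the per-iteration RCCL-stream counter (reads DBGENV_2ND_RCCL_STREAM_CNT).
    torch.cuda.debug_reset_rccl_stream_cnt()
    loss = ddp_model(batch.to(device)).sum()
    loss.backward()
```

The registered module (`model2` above) is the one `Module._call_impl` checks via `torch._C._autograd_is_stream_module`, so its forward pushes the stream tag that later routes the grad_fns it creates onto `bwd_stream` through `set_override_stream`.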
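For reference, `GetCuMask` in the patched `CUDAStream.cpp` accepts either a plain CU count (which is remapped across shader engines) or a `custom=` list of single CU indices and `start:count` ranges. The snippet below is an illustrative Python re-implementation of only the `custom=` path and of `createCustomMask`'s bit packing, for explanation of the format; it is not the code path the patch executes, and the 304-CU default mirrors `getTotalCUs()`'s gfx942 case.

```python
# Illustrative re-implementation of the "custom=" CU-mask format,
# e.g. DBGENV_2ND_RCCL_STREAM_CU="custom=0:7,32:8" (explanation only).
def parse_custom_cu_mask(mask_str: str, total_cus: int = 304) -> list[int]:
    # total_cus: 304 on gfx942, 256 otherwise, per getTotalCUs() in the diff.
    assert mask_str.startswith("custom=")
    enabled = []
    for item in mask_str[len("custom="):].split(","):
        item = item.strip()
        if ":" in item:                      # "start:count" expands to `count` CUs
            start, count = (int(x) for x in item.split(":"))
            enabled.extend(range(start, start + count))
        else:                                # single CU index
            enabled.append(int(item))
    # Pack into 32-bit words, matching createCustomMask() in the diff.
    words = [0] * ((total_cus + 31) // 32)
    for cu in enabled:
        if 0 <= cu < total_cus:
            words[cu // 32] |= 1 << (cu % 32)
    return words

print([hex(w) for w in parse_custom_cu_mask("custom=0:7,32:8")])
# ['0x7f', '0xff', '0x0', ...]  -> CUs 0-6 and 32-39 enabled
```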