diff --git a/doc/source/ray-core/doc_code/ray_oom_prevention.py b/doc/source/ray-core/doc_code/ray_oom_prevention.py index 03aa021c56eb..cb3122103156 100644 --- a/doc/source/ray-core/doc_code/ray_oom_prevention.py +++ b/doc/source/ray-core/doc_code/ray_oom_prevention.py @@ -10,7 +10,7 @@ # __last_task_start__ import ray -@ray.remote(max_retries=-1) +@ray.remote(max_retries=0) def leaks_memory(): chunks = [] bits_to_allocate = 8 * 100 * 1024 * 1024 # ~100 MiB @@ -67,11 +67,11 @@ def allocate(self, bytes_to_allocate: float) -> None: max_restarts=0, max_task_retries=0, name="second_actor" ).remote() -# each task requests 0.3 of the system memory when the memory threshold is 0.4. -allocate_bytes = get_additional_bytes_to_reach_memory_usage_pct(0.3) +# We want total bytes to exceed 40% threshold to trigger the memory monitor. +allocate_bytes = get_additional_bytes_to_reach_memory_usage_pct(0.5) -first_actor_task = first_actor.allocate.remote(allocate_bytes) -second_actor_task = second_actor.allocate.remote(allocate_bytes) +first_actor_task = first_actor.allocate.remote(0.9 * allocate_bytes) +second_actor_task = second_actor.allocate.remote(0.1* allocate_bytes) error_thrown = False try: diff --git a/src/ray/common/BUILD.bazel b/src/ray/common/BUILD.bazel index c0f92efc8583..ae2113296ebc 100644 --- a/src/ray/common/BUILD.bazel +++ b/src/ray/common/BUILD.bazel @@ -135,6 +135,8 @@ ray_cc_library( ], deps = [ ":memory_monitor_interface", + ":ray_config", + "//src/ray/common/cgroup2:cgroup_manager_interface", "//src/ray/util:compat", "//src/ray/util:logging", "@boost//:algorithm", diff --git a/src/ray/common/cgroup2/BUILD.bazel b/src/ray/common/cgroup2/BUILD.bazel index b9c52eadee59..247f139784d9 100644 --- a/src/ray/common/cgroup2/BUILD.bazel +++ b/src/ray/common/cgroup2/BUILD.bazel @@ -58,7 +58,10 @@ ray_cc_library( "cgroup_manager.h", "scoped_cgroup_operation.h", ], - visibility = [":__subpackages__"], + visibility = [ + ":__subpackages__", + 
"//src/ray/common/tests:__pkg__", + ], deps = [ ":cgroup_driver_interface", ":cgroup_manager_interface", @@ -74,7 +77,11 @@ ray_cc_library( hdrs = [ "noop_cgroup_manager.h", ], - visibility = ["//visibility:public"], + visibility = [ + ":__subpackages__", + "//src/ray/common/tests:__pkg__", + "//src/ray/raylet/tests:__pkg__", + ], deps = [ ":cgroup_driver_interface", ":cgroup_manager_interface", @@ -117,7 +124,10 @@ ray_cc_library( target_compatible_with = [ "@platforms//os:linux", ], - visibility = [":__subpackages__"], + visibility = [ + ":__subpackages__", + "//src/ray/common/tests:__pkg__", + ], deps = [ ":cgroup_driver_interface", "//src/ray/common:status", diff --git a/src/ray/common/memory_monitor_utils.cc b/src/ray/common/memory_monitor_utils.cc index a263a241181f..e7ed0d218803 100644 --- a/src/ray/common/memory_monitor_utils.cc +++ b/src/ray/common/memory_monitor_utils.cc @@ -22,6 +22,7 @@ #include "absl/strings/str_format.h" #include "absl/strings/str_join.h" #include "ray/common/memory_monitor_interface.h" +#include "ray/common/ray_config.h" #include "ray/util/logging.h" namespace ray { @@ -292,23 +293,59 @@ int64_t MemoryMonitorUtils::NullableMin(int64_t left, int64_t right) { } } -int64_t MemoryMonitorUtils::GetMemoryThreshold(int64_t total_memory_bytes, - float usage_threshold, - int64_t min_memory_free_bytes) { +int64_t MemoryMonitorUtils::GetMemoryThreshold( + int64_t total_memory_bytes, + float usage_threshold, + int64_t min_memory_free_bytes, + bool resource_isolation_enabled, + const CgroupManagerInterface &cgroup_manager) { RAY_CHECK_GE(total_memory_bytes, MemoryMonitorInterface::kNull); RAY_CHECK_GE(min_memory_free_bytes, MemoryMonitorInterface::kNull); - RAY_CHECK_GE(usage_threshold, 0); - RAY_CHECK_LE(usage_threshold, 1); + RAY_CHECK_GE(usage_threshold, 0) + << "Invalid configuration: usage_threshold must be >= 0"; + RAY_CHECK_LE(usage_threshold, 1) + << "Invalid configuration: usage_threshold must be <= 1"; - int64_t threshold_fraction = 
(int64_t)(total_memory_bytes * usage_threshold); + int64_t resolved_memory_threshold_bytes; + int64_t threshold_fraction = static_cast(total_memory_bytes * usage_threshold); if (min_memory_free_bytes > MemoryMonitorInterface::kNull) { int64_t threshold_absolute = total_memory_bytes - min_memory_free_bytes; RAY_CHECK_GE(threshold_absolute, 0); - return std::max(threshold_fraction, threshold_absolute); + resolved_memory_threshold_bytes = std::max(threshold_fraction, threshold_absolute); } else { - return threshold_fraction; + resolved_memory_threshold_bytes = threshold_fraction; + } + + if (resource_isolation_enabled) { + StatusOr user_memory_max_bytes_or = + cgroup_manager.GetUserCgroupConstraintValue("memory.max"); + RAY_CHECK(user_memory_max_bytes_or.ok()) << absl::StrFormat( + "Failed to get user cgroup memory limit when setting up memory monitor: %s", + user_memory_max_bytes_or.ToString()); + std::string user_memory_max_bytes_str = user_memory_max_bytes_or.value(); + + if (!user_memory_max_bytes_str.empty() && + std::all_of(user_memory_max_bytes_str.begin(), + user_memory_max_bytes_str.end(), + ::isdigit)) { + int64_t user_memory_max_bytes = std::stoll(user_memory_max_bytes_str); + int64_t reaction_buffer_bytes = + std::min(static_cast(total_memory_bytes * + kDefaultThresholdMonitorReactionBufferProportion), + RayConfig::instance().max_threshold_monitor_reaction_buffer_bytes()); + resolved_memory_threshold_bytes = user_memory_max_bytes - reaction_buffer_bytes; + RAY_CHECK_GE(resolved_memory_threshold_bytes, 0) << absl::StrFormat( + "Available user task memory is less than the kill memory buffer bytes: " + "%d < %d. This means the available memory for user proceses is likely " + "less than 5%% of total memory. 
Please consider decreasing the proportion " + "of reserved system memory if it was custom set.", + user_memory_max_bytes, + reaction_buffer_bytes); + } } + + return resolved_memory_threshold_bytes; } int64_t MemoryMonitorUtils::GetProcessUsedMemoryBytes( diff --git a/src/ray/common/memory_monitor_utils.h b/src/ray/common/memory_monitor_utils.h index 325b02bfdebf..cc6e49596123 100644 --- a/src/ray/common/memory_monitor_utils.h +++ b/src/ray/common/memory_monitor_utils.h @@ -21,6 +21,7 @@ #include #include +#include "ray/common/cgroup2/cgroup_manager_interface.h" #include "ray/common/memory_monitor_interface.h" #include "ray/util/compat.h" @@ -69,11 +70,18 @@ class MemoryMonitorUtils { * @param usage_threshold A value in [0-1] to indicate the max usage. * @param min_memory_free_bytes The min amount of free space to maintain before it is * exceeding the threshold. + * @param resource_isolation_enabled Whether resource isolation is enabled. Used + * to determine if the threshold should be calculated based on the cgroup + * constraints. + * @param cgroup_manager The cgroup manager to fetch the upper bound memory constraints + * from. * @return The memory threshold. */ static int64_t GetMemoryThreshold(int64_t total_memory_bytes, float usage_threshold, - int64_t min_memory_free_bytes); + int64_t min_memory_free_bytes, + bool resource_isolation_enabled, + const CgroupManagerInterface &cgroup_manager); /** * @brief Gets the used memory for a process from the process memory snapshot. 
@@ -221,6 +229,8 @@ class MemoryMonitorUtils { static constexpr char kCgroupsV2MemoryStatActiveFileKey[] = "active_file"; static constexpr char kProcDirectory[] = "/proc"; static constexpr char kCommandlinePath[] = "cmdline"; + + static constexpr double kDefaultThresholdMonitorReactionBufferProportion = 0.05; }; } // namespace ray diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 5a292e827558..8c90d03529cb 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -72,9 +72,10 @@ RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100) /// it will start killing processes to free up the space. /// Note: when resource isolation is enabled, the memory usage threshold is set to /// total memory - system reserved memory (can be specified in ray start) - -/// kill_memory_buffer_bytes. Notice that the formula does not account for object store -/// memory in system reserved memory. To configure the usage threshold, please adjust the -/// system reserved memory in ray start command instead. Ranging from [0, 1] +/// min(5% of total memory, max_threshold_monitor_reaction_buffer_bytes). +/// Notice that the formula does not account for object store memory in system reserved +/// memory. To configure the usage threshold, please adjust the system reserved memory in +/// ray start command instead. Ranging from [0, 1] RAY_CONFIG(float, memory_usage_threshold, 0.95) /// The interval between runs of the memory usage monitor. @@ -90,9 +91,24 @@ RAY_CONFIG(uint64_t, memory_monitor_refresh_ms, 250) /// means 6.4 GB of the memory will not be usable. RAY_CONFIG(int64_t, min_memory_free_bytes, (int64_t)-1) -/// The amount of memory to free under the memory usage threshold when -/// killing workers via the worker killing policy. 
-RAY_CONFIG(uint64_t, kill_memory_buffer_bytes, 3ULL * 1024 * 1024 * 1024) // 3GB +/// The maximum amount of memory to free under the memory usage threshold when +/// killing workers via the worker killing policy. The system will by default +/// free up to 5% of total memory under the threshold, capping at +/// max_kill_memory_buffer_bytes. +RAY_CONFIG(int64_t, max_kill_memory_buffer_bytes, 3ULL * 1024 * 1024 * 1024) // 3GiB cap + +/// The threshold monitor is poll based and may miss memory bursts occurring between +/// polls. This is the maximum buffer size that can be subtracted from memory.max to give +/// the threshold monitor time to react before memory max is reached under resource +/// isolation. The system will by default provide 5% of total memory as reaction buffer, +/// capping at max_threshold_monitor_reaction_buffer_bytes. +RAY_CONFIG(int64_t, + max_threshold_monitor_reaction_buffer_bytes, + 2LL * 1024 * 1024 * 1024) // 2GiB + +/// When true, use the legacy group-by-owner worker killing policy instead of the +/// default time-based policy. 
+RAY_CONFIG(bool, worker_killing_policy_by_group, false) /// The reserved memory bytes for system processes /// enforced via cgroup memory.min constraint which guarantees diff --git a/src/ray/common/tests/BUILD.bazel b/src/ray/common/tests/BUILD.bazel index 7d2ced3b8730..7ab0c62e8f7c 100644 --- a/src/ray/common/tests/BUILD.bazel +++ b/src/ray/common/tests/BUILD.bazel @@ -112,7 +112,11 @@ ray_cc_test( "//src/ray/common:id", "//src/ray/common:memory_monitor_test_fixture", "//src/ray/common:memory_monitor_utils", + "//src/ray/common:ray_config", + "//src/ray/common/cgroup2:cgroup_manager", "//src/ray/common/cgroup2:cgroup_test_utils", + "//src/ray/common/cgroup2:fake_cgroup_driver", + "//src/ray/common/cgroup2:noop_cgroup_manager", "//src/ray/util:process", "@boost//:filesystem", "@com_google_absl//absl/container:flat_hash_map", @@ -135,7 +139,9 @@ ray_cc_test( deps = [ "//src/ray/common:memory_monitor_interface", "//src/ray/common:memory_monitor_test_fixture", + "//src/ray/common:memory_monitor_utils", "//src/ray/common:threshold_memory_monitor", + "//src/ray/common/cgroup2:noop_cgroup_manager", "@boost//:thread", ], ) diff --git a/src/ray/common/tests/memory_monitor_utils_test.cc b/src/ray/common/tests/memory_monitor_utils_test.cc index 551d0fce8661..84b2391f653a 100644 --- a/src/ray/common/tests/memory_monitor_utils_test.cc +++ b/src/ray/common/tests/memory_monitor_utils_test.cc @@ -18,11 +18,16 @@ #include #include #include +#include #include "gtest/gtest.h" +#include "ray/common/cgroup2/cgroup_manager.h" #include "ray/common/cgroup2/cgroup_test_utils.h" +#include "ray/common/cgroup2/fake_cgroup_driver.h" +#include "ray/common/cgroup2/noop_cgroup_manager.h" #include "ray/common/id.h" #include "ray/common/memory_monitor_test_fixture.h" +#include "ray/common/ray_config.h" #include "ray/util/process.h" namespace ray { @@ -228,25 +233,82 @@ TEST_F(MemoryMonitorUtilsTest, TestCgroupNonexistentUsageFileReturnskNull) { } TEST_F(MemoryMonitorUtilsTest, 
TestGetMemoryThresholdTakeGreaterOfTheTwoValues) { - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 0), 100); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 60), 50); + NoopCgroupManager noop_cgroup_manager; + ASSERT_EQ( + MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 0, false, noop_cgroup_manager), + 100); + ASSERT_EQ( + MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 60, false, noop_cgroup_manager), + 50); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, 10), 100); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, 100), 100); + ASSERT_EQ( + MemoryMonitorUtils::GetMemoryThreshold(100, 1, 10, false, noop_cgroup_manager), + 100); + ASSERT_EQ( + MemoryMonitorUtils::GetMemoryThreshold(100, 1, 100, false, noop_cgroup_manager), + 100); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.1, 100), 10); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, 10), 90); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, 100), 0); + ASSERT_EQ( + MemoryMonitorUtils::GetMemoryThreshold(100, 0.1, 100, false, noop_cgroup_manager), + 10); + ASSERT_EQ( + MemoryMonitorUtils::GetMemoryThreshold(100, 0, 10, false, noop_cgroup_manager), 90); + ASSERT_EQ( + MemoryMonitorUtils::GetMemoryThreshold(100, 0, 100, false, noop_cgroup_manager), 0); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, MemoryMonitorInterface::kNull), + ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold( + 100, 0, MemoryMonitorInterface::kNull, false, noop_cgroup_manager), 0); - ASSERT_EQ( - MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, MemoryMonitorInterface::kNull), - 50); - ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, MemoryMonitorInterface::kNull), + ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold( + 100, 0.5, MemoryMonitorInterface::kNull, false, noop_cgroup_manager), + 50); + ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold( + 100, 1, MemoryMonitorInterface::kNull, false, noop_cgroup_manager), 100); } +TEST_F( + 
MemoryMonitorUtilsTest, + TestGetMemoryThresholdWithResourceIsolationUsesUpperBoundConstraintToComputeThreshold) { + // Create a fake cgroup directory using MockCgroupMemoryUsage. + std::string cgroup_dir = MockCgroupMemoryUsage( + /*total_bytes=*/16LL * 1024 * 1024 * 1024, + /*current_bytes=*/5LL * 1024 * 1024 * 1024, + /*inactive_file_bytes=*/100 * 1024 * 1024, + /*active_file_bytes=*/50 * 1024 * 1024); + + // Create a CgroupManager backed by a fake cgroup driver on the mock cgroup directory. + std::shared_ptr> cgroups = + std::make_shared>(); + cgroups->emplace(cgroup_dir, FakeCgroup{cgroup_dir, {5}, {}, {"cpu", "memory"}, {}}); + std::unique_ptr driver = FakeCgroupDriver::Create(cgroups); + + int64_t user_memory_max_bytes = 10LL * 1024 * 1024 * 1024; // 10 GB + StatusOr> result = + CgroupManager::Create(cgroup_dir, + "node_id_123", + /*system_reserved_cpu_weight=*/100, + /*system_memory_bytes_min=*/1LL * 1024 * 1024 * 1024, + /*system_memory_bytes_low=*/1LL * 1024 * 1024 * 1024, + /*user_memory_high_bytes=*/user_memory_max_bytes, + user_memory_max_bytes, + std::move(driver)); + std::unique_ptr cgroup_manager = std::move(result.value()); + + // Reaction buffer defaults to 5% of total memory. If + // kDefaultThresholdMonitorReactionBufferProportion is changed, this should be changed + // accordingly. 
+ int64_t expected_threshold = + user_memory_max_bytes - static_cast(16LL * 1024 * 1024 * 1024 * 0.05); + ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold( + /*total_memory_bytes=*/16LL * 1024 * 1024 * 1024, + /*usage_threshold=*/0.5, + /*min_memory_free_bytes=*/MemoryMonitorInterface::kNull, + /*resource_isolation_enabled=*/true, + *cgroup_manager), + expected_threshold); +} + TEST_F(MemoryMonitorUtilsTest, TestGetPidsFromDirOnlyReturnsNumericFilenames) { std::string proc_dir = UniqueID::FromRandom().Hex(); boost::filesystem::create_directory(proc_dir); diff --git a/src/ray/common/tests/threshold_memory_monitor_test.cc b/src/ray/common/tests/threshold_memory_monitor_test.cc index c2b9823151d5..9121500337bd 100644 --- a/src/ray/common/tests/threshold_memory_monitor_test.cc +++ b/src/ray/common/tests/threshold_memory_monitor_test.cc @@ -23,6 +23,7 @@ #include #include "gtest/gtest.h" +#include "ray/common/cgroup2/noop_cgroup_manager.h" #include "ray/common/memory_monitor_interface.h" #include "ray/common/memory_monitor_test_fixture.h" #include "ray/common/memory_monitor_utils.h" @@ -79,8 +80,9 @@ TEST_F(ThresholdMemoryMonitorTest, std::shared_ptr has_checked_once = std::make_shared(1); - int64_t memory_usage_threshold_bytes = - MemoryMonitorUtils::GetMemoryThreshold(cgroup_total_bytes, 0.7f, -1); + NoopCgroupManager noop_cgroup_manager; + int64_t memory_usage_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold( + cgroup_total_bytes, 0.7f, -1, false, noop_cgroup_manager); MakeThresholdMemoryMonitor( memory_usage_threshold_bytes, // (70%) 1 /*refresh_interval_ms*/, @@ -109,8 +111,9 @@ TEST_F(ThresholdMemoryMonitorTest, std::shared_ptr> callback_triggered = std::make_shared>(false); - int64_t memory_usage_threshold_bytes = - MemoryMonitorUtils::GetMemoryThreshold(cgroup_total_bytes, 0.7f, -1); + NoopCgroupManager noop_cgroup_manager; + int64_t memory_usage_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold( + cgroup_total_bytes, 0.7f, -1, false, 
noop_cgroup_manager); MakeThresholdMemoryMonitor( memory_usage_threshold_bytes, // (70%) 1 /*refresh_interval_ms*/, diff --git a/src/ray/common/threshold_memory_monitor_factory.cc b/src/ray/common/threshold_memory_monitor_factory.cc index c94b21335fd8..7c3fca8a514c 100644 --- a/src/ray/common/threshold_memory_monitor_factory.cc +++ b/src/ray/common/threshold_memory_monitor_factory.cc @@ -38,46 +38,16 @@ std::unique_ptr MemoryMonitorFactory::Create( return std::make_unique(); } - float usage_threshold = RayConfig::instance().memory_usage_threshold(); - RAY_CHECK_GE(usage_threshold, 0) - << "Invalid configuration: usage_threshold must be >= 0"; - RAY_CHECK_LE(usage_threshold, 1) - << "Invalid configuration: usage_threshold must be <= 1"; int64_t total_memory_bytes = MemoryMonitorUtils::TakeSystemMemorySnapshot( MemoryMonitorInterface::kDefaultCgroupPath) .total_bytes; memory_usage_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold( - total_memory_bytes, usage_threshold, RayConfig::instance().min_memory_free_bytes()); + total_memory_bytes, + RayConfig::instance().memory_usage_threshold(), + RayConfig::instance().min_memory_free_bytes(), + resource_isolation_enabled, + cgroup_manager); - if (resource_isolation_enabled) { - std::string user_cgroup_path = cgroup_manager.GetUserCgroupPath(); - StatusOr result = - cgroup_manager.GetUserCgroupConstraintValue("memory.max"); - RAY_CHECK(result.ok()) << absl::StrFormat( - "Failed to get user cgroup memory limit when setting up memory monitor: %s", - result.ToString()); - std::string user_memory_max_bytes_str = result.value(); - RAY_CHECK(!user_memory_max_bytes_str.empty()) << absl::StrFormat( - "Failed to get memory.max constraint value from user cgroup %s. 
" - "Please check that the cgroup path for resource isolation is correct.", - user_cgroup_path); - - if (!user_memory_max_bytes_str.empty() && - std::all_of(user_memory_max_bytes_str.begin(), - user_memory_max_bytes_str.end(), - ::isdigit)) { - int64_t user_memory_max_bytes = std::stoll(user_memory_max_bytes_str); - memory_usage_threshold_bytes = - user_memory_max_bytes - - static_cast(RayConfig::instance().kill_memory_buffer_bytes()); - RAY_CHECK_GE(memory_usage_threshold_bytes, 0) << absl::StrFormat( - "Available user task memory is less than the kill memory buffer bytes: " - "%d < %d. Please consider using a host with more memory. If the current host " - "memory size must be kept, please adjust the kill memory buffer size.", - user_memory_max_bytes, - RayConfig::instance().kill_memory_buffer_bytes()); - } - } return std::make_unique(std::move(kill_workers_callback), memory_usage_threshold_bytes, monitor_interval_ms); diff --git a/src/ray/raylet/BUILD.bazel b/src/ray/raylet/BUILD.bazel index 09e4a27ac374..cfe313714275 100644 --- a/src/ray/raylet/BUILD.bazel +++ b/src/ray/raylet/BUILD.bazel @@ -189,7 +189,7 @@ ray_cc_library( name = "worker_killing_policy_factory", srcs = select({ "//bazel:is_linux": [ - "worker_killing_policy_group_by_owner_factory.cc", + "worker_killing_policy_factory.cc", ], "//conditions:default": [ "noop_worker_killing_policy_factory.cc", @@ -201,9 +201,13 @@ ray_cc_library( visibility = [":__subpackages__"], deps = [ ":worker_killing_policy_interface", + "//src/ray/common:ray_config", + "//src/ray/common/cgroup2:cgroup_manager_interface", ] + select({ "//bazel:is_linux": [ + ":worker_killing_policy_by_time", ":worker_killing_policy_group_by_owner", + "//src/ray/common:memory_monitor_utils", ], "//conditions:default": [ ":noop_worker_killing_policy", diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f66653979f03..4bf70b196751 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ 
-242,7 +242,8 @@ NodeManager::NodeManager( record_metrics_period_ms_(config.record_metrics_period_ms), placement_group_resource_manager_(placement_group_resource_manager), ray_syncer_(io_service_, self_node_id_.Binary(), 1, 0), - worker_killing_policy_(WorkerKillingPolicyFactory::Create()), + worker_killing_policy_(WorkerKillingPolicyFactory::Create( + config.enable_resource_isolation, *cgroup_manager)), memory_monitor_(MemoryMonitorFactory::Create(CreateKillWorkersCallback(), config.enable_resource_isolation, *cgroup_manager)), @@ -3082,7 +3083,9 @@ KillWorkersCallback NodeManager::CreateKillWorkersCallback() { int64_t computed_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold( total_memory_bytes, RayConfig::instance().memory_usage_threshold(), - RayConfig::instance().min_memory_free_bytes()); + RayConfig::instance().min_memory_free_bytes(), + initial_config_.enable_resource_isolation, + *cgroup_manager_); float computed_threshold_fraction = static_cast(computed_threshold_bytes) / static_cast(total_memory_bytes); @@ -3284,9 +3287,16 @@ std::string NodeManager::CreateOomKillMessageSuggestions( "parallelism by requesting more CPUs per task. %s" "To adjust the kill " "threshold, set the environment variable " - "`RAY_memory_usage_threshold` when starting Ray. To disable " - "worker killing, set the environment variable " - "`RAY_memory_monitor_refresh_ms` to zero.", + "`RAY_memory_usage_threshold` when starting Ray. " + "To disable worker killing, set the environment variable " + "`RAY_memory_monitor_refresh_ms` to zero. " + "Since 2.56, Ray updated the oom killing policy to enabling killing " + "multiple workers and selecting workers based on the time since " + "the task start executing. 
To revert to the legacy policy of " + "determining worker to oom kill based on owner group size or only " + "selecting a single worker to kill at a time, set the environment " + "variable `RAY_worker_killing_policy_by_group` to true before " + "starting Ray.", not_retriable_recommendation_ss.str()); } diff --git a/src/ray/raylet/noop_worker_killing_policy_factory.cc b/src/ray/raylet/noop_worker_killing_policy_factory.cc index 3845fc8ddbfe..721821c0670c 100644 --- a/src/ray/raylet/noop_worker_killing_policy_factory.cc +++ b/src/ray/raylet/noop_worker_killing_policy_factory.cc @@ -21,7 +21,8 @@ namespace ray { namespace raylet { -std::unique_ptr WorkerKillingPolicyFactory::Create() { +std::unique_ptr WorkerKillingPolicyFactory::Create( + bool resource_isolation_enabled, const CgroupManagerInterface &cgroup_manager) { return std::make_unique(); } diff --git a/src/ray/raylet/worker_killing_policy_factory.cc b/src/ray/raylet/worker_killing_policy_factory.cc new file mode 100644 index 000000000000..ed3e3f1196a1 --- /dev/null +++ b/src/ray/raylet/worker_killing_policy_factory.cc @@ -0,0 +1,57 @@ +// Copyright 2026 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "ray/raylet/worker_killing_policy_factory.h" + +#include +#include + +#include "ray/common/memory_monitor_utils.h" +#include "ray/common/ray_config.h" +#include "ray/raylet/worker_killing_policy_by_time.h" +#include "ray/raylet/worker_killing_policy_group_by_owner.h" +#include "ray/raylet/worker_killing_policy_interface.h" + +namespace ray { + +namespace raylet { + +std::unique_ptr WorkerKillingPolicyFactory::Create( + bool resource_isolation_enabled, const CgroupManagerInterface &cgroup_manager) { + if (RayConfig::instance().worker_killing_policy_by_group()) { + return std::make_unique(); + } + + int64_t total_memory_bytes = MemoryMonitorUtils::TakeSystemMemorySnapshot( + MemoryMonitorInterface::kDefaultCgroupPath) + .total_bytes; + int64_t memory_usage_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold( + total_memory_bytes, + RayConfig::instance().memory_usage_threshold(), + RayConfig::instance().min_memory_free_bytes(), + resource_isolation_enabled, + cgroup_manager); + + int64_t kill_memory_buffer_bytes = + std::min(static_cast( + total_memory_bytes * + WorkerKillingPolicyInterface::kDefaultKillMemoryBufferProportion), + RayConfig::instance().max_kill_memory_buffer_bytes()); + return std::make_unique(memory_usage_threshold_bytes, + kill_memory_buffer_bytes); +} + +} // namespace raylet + +} // namespace ray diff --git a/src/ray/raylet/worker_killing_policy_factory.h b/src/ray/raylet/worker_killing_policy_factory.h index d61c97dbf481..efdb0a6c5059 100644 --- a/src/ray/raylet/worker_killing_policy_factory.h +++ b/src/ray/raylet/worker_killing_policy_factory.h @@ -16,6 +16,7 @@ #include +#include "ray/common/cgroup2/cgroup_manager_interface.h" #include "ray/raylet/worker_killing_policy_interface.h" namespace ray { @@ -27,9 +28,14 @@ class WorkerKillingPolicyFactory { /** * Create a worker killing policy instance. * + * @param resource_isolation_enabled Whether resource isolation is enabled. 
+ * Determines whether the threshold the killing policy kills down to is + * based on the cgroup memory constraints. + * @param cgroup_manager The cgroup manager to use to get the memory constraints. * @return a unique pointer to the worker killing policy instance. */ - static std::unique_ptr Create(); + static std::unique_ptr Create( + bool resource_isolation_enabled, const CgroupManagerInterface &cgroup_manager); }; } // namespace raylet diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_factory.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_factory.cc deleted file mode 100644 index e02c67cbd8b5..000000000000 --- a/src/ray/raylet/worker_killing_policy_group_by_owner_factory.cc +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2026 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#include - -#include "ray/raylet/worker_killing_policy_factory.h" -#include "ray/raylet/worker_killing_policy_group_by_owner.h" - -namespace ray { - -namespace raylet { - -std::unique_ptr WorkerKillingPolicyFactory::Create() { - return std::make_unique(); -} - -} // namespace raylet - -} // namespace ray diff --git a/src/ray/raylet/worker_killing_policy_interface.h b/src/ray/raylet/worker_killing_policy_interface.h index 88ba20a85edf..a15466480f19 100644 --- a/src/ray/raylet/worker_killing_policy_interface.h +++ b/src/ray/raylet/worker_killing_policy_interface.h @@ -44,6 +44,8 @@ class WorkerKillingPolicyInterface { const SystemMemorySnapshot &system_memory) = 0; virtual ~WorkerKillingPolicyInterface() = default; + + static constexpr double kDefaultKillMemoryBufferProportion = 0.05; }; } // namespace raylet