Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a70ff17
Switch group killing policy to by time killing policy
Apr 14, 2026
b35865a
Add env var to support fallback to legacy killing policy
Apr 16, 2026
5c2599f
Fix get memory threshold test to reflect threshold monitor buffer
Apr 16, 2026
8245dbb
Make threshold monitor buffer adjustable
Apr 16, 2026
caf195e
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 16, 2026
b886146
Avoid transitive dependencies
Apr 16, 2026
a7282df
Merge branch 'master' into time_based_killing_policy
Apr 17, 2026
6609b06
Remove misleading option in ray oom prevention test
Apr 17, 2026
7e7031f
Remove unnecessary string trim
Apr 17, 2026
c5a67e2
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 17, 2026
dc8a434
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 17, 2026
2c14f01
Remove kp transitive dependency on memmon utils
Apr 17, 2026
2884c72
debugging
Apr 17, 2026
e394d33
Default kill buffers to 5%
Apr 19, 2026
251a225
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 19, 2026
83f9f20
Remove ci debug logic
Apr 19, 2026
4db1afd
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 19, 2026
2ce6a98
Provide guidance on how to revert to legacy kp in oom log
Apr 21, 2026
a177a40
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 21, 2026
4091330
Clarify killing policy behavioral change version
Apr 21, 2026
c2655d3
Add space before clarifying killing policy behavioral change version
Apr 21, 2026
6a99c1c
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/source/ray-core/doc_code/ray_oom_prevention.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# __last_task_start__
import ray

@ray.remote(max_retries=-1)
@ray.remote(max_retries=0)
def leaks_memory():
chunks = []
bits_to_allocate = 8 * 100 * 1024 * 1024 # ~100 MiB
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ ray_cc_library(
],
deps = [
":memory_monitor_interface",
":ray_config",
"//src/ray/common/cgroup2:cgroup_manager_interface",
"//src/ray/util:compat",
"//src/ray/util:logging",
"@boost//:algorithm",
Expand Down
15 changes: 12 additions & 3 deletions src/ray/common/cgroup2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ ray_cc_library(
"cgroup_manager.h",
"scoped_cgroup_operation.h",
],
visibility = [":__subpackages__"],
visibility = [
":__subpackages__",
"//src/ray/common/tests:__pkg__",
],
deps = [
":cgroup_driver_interface",
":cgroup_manager_interface",
Expand All @@ -74,7 +77,10 @@ ray_cc_library(
hdrs = [
"noop_cgroup_manager.h",
],
visibility = ["//visibility:public"],
visibility = [
":__subpackages__",
"//src/ray/common/tests:__pkg__",
],
deps = [
":cgroup_driver_interface",
":cgroup_manager_interface",
Expand Down Expand Up @@ -117,7 +123,10 @@ ray_cc_library(
target_compatible_with = [
"@platforms//os:linux",
],
visibility = [":__subpackages__"],
visibility = [
":__subpackages__",
"//src/ray/common/tests:__pkg__",
],
deps = [
":cgroup_driver_interface",
"//src/ray/common:status",
Expand Down
51 changes: 43 additions & 8 deletions src/ray/common/memory_monitor_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "ray/common/memory_monitor_interface.h"
#include "ray/common/ray_config.h"
#include "ray/util/logging.h"

namespace ray {
Expand Down Expand Up @@ -292,23 +293,57 @@ int64_t MemoryMonitorUtils::NullableMin(int64_t left, int64_t right) {
}
}

int64_t MemoryMonitorUtils::GetMemoryThreshold(int64_t total_memory_bytes,
float usage_threshold,
int64_t min_memory_free_bytes) {
int64_t MemoryMonitorUtils::GetMemoryThreshold(
int64_t total_memory_bytes,
float usage_threshold,
int64_t min_memory_free_bytes,
bool resource_isolation_enabled,
const CgroupManagerInterface &cgroup_manager) {
RAY_CHECK_GE(total_memory_bytes, MemoryMonitorInterface::kNull);
RAY_CHECK_GE(min_memory_free_bytes, MemoryMonitorInterface::kNull);
RAY_CHECK_GE(usage_threshold, 0);
RAY_CHECK_LE(usage_threshold, 1);
RAY_CHECK_GE(usage_threshold, 0)
<< "Invalid configuration: usage_threshold must be >= 0";
RAY_CHECK_LE(usage_threshold, 1)
<< "Invalid configuration: usage_threshold must be <= 1";

int64_t threshold_fraction = (int64_t)(total_memory_bytes * usage_threshold);
int64_t resolved_memory_threshold_bytes;
int64_t threshold_fraction = static_cast<int64_t>(total_memory_bytes * usage_threshold);

if (min_memory_free_bytes > MemoryMonitorInterface::kNull) {
int64_t threshold_absolute = total_memory_bytes - min_memory_free_bytes;
RAY_CHECK_GE(threshold_absolute, 0);
return std::max(threshold_fraction, threshold_absolute);
resolved_memory_threshold_bytes = std::max(threshold_fraction, threshold_absolute);
} else {
return threshold_fraction;
resolved_memory_threshold_bytes = threshold_fraction;
}

if (resource_isolation_enabled) {
StatusOr<std::string> user_memory_max_bytes_or =
cgroup_manager.GetUserCgroupConstraintValue("memory.max");
RAY_CHECK(user_memory_max_bytes_or.ok()) << absl::StrFormat(
"Failed to get user cgroup memory limit when setting up memory monitor: %s",
user_memory_max_bytes_or.ToString());
std::string user_memory_max_bytes_str = user_memory_max_bytes_or.value();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the logic was moved here from threshold_memory_monitor_factory.cc, the RAY_CHECK that verified user_memory_max_bytes_str is non-empty was dropped. Is there a particular reason for that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the check for whether the string is empty is unnecessary as the underlying call to fetch the constraint value already asserts this.

if (!user_memory_max_bytes_str.empty() &&
std::all_of(user_memory_max_bytes_str.begin(),
user_memory_max_bytes_str.end(),
::isdigit)) {
Comment thread
Kunchd marked this conversation as resolved.
int64_t user_memory_max_bytes = std::stoll(user_memory_max_bytes_str);
resolved_memory_threshold_bytes =
user_memory_max_bytes -
RayConfig::instance().threshold_monitor_reaction_buffer_bytes();
RAY_CHECK_GE(resolved_memory_threshold_bytes, 0) << absl::StrFormat(
"Available user task memory is less than the kill memory buffer bytes: "
"%d < %d. Please consider using a host with more memory. If the current host "
"memory size must be kept, please adjust the threshold monitor reaction buffer "
"size.",
user_memory_max_bytes,
RayConfig::instance().threshold_monitor_reaction_buffer_bytes());
}
}

return resolved_memory_threshold_bytes;
}

int64_t MemoryMonitorUtils::GetProcessUsedMemoryBytes(
Expand Down
10 changes: 9 additions & 1 deletion src/ray/common/memory_monitor_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <tuple>
#include <vector>

#include "ray/common/cgroup2/cgroup_manager_interface.h"
#include "ray/common/memory_monitor_interface.h"
#include "ray/util/compat.h"

Expand Down Expand Up @@ -69,11 +70,18 @@ class MemoryMonitorUtils {
* @param usage_threshold A value in [0-1] to indicate the max usage.
* @param min_memory_free_bytes The min amount of free space to maintain before it is
* exceeding the threshold.
* @param resource_isolation_enabled Whether resource isolation is enabled. Used
* to determine if the threshold should be calculated based on the cgroup
* constraints.
* @param cgroup_manager The cgroup manager to fetch the upper bound memory constraints
* from.
* @return The memory threshold.
*/
static int64_t GetMemoryThreshold(int64_t total_memory_bytes,
float usage_threshold,
int64_t min_memory_free_bytes);
int64_t min_memory_free_bytes,
bool resource_isolation_enabled,
const CgroupManagerInterface &cgroup_manager);

/**
* @brief Gets the used memory for a process from the process memory snapshot.
Expand Down
12 changes: 12 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ RAY_CONFIG(int64_t, min_memory_free_bytes, (int64_t)-1)
/// killing workers via the worker killing policy.
RAY_CONFIG(uint64_t, kill_memory_buffer_bytes, 3ULL * 1024 * 1024 * 1024) // 3GB

/// The threshold monitor is poll based and may miss memory bursts occurring between
/// polls. This buffer is subtracted from memory.max to give the threshold monitor
/// time to react before memory max is reached under resource isolation.
RAY_CONFIG(int64_t,
threshold_monitor_reaction_buffer_bytes,
2LL * 1024 * 1024 * 1024) // 2GB

/// Whether to use the group-by-owner worker killing policy instead of the
/// default time-based worker killing policy. When true, workers are killed by
/// the group-by-owner killing policy (the legacy policy).
Comment thread
Kunchd marked this conversation as resolved.
Outdated
RAY_CONFIG(bool, WORKER_KILLING_POLICY_BY_GROUP, false)
Comment thread
Kunchd marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we are changing the default behavior of the killing policy for all users. Have we communicated the new default behavior to users, and should we roll the change out gradually instead of directly changing the config?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatted offline. The new policy behaves very similarly to the existing policy (identical in the common case when the driver is the single source that's submitting the tasks). Additionally, it addresses the issue where the existing policy was not killing aggressively enough to relieve us of memory pressure. We expect the new policy to be a general improvement across the board, but we will modify this PR to make it explicit that there's a behavioral change, and include this in the next version release. We will also leave a comment in the oom log so that users will know how to revert if they experience oom.


/// The reserved memory bytes for system processes
/// enforced via cgroup memory.min constraint which guarantees
/// that the system processes' memory will not be reclaimed under any conditions.
Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ ray_cc_test(
"//src/ray/common:id",
"//src/ray/common:memory_monitor_test_fixture",
"//src/ray/common:memory_monitor_utils",
"//src/ray/common:ray_config",
"//src/ray/common/cgroup2:cgroup_manager",
"//src/ray/common/cgroup2:cgroup_test_utils",
"//src/ray/common/cgroup2:fake_cgroup_driver",
"//src/ray/common/cgroup2:noop_cgroup_manager",
"//src/ray/util:process",
"@boost//:filesystem",
"@com_google_absl//absl/container:flat_hash_map",
Expand All @@ -135,7 +139,9 @@ ray_cc_test(
deps = [
"//src/ray/common:memory_monitor_interface",
"//src/ray/common:memory_monitor_test_fixture",
"//src/ray/common:memory_monitor_utils",
"//src/ray/common:threshold_memory_monitor",
"//src/ray/common/cgroup2:noop_cgroup_manager",
"@boost//:thread",
],
)
Expand Down
86 changes: 74 additions & 12 deletions src/ray/common/tests/memory_monitor_utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
#include <fstream>
#include <memory>
#include <string>
#include <unordered_map>

#include "gtest/gtest.h"
#include "ray/common/cgroup2/cgroup_manager.h"
#include "ray/common/cgroup2/cgroup_test_utils.h"
#include "ray/common/cgroup2/fake_cgroup_driver.h"
#include "ray/common/cgroup2/noop_cgroup_manager.h"
#include "ray/common/id.h"
#include "ray/common/memory_monitor_test_fixture.h"
#include "ray/common/ray_config.h"
#include "ray/util/process.h"

namespace ray {
Expand Down Expand Up @@ -228,25 +233,82 @@ TEST_F(MemoryMonitorUtilsTest, TestCgroupNonexistentUsageFileReturnskNull) {
}

TEST_F(MemoryMonitorUtilsTest, TestGetMemoryThresholdTakeGreaterOfTheTwoValues) {
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 0), 100);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 60), 50);
NoopCgroupManager noop_cgroup_manager;
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 0, false, noop_cgroup_manager),
100);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 60, false, noop_cgroup_manager),
50);

ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, 10), 100);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, 100), 100);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 1, 10, false, noop_cgroup_manager),
100);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 1, 100, false, noop_cgroup_manager),
100);

ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.1, 100), 10);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, 10), 90);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, 100), 0);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.1, 100, false, noop_cgroup_manager),
10);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0, 10, false, noop_cgroup_manager), 90);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0, 100, false, noop_cgroup_manager), 0);

ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, MemoryMonitorInterface::kNull),
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
100, 0, MemoryMonitorInterface::kNull, false, noop_cgroup_manager),
0);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, MemoryMonitorInterface::kNull),
50);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, MemoryMonitorInterface::kNull),
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
100, 0.5, MemoryMonitorInterface::kNull, false, noop_cgroup_manager),
50);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
100, 1, MemoryMonitorInterface::kNull, false, noop_cgroup_manager),
100);
}

// Verifies that, with resource isolation enabled, GetMemoryThreshold derives the
// threshold from the user cgroup's memory.max constraint minus the configured
// threshold monitor reaction buffer, rather than from the usage fraction or the
// minimum-free-bytes setting.
TEST_F(
    MemoryMonitorUtilsTest,
    TestGetMemoryThresholdWithResourceIsolationUsesUpperBoundConstraintToComputeThreshold) {
  // Create a fake cgroup directory using MockCgroupMemoryUsage.
  std::string cgroup_dir = MockCgroupMemoryUsage(
      /*total_bytes=*/16LL * 1024 * 1024 * 1024,
      /*current_bytes=*/5LL * 1024 * 1024 * 1024,
      /*inactive_file_bytes=*/100 * 1024 * 1024,
      /*active_file_bytes=*/50 * 1024 * 1024);

  // Create a CgroupManager backed by a fake cgroup driver on the mock cgroup directory.
  std::shared_ptr<std::unordered_map<std::string, FakeCgroup>> cgroups =
      std::make_shared<std::unordered_map<std::string, FakeCgroup>>();
  cgroups->emplace(cgroup_dir, FakeCgroup{cgroup_dir, {5}, {}, {"cpu", "memory"}, {}});
  std::unique_ptr<FakeCgroupDriver> driver = FakeCgroupDriver::Create(cgroups);

  int64_t user_memory_max_bytes = 10LL * 1024 * 1024 * 1024;  // 10 GiB
  StatusOr<std::unique_ptr<CgroupManager>> result =
      CgroupManager::Create(cgroup_dir,
                            "node_id_123",
                            /*system_reserved_cpu_weight=*/100,
                            /*system_memory_bytes_min=*/1LL * 1024 * 1024 * 1024,
                            /*system_memory_bytes_low=*/1LL * 1024 * 1024 * 1024,
                            /*user_memory_high_bytes=*/user_memory_max_bytes,
                            user_memory_max_bytes,
                            std::move(driver));
  // Fail with a readable message if setup did not succeed, instead of
  // dereferencing an error StatusOr below.
  ASSERT_TRUE(result.ok()) << result.ToString();
  std::unique_ptr<CgroupManager> cgroup_manager = std::move(result.value());

  // When resource isolation is enabled, GetMemoryThreshold uses
  // memory.max - threshold_monitor_reaction_buffer_bytes regardless of other parameters.
  int64_t expected_threshold =
      user_memory_max_bytes -
      RayConfig::instance().threshold_monitor_reaction_buffer_bytes();
  ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
                /*total_memory_bytes=*/16LL * 1024 * 1024 * 1024,
                /*usage_threshold=*/0.5,
                /*min_memory_free_bytes=*/MemoryMonitorInterface::kNull,
                /*resource_isolation_enabled=*/true,
                *cgroup_manager),
            expected_threshold);
}

TEST_F(MemoryMonitorUtilsTest, TestGetPidsFromDirOnlyReturnsNumericFilenames) {
std::string proc_dir = UniqueID::FromRandom().Hex();
boost::filesystem::create_directory(proc_dir);
Expand Down
11 changes: 7 additions & 4 deletions src/ray/common/tests/threshold_memory_monitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <utility>

#include "gtest/gtest.h"
#include "ray/common/cgroup2/noop_cgroup_manager.h"
#include "ray/common/memory_monitor_interface.h"
#include "ray/common/memory_monitor_test_fixture.h"
#include "ray/common/memory_monitor_utils.h"
Expand Down Expand Up @@ -79,8 +80,9 @@ TEST_F(ThresholdMemoryMonitorTest,

std::shared_ptr<boost::latch> has_checked_once = std::make_shared<boost::latch>(1);

int64_t memory_usage_threshold_bytes =
MemoryMonitorUtils::GetMemoryThreshold(cgroup_total_bytes, 0.7f, -1);
NoopCgroupManager noop_cgroup_manager;
int64_t memory_usage_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold(
cgroup_total_bytes, 0.7f, -1, false, noop_cgroup_manager);
MakeThresholdMemoryMonitor(
memory_usage_threshold_bytes, // (70%)
1 /*refresh_interval_ms*/,
Expand Down Expand Up @@ -109,8 +111,9 @@ TEST_F(ThresholdMemoryMonitorTest,
std::shared_ptr<std::atomic<bool>> callback_triggered =
std::make_shared<std::atomic<bool>>(false);

int64_t memory_usage_threshold_bytes =
MemoryMonitorUtils::GetMemoryThreshold(cgroup_total_bytes, 0.7f, -1);
NoopCgroupManager noop_cgroup_manager;
int64_t memory_usage_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold(
cgroup_total_bytes, 0.7f, -1, false, noop_cgroup_manager);
MakeThresholdMemoryMonitor(
memory_usage_threshold_bytes, // (70%)
1 /*refresh_interval_ms*/,
Expand Down
40 changes: 5 additions & 35 deletions src/ray/common/threshold_memory_monitor_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,46 +38,16 @@ std::unique_ptr<MemoryMonitorInterface> MemoryMonitorFactory::Create(
return std::make_unique<NoopMemoryMonitor>();
}

float usage_threshold = RayConfig::instance().memory_usage_threshold();
RAY_CHECK_GE(usage_threshold, 0)
<< "Invalid configuration: usage_threshold must be >= 0";
RAY_CHECK_LE(usage_threshold, 1)
<< "Invalid configuration: usage_threshold must be <= 1";
int64_t total_memory_bytes = MemoryMonitorUtils::TakeSystemMemorySnapshot(
MemoryMonitorInterface::kDefaultCgroupPath)
.total_bytes;
memory_usage_threshold_bytes = MemoryMonitorUtils::GetMemoryThreshold(
total_memory_bytes, usage_threshold, RayConfig::instance().min_memory_free_bytes());
total_memory_bytes,
RayConfig::instance().memory_usage_threshold(),
RayConfig::instance().min_memory_free_bytes(),
resource_isolation_enabled,
cgroup_manager);

if (resource_isolation_enabled) {
std::string user_cgroup_path = cgroup_manager.GetUserCgroupPath();
StatusOr<std::string> result =
cgroup_manager.GetUserCgroupConstraintValue("memory.max");
RAY_CHECK(result.ok()) << absl::StrFormat(
"Failed to get user cgroup memory limit when setting up memory monitor: %s",
result.ToString());
std::string user_memory_max_bytes_str = result.value();
RAY_CHECK(!user_memory_max_bytes_str.empty()) << absl::StrFormat(
"Failed to get memory.max constraint value from user cgroup %s. "
"Please check that the cgroup path for resource isolation is correct.",
user_cgroup_path);

if (!user_memory_max_bytes_str.empty() &&
std::all_of(user_memory_max_bytes_str.begin(),
user_memory_max_bytes_str.end(),
::isdigit)) {
int64_t user_memory_max_bytes = std::stoll(user_memory_max_bytes_str);
memory_usage_threshold_bytes =
user_memory_max_bytes -
static_cast<int64_t>(RayConfig::instance().kill_memory_buffer_bytes());
RAY_CHECK_GE(memory_usage_threshold_bytes, 0) << absl::StrFormat(
"Available user task memory is less than the kill memory buffer bytes: "
"%d < %d. Please consider using a host with more memory. If the current host "
"memory size must be kept, please adjust the kill memory buffer size.",
user_memory_max_bytes,
RayConfig::instance().kill_memory_buffer_bytes());
}
}
return std::make_unique<ThresholdMemoryMonitor>(std::move(kill_workers_callback),
memory_usage_threshold_bytes,
monitor_interval_ms);
Expand Down
Loading
Loading