Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a70ff17
Switch group killing policy to by time killing policy
Apr 14, 2026
b35865a
Add env var to support fallback to legacy killing policy
Apr 16, 2026
5c2599f
Fix get memory threshold test to reflect threshold monitor buffer
Apr 16, 2026
8245dbb
Make threshold monitor buffer adjustable
Apr 16, 2026
caf195e
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 16, 2026
b886146
Avoid transitive dependencies
Apr 16, 2026
a7282df
Merge branch 'master' into time_based_killing_policy
Apr 17, 2026
6609b06
Remove misleading option in ray oom prevention test
Apr 17, 2026
7e7031f
Remove unnecessary string trim
Apr 17, 2026
c5a67e2
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 17, 2026
dc8a434
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 17, 2026
2c14f01
Remove kp transitive dependency on memmon utils
Apr 17, 2026
2884c72
debugging
Apr 17, 2026
e394d33
Default kill buffers to 5%
Apr 19, 2026
251a225
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 19, 2026
83f9f20
Remove ci debug logic
Apr 19, 2026
4db1afd
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 19, 2026
2ce6a98
Provide guidance on how to revert to legacy kp in oom log
Apr 21, 2026
a177a40
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 21, 2026
4091330
Clarify killing policy behavioral change version
Apr 21, 2026
c2655d3
Add space before clarifying killing policy behavioral change version
Apr 21, 2026
6a99c1c
Merge branch 'master' into time_based_killing_policy
Kunchd Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .buildkite/core.rayci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,23 @@ steps:
--only-tags doctest
--python-version 3.10 --build-name corebuild-py3.10
--parallelism-per-worker 3
|| true
# doc examples
- bazel run //ci/ray_ci:test_in_docker -- //doc/... core
--install-mask all-ray-libraries
--except-tags doctest,post_wheel_build,gpu,multi_gpu,mem_pressure
--parallelism-per-worker 3
--python-version 3.10 --build-name corebuild-py3.10
--skip-ray-installation
|| true
# doc memory pressure example
- bazel run //ci/ray_ci:test_in_docker -- //doc/... core
--only-tags mem_pressure
--except-tags gpu
--python-version 3.10 --build-name corebuild-py3.10
--skip-ray-installation
|| true
- sleep 28800
Comment thread
Kunchd marked this conversation as resolved.
Outdated

- label: ":ray: core: dask tests"
tags:
Expand Down
10 changes: 5 additions & 5 deletions doc/source/ray-core/doc_code/ray_oom_prevention.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# __last_task_start__
import ray

@ray.remote(max_retries=-1)
@ray.remote(max_retries=0)
def leaks_memory():
chunks = []
bits_to_allocate = 8 * 100 * 1024 * 1024 # ~100 MiB
Expand Down Expand Up @@ -67,11 +67,11 @@ def allocate(self, bytes_to_allocate: float) -> None:
max_restarts=0, max_task_retries=0, name="second_actor"
).remote()

# each task requests 0.3 of the system memory when the memory threshold is 0.4.
allocate_bytes = get_additional_bytes_to_reach_memory_usage_pct(0.3)
# We want total bytes to exceed 40% threshold to trigger the memory monitor.
allocate_bytes = get_additional_bytes_to_reach_memory_usage_pct(0.5)

first_actor_task = first_actor.allocate.remote(allocate_bytes)
second_actor_task = second_actor.allocate.remote(allocate_bytes)
first_actor_task = first_actor.allocate.remote(0.9 * allocate_bytes)
second_actor_task = second_actor.allocate.remote(0.1* allocate_bytes)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifying the amount we allocate here so that killing the first actor alone will be guaranteed to relief enough memory pressure without killing a second actor.


error_thrown = False
try:
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ ray_cc_library(
],
deps = [
":memory_monitor_interface",
":ray_config",
"//src/ray/common/cgroup2:cgroup_manager_interface",
"//src/ray/util:compat",
"//src/ray/util:logging",
"@boost//:algorithm",
Expand Down
16 changes: 13 additions & 3 deletions src/ray/common/cgroup2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ ray_cc_library(
"cgroup_manager.h",
"scoped_cgroup_operation.h",
],
visibility = [":__subpackages__"],
visibility = [
":__subpackages__",
"//src/ray/common/tests:__pkg__",
],
deps = [
":cgroup_driver_interface",
":cgroup_manager_interface",
Expand All @@ -74,7 +77,11 @@ ray_cc_library(
hdrs = [
"noop_cgroup_manager.h",
],
visibility = ["//visibility:public"],
visibility = [
":__subpackages__",
"//src/ray/common/tests:__pkg__",
"//src/ray/raylet/tests:__pkg__",
],
deps = [
":cgroup_driver_interface",
":cgroup_manager_interface",
Expand Down Expand Up @@ -117,7 +124,10 @@ ray_cc_library(
target_compatible_with = [
"@platforms//os:linux",
],
visibility = [":__subpackages__"],
visibility = [
":__subpackages__",
"//src/ray/common/tests:__pkg__",
],
deps = [
":cgroup_driver_interface",
"//src/ray/common:status",
Expand Down
53 changes: 45 additions & 8 deletions src/ray/common/memory_monitor_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "ray/common/memory_monitor_interface.h"
#include "ray/common/ray_config.h"
#include "ray/util/logging.h"

namespace ray {
Expand Down Expand Up @@ -292,23 +293,59 @@ int64_t MemoryMonitorUtils::NullableMin(int64_t left, int64_t right) {
}
}

int64_t MemoryMonitorUtils::GetMemoryThreshold(int64_t total_memory_bytes,
float usage_threshold,
int64_t min_memory_free_bytes) {
int64_t MemoryMonitorUtils::GetMemoryThreshold(
int64_t total_memory_bytes,
float usage_threshold,
int64_t min_memory_free_bytes,
bool resource_isolation_enabled,
const CgroupManagerInterface &cgroup_manager) {
RAY_CHECK_GE(total_memory_bytes, MemoryMonitorInterface::kNull);
RAY_CHECK_GE(min_memory_free_bytes, MemoryMonitorInterface::kNull);
RAY_CHECK_GE(usage_threshold, 0);
RAY_CHECK_LE(usage_threshold, 1);
RAY_CHECK_GE(usage_threshold, 0)
<< "Invalid configuration: usage_threshold must be >= 0";
RAY_CHECK_LE(usage_threshold, 1)
<< "Invalid configuration: usage_threshold must be <= 1";

int64_t threshold_fraction = (int64_t)(total_memory_bytes * usage_threshold);
int64_t resolved_memory_threshold_bytes;
int64_t threshold_fraction = static_cast<int64_t>(total_memory_bytes * usage_threshold);

if (min_memory_free_bytes > MemoryMonitorInterface::kNull) {
int64_t threshold_absolute = total_memory_bytes - min_memory_free_bytes;
RAY_CHECK_GE(threshold_absolute, 0);
return std::max(threshold_fraction, threshold_absolute);
resolved_memory_threshold_bytes = std::max(threshold_fraction, threshold_absolute);
} else {
return threshold_fraction;
resolved_memory_threshold_bytes = threshold_fraction;
}

if (resource_isolation_enabled) {
StatusOr<std::string> user_memory_max_bytes_or =
cgroup_manager.GetUserCgroupConstraintValue("memory.max");
RAY_CHECK(user_memory_max_bytes_or.ok()) << absl::StrFormat(
"Failed to get user cgroup memory limit when setting up memory monitor: %s",
user_memory_max_bytes_or.ToString());
std::string user_memory_max_bytes_str = user_memory_max_bytes_or.value();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When moving the logic from threshold_memory_monitor_factor.cc to here, the RAY_CHECK of checking whether user_memory_max_bytes_str are missing. Wondering is there a particular reason for that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the check for whether the string is empty is unnecessary as the underlying call to fetch the constraint value already asserts this.

if (!user_memory_max_bytes_str.empty() &&
std::all_of(user_memory_max_bytes_str.begin(),
user_memory_max_bytes_str.end(),
::isdigit)) {
Comment thread
Kunchd marked this conversation as resolved.
int64_t user_memory_max_bytes = std::stoll(user_memory_max_bytes_str);
int64_t reaction_buffer_bytes =
std::min(static_cast<int64_t>(total_memory_bytes *
kDefaultThresholdMonitorReactionBufferProportion),
RayConfig::instance().max_threshold_monitor_reaction_buffer_bytes());
resolved_memory_threshold_bytes = user_memory_max_bytes - reaction_buffer_bytes;
RAY_CHECK_GE(resolved_memory_threshold_bytes, 0) << absl::StrFormat(
"Available user task memory is less than the kill memory buffer bytes: "
"%d < %d. This means the available memory for user proceses is likely "
"less than 5%% of total memory. Please consider decreasing the proportion "
"of reserved system memory if it was custom set.",
user_memory_max_bytes,
reaction_buffer_bytes);
}
}

return resolved_memory_threshold_bytes;
}

int64_t MemoryMonitorUtils::GetProcessUsedMemoryBytes(
Expand Down
12 changes: 11 additions & 1 deletion src/ray/common/memory_monitor_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <tuple>
#include <vector>

#include "ray/common/cgroup2/cgroup_manager_interface.h"
#include "ray/common/memory_monitor_interface.h"
#include "ray/util/compat.h"

Expand Down Expand Up @@ -69,11 +70,18 @@ class MemoryMonitorUtils {
* @param usage_threshold A value in [0-1] to indicate the max usage.
* @param min_memory_free_bytes The min amount of free space to maintain before it is
* exceeding the threshold.
* @param resource_isolation_enabled Whether resource isolation is enabled. Used
* to determine if the threshold should be calculated based on the cgroup
* constraints.
* @param cgroup_manager The cgroup manager to fetch the upper bound memory constraints
* from.
* @return The memory threshold.
*/
static int64_t GetMemoryThreshold(int64_t total_memory_bytes,
float usage_threshold,
int64_t min_memory_free_bytes);
int64_t min_memory_free_bytes,
bool resource_isolation_enabled,
const CgroupManagerInterface &cgroup_manager);

/**
* @brief Gets the used memory for a process from the process memory snapshot.
Expand Down Expand Up @@ -221,6 +229,8 @@ class MemoryMonitorUtils {
static constexpr char kCgroupsV2MemoryStatActiveFileKey[] = "active_file";
static constexpr char kProcDirectory[] = "/proc";
static constexpr char kCommandlinePath[] = "cmdline";

static constexpr double kDefaultThresholdMonitorReactionBufferProportion = 0.05;
};

} // namespace ray
26 changes: 20 additions & 6 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100)
/// it will start killing processes to free up the space.
/// Note: when resource isolation is enabled, the memory usage threshold is set to
/// total memory - system reserved memory (can be specified in ray start) -
/// kill_memory_buffer_bytes. Notice that the formula does not account for object store
/// memory in system reserved memory. To configure the usage threshold, please adjust the
/// system reserved memory in ray start command instead. Ranging from [0, 1]
/// max(5% of total memory, max_threshold_monitor_reaction_buffer_bytes).
/// Notice that the formula does not account for object store memory in system reserved
/// memory. To configure the usage threshold, please adjust the system reserved memory in
/// ray start command instead. Ranging from [0, 1]
RAY_CONFIG(float, memory_usage_threshold, 0.95)

/// The interval between runs of the memory usage monitor.
Expand All @@ -90,9 +91,22 @@ RAY_CONFIG(uint64_t, memory_monitor_refresh_ms, 250)
/// means 6.4 GB of the memory will not be usable.
RAY_CONFIG(int64_t, min_memory_free_bytes, (int64_t)-1)

/// The amount of memory to free under the memory usage threshold when
/// killing workers via the worker killing policy.
RAY_CONFIG(uint64_t, kill_memory_buffer_bytes, 3ULL * 1024 * 1024 * 1024) // 3GB
/// The maximum amount of memory to free under the memory usage threshold when
/// killing workers via the worker killing policy. Defaults to 5% of total memory.
Comment thread
Kunchd marked this conversation as resolved.
Outdated
RAY_CONFIG(int64_t, max_kill_memory_buffer_bytes, 3ULL * 1024 * 1024 * 1024) // 3GiB cap

/// The threshold monitor is poll based and may miss memory bursts occurring between
/// polls. This is the maximum buffer size that can be subtracted from memory.max to give
/// the threshold monitor time to react before memory max is reached under resource
/// isolation. Defaults to 5% of total memory.
RAY_CONFIG(int64_t,
max_threshold_monitor_reaction_buffer_bytes,
2LL * 1024 * 1024 * 1024) // 2GiB

/// Whether to use the group-by-owner worker killing policy instead of the
/// default time-based worker killing policy. When true, workers killed by the
/// by group killing policy (legacy policy).
Comment thread
Kunchd marked this conversation as resolved.
Outdated
RAY_CONFIG(bool, WORKER_KILLING_POLICY_BY_GROUP, false)
Comment thread
Kunchd marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are changing the default behavior of the killing policy for all the users. Wondering have we communicated the default behavior to the users and should we gradually roll out the change instead of the directly change the config?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatted offline. The new policy behaves very similarly to the existing policy (identical in the common case when the driver is the single source that's submitting the tasks). Additionally, it addresses the issue where the existing policy was not killing aggressively enough to relieve us of memory pressure. We expect the new policy to be a general improvement across the board, but we will modify this PR to make it explicit that there's a behavioral change, and include this in the next version release. We will also leave a comment in the oom log so that users will know how to revert if they experience oom.


/// The reserved memory bytes for system processes
/// enforced via cgroup memory.min constraint which guarantees
Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ ray_cc_test(
"//src/ray/common:id",
"//src/ray/common:memory_monitor_test_fixture",
"//src/ray/common:memory_monitor_utils",
"//src/ray/common:ray_config",
"//src/ray/common/cgroup2:cgroup_manager",
"//src/ray/common/cgroup2:cgroup_test_utils",
"//src/ray/common/cgroup2:fake_cgroup_driver",
"//src/ray/common/cgroup2:noop_cgroup_manager",
"//src/ray/util:process",
"@boost//:filesystem",
"@com_google_absl//absl/container:flat_hash_map",
Expand All @@ -135,7 +139,9 @@ ray_cc_test(
deps = [
"//src/ray/common:memory_monitor_interface",
"//src/ray/common:memory_monitor_test_fixture",
"//src/ray/common:memory_monitor_utils",
"//src/ray/common:threshold_memory_monitor",
"//src/ray/common/cgroup2:noop_cgroup_manager",
"@boost//:thread",
],
)
Expand Down
86 changes: 74 additions & 12 deletions src/ray/common/tests/memory_monitor_utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
#include <fstream>
#include <memory>
#include <string>
#include <unordered_map>

#include "gtest/gtest.h"
#include "ray/common/cgroup2/cgroup_manager.h"
#include "ray/common/cgroup2/cgroup_test_utils.h"
#include "ray/common/cgroup2/fake_cgroup_driver.h"
#include "ray/common/cgroup2/noop_cgroup_manager.h"
#include "ray/common/id.h"
#include "ray/common/memory_monitor_test_fixture.h"
#include "ray/common/ray_config.h"
#include "ray/util/process.h"

namespace ray {
Expand Down Expand Up @@ -228,25 +233,82 @@ TEST_F(MemoryMonitorUtilsTest, TestCgroupNonexistentUsageFileReturnskNull) {
}

TEST_F(MemoryMonitorUtilsTest, TestGetMemoryThresholdTakeGreaterOfTheTwoValues) {
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 0), 100);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 60), 50);
NoopCgroupManager noop_cgroup_manager;
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 0, false, noop_cgroup_manager),
100);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, 60, false, noop_cgroup_manager),
50);

ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, 10), 100);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, 100), 100);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 1, 10, false, noop_cgroup_manager),
100);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 1, 100, false, noop_cgroup_manager),
100);

ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0.1, 100), 10);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, 10), 90);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, 100), 0);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.1, 100, false, noop_cgroup_manager),
10);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0, 10, false, noop_cgroup_manager), 90);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0, 100, false, noop_cgroup_manager), 0);

ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 0, MemoryMonitorInterface::kNull),
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
100, 0, MemoryMonitorInterface::kNull, false, noop_cgroup_manager),
0);
ASSERT_EQ(
MemoryMonitorUtils::GetMemoryThreshold(100, 0.5, MemoryMonitorInterface::kNull),
50);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(100, 1, MemoryMonitorInterface::kNull),
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
100, 0.5, MemoryMonitorInterface::kNull, false, noop_cgroup_manager),
50);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
100, 1, MemoryMonitorInterface::kNull, false, noop_cgroup_manager),
100);
}

TEST_F(
MemoryMonitorUtilsTest,
TestGetMemoryThresholdWithResourceIsolationUsesUpperBoundConstraintToComputeThreshold) {
// Create a fake cgroup directory using MockCgroupMemoryUsage.
std::string cgroup_dir = MockCgroupMemoryUsage(
/*total_bytes=*/16LL * 1024 * 1024 * 1024,
/*current_bytes=*/5LL * 1024 * 1024 * 1024,
/*inactive_file_bytes=*/100 * 1024 * 1024,
/*active_file_bytes=*/50 * 1024 * 1024);

// Create a CgroupManager backed by a fake cgroup driver on the mock cgroup directory.
std::shared_ptr<std::unordered_map<std::string, FakeCgroup>> cgroups =
std::make_shared<std::unordered_map<std::string, FakeCgroup>>();
cgroups->emplace(cgroup_dir, FakeCgroup{cgroup_dir, {5}, {}, {"cpu", "memory"}, {}});
std::unique_ptr<FakeCgroupDriver> driver = FakeCgroupDriver::Create(cgroups);

int64_t user_memory_max_bytes = 10LL * 1024 * 1024 * 1024; // 10 GB
StatusOr<std::unique_ptr<CgroupManager>> result =
CgroupManager::Create(cgroup_dir,
"node_id_123",
/*system_reserved_cpu_weight=*/100,
/*system_memory_bytes_min=*/1LL * 1024 * 1024 * 1024,
/*system_memory_bytes_low=*/1LL * 1024 * 1024 * 1024,
/*user_memory_high_bytes=*/user_memory_max_bytes,
user_memory_max_bytes,
std::move(driver));
std::unique_ptr<CgroupManager> cgroup_manager = std::move(result.value());

// Reaction buffer defaults to 5% of total memory. If
// kDefaultThresholdMonitorReactionBufferProportion is changed, this should be changed
// accordingly.
int64_t expected_threshold =
user_memory_max_bytes - static_cast<int64_t>(16LL * 1024 * 1024 * 1024 * 0.05);
ASSERT_EQ(MemoryMonitorUtils::GetMemoryThreshold(
/*total_memory_bytes=*/16LL * 1024 * 1024 * 1024,
/*usage_threshold=*/0.5,
/*min_memory_free_bytes=*/MemoryMonitorInterface::kNull,
/*resource_isolation_enabled=*/true,
*cgroup_manager),
expected_threshold);
}

TEST_F(MemoryMonitorUtilsTest, TestGetPidsFromDirOnlyReturnsNumericFilenames) {
std::string proc_dir = UniqueID::FromRandom().Hex();
boost::filesystem::create_directory(proc_dir);
Expand Down
Loading
Loading