Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 0 additions & 318 deletions python/ray/tests/test_memory_pressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import time
from math import ceil

import numpy as np
import pytest

import ray
Expand Down Expand Up @@ -57,24 +56,6 @@ def ray_with_memory_monitor(shutdown_only):
yield addr


@pytest.fixture
def ray_with_memory_monitor_no_oom_retry(shutdown_only):
    """Single-CPU Ray cluster with the memory monitor on and OOM-specific
    task retries disabled (``task_oom_retries=0``).

    Yields the address info returned by ``ray.init``.
    """
    monitor_config = {
        "memory_usage_threshold": memory_usage_threshold,
        "memory_monitor_refresh_ms": memory_monitor_refresh_ms,
        "metrics_report_interval_ms": 100,
        "task_failure_entry_ttl_ms": 2 * 60 * 1000,
        "task_oom_retries": 0,
        "min_memory_free_bytes": -1,
        "task_oom_retry_delay_base_ms": 0,
    }
    with ray.init(
        num_cpus=1,
        object_store_memory=100 * 1024 * 1024,
        _system_config=monitor_config,
    ) as addr:
        yield addr


@ray.remote
def allocate_memory(
allocate_bytes: int,
Expand Down Expand Up @@ -172,43 +153,6 @@ def test_restartable_actor_throws_oom_error(ray_with_memory_monitor, restartable
)


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_restartable_actor_oom_retry_off_throws_oom_error(
    ray_with_memory_monitor_no_oom_retry,
):
    """With OOM retries disabled, a restartable actor killed by the memory
    monitor surfaces OutOfMemoryError to the caller; both the actor-eviction
    counter and the actor-init counter are expected to reach 2.0."""
    addr = ray_with_memory_monitor_no_oom_retry
    leaker = Leaker.options(max_restarts=1, max_task_retries=1).remote()

    overshoot = get_additional_bytes_to_reach_memory_usage_pct(
        memory_usage_threshold + 0.1
    )
    with pytest.raises(ray.exceptions.OutOfMemoryError):
        ray.get(leaker.allocate.remote(overshoot, memory_monitor_refresh_ms * 3))

    # Both metrics must converge to 2.0 within the timeout.
    series = PrometheusTimeseries()
    for metric_tag in ("MemoryManager.ActorEviction.Total", "Leaker.__init__"):
        wait_for_condition(
            has_metric_tagged_with_value,
            timeout=10,
            retry_interval_ms=100,
            addr=addr,
            tag=metric_tag,
            value=2.0,
            timeseries=series,
        )


@pytest.mark.skipif(
sys.platform != "linux" and sys.platform != "linux2",
reason="memory monitor is currently only supported on Linux",
Expand Down Expand Up @@ -242,54 +186,6 @@ def test_non_retryable_task_killed_by_memory_monitor_with_oom_error(
)


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_memory_pressure_kill_newest_worker(ray_with_memory_monitor):
    """Under memory pressure the monitor kills the newest worker (the task),
    leaving the earlier-started named actor alive."""
    headroom = get_additional_bytes_to_reach_memory_usage_pct(
        memory_usage_threshold - 0.1
    )

    survivor = Leaker.options(name="actor").remote()
    ray.get(survivor.allocate.remote(headroom))

    # The task's allocation pushes usage past the threshold; being the most
    # recently started worker, it is the one that gets killed.
    with pytest.raises(ray.exceptions.OutOfMemoryError):
        ray.get(
            allocate_memory.options(max_retries=0).remote(allocate_bytes=headroom)
        )

    alive = ray.util.list_named_actors()
    assert alive == ["actor"]


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_memory_pressure_kill_task_if_actor_submitted_task_first(
    ray_with_memory_monitor,
):
    """When both an actor and a task hold memory, the task is the one the
    monitor evicts, even though its allocation was submitted first."""
    actor = Leaker.options(name="leaker1").remote()
    ray.get(actor.allocate.remote(10))

    headroom = get_additional_bytes_to_reach_memory_usage_pct(
        memory_usage_threshold - 0.1
    )
    pending_task = allocate_memory.options(max_retries=0).remote(
        allocate_bytes=headroom, allocate_interval_s=0, post_allocate_sleep_s=1000
    )

    # The actor's second allocation creates the pressure; the task dies.
    ray.get(actor.allocate.remote(headroom))
    with pytest.raises(ray.exceptions.OutOfMemoryError):
        ray.get(pending_task)

    alive = ray.util.list_named_actors()
    assert alive == ["leaker1"]


@pytest.mark.asyncio
@pytest.mark.skipif(
sys.platform != "linux" and sys.platform != "linux2",
Expand Down Expand Up @@ -369,219 +265,5 @@ async def test_task_oom_logs_error(ray_with_memory_monitor):
# TODO(clarng): verify log info once state api can dump log info


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_task_oom_no_oom_retry_fails_immediately(
    ray_with_memory_monitor_no_oom_retry,
):
    """With ``task_oom_retries=0`` an OOM-killed task fails right away even
    though its generic ``max_retries`` budget would allow one retry."""
    addr = ray_with_memory_monitor_no_oom_retry
    over_limit = get_additional_bytes_to_reach_memory_usage_pct(1.1)

    with pytest.raises(ray.exceptions.OutOfMemoryError):
        ray.get(
            allocate_memory.options(max_retries=1).remote(
                allocate_bytes=over_limit, post_allocate_sleep_s=100
            )
        )

    # Exactly one eviction and one task execution: no OOM retry happened.
    series = PrometheusTimeseries()
    for metric_tag in ("MemoryManager.TaskEviction.Total", "allocate_memory"):
        wait_for_condition(
            has_metric_tagged_with_value,
            timeout=10,
            retry_interval_ms=100,
            addr=addr,
            tag=metric_tag,
            value=1.0,
            timeseries=series,
        )


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_task_oom_only_uses_oom_retry(
    ray_with_memory_monitor,
):
    """A task with unlimited generic retries (``max_retries=-1``) that keeps
    OOMing runs only ``task_oom_retries + 1`` times before failing."""
    addr = ray_with_memory_monitor

    # NOTE(review): presumably this actor just needs to exist alongside the
    # task for the scenario — confirm intent against the original PR.
    leaker = Leaker.options(max_restarts=1, max_task_retries=1).remote()
    ray.get(leaker.allocate.remote(1))

    over_limit = get_additional_bytes_to_reach_memory_usage_pct(1.1)

    with pytest.raises(ray.exceptions.OutOfMemoryError):
        ray.get(
            allocate_memory.options(max_retries=-1).remote(
                allocate_bytes=over_limit, post_allocate_sleep_s=100
            )
        )

    # One initial execution plus task_oom_retries retries, each evicted.
    series = PrometheusTimeseries()
    for metric_tag in ("MemoryManager.TaskEviction.Total", "allocate_memory"):
        wait_for_condition(
            has_metric_tagged_with_value,
            timeout=10,
            retry_interval_ms=100,
            addr=addr,
            tag=metric_tag,
            value=task_oom_retries + 1,
            timeseries=series,
        )


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_newer_task_not_retriable_kill_older_retriable_task_first(
    ray_with_memory_monitor,
):
    """When a retriable task and a newer non-restartable actor call contend
    for memory, the retriable task is the one evicted: the actor call
    completes and the task ultimately raises OutOfMemoryError."""
    headroom = get_additional_bytes_to_reach_memory_usage_pct(
        memory_usage_threshold - 0.1
    )

    retriable_ref = allocate_memory.options(max_retries=1).remote(
        allocate_bytes=headroom, post_allocate_sleep_s=5
    )

    actor = Leaker.options(name="actor", max_restarts=0).remote()
    actor_call_ref = actor.allocate.remote(headroom)

    ray.get(actor_call_ref)
    with pytest.raises(ray.exceptions.OutOfMemoryError):
        ray.get(retriable_ref)


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_put_object_task_usage_slightly_below_limit_does_not_crash():
    """Filling heap usage to ~90% (threshold is 0.98) and putting a 1 GiB
    object into the object store must not trip the memory monitor."""
    with ray.init(
        num_cpus=1,
        object_store_memory=2 << 30,
        _system_config={
            "memory_monitor_refresh_ms": 50,
            "memory_usage_threshold": 0.98,
        },
    ):

        def fill_to_90_pct():
            # Re-measure each time: earlier allocations change current usage.
            to_alloc = get_additional_bytes_to_reach_memory_usage_pct(0.9)
            ray.get(
                allocate_memory.options(max_retries=0).remote(
                    allocate_bytes=to_alloc,
                ),
                timeout=90,
            )

        fill_to_90_pct()

        # 1 GiB worth of float64 values (2**30 bytes / 8 bytes each).
        num_doubles = (1 << 30) // 8
        ray.get(ray.put(np.random.rand(num_doubles)))

        fill_to_90_pct()


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_last_task_of_the_group_fail_immediately():
    """A task with infinite retries that is the last of its group fails
    immediately on OOM (exactly one eviction) instead of retrying forever."""

    @ray.remote(max_retries=-1)
    def infinite_retry_task():
        # Grow the heap in large chunks until the monitor kills this worker.
        # NOTE(review): the chunk is 2**30 list *elements*, not bytes.
        hoard = []
        chunk_len = 1024 * 1024 * 1024
        while True:
            hoard.append([0] * chunk_len)
            time.sleep(5)

    with ray.init() as addr:
        series = PrometheusTimeseries()
        with pytest.raises(ray.exceptions.OutOfMemoryError):
            ray.get(infinite_retry_task.remote())

        wait_for_condition(
            has_metric_tagged_with_value,
            timeout=10,
            retry_interval_ms=100,
            addr=addr,
            tag="MemoryManager.TaskEviction.Total",
            value=1.0,
            timeseries=series,
        )


@pytest.mark.skipif(
    sys.platform != "linux" and sys.platform != "linux2",
    reason="memory monitor is currently only supported on Linux",
)
def test_one_actor_max_lifo_kill_next_actor(shutdown_only):
    """Each newly started actor that pushes usage over the threshold is the
    one killed, while the oldest actor survives every round."""
    with ray.init(
        _system_config={
            "memory_usage_threshold": 0.7,
            "memory_monitor_refresh_ms": memory_monitor_refresh_ms,
        },
    ):
        headroom = get_additional_bytes_to_reach_memory_usage_pct(0.5)

        survivor = Leaker.options(name="first_actor").remote()
        ray.get(survivor.allocate.remote(headroom))

        alive = ray.util.list_named_actors()
        assert alive == ["first_actor"]

        # Each subsequent actor overshoots the threshold and, being the
        # newest, is the one the monitor kills.
        killed = []
        for victim_name in ("second_actor", "third_actor"):
            victim = Leaker.options(name=victim_name).remote()
            with pytest.raises(ray.exceptions.OutOfMemoryError):
                ray.get(
                    victim.allocate.remote(
                        headroom, memory_monitor_refresh_ms * 3
                    )
                )
            killed.append(victim_name)

            alive = ray.util.list_named_actors()
            assert len(alive) == 1, alive
            assert "first_actor" in alive
            for name in killed:
                assert name not in alive


if __name__ == "__main__":
    # Allow running this test module directly; exit with pytest's status code.
    raise SystemExit(pytest.main(["-sv", __file__]))
31 changes: 31 additions & 0 deletions src/ray/core_worker/tests/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>
#include <vector>

#include "absl/strings/str_format.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "mock/ray/gcs_client/gcs_client.h"
Expand Down Expand Up @@ -693,6 +694,36 @@ TEST_F(TaskManagerTest, TestTaskOomInfiniteRetry) {
manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::TASK_CANCELLED);
}

// Verifies that OOM failures draw from the dedicated `task_oom_retries`
// budget: each OUT_OF_MEMORY failure triggers exactly one retry until the
// budget is exhausted, after which the task fails and stores an
// OUT_OF_MEMORY error as its return object — even though the generic
// max_retries budget (10) is not used up.
TEST_F(TaskManagerTest, TestTaskOomKillWithFiniteOomRetryDecrementsCounter) {
  const int kOomRetries = 3;
  // Override the runtime config so OOM gets a finite dedicated retry budget.
  RayConfig::instance().initialize(
      absl::StrFormat(R"({"task_oom_retries": %d})", kOomRetries));

  rpc::Address caller_address;
  auto spec = CreateTaskHelper(1, {});
  // max_retries is deliberately larger than kOomRetries: the OOM budget,
  // not the generic one, must be the limiting factor here.
  manager_.AddPendingTask(caller_address, spec, "", /*max_retries=*/10);
  auto return_id = spec.ReturnId(0);

  // Each OOM failure within the budget schedules exactly one retry, delayed
  // by the configured OOM retry base delay.
  for (int i = 0; i < kOomRetries; i++) {
    ASSERT_EQ(num_retries_, i);
    manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY);
    ASSERT_EQ(num_retries_, i + 1);
    ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_oom_retry_delay_base_ms());
  }

  // Budget exhausted: one more OOM failure must NOT schedule another retry.
  ASSERT_EQ(num_retries_, kOomRetries);
  manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY);
  ASSERT_EQ(num_retries_, kOomRetries);

  // The task's return object must now hold the OUT_OF_MEMORY error.
  std::vector<std::shared_ptr<RayObject>> results;
  WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
  RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, &results));
  ASSERT_EQ(results.size(), 1);
  rpc::ErrorType stored_error;
  ASSERT_TRUE(results[0]->IsException(&stored_error));
  ASSERT_EQ(stored_error, rpc::ErrorType::OUT_OF_MEMORY);
}

TEST_F(TaskManagerTest, TestTaskNotRetriableOomFailsImmediatelyEvenWithOomRetryCounter) {
RayConfig::instance().initialize(R"({"task_oom_retries": 1})");
int num_retries = 0;
Expand Down
Loading