diff --git a/python/ray/tests/test_memory_pressure.py b/python/ray/tests/test_memory_pressure.py index cb218c33a60c..391c152f0da8 100644 --- a/python/ray/tests/test_memory_pressure.py +++ b/python/ray/tests/test_memory_pressure.py @@ -2,7 +2,6 @@ import time from math import ceil -import numpy as np import pytest import ray @@ -57,24 +56,6 @@ def ray_with_memory_monitor(shutdown_only): yield addr -@pytest.fixture -def ray_with_memory_monitor_no_oom_retry(shutdown_only): - with ray.init( - num_cpus=1, - object_store_memory=100 * 1024 * 1024, - _system_config={ - "memory_usage_threshold": memory_usage_threshold, - "memory_monitor_refresh_ms": memory_monitor_refresh_ms, - "metrics_report_interval_ms": 100, - "task_failure_entry_ttl_ms": 2 * 60 * 1000, - "task_oom_retries": 0, - "min_memory_free_bytes": -1, - "task_oom_retry_delay_base_ms": 0, - }, - ) as addr: - yield addr - - @ray.remote def allocate_memory( allocate_bytes: int, @@ -172,43 +153,6 @@ def test_restartable_actor_throws_oom_error(ray_with_memory_monitor, restartable ) -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_restartable_actor_oom_retry_off_throws_oom_error( - ray_with_memory_monitor_no_oom_retry, -): - addr = ray_with_memory_monitor_no_oom_retry - leaker = Leaker.options(max_restarts=1, max_task_retries=1).remote() - - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct( - memory_usage_threshold + 0.1 - ) - with pytest.raises(ray.exceptions.OutOfMemoryError) as _: - ray.get(leaker.allocate.remote(bytes_to_alloc, memory_monitor_refresh_ms * 3)) - - timeseries = PrometheusTimeseries() - wait_for_condition( - has_metric_tagged_with_value, - timeout=10, - retry_interval_ms=100, - addr=addr, - tag="MemoryManager.ActorEviction.Total", - value=2.0, - timeseries=timeseries, - ) - wait_for_condition( - has_metric_tagged_with_value, - timeout=10, - retry_interval_ms=100, - addr=addr, - tag="Leaker.__init__", - value=2.0, - timeseries=timeseries, - ) - - @pytest.mark.skipif( sys.platform != "linux" and sys.platform != "linux2", reason="memory monitor is currently only supported on Linux", @@ -242,54 +186,6 @@ def test_non_retryable_task_killed_by_memory_monitor_with_oom_error( ) -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_memory_pressure_kill_newest_worker(ray_with_memory_monitor): - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct( - memory_usage_threshold - 0.1 - ) - - actor_ref = Leaker.options(name="actor").remote() - ray.get(actor_ref.allocate.remote(bytes_to_alloc)) - - with pytest.raises(ray.exceptions.OutOfMemoryError) as _: - ray.get( - allocate_memory.options(max_retries=0).remote(allocate_bytes=bytes_to_alloc) - ) - - actors = ray.util.list_named_actors() - assert len(actors) == 1 - assert "actor" in actors - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_memory_pressure_kill_task_if_actor_submitted_task_first( - ray_with_memory_monitor, -): - actor_ref = Leaker.options(name="leaker1").remote() - ray.get(actor_ref.allocate.remote(10)) - - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct( - memory_usage_threshold - 0.1 - ) - task_ref = allocate_memory.options(max_retries=0).remote( - allocate_bytes=bytes_to_alloc, allocate_interval_s=0, post_allocate_sleep_s=1000 - ) - - ray.get(actor_ref.allocate.remote(bytes_to_alloc)) - with pytest.raises(ray.exceptions.OutOfMemoryError) as _: - ray.get(task_ref) - - actors = ray.util.list_named_actors() - assert len(actors) == 1 - assert "leaker1" in actors - - @pytest.mark.asyncio @pytest.mark.skipif( sys.platform != "linux" and sys.platform != "linux2", @@ -369,219 +265,5 @@ async def test_task_oom_logs_error(ray_with_memory_monitor): # TODO(clarng): verify log info once state api can dump log info -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_task_oom_no_oom_retry_fails_immediately( - ray_with_memory_monitor_no_oom_retry, -): - addr = ray_with_memory_monitor_no_oom_retry - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(1.1) - - with pytest.raises(ray.exceptions.OutOfMemoryError) as _: - ray.get( - allocate_memory.options(max_retries=1).remote( - allocate_bytes=bytes_to_alloc, post_allocate_sleep_s=100 - ) - ) - - timeseries = PrometheusTimeseries() - wait_for_condition( - has_metric_tagged_with_value, - timeout=10, - retry_interval_ms=100, - addr=addr, - tag="MemoryManager.TaskEviction.Total", - value=1.0, - timeseries=timeseries, - ) - wait_for_condition( - has_metric_tagged_with_value, - timeout=10, - retry_interval_ms=100, - addr=addr, - tag="allocate_memory", - value=1.0, - timeseries=timeseries, - ) - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_task_oom_only_uses_oom_retry( - ray_with_memory_monitor, -): - addr = ray_with_memory_monitor - - leaker = Leaker.options(max_restarts=1, max_task_retries=1).remote() - ray.get(leaker.allocate.remote(1)) - - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(1.1) - - with pytest.raises(ray.exceptions.OutOfMemoryError) as _: - ray.get( - allocate_memory.options(max_retries=-1).remote( - allocate_bytes=bytes_to_alloc, post_allocate_sleep_s=100 - ) - ) - - timeseries = PrometheusTimeseries() - wait_for_condition( - has_metric_tagged_with_value, - timeout=10, - retry_interval_ms=100, - addr=addr, - tag="MemoryManager.TaskEviction.Total", - value=task_oom_retries + 1, - timeseries=timeseries, - ) - wait_for_condition( - has_metric_tagged_with_value, - timeout=10, - retry_interval_ms=100, - addr=addr, - tag="allocate_memory", - value=task_oom_retries + 1, - timeseries=timeseries, - ) - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_newer_task_not_retriable_kill_older_retriable_task_first( - ray_with_memory_monitor, -): - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct( - memory_usage_threshold - 0.1 - ) - - retriable_task_ref = allocate_memory.options(max_retries=1).remote( - allocate_bytes=bytes_to_alloc, post_allocate_sleep_s=5 - ) - - actor_ref = Leaker.options(name="actor", max_restarts=0).remote() - non_retriable_actor_ref = actor_ref.allocate.remote(bytes_to_alloc) - - ray.get(non_retriable_actor_ref) - with pytest.raises(ray.exceptions.OutOfMemoryError) as _: - ray.get(retriable_task_ref) - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_put_object_task_usage_slightly_below_limit_does_not_crash(): - with ray.init( - num_cpus=1, - object_store_memory=2 << 30, - _system_config={ - "memory_monitor_refresh_ms": 50, - "memory_usage_threshold": 0.98, - }, - ): - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(0.9) - ray.get( - allocate_memory.options(max_retries=0).remote( - allocate_bytes=bytes_to_alloc, - ), - timeout=90, - ) - - entries = int((1 << 30) / 8) - obj_ref = ray.put(np.random.rand(entries)) - ray.get(obj_ref) - - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(0.9) - ray.get( - allocate_memory.options(max_retries=0).remote( - allocate_bytes=bytes_to_alloc, - ), - timeout=90, - ) - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_last_task_of_the_group_fail_immediately(): - @ray.remote(max_retries=-1) - def infinite_retry_task(): - chunks = [] - bytes_per_chunk = 1024 * 1024 * 1024 - while True: - chunks.append([0] * bytes_per_chunk) - time.sleep(5) - - with ray.init() as addr: - timeseries = PrometheusTimeseries() - with pytest.raises(ray.exceptions.OutOfMemoryError) as _: - ray.get(infinite_retry_task.remote()) - - wait_for_condition( - has_metric_tagged_with_value, - timeout=10, - retry_interval_ms=100, - addr=addr, - tag="MemoryManager.TaskEviction.Total", - value=1.0, - timeseries=timeseries, - ) - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "linux2", - reason="memory monitor is currently only supported on Linux", -) -def test_one_actor_max_lifo_kill_next_actor(shutdown_only): - with ray.init( - _system_config={ - "memory_usage_threshold": 0.7, - "memory_monitor_refresh_ms": memory_monitor_refresh_ms, - }, - ): - bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(0.5) - - first_actor = Leaker.options(name="first_actor").remote() - ray.get(first_actor.allocate.remote(bytes_to_alloc)) - - actors = ray.util.list_named_actors() - assert len(actors) == 1 - assert "first_actor" in actors - - second_actor = Leaker.options(name="second_actor").remote() - with pytest.raises(ray.exceptions.OutOfMemoryError): - ray.get( - second_actor.allocate.remote( - bytes_to_alloc, memory_monitor_refresh_ms * 3 - ) - ) - - actors = ray.util.list_named_actors() - assert len(actors) == 1, actors - assert "first_actor" in actors - assert "second_actor" not in actors - - third_actor = Leaker.options(name="third_actor").remote() - with pytest.raises(ray.exceptions.OutOfMemoryError): - ray.get( - third_actor.allocate.remote( - bytes_to_alloc, memory_monitor_refresh_ms * 3 - ) - ) - - actors = ray.util.list_named_actors() - assert len(actors) == 1 - assert "first_actor" in actors - assert "second_actor" not in actors - assert "third_actor" not in actors - - if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/core_worker/tests/task_manager_test.cc b/src/ray/core_worker/tests/task_manager_test.cc index fd42a8c71ac5..363a9c28bda3 100644 --- a/src/ray/core_worker/tests/task_manager_test.cc +++ b/src/ray/core_worker/tests/task_manager_test.cc @@ -20,6 +20,7 @@ #include #include +#include "absl/strings/str_format.h" #include "gmock/gmock.h" #include "gtest/gtest.h" #include "mock/ray/gcs_client/gcs_client.h" @@ -693,6 +694,36 @@ TEST_F(TaskManagerTest, TestTaskOomInfiniteRetry) { manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::TASK_CANCELLED); } +TEST_F(TaskManagerTest, TestTaskOomKillWithFiniteOomRetryDecrementsCounter) { + const int kOomRetries = 3; + RayConfig::instance().initialize( + absl::StrFormat(R"({"task_oom_retries": %d})", kOomRetries)); + + rpc::Address caller_address; + auto spec = CreateTaskHelper(1, {}); + manager_.AddPendingTask(caller_address, spec, "", /*max_retries=*/10); + auto return_id = spec.ReturnId(0); + + for (int i = 0; i < kOomRetries; i++) { + ASSERT_EQ(num_retries_, i); + manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY); + ASSERT_EQ(num_retries_, i + 1); + ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_oom_retry_delay_base_ms()); + } + + ASSERT_EQ(num_retries_, kOomRetries); + manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY); + ASSERT_EQ(num_retries_, kOomRetries); + + std::vector> results; + WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0)); + RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, &results)); + ASSERT_EQ(results.size(), 1); + rpc::ErrorType stored_error; + ASSERT_TRUE(results[0]->IsException(&stored_error)); + ASSERT_EQ(stored_error, rpc::ErrorType::OUT_OF_MEMORY); +} + TEST_F(TaskManagerTest, TestTaskNotRetriableOomFailsImmediatelyEvenWithOomRetryCounter) { RayConfig::instance().initialize(R"({"task_oom_retries": 1})"); int num_retries = 0;