diff --git a/src/v/cloud_io/BUILD b/src/v/cloud_io/BUILD index 7996d79d7de61..19f052d5f54e8 100644 --- a/src/v/cloud_io/BUILD +++ b/src/v/cloud_io/BUILD @@ -76,8 +76,6 @@ redpanda_cc_library( "//src/v/bytes:iostream", "//src/v/cloud_io:logger", "//src/v/ssx:future_util", - "//src/v/ssx:semaphore", - "//src/v/ssx:watchdog", "//src/v/utils:retry_chain_node", "@boost//:beast", "@boost//:lexical_cast", diff --git a/src/v/cloud_io/remote.cc b/src/v/cloud_io/remote.cc index 574ed604d8ee8..27cb481eb5233 100644 --- a/src/v/cloud_io/remote.cc +++ b/src/v/cloud_io/remote.cc @@ -22,15 +22,12 @@ #include "container/chunked_vector.h" #include "model/metadata.h" #include "ssx/future-util.h" -#include "ssx/semaphore.h" -#include "ssx/watchdog.h" #include "utils/retry_chain_node.h" #include #include #include #include -#include #include #include @@ -297,6 +294,9 @@ ss::future remote::upload_stream( } lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (res.error()) { case cloud_storage_clients::error_outcome::retry: vlog( @@ -375,21 +375,20 @@ ss::future remote::download_stream( retry_chain_node fib(&transfer_details.parent_rtc); retry_chain_logger ctxlog(log, fib); - auto fut = co_await [this, &fib, &transfer_details, &bucket_parts] { - transfer_details.on_client_acquire(); - return ss::coroutine::as_future(_pool.local().acquire_with_timeout( - *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); - }(); - if (fut.failed()) { - co_return throw_if_not_timeout( - fut.get_exception(), download_result::timedout); - } - auto lease = std::move(fut).get(); - auto permit = fib.retry(); vlog(ctxlog.debug, "Download {} {}", stream_label, path); std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { + auto fut = co_await [this, &fib, &transfer_details, &bucket_parts] { + transfer_details.on_client_acquire(); + return ss::coroutine::as_future(_pool.local().acquire_with_timeout( + *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); + }(); + if (fut.failed()) { + co_return throw_if_not_timeout( + fut.get_exception(), download_result::timedout); + } + auto lease = std::move(fut).get(); transfer_details.on_request(fib.retry_count()); auto download_latency_measure @@ -425,6 +424,9 @@ ss::future remote::download_stream( download_latency_measure.reset(); lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (resp.error()) { case cloud_storage_clients::error_outcome::retry: @@ -511,20 +513,19 @@ remote::download_object(download_request download_request) { } const auto object_type = download_request.display_str; - auto fut = co_await ss::coroutine::as_future( - _pool.local().acquire_with_timeout( - *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); - if (fut.failed()) { - co_return throw_if_not_timeout( - fut.get_exception(), download_result::timedout); - } - auto lease = std::move(fut).get(); - auto permit = fib.retry(); vlog(ctxlog.debug, "Downloading {} from {}", object_type, path); std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { + auto fut = co_await ss::coroutine::as_future( + _pool.local().acquire_with_timeout( + *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); + if (fut.failed()) { + co_return throw_if_not_timeout( + fut.get_exception(), download_result::timedout); + } + auto lease = std::move(fut).get(); download_request.transfer_details.on_request(fib.retry_count()); auto resp = co_await lease.client->get_object( bucket_parts->name, @@ -547,6 +548,9 @@ remote::download_object(download_request download_request) { } lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (resp.error()) { case cloud_storage_clients::error_outcome::retry: @@ -616,18 +620,18 @@ ss::future remote::object_exists( ss::gate::holder gh{_gate}; retry_chain_node fib(&parent); retry_chain_logger ctxlog(log, fib); - auto fut = co_await ss::coroutine::as_future( - _pool.local().acquire_with_timeout( - *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); - if (fut.failed()) { - co_return throw_if_not_timeout( - fut.get_exception(), download_result::timedout); - } - auto lease = std::move(fut).get(); auto permit = fib.retry(); vlog(ctxlog.debug, "Check {} {}", object_type, path); std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { + auto fut = co_await ss::coroutine::as_future( + _pool.local().acquire_with_timeout( + *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); + if (fut.failed()) { + co_return throw_if_not_timeout( + fut.get_exception(), download_result::timedout); + } + auto lease = std::move(fut).get(); auto resp = co_await lease.client->head_object( bucket_parts->name, path, fib.get_timeout()); if (resp) { @@ -641,8 +645,10 @@ ss::future remote::object_exists( co_return download_result::success; } - // Error path lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (resp.error()) { case cloud_storage_clients::error_outcome::retry: vlog( @@ -705,18 +711,18 @@ remote::delete_object(transfer_details transfer_details) { ss::gate::holder gh{_gate}; retry_chain_node fib(&parent); retry_chain_logger ctxlog(log, fib); - auto fut = co_await ss::coroutine::as_future( - _pool.local().acquire_with_timeout( - *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); - if (fut.failed()) { - co_return throw_if_not_timeout( - fut.get_exception(), upload_result::timedout); - } - auto lease = std::move(fut).get(); auto permit = fib.retry(); vlog(ctxlog.debug, "Delete object {}", path); std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { + auto fut = co_await ss::coroutine::as_future( + _pool.local().acquire_with_timeout( + *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); + if (fut.failed()) { + co_return throw_if_not_timeout( + fut.get_exception(), upload_result::timedout); + } + auto lease = std::move(fut).get(); // NOTE: DeleteObject in S3 doesn't return an error // if the object doesn't exist. Because of that we're // using 'upload_result' type as a return type. No need @@ -731,6 +737,9 @@ remote::delete_object(transfer_details transfer_details) { } lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (res.error()) { case cloud_storage_clients::error_outcome::retry: @@ -877,18 +886,18 @@ ss::future remote::delete_object_batch( retry_chain_node fib(&parent); retry_chain_logger ctxlog(log, fib); - auto fut = co_await ss::coroutine::as_future( - _pool.local().acquire_with_timeout( - *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); - if (fut.failed()) { - co_return throw_if_not_timeout( - fut.get_exception(), upload_result::timedout); - } - auto lease = std::move(fut).get(); auto permit = fib.retry(); vlog(ctxlog.debug, "Deleting a batch of size {}", keys.size()); std::optional result; while (!_gate.is_closed() && permit.is_allowed && !result) { + auto fut = co_await ss::coroutine::as_future( + _pool.local().acquire_with_timeout( + *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); + if (fut.failed()) { + co_return throw_if_not_timeout( + fut.get_exception(), upload_result::timedout); + } + auto lease = std::move(fut).get(); req_cb(fib.retry_count()); auto res = co_await lease.client->delete_objects( bucket_parts->name, keys, fib.get_timeout()); @@ -910,6 +919,9 @@ ss::future remote::delete_object_batch( } lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (res.error()) { case cloud_storage_clients::error_outcome::retry: @@ -1082,14 +1094,6 @@ ss::future remote::list_objects( ss::gate::holder gh{_gate}; retry_chain_node fib(&parent); retry_chain_logger ctxlog(log, fib); - auto fut = co_await ss::coroutine::as_future( - _pool.local().acquire_with_timeout( - *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); - if (fut.failed()) { - co_return throw_if_not_timeout( - fut.get_exception(), cloud_storage_clients::error_outcome::retry); - } - auto lease = std::move(fut).get(); auto permit = fib.retry(); vlog(ctxlog.debug, "List objects {}", bucket); std::optional result; @@ -1107,6 +1111,14 @@ ss::future remote::list_objects( // Keep iterating while the ListObjectsV2 calls has more items to return while (!_gate.is_closed() && permit.is_allowed && !result) { + auto fut = co_await ss::coroutine::as_future( + _pool.local().acquire_with_timeout( + *bucket_parts, fib.root_abort_source(), _lease_timeout(), fib())); + if (fut.failed()) { + co_return throw_if_not_timeout( + fut.get_exception(), cloud_storage_clients::error_outcome::retry); + } + auto lease = std::move(fut).get(); auto res = co_await lease.client->list_objects( bucket_parts->name, prefix, @@ -1159,6 +1171,9 @@ ss::future remote::list_objects( } lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (res.error()) { case cloud_storage_clients::error_outcome::retry: @@ -1256,6 +1271,9 @@ ss::future remote::upload_object(upload_request upload_request) { } lease.client->shutdown(); + { + auto _ = std::move(lease); + } switch (res.error()) { case cloud_storage_clients::error_outcome::retry: vlog(