Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/v/cloud_io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ redpanda_cc_library(
"//src/v/bytes:iostream",
"//src/v/cloud_io:logger",
"//src/v/ssx:future_util",
"//src/v/ssx:semaphore",
"//src/v/ssx:watchdog",
"//src/v/utils:retry_chain_node",
"@boost//:beast",
"@boost//:lexical_cast",
Expand Down
130 changes: 74 additions & 56 deletions src/v/cloud_io/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@
#include "container/chunked_vector.h"
#include "model/metadata.h"
#include "ssx/future-util.h"
#include "ssx/semaphore.h"
#include "ssx/watchdog.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/variant_utils.hh>

#include <boost/beast/http/field.hpp>
#include <boost/lexical_cast.hpp>
Expand Down Expand Up @@ -297,6 +294,9 @@ ss::future<upload_result> remote::upload_stream(
}

lease.client->shutdown();
{
auto _ = std::move(lease);
Comment thread
nvartolomei marked this conversation as resolved.
}
switch (res.error()) {
case cloud_storage_clients::error_outcome::retry:
vlog(
Expand Down Expand Up @@ -375,21 +375,20 @@ ss::future<download_result> remote::download_stream(
retry_chain_node fib(&transfer_details.parent_rtc);
retry_chain_logger ctxlog(log, fib);

auto fut = co_await [this, &fib, &transfer_details, &bucket_parts] {
transfer_details.on_client_acquire();
return ss::coroutine::as_future(_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
}();
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), download_result::timedout);
}
auto lease = std::move(fut).get();

auto permit = fib.retry();
vlog(ctxlog.debug, "Download {} {}", stream_label, path);
std::optional<download_result> result;
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto fut = co_await [this, &fib, &transfer_details, &bucket_parts] {
transfer_details.on_client_acquire();
return ss::coroutine::as_future(_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
}();
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), download_result::timedout);
}
auto lease = std::move(fut).get();
transfer_details.on_request(fib.retry_count());

auto download_latency_measure
Expand Down Expand Up @@ -425,6 +424,9 @@ ss::future<download_result> remote::download_stream(
download_latency_measure.reset();

lease.client->shutdown();
{
auto _ = std::move(lease);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::ignore = std::move(lease);?

}
Comment thread
nvartolomei marked this conversation as resolved.

switch (resp.error()) {
case cloud_storage_clients::error_outcome::retry:
Expand Down Expand Up @@ -511,20 +513,19 @@ remote::download_object(download_request download_request) {
}
const auto object_type = download_request.display_str;

auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), download_result::timedout);
}
auto lease = std::move(fut).get();

auto permit = fib.retry();
vlog(ctxlog.debug, "Downloading {} from {}", object_type, path);

std::optional<download_result> result;
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), download_result::timedout);
}
auto lease = std::move(fut).get();
download_request.transfer_details.on_request(fib.retry_count());
auto resp = co_await lease.client->get_object(
bucket_parts->name,
Expand All @@ -547,6 +548,9 @@ remote::download_object(download_request download_request) {
}

lease.client->shutdown();
{
auto _ = std::move(lease);
}
Comment thread
nvartolomei marked this conversation as resolved.

switch (resp.error()) {
case cloud_storage_clients::error_outcome::retry:
Expand Down Expand Up @@ -616,18 +620,18 @@ ss::future<download_result> remote::object_exists(
ss::gate::holder gh{_gate};
retry_chain_node fib(&parent);
retry_chain_logger ctxlog(log, fib);
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), download_result::timedout);
}
auto lease = std::move(fut).get();
auto permit = fib.retry();
vlog(ctxlog.debug, "Check {} {}", object_type, path);
std::optional<download_result> result;
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), download_result::timedout);
}
auto lease = std::move(fut).get();
auto resp = co_await lease.client->head_object(
bucket_parts->name, path, fib.get_timeout());
if (resp) {
Expand All @@ -641,8 +645,10 @@ ss::future<download_result> remote::object_exists(
co_return download_result::success;
}

// Error path
lease.client->shutdown();
{
auto _ = std::move(lease);
}
Comment thread
nvartolomei marked this conversation as resolved.
switch (resp.error()) {
case cloud_storage_clients::error_outcome::retry:
vlog(
Expand Down Expand Up @@ -705,18 +711,18 @@ remote::delete_object(transfer_details transfer_details) {
ss::gate::holder gh{_gate};
retry_chain_node fib(&parent);
retry_chain_logger ctxlog(log, fib);
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), upload_result::timedout);
}
auto lease = std::move(fut).get();
auto permit = fib.retry();
vlog(ctxlog.debug, "Delete object {}", path);
std::optional<upload_result> result;
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), upload_result::timedout);
}
auto lease = std::move(fut).get();
// NOTE: DeleteObject in S3 doesn't return an error
// if the object doesn't exist. Because of that we're
// using 'upload_result' type as a return type. No need
Expand All @@ -731,6 +737,9 @@ remote::delete_object(transfer_details transfer_details) {
}

lease.client->shutdown();
{
auto _ = std::move(lease);
}
Comment thread
nvartolomei marked this conversation as resolved.

switch (res.error()) {
case cloud_storage_clients::error_outcome::retry:
Expand Down Expand Up @@ -877,18 +886,18 @@ ss::future<upload_result> remote::delete_object_batch(

retry_chain_node fib(&parent);
retry_chain_logger ctxlog(log, fib);
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), upload_result::timedout);
}
auto lease = std::move(fut).get();
auto permit = fib.retry();
vlog(ctxlog.debug, "Deleting a batch of size {}", keys.size());
std::optional<upload_result> result;
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), upload_result::timedout);
}
auto lease = std::move(fut).get();
req_cb(fib.retry_count());
auto res = co_await lease.client->delete_objects(
bucket_parts->name, keys, fib.get_timeout());
Expand All @@ -910,6 +919,9 @@ ss::future<upload_result> remote::delete_object_batch(
}

lease.client->shutdown();
{
auto _ = std::move(lease);
Comment thread
nvartolomei marked this conversation as resolved.
}

switch (res.error()) {
case cloud_storage_clients::error_outcome::retry:
Expand Down Expand Up @@ -1082,14 +1094,6 @@ ss::future<list_result> remote::list_objects(
ss::gate::holder gh{_gate};
retry_chain_node fib(&parent);
retry_chain_logger ctxlog(log, fib);
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), cloud_storage_clients::error_outcome::retry);
}
auto lease = std::move(fut).get();
auto permit = fib.retry();
vlog(ctxlog.debug, "List objects {}", bucket);
std::optional<list_result> result;
Expand All @@ -1107,6 +1111,14 @@ ss::future<list_result> remote::list_objects(

// Keep iterating while the ListObjectsV2 calls has more items to return
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto fut = co_await ss::coroutine::as_future(
_pool.local().acquire_with_timeout(
*bucket_parts, fib.root_abort_source(), _lease_timeout(), fib()));
if (fut.failed()) {
co_return throw_if_not_timeout(
fut.get_exception(), cloud_storage_clients::error_outcome::retry);
}
auto lease = std::move(fut).get();
auto res = co_await lease.client->list_objects(
bucket_parts->name,
prefix,
Expand Down Expand Up @@ -1159,6 +1171,9 @@ ss::future<list_result> remote::list_objects(
}

lease.client->shutdown();
{
auto _ = std::move(lease);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use std:ignore =

}
Comment thread
nvartolomei marked this conversation as resolved.

switch (res.error()) {
case cloud_storage_clients::error_outcome::retry:
Expand Down Expand Up @@ -1256,6 +1271,9 @@ ss::future<upload_result> remote::upload_object(upload_request upload_request) {
}

lease.client->shutdown();
{
auto _ = std::move(lease);
}
Comment thread
nvartolomei marked this conversation as resolved.
switch (res.error()) {
case cloud_storage_clients::error_outcome::retry:
vlog(
Expand Down
Loading