Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 121 additions & 70 deletions src/v/cloud_topics/frontend/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1245,92 +1245,143 @@ ss::future<result<raft::replicate_result>> frontend::replicate_at_offset(
model::timeout_clock::duration timeout,
std::optional<std::reference_wrapper<ss::abort_source>> as,
ss::shared_ptr<kafka::write_at_offset_stm> stm) {
chunked_vector<model::record_batch_header> headers;
headers.reserve(batches.size());
for (const auto& batch : batches) {
headers.push_back(batch.header());
// Only user data batches (raft_data with !is_control()) are uploaded
// to L0 and wrapped as ctp_placeholders. Other batch types
// (raft_configuration, tx_fence, control batches like transaction
// commit/abort markers, etc.) carry their payload in the record
Comment on lines +1250 to +1251
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of confused by this -- I was under the impression that the only batches we expect here are data batches (which may include tx control batches, but not raft configuration). Is that the case? What non-raft_data batches do we expect here? If we're just being conservative here, could we update the comment to indicate that?

// key/value and must be passed through to the local raft log
// unchanged - otherwise the placeholder encoding strips the key and
// downstream consumers like rm_stm cannot parse them.
chunked_vector<model::record_batch_header> data_headers;
chunked_vector<model::record_batch> data_batches;
// Per-input-position slots: true means the slot will be filled with a
// generated placeholder, false means it carries a pass-through batch
// already stored in `passthrough_batches`.
chunked_vector<bool> is_data_slot;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it makes sense to make this a set of non-data indices instead. I imagine we're more likely to have zero non-data batches in most cases, so the set would usually be empty.

chunked_vector<model::record_batch> passthrough_batches;
is_data_slot.reserve(batches.size());
for (auto&& batch : batches) {
const bool is_data = batch.header().type
== model::record_batch_type::raft_data
&& !batch.header().attrs.is_control();
if (is_data) {
is_data_slot.push_back(true);
data_headers.push_back(batch.header());
data_batches.push_back(std::move(batch));
} else {
is_data_slot.push_back(false);
passthrough_batches.push_back(std::move(batch));
}
}
batches.clear();

auto min_epoch = cluster_epoch(_partition->get_topic_revision_id());

// Use the std::max trick from the normal produce path to reduce
// the likelihood of fencing errors when shards are on different
// epochs.
auto accepted_min = _ctp_stm_api->get_max_seen_epoch(_partition->term());
if (!accepted_min) {
accepted_min = _ctp_stm_api->get_max_epoch();
}
if (accepted_min) {
min_epoch = std::max(min_epoch, *accepted_min);
}
chunked_vector<model::record_batch> final_batches;
final_batches.reserve(is_data_slot.size());

vassert(
min_epoch() > 0L,
"Unexpected invalid min epoch {} for {}",
min_epoch,
ntp());
if (data_batches.empty()) {
// Nothing to upload - all batches are pass-through.
for (auto&& b : passthrough_batches) {
final_batches.push_back(std::move(b));
}
Comment on lines +1283 to +1285
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be simpler here:

Suggested change
for (auto&& b : passthrough_batches) {
final_batches.push_back(std::move(b));
}
final_batches = std::move(passthrough_batches);

} else {
auto min_epoch = cluster_epoch(_partition->get_topic_revision_id());

// Use the std::max trick from the normal produce path to reduce
// the likelihood of fencing errors when shards are on different
// epochs.
auto accepted_min = _ctp_stm_api->get_max_seen_epoch(
_partition->term());
if (!accepted_min) {
accepted_min = _ctp_stm_api->get_max_epoch();
}
if (accepted_min) {
min_epoch = std::max(min_epoch, *accepted_min);
}

auto staged = co_await _data_plane->stage_write(std::move(batches));
if (!staged.has_value()) {
co_return staged.error();
}
vassert(
min_epoch() > 0L,
"Unexpected invalid min epoch {} for {}",
min_epoch,
ntp());

auto deadline = model::timeout_clock::now() + timeout;
auto res = co_await _data_plane->execute_write(
ntp(), min_epoch, std::move(staged.value()), deadline);
auto staged = co_await _data_plane->stage_write(
std::move(data_batches));
if (!staged.has_value()) {
co_return staged.error();
}

if (!res.has_value()) {
co_return res.error();
}
auto deadline = model::timeout_clock::now() + timeout;
auto res = co_await _data_plane->execute_write(
ntp(), min_epoch, std::move(staged.value()), deadline);

auto batch_epoch = res.value().extents.front().id.epoch;
if (!res.has_value()) {
co_return res.error();
}

auto fence_fut = co_await ss::coroutine::as_future(
_ctp_stm_api->fence_epoch(batch_epoch));
if (fence_fut.failed()) {
auto not_leader = !_partition->is_leader();
auto e = fence_fut.get_exception();
if (not_leader) {
auto batch_epoch = res.value().extents.front().id.epoch;

auto fence_fut = co_await ss::coroutine::as_future(
_ctp_stm_api->fence_epoch(batch_epoch));
if (fence_fut.failed()) {
auto not_leader = !_partition->is_leader();
auto e = fence_fut.get_exception();
if (not_leader) {
vlog(
cd_log.debug,
"Failed to fence epoch {} for ntp {}, not a leader",
batch_epoch,
ntp());
} else {
vlogl(
cd_log,
ssx::is_shutdown_exception(e) ? ss::log_level::debug
: ss::log_level::warn,
"Failed to fence epoch {} for ntp {}, error: {}",
batch_epoch,
ntp(),
e);
}
std::rethrow_exception(e);
}
auto fence = std::move(fence_fut.get());
if (!fence.has_value()) {
vlog(
cd_log.debug,
"Failed to fence epoch {} for ntp {}, not a leader",
batch_epoch,
ntp());
} else {
vlogl(
cd_log,
ssx::is_shutdown_exception(e) ? ss::log_level::debug
: ss::log_level::warn,
"Failed to fence epoch {} for ntp {}, error: {}",
cd_log.warn,
"Failed to fence epoch {} for ntp {}, ctp latest seen epoch "
"is [{}, {}]",
batch_epoch,
ntp(),
e);
fence.error().window_min,
fence.error().window_max);
co_return raft::errc::not_leader;
}
std::rethrow_exception(e);
}
auto fence = std::move(fence_fut.get());
if (!fence.has_value()) {
vlog(
cd_log.warn,
"Failed to fence epoch {} for ntp {}, ctp latest seen epoch "
"is [{}, {}]",
batch_epoch,
ntp(),
fence.error().window_min,
fence.error().window_max);
co_return raft::errc::not_leader;
}

auto placeholders = co_await convert_to_placeholders(
res.value().extents, headers);

chunked_vector<model::record_batch> placeholder_batches;
for (auto&& batch : placeholders.batches) {
placeholder_batches.push_back(std::move(batch));
auto placeholders = co_await convert_to_placeholders(
res.value().extents, data_headers);

// Interleave generated placeholders with pass-through batches to
// restore the original input order.
auto ph_it = placeholders.batches.begin();
auto pt_it = passthrough_batches.begin();
for (bool is_data : is_data_slot) {
if (is_data) {
vassert(
ph_it != placeholders.batches.end(),
"placeholder count mismatch for {}",
ntp());
final_batches.push_back(std::move(*ph_it++));
} else {
vassert(
pt_it != passthrough_batches.end(),
"passthrough count mismatch for {}",
ntp());
final_batches.push_back(std::move(*pt_it++));
}
}
Comment on lines +1362 to +1380
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we need the `is_data_slot` bitmap at all. Do these batches already have offsets assigned? If so, could we merge them by offset?

}

auto stages = stm->replicate(
std::move(placeholder_batches),
std::move(final_batches),
std::move(expected_base_offsets),
prev_log_offset,
timeout,
Expand Down
Loading
Loading