6 changes: 5 additions & 1 deletion src/v/kafka/server/handlers/fetch.cc
@@ -220,6 +220,7 @@ static ss::future<read_result> do_read_from_ntp(
   std::optional<model::timeout_clock::time_point> deadline,
   const bool obligatory_batch_read,
   fetch_memory_units_manager& units_mgr) {
+    const auto original_max_bytes = ntp_config.cfg.max_bytes;
     // control available memory
     auto memory_units = units_mgr.zero_units();
     if (!ntp_config.cfg.skip_read) {
@@ -330,6 +331,8 @@ static ss::future<read_result> do_read_from_ntp(
     // happen because there is no strict limit on read size when reading the
     // obligatory batch.
     auto result = std::move(res_fut.get());
+    result.read_max_bytes = original_max_bytes != 0
+      && result.data_size_bytes() >= original_max_bytes;
     memory_units.adjust_units(result.data_size_bytes());
     result.memory_units = std::move(memory_units);
     co_return result;
@@ -452,6 +455,7 @@ static void fill_fetch_responses(
         });
         resp.aborted_transactions = std::move(aborted);
     }
+    octx.has_read_max_partition_fetch_bytes |= res.read_max_bytes;
     resp_units = std::move(res.memory_units);
     resp.records = batch_reader(std::move(res).release_data());
 } else {
@@ -506,7 +510,7 @@ static ss::future<chunked_vector<read_result>> fetch_ntps(
     // is larger. This is needed to conform with KIP-74.
     ntp_cfg.cfg.strict_max_bytes = !obligatory_batch_read;

-    auto&& res = co_await do_read_from_ntp(
+    auto res = co_await do_read_from_ntp(
     cluster_pm,
     md_cache,
     replica_selector,
9 changes: 8 additions & 1 deletion src/v/kafka/server/handlers/fetch.h
@@ -155,7 +155,12 @@ struct op_context {
        return !request.debounce_delay() || over_min_bytes()
          || is_empty_request() || contains_preferred_replica
          || response_error || rctx.abort_requested()
-         || deadline <= model::timeout_clock::now();
+         || deadline <= model::timeout_clock::now()
+         // If no more data can be read from a given partition due to
+         // hitting some limit, then this condition ensures that the
+         // fetch ends. This is to ensure that we don't unintentionally
+         // throttle reads from the partition.
+         || has_read_max_partition_fetch_bytes;
Contributor:

We check should_stop_fetch() after at least one shard has returned some data.

Consider a scenario where we have partition 0 on shard 0 and partition 1 on shard 1, both have data, and the read for partition 0 completes earlier. I believe now we would drop the data we have already read for partition 1 because we would return early, whereas previously we would only respond once we have filled both partitions.

Am I missing something here / would this change in behaviour be problematic?

Contributor:

nit: if I'm missing something here, maybe it's worth adding a fixture test to demonstrate the expected behaviour in this case.

Contributor (Author):

I don't believe this is the case. There are a few things to note for why that is though:

  • Shard fetch workers will always try to read from each partition at least once before returning. Even when aborted. And when aborted they'll still send all data that has been read to the coordinator.
  • When the fetch coordinator sees that should_stop_fetch() == true it will signal all the workers to stop via an abort. It'll then wait for all workers to return their results and fill out the response before ending the fetch.

Contributor (Author):

I can add a fixture test around worker behavior on abort if it's needed.

Contributor:

This is because the abort source used for the log reading:

.abort_source = octx.rctx.abort_source(),

is different from the shard-local workers' abort source:

.as = _worker_aborts[shard],

Right? I think that makes sense now.

Yeah, can you add a fixture test around abort behaviour please? The logic is sufficiently complex that I think it would be good to have a test around this to ensure that this behaviour stays the same.

}

bool over_min_bytes() const {
@@ -208,6 +213,7 @@ struct op_context {
     // for fetches that have preferred replica set we skip read, therefore we
     // need other indicator of finished fetch request.
     bool contains_preferred_replica = false;
+    bool has_read_max_partition_fetch_bytes = false;
 };

struct fetch_config {
@@ -350,6 +356,7 @@ struct read_result {
     model::offset data_base_offset;
     model::offset data_last_offset;
     size_t batch_count{0};
+    bool read_max_bytes{false};
     model::offset high_watermark;
     model::offset last_stable_offset;
     std::optional<std::chrono::milliseconds> delta_from_tip_ms;