
Respond early in fetch requests that have read max.partition.fetch.bytes #28501

Draft

ballard26 wants to merge 3 commits into redpanda-data:dev from ballard26:early-resp-fetch

Conversation

@ballard26
Contributor

This PR changes the fetch path to send a response as soon as
`max.partition.fetch.bytes` bytes have been read for any partition.

Before this, if only one partition in a request ever had data to be read,
and if `max.partition.fetch.bytes < fetch.min.bytes`, then Redpanda would
always wait `fetch.max.wait.ms` before returning. This effectively
throttled the rate we could read data from that one partition to
`max.partition.fetch.bytes * (1s / fetch.max.wait.ms)` bytes/second.
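For illustration (values assumed, not from this PR): with `max.partition.fetch.bytes` = 1 MiB and `fetch.max.wait.ms` = 500 ms, a single hot partition would be capped at roughly 1 MiB * (1000 ms / 500 ms) = 2 MiB/s, no matter how much data it had available.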

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

  • none

Contributor

Copilot AI left a comment


Pull Request Overview

This PR optimizes the fetch request handling to avoid unnecessary delays when reading from a single partition. Specifically, it ensures that Redpanda sends a response immediately after reading max.partition.fetch.bytes for any partition, even if fetch.min.bytes hasn't been met and fetch.max.wait.ms hasn't elapsed.

Key Changes:

  • Added early response logic when max.partition.fetch.bytes is reached for any partition
  • Introduced tracking of whether the maximum partition fetch bytes has been read
  • Added test coverage to validate the optimization with OpenMessagingBenchmark

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Summary per file:

  • src/v/kafka/server/handlers/fetch.h: Added a has_read_max_partition_fetch_bytes flag to op_context and a read_max_bytes flag to read_result; updated the should_stop_fetch() condition to check whether the max partition bytes have been reached (see the sketch after this list).
  • src/v/kafka/server/handlers/fetch.cc: Captured the original max_bytes value, computed the read_max_bytes flag from data read vs. max bytes, propagated the flag to the operation context, and changed an && binding to a non-reference binding.
  • tests/rptest/tests/fetch_tests.py: Added a new test, test_early_response_on_max_bytes, to verify early-response behaviour when max partition fetch bytes is reached.
  • tests/rptest/services/openmessaging_benchmark.py: Added computation of the consumeRateAvg metric from consumeRate data.
  • tests/rptest/services/openmessaging_benchmark_configs.py: Added the CON_RATE_AVG constant for the consume-rate average metric.
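For concreteness, here is a minimal self-contained sketch of the new stop condition. Apart from has_read_max_partition_fetch_bytes, which this PR adds, the names and the exact set of clauses are simplified stand-ins, not Redpanda's actual op_context:

struct fetch_progress {
    bool over_min_bytes;                     // fetch.min.bytes satisfied
    bool deadline_exceeded;                  // fetch.max.wait.ms elapsed
    bool has_read_max_partition_fetch_bytes; // new: some partition returned
                                             // its full per-partition budget
};

bool should_stop_fetch(const fetch_progress& p) {
    // Previously the fetch ended only via min-bytes or the deadline; the new
    // clause also ends it as soon as any one partition has read
    // max.partition.fetch.bytes, removing the throttling described above.
    return p.over_min_bytes || p.deadline_exceeded
           || p.has_read_max_partition_fetch_bytes;
}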

Comment thread: src/v/kafka/server/handlers/fetch.h (outdated)
@vbotbuildovich
Collaborator

vbotbuildovich commented Nov 13, 2025

CI test results

Test results on build #76171:

  • MountUnmountIcebergTest.test_simple_remount ({"cloud_storage_type": 1}, integration): FLAKY, 18/21 passed. Upstream reliability is 86.37510513036165, current run reliability is 85.71428571428571; drift is 0.66082 and the allowed drift is set to 50. The test should PASS.
    Job: https://buildkite.com/redpanda/redpanda/builds/76171#019a7a64-b79c-4bf8-b2e4-a6faca9ce370
    History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=MountUnmountIcebergTest&test_method=test_simple_remount
  • src/v/cluster_link/tests/utils_test (unit): FAIL, 0/2 passed.
    Job: https://buildkite.com/redpanda/redpanda/builds/76171#019a7a1d-85b1-4f60-8b74-7364f23f2ebf
  • src/v/storage/tests/segment_appender_rpbench_test (unit): FLAKY, 1/2 passed.
    Job: https://buildkite.com/redpanda/redpanda/builds/76171#019a7a1d-85b1-4f60-8b74-7364f23f2ebf
  • src/v/test_utils/tests/test_utils_test (unit): FAIL, 0/1 passed.
    Job: https://buildkite.com/redpanda/redpanda/builds/76171#019a7a1d-85b1-4f60-8b74-7364f23f2ebf
  • storage_test_fixture.max_compaction_lag (unit): FLAKY, 1/2 passed.
    Job: https://buildkite.com/redpanda/redpanda/builds/76171#019a7a1d-85b1-4f60-8b74-7364f23f2ebf
  • test_with_all_types_remote_fixture/all_types_remote_fixture.test_list_bucket_with_prefix/1 (unit): FLAKY, 1/2 passed.
    Job: https://buildkite.com/redpanda/redpanda/builds/76171#019a7a1d-85b1-4f60-8b74-7364f23f2ebf

Test results on build #76183:

  • MountUnmountIcebergTest.test_simple_remount ({"cloud_storage_type": 1}, integration): FLAKY, 17/21 passed. Upstream reliability is 86.15384615384616, current run reliability is 80.95238095238095; drift is 5.20147 and the allowed drift is set to 50. The test should PASS.
    Job: https://buildkite.com/redpanda/redpanda/builds/76183#019a7bb6-bb7f-451b-b5cc-0f925bc1cf89
    History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=MountUnmountIcebergTest&test_method=test_simple_remount
  • SegmentMsTest.test_segment_rolling_with_retention_consumer (integration): FLAKY, 20/21 passed. Upstream reliability is 92.63537906137184, current run reliability is 95.23809523809523; drift is -2.60272 and the allowed drift is set to 50. The test should PASS.
    Job: https://buildkite.com/redpanda/redpanda/builds/76183#019a7bb6-bb80-4afc-acfc-d18454a4c182
    History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=SegmentMsTest&test_method=test_segment_rolling_with_retention_consumer
  • CreateTopicReplicaDistributionTest.test_topic_aware_distribution (integration): FLAKY, 20/21 passed. Upstream reliability is 99.76525821596243, current run reliability is 95.23809523809523; drift is 4.52716 and the allowed drift is set to 50. The test should PASS.
    Job: https://buildkite.com/redpanda/redpanda/builds/76183#019a7bb2-4fcb-4520-9927-92303576d24a
    History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CreateTopicReplicaDistributionTest&test_method=test_topic_aware_distribution

// hitting some limit, then this condition ensures that the fetch
// ends. This is to ensure that we don't unintentionally throttle
// the reads from the partition.
|| has_read_max_partition_fetch_bytes;
Contributor


We check should_stop_fetch() after at least one shard has returned some data.

Consider a scenario where we have partition 0 on shard 0 and partition 1 on shard 1, both have data, and the read for partition 0 completes earlier. I believe now we would drop the data we have already read for partition 1 because we would return early, whereas previously we would only respond once we have filled both partitions.

Am I missing something here / would this change in behaviour be problematic?

Contributor


nit: if I'm missing something here, maybe it's worth adding a fixture test to demonstrate the expected behaviour in this case.

Contributor Author


I don't believe this is the case. There are a few things to note about why:

  • Shard fetch workers will always try to read from each partition at least once before returning, even when aborted. And when aborted, they'll still send all data that has been read to the coordinator.
  • When the fetch coordinator sees that should_stop_fetch() == true, it will signal all the workers to stop via an abort. It'll then wait for all workers to return their results and fill out the response before ending the fetch (see the toy model after this list).
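A compilable toy model of that coordinator behaviour (all names here are hypothetical; this is not Redpanda's actual fetch plan executor):

#include <cstddef>
#include <vector>

struct shard_worker {
    bool abort_requested = false;
    std::size_t bytes_read = 0; // accumulated across this shard's partitions
};

// Once should_stop_fetch() turns true, the coordinator aborts every worker
// but still waits for and keeps each worker's partial results, so data that
// was already read (e.g. for partition 1 in the scenario above) is included
// in the response rather than dropped.
std::size_t finish_fetch(std::vector<shard_worker>& workers) {
    std::size_t response_bytes = 0;
    for (auto& w : workers) {
        w.abort_requested = true; // stop further polling passes
    }
    for (auto& w : workers) {
        response_bytes += w.bytes_read; // aborted workers still report data
    }
    return response_bytes;
}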

Contributor Author


I can add a fixture test around worker behavior on abort if it's needed.

Contributor


This is because the abort source used for the log reading:

.abort_source = octx.rctx.abort_source(),

is different from the shard-local workers' abort source:

.as = _worker_aborts[shard],

Right? I think that makes sense now.

Yeah, can you add a fixture test around abort behaviour please? The logic is sufficiently complex that I think it would be good to have a test around this to ensure that this behaviour stays the same.
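For reference, a minimal sketch of the two-abort-source arrangement described above. Only the two quoted expressions come from the code under review; the surrounding structure is an assumption for illustration:

#include <seastar/core/abort_source.hh>

#include <memory>
#include <vector>

struct fetch_coordinator {
    // What octx.rctx.abort_source() refers to: aborting this would cancel
    // the request itself, including in-flight log reads.
    seastar::abort_source _request_abort;
    // What _worker_aborts[shard] refers to: one per shard worker, so the
    // coordinator can stop polling without cancelling result delivery.
    std::vector<std::unique_ptr<seastar::abort_source>> _worker_aborts;

    void stop_workers() {
        for (auto& as : _worker_aborts) {
            as->request_abort(); // workers finish their pass and return
                                 // whatever they have already read
        }
    }
};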

Comment thread: tests/rptest/tests/fetch_test.py
ballard26 requested a review from pgellert on November 13, 2025 at 21:33
@travisdowns
Member

@ballard26 what's the status of this one? Are you waiting on review from me?

dotnwat marked this pull request as draft on April 29, 2026 at 14:21
@dotnwat
Member

dotnwat commented Apr 29, 2026

I put this in draft to remove it from some lists that were accumulating inactive PRs. Please undraft it at any time!

