diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 08d6157cfaa64..db0ffec4bb06f 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -220,6 +220,7 @@ static ss::future do_read_from_ntp( std::optional deadline, const bool obligatory_batch_read, fetch_memory_units_manager& units_mgr) { + const auto original_max_bytes = ntp_config.cfg.max_bytes; // control available memory auto memory_units = units_mgr.zero_units(); if (!ntp_config.cfg.skip_read) { @@ -330,6 +331,8 @@ static ss::future do_read_from_ntp( // happen because there is no strict limit on read size when reading the // obligatory batch. auto result = std::move(res_fut.get()); + result.read_max_bytes = original_max_bytes != 0 + && result.data_size_bytes() >= original_max_bytes; memory_units.adjust_units(result.data_size_bytes()); result.memory_units = std::move(memory_units); co_return result; @@ -452,6 +455,7 @@ static void fill_fetch_responses( }); resp.aborted_transactions = std::move(aborted); } + octx.has_read_max_partition_fetch_bytes |= res.read_max_bytes; resp_units = std::move(res.memory_units); resp.records = batch_reader(std::move(res).release_data()); } else { @@ -506,7 +510,7 @@ static ss::future> fetch_ntps( // is larger. This is needed to conform with KIP-74. 
ntp_cfg.cfg.strict_max_bytes = !obligatory_batch_read; - auto&& res = co_await do_read_from_ntp( + auto res = co_await do_read_from_ntp( cluster_pm, md_cache, replica_selector, diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 07dfcc0fd5158..bc1c1b7b5fc8d 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -155,7 +155,12 @@ struct op_context { return !request.debounce_delay() || over_min_bytes() || is_empty_request() || contains_preferred_replica || response_error || rctx.abort_requested() - || deadline <= model::timeout_clock::now(); + || deadline <= model::timeout_clock::now() + // If no more data can be read from a given partition due to + // hitting some limit then this condition ensures that the fetch + // ends. This is to ensure that we don't unintentionally throttle + // the reads from the partition. + || has_read_max_partition_fetch_bytes; } bool over_min_bytes() const { @@ -208,6 +213,7 @@ struct op_context { // for fetches that have preferred replica set we skip read, therefore we // need other indicator of finished fetch request. 
bool contains_preferred_replica = false; + bool has_read_max_partition_fetch_bytes = false; }; struct fetch_config { @@ -350,6 +356,7 @@ struct read_result { model::offset data_base_offset; model::offset data_last_offset; size_t batch_count{0}; + bool read_max_bytes{false}; model::offset high_watermark; model::offset last_stable_offset; std::optional delta_from_tip_ms; diff --git a/tests/rptest/services/openmessaging_benchmark.py b/tests/rptest/services/openmessaging_benchmark.py index c93093c1a4af3..23c68d99d0629 100644 --- a/tests/rptest/services/openmessaging_benchmark.py +++ b/tests/rptest/services/openmessaging_benchmark.py @@ -479,6 +479,9 @@ def check_succeed( ) / (1024.0 * 1024.0) metrics["publishLatencyMin"] = min(metrics["publishLatencyMin"]) metrics["endToEndLatencyMin"] = min(metrics["endToEndLatencyMin"]) + metrics["consumeRateAvg"] = sum(metrics["consumeRate"]) / len( + metrics["consumeRate"] + ) self._metrics = metrics diff --git a/tests/rptest/services/openmessaging_benchmark_configs.py b/tests/rptest/services/openmessaging_benchmark_configs.py index 26c9dca3b223d..57d5ff5ef71b4 100644 --- a/tests/rptest/services/openmessaging_benchmark_configs.py +++ b/tests/rptest/services/openmessaging_benchmark_configs.py @@ -21,6 +21,7 @@ class OMBSampleConfigurations: # These are copied over from OMB/bin/generate_charts.py # All metrics are in milliseconds unless specially suffixed. + CON_RATE_AVG = "consumeRateAvg" PUB_LATENCY_MIN = "publishLatencyMin" PUB_LATENCY_AVG = "aggregatedPublishLatencyAvg" PUB_LATENCY_50PCT = "aggregatedPublishLatency50pct" diff --git a/tests/rptest/tests/fetch_test.py b/tests/rptest/tests/fetch_test.py new file mode 100644 index 0000000000000..da408b547b0a1 --- /dev/null +++ b/tests/rptest/tests/fetch_test.py @@ -0,0 +1,73 @@ +# Copyright 2025 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 +from typing import Any +from rptest.tests.redpanda_test import RedpandaTest +from rptest.services.cluster import cluster +from rptest.services.openmessaging_benchmark import OpenMessagingBenchmark +from rptest.services.openmessaging_benchmark_configs import OMBSampleConfigurations + + +class FetchTests(RedpandaTest): + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, num_brokers=1, **kwargs) + + @cluster(num_nodes=4) + def test_early_response_on_max_bytes(self): + """ + Tests that Redpanda will send a fetch response when max.partition.fetch.bytes + has been read from a partition. Even if min bytes hasn't been met and max wait + hasn't passed. + """ + throughput_bytes = 10 * 1024 + workload = { + "name": "workload", + "topics": 1, + "partitions_per_topic": 1, + "subscriptions_per_topic": 1, + "consumer_per_subscription": 1, + "producers_per_topic": 1, + "producer_rate": throughput_bytes // 1024, + "consumer_backlog_size_GB": 0, + "test_duration_minutes": 1, + "warmup_duration_minutes": 1, + "message_size": 1024, + "payload_file": "payload/payload-1Kb.data", + } + driver = { + "name": "driver", + "replication_factor": 1, + "request_timeout": 300000, + "producer_config": { + "enable.idempotence": "true", + "acks": "all", + "max.in.flight.requests.per.connection": 5, + "batch.size": 1024, + }, + "consumer_config": { + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "max.partition.fetch.bytes": 1024, + "fetch.min.bytes": throughput_bytes, + "fetch.max.wait.ms": 1000, + }, + } + validator = { + OMBSampleConfigurations.CON_RATE_AVG: [OMBSampleConfigurations.gte(2)], + } + benchmark = OpenMessagingBenchmark( + ctx=self.test_context, + 
redpanda=self.redpanda, + driver=driver, + workload=(workload, validator), + topology="ensemble", + ) + benchmark.start() + benchmark_time_min = benchmark.benchmark_time_mins() + 5 + benchmark.wait(timeout_sec=benchmark_time_min * 60) + benchmark.check_succeed()