-
Notifications
You must be signed in to change notification settings - Fork 743
kafka: foundation for ListOffsets leader epoch fix (CORE-12505) #30347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ff7f879
716a7c4
2040bce
cafb8c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| #include "cluster/metadata_cache.h" | ||
| #include "cluster/partition_manager.h" | ||
| #include "cluster/shard_table.h" | ||
| #include "config/configuration.h" | ||
| #include "container/chunked_vector.h" | ||
| #include "kafka/data/partition_proxy.h" | ||
| #include "kafka/protocol/errors.h" | ||
|
|
@@ -61,12 +62,38 @@ struct list_offsets_ctx { | |
| , unauthorized_topics(std::move(unauthorized_topics)) {} | ||
| }; | ||
|
|
||
| /// Compute the leader_epoch to put on a ListOffsets response. | ||
| /// | ||
| /// CORE-12505: when `enable_listoffsets_historical_leader_epoch` is on | ||
| /// (and the topic is not a read replica), return the record's leader | ||
| /// epoch, to match Kafka and support KIP-320 truncation detection on | ||
| /// consumers. Otherwise, return the partition's current leader epoch. | ||
| /// | ||
| /// `historical_term` is the term of the matched record, or `nullopt` | ||
| /// when no record was matched (e.g., timequery on an empty partition), | ||
| /// in which case we return `kafka::invalid_leader_epoch`. | ||
| /// | ||
| /// Read replicas are excluded pending separate analysis. | ||
| static kafka::leader_epoch response_leader_epoch( | ||
| const partition_proxy& kafka_partition, | ||
| bool is_read_replica, | ||
| std::optional<model::term_id> historical_term) { | ||
| const bool correct_epoch_enabled | ||
| = config::shard_local_cfg().enable_listoffsets_historical_leader_epoch(); | ||
| if (!correct_epoch_enabled || is_read_replica) { | ||
| return kafka_partition.leader_epoch(); | ||
| } | ||
| return historical_term ? kafka::leader_epoch_from_term(*historical_term) | ||
| : kafka::invalid_leader_epoch; | ||
| } | ||
|
|
||
| static ss::future<list_offset_partition_response> list_offsets_partition( | ||
| list_offsets_ctx& octx, | ||
| model::timestamp timestamp, | ||
| model::ktp ktp, | ||
| model::isolation_level isolation_lvl, | ||
| kafka::leader_epoch current_leader_epoch, | ||
| bool is_read_replica, | ||
| cluster::partition_manager& mgr) { | ||
| auto kafka_partition = make_partition_proxy(ktp, mgr); | ||
| if (!kafka_partition) { | ||
|
|
@@ -116,6 +143,11 @@ static ss::future<list_offset_partition_response> list_offsets_partition( | |
| ktp.get_partition(), maybe_start_ofs.error()); | ||
| } | ||
|
|
||
| // TODO(CORE-12505 follow-up): route through response_leader_epoch | ||
| // once partition_proxy exposes a term-for-offset lookup. When | ||
| // the property is true and the topic is not a read replica, | ||
| // this should return the historical term for | ||
| // maybe_start_ofs.value() instead of the current leader term. | ||
| co_return list_offsets_response::make_partition( | ||
| ktp.get_partition(), | ||
| model::timestamp(-1), | ||
|
|
@@ -138,7 +170,8 @@ static ss::future<list_offset_partition_response> list_offsets_partition( | |
| ktp.get_partition(), | ||
| model::timestamp(-1), | ||
| model::offset(-1), | ||
| kafka_partition->leader_epoch()); | ||
| response_leader_epoch( | ||
| *kafka_partition, is_read_replica, std::nullopt)); | ||
| } | ||
|
|
||
| auto res_fut = co_await ss::coroutine::as_future(kafka_partition->timequery( | ||
|
|
@@ -161,7 +194,10 @@ static ss::future<list_offset_partition_response> list_offsets_partition( | |
| auto res = res_fut.get(); | ||
| if (res) { | ||
| co_return list_offsets_response::make_partition( | ||
| id, res->time, res->offset, kafka_partition->leader_epoch()); | ||
| id, | ||
| res->time, | ||
| res->offset, | ||
| response_leader_epoch(*kafka_partition, is_read_replica, res->term)); | ||
| } | ||
| co_return list_offsets_response::make_partition(id, error_code::none); | ||
| } | ||
|
|
@@ -170,7 +206,8 @@ static ss::future<list_offset_partition_response> list_offsets_partition( | |
| list_offsets_ctx& octx, | ||
| model::timestamp timestamp, | ||
| list_offset_topic& topic, | ||
| list_offset_partition& part) { | ||
| list_offset_partition& part, | ||
| bool is_read_replica) { | ||
| model::ktp ktp(topic.name, part.partition_index); | ||
|
|
||
| auto shard = octx.rctx.shards().shard_for(ktp); | ||
|
|
@@ -188,14 +225,15 @@ static ss::future<list_offset_partition_response> list_offsets_partition( | |
| ntp = std::move(ktp), | ||
| isolation_lvl = model::isolation_level( | ||
| octx.request.data.isolation_level), | ||
| current_leader_epoch = part.current_leader_epoch]( | ||
| cluster::partition_manager& mgr) mutable { | ||
| current_leader_epoch = part.current_leader_epoch, | ||
| is_read_replica](cluster::partition_manager& mgr) mutable { | ||
| return list_offsets_partition( | ||
| octx, | ||
| timestamp, | ||
| std::move(ntp), | ||
| isolation_lvl, | ||
| current_leader_epoch, | ||
| is_read_replica, | ||
| mgr); | ||
| }); | ||
| } | ||
|
|
@@ -209,6 +247,9 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { | |
| = octx.rctx.metadata_cache().get_topic_disabled_set( | ||
| model::topic_namespace_view{model::kafka_namespace, topic.name}); | ||
|
|
||
| const auto topic_cfg = octx.rctx.metadata_cache().get_topic_cfg( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should error out if topic_cfg is missing. I suggest making is_rr an optional and using it in the per-partition conditional below. What do you think?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Makes sense; I added a check for topic_cfg in the existing per-partition not-found branch. After that point, we can use |
||
| model::topic_namespace_view{model::kafka_namespace, topic.name}); | ||
|
|
||
| for (auto& part : topic.partitions) { | ||
| if (octx.request.duplicate_tp(topic.name, part.partition_index)) { | ||
| partitions.push_back( | ||
|
|
@@ -218,9 +259,11 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { | |
| continue; | ||
| } | ||
|
|
||
| if (!octx.rctx.metadata_cache().contains( | ||
| model::topic_namespace_view(model::kafka_namespace, topic.name), | ||
| part.partition_index)) { | ||
| if ( | ||
| !octx.rctx.metadata_cache().contains( | ||
| model::topic_namespace_view(model::kafka_namespace, topic.name), | ||
| part.partition_index) | ||
| || !topic_cfg.has_value()) { | ||
| partitions.push_back( | ||
| ss::make_ready_future<list_offset_partition_response>( | ||
| list_offsets_response::make_partition( | ||
|
|
@@ -237,7 +280,8 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { | |
| continue; | ||
| } | ||
|
|
||
| auto pr = list_offsets_partition(octx, part.timestamp, topic, part); | ||
| auto pr = list_offsets_partition( | ||
| octx, part.timestamp, topic, part, topic_cfg->is_read_replica()); | ||
| partitions.push_back(std::move(pr)); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| from rptest.services.rpk_consumer import RpkConsumer | ||
| from rptest.tests.redpanda_test import RedpandaTest | ||
| from rptest.util import wait_until_result | ||
| from ducktape.mark import parametrize | ||
|
|
||
|
|
||
| # --- ListOffsets v4 (API key 2) --- | ||
|
|
@@ -371,14 +372,20 @@ def _get_committed(self, cluster, group, topic, partition): | |
|
|
||
| return (-1, -1) | ||
|
|
||
| def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior): | ||
| def _test_list_offsets_epoch( | ||
| self, cluster, expect_incorrect_earliest, expect_incorrect_timequery | ||
| ): | ||
| """Verify ListOffsets returns the correct leader epoch for each | ||
| timestamp query type. | ||
|
|
||
| All records are produced before leadership is transferred 3 | ||
| times. The earliest and timequery paths should return the | ||
| initial epoch (the record epoch), while the latest path should | ||
| return the current leader epoch (correct per Kafka). | ||
|
|
||
| Earliest and timequery are gated by separate flags because | ||
| Redpanda's partial fix (CORE-12505) addresses timequery before | ||
| earliest — the earliest-path lookup ships in a follow-up. | ||
| """ | ||
| initial_epoch, current_epoch = self._setup_topic_with_epoch_gap(cluster) | ||
|
|
||
|
|
@@ -387,7 +394,7 @@ def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior): | |
| self.logger.info( | ||
| f"Earliest: offset={offset}, epoch={epoch}, current_epoch={current_epoch}" | ||
| ) | ||
| if expect_incorrect_behavior: | ||
| if expect_incorrect_earliest: | ||
| assert epoch == current_epoch, ( | ||
| f"Bug expected: earliest epoch should be current " | ||
| f"({current_epoch}), got {epoch}" | ||
|
|
@@ -415,7 +422,7 @@ def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior): | |
| self.logger.info( | ||
| f"Timequery: offset={offset}, epoch={epoch}, current_epoch={current_epoch}" | ||
| ) | ||
| if expect_incorrect_behavior: | ||
| if expect_incorrect_timequery: | ||
| assert epoch == current_epoch, ( | ||
| f"Bug expected: timequery epoch should be current " | ||
| f"({current_epoch}), got {epoch}" | ||
|
|
@@ -637,13 +644,36 @@ def _apply_throwaway_hack(self, real_group, throwaway_group, topic, partition): | |
| ) | ||
|
|
||
| @cluster(num_nodes=3) | ||
| def test_list_offsets_epoch(self): | ||
| self._test_list_offsets_epoch(self.redpanda, expect_incorrect_behavior=True) | ||
| @parametrize(correct_epoch=False) | ||
| @parametrize(correct_epoch=True) | ||
| def test_list_offsets_epoch(self, correct_epoch): | ||
| if correct_epoch: | ||
| # enable_listoffsets_historical_leader_epoch is gated as a | ||
| # development feature; opt in to dev features before flipping it. | ||
| self.redpanda.enable_development_feature_support() | ||
| self.redpanda.set_cluster_config( | ||
| {"enable_listoffsets_historical_leader_epoch": True} | ||
| ) | ||
| self._test_list_offsets_epoch( | ||
| self.redpanda, | ||
| expect_incorrect_earliest=True, # earliest still buggy until follow-up PR | ||
| expect_incorrect_timequery=not correct_epoch, | ||
| ) | ||
|
|
||
| @cluster(num_nodes=3) | ||
| def test_list_offsets_epoch_empty_partition(self): | ||
| @parametrize(correct_epoch=False) | ||
| @parametrize(correct_epoch=True) | ||
| def test_list_offsets_epoch_empty_partition(self, correct_epoch): | ||
| if correct_epoch: | ||
| # enable_listoffsets_historical_leader_epoch is gated as a | ||
| # development feature; opt in to dev features before flipping it. | ||
| self.redpanda.enable_development_feature_support() | ||
| self.redpanda.set_cluster_config( | ||
| {"enable_listoffsets_historical_leader_epoch": True} | ||
| ) | ||
| self._test_empty_partition_list_offsets_epoch( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing coverage for earliest/latest queries with an empty partition? These return leader_epoch().
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| self.redpanda, expect_incorrect_behavior=True | ||
| self.redpanda, | ||
| expect_incorrect_behavior=not correct_epoch, | ||
| ) | ||
|
|
||
| @cluster(num_nodes=4) | ||
|
|
@@ -860,7 +890,11 @@ def _restart_leader(self, cluster, topic, partition): | |
| @ducktape_cluster(num_nodes=4) | ||
| def test_list_offsets_epoch(self): | ||
| # Kafka defines the correct behavior we compare against. | ||
| self._test_list_offsets_epoch(self.kafka, expect_incorrect_behavior=False) | ||
| self._test_list_offsets_epoch( | ||
| self.kafka, | ||
| expect_incorrect_earliest=False, | ||
| expect_incorrect_timequery=False, | ||
| ) | ||
|
|
||
| @ducktape_cluster(num_nodes=4) | ||
| def test_list_offsets_epoch_empty_partition(self): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on reading the ticket, it seems unsafe to ever change this once set. Is that correct? If so, it's probably worth adding a warning and making it non-user-visible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point; I updated the description and changed it to visibility::tunable.