diff --git a/src/v/cloud_topics/frontend/frontend.cc b/src/v/cloud_topics/frontend/frontend.cc
index a9df4e6206a82..269d666bf0b4c 100644
--- a/src/v/cloud_topics/frontend/frontend.cc
+++ b/src/v/cloud_topics/frontend/frontend.cc
@@ -1245,92 +1245,143 @@ ss::future> frontend::replicate_at_offset(
   model::timeout_clock::duration timeout,
   std::optional> as,
   ss::shared_ptr stm) {
-    chunked_vector<model::record_batch_header> headers;
-    headers.reserve(batches.size());
-    for (const auto& batch : batches) {
-        headers.push_back(batch.header());
+    // Only user data batches (raft_data with !is_control()) are uploaded
+    // to L0 and wrapped as ctp_placeholders. Other batch types
+    // (raft_configuration, tx_fence, control batches like transaction
+    // commit/abort markers, etc.) carry their payload in the record
+    // key/value and must be passed through to the local raft log
+    // unchanged - otherwise the placeholder encoding strips the key and
+    // downstream consumers like rm_stm cannot parse them.
+    chunked_vector<model::record_batch_header> data_headers;
+    chunked_vector<model::record_batch> data_batches;
+    // Per-input-position slots: true means the slot will be filled with a
+    // generated placeholder, false means it carries a pass-through batch
+    // already stored in `passthrough_batches`.
+    chunked_vector<bool> is_data_slot;
+    chunked_vector<model::record_batch> passthrough_batches;
+    is_data_slot.reserve(batches.size());
+    for (auto&& batch : batches) {
+        const bool is_data = batch.header().type
+                               == model::record_batch_type::raft_data
+                             && !batch.header().attrs.is_control();
+        if (is_data) {
+            is_data_slot.push_back(true);
+            data_headers.push_back(batch.header());
+            data_batches.push_back(std::move(batch));
+        } else {
+            is_data_slot.push_back(false);
+            passthrough_batches.push_back(std::move(batch));
+        }
     }
+    batches.clear();
 
-    auto min_epoch = cluster_epoch(_partition->get_topic_revision_id());
-
-    // Use the std::max trick from the normal produce path to reduce
-    // the likelihood of fencing errors when shards are on different
-    // epochs.
-    auto accepted_min = _ctp_stm_api->get_max_seen_epoch(_partition->term());
-    if (!accepted_min) {
-        accepted_min = _ctp_stm_api->get_max_epoch();
-    }
-    if (accepted_min) {
-        min_epoch = std::max(min_epoch, *accepted_min);
-    }
+    chunked_vector<model::record_batch> final_batches;
+    final_batches.reserve(is_data_slot.size());
 
-    vassert(
-      min_epoch() > 0L,
-      "Unexpected invalid min epoch {} for {}",
-      min_epoch,
-      ntp());
+    if (data_batches.empty()) {
+        // Nothing to upload - all batches are pass-through.
+        for (auto&& b : passthrough_batches) {
+            final_batches.push_back(std::move(b));
+        }
+    } else {
+        auto min_epoch = cluster_epoch(_partition->get_topic_revision_id());
+
+        // Use the std::max trick from the normal produce path to reduce
+        // the likelihood of fencing errors when shards are on different
+        // epochs.
+        auto accepted_min = _ctp_stm_api->get_max_seen_epoch(
+          _partition->term());
+        if (!accepted_min) {
+            accepted_min = _ctp_stm_api->get_max_epoch();
+        }
+        if (accepted_min) {
+            min_epoch = std::max(min_epoch, *accepted_min);
+        }
 
-    auto staged = co_await _data_plane->stage_write(std::move(batches));
-    if (!staged.has_value()) {
-        co_return staged.error();
-    }
+        vassert(
+          min_epoch() > 0L,
+          "Unexpected invalid min epoch {} for {}",
+          min_epoch,
+          ntp());
 
-    auto deadline = model::timeout_clock::now() + timeout;
-    auto res = co_await _data_plane->execute_write(
-      ntp(), min_epoch, std::move(staged.value()), deadline);
+        auto staged = co_await _data_plane->stage_write(
+          std::move(data_batches));
+        if (!staged.has_value()) {
+            co_return staged.error();
+        }
 
-    if (!res.has_value()) {
-        co_return res.error();
-    }
+        auto deadline = model::timeout_clock::now() + timeout;
+        auto res = co_await _data_plane->execute_write(
+          ntp(), min_epoch, std::move(staged.value()), deadline);
 
-    auto batch_epoch = res.value().extents.front().id.epoch;
+        if (!res.has_value()) {
+            co_return res.error();
+        }
 
-    auto fence_fut = co_await ss::coroutine::as_future(
-      _ctp_stm_api->fence_epoch(batch_epoch));
-    if (fence_fut.failed()) {
-        auto not_leader = !_partition->is_leader();
-        auto e = fence_fut.get_exception();
-        if (not_leader) {
+        auto batch_epoch = res.value().extents.front().id.epoch;
+
+        auto fence_fut = co_await ss::coroutine::as_future(
+          _ctp_stm_api->fence_epoch(batch_epoch));
+        if (fence_fut.failed()) {
+            auto not_leader = !_partition->is_leader();
+            auto e = fence_fut.get_exception();
+            if (not_leader) {
+                vlog(
+                  cd_log.debug,
+                  "Failed to fence epoch {} for ntp {}, not a leader",
+                  batch_epoch,
+                  ntp());
+            } else {
+                vlogl(
+                  cd_log,
+                  ssx::is_shutdown_exception(e) ? ss::log_level::debug
+                                                : ss::log_level::warn,
+                  "Failed to fence epoch {} for ntp {}, error: {}",
+                  batch_epoch,
+                  ntp(),
+                  e);
+            }
+            std::rethrow_exception(e);
+        }
+        auto fence = std::move(fence_fut.get());
+        if (!fence.has_value()) {
             vlog(
-              cd_log.debug,
-              "Failed to fence epoch {} for ntp {}, not a leader",
-              batch_epoch,
-              ntp());
-        } else {
-            vlogl(
-              cd_log,
-              ssx::is_shutdown_exception(e) ? ss::log_level::debug
-                                            : ss::log_level::warn,
-              "Failed to fence epoch {} for ntp {}, error: {}",
+              cd_log.warn,
+              "Failed to fence epoch {} for ntp {}, ctp latest seen epoch "
+              "is [{}, {}]",
               batch_epoch,
               ntp(),
-              e);
+              fence.error().window_min,
+              fence.error().window_max);
+            co_return raft::errc::not_leader;
         }
-        std::rethrow_exception(e);
-    }
-    auto fence = std::move(fence_fut.get());
-    if (!fence.has_value()) {
-        vlog(
-          cd_log.warn,
-          "Failed to fence epoch {} for ntp {}, ctp latest seen epoch "
-          "is [{}, {}]",
-          batch_epoch,
-          ntp(),
-          fence.error().window_min,
-          fence.error().window_max);
-        co_return raft::errc::not_leader;
-    }
-    auto placeholders = co_await convert_to_placeholders(
-      res.value().extents, headers);
-
-    chunked_vector<model::record_batch> placeholder_batches;
-    for (auto&& batch : placeholders.batches) {
-        placeholder_batches.push_back(std::move(batch));
+        auto placeholders = co_await convert_to_placeholders(
+          res.value().extents, data_headers);
+
+        // Interleave generated placeholders with pass-through batches to
+        // restore the original input order.
+        auto ph_it = placeholders.batches.begin();
+        auto pt_it = passthrough_batches.begin();
+        for (bool is_data : is_data_slot) {
+            if (is_data) {
+                vassert(
+                  ph_it != placeholders.batches.end(),
+                  "placeholder count mismatch for {}",
+                  ntp());
+                final_batches.push_back(std::move(*ph_it++));
+            } else {
+                vassert(
+                  pt_it != passthrough_batches.end(),
+                  "passthrough count mismatch for {}",
+                  ntp());
+                final_batches.push_back(std::move(*pt_it++));
+            }
+        }
     }
 
     auto stages = stm->replicate(
-      std::move(placeholder_batches),
+      std::move(final_batches),
       std::move(expected_base_offsets),
       prev_log_offset,
       timeout,
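The ordering contract enforced by the slot bookkeeping above is easier to see in isolation. Below is a minimal, runnable sketch (Python, with hypothetical stand-in names; the real code operates on model::record_batch and uploads through stage_write/execute_write): classify each batch, upload only the data batches, then replay the boolean slot list so placeholders and pass-through batches come out in the original input order.

```python
# Minimal sketch (hypothetical names, illustration only) of the
# classify / upload / re-interleave scheme in the frontend change above.
from dataclasses import dataclass


@dataclass
class Batch:
    type: str         # "raft_data", "raft_configuration", "tx_fence", ...
    is_control: bool  # control batches, e.g. tx commit/abort markers
    payload: str


def is_data(b: Batch) -> bool:
    # Mirrors: header().type == raft_data && !header().attrs.is_control()
    return b.type == "raft_data" and not b.is_control


def replicate_order_preserving(batches: list[Batch]) -> list[Batch]:
    data: list[Batch] = []
    passthrough: list[Batch] = []
    is_data_slot: list[bool] = []
    for b in batches:
        is_data_slot.append(is_data(b))
        (data if is_data_slot[-1] else passthrough).append(b)

    # Stand-in for stage_write/execute_write/convert_to_placeholders:
    # exactly one placeholder per uploaded data batch, in upload order.
    placeholders = [
        Batch(b.type, b.is_control, f"ctp_placeholder({b.payload})")
        for b in data
    ]

    # Replay the slot list so output order == input order.
    ph, pt = iter(placeholders), iter(passthrough)
    return [next(ph) if flag else next(pt) for flag in is_data_slot]


# Example: a config batch between two data batches keeps its position.
out = replicate_order_preserving([
    Batch("raft_data", False, "a"),
    Batch("raft_configuration", False, "cfg"),
    Batch("raft_data", False, "b"),
])
assert [b.payload for b in out] == ["ctp_placeholder(a)", "cfg", "ctp_placeholder(b)"]
```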
diff --git a/tests/rptest/tests/shadow_linking_rnot_test.py b/tests/rptest/tests/shadow_linking_rnot_test.py
index c2277a4f7c73e..0055e46fc5c5a 100644
--- a/tests/rptest/tests/shadow_linking_rnot_test.py
+++ b/tests/rptest/tests/shadow_linking_rnot_test.py
@@ -15,6 +15,7 @@
 from rptest.clients.rpk import RpkTool
+from rptest.clients.types import TopicSpec
 from rptest.services.cluster import TestContext, cluster
 from rptest.services.multi_cluster_services import (
     Cluster,
@@ -22,11 +23,13 @@
     SecondaryClusterArgs,
 )
 from rptest.services.redpanda import (
+    CLOUD_TOPICS_CONFIG_STR,
     PandaproxyConfig,
     SISettings,
     SchemaRegistryConfig,
 )
 from rptest.tests.cluster_linking_test_base import (
+    CLOUD_TOPICS_SHADOW_LINK_LOG_ALLOW_LIST,
     ClusterLinkingProgressVerifier,
     ShadowLinkTestBase,
 )
@@ -61,6 +64,8 @@ def __init__(
         validate_number_of_messages_on_target: bool = True,
         use_transactions: bool = False,
         use_compaction: bool = False,
+        flip_storage_modes: list[str] | None = None,
+        flip_interval_seconds: float = 3.0,
     ):
         self.topic = topic
         self.topic_properties = topic_properties
@@ -75,6 +80,8 @@ def __init__(
         )
         self.use_transactions = use_transactions
         self.use_compaction = use_compaction
+        self.flip_storage_modes = flip_storage_modes
+        self.flip_interval_seconds = flip_interval_seconds
 
     def __str__(self) -> str:
         return (
@@ -138,8 +145,32 @@ def __init__(
             validate_number_of_messages_on_target=self.spec.validate_number_of_messages_on_target,
         )
 
+    def _flip_storage_mode_loop(self, stop_event: threading.Event) -> None:
+        modes = self.spec.flip_storage_modes or []
+        idx = 0
+        while not stop_event.wait(self.spec.flip_interval_seconds):
+            mode = modes[idx % len(modes)]
+            idx += 1
+            try:
+                self.source_rpk.alter_topic_config(
+                    self.spec.topic,
+                    "redpanda.storage.mode",
+                    mode,
+                )
+                self.logger.debug(
+                    f"Flipped storage mode of {self.spec.topic} to {mode}"
+                )
+            except Exception as e:
+                # Transient errors (leadership changes, partition movement,
+                # etc.) are expected; keep retrying on the next tick.
+                self.logger.warning(
+                    f"Failed to flip storage mode of {self.spec.topic} to {mode}: {e}"
+                )
+
     def start_and_verify(self, progress_timeout: int = 60):
         self.logger.info(f"Starting workload: {self.spec}")
+        stop_flipper = threading.Event()
+        flipper_thread: threading.Thread | None = None
         try:
             self.source_rpk.create_topic(
                 self.spec.topic,
@@ -154,6 +185,14 @@ def start_and_verify(self, progress_timeout: int = 60):
                 err_msg=f"Topic {self.spec.topic} did not appear on target cluster within {progress_timeout} seconds",
             )
             self.verifier.start()
+            if self.spec.flip_storage_modes:
+                flipper_thread = threading.Thread(
+                    target=self._flip_storage_mode_loop,
+                    args=(stop_flipper,),
+                    daemon=True,
+                    name=f"flipper-{self.spec.topic}",
+                )
+                flipper_thread.start()
             success, error = self.verifier.wait_and_verify(
                 progress_timeout=progress_timeout
             )
@@ -161,6 +200,10 @@ def start_and_verify(self, progress_timeout: int = 60):
             self.logger.error(f"Workload for topic: {self.spec.topic} failed: {e}")
             success = False
             error = str(e)
+        finally:
+            stop_flipper.set()
+            if flipper_thread is not None:
+                flipper_thread.join(timeout=30)
 
         return ClusterLinkingWorkloadResult(self.spec.topic, success, error)
 
@@ -247,7 +290,7 @@ def __init__(self, test_ctx: TestContext):
         super().__init__(
             test_ctx,
             num_brokers=5,
-            num_prealloc_nodes=3,
+            num_prealloc_nodes=5,
             secondary_cluster_args=SecondaryClusterArgs(
                 # TODO: enable when DR of schemas is supported
                 # schema_registry_config=SchemaRegistryConfig(),
@@ -260,6 +303,7 @@ def __init__(self, test_ctx: TestContext):
                 ),
                 extra_rp_conf={
                     "group_new_member_join_timeout": 3000,
+                    CLOUD_TOPICS_CONFIG_STR: True,
                 },
             ),
             extra_rp_conf={
@@ -276,6 +320,7 @@ def __init__(self, test_ctx: TestContext):
                 "retention_local_trim_interval": 5000,
                 "partition_autobalancing_tick_interval_ms": 2000,
                 "group_new_member_join_timeout": 3000,
+                CLOUD_TOPICS_CONFIG_STR: True,
             },
             schema_registry_config=SchemaRegistryConfig(),
             pandaproxy_config=PandaproxyConfig(),
@@ -312,49 +357,104 @@ def setup_scale(self):
         self.total_node_ops = 5
         self.partition_count = 12
 
-    @cluster(num_nodes=11)
-    @matrix(failures=[False, True])
-    def test_node_operations(self, failures: bool):
-        self.setup_scale()
-
-        req = self.create_default_link_request("rnot-link")
-        for prop in ALL_TOPIC_PROPERTIES:
-            req.shadow_link.configurations.topic_metadata_sync_options.synced_shadow_topic_properties.append(
-                prop
-            )
-        self.create_link_with_request(req)
-
-        lock = threading.Lock()
-        if failures:
-            self.fi = FailureInjectorBackgroundThread(
-                self.redpanda,
-                self.logger,
-                max_suspend_duration_seconds=4,
-                lock=lock,
-                min_inter_failure_time=20,
-                max_inter_failure_time=40,
-            )
-            self.fi.start()
+    def _basic_workload_specs(self) -> list[ClusterLinkingWorkloadSpec]:
+        return [
+            ClusterLinkingWorkloadSpec(
+                topic="si-topic",
+                topic_properties={
+                    "cleanup.policy": "delete",
+                    "retention.bytes": "1024000",
+                    "segment.bytes": f"{1024 * 1024}",
+                },
+                partition_count=self.partition_count,
+                msg_count=self.msg_count,
+                msg_size=self.msg_size,
+            ),
+            ClusterLinkingWorkloadSpec(
+                topic="compacted-topic",
+                topic_properties={
+                    "cleanup.policy": "compact",
+                    "segment.bytes": f"{1024 * 1024}",
+                },
+                partition_count=self.partition_count,
+                msg_count=self.msg_count,
+                msg_size=self.msg_size,
+                producer_properties={
+                    "key_set_cardinality": 600,
+                    "tombstone_probability": 0.4,
+                },
+                consumer_properties={
+                    "compacted": True,
+                },
+                use_compaction=True,
+            ),
+            ClusterLinkingWorkloadSpec(
+                topic="topic-txns",
+                # transactions, use a smaller topic to avoid long test times
+                msg_count=math.floor(self.msg_count / 10),
+                msg_size=self.msg_size,
+                partition_count=1,
+                use_transactions=True,
+                producer_properties={
+                    "transaction_abort_rate": 0.1,
+                    "msgs_per_transaction": 50,
+                    "debug_logs": True,
+                },
+                consumer_properties={},
+                validate_number_of_messages_on_target=True,
+            ),
+            ClusterLinkingWorkloadSpec(
+                topic="cloud-topic",
+                topic_properties={
+                    TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_CLOUD,
+                    "segment.bytes": f"{1024 * 1024}",
+                },
+                partition_count=self.partition_count,
+                msg_count=self.msg_count,
+                msg_size=self.msg_size,
+            ),
+            ClusterLinkingWorkloadSpec(
+                topic="tiered-cloud-topic",
+                topic_properties={
+                    TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_TIERED_CLOUD,
+                    "segment.bytes": f"{1024 * 1024}",
+                },
+                partition_count=self.partition_count,
+                msg_count=self.msg_count,
+                msg_size=self.msg_size,
+            ),
+        ]
+
+    def _flipping_workload_specs(self) -> list[ClusterLinkingWorkloadSpec]:
+        return [
+            ClusterLinkingWorkloadSpec(
+                topic="flipping-storage-topic",
+                topic_properties={
+                    TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_CLOUD,
+                    "segment.bytes": f"{1024 * 1024}",
+                },
+                partition_count=self.partition_count,
+                msg_count=self.msg_count,
+                msg_size=self.msg_size,
+                flip_storage_modes=[
+                    TopicSpec.STORAGE_MODE_CLOUD,
+                    TopicSpec.STORAGE_MODE_TIERED_CLOUD,
+                ],
+                flip_interval_seconds=3.0,
+            ),
+        ]
 
-        manager = ClusterLinkingWorkloadManager(
-            self.test_context,
-            self.source_cluster,
-            self.target_cluster,
-            [
-                ClusterLinkingWorkloadSpec(
-                    topic="si-topic",
-                    topic_properties={
-                        "cleanup.policy": "delete",
-                        "retention.bytes": "1024000",
-                        "segment.bytes": f"{1024 * 1024}",
-                    },
-                    partition_count=self.partition_count,
-                    msg_count=self.msg_count,
-                    msg_size=self.msg_size,
-                ),
+    def _cloud_combo_workload_specs(self) -> list[ClusterLinkingWorkloadSpec]:
+        specs: list[ClusterLinkingWorkloadSpec] = []
+        for mode, mode_label in (
+            (TopicSpec.STORAGE_MODE_CLOUD, "cloud"),
+            (TopicSpec.STORAGE_MODE_TIERED_CLOUD, "tiered-cloud"),
+        ):
+            specs.append(
                 ClusterLinkingWorkloadSpec(
-                    topic="compacted-topic",
+                    topic=f"{mode_label}-compacted-topic",
                     topic_properties={
+                        TopicSpec.PROPERTY_STORAGE_MODE: mode,
                         "cleanup.policy": "compact",
                         "segment.bytes": f"{1024 * 1024}",
                     },
@@ -369,10 +469,14 @@ def test_node_operations(self, failures: bool):
                         "compacted": True,
                     },
                     use_compaction=True,
-                ),
+                )
+            )
+            specs.append(
                 ClusterLinkingWorkloadSpec(
-                    topic="topic-txns",
-                    # transactions, use a smaller topic to avoid long test times
+                    topic=f"{mode_label}-topic-txns",
+                    topic_properties={
+                        TopicSpec.PROPERTY_STORAGE_MODE: mode,
+                    },
                     msg_count=math.floor(self.msg_count / 10),
                     msg_size=self.msg_size,
                     partition_count=1,
                     use_transactions=True,
                     producer_properties={
                         "transaction_abort_rate": 0.1,
                         "msgs_per_transaction": 50,
                         "debug_logs": True,
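The flip_storage_modes knob driving the flipping spec above is serviced by _flip_storage_mode_loop on a daemon thread. The pattern generalizes; here is a self-contained sketch (standard library only, with a hypothetical apply callback standing in for the rpk alter_topic_config call) that uses threading.Event.wait as both the inter-flip sleep and the shutdown signal:

```python
import threading
from typing import Callable


def run_flipper(
    modes: list[str],
    interval_s: float,
    apply: Callable[[str], None],  # stand-in for rpk alter_topic_config
) -> Callable[[], None]:
    """Start a daemon thread cycling through `modes`; return a stop function."""
    stop = threading.Event()

    def loop() -> None:
        idx = 0
        # Event.wait doubles as the tick: it sleeps for interval_s, but
        # returns True immediately once stop() is set, ending the loop.
        while not stop.wait(interval_s):
            mode = modes[idx % len(modes)]
            idx += 1
            try:
                apply(mode)
            except Exception:
                # Transient failures are expected while failure injection
                # is active; just retry on the next tick.
                pass

    t = threading.Thread(target=loop, daemon=True, name="flipper")
    t.start()

    def stop_and_join() -> None:
        stop.set()
        t.join(timeout=30)

    return stop_and_join
```

Returning a stop function keeps the join logic next to the thread that owns it; the bounded join mirrors the finally block in start_and_verify, so a stuck config alter can never block the verifier result indefinitely.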
@@ -384,8 +488,62 @@ def test_node_operations(self, failures: bool):
                     },
                     consumer_properties={},
                     validate_number_of_messages_on_target=True,
-                ),
-            ],
+                )
+            )
+        return specs
+
+    @cluster(
+        num_nodes=13,
+        log_allow_list=CLOUD_TOPICS_SHADOW_LINK_LOG_ALLOW_LIST,
+    )
+    @matrix(
+        failures=[False, True],
+        workload_set=["basic", "cloud_combos", "flipping"],
+    )
+    def test_node_operations(self, failures: bool, workload_set: str):
+        self.setup_scale()
+
+        # tiered_cloud_topics is an explicit-only feature and must be
+        # enabled on both clusters before any tiered_cloud topic can be
+        # created.
+        self.source_cluster.service.set_feature_active(
+            "tiered_cloud_topics", True, timeout_sec=30
+        )
+        self.target_cluster.service.set_feature_active(
+            "tiered_cloud_topics", True, timeout_sec=30
+        )
+
+        req = self.create_default_link_request("rnot-link")
+        for prop in ALL_TOPIC_PROPERTIES:
+            req.shadow_link.configurations.topic_metadata_sync_options.synced_shadow_topic_properties.append(
+                prop
+            )
+        self.create_link_with_request(req)
+
+        lock = threading.Lock()
+        if failures:
+            self.fi = FailureInjectorBackgroundThread(
+                self.redpanda,
+                self.logger,
+                max_suspend_duration_seconds=4,
+                lock=lock,
+                min_inter_failure_time=20,
+                max_inter_failure_time=40,
+            )
+            self.fi.start()
+
+        if workload_set == "basic":
+            workload_specs = self._basic_workload_specs()
+        elif workload_set == "cloud_combos":
+            workload_specs = self._cloud_combo_workload_specs()
+        else:
+            workload_specs = self._flipping_workload_specs()
+
+        manager = ClusterLinkingWorkloadManager(
+            self.test_context,
+            self.source_cluster,
+            self.target_cluster,
+            workload_specs,
             self.preallocated_nodes,
             self.logger,
         )
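The "std::max trick" in the frontend change can also be modeled numerically. This is a toy model of the behavior described in the comments only (the fence-window acceptance rule here is an assumption for illustration, not the real fence_epoch contract): the write epoch is the max of the cluster epoch and whatever the stm has already observed, which is what keeps a lagging shard inside the fence window.

```python
def choose_min_epoch(
    cluster_epoch: int,
    max_seen_epoch: int | None,  # term-scoped value, preferred when present
    max_epoch: int | None,       # fallback when no term-scoped value exists
) -> int:
    # Mirrors the selection above: prefer the term-scoped maximum, fall
    # back to the global maximum, and never go below the cluster epoch.
    accepted_min = max_seen_epoch if max_seen_epoch is not None else max_epoch
    if accepted_min is not None:
        return max(cluster_epoch, accepted_min)
    return cluster_epoch


def fence_accepts(batch_epoch: int, window_min: int, window_max: int) -> bool:
    # Assumed acceptance rule for illustration: a write whose epoch falls
    # outside the stm's observed window is rejected and surfaced as
    # not_leader, prompting the client to retry on the current leader.
    return window_min <= batch_epoch <= window_max


# A shard lagging on cluster_epoch still produces a fencable epoch,
# because max() pulls it up to what the stm has already seen.
assert choose_min_epoch(cluster_epoch=3, max_seen_epoch=7, max_epoch=5) == 7
assert fence_accepts(7, window_min=6, window_max=9)
```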