cl/test: rnot: add cloud topic workloads #30435
Pull request overview
Extends the shadow-linking random node operations test to also exercise cloud-topics and tiered-cloud-topics storage modes during node operations, including enabling the necessary cluster config/feature flags and allow-listing expected cloud-topics shutdown/retry log messages.
Changes:
- Enable `cloud_topics_enabled` on both clusters and activate the explicit-only `tiered_cloud_topics` feature before topic creation.
- Increase preallocated client nodes and ducktape cluster node count to support two additional concurrent workloads.
- Add two new workloads (`cloud-topic`, `tiered-cloud-topic`) using the `redpanda.storage.mode` topic config and allow-list expected cloud-topics shadow-link logs.
| extra_rp_conf={ | ||
| "group_new_member_join_timeout": 3000, | ||
| CLOUD_TOPICS_CONFIG_STR: True, | ||
| }, |
| @@ -276,6 +280,7 @@ def __init__(self, test_ctx: TestContext): | |||
| "retention_local_trim_interval": 5000, | |||
| "partition_autobalancing_tick_interval_ms": 2000, | |||
| "group_new_member_join_timeout": 3000, | |||
| CLOUD_TOPICS_CONFIG_STR: True, | |||
| }, | |||
| # enabled on both clusters before any tiered_cloud topic can be | ||
| # created. | ||
| self.source_cluster.service.set_feature_active( | ||
| "tiered_cloud_topics", True, timeout_sec=30 | ||
| ) |
CI test results
- test results on build#84286
- test results on build#84317
- test results on build#84334
745e0db to c0ffaf5
Retry command for Build#84317: please wait until all jobs are finished before running the slash command
6c0c15f to b0d9665
pgellert left a comment
Looks good to me, but I'll let someone from the cloud topics team approve
| for (auto&& b : passthrough_batches) { | ||
| final_batches.push_back(std::move(b)); | ||
| } |
I think this would be simpler here:
| for (auto&& b : passthrough_batches) { | |
| final_batches.push_back(std::move(b)); | |
| } | |
| final_batches = std::move(passthrough_batches); |
this was simplified quite a bit
| // (raft_configuration, tx_fence, control batches like transaction | ||
| // commit/abort markers, etc.) carry their payload in the record |
I'm kind of confused by this -- I was under the impression that the only batches we expect here are data batches (which may include tx control batches, but not raft configuration). Is that the case? What non-raft_data batches do we expect here? If we're just being conservative here, could we update the comment to indicate that?
I added the assertion and the comment below.
| // Per-input-position slots: true means the slot will be filled with a | ||
| // generated placeholder, false means it carries a pass-through batch | ||
| // already stored in `passthrough_batches`. | ||
| chunked_vector<bool> is_data_slot; |
Maybe it makes sense to make this a set of non-data indexes. At least, I imagine we're more likely to have zero non-data batches in most cases.
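A rough way to picture this suggestion, written as a Python sketch rather than the PR's C++: instead of one boolean per input slot, record only the positions of the non-data batches and rebuild the order from that sparse map. The `Batch` tuple and both function names are hypothetical and exist only for illustration.

```python
from typing import NamedTuple


class Batch(NamedTuple):
    # Hypothetical stand-in for a record batch; not the real model type.
    name: str
    is_control: bool


def split_sparse(batches):
    """Split the input into data batches plus a sparse {position: batch} map."""
    data = []
    control_at = {}  # input position -> non-data batch; empty in the common case
    for i, b in enumerate(batches):
        if b.is_control:
            control_at[i] = b
        else:
            data.append(b)
    return data, control_at


def merge_sparse(placeholders, control_at):
    """Rebuild the original input order from placeholders and the sparse map."""
    total = len(placeholders) + len(control_at)
    ph_it = iter(placeholders)
    return [control_at[i] if i in control_at else next(ph_it) for i in range(total)]
```

When the input contains no control batches, `control_at` stays empty and the merge degenerates to a plain copy of the placeholders, which is the case the comment expects to be most common.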
| // Interleave generated placeholders with pass-through batches to | ||
| // restore the original input order. | ||
| auto ph_it = placeholders.batches.begin(); | ||
| auto pt_it = passthrough_batches.begin(); | ||
| for (bool is_data : is_data_slot) { | ||
| if (is_data) { | ||
| vassert( | ||
| ph_it != placeholders.batches.end(), | ||
| "placeholder count mismatch for {}", | ||
| ntp()); | ||
| final_batches.push_back(std::move(*ph_it++)); | ||
| } else { | ||
| vassert( | ||
| pt_it != passthrough_batches.end(), | ||
| "passthrough count mismatch for {}", | ||
| ntp()); | ||
| final_batches.push_back(std::move(*pt_it++)); | ||
| } | ||
| } |
I'm wondering if we need the bitmap at all. Do these batches have offsets assigned already? If so, could we merge them by offset?
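If the batches do carry offsets at this point (which is exactly the open question in this comment), the merge could look roughly like the Python sketch below. It assumes both lists are already sorted by a `base_offset` field; the `Batch` tuple and `merge_by_offset` are hypothetical names, not part of the PR.

```python
import heapq
from typing import NamedTuple


class Batch(NamedTuple):
    # Hypothetical stand-in carrying only what the merge needs.
    base_offset: int
    kind: str  # "placeholder" or "control"


def merge_by_offset(placeholders, control_batches):
    """Merge two offset-sorted lists into one offset-sorted list."""
    return list(
        heapq.merge(placeholders, control_batches, key=lambda b: b.base_offset)
    )
```

This removes the need for any side bookkeeping, at the cost of relying on the placeholders preserving the offsets of the batches they replace.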
| headers.reserve(batches.size()); | ||
| for (const auto& batch : batches) { | ||
| headers.push_back(batch.header()); | ||
| // Only user data batches (raft_data with !is_control()) are uploaded |
Shower thought, write_at_offset might be a natural place to write directly as L1. In the context of shadow linking, we know the data is stable and has assigned offsets.
I thought about this. The problem is that L1 objects are bounded by the last stable offset, while shadowing is bounded only by the high watermark. So `write_at_offset` has to replicate batches that belong to transactions that are not yet committed, plus control batches.
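The gap between the two bounds can be illustrated with a small Python sketch. It treats both the last stable offset and the high watermark as exclusive upper bounds, which is an assumption made for illustration; `split_by_stability` is a hypothetical name.

```python
def split_by_stability(offsets, last_stable_offset, high_watermark):
    """Classify offsets visible to shadowing by whether they are below the LSO."""
    # Everything below the high watermark is eligible for shadowing.
    shadowed = [o for o in offsets if o < high_watermark]
    # Only offsets below the last stable offset could go into an L1 object.
    l1_eligible = [o for o in shadowed if o < last_stable_offset]
    # The rest may belong to transactions that are still open.
    undecided = [o for o in shadowed if o >= last_stable_offset]
    return l1_eligible, undecided
```

With offsets 0 through 9, a last stable offset of 6, and a high watermark of 9, offsets 6 through 8 must still be shadowed even though they could not yet be written as L1.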
Adds cloud-topic and tiered-cloud-topic workloads to the shadow linking random node ops test so we exercise plain cloud and tiered_cloud storage modes alongside the existing si, compacted, and transactional workloads.
Enables the explicit-only tiered_cloud_topics feature on both clusters and CLOUD_TOPICS_CONFIG_STR cluster-wide; allow-lists the expected cloud-topics shutdown warnings.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
For storage.mode=cloud topics, replicate_at_offset previously sent every input batch through stage_write/execute_write and wrapped each one as a ctp_placeholder. The placeholder encoding drops the record key, so for control records (e.g. transaction commit/abort markers) the original key bytes are lost and rm_stm's parse_control_batch throws std::out_of_range on the empty iobuf, halting state machine apply at the marker offset.
Split the input list into user data batches (raft_data with !is_control()) and pass-through batches (tx_fence, control batches, etc.). Only data batches are uploaded to L0 and wrapped as ctp_placeholders; the rest are forwarded to the write_at_offset_stm unchanged. The original input ordering is preserved by interleaving the generated placeholders with the pass-through batches.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
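The failure mode described in this commit message can be reproduced in a toy Python model. Everything here is an illustrative assumption: `Record`, `to_placeholder`, `parse_control_key`, and `route` are hypothetical names, and the key layout (two big-endian 16-bit integers for version and type) is loosely modeled on the Kafka control record key rather than taken from the PR.

```python
import struct
from typing import NamedTuple


class Record(NamedTuple):
    key: bytes
    value: bytes


def to_placeholder(record):
    # Toy model of the behaviour described above: the key is not carried over.
    return Record(key=b"", value=b"<placeholder>")


def parse_control_key(record):
    # Toy parser: expects (version, type) as two big-endian int16 values.
    # Raises struct.error on an empty key, analogous to the out_of_range case.
    return struct.unpack(">hh", record.key)


def route(record, is_control):
    # The fix in miniature: control records pass through unchanged,
    # only user data records are wrapped as placeholders.
    return record if is_control else to_placeholder(record)
```

Routing a commit marker through `route(..., is_control=True)` keeps its key parseable, while wrapping the same marker with `to_placeholder` makes `parse_control_key` raise.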
Adds a new "flipping" workload_set matrix variant. A single workload runs against flipping-storage-topic while a background daemon thread toggles redpanda.storage.mode between cloud and tiered_cloud every 3 seconds on the source. Transient alter-config failures (leader changes, partition movement) are logged and retried on the next tick; the target config is not separately verified.
Wired through ClusterLinkingWorkloadSpec via optional flip_storage_modes / flip_interval_seconds fields so other workloads can opt in if needed.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
b0d9665 to 2a0a5ea
| const auto& hdr = batch.header(); | ||
| const bool is_data = hdr.type == model::record_batch_type::raft_data | ||
| && !hdr.attrs.is_control(); | ||
| const bool is_txn_control = hdr.type | ||
| == model::record_batch_type::raft_data | ||
| && hdr.attrs.is_control(); | ||
| vassert( | ||
| is_data || is_txn_control, | ||
| "Unexpected batch type {} (control={}) for {} in " | ||
| "replicate_at_offset; only raft_data and transactional control " | ||
| "batches are supported", | ||
| hdr.type, | ||
| hdr.attrs.is_control(), | ||
| ntp()); |
nit: simpler to
vassert(hdr.type == model::record_batch_type::raft_data, "...");
auto is_data = !hdr.attrs.is_control();
| auto ph_it = placeholder_batches.begin(); | ||
| auto pt_it = passthrough_batches.begin(); |
nit: it's a little scary how similar the names are, since it makes it easy to mix them up. Maybe call passthrough_batches control_batches instead? This also avoids introducing new batch concepts (even if the scope of "passthrough" was already quite limited)
| def _flip_storage_mode_loop(self, stop_event: threading.Event) -> None: | ||
| modes = self.spec.flip_storage_modes or [] | ||
| idx = 0 | ||
| while not stop_event.wait(self.spec.flip_interval_seconds): | ||
| mode = modes[idx % len(modes)] | ||
| idx += 1 | ||
| try: | ||
| self.source_rpk.alter_topic_config( | ||
| self.spec.topic, | ||
| "redpanda.storage.mode", | ||
| mode, | ||
| ) | ||
| self.logger.debug( | ||
| f"Flipped storage mode of {self.spec.topic} to {mode}" | ||
| ) | ||
| except Exception as e: | ||
| # Transient errors (leadership changes, partition movement, | ||
| # etc.) are expected; keep retrying on the next tick. | ||
| self.logger.warning( | ||
| f"Failed to flip storage mode of {self.spec.topic} to {mode}: {e}" | ||
| ) | ||
|
|
Wondering if this makes sense for regular RNOT too. If so, maybe it belongs in a shared class?
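One possible shape for such a shared helper, sketched under assumptions: the class name `StorageModeFlipper` and its constructor parameters are hypothetical, and the caller supplies the function that actually alters the topic config, so the helper itself does not depend on any test harness API.

```python
import logging
import threading
from typing import Callable, Sequence


class StorageModeFlipper:
    """Hypothetical helper that cycles through modes on a background thread."""

    def __init__(
        self,
        set_mode: Callable[[str], None],
        modes: Sequence[str],
        interval_seconds: float,
    ):
        if not modes:
            raise ValueError("modes must not be empty")
        self._set_mode = set_mode
        self._modes = list(modes)
        self._interval = interval_seconds
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._log = logging.getLogger(__name__)

    def _run(self) -> None:
        idx = 0
        # Event.wait returns False on timeout (time to flip) and True once
        # stop() has been called (time to exit).
        while not self._stop.wait(self._interval):
            mode = self._modes[idx % len(self._modes)]
            idx += 1
            try:
                self._set_mode(mode)
            except Exception as e:
                # Transient failures are tolerated; the next tick retries.
                self._log.warning("failed to flip to %s: %s", mode, e)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()
```

A workload could then construct the flipper with a small lambda around its own alter-config call and invoke `start()` and `stop()` around the run, which would let both test suites share the loop without sharing a base class.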
| ) | ||
| @matrix( | ||
| failures=[False, True], | ||
| workload_set=["basic", "cloud_combos", "flipping"], |
Curious what the rationale is for adding this as another matrix, vs adding the workload to cloud_combos.
Adds cloud-topic and tiered-cloud-topic workloads to the shadow linking random node ops test so we exercise plain cloud and tiered_cloud storage modes alongside the existing si, compacted, and transactional workloads. Enables the explicit-only tiered_cloud_topics feature on both clusters and CLOUD_TOPICS_CONFIG_STR cluster-wide; allow-lists the expected cloud-topics shutdown warnings.
Fixes a bug in the write-at-offset code path in the cloud topics frontend: the frontend was converting batches of all types into placeholders, which caused a stall on the target cluster. The second commit in the PR fixes this.
Finally, the test adds a new workload that constantly flips between `cloud` and `tiered_cloud` modes. The goal is to have a mix of `raft_data` and `ct_placeholder` batches in the partition that is being shadowed.
Backports Required
Release Notes