diff --git a/src/v/cluster/tests/tx_compaction_tests.cc b/src/v/cluster/tests/tx_compaction_tests.cc index 067bca2d2ab82..e0fa6358e7bb6 100644 --- a/src/v/cluster/tests/tx_compaction_tests.cc +++ b/src/v/cluster/tests/tx_compaction_tests.cc @@ -53,6 +53,7 @@ FIXTURE_TEST(test_tx_compaction_combinations, rm_stm_test_fixture) { scoped_config cfg; cfg.get("log_disable_housekeeping_for_tests").set_value(true); cfg.get("log_segment_ms_min").set_value(1ms); + cfg.get("log_compaction_tx_batch_removal_enabled").set_value(true); for (auto num_tx : {10, 20, 30}) { for (auto num_rolls : {0, 1, 2, 3, 5}) { for (auto type : diff --git a/src/v/compaction/utils.cc b/src/v/compaction/utils.cc index 06360a1850c24..2ea0878fe0535 100644 --- a/src/v/compaction/utils.cc +++ b/src/v/compaction/utils.cc @@ -20,21 +20,27 @@ namespace compaction { +bool is_tx_batch_compaction_enabled( + ss::sharded& feature_table) { + if (!config::shard_local_cfg().log_compaction_tx_batch_removal_enabled()) { + // Safety hatch for disabling control batch removal + return false; + } + return feature_table.local().is_active( + features::feature::coordinated_compaction); +} + bool is_removable_control_batch( const model::ntp& ntp, const model::record_batch_type batch_type, - ss::sharded& feature_table) { + bool remove_user_tx_fence_enabled) { // Control batches in consumer offsets are special compared to // the ones in data partitions can be safely compacted away. // Fence batches can also be immediately removed when seen in the // `__consumer_offsets` topic or safely removed from a user topic. However, // removal in a user topic is gated by - // `log_compaction_disable_tx_batch_removal()`. + // `log_compaction_tx_batch_removal_enabled()`. auto is_co_topic = model::is_consumer_offsets_topic(ntp); - auto remove_user_tx_fence_enabled - = !config::shard_local_cfg().log_compaction_disable_tx_batch_removal() - && feature_table.local().is_active( - features::feature::coordinated_compaction); auto tx_fence_removable = batch_type == model::record_batch_type::tx_fence && (is_co_topic || remove_user_tx_fence_enabled); return tx_fence_removable diff --git a/src/v/compaction/utils.h b/src/v/compaction/utils.h index 3c5636273683e..5ba4cc1742dbe 100644 --- a/src/v/compaction/utils.h +++ b/src/v/compaction/utils.h @@ -20,6 +20,9 @@ namespace compaction { +bool is_tx_batch_compaction_enabled( + ss::sharded& feature_table); + // Returns `true` if the provided `record_batch_type` is a control batch that is // immediately removable during compaction. This means we do not need to // consider either deduplication (compactible) or `delete.retention.ms` (a form @@ -28,7 +31,7 @@ namespace compaction { bool is_removable_control_batch( const model::ntp& ntp, const model::record_batch_type batch_type, - ss::sharded& feature_table); + bool remove_user_tx_fence_enabled); // Returns `true` or `false` indicating whether the batch type of the header // passed contains records that may be removed by compaction- whether that is by diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index a3a306dc842e7..fb71f9efff54e 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1130,11 +1130,13 @@ configuration::configuration() {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, std::nullopt) , log_compaction_disable_tx_batch_removal( + *this, "log_compaction_disable_tx_batch_removal") + , log_compaction_tx_batch_removal_enabled( *this, - "log_compaction_disable_tx_batch_removal", - "Disable removal of transactional control batches. This should only be " - "toggled to `true` in extreme cases of proven instability due to issues " - "with transactional control batch removal.", + "log_compaction_tx_batch_removal_enabled", + "Enables removal of transactional control batches during compaction. " + "These batches are removed according to a topic's configured " + "delete.retention.ms.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, false) , retention_bytes( diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 7508a13e51140..80cab30c66a18 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -273,7 +273,8 @@ struct configuration final : public config_store { property> log_compaction_merge_max_segments_per_range; property> log_compaction_merge_max_ranges; - property log_compaction_disable_tx_batch_removal; + deprecated_property log_compaction_disable_tx_batch_removal; + property log_compaction_tx_batch_removal_enabled; // same as retention.size in kafka - TODO: size not implemented property> retention_bytes; property group_topic_partitions; diff --git a/src/v/storage/compaction_reducers.cc b/src/v/storage/compaction_reducers.cc index 66abe4bc95e22..a88367ba77376 100644 --- a/src/v/storage/compaction_reducers.cc +++ b/src/v/storage/compaction_reducers.cc @@ -478,7 +478,7 @@ bool tx_reducer::can_discard_consumer_offsets_batch( // batches, so no need to retain originally written group_prepare_tx // batches while the transaction is in progress. return compaction::is_removable_control_batch( - _ntp, b.header().type, _feature_table); + _ntp, b.header().type, _tx_batch_compaction_enabled); } ss::future tx_reducer::operator()(model::record_batch&& b) { diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index 5e6f75e2d881a..49204f9b5de04 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -256,13 +256,13 @@ class tx_reducer : public compaction_reducer { ss::lw_shared_ptr stm_mgr, chunked_vector&& txs, compacted_index_writer* w, - ss::sharded& feature_table) noexcept + bool tx_batch_compaction_enabled) noexcept : _ntp(std::move(ntp)) , _delegate(index_rebuilder_reducer(w)) , _aborted_txs(model::tx_range_cmp(), std::move(txs)) , _stm_mgr(stm_mgr) , _transactional_stm_type(stm_mgr->transactional_stm_type()) - , _feature_table(feature_table) { + , _tx_batch_compaction_enabled(tx_batch_compaction_enabled) { _stats.num_aborted_txes = _aborted_txs.size(); } ss::future operator()(model::record_batch&&); @@ -313,7 +313,7 @@ class tx_reducer : public compaction_reducer { // Set if a transactional stm is attached to this partition. std::optional _transactional_stm_type; - ss::sharded& _feature_table; + bool _tx_batch_compaction_enabled; }; // Builds up a key_offset_map for a segment, starting from the offset diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index da5f97b7ecec6..ae09618fc2f61 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -878,7 +878,10 @@ ss::future disk_log_impl::sliding_window_compact( const bool segment_needs_rewrite = internal::may_have_removable_tombstones(seg, cfg) - || internal::has_removable_transaction_batches(seg, cfg) + || internal::has_removable_transaction_batches( + seg, + cfg, + compaction::is_tx_batch_compaction_enabled(_feature_table)) || co_await segment_needs_rewrite_with_offset_map(cfg, seg, map); if (!segment_needs_rewrite) { vlog( @@ -1520,7 +1523,10 @@ ss::future disk_log_impl::chunked_sliding_window_compact( const bool segment_needs_rewrite = internal::may_have_removable_tombstones(s, compact_cfg) - || internal::has_removable_transaction_batches(s, compact_cfg) + || internal::has_removable_transaction_batches( + s, + compact_cfg, + compaction::is_tx_batch_compaction_enabled(_feature_table)) || co_await segment_needs_rewrite_with_offset_map( compact_cfg, s, map); if (!segment_needs_rewrite) { diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index f4ed9590bfd72..205f5b3187111 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -156,7 +156,7 @@ ss::future build_offset_map( read_lock, resources, probe, - feature_table); + compaction::is_tx_batch_compaction_enabled(feature_table)); } catch (const segment_closed_exception& e) { // Stop early if the segment e.g. has been prefix truncated. // We'll make do with the offset map we have so far. @@ -209,15 +209,14 @@ ss::future deduplicate_segment( auto segment_last_offset = seg->offsets().get_committed_offset(); auto compaction_placeholder_enabled = feature_table.local().is_active( features::feature::compaction_placeholder_batch); - auto unset_transactional_bit_enabled - = feature_table.local().is_active( - features::feature::coordinated_compaction) - && !config::shard_local_cfg().log_compaction_disable_tx_batch_removal(); + auto tx_batch_compaction_enabled + = compaction::is_tx_batch_compaction_enabled(feature_table); const bool past_tombstone_delete_horizon = internal::is_past_tombstone_delete_horizon(seg, cfg); bool may_have_tombstone_records = false; const bool past_tx_delete_horizon - = internal::is_past_transaction_batch_delete_horizon(seg, cfg); + = internal::is_past_transaction_batch_delete_horizon( + seg, cfg, tx_batch_compaction_enabled); bool may_have_transaction_control_batches = false; bool may_have_transaction_data_or_fence_batches = false; @@ -237,7 +236,8 @@ ss::future deduplicate_segment( &probe, past_tx_delete_horizon, &may_have_transaction_control_batches, - &may_have_transaction_data_or_fence_batches]( + &may_have_transaction_data_or_fence_batches, + tx_batch_compaction_enabled]( const model::record_batch& b, const model::record& r, bool is_last_record_in_batch) { @@ -254,7 +254,8 @@ ss::future deduplicate_segment( may_have_tombstone_records, past_tx_delete_horizon, may_have_transaction_control_batches, - may_have_transaction_data_or_fence_batches); + may_have_transaction_data_or_fence_batches, + tx_batch_compaction_enabled); }; auto copy_reducer = internal::copy_data_segment_reducer( @@ -266,7 +267,7 @@ ss::future deduplicate_segment( seg->index().base_offset(), segment_last_offset, compaction_placeholder_enabled, - unset_transactional_bit_enabled, + tx_batch_compaction_enabled, stm_manager, &cmp_idx_writer, inject_reader_failure, diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 8e169b656e89e..45f705b833f79 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -353,7 +353,8 @@ ss::future do_copy_segment_data( ss::rwlock::holder rw_lock_holder, storage_resources& resources, offset_delta_time apply_offset, - ss::sharded& feature_table) { + ss::sharded& feature_table, + bool tx_batch_compaction_enabled) { // preserve base_offset, broker_timestamp, and clean_compact_timestamp from // the segment's index auto old_base_offset = seg->index().base_offset(); @@ -391,15 +392,12 @@ ss::future do_copy_segment_data( auto segment_last_offset = seg->offsets().get_committed_offset(); auto compaction_placeholder_enabled = feature_table.local().is_active( features::feature::compaction_placeholder_batch); - auto unset_transactional_bit_enabled - = feature_table.local().is_active( - features::feature::coordinated_compaction) - && !config::shard_local_cfg().log_compaction_disable_tx_batch_removal(); const bool past_tombstone_delete_horizon = internal::is_past_tombstone_delete_horizon(seg, cfg); bool may_have_tombstone_records = false; const bool past_tx_delete_horizon - = internal::is_past_transaction_batch_delete_horizon(seg, cfg); + = internal::is_past_transaction_batch_delete_horizon( + seg, cfg, tx_batch_compaction_enabled); bool may_have_transaction_control_batches = false; bool may_have_transaction_data_or_fence_batches = false; @@ -422,7 +420,8 @@ ss::future do_copy_segment_data( &pb, past_tx_delete_horizon, &may_have_transaction_control_batches, - &may_have_transaction_data_or_fence_batches]( + &may_have_transaction_data_or_fence_batches, + tx_batch_compaction_enabled]( const model::record_batch& b, const model::record& r, bool is_last_record_in_batch) { @@ -439,7 +438,8 @@ ss::future do_copy_segment_data( may_have_tombstone_records, past_tx_delete_horizon, may_have_transaction_control_batches, - may_have_transaction_data_or_fence_batches); + may_have_transaction_data_or_fence_batches, + tx_batch_compaction_enabled); }; auto copy_reducer = copy_data_segment_reducer( @@ -451,7 +451,7 @@ ss::future do_copy_segment_data( old_base_offset, segment_last_offset, compaction_placeholder_enabled, - unset_transactional_bit_enabled, + tx_batch_compaction_enabled, stm_manager, /*cidx=*/nullptr, /*inject_failure=*/false, @@ -579,7 +579,8 @@ ss::future do_self_compact_segment( storage_resources& resources, offset_delta_time apply_offset, ss::rwlock::holder read_holder, - ss::sharded& feature_table) { + ss::sharded& feature_table, + bool tx_batch_compaction_enabled) { auto size_before = s->size_bytes(); if (cfg.asrc) { @@ -613,7 +614,8 @@ ss::future do_self_compact_segment( std::move(read_holder), resources, apply_offset, - feature_table); + feature_table, + tx_batch_compaction_enabled); vlog( gclog.trace, "finished copying segment data for {}", s->reader().path()); @@ -657,11 +659,15 @@ ss::future<> build_compaction_index( const segment_full_path& p, compaction::compaction_config cfg, storage_resources& resources, - ss::sharded& feature_table) { + bool tx_batch_compaction_enabled) { auto w = storage::make_file_backed_compacted_index( p, false, resources, cfg.sanitizer_config); auto reducer = tx_reducer( - p.get_ntp(), stm_manager, std::move(aborted_txs), w.get(), feature_table); + p.get_ntp(), + stm_manager, + std::move(aborted_txs), + w.get(), + tx_batch_compaction_enabled); auto index_builder = co_await ss::coroutine::as_future( std::move(rdr) .consume(std::move(reducer), model::no_timeout) @@ -700,7 +706,7 @@ ss::future<> rebuild_compaction_index( compaction::compaction_config cfg, storage::probe& pb, storage_resources& resources, - ss::sharded& feature_table) { + bool tx_batch_compaction_enabled) { segment_full_path idx_path = s->path().to_compacted_index(); vlog(gclog.info, "Rebuilding index file... ({})", idx_path); pb.corrupted_compaction_index(); @@ -718,7 +724,7 @@ ss::future<> rebuild_compaction_index( idx_path, cfg, resources, - feature_table); + tx_batch_compaction_enabled); vlog( gclog.info, "rebuilt index: {}, attempting compaction again", idx_path); } @@ -730,7 +736,7 @@ ss::future maybe_rebuild_compaction_index( ss::rwlock::holder& read_holder, storage_resources& resources, storage::probe& pb, - ss::sharded& feature_table) { + bool tx_batch_compaction_enabled) { segment_full_path idx_path = s->path().to_compacted_index(); compacted_index::recovery_state state; @@ -764,7 +770,7 @@ ss::future maybe_rebuild_compaction_index( } co_await rebuild_compaction_index( - s, stm_manager, cfg, pb, resources, feature_table); + s, stm_manager, cfg, pb, resources, tx_batch_compaction_enabled); // Take the lock again before proceeding. read_holder = co_await s->read_lock(); @@ -793,9 +799,11 @@ ss::future self_compact_segment( "Cannot compact an active segment. cfg:{} - segment:{}", cfg, s)); } + const bool tx_batch_compaction_enabled + = compaction::is_tx_batch_compaction_enabled(feature_table); const bool may_remove_tombstones = may_have_removable_tombstones(s, cfg); const bool will_remove_transaction_batches - = has_removable_transaction_batches(s, cfg); + = has_removable_transaction_batches(s, cfg, tx_batch_compaction_enabled); auto should_force_compaction = force_compaction || may_remove_tombstones || will_remove_transaction_batches; @@ -811,7 +819,13 @@ ss::future self_compact_segment( auto read_holder = co_await s->read_lock(); compacted_index::recovery_state state = co_await maybe_rebuild_compaction_index( - s, stm_manager, cfg, read_holder, resources, pb, feature_table); + s, + stm_manager, + cfg, + read_holder, + resources, + pb, + tx_batch_compaction_enabled); const bool is_valid_index_state = (state == compacted_index::recovery_state::index_recovered) @@ -828,7 +842,8 @@ ss::future self_compact_segment( resources, apply_offset, std::move(read_holder), - feature_table); + feature_table, + tx_batch_compaction_enabled); if (res.did_compact()) { pb.add_compaction_removed_bytes( @@ -1397,9 +1412,10 @@ ss::future mark_segment_as_finished_self_compaction( } bool is_past_transaction_batch_delete_horizon( - ss::lw_shared_ptr seg, const compaction::compaction_config& cfg) { - if (config::shard_local_cfg().log_compaction_disable_tx_batch_removal()) { - // Safety hatch for disabling control batch removal + ss::lw_shared_ptr seg, + const compaction::compaction_config& cfg, + bool tx_batch_compaction_enabled) { + if (!tx_batch_compaction_enabled) { return false; } @@ -1417,7 +1433,9 @@ bool is_past_transaction_batch_delete_horizon( } bool has_removable_transaction_batches( - ss::lw_shared_ptr seg, const compaction::compaction_config& cfg) { + ss::lw_shared_ptr seg, + const compaction::compaction_config& cfg, + bool tx_batch_compaction_enabled) { // If there are transactional data batches, we will unset the transactional // bit during compaction. auto has_data_batches @@ -1426,7 +1444,8 @@ bool has_removable_transaction_batches( // compaction. auto has_removable_control_batches = seg->index().may_have_transaction_control_batches() - && is_past_transaction_batch_delete_horizon(seg, cfg); + && is_past_transaction_batch_delete_horizon( + seg, cfg, tx_batch_compaction_enabled); return has_data_batches || has_removable_control_batches; } diff --git a/src/v/storage/segment_utils.h b/src/v/storage/segment_utils.h index 9d0968252219f..02c252cfbf074 100644 --- a/src/v/storage/segment_utils.h +++ b/src/v/storage/segment_utils.h @@ -50,7 +50,7 @@ ss::future maybe_rebuild_compaction_index( ss::rwlock::holder& read_holder, storage_resources& resources, storage::probe& pb, - ss::sharded& feature_table); + bool tx_batch_compaction_enabled); /// \brief, this method will acquire it's own locks on the segment /// @@ -72,7 +72,7 @@ ss::future<> rebuild_compaction_index( compaction::compaction_config cfg, storage::probe& pb, storage_resources& resources, - ss::sharded&); + bool tx_batch_compaction_enabled); /* * Concatentate segments into a minimal new segment. @@ -278,10 +278,14 @@ bool may_have_removable_tombstones( ss::lw_shared_ptr seg, const compaction::compaction_config& cfg); bool is_past_transaction_batch_delete_horizon( - ss::lw_shared_ptr seg, const compaction::compaction_config& cfg); + ss::lw_shared_ptr seg, + const compaction::compaction_config& cfg, + bool tx_batch_compaction_enabled); bool has_removable_transaction_batches( - ss::lw_shared_ptr seg, const compaction::compaction_config& cfg); + ss::lw_shared_ptr seg, + const compaction::compaction_config& cfg, + bool tx_batch_compaction_enabled); // Mark a segment as completed window compaction, and whether it is "clean" (in // which case the `clean_compact_timestamp` is set in the segment's index). @@ -319,7 +323,7 @@ auto with_segment_reader_handle(segment_reader_handle handle, Func func) { // __consumer_offsets topic). // 2. Expired tombstone records past the removal horizon set by // `delete.retention.ms` -// 2. Expired control batches past the removal horizon set by +// 3. Expired control batches past the removal horizon set by // `delete.retention.ms` // In all other cases, return `false`. inline bool can_discard( @@ -328,9 +332,9 @@ inline bool can_discard( const model::ntp& ntp, bool past_tombstone_delete_horizon, bool past_tx_delete_horizon, - ss::sharded& feature_table) { + bool tx_batch_compaction_enabled) { if (compaction::is_removable_control_batch( - ntp, b.header().type, feature_table)) { + ntp, b.header().type, tx_batch_compaction_enabled)) { return true; } @@ -339,7 +343,9 @@ inline bool can_discard( return true; } - // Deal with transactional control batch removal + // Deal with transactional control batch removal. + // `tx_batch_compaction_enabled` is already considered in the variable + // `past_tx_delete_horizon`, so it does not need to be queried again here. if (b.header().attrs.is_control() && past_tx_delete_horizon) { return true; } @@ -361,7 +367,8 @@ ss::future should_keep( bool& may_have_tombstone_records, bool past_tx_delete_horizon, bool& has_tx_control_batches, - bool& has_tx_data_or_fence_batches) { + bool& has_tx_data_or_fence_batches, + bool tx_batch_compaction_enabled) { const auto compaction_placeholder_enabled = feature_table.local().is_active( features::feature::compaction_placeholder_batch); const auto is_compactible = compaction::is_compactible(b.header()); @@ -407,7 +414,7 @@ ss::future should_keep( ntp, past_tombstone_delete_horizon, past_tx_delete_horizon, - feature_table)) { + tx_batch_compaction_enabled)) { if (is_tombstone) { pb.add_removed_tombstone(); } diff --git a/src/v/storage/tests/compaction_e2e_multinode_test.cc b/src/v/storage/tests/compaction_e2e_multinode_test.cc index 29aef28f11dfe..d8195a77acf9e 100644 --- a/src/v/storage/tests/compaction_e2e_multinode_test.cc +++ b/src/v/storage/tests/compaction_e2e_multinode_test.cc @@ -148,6 +148,8 @@ FIXTURE_TEST(replicate_after_compaction, compaction_multinode_test) { } FIXTURE_TEST(compact_transactions_and_replicate, compaction_multinode_test) { + scoped_config cfg; + cfg.get("log_compaction_tx_batch_removal_enabled").set_value(true); const model::topic topic{"mocha"}; model::node_id id{0}; auto* app = create_node_application(id); @@ -251,6 +253,8 @@ FIXTURE_TEST(compact_transactions_and_replicate, compaction_multinode_test) { } FIXTURE_TEST(segment_tx_flags, compaction_multinode_test) { + scoped_config cfg; + cfg.get("log_compaction_tx_batch_removal_enabled").set_value(true); const model::topic topic{"tapioca"}; model::node_id id{0}; create_node_application(id); @@ -367,7 +371,6 @@ FIXTURE_TEST(segment_tx_flags, compaction_multinode_test) { FIXTURE_TEST(segment_tx_flags_compaction_disabled, compaction_multinode_test) { scoped_config cfg; - cfg.get("log_compaction_disable_tx_batch_removal").set_value(true); cfg.get("log_compaction_merge_max_segments_per_range") .set_value(std::make_optional(0)); cfg.get("log_compaction_merge_max_ranges") @@ -432,7 +435,7 @@ FIXTURE_TEST(segment_tx_flags_compaction_disabled, compaction_multinode_test) { ss::abort_source as; auto collect_offset = log->stm_manager()->max_removable_local_log_offset(); { - // Compact while `log_compaction_disable_tx_batch_removal` is `true`, + // Compact while `log_compaction_tx_batch_removal_enabled` is `false`, // preventing unsetting of transactional bits or removal of any control // batches. auto conf = storage::housekeeping_config::make_config( @@ -466,7 +469,7 @@ FIXTURE_TEST(segment_tx_flags_compaction_disabled, compaction_multinode_test) { BOOST_REQUIRE(segments[2]->index().may_have_transaction_control_batches()); { - cfg.get("log_compaction_disable_tx_batch_removal").set_value(false); + cfg.get("log_compaction_tx_batch_removal_enabled").set_value(true); // Compact twice to remove transactional batches and unset bits. auto conf = storage::housekeeping_config::make_config( model::timestamp::min(), diff --git a/src/v/storage/tests/compaction_e2e_test.cc b/src/v/storage/tests/compaction_e2e_test.cc index 0af43402186bc..621cb533031cb 100644 --- a/src/v/storage/tests/compaction_e2e_test.cc +++ b/src/v/storage/tests/compaction_e2e_test.cc @@ -2328,6 +2328,8 @@ TEST_F(CompactionFixtureTest, SuperfluousPlaceholderRemoval) { } TEST_F(CompactionFixtureTest, AbortTransactions) { + test_local_cfg.get("log_compaction_tx_batch_removal_enabled") + .set_value(true); using cluster::tx_executor; tx_executor exec; auto term = partition->raft()->term(); @@ -2401,6 +2403,8 @@ TEST_F(CompactionFixtureTest, AbortTransactions) { } TEST_F(CompactionFixtureTest, CommitTransactions) { + test_local_cfg.get("log_compaction_tx_batch_removal_enabled") + .set_value(true); using cluster::tx_executor; tx_executor exec; auto term = partition->raft()->term(); diff --git a/tests/rptest/tests/log_compaction_test.py b/tests/rptest/tests/log_compaction_test.py index 95fcafdc8ee36..fc382dc628873 100644 --- a/tests/rptest/tests/log_compaction_test.py +++ b/tests/rptest/tests/log_compaction_test.py @@ -815,9 +815,10 @@ class TestCase: ), } - def __init__(self, test_context): + def __init__(self, test_context, extra_rp_conf=None): self.test_context = test_context # Run with small segments and a very frequent compaction interval. + self.extra_rp_conf = { "log_compaction_interval_ms": 1000, "log_segment_size": 2 * 1024**2, # 2 MiB @@ -826,6 +827,8 @@ def __init__(self, test_context): "storage_target_replay_bytes": 100, "log_segment_ms": 60, } + if extra_rp_conf: + self.extra_rp_conf.update(extra_rp_conf) self.transaction_timeout_ms = 2000 super().__init__( @@ -891,7 +894,8 @@ def produce_func(): class LogCompactionTxRemovalTest(LogCompactionTxRemovalTestBase): def __init__(self, test_context): - super().__init__(test_context) + extra_rp_conf = {"log_compaction_tx_batch_removal_enabled": True} + super().__init__(test_context, extra_rp_conf=extra_rp_conf) @cluster(num_nodes=4) def test_tx_control_batch_removal(self): @@ -973,5 +977,10 @@ def test_tx_control_batch_removal_with_upgrade(self, test_case_name): ): self.upgrade_to_version(version) + # Once we have upgraded to the newest version, enable tx batch removal. + self.redpanda.set_cluster_config( + {"log_compaction_tx_batch_removal_enabled": True} + ) + # Perform rest of test self.do_test_tx_control_batch_removal(test_case_name, test_case) diff --git a/tests/rptest/transactions/tx_upgrade_test.py b/tests/rptest/transactions/tx_upgrade_test.py index 5fee4fe4d6444..edac54ad35c3d 100644 --- a/tests/rptest/transactions/tx_upgrade_test.py +++ b/tests/rptest/transactions/tx_upgrade_test.py @@ -262,6 +262,11 @@ def upgrade_with_compaction_test(self): ) prev_version_str = ver_string(new_version) + # Once we have upgraded to the newest version, enable tx batch removal. + self.redpanda.set_cluster_config( + {"log_compaction_tx_batch_removal_enabled": True} + ) + # One last round of producing self._produce_with_transactions(topic=self.topic_spec.name)