diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 40461511b08f3..ef18171bdba6b 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -257,6 +257,7 @@ ss::future> rm_stm::begin_tx( std::chrono::milliseconds transaction_timeout_ms, model::partition_id tm) { auto state_lock = co_await _state_lock.hold_read_lock(); + auto lso_lock_holder = co_await _lso_lock.hold_write_lock(); if (!co_await sync(_sync_timeout())) { vlog( _ctx_log.trace, @@ -738,7 +739,7 @@ ss::future rm_stm::do_abort_tx( // Or it may mean that a tx coordinator // - lost its state // - rolled back to previous op - // - the previous op happend to be an abort + // - the previous op happened to be an abort // - the coordinator retried it // // In the first case the least impactful way to reject the request. @@ -1264,6 +1265,9 @@ ss::future> rm_stm::replicate_msg( } model::offset rm_stm::last_stable_offset() { + if (_as.abort_requested()) [[unlikely]] { + return model::invalid_lso; + } // There are two main scenarios we deal with here. // 1. stm is still bootstrapping // 2. stm is past bootstrapping. @@ -1278,6 +1282,8 @@ model::offset rm_stm::last_stable_offset() { // We optimize for the case where there are no inflight transactional // batch to return the high water mark. auto last_applied = last_applied_offset(); + + // scenario 1: still bootstrapping if (unlikely( !_bootstrap_committed_offset || last_applied < _bootstrap_committed_offset.value())) { @@ -1287,6 +1293,29 @@ model::offset rm_stm::last_stable_offset() { return model::invalid_lso; } + // scenario 2: past bootstrapping + auto read_units = _state_lock.try_hold_read_lock(); + if (!read_units) { + // A reset in progress means the stm may not be in a consistent state + // for LSO calculation. In this case we return the last known LSO to be + // conservative. 
+ vlog( + _ctx_log.trace, + "state machine is resetting, last_known_lso: {}, last_applied: {}", + _last_known_lso, + last_applied); + return _last_known_lso; + } + auto lso_read_units = _lso_lock.try_hold_read_lock(); + if (!lso_read_units) { + // LSO lock is write-held (e.g. begin_tx in flight), return last known LSO + vlog( + _ctx_log.trace, + "lso update in progress, last_known_lso: {}, last_applied: {}", + _last_known_lso, + last_applied); + return _last_known_lso; + } // Check for any in-flight transactions. auto first_tx_start = model::offset::max(); if (_is_tx_enabled && !_active_tx_producers.empty()) { @@ -1313,6 +1342,39 @@ model::offset rm_stm::last_stable_offset() { // transactions. lso = std::min(first_tx_start, next_to_apply); } else if (synced_leader) { + //////////////// WARNING /////////// + // There is a real bug lurking here that overestimates the LSO beyond + // an open transaction. + // + + // The problem manifests when the LSO is requested after successful + // replication of begin_tx batch but before the stm has applied it. + // In this case the LSO may be advanced beyond the begin_tx batch offset + // because the leader doesn't yet 'know' about the begin_tx batch and + // may not consider it in LSO calculation. + + // Another problem is we do not let lso move backwards once + // computed (see _last_known_lso update below), so even if the + // stm has applied the begin_tx later, we will not correct + // the LSO to reflect the begin_tx presence. + + // There is a test that caught this issue in rm_stm_tests which + // is disabled for now until we can fix the underlying problem. + + // The impact of this overestimation is that compaction may compact + // away the open transaction's begin marker as it relies on LSO. The + // chances are rare but not impossible :(. If at that point the replica + // restarts and there are no further updates in the transaction, the + // transaction has no record of ever beginning. 
+ + // We need a better way to track in-flight transactions for the purposes + // of LSO calculation. + + // An obvious solution is to clamp LSO to next_to_apply in all cases + // but it was tried in the past and caused performance regressions + // in non transaction workloads like write_caching, acks=0/1. So that + // is not a viable solution. + // no inflight transactions in (last_applied, last_visible_index] lso = model::next_offset(last_visible_index); } else { @@ -1834,6 +1896,7 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const { ss::future rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) { + auto data_buf = std::move(tx_ss_buf); auto units = co_await _state_lock.hold_write_lock(); vlog( @@ -1841,7 +1904,7 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) { "applying snapshot with last included offset: {}", hdr.offset); tx_snapshot_v6 data; - iobuf_parser data_parser(std::move(tx_ss_buf)); + iobuf_parser data_parser(std::move(data_buf)); if (hdr.version == tx_snapshot_v4::version) { tx_snapshot_v4 data_v4 = co_await reflection::async_adl{}.from(data_parser); diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index a2063655dbd58..b146a3b8a8386 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -117,7 +117,7 @@ namespace cluster { * This stm periodically checks if there is any pending transaction for * expiration. The expiration kicks in the transaction is not committed/aborted * within the user set transaction timeout. A producer with an active - * transaction cannot be evicted, so exipration ensures that with timely + * transaction cannot be evicted, so expiration ensures that with timely * expiration of open transactions, the producer states are candidates for * eviction. */ @@ -435,6 +435,15 @@ class rm_stm final : public raft::persisted_stm<> { model::producer_id _highest_producer_id; // for monotonicity of computed LSO. 
model::offset _last_known_lso{-1}; + /** + * LSO lock protects the LSO from being exposed before transaction begin + * batch is applied. + * + * The lock is acquired in write mode when a begin transaction batch is + * being handled protecting exposure of potentially invalid LSO until the + * begin batch is applied. + */ + ss::rwlock _lso_lock; friend struct ::rm_stm_test_fixture; }; diff --git a/src/v/cluster/tests/rm_stm_test_fixture.h b/src/v/cluster/tests/rm_stm_test_fixture.h index 2637de1a83e6e..04288c8f8cc2d 100644 --- a/src/v/cluster/tests/rm_stm_test_fixture.h +++ b/src/v/cluster/tests/rm_stm_test_fixture.h @@ -20,11 +20,13 @@ static ss::logger logger{"rm_stm-test"}; static prefix_logger ctx_logger{logger, ""}; +static constexpr auto large_timeout = std::chrono::minutes(30); + struct rm_stm_test_fixture : simple_raft_fixture { void create_stm_and_start_raft( storage::ntp_config::default_overrides overrides = {}) { max_concurent_producers.start(std::numeric_limits::max()).get(); - producer_expiration_ms.start(std::chrono::milliseconds::max()).get(); + producer_expiration_ms.start(large_timeout).get(); producer_state_manager .start( ss::sharded_parameter( diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc index 638bc2ad0d359..0a108b515e5ce 100644 --- a/src/v/cluster/tests/rm_stm_tests.cc +++ b/src/v/cluster/tests/rm_stm_tests.cc @@ -37,6 +37,8 @@ using namespace std::chrono_literals; static const failure_type invalid_producer_epoch(cluster::errc::invalid_producer_epoch); +static constexpr auto timeout = 30min; + struct batches_with_identity { model::batch_identity id; chunked_vector batches; @@ -141,14 +143,8 @@ FIXTURE_TEST(test_tx_happy_tx, rm_stm_test_fixture) { }).get(); auto pid2 = model::producer_identity{2, 0}; - auto term_op = stm - .begin_tx( - pid2, - tx_seq, - std::chrono::milliseconds( - std::numeric_limits::max()), - model::partition_id(0)) - .get(); + auto term_op + = stm.begin_tx(pid2, tx_seq, timeout, 
model::partition_id(0)).get(); BOOST_REQUIRE((bool)term_op); BOOST_REQUIRE_EQUAL(stm.highest_producer_id(), pid2.get_id()); @@ -208,14 +204,8 @@ FIXTURE_TEST(test_tx_aborted_tx_1, rm_stm_test_fixture) { }).get(); auto pid2 = model::producer_identity{2, 0}; - auto term_op = stm - .begin_tx( - pid2, - tx_seq, - std::chrono::milliseconds( - std::numeric_limits::max()), - model::partition_id(0)) - .get(); + auto term_op + = stm.begin_tx(pid2, tx_seq, timeout, model::partition_id(0)).get(); BOOST_REQUIRE((bool)term_op); BOOST_REQUIRE_EQUAL(stm.highest_producer_id(), pid2.get_id()); @@ -284,14 +274,8 @@ FIXTURE_TEST(test_tx_aborted_tx_2, rm_stm_test_fixture) { }).get(); auto pid2 = model::producer_identity{2, 0}; - auto term_op = stm - .begin_tx( - pid2, - tx_seq, - std::chrono::milliseconds( - std::numeric_limits::max()), - model::partition_id(0)) - .get(); + auto term_op + = stm.begin_tx(pid2, tx_seq, timeout, model::partition_id(0)).get(); BOOST_REQUIRE_EQUAL(stm.highest_producer_id(), pid2.get_id()); BOOST_REQUIRE((bool)term_op); @@ -370,14 +354,12 @@ FIXTURE_TEST(test_stale_begin_tx_fenced, rm_stm_test_fixture) { auto tx_seq_old = model::tx_seq(9); auto tx_seq_new = model::tx_seq(11); auto pid1 = model::producer_identity{1, 0}; - auto timeout = std::chrono::milliseconds( - std::numeric_limits::max()); - auto begin_tx = [&stm, &pid1, timeout](model::tx_seq seq) { + auto begin_tx = [&stm, &pid1](model::tx_seq seq) { return stm.begin_tx(pid1, seq, timeout, model::partition_id(0)).get(); }; - auto commit_tx = [&stm, &pid1, timeout](model::tx_seq seq) { + auto commit_tx = [&stm, &pid1](model::tx_seq seq) { return stm.commit_tx(pid1, seq, timeout).get(); }; @@ -421,25 +403,13 @@ FIXTURE_TEST(test_tx_begin_fences_produce, rm_stm_test_fixture) { BOOST_REQUIRE((bool)offset_r); auto pid20 = model::producer_identity{2, 0}; - auto term_op = stm - .begin_tx( - pid20, - tx_seq, - std::chrono::milliseconds( - std::numeric_limits::max()), - model::partition_id(0)) - .get(); + auto 
term_op + = stm.begin_tx(pid20, tx_seq, timeout, model::partition_id(0)).get(); BOOST_REQUIRE((bool)term_op); auto pid21 = model::producer_identity{2, 1}; - term_op = stm - .begin_tx( - pid21, - tx_seq, - std::chrono::milliseconds( - std::numeric_limits::max()), - model::partition_id(0)) - .get(); + term_op + = stm.begin_tx(pid21, tx_seq, timeout, model::partition_id(0)).get(); BOOST_REQUIRE((bool)term_op); rreader = make_batches(pid20, 0, 5, true); @@ -468,14 +438,8 @@ FIXTURE_TEST(test_tx_post_aborted_produce, rm_stm_test_fixture) { BOOST_REQUIRE((bool)offset_r); auto pid20 = model::producer_identity{2, 0}; - auto term_op = stm - .begin_tx( - pid20, - tx_seq, - std::chrono::milliseconds( - std::numeric_limits::max()), - model::partition_id(0)) - .get(); + auto term_op + = stm.begin_tx(pid20, tx_seq, timeout, model::partition_id(0)).get(); BOOST_REQUIRE((bool)term_op); rreader = make_batches(pid20, 0, 5, true); @@ -512,8 +476,6 @@ FIXTURE_TEST(test_aborted_transactions, rm_stm_test_fixture) { static int64_t pid_counter = 0; const auto tx_seq = model::tx_seq(0); - const auto timeout = std::chrono::milliseconds( - std::numeric_limits::max()); size_t segment_count = 1; auto& segments = disk_log->segments(); @@ -981,3 +943,89 @@ FIXTURE_TEST(test_concurrent_producer_evictions, rm_stm_test_fixture) { ss::when_all_succeed(std::move(replicate_f), std::move(reset_f)).get(); gate.close().get(); } + +FIXTURE_TEST(test_lso_bound_by_open_tx, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; + auto raft = _raft; + stm.start().get(); + stm.testing_only_disable_auto_abort(); + + wait_for_confirmed_leader(); + wait_for_meta_initialized(); + + auto pid = model::producer_identity{0, 0}; + + auto random_sleep = []() { + auto sleep_ms = std::chrono::milliseconds{ + random_generators::get_int(0, 5)}; + return ss::sleep(sleep_ms); + }; + + auto snapshot_and_apply = [&] { + return local_snapshot(cluster::tx::tx_snapshot::version) + .then([&](auto snapshot) { + 
return random_sleep().then( + [this, snapshot = std::move(snapshot)]() mutable { + return apply_snapshot( + snapshot.header, std::move(snapshot.data)) + .discard_result(); + }); + }); + }; + + std::optional open_tx_start_offset; + auto tx_and_snapshot = [&](model::tx_seq tx_seq) { + // begin tx + BOOST_REQUIRE( + stm.begin_tx(pid, tx_seq, timeout, model::partition_id(0)).get()); + + open_tx_start_offset = raft->committed_offset(); + + if (tests::random_bool()) { + // snapshot + snapshot_and_apply().get(); + } + // replicate some batches + random_sleep().get(); + auto result = replicate_all(stm, make_batches(pid, 0, 5, true)).get(); + BOOST_REQUIRE(result.has_value()); + + if (tests::random_bool()) { + // snapshot + snapshot_and_apply().get(); + } + random_sleep().get(); + + open_tx_start_offset.reset(); + // commit + BOOST_REQUIRE_EQUAL( + stm.commit_tx(pid, tx_seq, timeout).get(), cluster::tx::errc::none); + }; + + ss::abort_source as; + auto lso_bound_checker_f = ss::do_until( + [&as] { return as.abort_requested(); }, + [&] { + auto current_lso = stm.last_stable_offset(); + bool lso_bound = !open_tx_start_offset + || current_lso == open_tx_start_offset; + if (!lso_bound) { + vlog( + logger.error, + "LSO {} exceeded earliest open tx offset {}", + current_lso, + open_tx_start_offset); + } + BOOST_REQUIRE(lso_bound); + return ss::now(); + }); + + auto deadline = ss::lowres_clock::now() + 10s; + model::tx_seq tx_seq{0}; + while (ss::lowres_clock::now() < deadline) { + tx_and_snapshot(tx_seq++); + } + as.request_abort(); + std::move(lso_bound_checker_f).get(); +} diff --git a/src/v/cluster/tests/tx_compaction_utils.h b/src/v/cluster/tests/tx_compaction_utils.h index 5dbc92ce46590..838cf6a1afc56 100644 --- a/src/v/cluster/tests/tx_compaction_utils.h +++ b/src/v/cluster/tests/tx_compaction_utils.h @@ -138,7 +138,7 @@ class tx_executor { auto batches = co_await copy_to_mem(reader); // Ensure there are no aborted keys (tracked in _aborted_xxx) - // int fence_batch_count = 
0; + int fence_batch_count = 0; for (auto& batch : batches) { auto type = batch.header().type; RPTEST_REQUIRE_NE_CORO(type, model::record_batch_type::tx_prepare); @@ -153,14 +153,13 @@ class tx_executor { RPTEST_REQUIRE_CORO(_committed_pids.contains(pid)); RPTEST_REQUIRE_CORO(!_aborted_pids.contains(pid)); } - // if (type == model::record_batch_type::tx_fence) { - // fence_batch_count++; - // } + if (type == model::record_batch_type::tx_fence) { + fence_batch_count++; + } } - // TODO(tx_compact): Re-enable this when transactional control batch - // feature is added. Fence batches should be removed. - // RPTEST_REQUIRE_CORO(fence_batch_count == 0); + // Fence batches should be removed. + RPTEST_REQUIRE_CORO(fence_batch_count == 0); } void run_random_workload( diff --git a/src/v/compaction/utils.cc b/src/v/compaction/utils.cc index d42c960430c0d..06360a1850c24 100644 --- a/src/v/compaction/utils.cc +++ b/src/v/compaction/utils.cc @@ -23,7 +23,7 @@ namespace compaction { bool is_removable_control_batch( const model::ntp& ntp, const model::record_batch_type batch_type, - [[maybe_unused]] ss::sharded& feature_table) { + ss::sharded& feature_table) { // Control batches in consumer offsets are special compared to // the ones in data partitions can be safely compacted away. // Fence batches can also be immediately removed when seen in the @@ -31,13 +31,10 @@ bool is_removable_control_batch( // removal in a user topic is gated by // `log_compaction_disable_tx_batch_removal()`. auto is_co_topic = model::is_consumer_offsets_topic(ntp); - // TODO(tx_compact): Re-enable this when transactional control batch feature - // is added. 
- auto remove_user_tx_fence_enabled = false; - // auto remove_user_tx_fence_enabled - // = !config::shard_local_cfg().log_compaction_disable_tx_batch_removal() - // && feature_table.local().is_active( - // features::feature::coordinated_compaction); + auto remove_user_tx_fence_enabled + = !config::shard_local_cfg().log_compaction_disable_tx_batch_removal() + && feature_table.local().is_active( + features::feature::coordinated_compaction); auto tx_fence_removable = batch_type == model::record_batch_type::tx_fence && (is_co_topic || remove_user_tx_fence_enabled); return tx_fence_removable diff --git a/src/v/kafka/server/tests/group_tx_compaction_test.cc b/src/v/kafka/server/tests/group_tx_compaction_test.cc index fa5e590881a4d..b9225ddedacf6 100644 --- a/src/v/kafka/server/tests/group_tx_compaction_test.cc +++ b/src/v/kafka/server/tests/group_tx_compaction_test.cc @@ -380,21 +380,19 @@ ss::future<> run_workload( }); struct batch_validator { - bool do_validate_batch([[maybe_unused]] const model::record_batch& b) { - // TODO(tx_compact): Re-enable this when transactional control batch - // feature is added. 
- // auto batch_type = b.header().type; - // if ( - // batch_type == model::record_batch_type::group_fence_tx - // || batch_type == model::record_batch_type::group_prepare_tx - // || batch_type == model::record_batch_type::group_commit_tx - // || batch_type == model::record_batch_type::group_abort_tx - // || batch_type == model::record_batch_type::tx_fence) { - // return true; - // } - // if (b.header().attrs.is_control()) { - // return true; - // } + bool do_validate_batch(const model::record_batch& b) { + auto batch_type = b.header().type; + if ( + batch_type == model::record_batch_type::group_fence_tx + || batch_type == model::record_batch_type::group_prepare_tx + || batch_type == model::record_batch_type::group_commit_tx + || batch_type == model::record_batch_type::group_abort_tx + || batch_type == model::record_batch_type::tx_fence) { + return true; + } + if (b.header().attrs.is_control()) { + return true; + } return false; } diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index b774d01d22b34..c68b354352200 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -208,13 +208,8 @@ ss::future deduplicate_segment( auto segment_last_offset = seg->offsets().get_committed_offset(); auto compaction_placeholder_enabled = feature_table.local().is_active( features::feature::compaction_placeholder_batch); - // TODO(tx_compact): Re-enable this when transactional control batch feature - // is added. 
- auto unset_transactional_bit_enabled = false; - // auto unset_transactional_bit_enabled - // = !config::shard_local_cfg().log_compaction_disable_tx_batch_removal() - // && feature_table.local().is_active( - // features::feature::coordinated_compaction); + auto unset_transactional_bit_enabled = feature_table.local().is_active( + features::feature::coordinated_compaction); const bool past_tombstone_delete_horizon = internal::is_past_tombstone_delete_horizon(seg, cfg); bool may_have_tombstone_records = false; diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index b66c5fb051b2d..f202c510da256 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -390,13 +390,8 @@ ss::future do_copy_segment_data( auto segment_last_offset = seg->offsets().get_committed_offset(); auto compaction_placeholder_enabled = feature_table.local().is_active( features::feature::compaction_placeholder_batch); - // TODO(tx_compact): Re-enable this when transactional control batch feature - // is added. - auto unset_transactional_bit_enabled = false; - // auto unset_transactional_bit_enabled - // = !config::shard_local_cfg().log_compaction_disable_tx_batch_removal() - // && feature_table.local().is_active( - // features::feature::coordinated_compaction); + auto unset_transactional_bit_enabled = feature_table.local().is_active( + features::feature::coordinated_compaction); const bool past_tombstone_delete_horizon = internal::is_past_tombstone_delete_horizon(seg, cfg); bool may_have_tombstone_records = false; @@ -1391,9 +1386,6 @@ ss::future mark_segment_as_finished_self_compaction( bool is_past_transaction_batch_delete_horizon( ss::lw_shared_ptr seg, const compaction::compaction_config& cfg) { - // TODO(tx_compact): Re-enable this when transactional control batch feature - // is added. 
- return false; if (config::shard_local_cfg().log_compaction_disable_tx_batch_removal()) { // Safety hatch for disabling control batch removal return false; diff --git a/src/v/storage/tests/compaction_e2e_test.cc b/src/v/storage/tests/compaction_e2e_test.cc index 6ae0151bdf59a..7a49896204cb8 100644 --- a/src/v/storage/tests/compaction_e2e_test.cc +++ b/src/v/storage/tests/compaction_e2e_test.cc @@ -2327,9 +2327,6 @@ TEST_F(CompactionFixtureTest, SuperfluousPlaceholderRemoval) { } TEST_F(CompactionFixtureTest, AbortTransactions) { - // TODO(tx_compact): Re-enable this when transactional control batch feature - // is added. - return; using cluster::tx_executor; tx_executor exec; auto term = partition->raft()->term(); @@ -2403,9 +2400,6 @@ TEST_F(CompactionFixtureTest, AbortTransactions) { } TEST_F(CompactionFixtureTest, CommitTransactions) { - // TODO(tx_compact): Re-enable this when transactional control batch feature - // is added. - return; using cluster::tx_executor; tx_executor exec; auto term = partition->raft()->term(); diff --git a/tests/rptest/tests/log_compaction_test.py b/tests/rptest/tests/log_compaction_test.py index e19a2c0c4e344..4c0ef2f49e439 100644 --- a/tests/rptest/tests/log_compaction_test.py +++ b/tests/rptest/tests/log_compaction_test.py @@ -21,6 +21,9 @@ KgoVerifierProducer, KgoVerifierSeqConsumer, ) +from rptest.services.redpanda_installer import ( + RedpandaVersionLine, +) from rptest.services.redpanda import MetricsEndpoint from rptest.tests.partition_movement import PartitionMovementMixin from rptest.tests.prealloc_nodes import PreallocNodesTest @@ -752,76 +755,74 @@ def do_test_tx_control_batch_removal(self, test_case_name, test_case): self.check_tx_batches() -# TODO(tx_compact): Re-enable this when transactional control batch feature -# is added. 
-# class LogCompactionTxRemovalTest(LogCompactionTxRemovalTestBase): -# def __init__(self, test_context): -# super().__init__(test_context) -# -# @cluster(num_nodes=4) -# def test_tx_control_batch_removal(self): -# failed_test_cases = [] -# for name, test_case in LogCompactionTxRemovalTestBase.test_cases.items(): -# self.topic_setup( -# cleanup_policy=TopicSpec.CLEANUP_COMPACT, -# replication_factor=3, -# key_set_cardinality=100, -# partition_count=1, -# ) -# -# try: -# self.do_test_tx_control_batch_removal(name, test_case) -# except Exception as e: -# self.logger.info(f"Test case {name} failed with exception {e}") -# -# failed_test_cases.append(e) -# assert len(failed_test_cases) == 0, ( -# f"Expected 0 failed test cases, got {len(failed_test_cases)}" -# ) -# -# -# class LogCompactionTxRemovalUpgradeTest(LogCompactionTxRemovalTestBase): -# def __init__(self, test_context): -# super().__init__(test_context) -# # Version before `may_have_transactional_batches` was added. -# self.initial_version: RedpandaVersionLine = (25, 1) -# # Version before tx removal was added to compaction. -# self.may_have_tx_batch_version: RedpandaVersionLine = (25, 2) -# -# def setUp(self): -# self.redpanda._installer.install(self.redpanda.nodes, self.initial_version) -# self.redpanda.start() -# -# def upgrade_to_version(self, version): -# self.redpanda._installer.install(self.redpanda.nodes, version) -# self.redpanda.restart_nodes(self.redpanda.nodes) -# -# @cluster(num_nodes=4) -# @matrix(test_case_name=list(LogCompactionTxRemovalTestBase.test_cases.keys())) -# def test_tx_control_batch_removal_with_upgrade(self, test_case_name): -# test_case = LogCompactionTxRemovalTestBase.test_cases[test_case_name] -# -# self.topic_setup( -# cleanup_policy=TopicSpec.CLEANUP_COMPACT, -# replication_factor=3, -# key_set_cardinality=100, -# partition_count=1, -# ) -# -# # Produce some transactional data -# self.produce(test_case) -# -# # Upgrade to `may_have_transactional_batch` version. 
-# self.upgrade_to_version(self.may_have_tx_batch_version) -# -# # Produce more transactional data. -# self.produce(test_case) -# -# # Upgrade to `HEAD`. -# for version in self.redpanda._installer.upgrade_path_to_head( -# self.may_have_tx_batch_version -# ): -# self.upgrade_to_version(version) -# -# # Perform rest of test -# self.do_test_tx_control_batch_removal(test_case_name, test_case) +class LogCompactionTxRemovalTest(LogCompactionTxRemovalTestBase): + def __init__(self, test_context): + super().__init__(test_context) + + @cluster(num_nodes=4) + def test_tx_control_batch_removal(self): + failed_test_cases = [] + for name, test_case in LogCompactionTxRemovalTestBase.test_cases.items(): + self.topic_setup( + cleanup_policy=TopicSpec.CLEANUP_COMPACT, + replication_factor=3, + key_set_cardinality=100, + partition_count=1, + ) + + try: + self.do_test_tx_control_batch_removal(name, test_case) + except Exception as e: + self.logger.info(f"Test case {name} failed with exception {e}") + + failed_test_cases.append(e) + assert len(failed_test_cases) == 0, ( + f"Expected 0 failed test cases, got {len(failed_test_cases)}" + ) + + +class LogCompactionTxRemovalUpgradeTest(LogCompactionTxRemovalTestBase): + def __init__(self, test_context): + super().__init__(test_context) + # Version before `may_have_transactional_batches` was added. + self.initial_version: RedpandaVersionLine = (25, 1) + # Version before tx removal was added to compaction. 
+ self.may_have_tx_batch_version: RedpandaVersionLine = (25, 2) + + def setUp(self): + self.redpanda._installer.install(self.redpanda.nodes, self.initial_version) + self.redpanda.start() + + def upgrade_to_version(self, version): + self.redpanda._installer.install(self.redpanda.nodes, version) + self.redpanda.restart_nodes(self.redpanda.nodes) + + @cluster(num_nodes=4) + @matrix(test_case_name=list(LogCompactionTxRemovalTestBase.test_cases.keys())) + def test_tx_control_batch_removal_with_upgrade(self, test_case_name): + test_case = LogCompactionTxRemovalTestBase.test_cases[test_case_name] + + self.topic_setup( + cleanup_policy=TopicSpec.CLEANUP_COMPACT, + replication_factor=3, + key_set_cardinality=100, + partition_count=1, + ) + + # Produce some transactional data + self.produce(test_case) + + # Upgrade to `may_have_transactional_batch` version. + self.upgrade_to_version(self.may_have_tx_batch_version) + + # Produce more transactional data. + self.produce(test_case) + + # Upgrade to `HEAD`. 
+ for version in self.redpanda._installer.upgrade_path_to_head( + self.may_have_tx_batch_version + ): + self.upgrade_to_version(version) + + # Perform rest of test + self.do_test_tx_control_batch_removal(test_case_name, test_case) diff --git a/tests/rptest/transactions/tx_upgrade_test.py b/tests/rptest/transactions/tx_upgrade_test.py index d28db37f1923a..1f52403e7ad5d 100644 --- a/tests/rptest/transactions/tx_upgrade_test.py +++ b/tests/rptest/transactions/tx_upgrade_test.py @@ -18,6 +18,7 @@ from ducktape.errors import TimeoutError from ducktape.utils.util import wait_until +from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec from rptest.services.admin import Admin @@ -25,9 +26,12 @@ from rptest.services.redpanda import ( RESTART_LOG_ALLOW_LIST, RedpandaService, + MetricsEndpoint, ) from rptest.services.redpanda_installer import ( RedpandaInstaller, + RedpandaVersionLine, + RedpandaVersionTriple, wait_for_num_versions, ver_string, ) @@ -145,132 +149,130 @@ def upgrade_does_not_change_tx_coordinator_assignment_test(self): ) -# TODO(tx_compact): Re-enable this when transactional control batch feature -# is added. -# class TxUpgradeCompactionTest(TxUpgradeTestBase): -# """ -# Test validating interaction between compaction and transactions during rolling-restart upgrades (including mixed-version node cluster interaction) -# """ -# -# def __init__(self, test_context): -# self.extra_rp_conf = { -# "log_compaction_interval_ms": 4000, -# "log_segment_size": 2 * 1024**2, # 2 MiB -# "compacted_log_segment_size": 1024**2, # 1 MiB -# } -# -# super(TxUpgradeCompactionTest, self).__init__( -# test_context=test_context, extra_rp_conf=self.extra_rp_conf -# ) -# -# self.transaction_timeout_ms = 2000 -# -# def setUp(self): -# # Version before `may_have_transactional_batches` was added. 
-# self.initial_version: RedpandaVersionTriple = self.installer.latest_for_line( -# RedpandaVersionLine((25, 1)) -# )[0] -# self.installer.install(self.redpanda.nodes, self.initial_version) -# super(TxUpgradeCompactionTest, self).setUp() -# -# def get_complete_sliding_window_rounds(self): -# return self.redpanda.metric_sum( -# metric_name="vectorized_storage_log_complete_sliding_window_rounds_total", -# metrics_endpoint=MetricsEndpoint.METRICS, -# topic=self.topic_spec.name, -# ) -# -# def wait_for_sliding_window_compaction(self): -# self.prev_sliding_window_rounds = None -# -# def compaction_has_completed(): -# new_sliding_window_rounds = self.get_complete_sliding_window_rounds() -# res = self.prev_sliding_window_rounds == new_sliding_window_rounds -# self.prev_sliding_window_rounds = new_sliding_window_rounds -# return res -# -# wait_until( -# compaction_has_completed, -# timeout_sec=120, -# backoff_sec=self.extra_rp_conf["log_compaction_interval_ms"] / 1000 * 4, -# err_msg="Compaction did not stabilize.", -# ) -# -# def check_tx_batches(self): -# viewer = OfflineLogViewer(self.redpanda) -# for node in self.redpanda.nodes: -# num_control_batches = 0 -# num_fence_batches = 0 -# partitions = viewer.read_kafka_records(node, self.topic_spec.name) -# for partition in partitions: -# for record_or_batch in partition: -# if "expanded_attrs" not in record_or_batch: -# continue -# if record_or_batch["expanded_attrs"]["control_batch"]: -# num_control_batches += 1 -# if record_or_batch["type_name"] == "tx_fence": -# num_fence_batches += 1 -# -# assert num_control_batches == 0, ( -# f"expected 0 control batches (abort/commit batches), saw {num_control_batches} on node {node.name}" -# ) -# assert num_fence_batches == 0, ( -# f"expected 0 tx_fence batches, saw {num_fence_batches} on node {node.name}" -# ) -# -# @skip_debug_mode -# @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST) -# def upgrade_with_compaction_test(self): -# self.topic_spec = TopicSpec( -# 
partition_count=self.partition_count, -# delete_retention_ms=3000, -# cleanup_policy=TopicSpec.CLEANUP_COMPACT, -# min_cleanable_dirty_ratio=0.0, -# ) -# self.client().create_topic(self.topic_spec) -# -# prev_version_str = ver_string(self.initial_version) -# unique_versions = wait_for_num_versions(self.redpanda, 1) -# assert prev_version_str in unique_versions, unique_versions -# -# for new_version in self.installer.upgrade_path_to_head(self.initial_version): -# self._populate_tx_coordinator(topic=self.topic_spec.name) -# initial_mapping = self._get_tx_id_mapping() -# self.logger.info(f"Initial mapping {initial_mapping}") -# -# first_node = self.redpanda.nodes[0] -# -# self.installer.install(self.redpanda.nodes, new_version) -# -# # Upgrade & restart one node to the new version. -# self.redpanda.restart_nodes([first_node]) -# unique_versions = wait_for_num_versions(self.redpanda, 2) -# assert prev_version_str in unique_versions, unique_versions -# assert self._get_tx_id_mapping() == initial_mapping, ( -# "Mapping changed after upgrading one of the nodes" -# ) -# -# # verify if txs are handled correctly with mixed versions -# self._populate_tx_coordinator(topic=self.topic_spec.name) -# -# # Only once we upgrade the rest of the nodes do we converge on the new -# # version. 
class TxUpgradeCompactionTest(TxUpgradeTestBase):
    """
    Exercises compaction of a transactional topic while the cluster is taken
    through a rolling-restart upgrade, including the period in which nodes
    run mixed versions.
    """

    def __init__(self, test_context):
        mib = 1024 * 1024

        # Broker overrides handed to the base class; also read back later to
        # derive the polling backoff used while waiting for compaction.
        self.extra_rp_conf = {
            "log_compaction_interval_ms": 4000,
            "log_segment_size": 2 * mib,
            "compacted_log_segment_size": mib,
        }

        super().__init__(test_context=test_context, extra_rp_conf=self.extra_rp_conf)

        self.transaction_timeout_ms = 2000

    def setUp(self):
        """Install the starting release on all nodes, then run the base setUp."""
        # Version before `may_have_transactional_batches` was added.
        line_versions = self.installer.latest_for_line(RedpandaVersionLine((25, 1)))
        self.initial_version: RedpandaVersionTriple = line_versions[0]
        self.installer.install(self.redpanda.nodes, self.initial_version)
        super().setUp()

    def get_complete_sliding_window_rounds(self):
        """Return the summed complete-sliding-window-rounds metric for the test topic."""
        query = dict(
            metric_name="vectorized_storage_log_complete_sliding_window_rounds_total",
            metrics_endpoint=MetricsEndpoint.METRICS,
            topic=self.topic_spec.name,
        )
        return self.redpanda.metric_sum(**query)

    def wait_for_sliding_window_compaction(self):
        """
        Poll the sliding-window round counter until two consecutive samples
        are equal; `wait_until` fails after 120 seconds otherwise.
        """
        self.prev_sliding_window_rounds = None

        def rounds_have_stabilized():
            current = self.get_complete_sliding_window_rounds()
            # Remember the fresh sample for the next poll while comparing it
            # against the sample taken on the previous poll.
            previous, self.prev_sliding_window_rounds = (
                self.prev_sliding_window_rounds,
                current,
            )
            return previous == current

        compaction_interval_sec = self.extra_rp_conf["log_compaction_interval_ms"] / 1000
        wait_until(
            rounds_have_stabilized,
            timeout_sec=120,
            backoff_sec=compaction_interval_sec * 4,
            err_msg="Compaction did not stabilize.",
        )

    def check_tx_batches(self):
        """
        Read the topic's records from every node's log and assert that no
        control batches and no tx_fence batches are present.
        """
        log_viewer = OfflineLogViewer(self.redpanda)
        for node in self.redpanda.nodes:
            num_control_batches = 0
            num_fence_batches = 0
            for partition in log_viewer.read_kafka_records(node, self.topic_spec.name):
                for entry in partition:
                    # Entries without expanded attributes are not inspected.
                    if "expanded_attrs" in entry:
                        if entry["expanded_attrs"]["control_batch"]:
                            num_control_batches += 1
                        if entry["type_name"] == "tx_fence":
                            num_fence_batches += 1

            assert num_control_batches == 0, (
                f"expected 0 control batches (abort/commit batches), saw {num_control_batches} on node {node.name}"
            )
            assert num_fence_batches == 0, (
                f"expected 0 tx_fence batches, saw {num_fence_batches} on node {node.name}"
            )

    @skip_debug_mode
    @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
    def upgrade_with_compaction_test(self):
        """
        Create a compacted topic, then for every version on the upgrade path:
        produce transactions, upgrade one node, check the tx id mapping,
        produce again with mixed versions, and upgrade the remaining nodes.
        Finish with one more round of producing, a full restart, a wait for
        compaction to settle and a check of the batches left in the log.
        """
        self.topic_spec = TopicSpec(
            partition_count=self.partition_count,
            delete_retention_ms=3000,
            cleanup_policy=TopicSpec.CLEANUP_COMPACT,
            min_cleanable_dirty_ratio=0.0,
        )
        self.client().create_topic(self.topic_spec)

        old_version_str = ver_string(self.initial_version)
        versions_seen = wait_for_num_versions(self.redpanda, 1)
        assert old_version_str in versions_seen, versions_seen

        upgrade_path = self.installer.upgrade_path_to_head(self.initial_version)
        for target_version in upgrade_path:
            self._populate_tx_coordinator(topic=self.topic_spec.name)
            initial_mapping = self._get_tx_id_mapping()
            self.logger.info(f"Initial mapping {initial_mapping}")

            node_upgraded_first = self.redpanda.nodes[0]

            self.installer.install(self.redpanda.nodes, target_version)

            # Restart a single node so that it alone runs the new version.
            self.redpanda.restart_nodes([node_upgraded_first])
            versions_seen = wait_for_num_versions(self.redpanda, 2)
            assert old_version_str in versions_seen, versions_seen
            assert self._get_tx_id_mapping() == initial_mapping, (
                "Mapping changed after upgrading one of the nodes"
            )

            # Transactions must still be handled while versions are mixed.
            self._populate_tx_coordinator(topic=self.topic_spec.name)

            # The cluster converges on the new version only after every node
            # has been restarted.
            self.redpanda.restart_nodes(self.redpanda.nodes)
            versions_seen = wait_for_num_versions(self.redpanda, 1)
            assert old_version_str not in versions_seen, versions_seen
            assert self._get_tx_id_mapping() == initial_mapping, (
                "Mapping changed after full upgrade"
            )
            old_version_str = ver_string(target_version)

        # One last round of producing
        self._populate_tx_coordinator(topic=self.topic_spec.name)

        # Restart the redpanda broker to roll segments
        self.redpanda.restart_nodes(self.redpanda.nodes)

        self.wait_for_sliding_window_compaction()
        self.check_tx_batches()