Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cluster/tests/tx_compaction_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ FIXTURE_TEST(test_tx_compaction_combinations, rm_stm_test_fixture) {
scoped_config cfg;
cfg.get("log_disable_housekeeping_for_tests").set_value(true);
cfg.get("log_segment_ms_min").set_value(1ms);
cfg.get("log_compaction_tx_batch_removal_enabled").set_value(true);
for (auto num_tx : {10, 20, 30}) {
for (auto num_rolls : {0, 1, 2, 3, 5}) {
for (auto type :
Expand Down
18 changes: 12 additions & 6 deletions src/v/compaction/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,27 @@

namespace compaction {

bool is_tx_batch_compaction_enabled(
ss::sharded<features::feature_table>& feature_table) {
if (!config::shard_local_cfg().log_compaction_tx_batch_removal_enabled()) {
// Safety hatch for disabling control batch removal
return false;
}
return feature_table.local().is_active(
features::feature::coordinated_compaction);
}

bool is_removable_control_batch(
const model::ntp& ntp,
const model::record_batch_type batch_type,
ss::sharded<features::feature_table>& feature_table) {
bool remove_user_tx_fence_enabled) {
// Control batches in consumer offsets are special compared to
// the ones in data partitions can be safely compacted away.
// Fence batches can also be immediately removed when seen in the
// `__consumer_offsets` topic or safely removed from a user topic. However,
// removal in a user topic is gated by
// `log_compaction_disable_tx_batch_removal()`.
// `log_compaction_tx_batch_removal_enabled()`.
auto is_co_topic = model::is_consumer_offsets_topic(ntp);
auto remove_user_tx_fence_enabled
= !config::shard_local_cfg().log_compaction_disable_tx_batch_removal()
&& feature_table.local().is_active(
features::feature::coordinated_compaction);
auto tx_fence_removable = batch_type == model::record_batch_type::tx_fence
&& (is_co_topic || remove_user_tx_fence_enabled);
return tx_fence_removable
Expand Down
5 changes: 4 additions & 1 deletion src/v/compaction/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

namespace compaction {

bool is_tx_batch_compaction_enabled(
ss::sharded<features::feature_table>& feature_table);

// Returns `true` if the provided `record_batch_type` is a control batch that is
// immediately removable during compaction. This means we do not need to
// consider either deduplication (compactible) or `delete.retention.ms` (a form
Expand All @@ -28,7 +31,7 @@ namespace compaction {
bool is_removable_control_batch(
const model::ntp& ntp,
const model::record_batch_type batch_type,
ss::sharded<features::feature_table>& feature_table);
bool remove_user_tx_fence_enabled);

// Returns `true` or `false` indicating whether the batch type of the header
// passed contains records that may be removed by compaction- whether that is by
Expand Down
10 changes: 6 additions & 4 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1130,11 +1130,13 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt)
, log_compaction_disable_tx_batch_removal(
*this, "log_compaction_disable_tx_batch_removal")
, log_compaction_tx_batch_removal_enabled(
*this,
"log_compaction_disable_tx_batch_removal",
"Disable removal of transactional control batches. This should only be "
"toggled to `true` in extreme cases of proven instability due to issues "
"with transactional control batch removal.",
"log_compaction_tx_batch_removal_enabled",
"Enables removal of transactional control batches during compaction. "
"These batches are removed according to a topic's configured "
"delete.retention.ms.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
false)
, retention_bytes(
Expand Down
3 changes: 2 additions & 1 deletion src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ struct configuration final : public config_store {
property<std::optional<uint32_t>>
log_compaction_merge_max_segments_per_range;
property<std::optional<uint32_t>> log_compaction_merge_max_ranges;
property<bool> log_compaction_disable_tx_batch_removal;
deprecated_property log_compaction_disable_tx_batch_removal;
property<bool> log_compaction_tx_batch_removal_enabled;
// same as retention.size in kafka - TODO: size not implemented
property<std::optional<size_t>> retention_bytes;
property<int32_t> group_topic_partitions;
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/compaction_reducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ bool tx_reducer::can_discard_consumer_offsets_batch(
// batches, so no need to retain originally written group_prepare_tx
// batches while the transaction is in progress.
return compaction::is_removable_control_batch(
_ntp, b.header().type, _feature_table);
_ntp, b.header().type, _tx_batch_compaction_enabled);
}

ss::future<ss::stop_iteration> tx_reducer::operator()(model::record_batch&& b) {
Expand Down
6 changes: 3 additions & 3 deletions src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ class tx_reducer : public compaction_reducer {
ss::lw_shared_ptr<storage::stm_manager> stm_mgr,
chunked_vector<model::tx_range>&& txs,
compacted_index_writer* w,
ss::sharded<features::feature_table>& feature_table) noexcept
bool tx_batch_compaction_enabled) noexcept
: _ntp(std::move(ntp))
, _delegate(index_rebuilder_reducer(w))
, _aborted_txs(model::tx_range_cmp(), std::move(txs))
, _stm_mgr(stm_mgr)
, _transactional_stm_type(stm_mgr->transactional_stm_type())
, _feature_table(feature_table) {
, _tx_batch_compaction_enabled(tx_batch_compaction_enabled) {
_stats.num_aborted_txes = _aborted_txs.size();
}
ss::future<ss::stop_iteration> operator()(model::record_batch&&);
Expand Down Expand Up @@ -313,7 +313,7 @@ class tx_reducer : public compaction_reducer {
// Set if a transactional stm is attached to this partition.
std::optional<storage::stm_type> _transactional_stm_type;

ss::sharded<features::feature_table>& _feature_table;
bool _tx_batch_compaction_enabled;
};

// Builds up a key_offset_map for a segment, starting from the offset
Expand Down
10 changes: 8 additions & 2 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,10 @@ ss::future<bool> disk_log_impl::sliding_window_compact(

const bool segment_needs_rewrite
= internal::may_have_removable_tombstones(seg, cfg)
|| internal::has_removable_transaction_batches(seg, cfg)
|| internal::has_removable_transaction_batches(
seg,
cfg,
compaction::is_tx_batch_compaction_enabled(_feature_table))
|| co_await segment_needs_rewrite_with_offset_map(cfg, seg, map);
if (!segment_needs_rewrite) {
vlog(
Expand Down Expand Up @@ -1520,7 +1523,10 @@ ss::future<bool> disk_log_impl::chunked_sliding_window_compact(

const bool segment_needs_rewrite
= internal::may_have_removable_tombstones(s, compact_cfg)
|| internal::has_removable_transaction_batches(s, compact_cfg)
|| internal::has_removable_transaction_batches(
s,
compact_cfg,
compaction::is_tx_batch_compaction_enabled(_feature_table))
|| co_await segment_needs_rewrite_with_offset_map(
compact_cfg, s, map);
if (!segment_needs_rewrite) {
Expand Down
19 changes: 10 additions & 9 deletions src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ ss::future<model::offset> build_offset_map(
read_lock,
resources,
probe,
feature_table);
compaction::is_tx_batch_compaction_enabled(feature_table));
} catch (const segment_closed_exception& e) {
// Stop early if the segment e.g. has been prefix truncated.
// We'll make do with the offset map we have so far.
Expand Down Expand Up @@ -209,15 +209,14 @@ ss::future<index_state> deduplicate_segment(
auto segment_last_offset = seg->offsets().get_committed_offset();
auto compaction_placeholder_enabled = feature_table.local().is_active(
features::feature::compaction_placeholder_batch);
auto unset_transactional_bit_enabled
= feature_table.local().is_active(
features::feature::coordinated_compaction)
&& !config::shard_local_cfg().log_compaction_disable_tx_batch_removal();
auto tx_batch_compaction_enabled
= compaction::is_tx_batch_compaction_enabled(feature_table);
const bool past_tombstone_delete_horizon
= internal::is_past_tombstone_delete_horizon(seg, cfg);
bool may_have_tombstone_records = false;
const bool past_tx_delete_horizon
= internal::is_past_transaction_batch_delete_horizon(seg, cfg);
= internal::is_past_transaction_batch_delete_horizon(
seg, cfg, tx_batch_compaction_enabled);
bool may_have_transaction_control_batches = false;
bool may_have_transaction_data_or_fence_batches = false;

Expand All @@ -237,7 +236,8 @@ ss::future<index_state> deduplicate_segment(
&probe,
past_tx_delete_horizon,
&may_have_transaction_control_batches,
&may_have_transaction_data_or_fence_batches](
&may_have_transaction_data_or_fence_batches,
tx_batch_compaction_enabled](
const model::record_batch& b,
const model::record& r,
bool is_last_record_in_batch) {
Expand All @@ -254,7 +254,8 @@ ss::future<index_state> deduplicate_segment(
may_have_tombstone_records,
past_tx_delete_horizon,
may_have_transaction_control_batches,
may_have_transaction_data_or_fence_batches);
may_have_transaction_data_or_fence_batches,
tx_batch_compaction_enabled);
};

auto copy_reducer = internal::copy_data_segment_reducer(
Expand All @@ -266,7 +267,7 @@ ss::future<index_state> deduplicate_segment(
seg->index().base_offset(),
segment_last_offset,
compaction_placeholder_enabled,
unset_transactional_bit_enabled,
tx_batch_compaction_enabled,
stm_manager,
&cmp_idx_writer,
inject_reader_failure,
Expand Down
Loading