Skip to content
67 changes: 65 additions & 2 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ ss::future<checked<model::term_id, tx::errc>> rm_stm::begin_tx(
std::chrono::milliseconds transaction_timeout_ms,
model::partition_id tm) {
auto state_lock = co_await _state_lock.hold_read_lock();
auto lso_lock_holder = co_await _lso_lock.hold_write_lock();
if (!co_await sync(_sync_timeout())) {
vlog(
_ctx_log.trace,
Expand Down Expand Up @@ -738,7 +739,7 @@ ss::future<tx::errc> rm_stm::do_abort_tx(
// Or it may mean that a tx coordinator
// - lost its state
// - rolled back to previous op
// - the previous op happend to be an abort
// - the previous op happened to be an abort
// - the coordinator retried it
//
// In the first case the least impactful way to reject the request.
Expand Down Expand Up @@ -1264,6 +1265,9 @@ ss::future<result<kafka_result>> rm_stm::replicate_msg(
}

model::offset rm_stm::last_stable_offset() {
if (_as.abort_requested()) [[unlikely]] {
return model::invalid_lso;
}
// There are two main scenarios we deal with here.
// 1. stm is still bootstrapping
// 2. stm is past bootstrapping.
Expand All @@ -1278,6 +1282,8 @@ model::offset rm_stm::last_stable_offset() {
// We optimize for the case where there are no inflight transactional
// batch to return the high water mark.
auto last_applied = last_applied_offset();

// scenario 1: still bootstrapping
if (unlikely(
!_bootstrap_committed_offset
|| last_applied < _bootstrap_committed_offset.value())) {
Expand All @@ -1287,6 +1293,29 @@ model::offset rm_stm::last_stable_offset() {
return model::invalid_lso;
}

// scenario 2: past bootstrapping
auto read_units = _state_lock.try_hold_read_lock();
if (!read_units) {
// A reset in progress means the stm may not be in a consistent state
// for LSO calculation. In this case we return the last known LSO to be
// conservative.
vlog(
_ctx_log.trace,
"state machine is resetting, last_known_lso: {}, last_applied: {}",
_last_known_lso,
last_applied);
return _last_known_lso;
}
auto lso_read_units = _lso_lock.try_hold_read_lock();
if (!lso_read_units) {
// LSO calculation is in progress, return last known LSO
vlog(
_ctx_log.trace,
"lso update in progress, last_known_lso: {}, last_applied: {}",
_last_known_lso,
last_applied);
return _last_known_lso;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor optimization is to advance lso right after obtaining _lso_lock.write_lock(). Doing so ensures that lso reflects the last_visible_index at that time, instead of depending on the previous lso() call, which might be outdated.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand that part.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider this situation: when last_stable_offset() is called at time t, the _last_known_lso is set to 100.
Now, suppose several transactions have occurred, and the actual LSO is now at 900. When a new begin (at offset 901) and last_stable_offset() are executed concurrently at, say, t + 1hr, a race condition occurs: the call runs into this check and the function conservatively returns 100, since that is the last recorded known LSO.
Ideally, the function could return 900 instead. One possible solution would be to recompute _last_known_lso after acquiring the _lso_lock in begin, but before performing replication, I think.

}
// Check for any in-flight transactions.
auto first_tx_start = model::offset::max();
if (_is_tx_enabled && !_active_tx_producers.empty()) {
Expand All @@ -1313,6 +1342,39 @@ model::offset rm_stm::last_stable_offset() {
// transactions.
lso = std::min(first_tx_start, next_to_apply);
} else if (synced_leader) {
//////////////// WARNING ///////////
// there is a real bug lurking here that overestimates the LSO beyond
// an open transaction.
//

// The problem manifests when the LSO is requested after successful
// replication of the begin_tx batch but before the stm has applied it.
// In this case the LSO may be advanced beyond the begin_tx batch offset
// because the leader doesn't yet 'know' about the begin_tx batch and
// may not consider it in LSO calculation.

// Another problem is that we do not let the LSO move backwards once
// computed (see the _last_known_lso update below). So even if the
// stm has applied the begin_tx later, we will not correct
// the LSO to reflect the begin_tx presence.

// There is a test that caught this issue in rm_stm_tests which
// is disabled for now until we can fix the underlying problem.

// The impact of this overestimation is that compaction may compact
// away an open transaction's begin marker, as compaction relies on the
// LSO. The chances are rare but not impossible :(. If at that point the
// replica restarts and there are no further updates in the transaction,
// the transaction has no record of ever beginning.

// We need a better way to track in-flight transactions for the purposes
// of LSO calculation.

// An obvious solution is to clamp LSO to next_to_apply in all cases
// but it was tried in the past and caused performance regressions
// in non transaction workloads like write_caching, acks=0/1. So that
// is not a viable solution.

// no inflight transactions in (last_applied, last_visible_index]
lso = model::next_offset(last_visible_index);
} else {
Expand Down Expand Up @@ -1834,14 +1896,15 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {

ss::future<raft::local_snapshot_applied>
rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
auto data_buf = std::move(tx_ss_buf);
Comment thread
joe-redpanda marked this conversation as resolved.
auto units = co_await _state_lock.hold_write_lock();

vlog(
_ctx_log.trace,
"applying snapshot with last included offset: {}",
hdr.offset);
tx_snapshot_v6 data;
iobuf_parser data_parser(std::move(tx_ss_buf));
iobuf_parser data_parser(std::move(data_buf));
if (hdr.version == tx_snapshot_v4::version) {
tx_snapshot_v4 data_v4
= co_await reflection::async_adl<tx_snapshot_v4>{}.from(data_parser);
Expand Down
11 changes: 10 additions & 1 deletion src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ namespace cluster {
* This stm periodically checks if there is any pending transaction for
* expiration. The expiration kicks in the transaction is not committed/aborted
* within the user set transaction timeout. A producer with an active
* transaction cannot be evicted, so exipration ensures that with timely
* transaction cannot be evicted, so expiration ensures that with timely
* expiration of open transactions, the producer states are candidates for
* eviction.
*/
Expand Down Expand Up @@ -435,6 +435,15 @@ class rm_stm final : public raft::persisted_stm<> {
model::producer_id _highest_producer_id;
// for monotonicity of computed LSO.
model::offset _last_known_lso{-1};
/**
* LSO lock protects the LSO from being exposed before transaction begin
* batch is applied.
*
* The lock is acquired in write mode when a begin transaction batch is
* being handled protecting exposure of potentially invalid LSO until the
* begin batch is applied.
*/
ss::rwlock _lso_lock;
Comment thread
mmaslankaprv marked this conversation as resolved.

friend struct ::rm_stm_test_fixture;
};
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/tests/rm_stm_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
static ss::logger logger{"rm_stm-test"};
static prefix_logger ctx_logger{logger, ""};

static constexpr auto large_timeout = std::chrono::minutes(30);

struct rm_stm_test_fixture : simple_raft_fixture {
void create_stm_and_start_raft(
storage::ntp_config::default_overrides overrides = {}) {
max_concurent_producers.start(std::numeric_limits<size_t>::max()).get();
producer_expiration_ms.start(std::chrono::milliseconds::max()).get();
producer_expiration_ms.start(large_timeout).get();
producer_state_manager
.start(
ss::sharded_parameter(
Expand Down
156 changes: 102 additions & 54 deletions src/v/cluster/tests/rm_stm_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ using namespace std::chrono_literals;
static const failure_type<cluster::errc>
invalid_producer_epoch(cluster::errc::invalid_producer_epoch);

static constexpr auto timeout = 30min;

struct batches_with_identity {
model::batch_identity id;
chunked_vector<model::record_batch> batches;
Expand Down Expand Up @@ -141,14 +143,8 @@ FIXTURE_TEST(test_tx_happy_tx, rm_stm_test_fixture) {
}).get();

auto pid2 = model::producer_identity{2, 0};
auto term_op = stm
.begin_tx(
pid2,
tx_seq,
std::chrono::milliseconds(
std::numeric_limits<int32_t>::max()),
model::partition_id(0))
.get();
auto term_op
= stm.begin_tx(pid2, tx_seq, timeout, model::partition_id(0)).get();
BOOST_REQUIRE((bool)term_op);
BOOST_REQUIRE_EQUAL(stm.highest_producer_id(), pid2.get_id());

Expand Down Expand Up @@ -208,14 +204,8 @@ FIXTURE_TEST(test_tx_aborted_tx_1, rm_stm_test_fixture) {
}).get();

auto pid2 = model::producer_identity{2, 0};
auto term_op = stm
.begin_tx(
pid2,
tx_seq,
std::chrono::milliseconds(
std::numeric_limits<int32_t>::max()),
model::partition_id(0))
.get();
auto term_op
= stm.begin_tx(pid2, tx_seq, timeout, model::partition_id(0)).get();
BOOST_REQUIRE((bool)term_op);
BOOST_REQUIRE_EQUAL(stm.highest_producer_id(), pid2.get_id());

Expand Down Expand Up @@ -284,14 +274,8 @@ FIXTURE_TEST(test_tx_aborted_tx_2, rm_stm_test_fixture) {
}).get();

auto pid2 = model::producer_identity{2, 0};
auto term_op = stm
.begin_tx(
pid2,
tx_seq,
std::chrono::milliseconds(
std::numeric_limits<int32_t>::max()),
model::partition_id(0))
.get();
auto term_op
= stm.begin_tx(pid2, tx_seq, timeout, model::partition_id(0)).get();
BOOST_REQUIRE_EQUAL(stm.highest_producer_id(), pid2.get_id());
BOOST_REQUIRE((bool)term_op);

Expand Down Expand Up @@ -370,14 +354,12 @@ FIXTURE_TEST(test_stale_begin_tx_fenced, rm_stm_test_fixture) {
auto tx_seq_old = model::tx_seq(9);
auto tx_seq_new = model::tx_seq(11);
auto pid1 = model::producer_identity{1, 0};
auto timeout = std::chrono::milliseconds(
std::numeric_limits<int32_t>::max());

auto begin_tx = [&stm, &pid1, timeout](model::tx_seq seq) {
auto begin_tx = [&stm, &pid1](model::tx_seq seq) {
return stm.begin_tx(pid1, seq, timeout, model::partition_id(0)).get();
};

auto commit_tx = [&stm, &pid1, timeout](model::tx_seq seq) {
auto commit_tx = [&stm, &pid1](model::tx_seq seq) {
return stm.commit_tx(pid1, seq, timeout).get();
};

Expand Down Expand Up @@ -421,25 +403,13 @@ FIXTURE_TEST(test_tx_begin_fences_produce, rm_stm_test_fixture) {
BOOST_REQUIRE((bool)offset_r);

auto pid20 = model::producer_identity{2, 0};
auto term_op = stm
.begin_tx(
pid20,
tx_seq,
std::chrono::milliseconds(
std::numeric_limits<int32_t>::max()),
model::partition_id(0))
.get();
auto term_op
= stm.begin_tx(pid20, tx_seq, timeout, model::partition_id(0)).get();
BOOST_REQUIRE((bool)term_op);

auto pid21 = model::producer_identity{2, 1};
term_op = stm
.begin_tx(
pid21,
tx_seq,
std::chrono::milliseconds(
std::numeric_limits<int32_t>::max()),
model::partition_id(0))
.get();
term_op
= stm.begin_tx(pid21, tx_seq, timeout, model::partition_id(0)).get();
BOOST_REQUIRE((bool)term_op);

rreader = make_batches(pid20, 0, 5, true);
Expand Down Expand Up @@ -468,14 +438,8 @@ FIXTURE_TEST(test_tx_post_aborted_produce, rm_stm_test_fixture) {
BOOST_REQUIRE((bool)offset_r);

auto pid20 = model::producer_identity{2, 0};
auto term_op = stm
.begin_tx(
pid20,
tx_seq,
std::chrono::milliseconds(
std::numeric_limits<int32_t>::max()),
model::partition_id(0))
.get();
auto term_op
= stm.begin_tx(pid20, tx_seq, timeout, model::partition_id(0)).get();
BOOST_REQUIRE((bool)term_op);

rreader = make_batches(pid20, 0, 5, true);
Expand Down Expand Up @@ -512,8 +476,6 @@ FIXTURE_TEST(test_aborted_transactions, rm_stm_test_fixture) {

static int64_t pid_counter = 0;
const auto tx_seq = model::tx_seq(0);
const auto timeout = std::chrono::milliseconds(
std::numeric_limits<int32_t>::max());
size_t segment_count = 1;

auto& segments = disk_log->segments();
Expand Down Expand Up @@ -981,3 +943,89 @@ FIXTURE_TEST(test_concurrent_producer_evictions, rm_stm_test_fixture) {
ss::when_all_succeed(std::move(replicate_f), std::move(reset_f)).get();
gate.close().get();
}

FIXTURE_TEST(test_lso_bound_by_open_tx, rm_stm_test_fixture) {
    // Verifies that the last stable offset (LSO) reported by the stm stays
    // pinned to the begin offset of an open transaction while transactions,
    // local snapshots and batch replication run concurrently with LSO
    // queries.
    create_stm_and_start_raft();
    auto& stm = *_stm;
    auto raft = _raft;
    stm.start().get();
    stm.testing_only_disable_auto_abort();

    wait_for_confirmed_leader();
    wait_for_meta_initialized();

    auto pid = model::producer_identity{0, 0};

    // Small random delay (0-5 ms) used to shuffle the interleaving between
    // the transaction loop, snapshotting and the concurrent LSO checker.
    auto random_sleep = []() {
        auto sleep_ms = std::chrono::milliseconds{
          random_generators::get_int(0, 5)};
        return ss::sleep(sleep_ms);
    };

    // Takes a local snapshot of the stm and re-applies it after a random
    // delay, exercising the snapshot apply path concurrently with LSO reads.
    auto snapshot_and_apply = [&] {
        return local_snapshot(cluster::tx::tx_snapshot::version)
          .then([&](auto snapshot) {
              return random_sleep().then(
                [this, snapshot = std::move(snapshot)]() mutable {
                    return apply_snapshot(
                             snapshot.header, std::move(snapshot.data))
                      .discard_result();
                });
          });
    };

    // Begin offset of the currently open transaction, if any. Set right
    // after begin_tx succeeds and cleared just before commit; read by the
    // concurrent checker fiber below.
    std::optional<model::offset> open_tx_start_offset;
    auto tx_and_snapshot = [&](model::tx_seq tx_seq) {
        // begin tx
        BOOST_REQUIRE(
          stm.begin_tx(pid, tx_seq, timeout, model::partition_id(0)).get());

        // NOTE(review): assumes the committed offset at this point is the
        // offset of the just-replicated begin_tx batch — the checker below
        // compares the observed LSO against it for equality.
        open_tx_start_offset = raft->committed_offset();

        if (tests::random_bool()) {
            // snapshot
            snapshot_and_apply().get();
        }
        // replicate some batches
        random_sleep().get();
        auto result = replicate_all(stm, make_batches(pid, 0, 5, true)).get();
        BOOST_REQUIRE(result.has_value());

        if (tests::random_bool()) {
            // snapshot
            snapshot_and_apply().get();
        }
        random_sleep().get();

        open_tx_start_offset.reset();
        // commit
        BOOST_REQUIRE_EQUAL(
          stm.commit_tx(pid, tx_seq, timeout).get(), cluster::tx::errc::none);
    };

    // Background fiber that continuously queries the LSO and requires that,
    // whenever a transaction is open, the LSO equals its begin offset (the
    // error log says "exceeded", but the check itself is strict equality).
    ss::abort_source as;
    auto lso_bound_checker_f = ss::do_until(
      [&as] { return as.abort_requested(); },
      [&] {
          auto current_lso = stm.last_stable_offset();
          bool lso_bound = !open_tx_start_offset
                           || current_lso == open_tx_start_offset;
          if (!lso_bound) {
              vlog(
                logger.error,
                "LSO {} exceeded earliest open tx offset {}",
                current_lso,
                open_tx_start_offset);
          }
          BOOST_REQUIRE(lso_bound);
          return ss::now();
      });

    // Drive begin/replicate/snapshot/commit cycles for 10 seconds while the
    // checker fiber runs concurrently, then stop the checker and join it.
    auto deadline = ss::lowres_clock::now() + 10s;
    model::tx_seq tx_seq{0};
    while (ss::lowres_clock::now() < deadline) {
        tx_and_snapshot(tx_seq++);
    }
    as.request_abort();
    std::move(lso_bound_checker_f).get();
}
Loading