From dd8e5749511456a68a5fb3304a5fd06232c7d7ec Mon Sep 17 00:00:00 2001 From: majialong Date: Tue, 2 Jun 2026 22:50:53 +0800 Subject: [PATCH] KAFKA-18209: Clean up transaction state config logic --- .../transaction/TransactionCoordinator.scala | 18 ++++++++++++- .../transaction/TransactionStateManager.scala | 25 ++++-------------- .../scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../TransactionStateManagerTest.scala | 2 +- .../unit/kafka/server/KafkaApisTest.scala | 4 +-- .../transaction/TransactionLog.java | 26 ------------------- 7 files changed, 27 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 43455a83e4143..70db45b1ebadb 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -19,6 +19,7 @@ package kafka.coordinator.transaction import kafka.server.{KafkaConfig, ReplicaManager} import kafka.utils.Logging import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult import org.apache.kafka.common.message.{DescribeTransactionsResponseData, ListTransactionsResponseData} @@ -31,6 +32,7 @@ import org.apache.kafka.common.utils.internals.LogContext import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionConfig, TransactionLogConfig, TransactionMetadata, TransactionState, TransactionStateManagerConfig, TransactionalIdAndProducerIdEpoch, TxnTransitMetadata} import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.common.{RequestLocal, TransactionVersion} +import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.Scheduler import java.util @@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.jdk.OptionConverters._ object TransactionCoordinator { + val EnforcedRequiredAcks: Short = -1.toShort def apply(config: KafkaConfig, replicaManager: ReplicaManager, @@ -1004,7 +1007,20 @@ class TransactionCoordinator(txnConfig: TransactionConfig, } } - def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs + /** + * Return the configuration properties of the transaction state topic. + * + * @return Properties of the transaction state topic. + */ + def transactionStateTopicConfigs: Properties = { + val props = new Properties + props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false") + props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.UNCOMPRESSED.name) + props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) + props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, txnConfig.transactionLogMinInsyncReplicas.toString) + props.put(TopicConfig.SEGMENT_BYTES_CONFIG, txnConfig.transactionLogSegmentBytes.toString) + props + } def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 425789a8263a8..99894ce759ae6 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -17,13 +17,12 @@ package kafka.coordinator.transaction import java.nio.ByteBuffer -import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.server.ReplicaManager import kafka.utils.Logging -import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.ListTransactionsResponseData import org.apache.kafka.common.metrics.Metrics @@ -38,7 +37,6 @@ import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition import org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata, TransactionConfig, TransactionLog, TransactionMetadata, TransactionState, TransactionStateManagerConfig, TransactionPartitionAndLeaderEpoch, TransactionalIdAndProducerIdEpoch, TransactionalIdCoordinatorEpochAndMetadata, TransactionalIdCoordinatorEpochAndTransitMetadata, TxnMetadataCacheEntry, TxnTransitMetadata} import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.common.{RequestLocal, TransactionVersion} -import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.Scheduler import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock} @@ -183,7 +181,7 @@ class TransactionStateManager(brokerId: Int, if (recordsBuilder == null) { recordsBuilder = MemoryRecords.builder( ByteBuffer.allocate(math.min(16384, maxBatchSize)), - TransactionLog.ENFORCED_COMPRESSION, + Compression.NONE, TimestampType.CREATE_TIME, 0L, maxBatchSize @@ -290,7 +288,7 @@ class TransactionStateManager(brokerId: Int, inReadLock[Exception](stateLock, () => { replicaManager.appendRecords( timeout = config.requestTimeoutMs, - requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS, + requiredAcks = TransactionCoordinator.EnforcedRequiredAcks, internalTopicsAllowed = true, origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords), @@ -438,19 +436,6 @@ class TransactionStateManager(brokerId: Int, enableTwoPC || (txnTimeoutMs <= config.transactionMaxTimeoutMs && txnTimeoutMs > 0) } - def transactionTopicConfigs: Properties = { - val props = new Properties - - // enforce disabled unclean leader election, no compression types, and compact cleanup policy - props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false") - props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.UNCOMPRESSED.name) - props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.transactionLogMinInsyncReplicas.toString) - props.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.transactionLogSegmentBytes.toString) - - props - } - def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): ConcurrentMap[String, TransactionMetadata] = { @@ -672,7 +657,7 @@ class TransactionStateManager(brokerId: Int, val valueBytes = TransactionLog.valueToBytes(newMetadata, transactionVersionLevel()) val timestamp = time.milliseconds() - val records = MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, new SimpleRecord(timestamp, keyBytes, valueBytes)) + val records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(timestamp, keyBytes, valueBytes)) val transactionStateTopicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId)) val transactionStateTopicIdPartition = replicaManager.topicIdPartition(transactionStateTopicPartition) val recordsPerPartition = Map(transactionStateTopicIdPartition -> records) @@ -815,7 +800,7 @@ class TransactionStateManager(brokerId: Int, if (append) { replicaManager.appendRecords( timeout = newMetadata.txnTimeoutMs.toLong, - requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS, + requiredAcks = TransactionCoordinator.EnforcedRequiredAcks, internalTopicsAllowed = true, origin = AppendOrigin.COORDINATOR, entriesPerPartition = recordsPerPartition, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 63a0472ddf07f..c5ccf3571d1ca 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -413,7 +413,7 @@ class BrokerServer( autoTopicCreationManager = new DefaultAutoTopicCreationManager( config, () => groupCoordinator.groupMetadataTopicConfigs, - () => transactionCoordinator.transactionTopicConfigs, + () => transactionCoordinator.transactionStateTopicConfigs, () => shareCoordinator.shareGroupStateTopicConfigs, new KRaftTopicCreator(clientToControllerChannelManager), time, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 03bac25b016b5..cc6355b61ba1a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1835,7 +1835,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (controlRecords.nonEmpty) { replicaManager.appendRecords( timeout = config.requestTimeoutMs.toLong, - requiredAcks = -1, + requiredAcks = TransactionCoordinator.EnforcedRequiredAcks, internalTopicsAllowed = true, origin = AppendOrigin.COORDINATOR, entriesPerPartition = controlRecords, diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 4343987f1a075..11714a66ab66e 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -1182,7 +1182,7 @@ class TransactionStateManagerTest { val partitionId = transactionManager.partitionFor(transactionalId1) val topicPartition = new TopicIdPartition(transactionTopicId, partitionId, TRANSACTION_STATE_TOPIC_NAME) val expectedTombstone = new SimpleRecord(time.milliseconds(), TransactionLog.keyToBytes(transactionalId1), null) - val expectedRecords = MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, expectedTombstone) + val expectedRecords = MemoryRecords.withRecords(Compression.NONE, expectedTombstone) assertEquals(Set(topicPartition), appendedRecords.keySet) assertEquals(Seq(expectedRecords), appendedRecords(topicPartition).toSeq) } else { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5471c41029354..1acc98639ca13 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -765,7 +765,7 @@ class KafkaApisTest extends Logging { case CoordinatorType.TRANSACTION => topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString) topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString) - when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties) + when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new Properties) authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID, groupId, AuthorizationResult.ALLOWED) Topic.TRANSACTION_STATE_TOPIC_NAME @@ -937,7 +937,7 @@ class KafkaApisTest extends Logging { case Topic.TRANSACTION_STATE_TOPIC_NAME => topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString) topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString) - when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties) + when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new Properties) true case _ => topicConfigOverride.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numBrokersNeeded.toString) diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java index b63141ca5dc8a..fbf3a21304c01 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java @@ -17,7 +17,6 @@ package org.apache.kafka.coordinator.transaction; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.record.internal.RecordBatch; @@ -42,14 +41,6 @@ */ public class TransactionLog { - // enforce always using - // 1. cleanup policy = compact - // 2. compression = none - // 3. unclean leader election = disabled - // 4. required acks = -1 when writing - public static final Compression ENFORCED_COMPRESSION = Compression.NONE; - public static final short ENFORCED_REQUIRED_ACKS = (short) -1; - /** * Generates the bytes for transaction log message key * @@ -103,23 +94,6 @@ public static byte[] valueToBytes(TxnTransitMetadata txnMetadata, return MessageUtil.toVersionPrefixedBytes(logValueVersion, value); } - /** - * Decodes the transaction log messages' key - * - * @return the transactional id - * @throws IllegalStateException if the version is not a valid transaction log key version - */ - public static String readTxnRecordKey(ByteBuffer buffer) { - short version = buffer.getShort(); - if (version == CoordinatorRecordType.TRANSACTION_LOG.id()) { - return new TransactionLogKey(new ByteBufferAccessor(buffer), (short) 0).transactionalId(); - } else { - throw new IllegalStateException("Unknown version " + version + " from the transaction log message key"); - } - } - - - public sealed interface ReadResult permits TxnRecord, TxnTombstone, UnknownKeyVersion, UnknownValueVersion { } public record TxnRecord(String transactionId, TransactionMetadata metadata) implements ReadResult { }