Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka.coordinator.transaction
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
import org.apache.kafka.common.message.{DescribeTransactionsResponseData, ListTransactionsResponseData}
Expand All @@ -31,6 +32,7 @@ import org.apache.kafka.common.utils.internals.LogContext
import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionConfig, TransactionLogConfig, TransactionMetadata, TransactionState, TransactionStateManagerConfig, TransactionalIdAndProducerIdEpoch, TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler

import java.util
Expand All @@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.OptionConverters._

object TransactionCoordinator {
val EnforcedRequiredAcks: Short = -1.toShort

def apply(config: KafkaConfig,
replicaManager: ReplicaManager,
Expand Down Expand Up @@ -1004,7 +1007,20 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
}
}

def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
/**
* Return the configuration properties of the transaction state topic.
*
* @return Properties of the transaction state topic.
*/
def transactionStateTopicConfigs: Properties = {
val props = new Properties
props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.UNCOMPRESSED.name)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, txnConfig.transactionLogMinInsyncReplicas.toString)
props.put(TopicConfig.SEGMENT_BYTES_CONFIG, txnConfig.transactionLogSegmentBytes.toString)
props
}

def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
package kafka.coordinator.transaction

import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.ReplicaManager
import kafka.utils.Logging
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ListTransactionsResponseData
import org.apache.kafka.common.metrics.Metrics
Expand All @@ -38,7 +37,6 @@ import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition
import org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata, TransactionConfig, TransactionLog, TransactionMetadata, TransactionState, TransactionStateManagerConfig, TransactionPartitionAndLeaderEpoch, TransactionalIdAndProducerIdEpoch, TransactionalIdCoordinatorEpochAndMetadata, TransactionalIdCoordinatorEpochAndTransitMetadata, TxnMetadataCacheEntry, TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
Expand Down Expand Up @@ -183,7 +181,7 @@ class TransactionStateManager(brokerId: Int,
if (recordsBuilder == null) {
recordsBuilder = MemoryRecords.builder(
ByteBuffer.allocate(math.min(16384, maxBatchSize)),
TransactionLog.ENFORCED_COMPRESSION,
Compression.NONE,
TimestampType.CREATE_TIME,
0L,
maxBatchSize
Expand Down Expand Up @@ -290,7 +288,7 @@ class TransactionStateManager(brokerId: Int,
inReadLock[Exception](stateLock, () => {
replicaManager.appendRecords(
timeout = config.requestTimeoutMs,
requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords),
Expand Down Expand Up @@ -438,19 +436,6 @@ class TransactionStateManager(brokerId: Int,
enableTwoPC || (txnTimeoutMs <= config.transactionMaxTimeoutMs && txnTimeoutMs > 0)
}

def transactionTopicConfigs: Properties = {
val props = new Properties

// enforce disabled unclean leader election, no compression types, and compact cleanup policy
props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.UNCOMPRESSED.name)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.transactionLogMinInsyncReplicas.toString)
props.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.transactionLogSegmentBytes.toString)

props
}

def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount

private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): ConcurrentMap[String, TransactionMetadata] = {
Expand Down Expand Up @@ -672,7 +657,7 @@ class TransactionStateManager(brokerId: Int,
val valueBytes = TransactionLog.valueToBytes(newMetadata, transactionVersionLevel())
val timestamp = time.milliseconds()

val records = MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, new SimpleRecord(timestamp, keyBytes, valueBytes))
val records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(timestamp, keyBytes, valueBytes))
val transactionStateTopicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
val transactionStateTopicIdPartition = replicaManager.topicIdPartition(transactionStateTopicPartition)
val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
Expand Down Expand Up @@ -815,7 +800,7 @@ class TransactionStateManager(brokerId: Int,
if (append) {
replicaManager.appendRecords(
timeout = newMetadata.txnTimeoutMs.toLong,
requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = recordsPerPartition,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ class BrokerServer(
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
() => groupCoordinator.groupMetadataTopicConfigs,
() => transactionCoordinator.transactionTopicConfigs,
() => transactionCoordinator.transactionStateTopicConfigs,
() => shareCoordinator.shareGroupStateTopicConfigs,
new KRaftTopicCreator(clientToControllerChannelManager),
time,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1835,7 +1835,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (controlRecords.nonEmpty) {
replicaManager.appendRecords(
timeout = config.requestTimeoutMs.toLong,
requiredAcks = -1,
requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = controlRecords,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ class TransactionStateManagerTest {
val partitionId = transactionManager.partitionFor(transactionalId1)
val topicPartition = new TopicIdPartition(transactionTopicId, partitionId, TRANSACTION_STATE_TOPIC_NAME)
val expectedTombstone = new SimpleRecord(time.milliseconds(), TransactionLog.keyToBytes(transactionalId1), null)
val expectedRecords = MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, expectedTombstone)
val expectedRecords = MemoryRecords.withRecords(Compression.NONE, expectedTombstone)
assertEquals(Set(topicPartition), appendedRecords.keySet)
assertEquals(Seq(expectedRecords), appendedRecords(topicPartition).toSeq)
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ class KafkaApisTest extends Logging {
case CoordinatorType.TRANSACTION =>
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString)
when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID,
groupId, AuthorizationResult.ALLOWED)
Topic.TRANSACTION_STATE_TOPIC_NAME
Expand Down Expand Up @@ -937,7 +937,7 @@ class KafkaApisTest extends Logging {
case Topic.TRANSACTION_STATE_TOPIC_NAME =>
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString)
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString)
when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties)
when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new Properties)
true
case _ =>
topicConfigOverride.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numBrokersNeeded.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.transaction;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.record.internal.RecordBatch;
Expand All @@ -42,14 +41,6 @@
*/
public class TransactionLog {

// enforce always using
// 1. cleanup policy = compact
// 2. compression = none
// 3. unclean leader election = disabled
// 4. required acks = -1 when writing
public static final Compression ENFORCED_COMPRESSION = Compression.NONE;
public static final short ENFORCED_REQUIRED_ACKS = (short) -1;

/**
* Generates the bytes for transaction log message key
*
Expand Down Expand Up @@ -103,23 +94,6 @@ public static byte[] valueToBytes(TxnTransitMetadata txnMetadata,
return MessageUtil.toVersionPrefixedBytes(logValueVersion, value);
}

/**
* Decodes the transaction log messages' key
*
* @return the transactional id
* @throws IllegalStateException if the version is not a valid transaction log key version
*/
public static String readTxnRecordKey(ByteBuffer buffer) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice cleanup! we missed this in #21405

short version = buffer.getShort();
if (version == CoordinatorRecordType.TRANSACTION_LOG.id()) {
return new TransactionLogKey(new ByteBufferAccessor(buffer), (short) 0).transactionalId();
} else {
throw new IllegalStateException("Unknown version " + version + " from the transaction log message key");
}
}



public sealed interface ReadResult permits TxnRecord, TxnTombstone, UnknownKeyVersion, UnknownValueVersion { }

public record TxnRecord(String transactionId, TransactionMetadata metadata) implements ReadResult { }
Expand Down
Loading