KAFKA-19193 Support rack-aware partitioning for Kafka producer#19850
Conversation
1af9189 to
564e60f
Compare
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
Is there any chance this feature will be included in the next release? |
|
@marcospassos I'm waiting for 4.1 release to be cut, after that I'll ping the mailing list for review. |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
| private final String rack; | ||
|
|
||
| private volatile PartitionLoadStats partitionLoadStats = null; | ||
| private volatile PartitionLoadStatsHolder partitionLoadStats = null; |
There was a problem hiding this comment.
| private volatile PartitionLoadStatsHolder partitionLoadStats = null; | |
| private volatile PartitionLoadStatsHolder partitionLoadStatsHolder = null; |
As it got a bit confusing when later the code is retrieving partitionLoadStats.inThisRack which is a member of PartitionLoadStatsHolder of type PartitionLoadStats. :D
|
|
||
| // Create partitions with "sticky" batch size to accommodate 3 records. | ||
| BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3); | ||
| BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3, false, ""); |
There was a problem hiding this comment.
| BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3, false, ""); | |
| final boolean rackAware = false; | |
| final String clientRackId = ""; | |
| BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3, rackAware, clientRackId); |
|
|
||
| // Check that switching works even when there is one partition. | ||
| BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1); | ||
| BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1, false, ""); |
There was a problem hiding this comment.
| BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1, false, ""); | |
| BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1, rackAware, clientRackId); |
Just to make the meaning of false and "" obvious at a glance :)
| partitionRacks[i] = leader.rack(); | ||
| } | ||
| allPartitions.add(new PartitionInfo(TOPIC_A, i, leader, NODES, NODES)); | ||
| expectedFrequencies[i] = 6 - queueSizes[i]; // 6 is max(queueSizes) + 1 |
|
@kirktrue sure, I will review the PR soon! |
|
For some reason, I cannot reopen the PR (the button isn't even there). Maybe someone else can? |
564e60f to
6816a21
Compare
|
Thank you @LiamClarkeNZ, I addressed your comments |
1c6c786 to
520eeed
Compare
520eeed to
1a47d94
Compare
| this.rack = rack; | ||
|
|
||
| if (rackAware && Utils.isBlank(rack)) { | ||
| throw new IllegalArgumentException("client.rack must be provided if partitioner.rack.aware is enabled"); |
There was a problem hiding this comment.
The KIP states:
Select the next partition from all partitions following the current algorithm in the following cases:
- if the client rack is not specified;
- if there are no partitions with leaders in the same rack as the client;
- if all the “nearby” partitions are unavailable.
- if partitioner.rack.aware is false.
Based on this, is it correct to throw an exception here?
There was a problem hiding this comment.
I think this is OK, for the sake of the user. This exception will be thrown at the producer constructor time, preventing inconsistent config from silently sneaking in.
There was a problem hiding this comment.
Should we use ConfigException instead for consistency?
1a47d94 to
7404aa9
Compare
According to KIP-1123, this commit adds the support for rack-aware partitioning to `BuiltInPartitioner`. It comes with two new configs for the producer: `partitioner.rack.aware` and `client.rack`, which allows enabling the new behavior. Apart from the added unit tests, the desired behavior was validated by `kafka-producer-perf-test.sh` with an existing and a non-existing rack against a 4 node cluster with two racks and 12-partition topic: ```shell ./kafka_2.13-4.1.0-SNAPSHOT/bin/kafka-producer-perf-test.sh \ --topic test-topic --num-records 100000 --throughput -1 --record-size 1 \ --producer-props bootstrap.servers=127.0.0.1:9092 \ client.rack=rack0 partitioner.rack.aware=true ```
7404aa9 to
e8965db
Compare
viktorsomogyi
left a comment
There was a problem hiding this comment.
LGTM, let's merge it when the CI is green.
|
The failed tests are marked as flaky and pass locally |
chia7712
left a comment
There was a problem hiding this comment.
@ivanyu thanks for driving this patch! I left a few minor suggestions for you. On a related note, have you considered adding a metric like rack-aware-fallback-rate? If it's out of scope for your KIP, I'd gladly draft a follow-up KIP to add it. A silent fallback might lead to an unpleasant surprise for users when they see their cloud networking bills!
| this.rack = rack; | ||
|
|
||
| if (rackAware && Utils.isBlank(rack)) { | ||
| throw new IllegalArgumentException("client.rack must be provided if partitioner.rack.aware is enabled"); |
There was a problem hiding this comment.
Should we use ConfigException instead for consistency?
| // Invert and fold the queue size, so that they become separator values in the CFT. | ||
| invertAndFoldQueueSizeArray(queueSizes, maxSizePlus1, length); | ||
|
|
||
| log.trace("Partition load stats for topic {}: CFT={}, IDs={}, length={}", |
There was a problem hiding this comment.
Should we include the rack-specific load stats (partitionLoadStatsInThisRack) in this trace log when rackAware is true?
|
Hi @chia7712! Thanks for your feedback. This KIP will land only in Kafka 4.4.0 (due to 4.3.0 KIP freeze already in effect). I'll implement what you suggest by then, including the proposed observability KIP. |
| * @param stickyBatchSize How much to produce to partition before switch | ||
| */ | ||
| public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) { | ||
| public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize, boolean rackAware, String rack) { |
There was a problem hiding this comment.
Could we add the new params in the javadoc?
As a follow-up to comments in apache#19850
According to
KIP-1123,
this commit adds the support for rack-aware partitioning to
BuiltInPartitioner. It comes with two new configs for the producer:partitioner.rack.awareandclient.rack, which allows enabling thenew behavior.
Apart from the added unit tests, the desired behavior was tested by
kafka-producer-perf-test.shwith an existing and a non-existing rackagainst a 4 node cluster with two racks and 12-partition topic:
Reviewers: Viktor Somogyi-Vass viktorsomogyi@gmail.com, Liam
Clarke-Hutchinson liam@steelsky.co.nz