MINOR: Don't enforce rebalance under the Streams group protocol #22450
Open
lucasbru wants to merge 1 commit into
StreamThread enforces a consumer rebalance to propagate two kinds of events to the rest of the group via the client-side assignor: an application shutdown request (maybeSendShutdown) and the recovery of a corrupted active task under EOS (the TaskCorruptedException handler, KAFKA-12486, which lets the assignor temporarily move the task to a standby while this client restores its state). Under the Streams group protocol (KIP-1071) rebalances are driven by the broker. The shutdown request is propagated through the group heartbeat, and task recovery is local (the task is closed dirty, revived and re-initialized with its input offsets reset) with any HA failover handled broker-side. In both cases the client-side enforceRebalance does nothing except make AsyncKafkaConsumer log "Operation not supported in new consumer group protocol" - on every run-loop iteration during shutdown. Skip the enforceRebalance calls when running under the Streams group protocol. The classic-protocol behavior is unchanged. The probing/followup rebalance call site is left untouched: it is only reachable when the client-side StreamsPartitionAssignor schedules it, which never happens under the Streams group protocol.
Pull request overview
This PR prevents Kafka Streams from calling Consumer.enforceRebalance(...) when running under the Streams group protocol (KIP-1071), since rebalances are broker-driven in that protocol and the call is unsupported (and noisy) for AsyncKafkaConsumer.
Changes:
- Guard the “task corrupted” rebalance enforcement so it only occurs under the classic group protocol.
- Guard the “shutdown requested” rebalance enforcement so it only occurs under the classic group protocol.
- Add unit tests verifying enforceRebalance is skipped under the Streams group protocol while classic-protocol behavior remains unchanged.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java | Skip enforceRebalance for shutdown and task-corruption paths when streamsRebalanceData is present (Streams protocol). |
| streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java | Add/extend tests to assert enforceRebalance is not invoked under Streams protocol and is still invoked under classic protocol where expected. |
When a Kafka Streams application requests a shutdown of the whole
application, or when an active task is found to be corrupted under
exactly-once, StreamThread enforces a consumer rebalance. In the
classic protocol both of these need a rebalance because the work is
propagated to the rest of the group through the client-side assignor: a
shutdown request is encoded into the assignment so every member learns
it should stop, and a corrupted-task recovery (KAFKA-12486) lets the
high-availability assignor temporarily move the task to a standby while
the affected client restores its state from scratch.
Under the Streams group protocol (KIP-1071) rebalances are driven by the
broker, so neither of these relies on a client-enforced rebalance
anymore. The shutdown request is propagated through the Streams group
heartbeat, and corrupted-task recovery is entirely local: the task is
closed dirty, revived, and re-initialized with its input offsets reset,
after which any high-availability failover is the broker's
responsibility. In both cases the call to Consumer.enforceRebalance
does nothing except make the AsyncKafkaConsumer log "Operation not
supported in new consumer group protocol". During shutdown this is
logged on every run-loop iteration, which is what surfaced the issue.
This change skips the enforceRebalance calls when the thread is
running under the Streams group protocol, leaving the classic-protocol
behavior untouched. The third enforceRebalance call site, the
probing/followup rebalance, is intentionally left alone because it is
only reachable when the client-side StreamsPartitionAssignor schedules
a followup rebalance, which never happens under the Streams group
protocol.
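For illustration only, the guard described above can be sketched without
any Kafka dependency. This is not the code from the PR: the types
RebalanceCapableConsumer, RecordingConsumer and ThreadSketch, the two
handler method names, and the reason strings are all hypothetical
stand-ins. The only detail taken from the review summary is that the
presence of streamsRebalanceData marks the Streams group protocol;
modelling it as an Optional<Object> is an assumption made here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class EnforceRebalanceGuardSketch {

    /** Hypothetical stand-in for the single consumer method the sketch needs. */
    interface RebalanceCapableConsumer {
        void enforceRebalance(String reason);
    }

    /** Records every enforceRebalance call so the guard's effect is observable. */
    static final class RecordingConsumer implements RebalanceCapableConsumer {
        final List<String> reasons = new ArrayList<>();

        @Override
        public void enforceRebalance(String reason) {
            reasons.add(reason);
        }
    }

    /** Hypothetical, heavily simplified model of the two guarded call sites. */
    static final class ThreadSketch {
        private final RebalanceCapableConsumer consumer;
        // Assumption: present only when running under the Streams group protocol.
        private final Optional<Object> streamsRebalanceData;

        ThreadSketch(RebalanceCapableConsumer consumer, Optional<Object> streamsRebalanceData) {
            this.consumer = consumer;
            this.streamsRebalanceData = streamsRebalanceData;
        }

        private boolean usesStreamsGroupProtocol() {
            return streamsRebalanceData.isPresent();
        }

        /** Shutdown path: only the classic protocol needs a client-enforced rebalance. */
        void onShutdownRequested() {
            if (!usesStreamsGroupProtocol()) {
                consumer.enforceRebalance("shutdown requested (hypothetical reason)");
            }
        }

        /** Task-corruption path: recovery is local under the Streams group protocol. */
        void onTaskCorrupted() {
            if (!usesStreamsGroupProtocol()) {
                consumer.enforceRebalance("active task corrupted (hypothetical reason)");
            }
        }
    }

    public static void main(String[] args) {
        RecordingConsumer classic = new RecordingConsumer();
        ThreadSketch classicThread = new ThreadSketch(classic, Optional.empty());
        classicThread.onShutdownRequested();
        classicThread.onTaskCorrupted();

        RecordingConsumer streams = new RecordingConsumer();
        ThreadSketch streamsThread = new ThreadSketch(streams, Optional.of(new Object()));
        streamsThread.onShutdownRequested();
        streamsThread.onTaskCorrupted();

        System.out.println(classic.reasons.size()); // prints 2
        System.out.println(streams.reasons.size()); // prints 0
    }
}
```

Keying the guard on the protocol in one helper keeps both call sites
symmetric and makes the classic path trivially unchanged, which is the
property the unit tests in this change are described as checking.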
Added unit tests covering both guarded call sites under the Streams
group protocol, and verified the existing classic-protocol behavior is
unchanged.