From 75030235e5e4759ac60ad8b0bc563124d11752d8 Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Sat, 30 May 2026 12:32:20 -0500 Subject: [PATCH] KAFKA-18862: Consolidate shared heartbeat request manager tests --- .../AbstractHeartbeatRequestManagerTest.java | 334 ++++++++++++++++++ .../ConsumerHeartbeatRequestManagerTest.java | 306 ++-------------- .../ShareHeartbeatRequestManagerTest.java | 294 +-------------- 3 files changed, 370 insertions(+), 564 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManagerTest.java diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManagerTest.java new file mode 100644 index 0000000000000..e6b2556a033db --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManagerTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +abstract class AbstractHeartbeatRequestManagerTest { + + protected static final String DEFAULT_GROUP_ID = "groupId"; + protected static final String DEFAULT_MEMBER_ID = "member-id"; + protected static final int DEFAULT_MEMBER_EPOCH = 1; + protected static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; + protected static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; + protected static final long DEFAULT_RETRY_BACKOFF_MS = 80; + protected static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; + protected static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; + + protected Time time; + protected Timer pollTimer; + protected CoordinatorRequestManager coordinatorRequestManager; + protected SubscriptionState subscriptions; + protected BackgroundEventHandler backgroundEventHandler; + protected HeartbeatRequestState heartbeatRequestState; + protected AbstractMembershipManager membershipManager; + protected AbstractHeartbeatRequestManager heartbeatRequestManager; + + protected abstract ClientResponse createHeartbeatResponse( + NetworkClientDelegate.UnsentRequest request, Errors error); + + // --------------------------------------------------------------------------------------- + // Inherited tests - these exercise behavior implemented in AbstractHeartbeatRequestManager + // and therefore must produce the same outcome for every concrete subclass. + // --------------------------------------------------------------------------------------- + + @Test + public void testTimerNotDue() { + time.sleep(100); // before heartbeatInterval, no heartbeat should be sent + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(0, result.unsentRequests.size()); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); + + // Member in state where it should not send Heartbeat anymore + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); + } + + @Test + public void testHeartbeatOutsideInterval() { + when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); + when(membershipManager.shouldHeartbeatNow()).thenReturn(true); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, result.timeUntilNextPollMs); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); + verify(membershipManager).onHeartbeatRequestGenerated(); + } + + @Test + public void testNoCoordinator() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); + assertEquals(0, result.unsentRequests.size()); + } + + /** + * This is expected to be the case where a member is already leaving the group and the + * poll timer expires. The poll timer expiration should not transition the member to + * STALE, and the member should continue to send heartbeats while the ongoing leaving + * operation completes (send heartbeats while waiting for callbacks before leaving, or + * send last heartbeat to leave). + */ + @Test + public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() { + when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); + when(membershipManager.isLeavingGroup()).thenReturn(true); + + time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + // No transition to leave due to stale member should be triggered, because the member + // is already leaving the group. + verify(membershipManager, never()).transitionToSendingLeaveGroup(anyBoolean()); + + assertEquals(1, result.unsentRequests.size(), "A heartbeat request should be generated to" + + " complete the ongoing leaving operation that was triggered before the poll timer expired."); + } + + @Test + public void testSuccessfulHeartbeatTiming() { + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size(), + "No heartbeat should be sent while interval has not expired"); + assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs); + assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); + + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires"); + NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, + heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), + "Heartbeat timer was not reset to the interval when the heartbeat request was sent."); + + long partOfInterval = DEFAULT_HEARTBEAT_INTERVAL_MS / 3; + time.sleep(partOfInterval); + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size(), + "No heartbeat should be sent while only part of the interval has passed"); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval, + heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), + "Time to next interval was not properly updated."); + + inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); + assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval); + } + + /** + * Test that GROUP_ID_NOT_FOUND error while unsubscribed is not treated as fatal. This can + * happen when the consumer never successfully joined the group (e.g., due to an + * InvalidTopicException during poll() and close() sends a leave heartbeat for a group + * that was never created). + */ + @Test + public void testGroupIdNotFoundExceptionWhileUnsubscribed() { + when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); + when(membershipManager.memberEpoch()).thenReturn(-1); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + verify(membershipManager, never()).transitionToFatal(); + verify(membershipManager).onHeartbeatRequestSkipped(); + verify(backgroundEventHandler, never()).add(any()); + } + + /** + * Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal. This would indicate + * the group was unexpectedly deleted while the member was actively participating. + */ + @Test + public void testGroupIdNotFoundWhileStableIsFatal() { + when(membershipManager.state()).thenReturn(MemberState.STABLE); + when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + verify(membershipManager).transitionToFatal(); + verify(backgroundEventHandler).add(any()); + } + + @ParameterizedTest + @MethodSource("errorProvider") + public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) { + // Handling errors on the second heartbeat + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Manually completing the response to test error handling + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + ClientResponse response = createHeartbeatResponse( + result.unsentRequests.get(0), + error); + result.unsentRequests.get(0).handler().onComplete(response); + AbstractResponse mockResponse = response.responseBody(); + + assertHeartbeatErrorHandling(error, isFatal, mockResponse); + } + + // --------------------------------------------------------------------------------------- + // Shared assertion helpers reused by inherited tests above. + // --------------------------------------------------------------------------------------- + + protected void assertHeartbeatErrorHandling(final Errors error, + final boolean isFatal, + final AbstractResponse response) { + switch (error) { + case NONE: + verify(membershipManager).onHeartbeatSuccess(response); + assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); + break; + + case COORDINATOR_LOAD_IN_PROGRESS: + verify(backgroundEventHandler, never()).add(any()); + assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS); + break; + + case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + verify(backgroundEventHandler, never()).add(any()); + verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong()); + assertNextHeartbeatTiming(0); + break; + case UNKNOWN_MEMBER_ID: + case FENCED_MEMBER_EPOCH: + verify(backgroundEventHandler, never()).add(any()); + assertNextHeartbeatTiming(0); + break; + + case TOPIC_AUTHORIZATION_FAILED: + verify(backgroundEventHandler).add(any(ErrorEvent.class)); + assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS); + verify(membershipManager, never()).transitionToFatal(); + break; + + default: + if (isFatal) { + // Drop the coordinator so the follow-up poll inside ensureFatalError() does + // not produce another heartbeat request. + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + ensureFatalError(error); + } else { + verify(backgroundEventHandler, never()).add(any()); + assertNextHeartbeatTiming(0); + } + break; + } + + if (error != Errors.NONE) { + verify(membershipManager).onHeartbeatFailure(false); + } + + if (!isFatal) { + // Make sure a next heartbeat is sent for all non-fatal errors (to retry or rejoin) + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size(), + "A follow-up heartbeat should be sent after a non-fatal error " + error); + } + } + + protected void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { + long currentTimeMs = time.milliseconds(); + assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs)); + if (expectedTimeToNextHeartbeatMs != 0) { + assertFalse(heartbeatRequestState.canSendRequest(currentTimeMs)); + time.sleep(expectedTimeToNextHeartbeatMs); + } + assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); + } + + private void ensureFatalError(Errors expectedError) { + verify(membershipManager).transitionToFatal(); + + ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); + verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); + ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); + assertInstanceOf(expectedError.exception().getClass(), errorEvent.error(), + "The fatal error propagated to the app thread does not match the error received in the heartbeat response."); + + // Ensure no further heartbeat is generated after the fatal error. + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size(), + "No further heartbeat should be sent after a fatal " + expectedError + " error."); + } + + // error, isFatal + private static Collection errorProvider() { + return Arrays.asList( + Arguments.of(Errors.NONE, false), + Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, false), + Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false), + Arguments.of(Errors.NOT_COORDINATOR, false), + Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, true), + Arguments.of(Errors.INVALID_REQUEST, true), + Arguments.of(Errors.UNKNOWN_MEMBER_ID, false), + Arguments.of(Errors.FENCED_MEMBER_EPOCH, false), + Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true), + Arguments.of(Errors.UNSUPPORTED_VERSION, true), + Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true), + Arguments.of(Errors.FENCED_INSTANCE_ID, true), + Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true), + Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false)); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java index 62f819dc6005b..03ebf4ae0ba44 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java @@ -43,8 +43,6 @@ import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.common.utils.internals.LogContext; @@ -57,8 +55,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.InOrder; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -81,7 +77,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.inOrder; @@ -92,28 +87,19 @@ import static org.mockito.Mockito.when; -public class ConsumerHeartbeatRequestManagerTest { - private static final String DEFAULT_GROUP_ID = "groupId"; +public class ConsumerHeartbeatRequestManagerTest + extends AbstractHeartbeatRequestManagerTest { + private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform"; private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; - private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; - private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; - private static final long DEFAULT_RETRY_BACKOFF_MS = 80; - private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; - private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; - private static final String DEFAULT_MEMBER_ID = "member-id"; - private static final int DEFAULT_MEMBER_EPOCH = 1; - - private Time time; - private Timer pollTimer; - private CoordinatorRequestManager coordinatorRequestManager; - private SubscriptionState subscriptions; - private Metadata metadata; - private ConsumerHeartbeatRequestManager heartbeatRequestManager; + + // Shadows the base field so subclass-only tests can access ConsumerMembershipManager-typed + // methods (groupInstanceId, rackId, serverAssignor). The subclass setUp() assigns the same + // mock to super.membershipManager so inherited tests see the same instance. private ConsumerMembershipManager membershipManager; - private HeartbeatRequestState heartbeatRequestState; + private ConsumerHeartbeatRequestManager heartbeatRequestManager; + private Metadata metadata; private HeartbeatState heartbeatState; - private BackgroundEventHandler backgroundEventHandler; private LogContext logContext; @BeforeEach @@ -126,7 +112,7 @@ public void setUp() { this.backgroundEventHandler = mock(BackgroundEventHandler.class); this.subscriptions = mock(SubscriptionState.class); this.membershipManager = mock(ConsumerMembershipManager.class); - this.metadata = mock(ConsumerMetadata.class); + super.membershipManager = this.membershipManager; Metrics metrics = new Metrics(time); ConsumerConfig config = mock(ConsumerConfig.class); @@ -149,6 +135,9 @@ public void setUp() { backgroundEventHandler, metrics); + super.heartbeatRequestManager = this.heartbeatRequestManager; + this.metadata = mock(ConsumerMetadata.class); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mock(Node.class))); } @@ -230,34 +219,6 @@ public void testHeartbeatOnStartup() { assertEquals(0, result2.unsentRequests.size()); } - @Test - public void testSuccessfulHeartbeatTiming() { - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size(), - "No heartbeat should be sent while interval has not expired"); - assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs); - assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); - - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires"); - NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, - heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), - "Heartbeat timer was not reset to the interval when the heartbeat request was sent."); - - long partOfInterval = DEFAULT_HEARTBEAT_INTERVAL_MS / 3; - time.sleep(partOfInterval); - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size(), - "No heartbeat should be sent while only part of the interval has passed"); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval, - heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), - "Time to next interval was not properly updated."); - - inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); - assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval); - } - @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { @@ -341,22 +302,6 @@ public void testMaximumTimeToWaitWhenHeartbeatShouldBeSkipped(final boolean isUn } } - @Test - public void testTimerNotDue() { - time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - assertEquals(0, result.unsentRequests.size()); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); - - // Member in state where it should not send Heartbeat anymore - when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); - when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); - } - @Test public void testHeartbeatNotSentIfAnotherOneInFlight() { time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); @@ -386,21 +331,6 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { "waiting only for the minimal backoff."); } - @Test - public void testHeartbeatOutsideInterval() { - when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); - when(membershipManager.shouldHeartbeatNow()).thenReturn(true); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - // Heartbeat should be sent - assertEquals(1, result.unsentRequests.size()); - // Interval timer reset - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, result.timeUntilNextPollMs); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); - // Membership manager updated (to transition out of the heartbeating state) - verify(membershipManager).onHeartbeatRequestGenerated(); - } - @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 @@ -456,57 +386,6 @@ public void testFailureOnFatalException() { verify(backgroundEventHandler).add(any()); } - /** - * Test that GROUP_ID_NOT_FOUND error while unsubscribed is not a fatal error. - * This can happen when the consumer never successfully joined the group - * (e.g., due to an InvalidTopicException during poll() and close() sends - * a leave heartbeat for a group that was never created. - */ - @Test - public void testGroupIdNotFoundExceptionWhileUnsubscribed() { - // Setup: member is in UNSUBSCRIBED state with epoch -1 - when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); - when(membershipManager.memberEpoch()).thenReturn(-1); - - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - - // Complete the heartbeat with GROUP_ID_NOT_FOUND error - ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); - result.unsentRequests.get(0).handler().onComplete(response); - - // Verify: no fatal error, heartbeat skipped (benign) - verify(membershipManager, never()).transitionToFatal(); - verify(membershipManager).onHeartbeatRequestSkipped(); - verify(backgroundEventHandler, never()).add(any()); - } - - /** - * Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal. - * This would indicate the group was unexpectedly deleted while the member - * was actively participating. - */ - @Test - public void testGroupIdNotFoundWhileStableIsFatal() { - // Setup: member is in STABLE state with positive epoch - when(membershipManager.state()).thenReturn(MemberState.STABLE); - when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); - - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - - // Complete the heartbeat with GROUP_ID_NOT_FOUND error - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); - result.unsentRequests.get(0).handler().onComplete(response); - - // Verify: fatal error - verify(membershipManager).transitionToFatal(); - verify(backgroundEventHandler).add(any()); - } - @Test public void testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() { time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); @@ -537,16 +416,6 @@ public void testHeartbeatRequestFailureNotifiedToGroupManagerAfterErrorPropagate inOrder.verify(membershipManager).onHeartbeatFailure(false); } - @Test - public void testNoCoordinator() { - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); - assertEquals(0, result.unsentRequests.size()); - } - @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testValidateConsumerGroupHeartbeatRequest(final short version) { @@ -618,7 +487,9 @@ topicId, mkSortedSet(0) assertEquals(Collections.singletonList(expectedTopicPartitions), heartbeatRequest3.data().topicPartitions()); } - private ConsumerGroupHeartbeatRequest getHeartbeatRequest(ConsumerHeartbeatRequestManager heartbeatRequestManager, final short version) { + private ConsumerGroupHeartbeatRequest getHeartbeatRequest( + AbstractHeartbeatRequestManager heartbeatRequestManager, + final short version) { NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); NetworkClientDelegate.UnsentRequest request = pollResult.unsentRequests.get(0); @@ -626,72 +497,6 @@ private ConsumerGroupHeartbeatRequest getHeartbeatRequest(ConsumerHeartbeatReque return (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version); } - @ParameterizedTest - @MethodSource("errorProvider") - public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) { - // Handling errors on the second heartbeat - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - - // Manually completing the response to test error handling - when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); - ClientResponse response = createHeartbeatResponse( - result.unsentRequests.get(0), - error); - result.unsentRequests.get(0).handler().onComplete(response); - ConsumerGroupHeartbeatResponse mockResponse = (ConsumerGroupHeartbeatResponse) response.responseBody(); - - switch (error) { - case NONE: - verify(membershipManager).onHeartbeatSuccess(mockResponse); - assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); - break; - - case COORDINATOR_LOAD_IN_PROGRESS: - verify(backgroundEventHandler, never()).add(any()); - assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS); - break; - - case COORDINATOR_NOT_AVAILABLE: - case NOT_COORDINATOR: - verify(backgroundEventHandler, never()).add(any()); - verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong()); - assertNextHeartbeatTiming(0); - break; - case UNKNOWN_MEMBER_ID: - case FENCED_MEMBER_EPOCH: - verify(backgroundEventHandler, never()).add(any()); - assertNextHeartbeatTiming(0); - break; - case TOPIC_AUTHORIZATION_FAILED: - verify(backgroundEventHandler).add(any(ErrorEvent.class)); - assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS); - verify(membershipManager, never()).transitionToFatal(); - break; - default: - if (isFatal) { - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - ensureFatalError(error); - } else { - verify(backgroundEventHandler, never()).add(any()); - assertNextHeartbeatTiming(0); - } - break; - } - - if (error != Errors.NONE) { - verify(membershipManager).onHeartbeatFailure(false); - } - - if (!isFatal) { - // Make sure a next heartbeat is sent for all non-fatal errors (to retry or rejoin) - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - } - } - /** * This validates the UnsupportedApiVersion the client generates while building a HB if: * 1. HB API is not supported. @@ -736,16 +541,6 @@ private void mockResponseWithException(UnsupportedVersionException exception, bo result.unsentRequests.get(0).handler().onComplete(response); } - private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { - long currentTimeMs = time.milliseconds(); - assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs)); - if (expectedTimeToNextHeartbeatMs != 0) { - assertFalse(heartbeatRequestState.canSendRequest(currentTimeMs)); - time.sleep(expectedTimeToNextHeartbeatMs); - } - assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); - } - @Test public void testHeartbeatState() { mockJoiningMemberData(null); @@ -872,29 +667,6 @@ public void testPollOnLeaving(Optional groupInstanceId, CloseOptions.Gro } - /** - * This is expected to be the case where a member is already leaving the group and the poll - * timer expires. The poll timer expiration should not transition the member to STALE, and - * the member should continue to send heartbeats while the ongoing leaving operation - * completes (send heartbeats while waiting for callbacks before leaving, or send last - * heartbeat to leave). - */ - @Test - public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() { - when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); - when(membershipManager.isLeavingGroup()).thenReturn(true); - - time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - // No transition to leave due to stale member should be triggered, because the member is - // already leaving the group - verify(membershipManager, never()).transitionToSendingLeaveGroup(anyBoolean()); - - assertEquals(1, result.unsentRequests.size(), "A heartbeat request should be generated to" + - " complete the ongoing leaving operation that was triggered before the poll timer expired."); - } - @Test public void testisExpiredByUsedForLogging() { when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); @@ -1115,7 +887,7 @@ public void testRackIdInHeartbeatLifecycle() { assertNull(data.rackId()); } - private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int nextPollMs) { + private void assertHeartbeat(AbstractHeartbeatRequestManager hrm, int nextPollMs) { NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); assertEquals(nextPollMs, pollResult.timeUntilNextPollMs); @@ -1123,50 +895,14 @@ private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int nextPollMs Errors.NONE)); } - private void assertNoHeartbeat(ConsumerHeartbeatRequestManager hrm) { + private void assertNoHeartbeat(AbstractHeartbeatRequestManager hrm) { NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); assertEquals(0, pollResult.unsentRequests.size()); } - private void ensureFatalError(Errors expectedError) { - verify(membershipManager).transitionToFatal(); - - final ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); - verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); - ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); - assertInstanceOf(expectedError.exception().getClass(), errorEvent.error(), - "The fatal error propagated to the app thread does not match the error received in the heartbeat response."); - - ensureHeartbeatStopped(); - } - - private void ensureHeartbeatStopped() { - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size()); - } - - // error, isFatal - private static Collection errorProvider() { - return Arrays.asList( - Arguments.of(Errors.NONE, false), - Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, false), - Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false), - Arguments.of(Errors.NOT_COORDINATOR, false), - Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, true), - Arguments.of(Errors.INVALID_REQUEST, true), - Arguments.of(Errors.UNKNOWN_MEMBER_ID, false), - Arguments.of(Errors.FENCED_MEMBER_EPOCH, false), - Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true), - Arguments.of(Errors.UNSUPPORTED_VERSION, true), - Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true), - Arguments.of(Errors.FENCED_INSTANCE_ID, true), - Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true), - Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false)); - } - - private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest request, - Errors error) { + @Override + protected ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest request, + Errors error) { return createHeartbeatResponse(request, error, "stubbed error message"); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index 201aad088e7d0..26e9e795ffceb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -37,21 +37,15 @@ import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.common.utils.internals.LogContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; -import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -63,14 +57,11 @@ import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG; import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -78,27 +69,18 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class ShareHeartbeatRequestManagerTest { - private static final String DEFAULT_GROUP_ID = "groupId"; - private static final String DEFAULT_MEMBER_ID = "member-id"; - private static final int DEFAULT_MEMBER_EPOCH = 1; - private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; - private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; - private static final long DEFAULT_RETRY_BACKOFF_MS = 80; - private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; - private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; +public class ShareHeartbeatRequestManagerTest + extends AbstractHeartbeatRequestManagerTest { + private static final String SHARE_CONSUMER_COORDINATOR_METRICS = "consumer-share-coordinator-metrics"; - private Time time; - private Timer pollTimer; - private CoordinatorRequestManager coordinatorRequestManager; - private SubscriptionState subscriptions; - private Metadata metadata; - private ShareHeartbeatRequestManager heartbeatRequestManager; + // Shadows the base field so subclass-only tests can access ShareMembershipManager-typed + // methods. The subclass setUp() assigns the same mock to super.membershipManager so + // inherited tests see the same instance. private ShareMembershipManager membershipManager; - private HeartbeatRequestState heartbeatRequestState; + private ShareHeartbeatRequestManager heartbeatRequestManager; + private Metadata metadata; private ShareHeartbeatRequestManager.HeartbeatState heartbeatState; - private BackgroundEventHandler backgroundEventHandler; private Metrics metrics; private LogContext logContext; @@ -110,6 +92,7 @@ public void setUp() { subscriptions = mock(SubscriptionState.class); backgroundEventHandler = mock(BackgroundEventHandler.class); membershipManager = mock(ShareMembershipManager.class); + super.membershipManager = membershipManager; heartbeatState = mock(ShareHeartbeatRequestManager.HeartbeatState.class); metadata = mock(ConsumerMetadata.class); metrics = new Metrics(time); @@ -135,6 +118,8 @@ public void setUp() { backgroundEventHandler, metrics); + super.heartbeatRequestManager = this.heartbeatRequestManager; + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); } @@ -182,34 +167,6 @@ public void testHeartbeatOnStartup() { assertEquals(0, result2.unsentRequests.size()); } - @Test - public void testSuccessfulHeartbeatTiming() { - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size(), - "No heartbeat should be sent while interval has not expired"); - assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs); - assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); - - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires"); - NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, - heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), - "Heartbeat timer was not reset to the interval when the heartbeat request was sent."); - - long partOfInterval = DEFAULT_HEARTBEAT_INTERVAL_MS / 3; - time.sleep(partOfInterval); - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size(), - "No heartbeat should be sent while only part of the interval has passed"); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval, - heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), - "Time to next interval was not properly updated."); - - inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); - assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval); - } - @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.SHARE_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { @@ -261,22 +218,6 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { } } - @Test - public void testTimerNotDue() { - time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - assertEquals(0, result.unsentRequests.size()); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); - - // Member in state where it should not send Heartbeat anymore - when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); - when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); - } - @Test public void testHeartbeatNotSentIfAnotherOneInFlight() { time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); @@ -306,21 +247,6 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { "waiting only for the minimal backoff."); } - @Test - public void testHeartbeatOutsideInterval() { - when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); - when(membershipManager.shouldHeartbeatNow()).thenReturn(true); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - // Heartbeat should be sent - assertEquals(1, result.unsentRequests.size()); - // Interval timer reset - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, result.timeUntilNextPollMs); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); - // Membership manager updated (to transition out of the heartbeating state) - verify(membershipManager).onHeartbeatRequestGenerated(); - } - @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 @@ -355,129 +281,6 @@ public void testFailureOnFatalException() { verify(backgroundEventHandler).add(any()); } - /** - * Test that GROUP_ID_NOT_FOUND error while unsubscribed is not treated as fatal. - * This can happen when the consumer never successfully joined the group - * (e.g., due to an InvalidTopicException during poll() and close() sends - * a leave heartbeat for a group that was never created. - */ - @Test - public void testGroupIdNotFoundExceptionWhileUnsubscribed() { - // Setup: member is in UNSUBSCRIBED state with epoch -1 - when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); - when(membershipManager.memberEpoch()).thenReturn(-1); - - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - - // Complete the heartbeat with GROUP_ID_NOT_FOUND error - ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); - result.unsentRequests.get(0).handler().onComplete(response); - - // Verify: no fatal error, heartbeat skipped (benign) - verify(membershipManager, never()).transitionToFatal(); - verify(membershipManager).onHeartbeatRequestSkipped(); - verify(backgroundEventHandler, never()).add(any()); - } - - /** - * Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal. - * This would indicate the group was unexpectedly deleted while the member - * was actively participating. - */ - @Test - public void testGroupIdNotFoundWhileStableIsFatal() { - // Setup: member is in STABLE state with positive epoch - when(membershipManager.state()).thenReturn(MemberState.STABLE); - when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); - - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - - // Complete the heartbeat with GROUP_ID_NOT_FOUND error - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); - result.unsentRequests.get(0).handler().onComplete(response); - - // Verify: fatal error - verify(membershipManager).transitionToFatal(); - verify(backgroundEventHandler).add(any()); - } - - @Test - public void testNoCoordinator() { - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); - assertEquals(0, result.unsentRequests.size()); - } - - @ParameterizedTest - @MethodSource("errorProvider") - public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) { - // Handling errors on the second heartbeat - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - - // Manually completing the response to test error handling - when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); - ClientResponse response = createHeartbeatResponse( - result.unsentRequests.get(0), - error); - result.unsentRequests.get(0).handler().onComplete(response); - ShareGroupHeartbeatResponse mockResponse = (ShareGroupHeartbeatResponse) response.responseBody(); - - switch (error) { - case NONE: - verify(membershipManager).onHeartbeatSuccess(mockResponse); - assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); - break; - - case COORDINATOR_LOAD_IN_PROGRESS: - verify(backgroundEventHandler, never()).add(any()); - assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS); - break; - - case COORDINATOR_NOT_AVAILABLE: - case NOT_COORDINATOR: - verify(backgroundEventHandler, never()).add(any()); - verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong()); - assertNextHeartbeatTiming(0); - break; - - case UNKNOWN_MEMBER_ID: - verify(backgroundEventHandler, never()).add(any()); - assertNextHeartbeatTiming(0); - break; - - default: - if (isFatal) { - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - ensureFatalError(error); - } else { - verify(backgroundEventHandler, never()).add(any()); - assertNextHeartbeatTiming(0); - } - break; - } - - if (error != Errors.NONE) { - verify(membershipManager).onHeartbeatFailure(false); - } - - if (!isFatal) { - // Make sure a next heartbeat is sent for all non-fatal errors (to retry or rejoin) - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, result.unsentRequests.size()); - } - } - @ParameterizedTest @ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG}) public void testUnsupportedVersionGeneratedOnTheBroker(String errorMsg) { @@ -612,29 +415,6 @@ public void testPollTimerExpiration() { assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); } - /** - * This is expected to be the case where a member is already leaving the group and the poll - * timer expires. The poll timer expiration should not transition the member to STALE, and - * the member should continue to send heartbeats while the ongoing leaving operation - * completes (send heartbeats while waiting for callbacks before leaving, or send last - * heartbeat to leave). - */ - @Test - public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() { - when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); - when(membershipManager.isLeavingGroup()).thenReturn(true); - - time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - // No transition to leave due to stale member should be triggered, because the member is - // already leaving the group - verify(membershipManager, never()).transitionToSendingLeaveGroup(anyBoolean()); - - assertEquals(1, result.unsentRequests.size(), "A heartbeat request should be generated to" + - " complete the ongoing leaving operation that was triggered before the poll timer expired."); - } - @Test public void testHeartbeatMetrics() { assertNotNull(getMetric("heartbeat-response-time-max")); @@ -660,7 +440,7 @@ public void testHeartbeatMetrics() { assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); } - private void assertHeartbeat(ShareHeartbeatRequestManager hrm, int nextPollMs) { + private void assertHeartbeat(AbstractHeartbeatRequestManager hrm, int nextPollMs) { NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); assertEquals(nextPollMs, pollResult.timeUntilNextPollMs); @@ -668,57 +448,13 @@ private void assertHeartbeat(ShareHeartbeatRequestManager hrm, int nextPollMs) { Errors.NONE)); } - private void assertNoHeartbeat(ShareHeartbeatRequestManager hrm) { + private void assertNoHeartbeat(AbstractHeartbeatRequestManager hrm) { NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); assertEquals(0, pollResult.unsentRequests.size()); } - private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { - long currentTimeMs = time.milliseconds(); - assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs)); - if (expectedTimeToNextHeartbeatMs != 0) { - assertFalse(heartbeatRequestState.canSendRequest(currentTimeMs)); - time.sleep(expectedTimeToNextHeartbeatMs); - } - assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); - } - - private void ensureFatalError(Errors expectedError) { - verify(membershipManager).transitionToFatal(); - - final ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); - verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); - ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); - assertInstanceOf(expectedError.exception().getClass(), errorEvent.error(), - "The fatal error propagated to the app thread does not match the error received in the heartbeat response."); - - ensureHeartbeatStopped(); - } - - private void ensureHeartbeatStopped() { - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size()); - } - - // error, isFatal - private static Collection errorProvider() { - return Arrays.asList( - Arguments.of(Errors.NONE, false), - Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, false), - Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false), - Arguments.of(Errors.NOT_COORDINATOR, false), - Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, true), - Arguments.of(Errors.INVALID_REQUEST, true), - Arguments.of(Errors.UNKNOWN_MEMBER_ID, false), - Arguments.of(Errors.FENCED_MEMBER_EPOCH, false), - Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true), - Arguments.of(Errors.UNSUPPORTED_VERSION, true), - Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true), - Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true)); - } - - private ClientResponse createHeartbeatResponse( + @Override + protected ClientResponse createHeartbeatResponse( final NetworkClientDelegate.UnsentRequest request, final Errors error) { ShareGroupHeartbeatResponseData data = new ShareGroupHeartbeatResponseData()