diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java index 3a1ae7d3119..57ba3cd945d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java @@ -23,16 +23,19 @@ import static org.apache.phoenix.jdbc.HighAvailabilityPolicy.PARALLEL; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Supplier; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.junit.AfterClass; @@ -108,9 +111,27 @@ public void testParallelConnectionBackoff() throws Exception { Future futureConnB = executor.submit(() -> DriverManager.getConnection(jdbcUrl, PROPERTIES)); - // The previous call of connection creation should fill the queue by half. - waitFor(() -> !PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(0) - && !PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(1), 100, 5000); + // PHOENIX-7859: Poll actual queue state, not the hasCapacity() composite — the multi-step + // calculation (size/capacity < threshold) had a race window. We now check queue.size() + // directly, then verify hasCapacity() matches expectations. + // Note: queueSize >= 1 triggers !hasCapacity() because HA_MAX_QUEUE_SIZE=2 and + // HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD=0.5, so 1/2 = 0.5 which is NOT < 0.5. + waitFor(() -> { + List services = + PhoenixHAExecutorServiceProvider.get(PROPERTIES); + int queueSize1 = ((ThreadPoolExecutor) services.get(0).getExecutorService()).getQueue().size(); + int queueSize2 = ((ThreadPoolExecutor) services.get(1).getExecutorService()).getQueue().size(); + + LOG.debug("Waiting for queues to fill: cluster1 queue={}, cluster2 queue={}", + queueSize1, queueSize2); + + return queueSize1 >= 1 && queueSize2 >= 1; + }, 100, 5000); + + // Verify that hasCapacity() now correctly reports no capacity + List capacity = PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES); + assertFalse("Cluster 1 should have no capacity after queues filled", capacity.get(0).booleanValue()); + assertFalse("Cluster 2 should have no capacity after queues filled", capacity.get(1).booleanValue()); // This should be backed off now, as the capacity is not available. Connection connC = DriverManager.getConnection(jdbcUrl, PROPERTIES);