From d03e345bc094c3c1816829b489ea56796f386081 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Tue, 2 Jun 2026 21:31:54 +0200 Subject: [PATCH 1/2] fix: eliminate blocking .get() in HostConnectionPool.initAsync to prevent Netty I/O thread deadlock During session initialization, updateCreatedPools() can execute directly on a Netty I/O thread due to directExecutor() chaining in SessionManager.initAsync(). If a dead node hasn't been marked DOWN yet (triggerOnDown is async on the executor), updateCreatedPools() attempts to create a pool for it. The first connection in the pool was established via Connection.Factory.open(), which calls connection.initAsync().get() -- a synchronous blocking wait. If Netty's round-robin channel assignment lands the new channel on the same blocked I/O thread, the connection can never complete (even connect timeouts are queued on the blocked thread), causing a permanent cyclic deadlock that hangs cluster.connect() indefinitely. Fix: add Connection.Factory.openAsync() variants that return ListenableFuture instead of blocking. Rewrite HostConnectionPool.initAsync() to use openAsync() and chain initAsyncWithConnection() through Futures.transformAsync(), so the calling thread is never blocked during the initial connection setup. Reported via: https://github.com/scylladb/scylla-dtest/pull/7046 --- .../com/datastax/driver/core/Connection.java | 38 +++++++++++++++++ .../driver/core/HostConnectionPool.java | 41 +++++++++++++++---- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java index 3c7cc6acf9e..3075a37f229 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java @@ -1328,6 +1328,44 @@ Connection open(HostConnectionPool pool, int shardId, int serverPort) } } + /** + * Same as {@link #open(HostConnectionPool)}, but returns a future instead of blocking. + * + *

This avoids blocking the calling thread (e.g. a Netty I/O thread) while waiting for the + * protocol handshake to complete, preventing potential cyclic thread deadlocks. + */ + ListenableFuture openAsync(final HostConnectionPool pool) { + return openAsync(pool, -1, 0); + } + + /** + * Same as {@link #open(HostConnectionPool, int, int)}, but returns a future instead of + * blocking. + * + *

This avoids blocking the calling thread (e.g. a Netty I/O thread) while waiting for the + * protocol handshake to complete, preventing potential cyclic thread deadlocks. + */ + ListenableFuture openAsync( + final HostConnectionPool pool, final int shardId, final int serverPort) { + if (isShutdown) + return Futures.immediateFailedFuture( + new ConnectionException(pool.host.getEndPoint(), "Connection factory is shut down")); + + pool.host.convictionPolicy.signalConnectionsOpening(1); + final Connection connection = + new Connection(buildConnectionName(pool.host), pool.host.getEndPoint(), this, pool); + + return Futures.transformAsync( + connection.initAsync(shardId, serverPort), + new AsyncFunction() { + @Override + public ListenableFuture apply(Void input) { + return Futures.immediateFuture(connection); + } + }, + MoreExecutors.directExecutor()); + } + /** * Creates new connections and associate them to the provided connection pool, but does not * start them. diff --git a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java index e5a00e9c2a9..dc6bc375157 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java +++ b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java @@ -276,14 +276,39 @@ ListenableFuture initAsync(Connection reusedConnection) { if (reusedConnection != null && reusedConnection.setOwner(this)) { return initAsyncWithConnection(reusedConnection); } - try { - return initAsyncWithConnection(manager.connectionFactory().open(this)); - } catch (Exception e) { - phase.compareAndSet(Phase.INITIALIZING, Phase.INIT_FAILED); - SettableFuture future = SettableFuture.create(); - future.setException(e); - return future; - } + + // Use the non-blocking openAsync to avoid a cyclic deadlock. The blocking open() call could + // be executed on a Netty I/O thread (via directExecutor() in SessionManager.initAsync), and + // if Netty's round-robin assigns the new channel to the same blocked I/O thread, the driver + // hangs indefinitely since no timeout task can interrupt it. + ListenableFuture initFuture = + Futures.transformAsync( + manager.connectionFactory().openAsync(this), + new AsyncFunction() { + @Override + public ListenableFuture apply(Connection conn) { + return initAsyncWithConnection(conn); + } + }, + MoreExecutors.directExecutor()); + + Futures.addCallback( + initFuture, + new FutureCallback() { + @Override + public void onSuccess(Void v) {} + + @Override + public void onFailure(Throwable t) { + // initAsyncWithConnection already sets INIT_FAILED in its own callback for async + // connection failures; this covers the case where openAsync itself fails (e.g. + // factory shutdown, or immediate ECONNREFUSED before the Netty channel is created). + phase.compareAndSet(Phase.INITIALIZING, Phase.INIT_FAILED); + } + }, + MoreExecutors.directExecutor()); + + return initFuture; } ListenableFuture initAsyncWithConnection(Connection reusedConnection) { From 36c2472bbb4d3f49c59713ce18defb674a5dcd83 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Wed, 3 Jun 2026 12:44:25 +0200 Subject: [PATCH 2/2] test: regression test for Netty I/O thread deadlock in HostConnectionPool.initAsync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a test that verifies initAsync() returns a future promptly without blocking the calling thread, even when the underlying openAsync() never completes (simulating a dead node that won't fail fast, e.g. firewalled). The test submits initAsync() to an actual Netty I/O thread — the exact thread context in which the original deadlock occurred — and asserts the returned future is received within 2 seconds. If someone reverts the fix back to the blocking open().get(), the submitted.get(2, SECONDS) call will time out and the test will fail (actually deadlock permanently because the timeout task itself is queued on the blocked I/O thread). --- .../driver/core/HostConnectionPoolTest.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java index 88964995887..e5867ebcb09 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java @@ -61,6 +61,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -68,6 +69,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -320,6 +322,70 @@ public void should_close_uninitialized_pool_without_npe() throws Exception { } } + /** + * Ensures that {@link HostConnectionPool#initAsync(Connection)} does not block the calling thread + * while establishing the first connection for a new pool. + * + *

Previously, {@code initAsync} called the synchronous {@code Connection.Factory.open()}, + * which internally performed an unbounded {@code .get()} on the connection handshake future. This + * could cause a permanent cyclic deadlock when the call was made on a Netty I/O thread (via + * {@code directExecutor()} chaining in {@code SessionManager.initAsync()}): if Netty's + * round-robin assigned the new channel to the same blocked I/O thread, neither the connection nor + * any timeout task queued on that thread could ever complete. + * + *

The fix replaces the blocking {@code open()} with the non-blocking {@code openAsync()}, so + * the calling thread is never stalled waiting for the protocol handshake. + * + * @test_category connection:connection_pool + * @see scylla-dtest#7046 + */ + @Test(groups = "short") + public void initAsync_should_not_block_calling_thread() throws Exception { + Cluster cluster = createClusterBuilder().build(); + try { + Session session = cluster.connect(); + Host host = TestUtils.findHost(cluster, 1); + SessionManager sessionManager = (SessionManager) session; + + // Spy on the factory and stub openAsync() to return a future that never completes, + // simulating a connection attempt to a node that does not respond (e.g. firewalled). + Connection.Factory factory = spy(cluster.manager.connectionFactory); + cluster.manager.connectionFactory = factory; + final SettableFuture neverCompletingFuture = SettableFuture.create(); + doReturn(neverCompletingFuture).when(factory).openAsync(any(HostConnectionPool.class)); + + final HostConnectionPool pool = + new HostConnectionPool(host, HostDistance.LOCAL, sessionManager); + + // Submit initAsync() to the actual Netty I/O thread, reproducing the exact thread context + // of the original bug: SessionManager.initAsync() chains updateCreatedPools() via + // directExecutor(), which runs on whatever thread completes the last pool-creation future — + // potentially a Netty I/O thread. + ExecutorService ioThread = + (ExecutorService) cluster.manager.connectionFactory.eventLoopGroup.next(); + java.util.concurrent.Future> submitted = + ioThread.submit( + new Callable>() { + @Override + public ListenableFuture call() { + return pool.initAsync(null); + } + }); + + // initAsync() must return a future promptly without blocking the I/O thread. + // With the old blocking open().get() this would time out here — and then deadlock + // permanently because the timeout task itself is queued on the same blocked thread. + ListenableFuture initFuture = submitted.get(2, TimeUnit.SECONDS); + + // The returned future should still be pending because openAsync() never completed. + assertThat(initFuture.isDone()).isFalse(); + + pool.closeAsync().force(); + } finally { + cluster.close(); + } + } + /** * Validates that if the keyspace tied to the Session's pool state is different than the keyspace * on the connection being used in dequeue that {@link Connection#setKeyspaceAsync(String)} is set