Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,44 @@ Connection open(HostConnectionPool pool, int shardId, int serverPort)
}
}

/**
* Same as {@link #open(HostConnectionPool)}, but returns a future instead of blocking.
*
* <p>This avoids blocking the calling thread (e.g. a Netty I/O thread) while waiting for the
* protocol handshake to complete, preventing potential cyclic thread deadlocks.
*/
ListenableFuture<Connection> openAsync(final HostConnectionPool pool) {
return openAsync(pool, -1, 0);
}

/**
* Same as {@link #open(HostConnectionPool, int, int)}, but returns a future instead of
* blocking.
*
* <p>This avoids blocking the calling thread (e.g. a Netty I/O thread) while waiting for the
* protocol handshake to complete, preventing potential cyclic thread deadlocks.
*/
ListenableFuture<Connection> openAsync(
final HostConnectionPool pool, final int shardId, final int serverPort) {
if (isShutdown)
return Futures.immediateFailedFuture(
new ConnectionException(pool.host.getEndPoint(), "Connection factory is shut down"));

pool.host.convictionPolicy.signalConnectionsOpening(1);
final Connection connection =
new Connection(buildConnectionName(pool.host), pool.host.getEndPoint(), this, pool);

return Futures.transformAsync(
connection.initAsync(shardId, serverPort),
new AsyncFunction<Void, Connection>() {
@Override
public ListenableFuture<Connection> apply(Void input) {
return Futures.immediateFuture(connection);
}
},
MoreExecutors.directExecutor());
Comment on lines +1358 to +1366
}

/**
* Creates new connections and associate them to the provided connection pool, but does not
* start them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,39 @@ ListenableFuture<Void> initAsync(Connection reusedConnection) {
if (reusedConnection != null && reusedConnection.setOwner(this)) {
return initAsyncWithConnection(reusedConnection);
}
try {
return initAsyncWithConnection(manager.connectionFactory().open(this));
} catch (Exception e) {
phase.compareAndSet(Phase.INITIALIZING, Phase.INIT_FAILED);
SettableFuture<Void> future = SettableFuture.create();
future.setException(e);
return future;
}

// Use the non-blocking openAsync to avoid a cyclic deadlock. The blocking open() call could
// be executed on a Netty I/O thread (via directExecutor() in SessionManager.initAsync), and
// if Netty's round-robin assigns the new channel to the same blocked I/O thread, the driver
// hangs indefinitely since no timeout task can interrupt it.
ListenableFuture<Void> initFuture =
Futures.transformAsync(
manager.connectionFactory().openAsync(this),
new AsyncFunction<Connection, Void>() {
@Override
public ListenableFuture<Void> apply(Connection conn) {
return initAsyncWithConnection(conn);
}
},
MoreExecutors.directExecutor());

Futures.addCallback(
initFuture,
new FutureCallback<Void>() {
@Override
public void onSuccess(Void v) {}

@Override
public void onFailure(Throwable t) {
// initAsyncWithConnection already sets INIT_FAILED in its own callback for async
// connection failures; this covers the case where openAsync itself fails (e.g.
// factory shutdown, or immediate ECONNREFUSED before the Netty channel is created).
phase.compareAndSet(Phase.INITIALIZING, Phase.INIT_FAILED);
}
},
MoreExecutors.directExecutor());

return initFuture;
}

ListenableFuture<Void> initAsyncWithConnection(Connection reusedConnection) {
Expand Down
Loading