From 67ad05eeaf4987a9fe121e276465ce189765ed58 Mon Sep 17 00:00:00 2001 From: NathanFallet Date: Sat, 23 May 2026 12:44:01 +0200 Subject: [PATCH] Fix connection lifecycle: fail in-flight commands on disconnect, dedupe connect, close clients Three connection-ownership issues: - Disconnect/close hung in-flight commands (ISSUE-3). When the WebSocket dropped or close() was called, the receive loop just ended; any callCommand parked on its response deferred waited forever. The loop now fails all pending requests (new ConnectionClosedException) on both normal completion of incoming() and on error, and close() does the same, so callers observe a failure instead of hanging. The receive-loop error is now logged (logger.error) rather than printStackTrace'd. - connect() was unsynchronized (ISSUE-4). Concurrent first commands could each pass the isActive check and open a duplicate session, leaking the extra socket and listener. connect() is now guarded by a double-checked connectMutex, and the prepareExpert/prepareHeadless block by a separate prepareMutex (distinct mutex avoids re-entrancy: prepare* issue ONE_SHOT commands that skip that block). - HttpClients were never closed (ISSUE-9). KtorWebSocketTransport.close() now closes its engine-backed client, and DefaultBrowser.stop() closes the HTTP-API client, so create/stop cycles no longer leak threads/connection pools. Verified red->green with ConnectionLifecycleTest (fake transport): callCommand fails with ConnectionClosedException on disconnect and on close, and concurrent first commands open the transport exactly once. All three fail against the unfixed code and pass after. ISSUE-9 is resource cleanup, verified by inspection plus the real-browser create/stop suite. Full :core:jvmTest (real Chrome) + :opentelemetry:jvmTest pass; macOS native tests pass; Linux/MinGW compile. --- .../kdriver/core/browser/DefaultBrowser.kt | 3 + .../core/connection/DefaultConnection.kt | 43 +++++- .../core/connection/KtorWebSocketTransport.kt | 3 + .../exceptions/ConnectionClosedException.kt | 12 ++ .../connection/ConnectionLifecycleTest.kt | 122 ++++++++++++++++++ 5 files changed, 177 insertions(+), 6 deletions(-) create mode 100644 core/src/commonMain/kotlin/dev/kdriver/core/exceptions/ConnectionClosedException.kt create mode 100644 core/src/jvmTest/kotlin/dev/kdriver/core/connection/ConnectionLifecycleTest.kt diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt b/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt index 0bb4b0d58..1e2b0c750 100644 --- a/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt +++ b/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt @@ -298,6 +298,9 @@ open class DefaultBrowser( logger.debug("Closing connection...") connection?.close() connection = null + logger.debug("Closing HTTP API client...") + http?.close() + http = null logger.debug("Canceling coroutine scope...") coroutineScope.cancel() logger.info("Browser process stopped") diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt b/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt index 42a8bdbfd..f2165af33 100644 --- a/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt +++ b/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt @@ -4,6 +4,7 @@ import dev.kdriver.cdp.* import dev.kdriver.cdp.domain.* import dev.kdriver.core.browser.Browser import dev.kdriver.core.browser.Config.Defaults +import dev.kdriver.core.exceptions.ConnectionClosedException import io.ktor.util.logging.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -34,6 +35,9 @@ open class DefaultConnection( private var socketSubscription: Job? = null + private val connectMutex = Mutex() + private val prepareMutex = Mutex() + private val currentIdMutex = Mutex() private var currentId = 0L @@ -63,8 +67,13 @@ open class DefaultConnection( private suspend fun connect() { if (transport.isActive) return - transport.connect() - startListening() + // Guard so concurrent first commands don't each open a session (which would leak the + // duplicate sockets/listeners). Double-checked: skip the lock once connected (ISSUE-4). + connectMutex.withLock { + if (transport.isActive) return@withLock + transport.connect() + startListening() + } } private fun startListening() { @@ -86,22 +95,42 @@ open class DefaultConnection( logger.debug("WebSocket exception while receiving message: {}", e) } } + // incoming() completed without error => the socket was closed. Fail any in-flight + // commands so their callers observe the disconnect instead of hanging (ISSUE-3). + failPendingRequests(ConnectionClosedException()) } catch (e: CancellationException) { throw e } catch (e: Exception) { - e.printStackTrace() - // Handle disconnect, maybe trigger reconnect logic here + logger.error("WebSocket receive loop terminated: {}", e) + failPendingRequests(ConnectionClosedException(cause = e)) } } } + /** + * Completes every in-flight request waiter exceptionally and clears the registry, so callers + * parked in [callCommand] observe a failure rather than hanging when the connection goes away. + */ + private suspend fun failPendingRequests(cause: Throwable) { + val pending = pendingRequestsMutex.withLock { + val snapshot = pendingRequests.values.toList() + pendingRequests.clear() + snapshot + } + pending.forEach { it.completeExceptionally(cause) } + } + @InternalCdpApi override suspend fun callCommand(method: String, parameter: JsonElement?, mode: CommandMode): JsonElement? { connect() if (mode == CommandMode.DEFAULT) owner?.let { browser -> - if (browser.config.expert) prepareExpert() - if (browser.config.headless) prepareHeadless() + // Serialize preparation so concurrent first commands run it once, not N times (ISSUE-4). + // prepare* issue ONE_SHOT commands, which skip this block, so prepareMutex isn't re-entered. + prepareMutex.withLock { + if (browser.config.expert) prepareExpert() + if (browser.config.headless) prepareHeadless() + } } val requestId = currentIdMutex.withLock { currentId++ } @@ -128,6 +157,8 @@ open class DefaultConnection( transport.close() socketSubscription?.cancel() socketSubscription = null + // Fail any commands still awaiting a reply that will now never come (ISSUE-3). + failPendingRequests(ConnectionClosedException("Connection closed")) } override suspend fun updateTarget() { diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt b/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt index 2c87facb0..5670f145b 100644 --- a/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt +++ b/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt @@ -53,6 +53,9 @@ class KtorWebSocketTransport( override suspend fun close() { session?.close() session = null + // Close the engine-backed client too, otherwise its threads/connection pool leak across + // browser create/stop cycles (ISSUE-9). + client.close() } private fun parseWebSocketUrl(url: String): WebSocketInfo { diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/exceptions/ConnectionClosedException.kt b/core/src/commonMain/kotlin/dev/kdriver/core/exceptions/ConnectionClosedException.kt new file mode 100644 index 000000000..a2b8bfb6e --- /dev/null +++ b/core/src/commonMain/kotlin/dev/kdriver/core/exceptions/ConnectionClosedException.kt @@ -0,0 +1,12 @@ +package dev.kdriver.core.exceptions + +/** + * Thrown to a pending CDP command when the underlying WebSocket connection is closed (gracefully or + * by a disconnect) before its response is received. + * + * This lets callers observe a failure instead of hanging forever on a reply that will never arrive. + */ +class ConnectionClosedException( + message: String = "The connection to the browser was closed before a response was received", + cause: Throwable? = null, +) : RuntimeException(message, cause) diff --git a/core/src/jvmTest/kotlin/dev/kdriver/core/connection/ConnectionLifecycleTest.kt b/core/src/jvmTest/kotlin/dev/kdriver/core/connection/ConnectionLifecycleTest.kt new file mode 100644 index 000000000..40dbccb8c --- /dev/null +++ b/core/src/jvmTest/kotlin/dev/kdriver/core/connection/ConnectionLifecycleTest.kt @@ -0,0 +1,122 @@ +package dev.kdriver.core.connection + +import dev.kdriver.cdp.CommandMode +import dev.kdriver.core.exceptions.ConnectionClosedException +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeout +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs + +/** + * Lifecycle behavior of [DefaultConnection]: failing in-flight commands on disconnect/close + * (ISSUE-3) and not opening duplicate sessions under concurrent first commands (ISSUE-4). + */ +class ConnectionLifecycleTest { + + private class FakeTransport( + /** When set, [connect] awaits this before marking the connection active. */ + private val connectGate: CompletableDeferred? = null, + ) : WebSocketTransport { + private val channel = Channel(Channel.UNLIMITED) + override var isActive: Boolean = false + private set + var connectCount: Int = 0 + private set + + override suspend fun connect() { + connectCount++ + connectGate?.await() + isActive = true + } + + override suspend fun send(message: String) { + // No-op: the test controls when/whether a response or disconnect happens. + } + + override fun incoming(): Flow = channel.receiveAsFlow() + + /** Simulate the socket dropping (no graceful close()). */ + fun simulateDisconnect() = channel.close() + + override suspend fun close() { + isActive = false + channel.close() + } + } + + private class TestConnection( + scope: CoroutineScope, + private val transport: FakeTransport, + ) : DefaultConnection("ws://stub/devtools/page/stub", scope) { + override fun createTransport(): WebSocketTransport = transport + } + + @Test + fun callCommand_failsWithConnectionClosed_onDisconnect() = runTest(UnconfinedTestDispatcher()) { + val transport = FakeTransport() + val connection = TestConnection(this, transport) + + // Sends, then parks awaiting a reply that never comes. Capture the outcome here rather than + // via async{}.await(), whose exception would also propagate to (and fail) the test scope. + val outcome = CompletableDeferred() + launch { + try { + connection.callCommand("Some.method", null, CommandMode.ONE_SHOT) + } catch (e: Throwable) { + outcome.complete(e) + } + } + + // The socket drops with the command still in flight. + transport.simulateDisconnect() + + assertIs(withTimeout(2_000) { outcome.await() }) + } + + @Test + fun callCommand_failsWithConnectionClosed_onClose() = runTest(UnconfinedTestDispatcher()) { + val transport = FakeTransport() + val connection = TestConnection(this, transport) + + val outcome = CompletableDeferred() + launch { + try { + connection.callCommand("Some.method", null, CommandMode.ONE_SHOT) + } catch (e: Throwable) { + outcome.complete(e) + } + } + + connection.close() + + assertIs(withTimeout(2_000) { outcome.await() }) + } + + @Test + fun connect_opensTransportOnce_underConcurrentFirstCommands() = runTest(UnconfinedTestDispatcher()) { + val gate = CompletableDeferred() + val transport = FakeTransport(connectGate = gate) + val connection = TestConnection(this, transport) + + // Two first commands race into connect() while it's still in progress (isActive == false). + val c1 = launch { runCatching { connection.callCommand("A.x", null, CommandMode.ONE_SHOT) } } + val c2 = launch { runCatching { connection.callCommand("B.y", null, CommandMode.ONE_SHOT) } } + + // Let the first connect complete; the second must reuse it, not open a new session. + gate.complete(Unit) + + assertEquals(1, transport.connectCount, "connect() must open the transport exactly once") + + c1.cancel() + c2.cancel() + connection.close() + } +}