diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt b/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt index 0bb4b0d58..1e2b0c750 100644 --- a/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt +++ b/core/src/commonMain/kotlin/dev/kdriver/core/browser/DefaultBrowser.kt @@ -298,6 +298,9 @@ open class DefaultBrowser( logger.debug("Closing connection...") connection?.close() connection = null + logger.debug("Closing HTTP API client...") + http?.close() + http = null logger.debug("Canceling coroutine scope...") coroutineScope.cancel() logger.info("Browser process stopped") diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt b/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt index 42a8bdbfd..f2165af33 100644 --- a/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt +++ b/core/src/commonMain/kotlin/dev/kdriver/core/connection/DefaultConnection.kt @@ -4,6 +4,7 @@ import dev.kdriver.cdp.* import dev.kdriver.cdp.domain.* import dev.kdriver.core.browser.Browser import dev.kdriver.core.browser.Config.Defaults +import dev.kdriver.core.exceptions.ConnectionClosedException import io.ktor.util.logging.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -34,6 +35,9 @@ open class DefaultConnection( private var socketSubscription: Job? = null + private val connectMutex = Mutex() + private val prepareMutex = Mutex() + private val currentIdMutex = Mutex() private var currentId = 0L @@ -63,8 +67,13 @@ open class DefaultConnection( private suspend fun connect() { if (transport.isActive) return - transport.connect() - startListening() + // Guard so concurrent first commands don't each open a session (which would leak the + // duplicate sockets/listeners). Double-checked: skip the lock once connected (ISSUE-4). + connectMutex.withLock { + if (transport.isActive) return@withLock + transport.connect() + startListening() + } } private fun startListening() { @@ -86,22 +95,42 @@ open class DefaultConnection( logger.debug("WebSocket exception while receiving message: {}", e) } } + // incoming() completed without error => the socket was closed. Fail any in-flight + // commands so their callers observe the disconnect instead of hanging (ISSUE-3). + failPendingRequests(ConnectionClosedException()) } catch (e: CancellationException) { throw e } catch (e: Exception) { - e.printStackTrace() - // Handle disconnect, maybe trigger reconnect logic here + logger.error("WebSocket receive loop terminated: {}", e) + failPendingRequests(ConnectionClosedException(cause = e)) } } } + /** + * Completes every in-flight request waiter exceptionally and clears the registry, so callers + * parked in [callCommand] observe a failure rather than hanging when the connection goes away. + */ + private suspend fun failPendingRequests(cause: Throwable) { + val pending = pendingRequestsMutex.withLock { + val snapshot = pendingRequests.values.toList() + pendingRequests.clear() + snapshot + } + pending.forEach { it.completeExceptionally(cause) } + } + @InternalCdpApi override suspend fun callCommand(method: String, parameter: JsonElement?, mode: CommandMode): JsonElement? { connect() if (mode == CommandMode.DEFAULT) owner?.let { browser -> - if (browser.config.expert) prepareExpert() - if (browser.config.headless) prepareHeadless() + // Serialize preparation so concurrent first commands run it once, not N times (ISSUE-4). + // prepare* issue ONE_SHOT commands, which skip this block, so prepareMutex isn't re-entered. + prepareMutex.withLock { + if (browser.config.expert) prepareExpert() + if (browser.config.headless) prepareHeadless() + } } val requestId = currentIdMutex.withLock { currentId++ } @@ -128,6 +157,8 @@ open class DefaultConnection( transport.close() socketSubscription?.cancel() socketSubscription = null + // Fail any commands still awaiting a reply that will now never come (ISSUE-3). + failPendingRequests(ConnectionClosedException("Connection closed")) } override suspend fun updateTarget() { diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt b/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt index 2c87facb0..5670f145b 100644 --- a/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt +++ b/core/src/commonMain/kotlin/dev/kdriver/core/connection/KtorWebSocketTransport.kt @@ -53,6 +53,9 @@ class KtorWebSocketTransport( override suspend fun close() { session?.close() session = null + // Close the engine-backed client too, otherwise its threads/connection pool leak across + // browser create/stop cycles (ISSUE-9). + client.close() } private fun parseWebSocketUrl(url: String): WebSocketInfo { diff --git a/core/src/commonMain/kotlin/dev/kdriver/core/exceptions/ConnectionClosedException.kt b/core/src/commonMain/kotlin/dev/kdriver/core/exceptions/ConnectionClosedException.kt new file mode 100644 index 000000000..a2b8bfb6e --- /dev/null +++ b/core/src/commonMain/kotlin/dev/kdriver/core/exceptions/ConnectionClosedException.kt @@ -0,0 +1,12 @@ +package dev.kdriver.core.exceptions + +/** + * Thrown to a pending CDP command when the underlying WebSocket connection is closed (gracefully or + * by a disconnect) before its response is received. + * + * This lets callers observe a failure instead of hanging forever on a reply that will never arrive. + */ +class ConnectionClosedException( + message: String = "The connection to the browser was closed before a response was received", + cause: Throwable? = null, +) : RuntimeException(message, cause) diff --git a/core/src/jvmTest/kotlin/dev/kdriver/core/connection/ConnectionLifecycleTest.kt b/core/src/jvmTest/kotlin/dev/kdriver/core/connection/ConnectionLifecycleTest.kt new file mode 100644 index 000000000..40dbccb8c --- /dev/null +++ b/core/src/jvmTest/kotlin/dev/kdriver/core/connection/ConnectionLifecycleTest.kt @@ -0,0 +1,122 @@ +package dev.kdriver.core.connection + +import dev.kdriver.cdp.CommandMode +import dev.kdriver.core.exceptions.ConnectionClosedException +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeout +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs + +/** + * Lifecycle behavior of [DefaultConnection]: failing in-flight commands on disconnect/close + * (ISSUE-3) and not opening duplicate sessions under concurrent first commands (ISSUE-4). + */ +class ConnectionLifecycleTest { + + private class FakeTransport( + /** When set, [connect] awaits this before marking the connection active. */ + private val connectGate: CompletableDeferred? = null, + ) : WebSocketTransport { + private val channel = Channel(Channel.UNLIMITED) + override var isActive: Boolean = false + private set + var connectCount: Int = 0 + private set + + override suspend fun connect() { + connectCount++ + connectGate?.await() + isActive = true + } + + override suspend fun send(message: String) { + // No-op: the test controls when/whether a response or disconnect happens. + } + + override fun incoming(): Flow = channel.receiveAsFlow() + + /** Simulate the socket dropping (no graceful close()). */ + fun simulateDisconnect() = channel.close() + + override suspend fun close() { + isActive = false + channel.close() + } + } + + private class TestConnection( + scope: CoroutineScope, + private val transport: FakeTransport, + ) : DefaultConnection("ws://stub/devtools/page/stub", scope) { + override fun createTransport(): WebSocketTransport = transport + } + + @Test + fun callCommand_failsWithConnectionClosed_onDisconnect() = runTest(UnconfinedTestDispatcher()) { + val transport = FakeTransport() + val connection = TestConnection(this, transport) + + // Sends, then parks awaiting a reply that never comes. Capture the outcome here rather than + // via async{}.await(), whose exception would also propagate to (and fail) the test scope. + val outcome = CompletableDeferred() + launch { + try { + connection.callCommand("Some.method", null, CommandMode.ONE_SHOT) + } catch (e: Throwable) { + outcome.complete(e) + } + } + + // The socket drops with the command still in flight. + transport.simulateDisconnect() + + assertIs(withTimeout(2_000) { outcome.await() }) + } + + @Test + fun callCommand_failsWithConnectionClosed_onClose() = runTest(UnconfinedTestDispatcher()) { + val transport = FakeTransport() + val connection = TestConnection(this, transport) + + val outcome = CompletableDeferred() + launch { + try { + connection.callCommand("Some.method", null, CommandMode.ONE_SHOT) + } catch (e: Throwable) { + outcome.complete(e) + } + } + + connection.close() + + assertIs(withTimeout(2_000) { outcome.await() }) + } + + @Test + fun connect_opensTransportOnce_underConcurrentFirstCommands() = runTest(UnconfinedTestDispatcher()) { + val gate = CompletableDeferred() + val transport = FakeTransport(connectGate = gate) + val connection = TestConnection(this, transport) + + // Two first commands race into connect() while it's still in progress (isActive == false). + val c1 = launch { runCatching { connection.callCommand("A.x", null, CommandMode.ONE_SHOT) } } + val c2 = launch { runCatching { connection.callCommand("B.y", null, CommandMode.ONE_SHOT) } } + + // Let the first connect complete; the second must reuse it, not open a new session. + gate.complete(Unit) + + assertEquals(1, transport.connectCount, "connect() must open the transport exactly once") + + c1.cancel() + c2.cancel() + connection.close() + } +}