diff --git a/Sources/Partout/VirtualTunnelInterface.swift b/Sources/Partout/VirtualTunnelInterface.swift index 68a231ad..0403b507 100644 --- a/Sources/Partout/VirtualTunnelInterface.swift +++ b/Sources/Partout/VirtualTunnelInterface.swift @@ -50,7 +50,7 @@ final class VirtualTunnelInterface: IOInterface, @unchecked Sendable { } deinit { - pp_log(ctx, .core, .info, "Deinit VirtualTunnelInterface") + pp_log(ctx, .core, .debug, "Deinit VirtualTunnelInterface") pp_tun_free(tun) } diff --git a/Sources/PartoutCore/Connection/DummyReachabilityObserver.swift b/Sources/PartoutCore/Connection/DummyReachabilityObserver.swift index 6bff3928..b1ee3294 100644 --- a/Sources/PartoutCore/Connection/DummyReachabilityObserver.swift +++ b/Sources/PartoutCore/Connection/DummyReachabilityObserver.swift @@ -14,6 +14,10 @@ public final class DummyReachabilityObserver: ReachabilityObserver { stream.send(true) } + public func stopObserving() { + stream.finish() + } + public var isReachable: Bool { true } diff --git a/Sources/PartoutCore/Connection/NetworkObserver.swift b/Sources/PartoutCore/Connection/NetworkObserver.swift index 6168b68c..35f6bc77 100644 --- a/Sources/PartoutCore/Connection/NetworkObserver.swift +++ b/Sources/PartoutCore/Connection/NetworkObserver.swift @@ -6,11 +6,15 @@ public final class NetworkObserver: @unchecked Sendable { private let ctx: PartoutLoggerContext + private let reachabilityStream: AsyncStream + + private let statusStream: AsyncStream + private let state: AtomicState private let signalSubject: CurrentValueStream - private let isStatusReady: (ConnectionStatus) -> Bool + private let isStatusReady: @Sendable (ConnectionStatus) -> Bool /// Publishes when the network state is ready for reconnection. public let onReady: CurrentValueStream @@ -28,87 +32,88 @@ public final class NetworkObserver: @unchecked Sendable { _ ctx: PartoutLoggerContext, reachabilityStream: AsyncStream, statusStream: AsyncStream, - isStatusReady: @escaping (ConnectionStatus) -> Bool + isStatusReady: @escaping @Sendable (ConnectionStatus) -> Bool ) { self.ctx = ctx + self.reachabilityStream = reachabilityStream + self.statusStream = statusStream state = AtomicState() signalSubject = CurrentValueStream(false) self.isStatusReady = isStatusReady onReady = CurrentValueStream(false) subscriptions = [] - - observeObjects(reachabilityStream, statusStream) } - public func setEnabled(_ isEnabled: Bool) { - signalSubject.send(isEnabled) - } -} - -private extension NetworkObserver { - func satisfied(by tuple: AtomicState.Tuple) -> Bool { - tuple.signal && tuple.isNetworkAvailable && isStatusReady(tuple.connectionStatus) + deinit { + pp_log(ctx, .core, .debug, "Deinit NetworkObserver") } - func tryReady(_ value: AtomicState.Tuple) { - let isSatisfied = satisfied(by: value) - pp_log(ctx, .core, .info, "NetworkObserver.onReady(\(value.debugDescription)) -> \(isSatisfied)") - guard isSatisfied else { - return - } - onReady.send(true) - } -} - -private extension NetworkObserver { - func observeObjects(_ reachabilityStream: AsyncStream, _ statusStream: AsyncStream) { + public func startObserving() { let signalSubscription = Task { [weak self] in - guard let self else { - return - } + guard let self else { return } for await signal in signalSubject.subscribe() { guard !Task.isCancelled else { pp_log(ctx, .core, .debug, "Cancelled NetworkObserver.signalSubject") - return + break } guard let newState = await state.setSignal(signal) else { continue } tryReady(newState) } + pp_log(ctx, .core, .debug, "NetworkObserver.signalSubject terminated") } let reachabilitySubscription = Task { [weak self] in - guard let self else { - return - } + guard let self else { return } for await isNetworkAvailable in reachabilityStream { guard !Task.isCancelled else { pp_log(ctx, .core, .debug, "Cancelled NetworkObserver.reachabilityStream") - return + break } guard let newState = await state.setIsNetworkAvailable(isNetworkAvailable) else { continue } tryReady(newState) } + pp_log(ctx, .core, .debug, "NetworkObserver.reachabilityStream terminated") } let statusSubscription = Task { [weak self] in - guard let self else { - return - } + guard let self else { return } for await connectionStatus in statusStream { guard !Task.isCancelled else { pp_log(ctx, .core, .debug, "Cancelled NetworkObserver.statusStream") - return + break } guard let newState = await state.setConnectionStatus(connectionStatus) else { continue } tryReady(newState) } + pp_log(ctx, .core, .debug, "NetworkObserver.statusStream terminated") } subscriptions = [signalSubscription, reachabilitySubscription, statusSubscription] } + + public func stopObserving() { + signalSubject.finish() + } + + public func setEnabled(_ isEnabled: Bool) { + signalSubject.send(isEnabled) + } +} + +private extension NetworkObserver { + func satisfied(by tuple: AtomicState.Tuple) -> Bool { + tuple.signal && tuple.isNetworkAvailable && isStatusReady(tuple.connectionStatus) + } + + func tryReady(_ value: AtomicState.Tuple) { + let isSatisfied = satisfied(by: value) + pp_log(ctx, .core, .info, "NetworkObserver.onReady(\(value.debugDescription)) -> \(isSatisfied)") + guard isSatisfied else { return } + onReady.send(true) + } } // MARK: - AtomicState diff --git a/Sources/PartoutCore/Connection/POSIXBlockingSocket.swift b/Sources/PartoutCore/Connection/POSIXBlockingSocket.swift index f4316734..16bd8974 100644 --- a/Sources/PartoutCore/Connection/POSIXBlockingSocket.swift +++ b/Sources/PartoutCore/Connection/POSIXBlockingSocket.swift @@ -90,7 +90,7 @@ public actor POSIXBlockingSocket: SocketIOInterface, @unchecked Sendable { } deinit { - pp_log(ctx, .core, .info, "Deinit POSIXBlockingSocket") + pp_log(ctx, .core, .debug, "Deinit POSIXBlockingSocket") pp_socket_free(sock) } diff --git a/Sources/PartoutCore/Connection/ReachabilityObserver.swift b/Sources/PartoutCore/Connection/ReachabilityObserver.swift index 6f46d651..284cdc0b 100644 --- a/Sources/PartoutCore/Connection/ReachabilityObserver.swift +++ b/Sources/PartoutCore/Connection/ReachabilityObserver.swift @@ -7,6 +7,9 @@ public protocol ReachabilityObserver: AnyObject, Sendable { /// Starts observing network events. func startObserving() + /// Stops observing network events. + func stopObserving() + /// True if the network is currently reachable. var isReachable: Bool { get } diff --git a/Sources/PartoutCore/Connection/SimpleConnectionDaemon.swift b/Sources/PartoutCore/Connection/SimpleConnectionDaemon.swift index 94237a0f..a5723eec 100644 --- a/Sources/PartoutCore/Connection/SimpleConnectionDaemon.swift +++ b/Sources/PartoutCore/Connection/SimpleConnectionDaemon.swift @@ -40,6 +40,8 @@ public actor SimpleConnectionDaemon: ConnectionDaemon { private var state: State + private let statusSubject: CurrentValueStream + private var isEvaluatingConnection: Bool private var onHold: Bool @@ -73,6 +75,7 @@ public actor SimpleConnectionDaemon: ConnectionDaemon { onStatus = params.onStatus state = .initial + statusSubject = CurrentValueStream(.disconnected) isEvaluatingConnection = false onHold = false @@ -112,7 +115,7 @@ public actor SimpleConnectionDaemon: ConnectionDaemon { } deinit { - pp_log_id(profile.id, .core, .info, "Deinit daemon") + pp_log_id(profile.id, .core, .debug, "Deinit SimpleConnectionDaemon") } public func start() async throws { @@ -160,10 +163,6 @@ public actor SimpleConnectionDaemon: ConnectionDaemon { } public func stop() async { - await stop(cleanUp: true) - } - - func stop(cleanUp: Bool) async { guard state != .stopped else { assertionFailure("Daemon is stopped") return @@ -195,12 +194,17 @@ public actor SimpleConnectionDaemon: ConnectionDaemon { // Make sure to clear environment on stop, especially last error code clearEnvironment() - if cleanUp { - networkObserver = nil - connection = nil - } + + // Clean up + reachability.stopObserving() + networkObserver?.stopObserving() + networkObserver = nil + // NetworkObserver won't deinit until the connected + // connection stream finishes + connection = nil pp_log_id(profile.id, .core, .notice, "Daemon stopped successfully") + reportStatus(.disconnected) } public func sendMessage(_ input: Message.Input) async throws -> Message.Output? { @@ -224,8 +228,8 @@ private extension SimpleConnectionDaemon { // MARK: - Observation extension SimpleConnectionDaemon { - var statusStream: AsyncStream? { - connection?.statusStream.ignoreErrors() + nonisolated var statusStream: AsyncStream { + statusSubject.subscribe() } func observeEvents() { @@ -260,20 +264,24 @@ extension SimpleConnectionDaemon { } // Observe the network for starting the connection + networkSubscription?.cancel() networkSubscription = Task { [weak self] in guard let self else { return } + pp_log_id(profile.id, .core, .debug, "Network subscription started") for await isReady in onNetworkReadyStream { guard isReady else { continue } guard !Task.isCancelled else { pp_log_id(profile.id, .core, .debug, "Cancelled NetworkObserver.onReady") - return + break } pp_log_id(profile.id, .core, .notice, "Network is ready, start connection") await evaluateConnection() } + pp_log_id(profile.id, .core, .debug, "Network subscription terminated") } // Start monitoring + networkObserver.startObserving() reachability.startObserving() } @@ -369,13 +377,18 @@ extension SimpleConnectionDaemon { controller.setReasserting(false) resumeNetworkObserver(after: reconnectionDelay) } - onStatus?(profile.id, connectionStatus) + reportStatus(connectionStatus) } func onConnectionError(_ error: Error) { environment.setEnvironmentValue(PartoutError(error).code, forKey: TunnelEnvironmentKeys.lastErrorCode) controller.setReasserting(false) } + + func reportStatus(_ status: ConnectionStatus) { + statusSubject.send(status) + onStatus?(profile.id, status) + } } // MARK: - Parameters diff --git a/Sources/PartoutOS/AppleNE/AppExtension/NEObservablePath.swift b/Sources/PartoutOS/AppleNE/AppExtension/NEObservablePath.swift index a41cce95..095390ef 100644 --- a/Sources/PartoutOS/AppleNE/AppExtension/NEObservablePath.swift +++ b/Sources/PartoutOS/AppleNE/AppExtension/NEObservablePath.swift @@ -36,6 +36,9 @@ public final class NEObservablePath: ReachabilityObserver { } monitor.start(queue: monitorQueue) } + + public func stopObserving() { + } } extension NEObservablePath { diff --git a/Sources/PartoutOS/AppleNE/AppExtension/NEPTPForwarder.swift b/Sources/PartoutOS/AppleNE/AppExtension/NEPTPForwarder.swift index c2384497..6f0ab7b4 100644 --- a/Sources/PartoutOS/AppleNE/AppExtension/NEPTPForwarder.swift +++ b/Sources/PartoutOS/AppleNE/AppExtension/NEPTPForwarder.swift @@ -60,7 +60,7 @@ public actor NEPTPForwarder { } deinit { - pp_log(ctx, .os, .info, "Deinit PTP") + pp_log(ctx, .os, .debug, "Deinit PTP") } public func startTunnel(options: [String: NSObject]?) async throws { diff --git a/Sources/PartoutOpenVPNConnection/OpenVPNConnection.swift b/Sources/PartoutOpenVPNConnection/OpenVPNConnection.swift index 7c9fcac5..f0fcdfc7 100644 --- a/Sources/PartoutOpenVPNConnection/OpenVPNConnection.swift +++ b/Sources/PartoutOpenVPNConnection/OpenVPNConnection.swift @@ -70,7 +70,7 @@ public actor OpenVPNConnection { } deinit { - pp_log(ctx, .openvpn, .info, "Deinit OpenVPNConnection") + pp_log(ctx, .openvpn, .debug, "Deinit OpenVPNConnection") } } diff --git a/Sources/PartoutWireGuardConnection/Internal/WireGuardAdapter.swift b/Sources/PartoutWireGuardConnection/Internal/WireGuardAdapter.swift index 7356c865..1ddbc459 100644 --- a/Sources/PartoutWireGuardConnection/Internal/WireGuardAdapter.swift +++ b/Sources/PartoutWireGuardConnection/Internal/WireGuardAdapter.swift @@ -100,7 +100,7 @@ actor WireGuardAdapter { } deinit { - pp_log(ctx, .wireguard, .info, "Deinit WireGuardAdapter") + pp_log(ctx, .wireguard, .debug, "Deinit WireGuardAdapter") // Force remove logger to make sure that no further calls to the instance of this class // can happen after deallocation. diff --git a/Sources/PartoutWireGuardConnection/WireGuardConnection.swift b/Sources/PartoutWireGuardConnection/WireGuardConnection.swift index bfc2c12c..5ac98455 100644 --- a/Sources/PartoutWireGuardConnection/WireGuardConnection.swift +++ b/Sources/PartoutWireGuardConnection/WireGuardConnection.swift @@ -59,7 +59,7 @@ public actor WireGuardConnection: Connection { } deinit { - pp_log(ctx, .wireguard, .info, "Deinit WireGuardConnection") + pp_log(ctx, .wireguard, .debug, "Deinit WireGuardConnection") } public nonisolated var statusStream: AsyncThrowingStream { diff --git a/Tests/PartoutCoreTests/NetworkObserverTests.swift b/Tests/PartoutCoreTests/NetworkObserverTests.swift index a8d13437..d3e0fa46 100644 --- a/Tests/PartoutCoreTests/NetworkObserverTests.swift +++ b/Tests/PartoutCoreTests/NetworkObserverTests.swift @@ -28,6 +28,7 @@ struct NetworkObserverTests { } } + sut.startObserving() reachability.send(true) status.send(.connecting) try await observerExpectation.fulfillment(timeout: 200) @@ -61,6 +62,7 @@ struct NetworkObserverTests { } } + sut.startObserving() reachability.send(true) status.send(.connecting) status.send(.connected) @@ -85,6 +87,7 @@ struct NetworkObserverTests { ) let readyStream = sut.onReady.subscribe() + sut.startObserving() sut.setEnabled(true) status.send(.disconnected) reachability.send(true) // 1 diff --git a/Tests/PartoutCoreTests/SimpleConnectionDaemonTests.swift b/Tests/PartoutCoreTests/SimpleConnectionDaemonTests.swift index 43287c47..42ac4fa2 100644 --- a/Tests/PartoutCoreTests/SimpleConnectionDaemonTests.swift +++ b/Tests/PartoutCoreTests/SimpleConnectionDaemonTests.swift @@ -34,7 +34,7 @@ struct SimpleConnectionDaemonTests { let reachability = MockReachabilityObserver() let sut = try await newDaemon(with: profile, reachability: reachability) - let stream = try #require(await sut.statusStream) + let stream = sut.statusStream try await sut.start() #expect(await stream.nextElement() == .disconnected) @@ -96,7 +96,7 @@ struct SimpleConnectionDaemonTests { reachability: reachability, reconnectionDelay: 100 ) - let stream = try #require(await sut.statusStream) + let stream = sut.statusStream let expAvailable = Expectation() await sut.setTestEvaluateConnection { @@ -131,14 +131,14 @@ struct SimpleConnectionDaemonTests { stopDelay: 100, reconnectionDelay: 5000 ) - let stream = try #require(await sut.statusStream) + let stream = sut.statusStream try await sut.start() #expect(await stream.nextElement() == .disconnected) #expect(await stream.nextElement() == .connecting) #expect(await stream.nextElement() == .connected) reachability.isReachable = false - await sut.stop(cleanUp: false) + await sut.stop() #expect(await stream.nextElement() == .disconnected) } @@ -158,14 +158,14 @@ struct SimpleConnectionDaemonTests { reachability: reachability, stopDelay: 200 ) - let stream = try #require(await sut.statusStream) + let stream = sut.statusStream try await sut.start() #expect(await stream.nextElement() == .disconnected) #expect(await stream.nextElement() == .connecting) #expect(await stream.nextElement() == .connected) reachability.isReachable = false - await sut.stop(cleanUp: false) + await sut.stop() #expect(await stream.nextElement() == .disconnected) } } @@ -216,6 +216,9 @@ private final class MockReachabilityObserver: ReachabilityObserver, @unchecked S isReachable = true } + func stopObserving() { + } + var isReachable: Bool = false { didSet { isReachableSubject.send(isReachable) diff --git a/Tests/PartoutOpenVPNTests/OpenVPNConnectionTests.swift b/Tests/PartoutOpenVPNTests/OpenVPNConnectionTests.swift index 29e28101..af3aca1b 100644 --- a/Tests/PartoutOpenVPNTests/OpenVPNConnectionTests.swift +++ b/Tests/PartoutOpenVPNTests/OpenVPNConnectionTests.swift @@ -404,8 +404,9 @@ private final class MockOpenVPNSession: OpenVPNSessionProtocol, @unchecked Senda } final class MockReachabilityObserver: ReachabilityObserver { - func startObserving() { - } + func startObserving() {} + + func stopObserving() {} let isReachable = true