Skip to content
2 changes: 1 addition & 1 deletion Sources/Partout/VirtualTunnelInterface.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ final class VirtualTunnelInterface: IOInterface, @unchecked Sendable {
}

deinit {
pp_log(ctx, .core, .info, "Deinit VirtualTunnelInterface")
pp_log(ctx, .core, .debug, "Deinit VirtualTunnelInterface")
pp_tun_free(tun)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public final class DummyReachabilityObserver: ReachabilityObserver {
stream.send(true)
}

public func stopObserving() {
stream.finish()
}

public var isReachable: Bool {
true
}
Expand Down
77 changes: 41 additions & 36 deletions Sources/PartoutCore/Connection/NetworkObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@
public final class NetworkObserver: @unchecked Sendable {
private let ctx: PartoutLoggerContext

private let reachabilityStream: AsyncStream<Bool>

private let statusStream: AsyncStream<ConnectionStatus>

private let state: AtomicState

private let signalSubject: CurrentValueStream<Bool>

private let isStatusReady: (ConnectionStatus) -> Bool
private let isStatusReady: @Sendable (ConnectionStatus) -> Bool

/// Publishes when the network state is ready for reconnection.
public let onReady: CurrentValueStream<Bool>
Expand All @@ -28,87 +32,88 @@ public final class NetworkObserver: @unchecked Sendable {
_ ctx: PartoutLoggerContext,
reachabilityStream: AsyncStream<Bool>,
statusStream: AsyncStream<ConnectionStatus>,
isStatusReady: @escaping (ConnectionStatus) -> Bool
isStatusReady: @escaping @Sendable (ConnectionStatus) -> Bool
) {
self.ctx = ctx
self.reachabilityStream = reachabilityStream
self.statusStream = statusStream
state = AtomicState()
signalSubject = CurrentValueStream(false)
self.isStatusReady = isStatusReady
onReady = CurrentValueStream(false)
subscriptions = []

observeObjects(reachabilityStream, statusStream)
}

public func setEnabled(_ isEnabled: Bool) {
signalSubject.send(isEnabled)
}
}

private extension NetworkObserver {
func satisfied(by tuple: AtomicState.Tuple) -> Bool {
tuple.signal && tuple.isNetworkAvailable && isStatusReady(tuple.connectionStatus)
deinit {
pp_log(ctx, .core, .debug, "Deinit NetworkObserver")
}

func tryReady(_ value: AtomicState.Tuple) {
let isSatisfied = satisfied(by: value)
pp_log(ctx, .core, .info, "NetworkObserver.onReady(\(value.debugDescription)) -> \(isSatisfied)")
guard isSatisfied else {
return
}
onReady.send(true)
}
}

private extension NetworkObserver {
func observeObjects(_ reachabilityStream: AsyncStream<Bool>, _ statusStream: AsyncStream<ConnectionStatus>) {
public func startObserving() {
let signalSubscription = Task { [weak self] in
guard let self else {
return
}
guard let self else { return }
for await signal in signalSubject.subscribe() {
guard !Task.isCancelled else {
pp_log(ctx, .core, .debug, "Cancelled NetworkObserver.signalSubject")
return
break
}
guard let newState = await state.setSignal(signal) else {
continue
}
tryReady(newState)
}
pp_log(ctx, .core, .debug, "NetworkObserver.signalSubject terminated")
}
let reachabilitySubscription = Task { [weak self] in
guard let self else {
return
}
guard let self else { return }
for await isNetworkAvailable in reachabilityStream {
guard !Task.isCancelled else {
pp_log(ctx, .core, .debug, "Cancelled NetworkObserver.reachabilityStream")
return
break
}
guard let newState = await state.setIsNetworkAvailable(isNetworkAvailable) else {
continue
}
tryReady(newState)
}
pp_log(ctx, .core, .debug, "NetworkObserver.reachabilityStream terminated")
}
let statusSubscription = Task { [weak self] in
guard let self else {
return
}
guard let self else { return }
for await connectionStatus in statusStream {
guard !Task.isCancelled else {
pp_log(ctx, .core, .debug, "Cancelled NetworkObserver.statusStream")
return
break
}
guard let newState = await state.setConnectionStatus(connectionStatus) else {
continue
}
tryReady(newState)
}
pp_log(ctx, .core, .debug, "NetworkObserver.statusStream terminated")
}
subscriptions = [signalSubscription, reachabilitySubscription, statusSubscription]
}

public func stopObserving() {
signalSubject.finish()
}

public func setEnabled(_ isEnabled: Bool) {
signalSubject.send(isEnabled)
}
}

private extension NetworkObserver {
func satisfied(by tuple: AtomicState.Tuple) -> Bool {
tuple.signal && tuple.isNetworkAvailable && isStatusReady(tuple.connectionStatus)
}

func tryReady(_ value: AtomicState.Tuple) {
let isSatisfied = satisfied(by: value)
pp_log(ctx, .core, .info, "NetworkObserver.onReady(\(value.debugDescription)) -> \(isSatisfied)")
guard isSatisfied else { return }
onReady.send(true)
}
}

// MARK: - AtomicState
Expand Down
2 changes: 1 addition & 1 deletion Sources/PartoutCore/Connection/POSIXBlockingSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public actor POSIXBlockingSocket: SocketIOInterface, @unchecked Sendable {
}

deinit {
pp_log(ctx, .core, .info, "Deinit POSIXBlockingSocket")
pp_log(ctx, .core, .debug, "Deinit POSIXBlockingSocket")
pp_socket_free(sock)
}

Expand Down
3 changes: 3 additions & 0 deletions Sources/PartoutCore/Connection/ReachabilityObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ public protocol ReachabilityObserver: AnyObject, Sendable {
/// Starts observing network events.
func startObserving()

/// Stops observing network events.
func stopObserving()

/// True if the network is currently reachable.
var isReachable: Bool { get }

Expand Down
39 changes: 26 additions & 13 deletions Sources/PartoutCore/Connection/SimpleConnectionDaemon.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public actor SimpleConnectionDaemon: ConnectionDaemon {

private var state: State

private let statusSubject: CurrentValueStream<ConnectionStatus>

private var isEvaluatingConnection: Bool

private var onHold: Bool
Expand Down Expand Up @@ -73,6 +75,7 @@ public actor SimpleConnectionDaemon: ConnectionDaemon {
onStatus = params.onStatus

state = .initial
statusSubject = CurrentValueStream(.disconnected)
isEvaluatingConnection = false
onHold = false

Expand Down Expand Up @@ -112,7 +115,7 @@ public actor SimpleConnectionDaemon: ConnectionDaemon {
}

deinit {
pp_log_id(profile.id, .core, .info, "Deinit daemon")
pp_log_id(profile.id, .core, .debug, "Deinit SimpleConnectionDaemon")
}

public func start() async throws {
Expand Down Expand Up @@ -160,10 +163,6 @@ public actor SimpleConnectionDaemon: ConnectionDaemon {
}

public func stop() async {
await stop(cleanUp: true)
}

func stop(cleanUp: Bool) async {
guard state != .stopped else {
assertionFailure("Daemon is stopped")
return
Expand Down Expand Up @@ -195,12 +194,17 @@ public actor SimpleConnectionDaemon: ConnectionDaemon {

// Make sure to clear environment on stop, especially last error code
clearEnvironment()
if cleanUp {
networkObserver = nil
connection = nil
}

// Clean up
reachability.stopObserving()
networkObserver?.stopObserving()
networkObserver = nil
// NetworkObserver won't deinit until the connected
// connection stream finishes
connection = nil

pp_log_id(profile.id, .core, .notice, "Daemon stopped successfully")
reportStatus(.disconnected)
}

public func sendMessage(_ input: Message.Input) async throws -> Message.Output? {
Expand All @@ -224,8 +228,8 @@ private extension SimpleConnectionDaemon {
// MARK: - Observation

extension SimpleConnectionDaemon {
var statusStream: AsyncStream<ConnectionStatus>? {
connection?.statusStream.ignoreErrors()
nonisolated var statusStream: AsyncStream<ConnectionStatus> {
statusSubject.subscribe()
}

func observeEvents() {
Expand Down Expand Up @@ -260,20 +264,24 @@ extension SimpleConnectionDaemon {
}

// Observe the network for starting the connection
networkSubscription?.cancel()
networkSubscription = Task { [weak self] in
guard let self else { return }
pp_log_id(profile.id, .core, .debug, "Network subscription started")
for await isReady in onNetworkReadyStream {
guard isReady else { continue }
guard !Task.isCancelled else {
pp_log_id(profile.id, .core, .debug, "Cancelled NetworkObserver.onReady")
return
break
}
pp_log_id(profile.id, .core, .notice, "Network is ready, start connection")
await evaluateConnection()
}
pp_log_id(profile.id, .core, .debug, "Network subscription terminated")
}

// Start monitoring
networkObserver.startObserving()
reachability.startObserving()
}

Expand Down Expand Up @@ -369,13 +377,18 @@ extension SimpleConnectionDaemon {
controller.setReasserting(false)
resumeNetworkObserver(after: reconnectionDelay)
}
onStatus?(profile.id, connectionStatus)
reportStatus(connectionStatus)
}

func onConnectionError(_ error: Error) {
environment.setEnvironmentValue(PartoutError(error).code, forKey: TunnelEnvironmentKeys.lastErrorCode)
controller.setReasserting(false)
}

func reportStatus(_ status: ConnectionStatus) {
statusSubject.send(status)
onStatus?(profile.id, status)
}
}

// MARK: - Parameters
Expand Down
3 changes: 3 additions & 0 deletions Sources/PartoutOS/AppleNE/AppExtension/NEObservablePath.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public final class NEObservablePath: ReachabilityObserver {
}
monitor.start(queue: monitorQueue)
}

public func stopObserving() {
}
}

extension NEObservablePath {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public actor NEPTPForwarder {
}

deinit {
pp_log(ctx, .os, .info, "Deinit PTP")
pp_log(ctx, .os, .debug, "Deinit PTP")
}

public func startTunnel(options: [String: NSObject]?) async throws {
Expand Down
2 changes: 1 addition & 1 deletion Sources/PartoutOpenVPNConnection/OpenVPNConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public actor OpenVPNConnection {
}

deinit {
pp_log(ctx, .openvpn, .info, "Deinit OpenVPNConnection")
pp_log(ctx, .openvpn, .debug, "Deinit OpenVPNConnection")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ actor WireGuardAdapter {
}

deinit {
pp_log(ctx, .wireguard, .info, "Deinit WireGuardAdapter")
pp_log(ctx, .wireguard, .debug, "Deinit WireGuardAdapter")

// Force remove logger to make sure that no further calls to the instance of this class
// can happen after deallocation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public actor WireGuardConnection: Connection {
}

deinit {
pp_log(ctx, .wireguard, .info, "Deinit WireGuardConnection")
pp_log(ctx, .wireguard, .debug, "Deinit WireGuardConnection")
}

public nonisolated var statusStream: AsyncThrowingStream<ConnectionStatus, Error> {
Expand Down
3 changes: 3 additions & 0 deletions Tests/PartoutCoreTests/NetworkObserverTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct NetworkObserverTests {
}
}

sut.startObserving()
reachability.send(true)
status.send(.connecting)
try await observerExpectation.fulfillment(timeout: 200)
Expand Down Expand Up @@ -61,6 +62,7 @@ struct NetworkObserverTests {
}
}

sut.startObserving()
reachability.send(true)
status.send(.connecting)
status.send(.connected)
Expand All @@ -85,6 +87,7 @@ struct NetworkObserverTests {
)
let readyStream = sut.onReady.subscribe()

sut.startObserving()
sut.setEnabled(true)
status.send(.disconnected)
reachability.send(true) // 1
Expand Down
Loading