Correct way to use NWConnection for long-running TCP socket

I've been fighting with NWConnection to receive data on a long-running TCP socket all day. I've finally got it working after inflicting the following errors on myself due to lack of documentation:

  1. Incomplete data (due to only calling receive once)
  2. Getting TCP data out-of-order (due to "polling" receive from a timer...resulting in multiple simultaneous closures waiting to get data).
  3. Suffering infinite loops (due to restarting receive after receiving without checking the "isComplete" Bool--once the socket is terminated from the other end this is....bad...very bad).

Summary of what I've learned:

  1. Once you are in the .ready state you can call receive...once and only once
  2. Once you receive some data, you can call receive again...but only if you are still in the .ready state and the isComplete is false.

Here's my code. I think this is right. But if it's wrong please let me know:

    queue = DispatchQueue(label: "hostname", attributes: .concurrent)
    let serverEndpoint = NWEndpoint.Host(hostname)
    guard let portEndpoint = NWEndpoint.Port(rawValue: port) else { return nil }
    connection = NWConnection(host: serverEndpoint, port: portEndpoint, using: .tcp)
    connection.stateUpdateHandler = { [weak self] (newState) in
        switch newState {
        case .ready:
            debugPrint("TcpReader.ready to send")
            self?.receive()
        case .failed(let error):
            debugPrint("TcpReader.client failed with error \(error)")
        case .setup:
            debugPrint("TcpReader.setup")
        case .waiting(_):
            debugPrint("TcpReader.waiting")
        case .preparing:
            debugPrint("TcpReader.preparing")
        case .cancelled:
            debugPrint("TcpReader.cancelled")
        }
    }

func receive() {  
    connection.receive(minimumIncompleteLength: 1, maximumLength: 8192) { (content, context, isComplete, error) in
        debugPrint("\(Date()) TcpReader: got a message \(String(describing: content?.count)) bytes")
        if let content = content {
            self.delegate.gotData(data: content, from: self.hostname, port: self.port)
        }
        if self.connection.state == .ready && isComplete == false {
            self.receive()
        }
    }
}
Consubstantiation answered 4/3, 2019 at 4:12 Comment(8)
I wish I had found this post this morning. I am struggling with the issue that if I send multiple bits of data using connection.send, then connection.receive gets the data combined together. Should I treat this as something that just happens on the network, should I throttle my sends, or should I send in a different manner? - Inflated
I can't answer that, but it sounds like an excellent question, especially if you include your code. I'd love to see more NWConnection code samples. - Consubstantiation
So, it turns out I was kind of using this in the wrong way (AFAIK). I was seeing the connection as a pipe you open and then put things into continually. When I instead looked at an NWConnection as a thing used to send a single thing and then closed after that single thing, everything started working correctly. - Inflated
If you want to connect many times, you can handle newConnectionHandler and restart NWListener and NWConnection on the server. - Overdose
A timer is unneeded. You should handle NWConnection.receiveMessage to get messages and call receiveNextMessage() to get the next one. - Overdose
@Inflated if you need to send big data you can use NWProtocolFramer. - Overdose
Not sure you want .concurrent - Patnode
@Inflated did you find a way to keep the connection open? - Fluctuate
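
On the combined-data comment above: TCP delivers a byte stream, not messages, so several send calls can arrive in one receive (or one send can be split across several receives). One common way to cope, sketched here, is to prefix each message with its length and reassemble on the receiving side. The `LengthPrefixedDecoder` type and the 4-byte big-endian header are assumptions made for illustration, not something taken from the posts on this page; NWProtocolFramer, mentioned in the comments, is the framework's own route to the same goal.

```swift
import Foundation

/// Hypothetical helper: accumulates received bytes and emits complete messages.
/// Assumed wire format: 4-byte big-endian length, then that many payload bytes.
struct LengthPrefixedDecoder {
    private var buffer = Data()

    /// Wraps a payload in the assumed frame format before sending.
    /// (Traps if the payload is larger than UInt32.max bytes.)
    static func encode(_ payload: Data) -> Data {
        let header = withUnsafeBytes(of: UInt32(payload.count).bigEndian) { Data($0) }
        return header + payload
    }

    /// Appends one received chunk and returns every message it completes.
    mutating func append(_ chunk: Data) -> [Data] {
        buffer.append(chunk)
        var messages: [Data] = []
        while buffer.count >= 4 {
            // Rebuild the big-endian length from the first four bytes.
            let length: Int = buffer.prefix(4).reduce(0) { ($0 << 8) | Int($1) }
            // Wait for more data if the payload has not fully arrived yet.
            guard buffer.count >= 4 + length else { break }
            messages.append(Data(buffer.dropFirst(4).prefix(length)))
            // Copy the remainder so the buffer's indices start at zero again.
            buffer = Data(buffer.dropFirst(4 + length))
        }
        return messages
    }
}
```

Each chunk handed to a receive completion handler would go through `append(_:)`; every `Data` it returns is one complete message, however the network happened to split or merge the sends.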

I think you can use short-lived connections many times. For example, a client connects to the host, asks the host to do something, and then tells the host to shut down the connection. The host then goes back to waiting for a new connection. See the diagram below.

You should also have a connection timer that shuts down an open connection when the client doesn't send the close-connection request or an answer to the host within a particular time.
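
The connection timer described above could be sketched as a small idle timer that is restarted on every sign of life and fires once the peer has been silent for too long. `IdleTimer` and its method names are hypothetical; the sketch assumes `touch()` and `stop()` are always called on the queue passed to the initializer (for example the queue given to `connection.start(queue:)`), and that the timeout closure is where the connection gets cancelled.

```swift
import Foundation

/// Hypothetical helper: runs `onTimeout` if `touch()` is not called
/// again within `interval` seconds.
final class IdleTimer {
    private let interval: TimeInterval
    private let queue: DispatchQueue
    private let onTimeout: () -> Void
    private var pending: DispatchWorkItem?

    init(interval: TimeInterval, queue: DispatchQueue, onTimeout: @escaping () -> Void) {
        self.interval = interval
        self.queue = queue
        self.onTimeout = onTimeout
    }

    /// Restarts the countdown. Call on `queue` whenever the peer shows activity.
    func touch() {
        // A cancelled work item never runs, so each countdown gets a fresh one.
        pending?.cancel()
        let item = DispatchWorkItem { [weak self] in self?.onTimeout() }
        pending = item
        queue.asyncAfter(deadline: .now() + interval, execute: item)
    }

    /// Stops the countdown without firing. Call on `queue`.
    func stop() {
        pending?.cancel()
        pending = nil
    }
}
```

On the host side this would mean calling `touch()` when a connection becomes ready and again after every received request, and `stop()` once the client has closed the connection itself.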

Overdose answered 21/12, 2019 at 18:58 Comment(1)
Great answer, this helped fix my issue where I couldn't reconnect to the host. Thank you! - Doublet

On a long-running TCP socket, you should implement a custom heartbeat to monitor whether the connection is still alive or has been disconnected.

The heartbeat can be sent as a plain message or as encrypted data; it is usually implemented according to the server's spec documents.

Below is sample concept code that explains the flow, for reference (without the network packet content handler).

I can't guarantee that this is the common or correct way, but it works for my project.

import Network

class NetworkService {

    lazy var heartbeatTimeoutTask: DispatchWorkItem = {
        return DispatchWorkItem { self.handleHeartbeatTimeOut() }
    }()

    lazy var connection: NWConnection = {
        // Create the connection
        let connection = NWConnection(host: "x.x.x.x", port: 1234, using: self.parames)
        connection.stateUpdateHandler = self.listenStateUpdate(to:)
        return connection
    }()
    
    lazy var parames: NWParameters = {
        let parames = NWParameters(tls: nil, tcp: self.tcpOptions)
        if let isOption = parames.defaultProtocolStack.internetProtocol as? NWProtocolIP.Options {
            isOption.version = .v4
        }
        parames.preferNoProxies = true
        parames.expiredDNSBehavior = .allow
        parames.multipathServiceType = .interactive
        parames.serviceClass = .background
        return parames
    }()
    
    lazy var tcpOptions: NWProtocolTCP.Options = {
        let options = NWProtocolTCP.Options()
        options.enableFastOpen = true // Enable TCP Fast Open (TFO)
        options.connectionTimeout = 5 // connection timed out
        return options
    }()
    
    // Serial queue, so the connection's callbacks run one at a time and in order
    let queue = DispatchQueue(label: "hostname")
    
    private func listenStateUpdate(to state: NWConnection.State) {
        // Set the state update handler
        switch state {
        case .setup:
            // init state
            debugPrint("The connection has been initialized but not started.")
        case .waiting(let error):
            debugPrint("The connection is waiting for a network path change with: \(error)")
            self.disconnect()
        case .preparing:
            debugPrint("The connection in the process of being established.")
        case .ready:
            // Handle connection established
            // this means that the handshake is finished
            debugPrint("The connection is established, and ready to send and receive data.")
            self.receiveData()
            self.sendHeartbeat()
        case .failed(let error):
            debugPrint("The connection has disconnected or encountered an: \(error)")
            self.disconnect()
        case .cancelled:
            debugPrint("The connection has been canceled.")
        @unknown default:
            break
        }
    }
    
    // MARK: - Socket I/O
    func connect() {
        // Start the connection
        self.connection.start(queue: self.queue)
    }
    
    func disconnect() {
        // Stop the connection
        self.connection.stateUpdateHandler = nil
        self.connection.cancel()
    }
    
    private func sendPacket() {
        var packet: Data? // do something for heartbeat packet
        self.connection.send(content: packet, completion: .contentProcessed({ (error) in
            if let err = error {
                // Handle error in sending
                debugPrint("encounter an error with: \(err) after send Packet")
            } else {
                // Send has been processed
            }
        }))
    }
    
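    private func receiveData() {
        self.connection.receive(minimumIncompleteLength: 1, maximumLength: 8192) { [weak self] (data, context, isComplete, error) in
            guard let weakSelf = self else { return }
            if var data = data, !data.isEmpty {
                // inspect the received bytes for a heartbeat packet
                weakSelf.parseHeartBeat(&data)
            }
            // Each receive call delivers a single chunk, so ask for the next one
            // unless the stream has ended, failed, or left the ready state
            if error == nil && isComplete == false && weakSelf.connection.state == .ready {
                weakSelf.receiveData()
            }
        }
    }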
    
    // MARK: - Heartbeat
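    private func sendHeartbeat() {
        // sendHeartbeatPacket
        self.sendPacket()
        // A cancelled DispatchWorkItem never runs again, so arm a fresh one for every heartbeat
        self.heartbeatTimeoutTask.cancel()
        self.heartbeatTimeoutTask = DispatchWorkItem { [weak self] in self?.handleHeartbeatTimeOut() }
        // fire the timeout task if the server sends no corresponding packet within 5 seconds
        DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 5.0, execute: self.heartbeatTimeoutTask)
    }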
    
    private func handleHeartbeatTimeOut() {
        // sample timeout handling; customize this part as needed
        self.heartbeatTimeoutTask.cancel()
        self.disconnect()
    }
    
    private func parseHeartBeat(_ heartbeatData: inout Data) {
        // parse the heartbeat packet here
        
        // cancel the heartbeat timeout once the packet has been parsed successfully
        self.heartbeatTimeoutTask.cancel()
        
        // send the next heartbeat after 15 seconds to keep monitoring the server
        DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 15.0) {
            self.sendHeartbeat()
        }
    }

}
Cl answered 26/11, 2021 at 9:32 Comment(2)
Any reason why you could receive messages only once with this approach? - Desideratum
I'm not sure what your point is. The format of a received packet is determined by the back-end design and the encryption method, but the common approach is to first work out the byte length of the packet header and decrypt it, then go on to decrypt and parse the packet body, and end up with a response object such as raw data or a message that is meaningful to you. - Cl

I struggled with this yesterday and after sleeping on it this is what ended up working for me.

    // use bonjour to find service being advertised on local area network
    let bonjourTCP = NWBrowser.Descriptor.bonjour(type: "_example._tcp." , domain: "local.")
    let bonjourParms = NWParameters.init()
    let browser = NWBrowser(for: bonjourTCP, using: bonjourParms)
    browser.browseResultsChangedHandler = { ( results, changes ) in
        // process browser results and cancel the browser
        browser.cancel()
        for result in results {
            // take the endpoint and create a connection
            self.connection = NWConnection(to: result.endpoint, using: .tcp)
            // set the stateUpdateHandler on the connection
            self.connection!.stateUpdateHandler = { newState in
                switch newState {
                case .ready:
                    print("ready")
                    // when the connection is ready dispatch an async thread to receive the content from the socket
                    DispatchQueue.global(qos: .userInitiated).async {
                        self.connection!.receive(minimumIncompleteLength: 1, maximumLength: 10000) { content, _, _, _ in
                            if let content = content {
                                // here is the key, in the completion block call the method to process the data.
                                self.processReceiveData(content: content, connection: self.connection!)
                            }
                        }
                    }
                case .setup:
                    print("setup")
                case .waiting:
                    print("waiting")
                case .preparing:
                    print("preparing")
                case .cancelled:
                    print("cancelled")
                case .failed:
                    print("failed")
                @unknown default:
                    print("default")
                }
            }
            // start the connection and process the results in the stateUpdateHandler above
            self.connection!.start(queue: DispatchQueue.main)
            // print some logging information about the result of the browser search
            print("result ", result );
            if case .service(let service) = result.endpoint {
                print("bonjourA ",service.name)
            }
        }
    }
    browser.start(queue: DispatchQueue.main)
}

// this method is the called from the completion handler for the connection.receive method
func processReceiveData(content: Data, connection: NWConnection){
    // Decode leniently: a TCP chunk can end in the middle of a UTF-8 sequence,
    // and force-unwrapping String(data:encoding:) would crash on it
    let text = String(decoding: content, as: UTF8.self)
    print(text)
    // I'm updating a textView with the data received so dispatch this on the main queue async
    DispatchQueue.main.async {
        self.appendToChatSession(text: text.trimmingCharacters(in: .newlines))
        // here is the key, call receive again and use this method to process the received content so the
        // receive method is called again. 
        self.connection?.receive(minimumIncompleteLength: 1, maximumLength: 10000) { content, _, _, _ in
            if let content = content {
                self.processReceiveData(content: content, connection: self.connection!)
            }
        }
    }
}

Note that if at first you don't succeed, RTFM. Here is the documentation comment for receive from the Network framework's interface file (I'm not sure what this is called in Swift, but in C/Objective-C/C++ function declarations live in a header file).

/// Receive data from a connection. This may be called before the connection
/// is ready, in which case the receive request will be queued until the
/// connection is ready. The completion handler will be invoked exactly
/// once for each call, so the client must call this function multiple
/// times to receive multiple chunks of data. For protocols that
/// support flow control, such as TCP, calling receive opens the receive
/// window. If the client stops calling receive, the receive window will
/// fill up and the remote peer will stop sending.
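
Read literally, that comment means one `receive` call yields one completion, so a long-running reader re-issues `receive` from inside the completion handler and stops on an error or when `isComplete` signals the end of the stream. A minimal sketch of that loop follows. The `ChunkReceiving` protocol and the `receiveLoop` function are hypothetical names introduced here so the loop can be exercised without a live socket; only the `NWConnection.receive(minimumIncompleteLength:maximumLength:completion:)` call inside the extension is framework API.

```swift
import Foundation
#if canImport(Network)
import Network
#endif

/// Hypothetical abstraction: one call, exactly one completion.
protocol ChunkReceiving: AnyObject {
    func receiveChunk(_ completion: @escaping (Data?, Bool, Error?) -> Void)
}

#if canImport(Network)
// Adapter so a real connection can drive the loop (Apple platforms only).
extension NWConnection: ChunkReceiving {
    func receiveChunk(_ completion: @escaping (Data?, Bool, Error?) -> Void) {
        receive(minimumIncompleteLength: 1, maximumLength: 8192) { data, _, isComplete, error in
            completion(data, isComplete, error)
        }
    }
}
#endif

/// Keeps exactly one receive outstanding until the stream ends or fails.
func receiveLoop(on source: ChunkReceiving,
                 onData: @escaping (Data) -> Void,
                 onEnd: @escaping (Error?) -> Void) {
    source.receiveChunk { [weak source] data, isComplete, error in
        // Deliver whatever arrived, even alongside an end-of-stream flag.
        if let data = data, !data.isEmpty { onData(data) }
        if let error = error { onEnd(error); return }
        if isComplete { onEnd(nil); return }
        // Re-arm only after the previous completion has run.
        guard let source = source else { return }
        receiveLoop(on: source, onData: onData, onEnd: onEnd)
    }
}
```

With a started connection this would be kicked off once, for instance from the `.ready` state, and because the next receive is only issued from inside the previous completion, chunks cannot be handled out of order as long as the connection's queue is serial.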
Luis answered 4/2 at 16:42 Comment(0)
