How do I abort a socket.recvfrom() from another thread in python?
Asked Answered
C

5

7

This looks like a duplicate of How do I abort a socket.recv() from another thread in Python, but it's not, since I want to abort recvfrom() in a thread, which is UDP, not TCP.

Can this be solved by poll() or select.select() ?

Comparator answered 16/9, 2011 at 19:3 Comment(6)
UDP or TCP has nothing to do with being threaded or not.Landon
You can use the old C trick: create a fake pipe and use select on both fake and socket. When you want to stop simple send a message to the fake one.... If I find some time I'll file an answer with all details.Gallon
@qarma Take a look to my comment and let me know if you are interested in this kind of solution.Gallon
@Micheled'Amico yes it's a valid solutionTessellated
@qarma I'll write it this WEGallon
In my experience select.select() has worked brilliantly for this.Bouie
M
6

If you want to unblock a UDP read from another thread, send it a datagram!

Rgds, Martin

Mccollum answered 16/9, 2011 at 20:4 Comment(1)
That's great if (addr, port) combination is unique. How about if REUSEADDR is on and several processes are listening?Tessellated
G
3

A good way to handle this kind of asynchronous interruption is the old C pipe trick. You can create a pipe and use select/poll on both socket and pipe: Now when you want interrupt receiver you can just send a char to the pipe.

  • pros:
    • Can work both for UDP and TCP
    • Is protocol agnostic
  • cons:
    • select/poll on pipes are not available on Windows, in this case you should replace it by another UDP socket that use as notification pipe

Starting point

interruptable_socket.py

import os
import socket
import select


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._r_pipe, self._w_pipe = os.pipe()
        self._interrupted = False

    def bind(self):
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        read, _w, errors = select.select([self._r_pipe, self._socket], [], [self._socket])
        if self._socket in read:
            return self._socket.recv(buffersize, flags)
        return ""

    def interrupt(self):
        self._interrupted = True
        os.write(self._w_pipe, "I".encode())

A test suite:

test_interruptable_socket.py

import socket
from threading import Timer
import time
from interruptable_socket import InterruptableUdpSocketReceiver
import unittest


class Sender(object):
    def __init__(self, destination_host, destination_port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self._dest = (destination_host, destination_port)

    def send(self, message):
        self._socket.sendto(message, self._dest)

class Test(unittest.TestCase):
    def create_receiver(self, host="127.0.0.1", port=3010):
        receiver = InterruptableUdpSocketReceiver(host, port)
        receiver.bind()
        return receiver

    def create_sender(self, host="127.0.0.1", port=3010):
        return Sender(host, port)

    def create_sender_receiver(self, host="127.0.0.1", port=3010):
        return self.create_sender(host, port), self.create_receiver(host, port)

    def test_create(self):
        self.create_receiver()

    def test_recv_async(self):
        sender, receiver = self.create_sender_receiver()
        start = time.time()
        send_message = "TEST".encode('UTF-8')
        Timer(0.1, sender.send, (send_message, )).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(message, send_message)

    def test_interrupt_async(self):
        receiver = self.create_receiver()
        start = time.time()
        Timer(0.1, receiver.interrupt).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(0, len(message))

    def test_exception_after_interrupt(self):
        sender, receiver = self.create_sender_receiver()
        receiver.interrupt()
        with self.assertRaises(RuntimeError):
            receiver.recv(128)


if __name__ == '__main__':
    unittest.main()

Evolution

Now this code is just a starting point. To make it more generic I see we should fix follow issues:

  1. Interface: return empty message in interrupt case is not a good deal, is better to use an exception to handle it
  2. Generalization: we should have just a function to call before socket.recv(), extend interrupt to others recv methods become very simple
  3. Portability: to make simple port it to windows we should isolate the async notification in a object to choose the right implementation for our operating system

First of all we change test_interrupt_async() to check exception instead empty message:

from interruptable_socket import InterruptException

def test_interrupt_async(self):
    receiver = self.create_receiver()
    start = time.time()
    with self.assertRaises(InterruptException):
        Timer(0.1, receiver.interrupt).start()
        receiver.recv(128)
    elapsed = time.time()-start
    self.assertGreaterEqual(elapsed, 0.095)
    self.assertLess(elapsed, 0.11)

After this we can replace return '' by raise InterruptException and the tests pass again.

The ready to extend version can be :

interruptable_socket.py

import os
import socket
import select


class InterruptException(Exception):
    pass


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._async_interrupt = AsycInterrupt(self._socket)

    def bind(self):
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        self._async_interrupt.wait_for_receive()
        return self._socket.recv(buffersize, flags)

    def interrupt(self):
        self._async_interrupt.interrupt()


class AsycInterrupt(object):
    def __init__(self, descriptor):
        self._read, self._write = os.pipe()
        self._interrupted = False
        self._descriptor = descriptor

    def interrupt(self):
        self._interrupted = True
        self._notify()

    def wait_for_receive(self):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        read, _w, errors = select.select([self._read, self._descriptor], [], [self._descriptor])
        if self._descriptor not in read:
            raise InterruptException

    def _notify(self):
        os.write(self._write, "I".encode())

Now wraps more recv function, implement a windows version or take care of socket timeouts become really simple.

Gunpoint answered 20/11, 2015 at 22:42 Comment(4)
Amount of code is overkill, why not subclass socket directly?Tessellated
Because in this example I didn't want to take care of all 4 recvs methods and override (and test) all of them. Moreover if I subclass socket I should take care of socket timeout in all recvs methods. That is an example and a suite of tests: replace collaborator by subclass is simple (but you should resolve above issues). But before do it is better 1- use exception instead of return empty message, 2- create a decorator for check and wait incoming message to use it in all 4 recvs, 3- extract an object that do the asynchronous notification to replace it by a UDP implementation in windows.Gallon
@qarma One more thing: subclassing is a powerful tool but sometimes is better to use a collaborator because when you subclassing new object IS the parent object and you should take care of all parent behavior and responsibilities. Use collaborator make simpler follow the single responsibility principle and take your code more clean: If you use just recv_from in your object why make a subclass that take care of all socket's methods? When you need some other methods you can consider to change it in a subclass, your test will help you to do it safe.Gallon
Wow you are prolific! While I would prefer same API as socket, and much smaller code base, your answer is the best there is here, ergo +50 is yours.Tessellated
E
1

The solution here is to forcibly close the socket. The problem is that the method for doing this is OS-specific and Python does not do a good job of abstracting the way to do it or the consequences. Basically, you need to do a shutdown() followed by a close() on the socket. On POSIX systems such as Linux, the shutdown is the key element in forcing recvfrom to stop (a call to close() alone won't do it). On Windows, shutdown() does not affect the recvfrom and the close() is the key element. This is exactly the behavior that you would see if you were implementing this code in C and using either native POSIX sockets or Winsock sockets, so Python is providing a very thin layer on top of those calls.

On both POSIX and Windows systems, this sequence of calls results in an OSError being raised. However, the location of the exception and the details of it are OS-specific. On POSIX systems, the exception is raised on the call to shutdown() and the errno value of the exception is set to 107 (Transport endpoint is not connected). On Windows systems, the exception is raised on the call to recvfrom() and the winerror value of the exception is set to 10038 (An operation was attempted on something that is not a socket). This means that there's no way to do this in an OS-agnositc way, the code has to account for both Windows and POSIX behavior and errors. Here's a simple example I wrote up:

import socket
import threading
import time

class MyServer(object):
    def __init__(self, port:int=0):
        if port == 0:
            raise AttributeError('Invalid port supplied.')

        self.port = port
        self.socket = socket.socket(family=socket.AF_INET,
                type=socket.SOCK_DGRAM)
        self.socket.bind(('0.0.0.0', port))

        self.exit_now = False

        print('Starting server.')
        self.thread = threading.Thread(target=self.run_server,
                args=[self.socket])
        self.thread.start()

    def run_server(self, socket:socket.socket=None):
        if socket is None:
            raise AttributeError('No socket provided.')

        buffer_size = 4096

        while self.exit_now == False:
            data = b''
            try:
                data, address = socket.recvfrom(buffer_size)
            except OSError as e:
                if e.winerror == 10038:
                    # Error is, "An operation was attempted on something that
                    # is not a socket".  We don't care.
                    pass
                else:
                    raise e
            if len(data) > 0:
                print(f'Received {len(data)} bytes from {address}.')

    def stop(self):
        self.exit_now = True
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except OSError as e:
            if e.errno == 107:
                # Error is, "Transport endpoint is not connected".
                # We don't care.
                pass
            else:
                raise e
        self.socket.close()
        self.thread.join()
        print('Server stopped.')


if __name__ == '__main__':
    server = MyServer(5555)
    time.sleep(2)
    server.stop()
    exit(0)
Emboly answered 11/2, 2021 at 3:20 Comment(1)
A call to close() alone will do it unless the socket FD has been inherited.Graben
E
0

Implement a quit command on the server and client sockets. Should work something like this:

Thread1: 
    status: listening
    handler: quit

Thread2: client
    exec: socket.send "quit"  ---> Thread1.socket @ host:port

Thread1: 
    status: socket closed()
Evangelin answered 16/9, 2011 at 21:10 Comment(0)
N
0

To properly close a tcp socket in python, you have to call socket.shutdown(arg) before calling socket.close(). See the python socket documentation, the part about shutdown.

If the socket is UDP, you can't call socket.shutdown(...), it would raise an exception. And calling socket.close() alone would, like for tcp, keep the blocked operations blocking. close() alone won't interrupt them.

Many suggested solutions (not all), don't work or are seen as cumbersome as they involve 3rd party libraries. I haven't tested poll() or select(). What does definately work, is the following:

firstly, create an official Thread object for whatever thread is running socket.recv(), and save the handle to it. Secondly, import signal. Signal is an official library, which enables sending/recieving linux/posix signals to processes (read its documentation). Thirdly, to interrupt, assuming that handle to your thread is called udpThreadHandle:

signal.pthread_kill(udpthreadHandle.ident, signal.SIGINT)

and ofcourse, in the actual thread/loop doing the recieving:

try:
    while True:
       myUdpSocket.recv(...)
except KeyboardInterrupt:
    pass

Notice, the exception handler for KeyboardInterrupt (generated by SIGINT), is OUTSIDE the recieve loop. This silently terminates the recieve loop and its thread.

Noteworthy answered 4/11, 2021 at 16:59 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.