This looks like a duplicate of How do I abort a socket.recv() from another thread in Python, but it's not, since I want to abort recvfrom() in a thread, which is UDP, not TCP.
Can this be solved by poll() or select.select() ?
This looks like a duplicate of How do I abort a socket.recv() from another thread in Python, but it's not, since I want to abort recvfrom() in a thread, which is UDP, not TCP.
Can this be solved by poll() or select.select() ?
If you want to unblock a UDP read from another thread, send it a datagram!
Rgds, Martin
(addr, port)
combination is unique. How about if REUSEADDR
is on and several processes are listening? –
Tessellated A good way to handle this kind of asynchronous interruption is the old C pipe trick. You can create a pipe and use select
/poll
on both the socket and the pipe. Now, when you want to interrupt the receiver, you can just write a char to the pipe.
interruptable_socket.py
import os
import socket
import select
class InterruptableUdpSocketReceiver(object):
    """UDP receiver whose blocking ``recv`` can be aborted from another thread.

    The receiver waits with ``select`` on both the UDP socket and the read end
    of a private pipe.  ``interrupt()`` writes one byte into the pipe, which
    wakes the ``select`` call even though no datagram arrived.
    """

    def __init__(self, host, port):
        """Create the UDP socket (with SO_REUSEADDR) and the wake-up pipe.

        ``host`` and ``port`` are only stored here; call ``bind()`` to bind.
        """
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._r_pipe, self._w_pipe = os.pipe()
        self._interrupted = False

    def bind(self):
        """Bind the socket to the ``(host, port)`` given at construction."""
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        """Block until a datagram arrives or ``interrupt()`` is called.

        Returns the received bytes, or ``b""`` when the wait was interrupted.
        Raises ``RuntimeError`` if the receiver was already interrupted (or
        closed) before the call.
        """
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        readable, _, _ = select.select(
            [self._r_pipe, self._socket], [], [self._socket])
        if self._socket in readable:
            return self._socket.recv(buffersize, flags)
        # socket.recv() returns bytes, so the "interrupted" marker is an empty
        # bytes object as well (a text "" never compares equal to b"").
        return b""

    def interrupt(self):
        """Wake up a pending ``recv`` and make every later ``recv`` fail."""
        self._interrupted = True
        if self._w_pipe is not None:
            os.write(self._w_pipe, b"I")

    def close(self):
        """Release the socket and both pipe descriptors; safe to call twice."""
        # A closed receiver must not be handed to select() again.
        self._interrupted = True
        self._socket.close()
        for attr in ("_r_pipe", "_w_pipe"):
            fd = getattr(self, attr)
            if fd is not None:
                os.close(fd)
                setattr(self, attr, None)
A test suite:
test_interruptable_socket.py
import socket
from threading import Timer
import time
from interruptable_socket import InterruptableUdpSocketReceiver
import unittest
class Sender(object):
    """Tiny UDP client that always sends to one fixed destination."""

    def __init__(self, destination_host, destination_port):
        """Open a UDP socket and remember the destination address."""
        self._socket = socket.socket(
            family=socket.AF_INET,
            type=socket.SOCK_DGRAM,
            proto=socket.IPPROTO_UDP,
        )
        self._dest = destination_host, destination_port

    def send(self, message):
        """Send ``message`` (bytes) as a single datagram to the destination."""
        target = self._dest
        self._socket.sendto(message, target)
class Test(unittest.TestCase):
    """Tests for InterruptableUdpSocketReceiver using UDP on 127.0.0.1:3010."""

    def create_receiver(self, host="127.0.0.1", port=3010):
        # Build a receiver and bind it before handing it to the test.
        bound_receiver = InterruptableUdpSocketReceiver(host, port)
        bound_receiver.bind()
        return bound_receiver

    def create_sender(self, host="127.0.0.1", port=3010):
        return Sender(host, port)

    def create_sender_receiver(self, host="127.0.0.1", port=3010):
        # The sender is created first, then the bound receiver.
        sender = self.create_sender(host, port)
        receiver = self.create_receiver(host, port)
        return sender, receiver

    def test_create(self):
        self.create_receiver()

    def test_recv_async(self):
        # A datagram sent ~0.1 s later must unblock recv() with that payload.
        sender, receiver = self.create_sender_receiver()
        started_at = time.time()
        send_message = "TEST".encode('UTF-8')
        delayed_send = Timer(0.1, sender.send, args=(send_message,))
        delayed_send.start()
        received = receiver.recv(128)
        elapsed = time.time() - started_at
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(received, send_message)

    def test_interrupt_async(self):
        # An interrupt fired ~0.1 s later must unblock recv() with no data.
        receiver = self.create_receiver()
        started_at = time.time()
        delayed_interrupt = Timer(0.1, receiver.interrupt)
        delayed_interrupt.start()
        received = receiver.recv(128)
        elapsed = time.time() - started_at
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(0, len(received))

    def test_exception_after_interrupt(self):
        # Once interrupted, the receiver refuses any further recv().
        _sender, receiver = self.create_sender_receiver()
        receiver.interrupt()
        with self.assertRaises(RuntimeError):
            receiver.recv(128)


if __name__ == '__main__':
    unittest.main()
Now, this code is just a starting point. To make it more generic, I think we should fix the following issues:
socket.recv()
, extend interrupt to others recv
methods become very simple. First of all, we change test_interrupt_async()
to check for an exception instead of an empty message:
from interruptable_socket import InterruptException
def test_interrupt_async(self):
    # recv() must raise InterruptException roughly 0.1 s after the call,
    # i.e. when the timer thread fires receiver.interrupt().
    receiver = self.create_receiver()
    started_at = time.time()
    with self.assertRaises(InterruptException):
        delayed_interrupt = Timer(0.1, receiver.interrupt)
        delayed_interrupt.start()
        receiver.recv(128)
    # Timing is checked outside the ``with`` block: nothing after the raising
    # recv() call inside it would run.
    elapsed = time.time() - started_at
    self.assertGreaterEqual(elapsed, 0.095)
    self.assertLess(elapsed, 0.11)
After this we can replace return ''
by raise InterruptException
and the tests pass again.
The ready-to-extend version could be:
interruptable_socket.py
import os
import socket
import select
class InterruptException(Exception):
    """Raised when a blocking receive is aborted by an interrupt request."""
class InterruptableUdpSocketReceiver(object):
    """UDP receiver that delegates interruption to an ``AsycInterrupt``."""

    def __init__(self, host, port):
        """Create the UDP socket (SO_REUSEADDR set) and its interrupt helper."""
        self._host = host
        self._port = port
        self._socket = udp_socket = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM)
        udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._async_interrupt = AsycInterrupt(udp_socket)

    def bind(self):
        """Bind the socket to the ``(host, port)`` given at construction."""
        address = (self._host, self._port)
        self._socket.bind(address)

    def recv(self, buffersize, flags=0):
        """Wait until the socket is readable, then return one datagram.

        Any exception raised by the interrupt helper's wait propagates.
        """
        waiter = self._async_interrupt
        waiter.wait_for_receive()
        return self._socket.recv(buffersize, flags)

    def interrupt(self):
        """Forward the interrupt request to the helper."""
        self._async_interrupt.interrupt()
class AsycInterrupt(object):
    """Pipe-based wake-up: wait on a descriptor until data or an interrupt."""

    def __init__(self, descriptor):
        """Remember ``descriptor`` and create the private wake-up pipe."""
        read_end, write_end = os.pipe()
        self._read = read_end
        self._write = write_end
        self._interrupted = False
        self._descriptor = descriptor

    def interrupt(self):
        """Flag the object as interrupted and wake up any pending wait."""
        self._interrupted = True
        self._notify()

    def wait_for_receive(self):
        """Block until the watched descriptor is readable.

        Raises ``RuntimeError`` if already interrupted before the call, and
        ``InterruptException`` if the wait ends without the descriptor being
        readable.
        """
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        watched = [self._read, self._descriptor]
        readable = select.select(watched, [], [self._descriptor])[0]
        if self._descriptor in readable:
            return
        raise InterruptException

    def _notify(self):
        # One byte on the pipe is enough to make select() return.
        os.write(self._write, "I".encode())
Now wrapping more recv
functions, implementing a Windows version, or taking care of socket timeouts becomes really simple.
recv
s methods and override (and test) all of them. Moreover if I subclass socket
I should take care of socket timeout in all recv
s methods. That is an example and a suite of tests: replacing the collaborator with a subclass is simple (but you should resolve the issues above). But before doing it, it is better to 1- use an exception instead of returning an empty message, 2- create a decorator that checks and waits for an incoming message, to use it in all 4 recv
s, 3- extract an object that does the asynchronous notification, so that it can be replaced by a UDP implementation on Windows. –
Gallon recv_from
in your object why make a subclass that take care of all socket
's methods? When you need some other methods you can consider to change it in a subclass, your test will help you to do it safe. –
Gallon The solution here is to forcibly close the socket. The problem is that the method for doing this is OS-specific and Python does not do a good job of abstracting the way to do it or the consequences. Basically, you need to do a shutdown() followed by a close() on the socket. On POSIX systems such as Linux, the shutdown is the key element in forcing recvfrom to stop (a call to close() alone won't do it). On Windows, shutdown() does not affect the recvfrom and the close() is the key element. This is exactly the behavior that you would see if you were implementing this code in C and using either native POSIX sockets or Winsock sockets, so Python is providing a very thin layer on top of those calls.
On both POSIX and Windows systems, this sequence of calls results in an OSError being raised. However, the location of the exception and the details of it are OS-specific. On POSIX systems, the exception is raised on the call to shutdown() and the errno value of the exception is set to 107 (Transport endpoint is not connected). On Windows systems, the exception is raised on the call to recvfrom() and the winerror value of the exception is set to 10038 (An operation was attempted on something that is not a socket). This means that there's no way to do this in an OS-agnostic way; the code has to account for both Windows and POSIX behavior and errors. Here's a simple example I wrote up:
import errno
import socket
import threading
import time
class MyServer(object):
    """UDP server that receives on a background thread and can be stopped.

    ``stop()`` forces the blocked ``recvfrom()`` to return by shutting down
    and closing the socket.  Both the POSIX outcome (``shutdown()`` raising
    "not connected") and the Windows outcome (``recvfrom()`` raising
    winerror 10038) are tolerated.
    """

    # Winsock "operation attempted on something that is not a socket";
    # reported by recvfrom() on Windows once the socket was closed under it.
    _WSAENOTSOCK = 10038

    def __init__(self, port:int=0):
        """Bind a UDP socket to ``0.0.0.0:port`` and start the receive thread.

        Raises ``AttributeError`` when ``port`` is 0, and ``OSError`` when the
        socket cannot be bound.
        """
        if port == 0:
            raise AttributeError('Invalid port supplied.')
        self.port = port
        self.socket = socket.socket(family=socket.AF_INET,
                                    type=socket.SOCK_DGRAM)
        try:
            self.socket.bind(('0.0.0.0', port))
        except OSError:
            # Do not leak the descriptor when the port is unavailable.
            self.socket.close()
            raise
        self.exit_now = False
        print('Starting server.')
        self.thread = threading.Thread(target=self.run_server,
                                       args=[self.socket])
        self.thread.start()

    def run_server(self, socket:socket.socket=None):
        """Receive datagrams on ``socket`` until ``exit_now`` is set.

        Raises ``AttributeError`` when no socket is given.  An ``OSError``
        from ``recvfrom()`` ends the loop quietly if a stop was requested or
        if it is winerror 10038; any other ``OSError`` propagates.
        """
        if socket is None:
            raise AttributeError('No socket provided.')
        buffer_size = 4096
        while not self.exit_now:
            try:
                data, address = socket.recvfrom(buffer_size)
            except OSError as e:
                # ``winerror`` only exists on Windows, so it is read with
                # getattr to avoid an AttributeError elsewhere.
                if (self.exit_now
                        or getattr(e, 'winerror', None) == self._WSAENOTSOCK):
                    # The socket was closed under us; retrying cannot succeed.
                    break
                raise
            if data:
                print(f'Received {len(data)} bytes from {address}.')

    def stop(self):
        """Ask the receive thread to exit, close the socket and join it.

        A "not connected" error from ``shutdown()`` is expected for a UDP
        socket and ignored; any other ``OSError`` propagates after the socket
        has been closed.
        """
        self.exit_now = True
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except OSError as e:
            # errno.ENOTCONN is the platform's own value for
            # "Transport endpoint is not connected" (107 on Linux).
            if e.errno != errno.ENOTCONN:
                raise
        finally:
            self.socket.close()
        self.thread.join()
        print('Server stopped.')
if __name__ == '__main__':
    # Demo: run the server for two seconds, then stop it.
    server = MyServer(5555)
    try:
        time.sleep(2)
    finally:
        # Always stop the server: its receive thread is not a daemon, so an
        # interrupted sleep would otherwise leave the process hanging.
        server.stop()
    # ``exit`` is a helper injected by the ``site`` module and is not always
    # available; raising SystemExit is the portable equivalent.
    raise SystemExit(0)
close()
alone will do it unless the socket FD has been inherited. –
Graben Implement a quit command on the server and client sockets. Should work something like this:
Thread1:
status: listening
handler: quit
Thread2: client
exec: socket.send "quit" ---> Thread1.socket @ host:port
Thread1:
status: socket closed()
To properly close a tcp socket in python, you have to call socket.shutdown(arg) before calling socket.close(). See the python socket documentation, the part about shutdown.
If the socket is UDP, you can't call socket.shutdown(...), it would raise an exception. And calling socket.close() alone would, like for tcp, keep the blocked operations blocking. close() alone won't interrupt them.
Many suggested solutions (not all) don't work or are seen as cumbersome, as they involve 3rd party libraries. I haven't tested poll() or select(). What definitely does work is the following:
Firstly, create an official Thread object for whatever thread is running socket.recv(), and save the handle to it. Secondly, import signal. Signal is an official library, which enables sending/receiving Linux/POSIX signals to processes (read its documentation). Thirdly, to interrupt, assuming that the handle to your thread is called udpthreadHandle:
signal.pthread_kill(udpthreadHandle.ident, signal.SIGINT)
and of course, in the actual thread/loop doing the receiving:
try:
    # Receive forever; the loop only ends when an exception escapes recv().
    while True:
        myUdpSocket.recv(...)
except KeyboardInterrupt:
    # The handler sits outside the loop on purpose, so a KeyboardInterrupt
    # ends the whole receive loop instead of a single iteration.
    # NOTE(review): the signal module documents that Python-level signal
    # handlers run in the main thread -- confirm that KeyboardInterrupt is
    # really raised in this thread when it is not the main one.
    pass
Notice that the exception handler for KeyboardInterrupt (generated by SIGINT) is OUTSIDE the receive loop. This silently terminates the receive loop and its thread.
© 2022 - 2024 — McMap. All rights reserved.