using serial port in python3 asyncio
Asked Answered
C

8

11

i'm trying and, so far, failing to use python asyncio to access a serial port.

i'd really appreciate any tips on using the new python async framework on a simple fd.

Cheers!

James

Copilot answered 9/2, 2014 at 22:42 Comment(4)
I guess you'll need to create your own transport and protocol to read/write on a tty. Have a look at the source and try to adapt sockets/subrocess transport/protocol for your usecase.Alagez
yeah thats what it looks like, but it also looks like the whole eventloop needs to be rejigged, as it startup creating a socketpair, which isn't relevant here. bit puzzled that it seems like it totally not supported (i mean, read/write to a tty is the simplest use case for async, right?)Copilot
yep. shouldn't be so hard to do. and you're wrong. a socket is not required to use the eventloop. you can at least use a subprocess insteadAlagez
you could try loop.connect_write_pipe()/loop.connect_read_pipe() to connect the fd like demonstrated in the async stdio exampleChamaeleon
F
9

Here is a working example using pyserial-asyncio:

from asyncio import get_event_loop
from serial_asyncio import open_serial_connection

async def run():
    reader, writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
    while True:
        line = await reader.readline()
        print(str(line, 'utf-8'))

loop = get_event_loop()
loop.run_until_complete(run())
Fantasist answered 29/1, 2020 at 17:53 Comment(0)
B
6

It's other way using FD

import asyncio
import serial

s = serial.Serial('/dev/pts/13', 9600)


def test_serial():
    '''
    read a line and print.
    '''
    text = ""
    msg = s.read().decode()
    while (msg != '\n'):
        text += msg
        msg = s.read().decode()
    print(text)
    loop.call_soon(s.write, "ok\n".encode())

loop = asyncio.get_event_loop()
loop.add_reader(s, test_serial)
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.close()
Briquet answered 13/1, 2015 at 20:34 Comment(2)
My /dev/pts/... terminal closes on a first keypress and I get always - SerialException('read failed: {}'.format(e)) serial.serialutil.SerialException: read failed: device reports readiness to read but returned no data (device disconnected or multiple access on port?) . Why?Emelia
@Emelia This typically means another process is reading from the same serial port. Incoming data is only received by one process. If another process has read the data, then it is no longer available for your process.Numidia
E
4

pySerial is getting direct asyncio support. It's in experimental state now but is working as expected for me.

Example taken from the documentation:

class Output(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        print('port opened', transport)
        transport.serial.rts = False
        transport.write(b'hello world\n')

    def data_received(self, data):
        print('data received', repr(data))
        self.transport.close()

    def connection_lost(self, exc):
        print('port closed')
        asyncio.get_event_loop().stop()

loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Edenedens answered 6/9, 2016 at 11:49 Comment(1)
Apparently this support has been removed from pyserial, and is instead being supported by separate package pyserial-asyncio. See pySerial API — pySerial 3.3 documentation and Short introduction — pySerial-asyncio 0.4 documentation.Terrilyn
O
3

Another option is to write all your serial stuff with blocking calls, then run it in a different thread with run_in_executor:

import asyncio
import concurrent

from serial import Serial

# Normal serial blocking reads
# This could also do any processing required on the data
def get_byte():
    return s.read(1)

# Runs blocking function in executor, yielding the result
@asyncio.coroutine
def get_byte_async():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        res = yield from loop.run_in_executor(executor, get_byte)
        return res

def get_and_print():
    b = yield from get_byte_async()
    print (b)

s = Serial("COM11", 19200, timeout=10)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_and_print())
Obie answered 13/1, 2015 at 17:13 Comment(0)
M
1

I wrote an AsyncFile class a while ago, the interface is easier then low-level protocols.

The original code is here: https://github.com/l04m33/pyx/blob/dbaf121ab7bb9bbf04616a7285bcaba757682d03/pyx/io.py#L20

class AsyncFile:
    """A local file class for use with the ``asyncio`` module.
    ``loop`` should be the event loop in use.
    ``filename`` is the name of the file to be opened.
    ``fileobj`` should be a regular file-like object.
    ``mode`` is the open mode accepted by built-in function ``open``.
    If ``filename`` is specified, the named file will be opened. And if
    ``fileobj`` is specified, that file object will be used directly. You
    cannot specify both ``filename`` and ``fileobj``.
    This class can be used in a ``with`` statement.
    """

    DEFAULT_BLOCK_SIZE = 8192

    def __init__(self, loop=None, filename=None,
                 fileobj=None, mode='rb'):
        if (filename is None and fileobj is None) or \
                (filename is not None and fileobj is not None):
            raise RuntimeError('Confilicting arguments')

        if filename is not None:
            if 'b' not in mode:
                raise RuntimeError('Only binary mode is supported')
            fileobj = open(filename, mode=mode)
        elif 'b' not in fileobj.mode:
            raise RuntimeError('Only binary mode is supported')

        fl = fcntl.fcntl(fileobj, fcntl.F_GETFL)
        if fcntl.fcntl(fileobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0:
            if filename is not None:
                fileobj.close()
            errcode = ctypes.get_errno()
            raise OSError((errcode, errno.errorcode[errcode]))

        self._fileobj = fileobj

        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._rbuffer = bytearray()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def fileno(self):
        return self._fileobj.fileno()

    def seek(self, offset, whence=None):
        if whence is None:
            return self._fileobj.seek(offset)
        else:
            return self._fileobj.seek(offset, whence)

    def tell(self):
        return self._fileobj.tell()

    def _read_ready(self, future, n, total):
        if future.cancelled():
            self._loop.remove_reader(self._fileobj.fileno())
            return

        try:
            res = self._fileobj.read(n)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._loop.remove_reader(self._fileobj.fileno())
            future.set_exception(exc)
            return

        if not res:     # EOF
            self._loop.remove_reader(self._fileobj.fileno())
            future.set_result(bytes(self._rbuffer))
            return

        self._rbuffer.extend(res)

        if total > 0:
            more_to_go = total - len(self._rbuffer)
            if more_to_go <= 0:  # enough
                res, self._rbuffer = self._rbuffer[:n], self._rbuffer[n:]
                self._loop.remove_reader(self._fileobj.fileno())
                future.set_result(bytes(res))
            else:
                more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
                self._loop.add_reader(self._fileobj.fileno(),
                                      self._read_ready,
                                      future, more_to_go, total)
        else:   # total < 0
            # This callback is still registered with total < 0,
            # nothing to do here
            pass

    @asyncio.coroutine
    def read(self, n=-1):
        future = asyncio.Future(loop=self._loop)

        if n == 0:
            future.set_result(b'')
        else:
            try:
                res = self._fileobj.read(n)
            except (BlockingIOError, InterruptedError):
                if n < 0:
                    self._rbuffer.clear()
                    self._loop.add_reader(self._fileobj.fileno(),
                                          self._read_ready,
                                          future, self.DEFAULT_BLOCK_SIZE, n)
                else:
                    self._rbuffer.clear()
                    read_block_size = min(self.DEFAULT_BLOCK_SIZE, n)
                    self._loop.add_reader(self._fileobj.fileno(),
                                          self._read_ready,
                                          future, read_block_size, n)
            except Exception as exc:
                future.set_exception(exc)
            else:
                future.set_result(res)

        return future

    def _write_ready(self, future, data, written):
        if future.cancelled():
            self._loop.remove_writer(self._fileobj.fileno())
            return

        try:
            res = self._fileobj.write(data)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._loop.remove_writer(self._fileobj.fileno())
            future.set_exception(exc)
            return

        if res < len(data):
            data = data[res:]
            self._loop.add_writer(self._fileobj.fileno(),
                                  self._write_ready,
                                  future, data, written + res)
        else:
            self._loop.remove_writer(self._fileobj.fileno())
            future.set_result(written + res)

    @asyncio.coroutine
    def write(self, data):
        future = asyncio.Future(loop=self._loop)

        if len(data) == 0:
            future.set_result(0)
        else:
            try:
                res = self._fileobj.write(data)
            except (BlockingIOError, InterruptedError):
                self._loop.add_writer(self._fileobj.fileno(),
                                      self._write_ready,
                                      future, data, 0)
            except Exception as exc:
                future.set_exception(exc)
            else:
                future.set_result(res)

        return future

    def stat(self):
        return os.stat(self._fileobj.fileno(), follow_symlinks=True)

    def close(self):
        self._loop.remove_reader(self._fileobj.fileno())
        self._loop.remove_writer(self._fileobj.fileno())
        self._fileobj.close()
Maximo answered 20/5, 2015 at 3:38 Comment(1)
Can you give some short example code which uses this class for a serial port with asyncio?Terrilyn
E
1

Here's my try at asyncio serial port. This interface lets you wrap a serial.Serial instance into AIOSerial class, which then enables you to do await AIOSerial.readline() and await AIOSerial.write(data) and not have to use asyncio.Protocol() style callbacks.

import asyncio
import sys

import serial


class AIOSerial:
    def __init__(self, serial, ioloop=None):
        self._serial = serial
        # Asynchronous I/O requires non-blocking devices
        self._serial.timeout = 0
        self._serial.write_timeout = 0

        if ioloop is not None:
            self.loop = ioloop
        else:
            self.loop = asyncio.get_event_loop()
        self.loop.add_reader(self._serial.fd, self._on_read)
        self._rbuf = b''
        self._rbytes = 0
        self._wbuf = b''
        self._rfuture = None
        self._delimiter = None

    def _on_read(self):
        data = self._serial.read(4096)
        self._rbuf += data
        self._rbytes = len(self._rbuf)
        self._check_pending_read()

    def _on_write(self):
        written = self._serial.write(self._wbuf)
        self._wbuf = self._wbuf[written:]
        if not self._wbuf:
            self.loop.remove_writer(self._serial.fd)

    def _check_pending_read(self):
        future = self._rfuture
        if future is not None:
            # get data from buffer
            pos = self._rbuf.find(self._delimiter)
            if pos > -1:
                ret = self._rbuf[:(pos+len(self._delimiter))]
                self._rbuf = self._rbuf[(pos+len(self._delimiter)):]
                self._delimiter = self._rfuture = None
                future.set_result(ret)
                return future

    async def read_until(self, delimiter=b'\n'):
        while self._delimiter:
            await self._rfuture

        self._delimiter = delimiter
        self._rfuture = asyncio.Future()
        #future = self._check_pending_read()
        return await self._rfuture

    async def readline(self):
        return await self.read_until()

    async def write(self, data):
        need_add_writer = not self._wbuf

        self._wbuf = self._wbuf + data
        if need_add_writer:
            self.loop.add_writer(self._serial.fd, self._on_write)
        return len(data)

Example usage:

async def go_serial():
    ser = serial.Serial(sys.argv[1], 9600) #, rtscts=True, dsrdtr=True)
    print(ser)
    aser = AIOSerial(ser)

    written = await aser.write(b'test 1\n')
    print('written', written)
    data = await aser.readline()
    print('got from readline', data)

    while True:
        await aser.write(b'.\n')
        data = await aser.readline()
        print('GOT!', data)
        await asyncio.sleep(2.78)

async def main():
    for n in range(120):
        await asyncio.sleep(1)
        print('n=%d' % n)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(go_serial())
    loop.run_until_complete(main())

This sets up serial port and two asyncio tasks: go_serial and main. Main just runs for 120 seconds and then the loop exits. go_serial writes and reads to serial port, expecting reply for every line sent.

Reading and writing to serial port is then done with await aser.write(b'blah') and await aser.readline() (or await aser.read_until(b'\r\n') if you want a different separator).

Note that it's not really production ready, as one would want to have some limits for the amount of buffer in place.

To test this I simulate a serial port with the following script, which outputs the name of the pty created, which is then the parameter to the upper example.

#!/usr/bin/python3
import fcntl
import time
import os
import errno
import pty


chars = []
ser, s = pty.openpty()
oldflags = fcntl.fcntl(ser, fcntl.F_GETFL)
# make the PTY non-blocking
fcntl.fcntl(ser, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

print('Created: %s' % os.ttyname(s))


while True:
    time.sleep(0.1)
    c = None
    try:
        c = os.read(ser, 10)
    except OSError as err:
        if err.errno == errno.EAGAIN or err.errno == errno.EWOULDBLOCK:
            c = None
        else:
            raise
    if c:
        chars.append(c)

    data = b''.join(chars)
    if b'\n' in data:
        one, data = data.split(b'\n', 1)
        b = b'%.6f\n' % time.time()
        os.write(ser, b)
        print(one)
    chars = [data]
Erring answered 9/6, 2018 at 13:52 Comment(0)
T
1

Consider using aioserial.

Here's an example:

import aioserial
import asyncio


async def read_and_print(aioserial_instance: aioserial.AioSerial):
    while True:
        print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True)


aioserial_com1: aioserial.AioSerial = aioserial.AioSerial(port='COM1')

asyncio.run(read_and_print(aioserial_com1))

To the moderator,

The answer is just similar to this one but not duplicated.

Tedi answered 25/9, 2018 at 4:0 Comment(0)
C
0

Thanks for the suggestions all, in the end, I solved the problem in a slightly different way, and used the well supported socket connections in asyncio, but then used ser2net (http://sourceforge.net/projects/ser2net/) to access the serial ports.

This took about 10 seconds to configure, and means that the python code can now handle accessing remote serial ports too.

Copilot answered 26/5, 2014 at 9:56 Comment(1)
Interesting solution. Though that possibly isn't a viable solution if you need to do things like changing baud rates, handling framing errors, handling serial port hardware handshaking lines (DTR, DSR, DCD, etc).Terrilyn

© 2022 - 2024 — McMap. All rights reserved.