WebSockets in FastAPI - ConnectionClosedOK: received 1000 (OK)

I have 3 clients that periodically send data to my server. My code is based on the example from FastAPI's documentation.

This is my server code:

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_response(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/chargeStationState/{client_id}")
async def websocket_endpoint(websocket: WebSocket,
                             client_id: int,
                             db: Session = Depends(deps.get_db)):

    await manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive_json()
            logging.info(message)
          
            # read data from db
            response = {
                "stations": "response",
                "timestamp": int(time.time()),
            }
            await manager.send_response(response, websocket)
            # await manager.broadcast(f"Client #{client_id} says: {data}")

    except WebSocketDisconnect:
        manager.disconnect(websocket)

This is the client code:

async for websocket in websockets.connect("ws://127.0.0.1:8001/chargeStationState/1"):
    message = {'name': 'station1'}
    await websocket.send(json.dumps(message))
    p = await asyncio.wait_for(websocket.recv(), timeout=10)
    print(p)
    await asyncio.sleep(2)

I would like to have 5 clients that talk to my server and send sensor data, but after 5 minutes I get the following error:

websockets.exceptions.ConnectionClosedOK: received 1000 (OK)

The connection is closed with code 1000 (OK), and I cannot identify where the issue lies.

Immolate answered 19/3, 2023 at 6:30 Comment(1)
@Chris I fixed the code; I hope it is now a minimal reproducible example. (Polack)

The websockets.exceptions.ConnectionClosedOK: received 1000 (OK) exception is raised when you interact with a websocket connection that has already been closed normally (close code 1000). ConnectionClosedError is raised in the same way when the connection was closed abnormally. Both occur when one side, either the client or the server, terminates the connection while the other side is trying to send or is waiting to receive a websocket message.

Note: If one side, let's say the server, has already sent a message and then terminates the connection, and the client only tries to receive that message afterwards (because of some delay, e.g., an await asyncio.sleep() before calling websocket.recv()), no error should occur and the message should be received successfully. If, however, the client was already waiting for a message (at the websocket.recv() line) and the server terminated the connection without sending a message beforehand, a ConnectionClosed exception should be raised.

Make sure to catch such exceptions on both the server and the client side. The simplest way is to catch ConnectionClosed, which is raised when trying to interact with a closed connection. Because ConnectionClosed is the base class of both ConnectionClosedOK and ConnectionClosedError, this catches those two as well (the base class for all exceptions defined by websockets is WebSocketException; see the relevant source code). On the server side, also catch FastAPI/Starlette's WebSocketDisconnect, which is raised internally in similar situations, i.e., when the client has disconnected while the server is waiting to receive a message.
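To make the base-class point concrete, here is a minimal, self-contained sketch. The three classes are stand-ins that mirror only the inheritance relationship described above; the real ones are imported from websockets.exceptions and carry close-code details that are not modeled here. The describe helper is hypothetical and exists only for this illustration.

```python
# Stand-in classes mirroring the inheritance described above. In real code,
# import them from websockets.exceptions instead of defining them yourself.
class ConnectionClosed(Exception):
    pass


class ConnectionClosedOK(ConnectionClosed):
    pass


class ConnectionClosedError(ConnectionClosed):
    pass


def describe(exc_type):
    """Raise exc_type and report which handler caught it (hypothetical helper)."""
    try:
        raise exc_type()
    except ConnectionClosed as exc:
        # A single handler covers both the normal and the abnormal closure.
        return f"caught {type(exc).__name__} via ConnectionClosed"


if __name__ == "__main__":
    print(describe(ConnectionClosedOK))
    print(describe(ConnectionClosedError))
```

If you need to treat the two cases differently, list the more specific except clauses before the ConnectionClosed one.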

A working example, based on the code in your question, is provided below.

Working Example

app.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from websockets.exceptions import ConnectionClosed
import uvicorn
import time

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_json(message)


app = FastAPI()
manager = ConnectionManager()

@app.websocket('/chargeStationState')
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            response = {'station': data['station'], 'timestamp': time.ctime()}
            await manager.send_personal_message(response, websocket)
    except (WebSocketDisconnect, ConnectionClosed):
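        manager.disconnect(websocket)


if __name__ == '__main__':
    # Port 8000 matches the URL used by the client in test.py
    uvicorn.run(app, host='127.0.0.1', port=8000)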

test.py

The example below uses the async for expression, which is useful when you want to re-establish a connection after it has been terminated. Inside the async for loop, a while loop keeps the same connection open for sending and receiving data until an exception occurs in the loop body. The handler then calls continue, so that control returns to websockets.connect() in the async for expression and the client reconnects on the next iteration (alternatively, let the exception bubble up, or call break inside the exception handler, to leave the loop). If an error occurs while establishing the connection, websockets.connect() retries with exponential backoff. The backoff delay starts at three seconds and increases up to one minute (see the relevant documentation).

from websockets.exceptions import ConnectionClosed
import websockets
import asyncio
import json

async def main():
    url = 'ws://127.0.0.1:8000/chargeStationState'
    data = json.dumps({'station':'1'})
    
    async for websocket in websockets.connect(url):
        try:
            while True:
                await websocket.send(data)
                print(await asyncio.wait_for(websocket.recv(), timeout=10))
                await asyncio.sleep(2)
        except ConnectionClosed:
            continue  # attempt reconnecting to the server (otherwise, call break)
    
asyncio.run(main())
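
To give a feel for the exponential backoff mentioned above, here is a rough, standalone sketch of such a delay schedule. Only the three-second start and the one-minute cap come from the description above; the doubling factor and the backoff_delays name are assumptions made for illustration, and the schedule websockets actually uses internally may differ (for example, in its growth factor or by adding randomness).

```python
import itertools


def backoff_delays(initial=3.0, maximum=60.0, factor=2.0):
    """Yield reconnect delays in seconds: start at `initial`, grow by `factor`, cap at `maximum`.

    Hypothetical helper; the factor of 2 is an assumption, not websockets' own value.
    """
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


if __name__ == "__main__":
    # First seven delays of the assumed schedule
    print(list(itertools.islice(backoff_delays(), 7)))
```

You do not need anything like this when using async for with websockets.connect(), since the library handles the retries itself; the sketch only shows the shape of the schedule.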
Mateo answered 19/3, 2023 at 7:36 Comment(0)

The websockets.exceptions.ConnectionClosedOK exception can be raised when you start sending a message to a WebSocket that is in the middle of closing.

Because the client can close the socket whenever it wants, this can happen at any time, and with async code the chances are fairly high if you send frequently and clients come and go frequently.

You can catch ConnectionClosedOK to handle this:

from websockets.exceptions import ConnectionClosedOK

try:
    await websocket.send(...)
except ConnectionClosedOK:
    pass  # ...

Depending on your use case, you may want to catch its parent exception, ConnectionClosed, instead, as this handles both expected and unexpected closures.
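
When you send to many clients in a loop, the same idea can be applied per connection, so that one client going away does not abort the whole broadcast. The following is a minimal sketch under stated assumptions: broadcast and StubConnection are hypothetical names, the stub only imitates a socket whose send() fails once it is closed, and the exception types to treat as "closed" are passed in by the caller (in real code that would be something like ConnectionClosed, plus WebSocketDisconnect on a FastAPI server).

```python
import asyncio


async def broadcast(connections, message, closed_errors):
    """Send message to every connection and drop those that raise a closed-connection error."""
    for connection in list(connections):  # iterate over a copy so removal is safe
        try:
            await connection.send(message)
        except closed_errors:
            connections.remove(connection)


class StubConnection:
    """Hypothetical stand-in for a websocket, used only to exercise broadcast()."""

    def __init__(self, closed=False):
        self.closed = closed
        self.sent = []

    async def send(self, message):
        if self.closed:
            raise ConnectionError("connection is closed")
        self.sent.append(message)


async def demo():
    alive, gone = StubConnection(), StubConnection(closed=True)
    connections = [alive, gone]
    await broadcast(connections, "hello", closed_errors=(ConnectionError,))
    # Only the live connection should remain, and it should have received the message
    return connections == [alive], alive.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Iterating over a copy of the list matters here: removing an item from a list while looping over that same list would skip the next connection.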

Here is the code in websockets that manages throwing this exception.

Fuliginous answered 11/1 at 18:39 Comment(0)
