Retry failed sqlalchemy queries

Every time I restart the MySQL service, my app receives the following error on any query:

result = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 727, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 1066, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 656, in _read_packet
    packet_header = self._read_bytes(4)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 702, in _read_bytes
    CR.CR_SERVER_LOST, "Lost connection to MySQL server during query")
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: ...] [parameters: {...}] (Background on this error at: http://sqlalche.me/e/e3q8)

Any query after that succeeds as usual.

This is just one common case as an example; in general I might want to retry any query depending on the error.

Is there any way to catch the error and retry the query in some low-level SQLAlchemy API? A try-except or a custom query method in my code is not reasonable, since I use queries in too many places and it would not be maintainable.

Jarrell answered 13/11, 2018 at 18:16 Comment(1)
What's wrong with a custom query or a wrapper around it? Using that as the basis for a try...except block for each query seems the most sensible solution in this case. Do you have a snippet of code in minimal reproducible example format to demonstrate the issue? – Natal
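A sketch of the wrapper approach this comment suggests: a generic retry decorator. All names here are illustrative, not part of SQLAlchemy; pass it the exception types (e.g. `OperationalError`) you want to retry on.

```python
import functools
import time

def retry(exceptions, attempts=3, delay=0.5):
    """Retry a callable on the given exceptions; re-raise after the last attempt."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts:
                        raise  # out of attempts: propagate the original error
                    time.sleep(delay)
        return wrapper
    return decorator
```

Usage would look like decorating each function that runs queries, which is exactly the per-call-site maintenance burden the question wants to avoid.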

EDIT: SQLAlchemy's creator commented that this approach is not advised.

It turns out SQLAlchemy has an option to customize the query class, which was exactly what I needed.

class implementation:

import logging
from flask_sqlalchemy import BaseQuery
from sqlalchemy.exc import OperationalError
from time import sleep

class RetryingQuery(BaseQuery):

    __retry_count__ = 3
    __retry_sleep_interval_sec__ = 0.5

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "Lost connection to MySQL server during query" not in str(ex):
                    raise
                if attempts < self.__retry_count__:
                    logging.debug(
                        "MySQL connection lost - sleeping for %.2f sec and will retry (attempt #%d)",
                        self.__retry_sleep_interval_sec__, attempts
                    )
                    sleep(self.__retry_sleep_interval_sec__)
                    continue
                else:
                    raise

usage:

class BaseModel(Model):
    ...
    query_class = RetryingQuery
    ...

db = SQLAlchemy(model_class=BaseModel, query_class=RetryingQuery)
Jarrell answered 14/11, 2018 at 12:16 Comment(6)
Can you please explain the logic behind it? – Lipography
@Lipography By intercepting errors at this point you can handle all queries without needing to wrap the whole SQLAlchemy API. – Jarrell
I am a bit of a beginner here; can you please explain how overriding __iter__ lets us retry the Query methods? – Lipography
@Lipography This requires a bit of digging in the SQLAlchemy code. Most of the query executors eventually call __iter__ as the lowest-level common point. I found this from their source code and trial and error. It's absolutely not foolproof and might not cover every case. – Jarrell
Hi there – SQLAlchemy author here. I've just been pointed to this recipe. I would highly recommend against using a pattern such as the above. "Retry" for connectivity should be performed at the top level of a transaction only, and that's what the pool pre_ping feature is for. If you lose a connection in the middle of a transaction, you need to re-run your whole operation. Explicit is better than implicit. – Tautomerism
@Tautomerism Thanks for pointing it out. Indeed explicit is better, and I'd avoid this approach today as well. Though I did encounter environments where an explicit approach was simply impossible. – Jarrell
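Following the author's comment, the recommended alternative is pessimistic disconnect testing on the connection pool. A minimal sketch, using an in-memory SQLite URL purely for illustration (substitute your MySQL URL):

```python
from sqlalchemy import create_engine, text

# pool_pre_ping=True issues a lightweight "ping" each time a connection is
# checked out of the pool, and transparently replaces stale connections
# before your statement runs. It does not retry failures that occur in the
# middle of a transaction - those still need a top-level re-run.
engine = create_engine("sqlite://", pool_pre_ping=True)

with engine.connect() as conn:
    result = conn.execute(text("SELECT 1")).scalar()
```

Note that pre-ping only protects the checkout; a connection lost mid-transaction still raises, as the author points out.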

Thanks a lot for this snippet. I had to adapt it a bit to work with sqlalchemy.orm directly; in case it can be of use to anyone:

import logging
from sqlalchemy.exc import OperationalError, StatementError
from sqlalchemy.orm.query import Query as _Query
from time import sleep

class RetryingQuery(_Query):
    __max_retry_count__ = 3

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "server closed the connection unexpectedly" not in str(ex):
                    raise
                if attempts <= self.__max_retry_count__:
                    sleep_for = 2 ** (attempts - 1)
                    logging.error(
                        "/!\\ Database connection error: sleeping for {}s"
                        " and will retry (attempt #{} of {})\nDetailed query impacted: {}".format(
                            sleep_for, attempts, self.__max_retry_count__, ex
                        )
                    )
                    sleep(sleep_for)
                    continue
                else:
                    raise
            except StatementError as ex:
                if "reconnect until invalid transaction is rolled back" not in str(ex):
                    raise
                self.session.rollback()

And for usage, pass the option to the sessionmaker:

sqlalchemy.orm.sessionmaker(bind=engine, query_cls=RetryingQuery)
Sultan answered 10/3, 2020 at 9:30 Comment(0)

I had to slightly adapt it to make it work with Postgres, which reports a different error message. I know the question is tagged mysql, but I found this question via search (and had the exact same problem), so it might help someone.

I also had to catch StatementError: (sqlalchemy.exc.InvalidRequestError) Can't reconnect until invalid transaction is rolled back, which blew up Flask before the retry could happen.

Finally, I made it exponential backoff, because why not 😸

import logging
from flask_sqlalchemy import BaseQuery
from sqlalchemy.exc import OperationalError, StatementError
from time import sleep

class RetryingQuery(BaseQuery):
    __retry_count__ = 3

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "server closed the connection unexpectedly" not in str(ex):
                    raise
                if attempts < self.__retry_count__:
                    sleep_for = 2 ** (attempts - 1)
                    logging.error(
                        "Database connection error: {} - sleeping for {}s"
                        " and will retry (attempt #{} of {})".format(
                            ex, sleep_for, attempts, self.__retry_count__
                        )
                    )
                    sleep(sleep_for)
                    continue
                else:
                    raise
            except StatementError as ex:
                if "reconnect until invalid transaction is rolled back" not in str(ex):
                    raise
                self.session.rollback()
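The sleep_for = 2 ** (attempts - 1) schedule above doubles the wait after each failed attempt; for three attempts it works out as follows:

```python
# Exponential backoff schedule used above:
# attempt #1 waits 1s, attempt #2 waits 2s, attempt #3 waits 4s.
retry_count = 3
schedule = [2 ** (attempt - 1) for attempt in range(1, retry_count + 1)]
print(schedule)  # → [1, 2, 4]
```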
Extrajudicial answered 15/1, 2020 at 0:3 Comment(0)

SQLAlchemy also lets you listen for the engine_connect event, which fires when a new Connection is created. This makes it possible to implement custom logic for pessimistic disconnect handling.

The snippet below implements exponential backoff for retries. It's taken from Apache Airflow's SQLAlchemy utils (names such as engine, log, initial_backoff_seconds, reconnect_timeout_seconds and max_backoff_seconds come from the surrounding Airflow module): http://airflow.apache.org/docs/1.10.3/_modules/airflow/utils/sqlalchemy.html

@event.listens_for(engine, "engine_connect")
def ping_connection(connection, branch):
    """
    Pessimistic SQLAlchemy disconnect handling. Ensures that each
    connection returned from the pool is properly connected to the database.

    http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
    """
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    start = time.time()
    backoff = initial_backoff_seconds

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result

    while True:
        connection.should_close_with_result = False

        try:
            connection.scalar(select([1]))
            # If we made it here then the connection appears to be healthy
            break
        except exc.DBAPIError as err:
            if time.time() - start >= reconnect_timeout_seconds:
                log.error(
                    "Failed to re-establish DB connection within %s secs: %s",
                    reconnect_timeout_seconds,
                    err)
                raise
            if err.connection_invalidated:
                log.warning("DB connection invalidated. Reconnecting...")

                # Use a truncated binary exponential backoff. Also includes
                # a jitter to prevent the thundering herd problem of
                # simultaneous client reconnects
                backoff += backoff * random.random()
                time.sleep(min(backoff, max_backoff_seconds))

                # run the same SELECT again - the connection will re-validate
                # itself and establish a new connection.  The disconnect detection
                # here also causes the whole connection pool to be invalidated
                # so that all stale connections are discarded.
                continue
            else:
                log.error(
                    "Unknown database connection error. Not retrying: %s",
                    err)
                raise
        finally:
            # restore "close with result"
            connection.should_close_with_result = save_should_close_with_result
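The truncated, jittered backoff from the snippet can be seen in isolation; the initial value and cap below are illustrative stand-ins for Airflow's configuration:

```python
import random

initial_backoff_seconds = 1.0
max_backoff_seconds = 30.0

backoff = initial_backoff_seconds
waits = []
for _ in range(5):
    # Grow the backoff by a random factor in [1.0, 2.0); the jitter keeps
    # many clients from reconnecting in lockstep (thundering herd).
    backoff += backoff * random.random()
    # Truncate: never wait longer than the cap.
    waits.append(min(backoff, max_backoff_seconds))
```

Each wait is bounded by the cap, and the sequence never decreases, so repeated failures back off progressively without ever exceeding max_backoff_seconds.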

Goodish answered 21/10, 2020 at 6:9 Comment(0)
