Can't pickle the SQLAlchemy engine in the class

I want to use the multiprocessing module with SQLAlchemy in my custom class. Here is the code:

from multiprocessing import Pool
from sqlalchemy import create_engine
engine = create_engine(f'mysql+pymysql://a:b@localhost:3306/', server_side_cursors=True, pool_size=20)


class Client(object):
    def __init__(self):
        self.engine = create_engine(f'mysql+pymysql://a:b@localhost:3306/', server_side_cursors=True, pool_size=20)
        self.pool = Pool(6)

    def run_in_process(self, x): 
        conn = self.engine.connect()
        print(conn)

    def run(self):
        x = 'x' 
        res = self.pool.apply_async(self.run_in_process, (x,))
        res.get()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)


pool = Pool(6)
client = Client()
client.run()

It fails with this error:

  File "test_pickle.py", line 32, in <module>
    client.run()
  File "test_pickle.py", line 19, in run
    res.get()
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread._local objects

I know multiprocessing can be troublesome because of pickling, and I know this error comes from self.engine, which can't be pickled. But I need to keep the engine as an attribute of the class.

So, how can I make the engine picklable in my example?

Thanks in advance.

Pennyweight answered 20/9, 2019 at 6:33 Comment(3)
Did you find a solution? I'm facing the same problem :) - Caird
Any updates on the issue? - Shulman
Hi, I'm also having the same problem. Did you find a solution? - Ranique

I couldn't find a way to serialize an SQLAlchemy engine. I don't think it is possible without touching the SQLAlchemy source code.
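
The TypeError in the traceback can be reproduced without SQLAlchemy at all. This is a minimal sketch, assuming (as the error message suggests) that the engine keeps a threading.local object somewhere inside it. The `state` dict below is a hypothetical stand-in for such an object's attributes, not SQLAlchemy's real internals:

```python
import pickle
import threading

# A plain dict stands in for the __dict__ of an object that holds
# per-thread state. Illustration only; this does not touch SQLAlchemy.
state = {
    'url': 'mysql+pymysql://a:b@localhost:3306/',
    'local': threading.local(),
}

try:
    pickle.dumps(state)
    error = None
except TypeError as exc:
    error = exc

# The exact wording of the message differs between Python versions.
print(type(error).__name__, error)
```

The URL string on its own pickles fine; it is the thread-local object that pickle refuses. That is why passing the connection URL around instead of the engine sidesteps the problem.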

However, I have two solutions for using SQLAlchemy with multiprocessing.

First, you can create the engine inside each child process. Make sure to close the connection and dispose of the engine after use.

from sqlalchemy import create_engine
from multiprocessing import Pool

class Client(object):
    def __init__(self):
        # Store only the connection URL; a plain string pickles fine
        self.url = 'mysql+pymysql://a:b@localhost:3306/'
        self.pool = Pool(6)

    def run_in_process(self, x):
        # Runs in the child process, so the engine itself is never pickled
        engine = create_engine(self.url)
        try:
            # The with block closes the connection, even on error
            with engine.connect() as conn:
                print(conn)
        finally:
            # Release the engine's connection pool
            engine.dispose()

    def run(self):
        x = 'x'
        res = self.pool.apply_async(self.run_in_process, (x,))
        res.get()

    def __getstate__(self):
        # The Pool cannot be pickled either, so drop it from the state
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

if __name__ == '__main__':
    client = Client()
    client.run()

Alternatively, create the engine at module level, outside your class, so that every method can reach it as a global variable.

from sqlalchemy import create_engine
from multiprocessing import Pool

# Module-level engine: each process has its own copy, so it is never pickled
engine = create_engine('mysql+pymysql://a:b@localhost:3306/')

class Client(object):
    def __init__(self):
        self.pool = Pool(6)

    def run_in_process(self, x):
        # The with block returns the connection to the pool when done
        with engine.connect() as conn:
            print(conn)

    def run(self):
        x = 'x'
        res = self.pool.apply_async(self.run_in_process, (x,))
        res.get()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

if __name__ == '__main__':
    client = Client()
    client.run()
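
If the engine really has to stay an attribute of the class, one more option is to leave it out of the pickled state and rebuild it lazily on first use in each process. The sketch below is only an illustration of that pattern: `FakeEngine` and `_make_engine` are hypothetical names, and `FakeEngine` is a stand-in that fails to pickle the same way the traceback shows. In real code `_make_engine` would call `sqlalchemy.create_engine(self.url)`.

```python
import pickle
import threading


class FakeEngine:
    """Hypothetical stand-in for an SQLAlchemy engine. It holds a
    threading.local, so pickling it raises TypeError."""

    def __init__(self, url):
        self.url = url
        self._local = threading.local()


class Client:
    def __init__(self, url):
        self.url = url
        self._engine = None

    def _make_engine(self):
        # Assumption: real code would return create_engine(self.url) here.
        return FakeEngine(self.url)

    @property
    def engine(self):
        # Build the engine on first use in whichever process we are in.
        if self._engine is None:
            self._engine = self._make_engine()
        return self._engine

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_engine'] = None  # never ship the engine across processes
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)


client = Client('mysql+pymysql://a:b@localhost:3306/')
parent_engine = client.engine  # the engine already exists in the parent

# multiprocessing pickles `self` like this when a bound method is submitted.
clone = pickle.loads(pickle.dumps(client))
child_engine = clone.engine  # rebuilt on first access in the "child"
```

With this pattern the copy that arrives in the worker starts with no engine and creates its own the first time `engine` is accessed, so pooled connections from the parent are not reused in the child.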
Johannejohannes answered 17/10 at 22:8 Comment(0)
