TypeError: can't pickle _thread.lock objects
Asked Answered
F

6

112

I'm trying to run two different functions at the same time with a shared queue and get an error... How can I run two functions at the same time with a shared queue? This is Python 3.6 on Windows 7.

from multiprocessing import Process
from queue import Queue
import logging

def main():
    x = DataGenerator()
    try:
        x.run()
    except Exception as e:
        logging.exception("message")


class DataGenerator:

    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")

    def package(self, queue): 
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)

    def send(self, queue):
        byte_array = bytearray()
        while True:
            size_of_queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.extend(packet)
                logging.info("Sending datagram ")
                print(str(byte_array))
                byte_array = bytearray()  # reset for the next datagram

if __name__ == "__main__":
    main()

The logs indicate an error; I tried running the console as administrator and I get the same message...

INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
Fiddlestick answered 23/5, 2017 at 20:39 Comment(3)
queue.Queue is for inter-thread communication. multiprocessing.Queue is for sending things between processes.Kalgan
@Kalgan I made the change to multiprocessing.Queue and that fixed the issue. Thank you.Fiddlestick
Does this answer your question? multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failedDozen
F
10

multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Move the queue to self instead of passing it as an argument to your functions package and send

Foresheet answered 29/6, 2017 at 9:16 Comment(3)
What do you mean by move the queue?Wisteria
You can't pass it along as an argument; set it as a property of the class.Foresheet
This is a bad answer. No proper explanation. A link to another question. No code..Overblouse
F
52

I had the same problem with Pool() in Python 3.6.3.

Error received: TypeError: can't pickle _thread.RLock objects

Let's say we want to add some number num_to_add to each element of some list num_list in parallel. The code is schematically like this:

from multiprocessing import Manager, Pool

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1 

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(self.run_parallel, (num, new_num_list))
                      for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add # uses class parameter
        shared_new_num_list.append(new_num)

The problem here is that self in the function run_parallel() can't be pickled, because it is a class instance. Moving the parallelized function run_parallel() out of the class helped. But that is not the best solution, since the function probably needs class parameters such as self.num_to_add, which you then have to pass in as arguments.

Solution:

from multiprocessing import Manager, Pool

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add)) # num_to_add is passed as an argument
                      for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()
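As a self-contained sketch of the same pattern (the names add_num and shared are just examples):

```python
from multiprocessing import Manager, Pool

def add_num(num, shared_list, to_add):
    # runs in a worker process; to_add replaces any use of self
    shared_list.append(num + to_add)

if __name__ == "__main__":
    shared = Manager().list()
    with Pool(processes=4) as pool:
        results = [pool.apply_async(add_num, (n, shared, 1)) for n in [4, 2, 5, 7]]
        for r in results:
            r.get()  # re-raises any worker exception
    print(sorted(shared))  # prints [3, 5, 6, 8]
```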

Other suggestions above didn't help me.

Forcer answered 18/5, 2018 at 10:48 Comment(2)
To access self outside a class function just create a fakeSelf. See example at the end.Inga
I encountered this same error when I was launching a process from within a class. Was just using "multiprocessing.Process". I moved the function outside class as per your suggestion and it worked for me!Together
A
38

You need to change from queue import Queue to from multiprocessing import Queue.

The root cause is that the former Queue is designed for inter-thread communication within a single process, while the latter is designed to be shared between processes started with multiprocessing.Process.
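For example, a minimal sketch of the corrected setup (the produce/consume function names are illustrative):

```python
from multiprocessing import Process, Queue  # multiprocessing.Queue, not queue.Queue

def produce(q):
    # child process: put a few items on the shared queue
    for i in range(3):
        q.put(i)
    q.put(None)  # sentinel: tells the consumer to stop

def consume(q):
    # child process: drain the queue until the sentinel arrives
    while True:
        item = q.get()
        if item is None:
            break
        print("got", item)

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=produce, args=(q,))
    p2 = Process(target=consume, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```

Unlike queue.Queue, a multiprocessing.Queue can be handed to a Process without hitting the pickling error.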

Ambler answered 26/9, 2017 at 10:26 Comment(0)
O
3

As this is the first answer that shows up when searching for this issue, I will also add my solutions here.

This issue can be caused by many things. Here are two scenarios I have encountered:

  1. Package incompatibility
    • Problem: Trying to use multiprocessing when another part of your code that eventually gets called also needs to create new processes, or is incompatible with being copied into a new process (because it holds locks, for example).
    • Solution: My issue was with trying to use a single connection to a MongoDB instance for all of my processes. Creating a new connection for each process resolved the issue.
  2. Class instance
    • Problem: Trying to call pool.starmap from inside of a class to another function in the class. Making it a staticmethod or having a function on the outside call it didn't work and gave the same error. A class instance just can't be pickled so we need to create the instance after we start the multiprocessing.
    • Solution: What I ended up doing that worked for me was to separate my class into two classes. Basically, the function you are calling the multiprocessing on needs to be called right after you instantiate a new object for the class it belongs to. Something like this:
from multiprocessing import Pool

class B:
    ...
    def process_feature(self, idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(*process_args):
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    ...
    def process_stuff():
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...

...
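A minimal runnable version of that split might look like this (the doubling in process_feature is just placeholder work):

```python
from multiprocessing import Pool

class B:
    def process_feature(self, idx, feature):
        # placeholder work done in the worker process
        return idx, feature * 2

def multiprocess_feature(*process_args):
    # instantiate B inside the worker, after the process exists, so no
    # (potentially unpicklable) B instance crosses the process boundary
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    def process_stuff(self, features):
        with Pool(processes=2) as pool:
            return pool.starmap(
                multiprocess_feature,
                [(idx, feature) for idx, feature in enumerate(features)],
            )
```

Under an `if __name__ == "__main__":` guard, calling `A().process_stuff([10, 20])` returns `[(0, 20), (1, 40)]`.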
Overblouse answered 12/9, 2022 at 18:53 Comment(0)
I
1

Complementing Marina's answer, here is a way to access the whole class from the parallelized function. It also fools Pool.map, as I needed today.

fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + fakeSelf.num_to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        globals()['fakeSelf'] = self
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()
Inga answered 21/7, 2021 at 20:41 Comment(0)
K
1

This issue can occur on non-Linux OSes (i.e. Windows and macOS), so I tried the code below and it fixed the issue for me.

import platform
if platform.system() != "Linux":
    from multiprocessing import set_start_method
    set_start_method("fork")

Add the above code to the websockets.py file.


P.S.: I encountered this error in optionchain_stream while trying to fetch the option chain from Zerodha kite.trade.

Klapp answered 12/1 at 13:39 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.