Python Multiprocessing Queue, blocks with item remaining
Asked Answered
P

1

0

Running two threads with the Multiprocessing library in Python, putting items into a queue in the daughter thread and getting them from the parents, hangs on the last item.

The question is why is that happening? And how do we get it to work?

The complete code is here. It is for an old USB spectrometer

https://drive.google.com/file/d/18kRFCnqO1GfAdrbgPYOFXp6LUdjy014s/view?usp=drive_link

https://drive.google.com/file/d/1Q0b0i_VLBBpKIapGReJ4wdVr1CiJHS1a/view?usp=drive_link

Following is a rough excerpt to describe what is the problem.

In the setup we have:

    from multiprocessing import Process, Queue, Value

    class Gizmo:

      #blah blah blah, set up the hardware and create a multiprocessing Queue

      self.dataqueue = Queue()

    
    def startreader(self,nframes,nsets)
    
        # clear the dataqueue
        while not self.dataqueue.empty():
            try:
                self.dataqueue.get(False)
                print( 'queue got entry' )
            except Exception as e:
                print( 'queue.get', e )
                break

        print( 'creating reader thread')
        self.readerthread = Process( target = self.Reader_, args=(nframes,nsets) )
        if self.readerthread is None:
            print( 'creating reader thread failed')            
            return False

        print( 'starting reader thread')
        self.readerthread.start()

     def Reader_(self,nrames,nsets):

         #blah blah blah, get a bunch of records and then

         for n,record in enumerate(records):
             self.dataqueue.put( record )
             print( 'reader put record', n )

         return


     def savedata(self)

         while not self.dataqueue.empty():
             print( 'getting record')
             try:
                record = self.dataqueue.get_nowait()
                records.append(record )
                print( 'got record')
             except Exception as e:
                print(e)

          # blah blah blah and write it all to a disk file(changed $ to # while editing)

When we run this, we see four records pushed on to the queue from the reader, in the second thread.

reader put record 0
reader put record 1
reader put record 2
reader put record 3

And, then after seeing the reader exit, we call save(). We see 3 of the 4 records retrieved, and then it hangs on trying to get the fourth record.

getting records
getting record
got record
getting record
got record
getting record
got record
getting record

Again, the questions are:

Why does it hang?

And how do we get it to work?

Penthouse answered 29/8, 2024 at 0:33 Comment(12)
You realize these are not threads, but rather completely separate processes, right?Leoleod
You are "clearing" the queue in multiple places. How is that not an invitation to trash the first good item in the queue?Leoleod
@TimRoberts I clean it before starting a new data acquisition. As I recall all of the cleaning happens in the main thread, and it is before starting the clocked or triggered data acquisition.Penthouse
@TimRoberts, yes these are processes. They are supposed to be.Penthouse
Are you saying it is stuck at queue.get_nowait()? That doesn't sound right. Please see if you can create a minimal reproducible example.Henning
@Henning Yes that is correct, it is stuck on get_nowait(), get(FALSE), get (timeout=1), and etc etc. See saramutha's answer below. I think he has the right answer, and it is what I suspected was happening.Penthouse
@toyota, thank you. i agree with maybe 80% of the edits. will let lhe other's go, i think. I am curious why you feel compelled to edit the question in the first place. That reminds me of a comment from Einstein to a journal editor re his GR paper (about the constant). He too stopped contributing to that journal - forever.Penthouse
get_nowait, as the name suggests, does not wait if there are no items in the queue. It immediately raises an Empty exception if there are no items in the queue. So it never “stacks”, at least for such a reason.Henning
@Henning Yes that is what it is supposed to do, and yet here it does in fact hang. These are the facts. As Saramuthu noted, it is a common issue. There is obviously a flaw in design or a bug.Penthouse
I'm sorry, I was testing on Windows (the behavior is different on Windows). I should have noticed.Henning
The multiprocessing.Queue.empty method is not reliable. Read the docs.Crigger
Yes, empty() is unreliable, the problem here is get(). I am aware of the unreliability of empty().Penthouse
T
0

It is a common issue with mp and queues. It's because the process that writes to the queue (the Reader_ function) finishes and exits before all items are retrieved from the queue by the savedata function. i.e the process that puts items into the queue terminates before all the items are retrieved by the main process

It hangs because: When the Reader_ function finishes execution, the process associated with it terminates. If the queue is still holding items and the process terminates, the queue may not flush all items properly.

In savedata method, you're trying to get records from the queue until it's empty (completely empty). if the queue's pipe gets closed & when the writer process exits: the `Queue.get_nowait() will hang, waiting for more data which will never be got.

Added as comments inside the code with the words #part of the solution

def startreader(self, nframes, nsets):
    # clear the dataqueue
    while not self.dataqueue.empty():
        try:
            self.dataqueue.get(False)
            print('queue got entry')
        except Exception as e:
            print('queue.get', e)
            break

    print('creating reader thread')
    self.readerthread = Process(target=self.Reader_, args=(nframes, nsets))
    if self.readerthread is None:
        print('creating reader thread failed')
        return False

    print('starting reader thread')
    self.readerthread.start()

    # part of the solution : join the process
    # Wait for the reader process to complete
    self.readerthread.join() 
    

def Reader_(self, nframes, nsets):
    # blah blah blah, get a bunch of records and then
    for n, record in enumerate(records):
        self.dataqueue.put(record)
        print('reader put record', n)

    # part of the solution : Put a sentinel value to signal the end of the queue
    self.dataqueue.put(None)

def savedata(self):
    while True:
        print('getting record')
        try:
            record = self.dataqueue.get_nowait()
            
            # part of the solution : Check for sentinel
            if record is None:  
                break
            records.append(record)
            print('got record')
        except Exception as e:
            print(e)
            break

    # blah blah blah and write it all to a disk file
    


def savedata(self):
    while True:
        print('getting record')
        try:
        
            # part of the solution : Use timeout
            record = self.dataqueue.get(timeout=5) 
             
            records.append(record)
            print('got record')
        except Exception as e:
            print(e)
            break
Telangiectasis answered 29/8, 2024 at 2:55 Comment(8)
Thank you, I suspected something of that sort. Does the sentinel packet have the effect of protecting the previous packets? Dropping an actual data packet, is a "fail" for the system. What do you think of multiprocessing.Manager().Queue() ?Penthouse
manager is better. robuster& less prone to pocket-loss. The sentinel itself doesn’t the delivery of previous packets. If there’s a failure in the process that produces the packets, or if the process terminates prematurely, some packets can still be lost or not properly placed into the queue.Telangiectasis
Did my answer solve your problem? if 'yes', give an upvote which encourages me answering.Telangiectasis
@DrM, I didn't give the solution using Manager, because I get comments saying the solution deviates from the OP's original code.Telangiectasis
Manager works and it is 100% within the intent of the question. Whoever said that to you, was mistaken. I apologize that I did not see the comment and weight in on it.Penthouse
Would you like to edit your answer to supply a solution with Manager()? Your opening seems spot-on, and you mentioned an upvote. So if you like, lets go for a "correct answer". But, the proviso is that you dont spend more time on it than it is really worth.Penthouse
Ask a separate question @PenthouseTelangiectasis
"They" probably won't approve of a duplicate question. Here is another idea, how about editing your answer, and take my example, above in the OP, and insert the few lines to use Manager. The text is already very good.Penthouse

© 2022 - 2025 — McMap. All rights reserved.