Filling a queue and managing multiprocessing in python
Asked Answered
K

3

28

I'm having this problem in python:

  • I have a queue of URLs that I need to check from time to time
  • if the queue is filled up, I need to process each item in the queue
  • Each item in the queue must be processed by a single process (multiprocessing)

So far I managed to achieve this "manually" like this:

while 1:
        self.updateQueue()

        while not self.mainUrlQueue.empty():
            domain = self.mainUrlQueue.get()

            # if we didn't launched any process yet, we need to do so
            if len(self.jobs) < maxprocess:
                self.startJob(domain)
                #time.sleep(1)
            else:
                # If we already have process started we need to clear the old process in our pool and start new ones
                jobdone = 0

                # We circle through each of the process, until we find one free ; only then leave the loop 
                while jobdone == 0:
                    for p in self.jobs :
                        #print "entering loop"
                        # if the process finished
                        if not p.is_alive() and jobdone == 0:
                            #print str(p.pid) + " job dead, starting new one"
                            self.jobs.remove(p)
                            self.startJob(domain)
                            jobdone = 1

However that leads to tons of problems and errors. I wondered if I was not better suited using a Pool of process. What would be the right way to do this?

However, a lot of times my queue is empty, and it can be filled by 300 items in a second, so I'm not too sure how to do things here.

Knowling answered 21/6, 2013 at 18:7 Comment(0)
A
65

You could use the blocking capabilities of queue to spawn multiple process at startup (using multiprocessing.Pool) and letting them sleep until some data are available on the queue to process. If your not familiar with that, you could try to "play" with that simple program:

import multiprocessing
import os
import time

the_queue = multiprocessing.Queue()


def worker_main(queue):
    print os.getpid(),"working"
    while True:
        item = queue.get(True)
        print os.getpid(), "got", item
        time.sleep(1) # simulate a "long" operation

the_pool = multiprocessing.Pool(3, worker_main,(the_queue,))
#                           don't forget the comma here  ^

for i in range(5):
    the_queue.put("hello")
    the_queue.put("world")


time.sleep(10)

Tested with Python 2.7.3 on Linux

This will spawn 3 processes (in addition of the parent process). Each child executes the worker_main function. It is a simple loop getting a new item from the queue on each iteration. Workers will block if nothing is ready to process.

At startup all 3 process will sleep until the queue is fed with some data. When a data is available one of the waiting workers get that item and starts to process it. After that, it tries to get an other item from the queue, waiting again if nothing is available...

Accipitrine answered 21/6, 2013 at 18:41 Comment(7)
this doesn't work on windows in python 2.7.4, you need to have the if name = 'main' part and you should pass the the_queue as a third parameter into the multiprocessing.Pool function,otherwise the worker_main doesn't receive the dataPannikin
I am also interested in how to make this piece of code work. When I run it as it is then it runs, but it prints nothing, probably because the worker_main does not receive the data. But when I pass the_queue as the third parameter I got TypeError: worker_main() argument after * must be a sequence, not QueueBuild
@Build You probably forgot the coma in (queue,). I've edited the code to add a comment pointing out that possible source of error.Accipitrine
Thank you, this was one problem and second was that I've run it directly from Sublime Text2, which for some reason has not printed out the output from processes. When I've run the code from the command line then it worked well.Build
How do you handle closing the workers when there's nothing left to do?Dalston
@SylvainLeroux why is that comma needed? This is the second time I find it but I am confused on its role.Chlorobenzene
@pedrosaurio, the comma and parenthesis turns the the_queue parameter into a tuple Illustrated: >>> type('blah') <class 'str'> >>> type(('blah')) <class 'str'> >>> type(('blah',)) <class 'tuple'>Bombardier
B
12

Added some code (submitting "None" to the queue) to nicely shut down the worker threads, and added code to close and join the_queue and the_pool:

import multiprocessing
import os
import time

NUM_PROCESSES = 20
NUM_QUEUE_ITEMS = 20  # so really 40, because hello and world are processed separately


def worker_main(queue):
    print(os.getpid(),"working")
    while True:
        item = queue.get(block=True) #block=True means make a blocking call to wait for items in queue
        if item is None:
            break

        print(os.getpid(), "got", item)
        time.sleep(1) # simulate a "long" operation


def main():
    the_queue = multiprocessing.Queue()
    the_pool = multiprocessing.Pool(NUM_PROCESSES, worker_main,(the_queue,))
            
    for i in range(NUM_QUEUE_ITEMS):
        the_queue.put("hello")
        the_queue.put("world")
    
    for i in range(NUM_PROCESSES):
        the_queue.put(None)

    # prevent adding anything more to the queue and wait for queue to empty
    the_queue.close()
    the_queue.join_thread()

    # prevent adding anything more to the process pool and wait for all processes to finish
    the_pool.close()
    the_pool.join()

if __name__ == '__main__':
    main()
Bombardier answered 15/9, 2020 at 22:56 Comment(2)
answering @Chlorobenzene question above: "why is that comma needed?" the comma and parenthesis turns the the_queue parameter into a tupleBombardier
Illustrated: >>> type('blah') <class 'str'> >>> type(('blah')) <class 'str'> >>> type(('blah',)) <class 'tuple'>Bombardier
H
1

I have reworked this to used ProcessPoolExecutor rather than Queues as I think this is more current and I have had trouble with my Queues in my own implmentation. This also gets rid of stuffing n Nones in the queue to terminate:

from concurrent.futures import ProcessPoolExecutor
import os
import time

NUM_PROCESSES = 2
NUM_QUEUE_ITEMS = 4  # so really 40, because hello and world are processed separately


def worker(item):
    print(f"{os.getpid()} got {item}\n", end="")
    time.sleep(0.5) # simulate a "long" operation
    return f"Results {os.getpid()} for {item}"

        
def main():
    with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as exe:
        
        values = []
        for i in range(NUM_QUEUE_ITEMS):
            values.append(f"hello {i}")
            values.append(f"world {i}")

        # Maps the method 'worker' with a iterable
        result = exe.map(worker,values)

    for r in result:
        print(f"{r}")
    

if __name__ == "__main__":
    main()
Honolulu answered 15/4, 2024 at 15:55 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.