Executing a function in the background while using a limited number of cores/threads, queuing the extra executions?
I want to use a limited number of threads (at most 2) to run a function of a class that removes some files on disk in the background. The rest of my code within the same class is independent of this background function and might get executed tens of times more often than the background function. However, I still need to enforce the core/thread limit, so it is possible that the number of pending background jobs exceeds 2 and I need to queue them. Note that my background function takes no arguments.

I am pretty new to multi-threading and multi-processing but I think I have done my homework and looked at many posts here on Stack Overflow and tried a couple of approaches. However, none of those approaches seems to work for me. Here's the structure of my code:

class myClass(object):
    def __init__(self):
        pass  # some stuff
    def backgroundFunc(self):
        pass  # delete some files on disk
    def mainFunc(self, elem):
        # Do some other things
        self.backgroundFunc()  # I want to run this in the background

Here's how I run the code

from myClass import myClass

myClassInstance = myClass()
for element in someList:
    myClassInstance.mainFunc(elem=element)

Note that I cannot start the background job before the stuff in mainFunc has started running.

And here is my first try with threading in my class file:

from threading import Thread
class myClass(object):
    def __init__(self):
        pass  # some stuff
    def backgroundFunc(self):
        pass  # delete some files on disk
    def mainFunc(self, elem):
        # Do some other things
        thr = Thread(target=self.backgroundFunc)
        thr.start()

However, the problem with this approach is that the program crashes at random times: sometimes right at the beginning of program execution, sometimes later, and the error messages are different every time. I guess this is possibly because the threads do not lock the memory they share, so concurrent reads and writes collide. Or, less likely, it may be because I am running my code on a server that enforces limits on the allocated resources. In addition, I cannot set a limit on the number of threads and cannot queue jobs in case mainFunc gets executed more than twice while two background jobs are already running.
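From what I have read, concurrent.futures.ThreadPoolExecutor might give both the limit and the queuing in one object: submit() returns immediately, and extra jobs wait in the executor's internal queue until one of the workers is free. A minimal sketch of what I mean (the class and names here are just stand-ins for mine):

```python
from concurrent.futures import ThreadPoolExecutor

class MyClass:
    def __init__(self):
        # at most two background workers; extra jobs wait in the executor's queue
        self.executor = ThreadPoolExecutor(max_workers=2)

    def backgroundFunc(self):
        return "deleted"  # stand-in for the disk cleanup

    def mainFunc(self, elem):
        # do the foreground work for `elem` first, then hand off the cleanup;
        # submit() returns immediately, so mainFunc does not wait for the worker
        return self.executor.submit(self.backgroundFunc)

instance = MyClass()
futures = [instance.mainFunc(elem=e) for e in range(5)]
print([f.result() for f in futures])  # prints ['deleted', 'deleted', 'deleted', 'deleted', 'deleted']
```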

Here's another try with multiprocessing.Process:

from multiprocessing import Process
class myClass(object):
    def __init__(self):
        pass  # some stuff
    def backgroundFunc(self):
        pass  # delete some files on disk
    def mainFunc(self, elem):
        # Do some other things
        p = Process(target=self.backgroundFunc)
        p.start()

The problem with this approach is that Process will use as many threads/cores as my machine has at its disposal, and since the rest of my code automatically runs in parallel, everything becomes very slow very quickly.

I eventually arrived at multiprocessing.Pool, but I am still pretty confused about how to use it effectively. Anyway, here's my try with Pool:

from multiprocessing import Pool
class myClass(object):
    def __init__(self):
        #some stuff
        self.pool = Pool(processes=2)
    def backgroundFunc(self):
        # delete some files on disk
        print('some stuff')
    def mainFunc(self, elem):
        # Do some other things
        self.pool.apply_async(self.backgroundFunc)

However, apply_async seems not to work: none of the print statements in backgroundFunc print anything on the screen. I added self.pool.close() after apply_async, but I got some errors soon after the second process started. I tried things like self.pool.apply and some others, but they seem to require a function that takes arguments, and my backgroundFunc takes none. Finally, I do not know how to do the queuing I explained earlier using Pool.
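If I understand the docs correctly, apply_async swallows exceptions raised in the worker until .get() is called on the returned AsyncResult, and the pool has to be closed and joined before the program exits, otherwise nothing may get printed. Here is a sketch of that, using multiprocessing.pool.ThreadPool (same API as Pool but thread-based, which should be fine since my work is IO-bound):

```python
from multiprocessing.pool import ThreadPool  # same API as Pool, but thread-based

def backgroundFunc():
    # stand-in for the file deletion; a real version would call os.remove(...)
    return "cleaned"

pool = ThreadPool(processes=2)           # at most two concurrent workers
results = [pool.apply_async(backgroundFunc) for _ in range(5)]
pool.close()                             # no more tasks will be submitted
pool.join()                              # wait for the queued tasks to drain
print([r.get() for r in results])        # .get() re-raises any worker exception
```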

Also, I want to have control over how many times and when backgroundFunc runs. And mainFunc should not wait for all threads to finish before it returns; if it did, I would not benefit from multithreading, because the background function might take too long to finish. Maybe I should have been clearer in the question; sorry about that.

So I would really appreciate if someone can help me with this. I am pretty confused. Thanks in advance!

Kali answered 3/3, 2018 at 6:10 Comment(11)
Why do you want to use a limited number of threads? Python threads are not the same as system threads and won’t consume more cpu cores.Kozak
@JoelCornett Because I need all other cores to spend time on doing some other processes that I care much more about.Kali
To reiterate, python threads are not system threads, they don’t consume multiple cores. What sort of background work are you doing? Is it IO bound or CPU bound?Kozak
@JoelCornett Sorry, I'm pretty new to multi-threading. The background work is IO bound. So far, the easiest approach for me is to use multiprocessing.Process, but for some reason things become drastically slow very quickly for the other operations in the main function. I don't know any better solution.Kali
Please elaborate on "I want to have control over how many times and when I want to run backgroundFunc".Swallow
Perhaps what you are looking for is a Semaphore to control how many workers are active at one time.Swallow
By the way, you might be interested in using a tool like watchdog to react to filesystem events. Then, instead of calling backgroundFunc multiple times, you could have one process reacting to filesystem events continually until your program terminates. The filesystem-monitoring process won't consume much CPU when there is no filesystem activity.Swallow
@Swallow By having control over how many times I want to run backgroundFunc, I meant that I do not want the behavior of Process, where you have no control over the things (e.g. self variables) inside backgroundFunc while it is running. Also, you cannot make sure only two background jobs are running. I'll look into the things you mentioned here and will get back to you sometime soon.Kali
@Swallow I still have to figure things out here. Meanwhile, I need some help with another question: I want to parallelize two calls to the same function, each with different arguments, in a for loop, but I do not know how to do that in practice. Could you please take a look at my question here and see if you can offer a solution?Kali
It appears you have already received a number of answers so I'll just leave 2 general tips here: (1) If you plan on using multiprocessing in Python, reading Doug Hellman's tutorial on multiprocessing is time very well spent. It will show you with short, runnable examples of all the main design patterns available to you.Swallow
(2) If the answers you are receiving are not satisfactory, it may help to clarify your questions using a runnable MCVE. For example, instead of mentioning "Each of these functions will execute some parallel operations" show us in the code. We probably don't need to see the long serial computation, (so substituting time.sleep(1) may suffice) but we do need to see the structure of your parallel operations and overall program. A small runnable example may enable us to help you better.Swallow
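The Semaphore idea from these comments might look something like the following sketch; the counters are only instrumentation to show that the limit actually holds, and the sleep stands in for the file deletion:

```python
import threading
import time

sem = threading.BoundedSemaphore(2)   # at most two cleanup workers at once
lock = threading.Lock()
active = 0                            # instrumentation: currently running workers
peak = 0                              # instrumentation: highest concurrency seen

def backgroundFunc():
    global active, peak
    with sem:                         # blocks while two workers are already inside
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)              # stand-in for the file deletion
        with lock:
            active -= 1

threads = [threading.Thread(target=backgroundFunc) for _ in range(5)]
for t in threads:
    t.start()                         # all five start, but only two run the body at a time
for t in threads:
    t.join()
print("peak concurrency:", peak)      # never exceeds 2
```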
the program crashes randomly. I guess

It would be easier to concentrate on one problem at a time, without guessing, so, what's the crash?

Here's a test with threading that might inspire you, based on the example for queue.

#!python3
#coding=utf-8
""" https://mcmap.net/q/17318/-executing-a-function-in-the-background-while-using-limited-number-of-cores-threads-and-queue-the-extra-executions/ """

import sys, time, threading, queue

print(sys.version)

class myClass:
    """ """

    def __init__(self):
        """ """
        self.q = queue.Queue()
        self.threads = []
        self.num_worker_threads = 2

    def backgroundFunc(self):
        """ """
        print("thread started")
        while True:
            item = self.q.get()
            if item is None:
                #self.q.task_done()
                break
            print("working on ", item)
            time.sleep(0.5)
            self.q.task_done()
        print("thread stopping")

    def mainFunc(self):
        """ """

        print("starting thread(s)")
        for i in range(self.num_worker_threads):
            t = threading.Thread(target=self.backgroundFunc)
            t.start()
            self.threads.append(t)

        print("giving thread(s) some work")
        for item in range(5):
            self.q.put(item)

        print("giving thread(s) more work")
        for item in range(5,10):
            self.q.put(item)

        # block until all tasks are done
        print("waiting for thread(s) to finish")
        self.q.join()

        # stop workers
        print("stopping thread(s)")
        for i in range(self.num_worker_threads):
            self.q.put(None)
        for t in self.threads:
            t.join()

        print("finished")



if __name__ == "__main__":
    print("instance")
    myClassInstance = myClass()

    print("run")
    myClassInstance.mainFunc()

    print("end.")

It prints

3.6.1 (v3.6.1:69c0db5, Mar 21 2017, 17:54:52) [MSC v.1900 32 bit (Intel)]
instance
run
starting thread(s)
thread started
thread started
giving thread(s) some work
giving thread(s) more work
waiting for thread(s) to finish
working on  0
working on  1
working on  2
working on  3
working on  4
working on  5
working on  6
working on  7
working on  8
working on  9
stopping thread(s)
thread stopping
thread stopping
finished
end.
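If mainFunc gets called repeatedly, the same worker/queue pattern can be folded back into the original class shape. A sketch (the daemon threads are an assumption: they simply die with the main program, so no sentinel items are needed, and the counter is only there to make the demo observable):

```python
import queue
import threading

class MyClass:
    def __init__(self, num_workers=2):
        self.q = queue.Queue()
        self.cleaned = 0                  # instrumentation for the demo
        self._lock = threading.Lock()
        for _ in range(num_workers):
            # daemon=True: workers die with the main program, no sentinel needed
            threading.Thread(target=self._worker, daemon=True).start()

    def _worker(self):
        while True:
            self.q.get()                  # sleeps until a job arrives
            self.backgroundFunc()
            self.q.task_done()

    def backgroundFunc(self):
        with self._lock:                  # stand-in for deleting files on disk
            self.cleaned += 1

    def mainFunc(self, elem):
        # do the foreground work for `elem`, then queue the cleanup and return
        self.q.put(None)                  # the job itself carries no arguments

instance = MyClass()
for element in range(10):
    instance.mainFunc(elem=element)
instance.q.join()                         # demo only: wait for the queue to drain
print(instance.cleaned)                   # prints 10
```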
Gapes answered 3/3, 2018 at 9:4 Comment(5)
Move the thread management to __init__ (also maybe docs.python.org/3.6/reference/datamodel.html#object.__del__) if mainFunc gets called more than once. For several instances of myClass, see stackoverflow.com/questions/68645/…Gapes
I have not run your code but it seems to me that your code assigns backgroundFunc to two threads immediately but I do not want that to be the case. I want to have control over how many times and when I want to run backgroundFunc. Also, mainFunc should not wait for all threads to finish running before it exits. If that happens, I won't benefit from multi threading because the background function might take too long to finish. Maybe I should have been more clear in the question; sorry about that.Kali
In addition, from the impression I got from all other posts, your code seems a bit too complicated. Is there a way to simplify it?Kali
I've retried the original example code: it does work as-is. I had issues with q.join() blocking, and added task_done() to solve that - commented out now in the code above, but not tested again.Gapes
I'm gonna try your proposed method sometime soon and will get back to you on it. Thank you!Kali
