Multi-processing in Python: Numpy + Vector Summation -> Huge Slowdown

Please don't be discouraged by the long post. I try to present as much data as I can, and I really need help with this problem :S. I'll update daily if there are new tips or ideas.

Problem:

I am trying to run Python code on a two-core machine in parallel, using separate processes (to avoid the GIL), but the code slows down significantly. For example, a run on a one-core machine takes 600 sec per workload, while a run on a two-core machine takes 1600 sec (800 sec per workload).

What I already tried:

  • I measured the memory usage, and there appears to be no memory problem [only about 20% used at the peak].

  • I used "htop" to check whether the program really runs on different cores, or whether my core affinity is messed up. No luck there either: my program runs on all of my cores.

  • The problem is CPU-bound, and I checked and confirmed that my code runs at 100% CPU on all cores most of the time.

  • I checked the process IDs and I am, indeed, spawning two different processes.

  • I changed the function I submit into the executor [ e.submit(function, [...]) ] to a calculate-pi function and observed a huge speedup. So the problem is probably in the process_function(...) I submit into the executor, and not in the code before it.

  • Currently I'm using "futures" from "concurrent" to parallelize the task. I also tried the "Pool" class from "multiprocessing", but the result remained the same.
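For reference, the pi sanity check mentioned above can be reproduced with a minimal stand-alone script. The Leibniz-series function below is only an illustrative stand-in for a CPU-bound workload, not my real code:

```python
from concurrent import futures
import multiprocessing
import time

def leibniz_pi(n_terms):
    """CPU-bound stand-in workload: approximate pi with the Leibniz series."""
    acc = 0.0
    sign = 1.0
    for k in range(n_terms):
        acc += sign / (2 * k + 1)
        sign = -sign
    return 4.0 * acc

if __name__ == "__main__":
    n_workers = multiprocessing.cpu_count()
    start = time.time()
    # one job per core; with independent CPU-bound jobs this should
    # scale almost linearly with the number of cores
    with futures.ProcessPoolExecutor(max_workers=n_workers) as e:
        jobs = [e.submit(leibniz_pi, 500000) for _ in range(n_workers)]
        values = [job.result() for job in jobs]
    print("pi ~= {:.6f}, wall time {:.2f}s".format(values[0], time.time() - start))
```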

Code:

  • Spawn processes:

    from concurrent import futures
    import psutil

    result = [None] * psutil.cpu_count()

    e = futures.ProcessPoolExecutor( max_workers=psutil.cpu_count() )

    for i in range(psutil.cpu_count()):
        result[i] = e.submit(process_function, ...)
    
  • process_function:

    from math import floor
    from math import ceil
    import numpy
    import MySQLdb
    import time
    
    db = MySQLdb.connect(...)
    cursor  = db.cursor()
    query = "SELECT ...."
    cursor.execute(query)
    
    [...]  #save db results into the variable db_matrix (30 columns, 5.000 rows)
    [...]  #save db results into the variable bp_vector (3 columns, 500 rows)
    [...]  #save db results into the variable option_vector( 3 columns, 4000 rows)
    
    cursor.close()
    db.close()
    
    counter = 0 
    
    for i in range(4000):
        for j in range(500):
            # the expression must be wrapped in parentheses; without them
            # the continuation lines are separate no-op statements
            helper[:] = ((1 - bp_vector[j,0] - bp_vector[j,1] - bp_vector[j,2]) * db_matrix[:,0]
                         + db_matrix[:,option_vector[i,0]] * bp_vector[j,0]
                         + db_matrix[:,option_vector[i,1]] * bp_vector[j,1]
                         + db_matrix[:,option_vector[i,2]] * bp_vector[j,2])

            result[counter,0] = (helper < -7.55).sum()

            counter = counter + 1

    return result
    

My guess:

  • My guess is that, for some reason, the weighted vector multiplication that creates the vector "helper" is causing the problems. [I believe the Time Measurement section confirms this guess.]

  • Could it be that numpy is creating these problems? Is numpy compatible with multi-processing? If not, what can I do? [Already answered in the comments]

  • Could the cache memory be the cause? I read about it on the forum, but to be honest didn't really understand it. If the problem is rooted there, though, I would familiarize myself with this topic.
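In case the cache/temporaries guess has merit, one experiment would be to compute "helper" with in-place numpy operations, so the big expression allocates less scratch memory per iteration. A sketch (the preallocated `out` and `scratch` arrays are my assumption):

```python
import numpy

def helper_inplace(db_matrix, bp_row, opt_row, out, scratch):
    """Compute one helper vector without large temporaries.

    bp_row is one row of bp_vector (three weights); opt_row is one row
    of option_vector (three column indices); out and scratch must be
    preallocated 1-D arrays of length db_matrix.shape[0].
    """
    # base term: (1 - sum of weights) * first column, written into out
    numpy.multiply(db_matrix[:, 0], 1.0 - bp_row.sum(), out=out)
    # add the three weighted columns, reusing one scratch buffer
    for k in range(3):
        numpy.multiply(db_matrix[:, opt_row[k]], bp_row[k], out=scratch)
        out += scratch
    return out
```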

Time Measurement: (edit)

  • One core: time to get the data from the db: 8 sec.

  • Two core: time to get the data from the db: 12 sec.

  • One core: time to do the double-loop in the process_function: ~ 640 sec.

  • Two core: time to do the double-loop in the process_function: ~ 1600 sec

Update: (edit)

When I measure the time with two processes for every 100 i's in the loop, it is roughly 220% of the time I observe when I measure the same thing while running only one process. But what is even more mysterious: if I quit one process during the run, the other process speeds up! It actually speeds up to the same level it had during the solo run. So there must be some dependency between the processes that I just don't see at the moment :S

Update-2: (edit)

So, I did a few more test runs and measurements. As compute instances I used either a one-core Linux machine (n1-standard-1, 1 vCPU, 3.75 GB memory) or a two-core Linux machine (n1-standard-2, 2 vCPUs, 7.5 GB memory) from Google Cloud Compute Engine. I also ran the tests on my local computer and observed roughly the same results (so the virtualized environment should be fine). Here are the results:

P.S.: The times here differ from the measurements above because I limited the loop a little and did the testing on Google Cloud instead of on my home PC.

1-core machine, started 1 process:

time: 225sec , CPU utilization: ~100%

1-core machine, started 2 processes:

time: 557sec , CPU utilization: ~100%

1-core machine, started 1 process, limited max. CPU-utilization to 50%:

time: 488sec , CPU utilization: ~50%


2-core machine, started 2 processes:

time: 665sec , CPU-1 utilization: ~100% , CPU-2 utilization: ~100%

the processes did not jump between the cores; each used one core

(at least htop displayed these results with the “Process” column)

2-core machine, started 1 process:

time: 222sec , CPU-1 utilization: ~100% (0%) , CPU-2 utilization: ~0% (100%)

however, the process sometimes jumped between the cores

2-core machine, started 1 process, limited max. CPU-utilization to 50%:

time: 493sec , CPU-1 utilization: ~50% (0%) , CPU-2 utilization: ~0% (100%)

however, the process jumped between the cores extremely often

I used "htop" and the python module "time" to obtain these results.

Update - 3: (edit)

I used cProfile for profiling my code:

python -m cProfile -s cumtime fun_name.py
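To keep the full dumps around for later inspection, the profile can also be written to a file and explored with the pstats module; a small self-contained sketch (the sum workload below is just a stand-in for fun_name.py):

```python
import cProfile
import pstats

# write the raw profile data to a file instead of printing it
cProfile.run("sum(x * x for x in range(100000))", "profile.out")

# load it back and print only the 10 most expensive entries
stats = pstats.Stats("profile.out")
stats.sort_stats("cumulative").print_stats(10)
```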

The files are too long to post here, but I believe that if they contain valuable information at all, it is probably in the first lines of the output. Therefore, I will post the first lines of the results here:

1-core machine, started 1 process:

623158 function calls (622735 primitive calls) in 229.286 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall   filename:lineno(function)
        1    0.371    0.371  229.287  229.287   20_with_multiprocessing.py:1(<module>)
        3    0.000    0.000  225.082   75.027   threading.py:309(wait)
        1    0.000    0.000  225.082  225.082   _base.py:378(result)
       25  225.082    9.003  225.082    9.003   {method 'acquire' of 'thread.lock' objects}
        1    0.598    0.598    3.081    3.081   get_BP_Verteilung_Vektoren.py:1(get_BP_Verteilung_Vektoren)
        3    0.000    0.000    2.877    0.959   cursors.py:164(execute)
        3    0.000    0.000    2.877    0.959   cursors.py:353(_query)
        3    0.000    0.000    1.958    0.653   cursors.py:315(_do_query)
        3    0.000    0.000    1.943    0.648   cursors.py:142(_do_get_result)
        3    0.000    0.000    1.943    0.648   cursors.py:351(_get_result)
        3    1.943    0.648    1.943    0.648   {method 'store_result' of '_mysql.connection' objects}
        3    0.001    0.000    0.919    0.306   cursors.py:358(_post_get_result)
        3    0.000    0.000    0.917    0.306   cursors.py:324(_fetch_row)
        3    0.917    0.306    0.917    0.306   {built-in method fetch_row}
   591314    0.161    0.000    0.161    0.000   {range}

1-core machine, started 2 processes:

626052 function calls (625616 primitive calls) in 578.086 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall   filename:lineno(function)
        1    0.310    0.310  578.087  578.087   20_with_multiprocessing.py:1(<module>)
       30  574.310   19.144  574.310   19.144   {method 'acquire' of 'thread.lock' objects}
        2    0.000    0.000  574.310  287.155   _base.py:378(result)
        3    0.000    0.000  574.310  191.437   threading.py:309(wait)
        1    0.544    0.544    2.854    2.854   get_BP_Verteilung_Vektoren.py:1(get_BP_Verteilung_Vektoren)
        3    0.000    0.000    2.563    0.854   cursors.py:164(execute)
        3    0.000    0.000    2.563    0.854   cursors.py:353(_query)
        3    0.000    0.000    1.715    0.572   cursors.py:315(_do_query)
        3    0.000    0.000    1.701    0.567   cursors.py:142(_do_get_result)
        3    0.000    0.000    1.701    0.567   cursors.py:351(_get_result)
        3    1.701    0.567    1.701    0.567   {method 'store_result' of '_mysql.connection' objects}
        3    0.001    0.000    0.848    0.283   cursors.py:358(_post_get_result)
        3    0.000    0.000    0.847    0.282   cursors.py:324(_fetch_row)
        3    0.847    0.282    0.847    0.282   {built-in method fetch_row}
   591343    0.152    0.000    0.152    0.000   {range}


2-core machine, started 1 process:

623164 function calls (622741 primitive calls) in 235.954 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall   filename:lineno(function)
        1    0.246    0.246  235.955  235.955   20_with_multiprocessing.py:1(<module>)
        3    0.000    0.000  232.003   77.334   threading.py:309(wait)
       25  232.003    9.280  232.003    9.280   {method 'acquire' of 'thread.lock' objects}
        1    0.000    0.000  232.003  232.003   _base.py:378(result)
        1    0.593    0.593    3.104    3.104   get_BP_Verteilung_Vektoren.py:1(get_BP_Verteilung_Vektoren)
        3    0.000    0.000    2.774    0.925   cursors.py:164(execute)
        3    0.000    0.000    2.774    0.925   cursors.py:353(_query)
        3    0.000    0.000    1.981    0.660   cursors.py:315(_do_query)
        3    0.000    0.000    1.970    0.657   cursors.py:142(_do_get_result)
        3    0.000    0.000    1.969    0.656   cursors.py:351(_get_result)
        3    1.969    0.656    1.969    0.656   {method 'store_result' of '_mysql.connection' objects}
        3    0.001    0.000    0.794    0.265   cursors.py:358(_post_get_result)
        3    0.000    0.000    0.792    0.264   cursors.py:324(_fetch_row)
        3    0.792    0.264    0.792    0.264   {built-in method fetch_row}
   591314    0.144    0.000    0.144    0.000   {range}

2-core machine, started 2 processes:

626072 function calls (625636 primitive calls) in 682.460 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall   filename:lineno(function)
        1    0.334    0.334  682.461  682.461   20_with_multiprocessing.py:1(<module>)
        4    0.000    0.000  678.231  169.558   threading.py:309(wait)
       33  678.230   20.552  678.230   20.552   {method 'acquire' of 'thread.lock' objects}
        2    0.000    0.000  678.230  339.115   _base.py:378(result)
        1    0.527    0.527    2.974    2.974   get_BP_Verteilung_Vektoren.py:1(get_BP_Verteilung_Vektoren)
        3    0.000    0.000    2.723    0.908   cursors.py:164(execute)
        3    0.000    0.000    2.723    0.908   cursors.py:353(_query)
        3    0.000    0.000    1.749    0.583   cursors.py:315(_do_query)
        3    0.000    0.000    1.736    0.579   cursors.py:142(_do_get_result)
        3    0.000    0.000    1.736    0.579   cursors.py:351(_get_result)
        3    1.736    0.579    1.736    0.579   {method 'store_result' of '_mysql.connection' objects}
        3    0.001    0.000    0.975    0.325   cursors.py:358(_post_get_result)
        3    0.000    0.000    0.973    0.324   cursors.py:324(_fetch_row)
        3    0.973    0.324    0.973    0.324   {built-in method fetch_row}
        5    0.093    0.019    0.304    0.061   __init__.py:1(<module>)
        1    0.017    0.017    0.275    0.275   __init__.py:106(<module>)
        1    0.005    0.005    0.198    0.198   add_newdocs.py:10(<module>)
   591343    0.148    0.000    0.148    0.000   {range}

Personally, I don't really know what to do with these results. I would be glad to receive tips, hints, or any other help - thanks :)

Reply to Answer-1: (edit)

Roland Smith looked at the data and suggested that multiprocessing might hurt performance more than it helps. Therefore, I did one more measurement without multiprocessing (with code like he suggested):

Am I right to conclude that this is not the case, since the measured times are similar to the times measured before with multiprocessing?

1-core machine:

Database access took 2.53 seconds

Matrix manipulation took 236.71 seconds

1842384 function calls (1841974 primitive calls) in 241.114 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall   filename:lineno(function)
        1  219.036  219.036  241.115  241.115   20_with_multiprocessing.py:1(<module>)
   406000    0.873    0.000   18.097    0.000   {method 'sum' of 'numpy.ndarray' objects}
   406000    0.502    0.000   17.224    0.000   _methods.py:31(_sum)
   406001   16.722    0.000   16.722    0.000   {method 'reduce' of 'numpy.ufunc' objects}
        1    0.587    0.587    3.222    3.222   get_BP_Verteilung_Vektoren.py:1(get_BP_Verteilung_Vektoren)
        3    0.000    0.000    2.964    0.988   cursors.py:164(execute)
        3    0.000    0.000    2.964    0.988   cursors.py:353(_query)
        3    0.000    0.000    1.958    0.653   cursors.py:315(_do_query)
        3    0.000    0.000    1.944    0.648   cursors.py:142(_do_get_result)
        3    0.000    0.000    1.944    0.648   cursors.py:351(_get_result)
        3    1.944    0.648    1.944    0.648   {method 'store_result' of '_mysql.connection' objects}
        3    0.001    0.000    1.006    0.335   cursors.py:358(_post_get_result)
        3    0.000    0.000    1.005    0.335   cursors.py:324(_fetch_row)
        3    1.005    0.335    1.005    0.335   {built-in method fetch_row}
   591285    0.158    0.000    0.158    0.000   {range}

2-core machine:

Database access took 2.32 seconds

Matrix manipulation took 242.45 seconds

1842390 function calls (1841980 primitive calls) in 246.535 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1  224.705  224.705  246.536  246.536 20_with_multiprocessing.py:1(<module>)
   406000    0.911    0.000   17.971    0.000 {method 'sum' of 'numpy.ndarray' objects}
   406000    0.526    0.000   17.060    0.000 _methods.py:31(_sum)
   406001   16.534    0.000   16.534    0.000 {method 'reduce' of 'numpy.ufunc' objects}
        1    0.617    0.617    3.113    3.113 get_BP_Verteilung_Vektoren.py:1(get_BP_Verteilung_Vektoren)
        3    0.000    0.000    2.789    0.930 cursors.py:164(execute)
        3    0.000    0.000    2.789    0.930 cursors.py:353(_query)
        3    0.000    0.000    1.938    0.646 cursors.py:315(_do_query)
        3    0.000    0.000    1.920    0.640 cursors.py:142(_do_get_result)
        3    0.000    0.000    1.920    0.640 cursors.py:351(_get_result)
        3    1.920    0.640    1.920    0.640 {method 'store_result' of '_mysql.connection' objects}
        3    0.001    0.000    0.851    0.284 cursors.py:358(_post_get_result)
        3    0.000    0.000    0.849    0.283 cursors.py:324(_fetch_row)
        3    0.849    0.283    0.849    0.283 {built-in method fetch_row}
   591285    0.160    0.000    0.160    0.000 {range}
Foudroyant answered 3/4, 2016 at 9:56 Comment(7)
There is no problem with numpy and multiprocessing. (Hydracid)
If you don't know what causes a problem, measure. How long does the database access take? How long do the numpy calculations take? Is there a difference in these times between sequential and parallel processing? (Hydracid)
Is the database on the same server? If so, making the queries to the database might block the other process, causing context switches. (Bulky)
Thanks for all your quick comments! I'll try to address them all. @Smith: Thanks for pointing out that there is no problem between numpy and multiprocessing; one less thing to worry about. I did the measurements and will include them in the original post. @YnkDK: Yes, the database is on the same server, and the data-fetch time is indeed longer in the parallel run than in the sequential run; however, the difference is not that big. [see the "Time Measurement" edit in the original post] (Foudroyant)
Can't you vectorize that for loop? You are not using numpy's potential at all. (Tetrad)
It might also be that numpy already tries to use multiple cores for some calculations. If so, spawning an extra process will obviously just add overhead. When you run your code on a single core, does the resource monitor clearly show that it actually uses just one core, or does it use several? (Beery)
I added a new update. @Ovren: Good idea! Unfortunately, I think the new update section rules it out again :(. @imaluengo: Another great point! Hmm, I thought about it for 10 minutes without finding a good way to vectorize the loop, but I will continue my search after the next coffee :) (Foudroyant)

Your programs seem to spend most of their time acquiring locks. That seems to indicate that in your case multiprocessing hurts more than it helps.

Remove all the multiprocessing stuff and start measuring how long things take without it, e.g. like this:

from math import floor
from math import ceil
import numpy
import MySQLdb
import time

start = time.clock()
db = MySQLdb.connect(...)
cursor  = db.cursor()
query = "SELECT ...."
cursor.execute(query)
stop = time.clock()
print "Database access took {:.2f} seconds".format(stop - start)

start = time.clock()
[...]  #save db results into the variable db_matrix (30 columns, 5.000 rows)
[...]  #save db results into the variable bp_vector (3 columns, 500 rows)
[...]  #save db results into the variable option_vector( 3 columns, 4000 rows)
stop = time.clock()
print "Creating matrices took {:.2f} seconds".format(stop - start)
cursor.close()
db.close()

counter = 0 

start = time.clock()
for i in range(4000):
    for j in range(500):
         helper[:] = ((1 - bp_vector[j,0] - bp_vector[j,1] - bp_vector[j,2]) * db_matrix[:,0]
                      + db_matrix[:,option_vector[i,0]] * bp_vector[j,0]
                      + db_matrix[:,option_vector[i,1]] * bp_vector[j,1]
                      + db_matrix[:,option_vector[i,2]] * bp_vector[j,2])

         result[counter,0] = (helper < -7.55).sum()

         counter = counter + 1
stop = time.clock()
print "Matrix manipulation took {:.2f} seconds".format(stop - start)

Edit-1

Based on your measurements, I stand by my conclusion (in a slightly rephrased form) that on a multi-core machine, using multiprocessing as you do now hurts your performance very much. On a dual-core machine the program with multiprocessing takes much longer than the one without it!

That there is no difference between using multiprocessing or not on a single-core machine isn't very relevant, I think. A single-core machine won't see much benefit from multiprocessing anyway.

The new measurements show that most of the time is spent in the matrix manipulation. This is logical since you are using an explicit nested for-loop, which is not very fast.

There are basically four possible solutions:

The first is to re-write your nested loop into numpy operations. Numpy operations have implicit loops (written in C) instead of explicit loops in Python and are thus faster. (A rare case where explicit is worse than implicit. ;-) ) The downside is that this will probably use a significant amount of memory.
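A sketch of what such a rewrite could look like, assuming the shapes from the question (db_matrix 5000x30, bp_vector 500x3, option_vector 4000x3 of integer column indices): the inner j-loop collapses into one outer product plus one matrix product per i.

```python
import numpy

def count_exceedances(db_matrix, bp_vector, option_vector, threshold=-7.55):
    """Vectorized replacement for the nested i/j loop.

    For each row i of option_vector, all helper vectors for j = 0..n_j-1
    are computed at once as a (n_rows, n_j) array; the result ordering
    matches the original counter (j varies fastest within each i).
    """
    n_i = option_vector.shape[0]
    n_j = bp_vector.shape[0]
    result = numpy.empty(n_i * n_j, dtype=numpy.intp)
    # weight of the base column, one entry per j
    base_weight = 1.0 - bp_vector.sum(axis=1)          # shape (n_j,)
    base = numpy.outer(db_matrix[:, 0], base_weight)   # shape (n_rows, n_j)
    for i in range(n_i):
        cols = db_matrix[:, option_vector[i]]          # shape (n_rows, 3)
        helper = base + cols.dot(bp_vector.T)          # shape (n_rows, n_j)
        result[i * n_j:(i + 1) * n_j] = (helper < threshold).sum(axis=0)
    return result
```

The intermediate arrays are about n_rows x n_j doubles each (roughly 20 MB for the question's sizes), which is the memory cost mentioned above.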

The second option is to split up the calculation of helper, which consists of four parts. Execute each part in a separate process and add the results together at the end. This does incur some overhead: each process has to retrieve all the data from the database and has to transfer the partial result back to the main process (maybe also via the database?).

The third option might be to use pypy instead of CPython. It can be significantly faster.

A fourth option would be to re-write the critical matrix manipulation in Cython or C.

Hydracid answered 4/4, 2016 at 23:4 Comment(1)
Thanks for sticking with me. I edited the "Reply to Answer-1" section in my original post. Unfortunately, the measured times are nearly the same. (Foudroyant)
