Parallelising a 'for' loop in Cython: beyond prange
Asked Answered
E

2

3

I am struggling to correctly parallelise a function using Cython. Basically, the problem is to bin some data. The actual code is a bit long, but in the end it does something like this:

def bin_var(double[:] dist,
            double[:] values,
            double[:] bin_def,
            double[:] varg, long[:] count):

    dbin = (bin_def[1] - bin_def[0]) / bin_def[2]

    for n1 in range(values.size):
            if (dist[n1] < bin_def[0]) or (dist[n1] >= bin_def[1]):
                continue
            else:
                ni = int((dist - bin_def[0]) / dbin)
                count[ni] += 1
                varg[ni] += calc_something(values[ni])

    # Compute the mean
    for n1 in range(int(bin_def[2])):
        varg[ni] /= count[ni]

This code lends itself to some simple parallelisation (values and dist are very large): one needs to split the first for loop over separate processes, each working on its own version of the count and varg arrays. When that is done, one has to combine everything together by summing the different versions of count and varg before the second for loop (much shorter).

That said, it's two days I'm trying to understand how to implement efficiently this in Cython, and I am starting to suspect it is not possible with the current version of the language. Note that just using prange from cython.parallel for the first loop does not provide correct results, because (I assume) of the simultaneous access of ni, count and varg from different threads.

Is Cython parallel support really so limited? I was getting such a nice speedup single-threaded, I just hoped I could continue...

Edmond answered 20/12, 2017 at 14:21 Comment(0)
R
5

I can think of three options here:

  1. Use the GIL to ensure that the += is done single threaded:

    varg_ni = calc_something(values[ni]) # keep this out 
                   # of the single threaded block...
    with gil:
        count[ni] += 1
        varg[ni] += varg_ni
    

    This is easy and won't be too bad provided the work done in calc_something is reasonably large

  2. Make count and varg 2D arrays with each thread writing to a different column. Sum along the second dimension afterwards:

    # rough, untested outline....
    
    # might need to go in a `with parallel()` block
    num_threads = openmp.omp_get_num_threads()
    
    cdef double[:,:] count_tmp = np.zeros((count.shape[0],num_threads))
    cdef double[:,:] varg_tmp = np.zeros((varg.shape[0],num_threads))
    
    # then in the loop:
    count_tmp[ni,cython.parallel.threadid()] += 1
    varg_tmp[ni,cython.parallel.threadid()] += calc_something(values[ni])
    
    # after the loop:
    count[:] = np.sum(count_tmp,axis=1)
    varg[:] = np.sum(varg_tmp,axis=1)
    

    You could also do something similar using the ideas in the local_buf example in the documentation.

  3. (NOTE - GCC is currently giving me an "internal compiler error" for this - I feel it should work, but for the moment it doesn't seem to be working, so try option 3 at your own risk...) Use the openmp atomic directive to do the addition atomically. This requires a bit of work to circumvent Cython but shouldn't be too difficult. Create a short C header file with an add_inplace macro:

    #define add_inplace(x,y) _Pragma("omp atomic") x+=y
    

    The _Pragma is a C99 feature that should allow you to put pragmas in preprocessor statements. Then tell Cython about that header file (as if it's a function):

    cdef extern from "header.h":
        void add_inplace(...) nogil # just use varargs to make Cython think it accepts anything
    

    Then in the loop do:

    add_inplace(count[ni], 1)
    add_inplace(varg[ni], calc_something(values[ni]))
    

    Because this uses macro trickery it may be a bit fragile (i.e. definitely won't work with PyObject*s, but it should generate the correct C code when using standard C numeric types. (Inspect the code to be sure)

Revell answered 20/12, 2017 at 18:12 Comment(3)
Thanks, I had tried the with gil way, but it was so slow for my calc_something that I did not even mention it. The other options seem on the other hand very promising.Edmond
Solution 2 was the way to go for me, but it is worth the effort only for large problems (as those I am actually interested in). Note however that prange has some weird behaviour, as forcing a return after the loop. I did not try solution 3, as it's a bit beyond reach for me.Edmond
I couldn't quite get solution 3 to work so I think you're well-advised to avoid it :)Revell
A
1

For everybody who is still struggling, here is the solution that is much faster than with gil: . Nowadays (7 years later), it is possible to import atomic/openmp directly. It took me a while to figure that out since there is nothing about in the official documentation.

The atomic solution

from libcpp.atomic cimport atomic
from cython.parallel cimport prange

cpdef parallel_loop():
    cdef: 
        Py_ssize_t a = 100000 
        Py_ssize_t loopvar 
        atomic[int] *atticounter1= new atomic[int](0)
        int finalvar = 0
    try:
        for loopvar in prange(a,nogil=True):
            if loopvar % 1000 == 0:
                atticounter1.fetch_add(1)
        finalvar = atticounter1.load()
    finally:
        del atticounter1
    return finalvar

f=parallel_loop()
print(f)

The openmp solution

Most of the time, you want to do more than count positive results. In this case you can use openmp.

Here is a working example of a code that locates RGB colors in an image

The pyx file:

from cython.parallel cimport prange
cimport openmp
import numpy as np
cimport numpy as np
import cython
cimport cython

cpdef void searchforcolor(
    unsigned char[:] pic,
    unsigned char[::1] colors,
    Py_ssize_t[:,::1] results,
    Py_ssize_t[::1] countervar,
    Py_ssize_t width,
    Py_ssize_t totallengthpic,
    Py_ssize_t totallengthcolor,
    int cpus,
    ):
    cdef:
        Py_ssize_t i, j
        unsigned char r,g,b
        openmp.omp_lock_t locker
    if cpus < 1:
        cpus=openmp.omp_get_max_threads()
    if cpus > 1:
        openmp.omp_set_num_threads(cpus)
        openmp.omp_init_lock(&locker)
        for i in prange(0, totallengthcolor, 3, nogil=True): # should be possible, but not work not working: use_threads_if=cpus>1
            r = colors[i]
            g = colors[i + 1]
            b = colors[i + 2]
            for j in range(0, totallengthpic, 3):
                if (r == pic[j]) and (g == pic[j+1]) and (b == pic[j+2]):
                    openmp.omp_set_lock(&locker)
                    results[countervar[0]][1] = ((j / 3) // width) #x
                    results[countervar[0]][0] = ((j / 3) % width) #y
                    results[countervar[0]][2] = b
                    results[countervar[0]][3] = g
                    results[countervar[0]][4] = r
                    countervar[0]+=1
                    openmp.omp_unset_lock(&locker)
        openmp.omp_destroy_lock(&locker)
    else:
        for i in range(0, totallengthcolor, 3):
            r = colors[i]
            g = colors[i + 1]
            b = colors[i + 2]
            for j in range(0, totallengthpic, 3):
                if (r == pic[j]) and (g == pic[j+1]) and (b == pic[j+2]):
                    results[countervar[0]][1] = ((j / 3) // width) #x
                    results[countervar[0]][0] = ((j / 3) % width) #y
                    countervar[0]+=1

The python code:

def search_colors(pic, colors, cpus=-1):
    if not isinstance(colors, np.ndarray):
        colors = np.array(colors, dtype=np.uint8)
    rav_colors = np.ascontiguousarray(colors.ravel())
    totallengthcolor = rav_colors.shape[0] - 1
    totallenghtpic = np.prod(pic.shape) - 1
    width = pic.shape[1]
    results = np.zeros((totallenghtpic, 5), dtype=np.int64)
    countervar = np.zeros(1, dtype=np.int64) # this is going to be our atomic counter
    searchforcolor(
        pic.ravel(),
        rav_colors,
        results,
        countervar,
        width,
        totallenghtpic,
        totallengthcolor,
        cpus,
    )
    return results[: countervar[0]]


picpath = r"C:\Users\hansc\Downloads\pexels-alex-andrews-2295744.jpg"
pic = cv2.imread(picpath)
colors0 = np.array([[255, 255, 255]], dtype=np.uint8)
resus0 = search_colors(pic=pic, colors=colors0)
colors1 = np.array(
    [
        (66, 71, 69),
        (62, 67, 65),
        (144, 155, 153),
        (52, 57, 55),
        (127, 138, 136),
        (53, 58, 56),
        (51, 56, 54),
        (32, 27, 18),
        (24, 17, 8),
    ],
    dtype=np.uint8,
)
resus1 = search_colors(pic=pic, colors=colors1)
print(resus1)

Important: Compile both examples it with the OpenMP flag!

Aphrodisia answered 4/2 at 20:15 Comment(3)
Yes - that's a good suggestion. It would probably have been possible when I wrote my answer. You don't seem to delete your heap-allocated atomic though so it probably leaks memory. Also the C standard library also defines atomic operations, so those are an option if you don't want to use C++Revell
Thank you very much for that correction! I guess that it wasn't possible, back then, but I am sure that it will help other people, because all answers that I found on stackoverflow suggest the with gil stuff. I am still confused when I need to free something and when not in cython. I am going to edit the answer later, and show also how to call a functionAphrodisia
Thanks for your answer! In the meanwhile, building python bindings from C++ or other languages has become much more straightforward, and I tend to think that only the poor state of Python packaging for compiled extensions can still justify, in some situations, the use of cython.Edmond

© 2022 - 2024 — McMap. All rights reserved.