Python multiprocessing safely writing to a file

I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not been computed by any process yet, and otherwise skipping the computation and just reading the results from the file.

I'm having concurrency issues with the files: different processes sometimes check to see if a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, then try to write the results to the same file at the same time. How do I avoid writing collisions like this?

Campbell asked 19/11, 2012 at 1:13 Comment(8)
Check out an example from the documentation of using multiprocessing.Lock to synchronize multiple processes.Rann
You could have only a single process writing results, with a Queue as input that is fed by the other worker processes. I believe it would be safe to have all the worker processes read-only.Shuttle
I should have mentioned that, to make things more complicated, I'm running multiple different big main problems at the same time on a cluster, with each one writing results to sub-subproblems on the same networked file system. Thus I can get collisions from processes running on separate machines entirely (so I don't think solutions using things like multiprocessing.Lock will work).Campbell
Is the problem you're having with file write collisions, or is it just that you don't want to duplicate work in situations where one worker starts solving a sub-subproblem while another is already working on it? The latter is a bit more difficult to solve (more synchronization is required).Debonair
Well originally I was having file write collisions, but I find that checking for the file's existence immediately before writing (instead of relying on the check I do before I start computing the sub-subproblem) took care of that. Now it's more the latter; I'd like to avoid duplicate work if possible (and can imagine others in the same situation).Campbell
If your networked file system supports file locking, you can use the OS-specific file create call to exclusively create the file and hold an exclusive lock on it until the results are ready, then close it. Any process that failed to "win" the create race would try to open it and retry (with a delay) until they were able to open it, then they can read the results. (A sketch of this approach follows the comments.)Essay
Ah, thanks JimP! That sounds like exactly what I need. I'll look into it.Campbell
You're essentially programming a database server here. Have you considered using an existing one?Rivy
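A minimal sketch of the exclusive-create approach suggested above, assuming a POSIX-like filesystem where os.open with O_CREAT | O_EXCL and os.replace are atomic (worth verifying on your particular networked storage); the function name, file paths, and polling interval are illustrative:

import os
import pickle
import time

def get_or_compute(result_path, compute, poll_interval=0.5):
    """Return memoized results, computing and writing them at most once
    across processes (and across machines sharing the filesystem)."""
    if not os.path.exists(result_path):
        try:
            # O_CREAT | O_EXCL fails if the lock file already exists, so only
            # one process can win the create race and do the computation.
            fd = os.open(result_path + '.lock',
                         os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            # Another process won the race; wait for the finished result file.
            while not os.path.exists(result_path):
                time.sleep(poll_interval)
        else:
            os.close(fd)
            result = compute()
            tmp = result_path + '.tmp'
            with open(tmp, 'wb') as f:
                pickle.dump(result, f)
            # Atomic rename, so readers never see a half-written file.
            os.replace(tmp, result_path)
            return result
    with open(result_path, 'rb') as f:
        return pickle.load(f)

A crashed "winner" would leave the waiters polling forever, so a real implementation would also want a timeout or a stale-lock check.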

@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read only access. This will eliminate collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s 
    done = time.perf_counter() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
Vociferant answered 23/11, 2012 at 13:38 Comment(13)
Hey Mike, thanks for the answer. I think this would work for the question as I phrased it, but I'm not so sure if it will solve the full problem as outlined in the comments to the question, specifically how I have several main programs running across several machines on a networked filesystem, all of which might have processes that will try to write to the same file. (FWIW, I got around my personal problem in a hacky way a while ago but am commenting in case others have similar issues.)Campbell
I really would like to upvote this many times. This has been helpful so many times for me. Once more today.First
Thanks Mike - I'd been struggling with how to use MP Queues. Your example makes it very clear and straightforward.Sonneteer
I had to add a pool.join() below pool.close(). Otherwise my workers would finish before the listener and the process would just stop.Nigrify
Many thanks for this! Note that I had to include herrherr's suggestion, lest it may cause a hard-to-detect flaw in at least my scenario.Sulamith
What about when the consumer is greatly outnumbered and causes memory issues? How would you implement multiple consumers all writing to the same file?Stanislaus
why mp.cpu_count() + 2 when setting number of processes?Turnpike
After adopting this code, my program exits before the listener finishes its work, how could I fix that?Nighthawk
This works great, except that it puts my outputs in a random order to disk, instead of doing it in the order I push data through. I'm using map rather than async for the worker threads. Unsure how to solve that issue.Coign
Tested on Linux: I needed to change f = open(fn, 'wb') to f = open(fn, 'w') to store the results, otherwise the output file will be blank even though the code runs like a charm.Treachery
Here is an expanded version of this example.Tongue
this freezes on pool.join(), copied this exactly.. any idea why?Suu
@Turnpike mp.cpu_count()+2 is just common practice or "rule of thumb" to ensure that the pool will be saturated.Keef

It looks to me like you need to use a Manager to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap to pass both the object you want to process and the managed list. The first step is to build the parameter list to be passed to starmap, which includes the managed list.

from multiprocessing import Manager
from multiprocessing import Pool  
import pandas as pd

def worker(row, param):
    # do something here and then append the result to the managed list
    # (as a DataFrame, so it can be concatenated with pandas later)
    x = param**2
    row.append(pd.DataFrame({'param': [param], 'result': [x]}))

if __name__ == '__main__':
    pool_parameter = [] # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build the list of argument tuples to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])

        with Pool() as p:
            p.starmap(worker, params)

From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set, feel free to concatenate using pandas. Then you can save the result very easily as a CSV or a pickle.

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')
Fairleigh answered 21/12, 2016 at 17:2 Comment(3)
Can I get some feedback on why this was down-voted? I see that the accepted answer is way better. I just want to learn.Fairleigh
What is params here? I cannot see it being initialised anywhere. Also, would mgr.list([]) be an empty list? You are appending the tuple of row and param to params; param contains the object to be processed, but what does row contain?Orate
It might be downvoted since in your code all the process outputs are stored in memory, this doesn't solve the issue. OP asks about writing each process output to a file while processing. The main problem to solve here is to avoid collision, e.g. multiple processes trying to access the file at the same time.Fanestil
Y
2

In response to the comments saying that this is being run on a cluster, a simple option, which doesn't rely on inter-process communication, is to lock the memoization file using fcntl from the Python standard library.

This works on macOS and I expect it will work on most Unix systems, though it will need to be tested on your particular networked storage implementation:

safe.py

import fcntl
import time

def myprint(*args):
    print(time.ctime(), *args)


def main():
    with open("safe.txt", "r+") as file:

        myprint("locking")

        # this will block (unless LOCK_EX | LOCK_NB is used)
        fcntl.lockf(file, fcntl.LOCK_EX)

        lines = file.readlines()

        # make race conditions more likely
        time.sleep(1)
        
        # "1", or one more than the previous entry
        newval = int(lines[-1])+1 if lines else 1

        print(newval)

        file.write(str(newval) + "\n")
        file.flush()

        myprint("unlocking")

        fcntl.lockf(file, fcntl.LOCK_UN)


if __name__ == '__main__':
    main()

You can check that it works locally, by running this in a terminal:

touch safe.txt  # this needs to already exist

for x in 1 2 3 4 5
do
  python safe.py &
done

cat safe.txt  # should have 1-5 inside

If you combine this with multiprocessing, each process probably needs its own file descriptor (so run open() separately in each process).
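As a rough illustration of that point, here is a minimal sketch (assuming a Unix-like system; the file name, pool size, and written payload are illustrative) in which each pool worker opens the shared file itself and holds an exclusive lock only while appending:

import fcntl
from multiprocessing import Pool

RESULTS = "safe.txt"  # assumed to exist already, as above

def append_result(i):
    # Each worker opens its own file object, so each has its own descriptor.
    with open(RESULTS, "a") as f:
        fcntl.lockf(f, fcntl.LOCK_EX)   # blocks until the lock is free
        f.write(f"result {i}\n")
        f.flush()
        fcntl.lockf(f, fcntl.LOCK_UN)

if __name__ == "__main__":
    with Pool(4) as pool:
        pool.map(append_result, range(10))

Closing the file would release the lock as well, but unlocking explicitly keeps the critical section as short as possible.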

Yesteryear answered 18/5, 2023 at 15:10 Comment(0)

I thought I'd post my solution to a somewhat simpler problem as well, since this page comes up whenever I search for my problem.

I based this loosely on @MikeHunter's solution above. The reason I needed something slightly different is that the arrays I want to write at the end of each process are fairly large, so putting them onto a queue, getting them off again, and writing them from a different process would mean a lot of pickling and unpickling of extremely large arrays. This does not handle the problem of checking many sub-problems and sub-subproblems as requested by the OP, but it does handle the "title" of the question!

So what do I do?

I pass a lock that all processes have access to, and I write to the file inside a Lock.acquire() and Lock.release() pair. That way no process can write while any other one is writing. All of this is to handle writing to HDF5 files serially without requiring an MPI-enabled build of h5py.


from multiprocessing import Process, Lock
import h5py
import numpy as np
from time import sleep, time


def func(i, l, filename, subfilename):

    # Reading from the subfile
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]

    sleeptime = np.random.rand(1)*4 + 1
    sleep(sleeptime[0])

    # Print array loaded to compare to output in the summary file
    print(i, f'{sleeptime[0]:.3f}', array)

    # Lock out any other process from writing to the summary file
    l.acquire()

    with h5py.File(filename, 'r+') as ds:
        ds['array'][i, :] = array

    # Release the lock
    l.release()


if __name__ == '__main__':

    N = 10
    Nsample = 5

    subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]

    for i in range(N):
        with h5py.File(subfilenames[i], 'w') as ds:
            disp = ds.create_dataset(
                'array', data=np.random.randint(0, 5, size=(5,)), dtype='f')

    filename = 'test.h5'

    with h5py.File(filename, 'w') as ds:
        disp = ds.create_dataset('array', (N, Nsample), dtype='f')

    # Create a lock that is communicated to the workers
    l = Lock()

    # Start the timer
    t0 = time()

    # Distribute the work to the workers
    processes = []

    print(" T  sleeptime     array", flush=True)
    print("-----------------------", flush=True)

    for i in range(N):
        p = Process(target=func, args=(
            i, l, filename, subfilenames[i]))
        p.start()
        processes.append(p)

    # Wait for the workers to finish
    for p in processes:
        p.join()

    # Print time taken
    print(f'Total time taken: {time()-t0:.2f} s')


If you save the script as hello.py you can run and sort the output like so:

python hello.py | sort

Which should generate something like this:

 T  sleeptime     array
-----------------------
0 4.336 [4. 1. 1. 0. 2.]
1 2.363 [2. 1. 1. 1. 3.]
2 2.741 [1. 2. 2. 4. 3.]
3 1.078 [1. 4. 4. 3. 0.]
4 1.327 [4. 4. 4. 4. 1.]
5 4.174 [1. 3. 1. 0. 4.]
6 2.095 [4. 1. 0. 3. 0.]
7 1.091 [3. 4. 4. 0. 4.]
8 1.601 [4. 3. 3. 1. 4.]
9 4.550 [3. 3. 3. 4. 0.]
Total time taken: 4.94 s

Check against written HDF5 file:

h5dump test.h5

which should result in something like this:

HDF5 "test.h5" {
GROUP "/" {
   DATASET "array" {
      DATATYPE  H5T_IEEE_F32LE
      DATASPACE  SIMPLE { ( 10, 5 ) / ( 10, 5 ) }
      DATA {
      (0,0): 4, 1, 1, 0, 2,
      (1,0): 2, 1, 1, 1, 3,
      (2,0): 1, 2, 2, 4, 3,
      (3,0): 1, 4, 4, 3, 0,
      (4,0): 4, 4, 4, 4, 1,
      (5,0): 1, 3, 1, 0, 4,
      (6,0): 4, 1, 0, 3, 0,
      (7,0): 3, 4, 4, 0, 4,
      (8,0): 4, 3, 3, 1, 4,
      (9,0): 3, 3, 3, 4, 0
      }
   }
}
}

Note on the update

I was first using a queue for my use case, but I realized that a simple multiprocessing.Lock would do the trick. No need for a complicated Queue.put/Queue.get wrapper.


Note there are better ways of doing this using mpi4py, but I needed the user not to worry about MPI.
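For reference, the mpi4py route mentioned above looks roughly like this, following the parallel-HDF5 pattern from the h5py documentation; it needs an MPI-enabled (parallel) build of h5py and is launched with mpiexec, which is exactly the dependency the approach above avoids:

# run with: mpiexec -n 4 python parallel_write.py
from mpi4py import MPI
import h5py

comm = MPI.COMM_WORLD

# Every rank opens the same file collectively through the MPI-IO driver.
with h5py.File('parallel_test.h5', 'w', driver='mpio', comm=comm) as f:
    dset = f.create_dataset('array', (comm.size,), dtype='i')
    # Each rank writes only its own slot; HDF5/MPI-IO coordinates the access.
    dset[comm.rank] = comm.rank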

Naca answered 1/8, 2023 at 20:55 Comment(0)

After much googling, here is what I landed on. In this example I chunk through a CSV file that has 20M rows, run an apply on each chunk, and then save the output back down to a file:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd


def process_chunk(df, i, lock):
    chunk_data = []
    try:
        df = df.fillna('')
        # process() is the author's own per-row function; it appends its output to chunk_data
        df.apply(lambda row: "0" if row['items'] == "[]" else process(row, chunk_data), axis=1)
        apply_data = pd.DataFrame(chunk_data)
        # merge the per-row results back onto the chunk
        df = pd.merge(df, apply_data, left_index=True, right_index=True)

        with lock:
            print('writing to file')
            df.to_csv('output_test.csv', header=(i == 0), mode='a')
    except Exception as e: 
        print(f'error here: {e}')

    return 'DONE'
    

if __name__ == "__main__":
    chunks = pd.read_csv('really_big_text_file.txt', header=None, low_memory=False, dtype=str, on_bad_lines='skip', encoding='latin-1', chunksize=100000)

    pool = ProcessPoolExecutor(max_workers=6)
    m = multiprocessing.Manager()
    lock = m.Lock()
    futures = [pool.submit(process_chunk, chunk, i, lock) for i, chunk in enumerate(chunks)]
    for future in as_completed(futures):
        print(future.result())
Haroldson answered 2/11, 2023 at 3:20 Comment(0)
