Is python zipfile thread-safe?
In a django project, I need to generate some pdf files for objects in db. Since each file takes a few seconds to generate, I use celery to run tasks asynchronously.

Problem is, I need to add each file to a zip archive. I was planning to use the python zipfile module, but different tasks can be run in different threads, and I wonder what will happen if two tasks try to add a file to the archive at the same time.

Is the following code thread safe or not? I cannot find any valuable information in the python's official doc.

import os
from zipfile import ZipFile

try:
    zippath = os.path.join(pdf_directory, 'archive.zip')
    zipfile = ZipFile(zippath, 'a')
    zipfile.write(pdf_fullname)
finally:
    zipfile.close()

Note: this is running under python 2.6

Pad answered 8/2, 2012 at 14:26 Comment(2)
Which celery concurrency method do you use? If your code is executed in celery tasks with the default multiprocessing concurrency method, then they are executed in separate processes and you don't need to worry about thread safety. – Clougher
Then the problem is not thread-safety, it's simultaneous file write access. – Pad
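Since the comments point out that celery's default concurrency method is multiprocessing, a thread lock won't help across processes. A possible sketch using a POSIX advisory file lock via fcntl.flock (the append_to_zip helper and the sidecar .lock file are assumptions for illustration, and fcntl is Unix-only):

```python
import fcntl
import os
from zipfile import ZipFile

def append_to_zip(zippath, pdf_fullname):
    # Take an exclusive OS-level lock on a sidecar file so that
    # concurrent *processes* (not just threads) serialize their appends.
    lockpath = zippath + '.lock'
    with open(lockpath, 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)  # blocks until we own the lock
        try:
            with ZipFile(zippath, 'a') as archive:
                archive.write(pdf_fullname)
        finally:
            fcntl.flock(lockfile, fcntl.LOCK_UN)
```

Each worker calls append_to_zip independently; the kernel arbitrates which process appends first.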

No, it is not thread-safe in that sense. If you're appending to the same zip file, you'd need a lock there, or the file contents could get scrambled. If you're appending to different zip files, using separate ZipFile() objects, then you're fine.
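A minimal sketch of the lock this answer describes, assuming all worker threads live in one process (the add_to_archive helper and the module-level lock are illustrative names, not from the original code):

```python
import threading
from zipfile import ZipFile

zip_lock = threading.Lock()  # one lock shared by every worker thread

def add_to_archive(zippath, pdf_fullname):
    # Serialize all appends to the shared archive; without this,
    # interleaved writes from two threads can corrupt the file.
    with zip_lock:
        with ZipFile(zippath, 'a') as archive:
            archive.write(pdf_fullname)
```

Each task calls add_to_archive; only one thread touches the archive at a time.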

Harneen answered 8/2, 2012 at 14:36 Comment(0)

While this question is old, it's still high up in Google results, so I just want to chime in: I noticed that on Python 3.4 64-bit on Windows, the LZMA zipfile is thread-safe; all the other compression types fail.

with zipfile.ZipFile("test.zip", "w", zipfile.ZIP_LZMA) as zip:
    #do stuff in threads

Note that you can't open the same file with multiple zipfile.ZipFile instances; instead you have to share the same instance across all threads (here, the variable named zip).

In my case I get about 80-90% CPU use on 8 cores and SSD, which is nice.

Satiety answered 29/7, 2015 at 20:58 Comment(1)
This didn't work. I am getting the same result as zipfile.ZIP_DEFLATED in Python 3.9.13 on Windows 10. – Byer

Python 3.5.5 makes writing to a ZipFile and reading multiple ZipExtFiles thread-safe: https://docs.python.org/3.5/whatsnew/changelog.html#id93

As far as I can tell, the change has not been backported to Python 2.7.

Update: after studying the code and doing some testing, it becomes apparent that the locking is still not thoroughly implemented: it works correctly only for writestr, and doesn't cover open and write.
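The distinction can be shown deterministically, without any threads: writestr goes through the ZipFile's internal lock, but a handle from open(..., mode='w') blocks every other writer until it is closed (demo.zip and the member names are illustrative):

```python
import zipfile

failed = False
with zipfile.ZipFile('demo.zip', 'w') as zf:
    zf.writestr('a.txt', b'hello')  # fine: writestr serializes internally
    with zf.open('b.txt', mode='w') as handle:
        handle.write(b'partial data')
        try:
            # A second writer while the handle is still open raises:
            # "Can't write to ZIP archive while an open writing handle exists"
            zf.writestr('c.txt', b'boom')
        except ValueError:
            failed = True
```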

Seneca answered 6/5, 2018 at 14:22 Comment(3)
look for bpo-14099 in the changelog – Ursulaursulette
@AndreHolzner Yes, that's what I meant. But my comment about incomplete support is still true, at least for 3.5.5. – Seneca
This seems to work, but I still need to call Lock() before writestr() to the file in Python 3.9.13 – Byer

I tried the solutions mentioned above and couldn't get them to work under Python 3.9.13. ZipFile specifically blocks you from writing multiple files at once, with the following error message:

ValueError: Can't write to ZIP archive while an open writing handle exists.

The issue is that compression is the heavy part on my machine, and there's no reason not to do the compression on multiple threads; only the write itself has to be serialized. So I went ahead and wrote a class implementing a thread-safe writestr function, and I use it together with ThreadPoolExecutor to get a 10x speed improvement on my machine. The new writestr allows the compression to happen in multiple threads while the writing happens under the thread lock, and I bypass the _writing check that causes the exception in the first place.

It's kind of hacky but it works for me.

The code for the new class is as follows:

import zipfile, time

class EmptyCompressor(object):
    """Stand-in compressor whose flush() returns no data (the payload is already compressed)."""
    def flush(self):
        return b''

class ZipFileParallel(zipfile.ZipFile):
    def writestr(self, zinfo_or_arcname, data,
                 compress_type=None, compresslevel=None):
        """Write a file into the archive.  The contents is 'data', which
        may be either a 'str' or a 'bytes' instance; if it is a 'str',
        it is encoded as UTF-8 first.
        'zinfo_or_arcname' is either a ZipInfo instance or
        the name of the file in the archive."""
        if isinstance(data, str):
            data = data.encode("utf-8")
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                            date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo._compresslevel = self.compresslevel
            if zinfo.filename[-1] == '/':
                zinfo.external_attr = 0o40775 << 16  # drwxrwxr-x
                zinfo.external_attr |= 0x10  # MS-DOS directory flag
            else:
                zinfo.external_attr = 0o600 << 16  # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")

        if compress_type is not None:
            zinfo.compress_type = compress_type

        if compresslevel is not None:
            zinfo._compresslevel = compresslevel

        zinfo.file_size = len(data)  # Uncompressed size
        crc = zipfile.crc32(data, 0)
        # compress data
        compressor = zipfile._get_compressor(zinfo.compress_type, zinfo._compresslevel)
        data = compressor.compress(data)
        data += compressor.flush()

        with self._lock:
            with self.open(zinfo, mode='w') as dest:
                dest._compressor = None # remove the compressor so it doesn't compress again
                dest.write(data)
                dest._crc = crc
                dest._file_size = zinfo.file_size
                dest._compress_size = len(data)
                dest._compressor = EmptyCompressor() # use an empty compressor

And an example of using it is as follows:

import concurrent.futures

file = ZipFileParallel('file.zip', 'w', zipfile.ZIP_BZIP2)
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for idx, img in enumerate(images):
        fname = f'images/{idx}.raw'
        futures.append(executor.submit(file.writestr, fname, img.tobytes()))
    concurrent.futures.wait(futures)
file.close()
Wien answered 1/6, 2023 at 10:23 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.