overwriting file in ziparchive
S

6

37

I have archive.zip with two files: hello.txt and world.txt

I want to overwrite hello.txt with a new version using this code:

import zipfile

z = zipfile.ZipFile('archive.zip','a')
z.write('hello.txt')
z.close()  

But it doesn't overwrite the file; it adds a second hello.txt entry to the archive instead. (The original post showed this in a WinZip screenshot, which is not reproduced here.)

Since there is nothing like zipfile.remove(), what's the best way to handle this problem?
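
The duplicate entry can be reproduced without WinZip. The following is a minimal sketch, not part of the original question: append_same_name is a hypothetical helper name, and the archive is built in a temporary directory instead of using a real archive.zip. After the append, the name list should contain hello.txt twice.

```python
import os
import tempfile
import warnings
import zipfile


def append_same_name(zip_path, arcname, data):
    """Append data under arcname in mode 'a' and return the archive's name list."""
    with warnings.catch_warnings():
        # zipfile warns about the duplicate name; silence that for the demo
        warnings.simplefilter("ignore", UserWarning)
        with zipfile.ZipFile(zip_path, "a") as z:
            z.writestr(arcname, data)
    # Reopen the archive and list every entry in its central directory
    with zipfile.ZipFile(zip_path) as z:
        return z.namelist()


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "archive.zip")
    with zipfile.ZipFile(path, "w") as z:
        z.writestr("hello.txt", "old")
        z.writestr("world.txt", "world")
    print(append_same_name(path, "hello.txt", "new"))
```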

Synthesis answered 11/1, 2011 at 3:0 Comment(1)
open issue: bugs.python.org/issue6818Meissen
M
49

There's no way to do that with Python's zipfile module. You have to create a new zip file and recompress every entry from the original archive, plus the new modified file.

Below is some code to do just that. But note that it isn't efficient, since it decompresses and then recompresses all data.

import tempfile
import zipfile
import shutil
import os

def remove_from_zip(zipfname, *filenames):
    tempdir = tempfile.mkdtemp()
    try:
        tempname = os.path.join(tempdir, 'new.zip')
        with zipfile.ZipFile(zipfname, 'r') as zipread:
            with zipfile.ZipFile(tempname, 'w') as zipwrite:
                for item in zipread.infolist():
                    if item.filename not in filenames:
                        data = zipread.read(item.filename)
                        zipwrite.writestr(item, data)
        shutil.move(tempname, zipfname)
    finally:
        shutil.rmtree(tempdir)

Usage:

remove_from_zip('archive.zip', 'hello.txt')
with zipfile.ZipFile('archive.zip', 'a') as z:
    z.write('hello.txt')
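
The remove-then-append sequence can also be folded into a single rewrite pass. This is only a sketch under a few assumptions: replace_in_zip is a hypothetical name, the replacement content is passed as bytes, and the temporary archive is created next to the original (which may also sidestep permission problems with the system temp directory).

```python
import os
import tempfile
import zipfile


def replace_in_zip(zip_path, arcname, data):
    """Rewrite zip_path so that arcname holds data (hypothetical helper)."""
    # Create the temporary archive in the same folder as the original,
    # so that os.replace() stays on one filesystem.
    folder = os.path.dirname(os.path.abspath(zip_path))
    fd, tmp_path = tempfile.mkstemp(suffix=".zip", dir=folder)
    os.close(fd)
    try:
        with zipfile.ZipFile(zip_path, "r") as src, \
                zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as dst:
            for item in src.infolist():
                if item.filename != arcname:
                    # Passing the ZipInfo keeps the entry's date and attributes
                    dst.writestr(item, src.read(item.filename))
            # Write the new content once, under the requested name
            dst.writestr(arcname, data)
        os.replace(tmp_path, zip_path)
    except BaseException:
        # Leave the original untouched and clean up the partial copy
        os.remove(tmp_path)
        raise
```

Like the function above, this still decompresses and recompresses every entry; it only saves opening the archive a second time.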
Misalliance answered 11/1, 2011 at 3:16 Comment(5)
So, there is no efficient method for overwriting files whatsoever? Maybe another zip module? Anyway, thanks for thatSynthesis
@cru3l: That's exactly what I am saying in my answer.Misalliance
You can call an external zip tool. Also, you could create your own interface to a zip library.Aston
It worked well for me except for the fact that tempfile.mkdtemp() throws an exception, probably because it's trying to write somewhere on the server where the script does not have permission to write. If I replace that with a visible zip in the same folder, it works fine.Hanson
So from this, I have gathered that overwriting a few files within a zipfile is rather cumbersome. I assume it would be just as fast if not faster to simply delete the zip file and make a brand new zipfile in case you are simply updating contents of the zipfile with newer versions of the files already contained.Salsala
T
30

Building on nosklo's answer: UpdateableZipFile, a class that inherits from ZipFile and maintains the same interface, but adds the ability to overwrite files (via writestr or write) and to remove files.

import os
import shutil
import tempfile
from zipfile import ZipFile, ZIP_STORED, ZipInfo


class UpdateableZipFile(ZipFile):
    """
    Add delete (via remove_file) and update (via writestr and write methods).
    To enable the update features, use UpdateableZipFile with the 'with' statement.
    Upon __exit__ (if updates were applied) a new zip file replaces the existing one.
    """

    class DeleteMarker(object):
        pass

    def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
        # Init base
        super(UpdateableZipFile, self).__init__(file, mode=mode,
                                                compression=compression,
                                                allowZip64=allowZip64)
        # track file to override in zip
        self._replace = {}
        # Whether the with statement was called
        self._allow_updates = False

    def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
        if isinstance(zinfo_or_arcname, ZipInfo):
            name = zinfo_or_arcname.filename
        else:
            name = zinfo_or_arcname
        # If the file exists and needs to be overridden,
        # mark the entry and create a temp file for it.
        # We allow this only if the with statement is used.
        if self._allow_updates and name in self.namelist():
            temp_file = self._replace[name] = self._replace.get(name,
                                                                tempfile.TemporaryFile())
            # The temp file is opened in binary mode, so encode text first
            # (on Python 3 writing a str would raise TypeError)
            if isinstance(bytes, type(u"")):
                bytes = bytes.encode("utf-8")
            temp_file.write(bytes)
        # Otherwise just act normally
        else:
            super(UpdateableZipFile, self).writestr(zinfo_or_arcname,
                                                    bytes, compress_type=compress_type)

    def write(self, filename, arcname=None, compress_type=None):
        arcname = arcname or filename
        # If the file exists and needs to be overridden,
        # mark the entry and create a temp file for it.
        # We allow this only if the with statement is used.
        if self._allow_updates and arcname in self.namelist():
            temp_file = self._replace[arcname] = self._replace.get(arcname,
                                                                   tempfile.TemporaryFile())
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, temp_file)
        # Otherwise just act normally
        else:
            super(UpdateableZipFile, self).write(filename, 
                                                 arcname=arcname, compress_type=compress_type)

    def __enter__(self):
        # Allow updates
        self._allow_updates = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # call base to close zip file, organically
        try:
            super(UpdateableZipFile, self).__exit__(exc_type, exc_val, exc_tb)
            if len(self._replace) > 0:
                self._rebuild_zip()
        finally:
            # In case rebuild zip failed,
            # be sure to still release all the temp files
            self._close_all_temp_files()
            self._allow_updates = False

    def _close_all_temp_files(self):
        # values() works on Python 2 and 3; itervalues() exists only on Python 2
        for temp_file in self._replace.values():
            if hasattr(temp_file, 'close'):
                temp_file.close()

    def remove_file(self, path):
        self._replace[path] = self.DeleteMarker()

    def _rebuild_zip(self):
        tempdir = tempfile.mkdtemp()
        try:
            temp_zip_path = os.path.join(tempdir, 'new.zip')
            with ZipFile(self.filename, 'r') as zip_read:
                # Create new zip with assigned properties
                with ZipFile(temp_zip_path, 'w', compression=self.compression,
                             allowZip64=self._allowZip64) as zip_write:
                    for item in zip_read.infolist():
                        # Check if the file should be replaced / or deleted
                        replacement = self._replace.get(item.filename, None)
                        # If marked for deletion, do not copy file to new zipfile
                        if isinstance(replacement, self.DeleteMarker):
                            del self._replace[item.filename]
                            continue
                        # If marked for replacement, copy temp_file, instead of old file
                        elif replacement is not None:
                            del self._replace[item.filename]
                            # Write replacement to archive,
                            # and then close it (deleting the temp file)
                            replacement.seek(0)
                            data = replacement.read()
                            replacement.close()
                        else:
                            data = zip_read.read(item.filename)
                        zip_write.writestr(item, data)
            # Override the archive with the updated one
            shutil.move(temp_zip_path, self.filename)
        finally:
            shutil.rmtree(tempdir)

Usage example:

with UpdateableZipFile(r"C:\Temp\Test2.docx", "a") as o:
    # Overwrite a file with a byte string
    o.writestr("word/document.xml", b"Some data")
    # Exclude an existing file from the zip
    o.remove_file("word/fontTable.xml")
    # Write a new file (with no conflict) to the zip
    o.writestr("new_file", "more data")
    # Overwrite a file with a file
    o.write(r"C:\Temp\example.png", "word/settings.xml")
Tolbutamide answered 16/2, 2016 at 14:46 Comment(17)
When I'm using the UpdateableZipFile class like in your example, two exceptions are raised: File "/home/jezor/Code/updateable_zipfile.py", line 38, in writestr temp_file.write(bytes) TypeError: a bytes-like object is required, not 'str' and File "/home/jezor/Code/updateable_zipfile.py", line 77, in _close_all_temp_files for temp_file in self._replace.itervalues(): AttributeError: 'dict' object has no attribute 'itervalues'. Could you please update your answer? I'm using python 3.4 and that class is the best solution to this question.Wordage
@Jezor. You are trying to run Py2 code in Py3, hence the error. Change itervalues to just values.Fatality
I was unsure about the code considering the low amount of upvotes, but it works very well, seems well coded, and fits the requirements of this and a few repeat questions.Jarvis
Unfortunately I'm getting many BadZipfile: File is not a zip file errors even when creating new ZIP file. Py2.7Larocca
@Larocca You are probably opening existing files that are either not zipfiles or are corrupt. Try using a Try-Catch clause (#39154970) to identify the files and remove them if necessary.Tolbutamide
@OrWeis: This error happens on files created by code above... and not always.Larocca
@Larocca Without additional information it would be hard to help you. But I think it is probably safe to assume the errors are a result of your specific setup. I'd guess processes dying mid writes, or disk flush issues causing the file corruptions.Tolbutamide
As @MadPhysicist said, for Python 3.7 replace itervalues with values; you may also need to adjust the usage to o.writestr(r"word/document.xml", "Some data".encode('utf-8')) and optionally add the compresslevel=None parameter to writestr and write. I also renamed the parameter 'bytes' to 'data' in writestr.Glennaglennie
how can we modify csv file present inside zip folder using this code ? I tried passing dataframe but it requires bytes as format not dataframe or str. @GlennaglennieGompers
with UpdateableZipFile("C:\\Users\\mamahajan\\Downloads\\TuningLog_2018_05_17_1_1.zip", "a") as o:
    zf = zipfile.ZipFile("C:\\Users\\mamahajan\\Downloads\\TuningLog_2018_05_17_1_1.zip")
    text_files = zf.infolist()
    list_ = []
    for text_file in text_files:
        print(text_file.filename)
        df = pd.read_csv(zf.open(text_file.filename),sep='|',low_memory=False)
        columns = ['actimizeTransactionKey','actimizeTransactionIdentity']
        df.drop(columns, inplace=True, axis=1)
        o.writestr(text_file.filename,"this is where df needs to be passed")
Gompers
I think you'd extract the csv, rebuild it using pandas, then re-insert. Probably worth starting your own question, with reference to this one.Glennaglennie
@AMG, Problem is while passing parameter to writestr(), You need to pass df in bytes format. This is where it loses it shape. Don't want to start duplicate thread.Gompers
can you create a temp file with pandas, use write instead of writestr to add it to the archive, then delete temp?Glennaglennie
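
On the "needs bytes" problem discussed above: writestr only needs the CSV text encoded to bytes, so the table does not have to lose its shape. The sketch below uses the standard csv module rather than pandas so that it runs without extra packages; drop_csv_columns is a hypothetical helper, and the '|' delimiter is taken from the comment above. With pandas, DataFrame.to_csv() called without a path returns a string, which could be encoded the same way.

```python
import csv
import io


def drop_csv_columns(raw, columns, delimiter="|"):
    """Return CSV bytes with the named columns removed (illustrative helper)."""
    reader = csv.DictReader(io.StringIO(raw.decode("utf-8")), delimiter=delimiter)
    # Keep the original column order, minus the dropped names
    kept = [name for name in reader.fieldnames if name not in columns]
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=kept, delimiter=delimiter,
                            extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    writer.writerows(reader)
    # Encode the text so the result can be passed straight to writestr()
    return out.getvalue().encode("utf-8")


if __name__ == "__main__":
    print(drop_csv_columns(b"a|b|c\n1|2|3\n", ["b"]))
```

The returned bytes could then be passed as the second argument of o.writestr(text_file.filename, ...).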
@Glennaglennie you're the real MVP. Thank you! Saved me hours of frustrating debuggingBersagliere
Hey @OrWeis thank you for writing this answer, I moved the implementation to a python package github.com/michaelvanstraten/updateable-zip-file would you like me to transfer the ownership over to you?Wretched
Hey @Wretched , that's cool. No need. You can mention me/ just give me credit if you feel like it, I'm 'orweis' on Github.EnjoyTolbutamide
Already linked your Stack Overflow account, but I will add a ref to your GitHub profile too.Wretched
B
1

my solution: read it all -> replace -> write back

import zipfile
from io import BytesIO

def read_zip(fname):
    # Load the whole archive into memory, then map entry name -> contents
    with open(fname, 'rb') as f:
        bio = BytesIO(f.read())
    with zipfile.ZipFile(bio, 'r') as zf:
        return {n: zf.read(n) for n in zf.namelist()}

def write_zip(fname, fdict):
    # Build the new archive in memory, then write it to disk in one go
    bio = BytesIO()
    with zipfile.ZipFile(bio, 'w', zipfile.ZIP_DEFLATED) as zf:
        for name, data in fdict.items():
            zf.writestr(name, data)
    with open(fname, 'wb') as f:
        f.write(bio.getvalue())

Bermudez answered 12/7, 2023 at 2:3 Comment(2)
What is the purpose of bytesio? Seems unnecessary, just open ZipFile (fname) directly.Rajput
This doesn't preserve metadata of contents like date and time. Use ZipFile.infolist() instead of namelist to keep metadata.Rajput
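
Following the infolist() suggestion, here is a sketch of the same read/replace/write idea that carries each entry's ZipInfo along, so dates and attributes survive the round trip. The function names are made up for illustration, and the archive is opened by file name directly, without BytesIO.

```python
import zipfile


def read_zip_with_info(fname):
    """Return a list of (ZipInfo, bytes) pairs, one per archive entry."""
    with zipfile.ZipFile(fname, "r") as archive:
        return [(info, archive.read(info)) for info in archive.infolist()]


def write_zip_with_info(fname, entries):
    """Write (ZipInfo, bytes) pairs back, reusing each entry's metadata."""
    with zipfile.ZipFile(fname, "w") as archive:
        for info, data in entries:
            # writestr() accepts a ZipInfo in place of a plain name
            archive.writestr(info, data)
```

To replace a file, swap the bytes of the matching pair between the two calls, for example with a list comprehension keyed on info.filename.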
S
0

My solution is similar to the other answers, but it uses SQLite to manage the intermediate files and provides __getitem__, __setitem__ and __delitem__ for an easy interface. By default the database is in memory, but you can provide a temp file path if the zip is larger than available memory. SQLite ships with Python as the sqlite3 module, and for many small blobs it can be faster than going through the file system.

import sqlite3
import zipfile
from pathlib import Path

# sql is the author's own module (not shown here); it holds the SQL statements as strings
from sql import CREATE_TABLE, DELETE_FILE, INSERT_FILE, SELECT_CONTENT


class EditableZip:
    """Intended to make editing files inside zip archive easy, this class is capable of loading files
    from a zip file into a sqlite database, facilitates editing/removing/adding files, and saving
    to a zip.
    The database can be in-memory (default) or in a temporary on disk file if
    temp_db_path is provided.

    If an on-disk file is used, EditableZip.close can be called to remove the file or EditableZip
    can be used as a context manager.

    If auto_save is set to True and an initial zip_path was provided then the file will
    be overwritten when EditableZip closes. If you wish to save to a different file,
    or no zip_path is used in instantiation, auto_save can take a file path.

    Files can be added by item assignment
    with EditableZip(auto_save="example.zip") as ez:
        ez["thing.txt"] = "stuff"
        # empty dir
        ez["empty/"] = None

    Assignment accepts Non-text files as bytes.

    EditableZip is subscriptable. If the subscript is a path in the db, the data will be returned.

    EditableZip.files can be used to iterate over files in the db.
    """

    def __init__(
        self,
        zip_path: None | str | Path = None,
        temp_db_path: None | Path = None,
        auto_save: bool | str | Path = False,
    ):
        self.temp_db_path, self.auto_save, self.file_path = (
            temp_db_path,
            auto_save,
            zip_path,
        )
        self.db = sqlite3.connect(
            str(temp_db_path if temp_db_path is not None else ":memory:")
        )
        self.db.execute(CREATE_TABLE)

        if self.file_path:
            self.load(self.file_path)

    @property
    def files(self):
        "Returns a generator of all file paths in the database."
        try:
            return (
                i[0] for i in self.db.execute("SELECT file_path FROM files").fetchall()
            )
        except TypeError:
            return None

    def load(self, zip_path: str | Path) -> None:
        "Add all files from zip at zip_path to db."
        with zipfile.ZipFile(zip_path, mode="r") as archive:
            for item in archive.infolist():
                self[item.filename] = (
                    None if item.filename[-1] == "/" else archive.read(item)
                )

    def save(self, zip_path: None | str | Path) -> Path:
        "Save all files from db to zip at zip_path."
        zip_path = self.file_path if zip_path is None else zip_path
        with zipfile.ZipFile(zip_path, "w") as archive:
            for file in self.files:
                if file_data := self.fetch(file):
                    archive.writestr(file, file_data)
                else:
                    archive.writestr(zipfile.ZipInfo(file), "")
        return zip_path

    def close(self):
        "Auto save if applicable and close + remove db."
        if self.auto_save:
            self.save(
                zip_path=self.auto_save
                if isinstance(self.auto_save, (str, Path))
                else None
            )
        self.db.close()
        if isinstance(self.temp_db_path, Path):
            self.temp_db_path.unlink(missing_ok=True)

    def fetch(self, file_path: str) -> bytes:
        "Get content of db file for file_path."
        try:
            return self.db.execute(SELECT_CONTENT, {"file_path": file_path}).fetchone()[
                0
            ]
        except TypeError:
            return None

    def __getitem__(self, key):
        result = self.fetch(key)
        try:
            return result.decode("utf-8")
        except AttributeError:
            return result

    def __setitem__(self, file_path, content: str | bytes):
        if isinstance(content, str):
            content = content.encode("utf-8")
        self.db.execute(
            INSERT_FILE,
            {"file_path": file_path, "file_content": content},
        )

    def __delitem__(self, file_path):
        self.db.execute(DELETE_FILE, {"file_path": file_path})

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


if __name__ == "__main__":
    # A use case: editing epub files.
    # File source:
    # https://archiveofourown.org/downloads/13795605/Victoria%20Potter%20and%20the.epub?updated_at=1650231615
    file_path = Path("Victoria Potter and the.epub")
    new_file = (file_path.parent / (file_path.stem + "- lowercase")).with_suffix(
        file_path.suffix
    )

    # Create a copy of the epub with all letters lowercase
    with EditableZip(zip_path=file_path, auto_save=new_file) as ez:
        for file in ez.files:
            if Path(file).suffix in [".html", ".xhtml"]:
                ez[file] = ez[file].lower()
Serpens answered 26/6, 2022 at 20:32 Comment(1)
Please, can you add the relevant part of your code to the answer?Millesimal
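
The sql module imported in the answer is not shown. Purely as a guess at what it might contain: the table name files and the column file_path appear in the answer's own query, while the column name file_content and the use of INSERT OR REPLACE are assumptions inferred from the parameter names and from the fact that the same path gets assigned more than once.

```python
import sqlite3

# Hypothetical contents of sql.py; the original module is not shown.
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS files (
    file_path TEXT PRIMARY KEY,
    file_content BLOB
)
"""
# OR REPLACE lets a second assignment to the same path overwrite the first
INSERT_FILE = """
INSERT OR REPLACE INTO files (file_path, file_content)
VALUES (:file_path, :file_content)
"""
SELECT_CONTENT = "SELECT file_content FROM files WHERE file_path = :file_path"
DELETE_FILE = "DELETE FROM files WHERE file_path = :file_path"


def roundtrip():
    """Store a value twice under one path and return what is read back."""
    db = sqlite3.connect(":memory:")
    db.execute(CREATE_TABLE)
    db.execute(INSERT_FILE, {"file_path": "a.txt", "file_content": b"one"})
    db.execute(INSERT_FILE, {"file_path": "a.txt", "file_content": b"two"})
    row = db.execute(SELECT_CONTENT, {"file_path": "a.txt"}).fetchone()
    db.close()
    return row[0]
```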
O
0

Based on this answer, here's a quick and dirty way to monkey-patch the stock zipfile module to support file deletion (while we wait for the change to be accepted into python:main):

from zipfile import ZipFile, ZipInfo
from operator import attrgetter
import functools

def enable_zip_remove(func):
    def _zipfile_remove_member(self, member):
        # get a sorted filelist by header offset, in case the dir order
        # doesn't match the actual entry order
        fp = self.fp
        entry_offset = 0
        filelist = sorted(self.filelist, key=attrgetter('header_offset'))
        for i in range(len(filelist)):
            info = filelist[i]
            # find the target member
            if info.header_offset < member.header_offset:
                continue

            # get the total size of the entry
            entry_size = None
            if i == len(filelist) - 1:
                entry_size = self.start_dir - info.header_offset
            else:
                entry_size = filelist[i + 1].header_offset - info.header_offset

            # found the member, set the entry offset
            if member == info:
                entry_offset = entry_size
                continue

            # Move entry
            # read the actual entry data
            fp.seek(info.header_offset)
            entry_data = fp.read(entry_size)

            # update the header
            info.header_offset -= entry_offset

            # write the entry to the new position
            fp.seek(info.header_offset)
            fp.write(entry_data)
            fp.flush()

        # update state
        self.start_dir -= entry_offset
        self.filelist.remove(member)
        del self.NameToInfo[member.filename]
        self._didModify = True

        # seek to the start of the central dir
        fp.seek(self.start_dir)

    def zipfile_remove(self, member):
        """Remove a file from the archive. The archive must be open with mode 'a'"""

        if self.mode != 'a':
            raise RuntimeError("remove() requires mode 'a'")
        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")
        if self._writing:
            raise ValueError(
                "Can't write to ZIP archive while an open writing handle exists."
            )

        # Make sure we have an info object
        if isinstance(member, ZipInfo):
            # 'member' is already an info object
            zinfo = member
        else:
            # get the info object
            zinfo = self.getinfo(member)

        return self._zipfile_remove_member(zinfo)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not hasattr(ZipFile, "remove"):
            setattr(ZipFile, "_zipfile_remove_member", _zipfile_remove_member)
            setattr(ZipFile, "remove", zipfile_remove)
        return func(*args, **kwargs)
    return wrapper

Usage:

@enable_zip_remove
def replace_zip_file():
    with ZipFile("archive.zip", "a") as z:
        z.remove("hello.txt")
        z.write("hello.txt")

P.S. NSFW

Oeildeboeuf answered 5/9, 2022 at 15:13 Comment(0)
D
-1

Reference: Delete file from zipfile with the ZipFile Module

In short,

You can take the code from https://github.com/python/cpython/blob/659eb048cc9cac73c46349eb29845bc5cd630f09/Lib/zipfile.py and create a separate file from it. After that just reference it from your project instead of built-in python library: import myproject.zipfile as zipfile.

Usage:

with zipfile.ZipFile("archive.zip", "a") as z:
    z.remove("firstfile.txt")
Dancy answered 4/11, 2021 at 13:9 Comment(0)
