Am I doing something wrong or is it not possible to extract a zip file in parallel?
Asked Answered
C

5

12

I created this to test out a parallel extract:

    /// <summary>
    /// Extracts every file entry of the zip archive <paramref name="file"/> into
    /// <paramref name="folder"/>, feeding the entries to a dataflow block that is
    /// configured to process up to two entries at a time.
    /// </summary>
    /// <param name="file">The zip archive to read.</param>
    /// <param name="folder">Destination root; sub-directories are created on demand.</param>
    /// <returns>A task that finishes once the dataflow block has completed.</returns>
    public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };

        // Per-entry work: make sure the destination directory exists, then extract.
        var worker = new ActionBlock<ZipArchiveEntry>(zipEntry =>
        {
            var target = Path.Combine(folder.FullName, zipEntry.FullName);

            Directory.CreateDirectory(Path.GetDirectoryName(target));
            zipEntry.ExtractToFile(target);
        }, options);

        using (var zip = ZipFile.OpenRead(file.FullName))
        {
            foreach (var candidate in zip.Entries)
            {
                // Entries with an empty Name are skipped; only named entries are posted.
                if (candidate.Name == string.Empty)
                {
                    continue;
                }

                worker.Post(candidate);
            }

            // Signal that no more entries will arrive, then wait for the queued
            // work to finish before the archive is disposed.
            worker.Complete();
            await worker.Completion;
        }
    }

and the following unit test for testing:

    /// <summary>
    /// Removes any previous extraction output, then runs the parallel extraction
    /// of the website zip into the local extract folder.
    /// </summary>
    [TestMethod]
    public async Task ExtractTestAsync()
    {
        // Start from a clean slate so files from an earlier run cannot interfere.
        if (Resources.LocalExtractFolder.Exists)
        {
            Resources.LocalExtractFolder.Delete(true);
        }

        // The folder is deliberately not re-created here before extracting.
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    }

With MaxDegreeOfParallelism = 1 everything works, but with 2 it does not.

Test Name:  ExtractTestAsync
Test FullName:  Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source:    c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome:   Failed
Test Duration:  0:00:02.4138753

Result Message: 
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception: 
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:  
at System.IO.Compression.Inflater.Decode()
   at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
   at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
   at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
   at System.IO.Stream.CopyTo(Stream destination)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
   at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

Update 2

Here is my own attempt at doing it in parallel; it doesn't work either :) Remember to handle exceptions in the ContinueWith.

/// <summary>
/// Extracts every named entry of the zip archive <paramref name="file"/> into
/// <paramref name="folder"/>, starting one task per entry and using a semaphore
/// to cap the number of extractions in flight at <c>MaxDegreeOfParallelism</c>.
/// Blocks the calling thread until all started tasks have released the semaphore.
/// </summary>
/// <param name="file">The zip archive to read.</param>
/// <param name="folder">Destination root; sub-directories are created on demand.</param>
public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
        {

            int MaxDegreeOfParallelism = 2;
            using (var archive = ZipFile.OpenRead(file.FullName))
            {

                // One permit per allowed concurrent extraction.
                // NOTE(review): the semaphore is never disposed in this method.
                var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                // Entries whose Name is empty are filtered out.
                foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                {
                    // Block until a permit is free before starting the next extraction.
                    semaphore.WaitOne();

                    // All tasks extract through entries that come from the same archive instance.
                    var task = Task.Run(() =>
                    {
                        var path = Path.Combine(folder.FullName, entry.FullName);

                        Directory.CreateDirectory(Path.GetDirectoryName(path));
                        entry.ExtractToFile(path);
                    });
                    // The continuation never inspects 'handle', so a failed extraction
                    // is not reported to the caller from here.
                    task.ContinueWith(handle =>
                    {
                        try
                        {
                            //do any cleanup/post processing
                        }
                        finally
                        {
                            // Release the semaphore so the next thing can be processed
                            semaphore.Release();
                        }
                    });
                }
                // Re-acquire every permit: this only succeeds once each outstanding
                // continuation has released its permit, i.e. all work has finished.
                while(MaxDegreeOfParallelism-->0)
                    semaphore.WaitOne(); //Wait here until the last task completes.


            }

        }

And here is the async version:

/// <summary>
/// Task-returning variant of the semaphore-throttled extraction: the whole loop
/// runs inside a task created with <c>Task.Factory.StartNew</c>, and each
/// per-entry continuation is created with <c>AttachedToParent</c>.
/// </summary>
/// <param name="file">The zip archive to read.</param>
/// <param name="folder">Destination root; sub-directories are created on demand.</param>
/// <returns>The outer task that runs the extraction loop.</returns>
public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
        {
            return Task.Factory.StartNew(() =>
            {
                int MaxDegreeOfParallelism = 50;
                using (var archive = ZipFile.OpenRead(file.FullName))
                {

                    // One permit per allowed concurrent extraction.
                    // NOTE(review): the semaphore is never disposed in this method.
                    var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                    // Entries whose Name is empty are filtered out.
                    foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                    {
                        // Block until a permit is free before starting the next extraction.
                        semaphore.WaitOne();

                        // All tasks extract through entries that come from the same archive instance.
                        var task = Task.Run(() =>
                        {
                            var path = Path.Combine(folder.FullName, entry.FullName);

                            Directory.CreateDirectory(Path.GetDirectoryName(path));
                            entry.ExtractToFile(path);
                        });
                        // The continuation never inspects 'handle', so a failed extraction
                        // is not reported to the caller from here.
                        task.ContinueWith(handle =>
                        {
                            try
                            {
                                //do any cleanup/post processing
                            }
                            finally
                            {
                                // Release the semaphore so the next thing can be processed
                                semaphore.Release();
                            }
                        },TaskContinuationOptions.AttachedToParent); // the outer task will wait for all.
                    }

                    // NOTE(review): unlike the synchronous variant, nothing here waits for
                    // in-flight extractions before the using block ends and the archive is
                    // disposed - confirm whether tasks can still be reading at that point.
                }
            });
        }

Update 3

The following exceptions are thrown (as seen in handle.Exception):

{"Block length does not match with its complement."}  
[0] = {"A local file header is corrupt."}

Have to find out if ZipFile is thread safe or not.

Creation answered 24/4, 2013 at 18:29 Comment(11)
how much faster is your parallel version than the non parallel one?Strapping
Well, it is for an upload to Azure storage, where a little more will be done than just extracting the file. But on a simple extract to a folder, it went from 10 sec to 5 sec with 2 concurrent tasks. No further benefit for n concurrent tasks. But my zip file is 1500 files of 0-50 KB each and a few large ones. The speed-up will be greater on larger files, as it is not limited by context switching.Counterpunch
I suspect a race condition. Would be interesting to see if your parallel (non-async) version works if the files inside the zip are very large ... like a megabyte or more. Can you test that? Create a zip with a few 1-megabyte files in it?Stroud
Looking alittle deeper, it did not work. Sorry about that. Didnt handle the exceptions.Counterpunch
So its not possible to extract in parallel?Counterpunch
Interesting. Documentation for the ZipFile class (and for the extension methods) indicates that those things should be thread safe. I assumed that means you can extract multiple files concurrently.Stroud
I will play some more with it, but so far its not working.Counterpunch
The ZipArchiveEntry has a reference back to the ZipFile (so it can extract or delete), so when you call ExtractToFile, you end up using the original ZipFile concurrently on multiple threads, which is not allowed.Shoddy
Ye, that must be the issue.Counterpunch
What @RaymondChen said: But as a solution you can make copies of the input stream (meaning some byte[]), create separate ZipFiles, and extract them in parallel then (obviously you need to partition the file entries manually) :)Expend
i added a solution doing that. just as proof of concept.Counterpunch
C
4

Disclaimer: it's only a proof of concept.

After replacing ZipFile.OpenRead with ParallelZipFile.OpenRead in the code samples above, all 4 unit tests pass.

   /// <summary>
   /// Entry point mirroring <c>ZipFile.OpenRead</c>, but returning a
   /// <see cref="ParallelZipArchive"/> wrapper instead of a plain archive.
   /// </summary>
   public class ParallelZipFile
    {
        /// <summary>
        /// Opens the zip file at <paramref name="path"/> for reading and wraps it,
        /// together with the path itself, in a <see cref="ParallelZipArchive"/>.
        /// </summary>
        /// <param name="path">Path of the zip file to open.</param>
        /// <returns>The wrapper holding the opened archive and its path.</returns>
        public static ParallelZipArchive OpenRead(string path)
        {
            ZipArchive firstReader = ZipFile.OpenRead(path);
            return new ParallelZipArchive(firstReader, path);
        }
    }
    /// <summary>
    /// Wraps a zip archive together with its file path and a queue of idle
    /// <see cref="ZipArchive"/> readers that entries can borrow for extraction.
    /// </summary>
    public class ParallelZipArchive : IDisposable
    {
        // The archive handed to the constructor; used to enumerate entries.
        internal ZipArchive _archive;

        // Path of the zip file, kept so that further readers can be opened on it.
        internal string _path;

        // Readers that are currently idle and available for borrowing.
        internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();

        /// <summary>
        /// Stores the archive and path, and makes the archive the first idle reader.
        /// </summary>
        /// <param name="zip">An already opened archive.</param>
        /// <param name="path">Path of the file <paramref name="zip"/> was opened from.</param>
        public ParallelZipArchive(ZipArchive zip,string path)
        {
            _archive = zip;
            _path = path;
            FreeReaders.Enqueue(zip);
        }

        /// <summary>
        /// Builds a fresh read-only list of wrappers, one per entry of the archive,
        /// each remembering its position in the archive's entry order.
        /// A new list is created on every access.
        /// </summary>
        public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
        {
            get
            {
                List<ParallelZipArchiveEntry> wrapped = _archive.Entries
                    .Select((zipEntry, position) => new ParallelZipArchiveEntry(position, zipEntry, this))
                    .ToList();

                return new ReadOnlyCollection<ParallelZipArchiveEntry>(wrapped);
            }
        }


        /// <summary>
        /// Disposes every reader that is currently sitting in the idle queue.
        /// </summary>
        public void Dispose()
        {
            foreach (ZipArchive idleReader in FreeReaders)
            {
                idleReader.Dispose();
            }
        }
    }
    /// <summary>
    /// Lightweight stand-in for a <see cref="ZipArchiveEntry"/> that remembers only
    /// the entry's position and names, and extracts itself through whichever idle
    /// reader it can borrow from its parent <see cref="ParallelZipArchive"/>.
    /// </summary>
    public class ParallelZipArchiveEntry
    {
        private readonly ParallelZipArchive _parent;

        // Zero-based position of this entry within the archive's entry list.
        private readonly int _entry;

        /// <summary>Copied from the wrapped entry's <c>Name</c> at construction.</summary>
        public string Name { get; set; }

        /// <summary>Copied from the wrapped entry's <c>FullName</c> at construction.</summary>
        public string FullName { get; set; }

        /// <summary>
        /// Captures the entry's position and names; the entry object itself is not kept.
        /// </summary>
        /// <param name="entryNr">Zero-based position of the entry in the archive.</param>
        /// <param name="entry">The archive entry whose names are copied.</param>
        /// <param name="parent">The archive wrapper that owns the reader pool.</param>
        public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
        {
            _entry = entryNr;
            _parent = parent;
            Name = entry.Name;
            FullName = entry.FullName;
        }

        /// <summary>
        /// Extracts this entry to <paramref name="path"/> using a reader borrowed from
        /// the parent's idle queue, opening an additional reader on the same file when
        /// none is idle. The reader is handed back to the queue afterwards, even when
        /// extraction throws, so it is not leaked and can still be disposed by the parent.
        /// </summary>
        /// <param name="path">Destination file path.</param>
        public void ExtractToFile(string path)
        {
            Trace.TraceInformation(string.Format("Number of readers: {0}", _parent.FreeReaders.Count));

            ZipArchive reader;
            if (!_parent.FreeReaders.TryDequeue(out reader))
            {
                reader = ZipFile.OpenRead(_parent._path);
            }

            try
            {
                // Entries is an indexable read-only collection, so address the entry
                // directly instead of scanning from the start with Skip/First.
                reader.Entries[_entry].ExtractToFile(path);
            }
            finally
            {
                // Always return the reader to the pool; without this a failed
                // extraction would leave the reader (and its file handle) orphaned.
                _parent.FreeReaders.Enqueue(reader);
            }
        }
    }

unit tests

/// <summary>
/// Compares the output of each extraction variant against a reference
/// extraction produced once with <c>ZipFile.ExtractToDirectory</c>.
/// </summary>
[TestClass]
    public class ZipFileTests
    {
        /// <summary>
        /// Runs once for the class: rebuilds the reference ("truth") folder from the zip.
        /// </summary>
        [ClassInitialize()]
        public static void PreInitialize(TestContext context)
        {
            if (Resources.LocalExtractFolderTruth.Exists)
            {
                Resources.LocalExtractFolderTruth.Delete(true);
            }

            ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);
        }

        /// <summary>
        /// Runs before each test: removes the output folder left by a previous test.
        /// </summary>
        [TestInitialize()]
        public void InitializeTests()
        {
            if (Resources.LocalExtractFolder.Exists)
            {
                Resources.LocalExtractFolder.Delete(true);
            }
        }

        /// <summary>Sequential extraction must reproduce the reference folder.</summary>
        [TestMethod]
        public void ExtractTest()
        {
            Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);

            AssertExtractedFolderMatchesTruth();
        }

        /// <summary>Dataflow-based extraction must reproduce the reference folder.</summary>
        [TestMethod]
        public async Task ExtractAsyncTest()
        {
            await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);

            AssertExtractedFolderMatchesTruth();
        }

        /// <summary>Semaphore-throttled extraction must reproduce the reference folder.</summary>
        [TestMethod]
        public void ExtractSemaphoreTest()
        {
            Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);

            AssertExtractedFolderMatchesTruth();
        }

        /// <summary>Task-returning semaphore extraction must reproduce the reference folder.</summary>
        [TestMethod]
        public async Task ExtractSemaphoreAsyncTest()
        {
            await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);

            AssertExtractedFolderMatchesTruth();
        }

        // Shared check: the freshly extracted folder must compare equal to the reference folder.
        private static void AssertExtractedFolderMatchesTruth()
        {
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }

    }
Creation answered 26/4, 2013 at 0:18 Comment(2)
Is there a similar approach with .NET framework's built-in ZipArchive type that is thread safe?Anthodium
This solution will not benefit from the cached list of entries that the ZipArchive holds. If the zip file has many small files, this solution is slower than sequential reading.Hayner
C
2

Worked on the same task recently and here is my result:

I am using DotNetZip.reduced (Ionic.Zip.Reduced.dll v1.9.1.8) on my Xeon 1socket, 8cores, 16cpu; 32GB RAM; SSD drive.

Zip file | Packed size | Unpacked Files | Unpacked size

  • SmallFile1 | 778 MB | 4,926 Files | 1.4 GB
  • LargeFile2 | 6 GB | 29,557 Files | 10.0 GB

I have 5 methods: first does everything in one thread, and 4 other use PLINQ and TPL Parallel class.

The winners are V4 and V5, which run about 6x faster than V1. Detailed results and code are below.

  • V1 uses ExtractAll
  • V2 Extracts entries in parallel (not thread safe)
  • V3 Extracts entries in parallel by opening new file handle for each entry
  • V4 Extracts entries in parallel using only N+1 file handles
  • V5 Final version

Performance results table Zip file | V1, sec | V2, sec | V3, sec | V4, sec | V5, sec

  • SmallFile1 | 32 | Exception | 8 | 8 | 5
  • LargeFile2 | 200 | Exception | 2000 | 35 | 30

Small file processing Small file processing

Large file processing by V1 Large file processing by V1

Large file processing by V4 Large file processing by V4

Large file processing by V5 Large file processing by V5

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Ionic.Zip;
using Ionic.Zlib;

namespace A1
{
    public static class Program
    {
        /// <summary>
        /// Benchmark driver: runs each listed extraction method against each zip file,
        /// printing the elapsed time, and deletes the extraction output between runs.
        /// A failing method is reported on the console and does not stop the remaining runs.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            // Wait for a key press before starting.
            Console.ReadKey();

            CancellationToken cancellationToken = CancellationToken.None;

            string path = @"e:\1\";
            string zf1 = Path.Combine(path, "1.zip");
            string zf2 = Path.Combine(path, "2.zip");
            Stopwatch sw = new Stopwatch();

            List<string> zipFiles = new List<string>
            {
                zf1,
                zf2,
            };
            List<Action<string, string, CancellationToken>> methods = new List<Action<string, string, CancellationToken>>
            {
                ExtractAllFilesFromZipFileV1,
                //ExtractAllFilesFromZipFileV2,
                //ExtractAllFilesFromZipFileV3,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
            };

            // Both lists are processed in reverse of their declaration order.
            zipFiles.Reverse();
            methods.Reverse();

            zipFiles.ForEach(f => methods.ForEach(m =>
            {
                string fileName = Path.GetFileName(f);
                // Every run extracts into its own uniquely named folder.
                string targetDirectory = Path.Combine(path, Guid.NewGuid().ToString("N"));
                sw.Restart();
                // Unzip
                try
                {
                    m(f, targetDirectory, cancellationToken);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
                sw.Stop();
                Console.WriteLine("{0} processed by {1} in {2} seconds", fileName, m.GetMethodInfo().Name, sw.Elapsed.TotalSeconds.ToString("F3"));
                Thread.Sleep(5 * 1000);
                // A method that failed early may never have created the folder;
                // deleting a missing directory would throw and abort the whole benchmark.
                if (Directory.Exists(targetDirectory))
                {
                    Directory.Delete(targetDirectory, true);
                }
                Thread.Sleep(5 * 1000);
            }));
        }

        /// <summary>
        /// V1: opens the archive once and extracts everything with a single
        /// <c>ExtractAll</c> call.
        /// </summary>
        /// <param name="zipFileName">Path of the zip file.</param>
        /// <param name="targetDirectory">Directory to extract into.</param>
        /// <param name="cancellationToken">Accepted for signature parity; not used by this variant.</param>
        private static void ExtractAllFilesFromZipFileV1(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile archive = new ZipFile(zipFileName))
            {
                archive.ExtractAll(targetDirectory);
            }
        }

        /// <summary>
        /// V2: extracts the entries of one shared <c>ZipFile</c> instance through PLINQ.
        /// All parallel workers operate on entries of that same instance.
        /// </summary>
        /// <param name="zipFileName">Path of the zip file.</param>
        /// <param name="targetDirectory">Directory to extract into.</param>
        /// <param name="cancellationToken">Accepted for signature parity; not used by this variant.</param>
        private static void ExtractAllFilesFromZipFileV2(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile archive = new ZipFile(zipFileName))
            {
                ParallelQuery<ZipEntry> parallelEntries = archive.Entries.AsParallel();

                parallelEntries.ForAll(zipEntry => zipEntry.Extract(targetDirectory));
            }
        }


        /// <summary>
        /// V3: extracts in parallel by entry position, opening (and disposing) a separate
        /// <c>ZipFile</c> for every single entry. The outer instance is only used to
        /// obtain the entry count and stays open for the duration.
        /// </summary>
        /// <param name="zipFileName">Path of the zip file.</param>
        /// <param name="targetDirectory">Directory to extract into.</param>
        /// <param name="cancellationToken">Checked before each entry is processed.</param>
        private static void ExtractAllFilesFromZipFileV3(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile outerArchive = new ZipFile(zipFileName))
            {
                int entryCount = outerArchive.Entries.Count;
                ParallelQuery<int> positions = Enumerable.Range(0, entryCount).AsParallel();

                positions.ForAll(position =>
                {
                    cancellationToken.ThrowIfCancellationRequested();

                    using (ZipFile privateArchive = new ZipFile(zipFileName))
                    {
                        // Pick the entry at this position from the private instance.
                        ZipEntry wanted = privateArchive.Entries.ElementAt(position);
                        wanted.Extract(targetDirectory);
                    }
                });
            }
        }

        /// <summary>
        /// V4: extracts in parallel by entry position, caching one extra <c>ZipFile</c>
        /// per managed thread id so that the file is not re-opened for every entry.
        /// All cached instances are disposed when the loop ends, successfully or not.
        /// </summary>
        /// <param name="zipFileName">Path of the zip file.</param>
        /// <param name="targetDirectory">Directory to extract into.</param>
        /// <param name="cancellationToken">Checked before each entry is processed.</param>
        private static void ExtractAllFilesFromZipFileV4(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile primaryArchive = new ZipFile(zipFileName))
            {
                // The primary instance supplies the entry count and stays open meanwhile.
                int entryCount = primaryArchive.Entries.Count;

                // Cached ZipFile instances, keyed by the managed id of the thread that opened them.
                ConcurrentDictionary<int, ZipFile> archivesByThread = new ConcurrentDictionary<int, ZipFile>();

                try
                {
                    Parallel.For<ZipFile>(
                        0,
                        entryCount,
                        // Thread-local init: reuse this thread's instance or open a new one.
                        () => archivesByThread.GetOrAdd(
                            Thread.CurrentThread.ManagedThreadId,
                            threadId => new ZipFile(zipFileName)),
                        (int position, ParallelLoopState state, ZipFile localArchive) =>
                        {
                            cancellationToken.ThrowIfCancellationRequested();

                            // Locate the entry at this position and extract it to a file.
                            ZipEntry wanted = localArchive.Entries.Skip(position).First();
                            wanted.Extract(targetDirectory);

                            return localArchive;
                        },
                        // Nothing to finalize per thread; disposal happens below.
                        finishedArchive => { });
                }
                finally
                {
                    foreach (ZipFile cached in archivesByThread.Values)
                    {
                        cached.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// V5: splits the file entries into size-balanced buckets, then extracts each bucket
        /// on its own worker using a <c>ZipFile</c> instance (and materialized entry list)
        /// cached per managed thread id. Directory entries are skipped.
        /// </summary>
        /// <param name="zipFileName">Path of the zip file.</param>
        /// <param name="targetDirectory">Directory to extract into.</param>
        /// <param name="cancellationToken">
        /// Checked before each entry and from the extract-progress callback of each worker archive.
        /// </param>
        private static void ExtractAllFilesFromZipFileV5(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                // Enumerate the entries while keeping the primary instance open.
                ICollection<ZipEntry> zipEntries = zipFile.Entries;

                // (position in archive, uncompressed size) for every non-directory entry.
                List<Tuple<int, long>> entries = zipEntries
                    .Select((v, i) => Tuple.Create(i, v))
                    .Where(v => !v.Item2.IsDirectory)
                    .Select(v => Tuple.Create(v.Item1, v.Item2.UncompressedSize))
                    .ToList();
                int count = entries.Count;

                // Nothing to extract. Without this guard maxDop would be 0, which both
                // ConcurrentDictionary and ParallelOptions reject with an exception.
                if (count == 0)
                {
                    return;
                }

                // Calculate max DOP: 1.5 workers per processor, but never more workers than files.
                // The former "(int)1.5 * ..." cast 1.5 to 1 before multiplying, losing the factor.
                int maxDop = Math.Min(count, (int)(1.5 * Environment.ProcessorCount));

                // Load balance between threads. The constant added to each weight makes
                // every file count for at least 10 MiB when balancing.
                List<List<Tuple<int, long>>> groupedItems = entries.ToBuckets(maxDop, v => v.Item2 + 10 * 1024 * 1024).ToList();

                // Ensure sequential reading from the zip file within each bucket.
                for (int i = 0; i < groupedItems.Count; ++i)
                {
                    groupedItems[i] = groupedItems[i].OrderBy(v => v.Item1).ToList();
                }

                // Cache instances of ZipFile used by threads, keyed by managed thread id,
                // so the zip file is opened at most once per worker thread.
                ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>> dictionary = new ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>>(maxDop, maxDop);
                ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDop, };

                try
                {
                    Parallel.For(0, maxDop, parallelOptions,
                        () =>
                        {
                            // GetOrAdd. Re-use existing open ZipFile or open a new one for this thread.
                            return dictionary.GetOrAdd(Thread.CurrentThread.ManagedThreadId, v =>
                            {
                                ZipFile zf = new ZipFile(zipFileName) { ExtractExistingFile = ExtractExistingFileAction.Throw, FlattenFoldersOnExtract = false, ZipErrorAction = ZipErrorAction.Throw, };
                                // Lets a long-running extraction of a single entry observe cancellation.
                                zf.ExtractProgress += (sender, e) =>
                                {
                                    cancellationToken.ThrowIfCancellationRequested();
                                };
                                // Materialize the entries once so they can be addressed by index.
                                return Tuple.Create(zf, zf.Entries.ToList());
                            });
                        },
                        (int j, ParallelLoopState loopState, Tuple<ZipFile, List<ZipEntry>> zf) =>
                        {
                            // Each loop iteration handles exactly one bucket.
                            List<Tuple<int, long>> list = groupedItems[j];
                            for (int n = 0; n < list.Count; ++n)
                            {
                                cancellationToken.ThrowIfCancellationRequested();

                                int i = list[n].Item1;

                                // Get the right entry to extract from this thread's own instance.
                                ZipEntry entry = zf.Item2[i];
                                // Sanity check that positions line up between the primary and worker instances.
                                Debug.Assert(entry.UncompressedSize == list[n].Item2);

                                // Extract to a file
                                entry.Extract(targetDirectory);
                            }

                            return zf;
                        },
                        zf =>
                        {
                            // Nothing to finalize per thread; disposal happens in the finally block.
                        });
                }
                finally
                {
                    // Dispose cached ZipFiles
                    foreach (Tuple<ZipFile, List<ZipEntry>> zf in dictionary.Values)
                    {
                        try
                        {
                            zf.Item2.Clear();
                            zf.Item1.Dispose();
                        }
                        catch (ZlibException)
                        {
                            // There is a well known defect in Ionic.ZLib
                            // This exception may happen when you read only part of file (not entire file)
                            // and close its handle.
                            // Ionic.Zlib.ZlibException: Bad CRC32 in GZIP trailer. (actual(D202EF8D)!=expected(A39D1010))
                        }
                    }
                }
            }
        }


        /// <summary>
        /// Distributes <paramref name="list"/> over <paramref name="bucketCount"/> buckets:
        /// items are taken heaviest first and each one is placed in the bucket whose
        /// accumulated weight is currently the smallest.
        /// </summary>
        /// <typeparam name="T">Item type.</typeparam>
        /// <param name="list">Items to distribute.</param>
        /// <param name="bucketCount">Number of buckets to create.</param>
        /// <param name="getWeight">Returns the weight of an item.</param>
        /// <returns>The buckets, in creation order; some may be empty.</returns>
        private static IEnumerable<List<T>> ToBuckets<T>(this IEnumerable<T> list, int bucketCount, Func<T, long> getWeight)
        {
            List<T> heaviestFirst = list.OrderByDescending(item => getWeight(item)).ToList();

            // Accumulated weight per bucket, all starting at zero.
            List<long> loadPerBucket = Enumerable.Repeat(0L, bucketCount).ToList();

            List<List<T>> result = new List<List<T>>();
            for (int b = 0; b < bucketCount; ++b)
            {
                // Pre-size each bucket for an even share of the items.
                result.Add(new List<T>(heaviestFirst.Count / bucketCount));
            }

            for (int k = 0; k < heaviestFirst.Count; ++k)
            {
                T current = heaviestFirst[k];

                // Choose the bucket with the smallest accumulated weight so far.
                int lightest = loadPerBucket.IndexOfMin();

                loadPerBucket[lightest] += getWeight(current);
                result[lightest].Add(current);
            }

            return result;
        }

        /// <summary>
        /// Returns the zero-based position of the smallest element of <paramref name="source"/>.
        /// When several elements are equally small, the first one wins.
        /// </summary>
        /// <typeparam name="T">Element type.</typeparam>
        /// <param name="source">Sequence to scan.</param>
        /// <param name="comparer">Ordering to use; the default comparer for <typeparamref name="T"/> when null.</param>
        /// <returns>The position of the minimum, or -1 when the sequence is empty.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static int IndexOfMin<T>(this IEnumerable<T> source, IComparer<T> comparer = null)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            IComparer<T> order = comparer ?? Comparer<T>.Default;

            int bestPosition = -1; // stays -1 for an empty sequence
            T bestValue = default(T);
            int position = 0;

            foreach (T candidate in source)
            {
                // The first element is accepted unconditionally; afterwards only a
                // strictly smaller element replaces the current best.
                if (bestPosition < 0 || order.Compare(candidate, bestValue) < 0)
                {
                    bestPosition = position;
                    bestValue = candidate;
                }

                ++position;
            }

            return bestPosition;
        }

        /// <summary>
        /// Returns the zero-based position of the element of <paramref name="source"/> whose
        /// key, as produced by <paramref name="selector"/>, is the smallest.
        /// When several keys are equally small, the first element wins.
        /// </summary>
        /// <typeparam name="TSource">Element type.</typeparam>
        /// <typeparam name="TKey">Key type.</typeparam>
        /// <param name="source">Sequence to scan.</param>
        /// <param name="selector">Computes the key of an element; called once per element.</param>
        /// <param name="comparer">Key ordering; the default comparer for <typeparamref name="TKey"/> when null.</param>
        /// <returns>The position of the element with the minimum key, or -1 when the sequence is empty.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static int IndexOfMinBy<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> selector, IComparer<TKey> comparer = null)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            IComparer<TKey> order = comparer ?? Comparer<TKey>.Default;

            int bestPosition = -1; // stays -1 for an empty sequence
            TKey bestKey = default(TKey);
            int position = 0;

            foreach (TSource element in source)
            {
                TKey key = selector(element);

                // The first key is accepted unconditionally; afterwards only a
                // strictly smaller key replaces the current best.
                if (bestPosition < 0 || order.Compare(key, bestKey) < 0)
                {
                    bestPosition = position;
                    bestKey = key;
                }

                ++position;
            }

            return bestPosition;
        }
    }
}
Chaos answered 3/5, 2016 at 1:55 Comment(0)
N
1

The problem is that you open the file only once with only one handle. One handle has one read position and the read position gets messed up if you do parallel reads on the same handle. Open the file multiple times with multiple handles and you should be fine.

Nostalgia answered 25/4, 2013 at 19:56 Comment(0)
G
1

I agree with Floste's answer but i suggest a different approach. If your files are like ~50k inside the zip file:

1) Create a queue of byte arrays for each file. 2) Each member of the queue is an extracted zip file entry. 3) Try to extract files from zip file into byte array and when extraction is complete add it to the queue. 4) Extraction thread should be a single thread, no parallelism. 5) While extraction thread is doing its job, create another threads/tasks for emptying the queue. These tasks will take the extracted data from queue and write them to disk. Since they are different files there will be no race condition or unavailable resource.

A mutex or lock may be needed for the queue. This may not be the best way, but I'm sure you'll gain some speed.

Gwalior answered 26/4, 2013 at 13:13 Comment(0)
M
1

I needed parallel decompression on a large archive (~30 GB, ~45k entries of variable size) and came up with this solution leveraging DotNetZip:

    /// <summary>
    /// Extracts an archive with one worker task per processor. Every worker opens its own
    /// <c>ZipFile</c> on the same path and claims the next unprocessed entry through a
    /// shared, atomically incremented counter. Blocks until all workers have finished.
    /// </summary>
    /// <param name="archivePath">Path of the zip file.</param>
    /// <param name="destinationPath">Directory to extract into; existing files are overwritten silently.</param>
    /// <param name="password">Archive password; ignored when null or empty.</param>
    /// <param name="token">Stops workers from claiming further entries once cancellation is requested.</param>
    /// <param name="progress">Optional callback invoked for throttled progress notifications.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="archivePath"/> or <paramref name="destinationPath"/> is null or empty.
    /// </exception>
    public static void ParallelExtract(
        string archivePath,
        string destinationPath,
        string password,
        CancellationToken token,
        ProgressReportDelegate progress // Could also be Progress<T> or whatever you prefer.
        )
    {
        if (String.IsNullOrEmpty(archivePath))
            throw new ArgumentNullException(nameof(archivePath));

        if (String.IsNullOrEmpty(destinationPath))
            throw new ArgumentNullException(nameof(destinationPath));

        Stopwatch elapsed = new Stopwatch();
        Stopwatch progressReportingTimer = new Stopwatch();

        elapsed.Start();
        progressReportingTimer.Start();

        // Guards the one-time total computation and the progress callback.
        object obj = new object();

        // Shared across workers: last claimed entry index, bytes claimed so far, and the
        // total compressed size (-1 means "not computed yet").
        int count = -1;
        long bytesExtracted = 0;
        long bytesTotal = -1;

        List<Task> decompressors = new List<Task>();

        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            decompressors.Add(Task.Run(() =>
            {
                using (ZipFile zipFile = new ZipFile(archivePath))
                {
                    if (!String.IsNullOrEmpty(password))
                        zipFile.Password = password;

                    zipFile.ExtractProgress += delegate (object zipSender, ExtractProgressEventArgs zipArgs)
                    {
                        // Report progress after each EntryBytesWritten event, as long as it's been at least 250ms since the last report, so as to not overwhelm listeners like a progress bar. 
                        // Fire regardless upon completion (bytesExtracted == bytesTotal) to provide a final update before finishing.
                        if ((zipArgs.EventType == ZipProgressEventType.Extracting_EntryBytesWritten && progressReportingTimer.ElapsedMilliseconds >= 250) || bytesExtracted == bytesTotal)
                        {
                            // Computed for the listener's benefit; the placeholder callback below takes no arguments.
                            int percentage = Percentage(bytesExtracted, bytesTotal);

                            lock (obj)
                            {
                                progress?.Invoke(); // <-- Handle your progress updates here.

                                progressReportingTimer.Restart();
                            }
                        }
                    };

                    // Block all threads until we sum the total size of all entries so that when we begin processing on the threadpool we 
                    // can report progress relative to the total.
                    lock (obj)
                    {
                        if (bytesTotal == -1)
                        {
                            // Sum into a local starting at 0. Accumulating directly into
                            // bytesTotal would start from the -1 sentinel and leave the
                            // total one byte short, so the completion check never matched.
                            long total = 0;

                            foreach (var entry in zipFile.Entries)
                                total += entry.CompressedSize;

                            bytesTotal = total;
                        }
                    }

                    var array = zipFile.Entries.ToArray();

                    // Iterate through the archive's entries sequentially despite being on multiple threads.
                    while (count < array.Length && !token.IsCancellationRequested)
                    {
                        // Atomically claim the next entry index for this worker.
                        int index = Interlocked.Increment(ref count);

                        if (index >= array.Length)
                            return;

                        ZipEntry zipEntry = array[index];

                        // The entry's size is counted as soon as it is claimed, before it is extracted.
                        Interlocked.Add(ref bytesExtracted, zipEntry.CompressedSize);

                        zipEntry.Extract(destinationPath, ExtractExistingFileAction.OverwriteSilently);
                    }
                }
            },
            token
            ));
        }

        Task.WaitAll(decompressors.ToArray());
    }

Results:

Hardware: Intel Core i7-4710HQ @ 3.50 GHz (4 cores, 8 with Hyper-Threading), 16 GB RAM, (SATA) SSD, Win10x64 1903:

Archive: 44.5k entries, ~30 GB DEFLATE (Store)

Threads:    Time:
-----------------
1           35:20 (NOTE: This is virtually identical to ZipArchive.ExtractAll())
2           22:14
3           18:40
4           16:49
8           14:42
Mordecai answered 6/9, 2019 at 22:29 Comment(1)
@maytham-ɯɐɥʇʎɐɯ Thanks; it inadvertantly broke when I was editing comments.Mordecai

© 2022 - 2024 — McMap. All rights reserved.