Trying to split the file download buffer into separate threads
Asked Answered
B

3

1

I am trying to download the buffer of a file using 5 threads, but the result seems to be getting garbled.

from numpy import arange
import requests
from threading import Thread
import urllib2

# Page to download in pieces.
url = 'http://pymotw.com/2/urllib/index.html'
# Ask for the size with a HEAD request.  'Accept-Encoding: identity' asks the
# server not to compress, so Content-Length should be the raw size.  The
# header value is a string; it is converted with float() below.
sizeInBytes = r = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers['content-length']

# Number of byte ranges, and therefore threads, to use.
splitBy = 5

# splitBy + 1 evenly spaced float boundaries running from 0 to sizeInBytes.
splits = arange(splitBy + 1) * (float(sizeInBytes)/splitBy)

# Shared list that every worker thread appends its piece to.
dataLst = []

def bufferSplit(url, idx, splits):
    """Fetch the idx-th byte range of url and append the body to dataLst.

    url    -- address to download from.
    idx    -- index of the range; the request covers splits[idx] to
              splits[idx+1].
    splits -- sequence of range boundaries.

    Returns nothing; the result is appended to the module-level dataLst.
    """
    # NOTE(review): the end offset of an HTTP byte range is inclusive, so
    # consecutive requests built this way both contain the boundary byte.
    req = urllib2.Request(url,  headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])})
    # Debug output: prints a one-element set holding the Range value.
    print {'bytes=%d-%d' % (splits[idx], splits[idx+1])}
    # Pieces are appended as each read finishes, i.e. in completion order,
    # which is not necessarily idx order.
    dataLst.append(urllib2.urlopen(req).read())


# Start one worker thread per byte range.
# NOTE(review): the threads are never join()ed, so the statements below can
# run before any download has finished.
for idx in range(splitBy):
    dlth = Thread(target=bufferSplit, args=(url, idx, splits))
    dlth.start()


print dataLst

# Concatenate whatever pieces are in dataLst at this moment and save them.
with open('page.html', 'w') as fh:
    fh.write(''.join(dataLst))

Update: I reworked the code and made a little progress; however, if I download a JPG it seems to be corrupted:

from numpy import arange
import os
import requests
import threading
import urllib2

# Alternative test URLs are kept commented out; only one is active at a time.
# url ='http://s1.fans.ge/mp3/201109/08/John_Legend_So_High_Remix(fans_ge).mp3'
url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"
# url = 'http://pymotw.com/2/urllib/index.html'
# Size of the file as reported by a HEAD request, or None when the server
# sends no Content-Length.  The value is a string.
sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)


# Not referenced by the code below: ThreadedFetch is created with its own
# default splitBy and keeps its own private list of pieces.
splitBy = 5

dataLst = []


class ThreadedFetch(threading.Thread):
    """ Thread that downloads one URL as a series of HTTP byte-range requests.

        The ranges are requested one after another inside this single thread
        and kept in memory; call getFileData() after join() to obtain the
        concatenated result.
    """
    def __init__(self, url, fileName, splitBy=5):
        # url      -- address to download.
        # fileName -- stored on the instance; not used by the methods below.
        # splitBy  -- number of byte ranges to request.
        super(ThreadedFetch, self).__init__()
        self.__url = url
        self.__spl = splitBy
        self.__dataLst = []
        self.__fileName = fileName

    def run(self):
        # Relies on the module-level sizeInBytes obtained from the HEAD request.
        if not sizeInBytes:
            print "Size cannot be determined."
            return
        # splitBy + 1 evenly spaced float boundaries from 0 to the file size.
        splits = arange(self.__spl + 1) * (float(sizeInBytes)/self.__spl)
        for idx in range(self.__spl):
            # NOTE(review): the end offset of an HTTP byte range is inclusive,
            # so each boundary byte is requested again as the first byte of
            # the next range.
            req = urllib2.Request(self.__url,  headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])})
            self.__dataLst.append(urllib2.urlopen(req).read())


    def getFileData(self):
        # Concatenate the pieces in the order they were requested.
        return ''.join(self.__dataLst)


# Use the last path component of the URL as the local file name.
fileName = url.split('/')[-1]

# Run the download in a worker thread and wait for it to finish.
dl = ThreadedFetch(url, fileName)
dl.start()
dl.join()
content = dl.getFileData()
if content:
    # NOTE(review): the file is opened in text mode ('w') although the active
    # URL points at a JPEG; on platforms that translate newlines this alters
    # the bytes written.
    with open(fileName, 'w') as fh:
        fh.write(content)
    print "Finished Writing file %s" % fileName

Below is how the image looks after being downloaded.

corrupted image

Bennett answered 5/7, 2014 at 11:23 Comment(4)
There's nothing in your code that guarantees that all the threads have finished after they're started. I would suggest creating a list of dlth objects called dlthreads and afterwards doing a for th in dlthreads: th.join(). Even with that you probably can't assume they'll all finish sequentially in the same order as started, so the dataLst.append()s could occur out of order. To fix that you'll need to add something that identifies the buffer number to allow reassembling them in the right order.Doubtless
@Doubtless : I have added updated code, in which i got little bit success, however the downloaded content seems to be corrupt.Bennett
I suspect the problem is due to the way you're downloading the data via multiple requests -- so would concentrate on getting that right before trying to multi-thread the process.Doubtless
i got that part right.. will post soon the updateBennett
R
6

Here's another version of the project. Differences:

  • thread code is a single small function

  • each thread downloads a chunk, then stores it in a shared dictionary under its own chunk index

  • threads are started, then join()ed -- they're all running at once

  • when all done, data is reassembled in correct order then written to disk

  • extra printing, to verify everything's correct

  • output file size is calculated, for an extra comparison

source

import os, requests
import threading
import urllib2
import time

# Sample file passed to main() when the module is run as a script.
URL = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"

def buildRange(value, numsplits):
    """Split ``value`` bytes into at most ``numsplits`` HTTP byte ranges.

    value     -- total size in bytes (an integer).
    numsplits -- desired number of ranges.

    Returns a list of 'first-last' strings with zero-based, inclusive offsets,
    ready to be placed after 'bytes=' in a Range header.  Together the ranges
    cover bytes 0 .. value-1 exactly once, in order, and their sizes differ by
    at most one byte.  Ranges that would be empty (numsplits > value) are
    left out, so the list can be shorter than numsplits; it is empty when
    value or numsplits is less than 1.
    """
    if value < 1 or numsplits < 1:
        return []
    # Integer arithmetic keeps the boundaries exact: no float rounding, so no
    # gaps or overlaps between neighbours, and the last range ends on the
    # final byte (value - 1) rather than one past it.
    bounds = [i * value // numsplits for i in range(numsplits + 1)]
    return [
        '%d-%d' % (first, nxt - 1)
        for first, nxt in zip(bounds, bounds[1:])
        if nxt > first
    ]

def main(url=None, splitBy=3):
    start_time = time.time()
    if not url:
        print "Please Enter some url to begin download."
        return

    fileName = url.split('/')[-1]
    sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
    print "%s bytes to download." % sizeInBytes
    if not sizeInBytes:
        print "Size cannot be determined."
        return

    dataDict = {}

    # split total num bytes into ranges
    ranges = buildRange(int(sizeInBytes), splitBy)

    def downloadChunk(idx, irange):
        req = urllib2.Request(url)
        req.headers['Range'] = 'bytes={}'.format(irange)
        dataDict[idx] = urllib2.urlopen(req).read()

    # create one downloading thread per chunk
    downloaders = [
        threading.Thread(
            target=downloadChunk, 
            args=(idx, irange),
        )
        for idx,irange in enumerate(ranges)
        ]

    # start threads, let run in parallel, wait for all to finish
    for th in downloaders:
        th.start()
    for th in downloaders:
        th.join()

    print 'done: got {} chunks, total {} bytes'.format(
        len(dataDict), sum( (
            len(chunk) for chunk in dataDict.values()
        ) )
    )

    print "--- %s seconds ---" % str(time.time() - start_time)

    if os.path.exists(fileName):
        os.remove(fileName)
    # reassemble file in correct order
    with open(fileName, 'w') as fh:
        for _idx,chunk in sorted(dataDict.iteritems()):
            fh.write(chunk)

    print "Finished Writing file %s" % fileName
    print 'file size {} bytes'.format(os.path.getsize(fileName))

# Download the sample URL when the module is executed as a script.
if __name__ == '__main__':
    main(URL)

output

102331 bytes to download.
done: got 3 chunks, total 102331 bytes
--- 0.380599021912 seconds ---
Finished Writing file 607800main_kepler1200_1600-1200.jpg
file size 102331 bytes
Rna answered 5/7, 2014 at 23:48 Comment(8)
I tried the example, but it seems if we increase the number of splits the time taken is more while it is expected if downloads are going parallel in n number of threads it should finish faster but the opposite is happening...Bennett
there's a good bit of overhead to establishing a TCP, then a HTTP connection. There won't be any speedup unless the file is pretty big. A 100KB JPEG probably won't show much difference.Rna
Apart from that how many maximum connections are possible with server?Bennett
Each connection puts some overhead onto the server, so you don't want to do too many. I suggest picking a small number like 2-5.Rna
having used download managers in past, I remember there has always been 6 - 10 max possible splits.Bennett
in theory, you can do many. In practice, you don't want to hog a server's resources. Plus, from the client side you get rapidly diminishing returns. 6 - 10 is probably okay.Rna
And noone seemingly cared to upvote :) What's wrong with ya all??Vorlage
It's all very well and good but this just does not work, we need to change the head request to a get request with stream=True for it to workWarr
B
2

Here is how I got it working. If anyone has any suggestions for possible improvements, you are most welcome.

import os
import requests
import threading
import urllib2
import time

# Sample file passed to main() when the module is run as a script.
url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"

def buildRange(value, numsplits):
    """Split ``value`` bytes into at most ``numsplits`` HTTP byte ranges.

    value     -- total size in bytes (an integer).
    numsplits -- desired number of ranges.

    Returns a list of 'first-last' strings with zero-based, inclusive offsets,
    ready to be placed after 'bytes=' in a Range header.  Together the ranges
    cover bytes 0 .. value-1 exactly once, in order, and their sizes differ by
    at most one byte.  Ranges that would be empty (numsplits > value) are
    left out, so the list can be shorter than numsplits; it is empty when
    value or numsplits is less than 1.
    """
    if value < 1 or numsplits < 1:
        return []
    # Integer arithmetic keeps the boundaries exact: no float rounding, so no
    # gaps or overlaps between neighbours, and the last range ends on the
    # final byte (value - 1) rather than one past it.
    bounds = [i * value // numsplits for i in range(numsplits + 1)]
    return [
        '%d-%d' % (first, nxt - 1)
        for first, nxt in zip(bounds, bounds[1:])
        if nxt > first
    ]

class SplitBufferThreads(threading.Thread):
    """ Downloads one byte range of a URL in its own thread.

        Usage: start() the thread, join() it, then call getFileData() to
        obtain the bytes of the requested range.
    """
    def __init__(self, url, byteRange):
        # url       -- address to download from.
        # byteRange -- 'first-last' string placed after 'bytes=' in the
        #              Range header.
        super(SplitBufferThreads, self).__init__()
        self.__url = url
        self.__byteRange = byteRange
        self.__data = None
        self.req = None

    def run(self):
        self.req = urllib2.Request(self.__url,  headers={'Range': 'bytes=%s' % self.__byteRange})
        # Do the blocking network read here, inside the thread, so that
        # several instances really download at the same time.
        self.__data = urllib2.urlopen(self.req).read()

    def getFileData(self):
        """ Return the bytes of this thread's range.

            Normally this is the data fetched by run().  If run() stored
            nothing (it failed or the thread was never started), the request
            is issued from the calling thread instead, so any error surfaces
            to the caller.
        """
        if self.__data is None:
            self.__data = urllib2.urlopen(self.req).read()
        return self.__data


def main(url=None, splitBy=3):
    start_time = time.time()
    if not url:
        print "Please Enter some url to begin download."
        return

    fileName = url.split('/')[-1]
    sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
    print "%s bytes to download." % sizeInBytes
    if not sizeInBytes:
        print "Size cannot be determined."
        return

    dataLst = []
    for idx in range(splitBy):
        byteRange = buildRange(int(sizeInBytes), splitBy)[idx]
        bufTh = SplitBufferThreads(url, byteRange)
        bufTh.start()
        bufTh.join()
        dataLst.append(bufTh.getFileData())

    content = ''.join(dataLst)

    if dataLst:
        if os.path.exists(fileName):
            os.remove(fileName)
        print "--- %s seconds ---" % str(time.time() - start_time)
        with open(fileName, 'w') as fh:
            fh.write(content)
        print "Finished Writing file %s" % fileName

# Download the sample URL when the module is executed as a script.
if __name__ == '__main__':
    main(url)

This is the first bare-bones code I have got working. I discovered that if I set the bufTh buffer thread's daemon flag to False, the process takes more time to finish.

Bennett answered 5/7, 2014 at 19:25 Comment(1)
What is module requests?Doubtless
H
-1

I asked ChatGPT to help me with this. I know, I'm cheating :)

Here's the code it produced, and it worked great with a 10GB file that was taking forever to download:

import os
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

# Placeholder: replace with the address of the file to download.
url = "URL-TO-DOWNLOAD"
# Number of byte ranges the file is split into (one part file each).
num_parts = 10
# Local file name: last path component of the URL.
filename = url.split("/")[-1]
# requests does not follow redirects for HEAD unless asked to, and may
# otherwise negotiate a compressed representation; both would make
# Content-Length describe something other than the raw file.
r = requests.head(url, headers={'Accept-Encoding': 'identity'}, allow_redirects=True)
# Fail here with a clear HTTP error instead of a KeyError or a bogus size.
r.raise_for_status()
file_size = int(r.headers['content-length'])

class FilePartDownload:
    """Downloads one byte range of a URL into its own part file, resumably.

    The part file is named ``{part}_{filename}``, where ``filename`` is the
    module-level output name.  If that file already exists, only the bytes
    that are still missing are requested and appended.

    Attributes:
        start, end: zero-based, inclusive byte offsets of the range.
        url: address to download from.
        part: identifier used as the part-file prefix.
        progress_bar: object whose ``update(n)`` is called with the number
            of bytes written.
    """

    def __init__(self, start, end, url, part, progress_bar):
        self.start = start
        self.end = end
        self.url = url
        self.part = part
        self.progress_bar = progress_bar

    def download(self):
        """Fetch the missing bytes of this part and append them to its file.

        Raises:
            requests.HTTPError: the server answered with an error status.
            RuntimeError: the server did not answer with 206 Partial Content.
        """
        part_path = f"{self.part}_{filename}"

        # Resume after what is already on disk.  Kept in a local so that
        # calling download() again does not shift self.start a second time.
        start = self.start
        if os.path.exists(part_path):
            start += os.path.getsize(part_path)
        if start > self.end:
            # Part is already complete; a Range with first > last is invalid.
            return

        headers = {
            'Range': f'bytes={start}-{self.end}',
            # Byte offsets must refer to the raw file, not a compressed form.
            'Accept-Encoding': 'identity',
        }
        # The context manager releases the connection even if writing fails.
        with requests.get(self.url, headers=headers, stream=True) as r:
            if r.status_code == 416:
                # Range Not Satisfiable: the requested offset is at or past
                # the end of the file, so nothing is left to fetch.
                return
            r.raise_for_status()
            if r.status_code != 206:
                # A plain 200 means the Range header was ignored and the body
                # is the whole file; appending it would corrupt the part.
                raise RuntimeError(
                    f"expected 206 Partial Content for part {self.part}, "
                    f"got {r.status_code}"
                )
            # Open the file in append mode
            with open(part_path, 'ab') as fp:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        fp.write(chunk)
                        self.progress_bar.update(len(chunk))

def combine_files(parts, filename):
    """Concatenate the part files into ``filename`` and delete them.

    Args:
        parts: part identifiers, in the order their contents must appear.
        filename: output file name; part files are ``{part}_{filename}``.

    Each part file is removed as soon as it has been copied.
    """
    # Local import: shutil is not among this snippet's top-level imports.
    import shutil

    with open(filename, 'wb') as fp:
        for part in parts:
            part_path = f"{part}_{filename}"
            with open(part_path, 'rb') as fpart:
                # Stream in blocks instead of read(): a part of a multi-GB
                # download should not have to fit in memory.
                shutil.copyfileobj(fpart, fp)
            os.remove(part_path)

parts = list(range(num_parts))
# Byte offsets are zero-based and inclusive: part i covers
# [i * part_size, (i + 1) * part_size - 1], and the last part runs up to the
# final byte of the file, which is file_size - 1.
part_size = file_size // num_parts
starts = [part_size * i for i in range(num_parts)]
ends = [part_size * i - 1 for i in range(1, num_parts)] + [file_size - 1]

progress_bar = tqdm(total=file_size, unit='B', unit_scale=True, desc="Total Progress")

# Create FilePartDownload instances without starting the downloads
downloads = [FilePartDownload(start, end, url, part, progress_bar) for part, start, end in zip(parts, starts, ends)]

# Update the progress bar with the size of already downloaded parts
for download in downloads:
    if os.path.exists(f"{download.part}_{filename}"):
        progress_bar.update(os.path.getsize(f"{download.part}_{filename}"))

# Start the downloads.  One worker per part, so every part is in flight at
# once regardless of the executor's default pool size.
with ThreadPoolExecutor(max_workers=num_parts) as executor:
    futures = [executor.submit(download.download) for download in downloads]

progress_bar.close()

# submit() stores a worker's exception inside its future; result() re-raises
# it, so a failed part stops the script here instead of being combined into
# a corrupt output file.
for future in futures:
    future.result()

combine_files(parts, filename)
Haggle answered 5/6, 2023 at 18:51 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.