How do I upload many files at a time to Cloud Files with Python?

I'm using the cloudfiles module to upload files to Rackspace Cloud Files, using something like this pseudocode:

import cloudfiles

username = '---'
api_key = '---'

conn = cloudfiles.get_connection(username, api_key)
testcontainer = conn.create_container('test')

for f in get_filenames():
    obj = testcontainer.create_object(f)
    obj.load_from_filename(f)

My problem is that I have a lot of small files to upload, and uploading them one at a time like this takes too long.

Buried in the documentation, I see there is a ConnectionPool class, which supposedly can be used to upload files in parallel.

Could someone please show how I can make this piece of code upload more than one file at a time?

Suppository answered 9/3, 2011 at 16:39 Comment(0)

The ConnectionPool class is meant for a multithreading application that occasionally has to send something to Rackspace.

That way you can reuse your connection, but you don't have to keep 100 connections open if you have 100 threads.
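
For reference, here is a minimal sketch of that pattern with threads. It assumes ConnectionPool is exposed at the package level and hands out connections via get()/put() (as the cloudfiles docs describe), and it reuses get_filenames() from the question:

import threading
import Queue  # Python 2 stdlib, matching the Python 2-era cloudfiles library

import cloudfiles

USERNAME = '---'
API_KEY = '---'

# Shared pool: threads borrow a connection, use it, and put it back,
# so idle threads do not hold connections open.
pool = cloudfiles.ConnectionPool(USERNAME, API_KEY)
filename_queue = Queue.Queue()

def uploader():
    # Keep going till you reach STOP
    for filename in iter(filename_queue.get, 'STOP'):
        conn = pool.get()                        # borrow a connection
        container = conn.create_container('test')
        obj = container.create_object(filename)
        obj.load_from_filename(filename)
        pool.put(conn)                           # return it for reuse

threads = [threading.Thread(target=uploader) for _ in range(10)]
for t in threads:
    t.start()
for name in get_filenames():     # get_filenames() as in the question
    filename_queue.put(name)
for _ in threads:
    filename_queue.put('STOP')   # one sentinel per thread
for t in threads:
    t.join()

As the comments below point out, fetching the container on every upload is wasteful; in practice you would cache it per worker.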

You are simply looking for a multithreading/multiprocessing uploader. Here's an example using the multiprocessing library:

import cloudfiles
import multiprocessing

USERNAME = '---'
API_KEY = '---'


def get_container():
    conn = cloudfiles.get_connection(USERNAME, API_KEY)
    testcontainer = conn.create_container('test')
    return testcontainer

def uploader(filenames):
    '''Worker process to upload the given files'''
    container = get_container()

    # Keep going till you reach STOP
    for filename in iter(filenames.get, 'STOP'):
        # Create the object and upload
        obj = container.create_object(filename)
        obj.load_from_filename(filename)

def main():
    NUMBER_OF_PROCESSES = 16

    # Add your filenames to this queue
    filenames = multiprocessing.Queue()

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        multiprocessing.Process(target=uploader, args=(filenames,)).start()

    # You can keep adding tasks until you add STOP
    filenames.put('some filename')

    # Stop all child processes
    for i in range(NUMBER_OF_PROCESSES):
        filenames.put('STOP')

if __name__ == '__main__':
    multiprocessing.freeze_support()
    main()
Sphagnum answered 12/3, 2011 at 13:59 Comment(12)
You don't need multiprocessing for IO-bound tasks if the cloudfiles API is thread-safe. pool = multiprocessing.Pool(); pool.map(upload_file, get_filenames()) seems like a simpler alternative if you decide to use multiprocessing (see the sketch after this comment thread).Voracity
@WoLpH: Thank you very much for your answer! When I try your code I run into a TypeError: 'Queue' object is not iterable. Is this a mistake I have made?Suppository
@J.F. Sebastian: As I understand it, the ConnectionPool class is supposed to be thread-safe. I just can't wrap my head around how to incorporate your code suggestions into the code.Suppository
@Hobhouse: that could be a problem on my end. Since I don't have a Rackspace account readily available, I was only able to do limited testing. I wrote this code partially based on the multiprocessing examples: docs.python.org/library/multiprocessing.html#examples I see that args was not a tuple; it should be args=(filenames,)Sphagnum
@J.F. Sebastian: wouldn't that mean that you are either using a single connection (network IO bound) or a connection per file? Without keeping the connection open for a worker it would be very inefficient. Using a single connection for all workers would also be very inefficient.Sphagnum
@WoLpH: You could use a connection per worker if you cache it for each worker: gist.github.com/… Or you could use ConnectionPool: gist.github.com/…Voracity
@J.F. Sebastian: ah yes, caching the connection for each worker is also an option. But personally I find my method cleaner since it can also work in a multithreading environment with a non-thread-safe connection. Your ConnectionPool example works, but it has to recreate the container for every iteration, which is also a waste of resources. Still, they are very nice alternatives; you should add them as an answer :)Sphagnum
@WoLpH: The access to the connection is serialized in both my examples. Your method works in a multithreading environment only if cloudfiles.get_connection() always returns a new connection, which it does (or a thread-local). The ConnectionPool example could use caching too: gist.github.com/… So there is no waste of resources, if that matters.Voracity
@J.F. Sebastian - both your code examples work great. I get 100 file uploads in 13-15 seconds with both of them (using 16 processes on a 4-core Mac). Does one of them have advantages over the other? @WoLpH: I still get TypeError: 'Queue' object is not iterable when I try to run your code, so I can't get it to run. It's perhaps a tiny error somewhere in my code or yours that I don't see.Suppository
@Hobhouse: the multiprocessing version is more resilient to innocent code changes, but it requires more memory (if the bottleneck is network latency and not network bandwidth or disk speed, you could improve performance by using a larger pool, which in the multiprocessing case means noticeably more memory). Make sure you use a comma here: args=(filenames,) and that you use iter(filenames.get, 'STOP') for iteration. Rename it to filename_queue if you already use the name filenames somewhere, to avoid accidental collisions.Voracity
@J.F. Sebastian: Thank you for code and debugging - I missed the comma in args=(filenames,).Suppository
@J.F. Sebastian: you are right, I did not make the assumption that cloudfiles.get_connection() is thread-safe. Great examples :)Sphagnum
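
For completeness, here is a minimal sketch of the multiprocessing.Pool approach suggested in the first comment. It assumes the connection can be created lazily once per worker process and cached in a module-level variable; the names upload_file and _container are illustrative, and get_filenames() is the question's helper:

import multiprocessing

import cloudfiles

USERNAME = '---'
API_KEY = '---'

_container = None  # cached per worker process after the first upload

def upload_file(filename):
    '''Upload one file, creating the connection lazily in this process'''
    global _container
    if _container is None:
        conn = cloudfiles.get_connection(USERNAME, API_KEY)
        _container = conn.create_container('test')
    obj = _container.create_object(filename)
    obj.load_from_filename(filename)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    pool.map(upload_file, get_filenames())  # get_filenames() as in the question
    pool.close()
    pool.join()

Pool.map only returns once every upload has finished, so no explicit STOP sentinels are needed with this variant.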
