Optimization: Dumping JSON from a Streaming API to Mongo

Background: I have a Python module set up to grab JSON objects from a streaming API and store them (bulk inserts of 25 at a time) in MongoDB using PyMongo. For comparison, I also have a bash command that curls the same streaming API and pipes it to mongoimport. Both approaches store data in separate collections.

Periodically, I monitor the count() of the collections to check how they fare.
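
A minimal sketch of that periodic check might look like the following; the host, database name and the name of the mongoimport-fed collection are placeholders, not the exact names I use.

import time

import pymongo

db = pymongo.Connection("localhost")["tweet_db"]  # host and db name are placeholders

while True:
    # Compare how far the pymongo-fed collection trails the mongoimport-fed one.
    print "pymongo: %d   mongoimport: %d" % (
        db["raw_tweets_gnip"].count(), db["raw_tweets_mongoimport"].count())
    time.sleep(60)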

So far, I see the Python module lagging about 1000 JSON objects behind the curl | mongoimport approach.

Problem: How can I optimize my Python module so that it stays roughly in sync with curl | mongoimport?

I cannot use tweetstream, since I am not using the Twitter API but a third-party streaming service.

Could someone please help me out here?

Python module:


import json
import cStringIO

import pycurl
import pymongo

# DB_HOST, DB_NAME, STREAM_URL and AUTH are module-level constants (not shown).


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            # pycurl invokes handle_data() with every chunk it reads off the stream.
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error occurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        # Accumulate parsed tweets and bulk-insert them in batches.
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0:
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

Thanks for reading.

Urology answered 1/6, 2012 at 18:26 Comment(6)
Do the documents you're inserting have an "_id" field?Kenya
What version of mongo and what version of pymongo are you using?Kenya
@AsyaKamsky Python 2.7, MongoDB 2.0.4 and PyMongo 2.2.Urology
what does your bash curl script/command look like?Kenya
I updated my answer - I think that's the definitive word :)Kenya
Sorry, I won't be able to post the exact curl command as it contains our credentials. But this issue is applicable to any streaming API. I have answered my own question in terms of what gave us the ultimate performance gain. Your updated answer comes a close second. Anyway, you get the bounty! Thanks again for taking a look at this. I will keep monitoring this thread periodically and, based on the most upvotes, I will accept the corresponding answer. For now, that's yours!Urology

Got rid of the StringIO library. Since the WRITEFUNCTION callback handle_data is, in this case, invoked for every line, I just load the JSON directly. Sometimes, however, data can contain two JSON objects. I'm sorry, I can't post the exact curl command I use as it contains our credentials. But, as I said, this is a general issue applicable to any streaming API.


def handle_data(self, buf):
    try:
        # Usual case: the callback receives exactly one JSON object.
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        # Occasionally buf carries more than one object; split on CRLF and parse each.
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))
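
For completeness, a self-contained sketch of this approach, including the batch flush to Mongo, could look like the following. The batch size, the blank-line guard and the class/constructor shape are illustrative assumptions, not exactly what I run.

import json

import pymongo

BATCH_SIZE = 1000  # assumed batch size, for illustration only

class DirectLoadReader(object):
    """Parse each pycurl chunk directly and flush batches to MongoDB."""

    def __init__(self, db_host, db_name):
        self.tweet_list = []
        self.raw_tweets = pymongo.Connection(db_host)[db_name]["raw_tweets_gnip"]

    def handle_data(self, buf):
        try:
            # Usual case: one JSON object per callback invocation.
            self.tweet_list.append(json.loads(buf))
        except ValueError:
            # The chunk held more than one object: split on CRLF, skip blanks.
            for line in buf.split('\r\n'):
                if line.strip():
                    self.tweet_list.append(json.loads(line))
        if len(self.tweet_list) >= BATCH_SIZE:
            self.raw_tweets.insert(self.tweet_list, safe=True)
            self.tweet_list = []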
Urology answered 10/6, 2012 at 15:50 Comment(0)

Originally there was a bug in your code.

                if self.chunk_count % 50 == 0:
                    self.raw_tweets.insert(self.tweet_list)
                    self.chunk_count = 0

You reset the chunk_count but you don't reset the tweet_list. So the second time through, you try to insert 100 items (50 new ones plus the 50 that were already sent to the DB the time before). You've fixed this, but still see a difference in performance.
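
For clarity, the corrected flush resets both the counter and the list, roughly:

                if self.chunk_count % 50 == 0:
                    self.raw_tweets.insert(self.tweet_list)
                    self.chunk_count = 0
                    self.tweet_list = []  # reset the batch too, so documents aren't re-inserted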

The whole batch-size thing turns out to be a red herring. I tried loading a large JSON file via Python vs. via mongoimport, and Python was always faster (even in safe mode - see below).
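
A sketch of that comparison, with the file name, database, collection and batch size as placeholder assumptions, looks roughly like this (the mongoimport side being: mongoimport -d test -c import_test large.json):

import json
import time

import pymongo

coll = pymongo.Connection("localhost")["test"]["pymongo_test"]

start = time.time()
batch = []
for line in open("large.json"):
    # mongoimport reads line-delimited JSON, so mirror that here.
    batch.append(json.loads(line))
    if len(batch) >= 1000:
        coll.insert(batch, safe=True)
        batch = []
if batch:
    coll.insert(batch, safe=True)
print "loaded %d docs in %.1fs" % (coll.count(), time.time() - start)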

Taking a closer look at your code, I realized the problem is that the streaming API is actually handing you data in chunks. You are expected to just take those chunks and put them into the database (that's what mongoimport does). The extra work your Python code does to split up the stream, add it to a list and then periodically send batches to Mongo is probably the difference between what I see and what you see.

Try this snippet for your handle_data()

from StringIO import StringIO  # or cStringIO; needed at module level

def handle_data(self, data):
    try:
        # Parse the chunk exactly as the streaming API delivered it.
        string_buffer = StringIO(data)
        tweets = json.load(string_buffer)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
        return  # nothing to insert if parsing failed
    try:
        # Hand the parsed chunk straight to MongoDB, much like mongoimport does.
        self.raw_tweets.insert(tweets)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)

One thing to note is that your Python inserts are not running in "safe mode" - you should change that by passing safe=True to your insert call. You will then get an exception on any insert that fails, and your try/except will print the error, exposing the problem.
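
For example, with the pymongo 2.x API, the insert in the snippet above with safe mode turned on would be roughly:

    try:
        # safe=True makes the driver wait for the server's acknowledgement,
        # so a failed insert raises an exception instead of failing silently.
        self.raw_tweets.insert(tweets, safe=True)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)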

It doesn't cost much in performance either - I'm currently running a test, and after about five minutes the sizes of the two collections are 14120 and 14113.

Kenya answered 2/6, 2012 at 20:45 Comment(6)
btw, I tried your code - with the fix, Python is about twice as fast inserting data as mongoimport. That's because by default "safe" inserts are off. By turning safe writes on (passing safe=True to insert) Python inserts were still about 75% of the mongoimport times.Kenya
Thanks for pointing those out! I made the required changes (also updated code above): Added "self.tweet_list = []" after self.chunk_count = 0 and increased the batch size to 1000. It still seems to be lagging - python module count is at 5000 while the curl mongoimport combo is at 5718. (It was 4000 : 5662). Any insights?Urology
Since you are only inserting 1,000 at a time, you would always see a multiple of 1,000 - it seems like its not actually behind...Kenya
Yes, but 4000 : 5662 means there's still a lag of 600 at the minimum, right? Could any optimizations be done at these two places: self.string_buffer = cStringIO.StringIO(data) and for line in self.string_buffer:?Urology
I did my tests without curl or pycurl - I just dumped a large .json file and loaded it with mongoimport and with pymongo. That's why I suspect your problem isn't that you're not inserting fast enough; maybe it's that you're not getting the data (via pycurl) fast enough.Kenya
Accepted. This should help me troubleshoot the problem further. As a favor, could you neutralize this question by upvoting it? Someone gave it a -1 with no reason, giving this question a bad reputation :( P.S. Now we have the same score! =DUrology