Efficiently reading lines from compressed, chunked HTTP stream as they arrive

I've written an HTTP server that produces endless HTTP streams of JSON-structured events, similar to Twitter's streaming API. The events are separated by \n (following Server-Sent Events with Content-Type: text/event-stream) and can vary in length (a sketch of what the decoded stream looks like follows the list below).

The response is

  • chunked (HTTP/1.1 Transfer-Encoding: chunked) due to the endless stream
  • compressed (Content-Encoding: gzip) to save bandwidth.
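
For illustration, a hypothetical slice of the stream, after de-chunking and decompression, might look like this (the event fields are made up; only the \n separation matters):

{"id": 1, "user": "alice", "action": "login"}
{"id": 2, "user": "bob", "action": "logout"}
{"id": 3, "user": "alice", "action": "message", "text": "hello"}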

I want to consume these lines in Python as soon as they arrive, as resource-efficiently as possible and without reinventing the wheel.

As I'm currently using python-requests: do you know how to make this work with it? If you think python-requests can't help here, I'm completely open to alternative frameworks/libraries.

My current implementation is based on requests and uses iter_lines(...) to receive the lines. But the chunk_size parameter is tricky. If set to 1, it is very CPU-intensive, since some events can be several kilobytes. If set to any value above 1, some events get stuck until the next ones arrive and the whole buffer "gets filled". And the time between events can be several seconds. I expected chunk_size to be a sort of "maximum number of bytes to receive", as in Unix's recv(...). The corresponding man page says:

The receive calls normally return any data available, up to the requested amount, rather than waiting for receipt of the full amount requested.

But this is obviously not how it works in the requests library; it is used more or less as an "exact number of bytes to receive". While looking at the source code, I couldn't identify which part is responsible for that. Maybe httplib's Response or ssl's SSLSocket.

As a workaround, I tried padding my lines on the server to a multiple of the chunk size. But the chunk size in the requests library is used to fetch bytes from the compressed response stream, so this won't work unless I pad my lines such that their compressed byte sequence is a multiple of the chunk size. That seems far too hacky.

I've read that Twisted could be used for non-blocking, non-buffered processing of HTTP streams on the client, but so far I've only found code for creating streaming responses on the server.

Bacciferous answered 15/2, 2014 at 12:49 Comment(5)
Do you know a good framework/library for that task? Library requests are off-topic, I'm afraid.Wouldst
Sorry about the library request. As written, I'm currently using python-requests and would love to keep using it. So my question is mainly on how to do stuff with python-requests. But: If there is no way, I'm totally fine with using another library.Bacciferous
Looking for lines is possible. The underlying compression (zlib) supports flushing (Z_SYNC_FLUSH), which is used when I flush a response in Tornado on a gzipped response stream. So the HTTP stream is perfectly fine, chunked into clean pieces with complete compressed lines in them. Just reading them back in Python is difficult.Bacciferous
My analysis was incorrect; I'm writing up more info below. This is not a limitation that requests can work around.Wouldst
I once did something like that by basically ripping out all of the socket handling (ESPECIALLY socket.makefile) to get the buffer handling right and use select() to incrementally read stuff from the socket. Worked fine but was a major PITA to get right. Basically, once you get to the response, extract the socket from the response and handle stuff yourself. This brought CPU usage down from 90% to 2% and increased throughput by quite a bit. (but that was with ancient Python 2.2, 2.7+ httplib does way better with chunked encoding).Splore

Thanks to Martijn Pieters' answer I stopped working around python-requests' behavior and looked for a completely different approach.

I ended up using pyCurl. You can use it much like a select+recv loop, without inverting the control flow and without giving up control to a dedicated IO loop as in Tornado etc. This way it is easy to use a generator that yields new lines as soon as they arrive, without further buffering in intermediate layers that could introduce delay, and without additional threads running an IO loop.

At the same time it is high-level enough that you don't need to bother with chunked transfer encoding, SSL encryption or gzip compression.

This was my old code, where chunk_size=1 resulted in 45% CPU load and chunk_size>1 introduced additional lag:

import requests
class RequestsHTTPStream(object):
    def __init__(self, url):
        self.url = url

    def iter_lines(self):
        headers = {'Cache-Control':'no-cache',
                   'Accept': 'text/event-stream',
                   'Accept-Encoding': 'gzip'}
        response = requests.get(self.url, stream=True, headers=headers)
        return response.iter_lines(chunk_size=1)  # chunk_size=1 avoids buffering lag but is very CPU-intensive

Here is my new code, based on pyCurl. (Unfortunately the curl_easy_*-style perform() blocks completely, which makes it difficult to yield lines in between without using threads, so I'm using the curl_multi_* methods.)

import pycurl
import urllib2
import httplib
import StringIO

class CurlHTTPStream(object):
    def __init__(self, url):
        self.url = url
        self.received_buffer = StringIO.StringIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        # libcurl passes each piece of (already decompressed) body data to this callback during perform()
        self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        self.status_code = 0

    SELECT_TIMEOUT = 10

    def _any_data_received(self):
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != httplib.OK:
            raise urllib2.HTTPError(self.url, self.status_code, None, None, None)

    def _perform_on_curl(self):
        # Let libcurl do as much transfer work as it can without blocking;
        # E_CALL_MULTI_PERFORM means "call perform() again right away".
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                break
            # Wait until there is activity on the connection (or the timeout expires) before performing again
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        #same behaviour as requests' Response.iter_lines(...)

        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            # If the chunk did not end with a line terminator, the last "line" is incomplete:
            # keep it pending and prepend it to the next chunk.
            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        if pending is not None:
            yield pending

This code tries to fetch as many bytes as possible from the incoming stream, without blocking unnecessarily if there are only a few. In comparison, the CPU load is around 0.2%.
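
For completeness, here is a minimal usage sketch (the URL and the JSON fields are hypothetical, and error handling is omitted):

import json

stream = CurlHTTPStream('http://localhost:8080/events')
for line in stream.iter_lines():
    if not line.strip():
        continue                  # skip keep-alive / empty lines
    event = json.loads(line)      # each non-empty line is one JSON event
    print(event)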

Bacciferous answered 16/2, 2014 at 10:20 Comment(3)
Won't this self.received_buffer.truncate(0) lead to data loss? I mean data that gets written to self.received_buffer by the callback (self.received_buffer.write) but has not yet been read via self.received_buffer.getvalue()?Male
The write does not happen asynchronously; the WRITEFUNCTION is only called inside perform(). select() tells you when to call perform() to read/write data, so you don't poll unnecessarily.Bacciferous
Nice work! Is there any way you could add an example of how you used it? Showing how to use it with progressbar.ProgressBar would be greatly appreciated as well.Kirtley

It is not requests' fault that your iter_lines() calls are blocking.

The Response.iter_lines() method calls Response.iter_content(), which calls urllib3's HTTPResponse.stream(), which calls HTTPResponse.read().

These calls pass along a chunk size, which is what is passed on to the socket as self._fp.read(amt). This is the problematic call, because self._fp is a file object produced by socket.makefile() (as done by the httplib module), and this .read() call will block until amt (compressed) bytes have been read.
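
The difference in semantics the question stumbled over can be illustrated with a tiny self-contained example (a local socket pair stands in for the HTTP connection here, so it needs a Unix-like platform on older Pythons; nothing below is specific to requests or urllib3):

import socket

# recv(n) returns whatever is currently available, up to n bytes.
a, b = socket.socketpair()
a.sendall(b'hello')
print(b.recv(1024))          # prints the 5 available bytes; does not wait for 1024

# A read(n) on the file object from socket.makefile(), as used by httplib,
# keeps reading until n bytes have arrived or the peer closes the connection.
a.sendall(b'world')
f = b.makefile('rb')
a.close()                    # without this close, the read below would block waiting for 1024 bytes
print(f.read(1024))          # returns b'world' only because EOF was reached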

This low-level socket file object does support a .readline() call that will work more efficiently, but urllib3 cannot make use of this call when handling compressed data; line terminators are not going to be visible in the compressed stream.
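
A quick way to see why readline() cannot help on the compressed stream (plain zlib below, standing in for the gzip-encoded response body):

import zlib

payload = b'{"event": 1}\n{"event": 2}\n'
compressed = zlib.compress(payload)

print(b'\n' in payload)                         # True: the separators are visible in the plain stream
# In the compressed bytes the b'\n' separators are gone; any 0x0A byte that
# happens to occur there is just compressed data, not a usable line boundary.
print(zlib.decompress(compressed) == payload)   # True: the lines only reappear after decompressing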

Unfortunately, urllib3 won't call self._fp.readline() even when the response isn't compressed; the way the calls are structured, it'd be hard to pass along that you want to read in line-buffered mode instead of chunk-buffered mode.

I must say that HTTP is not the best protocol to use for streaming events; I'd use a different protocol for this. WebSockets spring to mind, or a custom protocol for your specific use case.

Wouldst answered 15/2, 2014 at 13:26 Comment(5)
Thanks for pointing out that the "issue" is deep down in the HTTP stack used by requests!Bacciferous
@ThomasB.: Yeah, and there are no libraries that I know of that'll solve this in a clean way either. The compression isn't helping here; you would have to use urllib2, then access the raw socket from the response object, do non-blocking reads and do your own decompressing. Not pretty.Wouldst
I've chosen SSE after reading SSE vs WebSockets. I only need downstream; the JS client is super simple, the server is super simple, and reverse proxying, encryption and compression just work fine over HTTP.Bacciferous
I've switched to PyCurl. It transparently handles chunking, compression, ssl verification, ... and is much more responsive (regarding buffer-laziness). I'll update my question to include my findings.Bacciferous
@ThomasB.: I'd make that a self-answer instead.Wouldst
