Understanding Python HTTP streaming
Asked Answered
D

4

9

I'm struggling to access a streaming API using Python and Requests.

What the API says: "We’ve enabled a streaming endpoint to for requesting both quote and trade data utilizing a persistent HTTP socket connection. Streaming data from the API consists of making an Authenticated HTTP request and leaving the HTTP socket open to continually receive data."

How I've been trying to access the data:

s = requests.Session()
def streaming(symbols):
    url = 'https://stream.tradeking.com/v1/market/quotes.json'
    payload = {'symbols': ','.join(symbols)}
    return s.get(url, params=payload, stream=True)  
r = streaming(['AAPL', 'GOOG'])

The Requests docs here show two things of interest: Use a generator/iterator for use with chunked data, passed in the data field. For streaming data, it suggests using code such as:

for line in r.iter_lines():
    print(line)

Neither seem to work, although I've no idea what to put in the generator function, since the example is unclear. Using r.iter_lines(), I get the output: "b'{"status":"connected"}{"status":disconnected"}'"

I can access the headers, and the response is HTTP 200, but can't get valid data, or find clear examples on how to access streaming HTTP data in python. Any help would be appreciated. The API recommends using Jetty for Java to keep the stream open, but I'm not sure how to do this in Python.

Headers: {'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}

Diplosis answered 23/7, 2013 at 22:49 Comment(4)
Putting your url into my browser returns an "Invalid authorization header" message. Do you need to authenticate? And/or, are you reading the result in JSON properly?Excursus
@Excursus if OP sees 200 OK then obviously he has auth done.Bixby
The API does indeed require auth. I left out the auth lines for simplicity. I'd post the keys, but they're linked to my trading acct. For the non-streaming requests, I use Request's .json(). I may need to do that here too - not sure.Diplosis
Missed the 200 response initially, my bad. Just also wanted to say that a nice habit for api keys is to read them in from a file via a separate function. That way you can copy/paste code snippets without worrying, and it's easy to include that file in .gitignore if you're using git.Excursus
S
3

Not sure if you figured this out, but TradeKing doesn't put newlines in between their JSON blobs. You thus have to use iter_content to get it byte by byte, append that byte to a buffer, try to decode the buffer, on success clear the buffer and yield the resultant object. :(

Scarce answered 20/2, 2014 at 18:48 Comment(2)
Glad that worked out for you! What I've moved to doing is using NodeJS to listen to the stream and yield out objects over a ZeroMQ interface to python listeners, who then do magic with it. Python just gets sluggish when dealing with large quantities of symbols.Scarce
krillr, can you please post a URL pointing to any helpful info related to your solution with Node and ZeroMQ. Much appreciatedLorou
T
12

As verbsintransit has stated, you need to solve your authentication problems, your streaming problems however can be fixed by using this example:

s = requests.Session()

def streaming(symbols):
    payload = {'symbols': ','.join(symbols)}
    headers = {'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}
    req = requests.Request("GET",'https://stream.tradeking.com/v1/market/quotes.json',
                           headers=headers,
                           params=payload).prepare()

    resp = s.send(req, stream=True)

    for line in resp.iter_lines():
        if line:
            yield line


def read_stream():

    for line in streaming(['AAPL', 'GOOG']):
        print line


read_stream()

The if line: condition is checking if the line is an actual message or just a connection keep-alive.

Trona answered 24/7, 2013 at 0:17 Comment(4)
Thanks a lot! I think I may need to wait until tmw when the stock market's open to fully test it. I'll study/try to understand it in the meanwhile, and will post what happens tmw.Diplosis
The key bit of this example is the stream=True in the send(). If you don't set that, Requests attempts to download the entire body. The documentation does show that keyword in use.Huambo
error i got is as follows: ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response',))Meadow
I would really like to make this approach work. I experience error and not clear how to go about debugging: urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer'))Lorou
S
3

Not sure if you figured this out, but TradeKing doesn't put newlines in between their JSON blobs. You thus have to use iter_content to get it byte by byte, append that byte to a buffer, try to decode the buffer, on success clear the buffer and yield the resultant object. :(

Scarce answered 20/2, 2014 at 18:48 Comment(2)
Glad that worked out for you! What I've moved to doing is using NodeJS to listen to the stream and yield out objects over a ZeroMQ interface to python listeners, who then do magic with it. Python just gets sluggish when dealing with large quantities of symbols.Scarce
krillr, can you please post a URL pointing to any helpful info related to your solution with Node and ZeroMQ. Much appreciatedLorou
D
2
import requests
from requests_oauthlib import OAuth1


def streaming(symbols):
    consumer_key     = '***'
    consumer_secret  = '***'
    access_token     = '***'
    access_secret    = '***'

    auth = OAuth1(consumer_key,
        client_secret = consumer_secret,
        resource_owner_key = access_token,
        resource_owner_secret = access_secret)
        
    payload = {'symbols': ','.join(symbols)}
    resp = requests.Session().request("GET",'https://stream.tradeking.com/v1/market/quotes.json',stream=True,auth=auth,params=payload)
    # resp.raise_for_status()
    
    for chunk in resp.iter_content(chunk_size=1):
        if chunk:
            yield chunk.decode('utf8')

#try this
for line in streaming(['AAPL', 'GOOG']):
    print(line)
Dilute answered 13/10, 2020 at 17:50 Comment(1)
Welcome to StackOverflow. While this code may solve the question, including an explanation of how and why this solves the problem would really help to improve the quality of your post, and probably result in more up-votes. Remember that you are answering the question for readers in the future, not just the person asking now. Please edit your answer to add explanations and give an indication of what limitations and assumptions apply.Troxell
L
0

This worked for me:

  import requests

  headers = {'Content-Type': 'application/json'}
  response = requests.post(URL, data=body, headers=headers, stream=True)
  if response.status_code == 200:
      for chunk in response.iter_content(chunk_size=1024):
          if chunk:
             print("DO something")
Lignify answered 5/3 at 13:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.