Does anybody have experience with, or pointers on, testing a Flask content-streaming resource? My application uses Redis Pub/Sub: when a message arrives on a channel, it streams the text 'data: {"value":42}\n\n'.
The implementation follows the Flask docs at: docs
and my unit tests follow the Flask docs too. The messages are published to Redis Pub/Sub by a second resource (POST).
I create a thread that listens to the stream while the main thread POSTs to the resource that publishes to Redis.
Although the connection assertions pass (I receive 200 OK and the mimetype 'text/event-stream'), the data object is empty.
My unit test looks like this:
    def test_04_receiving_events(self):
        headers = [('Content-Type', 'application/json')]
        data = json.dumps({"value": 42})
        headers.append(('Content-Length', len(data)))

        def get_stream():
            rv_stream = self.app.get('stream/data')
            rv_stream_object = json.loads(rv_stream.data)  # error (empty)
            self.assertEqual(rv_stream.status_code, 200)
            self.assertEqual(rv_stream.mimetype, 'text/event-stream')
            self.assertEqual(rv_stream_object, "data: {'value': 42}")
            t.stop()

        threads = []
        t = Thread(target=get_stream)
        threads.append(t)
        t.start()
        time.sleep(1)
        rv_post = self.app.post('/sensor', headers=headers, data=data)
        threads_done = False
        while not threads_done:
            threads_done = True
            for t in threads:
                if t.is_alive():
                    threads_done = False
                    time.sleep(1)
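A side note on the assertions in the test above: even once data arrives, the streamed text starts with the prefix "data: ", so passing it straight to json.loads would fail, and the decoded value would be a dict rather than the string "data: {'value': 42}". Below is a minimal sketch of how a single frame could be decoded before comparing. The helper name parse_sse_data is hypothetical (it is not part of Flask or of my application), and it assumes each frame carries only "data:" lines.

```python
import json


def parse_sse_data(frame):
    """Decode the JSON payload of one 'data:' frame (hypothetical helper)."""
    # The test client hands back bytes, so accept both bytes and str.
    if isinstance(frame, bytes):
        frame = frame.decode('utf-8')
    # Keep only the 'data:' lines and strip the field name and padding.
    payloads = [line[len('data:'):].strip()
                for line in frame.splitlines()
                if line.startswith('data:')]
    return json.loads('\n'.join(payloads))


print(parse_sse_data(b'data: {"value":42}\n\n'))  # {'value': 42}
```

With something like this, the final assertion could compare against {"value": 42} instead of a string.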
The app resource is:
    @app.route('/stream/data')
    def stream():
        def generate():
            pubsub = db.pubsub()
            pubsub.subscribe('interesting')
            for event in pubsub.listen():
                if event['type'] == 'message':
                    yield 'data: {"value":%s}\n\n' % event['data']
        return Response(stream_with_context(generate()),
                        direct_passthrough=True,
                        mimetype='text/event-stream')
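To make the goal concrete, here is a rough sketch of the kind of test I am aiming for. It is not my real code: Redis is replaced by a hypothetical in-memory generator (fake_events), direct_passthrough is left out for simplicity, and read_first_frame is a made-up helper. The idea, as far as I understand it, is that the stream never ends, so reading the whole body can never complete, whereas pulling a single chunk from the response iterator with the test client's buffered=False option should return as soon as one frame is available.

```python
import json

from flask import Flask, Response, stream_with_context

app = Flask(__name__)


def fake_events():
    # Hypothetical stand-in for pubsub.listen(): like the real one, it never ends.
    while True:
        yield {'type': 'message', 'data': 42}


@app.route('/stream/data')
def stream():
    def generate():
        for event in fake_events():
            if event['type'] == 'message':
                yield 'data: {"value":%s}\n\n' % event['data']
    return Response(stream_with_context(generate()),
                    mimetype='text/event-stream')


def read_first_frame(client, path):
    # buffered=False stops the test client from draining the endless stream.
    rv = client.get(path, buffered=False)
    try:
        # Pull exactly one chunk instead of touching rv.data.
        first = next(rv.iter_encoded())
    finally:
        rv.close()  # closes the underlying generator
    return rv.status_code, rv.mimetype, first.decode('utf-8')


status, mimetype, frame = read_first_frame(app.test_client(), '/stream/data')
payload = json.loads(frame[len('data: '):])
print(status, mimetype, payload)
```

In the real test the frame would only appear after the POST to /sensor, so the read would still have to run in a separate thread or be given a timeout.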
Any pointers or examples of how to test a content stream in Flask? Google doesn't help much on this one, unless I'm searching with the wrong keywords.
Thanks in advance.