How to read a CSV file from a stream and process each line as it is written?
U

3

45

I would like to read a CSV file from standard input and process each row as it arrives. My CSV writer outputs rows one by one, but my reader waits for the stream to be terminated before iterating over the rows. Is this a limitation of the csv module? Am I doing something wrong?

My reader code:

import csv
import sys
import time


reader = csv.reader(sys.stdin)
for row in reader:
    print "Read: (%s) %r" % (time.time(), row)

My writer code:

import csv
import sys
import time


writer = csv.writer(sys.stdout)
for i in range(8):
    writer.writerow(["R%d" % i, "$" * (i+1)])
    sys.stdout.flush()
    time.sleep(0.5)

Output of python test_writer.py | python test_reader.py:

Read: (1309597426.3) ['R0', '$']
Read: (1309597426.3) ['R1', '$$']
Read: (1309597426.3) ['R2', '$$$']
Read: (1309597426.3) ['R3', '$$$$']
Read: (1309597426.3) ['R4', '$$$$$']
Read: (1309597426.3) ['R5', '$$$$$$']
Read: (1309597426.3) ['R6', '$$$$$$$']
Read: (1309597426.3) ['R7', '$$$$$$$$']

As you can see, all the print statements are executed at the same time, but I expected a 500 ms gap between them.

Untread answered 2/7, 2011 at 9:8 Comment(1)
What happens if you only run python test_writer.py? – Truong
P
52

As it says in the documentation,

In order to make a for loop the most efficient way of looping over the lines of a file (a very common operation), the next() method uses a hidden read-ahead buffer.

And you can see by looking at the implementation of the csv module (line 784) that csv.reader calls the next() method of the underlying iterator (via PyIter_Next).

So if you really want unbuffered reading of CSV files, you need to convert the file object (here sys.stdin) into an iterator whose next() method actually calls readline() instead. This can easily be done using the two-argument form of the iter function. So change the code in test_reader.py to something like this:

for row in csv.reader(iter(sys.stdin.readline, '')):
    print("Read: ({}) {!r}".format(time.time(), row))

For example,

$ python test_writer.py | python test_reader.py
Read: (1388776652.964925) ['R0', '$']
Read: (1388776653.466134) ['R1', '$$']
Read: (1388776653.967327) ['R2', '$$$']
Read: (1388776654.468532) ['R3', '$$$$']
[etc]
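
The laziness of this approach can also be checked without a pipe. The following is a minimal sketch in Python 3 syntax, not part of the original test scripts: `CountingStream` and `stream_rows` are hypothetical names introduced for illustration, and an in-memory stream stands in for sys.stdin. It counts the readline() calls and suggests that, for simple rows without embedded newlines, csv.reader requests one line per row.

```python
import csv
import io


class CountingStream:
    """Hypothetical wrapper that counts readline() calls on a text stream."""

    def __init__(self, stream):
        self._stream = stream
        self.calls = 0

    def readline(self):
        self.calls += 1
        return self._stream.readline()


def stream_rows(stream):
    """Return an iterator of CSV rows that fetches lines via readline()."""
    # iter(callable, sentinel) calls readline() until it returns '' (EOF).
    return csv.reader(iter(stream.readline, ''))


# io.StringIO stands in for sys.stdin so the sketch runs on its own.
source = CountingStream(io.StringIO("R0,$\r\nR1,$$\r\nR2,$$$\r\n", newline=''))
rows = stream_rows(source)
first = next(rows)                # only one line has been requested so far
calls_after_first = source.calls
remaining = list(rows)            # drains the rest, plus one final EOF call
```

Here `calls_after_first` is 1, which is the property that matters when the writer is slow: each row becomes available as soon as its line has been read.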

Can you explain why you need unbuffered reading of CSV files? There might be a better solution to whatever it is you are trying to do.

Penrod answered 2/7, 2011 at 12:1 Comment(1)
Excellent answer, thank you. I needed this because processing the results as they arrive is faster for me. The first operation reads from the network and the second writes to disk, and both need certain CPU-intensive translations. I also needed them to be chainable (via pipes) so that I can reuse the scripts (à la Unix). – Untread
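
A chainable stage of the kind described in this comment might look like the sketch below. It is written in Python 3 syntax under stated assumptions: `filter_rows` and the `transform` callback are hypothetical names, and in-memory streams stand in for sys.stdin and sys.stdout. Each row is read through readline(), transformed, written, and flushed so the next stage in the pipe can see it immediately.

```python
import csv
import io


def filter_rows(src, dst, transform):
    """Read CSV rows from src as they arrive, transform them, write to dst."""
    writer = csv.writer(dst)
    # The two-argument iter() calls readline() until it returns '' (EOF).
    for row in csv.reader(iter(src.readline, '')):
        writer.writerow(transform(row))
        dst.flush()  # push each row downstream immediately


# In-memory stand-ins for sys.stdin and sys.stdout.
src = io.StringIO("R0,$\r\nR1,$$\r\n", newline='')
dst = io.StringIO(newline='')
# Example transform (an assumption): replace the second field by its length.
filter_rows(src, dst, lambda row: [row[0], len(row[1])])
result = dst.getvalue()
```

In a real pipeline the call would presumably be `filter_rows(sys.stdin, sys.stdout, transform)`, with one such script per stage.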
S
1

Maybe it's a limitation. See the documentation for the -u command-line option: http://docs.python.org/using/cmdline.html#cmdoption-unittest-discover-u

Note that there is internal buffering in file.readlines() and File Objects (for line in sys.stdin) which is not influenced by this option. To work around this, you will want to use file.readline() inside a while 1: loop.

I modified test_reader.py as follows:

import csv, sys, time

while True:
    line = sys.stdin.readline()
    if not line:  # readline() returns '' at end of file
        break
    print "Read: (%s) %r" % (time.time(), line)

Output

python test_writer.py | python  test_reader.py
Read: (1309600865.84) 'R0,$\r\n'
Read: (1309600865.84) 'R1,$$\r\n'
Read: (1309600866.34) 'R2,$$$\r\n'
Read: (1309600866.84) 'R3,$$$$\r\n'
Read: (1309600867.34) 'R4,$$$$$\r\n'
Read: (1309600867.84) 'R5,$$$$$$\r\n'
Read: (1309600868.34) 'R6,$$$$$$$\r\n'
Read: (1309600868.84) 'R7,$$$$$$$$\r\n'
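
To hand these lines to csv.reader, the same readline() loop can be wrapped in a generator. This is a minimal sketch in Python 3 syntax: `unbuffered_lines` is a hypothetical helper name, and an in-memory stream stands in for sys.stdin.

```python
import csv
import io


def unbuffered_lines(stream):
    """Yield lines one at a time using readline(), stopping at EOF."""
    while True:
        line = stream.readline()
        if not line:  # readline() returns '' at end of file
            break
        yield line


# io.StringIO stands in for sys.stdin so the sketch runs on its own.
fake_stdin = io.StringIO("R0,$\r\nR1,$$\r\n", newline='')
rows = list(csv.reader(unbuffered_lines(fake_stdin)))
```

csv.reader accepts any iterator that yields strings, so the generator can be passed in wherever the file object was used before.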
Stratagem answered 2/7, 2011 at 10:2 Comment(1)
You are right. But how do I get csv.reader to take advantage of this hack? – Untread
E
0

You are flushing stdout, but not stdin.

sys.stdin also has a flush() method; try calling it after each line is read if you really want to disable the buffering.

Effervesce answered 2/7, 2011 at 9:32 Comment(1)
It would be cool if the downvoter left a little explanation. I really would like to know why calling stdin.flush() doesn't help. – Untread
