Stream bytes chunks to CSV rows in Python

I need to process a large remote CSV line by line without downloading it entirely.

Below is the closest I have got. I iterate over byte chunks from Azure and have some code to handle truncated lines. But this cannot work if CSV values contain a newline, as I am not able to distinguish between newlines inside values and newlines that end a CSV record.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    # https://learn.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.storagestreamdownloader?view=azure-python
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()

    truncated_line = ''
    for chunk in file_handle.chunks():
        # prepend the previous truncated line to the next chunk
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        # yield every complete line, keep the last (possibly truncated) one
        for line in lines[:-1]:
            yield line
        truncated_line = lines[-1]

    # yield whatever remains after the last chunk
    if truncated_line:
        yield truncated_line

Ideally I would use csv.DictReader(), but I was not able to do so, as it downloads the file entirely.

import io
import csv

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer) # THIS DOWNLOADS THE FILE ENTIRELY
    # note: csv.DictReader also expects text lines, so this bytes buffer would additionally need decoding
    csvreader = csv.DictReader(buffer, delimiter=";")
    return csvreader

Here is an update using some hints from @H.Leger.

Please note that this still does not work

import codecs
import csv

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?
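
To see why this fails: csv.DictReader treats every item it pulls from the iterator as one line of input, but codecs.iterdecode yields arbitrarily sized decoded chunks, not lines, so the parser finds raw newlines in the middle of what it believes is a single unquoted row. A self-contained reproduction of the same error (made-up data, no Azure involved):

import csv

# each "line" handed to the reader actually spans several records
chunks = iter(['id;comment\n1;"multi\nli', 'ne value"\n2;plain\n'])
reader = csv.DictReader(chunks, delimiter=';')
for row in reader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?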

EDIT: Final solution based on @paiv's answer

EDIT: Updated solution to use io instead of codecs for faster parsing

import io
import csv
import ctypes as ct

# bytes chunk iterator to python stream adapter 
# https://mcmap.net/q/1194864/-stream-bytes-chunks-to-csv-rows-in-python

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False
    
    def readable(self):
        return True
        
    def writable(self):
        return False
    
    def seekable(self):
        return False
        
    def close(self):
        self.closed = True
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res



# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()
# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())
# decode bytes stream to utf-8
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='') 

# update csv field limit to handle large fields
# https://mcmap.net/q/21494/-_csv-error-field-larger-than-field-limit-131072
csv.field_size_limit(int(ct.c_ulong(-1).value // 2)) 

csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    print(row)
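
For reference, the same adapter can be sanity-checked locally over any iterator of byte chunks (made-up sample data, no Azure involved); the quoted multiline value survives the chunk boundaries:

# quick local check of the adapter + TextIOWrapper + DictReader pipeline
sample = b'id;comment\n1;"multi\nline value"\n2;plain\n'
chunks = iter([sample[:10], sample[10:25], sample[25:]])
text_stream = io.TextIOWrapper(ChunksAdapter(chunks), encoding='utf-8', newline='')
for row in csv.DictReader(text_stream, delimiter=';'):
    print(row)
# -> {'id': '1', 'comment': 'multi\nline value'}
# -> {'id': '2', 'comment': 'plain'}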
Dedicated answered 12/5, 2021 at 16:2. Comments (14):
Have you tried to use pandas: kite.com/python/answers/…? We can define how many lines we read at one time.Rank
@Jim Xu I couldn't find a way to use this without downloading the entire file firstLightsome
Hi, pandas.read_csv supports HTTP URLs. I think you can try it with a file URL that contains an access token.Rank
@Jim Xu I don't know if I can use a URL for this. I use the Python Azure Data Lake Storage client with cert auth.Lightsome
@ClémentPrévost Can you provide a small sample of the CSV you're trying to parse?Outlander
It seems that .chunks() will issue a separate HTTP request for each chunk. If this is acceptable to you, you can adapt the .chunks() interface into a read-only file object (docs.python.org/3/glossary.html#term-file-object) and stream from it through a UTF-8 codec into the csv reader.Darkroom
@Outlander It's a standard CSV with ; as separator and " as quote char. Values are only quoted if they are multiline or contain a separator.Lightsome
@Darkroom I'm not sure how I could do that :sLightsome
When a file is large, chunking is the way to go. Usually, when CSV values contain newline characters, the start and end of that particular value are delimited by quotes ". Based on that, you can write an iterator that parses character by character and yields only when an actual CSV line break is encountered (a rough sketch of this idea follows these comments). Wikipedia: fields with embedded line breaks must be quoted.Barre
@Rishabh Chauhan That's an idea, but that means reimplementing a CSV parser, which is not trivial. Also I have to handle quoted quotes "" inside a value. Isn't there a simpler and more standard way?Lightsome
@ClémentPrévost How large is the CSV?Barre
@ClémentPrévost This is the closest I have found to a streaming CSV parser in Python. github.com/frictionlessdata/tabulator-py#working-with-streamBarre
@risahbhc32 Too large to download locally, hundreds of GB.Lightsome
Updated the question with a new idea; it does not work either, but for a different reason.Lightsome
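
For completeness, a rough sketch of the character-scanning approach @Rishabh Chauhan describes above. It only tracks quote state (a doubled quotechar toggles the flag twice, so escaped quotes cancel out) and is not a full CSV parser; the accepted answer below avoids hand-rolling this entirely:

def iter_csv_records(chunks, quotechar='"'):
    # yield one logical CSV record at a time from an iterator of text chunks;
    # a newline only ends a record when we are outside a quoted field
    record = []
    in_quotes = False
    for chunk in chunks:
        for ch in chunk:
            if ch == quotechar:
                in_quotes = not in_quotes
            if ch == '\n' and not in_quotes:
                yield ''.join(record)
                record = []
            else:
                record.append(ch)
    if record:
        yield ''.join(record)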

Disclaimer: I know little about Azure specifics. Ultimately, you would want to stream separate chunks too.

In Python, given a file object, you can set up CSV streaming this way:

import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)

Now you can iterate over csvreader, and it will read from file_object in a streaming fashion.

Edit: as @Martijn Pieters suggested, we can gain performance with TextIOWrapper instead of codecs:

text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

Check the note in the csv module documentation on the newline parameter: with newline='', newlines embedded in quoted fields are passed through to the csv reader instead of being translated by the text layer.

But Azure's StorageStreamDownloader does not provide Python's file object interface. It has a .chunks() generator (which, I assume, will issue a separate HTTP request to retrieve each chunk).

You can adapt .chunks() into a file object with a simple adapter:

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res

And use it like this:

downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())

Be sure to configure DictReader for the appropriate CSV dialect.

And set appropriate values for max_single_get_size, max_chunk_get_size on the blob client.
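
For example (a sketch; it assumes, as with the blob clients, that these two sizes are accepted as constructor keyword arguments, and the account URL and credential below are placeholders):

from azure.storage.filedatalake import DataLakeServiceClient

# max_single_get_size controls how much data the initial GET returns;
# max_chunk_get_size controls the size of each subsequent range GET,
# and therefore roughly the size of each .chunks() item
service = DataLakeServiceClient(
    account_url="https://<account>.dfs.core.windows.net",
    credential=credential,
    max_single_get_size=4 * 1024 * 1024,
    max_chunk_get_size=4 * 1024 * 1024,
)
client = service.get_file_system_client("<filesystem>")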

Darkroom answered 15/5, 2021 at 14:8. Comments (3):
Why use codecs at all? io.TextIOWrapper() would be more robust.Poisonous
Hello, I just tested the solution with io.TextIOWrapper() and the ChunksAdapter is missing some methods needed to make it work straight away: AttributeError: 'ChunksAdapter' object has no attribute 'readable'.Lightsome
OK, I added the necessary methods to support io.TextIOWrapper, and it's way faster.Lightsome

I believe the requests package can be useful for you. Using the stream option while getting your file and the Response.iter_lines() function should do what you need:

import codecs
import csv
import requests

url = "https://navitia.opendatasoft.com//explore/dataset/all-datasets/download?format=csv"
r = requests.get(url, stream=True)  # using the stream option to avoid loading everything

try:
    buffer = r.iter_lines()  # iter_lines() will feed you the remote file line by line
    reader = csv.DictReader(codecs.iterdecode(buffer, 'utf-8'), delimiter=';')
    for row in reader:
        print(row)  # Do stuff here
finally:
    r.close()
Intersidereal answered 17/5, 2021 at 11:17. Comments (3):
Thank you for your answer. I have two concerns here: I only have access to the Azure lib, which feeds me a bytes chunk iterator, so I cannot use requests. Even if that were possible, do iter_lines and DictReader understand newlines inside column values?Lightsome
Oops, I didn't realize Azure was so special. For your second point, there should be no issue with newlines inside string columns if the file is properly formatted.Intersidereal
I updated the question to use codecs.iterdecode and I have a newline-related error.Lightsome
