How can you process a CSV from Azure Blob Storage as a stream in Python

It is simple to get a StorageStreamDownloader using the azure.storage.blob package:

from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient.from_connection_string("my azure connection string")
container_client = blob_service_client.get_container_client("my azure container name")
blob_client = container_client.get_blob_client("my azure file name")
storage_stream_downloader = blob_client.download_blob()

and it is simple to process a file-like object (or, more specifically, I think, any string-returning iterator, or the file path of the object) with the csv package:

import csv
from io import StringIO
 
csv_string = """col1, col2
a,b
c,d"""
with StringIO(csv_string) as csv_file:
  for row in csv.reader(csv_file):
    print(row) # or rather whatever I actually want to do on a row by row basis, e.g. ascertain that the file contains a row that meets a certain condition

What I'm struggling with is getting the streaming data from my StorageStreamDownloader into csv.reader() in such a way that I can process each line as it arrives rather than waiting for the whole file to download.

The Microsoft docs strike me as a little underwritten by their standards (the chunks() method has no annotation?) but I see there is a readinto() method for reading into a stream. I have tried reading into a BytesIO stream but cannot work out how to get the data out into csv.reader() without just outputting the buffer to a new file and reading that file. This all strikes me as a thing that should be doable but I'm probably missing something obvious conceptually, perhaps to do with itertools or asyncio, or perhaps I'm just using the wrong csv tool for my needs?
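
Concretely, my failed attempt looks something like this (the variable names are just for illustration):

import io

buf = io.BytesIO()
storage_stream_downloader.readinto(buf)
# buf now holds the raw bytes, but csv.reader wants text lines, and I
# can't see how to bridge the two without dumping buf to a new file
# and reading that back in.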

Volley answered 4/2, 2021 at 13:10 Comment(2)
You can use pandas to read a CSV file with BytesIO.Alkalimeter
That's really helpful, thanks Jim. I was trying to stick to the specialised csv library as I'm not doing any actual data-point analysis, but if pandas handles it then I'll give that a go.Volley

Based on a comment by Jim Xu:

import io
import pandas as pd

stream = blob_client.download_blob()
with io.BytesIO() as buf:
    stream.readinto(buf)

    # Rewind the buffer; otherwise pandas won't read from the start
    buf.seek(0)

    data = pd.read_csv(buf)

or

csv_content = blob_client.download_blob().readall()
data = pd.read_csv(io.BytesIO(csv_content))
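
Note that both variants pull the whole blob into memory before pandas parses it: readall() returns the entire payload as bytes, and readinto() drains the stream into the buffer. They solve the parsing problem, but neither is row-by-row streaming; for that, see the chunks()-based approach in the next answer.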
Atchley answered 12/4, 2021 at 14:34 Comment(0)

If you want to read a CSV file one row at a time, you can use the method pd.read_csv(filename, chunksize=1). For more details, please refer to here and here.

For example (I use pandas 1.2.1):

# content is a file path or file-like object holding the CSV data
with pd.read_csv(content, chunksize=1) as reader:
    for chunk in reader:
        print(chunk)  # each chunk is a one-row DataFrame
        print('---------------')
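
To connect this to the blob client from the question, content can be a BytesIO buffer filled from the downloader, as in the accepted answer above (a minimal sketch; the variable names are just illustrative):

import io
import pandas as pd

stream = blob_client.download_blob()
with io.BytesIO() as buf:
    stream.readinto(buf)
    buf.seek(0)  # rewind so pandas reads from the start
    with pd.read_csv(buf, chunksize=1) as reader:
        for chunk in reader:
            print(chunk)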


Besides, if you want to use the method chunks(), you need to set max_chunk_get_size and max_single_get_size to the same value when you create the BlobClient. For more details, please refer to here and here.

For example:

from azure.storage.blob import BlobClient

key = '<account_key>'

blob_client = BlobClient(account_url='https://andyprivate.blob.core.windows.net',
                         container_name='input',
                         blob_name='cities.csv',
                         credential=key,
                         max_chunk_get_size=1024,
                         max_single_get_size=1024)
stream = blob_client.download_blob()

for chunk in stream.chunks():
    print(len(chunk))

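Each chunk here is raw bytes, so to get back to csv.reader you still have to decode and re-assemble lines yourself. One possible bridge is a small generator (a sketch, not part of the SDK; it assumes UTF-8 data with no newlines inside quoted fields):

import codecs
import csv

def iter_lines(downloader, encoding='utf-8'):
    # Illustrative helper: decode the byte chunks incrementally and
    # yield complete text lines as they arrive.
    decoder = codecs.getincrementaldecoder(encoding)()
    pending = ''
    for chunk in downloader.chunks():
        pending += decoder.decode(chunk)
        lines = pending.split('\n')
        pending = lines.pop()  # keep any partial trailing line
        for line in lines:
            yield line
    pending += decoder.decode(b'', final=True)
    if pending:
        yield pending

stream = blob_client.download_blob()
for row in csv.reader(iter_lines(stream)):
    print(row)  # rows arrive as chunks download, without buffering the whole file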

Alkalimeter answered 8/2, 2021 at 5:57 Comment(8)
Thanks very much indeed. Will try and get it implemented today and accept the answer.Volley
Thanks again for this, but I still haven't managed to get it all working together. content in your first example seems to be effectively the same as csv_file in my question, and I still don't see how I stream Azure files into it? The chunk params in the second example seem very helpful, though, for optimising once I have the stream processing actually working.Volley
@ChristopherAlcock in the first sample, you can use the method readinto() to read into a BytesIO stream, then use pandas to process the stream.Alkalimeter
Hi Jim, I've finally worked out what was going wrong for me here. The pandas read_csv unsurprisingly returns a pandas DataFrame, which behaves very differently to the csv reader, so I had to change all my processing code too, which I foolishly hadn't expected. Thanks for your help.Volley
I've worked on this further. If I put my CSV data csv_string into a BytesIO buffer like so: buffer = BytesIO(csv_string.encode('utf-8')), I can then process it with pandas.read_csv(). However, if I get the data from Azure in the way suggested and read that into a buffer, such that calling .getvalue() gives a value identical to the buffer built from the string, pandas cannot process the data: pandas.errors.EmptyDataError('No columns to parse from file'). I am tempted to believe that what I'm trying to do isn't possible.Volley
@ChristopherAlcock please try to use the following code: stream = blob_client.download_blob(); with BytesIO() as buf: stream.readinto(buf); pandas.read_csv(buf)Alkalimeter
Hi Jim, thanks again for trying, but this is essentially what I was already trying, and despite reading in a very simple, valid CSV, it returns EmptyDataError: No columns to parse from file.Volley
If I do a print(buf1.getvalue()) instead of read_csv, I get b'Col1,Col2,Col3\nval1,val2,val3\nval4,val5,val6\nval7,val8,val9', which as far as I can see is totally acceptable CSV data.Volley
