Streaming in / chunking CSVs from S3 to Python

I intend to perform some memory-intensive operations on a very large CSV file stored in S3 using Python, with the goal of eventually moving the script to AWS Lambda. I know I can read the whole CSV into memory, but I will definitely run into Lambda's memory and storage limits with such a large file. Is there any way to stream the CSV, or read it in chunks at a time, into Python using boto3/botocore, ideally by specifying the row numbers to read?

Here are some things I've already tried:

1) Using the Range parameter in S3.get_object to specify the range of bytes to read in (sketched after this list). Unfortunately this means the last rows get cut off in the middle, since there's no way to specify the number of rows to read. There are some messy workarounds, like scanning for the last newline character, recording its index, and then using that as the starting point for the next byte range, but I'd like to avoid that clunky solution if possible.

2) Using S3 Select to write SQL queries that selectively retrieve data from S3 buckets (also sketched below). Unfortunately the row_number SQL function isn't supported, and it doesn't look like there's a way to read in a subset of rows.
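
For reference, the two attempts looked roughly like this (the bucket name, key, byte range, and query are placeholders):

import boto3

s3 = boto3.client('s3')

# Attempt 1: byte-range read; rows at the range boundary get cut off.
partial = s3.get_object(Bucket='my-bucket', Key='big.csv',
                        Range='bytes=0-999999')['Body'].read()

# Attempt 2: S3 Select; there is no row_number support, only LIMIT-style
# filtering, so a specific slice of rows can't be requested.
response = s3.select_object_content(
    Bucket='my-bucket',
    Key='big.csv',
    ExpressionType='SQL',
    Expression="SELECT * FROM s3object s LIMIT 100",
    InputSerialization={'CSV': {'FileHeaderInfo': 'USE'}},
    OutputSerialization={'CSV': {}},
)
for event in response['Payload']:
    if 'Records' in event:
        data = event['Records']['Payload'].decode('utf-8')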

Manful answered 28/6, 2018 at 14:33 Comment(0)

Assuming your file isn't compressed, this should involve reading from a stream and splitting on the newline character. Read a chunk of data, find the last instance of the newline character in that chunk, split and process.

import boto3

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data on (bytes, not string)
newline = '\n'.encode()
partial_chunk = b''

while True:
    data = body.read(chunk_size)

    # Nothing left to read: process whatever is still buffered, then stop.
    if data == b'':
        if partial_chunk != b'':
            result = partial_chunk.decode('utf-8')
            # handle the final (possibly unterminated) line here
        break

    chunk = partial_chunk + data
    last_newline = chunk.rfind(newline)

    # No newline in this chunk yet; keep accumulating.
    if last_newline == -1:
        partial_chunk = chunk
        continue

    # write to a smaller file, or work against some piece of data
    result = chunk[:last_newline + 1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline + 1:]

If you have gzipped files, then you need to use BytesIO and the GzipFile class inside the loop; it's a harder problem because you need to carry the gzip decompression state across chunks.
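
A simpler alternative for gzipped objects (a sketch, not exactly the BytesIO approach described above) is to wrap the streaming body in gzip.GzipFile, which decompresses on the fly, and reuse the same newline-splitting loop on the decompressed bytes:

import gzip

import boto3

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# GzipFile pulls compressed bytes from the streaming body on demand and
# returns decompressed bytes, so the loop above works unchanged if it reads
# from `decompressed` instead of `body`.
decompressed = gzip.GzipFile(fileobj=body)

chunk = decompressed.read(1000000)   # decompressed bytes, not raw file bytes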

Casias answered 2/7, 2018 at 18:42 Comment(3)
Looks like body = s3.get_object(Bucket=bucket, Key=key).read() needs to be replaced by body = s3.get_object(Bucket=bucket, Key=key)['Body'] - Colchicum
One issue you need to highlight: when the chunk is b'', i.e. when we need to break, the user needs to process the partial_chunk, otherwise they miss the end of their file. - Hord
Another issue - if there is no newline in a chunk, the code ends up with an empty result, and the unprocessed bytes remain in partial_chunk to be handled on the next loop round. - Hord

I developed code similar to @Kirk Broadhurst's, but a connection timeout occurred whenever the processing time for a chunk exceeded roughly 5 minutes. The following code works around that by opening a new connection for each chunk.

import gc
import math

import boto3
import numpy as np
import pandas as pd

# These credentials should not be hard-coded; it's best to let boto3 pick
# them up from the CLI configuration or environment.
region_name = 'region'
aws_access_key_id = 'aws_access_key_id'
aws_secret_access_key = 'aws_secret_access_key'

s3 = boto3.client('s3', region_name=region_name,
                  aws_access_key_id=aws_access_key_id,
                  aws_secret_access_key=aws_secret_access_key)

obj = s3.get_object(Bucket='bucket', Key='key')

total_bytes = obj['ContentLength']
chunk_bytes = 1024 * 1024 * 5  # 5 MB as an example.
total_chunks = math.ceil(total_bytes / chunk_bytes)

# Inclusive byte ranges for each chunk; the last range is clamped to the end
# of the object.
chunk_size_list = [(i * chunk_bytes, min((i + 1) * chunk_bytes, total_bytes) - 1)
                   for i in range(total_chunks)]
chunk_size_list = [f'bytes={a}-{b}' for a, b in chunk_size_list]

prev_str = ''

for i, chunk in enumerate(chunk_size_list):
    # Open a fresh connection for every chunk so that long processing times
    # don't run into a connection timeout.
    s3 = boto3.client('s3', region_name=region_name,
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key)
    byte_obj = s3.get_object(Bucket='bucket', Key='key', Range=chunk)
    byte_obj = byte_obj['Body'].read()
    str_obj = byte_obj.decode('utf-8')
    del byte_obj
    list_obj = str_obj.split('\n')
    # You can use another delimiter instead of ',' below.
    if len(prev_str.split(',')) < len(list_obj[1].split(',')) or len(list_obj[0].split(',')) < len(list_obj[1].split(',')):
        # The leftover from the previous chunk is the start of this chunk's
        # first (partial) line.
        list_obj[0] = prev_str + list_obj[0]
    else:
        # The leftover from the previous chunk is a complete line of its own.
        list_obj = [prev_str] + list_obj
    # The last element may be a partial line; carry it over to the next chunk.
    prev_str = list_obj[-1]
    del str_obj, list_obj[-1]
    list_of_elements = [st.split(',') for st in list_obj]
    del list_obj
    df = pd.DataFrame(list_of_elements)
    del list_of_elements
    gc.collect()
    # You can process your pandas DataFrame here, but you need to cast it to
    # the correct data types first.
    # Cast NA-like strings to numpy NaN.
    na_values = ['', '#N/A', '#N/A N/A', '#NA', '-1.#IND', '-1.#QNAN', '-NaN',
                 '-nan', '1.#IND', '1.#QNAN', 'N/A', 'NA', 'NULL', 'NaN',
                 'n/a', 'nan', 'null']
    df = df.replace(na_values, np.nan)
    # Replace 'col1'/'col2' and the dtypes with the column labels and types
    # that match your data.
    dtypes = {'col1': 'float32', 'col2': 'category'}
    df = df.astype(dtype=dtypes, copy=False)
Orts answered 14/9, 2019 at 12:22 Comment(0)

You can use the iter_lines method of the returned streaming body to read the body line by line without loading it all into memory: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html

Here is a code example:

import boto3

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

for line in body.iter_lines():
    line = line.decode('utf-8')
    ...   # process the line
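
If the rows need real CSV parsing (quoted fields, embedded commas), the decoded lines can be fed straight into the standard csv module. A minimal sketch, using the same bucket and key placeholders:

import csv

import boto3

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# csv.reader handles quoting and embedded delimiters that a plain
# str.split(',') would get wrong.
lines = (line.decode('utf-8') for line in body.iter_lines())
for row in csv.reader(lines):
    ...   # process the parsed row (a list of field values)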
Anemograph answered 15/7, 2022 at 14:45 Comment(1)
this is not working for Excel - Tswana
