How to Create Dataframe from AWS Athena using Boto3 get_query_results method
I'm using AWS Athena to query raw data from S3. Since Athena writes the query output into S3 output bucket I used to do:

df = pd.read_csv(OutputLocation)

But this seems like an expensive way. Recently I noticed the get_query_results method of boto3 which returns a complex dictionary of the results.

client = boto3.client('athena')
response = client.get_query_results(
        QueryExecutionId=res['QueryExecutionId']
        )

I'm facing two main issues:

  1. How can I format the results of get_query_results into a pandas data frame?
  2. get_query_results only returns 1000 rows. How can I use it to get two million rows?
Elsie answered 26/8, 2018 at 12:49 Comment(5)
I think it may help the people who will answer your question if you give a sample of the 'complex dictionary' returned. Any sensitive data can be redacted, as it's mainly the structure of the data that would be important. Also, pandas offers DataFrame.from_dict(), DataFrame.from_records(), pandas.read_json(). There are others too, but again it is difficult to say with certainty which to use without knowing the structure of the data. Also, it may benefit you to review the documentation for get_query_results(). Maybe it takes parameter(s), meaning default of 1000 rows can be increased.Keary
Thanks @Keary, I see your point. It's really long text to put here. Let's use the basic structure from the link as a reference, but note that one of the fields in the Data list is a varchar that looks like another dict, e.g. {temperature=41.1}Elsie
Try response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000) and see if you get 2000 rows this time. Also, it might be reasonable to presume that there is an upper limit to the number of rows that can be returned via a single request (although I can't find any mention of it in the documentation). If there is an upper limit, all you would need to do is parse the JSON in response for the 'NextToken' key, and include it the next time you call client.get_query_results(), and you would effectively be getting the next 1000 (or whatever the limit is) rows.Keary
Documentation states get_query_results() returns a Python dictionary, so try d = response['ResultSet']['Rows'], then df = pd.DataFrame.from_dict(d). However, you might not get expected DataFrame if d contains metadata (stuff that you don't want in the final DataFrame). If this is the case, you may need to extract from/mutate d (with a for loop or some other logic) so it contains what you want. This link may help: pandas.pydata.org/pandas-docs/stable/generated/…Keary
Thanks @Keary. As for the max limit, you can see in this link the limitation of 1000. But I guess you are right, I'll have to find a way to use the NextToken.Elsie
B
39

get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?

If you try to add:

client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

You will get the following error:

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.

You can obtain millions of rows if you fetch the result file directly from your S3 bucket (in the following example into a Pandas DataFrame):

def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3', 
                          region_name = self.region_name, 
                          aws_access_key_id = self.aws_access_key_id,
                          aws_secret_access_key= self.aws_secret_access_key)

    response = self.resource \
    .Bucket(self.bucket) \
    .Object(key= self.folder + self.filename + '.csv') \
    .get()

    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   

The self.filename can be:

self.filename = response['QueryExecutionId'] + ".csv"

This is because Athena names the result file after the QueryExecutionId. Below is all my code, which takes a query and returns a dataframe with all the rows and columns.

import time
import boto3
import pandas as pd
import io

class QueryAthena:
    
    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output =  's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query
        
    def load_conf(self, q):
        try:
            self.client = boto3.client('athena', 
                              region_name = self.region_name, 
                              aws_access_key_id = self.aws_access_key_id,
                              aws_secret_access_key= self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={
                    'Database': self.database
                },
                ResultConfiguration={
                    'OutputLocation': self.s3_output,
                }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])
            return response

        except Exception as e:
            print(e)                
  
    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:              
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))
            
            df = self.obtain_data()
            return df
            
        except Exception as e:
            print(e)      
            
    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3', 
                                  region_name = self.region_name, 
                                  aws_access_key_id = self.aws_access_key_id,
                                  aws_secret_access_key= self.aws_secret_access_key)

            response = self.resource \
            .Bucket(self.bucket) \
            .Object(key= self.folder + self.filename + '.csv') \
            .get()
            
            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   
        except Exception as e:
            print(e)  
           
        
if __name__ == "__main__":       
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
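As a side note on the design: the S3 key self.folder + self.filename + '.csv' is rebuilt by hand above. The finished execution also reports its own output path, so it could be read back instead; a minimal sketch against the self.client and res objects used above:

# Sketch: ask Athena where it wrote the result CSV instead of rebuilding the key.
execution = self.client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
output_location = execution['QueryExecution']['ResultConfiguration']['OutputLocation']
# e.g. 's3://my_bucket/my_folder/<QueryExecutionId>.csv'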
Boettcher answered 24/10, 2018 at 16:33 Comment(7)
Hi @EricBellet, "You can obtain millions of rows if you fetch the result file directly from your S3 bucket" - this is exactly what I did eventually. I can additionally say that I compared performance between this approach and pagination, and loading results from S3 is much faster when it comes to 2 million rows, as in my case...Elsie
Hi @NivCohen, did you get 2 million rows with pagination? Have you a code example to share?Boettcher
Hi @EricBellet, I've updated my answer above with the pagination example that worked for me in the case of 2 million rows. I had to restore and adapt it, so it's not so well formatted. I hope this will help...Elsie
this is definitely the superior answer, this worked great for me and handles any amount of data with much simpler code. Thanks for putting this into a class so it's easy to incorporate the solution.Nickles
@EricBellet: why we need this self.s3_input = 's3://' + self.bucket + '/my_folder_input'Confute
@EricBellet: btw great answerConfute
Not to be contrarian, but I find that CEM's awswrangler answer below (https://mcmap.net/q/445434/-how-to-create-dataframe-from-aws-athena-using-boto3-get_query_results-method) is by far the best solution for the queries I have been making. However, for very large queries, you may need special permissions to apply the default ctas_approach=True flag. (Documentation linked in his solution.)Michalmichalak
P
40

You can use the AWS SDK for pandas (awswrangler) to create a pandas DataFrame directly by querying Athena.

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")

You can find more information here
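If you need to control where the results land, which workgroup runs the query, or which credentials are used, the same call accepts additional arguments. A minimal sketch, where the profile, bucket, and workgroup names are placeholders for your own:

import boto3
import awswrangler as wr

# Placeholder session details: replace with your own profile and region.
session = boto3.Session(profile_name="my_profile", region_name="us-east-1")

df = wr.athena.read_sql_query(
    sql="SELECT * FROM <table_name_in_Athena>",
    database="<database_name>",
    ctas_approach=True,                          # wrap the query in a CTAS and read the Parquet output
    s3_output="s3://my_bucket/athena-results/",  # where Athena writes the staging results
    workgroup="primary",
    boto3_session=session,
)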

Practiced answered 28/4, 2021 at 5:29 Comment(5)
Excellent library, this is indeed the best answer in 2021Zeppelin
What is the MaxResults size for this?Rubefaction
If I could upvote this 200× I would do so. A quick test reveals that it will save me many hours, with a trivial substitution of code.Michalmichalak
When I run this on my system I get this result: QueryFailed: Insufficient permissions to execute the query. Error creating table : ... AccessDeniedException: An error occurred (AccessDeniedException) when calling the DeleteTable operation: ... because no identity-based policy allows the glue:DeleteTable action... I feel like something wasn't setup correctly or permissioned correctly. Any idea how to resolve this issue?Nowak
@Nowak your Glue should have the necessary permissions to do operations on Athena. Check this managed policy: AmazonAthenaFullAccess (docs.aws.amazon.com/athena/latest/ug/managed-policies.html). It probably has more permissions than you need. Instead, you can create your own role with specific policies attached - managed, custom or inline. Then give the role to Glue.Practiced
E
22

I have a solution for my first question, using the following function

def results_to_df(results):

    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    listed_results = []
    for res in results['ResultSet']['Rows'][1:]:
        values = []
        for field in res['Data']:
            try:
                values.append(list(field.values())[0])
            except:
                values.append(list(' '))

        listed_results.append(
            dict(zip(columns, values))
        )

    return listed_results

and then:

t = results_to_df(response)
pd.DataFrame(t)

As for my 2nd question, and following the request of @EricBellet, I'm also adding my approach for pagination, which I find inefficient and slower compared to loading the results from the Athena output in S3:

def run_query(query, database, s3_output):
    ''' 
    Function for executing Athena queries and returning the execution response (which contains the QueryExecutionId)
    '''
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response



def format_result(results):
    '''
    This function formats the results into the structure needed for appending.
    '''
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
 
    formatted_results = []
 
    for result in results['ResultSet']['Rows'][0:]:
        values = []
        for field in result['Data']:
            try:
                values.append(list(field.values())[0]) 
            except:
                values.append(list(' '))
 
        formatted_results.append(
            dict(zip(columns, values))
        )
    return formatted_results



res = run_query(query_2, database, s3_output)  # query Athena



import sys
import time
import boto3
import pandas as pd

client = boto3.client('athena')  # recreate the Athena client used in run_query

marker = None
formatted_results = []
query_id = res['QueryExecutionId']
i = 0
start_time = time.time()

while True:
    paginator = client.get_paginator('get_query_results')
    response_iterator = paginator.paginate( 
        QueryExecutionId=query_id,
        PaginationConfig={
            'MaxItems': 1000,
            'PageSize': 1000,
            'StartingToken': marker})

    for page in response_iterator:
        i = i + 1
        format_page = format_result(page)
        if i == 1:
            formatted_results = pd.DataFrame(format_page)
        elif i > 1:
            formatted_results = formatted_results.append(pd.DataFrame(format_page))

    try:
        marker = page['NextToken']
    except KeyError:
        break

print ("My program took", time.time() - start_time, "to run")

It's not formatted so well, but I think it does the job...

2021 Update

Today I'm using a custom wrapper around aws-data-wrangler (awswrangler) as the best solution for the original question I asked several years ago.

import boto3
import awswrangler as wr

def run_athena_query(query, database, s3_output, boto3_session=None, categories=None, chunksize=None, ctas_approach=None, profile=None, workgroup='myTeamName', region_name='us-east-1', keep_files=False, max_cache_seconds=0):
    """
    An end 2 end Athena query method, based on the AWS Wrangler package. 
    The method will execute a query and will return a pandas dataframe as an output.
    you can read more in https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.athena.read_sql_query.html

    Args:
        - query: SQL query.

        - database (str): AWS Glue/Athena database name - it is only the original database from which the query will be launched. You can still use and mix several databases by writing the full table name within the SQL (e.g. database.table).

        - ctas_approach (bool): Wraps the query using a CTAS, and read the resulted parquet data on S3. If false, read the regular CSV on S3.

        - categories (List[str], optional): List of columns names that should be returned as pandas.Categorical. Recommended for memory restricted environments.

        - chunksize (Union[int, bool], optional): If passed, will split the data into an Iterable of DataFrames (memory friendly). If True, Wrangler will iterate on the data by files in the most efficient way without guarantee of chunksize. If an INTEGER is passed, Wrangler will iterate on the data by a number of rows equal to the received INTEGER.

        - s3_output (str, optional): Amazon S3 path.

        - workgroup (str, optional): Athena workgroup. 

        - keep_files (bool): Should Wrangler delete or keep the staging files produced by Athena? default is False

        - profile (str, optional): AWS account profile. If boto3_session is provided, profile will be ignored.

        - boto3_session (boto3.Session(), optional): Boto3 Session. The default boto3 session will be used if boto3_session receive None. if profilename is provided a session will automatically be created.

        - max_cache_seconds (int): Wrangler can look up in Athena's history whether this query has been run before. If so, and its completion time is less than max_cache_seconds before now, Wrangler skips query execution and just returns the same results as last time. If reading cached data fails for any reason, execution falls back to the usual query run path. The default is 0.

    Returns:
        - Pandas DataFrame

    """
    # test for boto3 session and profile.
    if boto3_session is None and profile is not None:
        boto3_session = boto3.Session(profile_name=profile, region_name=region_name)

    print("Querying AWS Athena...")

    try:
        # Retrieving the data from Amazon Athena
        athena_results_df = wr.athena.read_sql_query(
            query,
            database=database,
            boto3_session=boto3_session,
            categories=categories,
            chunksize=chunksize,
            ctas_approach=ctas_approach,
            s3_output=s3_output,
            workgroup=workgroup,
            keep_files=keep_files,
            max_cache_seconds=max_cache_seconds
        )

        print("Query completed, data retrieved successfully!")
    except Exception as e:
        print(f"Something went wrong... the error is:{e}")
        raise Exception(e)

    return athena_results_df

you can read more here
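A minimal usage sketch of the wrapper above, where the table, database, bucket, workgroup and profile names are placeholders:

df = run_athena_query(
    query="SELECT * FROM my_database.my_table LIMIT 1000",
    database="my_database",
    s3_output="s3://my_bucket/athena-results/",
    ctas_approach=True,
    workgroup="primary",
    profile="my_profile",
)
print(df.shape)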

Elsie answered 26/8, 2018 at 19:54 Comment(0)
B
11

A very simple solution is to use a list comprehension with the boto3 Athena paginator. The list comprehension can then simply be passed into pd.DataFrame() to create a DataFrame, like so:

pd.DataFrame([[data.get('VarCharValue') for data in row['Data']] for row in
              results['ResultSet']['Rows']])

Boto3 Athena to Pandas DataFrame

import pandas as pd
import boto3

result = get_query_results( . . . ) # your code here

def cleanQueryResult(result) :
    '''
    This will take the dictionary of the raw Boto3 Athena results and turn it into a 
    2D array for further processing

    Parameters
    ----------
    result dict
        The dictionary from the boto3 Athena client function get_query_results

    Returns
    -------
    list(list())
        2D list which is essentially the table result. The first row is the column name.
    '''
    return [[data.get('VarCharValue') for data in row['Data']]
            for row in result['ResultSet']['Rows']]

# note that row 1 is the header
df = pd.DataFrame(cleanQueryResult(result))

Millions of Results

This requires the paginator object: https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/athena.html#paginators

As a hint, here's how you can append after each page:

df = df.append(pd.DataFrame(cleanQueryResult(next_page)), ignore_index=True)
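Sketching that loop out, assuming the boto3 Athena client and the cleanQueryResult helper defined above, and a query_execution_id placeholder for a query that has already finished:

import boto3
import pandas as pd

client = boto3.client('athena')
query_execution_id = '...'  # placeholder: the ID of a finished query

# Walk every page of the result set and flatten it into one 2D list.
paginator = client.get_paginator('get_query_results')
rows = []
for page in paginator.paginate(QueryExecutionId=query_execution_id):
    rows.extend(cleanQueryResult(page))

# The first row of the first page holds the column names.
df = pd.DataFrame(rows[1:], columns=rows[0])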
Bugbear answered 14/4, 2020 at 19:31 Comment(1)
To properly set headers, you could do clean_result = cleanQueryResult(result); df = pd.DataFrame(clean_result[1:], columns=clean_result[0]) Inchworm
D
4

Maybe you can try to use pandas read_sql and pyathena:

from pyathena import connect
import pandas as pd

conn = connect(s3_staging_dir='s3://bucket/folder',region_name='region')
df = pd.read_sql('select * from database.table', conn) #don't change the "database.table"
Domett answered 21/8, 2020 at 21:5 Comment(3)
Did this actually work for you? When I tried it, I got this error message: ModuleNotFoundError: No module named 'pyathena'Nowak
Hi @ASH, can you try 'pip install pyathena'? You probably need to install it.Domett
I think my environment was not provisioned correctly/fully or the rights are waaaayyy too restrictive. Nothing works, at all. I switched over to SageMaker, and everything works totally fine, just as I would expect it to.Nowak
R
2

I've used a while loop approach to solve this: while NextToken is present in the response, I keep extending the list of result rows:

# Receive query results.
# get_query_results() returns at most 1000 rows per call, so keep following NextToken.
query_results = athena_client.get_query_results(QueryExecutionId=execution_response['QueryExecutionId'])
results = query_results['ResultSet']['Rows']
while 'NextToken' in query_results:
    query_results = athena_client.get_query_results(QueryExecutionId=execution_response['QueryExecutionId'], NextToken=query_results['NextToken'])
    results.extend(query_results['ResultSet']['Rows'])
# results now holds every row from every page
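To get from those rows to the dataframe asked about in the question, the accumulated results can be converted the same way as in the other answers (a sketch; the first row is the header):

import pandas as pd

header = [cell.get('VarCharValue') for cell in results[0]['Data']]
df = pd.DataFrame(
    [[cell.get('VarCharValue') for cell in row['Data']] for row in results[1:]],
    columns=header,
)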
Reverso answered 22/11, 2021 at 17:16 Comment(1)
Welcome, Julio. There are six existing answers to this question, including an accepted answer with 24 upvotes. Are you sure your approach hasn't already been suggested? If not, why might someone prefer your approach over the existing approaches proposed? Are you taking advantage of new capabilities? Are there scenarios where your approach is better suited?Showers
S
0

Try this approach to convert response['records'] into a dataframe using columnMetadata:

def results_to_df(response):
    columns = [col['label'] for col in response['columnMetadata']]

    listed_results = [
        [list(col.values())[0] if list(col.values())[0] else '' for col in record]
        for record in response['records']
    ]
    df = pd.DataFrame(listed_results, columns=columns)
    return df
Starlight answered 22/1, 2021 at 11:43 Comment(0)
D
0

While not the intent of the question, a CSV with a few million rows may be only a few hundred MB. Downloading the query results as a CSV from the AWS console and then loading it into pandas with pandas.read_csv() is probably faster and easier than implementing one of the above solutions. It does not scale as well, but the OP only asked for 2 million rows. I have used this successfully on files twice that size.
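A minimal sketch of that, where the filename is a placeholder for wherever the console download was saved:

import pandas as pd

# Placeholder path: the CSV downloaded from the Athena console.
df = pd.read_csv("athena_query_results.csv")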

Debtor answered 6/7, 2023 at 20:51 Comment(0)
F
0

While the previously given answers are valid in their own right, they do not really answer the original question, which is "How can I format the results of get_query_results into a pandas data frame?"

If you do not want to rerun the query or dump the results to CSV, but instead want to create a Pandas dataframe directly from the result rows at hand (either because the 1000-row limit of get_query_results is not an issue, or because you have used its pagination to build up the complete list of rows), you can do the following, where result is the list of rows (i.e. response['ResultSet']['Rows']):

columns = [item['VarCharValue'] for item in result[0]['Data']]
values = [[item.get('VarCharValue', '') for item in row['Data']] for row in result[1:]]
df = pd.DataFrame(values, columns=columns)

Having said that, it might be a good idea to check the size of the result object beforehand. The following gives a rough size in MB (note that sys.getsizeof does not include the nested row objects, so treat it as a lower bound):

import sys
sys.getsizeof(result) / 1e6

and keep in mind that the Pandas dataframe will be of similar size, so you need at least that much free memory available before loading it into a dataframe.

If you have barely enough memory, deleting the result object after creating the dataframe will free up memory:

del result
Frogman answered 25/4 at 8:38 Comment(0)
