(AWS) Athena: Query Results seem too short
My Athena query results appear to be too short. I'm trying to figure out why.

Setup:

- Glue catalog: 118.6 GB in size.
- Data: stored in S3 in both CSV and JSON format.
- Athena query: when I query a whole table, I only get ~40K results per query, but there should be roughly 121 million records for one month's data.

Does Athena cap query result data? Is this a service limit? (The documentation does not suggest this is the case.)

Must answered 18/1, 2018 at 19:26 Comment(0)

So, getting 1000 results at a time obviously doesn't scale. Thankfully, there's a simple workaround. (Or maybe this is how it was supposed to be done all along.)

When you run an Athena query, you should get a QueryExecutionId. This Id corresponds to the output file you'll find in S3.

Here's a snippet I wrote:

import time
from typing import Dict

import boto3
import botocore.exceptions
import pandas as pd

s3 = boto3.resource("s3")
athena = boto3.client("athena")


def query_to_dataframe(query: str) -> pd.DataFrame:
    response: Dict = athena.start_query_execution(QueryString=query, WorkGroup="<your_work_group>")
    execution_id: str = response["QueryExecutionId"]
    print(execution_id)

    # Wait until the query is finished; get_query_results raises a
    # ClientError while the query is still queued or running
    while True:
        try:
            athena.get_query_results(QueryExecutionId=execution_id)
            break
        except botocore.exceptions.ClientError:
            time.sleep(5)

    # Download the full result file that Athena wrote to S3
    local_filename: str = "temp/athena_query_result_temp.csv"
    s3.Bucket("athena-query-output").download_file(execution_id + ".csv", local_filename)
    return pd.read_csv(local_filename)

Make sure the corresponding WorkGroup has a "Query result location" set, e.g. "s3://athena-query-output/".
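
If you'd rather not hard-code the bucket, you can also read the actual output location back from the execution itself. A short sketch (get_query_execution is the standard boto3 call; execution_id comes from the snippet above):

import boto3

athena = boto3.client("athena")

# Ask Athena where it wrote the result file for this execution
execution = athena.get_query_execution(QueryExecutionId=execution_id)
output_location = execution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
print(output_location)  # e.g. s3://athena-query-output/<execution_id>.csv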

Also see this thread with similar answers: How to Create Dataframe from AWS Athena using Boto3 get_query_results method

Fortner answered 23/9, 2020 at 15:36 Comment(0)

It seems that there is a limit of 1000 rows per GetQueryResults call. You should use NextToken to iterate over the full result set.

Quote from the GetQueryResults documentation:

MaxResults The maximum number of results (rows) to return in this request.

Type: Integer

Valid Range: Minimum value of 0. Maximum value of 1000.

Required: No
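
For illustration, a minimal sketch of that pagination loop (the function name and MaxResults value are my choices; get_query_results and NextToken are the real boto3 API):

import boto3

athena = boto3.client("athena")

def fetch_all_rows(execution_id):
    # Collect every page of a finished query, 1000 rows at a time
    rows = []
    kwargs = {"QueryExecutionId": execution_id, "MaxResults": 1000}
    while True:
        page = athena.get_query_results(**kwargs)
        rows.extend(page["ResultSet"]["Rows"])
        if "NextToken" not in page:
            return rows
        kwargs["NextToken"] = page["NextToken"]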

Cannonade answered 18/1, 2018 at 20:13 Comment(2)
From the SDK perspective this makes sense. But in the Console, the query limit goes well above 1000: a table that has 121,000,000 records returns 40,000 records for a select *. – Must
Okay, I didn't know that you were using the CLI/SDK. But if the CLI returns 40,000, the approach is the same: iterate through your results with the help of NextToken. – Cannonade

Another option is a paginate-and-count approach. (I don't know whether there is a better way, such as a select count(*) from table.)

Here is complete example code, ready to use, built on the Python boto3 Athena API. I used a paginator, converted the result into a list of dicts, and also return the count along with the result.

Below are two functions: the first paginates; the second converts the paginated result into a list of dicts and calculates the count.

Note: converting into a list of dicts is not necessary in this case. If you don't want that, you can modify the code to return only the count.

import time

import boto3


def get_athena_results_paginator(params, athena_client):
    """Run the query, wait for it, and page through the results.

    :param params: dict with 'query', 'database', and 'workgroup' keys
    :param athena_client: a boto3 Athena client
    :return: (list of row dicts, data row count)
    """
    query_id = athena_client.start_query_execution(
        QueryString=params['query'],
        QueryExecutionContext={
            'Database': params['database']
        },
        # Alternatively, set an explicit output location instead of a workgroup:
        # ResultConfiguration={
        #     'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        # },
        WorkGroup=params['workgroup']
    )['QueryExecutionId']

    # Poll until the query leaves the QUEUED/RUNNING states
    query_status = None
    while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
        query_status = athena_client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
        if query_status == 'FAILED' or query_status == 'CANCELLED':
            raise Exception('Athena query with the string "{}" failed or was cancelled'.format(params.get('query')))
        time.sleep(10)

    # Page through the results 1000 rows at a time
    results_paginator = athena_client.get_paginator('get_query_results')
    results_iter = results_paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'PageSize': 1000
        }
    )
    count, results = result_to_list_of_dict(results_iter)
    return results, count


def result_to_list_of_dict(results_iter):
    """Flatten paginated Athena results into a list of dicts.

    :param results_iter: iterator of GetQueryResults pages
    :return: (data row count, list of {column: value} dicts)
    """
    results = []
    column_names = None
    count = 0
    for results_page in results_iter:
        for row in results_page['ResultSet']['Rows']:
            column_values = [col.get('VarCharValue', None) for col in row['Data']]
            if not column_names:
                # The first row of the first page is the header row
                column_names = column_values
            else:
                count = count + 1
                results.append(dict(zip(column_names, column_values)))
    return count, results
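
A hypothetical invocation, with placeholder values in params (the keys match what the functions above read):

import boto3

athena_client = boto3.client('athena')
params = {
    'query': 'SELECT * FROM my_table',  # placeholder query
    'database': 'my_database',          # placeholder database name
    'workgroup': 'primary'
}
results, count = get_athena_results_paginator(params, athena_client)
print('Fetched {} rows'.format(count))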

Ikeikebana answered 19/11, 2021 at 5:54 Comment(0)
import time
import boto3

client = boto3.client('athena')
query = '<your_query>'  # Replace with your actual SQL query string

res = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': '<your_database>'  # Replace with your actual database
    },
    ResultConfiguration={
        'OutputLocation': '<your_s3_output_location>',  # Replace with your actual S3 output location aws-athena-query-results-129137241730-eu-west-1
    }
)
while True:
    # Get the query execution status
    execution = client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
    status = execution['QueryExecution']['Status']['State']
    if status == 'SUCCEEDED':
        # If the query execution succeeded, get the first page of results
        response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'])
        # Fetch all remaining rows, page by page
        while 'NextToken' in response:
            next_response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'], NextToken=response['NextToken'])
            response['ResultSet']['Rows'].extend(next_response['ResultSet']['Rows'])
            if 'NextToken' in next_response:
                response['NextToken'] = next_response['NextToken']
            else:
                break

        # Process the results in JSON format --> will have to parse into a pd.DataFrame
        resultados = response
        break
    elif status == 'FAILED' or status == 'CANCELLED':
        # If the query execution failed or was cancelled, raise an exception
        raise Exception('Query execution failed or was cancelled')
    else:
        # If the query execution is still running, wait a while before checking the status again
        time.sleep(5)
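
To finish the step mentioned in the last comment of the code, a minimal sketch (assuming resultados holds the merged response from above) that parses the rows into a pandas DataFrame; the first row of an Athena result set is the header:

import pandas as pd

rows = resultados['ResultSet']['Rows']
# First row holds the column names; the rest are data
columns = [col.get('VarCharValue') for col in rows[0]['Data']]
data = [[col.get('VarCharValue') for col in row['Data']] for row in rows[1:]]
df = pd.DataFrame(data, columns=columns)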
Mccain answered 13/12, 2023 at 3:30 Comment(1)
Try to use: while 'NextToken' in response: – Mccain
