Wait until AWS Glue crawler has finished running

In the documentation, I cannot find any way of checking the run status of a crawler. The only way I can do it currently is to repeatedly check AWS to see whether the file/table has been created.

Is there a better way to block until crawler finishes its run?

Nystatin asked 25/10, 2018 at 19:18

You can use boto3 (or a similar SDK) to do this. It provides a get_crawler method; the information you need is in the "LastCrawl" section of the response.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_crawler
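
For example, a minimal polling sketch built on that call (the crawler name my-crawler is a placeholder):

import time

import boto3

glue = boto3.client("glue")

# Poll until the crawler leaves the RUNNING/STOPPING states, then read LastCrawl.
while glue.get_crawler(Name="my-crawler")["Crawler"]["State"] != "READY":
    time.sleep(10)

last_crawl = glue.get_crawler(Name="my-crawler")["Crawler"]["LastCrawl"]
print(last_crawl["Status"])  # e.g. SUCCEEDED, FAILED, or CANCELLED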

Glomeration answered 26/10, 2018 at 6:28

The following function uses boto3. It starts the given AWS Glue crawler and waits until the crawl completes, logging the state as it progresses. It was tested with Python 3.8 and boto3 1.17.3.

import logging
import time
import timeit

import boto3

log = logging.getLogger(__name__)


def run_crawler(crawler: str, *, timeout_minutes: int = 120, retry_seconds: int = 5) -> None:
    """Run the specified AWS Glue crawler, waiting until completion."""
    # Ref: https://stackoverflow.com/a/66072347/
    timeout_seconds = timeout_minutes * 60
    client = boto3.client("glue")
    start_time = timeit.default_timer()
    abort_time = start_time + timeout_seconds

    def wait_until_ready() -> None:
        state_previous = None
        while True:
            response_get = client.get_crawler(Name=crawler)
            state = response_get["Crawler"]["State"]
            if state != state_previous:
                log.info(f"Crawler {crawler} is {state.lower()}.")
                state_previous = state
            if state == "READY":  # Other known states: RUNNING, STOPPING
                return
            if timeit.default_timer() > abort_time:
                raise TimeoutError(f"Failed to crawl {crawler}. The allocated time of {timeout_minutes:,} minutes has elapsed.")
            time.sleep(retry_seconds)

    wait_until_ready()
    response_start = client.start_crawler(Name=crawler)
    assert response_start["ResponseMetadata"]["HTTPStatusCode"] == 200
    log.info(f"Crawling {crawler}.")
    wait_until_ready()
    log.info(f"Crawled {crawler}.")

Optional bonus: Function to create or update an AWS Glue crawler using some reasonable defaults:

from typing import Any


def ensure_crawler(**kwargs: Any) -> None:
    """Ensure that the specified AWS Glue crawler exists with the given configuration.

    At minimum the `Name` and `Targets` keyword arguments are required.
    """
    # Use defaults
    assert all(kwargs.get(k) for k in ("Name", "Targets"))
    defaults = {
        "Role": "AWSGlueRole",
        "DatabaseName": kwargs["Name"],
        "SchemaChangePolicy": {"UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "DELETE_FROM_DATABASE"},
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVERYTHING"},
        "LineageConfiguration": {"CrawlerLineageSettings": "DISABLE"},
    }
    kwargs = {**defaults, **kwargs}

    # Ensure crawler
    client = boto3.client("glue")
    name = kwargs["Name"]
    try:
        response = client.create_crawler(**kwargs)
        log.info(f"Created crawler {name}.")
    except client.exceptions.AlreadyExistsException:
        response = client.update_crawler(**kwargs)
        log.info(f"Updated crawler {name}.")
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
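
For example (the crawler name and S3 target path below are placeholders):

ensure_crawler(
    Name="my-crawler",
    Targets={"S3Targets": [{"Path": "s3://my-bucket/my-prefix/"}]},
)
run_crawler("my-crawler")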
Deathful answered 6/2, 2021 at 0:20

According to the most recent AWS docs, you can also use a custom waiter to wait on the crawler's status. A minimal working example of how to use one for this purpose is shown below:

from enum import Enum
from customwaiter import CustomWaiter
import logging
import boto3

logger = logging.getLogger(__name__)

class CrawlerState(Enum):
    READY = "failure"
    STOPPING = "success"
    RUNNING = "running"


class CrawlerStateWaiter(CustomWaiter):
    def __init__(self, client):
        super().__init__(
            "CrawlerState",
            "get_crawler",
            "Crawler.State",
            {"STOPPING": CrawlerState.STOPPING, "READY": CrawlerState.READY},
            client,
            max_tries=100,
        )

    def wait(self, Name):
        self._wait(Name=Name)

if __name__ == "__main__":
    glue_client = boto3.client('glue')
    response = glue_client.start_crawler(
        Name=CrawlerName
    )
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
    waiter = CrawlerStateWaiter(glue_client)
    waiter.wait(Name=CrawlerName)
    crawler_info = glue_client.get_crawler(
        Name=CrawlerName
    )
    assert crawler_info["Crawler"]["LastCrawl"]["Status"] == "SUCCEEDED"

Here the following assumptions were made:

  1. Once the crawler is running, it won't take long for it to change from STOPPING to READY.
  2. To check whether the crawler ran successfully, I'm assuming that the LastCrawl information is available right after the crawl finishes.
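
Note that customwaiter is not part of boto3; it refers to the CustomWaiter helper class from the AWS SDK code examples. For reference, here is a simplified sketch of such a class built on botocore's waiter machinery (an approximation, not the exact AWS helper file):

import botocore.waiter


class CustomWaiter:
    """Build a botocore waiter from a mapping of observed statuses to wait states."""

    def __init__(self, name, operation, argument, acceptors, client, delay=10, max_tries=60):
        # `argument` is a JMESPath into the operation's response, e.g. "Crawler.State".
        # `acceptors` maps each expected value to an enum member whose .value is a
        # botocore waiter state such as "success" or "failure"; any other observed
        # value causes the waiter to retry.
        waiter_config = {
            "version": 2,
            "waiters": {
                name: {
                    "delay": delay,
                    "operation": operation,
                    "maxAttempts": max_tries,
                    "acceptors": [
                        {
                            "state": state.value,
                            "matcher": "path",
                            "argument": argument,
                            "expected": expected,
                        }
                        for expected, state in acceptors.items()
                    ],
                }
            },
        }
        model = botocore.waiter.WaiterModel(waiter_config)
        self.waiter = botocore.waiter.create_waiter_with_client(name, model, client)

    def _wait(self, **kwargs):
        # Raises botocore.exceptions.WaiterError on a "failure" state or timeout.
        self.waiter.wait(**kwargs)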
Altercation answered 21/10, 2021 at 16:05 Comment(2)
Hello Miguel, I'm relatively new to AWS. I have copied your code for calling the crawler and also placed custom_waiter.py in the same path. How do I call the crawler from the job above? It would be helpful for me as well as other newbies. Thanks. – Paralogism
@Paralogism I'm not sure I fully understand your question, but once you create your custom waiter (i.e., the CrawlerStateWaiter object), you only need to create a waiter object like this: waiter = CrawlerStateWaiter(<service_client>), where service_client is your boto3 client. – Altercation
