Why do I sometimes get KeyError when using the SQS client?

I am using the boto3 SQS client to receive messages from an AWS SQS FIFO queue.

def consume_msgs():
    sqs = None
    try:
        sqs = boto3.client('sqs',
                           region_name=S3_BUCKET_REGION,
                           aws_access_key_id=AWS_ACCESS_KEY_ID,
                           aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    except Exception:
        logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
        logger.error(traceback.format_exc())

    # ... more code to process messages

The application is set up as a service on EC2 using upstart. It works fine most of the time, but sometimes when I restart the service after a code change, the app exits with the following error:

2018-10-06 01:29:38,654 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,658 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,663 ERROR Traceback (most recent call last):
  File "/home/ec2-user/aae_client/app/run.py", line 194, in consume_msgs
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/__init__.py", line 83, in client
    return _get_default_session().client(*args, **kwargs)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 851, in create_client
    endpoint_resolver = self.get_component('endpoint_resolver')
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 726, in get_component
    return self._components.get_component(name)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 926, in get_component
    del self._deferred[name]
KeyError: 'endpoint_resolver'

Restarting the service usually fixes it, and it doesn't happen on every restart. What is confusing is the KeyError warning that precedes the actual traceback. What exactly does this KeyError refer to? It can't be AWS_SECRET_ACCESS_KEY, since that key never changes and works fine most of the time. The issue appears and disappears at random, which makes it hard to debug. I also don't understand how this error escaped the try..except block.

EDIT

Based on the comments, this seems to be related to multithreading. consume_msgs is indeed run by multiple threads:

def process_msgs():
    for i in range(NUM_WORKERS):
        t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
        t.setDaemon(True)
        t.start()

    while True:
        time.sleep(MAIN_PROCESS_SLEEP_INTERVAL)
Hadji answered 6/10, 2018 at 1:43 Comment(0)

This GitHub issue suggests creating the SQS client once at the top level (rather than inside the function):

sqs = boto3.client('sqs',
                   region_name=S3_BUCKET_REGION,
                   aws_access_key_id=AWS_ACCESS_KEY_ID,
                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY)


def consume_msgs():
    # code to process message
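
For illustration, a rough sketch of what consume_msgs could then look like, polling with that shared module-level client (QUEUE_URL and handle() are placeholders of mine, not names from the question):

def consume_msgs():
    while True:
        # Long-poll the FIFO queue; WaitTimeSeconds reduces empty receives.
        resp = sqs.receive_message(QueueUrl=QUEUE_URL,
                                   MaxNumberOfMessages=10,
                                   WaitTimeSeconds=20)
        for msg in resp.get('Messages', []):
            handle(msg['Body'])  # placeholder for the real processing
            # Delete only after the message has been processed successfully.
            sqs.delete_message(QueueUrl=QUEUE_URL,
                               ReceiptHandle=msg['ReceiptHandle'])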
Protero answered 6/10, 2018 at 1:57 Comment(5)
This issue mentions the same thing / same solution.Protero
consume_msgs is run by multiple threads (see my new edits). Moving the sqs client out of this function means it ends up in the main thread. But each thread still needs its own sqs client, right?Hadji
The link you provided actually suggests creating a session for each thread instead of having the clients share the same session. Something like boto3.Session().client.Hadji
@Hadji I think you're misreading that, or - at least - some comments suggest putting it at the top-level (and in the main thread) if you're using threading.Protero
You could also move the polling to the main thread and have only the processing on the worker threads. It sounds like mixing multiple sqs clients with threads is a bad combination for boto3 (likely buggy).Protero

Maybe I misunderstand some of the other answers, but in the case of multithreaded execution, I don't think that having one boto3 client object and passing it to other functions will work if those functions are executed in separate threads. I had been experiencing sporadic endpoint_resolver errors when invoking a boto3 client, and they stopped after following the example in the documentation and the comments on boto3 GitHub issues such as #1246 and #1592 and creating a separate session object in each thread. In my case it meant an almost trivial change in my code, going from

client = boto3.client(variant, region_name = creds['region_name'],
                      aws_access_key_id = ...,
                      aws_secret_access_key = ...)

to

session = boto3.session.Session()
client = session.client(variant, region_name = creds['region_name'],
                        aws_access_key_id = ...,
                        aws_secret_access_key = ...)

in the function that is executed in separate threads. My reading of the OP's code for consume_msgs() is that a similar change could be made and it would eliminate the occasional endpoint_resolver error.
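
To make that concrete, here is a rough sketch of how the OP's consume_msgs() might start once it builds a per-thread session (my adaptation of the question's code, not the OP's actual implementation):

def consume_msgs():
    # Each worker thread builds its own session and derives its client from it.
    session = boto3.session.Session()
    try:
        sqs = session.client('sqs',
                             region_name=S3_BUCKET_REGION,
                             aws_access_key_id=AWS_ACCESS_KEY_ID,
                             aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    except Exception:
        logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
        logger.error(traceback.format_exc())
        return

    # ... more code to process messages, using this thread-local client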

Fidelity answered 7/1, 2020 at 20:45 Comment(1)
In fact, only creating a boto3 client isn't thread-safe; using one client from separate threads is OK. On the other hand, using the same resource or session in separate threads is not thread-safe. Another catch: creating a session in each thread is quite time-consuming. You can find a multithreaded client example in the documentation.Cacie

I got this error when creating a client for S3, but AFAIK it is the same issue. There is non-thread-safe code in the path that creates a client:

        if name in self._deferred:
            factory = self._deferred[name]
            self._components[name] = factory()
            # Only delete the component from the deferred dict after
            # successfully creating the object from the factory as well as
            # injecting the instantiated value into the _components dict.
            del self._deferred[name]

(from botocore/session.py in the get_component method - that is the code that raises the KeyError when trying to delete a key that was deleted by a different thread)

Locking the client creation solved it for me (as suggested in https://github.com/boto/boto3/pull/806)
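
A minimal sketch of what that locking could look like, assuming the client is built inside the worker threads (the lock and helper function are mine, not from the linked PR; the credential constants are the ones from the question):

import threading

import boto3

_client_lock = threading.Lock()

def make_sqs_client():
    # Serialize only the creation; once created, the client itself can be
    # shared across threads for making calls.
    with _client_lock:
        return boto3.client('sqs',
                            region_name=S3_BUCKET_REGION,
                            aws_access_key_id=AWS_ACCESS_KEY_ID,
                            aws_secret_access_key=AWS_SECRET_ACCESS_KEY)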

Christalchristalle answered 13/6, 2019 at 8:20 Comment(0)
