I am using the boto3 SQS client to receive messages from an AWS SQS FIFO queue.
def consume_msgs():
    sqs = None
    try:
        sqs = boto3.client('sqs',
                           region_name=S3_BUCKET_REGION,
                           aws_access_key_id=AWS_ACCESS_KEY_ID,
                           aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    except Exception:
        logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
        logger.error(traceback.format_exc())
    ### more code to process message
The application is set up as a service on EC2 using upstart. It works fine most of the time, but sometimes when I restart the service after a code change, the app exits with the following error:
2018-10-06 01:29:38,654 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,658 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,663 ERROR Traceback (most recent call last):
  File "/home/ec2-user/aae_client/app/run.py", line 194, in consume_msgs
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/__init__.py", line 83, in client
    return _get_default_session().client(*args, **kwargs)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 851, in create_client
    endpoint_resolver = self.get_component('endpoint_resolver')
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 726, in get_component
    return self._components.get_component(name)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 926, in get_component
    del self._deferred[name]
KeyError: 'endpoint_resolver'
Restarting the service usually fixes it, and the error does not occur on every restart. What confuses me is the KeyError warning that precedes the actual traceback. What exactly does this KeyError refer to? It can't be AWS_SECRET_ACCESS_KEY, since that key never changes and the code works fine most of the time. The issue comes and goes at random, which makes it hard to debug. I also don't understand how this error escaped the try..except block.
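For reference, here is a minimal sketch (not the application code) of how a handler shaped like the one above produces those log lines. The warning formats sys.exc_info()[0], which is the exception class, so a line ending in <class 'KeyError'> suggests the handler did run and execution then continued past it. The names build_client and consume_once and the simulated KeyError are hypothetical stand-ins for the boto3.client call.

```python
import sys
import traceback


def build_client():
    # Hypothetical stand-in for boto3.client(...); simulates the failure.
    raise KeyError('endpoint_resolver')


def consume_once():
    client = None
    messages = []  # collected instead of logged, to keep the sketch self-contained
    try:
        client = build_client()
    except Exception:
        # sys.exc_info()[0] is the exception class, not the missing key.
        messages.append('SQS client error {}'.format(sys.exc_info()[0]))
        messages.append(traceback.format_exc())
    # Execution continues here with client still None.
    return client, messages
```

If the code after the handler uses the client without checking for None, that later step, rather than the KeyError itself, could be what stops the worker. This is an assumption, since that code is not shown.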
EDIT
Based on the comments, this seems to be related to multithreading. consume_msgs is indeed run by multiple threads:
def process_msgs():
    for i in range(NUM_WORKERS):
        t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
        t.setDaemon(True)
        t.start()
    while True:
        time.sleep(MAIN_PROCESS_SLEEP_INTERVAL)
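If the multithreading explanation is right, one possible workaround is sketched below. It assumes the race comes from every worker calling boto3.client(), which goes through boto3's shared default session, as the _get_default_session() frame in the traceback shows. The sketch builds each client from its own Session and also serializes construction with a lock. The helper names create_client_serialized and new_sqs_client and the module-level lock are hypothetical, not part of boto3.

```python
import threading

# Hypothetical module-level lock shared by all worker threads.
_client_lock = threading.Lock()


def create_client_serialized(factory, *args, **kwargs):
    """Call factory(*args, **kwargs) while holding the lock, so only one
    thread at a time runs the client construction code."""
    with _client_lock:
        return factory(*args, **kwargs)


def new_sqs_client(**client_kwargs):
    """Build an SQS client from a fresh Session instead of the shared
    default session used by boto3.client()."""
    import boto3  # imported here so the helper above works without boto3
    return boto3.session.Session().client('sqs', **client_kwargs)
```

Inside consume_msgs, the client could then be created with create_client_serialized(new_sqs_client, region_name=S3_BUCKET_REGION, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY). Either measure alone may be enough; combining them is simply the conservative choice.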