Response payload is not completed using asyncio/aiohttp

I've written a Python 3.7 script that asynchronously (asyncio 3.4.3 and aiohttp 3.5.4) creates a Salesforce Bulk API (v45.0) job/batch for multiple objects, each queried with a single SOQL statement, waits for the batches to complete, then downloads (streams) the results to a server, performs some data transformations, and finally synchronously uploads the results to SQL Server 2016 SP1 (13.0.4560.0). I have had plenty of successful trial runs and thought it was working perfectly, but I've recently started intermittently receiving the following error and am at a loss on how to fix it, since there are very few reports/solutions for it on the web:

aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed

Sample code snippet:

import asyncio, aiohttp, aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree

#Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)
sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'

#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = {
    'Account': {'job': {'batch': {'id': '8596P00000ihwpJulI', 'results': ['8596V00000Bo9iU'], 'state': 'Completed'},
                        'id': '8752R00000iUjtReqS'},
                'soql': 'select Id,Name from Account'},

    'Contact': {'job': {'batch': {'id': '9874G00000iJnBbVgg', 'results': ['7410t00000Ao9vp'], 'state': 'Completed'},
                        'id': '8800o00000POIkLlLa'},
                'soql': 'select Id,Name from Contact'}}

async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
    async with aiohttp.ClientSession() as session:
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data) #list of batch results
            for resultID in batchResults:
                async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r:
                    async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile: #save in temporary file for manipulation later
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)

async def asyncDownload():
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])

if __name__ == "__main__":
    asyncio.run(asyncDownload())

Traceback (error lines won't match code snippet above):

Traceback (most recent call last):
  File "C:\Code\salesforce.py", line 252, in <module>
    asyncio.run(asyncDownload())
  File "C:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
    return future.result()
  File "C:\Code\salesforce.py", line 241, in asyncDownload
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])
  File "C:\Code\salesforce.py", line 183, in retrieveResults
    chunk = await r.content.read(81920)
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 369, in read
    await self._wait('read')
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 297, in _wait
    await waiter
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed

The root of the problem seems to be r.content.read(81920), which should be streaming the data in 81,920-byte chunks, but that's about as far as I can get.

I don't think this is a network issue on my end, as other small jobs connected to external sources on this server finish without issue while this job runs. Does anyone have any idea what is going on here?

Thank you!

Edit:

I've tried iter_any() instead of read() and still get the same error...

async for data in r.content.iter_any():
    await outfile.write(data)

I've tried reading line by line (readline() under the hood) and still get the same error...

async for line in r.content: #StreamReader also iterates line by line
    await outfile.write(line)

I have since added some retry functionality to the error-handling part of the code (not included in the original post), which ultimately allows the jobs to complete. The payload errors still happen, and that is still the main issue, but retrying the downloads has been a successful workaround. The problem persists if anyone is able to provide further information.
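For illustration, a minimal sketch of that retry approach (the maxAttempts value, the backoff, and the downloadResult helper name are illustrative, not the exact production code):

import asyncio, aiohttp, aiofiles

async def downloadResult(session, url, headers, filePath):
    #Stream a single result set to disk in 81920-byte chunks
    async with session.get(url=url, headers=headers, timeout=None) as r:
        async with aiofiles.open(filePath, 'wb') as outfile:
            while True:
                chunk = await r.content.read(81920)
                if not chunk:
                    break
                await outfile.write(chunk)

async def downloadWithRetry(session, url, headers, filePath, maxAttempts=3):
    #Re-request the whole result if the payload is cut off mid-stream
    for attempt in range(1, maxAttempts + 1):
        try:
            await downloadResult(session, url, headers, filePath)
            return
        except (aiohttp.ClientPayloadError, asyncio.TimeoutError):
            if attempt == maxAttempts:
                raise
            await asyncio.sleep(5 * attempt) #simple backoff before retrying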

Haven answered 28/5, 2019 at 16:47 Comment(6)
Job finishes ok on SF end? You can see it in Setup -> Bulk data load jobs?Serow
@Serow Yes, jobs finish without issue, the 'Completed' batch status is my trigger to begin the retrieveResults() function.Haven
No idea, sorry. What if you issue the same request from Postman, curl, or SF Workbench? If it seems to work fine in sandbox but dies in production / developer edition - maybe you're exhausting the rolling 24h API request limit? (Ideally you'd see an HTTP header about it)Serow
I should add that I believe this occurs while the data is streaming to disk (the while True: section) because a 200Mb download will start and then the error randomly appears sometime through the download, but not always. I know for a fact I'm not exhausting the API limit - I keep an eye on it frequently and am consistently under 5% usage. I'm going to try regular requests to see if I can at least complete my downloads; I'm just not a fan of losing the async functionality.Haven
I have a script that throws this same error while retrieving JSON data and that has been running successfully for months. For me, the issue was with the server; the disk was full. As soon as I cleared up some space it started working normally again. You might want to contact the server admin to check.Emancipator
@Emancipator Definitely not a storage issue. There's always at least ~200GB of storage space free on the server. The last paragraph in my original post's Edit has been the working solution for the unexplained timeouts.Haven

Hi, have you tried inserting await asyncio.sleep(0) like this:

                    ...
                    while True:
                        chunk = await r.content.read(81920)
                        await asyncio.sleep(0)
                        if not chunk:
                            break
                        await outfile.write(chunk)
                    ...
Singlestick answered 7/9, 2021 at 8:57 Comment(2)
@Haven In my case this resolved the same issue, by giving the loop a chance to process other coroutines. I've also noticed that on slow connections ClientPayloadError appears more often, so I set up the timeouts this way:Singlestick
timeout=aiohttp.ClientTimeout(total=60*60, sock_read=240); async with aiohttp.ClientSession(timeout=timeout) as session:Singlestick
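In the question's retrieveResults(), that suggestion would look roughly like this (a sketch; the total and sock_read values are simply the ones from the comment above):

import aiohttp

#Cap the whole request at one hour and any single socket read at 240 seconds,
#so a stalled stream fails fast enough to be retried instead of hanging
timeout = aiohttp.ClientTimeout(total=60*60, sock_read=240)

async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
    async with aiohttp.ClientSession(timeout=timeout) as session:
        ... #same result-listing and chunked streaming logic as in the question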

I had this error in Amazon Lambda (it was thrown while making requests) with code like this:

await asyncio.gather(*tasks) # tasks like asyncio.ensure_future()

The solution was to fix the build environment, changing the Docker base image from

FROM amazonlinux:2 AS 

to

FROM lambci/lambda:build-python3.8 

I guess the problem is that .so files, or something at a lower level used internally by the library to manage coroutines, are not compatible with the Lambda environment. Hence, building against the right Docker base image solves the issue.
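For illustration, a minimal build stage along those lines might look like this (a sketch; the requirements file and the /var/task output path are assumptions about the packaging setup, not taken from the original project):

# Build the dependencies against the Lambda-compatible image so any compiled
# extensions (.so files) match the Lambda runtime
FROM lambci/lambda:build-python3.8

COPY requirements.txt .
RUN pip install -r requirements.txt -t /var/task/

COPY . /var/task/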

Morphophonemics answered 30/11, 2021 at 16:18 Comment(1)
Anyway, I don't understand why this mismatch happens, since according to docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html the AWS Lambda python3.8 runtime is based on Amazon Linux 2Morphophonemics

"The event loop is already running," is a common issue when using asyncio.run within a script that is already running within an event loop. To resolve this, you can use asyncio.create_task to create and run your asynchronous tasks.

  • Modify async def retrieveResults:
    Added session as a parameter and used the passed-in session instead of creating a new ClientSession inside the function.
    async def retrieveResults(session, jobId, batchId, sfObject):
        headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
        
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data)  # list of batch results
    
            for resultID in batchResults:
                async with session.get(
                    url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}',
                    headers=headers,
                    timeout=None
                ) as r:
                    async with aiofiles.open(
                        f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv',
                        'wb'
                    ) as outfile:
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)
  • Modify async def asyncDownload:
    Used async with aiohttp.ClientSession() as session: to create a single session inside the function and passed it to retrieveResults.
    async def asyncDownload():
        async with aiohttp.ClientSession() as session:
            tasks = [
                retrieveResults(session, objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject)
                for sfObject in objectDictionary
            ]
            await asyncio.gather(*tasks)
    
    if __name__ == "__main__":
        asyncio.run(asyncDownload())
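Sharing one ClientSession across all the downloads also lets aiohttp reuse its connection pool instead of opening and tearing down a separate pool for every object, which is the usage pattern the aiohttp documentation recommends.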
Cush answered 5/12, 2023 at 17:49 Comment(0)
