I've written a Python 3.7 script that asynchronously (asyncio 3.4.3 and aiohttp 3.5.4) creates a Salesforce bulk API (v45.0) job/batch using multiple objects, each queried by a single SOQL statement. It then waits for the batches to complete and, upon completion, downloads (streams) the results to a server. After some data transformations, it finally uploads the results synchronously to SQL Server 2016 SP1 (13.0.4560.0). I've had plenty of successful trial runs with this and thought it was working perfectly. However, I've recently started intermittently receiving the following error. I'm at a loss as to how to fix it, since there are very few reports of it (or solutions to it) on the web:
```
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
```
Sample code snippet:
```python
import asyncio

import aiofiles
import aiohttp
from simple_salesforce import Salesforce
from xml.etree import ElementTree

# Establish a session using the simple_salesforce module
# (username, password, securityToken and organizationId are defined elsewhere)
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)

sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'

# Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = {
    'Account': {'job': {'batch': {'id': '8596P00000ihwpJulI', 'results': ['8596V00000Bo9iU'], 'state': 'Completed'},
                        'id': '8752R00000iUjtReqS'},
                'soql': 'select Id,Name from Account'},
    'Contact': {'job': {'batch': {'id': '9874G00000iJnBbVgg', 'results': ['7410t00000Ao9vp'], 'state': 'Completed'},
                        'id': '8800o00000POIkLlLa'},
                'soql': 'select Id,Name from Contact'},
}


async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
    async with aiohttp.ClientSession() as session:
        # Fetch the XML list of result IDs for this batch
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
        batchResults = ElementTree.fromstring(data)  # list of batch results
        for resultID in batchResults:
            async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r:
                # Save in a temporary file for manipulation later
                async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile:
                    # Stream the result to disk in 81920-byte chunks
                    while True:
                        chunk = await r.content.read(81920)
                        if not chunk:
                            break
                        await outfile.write(chunk)


async def asyncDownload():
    # Download the results for every object concurrently
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'],
                                           objectDictionary[sfObject]['job']['batch']['id'],
                                           sfObject)
                           for sfObject in objectDictionary])


if __name__ == "__main__":
    asyncio.run(asyncDownload())
```
Traceback (the line numbers won't match the code snippet above):
```
Traceback (most recent call last):
  File "C:\Code\salesforce.py", line 252, in <module>
    asyncio.run(asyncDownload())
  File "C:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
    return future.result()
  File "C:\Code\salesforce.py", line 241, in asyncDownload
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])
  File "C:\Code\salesforce.py", line 183, in retrieveResults
    chunk = await r.content.read(81920)
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 369, in read
    await self._wait('read')
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 297, in _wait
    await waiter
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
```
The problem seems to originate in `r.content.read(81920)`, which should stream the data in 81,920-byte chunks, but that's as far as I've been able to trace it.
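To make the error message more concrete: as far as I understand aiohttp's behavior, `ClientPayloadError: Response payload is not completed` is raised when the connection ends before the full response body (as framed by `Content-Length` or chunked encoding) has arrived. The chunked `read()` loop is then just the place where a disconnect surfaces. The stdlib-only sketch below is an analogy, not aiohttp's implementation. It reads an `asyncio.StreamReader` in chunks, the same way the loop in the question does, and raises a hypothetical `IncompletePayloadError` if EOF arrives before an expected byte count.

```python
import asyncio


class IncompletePayloadError(Exception):
    """Hypothetical error: the stream ended before the expected byte count arrived."""


async def copy_in_chunks(reader: asyncio.StreamReader, expected_length: int,
                         chunk_size: int = 81920) -> bytes:
    """Read `reader` chunk by chunk until EOF, then check nothing was cut off."""
    received = bytearray()
    while True:
        chunk = await reader.read(chunk_size)  # at most chunk_size bytes; b'' means EOF
        if not chunk:
            break
        received.extend(chunk)
    if len(received) < expected_length:
        # Roughly what "Response payload is not completed" reports: the peer
        # closed the connection before the declared body length was delivered.
        raise IncompletePayloadError(
            f"expected {expected_length} bytes, got {len(received)}")
    return bytes(received)


async def demo() -> None:
    # A body that should be 100 bytes long, but the "connection" dies after 50.
    truncated = asyncio.StreamReader()
    truncated.feed_data(b"x" * 50)
    truncated.feed_eof()
    try:
        await copy_in_chunks(truncated, expected_length=100, chunk_size=16)
    except IncompletePayloadError as exc:
        print("truncated:", exc)  # truncated: expected 100 bytes, got 50

    # The same read loop succeeds when the whole body arrives.
    complete = asyncio.StreamReader()
    complete.feed_data(b"y" * 100)
    complete.feed_eof()
    body = await copy_in_chunks(complete, expected_length=100, chunk_size=16)
    print("complete:", len(body))  # complete: 100


if __name__ == "__main__":
    asyncio.run(demo())
```

In the sketch, the read loop has no bug of its own. Whether the data is consumed with `read(n)`, `iter_any()` or line iteration, a body that is cut short on the wire fails the same way.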
I don't think this is a network issue on my end, as other small jobs on this server that connect to external sources finish without issue while this job runs. Does anyone have any idea what's going on here?
Thank you!
Edit:

I've tried `iter_any()` instead of `read()` and still get the same error:

```python
async for data in r.content.iter_any():
    await outfile.write(data)
```
I've tried `readline()` and still get the same error:

```python
# Line-by-line iteration; the StreamReader's async iterator calls readline() internally
async for line in r.content:
    await outfile.write(line)
```
I have since added retry functionality to the error-handling part of the code (not included in the original question), which ultimately allows the jobs to complete. The payload errors still happen, and they remain the main issue, but retrying the downloads has been a successful workaround. The problem persists, so I'd appreciate any further information anyone can provide.
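A retry wrapper along these lines is one way to implement the workaround described above. It is a minimal, stdlib-only sketch. `retry_async` is a hypothetical helper, and the default attempt count and backoff values are arbitrary assumptions. It takes a function that *creates* a fresh coroutine for each attempt, because a coroutine object that has already been awaited can't be awaited again.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    make_attempt: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Await make_attempt() until it succeeds or `attempts` tries are used up."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            # make_attempt() builds a *new* coroutine on every call.
            return await make_attempt()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the last error propagate
            # Exponential backoff (base_delay, 2x, 4x, ...) plus up to 10% jitter.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
    raise RuntimeError("unreachable")
```

To use this in the question's code, the body of the inner `for resultID in batchResults:` loop could be moved into its own coroutine. That coroutine would be passed as `lambda: ...` with `retry_on=(aiohttp.client_exceptions.ClientPayloadError,)`, so other exceptions still fail fast. The file is opened in `'wb'` mode inside each attempt, so a retried download overwrites the partial file instead of appending to it.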
… `retrieveResults()` function. – Haven

… `while True:` section) because a 200 MB download will start and then the error randomly appears partway through the download, but not always. I know for a fact I'm not exhausting the API limit: I monitor it frequently and am consistently under 5% usage. I'm going to try regular `requests` to see if I can at least complete my downloads; I'm just not a fan of losing the async functionality. – Haven
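Switching to the synchronous `requests` library doesn't necessarily mean giving up concurrency. Blocking calls can run on a thread pool and still be awaited together from asyncio. The sketch below assumes a hypothetical `blocking_download()` that stands in for a real `requests.get(url, headers=headers, stream=True)` plus `iter_content()` loop. In the sketch it only sleeps, so the example stays self-contained. Python 3.7 has no `asyncio.to_thread` (added in 3.9), so the sketch uses `loop.run_in_executor`.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def blocking_download(result_id: str) -> str:
    # Hypothetical stand-in for a synchronous download, e.g. requests.get(..., stream=True)
    # followed by writing resp.iter_content(81920) chunks to a file. Here it just sleeps
    # to simulate blocking network I/O and returns the file name it would have written.
    time.sleep(0.2)
    return f"{result_id}.csv"


async def download_all(result_ids: List[str], max_workers: int = 4) -> List[str]:
    loop = asyncio.get_running_loop()
    # Each blocking call runs on a worker thread, so the event loop stays responsive
    # and up to max_workers downloads overlap instead of running one after another.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [loop.run_in_executor(pool, blocking_download, rid) for rid in result_ids]
        # gather() returns results in the same order as result_ids.
        return list(await asyncio.gather(*futures))


if __name__ == "__main__":
    start = time.perf_counter()
    files = asyncio.run(download_all(["A1", "B2", "C3", "D4"]))
    print(files, f"{time.perf_counter() - start:.2f}s")
```

The pool size caps how many downloads run at once, which also caps the number of simultaneous connections to Salesforce.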