The Google API client is excellent at hiding the complexities of performing a Google translation. Unfortunately, if you step into the Google API code, you'll see it uses standard HTTP requests. This means that when you're running 20,000-plus requests, there will be a huge bottleneck regardless of thread pooling.
Consider creating the HTTP requests yourself using aiohttp (you'll need to install it with pip) and asyncio. This lets you run the HTTP requests asynchronously. (It also means you don't need google.cloud.translate_v2, multiprocessing or tqdm.notebook.)
Simply run an async function with asyncio.run(). That function can build a list of coroutines, each of which performs session.get() on a shared aiohttp.ClientSession, and then call asyncio.gather() to collect all the results.
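To make the pattern concrete, here is a minimal sketch that uses only the standard library. The `fake_request` coroutine is a hypothetical stand-in for the HTTP call (it just sleeps briefly), so the example shows the shape of the asyncio.run() / asyncio.gather() flow rather than real translation traffic.

```python
import asyncio


async def fake_request(index: int) -> str:
    # Hypothetical stand-in for an HTTP call: yield to the event loop, then return.
    await asyncio.sleep(0.01)
    return f"response {index}"


async def run_all(count: int) -> list:
    # Build one coroutine per request, then let gather() run them concurrently.
    coroutines = [fake_request(i) for i in range(count)]
    return await asyncio.gather(*coroutines)


results = asyncio.run(run_all(5))
print(results)
```

asyncio.gather() returns the results in the same order as the coroutines were passed in, which is why the results can later be matched back to the source sentences by position.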
In the example below I'm using an API key (created at https://console.cloud.google.com/apis/credentials) instead of Google Application Credentials / a service account.
Using your example with asyncio and aiohttp, it ran in 30 seconds without any errors. (You might want to extend the session timeout, though.)
It's worth pointing out that Google has a limit of 6 million characters per minute. Your test is doing 360,000. Therefore you'll reach the limit if you run the test 17 times in a minute!
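The arithmetic behind that figure, as a quick sketch. Both numbers are simply the ones quoted above (6 million characters per minute, 360,000 characters per test run); check your own project's quota before relying on them.

```python
import math

QUOTA_CHARS_PER_MINUTE = 6_000_000  # limit quoted in this answer
CHARS_PER_RUN = 360_000             # size of one test run quoted in this answer

# Whole runs that fit inside the quota, and the run that first goes over it.
runs_within_quota = QUOTA_CHARS_PER_MINUTE // CHARS_PER_RUN
runs_to_exceed = math.floor(QUOTA_CHARS_PER_MINUTE / CHARS_PER_RUN) + 1

print(runs_within_quota, runs_to_exceed)
```

Sixteen runs use 5,760,000 characters, so the seventeenth run in the same minute is the one that crosses the limit.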
Also, the speed is mainly determined by the machine and not by the Google API. (I ran my tests on a PC with a 3 GHz, 8-core CPU and 16 GB of RAM.)

    import asyncio
    import json
    from collections import namedtuple
    from urllib.parse import quote

    import aiohttp

    # Model to store results.
    TranslateReponseModel = namedtuple('TranslateReponseModel', ['sourceText', 'translatedText', 'detectedSourceLanguage'])


    def Logger(json_message):
        print(json.dumps(json_message))  # Note: logging JSON is just my personal preference.


    async def DownloadString(session, url, index):
        while True:  # On a client error this retries. You may want to limit the number of attempts.
            try:
                async with session.get(url) as r:
                    text = await r.text()
                    # Logger({"data": text, "status": r.status})
                    r.raise_for_status()  # Raises if the API returns a 4xx or 5xx status.
                    return text
            except aiohttp.ClientConnectionError as e:
                Logger({'Exception': f"Index {index} - connection was dropped before we finished", 'Details': str(e), 'Url': url})
            except aiohttp.ClientError as e:
                Logger({'Exception': f"Index {index} - something went wrong. Not a connection error, that was handled", 'Details': str(e), 'Url': url})


    def FormatResponse(sourceText, responseText):
        jsonResponse = json.loads(responseText)
        translation = jsonResponse["data"]["translations"][0]
        return TranslateReponseModel(sourceText, translation["translatedText"], translation["detectedSourceLanguage"])


    def TranslatorUriBuilder(targetLanguage, sourceText):
        apiKey = 'ABCDED1234'  # TODO This is a 41 characters API Key. You'll need to generate one (it's not part of the json certificate)
        # The text to translate goes in the "q" query parameter and must be URL-encoded.
        return f"https://translation.googleapis.com/language/translate/v2?key={apiKey}&q={quote(sourceText)}&target={targetLanguage}"


    async def Process(session, sourceText, lineNumber):
        translateUri = TranslatorUriBuilder('en', sourceText)  # Target language code is set to en (English)
        translatedResponseText = await DownloadString(session, translateUri, lineNumber)
        response = FormatResponse(sourceText, translatedResponseText)
        return response


    async def main():
        statements = ["this is another sentence"] * 20000

        Logger({'Message': f'Start running Google Translate API for {len(statements)}'})

        results = []
        async with aiohttp.ClientSession() as session:
            # One coroutine per statement; gather() returns results in the same order.
            results = await asyncio.gather(*[Process(session, val, idx) for idx, val in enumerate(statements)])

        Logger({'Message': f'Results are: {", ".join(map(str, [x.translatedText for x in results]))}'})
        Logger({'Message': f'Finished running Google Translate API for {str(len(statements))} and got {str(len(results))} results'})


    if __name__ == '__main__':
        asyncio.run(main())

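The `while True` loop in DownloadString retries forever, which is why its comment suggests limiting the number of attempts. One possible way to do that is a bounded retry with an exponential backoff, sketched below with the standard library only. The helper name `retry_with_backoff`, its parameters and the `flaky` coroutine are all hypothetical, and the broad `except Exception` is just for the demo; in the code above you would probably catch aiohttp.ClientError instead.

```python
import asyncio


async def retry_with_backoff(make_attempt, max_attempts=5, initial_delay=0.5, factor=2.0):
    """Await make_attempt() until it succeeds or max_attempts is reached."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_attempt()
        except Exception:
            if attempt == max_attempts:
                raise  # Give up and surface the last error to the caller.
            await asyncio.sleep(delay)  # Wait before the next attempt.
            delay *= factor             # Grow the wait each time.


# Demo: a hypothetical flaky coroutine that fails twice and then succeeds.
calls = {"count": 0}


async def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated dropped connection")
    return "ok"


result = asyncio.run(retry_with_backoff(flaky, initial_delay=0.01))
print(result, calls["count"])
```

Because the last failure is re-raised, a request that keeps failing (for example with a bad API key) stops after `max_attempts` instead of looping indefinitely.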
Additional test
The initial test translates the same sentence over and over, so I created another test to check that the results were not being cached by Google. I manually copied an eBook into a text file. The Python code then opens the file, splits the text into chunks of 100 characters, takes the first 20,000 chunks and translates each one. Interestingly, it still took under 30 seconds.

    import asyncio
    import json
    from collections import namedtuple
    from urllib.parse import quote

    import aiohttp

    # Model to store results.
    TranslateReponseModel = namedtuple('TranslateReponseModel', ['sourceText', 'translatedText', 'detectedSourceLanguage'])


    def Logger(json_message):
        print(json.dumps(json_message))  # Note: logging JSON is just my personal preference.


    async def DownloadString(session, url, index):
        while True:  # On a client error this retries. You may want to limit the number of attempts.
            try:
                # Use the shared session passed in (aiohttp has no module-level "session").
                async with session.get(url) as r:
                    text = await r.text()
                    # Logger({"data": text, "status": r.status})
                    r.raise_for_status()  # Raises if the API returns a 4xx or 5xx status.
                    return text
            except aiohttp.ClientConnectionError as e:
                Logger({'Exception': f"Index {index} - connection was dropped before we finished", 'Details': str(e), 'Url': url})
            except aiohttp.ClientError as e:
                Logger({'Exception': f"Index {index} - something went wrong. Not a connection error, that was handled", 'Details': str(e), 'Url': url})


    def FormatResponse(sourceText, responseText):
        jsonResponse = json.loads(responseText)
        translation = jsonResponse["data"]["translations"][0]
        return TranslateReponseModel(sourceText, translation["translatedText"], translation["detectedSourceLanguage"])


    def TranslatorUriBuilder(targetLanguage, sourceText):
        apiKey = 'ABCDED1234'  # TODO This is a 41 characters API Key. You'll need to generate one (it's not part of the json certificate)
        # The text to translate goes in the "q" query parameter and must be URL-encoded.
        return f"https://translation.googleapis.com/language/translate/v2?key={apiKey}&q={quote(sourceText)}&target={targetLanguage}"


    async def Process(session, sourceText, lineNumber):
        translateUri = TranslatorUriBuilder('en', sourceText)  # Target language code is set to en (English)
        translatedResponseText = await DownloadString(session, translateUri, lineNumber)
        response = FormatResponse(sourceText, translatedResponseText)
        return response


    def readEbook():
        # This is a simple test to make sure response is not cached.
        # I grabbed a random online pdf (http://sd.blackball.lv/library/Beginning_Software_Engineering_(2015).pdf) and copied text into notepad.
        with open("C:\\Dev\\ebook.txt", "r", encoding="utf8") as f:
            return f.read()


    def chunkText(text):
        chunk_size = 100
        text_length = len(text)
        # Split the text into consecutive 100-character chunks.
        chunk_array = [text[i:i+chunk_size] for i in range(0, text_length, chunk_size)]
        # Drop very short chunks (10 characters or fewer), then keep the first 20,000.
        formatResults = [x for x in chunk_array if len(x) > 10]
        return formatResults[:20000]


    async def main():
        data = readEbook()
        chunk_data = chunkText(data)

        Logger({'Message': f'Start running Google Translate API for {len(chunk_data)}'})

        results = []
        async with aiohttp.ClientSession() as session:
            # One coroutine per chunk; gather() returns results in the same order.
            results = await asyncio.gather(*[Process(session, val, idx) for idx, val in enumerate(chunk_data)])

        Logger({'Message': f'Results are: {", ".join(map(str, [x.translatedText for x in results]))}'})
        Logger({'Message': f'Finished running Google Translate API for {str(len(chunk_data))} and got {str(len(results))} results'})


    if __name__ == '__main__':
        asyncio.run(main())

Finally, you can find more info about the Google Translate API HTTP request at https://cloud.google.com/translate/docs/reference/rest/v2/translate, and you can run the request through Postman.
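If you want to try the request by hand first, the URL can be assembled with the standard library, as in the sketch below. The `build_translate_url` helper and the `YOUR_API_KEY` placeholder are hypothetical; the `key`, `q` and `target` query parameters are the same ones used in the code above. Letting urlencode() do the escaping avoids mistakes when the text contains characters such as `&` or spaces.

```python
from urllib.parse import urlencode

ENDPOINT = "https://translation.googleapis.com/language/translate/v2"


def build_translate_url(api_key, text, target):
    # urlencode() escapes each value, so "&" or spaces in the text are safe.
    query = urlencode({"key": api_key, "q": text, "target": target})
    return f"{ENDPOINT}?{query}"


# YOUR_API_KEY is a placeholder, not a real key.
url = build_translate_url("YOUR_API_KEY", "hola & adiós", "en")
print(url)
```

The printed URL can be pasted into Postman (or a browser) once the placeholder is replaced with a real key.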
HTTP Error 503: Service Unavailable. – Cycling
503 response contains a Retry-After header with a delay or a date to retry. See developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After – Lister
sleep_time = 4 and sleep_time *= 4? – Gaw