SSL error using multiprocessing in Python with Google Cloud services

In my Flask application I use multiprocessing to handle a batch of files: the user uploads a .zip containing many PDF files; after the upload, a new entity is created in the database for each file, then a thread is started that calls a multiprocessing pool, so each file is handled by its own process, which interacts with Google Cloud services such as Google Storage and Google Datastore.

import threading
import multiprocessing
import sys

class ProcessMulti(threading.Thread):
    def __init__(self, files_ids):
        self.files_ids = files_ids
        super().__init__()

    def run(self):
        with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
            for i, _ in enumerate(pool.imap_unordered(process_one, self.files_ids), 1):
                sys.stderr.write('\rdone {0:%}'.format(i/len(self.files_ids)))

def process_one(file_id):

    print("Process started by {}".format(file_id))
    file = File(file_id)
    file.process()
    print("Process finished by {}".format(file_id))

    return file.id

Inside the File object there are trivial interactions with Google Datastore and Google Storage - for example, reading files from a bucket or modifying data. Everything works smoothly locally... but in production, over an SSL connection, the following error is thrown when the process tries to start and nothing happens at all:

Process started by 5377634535997440
E1004 15:49:32.711329522   32255 ssl_transport_security.cc:476] Corruption detected.
E1004 15:49:32.711356181   32255 ssl_transport_security.cc:452] error:100003fc:SSL routines:OPENSSL_internal:SSLV3_ALERT_BAD_RECORD_MAC
E1004 15:49:32.711361146   32255 secure_endpoint.cc:208]     Decryption error: TSI_DATA_CORRUPTED

Does anyone have a clue as to what's causing this error? I did some research and found some errors related to overloading the SSL socket... but I have no idea what to do to fix that, or what alternatives to multiprocessing offer similar performance. Thank you.

Dale answered 4/10, 2019 at 18:55 Comment(4)
There is nothing about SSL in the code you've shown. But such problems occur, for example, if there is an SSL socket which is used in both client and server. Since the SSL state is per process, any SSL activities in one process will implicitly invalidate the SSL state in the other process.Rajkot
@SteffenUllrich thank you for your comment. So if the SSL state is per process, what would be a better practice for using multiprocessing? Create a separate SSL state for each process? How could I achieve that?Dale
The SSL state is bound to the SSL socket. Thus, don't use the same SSL socket in two different processes. For example, do the TCP-level accept in the server but do the SSL wrap in the child, so that the SSL state exists only in the child process (see the sketch after these comments).Rajkot
This GitHub issue is marked as closed now, but it seems as though it could be related to what you're experiencing, github.com/googleapis/google-cloud-python/issues/3501 . If anything, there is some helpful information in the comments of that issue regarding multithreading with the google-cloud-storage python client.Phenacite
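
A minimal sketch of the pattern from the comment above, assuming a fork-based multiprocessing start method on Linux and hypothetical certificate paths: the parent only does the plain TCP accept, and the TLS wrap (and therefore all SSL state) lives in the child process.

import multiprocessing
import socket
import ssl

def handle_client(raw_conn):
    # The TLS handshake happens here, so the SSL state exists only in this child
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain('server.crt', 'server.key')  # hypothetical cert/key paths
    with ctx.wrap_socket(raw_conn, server_side=True) as tls_conn:
        tls_conn.sendall(tls_conn.recv(1024))  # echo once, just to show traffic

def serve(host='0.0.0.0', port=8443):
    with socket.create_server((host, port)) as srv:
        while True:
            raw_conn, _ = srv.accept()  # plain TCP accept in the parent
            multiprocessing.Process(target=handle_client, args=(raw_conn,)).start()
            raw_conn.close()  # parent drops its copy; the forked child keeps its own fd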

I ended up replacing the multiprocessing and threading operations with Celery task queues, as there were some concerns regarding thread safety when connecting to gcloud services that I couldn't overcome. The Celery implementation has been a good solution for the many async tasks in my app.

# Import the Celery instance with the app context already set
from main_app import celery

@celery.task
def process_one(file_id):

    print("Process started by {}".format(file_id))
    file = File(file_id)
    file.process()
    print("Process finished by {}".format(file_id))

    return file.id
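
For context, a minimal sketch of how the batch could be fanned out to the queue: each file id is dispatched as its own task with delay(). The import path of the task module is an assumption.

# Hypothetical caller, e.g. in the upload view, enqueuing one task per file id
from my_tasks import process_one  # assumption: the module where the task above lives

def start_batch(files_ids):
    for file_id in files_ids:
        process_one.delay(file_id)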
Dale answered 3/12, 2019 at 2:29 Comment(0)

An alternative solution that worked for us was to set GRPC_POLL_STRATEGY to 'poll' in the parent process:

os.environ['GRPC_POLL_STRATEGY'] = 'poll'

We were getting the "Decryption error: TSI_DATA_CORRUPTED" error while using multithreading with Firebase.

Source: https://github.com/grpc/grpc/issues/28557
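
A minimal sketch of where the variable goes, assuming it has to be set in the parent process before any gRPC-backed client (for example Datastore or Firestore) is created, so gRPC picks up the strategy at initialization:

import os

# Set before creating any gRPC-backed Google Cloud client
os.environ['GRPC_POLL_STRATEGY'] = 'poll'

from google.cloud import datastore  # assumption: the gRPC-backed client used by the app

client = datastore.Client()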

Storehouse answered 1/11, 2022 at 10:33 Comment(0)