I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from MongoDB, map it into a relational (flat) structure, convert all values to strings and insert them into MySQL.
Each of these steps is submitted as a process and given import/export queues, save for the MongoDB export, which is handled in the parent.
As you will see below, I use queues, and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that if a child process runs into an unhandled exception, the parent does not notice and the rest just keeps running. What I want is for the whole shebang to quit and, ideally, for the parent to re-raise the child's error.
I have two questions:
- How do I detect the child error in the parent?
- How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" to the queue to kill the child is pretty dirty.
I am using Python 2.7.
Here are the essential parts of my code:
# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
[...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"
# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
           for i in range(10)]
# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
              for i in range(10)]
# create mysql writer
# I create a list of writers. Currently there is only one,
# but I have the option to parallelize it further
writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q,
                                      columns, 'w_'+mysql_table, 1000) for i in range(1)]
# starting mapper
for mapper in mappers:
    mapper.start()
time.sleep(1)
# starting converter
for converter in converters:
    converter.start()
# starting writer
for writer in writers:
    writer.start()
[... initializing mongo db connection ...]
# put each dataset read to queue for the mapper
for row in mongo_collection.find({inc_column: {"$gte": start}}):
    mongo_input_result_q.put(row)
    count += 1
    if count % log_counter == 0:
        print 'Mongo Reader' + " " + str(count)
print "MongoReader done"
# Processes are terminated when they read "None" object from queue
# now that reading is finished, put None for each mapper in the queue so they terminate themselves
# the same for all followup processes
for mapper in mappers:
    mongo_input_result_q.put(None)
for mapper in mappers:
    mapper.join()
for converter in converters:
    mapper_result_q.put(None)
for converter in converters:
    converter.join()
for writer in writers:
    converter_result_q.put(None)
for writer in writers:
    writer.join()
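In case it helps to reproduce the setup without my own classes, here is a minimal, self-contained sketch of the pattern described above. `SentinelWorker` and `run_pipeline` are hypothetical names made up for this example and are not my real mapper/converter/writer code; the worker just converts items to `str`. The `get_context` shim is only there so the snippet also runs unchanged on newer Python 3 versions; on 2.7 it falls back to the plain `multiprocessing` module.

```python
import multiprocessing

# Assumption: on Python 3.4+ pick the "fork" start method explicitly;
# on Python 2.7 get_context does not exist, so use the module itself.
if hasattr(multiprocessing, 'get_context'):
    ctx = multiprocessing.get_context('fork')
else:
    ctx = multiprocessing


class SentinelWorker(ctx.Process):
    """Hypothetical stand-in for one pipeline stage."""

    def __init__(self, in_q, out_q):
        super(SentinelWorker, self).__init__()
        self.in_q = in_q
        self.out_q = out_q

    def run(self):
        while True:
            item = self.in_q.get()
            if item is None:
                # Sentinel read: pass it downstream and stop.
                self.out_q.put(None)
                break
            self.out_q.put(str(item))


def run_pipeline(items):
    """Feed items to one worker and collect its output in the parent."""
    in_q = ctx.Queue()
    out_q = ctx.Queue()
    worker = SentinelWorker(in_q, out_q)
    worker.start()
    for item in items:
        in_q.put(item)
    in_q.put(None)  # tell the worker to terminate itself
    results = []
    while True:
        result = out_q.get()
        if result is None:
            break
        results.append(result)
    worker.join()
    # exitcode is the only thing the parent learns about how the child ended
    return results, worker.exitcode


if __name__ == '__main__':
    print(run_pipeline([1, 2, 3]))
```

If `run()` raised an unhandled exception in this sketch, the child would exit with a nonzero `exitcode`, but the parent would still be blocked in `out_q.get()` waiting for a sentinel that never arrives, which is essentially the situation I am asking about.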