Python: Spacy and memory consumption
Asked Answered
S

3

16

1 - THE PROBLEM

I'm using "spacy" on python for text documents lemmatization. There are 500,000 documents having size up to 20 Mb of clean text.

The problem is the following: spacy memory consuming is growing in time till the whole memory is used.

2 - BACKGROUND

My hardware configuration: CPU: Intel I7-8700K 3.7 GHz (12 cores) Memory: 16 Gb SSD: 1 Tb GPU is onboard but is not used for this task

I'm using "multiprocessing" to split the task among several processes (workers). Each worker receives a list of documents to process. The main process performs monitoring of child processes. I initiate "spacy" in each child process once and use this one spacy instance to handle the whole list of documents in the worker.

Memory tracing says the following:

[ Memory trace - Top 10 ]

/opt/develop/virtualenv/lib/python3.6/site-packages/thinc/neural/mem.py:68: size=45.1 MiB, count=99, average=467 KiB

/opt/develop/virtualenv/lib/python3.6/posixpath.py:149: size=40.3 MiB, count=694225, average=61 B

:487: size=9550 KiB, count=77746, average=126 B

/opt/develop/virtualenv/lib/python3.6/site-packages/dawg_python/wrapper.py:33: size=7901 KiB, count=6, average=1317 KiB

/opt/develop/virtualenv/lib/python3.6/site-packages/spacy/lang/en/lemmatizer/_nouns.py:7114: size=5273 KiB, count=57494, average=94 B

prepare_docs04.py:372: size=4189 KiB, count=1, average=4189 KiB

/opt/develop/virtualenv/lib/python3.6/site-packages/dawg_python/wrapper.py:93: size=3949 KiB, count=5, average=790 KiB

/usr/lib/python3.6/json/decoder.py:355: size=1837 KiB, count=20456, average=92 B

/opt/develop/virtualenv/lib/python3.6/site-packages/spacy/lang/en/lemmatizer/_adjectives.py:2828: size=1704 KiB, count=20976, average=83 B

prepare_docs04.py:373: size=1633 KiB, count=1, average=1633 KiB

3 - EXPECTATIONS

I have seen a good recommendation to build a separated server-client solution [here]Is possible to keep spacy in memory to reduce the load time?

Is it possible to keep memory consumption under control using "multiprocessing" approach?

4 - THE CODE

Here is a simplified version of my code:

import os, subprocess, spacy, sys, tracemalloc
from multiprocessing import Pipe, Process, Lock
from time import sleep

# START: memory trace
tracemalloc.start()

# Load spacy
spacyMorph = spacy.load("en_core_web_sm")

#
# Get word's lemma
#
def getLemma(word):
    global spacyMorph
    lemmaOutput = spacyMorph(str(word))
    return lemmaOutput


#
# Worker's logic
#
def workerNormalize(lock, conn, params):
    documentCount = 1
    for filenameRaw in params[1]:
        documentTotal = len(params[1])
        documentID = int(os.path.basename(filenameRaw).split('.')[0])

        # Send to the main process the worker's current progress
        if not lock is None:
            lock.acquire()
            try:
                statusMessage = "WORKING:{:d},{:d},".format(documentID, documentCount)
                conn.send(statusMessage)
                documentCount += 1
            finally:
                lock.release()
        else:
            print(statusMessage)

        # ----------------
        # Some code is excluded for clarity sake
        # I've got a "wordList" from file "filenameRaw"
        # ----------------

        wordCount = 1
        wordTotalCount = len(wordList)

        for word in wordList:
            lemma = getLemma(word)
            wordCount += 1

        # ----------------
        # Then I collect all lemmas and save it to another text file
        # ----------------

        # Here I'm trying to reduce memory usage
        del wordList
        del word
        gc.collect()


if __name__ == '__main__':
    lock = Lock()
    processList = []

    # ----------------
    # Some code is excluded for clarity sake
    # Here I'm getting full list of files "fileTotalList" which I need to lemmatize
    # ----------------
    while cursorEnd < (docTotalCount + stepSize):
        fileList = fileTotalList[cursorStart:cursorEnd]

        # ----------------
        # Create workers and populate it with list of files to process
        # ----------------
        processData = {}
        processData['total'] = len(fileList)  # worker total progress
        processData['count'] = 0  # worker documents done count
        processData['currentDocID'] = 0  # current document ID the worker is working on
        processData['comment'] = ''  # additional comment (optional)
        processData['con_parent'], processData['con_child'] = Pipe(duplex=False)
        processName = 'worker ' + str(count) + " at " + str(cursorStart)
        processData['handler'] = Process(target=workerNormalize, name=processName, args=(lock, processData['con_child'], [processName, fileList]))

        processList.append(processData)
        processData['handler'].start()

        cursorStart = cursorEnd
        cursorEnd += stepSize
        count += 1

    # ----------------
    # Run the monitor to look after the workers
    # ----------------
    while True:
        runningCount = 0

        #Worker communication format:
        #STATUS:COMMENTS

        #STATUS:
        #- WORKING - worker is working
        #- CLOSED - worker has finished his job and closed pipe-connection

        #COMMENTS:
        #- for WORKING status:
        #DOCID,COUNT,COMMENTS
        #DOCID - current document ID the worker is working on
        #COUNT - count of done documents
        #COMMENTS - additional comments (optional)


        # ----------------
        # Run through the list of workers ...
        # ----------------
        for i, process in enumerate(processList):
            if process['handler'].is_alive():
                runningCount += 1

                # ----------------
                # .. and check if there is somethng in the PIPE
                # ----------------
                if process['con_parent'].poll():
                    try:
                        message = process['con_parent'].recv()
                        status = message.split(':')[0]
                        comment = message.split(':')[1]

                        # ----------------
                        # Some code is excluded for clarity sake
                        # Update worker's information and progress in "processList"
                        # ----------------

                    except EOFError:
                        print("EOF----")

                # ----------------
                # Some code is excluded for clarity sake
                # Here I draw some progress lines per workers
                # ----------------

            else:
                # worker has finished his job. Close the connection.
                process['con_parent'].close()

        # Whait for some time and monitor again
        sleep(PARAM['MONITOR_REFRESH_FREQUENCY'])


    print("================")
    print("**** DONE ! ****")
    print("================")

    # ----------------
    # Here I'm measuring memory usage to find the most "gluttonous" part of the code
    # ----------------
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')

    print("[ Memory trace - Top 10 ]")
    for stat in top_stats[:10]:
        print(stat)


'''

Squishy answered 25/4, 2019 at 2:45 Comment(1)
You seem to be using Lock and Queues together where I wouldn't expect them to occur. It is also hard to actually run this code without knowing what params could be. In general, I would expect that you would find some useful info in my repo spacy-extreme which deals with memory issues when using spaCy.Eatmon
S
10

Memory leaks with spacy

Memory problems when processing large amounts of data seem to be a known issue, see some relevant github issues:

Unfortunately, it doesn't look like there's a good solution yet.

Lemmatization

Looking at your particular lemmatization task, I think your example code is a bit too over-simplified, because you're running the full spacy pipeline on single words and then not doing anything with the results (not even inspecting the lemma?), so it's hard to tell what you actually want to do.

I'll assume you just want to lemmatize, so in general, you want to disable the parts of the pipeline that you're not using as much as possible (especially parsing if you're only lemmatizing, see https://spacy.io/usage/processing-pipelines#disabling) and use nlp.pipe to process documents in batches. Spacy can't handle really long documents if you're using the parser or entity recognition, so you'll need to break up your texts somehow (or for just lemmatization/tagging you can just increase nlp.max_length as much as you need).

Breaking documents into individual words as in your example kind of the defeats the purpose of most of spacy's analysis (you often can't meaningfully tag or parse single words), plus it's going to be very slow to call spacy this way.

Lookup lemmatization

If you just need lemmas for common words out of context (where the tagger isn't going to provide any useful information), you can see if the lookup lemmatizer is good enough for your task and skip the rest of the processing:

from spacy.lemmatizer import Lemmatizer
from spacy.lang.en import LOOKUP
lemmatizer = Lemmatizer(lookup=LOOKUP)
print(lemmatizer(u"ducks", ''), lemmatizer(u"ducking", ''))

Output:

['duck'] ['duck']

It is just a static lookup table, so it won't do well on unknown words or capitalization for words like "wugs" or "DUCKS", so you'll have to see if it works well enough for your texts, but it would be much much faster without memory leaks. (You could also just use the table yourself without spacy, it's here: https://github.com/michmech/lemmatization-lists.)

Better lemmatization

Otherwise, use something more like this to process texts in batches:

nlp = spacy.load('en', disable=['parser', 'ner'])
# if needed: nlp.max_length = MAX_DOC_LEN_IN_CHAR
for doc in nlp.pipe(texts):
  for token in doc:
    print(token.lemma_)

If you process one long text (or use nlp.pipe() for lots of shorter texts) instead of processing individual words, you should be able to tag/lemmatize (many) thousands of words per second in one thread.

Sakmar answered 25/4, 2019 at 12:6 Comment(2)
Thank you for your recommendations. I'll go through them carefully and think what can I apply for my case. I see your point about splitting words and agree with you. But in my case I have documents with 2 mixed languages: English and Russian. Spacy can handle English only but not Russian. So I have to check every word and send it to different libraries to lemmatize.Squishy
For Spacy 3, I found the lemmatizer moved to spacy.pipeline.lemmatizer, but I couldn't find the LOOKUP.Infrared
C
11

For people who land on this in the future, I found a hack that seems to work well:

import spacy
import en_core_web_lg
import multiprocessing

docs = ['Your documents']

def process_docs(docs, n_processes=None):
    # Load the model inside the subprocess, 
    # as that seems to be the main culprit of the memory issues
    nlp = en_core_web_lg.load()

    if not n_processes:
        n_processes = multiprocessing.cpu_count()

    processed_docs = [doc for doc in nlp.pipe(docs, disable=['ner', 'parser'], n_process=n_processes)]


    # Then do what you wish beyond this point. I end up writing results out to s3.
    pass

for x in range(10):
    # This will spin up a subprocess, 
    # and everytime it finishes it will release all resources back to the machine.
    with multiprocessing.Manager() as manager:
        p = multiprocessing.Process(target=process_docs, args=(docs))
        p.start()
        p.join()

The idea here is to put everything Spacy-related into a subprocess so all the memory gets released once the subprocess finishes. I know it's working because I can actually watch the memory get released back to the instance every time the subprocess finishes (also the instance no longer crashes xD).

Full Disclosure: I have no idea why Spacy seems to go up in memory overtime, I've read all over trying to find a simple answer, and all the github issues I've seen claim they've fixed the issue yet I still see this happening when I use Spacy on AWS Sagemaker instances.

Hope this helps someone! I know I spent hours pulling my hair out over this.

Credit to another SO answer that explains a bit more about subprocesses in Python.

Convexoconcave answered 15/1, 2021 at 10:9 Comment(0)
S
10

Memory leaks with spacy

Memory problems when processing large amounts of data seem to be a known issue, see some relevant github issues:

Unfortunately, it doesn't look like there's a good solution yet.

Lemmatization

Looking at your particular lemmatization task, I think your example code is a bit too over-simplified, because you're running the full spacy pipeline on single words and then not doing anything with the results (not even inspecting the lemma?), so it's hard to tell what you actually want to do.

I'll assume you just want to lemmatize, so in general, you want to disable the parts of the pipeline that you're not using as much as possible (especially parsing if you're only lemmatizing, see https://spacy.io/usage/processing-pipelines#disabling) and use nlp.pipe to process documents in batches. Spacy can't handle really long documents if you're using the parser or entity recognition, so you'll need to break up your texts somehow (or for just lemmatization/tagging you can just increase nlp.max_length as much as you need).

Breaking documents into individual words as in your example kind of the defeats the purpose of most of spacy's analysis (you often can't meaningfully tag or parse single words), plus it's going to be very slow to call spacy this way.

Lookup lemmatization

If you just need lemmas for common words out of context (where the tagger isn't going to provide any useful information), you can see if the lookup lemmatizer is good enough for your task and skip the rest of the processing:

from spacy.lemmatizer import Lemmatizer
from spacy.lang.en import LOOKUP
lemmatizer = Lemmatizer(lookup=LOOKUP)
print(lemmatizer(u"ducks", ''), lemmatizer(u"ducking", ''))

Output:

['duck'] ['duck']

It is just a static lookup table, so it won't do well on unknown words or capitalization for words like "wugs" or "DUCKS", so you'll have to see if it works well enough for your texts, but it would be much much faster without memory leaks. (You could also just use the table yourself without spacy, it's here: https://github.com/michmech/lemmatization-lists.)

Better lemmatization

Otherwise, use something more like this to process texts in batches:

nlp = spacy.load('en', disable=['parser', 'ner'])
# if needed: nlp.max_length = MAX_DOC_LEN_IN_CHAR
for doc in nlp.pipe(texts):
  for token in doc:
    print(token.lemma_)

If you process one long text (or use nlp.pipe() for lots of shorter texts) instead of processing individual words, you should be able to tag/lemmatize (many) thousands of words per second in one thread.

Sakmar answered 25/4, 2019 at 12:6 Comment(2)
Thank you for your recommendations. I'll go through them carefully and think what can I apply for my case. I see your point about splitting words and agree with you. But in my case I have documents with 2 mixed languages: English and Russian. Spacy can handle English only but not Russian. So I have to check every word and send it to different libraries to lemmatize.Squishy
For Spacy 3, I found the lemmatizer moved to spacy.pipeline.lemmatizer, but I couldn't find the LOOKUP.Infrared
S
0

Manual call to garbage collection after each spacy invocation within each subprocess appears to help. Doing something similar to this with spark and it has greatly reduced the memory leaks, though is not a silver bullet. Hope this helps someone.

import gc

class EmailNER:

    def __init__(self, model_path='resources/models/ner-last-v1'):
        self.model_path = model_path

    def get_ner(self, data, target_col='raw_body', out_type='filter'):
        print('Loading Spacy NER Model..')
        nlp = spacy.load(self.model_path)
        nlp.max_length = 1500000

        print('Performing NER..')
        @f.udf(returnType=ArrayType(StringType()))
        def get_ents_arr(convo):
            doc = nlp(str(convo)).ents
            out = [str(ent.text) for ent in doc]
            # manually call garbage collection to plug leaks
            gc.collect()
            return out

        elif out_type == 'filter':
            ner_data = data.withColumn('ents', get_ents_arr(f.col(target_col)))
        
        return ner_data
Shakeup answered 15/3, 2023 at 21:8 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.