How to process requests from multiple users using an ML model and FastAPI?

I'm studying how to deploy AI modules through FastAPI.

I created a FastAPI app that answers questions using a pre-trained machine-learning model.

It works fine for a single user, but when multiple users send requests at the same time, the responses become very slow.

So, when multiple users submit questions, is there a way to copy the model, or load it once up front, so that all of them can be served at the same time?

import time

import numpy as np
import torch
from sentence_transformers import util

# model_path, corpus, and corpusid are defined elsewhere in my project

class sentencebert_ai():
    def __init__(self) -> None:
        super().__init__()

    def ask_query(self, query, topN):
        startt = time.time()

        ask_result = []
        score = []
        result_value = []
        embedder = torch.load(model_path)
        corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
        query_embedding = embedder.encode(query, convert_to_tensor=True)
        cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0]  # torch.Size([121]): cosine similarity scores against the 121 corpus sentences
        cos_scores = cos_scores.cpu()

        top_results = np.argpartition(-cos_scores, range(topN))[0:topN]

        for idx in top_results[0:topN]:
            ask_result.append(corpusid[idx].item())
            # .item() extracts the plain number from a 0-dim tensor such as tensor(5)
            score.append(round(cos_scores[idx].item(), 3))

        # build a JSON array to send back from the server
        for i, e in zip(ask_result, score):
            result_value.append({"pred_id": i, "pred_weight": e})
        endd = time.time()
        print('time check', endd - startt)
        return result_value
        # return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)



from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item_inference(BaseModel):
    text: str
    topN: Optional[int] = 1

@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
    # db.append(item.dict())
    item.dict()
    results = _ai.ask_query(item.text, item.topN)
    return results


import argparse
import uvicorn

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default=9003, type=int)
    # parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
    args = parser.parse_args()

    _ai = sentencebert_ai()
    # note: uvicorn honours workers > 1 only when the app is passed as an import string, e.g. uvicorn.run("your_module:app", ...)
    uvicorn.run(app, host="0.0.0.0", port=args.port, workers=4)

Corrected version:

@app.post("/aaa")
def your_endpoint(request: Request, item: Item_inference):
    start = time.time()
    model = request.app.state.model
    item.dict()  # needed when the kernel runs
    _ai = sentencebert_ai()
    results = _ai.ask_query(item.text, item.topN, model)
    end = time.time()
    print(end - start)
    return results
Madson answered 25/3, 2022 at 7:13
This question is not very clear; can you reformulate it and complete the code a bit? – Leopoldoleor
If your recommendation engine takes a lot of time, there's not much you can do to magically speed it up; limit the amount of work that is not specific to each user (which depends on how ask_query is implemented). Since this is probably CPU-bound, you might instead want to start multiple instances (worker threads/processes) of your application, using gunicorn or similar, so that you can use more processor cores efficiently. – Astylar
I edited my code, thank you. – Madson
@Astylar Thank you. I didn't know Gunicorn, but I'll give it a try! – Madson
@Astylar OP is already using uvicorn, no need to use gunicorn. – Pushbike
Ah, I missed that. My bad. uvicorn should support the same through workers. – Astylar
@Astylar Thanks to you, I learned about the workers parameter and I can use it. – Madson

First, you should not load your model every time a request arrives, but rather load it once at startup (you could use the startup event for this) and store it on the app instance, using the generic app.state attribute (see the implementation of State too), from which you can later retrieve it, as described here and here. (Update: the startup event has since been deprecated; the recommended way to handle startup and shutdown events is now the lifespan parameter of the FastAPI app, as demonstrated in this answer. You may still find the references provided earlier useful, as they also cover additional FastAPI concepts, and you could keep using the startup event for now, but it might be removed entirely in future versions.) For instance:

import torch
from fastapi import Request

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load('<model_path>')
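
For completeness, here is a minimal sketch of the same idea using the newer lifespan parameter mentioned above (keeping the same '<model_path>' placeholder):

from contextlib import asynccontextmanager

import torch
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # runs once at startup, before the app starts accepting requests
    app.state.model = torch.load('<model_path>')
    yield
    # runs once at shutdown
    del app.state.model

app = FastAPI(lifespan=lifespan)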

Second, if you do not have any async functions inside your endpoint that you have to await, you could define your endpoint with def instead of async def. That way, FastAPI will process the requests concurrently, as each def request runs in a separate thread from an external threadpool; whereas async def endpoints run on the main thread, i.e., the server processes such requests sequentially, as long as there is no await call to some CPU/IO-bound (blocking) operation inside those routes. When there is, the await keyword passes function control back to the event loop, allowing other tasks/requests waiting in the event loop to run. Please have a look at the answers here and here, as well as the references included in them, to understand the concept of async/await and the difference between using def and async def. Example with a def endpoint:

@app.post('/')
def your_endpoint(request: Request):
    model = request.app.state.model
    # run your synchronous ask_query() function here
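
Tying this back to the question's code, a rough sketch (assuming ask_query() has been modified to take the already-loaded model as an argument, as in the OP's corrected version, and that Item_inference and _ai are defined as in the question) could look like this:

@app.post("/retrieval", tags=["knowledge recommendation"])
def knowledge_recommendation(request: Request, item: Item_inference):
    # reuse the model loaded once at startup instead of calling torch.load() on every request
    model = request.app.state.model
    return _ai.ask_query(item.text, item.topN, model)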

Alternatively, as described here, you could, preferably, run your CPU-bound task in a separate process, using ProcessPoolExecutor, and integrate with asyncio, in order to await it to finish its work and return the result(s)—in this case, you would need to define your endpoint with async def, as the await keyword only works within an async function. Beware that it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc.; that is, your code must be under if __name__ == '__main__'. Example:

from typing import Optional

from fastapi import FastAPI, Request
from pydantic import BaseModel
import concurrent.futures
import asyncio
import uvicorn
import torch

class MyAIClass():
    def __init__(self) -> None:
        super().__init__()

    def ask_query(self, model, query, topN):
        ...  # run the (CPU-bound) similarity search here and return the results

class Item_inference(BaseModel):
    # same request body as in the question
    text: str
    topN: Optional[int] = 1

ai = MyAIClass()
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load('<model_path>')

@app.post('/')
async def your_endpoint(request: Request, item: Item_inference):
    model = request.app.state.model

    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # arguments are pickled and sent to the worker process
        res = await loop.run_in_executor(pool, ai.ask_query, model, item.text, item.topN)
    return res


if __name__ == '__main__':
    uvicorn.run(app)
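
As a side note (my own suggestion, not part of the original answer): creating a new ProcessPoolExecutor on every request adds overhead, since fresh worker processes have to be spawned each time. A hedged variation of the example above is to create one pool at startup, store it on app.state, and reuse it for every request; the /query path and max_workers value below are arbitrary placeholders:

@app.on_event("startup")
async def create_process_pool():
    # one long-lived pool per server worker, shared by all requests it handles
    app.state.executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)

@app.on_event("shutdown")
async def shutdown_process_pool():
    app.state.executor.shutdown()

@app.post('/query')
async def query_endpoint(request: Request, item: Item_inference):
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(
        request.app.state.executor,
        ai.ask_query, request.app.state.model, item.text, item.topN,
    )
    return res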

Note that if you plan on having several workers active at the same time, each worker has its own memory; workers do not share the same memory, and hence each worker will load its own instance of the ML model into memory (RAM). If, for instance, you are using four workers for your app, the model will end up being loaded into RAM four times. Thus, if the model, as well as other variables in your code, consume a large amount of memory, each process/worker will consume an equivalent amount. If you would like to avoid that, you may have a look at how to share objects across multiple workers, and, if you are using Gunicorn as a process manager with Uvicorn workers, you can use Gunicorn's --preload flag. As per the documentation:

Command line: --preload

Default: False

Load application code before the worker processes are forked.

By preloading an application you can save some RAM resources as well as speed up server boot times. Although, if you defer application loading to each worker process, you can reload your application code easily by restarting workers.

Example:

gunicorn --workers 4 --preload --worker-class=uvicorn.workers.UvicornWorker app:app

Note that you cannot combine Gunicorn's --preload with the --reload flag: when the code is preloaded into the master process, the new worker processes (which are automatically created if your application code changes) will still have the old code in memory, due to how fork() works.

Grumous answered 25/3, 2022 at 8:4
Thank you for the best answer that suits my situation. – Madson
Thank you Chris. Following your advice, I now save the model on the app at startup and load it from there, and the average response time dropped from 0.72 s to 0.5 s. And since I don't need to use concurrency, I decided to use def instead of async def. Could you please check whether this approach is correct? – Madson
I was able to save time by instantiating the model once. In the performance load test, though, I didn't see any reduction in time from using def, so I was wondering if I had misunderstood something. – Madson
@WONJUN It seems like your ask_query is doing some CPU-intensive work, which means that when there are multiple calls to your endpoint, performance will take a hit (even after loading the model in the startup phase). You should probably use FastAPI with multiprocessing; see an example here: #63170365 – Zajac
