How to stream Agent's response in Langchain?

4

9

I am using LangChain with a Gradio interface in Python. I have built a conversational agent and am trying to stream its responses to the Gradio chatbot interface. I have looked through the LangChain docs but could not find an example that implements streaming with agents. Here are the relevant parts of my code:

# Loading the LLM
def load_llm():
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        top_p=hparams["top_p"],
        max_tokens=hparams["max_tokens"],
        presence_penalty=hparams["presence_penalty"],
        frequency_penalty=hparams["freq_penaulty"],
        streaming=True, 
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), 
        verbose=True,
        model_name=hparams["model"],
        deployment_name = models_dict[hparams["model"]],
        )

# Loading the agent
def load_chain(memory, sys_msg, llm):
    """Logic for loading the chain you want to use should go here."""
    agent_chain = initialize_agent(tools, 
                                   llm, 
                                   agent="conversational-react-description", 
                                   verbose=True, 
                                   memory=memory, 
                                   agent_kwargs = {"added_prompt": sys_msg},
                                   streaming=True, 
                                   )
    return agent_chain

# Creating the chatbot to be used in Gradio.
class ChatWrapper:

    def __init__(self, sys_msg):
        self.lock = Lock()
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,)
        self.chain = load_chain(self.memory, sys_msg, load_llm())
        self.sysmsg = sys_msg
    def __call__(
        self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain]
    ):
        """Execute the chat functionality."""
        self.lock.acquire()
        try:
            history = history or []
            # Run chain and append input.
            output = self.chain.run(input=inp)
            
            history.append((inp, output))
        except Exception as e:
            raise e
        finally:
            self.lock.release()
        return history, history

I can currently stream to the terminal output, but what I am looking for is streaming in my Gradio interface.

Can you please help me with that?

Fag answered 19/4, 2023 at 16:58 Comment(3)
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.base import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
chat = ChatOpenAI(streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), verbose=True, temperature=0)
resp = chat([HumanMessage(content="Write me a song about sparkling water.")])
– Rigorous
python.langchain.com/docs/modules/model_io/models/chat/how_to/… StreamingStdOutCallbackHandler – Rigorous
PrivateDocBot, created using LangChain and Chainlit 🔥🔥. It also streams using LangChain: like ChatGPT, it displays the output word by word, and it works locally on PDF data. 👩‍💻 Code reference. – Aspiration
6

One possible solution is to use a queue as a mediator.

  1. Create a queue
from queue import SimpleQueue
q = SimpleQueue()
  2. Create a custom callback that writes the produced tokens into the queue
from queue import Empty
from typing import Any, Dict, List, Union

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult


job_done = object() # signals the processing is done

class StreamingGradioCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: SimpleQueue):
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue."""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Empty:
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""
        self.q.put(job_done)
  3. Give the callback to your LLM
callback_manager=CallbackManager([StreamingGradioCallbackHandler(q),
                                  StreamingStdOutCallbackHandler()]), 
  4. In your Gradio code, create a parallel thread that runs your agent, and read from the queue.

I don't understand your ChatWrapper. Actually, I am not familiar with Gradio, so I will rely on an example from the documentation.

from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    while True:
        next_token = q.get(block=True) # Blocks until an input is available
        if next_token is job_done:
            break
        history[-1][1] += next_token
        yield history
    thread.join()
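
The four steps above can be tried without LangChain or Gradio. The following is a minimal sketch of the same queue-and-sentinel pattern using only the standard library; `fake_agent` and `stream_answer` are hypothetical names that stand in for `chain.run` plus the callback handler and for the Gradio `bot` generator, respectively.

```python
from queue import SimpleQueue
from threading import Thread

JOB_DONE = object()  # sentinel: the producer has finished


def fake_agent(question, q):
    """Hypothetical stand-in for chain.run: emits tokens the way a callback would."""
    try:
        for token in ["You ", "asked: ", question]:
            q.put(token)
    finally:
        q.put(JOB_DONE)  # always unblock the consumer, even on error


def stream_answer(question):
    """Yield the growing answer, as a Gradio generator callback would."""
    q = SimpleQueue()
    thread = Thread(target=fake_agent, args=(question, q))
    thread.start()
    answer = ""
    while True:
        token = q.get()  # blocks until the producer puts something
        if token is JOB_DONE:
            break
        answer += token
        yield answer
    thread.join()


partials = list(stream_answer("hi"))
```

Creating the queue inside the generator, rather than sharing one module-level queue, is a design choice of this sketch: it keeps concurrent requests from reading each other's tokens.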
Selflove answered 2/6, 2023 at 3:37 Comment(3)
That's cool. It works in a similar way with Django's StreamingHttpResponse. Starting another thread to run the chain is the key point. Thanks – Bombard
Very cool! Thanks for the detailed code – Thorax
When I try this, I get the following error: TypeError: Can't instantiate abstract class StreamingGradioCallbackHandler with abstract methods on_agent_action, on_agent_finish, on_chain_end, on_chain_error, on_chain_start, on_text, on_tool_end, on_tool_error, on_tool_start. Any idea on how to fix it? – Bronze
1

Following @Nokados's excellent workaround, I found that the logic for streaming the response does not necessarily work if you are working with an LLMChain instead of an Agent. Below is the logic I've used, which handles multiple LLM calls:

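thread.start()
history[-1][1] = ""
while thread.is_alive() or not q.empty():
    try:
        # Time out periodically so the loop can notice that the thread has finished
        next_token = q.get(timeout=0.1)
    except Empty:  # requires: from queue import Empty
        continue
    if next_token is not job_done:
        history[-1][1] += next_token
    else:
        history[-1][1] += "\n"  # one LLM call finished; more may follow
    yield history
thread.join()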
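
An alternative to polling `thread.is_alive()` is to use a second sentinel that marks the end of the whole run, separate from the one that marks the end of a single LLM call. The sketch below assumes this two-sentinel design and uses only the standard library; `fake_chain`, `run_and_signal`, and `stream_all_calls` are hypothetical names, with `fake_chain` standing in for a chain that calls the LLM twice.

```python
from queue import SimpleQueue
from threading import Thread

LLM_END = object()     # one LLM call finished (what on_llm_end would signal)
CHAIN_DONE = object()  # the whole run finished


def fake_chain(q):
    """Hypothetical stand-in for a chain that calls the LLM twice."""
    for call in (["Thought", "..."], ["Final ", "answer"]):
        for token in call:
            q.put(token)
        q.put(LLM_END)


def run_and_signal(target, q):
    """Run target(q), then always enqueue CHAIN_DONE so the reader can stop."""
    try:
        target(q)
    finally:
        q.put(CHAIN_DONE)


def stream_all_calls(target):
    """Yield the accumulated text across every LLM call of one run."""
    q = SimpleQueue()
    thread = Thread(target=run_and_signal, args=(target, q))
    thread.start()
    text = ""
    while True:
        item = q.get()  # blocks; CHAIN_DONE is guaranteed to arrive
        if item is CHAIN_DONE:
            break
        text += "\n" if item is LLM_END else item
        yield text
    thread.join()


final_text = list(stream_all_calls(fake_chain))[-1]
```

Because `CHAIN_DONE` is enqueued in a `finally` block, the reader cannot block forever, even if the run raises.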
Smyth answered 17/9, 2023 at 6:7 Comment(0)
1

After reading the source code of langchain.chat_models.base, I tried the stream method, and it just works.

def create_model(temperature=0, **kwargs):
    # Enable token streaming only when the caller passes streaming=True
    streaming_active = kwargs.get('streaming', False)
    model = AzureChatOpenAI(
        openai_api_base=BASE_URL,
        openai_api_key=API_KEY,
        openai_api_type="azure",
        openai_api_version=API_VERSION,
        deployment_name=DEPLOYMENT_NAME,
        temperature=temperature,
        streaming=streaming_active,
        callbacks=[StreamingStdOutCallbackHandler()] if streaming_active else [],
    )
    return model

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import AzureChatOpenAI
from langchain.schema import AIMessage, HumanMessage

import gradio as gr

llm = create_model(streaming=True)

def stream_predict(message, history):
    history_langchain_format = []
    for human, ai in history:
        history_langchain_format.append(HumanMessage(content=human))
        history_langchain_format.append(AIMessage(content=ai))
    history_langchain_format.append(HumanMessage(content=message))
    
    gpt_response = llm.stream(history_langchain_format)

    partial_message = ""
    for chunk in gpt_response:
        partial_message = partial_message + chunk.dict()['content']
        yield partial_message   
      
gr.ChatInterface(stream_predict).queue().launch()
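
The key detail in `stream_predict` is that each `yield` returns the whole message so far rather than only the newest chunk. A minimal sketch of that accumulation, with a hypothetical `fake_stream` generator standing in for `llm.stream(...)` and plain strings standing in for message chunks:

```python
def fake_stream(messages):
    """Hypothetical stand-in for llm.stream(...): yields text chunks one at a time."""
    yield from ["Spark", "ling ", "water"]


def stream_predict_sketch(message, history):
    """Accumulate chunks and yield the full text so far after each one."""
    partial_message = ""
    for chunk in fake_stream(history + [message]):
        partial_message += chunk
        yield partial_message  # the whole text so far, not just the new chunk


outputs = list(stream_predict_sketch("Write a song", []))
```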

Amazonas answered 18/9, 2023 at 10:50 Comment(0)
-1

If you can write to stdout, why not also read from it?

import subprocess

def listen(cmd):  # e.g. cmd = "python your_langchain.py" (a single string, because shell=True)
    """from http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
    """
    # text=True makes readline() return str, so the '' end-of-stream check works on Python 3
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print(line, end="")
        if line == '' and p.poll() is not None:
            break
    return ''.join(stdout)

From https://www.saltycrane.com/blog/2009/10/how-capture-stdout-in-real-time-python/
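
To feed a streaming UI, the same idea can be written as a generator that yields each line as it arrives instead of returning everything at the end. This is only a sketch: `stream_lines` is a hypothetical name, and the child command is a stand-in for the LangChain script. It streams whole lines, so output printed token by token without newlines would need a different read strategy.

```python
import subprocess
import sys


def stream_lines(cmd):
    """Yield each output line of cmd as soon as the child process writes it."""
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    ) as p:
        for line in p.stdout:  # iteration ends when the child closes stdout
            yield line


# Hypothetical child process standing in for the LangChain script.
# "-u" disables the child's output buffering so lines arrive promptly.
child = [sys.executable, "-u", "-c", "print('tok1'); print('tok2')"]
lines = list(stream_lines(child))
```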

Severus answered 26/4, 2023 at 16:11 Comment(2)
Yea that's a way around but I thought there might be a way to properly implement it using on_llm_new_token – Fag
If you had to write your own, you would want a buffer anyway, where the information is held until Gradio listens in. Since the info is already there, IMO you might as well just use stdout and call it a day. – Severus
