Flush/Reset StreamData when calling data.close() or have a separate function for it #2961

Open
fmaag opened this issue Sep 10, 2024 · 0 comments

fmaag commented Sep 10, 2024

Feature Description

I have built a working chat application that supports RAG, using LangChain and the LangChainAdapter.

For each request in the chat I want to show the sources on the frontend. I came across StreamData, which also has an example for the LangChainAdapter. I got it working and can return the context in the data object of the useChat hook by appending it in LangChain's handleChainEnd callback. There are some slight inconveniences, such as the callback firing twice for some reason, which forces me to deal with duplicates, but that is fine and not an error of the Vercel AI SDK.
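
For reference, this is roughly how I deal with the duplicates on the frontend right now (just a sketch; I'm assuming each retrieved document carries a metadata.source field, so adjust the key to whatever identifies your documents):

    // Sketch: deduplicate the context entries accumulated in useChat's data.
    // Assumes each document has a metadata.source field as its identity.
    function dedupeSources(data) {
      const seen = new Set();
      const unique = [];
      for (const entry of data ?? []) {
        for (const doc of entry.context ?? []) {
          const key = doc.metadata?.source ?? doc.pageContent;
          if (!seen.has(key)) {
            seen.add(key);
            unique.push(doc);
          }
        }
      }
      return unique;
    }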

The main issue is that the StreamData from the useChat hook never gets reset on new messages, nor is it mapped to individual messages. Each new message that calls my RAG chain therefore adds further duplicates to the same StreamData. I can think of a few workarounds for mapping a StreamData object to the relevant message in messages, but for those to work I would need a way to actually clear the StreamData. I initially thought this was handled by the .close() function that I call at the end, but that does not seem to be the case. For now, I might store the runId of each LangChain run in the StreamData as well and then map each runId to the assistant messages, but this can be inconsistent in some cases.
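
The runId workaround would look roughly like this; handleChainEnd does receive the runId as its second argument, but pairing runs with assistant messages by order is exactly the part that can get inconsistent:

    // Server: also store the runId with each context entry.
    const customHandler = {
      handleChainEnd: async (outputs, runId) => {
        if (outputs.context && Array.isArray(outputs.context)) {
          data.append({ runId, context: outputs.context });
        }
      },
    };

    // Client (with messages and data from useChat): group entries by runId
    // and pair them with assistant messages by order — fragile, e.g. when
    // a request is retried.
    const runIds = [...new Set((data ?? []).map((d) => d.runId))];
    const assistantMessages = messages.filter((m) => m.role === "assistant");
    const sourcesForMessage = (i) =>
      (data ?? []).filter((d) => d.runId === runIds[i]);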

I also found an earlier discussion that handled this with a temporary wrapper around the useChat hook, but there is no out-of-the-box functionality for it.
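
That wrapper boiled down to something like this: snapshot the length of data before each submit, so every message only reads the entries appended for its own request (again just a sketch, assuming the useChat return shape from ai/react):

    import { useRef } from "react";
    import { useChat } from "ai/react";

    // Sketch: wrap useChat and remember how many data entries existed
    // before each submit, so the i-th request only sees its own entries.
    export function useChatWithScopedData(options) {
      const chat = useChat(options);
      const offsets = useRef([]);

      const handleSubmit = (event, chatRequestOptions) => {
        offsets.current.push(chat.data?.length ?? 0);
        chat.handleSubmit(event, chatRequestOptions);
      };

      // Data entries that were appended for the i-th request.
      const dataForRequest = (i) =>
        (chat.data ?? []).slice(offsets.current[i], offsets.current[i + 1]);

      return { ...chat, handleSubmit, dataForRequest };
    }

With that said, here is the relevant snippet of my server code: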

    import { StreamData, LangChainAdapter } from "ai";
    import { RunnableSequence, RunnablePassthrough, RunnableMap } from "@langchain/core/runnables";
    import { StringOutputParser } from "@langchain/core/output_parsers";
    import { formatDocumentsAsString } from "langchain/util/document";

    const ragChainWithSources = RunnableSequence.from([
      RunnablePassthrough.assign({
        contextualized_question: async (input) => {
          // Rephrase the question against the chat history when there is one.
          if (input.chat_history && input.chat_history.length > 0) {
            return contextualizeQChain.invoke({
              chat_history: input.chat_history,
              question: input.question,
            });
          }
          return input.question;
        },
      }),
      RunnableMap.from({
        context: (input) => retriever.getRelevantDocuments(input.contextualized_question),
        question: (input) => input.contextualized_question,
        chat_history: (input) => input.chat_history,
      }),
      RunnablePassthrough.assign({
        answer: RunnableSequence.from([
          (input) => ({
            context: formatDocumentsAsString(input.context),
            question: input.question,
            chat_history: input.chat_history,
          }),
          qaPrompt,
          llm,
          new StringOutputParser(),
        ]),
      }),
    ]);

    const data = new StreamData();
    const customHandler = {
      handleChainEnd: async (outputs) => {
        // The callback also fires for sub-chains, so only append outputs
        // that actually carry the retrieved context; this already filters
        // out some of the duplicates.
        if (outputs.context && Array.isArray(outputs.context)) {
          data.append({ context: outputs.context });
        }
      },
    };

    const ragResponse = await ragChainWithSources.stream(
      { question, chat_history },
      { callbacks: [customHandler] }
    );

    // Some glue code is needed to make this work with the LangChainAdapter:
    // it extracts ragResponse.answer and pushes it into extractAnswerStream.
    let accumulatedResponse = {};
    const extractAnswerStream = new TransformStream({
      transform(chunk, controller) {
        accumulatedResponse = { ...accumulatedResponse, ...chunk };
        if (accumulatedResponse.answer) {
          controller.enqueue(accumulatedResponse.answer);
          accumulatedResponse = {};
        }
      },
      flush(controller) {
        if (accumulatedResponse.answer) {
          controller.enqueue(accumulatedResponse.answer);
        }
      },
    });
    const answerStream = ragResponse.pipeThrough(extractAnswerStream);

    return LangChainAdapter.toDataStreamResponse(answerStream, {
      data,
      callbacks: { onFinal() { data.close(); } },
    });
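
What I'd like is something along these lines (names are placeholders; neither option exists in the SDK today):

    // Hypothetical: close() could optionally reset the client-side data
    // accumulated by useChat before the next request ...
    data.close({ reset: true });
    // ... or a separate function could do it explicitly.
    data.reset();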

Use Case

Displaying sources for RAG responses implemented using the LangChainAdapter.

Additional context

No response

lgrammel added the enhancement (New feature or request) and ai/ui labels on Sep 12, 2024