OpenAIGenerator and OpenAIChatGenerator streaming_callback as kwargs #7836

Open
Redna opened this issue Jun 10, 2024 · 0 comments
Labels
2.x (Related to Haystack v2.0), P3 (Low priority, leave it in the backlog)

Comments


Redna commented Jun 10, 2024

Is your feature request related to a problem? Please describe.
The current implementation of OpenAIGenerator/OpenAIChatGenerator does not allow passing a streaming_callback as a parameter to the pipeline.run function. This causes issues when I want to create a FastAPI endpoint with Server-Sent Events.

Currently I need to create a separate pipeline for every incoming request. Creating a pipeline per request can be slow because of loading dependencies and warming up models, and it causes other issues when using tracers such as Langfuse.

import asyncio

from fastapi.responses import StreamingResponse
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.utils import Secret

# app, docstore, template and ChatRequest are defined elsewhere in my application.

@app.post("/chat")
async def handle_request(request: ChatRequest) -> StreamingResponse:
    loop = asyncio.get_running_loop()
    streamer = TextIteratorStreamer()  # custom implementation, sketched below
    query = request.query  # assuming ChatRequest exposes a `query` field

    # The callback can only be set at construction time, so a new pipeline
    # has to be built for every request.
    pipe = Pipeline()
    pipe.add_component("retriever", InMemoryBM25Retriever(document_store=docstore))
    pipe.add_component("prompt_builder", PromptBuilder(template=template))
    pipe.add_component("llm", OpenAIGenerator(api_key=Secret.from_token("<your-api-key>"),
                                              api_base_url="http://localhost:30091/v1",
                                              streaming_callback=streamer.add))

    pipe.connect("retriever", "prompt_builder.documents")
    pipe.connect("prompt_builder", "llm")

    # Run the blocking pipeline in a worker thread so the event loop keeps serving the stream.
    loop.run_in_executor(None, pipe.run, {
        "prompt_builder": {
            "query": query
        },
        "retriever": {
            "query": query
        }
    })

    return StreamingResponse(
        consume_streamer(streamer),
        media_type="text/event-stream",
    )
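
For context, TextIteratorStreamer and consume_streamer above are my own helpers, roughly along these lines (a minimal sketch; the class and function names and the SSE framing are just what I use, and end-of-stream signalling is simplified):

import asyncio
from queue import Queue
from typing import AsyncIterator, Optional

from haystack.dataclasses import StreamingChunk


class TextIteratorStreamer:
    """Bridges the synchronous streaming_callback to an async consumer."""

    def __init__(self) -> None:
        self.queue: "Queue[Optional[str]]" = Queue()

    def add(self, chunk: StreamingChunk) -> None:
        # Called by the generator's streaming_callback for every streamed token.
        self.queue.put(chunk.content)

    def close(self) -> None:
        # Sentinel marking the end of the stream; call this when pipe.run finishes,
        # e.g. from a done-callback on the executor future.
        self.queue.put(None)


async def consume_streamer(streamer: TextIteratorStreamer) -> AsyncIterator[str]:
    loop = asyncio.get_running_loop()
    while True:
        # queue.get blocks, so run it in a worker thread to keep the event loop free.
        item = await loop.run_in_executor(None, streamer.queue.get)
        if item is None:
            break
        yield f"data: {item}\n\n"  # Server-Sent Events framing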

Describe the solution you'd like
I would like to pass the streaming callback in the pipeline run method, as is already possible e.g. for the Amazon Bedrock generator:
https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/amazon_bedrock/src/haystack_integrations/components/generators/amazon_bedrock/generator.py#L202

So, in essence:

# Imports as above. The pipeline is now built and connected once, at module level:
pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=docstore))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator(api_key=Secret.from_token("<your-api-key>"),
                                          api_base_url="http://localhost:30091/v1"))
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")

@app.post("/chat")
async def handle_request(request: ChatRequest) -> StreamingResponse:
    loop = asyncio.get_running_loop()
    streamer = TextIteratorStreamer()  # custom implementation
    query = request.query  # assuming ChatRequest exposes a `query` field

    # Proposed: hand the per-request callback to the llm component at run time.
    loop.run_in_executor(None, pipe.run, {
        "prompt_builder": {
            "query": query
        },
        "retriever": {
            "query": query
        },
        "llm": {
            "generation_kwargs": {
                "streaming_callback": streamer.add
            }
        }
    })

    return StreamingResponse(
        consume_streamer(streamer),
        media_type="text/event-stream",
    )

Describe alternatives you've considered
Adding another dedicated run parameter such as streaming_callback. However, that might be a breaking change:

@component.output_types(replies=List[ChatMessage])
def run(
    self,
    messages: List[ChatMessage],
    generation_kwargs: Optional[Dict[str, Any]] = None,
    streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
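
Inside run, the run-time callback could then take precedence over the one set in __init__. A minimal sketch of that resolution (assuming the init-time callback is stored on self.streaming_callback; this may differ from the actual implementation):

    # Prefer the callback passed to run(); fall back to the one configured in __init__.
    callback = streaming_callback or self.streaming_callback
    # ... then pass `callback` on to the OpenAI client call as before.
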
shadeMe added the P3 (Low priority, leave it in the backlog) and 2.x (Related to Haystack v2.0) labels on Jun 25, 2024