1

Please see my code block below. In order to run it, I do the following. When I run the following code, things print as expected, and the process exits as expected.

stream = await client.create_completion(...)
stream.subscribe(print) # works perfectly

All the print commands run as expected. Everything looks great, as long as I am doing things in a reactive way.

However, when I do the following, it hangs indefinitely:

stream = await client.create_completion(...)
stream.pipe(ops.to_iterable()).run()` # hangs :(

Why is this? What am I doing wrong?

Thanks in advance for your help.

from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import ChatCompletionChunk
from openai.types.chat.chat_completion_chunk import Choice
import reactivex as rx

from app.ai.bridge.chat.chat_completion_types import ChatRequest, ToolConfig

from app.ai.bridge.chat.drivers.openai_chat_driver_reactive_mappers import (
    map_message_to_openai,
    map_toolconfig_to_openai,
)

import asyncio


class OpenaiClientReactive:
    def __init__(self, openai_client: AsyncOpenAI) -> None:
        self.api = openai_client

    async def create_completion(
        self, chat_request: ChatRequest, tool_config: ToolConfig | None = None
    ) -> rx.Observable[Choice]:
        stream: rx.subject.ReplaySubject[Choice] = rx.subject.ReplaySubject()

        async def do_stream() -> None:
            async_stream: AsyncStream[ChatCompletionChunk] = await self._stream_openai(
                chat_request, tool_config
            )

            max_variants_expected = chat_request.options.num_variants
            num_indexes_completed = 0

            async for chunk in async_stream:
                for choice in chunk.choices:
                    if choice.finish_reason:
                        num_indexes_completed += 1
                    stream.on_next(choice)

                if max_variants_expected == num_indexes_completed:
                    # If all indexes are complete, we can complete the stream
                    print("All indexes complete")
                    break

            await async_stream.close()
            stream.on_completed()

        asyncio.create_task(do_stream())

        return stream

    async def _stream_openai(
        self,
        chat_request: ChatRequest,
        tool_config: ToolConfig | None = None,
    ) -> AsyncStream[ChatCompletionChunk]:
        mapped_messages = mapped_messages = [
            map_message_to_openai(message) for message in chat_request.context.messages
        ]
        if tool_config:
            return await self.api.chat.completions.create(
                # TODO: Move this to database driven configuration, since it's an LLM.
                model="gpt-3.5-turbo",
                messages=mapped_messages,
                stream=True,
                n=chat_request.options.num_variants,
                tools=map_toolconfig_to_openai(tool_config),
            )
        else:
            return await self.api.chat.completions.create(
                # TODO: Move this to database driven configuration, since it's an LLM.
                model="gpt-3.5-turbo",
                messages=mapped_messages,
                stream=True,
                n=chat_request.options.num_variants,
            )

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.