Module langstream.contrib

The Contrib module is where all the other streams and integrations that build on top of core Stream module live. Here you can import the LLMs you want to use.

>>> from langstream.contrib import OpenAIChatStream, GPT4AllStream # etc

Here you can find the reference and code examples, for further tutorials and use cases, consult the documentation.

Expand source code
"""
The Contrib module is where all the other streams and integrations that build on top of
core Stream module live. Here you can import the LLMs you want to use.

>>> from langstream.contrib import OpenAIChatStream, GPT4AllStream # etc

---

Here you can find the reference and code examples, for further tutorials and use cases, consult the [documentation](https://github.com/rogeriochaves/langstream).
"""

from langstream.contrib.llms.open_ai import (
    OpenAICompletionStream,
    OpenAIChatStream,
    OpenAIChatMessage,
    OpenAIChatDelta,
)
from langstream.contrib.llms.gpt4all_stream import GPT4AllStream
from langstream.contrib.llms.lite_llm import (
    LiteLLMChatStream,
    LiteLLMChatMessage,
    LiteLLMChatDelta,
)

__all__ = (
    "OpenAICompletionStream",
    "OpenAIChatStream",
    "OpenAIChatMessage",
    "OpenAIChatDelta",
    "GPT4AllStream",
    "LiteLLMChatStream",
    "LiteLLMChatMessage",
    "LiteLLMChatDelta",
)

Sub-modules

langstream.contrib.llms

Classes

class GPT4AllStream (name: str, call: Callable[[~T], str], model: str, temperature: float = 0, max_tokens: int = 200, top_k=40, top_p=0.1, repeat_penalty=1.18, repeat_last_n=64, n_batch=8, n_threads: Optional[int] = None)

GPT4AllStream is a Stream that allows you to run local LLMs easily using GPT4All model.

GPT4All is a project that focuses on making the LLM models very small and very fast to be able to run in any computer without GPUs. Check out more about the project here.

You can use it as any other stream, on the first use, it will download the model (a few GB). Alternatively, you can point to a locally downloaded model.bin file.

There are serveral parameters you can use to adjust the model output such as temperature, max_tokens, top_k, repeat_penalty, etc, you can read more about them here.

Example

>>> from langstream import join_final_output
>>> from langstream.contrib import GPT4AllStream
>>> import asyncio
...
>>> async def example():
...     greet_stream = GPT4AllStream[str, str](
...         "GreetingStream",
...         lambda name: f"### User: Hello, my name is {name}. How is it going?\n\n### Response:",
...         model="orca-mini-3b-gguf2-q4_0.gguf",
...         temperature=0,
...     )
...
...     return await join_final_output(greet_stream("Alice"))
...
>>> asyncio.run(example()) # doctest:+ELLIPSIS +SKIP
Found model file at ...
" I'm doing well, thank you for asking! How about you?"
Expand source code
class GPT4AllStream(Stream[T, U]):
    """
    GPT4AllStream is a Stream that allows you to run local LLMs easily
    using [GPT4All](https://gpt4all.io/) model.

    [GPT4All](https://gpt4all.io/) is a project that focuses on making
    the LLM models very small and very fast to be able to run in any computer
    without GPUs. Check out more about the project [here](https://gpt4all.io/).

    You can use it as any other stream, on the first use, it will download the model
    (a few GB). Alternatively, you can point to a locally downloaded model.bin file.

    There are serveral parameters you can use to adjust the model output such as
    `temperature`, `max_tokens`, `top_k`, `repeat_penalty`, etc, you can read more
    about them [here](https://docs.gpt4all.io/gpt4all_python.html#generation-parameters).

    Example
    -------

    >>> from langstream import join_final_output
    >>> from langstream.contrib import GPT4AllStream
    >>> import asyncio
    ...
    >>> async def example():
    ...     greet_stream = GPT4AllStream[str, str](
    ...         "GreetingStream",
    ...         lambda name: f"### User: Hello, my name is {name}. How is it going?\\n\\n### Response:",
    ...         model="orca-mini-3b-gguf2-q4_0.gguf",
    ...         temperature=0,
    ...     )
    ...
    ...     return await join_final_output(greet_stream("Alice"))
    ...
    >>> asyncio.run(example()) # doctest:+ELLIPSIS +SKIP
    Found model file at ...
    " I'm doing well, thank you for asking! How about you?"

    """

    def __init__(
        self: "GPT4AllStream[T, str]",
        name: str,
        call: Callable[
            [T],
            str,
        ],
        model: str,
        temperature: float = 0,
        max_tokens: int = 200,
        top_k=40,
        top_p=0.1,
        repeat_penalty=1.18,
        repeat_last_n=64,
        n_batch=8,
        n_threads: Optional[int] = None,
    ) -> None:
        GPT4All = importlib.import_module("gpt4all").GPT4All
        gpt4all = GPT4All(model, n_threads=n_threads)

        async def generate(prompt: str) -> AsyncGenerator[U, None]:
            loop = asyncio.get_event_loop()

            def get_outputs() -> Iterable[str]:
                return gpt4all.generate(
                    prompt,
                    streaming=True,
                    temp=temperature,
                    max_tokens=max_tokens,
                    top_k=top_k,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    repeat_last_n=repeat_last_n,
                    n_batch=n_batch,
                )

            outputs = await loop.run_in_executor(None, get_outputs)

            for output in outputs:
                yield cast(U, output)

        super().__init__(name, lambda input: generate(call(input)))

Ancestors

Inherited members

class LiteLLMChatDelta (role: Optional[Literal['assistant', 'function']], content: str, name: Optional[str] = None)

LiteLLMChatDelta is a data class that represents the output of an LiteLLMChatStream.

Attributes

role : Optional[Literal["assistant", "function"]]
The role of the output message, the first message will have the role, while the subsequent partial content output ones will have the role as None. For now the only possible values it will have is either None or "assistant"
name : Optional[str]
The name is used for when role is "function", it represents the name of the function that was called
content : str
A string with the partial content being outputted by the LLM, this generally translate to each token the LLM is producing
Expand source code
@dataclass
class LiteLLMChatDelta:
    """
    LiteLLMChatDelta is a data class that represents the output of an `LiteLLMChatStream`.

    Attributes
    ----------
    role : Optional[Literal["assistant", "function"]]
        The role of the output message, the first message will have the role, while
        the subsequent partial content output ones will have the role as `None`.
        For now the only possible values it will have is either None or `"assistant"`

    name: Optional[str]
        The name is used for when `role` is `"function"`, it represents the name of the function that was called

    content : str
        A string with the partial content being outputted by the LLM, this generally
        translate to each token the LLM is producing

    """

    role: Optional[Literal["assistant", "function"]]
    content: str
    name: Optional[str] = None

    def __stream_debug__(self):
        name = ""
        if self.name:
            name = f" {self.name}"
        if self.role is not None:
            print(f"{Fore.YELLOW}{self.role.capitalize()}{name}:{Fore.RESET} ", end="")
        print(
            self.content,
            end="",
            flush=True,
        )

Class variables

var content : str
var name : Optional[str]
var role : Optional[Literal['assistant', 'function']]
class LiteLLMChatMessage (role: Literal['system', 'user', 'assistant', 'function'], content: str, name: Optional[str] = None)

LiteLLMChatMessage is a data class that represents a chat message for building LiteLLMChatStream prompt.

Attributes

role : Literal["system", "user", "assistant", "function"]
The role of who sent this message in the chat, can be one of "system", "user", "assistant" or "function"
name : Optional[str]
The name is used for when role is "function", it represents the name of the function that was called
content : str
A string with the full content of what the given role said
Expand source code
@dataclass
class LiteLLMChatMessage:
    """
    LiteLLMChatMessage is a data class that represents a chat message for building `LiteLLMChatStream` prompt.

    Attributes
    ----------
    role : Literal["system", "user", "assistant", "function"]
        The role of who sent this message in the chat, can be one of `"system"`, `"user"`, `"assistant"` or "function"

    name: Optional[str]
        The name is used for when `role` is `"function"`, it represents the name of the function that was called

    content : str
        A string with the full content of what the given role said

    """

    role: Literal["system", "user", "assistant", "function"]
    content: str
    name: Optional[str] = None

    def to_dict(self):
        return {k: v for k, v in self.__dict__.items() if v is not None}

Class variables

var content : str
var name : Optional[str]
var role : Literal['system', 'user', 'assistant', 'function']

Methods

def to_dict(self)
Expand source code
def to_dict(self):
    return {k: v for k, v in self.__dict__.items() if v is not None}
class LiteLLMChatStream (name: str, call: Callable[[~T], List[LiteLLMChatMessage]], model: str, custom_llm_provider: Optional[str] = None, functions: Optional[List[Dict[str, Any]]] = None, function_call: Union[Literal['none', 'auto'], Dict[str, Any], ForwardRef(None)] = None, temperature: Optional[float] = 0, max_tokens: Optional[int] = None, timeout: int = 5, retries: int = 3)

LiteLLMChatStream is a wrapper for LiteLLM, which gives you access to OpenAI, Azure OpenAI, Anthropic, Google VertexAI, HuggingFace, Replicate, A21, Cohere and a bunch other LLMs all the the same time, all while keeping the standard OpenAI chat interface. Check it out the completion API and the available models on their docs.

The LiteLLMChatStream takes a lambda function that should return a list of LiteLLMChatMessage for the assistant to reply, it is stateless, so it doesn't keep memory of the past chat messages, you will have to handle the memory yourself, you can follow this guide to get started on memory.

The LiteLLMChatStream also produces LiteLLMChatDelta as output, one per token, it contains the role that started the output, and then subsequent content updates. If you want the final content as a string, you will need to use the .content property from the delta and accumulate it for the final result.

To use this stream you will need to have the proper environment keys available depending on the model you are using, like OPENAI_API_KEY, COHERE_API_KEY, HUGGINGFACE_API_KEY, etc, check it out more details on LiteLLM docs

Example

>>> from langstream import Stream, join_final_output
>>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta
>>> import asyncio
...
>>> async def example():
...     recipe_stream: Stream[str, str] = LiteLLMChatStream[str, LiteLLMChatDelta](
...         "RecipeStream",
...         lambda recipe_name: [
...             LiteLLMChatMessage(
...                 role="system",
...                 content="You are Chef Claude, an assistant bot trained on all culinary knowledge of world's most proeminant Michelin Chefs",
...             ),
...             LiteLLMChatMessage(
...                 role="user",
...                 content=f"Hello, could you write me a recipe for {recipe_name}?",
...             ),
...         ],
...         model="claude-2",
...         max_tokens=10,
...     ).map(lambda delta: delta.content)
...
...     return await join_final_output(recipe_stream("instant noodles"))
...
>>> asyncio.run(example()) # doctest:+SKIP
"Of course! Here's a simple and delicious recipe"

You can also pass LiteLLM function schemas in the function argument with all parameter definitions, just like for OpenAI model, but be aware that not all models support it. Once you pass a function param, the model may then produce a function role LiteLLMChatDelta as output, using your function, with the content field as a json which you can parse to call an actual function.

Take a look at our OpenAI guide to learn more about LLM function calls in LangStream, it works the same with LiteLLM.

Function Call Example

>>> from langstream import Stream, collect_final_output
>>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta
>>> from typing import Literal, Union, Dict
>>> import asyncio
...
>>> async def example():
...     def get_current_weather(
...         location: str, format: Literal["celsius", "fahrenheit"] = "celsius"
...     ) -> Dict[str, str]:
...         return {
...             "location": location,
...             "forecast": "sunny",
...             "temperature": "25 C" if format == "celsius" else "77 F",
...         }
...
...     stream : Stream[str, Union[LiteLLMChatDelta, Dict[str, str]]] = LiteLLMChatStream[str, Union[LiteLLMChatDelta, Dict[str, str]]](
...         "WeatherStream",
...         lambda user_input: [
...             LiteLLMChatMessage(role="user", content=user_input),
...         ],
...         model="gpt-3.5-turbo",
...         functions=[
...             {
...                 "name": "get_current_weather",
...                 "description": "Gets the current weather in a given location, use this function for any questions related to the weather",
...                 "parameters": {
...                     "type": "object",
...                     "properties": {
...                         "location": {
...                             "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages",
...                             "type": "string",
...                         },
...                         "format": {
...                             "description": "A string with the full content of what the given role said",
...                             "type": "string",
...                             "enum": ("celsius", "fahrenheit"),
...                         },
...                     },
...                     "required": ["location"],
...                 },
...             }
...         ],
...         temperature=0,
...     ).map(
...         lambda delta: get_current_weather(**json.loads(delta.content))
...         if delta.role == "function" and delta.name == "get_current_weather"
...         else delta
...     )
...
...     return await collect_final_output(stream("how is the weather today in Rio de Janeiro?"))
...
>>> asyncio.run(example()) # doctest:+SKIP
[{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]
Expand source code
class LiteLLMChatStream(Stream[T, U]):
    """
    `LiteLLMChatStream` is a wrapper for [LiteLLM](https://github.com/BerriAI/litellm), which gives you access to OpenAI, Azure OpenAI, Anthropic, Google VertexAI,
    HuggingFace, Replicate, A21, Cohere and a bunch other LLMs all the the same time, all while keeping the standard OpenAI chat interface. Check it out the completion API
    and the available models [on their docs](https://docs.litellm.ai/docs/).

    The `LiteLLMChatStream` takes a lambda function that should return a list of `LiteLLMChatMessage` for the assistant to reply, it is stateless, so it doesn't keep
    memory of the past chat messages, you will have to handle the memory yourself, you can [follow this guide to get started on memory](https://rogeriochaves.github.io/langstream/docs/llms/memory).

    The `LiteLLMChatStream` also produces `LiteLLMChatDelta` as output, one per token, it contains the `role` that started the output, and then subsequent `content` updates.
    If you want the final content as a string, you will need to use the `.content` property from the delta and accumulate it for the final result.

    To use this stream you will need to have the proper environment keys available depending on the model you are using, like `OPENAI_API_KEY`, `COHERE_API_KEY`, `HUGGINGFACE_API_KEY`, etc,
    check it out more details on [LiteLLM docs](https://docs.litellm.ai/docs/completion/supported)

    Example
    -------

    >>> from langstream import Stream, join_final_output
    >>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta
    >>> import asyncio
    ...
    >>> async def example():
    ...     recipe_stream: Stream[str, str] = LiteLLMChatStream[str, LiteLLMChatDelta](
    ...         "RecipeStream",
    ...         lambda recipe_name: [
    ...             LiteLLMChatMessage(
    ...                 role="system",
    ...                 content="You are Chef Claude, an assistant bot trained on all culinary knowledge of world's most proeminant Michelin Chefs",
    ...             ),
    ...             LiteLLMChatMessage(
    ...                 role="user",
    ...                 content=f"Hello, could you write me a recipe for {recipe_name}?",
    ...             ),
    ...         ],
    ...         model="claude-2",
    ...         max_tokens=10,
    ...     ).map(lambda delta: delta.content)
    ...
    ...     return await join_final_output(recipe_stream("instant noodles"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    "Of course! Here's a simple and delicious recipe"

    You can also pass LiteLLM function schemas in the `function` argument with all parameter definitions, just like for OpenAI model, but be aware that not all models support it.
    Once you pass a `function` param, the model may then produce a `function` role `LiteLLMChatDelta` as output,
    using your function, with the `content` field as a json which you can parse to call an actual function.

    Take a look [at our OpenAI guide](https://rogeriochaves.github.io/langstream/docs/llms/open_ai_functions) to learn more about LLM function calls in LangStream, it works the same with LiteLLM.

    Function Call Example
    ---------------------

    >>> from langstream import Stream, collect_final_output
    >>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta
    >>> from typing import Literal, Union, Dict
    >>> import asyncio
    ...
    >>> async def example():
    ...     def get_current_weather(
    ...         location: str, format: Literal["celsius", "fahrenheit"] = "celsius"
    ...     ) -> Dict[str, str]:
    ...         return {
    ...             "location": location,
    ...             "forecast": "sunny",
    ...             "temperature": "25 C" if format == "celsius" else "77 F",
    ...         }
    ...
    ...     stream : Stream[str, Union[LiteLLMChatDelta, Dict[str, str]]] = LiteLLMChatStream[str, Union[LiteLLMChatDelta, Dict[str, str]]](
    ...         "WeatherStream",
    ...         lambda user_input: [
    ...             LiteLLMChatMessage(role="user", content=user_input),
    ...         ],
    ...         model="gpt-3.5-turbo",
    ...         functions=[
    ...             {
    ...                 "name": "get_current_weather",
    ...                 "description": "Gets the current weather in a given location, use this function for any questions related to the weather",
    ...                 "parameters": {
    ...                     "type": "object",
    ...                     "properties": {
    ...                         "location": {
    ...                             "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages",
    ...                             "type": "string",
    ...                         },
    ...                         "format": {
    ...                             "description": "A string with the full content of what the given role said",
    ...                             "type": "string",
    ...                             "enum": ("celsius", "fahrenheit"),
    ...                         },
    ...                     },
    ...                     "required": ["location"],
    ...                 },
    ...             }
    ...         ],
    ...         temperature=0,
    ...     ).map(
    ...         lambda delta: get_current_weather(**json.loads(delta.content))
    ...         if delta.role == "function" and delta.name == "get_current_weather"
    ...         else delta
    ...     )
    ...
    ...     return await collect_final_output(stream("how is the weather today in Rio de Janeiro?"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    [{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]

    """

    def __init__(
        self: "LiteLLMChatStream[T, LiteLLMChatDelta]",
        name: str,
        call: Callable[
            [T],
            List[LiteLLMChatMessage],
        ],
        model: str,
        custom_llm_provider: Optional[str] = None,
        functions: Optional[List[Dict[str, Any]]] = None,
        function_call: Optional[Union[Literal["none", "auto"], Dict[str, Any]]] = None,
        temperature: Optional[float] = 0,
        max_tokens: Optional[int] = None,
        timeout: int = 5,
        retries: int = 3,
    ) -> None:
        async def chat_completion(
            messages: List[LiteLLMChatMessage],
        ) -> AsyncGenerator[StreamOutput[LiteLLMChatDelta], None]:
            loop = asyncio.get_event_loop()

            @retry(tries=retries)
            def get_completions():
                function_kwargs = {}
                if functions is not None:
                    function_kwargs["functions"] = functions
                if function_call is not None:
                    function_kwargs["function_call"] = function_call

                litellm = importlib.import_module("litellm")
                # import litellm

                return litellm.completion(
                    request_timeout=timeout,
                    model=model,
                    custom_llm_provider=custom_llm_provider,
                    messages=[m.to_dict() for m in messages],
                    temperature=temperature,  # type: ignore (why is their type int?)
                    stream=True,
                    max_tokens=max_tokens,  # type: ignore (why is their type float?)
                    **function_kwargs,
                )

            completions = await loop.run_in_executor(None, get_completions)

            pending_function_call: Optional[LiteLLMChatDelta] = None

            completions = (
                completions if hasattr(completions, "__iter__") else [completions]
            )
            # from litellm import ModelResponse
            # from litellm.utils import StreamingChoices

            for output in completions:
                # output = cast(ModelResponse, output)
                if len(output.choices) == 0:
                    continue

                # choices = cast(List[StreamingChoices], output.choices)
                choices = output.choices
                delta = choices[0].delta
                if not delta:
                    continue

                delta_function_call = delta.model_dump().get("function_call")
                if delta_function_call is not None:
                    role = delta.role
                    function_name: Optional[str] = (
                        delta_function_call["name"]
                        if "name" in delta_function_call
                        else None
                    )
                    function_arguments: Optional[str] = (
                        delta_function_call["arguments"]
                        if "arguments" in delta_function_call
                        else None
                    )

                    if function_name is not None:
                        pending_function_call = LiteLLMChatDelta(
                            role="function",
                            name=function_name,
                            content=function_arguments or "",
                        )
                    elif (
                        pending_function_call is not None
                        and function_arguments is not None
                    ):
                        pending_function_call.content += function_arguments
                elif delta.content is not None:
                    role = cast(
                        Union[Literal["assistant", "function"], None], delta.role
                    )
                    yield self._output_wrap(
                        LiteLLMChatDelta(
                            role=role,
                            content=delta.content,
                        )
                    )
                else:
                    if pending_function_call:
                        yield self._output_wrap(pending_function_call)
                        pending_function_call = None
            if pending_function_call:
                yield self._output_wrap(pending_function_call)
                pending_function_call = None

        super().__init__(
            name,
            lambda input: cast(AsyncGenerator[U, None], chat_completion(call(input))),
        )

Ancestors

Inherited members

class OpenAIChatDelta (role: Optional[Literal['assistant', 'function']], content: str, name: Optional[str] = None)

OpenAIChatDelta is a data class that represents the output of an OpenAIChatStream.

Attributes

role : Optional[Literal["assistant", "function"]]
The role of the output message, the first message will have the role, while the subsequent partial content output ones will have the role as None. For now the only possible values it will have is either None or "assistant"
name : Optional[str]
The name is used for when role is "function", it represents the name of the function that was called
content : str
A string with the partial content being outputted by the LLM, this generally translate to each token the LLM is producing
Expand source code
@dataclass
class OpenAIChatDelta:
    """
    OpenAIChatDelta is a data class that represents the output of an `OpenAIChatStream`.

    Attributes
    ----------
    role : Optional[Literal["assistant", "function"]]
        The role of the output message, the first message will have the role, while
        the subsequent partial content output ones will have the role as `None`.
        For now the only possible values it will have is either None or `"assistant"`

    name: Optional[str]
        The name is used for when `role` is `"function"`, it represents the name of the function that was called

    content : str
        A string with the partial content being outputted by the LLM, this generally
        translate to each token the LLM is producing

    """

    role: Optional[Literal["assistant", "function"]]
    content: str
    name: Optional[str] = None

    def __stream_debug__(self):
        name = ""
        if self.name:
            name = f" {self.name}"
        if self.role is not None:
            print(f"{Fore.YELLOW}{self.role.capitalize()}{name}:{Fore.RESET} ", end="")
        print(
            self.content,
            end="",
            flush=True,
        )

Class variables

var content : str
var name : Optional[str]
var role : Optional[Literal['assistant', 'function']]
class OpenAIChatMessage (role: Literal['system', 'user', 'assistant', 'function'], content: str, name: Optional[str] = None)

OpenAIChatMessage is a data class that represents a chat message for building OpenAIChatStream prompt.

Attributes

role : Literal["system", "user", "assistant", "function"]
The role of who sent this message in the chat, can be one of "system", "user", "assistant" or "function"
name : Optional[str]
The name is used for when role is "function", it represents the name of the function that was called
content : str
A string with the full content of what the given role said
Expand source code
@dataclass
class OpenAIChatMessage:
    """
    OpenAIChatMessage is a data class that represents a chat message for building `OpenAIChatStream` prompt.

    Attributes
    ----------
    role : Literal["system", "user", "assistant", "function"]
        The role of who sent this message in the chat, can be one of `"system"`, `"user"`, `"assistant"` or "function"

    name: Optional[str]
        The name is used for when `role` is `"function"`, it represents the name of the function that was called

    content : str
        A string with the full content of what the given role said

    """

    role: Literal["system", "user", "assistant", "function"]
    content: str
    name: Optional[str] = None

    def to_dict(self):
        return {k: v for k, v in self.__dict__.items() if v is not None}

Class variables

var content : str
var name : Optional[str]
var role : Literal['system', 'user', 'assistant', 'function']

Methods

def to_dict(self)
Expand source code
def to_dict(self):
    return {k: v for k, v in self.__dict__.items() if v is not None}
class OpenAIChatStream (name: str, call: Callable[[~T], List[OpenAIChatMessage]], model: str, functions: Optional[List[Dict[str, Any]]] = None, function_call: Union[Literal['none', 'auto'], Dict[str, Any], ForwardRef(None)] = None, temperature: Optional[float] = 0, max_tokens: Optional[int] = None, timeout: int = 5, retries: int = 3)

OpenAIChatStream gives you access to the more powerful LLMs from OpenAI, like gpt-3.5-turbo and gpt-4, they are structured in a chat format with roles.

The OpenAIChatStream takes a lambda function that should return a list of OpenAIChatMessage for the assistant to reply, it is stateless, so it doesn't keep memory of the past chat messages, you will have to handle the memory yourself, you can follow this guide to get started on memory.

The OpenAIChatStream also produces OpenAIChatDelta as output, one per token, it contains the role that started the output, and then subsequent content updates. If you want the final content as a string, you will need to use the .content property from the delta and accumulate it for the final result.

To use this stream you will need an OPENAI_API_KEY environment variable to be available, and then you can generate chat completions out of it.

You can read more about the chat completion API on OpenAI API reference

Example

>>> from langstream import Stream, join_final_output
>>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta
>>> import asyncio
...
>>> async def example():
...     recipe_stream: Stream[str, str] = OpenAIChatStream[str, OpenAIChatDelta](
...         "RecipeStream",
...         lambda recipe_name: [
...             OpenAIChatMessage(
...                 role="system",
...                 content="You are ChefGPT, an assistant bot trained on all culinary knowledge of world's most proeminant Michelin Chefs",
...             ),
...             OpenAIChatMessage(
...                 role="user",
...                 content=f"Hello, could you write me a recipe for {recipe_name}?",
...             ),
...         ],
...         model="gpt-3.5-turbo",
...         max_tokens=10,
...     ).map(lambda delta: delta.content)
...
...     return await join_final_output(recipe_stream("instant noodles"))
...
>>> asyncio.run(example()) # doctest:+SKIP
"Of course! Here's a simple and delicious recipe"

You can also pass OpenAI function schemas in the function argument with all parameter definitions, the model may then produce a function role OpenAIChatDelta, using your function, with the content field as a json which you can parse to call an actual function.

Take a look at our guide to learn more about OpenAI function calls in LangStream.

Function Call Example

>>> from langstream import Stream, collect_final_output
>>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta
>>> from typing import Literal, Union, Dict
>>> import asyncio
...
>>> async def example():
...     def get_current_weather(
...         location: str, format: Literal["celsius", "fahrenheit"] = "celsius"
...     ) -> Dict[str, str]:
...         return {
...             "location": location,
...             "forecast": "sunny",
...             "temperature": "25 C" if format == "celsius" else "77 F",
...         }
...
...     stream : Stream[str, Union[OpenAIChatDelta, Dict[str, str]]] = OpenAIChatStream[str, Union[OpenAIChatDelta, Dict[str, str]]](
...         "WeatherStream",
...         lambda user_input: [
...             OpenAIChatMessage(role="user", content=user_input),
...         ],
...         model="gpt-3.5-turbo",
...         functions=[
...             {
...                 "name": "get_current_weather",
...                 "description": "Gets the current weather in a given location, use this function for any questions related to the weather",
...                 "parameters": {
...                     "type": "object",
...                     "properties": {
...                         "location": {
...                             "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages",
...                             "type": "string",
...                         },
...                         "format": {
...                             "description": "A string with the full content of what the given role said",
...                             "type": "string",
...                             "enum": ("celsius", "fahrenheit"),
...                         },
...                     },
...                     "required": ["location"],
...                 },
...             }
...         ],
...         temperature=0,
...     ).map(
...         lambda delta: get_current_weather(**json.loads(delta.content))
...         if delta.role == "function" and delta.name == "get_current_weather"
...         else delta
...     )
...
...     return await collect_final_output(stream("how is the weather today in Rio de Janeiro?"))
...
>>> asyncio.run(example()) # doctest:+SKIP
[{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]
Expand source code
class OpenAIChatStream(Stream[T, U]):
    """
    `OpenAIChatStream` gives you access to the more powerful LLMs from OpenAI, like `gpt-3.5-turbo` and `gpt-4`, they are structured in a chat format with roles.

    The `OpenAIChatStream` takes a lambda function that should return a list of `OpenAIChatMessage` for the assistant to reply, it is stateless, so it doesn't keep
    memory of the past chat messages, you will have to handle the memory yourself, you can [follow this guide to get started on memory](https://rogeriochaves.github.io/langstream/docs/llms/memory).

    The `OpenAIChatStream` also produces `OpenAIChatDelta` as output, one per token, it contains the `role` that started the output, and then subsequent `content` updates.
    If you want the final content as a string, you will need to use the `.content` property from the delta and accumulate it for the final result.

    To use this stream you will need an `OPENAI_API_KEY` environment variable to be available, and then you can generate chat completions out of it.

    You can read more about the chat completion API on [OpenAI API reference](https://platform.openai.com/docs/api-reference/chat)

    Example
    -------

    >>> from langstream import Stream, join_final_output
    >>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta
    >>> import asyncio
    ...
    >>> async def example():
    ...     recipe_stream: Stream[str, str] = OpenAIChatStream[str, OpenAIChatDelta](
    ...         "RecipeStream",
    ...         lambda recipe_name: [
    ...             OpenAIChatMessage(
    ...                 role="system",
    ...                 content="You are ChefGPT, an assistant bot trained on all culinary knowledge of world's most proeminant Michelin Chefs",
    ...             ),
    ...             OpenAIChatMessage(
    ...                 role="user",
    ...                 content=f"Hello, could you write me a recipe for {recipe_name}?",
    ...             ),
    ...         ],
    ...         model="gpt-3.5-turbo",
    ...         max_tokens=10,
    ...     ).map(lambda delta: delta.content)
    ...
    ...     return await join_final_output(recipe_stream("instant noodles"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    "Of course! Here's a simple and delicious recipe"

    You can also pass OpenAI function schemas in the `function` argument with all parameter definitions, the model may then produce a `function` role `OpenAIChatDelta`,
    using your function, with the `content` field as a json which you can parse to call an actual function.

    Take a look [at our guide](https://rogeriochaves.github.io/langstream/docs/llms/open_ai_functions) to learn more about OpenAI function calls in LangStream.

    Function Call Example
    ---------------------

    >>> from langstream import Stream, collect_final_output
    >>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta
    >>> from typing import Literal, Union, Dict
    >>> import asyncio
    ...
    >>> async def example():
    ...     def get_current_weather(
    ...         location: str, format: Literal["celsius", "fahrenheit"] = "celsius"
    ...     ) -> Dict[str, str]:
    ...         return {
    ...             "location": location,
    ...             "forecast": "sunny",
    ...             "temperature": "25 C" if format == "celsius" else "77 F",
    ...         }
    ...
    ...     stream : Stream[str, Union[OpenAIChatDelta, Dict[str, str]]] = OpenAIChatStream[str, Union[OpenAIChatDelta, Dict[str, str]]](
    ...         "WeatherStream",
    ...         lambda user_input: [
    ...             OpenAIChatMessage(role="user", content=user_input),
    ...         ],
    ...         model="gpt-3.5-turbo",
    ...         functions=[
    ...             {
    ...                 "name": "get_current_weather",
    ...                 "description": "Gets the current weather in a given location, use this function for any questions related to the weather",
    ...                 "parameters": {
    ...                     "type": "object",
    ...                     "properties": {
    ...                         "location": {
    ...                             "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages",
    ...                             "type": "string",
    ...                         },
    ...                         "format": {
    ...                             "description": "A string with the full content of what the given role said",
    ...                             "type": "string",
    ...                             "enum": ("celsius", "fahrenheit"),
    ...                         },
    ...                     },
    ...                     "required": ["location"],
    ...                 },
    ...             }
    ...         ],
    ...         temperature=0,
    ...     ).map(
    ...         lambda delta: get_current_weather(**json.loads(delta.content))
    ...         if delta.role == "function" and delta.name == "get_current_weather"
    ...         else delta
    ...     )
    ...
    ...     return await collect_final_output(stream("how is the weather today in Rio de Janeiro?"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    [{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]

    """

    def __init__(
        self: "OpenAIChatStream[T, OpenAIChatDelta]",
        name: str,
        call: Callable[
            [T],
            List[OpenAIChatMessage],
        ],
        model: str,
        functions: Optional[List[Dict[str, Any]]] = None,
        function_call: Optional[Union[Literal["none", "auto"], Dict[str, Any]]] = None,
        temperature: Optional[float] = 0,
        max_tokens: Optional[int] = None,
        timeout: int = 5,
        retries: int = 3,
    ) -> None:
        async def chat_completion(
            messages: List[OpenAIChatMessage],
        ) -> AsyncGenerator[StreamOutput[OpenAIChatDelta], None]:
            loop = asyncio.get_event_loop()

            @retry(tries=retries)
            def get_completions():
                function_kwargs = {}
                if functions is not None:
                    function_kwargs["functions"] = functions
                if function_call is not None:
                    function_kwargs["function_call"] = function_call

                # import openai

                return OpenAIChatStream.client().chat.completions.create(
                    timeout=timeout,
                    model=model,
                    messages=cast(Any, [m.to_dict() for m in messages]),
                    temperature=temperature,
                    stream=True,
                    max_tokens=max_tokens,
                    **function_kwargs,
                )

            completions = await loop.run_in_executor(None, get_completions)

            pending_function_call: Optional[OpenAIChatDelta] = None

            for output in completions:
                if len(output.choices) == 0:
                    continue

                delta = output.choices[0].delta
                if not delta:
                    continue

                if delta.function_call is not None:
                    role = delta.role
                    function_name: Optional[str] = delta.function_call.name
                    function_arguments: Optional[str] = delta.function_call.arguments

                    if function_name is not None:
                        pending_function_call = OpenAIChatDelta(
                            role="function",
                            name=function_name,
                            content=function_arguments or "",
                        )
                    elif (
                        pending_function_call is not None
                        and function_arguments is not None
                    ):
                        pending_function_call.content += function_arguments
                elif delta.content is not None:
                    role = cast(
                        Union[Literal["assistant", "function"], None], delta.role
                    )
                    yield self._output_wrap(
                        OpenAIChatDelta(
                            role=role,
                            content=delta.content,
                        )
                    )
                else:
                    if pending_function_call:
                        yield self._output_wrap(pending_function_call)
                        pending_function_call = None
            if pending_function_call:
                yield self._output_wrap(pending_function_call)
                pending_function_call = None

        super().__init__(
            name,
            lambda input: cast(AsyncGenerator[U, None], chat_completion(call(input))),
        )

    _client_ = None
    @staticmethod
    def client():
        """
        Returns the OpenAI client instance being used to make the LLM calls.
        """

        if not OpenAIChatStream._client_:
            openai = importlib.import_module("openai")
            OpenAIChatStream._client_ = openai.OpenAI()

        return OpenAIChatStream._client_

Ancestors

Static methods

def client()

Returns the OpenAI client instance being used to make the LLM calls.

Expand source code
@staticmethod
def client():
    """
    Returns the OpenAI client instance being used to make the LLM calls.
    """

    if not OpenAIChatStream._client_:
        openai = importlib.import_module("openai")
        OpenAIChatStream._client_ = openai.OpenAI()

    return OpenAIChatStream._client_

Inherited members

class OpenAICompletionStream (name: str, call: Callable[[~T], str], model: str, temperature: Optional[float] = 0, max_tokens: Optional[int] = None, timeout: int = 5, retries: int = 3)

OpenAICompletionStream uses the most simple LLMs from OpenAI based on GPT-3 for text completion, if you are looking for ChatCompletion, take a look at OpenAIChatStream.

The OpenAICompletionStream takes a lambda function that should return a string with the prompt for completion.

To use this stream you will need an OPENAI_API_KEY environment variable to be available, and then you can generate completions out of it.

You can read more about the completion API on OpenAI API reference

Example

>>> from langstream import join_final_output
>>> from langstream.contrib import OpenAICompletionStream
>>> import asyncio
...
>>> async def example():
...     recipe_stream = OpenAICompletionStream[str, str](
...         "RecipeStream",
...         lambda recipe_name: f"Here is my {recipe_name} recipe: ",
...         model="ada",
...     )
...
...     return await join_final_output(recipe_stream("instant noodles"))
...
>>> asyncio.run(example()) # doctest:+SKIP
'【Instant Noodles】\n\nIngredients:\n\n1 cup of water'
Expand source code
class OpenAICompletionStream(Stream[T, U]):
    """
    `OpenAICompletionStream` uses the most simple LLMs from OpenAI based on GPT-3 for text completion, if you are looking for ChatCompletion, take a look at `OpenAIChatStream`.

    The `OpenAICompletionStream` takes a lambda function that should return a string with the prompt for completion.

    To use this stream you will need an `OPENAI_API_KEY` environment variable to be available, and then you can generate completions out of it.

    You can read more about the completion API on [OpenAI API reference](https://platform.openai.com/docs/api-reference/completions)

    Example
    -------

    >>> from langstream import join_final_output
    >>> from langstream.contrib import OpenAICompletionStream
    >>> import asyncio
    ...
    >>> async def example():
    ...     recipe_stream = OpenAICompletionStream[str, str](
    ...         "RecipeStream",
    ...         lambda recipe_name: f"Here is my {recipe_name} recipe: ",
    ...         model="ada",
    ...     )
    ...
    ...     return await join_final_output(recipe_stream("instant noodles"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    '【Instant Noodles】\\n\\nIngredients:\\n\\n1 cup of water'

    """

    def __init__(
        self: "OpenAICompletionStream[T, str]",
        name: str,
        call: Callable[
            [T],
            str,
        ],
        model: str,
        temperature: Optional[float] = 0,
        max_tokens: Optional[int] = None,
        timeout: int = 5,
        retries: int = 3,
    ) -> None:
        async def completion(prompt: str) -> AsyncGenerator[U, None]:
            loop = asyncio.get_event_loop()

            @retry(tries=retries)
            def get_completions():
                return OpenAICompletionStream.client().completions.create(
                    model=model,
                    prompt=prompt,
                    temperature=temperature,
                    stream=True,
                    max_tokens=max_tokens,
                    timeout=timeout,
                )

            completions = await loop.run_in_executor(None, get_completions)

            for output in completions:
                output = cast(dict, output.model_dump())
                if "choices" in output:
                    if len(output["choices"]) > 0:
                        if "text" in output["choices"][0]:
                            yield output["choices"][0]["text"]

        super().__init__(name, lambda input: completion(call(input)))

    _client_ = None
    @staticmethod
    def client():
        """
        Returns the OpenAI client instance being used to make the LLM calls.
        """

        if not OpenAICompletionStream._client_:
            openai = importlib.import_module("openai")
            OpenAICompletionStream._client_ = openai.OpenAI()

        return OpenAICompletionStream._client_

Ancestors

Static methods

def client()

Returns the OpenAI client instance being used to make the LLM calls.

Expand source code
@staticmethod
def client():
    """
    Returns the OpenAI client instance being used to make the LLM calls.
    """

    if not OpenAICompletionStream._client_:
        openai = importlib.import_module("openai")
        OpenAICompletionStream._client_ = openai.OpenAI()

    return OpenAICompletionStream._client_

Inherited members