Module langstream.contrib
The Contrib module is where all the other streams and integrations that build on top of the core Stream module live. Here you can import the LLMs you want to use.
>>> from langstream.contrib import OpenAIChatStream, GPT4AllStream # etc
Here you can find the reference and code examples; for further tutorials and use cases, consult the documentation.
Expand source code
"""
The Contrib module is where all the other streams and integrations that build on top of
core Stream module live. Here you can import the LLMs you want to use.
>>> from langstream.contrib import OpenAIChatStream, GPT4AllStream # etc
---
Here you can find the reference and code examples, for further tutorials and use cases, consult the [documentation](https://github.com/rogeriochaves/langstream).
"""
from langstream.contrib.llms.open_ai import (
OpenAICompletionStream,
OpenAIChatStream,
OpenAIChatMessage,
OpenAIChatDelta,
)
from langstream.contrib.llms.gpt4all_stream import GPT4AllStream
from langstream.contrib.llms.lite_llm import (
LiteLLMChatStream,
LiteLLMChatMessage,
LiteLLMChatDelta,
)
__all__ = (
"OpenAICompletionStream",
"OpenAIChatStream",
"OpenAIChatMessage",
"OpenAIChatDelta",
"GPT4AllStream",
"LiteLLMChatStream",
"LiteLLMChatMessage",
"LiteLLMChatDelta",
)
Sub-modules
langstream.contrib.llms
Classes
class GPT4AllStream (name: str, call: Callable[[~T], str], model: str, temperature: float = 0, max_tokens: int = 200, top_k=40, top_p=0.1, repeat_penalty=1.18, repeat_last_n=64, n_batch=8, n_threads: Optional[int] = None)
-
GPT4AllStream is a Stream that allows you to run local LLMs easily using GPT4All model.
GPT4All is a project that focuses on making the LLM models very small and very fast to be able to run in any computer without GPUs. Check out more about the project here.
You can use it as any other stream, on the first use, it will download the model (a few GB). Alternatively, you can point to a locally downloaded model.bin file.
There are several parameters you can use to adjust the model output, such as temperature, max_tokens, top_k, repeat_penalty, etc.; you can read more about them here.
Example
>>> from langstream import join_final_output >>> from langstream.contrib import GPT4AllStream >>> import asyncio ... >>> async def example(): ... greet_stream = GPT4AllStream[str, str]( ... "GreetingStream", ... lambda name: f"### User: Hello, my name is {name}. How is it going?\n\n### Response:", ... model="orca-mini-3b-gguf2-q4_0.gguf", ... temperature=0, ... ) ... ... return await join_final_output(greet_stream("Alice")) ... >>> asyncio.run(example()) # doctest:+ELLIPSIS +SKIP Found model file at ... " I'm doing well, thank you for asking! How about you?"
Expand source code
class GPT4AllStream(Stream[T, U]):
    """
    GPT4AllStream is a Stream that allows you to run local LLMs easily using
    [GPT4All](https://gpt4all.io/) models.

    [GPT4All](https://gpt4all.io/) is a project that focuses on making LLM models very small and very fast
    so they can run on any computer without GPUs. Check out more about the project [here](https://gpt4all.io/).

    You can use it as any other stream; on the first use, it will download the model (a few GB).
    Alternatively, you can point to a locally downloaded model.bin file.

    There are several parameters you can use to adjust the model output such as `temperature`, `max_tokens`,
    `top_k`, `repeat_penalty`, etc, you can read more about them
    [here](https://docs.gpt4all.io/gpt4all_python.html#generation-parameters).

    Parameters
    ----------
    name : str
        Name of the stream, forwarded to the base `Stream` constructor.
    call : Callable[[T], str]
        Function that turns the stream input into the prompt string.
    model : str
        Passed as the first argument to the `gpt4all.GPT4All` constructor.
    temperature, max_tokens, top_k, top_p, repeat_penalty, repeat_last_n, n_batch
        Generation parameters forwarded to `GPT4All.generate` on every call
        (`temperature` is passed as `temp`).
    n_threads : Optional[int]
        Forwarded to the `gpt4all.GPT4All` constructor.

    Example
    -------

    >>> from langstream import join_final_output
    >>> from langstream.contrib import GPT4AllStream
    >>> import asyncio
    ...
    >>> async def example():
    ...     greet_stream = GPT4AllStream[str, str](
    ...         "GreetingStream",
    ...         lambda name: f"### User: Hello, my name is {name}. How is it going?\\n\\n### Response:",
    ...         model="orca-mini-3b-gguf2-q4_0.gguf",
    ...         temperature=0,
    ...     )
    ...
    ...     return await join_final_output(greet_stream("Alice"))
    ...
    >>> asyncio.run(example()) # doctest:+ELLIPSIS +SKIP
    Found model file at ...
    " I'm doing well, thank you for asking! How about you?"

    """

    def __init__(
        self: "GPT4AllStream[T, str]",
        name: str,
        call: Callable[
            [T],
            str,
        ],
        model: str,
        temperature: float = 0,
        max_tokens: int = 200,
        top_k=40,
        top_p=0.1,
        repeat_penalty=1.18,
        repeat_last_n=64,
        n_batch=8,
        n_threads: Optional[int] = None,
    ) -> None:
        # Resolved through importlib when the stream is constructed instead of a
        # top-level import (presumably so gpt4all stays an optional dependency).
        GPT4All = importlib.import_module("gpt4all").GPT4All
        # The model object is created once here and shared by every call of the stream.
        gpt4all = GPT4All(model, n_threads=n_threads)

        async def generate(prompt: str) -> AsyncGenerator[U, None]:
            # We are inside a coroutine, so a loop is guaranteed to be running.
            loop = asyncio.get_running_loop()

            def get_outputs() -> Iterable[str]:
                return gpt4all.generate(
                    prompt,
                    streaming=True,
                    temp=temperature,
                    max_tokens=max_tokens,
                    top_k=top_k,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    repeat_last_n=repeat_last_n,
                    n_batch=n_batch,
                )

            outputs = await loop.run_in_executor(None, get_outputs)
            token_iterator = iter(outputs)

            # With streaming=True the tokens are produced lazily while iterating, so
            # each `next` may block on inference. Pull every token in the executor to
            # keep the event loop free for other tasks. A sentinel marks exhaustion
            # because StopIteration cannot be raised into a Future.
            exhausted = object()
            while True:
                output = await loop.run_in_executor(
                    None, next, token_iterator, exhausted
                )
                if output is exhausted:
                    break
                yield cast(U, output)

        super().__init__(name, lambda input: generate(call(input)))
Ancestors
- Stream
- typing.Generic
Inherited members
class LiteLLMChatDelta (role: Optional[Literal['assistant', 'function']], content: str, name: Optional[str] = None)
-
LiteLLMChatDelta is a data class that represents the output of an
LiteLLMChatStream
.Attributes
role
:Optional[Literal["assistant", "function"]]
- The role of the output message, the first message will have the role, while
the subsequent partial content output ones will have the role as
None
. For now the only possible values it will have is either None or"assistant"
name
:Optional[str]
- The name is used for when
role
is"function"
, it represents the name of the function that was called content
:str
- A string with the partial content being outputted by the LLM; this generally translates to each token the LLM is producing
Expand source code
@dataclass
class LiteLLMChatDelta:
    """
    One streamed output chunk produced by a `LiteLLMChatStream`.

    Attributes
    ----------
    role : Optional[Literal["assistant", "function"]]
        Role of the output message. The first delta of a message carries the role,
        the following partial-content deltas have it set to `None`. `"function"`
        marks a delta that carries a function call.

    name : Optional[str]
        Only meaningful when `role` is `"function"`: the name of the function
        that was called.

    content : str
        The partial content being produced by the LLM, generally one token per delta.

    """

    role: Optional[Literal["assistant", "function"]]
    content: str
    name: Optional[str] = None

    def __stream_debug__(self):
        """
        Print this delta to stdout without a trailing newline. When `role` is set,
        a "Role name: " header wrapped in `Fore.YELLOW`/`Fore.RESET` is printed first.
        """
        if self.role is not None:
            # The function name (if any) is appended to the capitalized role.
            suffix = f" {self.name}" if self.name else ""
            header = f"{Fore.YELLOW}{self.role.capitalize()}{suffix}:{Fore.RESET} "
            print(header, end="")
        print(self.content, end="", flush=True)
Class variables
var content : str
var name : Optional[str]
var role : Optional[Literal['assistant', 'function']]
class LiteLLMChatMessage (role: Literal['system', 'user', 'assistant', 'function'], content: str, name: Optional[str] = None)
-
LiteLLMChatMessage is a data class that represents a chat message for building
LiteLLMChatStream
prompt.Attributes
role
:Literal["system", "user", "assistant", "function"]
- The role of who sent this message in the chat, can be one of
"system"
,"user"
,"assistant"
or "function" name
:Optional[str]
- The name is used for when
role
is"function"
, it represents the name of the function that was called content
:str
- A string with the full content of what the given role said
Expand source code
@dataclass
class LiteLLMChatMessage:
    """
    A single chat message used to build the prompt of a `LiteLLMChatStream`.

    Attributes
    ----------
    role : Literal["system", "user", "assistant", "function"]
        Who sent this message in the chat, one of `"system"`, `"user"`,
        `"assistant"` or `"function"`.

    name : Optional[str]
        Only meaningful when `role` is `"function"`: the name of the function
        that was called.

    content : str
        The full content of what the given role said.

    """

    role: Literal["system", "user", "assistant", "function"]
    content: str
    name: Optional[str] = None

    def to_dict(self):
        """Return the fields of this message as a dict, leaving out those set to None."""
        payload = {}
        for field_name, field_value in self.__dict__.items():
            if field_value is None:
                continue
            payload[field_name] = field_value
        return payload
Class variables
var content : str
var name : Optional[str]
var role : Literal['system', 'user', 'assistant', 'function']
Methods
def to_dict(self)
-
Expand source code
def to_dict(self):
    """Return the instance attributes as a dict, leaving out those set to None."""
    payload = {}
    for field_name, field_value in self.__dict__.items():
        if field_value is not None:
            payload[field_name] = field_value
    return payload
class LiteLLMChatStream (name: str, call: Callable[[~T], List[LiteLLMChatMessage]], model: str, custom_llm_provider: Optional[str] = None, functions: Optional[List[Dict[str, Any]]] = None, function_call: Union[Literal['none', 'auto'], Dict[str, Any], ForwardRef(None)] = None, temperature: Optional[float] = 0, max_tokens: Optional[int] = None, timeout: int = 5, retries: int = 3)
-
LiteLLMChatStream
is a wrapper for LiteLLM, which gives you access to OpenAI, Azure OpenAI, Anthropic, Google VertexAI, HuggingFace, Replicate, A21, Cohere and a bunch of other LLMs all at the same time, all while keeping the standard OpenAI chat interface. Check out the completion API and the available models on their docs.
The
LiteLLMChatStream
takes a lambda function that should return a list ofLiteLLMChatMessage
for the assistant to reply, it is stateless, so it doesn't keep memory of the past chat messages, you will have to handle the memory yourself, you can follow this guide to get started on memory.The
LiteLLMChatStream
also producesLiteLLMChatDelta
as output, one per token, it contains therole
that started the output, and then subsequentcontent
updates. If you want the final content as a string, you will need to use the.content
property from the delta and accumulate it for the final result.
To use this stream you will need to have the proper environment keys available depending on the model you are using, like OPENAI_API_KEY, COHERE_API_KEY, HUGGINGFACE_API_KEY, etc.; check out more details on the LiteLLM docs.
Example
>>> from langstream import Stream, join_final_output >>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta >>> import asyncio ... >>> async def example(): ... recipe_stream: Stream[str, str] = LiteLLMChatStream[str, LiteLLMChatDelta]( ... "RecipeStream", ... lambda recipe_name: [ ... LiteLLMChatMessage( ... role="system", ... content="You are Chef Claude, an assistant bot trained on all culinary knowledge of world's most proeminant Michelin Chefs", ... ), ... LiteLLMChatMessage( ... role="user", ... content=f"Hello, could you write me a recipe for {recipe_name}?", ... ), ... ], ... model="claude-2", ... max_tokens=10, ... ).map(lambda delta: delta.content) ... ... return await join_final_output(recipe_stream("instant noodles")) ... >>> asyncio.run(example()) # doctest:+SKIP "Of course! Here's a simple and delicious recipe"
You can also pass LiteLLM function schemas in the functions argument with all parameter definitions, just like for OpenAI models, but be aware that not all models support it. Once you pass a functions param, the model may then produce a function role LiteLLMChatDelta as output, using your function, with the content field as a JSON string which you can parse to call an actual function.
Take a look at our OpenAI guide to learn more about LLM function calls in LangStream, it works the same with LiteLLM.
Function Call Example
>>> from langstream import Stream, collect_final_output >>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta >>> from typing import Literal, Union, Dict >>> import asyncio ... >>> async def example(): ... def get_current_weather( ... location: str, format: Literal["celsius", "fahrenheit"] = "celsius" ... ) -> Dict[str, str]: ... return { ... "location": location, ... "forecast": "sunny", ... "temperature": "25 C" if format == "celsius" else "77 F", ... } ... ... stream : Stream[str, Union[LiteLLMChatDelta, Dict[str, str]]] = LiteLLMChatStream[str, Union[LiteLLMChatDelta, Dict[str, str]]]( ... "WeatherStream", ... lambda user_input: [ ... LiteLLMChatMessage(role="user", content=user_input), ... ], ... model="gpt-3.5-turbo", ... functions=[ ... { ... "name": "get_current_weather", ... "description": "Gets the current weather in a given location, use this function for any questions related to the weather", ... "parameters": { ... "type": "object", ... "properties": { ... "location": { ... "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages", ... "type": "string", ... }, ... "format": { ... "description": "A string with the full content of what the given role said", ... "type": "string", ... "enum": ("celsius", "fahrenheit"), ... }, ... }, ... "required": ["location"], ... }, ... } ... ], ... temperature=0, ... ).map( ... lambda delta: get_current_weather(**json.loads(delta.content)) ... if delta.role == "function" and delta.name == "get_current_weather" ... else delta ... ) ... ... return await collect_final_output(stream("how is the weather today in Rio de Janeiro?")) ... >>> asyncio.run(example()) # doctest:+SKIP [{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]
Expand source code
class LiteLLMChatStream(Stream[T, U]):
    """
    `LiteLLMChatStream` is a wrapper for [LiteLLM](https://github.com/BerriAI/litellm), which gives you access to OpenAI,
    Azure OpenAI, Anthropic, Google VertexAI, HuggingFace, Replicate, A21, Cohere and a bunch of other LLMs all at the same time,
    all while keeping the standard OpenAI chat interface. Check out the completion API and the available models
    [on their docs](https://docs.litellm.ai/docs/).

    The `LiteLLMChatStream` takes a lambda function that should return a list of `LiteLLMChatMessage` for the assistant to reply,
    it is stateless, so it doesn't keep memory of the past chat messages, you will have to handle the memory yourself,
    you can [follow this guide to get started on memory](https://rogeriochaves.github.io/langstream/docs/llms/memory).

    The `LiteLLMChatStream` also produces `LiteLLMChatDelta` as output, one per token, it contains the `role` that started the output,
    and then subsequent `content` updates. If you want the final content as a string, you will need to use the `.content`
    property from the delta and accumulate it for the final result.

    To use this stream you will need to have the proper environment keys available depending on the model you are using,
    like `OPENAI_API_KEY`, `COHERE_API_KEY`, `HUGGINGFACE_API_KEY`, etc,
    check out more details on [LiteLLM docs](https://docs.litellm.ai/docs/completion/supported)

    Parameters
    ----------
    name : str
        Name of the stream, forwarded to the base `Stream` constructor.
    call : Callable[[T], List[LiteLLMChatMessage]]
        Function that turns the stream input into the list of chat messages to send.
    model : str
        Forwarded to `litellm.completion` as `model`.
    custom_llm_provider : Optional[str]
        Forwarded to `litellm.completion` as `custom_llm_provider`.
    functions : Optional[List[Dict[str, Any]]]
        Function schemas; only sent to `litellm.completion` when not `None`.
    function_call : Optional[Union[Literal["none", "auto"], Dict[str, Any]]]
        Only sent to `litellm.completion` when not `None`.
    temperature : Optional[float]
        Forwarded to `litellm.completion` as `temperature`.
    max_tokens : Optional[int]
        Forwarded to `litellm.completion` as `max_tokens`.
    timeout : int
        Forwarded to `litellm.completion` as `request_timeout`.
    retries : int
        Passed as `tries` to the `retry` decorator wrapping the completion request.

    Example
    -------

    >>> from langstream import Stream, join_final_output
    >>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta
    >>> import asyncio
    ...
    >>> async def example():
    ...     recipe_stream: Stream[str, str] = LiteLLMChatStream[str, LiteLLMChatDelta](
    ...         "RecipeStream",
    ...         lambda recipe_name: [
    ...             LiteLLMChatMessage(
    ...                 role="system",
    ...                 content="You are Chef Claude, an assistant bot trained on all culinary knowledge of world's most proeminant Michelin Chefs",
    ...             ),
    ...             LiteLLMChatMessage(
    ...                 role="user",
    ...                 content=f"Hello, could you write me a recipe for {recipe_name}?",
    ...             ),
    ...         ],
    ...         model="claude-2",
    ...         max_tokens=10,
    ...     ).map(lambda delta: delta.content)
    ...
    ...     return await join_final_output(recipe_stream("instant noodles"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    "Of course! Here's a simple and delicious recipe"

    You can also pass LiteLLM function schemas in the `functions` argument with all parameter definitions, just like for OpenAI model, but be aware that not all models support it.
    Once you pass a `functions` param, the model may then produce a `function` role `LiteLLMChatDelta` as output,
    using your function, with the `content` field as a json which you can parse to call an actual function.

    Take a look [at our OpenAI guide](https://rogeriochaves.github.io/langstream/docs/llms/open_ai_functions) to learn more about LLM function calls in LangStream, it works the same with LiteLLM.

    Function Call Example
    ---------------------

    >>> from langstream import Stream, collect_final_output
    >>> from langstream.contrib import LiteLLMChatStream, LiteLLMChatMessage, LiteLLMChatDelta
    >>> from typing import Literal, Union, Dict
    >>> import asyncio
    >>> import json
    ...
    >>> async def example():
    ...     def get_current_weather(
    ...         location: str, format: Literal["celsius", "fahrenheit"] = "celsius"
    ...     ) -> Dict[str, str]:
    ...         return {
    ...             "location": location,
    ...             "forecast": "sunny",
    ...             "temperature": "25 C" if format == "celsius" else "77 F",
    ...         }
    ...
    ...     stream : Stream[str, Union[LiteLLMChatDelta, Dict[str, str]]] = LiteLLMChatStream[str, Union[LiteLLMChatDelta, Dict[str, str]]](
    ...         "WeatherStream",
    ...         lambda user_input: [
    ...             LiteLLMChatMessage(role="user", content=user_input),
    ...         ],
    ...         model="gpt-3.5-turbo",
    ...         functions=[
    ...             {
    ...                 "name": "get_current_weather",
    ...                 "description": "Gets the current weather in a given location, use this function for any questions related to the weather",
    ...                 "parameters": {
    ...                     "type": "object",
    ...                     "properties": {
    ...                         "location": {
    ...                             "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages",
    ...                             "type": "string",
    ...                         },
    ...                         "format": {
    ...                             "description": "The temperature unit to use, either celsius or fahrenheit",
    ...                             "type": "string",
    ...                             "enum": ("celsius", "fahrenheit"),
    ...                         },
    ...                     },
    ...                     "required": ["location"],
    ...                 },
    ...             }
    ...         ],
    ...         temperature=0,
    ...     ).map(
    ...         lambda delta: get_current_weather(**json.loads(delta.content))
    ...         if delta.role == "function" and delta.name == "get_current_weather"
    ...         else delta
    ...     )
    ...
    ...     return await collect_final_output(stream("how is the weather today in Rio de Janeiro?"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    [{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]

    """

    def __init__(
        self: "LiteLLMChatStream[T, LiteLLMChatDelta]",
        name: str,
        call: Callable[
            [T],
            List[LiteLLMChatMessage],
        ],
        model: str,
        custom_llm_provider: Optional[str] = None,
        functions: Optional[List[Dict[str, Any]]] = None,
        function_call: Optional[Union[Literal["none", "auto"], Dict[str, Any]]] = None,
        temperature: Optional[float] = 0,
        max_tokens: Optional[int] = None,
        timeout: int = 5,
        retries: int = 3,
    ) -> None:
        # Async generator that requests a streamed completion for `messages` and
        # re-emits it as wrapped LiteLLMChatDelta outputs.
        async def chat_completion(
            messages: List[LiteLLMChatMessage],
        ) -> AsyncGenerator[StreamOutput[LiteLLMChatDelta], None]:
            loop = asyncio.get_event_loop()

            # Issues the completion request; wrapped by `retry` with tries=retries
            # (presumably re-invoked on exceptions, verify against the retry helper).
            @retry(tries=retries)
            def get_completions():
                # The function-calling arguments are only sent when the caller set them.
                function_kwargs = {}
                if functions is not None:
                    function_kwargs["functions"] = functions
                if function_call is not None:
                    function_kwargs["function_call"] = function_call

                # litellm is resolved at call time through importlib rather than a
                # top-level import.
                litellm = importlib.import_module("litellm")
                # import litellm

                return litellm.completion(
                    request_timeout=timeout,
                    model=model,
                    custom_llm_provider=custom_llm_provider,
                    messages=[m.to_dict() for m in messages],
                    temperature=temperature,  # type: ignore (why is their type int?)
                    stream=True,
                    max_tokens=max_tokens,  # type: ignore (why is their type float?)
                    **function_kwargs,
                )

            # The request itself is started on the loop's default executor.
            completions = await loop.run_in_executor(None, get_completions)

            # Function-call delta being accumulated across chunks, if any.
            pending_function_call: Optional[LiteLLMChatDelta] = None

            # A response that is not iterable is treated as a single-chunk stream.
            completions = (
                completions if hasattr(completions, "__iter__") else [completions]
            )

            # from litellm import ModelResponse
            # from litellm.utils import StreamingChoices

            # NOTE(review): this loop runs in the coroutine body, not in the executor;
            # if iterating `completions` blocks while waiting for the next chunk, the
            # event loop is blocked meanwhile. Confirm against litellm's stream type.
            for output in completions:
                # output = cast(ModelResponse, output)
                if len(output.choices) == 0:
                    continue

                # Only the first choice of each chunk is considered.
                # choices = cast(List[StreamingChoices], output.choices)
                choices = output.choices
                delta = choices[0].delta
                if not delta:
                    continue

                delta_function_call = delta.model_dump().get("function_call")
                if delta_function_call is not None:
                    # NOTE(review): `role` is assigned here but not used in this branch.
                    role = delta.role
                    function_name: Optional[str] = (
                        delta_function_call["name"]
                        if "name" in delta_function_call
                        else None
                    )
                    function_arguments: Optional[str] = (
                        delta_function_call["arguments"]
                        if "arguments" in delta_function_call
                        else None
                    )

                    if function_name is not None:
                        # A chunk carrying a name starts a new pending function call.
                        pending_function_call = LiteLLMChatDelta(
                            role="function",
                            name=function_name,
                            content=function_arguments or "",
                        )
                    elif (
                        pending_function_call is not None
                        and function_arguments is not None
                    ):
                        # Chunks without a name extend the pending call's arguments.
                        pending_function_call.content += function_arguments
                elif delta.content is not None:
                    # Regular content chunks are emitted right away, one delta each.
                    role = cast(
                        Union[Literal["assistant", "function"], None], delta.role
                    )
                    yield self._output_wrap(
                        LiteLLMChatDelta(
                            role=role,
                            content=delta.content,
                        )
                    )
                else:
                    # A chunk with neither function call nor content flushes the
                    # accumulated function call as one delta.
                    if pending_function_call:
                        yield self._output_wrap(pending_function_call)
                        pending_function_call = None
            # Flush a function call still pending when the stream ends.
            if pending_function_call:
                yield self._output_wrap(pending_function_call)
                pending_function_call = None

        super().__init__(
            name,
            lambda input: cast(AsyncGenerator[U, None], chat_completion(call(input))),
        )
Ancestors
- Stream
- typing.Generic
Inherited members
class OpenAIChatDelta (role: Optional[Literal['assistant', 'function']], content: str, name: Optional[str] = None)
-
OpenAIChatDelta is a data class that represents the output of an
OpenAIChatStream
.Attributes
role
:Optional[Literal["assistant", "function"]]
- The role of the output message, the first message will have the role, while
the subsequent partial content output ones will have the role as
None
. For now the only possible values it will have is either None or"assistant"
name
:Optional[str]
- The name is used for when
role
is"function"
, it represents the name of the function that was called content
:str
- A string with the partial content being outputted by the LLM; this generally translates to each token the LLM is producing
Expand source code
@dataclass
class OpenAIChatDelta:
    """
    One streamed output chunk produced by an `OpenAIChatStream`.

    Attributes
    ----------
    role : Optional[Literal["assistant", "function"]]
        Role of the output message. The first delta of a message carries the role,
        the following partial-content deltas have it set to `None`.

    name : Optional[str]
        Only meaningful when `role` is `"function"`: the name of the function
        that was called.

    content : str
        The partial content being produced by the LLM, generally one token per delta.

    """

    role: Optional[Literal["assistant", "function"]]
    content: str
    name: Optional[str] = None

    def __stream_debug__(self):
        """
        Print this delta to stdout without a trailing newline. When `role` is set,
        a "Role name: " header wrapped in `Fore.YELLOW`/`Fore.RESET` is printed first.
        """
        if self.role is not None:
            # The function name (if any) is appended to the capitalized role.
            label = self.role.capitalize()
            if self.name:
                label = f"{label} {self.name}"
            print(f"{Fore.YELLOW}{label}:{Fore.RESET} ", end="")
        print(self.content, end="", flush=True)
Class variables
var content : str
var name : Optional[str]
var role : Optional[Literal['assistant', 'function']]
class OpenAIChatMessage (role: Literal['system', 'user', 'assistant', 'function'], content: str, name: Optional[str] = None)
-
OpenAIChatMessage is a data class that represents a chat message for building
OpenAIChatStream
prompt.Attributes
role
:Literal["system", "user", "assistant", "function"]
- The role of who sent this message in the chat, can be one of
"system"
,"user"
,"assistant"
or "function" name
:Optional[str]
- The name is used for when
role
is"function"
, it represents the name of the function that was called content
:str
- A string with the full content of what the given role said
Expand source code
@dataclass
class OpenAIChatMessage:
    """
    A single chat message used to build the prompt of an `OpenAIChatStream`.

    Attributes
    ----------
    role : Literal["system", "user", "assistant", "function"]
        Who sent this message in the chat, one of `"system"`, `"user"`,
        `"assistant"` or `"function"`.

    name : Optional[str]
        Only meaningful when `role` is `"function"`: the name of the function
        that was called.

    content : str
        The full content of what the given role said.

    """

    role: Literal["system", "user", "assistant", "function"]
    content: str
    name: Optional[str] = None

    def to_dict(self):
        """Return the fields of this message as a dict, leaving out those set to None."""
        present = filter(lambda pair: pair[1] is not None, self.__dict__.items())
        return dict(present)
Class variables
var content : str
var name : Optional[str]
var role : Literal['system', 'user', 'assistant', 'function']
Methods
def to_dict(self)
-
Expand source code
def to_dict(self):
    """Return the instance attributes as a dict, leaving out those set to None."""
    present = filter(lambda pair: pair[1] is not None, self.__dict__.items())
    return dict(present)
class OpenAIChatStream (name: str, call: Callable[[~T], List[OpenAIChatMessage]], model: str, functions: Optional[List[Dict[str, Any]]] = None, function_call: Union[Literal['none', 'auto'], Dict[str, Any], ForwardRef(None)] = None, temperature: Optional[float] = 0, max_tokens: Optional[int] = None, timeout: int = 5, retries: int = 3)
-
OpenAIChatStream
gives you access to the more powerful LLMs from OpenAI, like gpt-3.5-turbo and gpt-4, which are structured in a chat format with roles.
The
OpenAIChatStream
takes a lambda function that should return a list ofOpenAIChatMessage
for the assistant to reply, it is stateless, so it doesn't keep memory of the past chat messages, you will have to handle the memory yourself, you can follow this guide to get started on memory.The
OpenAIChatStream
also producesOpenAIChatDelta
as output, one per token, it contains therole
that started the output, and then subsequentcontent
updates. If you want the final content as a string, you will need to use the.content
property from the delta and accumulate it for the final result.To use this stream you will need an
OPENAI_API_KEY
environment variable to be available, and then you can generate chat completions out of it.You can read more about the chat completion API on OpenAI API reference
Example
>>> from langstream import Stream, join_final_output >>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta >>> import asyncio ... >>> async def example(): ... recipe_stream: Stream[str, str] = OpenAIChatStream[str, OpenAIChatDelta]( ... "RecipeStream", ... lambda recipe_name: [ ... OpenAIChatMessage( ... role="system", ... content="You are ChefGPT, an assistant bot trained on all culinary knowledge of world's most proeminant Michelin Chefs", ... ), ... OpenAIChatMessage( ... role="user", ... content=f"Hello, could you write me a recipe for {recipe_name}?", ... ), ... ], ... model="gpt-3.5-turbo", ... max_tokens=10, ... ).map(lambda delta: delta.content) ... ... return await join_final_output(recipe_stream("instant noodles")) ... >>> asyncio.run(example()) # doctest:+SKIP "Of course! Here's a simple and delicious recipe"
You can also pass OpenAI function schemas in the functions argument with all parameter definitions, the model may then produce a function role OpenAIChatDelta, using your function, with the content field as a JSON string which you can parse to call an actual function.
Take a look at our guide to learn more about OpenAI function calls in LangStream.
Function Call Example
>>> from langstream import Stream, collect_final_output >>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta >>> from typing import Literal, Union, Dict >>> import asyncio ... >>> async def example(): ... def get_current_weather( ... location: str, format: Literal["celsius", "fahrenheit"] = "celsius" ... ) -> Dict[str, str]: ... return { ... "location": location, ... "forecast": "sunny", ... "temperature": "25 C" if format == "celsius" else "77 F", ... } ... ... stream : Stream[str, Union[OpenAIChatDelta, Dict[str, str]]] = OpenAIChatStream[str, Union[OpenAIChatDelta, Dict[str, str]]]( ... "WeatherStream", ... lambda user_input: [ ... OpenAIChatMessage(role="user", content=user_input), ... ], ... model="gpt-3.5-turbo", ... functions=[ ... { ... "name": "get_current_weather", ... "description": "Gets the current weather in a given location, use this function for any questions related to the weather", ... "parameters": { ... "type": "object", ... "properties": { ... "location": { ... "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages", ... "type": "string", ... }, ... "format": { ... "description": "A string with the full content of what the given role said", ... "type": "string", ... "enum": ("celsius", "fahrenheit"), ... }, ... }, ... "required": ["location"], ... }, ... } ... ], ... temperature=0, ... ).map( ... lambda delta: get_current_weather(**json.loads(delta.content)) ... if delta.role == "function" and delta.name == "get_current_weather" ... else delta ... ) ... ... return await collect_final_output(stream("how is the weather today in Rio de Janeiro?")) ... >>> asyncio.run(example()) # doctest:+SKIP [{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]
Expand source code
class OpenAIChatStream(Stream[T, U]):
    """
    `OpenAIChatStream` gives you access to the more powerful LLMs from OpenAI, like `gpt-3.5-turbo` and `gpt-4`, they are structured in a chat format with roles.

    The `OpenAIChatStream` takes a lambda function that should return a list of `OpenAIChatMessage` for the assistant to reply, it is stateless, so it doesn't keep
    memory of the past chat messages, you will have to handle the memory yourself, you can [follow this guide to get started on memory](https://rogeriochaves.github.io/langstream/docs/llms/memory).

    The `OpenAIChatStream` also produces `OpenAIChatDelta` as output, one per token, it contains the `role` that started the output, and then subsequent `content` updates.
    If you want the final content as a string, you will need to use the `.content` property from the delta and accumulate it for the final result.

    To use this stream you will need an `OPENAI_API_KEY` environment variable to be available, and then you can generate chat completions out of it.

    You can read more about the chat completion API on [OpenAI API reference](https://platform.openai.com/docs/api-reference/chat)

    Parameters
    ----------
    name
        Name given to the stream, forwarded to the base `Stream`.
    call
        Function that receives the stream input and returns the list of
        `OpenAIChatMessage` to send to the model.
    model
        Model identifier forwarded to the chat completions request.
    functions
        Optional list of OpenAI function schemas; only sent when not `None`.
    function_call
        Optional function-calling mode (`"none"`, `"auto"` or a dict); only
        sent when not `None`.
    temperature
        Sampling temperature forwarded to the request.
    max_tokens
        Optional token limit forwarded to the request.
    timeout
        Timeout forwarded to the request (presumably in seconds; confirm
        against the OpenAI client documentation).
    retries
        Number of tries given to the `retry` decorator wrapping the request.

    Example
    -------

    >>> from langstream import Stream, join_final_output
    >>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta
    >>> import asyncio
    ...
    >>> async def example():
    ...     recipe_stream: Stream[str, str] = OpenAIChatStream[str, OpenAIChatDelta](
    ...         "RecipeStream",
    ...         lambda recipe_name: [
    ...             OpenAIChatMessage(
    ...                 role="system",
    ...                 content="You are ChefGPT, an assistant bot trained on all culinary knowledge of world's most prominent Michelin Chefs",
    ...             ),
    ...             OpenAIChatMessage(
    ...                 role="user",
    ...                 content=f"Hello, could you write me a recipe for {recipe_name}?",
    ...             ),
    ...         ],
    ...         model="gpt-3.5-turbo",
    ...         max_tokens=10,
    ...     ).map(lambda delta: delta.content)
    ...
    ...     return await join_final_output(recipe_stream("instant noodles"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    "Of course! Here's a simple and delicious recipe"

    You can also pass OpenAI function schemas in the `functions` argument with all parameter definitions, the model may then produce a `function` role `OpenAIChatDelta`,
    using your function, with the `content` field as a json which you can parse to call an actual function.

    Take a look [at our guide](https://rogeriochaves.github.io/langstream/docs/llms/open_ai_functions) to learn more about OpenAI function calls in LangStream.

    Function Call Example
    ---------------------

    >>> from langstream import Stream, collect_final_output
    >>> from langstream.contrib import OpenAIChatStream, OpenAIChatMessage, OpenAIChatDelta
    >>> from typing import Literal, Union, Dict
    >>> import asyncio
    >>> import json
    ...
    >>> async def example():
    ...     def get_current_weather(
    ...         location: str, format: Literal["celsius", "fahrenheit"] = "celsius"
    ...     ) -> Dict[str, str]:
    ...         return {
    ...             "location": location,
    ...             "forecast": "sunny",
    ...             "temperature": "25 C" if format == "celsius" else "77 F",
    ...         }
    ...
    ...     stream : Stream[str, Union[OpenAIChatDelta, Dict[str, str]]] = OpenAIChatStream[str, Union[OpenAIChatDelta, Dict[str, str]]](
    ...         "WeatherStream",
    ...         lambda user_input: [
    ...             OpenAIChatMessage(role="user", content=user_input),
    ...         ],
    ...         model="gpt-3.5-turbo",
    ...         functions=[
    ...             {
    ...                 "name": "get_current_weather",
    ...                 "description": "Gets the current weather in a given location, use this function for any questions related to the weather",
    ...                 "parameters": {
    ...                     "type": "object",
    ...                     "properties": {
    ...                         "location": {
    ...                             "description": "The city to get the weather, e.g. San Francisco. Guess the location from user messages",
    ...                             "type": "string",
    ...                         },
    ...                         "format": {
    ...                             "description": "A string with the full content of what the given role said",
    ...                             "type": "string",
    ...                             "enum": ("celsius", "fahrenheit"),
    ...                         },
    ...                     },
    ...                     "required": ["location"],
    ...                 },
    ...             }
    ...         ],
    ...         temperature=0,
    ...     ).map(
    ...         lambda delta: get_current_weather(**json.loads(delta.content))
    ...         if delta.role == "function" and delta.name == "get_current_weather"
    ...         else delta
    ...     )
    ...
    ...     return await collect_final_output(stream("how is the weather today in Rio de Janeiro?"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    [{'location': 'Rio de Janeiro', 'forecast': 'sunny', 'temperature': '25 C'}]

    """

    def __init__(
        self: "OpenAIChatStream[T, OpenAIChatDelta]",
        name: str,
        call: Callable[
            [T],
            List[OpenAIChatMessage],
        ],
        model: str,
        functions: Optional[List[Dict[str, Any]]] = None,
        function_call: Optional[Union[Literal["none", "auto"], Dict[str, Any]]] = None,
        temperature: Optional[float] = 0,
        max_tokens: Optional[int] = None,
        timeout: int = 5,
        retries: int = 3,
    ) -> None:
        async def chat_completion(
            messages: List[OpenAIChatMessage],
        ) -> AsyncGenerator[StreamOutput[OpenAIChatDelta], None]:
            # We are always inside a coroutine here, so ask for the running
            # loop directly instead of going through the event loop policy.
            loop = asyncio.get_running_loop()

            @retry(tries=retries)
            def get_completions():
                # Only forward the function-calling options the caller set.
                function_kwargs = {}
                if functions is not None:
                    function_kwargs["functions"] = functions
                if function_call is not None:
                    function_kwargs["function_call"] = function_call

                return OpenAIChatStream.client().chat.completions.create(
                    timeout=timeout,
                    model=model,
                    messages=cast(Any, [m.to_dict() for m in messages]),
                    temperature=temperature,
                    stream=True,
                    max_tokens=max_tokens,
                    **function_kwargs,
                )

            # The request itself runs in the default executor so that opening
            # the stream does not block the event loop.
            completions = await loop.run_in_executor(None, get_completions)

            # NOTE(review): the chunks below are read with a plain `for`, so
            # waiting for each chunk happens on the event loop thread; only
            # the initial request is offloaded to the executor.

            # Function-call deltas arrive in pieces: the first one carries the
            # name, the following ones carry argument fragments. They are
            # accumulated here and emitted as a single delta once complete.
            pending_function_call: Optional[OpenAIChatDelta] = None

            for output in completions:
                if not output.choices:
                    continue

                delta = output.choices[0].delta
                if not delta:
                    continue

                if delta.function_call is not None:
                    function_name: Optional[str] = delta.function_call.name
                    function_arguments: Optional[str] = delta.function_call.arguments

                    if function_name is not None:
                        # A name marks the start of a new function call.
                        pending_function_call = OpenAIChatDelta(
                            role="function",
                            name=function_name,
                            content=function_arguments or "",
                        )
                    elif (
                        pending_function_call is not None
                        and function_arguments is not None
                    ):
                        pending_function_call.content += function_arguments
                elif delta.content is not None:
                    role = cast(
                        Union[Literal["assistant", "function"], None], delta.role
                    )
                    yield self._output_wrap(
                        OpenAIChatDelta(
                            role=role,
                            content=delta.content,
                        )
                    )
                elif pending_function_call:
                    # A delta with neither content nor function call: flush
                    # whatever function call has been accumulated so far.
                    yield self._output_wrap(pending_function_call)
                    pending_function_call = None

            # Flush a function call that was still pending when the stream ended.
            if pending_function_call:
                yield self._output_wrap(pending_function_call)

        super().__init__(
            name,
            lambda input: cast(AsyncGenerator[U, None], chat_completion(call(input))),
        )

    # Lazily created OpenAI client, shared by every OpenAIChatStream.
    _client_ = None

    @staticmethod
    def client():
        """
        Returns the OpenAI client instance being used to make the LLM calls.
        """

        if not OpenAIChatStream._client_:
            # Imported lazily so the module can be loaded without `openai`.
            openai = importlib.import_module("openai")
            OpenAIChatStream._client_ = openai.OpenAI()

        return OpenAIChatStream._client_
Ancestors
- Stream
- typing.Generic
Static methods
def client()
-
Returns the OpenAI client instance being used to make the LLM calls.
Expand source code
@staticmethod
def client():
    """
    Return the OpenAI client used for the LLM calls, creating and caching it
    on `OpenAIChatStream._client_` the first time it is requested.
    """
    cached = OpenAIChatStream._client_
    if cached:
        return cached

    # Resolve the `openai` package only when a client is actually needed.
    openai_module = importlib.import_module("openai")
    OpenAIChatStream._client_ = openai_module.OpenAI()
    return OpenAIChatStream._client_
Inherited members
class OpenAICompletionStream (name: str, call: Callable[[~T], str], model: str, temperature: Optional[float] = 0, max_tokens: Optional[int] = None, timeout: int = 5, retries: int = 3)
-
OpenAICompletionStream uses the simplest LLMs from OpenAI, based on GPT-3, for text completion. If you are looking for ChatCompletion, take a look at OpenAIChatStream.
The OpenAICompletionStream takes a lambda function that should return a string with the prompt for completion.
To use this stream you will need an OPENAI_API_KEY environment variable to be available, and then you can generate completions out of it.
You can read more about the completion API on the OpenAI API reference.
Example
>>> from langstream import join_final_output >>> from langstream.contrib import OpenAICompletionStream >>> import asyncio ... >>> async def example(): ... recipe_stream = OpenAICompletionStream[str, str]( ... "RecipeStream", ... lambda recipe_name: f"Here is my {recipe_name} recipe: ", ... model="ada", ... ) ... ... return await join_final_output(recipe_stream("instant noodles")) ... >>> asyncio.run(example()) # doctest:+SKIP '【Instant Noodles】\n\nIngredients:\n\n1 cup of water'
Expand source code
class OpenAICompletionStream(Stream[T, U]):
    """
    `OpenAICompletionStream` uses the most simple LLMs from OpenAI based on GPT-3 for text completion, if you are looking for ChatCompletion, take a look at `OpenAIChatStream`.

    The `OpenAICompletionStream` takes a lambda function that should return a string with the prompt for completion.

    To use this stream you will need an `OPENAI_API_KEY` environment variable to be available, and then you can generate completions out of it.

    You can read more about the completion API on [OpenAI API reference](https://platform.openai.com/docs/api-reference/completions)

    Parameters
    ----------
    name
        Name given to the stream, forwarded to the base `Stream`.
    call
        Function that receives the stream input and returns the prompt string.
    model
        Model identifier forwarded to the completions request.
    temperature
        Sampling temperature forwarded to the request.
    max_tokens
        Optional token limit forwarded to the request.
    timeout
        Timeout forwarded to the request (presumably in seconds; confirm
        against the OpenAI client documentation).
    retries
        Number of tries given to the `retry` decorator wrapping the request.

    Example
    -------

    >>> from langstream import join_final_output
    >>> from langstream.contrib import OpenAICompletionStream
    >>> import asyncio
    ...
    >>> async def example():
    ...     recipe_stream = OpenAICompletionStream[str, str](
    ...         "RecipeStream",
    ...         lambda recipe_name: f"Here is my {recipe_name} recipe: ",
    ...         model="ada",
    ...     )
    ...
    ...     return await join_final_output(recipe_stream("instant noodles"))
    ...
    >>> asyncio.run(example()) # doctest:+SKIP
    '【Instant Noodles】\\n\\nIngredients:\\n\\n1 cup of water'

    """

    def __init__(
        self: "OpenAICompletionStream[T, str]",
        name: str,
        call: Callable[
            [T],
            str,
        ],
        model: str,
        temperature: Optional[float] = 0,
        max_tokens: Optional[int] = None,
        timeout: int = 5,
        retries: int = 3,
    ) -> None:
        async def completion(prompt: str) -> AsyncGenerator[U, None]:
            # We are always inside a coroutine here, so ask for the running
            # loop directly instead of going through the event loop policy.
            loop = asyncio.get_running_loop()

            @retry(tries=retries)
            def get_completions():
                return OpenAICompletionStream.client().completions.create(
                    model=model,
                    prompt=prompt,
                    temperature=temperature,
                    stream=True,
                    max_tokens=max_tokens,
                    timeout=timeout,
                )

            # The request itself runs in the default executor so that opening
            # the stream does not block the event loop.
            completions = await loop.run_in_executor(None, get_completions)

            for output in completions:
                payload = cast(dict, output.model_dump())
                choices = payload.get("choices")
                # Skip chunks without choices or without text in the first choice.
                if choices and "text" in choices[0]:
                    yield choices[0]["text"]

        super().__init__(name, lambda input: completion(call(input)))

    # Lazily created OpenAI client, shared by every OpenAICompletionStream.
    _client_ = None

    @staticmethod
    def client():
        """
        Returns the OpenAI client instance being used to make the LLM calls.
        """

        if not OpenAICompletionStream._client_:
            # Imported lazily so the module can be loaded without `openai`.
            openai = importlib.import_module("openai")
            OpenAICompletionStream._client_ = openai.OpenAI()

        return OpenAICompletionStream._client_
Ancestors
- Stream
- typing.Generic
Static methods
def client()
-
Returns the OpenAI client instance being used to make the LLM calls.
Expand source code
@staticmethod
def client():
    """
    Return the OpenAI client used for the LLM calls, creating and caching it
    on `OpenAICompletionStream._client_` the first time it is requested.
    """
    cached = OpenAICompletionStream._client_
    if cached:
        return cached

    # Resolve the `openai` package only when a client is actually needed.
    openai_module = importlib.import_module("openai")
    OpenAICompletionStream._client_ = openai_module.OpenAI()
    return OpenAICompletionStream._client_
Inherited members