Package langstream

🪽🔗 LangStream

This is the top-level module for LangStream; all the core classes and functions can be imported from here:

>>> from langstream import Stream, as_async_generator, filter_final_output # etc

Stream

The Stream is the core concept of LangStream for working with LLMs (Large Language Models). It provides a way to create composable calls to LLMs and any other processing elements.

Since LLMs produce one token at a time, streams are essentially Python AsyncGenerators under the hood, with type-safe composable functions on top. By making use of the type hints and the composable functions, LangStream makes it very easy to build and debug a complex stream of execution for working with LLMs.
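The underlying shape can be illustrated with plain asyncio; `fake_llm` below is a hypothetical stand-in that yields hardcoded tokens the way a real LLM call streams them, not part of the LangStream API:

```python
import asyncio
from typing import AsyncGenerator

# A hypothetical "LLM" yielding one token at a time, illustrating the
# AsyncGenerator shape that Streams wrap under the hood
async def fake_llm(prompt: str) -> AsyncGenerator[str, None]:
    for token in ["Hello", ", ", "world", "!"]:
        await asyncio.sleep(0)  # yield control, as a real LLM call would
        yield token

async def main() -> str:
    # consume the stream token by token, then assemble the full answer
    tokens = [t async for t in fake_llm("greet")]
    return "".join(tokens)

print(asyncio.run(main()))  # Hello, world!
```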

Since async generators are the fundamental building block of LangStream, their processing should never block. When executing a Stream with an input, you get back an AsyncGenerator that produces the outputs of all pieces in the whole Stream, not only the final one. This means you have access to each step of what is happening, making it easier to display and debug. The outputs at the edge of the stream are marked as final when you process the Stream, with helpers available to filter them.
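The final-marking idea can be sketched with plain asyncio, tagging each item with a boolean flag and filtering at the edge; this is an illustrative sketch of the pattern only, not the actual StreamOutput type:

```python
import asyncio
from typing import Any, AsyncGenerator, List, Tuple

# Sketch: each output is tagged with a flag saying whether it comes from the
# edge of the stream (the real StreamOutput also carries the stream's name)
async def pipeline() -> AsyncGenerator[Tuple[bool, Any], None]:
    yield (False, "intermediate step")  # inner piece's output, useful for debugging
    yield (True, "final answer")        # output at the edge, marked final

async def only_final() -> List[Any]:
    # keep only the outputs marked as final
    return [data async for final, data in pipeline() if final]

print(asyncio.run(only_final()))  # ['final answer']
```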

In this module, we document the low-level concepts of Streams, so we use simple examples of hardcoded string generation. But once you learn the composition fundamentals here, you can apply the same functions for composing everything in LangStream.

Core Concepts

Stream: A Stream takes in a name and a function, which takes an input and produces an asynchronous stream of outputs from it (AsyncGenerator). Streams can be composed together with the composition methods below, and their input and output types can be specified in the type signature.

SingleOutputStream: A SingleOutputStream is a subtype of Stream that produces a single asynchronous final output after processing, rather than an asynchronous stream of outputs.
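The contrast between the two can be sketched with plain async generators: a regular stream yields many items, while a single-output step consumes its upstream and produces one value at the end (an illustrative sketch, not LangStream internals):

```python
import asyncio
from typing import AsyncGenerator, List

# A regular stream: yields many outputs, one at a time
async def many_outputs() -> AsyncGenerator[str, None]:
    for word in ["a", "b", "c"]:
        yield word

# A single-output step: consumes the whole upstream, then emits one final value
async def as_single_output(gen: AsyncGenerator[str, None]) -> List[str]:
    return [item async for item in gen]

print(asyncio.run(as_single_output(many_outputs())))  # ['a', 'b', 'c']
```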

Composition Methods

Stream.map(): Transforms the output of the Stream by applying a function to each token as it arrives. This is non-blocking and maps as stream generations flow in: stream.map(lambda token: token.lower())

Stream.and_then(): Applies a function to the list of results of the Stream. Unlike map, this is blocking, and collects the outputs before applying the function. It can also take another Stream as argument, effectively composing two Streams together: first_stream.and_then(second_stream).

Stream.collect(): Collects the output of a Stream into a list. This is a blocking operation and can be used when the next processing step requires the full output at once.

Stream.join(): Joins the output of a string-producing Stream into a single string by concatenating each item.

Stream.gather(): Gathers results from a stream that produces multiple async generators and processes them in parallel, returning a list of lists with the results of all generators. This allows you to execute many Streams at the same time, similar to asyncio.gather.
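The semantics of the two central methods can be sketched with plain async generators; `map_gen` and `and_then_gen` below are hypothetical stand-ins illustrating the non-blocking vs. blocking behavior, not the LangStream implementation:

```python
import asyncio
from typing import AsyncGenerator, Callable, List

async def words() -> AsyncGenerator[str, None]:
    for w in ["this", "is", "an", "example"]:
        yield w

# map semantics: transform each item as it arrives, never blocking the stream
async def map_gen(
    gen: AsyncGenerator[str, None], fn: Callable[[str], str]
) -> AsyncGenerator[str, None]:
    async for item in gen:
        yield fn(item)

# and_then semantics: collect all outputs first, then apply the function once
async def and_then_gen(
    gen: AsyncGenerator[str, None], fn: Callable[[List[str]], str]
) -> str:
    items = [item async for item in gen]
    return fn(items)

async def main() -> str:
    capitalized = map_gen(words(), str.capitalize)     # non-blocking, per item
    return await and_then_gen(capitalized, " ".join)   # blocking, on the full list

print(asyncio.run(main()))  # This Is An Example
```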

Contrib: OpenAI, GPT4All and more

The core of LangStream is kept small and stable, so all the integrations that build on top of it live separately, under the langstream.contrib module. Check it out for reference and code examples of the integrations.

Examples

Using Stream to process text data:

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     # Stream that splits a sentence into words
...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
...     # Stream that capitalizes each word
...     capitalized_stream = words_stream.map(lambda word: word.capitalize())
...     # Join the capitalized words into a single string
...     stream = capitalized_stream.join(" ")
...
...     async for output in stream("this is an example"):
...         if output.final:
...             return output.data
...
>>> asyncio.run(example())
'This Is An Example'

Here you can find the reference and code examples; for further tutorials and use cases, consult the documentation.

Expand source code
"""
🪽🔗 LangStream

This is the top-level module for [LangStream](https://github.com/rogeriochaves/langstream);
all the core classes and functions can be imported from here:

>>> from langstream import Stream, as_async_generator, filter_final_output # etc

Stream
=====

The `Stream` is the core concept of LangStream for working with LLMs (Large Language Models). It
provides a way to create composable calls to LLMs and any other processing elements.

Since LLMs produce one token at a time, streams are essentially Python AsyncGenerators under the
hood, with type-safe composable functions on top. By making use of the type hints and the
composable functions, LangStream makes it very easy to build and debug a complex stream of execution
for working with LLMs.

Since async generators are the fundamental building block of LangStream, their processing should never block.
When executing a Stream with an input, you get back an AsyncGenerator that produces the outputs of all
pieces in the whole Stream, not only the final one. This means you have access to each step of
what is happening, making it easier to display and debug. The outputs at the edge of the stream
are marked as `final` when you process the Stream, with helpers available to filter them.

In this module, we document the low-level concepts of Streams, so we use simple examples of hardcoded
string generation. But once you learn the composition fundamentals here, you can apply the
same functions for composing everything in LangStream.

Core Concepts
-------------

`Stream`:
    A Stream takes in a name and a function, which takes an input and produces an asynchronous stream of outputs from it (AsyncGenerator).
    Streams can be composed together with the composition methods below, and their input and output types can be
    specified in the type signature.

`SingleOutputStream`:
    A SingleOutputStream is a subtype of Stream that produces a single asynchronous final output after processing,
    rather than an asynchronous stream of outputs.

Composition Methods
-------------------

`Stream.map`:
    Transforms the output of the Stream by applying a function to each token as it arrives. This is
    non-blocking and maps as stream generations flow in: `stream.map(lambda token: token.lower())`

`Stream.and_then`:
    Applies a function to the list of results of the Stream. Unlike `map`, this is blocking,
    and collects the outputs before applying the function. It can also take another Stream as argument,
    effectively composing two Streams together: `first_stream.and_then(second_stream)`.

`Stream.collect`:
    Collects the output of a Stream into a list. This is a blocking operation and can be used
    when the next processing step requires the full output at once.

`Stream.join`:
    Joins the output of a string-producing Stream into a single string by concatenating each item.

`Stream.gather`:
    Gathers results from a stream that produces multiple async generators and processes them in parallel,
    returning a list of lists with the results of all generators. This allows you to execute many Streams at
    the same time, similar to `asyncio.gather`.

Contrib: OpenAI, GPT4All and more
---------------------------------

The core of LangStream is kept small and stable, so all the integrations that build on top of it live separately,
under the `langstream.contrib` module. Check it out for reference and code examples of the integrations.

Examples
--------

Using Stream to process text data:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     # Stream that splits a sentence into words
    ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
    ...     # Stream that capitalizes each word
    ...     capitalized_stream = words_stream.map(lambda word: word.capitalize())
    ...     # Join the capitalized words into a single string
    ...     stream = capitalized_stream.join(" ")
    ...
    ...     async for output in stream("this is an example"):
    ...         if output.final:
    ...             return output.data
    ...
    >>> asyncio.run(example())
    'This Is An Example'

---

Here you can find the reference and code examples; for further tutorials and use cases, consult the [documentation](https://github.com/rogeriochaves/langstream).
"""

from langstream.core.stream import Stream, StreamOutput, SingleOutputStream
from langstream.utils.stream import (
    debug,
    filter_final_output,
    collect_final_output,
    join_final_output,
)
from langstream.utils.async_generator import (
    as_async_generator,
    collect,
    join,
    gather,
    next_item,
)

__all__ = (
    "Stream",
    "StreamOutput",
    "SingleOutputStream",
    "debug",
    "filter_final_output",
    "collect_final_output",
    "join_final_output",
    "as_async_generator",
    "collect",
    "join",
    "gather",
    "next_item",
)

Sub-modules

langstream.contrib

The Contrib module is where all the other streams and integrations that build on top of core Stream module live. Here you can import the LLMs you want …

langstream.core
langstream.utils

Functions

async def as_async_generator(*values: ~T) ‑> AsyncGenerator[~T, Any]

Creates an asynchronous generator out of simple values. It is useful for converting a single value or a list of values into the streamed output of a Stream

Example

>>> import asyncio
>>> async def run_example():
...     async for value in as_async_generator(1, 2, 3):
...         print(value)
...
>>> asyncio.run(run_example())
1
2
3
Expand source code
async def as_async_generator(*values: T) -> AsyncGenerator[T, Any]:
    """
    Creates an asynchronous generator out of simple values. It is useful for
    converting a single value or a list of values into the streamed output of a Stream

    Example
    -------
    >>> import asyncio
    >>> async def run_example():
    ...     async for value in as_async_generator(1, 2, 3):
    ...         print(value)
    ...
    >>> asyncio.run(run_example())
    1
    2
    3
    """
    for item in values:
        yield item
async def collect(async_generator: AsyncGenerator[~T, Any]) ‑> List[~T]

Collect items from an async generator into a list.

>>> import asyncio
>>> async def async_gen():
...     yield "hello"
...     yield "how"
...     yield "can"
...     yield "I"
...     yield "assist"
...     yield "you"
...     yield "today"
>>> asyncio.run(collect(async_gen()))
['hello', 'how', 'can', 'I', 'assist', 'you', 'today']
Expand source code
async def collect(async_generator: AsyncGenerator[T, Any]) -> List[T]:
    """
    Collect items from an async generator into a list.

    >>> import asyncio
    >>> async def async_gen():
    ...     yield "hello"
    ...     yield "how"
    ...     yield "can"
    ...     yield "I"
    ...     yield "assist"
    ...     yield "you"
    ...     yield "today"
    >>> asyncio.run(collect(async_gen()))
    ['hello', 'how', 'can', 'I', 'assist', 'you', 'today']
    """
    return [item async for item in async_generator]
async def collect_final_output(async_iterable: AsyncGenerator[StreamOutput[~T], Any]) ‑> Iterable[~T]

Blocks the stream until it is done, then collects the final output values into a single list.

Example

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def all_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     async for output in greet_stream("Alice"):
...         print(output)
...
>>> async def collected_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     output = await collect_final_output(greet_stream("Alice"))
...     print(output)
...
>>> asyncio.run(all_outputs())
StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
StreamOutput(stream='GreetingStream', data='Alice', final=True)
StreamOutput(stream='GreetingStream', data='!', final=True)
>>> asyncio.run(collected_outputs())
['Hello, ', 'Alice', '!']
Expand source code
async def collect_final_output(
    async_iterable: AsyncGenerator[StreamOutput[T], Any]
) -> Iterable[T]:
    """
    Blocks the stream until it is done, then collects the final output values into a single list.

    Example
    -------
    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def all_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     async for output in greet_stream("Alice"):
    ...         print(output)
    ...
    >>> async def collected_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     output = await collect_final_output(greet_stream("Alice"))
    ...     print(output)
    ...
    >>> asyncio.run(all_outputs())
    StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
    StreamOutput(stream='GreetingStream', data='Alice', final=True)
    StreamOutput(stream='GreetingStream', data='!', final=True)
    >>> asyncio.run(collected_outputs())
    ['Hello, ', 'Alice', '!']
    """
    return await collect(filter_final_output(async_iterable))
def debug(stream: Callable[[~T], AsyncGenerator[StreamOutput[~U], Any]]) ‑> Stream[~T, ~U]

A helper for debugging streams. Simply wrap any piece of the stream, or the whole stream, with the debug() function to print out everything that goes through it and its nested streams.

For example, you can wrap the whole stream to debug everything:

>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> async def debug_whole_stream():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
...     stream = debug(
...         greet_stream.join().and_then(polite_stream)
...     )
...     await join_final_output(stream("Alice"))
...
>>> asyncio.run(debug_whole_stream())
<BLANKLINE>
<BLANKLINE>
> GreetingStream
<BLANKLINE>
Hello, Alice!
<BLANKLINE>
> PoliteStream
<BLANKLINE>
Hello, Alice! How are you?

Or, alternatively, you can debug just a piece of it:

>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> async def debug_whole_stream():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
...     stream = debug(greet_stream).join().and_then(polite_stream)
...     await join_final_output(stream("Alice"))
...
>>> asyncio.run(debug_whole_stream())
<BLANKLINE>
<BLANKLINE>
> GreetingStream
<BLANKLINE>
Hello, Alice!
Expand source code
def debug(
    stream: Callable[[T], AsyncGenerator[StreamOutput[U], Any]]
) -> Stream[T, U]:
    """
    A helper for debugging streams. Simply wrap any piece of the stream, or the whole stream,
    with the `debug` function to print out everything that goes through it and its nested streams.

    For example, you can wrap the whole stream to debug everything:
    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> async def debug_whole_stream():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
    ...     stream = debug(
    ...         greet_stream.join().and_then(polite_stream)
    ...     )
    ...     await join_final_output(stream("Alice"))
    ...
    >>> asyncio.run(debug_whole_stream())
    <BLANKLINE>
    <BLANKLINE>
    \x1b[32m> GreetingStream\x1b[39m
    <BLANKLINE>
    Hello, Alice!
    <BLANKLINE>
    \x1b[32m> PoliteStream\x1b[39m
    <BLANKLINE>
    Hello, Alice! How are you?

    Or, alternatively, you can debug just a piece of it:
    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> async def debug_whole_stream():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
    ...     stream = debug(greet_stream).join().and_then(polite_stream)
    ...     await join_final_output(stream("Alice"))
    ...
    >>> asyncio.run(debug_whole_stream())
    <BLANKLINE>
    <BLANKLINE>
    \x1b[32m> GreetingStream\x1b[39m
    <BLANKLINE>
    Hello, Alice!
    """

    async def debug(input: T) -> AsyncGenerator[StreamOutput[U], Any]:
        last_stream = ""
        last_output = ""
        async for output in stream(input):
            if output.stream != last_stream and last_output == output.data:
                yield output
                continue

            if output.stream != last_stream:
                print("\n", end="", flush=True)
                last_stream = output.stream
                print(f"\n{Fore.GREEN}> {output.stream}{Fore.RESET}\n")
            if hasattr(output.data, "__stream_debug__"):
                output.data.__stream_debug__() # type: ignore
            elif isinstance(output.data, Exception):
                print(f"{Fore.RED}Exception:{Fore.RESET} {output.data}", end="")
            else:
                print(
                    output.data,
                    end=("" if isinstance(output.data, str) else ", "),
                    flush=True,
                )
            last_output = output.data
            yield output

    next_name = "@debug"
    if hasattr(stream, "name"):
        next_name = f"{stream.name}@debug"
    return Stream[T, U](next_name, debug)
async def filter_final_output(async_iterable: AsyncGenerator[StreamOutput[~T], Any]) ‑> AsyncGenerator[~T, Any]

Filters only the final output values of a Stream's outputs.

Example

>>> from langstream import Stream, filter_final_output
>>> import asyncio
...
>>> async def all_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...     async for output in polite_stream("Alice"):
...         print(output)
...
>>> async def only_final_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...     async for final_output in filter_final_output(polite_stream("Alice")):
...         print(final_output)
...
>>> asyncio.run(all_outputs())
StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
>>> asyncio.run(only_final_outputs())
Hello, Alice! How are you?
Expand source code
async def filter_final_output(
    async_iterable: AsyncGenerator[StreamOutput[T], Any]
) -> AsyncGenerator[T, Any]:
    """
    Filters only the final output values of a Stream's outputs.

    Example
    -------
    >>> from langstream import Stream, filter_final_output
    >>> import asyncio
    ...
    >>> async def all_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...     async for output in polite_stream("Alice"):
    ...         print(output)
    ...
    >>> async def only_final_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...     async for final_output in filter_final_output(polite_stream("Alice")):
    ...         print(final_output)
    ...
    >>> asyncio.run(all_outputs())
    StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
    StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
    >>> asyncio.run(only_final_outputs())
    Hello, Alice! How are you?
    """
    async for output in async_iterable:
        if output.final:
            yield cast(T, output.data)
async def gather(async_generators: List[AsyncGenerator[~T, Any]]) ‑> List[List[~T]]

Gather items from a list of async generators into a list of lists.

>>> import asyncio
>>> async def async_gen1():
...     yield "hello"
...     yield "how"
...     yield "can"
>>> async def async_gen2():
...     yield "I"
...     yield "assist"
...     yield "you"
...     yield "today"
>>> asyncio.run(gather([async_gen1(), async_gen2()]))
[['hello', 'how', 'can'], ['I', 'assist', 'you', 'today']]
Expand source code
async def gather(async_generators: List[AsyncGenerator[T, Any]]) -> List[List[T]]:
    """
    Gather items from a list of async generators into a list of lists.

    >>> import asyncio
    >>> async def async_gen1():
    ...     yield "hello"
    ...     yield "how"
    ...     yield "can"
    >>> async def async_gen2():
    ...     yield "I"
    ...     yield "assist"
    ...     yield "you"
    ...     yield "today"
    >>> asyncio.run(gather([async_gen1(), async_gen2()]))
    [['hello', 'how', 'can'], ['I', 'assist', 'you', 'today']]
    """
    return await asyncio.gather(*(collect(generator) for generator in async_generators))
async def join(async_generator: AsyncGenerator[str, Any], separator='') ‑> str

Collect items from an async generator and join them in a string.

>>> import asyncio
>>> async def async_gen():
...     yield "hello "
...     yield "how "
...     yield "can "
...     yield "I "
...     yield "assist "
...     yield "you "
...     yield "today"
>>> asyncio.run(join(async_gen()))
'hello how can I assist you today'
Expand source code
async def join(async_generator: AsyncGenerator[str, Any], separator="") -> str:
    """
    Collect items from an async generator and join them in a string.

    >>> import asyncio
    >>> async def async_gen():
    ...     yield "hello "
    ...     yield "how "
    ...     yield "can "
    ...     yield "I "
    ...     yield "assist "
    ...     yield "you "
    ...     yield "today"
    >>> asyncio.run(join(async_gen()))
    'hello how can I assist you today'
    """
    lst = await collect(async_generator)
    return separator.join(lst)
async def join_final_output(async_iterable: AsyncGenerator[StreamOutput[str], Any]) ‑> str

Blocks a string-producing stream until it is done, then joins the final output values into a single string.

Example

>>> from langstream import Stream, as_async_generator, join_final_output
>>> import asyncio
...
>>> async def all_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     async for output in greet_stream("Alice"):
...         print(output)
...
>>> async def joined_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     output = await join_final_output(greet_stream("Alice"))
...     print(output)
...
>>> asyncio.run(all_outputs())
StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
StreamOutput(stream='GreetingStream', data='Alice', final=True)
StreamOutput(stream='GreetingStream', data='!', final=True)
>>> asyncio.run(joined_outputs())
Hello, Alice!
Expand source code
async def join_final_output(
    async_iterable: AsyncGenerator[StreamOutput[str], Any]
) -> str:
    """
    Blocks a string-producing stream until it is done, then joins the final output values into a single string.

    Example
    -------
    >>> from langstream import Stream, as_async_generator, join_final_output
    >>> import asyncio
    ...
    >>> async def all_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     async for output in greet_stream("Alice"):
    ...         print(output)
    ...
    >>> async def joined_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     output = await join_final_output(greet_stream("Alice"))
    ...     print(output)
    ...
    >>> asyncio.run(all_outputs())
    StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
    StreamOutput(stream='GreetingStream', data='Alice', final=True)
    StreamOutput(stream='GreetingStream', data='!', final=True)
    >>> asyncio.run(joined_outputs())
    Hello, Alice!
    """
    return await join(filter_final_output(async_iterable))
async def next_item(async_generator: AsyncGenerator[~T, Any]) ‑> ~T

Takes the next item of an AsyncGenerator; raises StopAsyncIteration if there are no items left to be taken

>>> import asyncio
>>> async def async_gen():
...     yield "hello"
...     yield "how"
...     yield "are"
>>> asyncio.run(next_item(async_gen()))
'hello'
Expand source code
async def next_item(async_generator: AsyncGenerator[T, Any]) -> T:
    """
    Takes the next item of an AsyncGenerator; raises StopAsyncIteration if there are no items left to be taken

    >>> import asyncio
    >>> async def async_gen():
    ...     yield "hello"
    ...     yield "how"
    ...     yield "are"
    >>> asyncio.run(next_item(async_gen()))
    'hello'
    """
    return await async_generator.__aiter__().__anext__()
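Since `next_item` raises `StopAsyncIteration` on an exhausted generator, callers may want a defaulting wrapper; `next_item_or` below is a hypothetical helper illustrating that pattern, not part of LangStream:

```python
import asyncio
from typing import AsyncGenerator

async def async_gen() -> AsyncGenerator[str, None]:
    yield "only item"

# A hypothetical safe variant that returns a default instead of raising
# StopAsyncIteration when the generator is exhausted
async def next_item_or(gen: AsyncGenerator[str, None], default: str) -> str:
    try:
        return await gen.__aiter__().__anext__()
    except StopAsyncIteration:
        return default

async def main():
    gen = async_gen()
    first = await next_item_or(gen, "<empty>")
    second = await next_item_or(gen, "<empty>")  # generator is now exhausted
    return (first, second)

print(asyncio.run(main()))  # ('only item', '<empty>')
```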

Classes

class SingleOutputStream (name: str, call: Callable[[~T], Union[AsyncGenerator[StreamOutput[~U], Any], AsyncGenerator[~U, Any], ~U]])
Expand source code
class SingleOutputStream(Stream[T, U]):
    """"""

    _call: Callable[
        [T], Union[AsyncGenerator[StreamOutput[U], Any], AsyncGenerator[U, Any], U]
    ]

    async def _reyield(
        self, async_iterable: AsyncGenerator[StreamOutput[U], Any]
    ) -> AsyncGenerator[Tuple[Optional[U], StreamOutput[U]], Any]:
        final_value: Optional[U] = None
        async for u in async_iterable:
            u_rewrapped = self._output_wrap(u, final=False)
            if u.final:
                final_value = u.data
            yield (final_value, u_rewrapped)

    def map(self, fn: Callable[[U], V]) -> "SingleOutputStream[T, V]":
        """
        Similar to `Stream.map`, this method applies a function to the final output of the stream, but returns a SingleOutputStream.

        The `fn` parameter is a function that takes a value of type U and returns a value of type V.

        For detailed examples, refer to the documentation of `Stream.map`.
        """

        next_name = f"{self.name}@map"

        async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
            # Reyield previous stream so we never block the stream, and at the same time yield mapped values
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                final_u = value

            yield self._output_wrap(fn(unwrap(final_u)), name=next_name)

        return SingleOutputStream[T, V](next_name, lambda input: map(input))

    def filter(self, fn: Callable[[U], bool]) -> "SingleOutputStream[T, Union[U, None]]":
        """
        Similar to `Stream.filter`; however, since a SingleOutputStream must always produce a value, this method simply replaces
        the value with None if the filter function returns False.

        The `fn` parameter is a function that takes a value of type U and returns a bool.

        Example:

        >>> from langstream import Stream, as_async_generator, collect_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
        ...     even_stream = numbers_stream.collect().filter(lambda numbers: all([n % 2 == 0 for n in numbers]))
        ...     return await collect_final_output(even_stream(9))
        ...
        >>> asyncio.run(example())
        [None]
        """
        next_name = f"{self.name}@filter"

        async def filter(input: T) -> AsyncGenerator[StreamOutput[Union[U, None]], Any]:
            # Reyield previous stream so we never block the stream, and at the same time yield filtered values
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[Union[U, None]], to_reyield)
                final_u = value

            yield self._output_wrap(
                final_u if fn(unwrap(final_u)) else None, name=next_name
            )

        return SingleOutputStream[T, Union[U, None]](
            next_name, lambda input: filter(input)
        )

    def and_then(
        self,
        next: Callable[
            [U],
            Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V],
        ],
    ) -> "Stream[T, V]":
        """
        Similar to `Stream.and_then`, this method takes a function that receives the final output of this stream as its input and returns a new Stream.

        For detailed examples, refer to the documentation of `Stream.and_then`.
        """
        next_name = f"{self.name}@and_then"
        if hasattr(next, "name"):
            next_name = next.name

        async def and_then(
            input: T,
        ) -> AsyncGenerator[StreamOutput[V], Any]:
            # First, reyield previous stream so we never block the stream, and collect the last result when it is done
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                final_u = value

            # Then, call in the next stream
            iter_v = self._wrap(next(unwrap(final_u)), name=next_name)
            async for v in iter_v:
                yield v

        return Stream[T, V](next_name, and_then)

    def pipe(
        self,
        fn: Callable[
            [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any]
        ],
    ) -> "Stream[T, V]":
        """
        Similar to `Stream.pipe`, except that it takes a stream that will only ever produce a single value, so it effectively works the same as `and_then`, only with a different interface.

        For detailed examples, refer to the documentation of `Stream.pipe`.
        """
        next_name = f"{self.name}@pipe"
        if hasattr(fn, "name"):
            next_name = fn.name

        async def pipe(
            input: T,
        ) -> AsyncGenerator[StreamOutput[V], Any]:
            # First, reyield previous stream so we never block the stream, and collect the last result when it is done
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                final_u = value

            # Then, call in the piping function
            single_item_stream = as_async_generator(unwrap(final_u))
            iter_v = self._wrap(fn(single_item_stream), name=next_name)
            async for v in iter_v:
                yield cast(StreamOutput[V], v)

        return Stream[T, V](next_name, pipe)

    def gather(
        self: "Union[SingleOutputStream[T, List[AsyncGenerator[StreamOutput[V], Any]]], SingleOutputStream[T, List[AsyncGenerator[V, Any]]]]",
    ) -> "SingleOutputStream[T, List[List[V]]]":
        """
        Similar to `Stream.gather`, this method waits for all the async generators in the list returned by the stream to finish and gathers their results in a list.

        For detailed examples, refer to the documentation of `Stream.gather`.
        """

        next_name = f"{self.name}@gather"

        async def gather(
            input: T,
        ) -> AsyncGenerator[StreamOutput[List[List[V]]], Any]:
            # First, reyield previous stream so we never block the stream, and collect the last result when it is done
            final_u: Optional[
                Union[
                    List[AsyncGenerator[StreamOutput[V], Any]],
                    List[AsyncGenerator[V, Any]],
                ]
            ] = None

            # TODO: try to work out why the type signature of self(input) is not fitting in there, it should
            async for value, to_reyield in self._reyield(cast(Any, self(input))):
                yield cast(StreamOutput[List[List[V]]], to_reyield)
                final_u = value

            if final_u is None:
                final_u = []

            async def consume_async_generator(
                generator: AsyncGenerator[X, Any],
            ) -> Iterable[X]:
                return [item async for item in generator]

            # TODO: should we really wait for everything to arrive before calling asyncio gather? Can we call it during the previous reyield?
            vss: Union[
                List[List[StreamOutput[V]]], List[List[V]]
            ] = await asyncio.gather(*(consume_async_generator(gen) for gen in final_u))

            clean_vss: List[List[V]] = []
            for vs in vss:
                clean_vs: List[V] = []
                for v in vs:
                    v_rewrapped = cast(
                        StreamOutput[List[List[V]]],
                        self._output_wrap(v, final=False),
                    )
                    if isinstance(v, StreamOutput):
                        yield v_rewrapped
                        if v.final:
                            clean_vs.append(v.data)
                    else:
                        clean_vs.append(v)
                clean_vss.append(clean_vs)

            yield self._output_wrap(clean_vss, name=next_name)

        return SingleOutputStream[T, List[List[V]]](next_name, gather)

    def on_error(
        self,
        handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]],
    ) -> "SingleOutputStream[T, Union[U, V]]":
        """
        Similar to `Stream.on_error`, this method handles any uncaught exceptions that might occur during the execution of the current stream.

        For detailed examples, refer to the documentation of `Stream.on_error`.
        """

        next_name = f"{self.name}@on_error"
        if hasattr(handler, "name"):
            next_name = handler.name

        async def on_error(
            input: T,
        ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]:
            try:
                async for output in self(input):
                    yield cast(StreamOutput[Union[U, V]], output)
            except Exception as e:
                async for output in self._wrap(handler(e), name=next_name):
                    yield cast(StreamOutput[Union[U, V]], output)

        return SingleOutputStream[T, Union[U, V]](
            next_name, lambda input: on_error(input)
        )

Ancestors

Methods

def and_then(self, next: Callable[[~U], Union[AsyncGenerator[StreamOutput[~V], Any], AsyncGenerator[~V, Any], ~V]]) ‑> Stream[~T, ~V]

Similar to Stream.and_then(), this method takes a function that receives the final output of this stream as its input and returns a new Stream.

For detailed examples, refer to the documentation of Stream.and_then().
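The mechanic shown in the source, re-yield every upstream output demoted to non-final, remember the last final value, then feed it to the next step, can be sketched without langstream itself. The names below (`and_then_sketch`, the `(data, final)` tuples) are illustrative stand-ins for `StreamOutput`, not library API:

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, List, Optional, Tuple

# Minimal sketch of the and_then mechanic: re-yield every upstream output so
# the stream never blocks, remember the final value, then run the next step.
async def and_then_sketch(
    upstream: AsyncGenerator[Tuple[int, bool], Any],  # (data, final) pairs
    next_fn: Callable[[int], AsyncGenerator[int, Any]],
) -> AsyncGenerator[Tuple[int, bool], Any]:
    final_u: Optional[int] = None
    async for data, final in upstream:
        yield (data, False)  # re-yield, demoted to non-final
        if final:
            final_u = data  # keep the last final value
    assert final_u is not None, "upstream produced no final value"
    async for v in next_fn(final_u):
        yield (v, True)  # outputs of the next step are the new final values

async def numbers() -> AsyncGenerator[Tuple[int, bool], Any]:
    for n in [1, 2, 3]:
        yield (n, True)

async def doubled(u: int) -> AsyncGenerator[int, Any]:
    yield u * 2

async def main() -> List[Tuple[int, bool]]:
    return [out async for out in and_then_sketch(numbers(), doubled)]

outputs = asyncio.run(main())
```

Every upstream value is still visible to the consumer, but only the output of `next_fn` is marked final, mirroring how the real `and_then` keeps the whole stream observable.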

Expand source code
def and_then(
    self,
    next: Callable[
        [U],
        Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V],
    ],
) -> "Stream[T, V]":
    """
    Similar to `Stream.and_then`, this method takes a function that receives the final output of this stream as its input and returns a new Stream.

    For detailed examples, refer to the documentation of `Stream.and_then`.
    """
    next_name = f"{self.name}@and_then"
    if hasattr(next, "name"):
        next_name = next.name

    async def and_then(
        input: T,
    ) -> AsyncGenerator[StreamOutput[V], Any]:
        # First, reyield previous stream so we never block the stream, and collect the last result when it is done
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            final_u = value

        # Then, call in the next stream
        iter_v = self._wrap(next(unwrap(final_u)), name=next_name)
        async for v in iter_v:
            yield v

    return Stream[T, V](next_name, and_then)
def filter(self, fn: Callable[[~U], bool]) ‑> SingleOutputStream[~T, Optional[~U]]

Similar to Stream.filter(); however, since SingleOutputStream must always produce a value, this method simply replaces the value with None if the filter function returns False.

The fn parameter is a function that takes a value of type U and returns a bool.

Example:

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
...     even_stream = numbers_stream.collect().filter(lambda numbers: all([n % 2 == 0 for n in numbers]))
...     return await collect_final_output(even_stream(9))
...
>>> asyncio.run(example())
[None]
Expand source code
def filter(self, fn: Callable[[U], bool]) -> "SingleOutputStream[T, Union[U, None]]":
    """
    Similar to `Stream.filter`; however, since SingleOutputStream must always produce a value, this method simply replaces
    the value with None if the filter function returns False.

    The `fn` parameter is a function that takes a value of type U and returns a bool.

    Example:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
    ...     even_stream = numbers_stream.collect().filter(lambda numbers: all([n % 2 == 0 for n in numbers]))
    ...     return await collect_final_output(even_stream(9))
    ...
    >>> asyncio.run(example())
    [None]
    """
    next_name = f"{self.name}@filter"

    async def filter(input: T) -> AsyncGenerator[StreamOutput[Union[U, None]], Any]:
        # Reyield previous stream so we never block the stream, and at the same time yield filtered values
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[Union[U, None]], to_reyield)
            final_u = value

        yield self._output_wrap(
            final_u if fn(unwrap(final_u)) else None, name=next_name
        )

    return SingleOutputStream[T, Union[U, None]](
        next_name, lambda input: filter(input)
    )
def gather(self: Union[SingleOutputStream[T, List[AsyncGenerator[StreamOutput[V], Any]]], SingleOutputStream[T, List[AsyncGenerator[V, Any]]]]) ‑> SingleOutputStream[~T, typing.List[typing.List[~V]]]

Similar to Stream.gather(), this method waits for all the async generators in the list returned by the stream to finish and gathers their results in a list.

For detailed examples, refer to the documentation of Stream.gather().
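At its core, as the source shows, `gather` exhausts each async generator in the list concurrently via `asyncio.gather`, producing a list of lists. A standalone sketch of that shape (helper names are illustrative, not library API):

```python
import asyncio
from typing import Any, AsyncGenerator, List, TypeVar

X = TypeVar("X")

# Consume one async generator into a plain list, mirroring the
# consume_async_generator helper in the source above.
async def consume(gen: AsyncGenerator[X, Any]) -> List[X]:
    return [item async for item in gen]

async def delayed(x: int) -> AsyncGenerator[str, Any]:
    await asyncio.sleep(0.01)  # the three sleeps overlap, running in parallel
    yield f"Number: {x}"

async def main() -> List[List[str]]:
    gens = [delayed(0), delayed(1), delayed(2)]
    # asyncio.gather preserves input order, so results line up with gens
    return await asyncio.gather(*(consume(g) for g in gens))

results = asyncio.run(main())
```

Because `asyncio.gather` preserves argument order, the nested result lists always correspond positionally to the generators that produced them.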

Expand source code
def gather(
    self: "Union[SingleOutputStream[T, List[AsyncGenerator[StreamOutput[V], Any]]], SingleOutputStream[T, List[AsyncGenerator[V, Any]]]]",
) -> "SingleOutputStream[T, List[List[V]]]":
    """
    Similar to `Stream.gather`, this method waits for all the async generators in the list returned by the stream to finish and gathers their results in a list.

    For detailed examples, refer to the documentation of `Stream.gather`.
    """

    next_name = f"{self.name}@gather"

    async def gather(
        input: T,
    ) -> AsyncGenerator[StreamOutput[List[List[V]]], Any]:
        # First, reyield previous stream so we never block the stream, and collect the last result when it is done
        final_u: Optional[
            Union[
                List[AsyncGenerator[StreamOutput[V], Any]],
                List[AsyncGenerator[V, Any]],
            ]
        ] = None

        # TODO: try to work out why the type signature of self(input) is not fitting in there, it should
        async for value, to_reyield in self._reyield(cast(Any, self(input))):
            yield cast(StreamOutput[List[List[V]]], to_reyield)
            final_u = value

        if final_u is None:
            final_u = []

        async def consume_async_generator(
            generator: AsyncGenerator[X, Any],
        ) -> Iterable[X]:
            return [item async for item in generator]

        # TODO: should we really wait for everything to arrive before calling asyncio gather? Can we call it during the previous reyield?
        vss: Union[
            List[List[StreamOutput[V]]], List[List[V]]
        ] = await asyncio.gather(*(consume_async_generator(gen) for gen in final_u))

        clean_vss: List[List[V]] = []
        for vs in vss:
            clean_vs: List[V] = []
            for v in vs:
                v_rewrapped = cast(
                    StreamOutput[List[List[V]]],
                    self._output_wrap(v, final=False),
                )
                if isinstance(v, StreamOutput):
                    yield v_rewrapped
                    if v.final:
                        clean_vs.append(v.data)
                else:
                    clean_vs.append(v)
            clean_vss.append(clean_vs)

        yield self._output_wrap(clean_vss, name=next_name)

    return SingleOutputStream[T, List[List[V]]](next_name, gather)
def map(self, fn: Callable[[~U], ~V]) ‑> SingleOutputStream[~T, ~V]

Similar to Stream.map(), this method applies a function to the final output of the stream, but returns a SingleOutputStream.

The fn parameter is a function that takes a value of type U and returns a value of type V.

For detailed examples, refer to the documentation of Stream.map().

Expand source code
def map(self, fn: Callable[[U], V]) -> "SingleOutputStream[T, V]":
    """
    Similar to `Stream.map`, this method applies a function to the final output of the stream, but returns a SingleOutputStream.

    The `fn` parameter is a function that takes a value of type U and returns a value of type V.

    For detailed examples, refer to the documentation of `Stream.map`.
    """

    next_name = f"{self.name}@map"

    async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
        # Reyield previous stream so we never block the stream, and at the same time yield mapped values
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            final_u = value

        yield self._output_wrap(fn(unwrap(final_u)), name=next_name)

    return SingleOutputStream[T, V](next_name, lambda input: map(input))
def on_error(self, handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[~V], Any], ~V]]) ‑> SingleOutputStream[~T, typing.Union[~U, ~V]]

Similar to Stream.on_error(), this method handles any uncaught exceptions that might occur during the execution of the current stream.

For detailed examples, refer to the documentation of Stream.on_error().
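The recovery shape in the source is a plain try/except around iterating the upstream generator, switching over to the handler's output when an exception escapes. A minimal standalone sketch (names are illustrative, not library API):

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, List

# Iterate the upstream generator inside try/except; if it raises, yield the
# handler's replacement value instead of propagating the exception.
async def on_error_sketch(
    upstream: AsyncGenerator[str, Any],
    handler: Callable[[Exception], str],
) -> AsyncGenerator[str, Any]:
    try:
        async for output in upstream:
            yield output
    except Exception as e:
        yield handler(e)

async def failing() -> AsyncGenerator[str, Any]:
    yield "partial output"  # values produced before the error still flow out
    raise Exception("boom")

async def main() -> List[str]:
    gen = on_error_sketch(failing(), lambda e: f"recovered from: {e}")
    return [out async for out in gen]

outputs = asyncio.run(main())
```

Note that anything yielded before the exception has already reached the consumer; only the continuation is replaced by the handler.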

Expand source code
def on_error(
    self,
    handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]],
) -> "SingleOutputStream[T, Union[U, V]]":
    """
    Similar to `Stream.on_error`, this method handles any uncaught exceptions that might occur during the execution of the current stream.

    For detailed examples, refer to the documentation of `Stream.on_error`.
    """

    next_name = f"{self.name}@on_error"
    if hasattr(handler, "name"):
        next_name = handler.name

    async def on_error(
        input: T,
    ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]:
        try:
            async for output in self(input):
                yield cast(StreamOutput[Union[U, V]], output)
        except Exception as e:
            async for output in self._wrap(handler(e), name=next_name):
                yield cast(StreamOutput[Union[U, V]], output)

    return SingleOutputStream[T, Union[U, V]](
        next_name, lambda input: on_error(input)
    )
def pipe(self, fn: Callable[[AsyncGenerator[~U, Any]], AsyncGenerator[Union[StreamOutput[~V], ~V], Any]]) ‑> Stream[~T, ~V]

Similar to Stream.pipe(), except that it takes a stream that will only ever produce a single value, so it effectively works the same as and_then, only with a different interface.

For detailed examples, refer to the documentation of Stream.pipe().
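The adapter step that makes this work, visible in the source as `as_async_generator(unwrap(final_u))`, is wrapping the single final value into a one-item async generator so the piping function still receives the stream-in/stream-out interface. A standalone sketch (names are illustrative, not library API):

```python
import asyncio
from typing import Any, AsyncGenerator, List

# Wrap a single value into a one-item async generator, the same adaptation
# as_async_generator performs on the final value in the source above.
async def as_single_item_stream(value: str) -> AsyncGenerator[str, Any]:
    yield value

async def shout_pipe(stream: AsyncGenerator[str, Any]) -> AsyncGenerator[str, Any]:
    async for item in stream:  # fn still sees a stream, even with one item
        yield item.upper()

async def main() -> List[str]:
    single = as_single_item_stream("hello")
    return [out async for out in shout_pipe(single)]

outputs = asyncio.run(main())
```

This is why the docstring says it works the same as `and_then` with a different interface: the piping function's loop simply runs once.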

Expand source code
def pipe(
    self,
    fn: Callable[
        [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any]
    ],
) -> "Stream[T, V]":
    """
    Similar to `Stream.pipe`, except that it takes a stream that will only ever produce a single value, so it effectively works the same as `and_then`, only with a different interface.

    For detailed examples, refer to the documentation of `Stream.pipe`.
    """
    next_name = f"{self.name}@pipe"
    if hasattr(fn, "name"):
        next_name = fn.name

    async def pipe(
        input: T,
    ) -> AsyncGenerator[StreamOutput[V], Any]:
        # First, reyield previous stream so we never block the stream, and collect the last result when it is done
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            final_u = value

        # Then, call in the piping function
        single_item_stream = as_async_generator(unwrap(final_u))
        iter_v = self._wrap(fn(single_item_stream), name=next_name)
        async for v in iter_v:
            yield cast(StreamOutput[V], v)

    return Stream[T, V](next_name, pipe)

Inherited members

class Stream (name: str, call: Callable[[~T], Union[AsyncGenerator[StreamOutput[~U], Any], AsyncGenerator[~U, Any], ~U]])
Expand source code
class Stream(Generic[T, U]):
    """"""

    _call: Callable[
        [T], Union[AsyncGenerator[StreamOutput[U], Any], AsyncGenerator[U, Any], U]
    ]

    def __init__(
        self,
        name: str,
        call: Callable[
            [T],
            Union[AsyncGenerator[StreamOutput[U], Any], AsyncGenerator[U, Any], U],
        ],
    ) -> None:
        self.name = name
        self._call = call

    def __call__(self, input: T) -> AsyncGenerator[StreamOutput[U], Any]:
        result = self._call(input)
        return self._wrap(result)

    def _wrap(
        self,
        value: Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V],
        final: Optional[bool] = None,
        name: Optional[str] = None,
    ) -> AsyncGenerator[StreamOutput[V], Any]:
        async def _wrap(
            values: Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any]],
        ) -> AsyncGenerator[StreamOutput[V], Any]:
            async for value in values:
                yield self._output_wrap(value, final=final, name=name)

        if isinstance(value, AsyncGenerator):
            return _wrap(value)
        return _wrap(as_async_generator(value))

    def _output_wrap(
        self, value: Union[StreamOutput[V], V], final=None, name=None
    ) -> StreamOutput[V]:
        if isinstance(value, StreamOutput):
            final = final if final is not None else value.final
            return StreamOutput[V](stream=value.stream, data=value.data, final=final)

        final = final if final is not None else True
        return StreamOutput[V](
            stream=self.name if name is None else name, data=value, final=final
        )

    async def _reyield(
        self, async_iterable: AsyncGenerator[StreamOutput[U], Any]
    ) -> AsyncGenerator[Tuple[List[U], StreamOutput[U]], Any]:
        values: List[U] = []
        async for u in async_iterable:
            u_rewrapped = self._output_wrap(u, final=False)
            if u.final:
                values.append(u.data)
            yield (values, u_rewrapped)

    def map(self, fn: Callable[[U], V]) -> "Stream[T, V]":
        """
        Maps the output of the current stream through a function as they arrive.

        The transform function will receive the current output of the stream and
        should return a modified version of it. This method is non-blocking and
        will continue processing the stream in parallel.

        Example:

        >>> from langstream import Stream, join_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
        ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
        ...     return await join_final_output(polite_stream("Alice"))
        ...
        >>> asyncio.run(example())
        'Hello, Alice! How are you?'


        Example of processing one token at a time:

        >>> from langstream import Stream, as_async_generator, join_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time
        ...     acronym_stream = words_stream.map(lambda word: word.upper()[0]) # uppercases each word and takes the first letter
        ...     return await join_final_output(acronym_stream("as soon as possible"))
        ...
        >>> asyncio.run(example())
        'ASAP'
        """

        next_name = f"{self.name}@map"

        async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
            # Reyield previous stream so we never block the stream, and at the same time yield mapped values
            prev_len_values = 0
            async for values, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                if len(values) > prev_len_values:  # as soon as there is a new value
                    prev_len_values = len(values)
                    yield self._output_wrap(fn(values[-1]), name=next_name)

        return Stream[T, V](next_name, lambda input: map(input))

    def filter(self, fn: Callable[[U], bool]) -> "Stream[T, U]":
        """
        Filters the output of the current stream, keeping only the values that return True.

        This method is non-blocking and expects a function that returns True for keeping the value,
        or False for dropping it, as they arrive.

        Example:

        >>> from langstream import Stream, as_async_generator, collect_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
        ...     even_stream = numbers_stream.filter(lambda input: input % 2 == 0)
        ...     return await collect_final_output(even_stream(9))
        ...
        >>> asyncio.run(example())
        [0, 2, 4, 6, 8]
        """

        next_name = f"{self.name}@filter"

        async def filter(input: T) -> AsyncGenerator[StreamOutput[U], Any]:
            # Reyield previous stream so we never block the stream, and at the same time yield mapped values
            prev_len_values = 0
            async for values, to_reyield in self._reyield(self(input)):
                yield to_reyield
                if len(values) > prev_len_values:  # as soon as there is a new value
                    prev_len_values = len(values)
                    if fn(values[-1]):
                        yield self._output_wrap(values[-1], name=next_name)

        return Stream[T, U](next_name, lambda input: filter(input))

    def and_then(
        self,
        next: Callable[
            [Iterable[U]],
            Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V],
        ],
    ) -> "Stream[T, V]":
        """
        Processes the output of the current stream through a transformation function or another stream.

        Unlike the map method, which applies transformations to outputs as they arrive,
        the and_then method first collects all the outputs and then passes them to the transformation function or the next stream.
        This method is blocking and will wait for the entire stream to be processed before applying the transformation.

        If `next` is a function, it should accept the list of collected outputs and return a modified version of it.
        If `next` is another stream, it is used to process the list of collected outputs.

        Example using a function:

        >>> from langstream import Stream, as_async_generator, collect_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     word_stream = Stream[str, str]("WordStream", lambda word: as_async_generator(word, "!"))
        ...     count_stream : Stream[str, int] = word_stream.and_then(lambda outputs: len(list(outputs)))
        ...     return await collect_final_output(count_stream("Hi"))
        ...
        >>> asyncio.run(example())
        [2]


        Example using another stream:

        >>> from langstream import Stream, as_async_generator, join_final_output
        >>> from typing import Iterable
        >>> import asyncio
        ...
        >>> async def example():
        ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time
        ...     acronym_stream = Stream[Iterable[str], str]("AcronymStream", lambda words: "".join(word.upper()[0] for word in words)) # produces acronym
        ...     composed_stream = words_stream.and_then(acronym_stream)
        ...     return await join_final_output(composed_stream("as soon as possible"))
        ...
        >>> asyncio.run(example())
        'ASAP'
        """

        next_name = f"{self.name}@and_then"
        if hasattr(next, "name"):
            next_name = next.name

        async def and_then(
            input: T,
        ) -> AsyncGenerator[StreamOutput[V], Any]:
            # First, reyield previous stream so we never block the stream, and collect the results until they are done
            iter_u: Iterable[U] = []
            async for values, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                iter_u = values

            # Then, call in the next stream
            iter_v = self._wrap(next(iter_u), name=next_name)
            async for v in iter_v:
                yield v

        return Stream[T, V](next_name, and_then)

    def pipe(
        self,
        fn: Callable[
            [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any]
        ],
    ) -> "Stream[T, V]":
        """
        Lower level constructor to pipe a stream into another one, giving you the underlying AsyncGenerator.
        Pipe takes a callback function which should always produce an AsyncGenerator in return, which means you
        need to declare an async function and use `yield` to generate values. The advantage of this is that
        you have fine control over whether it will block the stream or not.

        In fact, with pipe you can reconstruct `map` and `and_then`, for example:

        >>> from langstream import Stream, as_async_generator, collect_final_output
        >>> from typing import List, AsyncGenerator
        >>> import asyncio
        ...
        >>> async def example(items):
        ...     async def mario_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
        ...        waiting_for_mushroom = False
        ...        async for item in stream:
        ...            if item == "Mario":
        ...                waiting_for_mushroom = True
        ...            elif item == "Mushroom" and waiting_for_mushroom:
        ...                yield "Super Mario!"
        ...            else:
        ...                yield item + "?"
        ...
        ...     piped_stream = Stream[List[str], str](
        ...         "PipedStream", lambda items: as_async_generator(*items)
        ...     ).pipe(mario_pipe)
        ...
        ...     return await collect_final_output(piped_stream(items))
        ...
        >>> asyncio.run(example(["Mario", "Mushroom"]))
        ['Super Mario!']
        >>> asyncio.run(example(["Luigi"]))
        ['Luigi?']
        >>> asyncio.run(example(["Mario", "Luigi", "Mushroom"]))
        ['Luigi?', 'Super Mario!']

        As you can see, this pipe blocks like `and_then` when it sees "Mario", waiting until a "Mushroom" arrives, but for other items
        such as "Luigi" it re-yields them immediately with a question mark appended, non-blocking, like `map`.

        You can also call another stream from `pipe` directly, just be sure to re-yield its outputs
        """

        next_name = f"{self.name}@pipe"

        async def filter_final_output(
            async_iterable: AsyncGenerator[StreamOutput[U], Any]
        ) -> AsyncGenerator[U, Any]:
            async for output in async_iterable:
                if output.final:
                    yield cast(U, output.data)

        def pipe(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
            previous, final = asyncstdlib.tee(self(input), n=2, lock=asyncio.Lock())

            previous = self._wrap(previous, name=next_name, final=False)
            previous = cast(AsyncGenerator[StreamOutput[V], Any], previous)

            final = filter_final_output(
                cast(AsyncGenerator[StreamOutput[U], Any], final)
            )
            final = cast(
                AsyncGenerator[StreamOutput[V], Any],
                self._wrap(fn(final), name=next_name),
            )

            return merge(previous, final)

        return Stream[T, V](next_name, pipe)

    def collect(self: "Stream[T, U]") -> "SingleOutputStream[T, List[U]]":
        """
        Collects all the outputs produced by the stream and returns them as a list.

        This method is blocking and is useful when the next stream or processing step needs to have access to the
        entire output all at once, rather than processing elements as they arrive.

        Example:

        >>> from langstream import Stream, as_async_generator, collect_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     word_stream: Stream[str, List[str]] = Stream[str, str](
        ...         "WordStream", lambda word: as_async_generator(word, "!")
        ...     ).collect()
        ...     return await collect_final_output(word_stream("Hi"))
        ...
        >>> asyncio.run(example())
        [['Hi', '!']]
        """

        next_name = f"{self.name}@collect"

        async def _collect(
            input: T,
        ) -> AsyncGenerator[StreamOutput[List[U]], Any]:
            # First, reyield previous stream so we never block the stream, and collect the results until they are done
            iter_u: Iterable[U] = []
            async for values, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[List[U]], to_reyield)
                iter_u = values

            # Then, yield the collected results
            yield self._output_wrap(iter_u, name=next_name)

        return SingleOutputStream[T, List[U]](next_name, _collect)

    def join(self: "Stream[T, str]", separator="") -> "SingleOutputStream[T, str]":
        """
        Joins the output of a string-producing stream into a single string.

        The `join` method concatenates each item in the output of the stream, using the
        provided separator between each element. This is particularly useful when working
        with text, and you want to merge all the generated tokens.

        Note that this method blocks until all outputs of the stream are available, as it
        needs to wait for the complete output to perform the join operation.

        Params
        ----------
        separator : str
            A string that will be used as a separator between the elements. Default is an empty string.

        Example:

        >>> from langstream import Stream, as_async_generator, join_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
        ...     capitalized_stream = words_stream.map(lambda word: word.capitalize())
        ...     joined_stream = capitalized_stream.join(" ")
        ...     return await join_final_output(joined_stream("this is an example"))
        ...
        >>> asyncio.run(example())
        'This Is An Example'
        """

        next_name = f"{self.name}@join"

        async def _join(
            input: T,
        ) -> AsyncGenerator[StreamOutput[str], Any]:
            # First, reyield previous stream so we never block the stream, and collect the results until they are done
            iter_u: Iterable[str] = []
            async for values, to_reyield in self._reyield(self(input)):
                yield to_reyield
                iter_u = values

            # Then, return the joined result
            output: str = separator.join(iter_u)
            yield self._output_wrap(output, name=next_name)

        return SingleOutputStream[T, str](next_name, _join)

    def gather(
        self: "Union[Stream[T, AsyncGenerator[StreamOutput[V], Any]], Stream[T, AsyncGenerator[V, Any]]]",
    ) -> "SingleOutputStream[T, List[List[V]]]":
        """
        Gathers results from multiple streams and processes them in parallel.

        The `gather` method is used to process several streams concurrently, and it waits until all of
        them are complete before continuing. This is similar to `asyncio.gather`, and is useful when you
        want to run multiple asynchronous tasks in parallel and wait for all of them to complete.

        Note that the order of results corresponds to the order of streams passed to the `gather` method.

        >>> from langstream import Stream, as_async_generator, collect_final_output
        >>> import asyncio
        ...
        >>> async def delayed_output(x):
        ...     await asyncio.sleep(0.1)
        ...     yield f"Number: {x}"
        ...
        >>> async def example():
        ...     number_stream = Stream[int, int](
        ...         "NumberStream", lambda x: as_async_generator(*range(x))
        ...     )
        ...     gathered_stream : Stream[int, str] = (
        ...         number_stream.map(delayed_output)
        ...         .gather()
        ...         .and_then(lambda results: as_async_generator(*(r[0] for r in results)))
        ...     )
        ...     return await collect_final_output(gathered_stream(3))
        ...
        >>> asyncio.run(example()) # will take 0.1s to finish, not 0.3s, because it runs in parallel
        ['Number: 0', 'Number: 1', 'Number: 2']
        """
        return self.collect().gather()

    def on_error(
        self,
        handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]],
    ) -> "Stream[T, Union[U, V]]":
        """
        Handles any uncaught exceptions that might occur during the execution of the current stream.

        The `handler` function takes an exception as its argument and returns a new value that
        will be used as the output of the stream instead of the exception. The function can also re-raise
        the exception or raise a new one, which will then be propagated further up the stream.

        If an exception occurs in the `handler` function itself, it will be propagated without any
        further handling.

        Example:

        >>> from langstream import Stream, join_final_output
        >>> import asyncio
        ...
        >>> def failed_greeting(name: str):
        ...     raise Exception(f"Giving {name} a cold shoulder")
        ...
        >>> async def example():
        ...     greet_stream = Stream[str, str](
        ...         "GreetingStream",
        ...         failed_greeting
        ...     ).on_error(lambda e: f"Sorry, an error occurred: {str(e)}")
        ...
        ...     async for output in greet_stream("Alice"):
        ...         print(output)
        ...
        >>> asyncio.run(example())
        StreamOutput(stream='GreetingStream', data=Exception('Giving Alice a cold shoulder'), final=False)
        StreamOutput(stream='GreetingStream@on_error', data='Sorry, an error occurred: ...', final=True)
        """

        next_name = f"{self.name}@on_error"
        if hasattr(handler, "name"):
            next_name = handler.name

        async def on_error(
            input: T,
        ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]:
            try:
                async for output in self(input):
                    yield cast(StreamOutput[Union[U, V]], output)
            except Exception as e:
                yield cast(StreamOutput[Union[U, V]], self._output_wrap(e, final=False))
                async for output in self._wrap(handler(e), name=next_name):
                    yield cast(StreamOutput[Union[U, V]], output)

        return Stream[T, Union[U, V]](next_name, lambda input: on_error(input))

Ancestors

  • typing.Generic

Subclasses

  • SingleOutputStream

Methods

def and_then(self, next: Callable[[Iterable[~U]], Union[AsyncGenerator[StreamOutput[~V], Any], AsyncGenerator[~V, Any], ~V]]) ‑> Stream[~T, ~V]

Processes the output of the current stream through a transformation function or another stream.

Unlike the map method, which applies transformations to outputs as they arrive, the and_then method first collects all the outputs and then passes them to the transformation function or the next stream. This method is blocking and will wait for the entire stream to be processed before applying the transformation.

If next is a function, it should accept the list of collected outputs and return a modified version of it. If next is another stream, it is used to process the list of collected outputs.

Example using a function:

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     word_stream = Stream[str, str]("WordStream", lambda word: as_async_generator(word, "!"))
...     count_stream : Stream[str, int] = word_stream.and_then(lambda outputs: len(list(outputs)))
...     return await collect_final_output(count_stream("Hi"))
...
>>> asyncio.run(example())
[2]

Example using another stream:

>>> from langstream import Stream, as_async_generator, join_final_output
>>> from typing import Iterable
>>> import asyncio
...
>>> async def example():
...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time
...     acronym_stream = Stream[Iterable[str], str]("AcronymStream", lambda words: "".join(word.upper()[0] for word in words)) # produces acronym
...     composed_stream = words_stream.and_then(acronym_stream)
...     return await join_final_output(composed_stream("as soon as possible"))
...
>>> asyncio.run(example())
'ASAP'
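Under the hood, this collect-then-transform step is just buffering an async generator before applying a single transformation. A stdlib-only sketch of the same pattern, with hypothetical names (not the LangStream API):

```python
import asyncio
from typing import AsyncGenerator, List

async def words() -> AsyncGenerator[str, None]:
    # Upstream: produces one word at a time
    for w in "as soon as possible".split():
        yield w

async def main() -> str:
    # and_then-style step: collect the whole upstream first (blocking),
    # then run a single transformation over the collected list
    collected: List[str] = [w async for w in words()]
    return "".join(w[0].upper() for w in collected)

print(asyncio.run(main()))  # ASAP
```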
Expand source code
def and_then(
    self,
    next: Callable[
        [Iterable[U]],
        Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V],
    ],
) -> "Stream[T, V]":
    """
    Processes the output of the current stream through a transformation function or another stream.

    Unlike the map method, which applies transformations to outputs as they arrive,
    the and_then method first collects all the outputs and then passes them to the transformation function or the next stream.
    This method is blocking and will wait for the entire stream to be processed before applying the transformation.

    If `next` is a function, it should accept the list of collected outputs and return a modified version of it.
    If `next` is another stream, it is used to process the list of collected outputs.

    Example using a function:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     word_stream = Stream[str, str]("WordStream", lambda word: as_async_generator(word, "!"))
    ...     count_stream : Stream[str, int] = word_stream.and_then(lambda outputs: len(list(outputs)))
    ...     return await collect_final_output(count_stream("Hi"))
    ...
    >>> asyncio.run(example())
    [2]


    Example using another stream:

    >>> from langstream import Stream, as_async_generator, join_final_output
    >>> from typing import Iterable
    >>> import asyncio
    ...
    >>> async def example():
    ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time
    ...     acronym_stream = Stream[Iterable[str], str]("AcronymStream", lambda words: "".join(word.upper()[0] for word in words)) # produces acronym
    ...     composed_stream = words_stream.and_then(acronym_stream)
    ...     return await join_final_output(composed_stream("as soon as possible"))
    ...
    >>> asyncio.run(example())
    'ASAP'
    """

    next_name = f"{self.name}@and_then"
    if hasattr(next, "name"):
        next_name = next.name

    async def and_then(
        input: T,
    ) -> AsyncGenerator[StreamOutput[V], Any]:
        # First, reyield previous stream so we never block the stream, and collect the results until they are done
        iter_u: Iterable[U] = []
        async for values, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            iter_u = values

        # Then, call in the next stream
        iter_v = self._wrap(next(iter_u), name=next_name)
        async for v in iter_v:
            yield v

    return Stream[T, V](next_name, and_then)
def collect(self: Stream[T, U]) ‑> SingleOutputStream[~T, typing.List[~U]]

Collects all the outputs produced by the stream and returns them as a list.

This method is blocking and is useful when the next stream or processing step needs access to the entire output at once, rather than processing elements as they arrive.

Example:

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     word_stream: Stream[str, List[str]] = Stream[str, str](
...         "WordStream", lambda word: as_async_generator(word, "!")
...     ).collect()
...     return await collect_final_output(word_stream("Hi"))
...
>>> asyncio.run(example())
[['Hi', '!']]
Expand source code
def collect(self: "Stream[T, U]") -> "SingleOutputStream[T, List[U]]":
    """
    Collects all the outputs produced by the stream and returns them as a list.

    This method is blocking and is useful when the next stream or processing step needs access to the
    entire output at once, rather than processing elements as they arrive.

    Example:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     word_stream: Stream[str, List[str]] = Stream[str, str](
    ...         "WordStream", lambda word: as_async_generator(word, "!")
    ...     ).collect()
    ...     return await collect_final_output(word_stream("Hi"))
    ...
    >>> asyncio.run(example())
    [['Hi', '!']]
    """

    next_name = f"{self.name}@collect"

    async def _collect(
        input: T,
    ) -> AsyncGenerator[StreamOutput[List[U]], Any]:
        # First, reyield previous stream so we never block the stream, and collect the results until they are done
        iter_u: Iterable[U] = []
        async for values, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[List[U]], to_reyield)
            iter_u = values

        # Then, yield the collected results
        yield self._output_wrap(iter_u, name=next_name)

    return SingleOutputStream[T, List[U]](next_name, _collect)
def filter(self, fn: Callable[[~U], bool]) ‑> Stream[~T, ~U]

Filters the output of the current stream, keeping only the values for which the given function returns True.

This method is non-blocking: the function is applied to each value as it arrives, returning True to keep it or False to drop it.

Example:

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
...     even_stream = numbers_stream.filter(lambda input: input % 2 == 0)
...     return await collect_final_output(even_stream(9))
...
>>> asyncio.run(example())
[0, 2, 4, 6, 8]
Expand source code
def filter(self, fn: Callable[[U], bool]) -> "Stream[T, U]":
    """
    Filters the output of the current stream, keeping only the values for which the given function returns True.

    This method is non-blocking: the function is applied to each value as it arrives,
    returning True to keep it or False to drop it.

    Example:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
    ...     even_stream = numbers_stream.filter(lambda input: input % 2 == 0)
    ...     return await collect_final_output(even_stream(9))
    ...
    >>> asyncio.run(example())
    [0, 2, 4, 6, 8]
    """

    next_name = f"{self.name}@filter"

    async def filter(input: T) -> AsyncGenerator[StreamOutput[U], Any]:
        # Reyield previous stream so we never block the stream, and at the same time yield mapped values
        prev_len_values = 0
        async for values, to_reyield in self._reyield(self(input)):
            yield to_reyield
            if len(values) > prev_len_values:  # as soon as there is a new value
                prev_len_values = len(values)
                if fn(values[-1]):
                    yield self._output_wrap(values[-1], name=next_name)

    return Stream[T, U](next_name, lambda input: filter(input))
def gather(self: Union[Stream[T, AsyncGenerator[StreamOutput[V], Any]], Stream[T, AsyncGenerator[V, Any]]]) ‑> SingleOutputStream[~T, typing.List[typing.List[~V]]]

Gathers results from multiple streams and processes them in parallel.

The gather() method is used to process several streams concurrently, and it waits until all of them are complete before continuing. This is similar to asyncio.gather, and is useful when you want to run multiple asynchronous tasks in parallel and wait for all of them to complete.

Note that the order of results corresponds to the order of streams passed to the gather() method.

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def delayed_output(x):
...     await asyncio.sleep(0.1)
...     yield f"Number: {x}"
...
>>> async def example():
...     number_stream = Stream[int, int](
...         "NumberStream", lambda x: as_async_generator(*range(x))
...     )
...     gathered_stream : Stream[int, str] = (
...         number_stream.map(delayed_output)
...         .gather()
...         .and_then(lambda results: as_async_generator(*(r[0] for r in results)))
...     )
...     return await collect_final_output(gathered_stream(3))
...
>>> asyncio.run(example()) # will take 0.1s to finish, not 0.3s, because it runs in parallel
['Number: 0', 'Number: 1', 'Number: 2']
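Since streams are plain AsyncGenerators under the hood, this parallel behavior maps onto awaiting several of them with asyncio.gather. A stdlib-only sketch of the same mechanism, with hypothetical helper names (not the LangStream API):

```python
import asyncio
from typing import AsyncGenerator, List

async def delayed_numbers(x: int) -> AsyncGenerator[str, None]:
    # Each generator sleeps before producing its single value
    await asyncio.sleep(0.1)
    yield f"Number: {x}"

async def drain(gen: AsyncGenerator[str, None]) -> List[str]:
    # Collect every value an async generator produces
    return [item async for item in gen]

async def main() -> List[List[str]]:
    # Consume three generators concurrently; total time is ~0.1s, not 0.3s
    return await asyncio.gather(*(drain(delayed_numbers(i)) for i in range(3)))

print(asyncio.run(main()))  # [['Number: 0'], ['Number: 1'], ['Number: 2']]
```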
Expand source code
def gather(
    self: "Union[Stream[T, AsyncGenerator[StreamOutput[V], Any]], Stream[T, AsyncGenerator[V, Any]]]",
) -> "SingleOutputStream[T, List[List[V]]]":
    """
    Gathers results from multiple streams and processes them in parallel.

    The `gather` method is used to process several streams concurrently, and it waits until all of
    them are complete before continuing. This is similar to `asyncio.gather`, and is useful when you
    want to run multiple asynchronous tasks in parallel and wait for all of them to complete.

    Note that the order of results corresponds to the order of streams passed to the `gather` method.

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def delayed_output(x):
    ...     await asyncio.sleep(0.1)
    ...     yield f"Number: {x}"
    ...
    >>> async def example():
    ...     number_stream = Stream[int, int](
    ...         "NumberStream", lambda x: as_async_generator(*range(x))
    ...     )
    ...     gathered_stream : Stream[int, str] = (
    ...         number_stream.map(delayed_output)
    ...         .gather()
    ...         .and_then(lambda results: as_async_generator(*(r[0] for r in results)))
    ...     )
    ...     return await collect_final_output(gathered_stream(3))
    ...
    >>> asyncio.run(example()) # will take 0.1s to finish, not 0.3s, because it runs in parallel
    ['Number: 0', 'Number: 1', 'Number: 2']
    """
    return self.collect().gather()
def join(self: Stream[T, str], separator='') ‑> SingleOutputStream[~T, str]

Joins the output of a string-producing stream into a single string.

The join() method concatenates each item in the output of the stream, using the provided separator between each element. This is particularly useful when working with text, and you want to merge all the generated tokens.

Note that this method blocks until all outputs of the stream are available, as it needs to wait for the complete output to perform the join operation.

Params

separator : str
A string that will be used as a separator between the elements. Default is an empty string.

Example:

>>> from langstream import Stream, as_async_generator, join_final_output
>>> import asyncio
...
>>> async def example():
...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
...     capitalized_stream = words_stream.map(lambda word: word.capitalize())
...     joined_stream = capitalized_stream.join(" ")
...     return await join_final_output(joined_stream("this is an example"))
...
>>> asyncio.run(example())
'This Is An Example'
Expand source code
def join(self: "Stream[T, str]", separator="") -> "SingleOutputStream[T, str]":
    """
    Joins the output of a string-producing stream into a single string.

    The `join` method concatenates each item in the output of the stream, using the
    provided separator between each element. This is particularly useful when working
    with text, and you want to merge all the generated tokens.

    Note that this method blocks until all outputs of the stream are available, as it
    needs to wait for the complete output to perform the join operation.

    Params
    ----------
    separator : str
        A string that will be used as a separator between the elements. Default is an empty string.

    Example:

    >>> from langstream import Stream, as_async_generator, join_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
    ...     capitalized_stream = words_stream.map(lambda word: word.capitalize())
    ...     joined_stream = capitalized_stream.join(" ")
    ...     return await join_final_output(joined_stream("this is an example"))
    ...
    >>> asyncio.run(example())
    'This Is An Example'
    """

    next_name = f"{self.name}@join"

    async def _join(
        input: T,
    ) -> AsyncGenerator[StreamOutput[str], Any]:
        # First, reyield previous stream so we never block the stream, and collect the results until they are done
        iter_u: Iterable[str] = []
        async for values, to_reyield in self._reyield(self(input)):
            yield to_reyield
            iter_u = values

        # Then, return the joined result
        output: str = separator.join(iter_u)
        yield self._output_wrap(output, name=next_name)

    return SingleOutputStream[T, str](next_name, _join)
def map(self, fn: Callable[[~U], ~V]) ‑> Stream[~T, ~V]

Maps the output of the current stream through a function as they arrive.

The transform function will receive the current output of the stream and should return a modified version of it. This method is non-blocking and will continue processing the stream in parallel.

Example:

>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> async def example():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...     return await join_final_output(polite_stream("Alice"))
...
>>> asyncio.run(example())
'Hello, Alice! How are you?'

Example of processing one token at a time:

>>> from langstream import Stream, as_async_generator, join_final_output
>>> import asyncio
...
>>> async def example():
...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time
...     acronym_stream = words_stream.map(lambda word: word.upper()[0]) # uppercases each word and takes its first letter
...     return await join_final_output(acronym_stream("as soon as possible"))
...
>>> asyncio.run(example())
'ASAP'
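In async-generator terms, a non-blocking map is simply a wrapper that transforms each item as it is re-yielded, with no buffering. A stdlib-only sketch of that pattern, with hypothetical names (not the LangStream API):

```python
import asyncio
from typing import AsyncGenerator

async def tokens() -> AsyncGenerator[str, None]:
    # Upstream: produces one token at a time
    for t in ["Hello", ", ", "world"]:
        yield t

async def upper_map(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    # Transform each item as it arrives; nothing is collected or buffered
    async for item in stream:
        yield item.upper()

async def main():
    return [t async for t in upper_map(tokens())]

print(asyncio.run(main()))  # ['HELLO', ', ', 'WORLD']
```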
Expand source code
def map(self, fn: Callable[[U], V]) -> "Stream[T, V]":
    """
    Maps the output of the current stream through a function as they arrive.

    The transform function will receive the current output of the stream and
    should return a modified version of it. This method is non-blocking and
    will continue processing the stream in parallel.

    Example:

    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...     return await join_final_output(polite_stream("Alice"))
    ...
    >>> asyncio.run(example())
    'Hello, Alice! How are you?'


    Example of processing one token at a time:

    >>> from langstream import Stream, as_async_generator, join_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time
    ...     acronym_stream = words_stream.map(lambda word: word.upper()[0]) # uppercases each word and takes its first letter
    ...     return await join_final_output(acronym_stream("as soon as possible"))
    ...
    >>> asyncio.run(example())
    'ASAP'
    """

    next_name = f"{self.name}@map"

    async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
        # Reyield previous stream so we never block the stream, and at the same time yield mapped values
        prev_len_values = 0
        async for values, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            if len(values) > prev_len_values:  # as soon as there is a new value
                prev_len_values = len(values)
                yield self._output_wrap(fn(values[-1]), name=next_name)

    return Stream[T, V](next_name, lambda input: map(input))
def on_error(self, handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[~V], Any], ~V]]) ‑> Stream[~T, typing.Union[~U, ~V]]

Handles any uncaught exceptions that might occur during the execution of the current stream.

The handler function takes an exception as its argument and returns a new value that will be used as the output of the stream instead of the exception. The function can also re-raise the exception or raise a new one, which will then be propagated further up the stream.

If an exception occurs in the handler function itself, it will be propagated without any further handling.

Example:

>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> def failed_greeting(name: str):
...     raise Exception(f"Giving {name} a cold shoulder")
...
>>> async def example():
...     greet_stream = Stream[str, str](
...         "GreetingStream",
...         failed_greeting
...     ).on_error(lambda e: f"Sorry, an error occurred: {str(e)}")
...
...     async for output in greet_stream("Alice"):
...         print(output)
...
>>> asyncio.run(example())
StreamOutput(stream='GreetingStream', data=Exception('Giving Alice a cold shoulder'), final=False)
StreamOutput(stream='GreetingStream@on_error', data='Sorry, an error occurred: ...', final=True)
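The recover-or-re-raise behavior of the handler corresponds to ordinary exception handling around an async generator: catch the errors you know how to turn into a fallback value, and let everything else propagate. A stdlib-only sketch of that pattern, with hypothetical names (not the LangStream API):

```python
import asyncio
from typing import AsyncGenerator

async def failing() -> AsyncGenerator[str, None]:
    yield "partial output"
    raise ValueError("boom")

async def with_fallback(gen: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    # Re-yield upstream values; recover from the errors we know how to
    # handle, and let anything else propagate (the "re-raise" path)
    try:
        async for item in gen:
            yield item
    except ValueError as e:
        yield f"recovered: {e}"

async def main():
    return [item async for item in with_fallback(failing())]

print(asyncio.run(main()))  # ['partial output', 'recovered: boom']
```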
Expand source code
def on_error(
    self,
    handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]],
) -> "Stream[T, Union[U, V]]":
    """
    Handles any uncaught exceptions that might occur during the execution of the current stream.

    The `handler` function takes an exception as its argument and returns a new value that
    will be used as the output of the stream instead of the exception. The function can also re-raise
    the exception or raise a new one, which will then be propagated further up the stream.

    If an exception occurs in the `handler` function itself, it will be propagated without any
    further handling.

    Example:

    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> def failed_greeting(name: str):
    ...     raise Exception(f"Giving {name} a cold shoulder")
    ...
    >>> async def example():
    ...     greet_stream = Stream[str, str](
    ...         "GreetingStream",
    ...         failed_greeting
    ...     ).on_error(lambda e: f"Sorry, an error occurred: {str(e)}")
    ...
    ...     async for output in greet_stream("Alice"):
    ...         print(output)
    ...
    >>> asyncio.run(example())
    StreamOutput(stream='GreetingStream', data=Exception('Giving Alice a cold shoulder'), final=False)
    StreamOutput(stream='GreetingStream@on_error', data='Sorry, an error occurred: ...', final=True)
    """

    next_name = f"{self.name}@on_error"
    if hasattr(handler, "name"):
        next_name = handler.name

    async def on_error(
        input: T,
    ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]:
        try:
            async for output in self(input):
                yield cast(StreamOutput[Union[U, V]], output)
        except Exception as e:
            yield cast(StreamOutput[Union[U, V]], self._output_wrap(e, final=False))
            async for output in self._wrap(handler(e), name=next_name):
                yield cast(StreamOutput[Union[U, V]], output)

    return Stream[T, Union[U, V]](next_name, lambda input: on_error(input))
def pipe(self, fn: Callable[[AsyncGenerator[~U, Any]], AsyncGenerator[Union[StreamOutput[~V], ~V], Any]]) ‑> Stream[~T, ~V]

Lower-level constructor to pipe a stream into another one, giving you the underlying AsyncGenerator. Pipe takes a callback function which must return an AsyncGenerator, which means you need to declare an async function that uses yield to generate values; the advantage is that you get fine control over whether it blocks the stream or not.

In fact, with pipe you can reconstruct map and and_then, for example:

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> from typing import List, AsyncGenerator
>>> import asyncio
...
>>> async def example(items):
...     async def mario_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
...        waiting_for_mushroom = False
...        async for item in stream:
...            if item == "Mario":
...                waiting_for_mushroom = True
...            elif item == "Mushroom" and waiting_for_mushroom:
...                yield "Super Mario!"
...            else:
...                yield item + "?"
...
...     piped_stream = Stream[List[str], str](
...         "PipedStream", lambda items: as_async_generator(*items)
...     ).pipe(mario_pipe)
...
...     return await collect_final_output(piped_stream(items))
...
>>> asyncio.run(example(["Mario", "Mushroom"]))
['Super Mario!']
>>> asyncio.run(example(["Luigi"]))
['Luigi?']
>>> asyncio.run(example(["Mario", "Luigi", "Mushroom"]))
['Luigi?', 'Super Mario!']

As you can see, this pipe blocks somewhat like and_then when it sees "Mario", waiting until a "Mushroom" arrives, but other items such as "Luigi" are re-yielded immediately with a question mark appended, non-blocking, like map.

You can also call another stream from pipe directly, just be sure to re-yield its outputs.
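Calling another stream from inside a pipe follows the same re-yield pattern. A stdlib-only sketch with hypothetical names (not the LangStream API), where the inner "stream" is just another async generator whose outputs are re-yielded:

```python
import asyncio
from typing import AsyncGenerator

async def shout(item: str) -> AsyncGenerator[str, None]:
    # Stand-in for an inner stream: emits two outputs per input
    yield item.upper()
    yield "!"

async def shouting_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    async for item in stream:
        # Call the inner "stream" and re-yield everything it produces
        async for out in shout(item):
            yield out

async def main():
    async def source() -> AsyncGenerator[str, None]:
        yield "hi"
        yield "there"
    return [item async for item in shouting_pipe(source())]

print(asyncio.run(main()))  # ['HI', '!', 'THERE', '!']
```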

Expand source code
def pipe(
    self,
    fn: Callable[
        [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any]
    ],
) -> "Stream[T, V]":
    """
    Lower-level constructor to pipe a stream into another one, giving you the underlying AsyncGenerator.
    Pipe takes a callback function which must return an AsyncGenerator, which means you need to declare
    an async function that uses `yield` to generate values; the advantage is that you get fine control
    over whether it blocks the stream or not.

    In fact, with pipe you can reconstruct `map` and `and_then`, for example:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> from typing import List, AsyncGenerator
    >>> import asyncio
    ...
    >>> async def example(items):
    ...     async def mario_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    ...        waiting_for_mushroom = False
    ...        async for item in stream:
    ...            if item == "Mario":
    ...                waiting_for_mushroom = True
    ...            elif item == "Mushroom" and waiting_for_mushroom:
    ...                yield "Super Mario!"
    ...            else:
    ...                yield item + "?"
    ...
    ...     piped_stream = Stream[List[str], str](
    ...         "PipedStream", lambda items: as_async_generator(*items)
    ...     ).pipe(mario_pipe)
    ...
    ...     return await collect_final_output(piped_stream(items))
    ...
    >>> asyncio.run(example(["Mario", "Mushroom"]))
    ['Super Mario!']
    >>> asyncio.run(example(["Luigi"]))
    ['Luigi?']
    >>> asyncio.run(example(["Mario", "Luigi", "Mushroom"]))
    ['Luigi?', 'Super Mario!']

    As you can see, this pipe blocks somewhat like `and_then` when it sees "Mario", waiting until a "Mushroom" arrives,
    but other items such as "Luigi" are re-yielded immediately with a question mark appended, non-blocking, like `map`.

    You can also call another stream from `pipe` directly, just be sure to re-yield its outputs.
    """

    next_name = f"{self.name}@pipe"

    async def filter_final_output(
        async_iterable: AsyncGenerator[StreamOutput[U], Any]
    ) -> AsyncGenerator[U, Any]:
        async for output in async_iterable:
            if output.final:
                yield cast(U, output.data)

    def pipe(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
        previous, final = asyncstdlib.tee(self(input), n=2, lock=asyncio.Lock())

        previous = self._wrap(previous, name=next_name, final=False)
        previous = cast(AsyncGenerator[StreamOutput[V], Any], previous)

        final = filter_final_output(
            cast(AsyncGenerator[StreamOutput[U], Any], final)
        )
        final = cast(
            AsyncGenerator[StreamOutput[V], Any],
            self._wrap(fn(final), name=next_name),
        )

        return merge(previous, final)

    return Stream[T, V](next_name, pipe)
class StreamOutput (stream: str, data: Union[~T, Any], final: bool)

StreamOutput is a data class that represents the output of a Stream at each step.

Attributes

stream : str
The name of the stream that produced this output. This helps in identifying which part of the processing pipeline the output is coming from.
data : Union[T, Any]
The actual output data produced by the stream. This will be of type T for the final stream output, but can also be of any type produced by any step of the whole stream.
final : bool
A boolean flag indicating whether this output is the final output of the stream. Only the outputs at the end of the stream are marked as "final".

Example

>>> from langstream import Stream
>>> import asyncio
...
>>> async def example():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...
...     async for output in polite_stream("Alice"):
...         # Output is of type StreamOutput
...         print(output)
...
>>> asyncio.run(example())
StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
Expand source code
@dataclass
class StreamOutput(Generic[T]):
    """
    StreamOutput is a data class that represents the output of a Stream at each step.

    Attributes
    ----------
    stream : str
        The name of the stream that produced this output. This helps in identifying
        which part of the processing pipeline the output is coming from.

    data : Union[T, Any]
        The actual output data produced by the stream. This will be of type T for the final stream output,
        but can also be of any type produced by any step of the whole stream.

    final : bool
        A boolean flag indicating whether this output is the final output of the stream.
        Only the outputs at the end of the stream are marked as "final".

    Example
    -------

    >>> from langstream import Stream
    >>> import asyncio
    ...
    >>> async def example():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...
    ...     async for output in polite_stream("Alice"):
    ...         # Output is of type StreamOutput
    ...         print(output)
    ...
    >>> asyncio.run(example())
    StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
    StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
    """

    stream: str
    data: Union[T, Any]
    final: bool

Ancestors

  • typing.Generic

Class variables

var data : Union[~T, Any]
var final : bool
var stream : str
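Because final is just a field on the dataclass, downstream consumers can filter on it directly. A stdlib-only sketch using a stand-in dataclass with the same fields (the comparison to helpers like filter_final_output is an assumption based on the module docs):

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, List

@dataclass
class StreamOutput:
    # Mirrors the fields of the real dataclass shown above
    stream: str
    data: Any
    final: bool

async def outputs() -> AsyncGenerator[StreamOutput, None]:
    yield StreamOutput("GreetingStream", "Hello, Alice!", final=False)
    yield StreamOutput("GreetingStream@map", "Hello, Alice! How are you?", final=True)

async def main() -> List[Any]:
    # Keep only the data of outputs flagged as final
    return [o.data async for o in outputs() if o.final]

print(asyncio.run(main()))  # ['Hello, Alice! How are you?']
```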