Package langstream
🪽🔗 LangStream
This is the top-level module for LangStream; all the core classes and functions are made available to be imported from here:
>>> from langstream import Stream, as_async_generator, filter_final_output # etc
Stream
The Stream
is the core concept of LangStream for working with LLMs (Large Language Models); it
provides a way to create composable calls to LLMs and any other processing elements.
Since LLMs produce one token at a time, streams are essentially Python AsyncGenerators under the hood, with type-safe composable functions on top. By making use of the type hints and the composable functions, LangStream makes it very easy to build and debug a complex stream of execution for working with LLMs.
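As an illustration of this idea using only the standard library (this is a sketch of the mechanics, not langstream code; `tokens` and `to_upper` are made-up names), an async generator can be transformed token by token without ever blocking:

```python
import asyncio
from typing import AsyncGenerator, List

# Hypothetical token source, standing in for an LLM producing one token at a time
async def tokens() -> AsyncGenerator[str, None]:
    for token in ["hello", " ", "world"]:
        yield token

# A map-like combinator: transforms each token as it arrives, without collecting first
async def to_upper(gen: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    async for token in gen:
        yield token.upper()

async def main() -> List[str]:
    return [token async for token in to_upper(tokens())]

print(asyncio.run(main()))  # ['HELLO', ' ', 'WORLD']
```

Stream composition in LangStream builds this kind of non-blocking wrapping for you, with names and type hints on top.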
Since async generator streams are the fundamental building block of LangStream, their processing should never block.
When executing a Stream with input, you get back an AsyncGenerator that produces the outputs of all
pieces in the whole Stream, not only the final one. This means you have access to each step of
what is happening, making it easier to display and debug. The final output at the edge of the stream
is marked as final
when you process the Stream, with helpers available to filter them.
In this module, we document the low-level concepts of Streams, so we use simple examples of hardcoded string generation. But once you learn the composition fundamentals here, you can apply the same functions for composing everywhere in LangStream.
Core Concepts
Stream
:
A Stream takes in a name and a function, which takes an input and produces an asynchronous stream of outputs from it (AsyncGenerator).
Streams can be composed together with the composition methods below, and their input and output types can be
specified in the type signature.
SingleOutputStream
:
A SingleOutputStream is a subtype of Stream that produces a single asynchronous final output after processing,
rather than an asynchronous stream of outputs.
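The distinction can be sketched with plain async generators (stdlib only; `word_stream` and `collect_step` are illustrative names, not langstream APIs): a regular stream yields many items as they arrive, while a single-output step consumes them all and yields exactly once:

```python
import asyncio
from typing import AsyncGenerator, List

async def word_stream() -> AsyncGenerator[str, None]:
    for word in ["a", "b", "c"]:
        yield word

# Single-output step: blocks until the source is exhausted, then yields one final value
async def collect_step(
    gen: AsyncGenerator[str, None],
) -> AsyncGenerator[List[str], None]:
    yield [word async for word in gen]

async def main() -> List[str]:
    async for final in collect_step(word_stream()):
        return final

print(asyncio.run(main()))  # ['a', 'b', 'c']
```

In LangStream, methods like collect() return a SingleOutputStream with exactly this shape.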
Composition Methods
Stream.map()
:
Transforms the output of the Stream by applying a function to each token as they arrive. This is
non-blocking and maps as stream generations flow in: stream.map(lambda token: token.lower())
Stream.and_then()
:
Applies a function to the list of results of the Stream. Unlike map
, this is blocking,
and collects the outputs before applying the function. It can also take another Stream as argument,
effectively composing two Streams together: first_stream.and_then(second_stream)
.
Stream.collect()
:
Collects the output of a Stream into a list. This is a blocking operation and can be used
when the next processing step requires the full output at once.
Stream.join()
:
Joins the output of a string-producing Stream into a single string by concatenating each item.
Stream.gather()
:
Gathers results from a stream that produces multiple async generators and processes them in parallel,
returning a list of lists of the results of all generators, allowing you to execute many Streams at
the same time; this is similar to asyncio.gather
.
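The list-of-lists semantics of gather can be mimicked with asyncio.gather directly (a stdlib-only sketch of the behavior described above, not langstream code; `count_to` and `drain` are made-up helpers):

```python
import asyncio
from typing import AsyncGenerator, List

async def count_to(n: int) -> AsyncGenerator[int, None]:
    for i in range(n):
        yield i

# Exhaust one async generator into a plain list
async def drain(gen: AsyncGenerator[int, None]) -> List[int]:
    return [item async for item in gen]

async def main() -> List[List[int]]:
    # Run both generators concurrently, getting back one list of results per generator
    return await asyncio.gather(drain(count_to(2)), drain(count_to(3)))

print(asyncio.run(main()))  # [[0, 1], [0, 1, 2]]
```

Stream.gather() applies the same pattern to a stream that produces multiple async generators.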
Contrib: OpenAI, GPT4All and more
The core of LangStream is kept small and stable, so all the integrations that build on top of it live separately,
under the langstream.contrib
module. Check it out for reference and code examples of the integrations.
Examples
Using Stream to process text data:
>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
... # Stream that splits a sentence into words
... words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
... # Stream that capitalizes each word
... capitalized_stream = words_stream.map(lambda word: word.capitalize())
... # Join the capitalized words into a single string
... stream = capitalized_stream.join(" ")
...
... async for output in stream("this is an example"):
... if output.final:
... return output.data
...
>>> asyncio.run(example())
'This Is An Example'
Here you can find the reference and code examples; for further tutorials and use cases, consult the documentation.
Expand source code
from langstream.core.stream import Stream, StreamOutput, SingleOutputStream
from langstream.utils.stream import (
debug,
filter_final_output,
collect_final_output,
join_final_output,
)
from langstream.utils.async_generator import (
as_async_generator,
collect,
join,
gather,
next_item,
)
__all__ = (
"Stream",
"StreamOutput",
"SingleOutputStream",
"debug",
"filter_final_output",
"collect_final_output",
"join_final_output",
"as_async_generator",
"collect",
"join",
"gather",
"next_item",
)
Sub-modules
langstream.contrib
-
The Contrib module is where all the other streams and integrations that build on top of core Stream module live. Here you can import the LLMs you want …
langstream.core
langstream.utils
Functions
async def as_async_generator(*values: ~T) ‑> AsyncGenerator[~T, Any]
-
Creates an asynchronous generator out of simple values; it's useful for converting a single value or a list of values to the streamed output of a Stream
Example
>>> import asyncio
>>> async def run_example():
...     async for value in as_async_generator(1, 2, 3):
...         print(value)
...
>>> asyncio.run(run_example())
1
2
3
Expand source code
async def as_async_generator(*values: T) -> AsyncGenerator[T, Any]:
    """
    Creates an asynchronous generator out of simple values, it's useful for converting
    a single value or a list of values to a streamed output of a Stream

    Example
    -------

    >>> import asyncio
    >>> async def run_example():
    ...     async for value in as_async_generator(1, 2, 3):
    ...         print(value)
    ...
    >>> asyncio.run(run_example())
    1
    2
    3
    """
    for item in values:
        yield item
async def collect(async_generator: AsyncGenerator[~T, Any]) ‑> List[~T]
-
Collect items from an async generator into a list.
>>> import asyncio
>>> async def async_gen():
...     yield "hello"
...     yield "how"
...     yield "can"
...     yield "I"
...     yield "assist"
...     yield "you"
...     yield "today"
>>> asyncio.run(collect(async_gen()))
['hello', 'how', 'can', 'I', 'assist', 'you', 'today']
Expand source code
async def collect(async_generator: AsyncGenerator[T, Any]) -> List[T]:
    """
    Collect items from an async generator into a list.

    >>> import asyncio
    >>> async def async_gen():
    ...     yield "hello"
    ...     yield "how"
    ...     yield "can"
    ...     yield "I"
    ...     yield "assist"
    ...     yield "you"
    ...     yield "today"
    >>> asyncio.run(collect(async_gen()))
    ['hello', 'how', 'can', 'I', 'assist', 'you', 'today']
    """
    return [item async for item in async_generator]
async def collect_final_output(async_iterable: AsyncGenerator[StreamOutput[~T], Any]) ‑> Iterable[~T]
-
Blocks the stream until it is done, then joins the final output values into a single list.
Example
>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def all_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     async for output in greet_stream("Alice"):
...         print(output)
...
>>> async def collected_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     output = await collect_final_output(greet_stream("Alice"))
...     print(output)
...
>>> asyncio.run(all_outputs())
StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
StreamOutput(stream='GreetingStream', data='Alice', final=True)
StreamOutput(stream='GreetingStream', data='!', final=True)
>>> asyncio.run(collected_outputs())
['Hello, ', 'Alice', '!']
Expand source code
async def collect_final_output(
    async_iterable: AsyncGenerator[StreamOutput[T], Any]
) -> Iterable[T]:
    """
    Blocks the stream until it is done, then joins the final output values into a single list.

    Example
    -------

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def all_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     async for output in greet_stream("Alice"):
    ...         print(output)
    ...
    >>> async def collected_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     output = await collect_final_output(greet_stream("Alice"))
    ...     print(output)
    ...
    >>> asyncio.run(all_outputs())
    StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
    StreamOutput(stream='GreetingStream', data='Alice', final=True)
    StreamOutput(stream='GreetingStream', data='!', final=True)
    >>> asyncio.run(collected_outputs())
    ['Hello, ', 'Alice', '!']
    """
    return await collect(filter_final_output(async_iterable))
def debug(stream: Callable[[~T], AsyncGenerator[StreamOutput[~U], Any]]) ‑> Stream[~T, ~U]
-
A helper for debugging streams. Simply wrap any piece of the stream, or the whole stream, with the
debug()
function to print out everything that goes through it and its nested streams.
For example, you can wrap the whole stream to debug everything:
>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> async def debug_whole_stream():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
...     stream = debug(
...         greet_stream.join().and_then(polite_stream)
...     )
...     await join_final_output(stream("Alice"))
...
>>> asyncio.run(debug_whole_stream())
<BLANKLINE>
<BLANKLINE>
[32m> GreetingStream[39m
<BLANKLINE>
Hello, Alice!
<BLANKLINE>
[32m> PoliteStream[39m
<BLANKLINE>
Hello, Alice! How are you?
Or, alternatively, you can debug just a piece of it:
>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> async def debug_whole_stream():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
...     stream = debug(greet_stream).join().and_then(polite_stream)
...     await join_final_output(stream("Alice"))
...
>>> asyncio.run(debug_whole_stream())
<BLANKLINE>
<BLANKLINE>
[32m> GreetingStream[39m
<BLANKLINE>
Hello, Alice!
Expand source code
def debug(
    stream: Callable[[T], AsyncGenerator[StreamOutput[U], Any]]
) -> Stream[T, U]:
    """
    A helper for helping you debugging streams. Simply wrap any piece of the stream or
    the whole stream together with the `debug` function to print out everything that
    goes through it and its nested streams.

    For example, you can wrap the whole stream to debug everything:

    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> async def debug_whole_stream():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
    ...     stream = debug(
    ...         greet_stream.join().and_then(polite_stream)
    ...     )
    ...     await join_final_output(stream("Alice"))
    ...
    >>> asyncio.run(debug_whole_stream())
    <BLANKLINE>
    <BLANKLINE>
    \x1b[32m> GreetingStream\x1b[39m
    <BLANKLINE>
    Hello, Alice!
    <BLANKLINE>
    \x1b[32m> PoliteStream\x1b[39m
    <BLANKLINE>
    Hello, Alice! How are you?

    Or, alternatively, you can debug just a piece of it:

    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> async def debug_whole_stream():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = Stream[str, str]("PoliteStream", lambda greeting: f"{greeting} How are you?")
    ...     stream = debug(greet_stream).join().and_then(polite_stream)
    ...     await join_final_output(stream("Alice"))
    ...
    >>> asyncio.run(debug_whole_stream())
    <BLANKLINE>
    <BLANKLINE>
    \x1b[32m> GreetingStream\x1b[39m
    <BLANKLINE>
    Hello, Alice!
    """

    async def debug(input: T) -> AsyncGenerator[StreamOutput[U], Any]:
        last_stream = ""
        last_output = ""
        async for output in stream(input):
            if output.stream != last_stream and last_output == output.data:
                yield output
                continue
            if output.stream != last_stream:
                print("\n", end="", flush=True)
                last_stream = output.stream
                print(f"\n{Fore.GREEN}> {output.stream}{Fore.RESET}\n")
            if hasattr(output.data, "__stream_debug__"):
                output.data.__stream_debug__()  # type: ignore
            elif isinstance(output.data, Exception):
                print(f"{Fore.RED}Exception:{Fore.RESET} {output.data}", end="")
            else:
                print(
                    output.data,
                    end=("" if isinstance(output.data, str) else ", "),
                    flush=True,
                )
            last_output = output.data
            yield output

    next_name = f"@debug"
    if hasattr(next, "name"):
        next_name = f"{next.name}@debug"

    return Stream[T, U](next_name, debug)
async def filter_final_output(async_iterable: AsyncGenerator[StreamOutput[~T], Any]) ‑> AsyncGenerator[~T, Any]
-
Filters only the final output values of a Stream's outputs.
Example
>>> from langstream import Stream, filter_final_output
>>> import asyncio
...
>>> async def all_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...     async for output in polite_stream("Alice"):
...         print(output)
...
>>> async def only_final_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...     async for final_output in filter_final_output(polite_stream("Alice")):
...         print(final_output)
...
>>> asyncio.run(all_outputs())
StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
>>> asyncio.run(only_final_outputs())
Hello, Alice! How are you?
Expand source code
async def filter_final_output(
    async_iterable: AsyncGenerator[StreamOutput[T], Any]
) -> AsyncGenerator[T, Any]:
    """
    Filters only the final output values of a Stream's outputs.

    Example
    -------

    >>> from langstream import Stream, filter_final_output
    >>> import asyncio
    ...
    >>> async def all_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...     async for output in polite_stream("Alice"):
    ...         print(output)
    ...
    >>> async def only_final_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...     async for final_output in filter_final_output(polite_stream("Alice")):
    ...         print(final_output)
    ...
    >>> asyncio.run(all_outputs())
    StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
    StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
    >>> asyncio.run(only_final_outputs())
    Hello, Alice! How are you?
    """
    async for output in async_iterable:
        if output.final:
            yield cast(T, output.data)
async def gather(async_generators: List[AsyncGenerator[~T, Any]]) ‑> List[List[~T]]
-
Gather items from a list of async generators into a list of lists.
>>> import asyncio
>>> async def async_gen1():
...     yield "hello"
...     yield "how"
...     yield "can"
>>> async def async_gen2():
...     yield "I"
...     yield "assist"
...     yield "you"
...     yield "today"
>>> asyncio.run(gather([async_gen1(), async_gen2()]))
[['hello', 'how', 'can'], ['I', 'assist', 'you', 'today']]
Expand source code
async def gather(async_generators: List[AsyncGenerator[T, Any]]) -> List[List[T]]:
    """
    Gather items from a list of async generators into a list of lists.

    >>> import asyncio
    >>> async def async_gen1():
    ...     yield "hello"
    ...     yield "how"
    ...     yield "can"
    >>> async def async_gen2():
    ...     yield "I"
    ...     yield "assist"
    ...     yield "you"
    ...     yield "today"
    >>> asyncio.run(gather([async_gen1(), async_gen2()]))
    [['hello', 'how', 'can'], ['I', 'assist', 'you', 'today']]
    """
    return await asyncio.gather(*(collect(generator) for generator in async_generators))
async def join(async_generator: AsyncGenerator[str, Any], separator='') ‑> str
-
Collect items from an async generator and join them in a string.
>>> import asyncio
>>> async def async_gen():
...     yield "hello "
...     yield "how "
...     yield "can "
...     yield "I "
...     yield "assist "
...     yield "you "
...     yield "today"
>>> asyncio.run(join(async_gen()))
'hello how can I assist you today'
Expand source code
async def join(async_generator: AsyncGenerator[str, Any], separator="") -> str:
    """
    Collect items from an async generator and join them in a string.

    >>> import asyncio
    >>> async def async_gen():
    ...     yield "hello "
    ...     yield "how "
    ...     yield "can "
    ...     yield "I "
    ...     yield "assist "
    ...     yield "you "
    ...     yield "today"
    >>> asyncio.run(join(async_gen()))
    'hello how can I assist you today'
    """
    lst = await collect(async_generator)
    return separator.join(lst)
async def join_final_output(async_iterable: AsyncGenerator[StreamOutput[str], Any]) ‑> str
-
Blocks a string-producing stream until it is done, then joins the final output values into a single string.
Example
>>> from langstream import Stream, as_async_generator, join_final_output
>>> import asyncio
...
>>> async def all_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     async for output in greet_stream("Alice"):
...         print(output)
...
>>> async def joined_outputs():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
...     output = await join_final_output(greet_stream("Alice"))
...     print(output)
...
>>> asyncio.run(all_outputs())
StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
StreamOutput(stream='GreetingStream', data='Alice', final=True)
StreamOutput(stream='GreetingStream', data='!', final=True)
>>> asyncio.run(joined_outputs())
Hello, Alice!
Expand source code
async def join_final_output(
    async_iterable: AsyncGenerator[StreamOutput[str], Any]
) -> str:
    """
    Blocks a string producing stream until it is done, then joins the final output
    values into a single string.

    Example
    -------

    >>> from langstream import Stream, as_async_generator, join_final_output
    >>> import asyncio
    ...
    >>> async def all_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     async for output in greet_stream("Alice"):
    ...         print(output)
    ...
    >>> async def joined_outputs():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: as_async_generator("Hello, ", name, "!"))
    ...     output = await join_final_output(greet_stream("Alice"))
    ...     print(output)
    ...
    >>> asyncio.run(all_outputs())
    StreamOutput(stream='GreetingStream', data='Hello, ', final=True)
    StreamOutput(stream='GreetingStream', data='Alice', final=True)
    StreamOutput(stream='GreetingStream', data='!', final=True)
    >>> asyncio.run(joined_outputs())
    Hello, Alice!
    """
    return await join(filter_final_output(async_iterable))
async def next_item(async_generator: AsyncGenerator[~T, Any]) ‑> ~T
-
Takes the next item of an AsyncGenerator; it may raise an exception if there are no items left to be taken
>>> import asyncio
>>> async def async_gen():
...     yield "hello"
...     yield "how"
...     yield "are"
>>> asyncio.run(next_item(async_gen()))
'hello'
Expand source code
async def next_item(async_generator: AsyncGenerator[T, Any]) -> T:
    """
    Takes the next item of an AsyncGenerator, can result in exception if there is no
    items left to be taken

    >>> import asyncio
    >>> async def async_gen():
    ...     yield "hello"
    ...     yield "how"
    ...     yield "are"
    >>> asyncio.run(next_item(async_gen()))
    'hello'
    """
    return await async_generator.__aiter__().__anext__()
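Since an exhausted async generator raises StopAsyncIteration from __anext__, callers that are not sure more items exist may want to guard for it. A stdlib-only sketch (next_item_or_none and empty are hypothetical helpers, not part of langstream):

```python
import asyncio
from typing import AsyncGenerator, Optional, TypeVar

T = TypeVar("T")

async def next_item_or_none(gen: AsyncGenerator[T, None]) -> Optional[T]:
    # Like next_item, but returns None instead of raising when the generator is exhausted
    try:
        return await gen.__aiter__().__anext__()
    except StopAsyncIteration:
        return None

async def empty() -> AsyncGenerator[int, None]:
    return
    yield  # unreachable, but makes this function an async generator

async def main() -> Optional[int]:
    return await next_item_or_none(empty())

print(asyncio.run(main()))  # None
```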
Classes
class SingleOutputStream (name: str, call: Callable[[~T], Union[AsyncGenerator[StreamOutput[~U], Any], AsyncGenerator[~U, Any], ~U]])
-
Expand source code
class SingleOutputStream(Stream[T, U]):
    """"""

    _call: Callable[
        [T], Union[AsyncGenerator[StreamOutput[U], Any], AsyncGenerator[U, Any], U]
    ]

    async def _reyield(
        self, async_iterable: AsyncGenerator[StreamOutput[U], Any]
    ) -> AsyncGenerator[Tuple[Optional[U], StreamOutput[U]], Any]:
        final_value: Optional[U] = None
        async for u in async_iterable:
            u_rewrapped = self._output_wrap(u, final=False)
            if u.final:
                final_value = u.data
            yield (final_value, u_rewrapped)

    def map(self, fn: Callable[[U], V]) -> "SingleOutputStream[T, V]":
        """
        Similar to `Stream.map`, this method applies a function to the final output of
        the stream, but returns a SingleOutputStream.

        The `fn` parameter is a function that takes a value of type U and returns a value of type V.

        For detailed examples, refer to the documentation of `Stream.map`.
        """
        next_name = f"{self.name}@map"

        async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
            # Reyield previous stream so we never block the stream, and at the same time yield mapped values
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                final_u = value

            yield self._output_wrap(fn(unwrap(final_u)), name=next_name)

        return SingleOutputStream[T, V](next_name, lambda input: map(input))

    def filter(self, fn: Callable[[U], bool]) -> "SingleOutputStream[T, Union[U, None]]":
        """
        Similar to `Stream.filter`, however, since SingleOutputStream must always produce
        a value, this method simply replaces the value with a None if the filter function
        returns False

        The `fn` parameter is a function that takes a value of type U and returns a bool.

        Example:

        >>> from langstream import Stream, as_async_generator, collect_final_output
        >>> import asyncio
        ...
        >>> async def example():
        ...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
        ...     even_stream = numbers_stream.collect().filter(lambda numbers: all([n % 2 == 0 for n in numbers]))
        ...     return await collect_final_output(even_stream(9))
        ...
        >>> asyncio.run(example())
        [None]
        """
        next_name = f"{self.name}@filter"

        async def filter(input: T) -> AsyncGenerator[StreamOutput[Union[U, None]], Any]:
            # Reyield previous stream so we never block the stream, and at the same time yield filtered values
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[Union[U, None]], to_reyield)
                final_u = value

            yield self._output_wrap(
                final_u if fn(unwrap(final_u)) else None, name=next_name
            )

        return SingleOutputStream[T, Union[U, None]](
            next_name, lambda input: filter(input)
        )

    def and_then(
        self,
        next: Callable[
            [U],
            Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V],
        ],
    ) -> "Stream[T, V]":
        """
        Similar to `Stream.and_then`, this method takes a function that receives the
        final output of this stream as its input and returns a new Stream.

        For detailed examples, refer to the documentation of `Stream.and_then`.
        """
        next_name = f"{self.name}@and_then"
        if hasattr(next, "name"):
            next_name = next.name

        async def and_then(
            input: T,
        ) -> AsyncGenerator[StreamOutput[V], Any]:
            # First, reyield previous stream so we never block the stream, and collect the last result when it is done
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                final_u = value

            # Then, call in the next stream
            iter_v = self._wrap(next(unwrap(final_u)), name=next_name)
            async for v in iter_v:
                yield v

        return Stream[T, V](next_name, and_then)

    def pipe(
        self,
        fn: Callable[
            [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any]
        ],
    ) -> "Stream[T, V]":
        """
        Similar to `Stream.pipe`, except that it takes a stream that will only even
        produce a single value, so it effectively works basically the same as `and_then`,
        only with a different interface.

        For detailed examples, refer to the documentation of `Stream.pipe`.
        """
        next_name = f"{self.name}@pipe"
        if hasattr(next, "name"):
            next_name = next.name

        async def pipe(
            input: T,
        ) -> AsyncGenerator[StreamOutput[V], Any]:
            # First, reyield previous stream so we never block the stream, and collect the last result when it is done
            final_u: Optional[U] = None
            async for value, to_reyield in self._reyield(self(input)):
                yield cast(StreamOutput[V], to_reyield)
                final_u = value

            # Then, call in the piping function
            single_item_stream = as_async_generator(unwrap(final_u))
            iter_v = self._wrap(fn(single_item_stream), name=next_name)
            async for v in iter_v:
                yield cast(StreamOutput[V], v)

        return Stream[T, V](next_name, pipe)

    def gather(
        self: "Union[SingleOutputStream[T, List[AsyncGenerator[StreamOutput[V], Any]]], SingleOutputStream[T, List[AsyncGenerator[V, Any]]]]",
    ) -> "SingleOutputStream[T, List[List[V]]]":
        """
        Similar to `Stream.gather`, this method waits for all the async generators in
        the list returned by the stream to finish and gathers their results in a list.

        For detailed examples, refer to the documentation of `Stream.gather`.
        """
        next_name = f"{self.name}@gather"

        async def gather(
            input: T,
        ) -> AsyncGenerator[StreamOutput[List[List[V]]], Any]:
            # First, reyield previous stream so we never block the stream, and collect the last result when it is done
            final_u: Optional[
                Union[
                    List[AsyncGenerator[StreamOutput[V], Any]],
                    List[AsyncGenerator[V, Any]],
                ]
            ] = None

            # TODO: try to work out why the type signature of self(input) is not fitting in there, it should
            async for value, to_reyield in self._reyield(cast(Any, self(input))):
                yield cast(StreamOutput[List[List[V]]], to_reyield)
                final_u = value

            if final_u is None:
                final_u = []

            async def consume_async_generator(
                generator: AsyncGenerator[X, Any],
            ) -> Iterable[X]:
                return [item async for item in generator]

            # TODO: should we really wait for everything to arrive before calling asyncio gather? Can we call it during the previous reyield?
            vss: Union[
                List[List[StreamOutput[V]]], List[List[V]]
            ] = await asyncio.gather(*(consume_async_generator(gen) for gen in final_u))

            clean_vss: List[List[V]] = []
            for vs in vss:
                clean_vs: List[V] = []
                for v in vs:
                    v_rewrapped = cast(
                        StreamOutput[List[List[V]]],
                        self._output_wrap(v, final=False),
                    )
                    if isinstance(v, StreamOutput):
                        yield v_rewrapped
                        if v.final:
                            clean_vs.append(v.data)
                    else:
                        clean_vs.append(v)
                clean_vss.append(clean_vs)

            yield self._output_wrap(clean_vss, name=next_name)

        return SingleOutputStream[T, List[List[V]]](next_name, gather)

    def on_error(
        self,
        handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]],
    ) -> "SingleOutputStream[T, Union[U, V]]":
        """
        Similar to `Stream.on_error`, this method handles any uncaught exceptions that
        might occur during the execution of the current stream.

        For detailed examples, refer to the documentation of `Stream.gather`.
        """
        next_name = f"{self.name}@on_error"
        if hasattr(next, "name"):
            next_name = next.name

        async def on_error(
            input: T,
        ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]:
            try:
                async for output in self(input):
                    yield cast(StreamOutput[Union[U, V]], output)
            except Exception as e:
                async for output in self._wrap(handler(e), name=next_name):
                    yield cast(StreamOutput[Union[U, V]], output)

        return SingleOutputStream[T, Union[U, V]](
            next_name, lambda input: on_error(input)
        )
Ancestors
- Stream
- typing.Generic
Methods
def and_then(self, next: Callable[[~U], Union[AsyncGenerator[StreamOutput[~V], Any], AsyncGenerator[~V, Any], ~V]]) ‑> Stream[~T, ~V]
-
Similar to Stream.and_then(), this method takes a function that receives the final output of this stream as its input and returns a new Stream.
For detailed examples, refer to the documentation of Stream.and_then().
Expand source code
def and_then(
    self,
    next: Callable[
        [U],
        Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V],
    ],
) -> "Stream[T, V]":
    """
    Similar to `Stream.and_then`, this method takes a function that receives the
    final output of this stream as its input and returns a new Stream.

    For detailed examples, refer to the documentation of `Stream.and_then`.
    """
    next_name = f"{self.name}@and_then"
    if hasattr(next, "name"):
        next_name = next.name

    async def and_then(
        input: T,
    ) -> AsyncGenerator[StreamOutput[V], Any]:
        # First, reyield previous stream so we never block the stream, and collect the last result when it is done
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            final_u = value

        # Then, call in the next stream
        iter_v = self._wrap(next(unwrap(final_u)), name=next_name)
        async for v in iter_v:
            yield v

    return Stream[T, V](next_name, and_then)
def filter(self, fn: Callable[[~U], bool]) ‑> SingleOutputStream[~T, Optional[~U]]
-
Similar to
Stream.filter()
, however, since a SingleOutputStream must always produce a value, this method simply replaces the value with None if the filter function returns False.
The
fn
parameter is a function that takes a value of type U and returns a bool.
Example:
>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
...     even_stream = numbers_stream.collect().filter(lambda numbers: all([n % 2 == 0 for n in numbers]))
...     return await collect_final_output(even_stream(9))
...
>>> asyncio.run(example())
[None]
Expand source code
def filter(self, fn: Callable[[U], bool]) -> "SingleOutputStream[T, Union[U, None]]":
    """
    Similar to `Stream.filter`, however, since a SingleOutputStream must always
    produce a value, this method simply replaces the value with None if the
    filter function returns False.

    The `fn` parameter is a function that takes a value of type U and returns a bool.

    Example:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
    ...     even_stream = numbers_stream.collect().filter(lambda numbers: all([n % 2 == 0 for n in numbers]))
    ...     return await collect_final_output(even_stream(9))
    ...
    >>> asyncio.run(example())
    [None]
    """
    next_name = f"{self.name}@filter"

    async def filter(input: T) -> AsyncGenerator[StreamOutput[Union[U, None]], Any]:
        # Reyield previous stream so we never block the stream, and at the same time yield filtered values
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[Union[U, None]], to_reyield)
            final_u = value

        yield self._output_wrap(
            final_u if fn(unwrap(final_u)) else None, name=next_name
        )

    return SingleOutputStream[T, Union[U, None]](
        next_name, lambda input: filter(input)
    )
def gather(self: Union[SingleOutputStream[T, List[AsyncGenerator[StreamOutput[V], Any]]], SingleOutputStream[T, List[AsyncGenerator[V, Any]]]]) ‑> SingleOutputStream[~T, typing.List[typing.List[~V]]]
-
Similar to
Stream.gather()
, this method waits for all the async generators in the list returned by the stream to finish and gathers their results in a list.
For detailed examples, refer to the documentation of
Stream.gather()
.
Expand source code
def gather(
    self: "Union[SingleOutputStream[T, List[AsyncGenerator[StreamOutput[V], Any]]], SingleOutputStream[T, List[AsyncGenerator[V, Any]]]]",
) -> "SingleOutputStream[T, List[List[V]]]":
    """
    Similar to `Stream.gather`, this method waits for all the async generators
    in the list returned by the stream to finish and gathers their results in a list.

    For detailed examples, refer to the documentation of `Stream.gather`.
    """
    next_name = f"{self.name}@gather"

    async def gather(
        input: T,
    ) -> AsyncGenerator[StreamOutput[List[List[V]]], Any]:
        # First, reyield previous stream so we never block the stream, and collect the last result when it is done
        final_u: Optional[
            Union[
                List[AsyncGenerator[StreamOutput[V], Any]],
                List[AsyncGenerator[V, Any]],
            ]
        ] = None

        # TODO: try to work out why the type signature of self(input) is not fitting in there, it should
        async for value, to_reyield in self._reyield(cast(Any, self(input))):
            yield cast(StreamOutput[List[List[V]]], to_reyield)
            final_u = value

        if final_u is None:
            final_u = []

        async def consume_async_generator(
            generator: AsyncGenerator[X, Any],
        ) -> Iterable[X]:
            return [item async for item in generator]

        # TODO: should we really wait for everything to arrive before calling asyncio gather? Can we call it during the previous reyield?
        vss: Union[
            List[List[StreamOutput[V]]], List[List[V]]
        ] = await asyncio.gather(*(consume_async_generator(gen) for gen in final_u))

        clean_vss: List[List[V]] = []
        for vs in vss:
            clean_vs: List[V] = []
            for v in vs:
                v_rewrapped = cast(
                    StreamOutput[List[List[V]]],
                    self._output_wrap(v, final=False),
                )
                if isinstance(v, StreamOutput):
                    yield v_rewrapped
                    if v.final:
                        clean_vs.append(v.data)
                else:
                    clean_vs.append(v)
            clean_vss.append(clean_vs)

        yield self._output_wrap(clean_vss, name=next_name)

    return SingleOutputStream[T, List[List[V]]](next_name, gather)
def map(self, fn: Callable[[~U], ~V]) ‑> SingleOutputStream[~T, ~V]
-
Similar to
Stream.map()
, this method applies a function to the final output of the stream, but returns a SingleOutputStream.
The
fn
parameter is a function that takes a value of type U and returns a value of type V.
For detailed examples, refer to the documentation of
Stream.map()
.
Expand source code
def map(self, fn: Callable[[U], V]) -> "SingleOutputStream[T, V]":
    """
    Similar to `Stream.map`, this method applies a function to the final output
    of the stream, but returns a SingleOutputStream.

    The `fn` parameter is a function that takes a value of type U and returns a value of type V.

    For detailed examples, refer to the documentation of `Stream.map`.
    """
    next_name = f"{self.name}@map"

    async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
        # Reyield previous stream so we never block the stream, and at the same time yield mapped values
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            final_u = value

        yield self._output_wrap(fn(unwrap(final_u)), name=next_name)

    return SingleOutputStream[T, V](next_name, lambda input: map(input))
def on_error(self, handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[~V], Any], ~V]]) ‑> SingleOutputStream[~T, typing.Union[~U, ~V]]
-
Similar to
Stream.on_error()
, this method handles any uncaught exceptions that might occur during the execution of the current stream.
For detailed examples, refer to the documentation of
Stream.on_error()
.
Expand source code
def on_error(
    self,
    handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]],
) -> "SingleOutputStream[T, Union[U, V]]":
    """
    Similar to `Stream.on_error`, this method handles any uncaught exceptions
    that might occur during the execution of the current stream.

    For detailed examples, refer to the documentation of `Stream.on_error`.
    """
    next_name = f"{self.name}@on_error"
    if hasattr(handler, "name"):
        next_name = handler.name

    async def on_error(
        input: T,
    ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]:
        try:
            async for output in self(input):
                yield cast(StreamOutput[Union[U, V]], output)
        except Exception as e:
            async for output in self._wrap(handler(e), name=next_name):
                yield cast(StreamOutput[Union[U, V]], output)

    return SingleOutputStream[T, Union[U, V]](
        next_name, lambda input: on_error(input)
    )
def pipe(self, fn: Callable[[AsyncGenerator[~U, Any]], AsyncGenerator[Union[StreamOutput[~V], ~V], Any]]) ‑> Stream[~T, ~V]
-
Similar to
Stream.pipe()
, except that it takes a stream that will only ever produce a single value, so it effectively works the same as
and_then
, only with a different interface.
For detailed examples, refer to the documentation of
Stream.pipe()
.
Expand source code
def pipe(
    self,
    fn: Callable[
        [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any]
    ],
) -> "Stream[T, V]":
    """
    Similar to `Stream.pipe`, except that it takes a stream that will only ever
    produce a single value, so it effectively works the same as `and_then`,
    only with a different interface.

    For detailed examples, refer to the documentation of `Stream.pipe`.
    """
    next_name = f"{self.name}@pipe"
    if hasattr(fn, "name"):
        next_name = fn.name

    async def pipe(
        input: T,
    ) -> AsyncGenerator[StreamOutput[V], Any]:
        # First, reyield previous stream so we never block the stream, and collect the last result when it is done
        final_u: Optional[U] = None
        async for value, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            final_u = value

        # Then, call the piping function with a single-item stream
        single_item_stream = as_async_generator(unwrap(final_u))
        iter_v = self._wrap(fn(single_item_stream), name=next_name)
        async for v in iter_v:
            yield cast(StreamOutput[V], v)

    return Stream[T, V](next_name, pipe)
Inherited members
class Stream (name: str, call: Callable[[~T], Union[AsyncGenerator[StreamOutput[~U], Any], AsyncGenerator[~U, Any], ~U]])
-
Expand source code
class Stream(Generic[T, U]): """""" _call: Callable[ [T], Union[AsyncGenerator[StreamOutput[U], Any], AsyncGenerator[U, Any], U] ] def __init__( self, name: str, call: Callable[ [T], Union[AsyncGenerator[StreamOutput[U], Any], AsyncGenerator[U, Any], U], ], ) -> None: self.name = name self._call = call def __call__(self, input: T) -> AsyncGenerator[StreamOutput[U], Any]: result = self._call(input) return self._wrap(result) def _wrap( self, value: Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V], final: Optional[bool] = None, name: Optional[str] = None, ) -> AsyncGenerator[StreamOutput[V], Any]: async def _wrap( values: Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any]], ) -> AsyncGenerator[StreamOutput[V], Any]: async for value in values: yield self._output_wrap(value, final=final, name=name) if isinstance(value, AsyncGenerator): return _wrap(value) return _wrap(as_async_generator(value)) def _output_wrap( self, value: Union[StreamOutput[V], V], final=None, name=None ) -> StreamOutput[V]: if isinstance(value, StreamOutput): final = final if final is not None else value.final return StreamOutput[V](stream=value.stream, data=value.data, final=final) final = final if final is not None else True return StreamOutput[V]( stream=self.name if name is None else name, data=value, final=final ) async def _reyield( self, async_iterable: AsyncGenerator[StreamOutput[U], Any] ) -> AsyncGenerator[Tuple[List[U], StreamOutput[U]], Any]: values: List[U] = [] async for u in async_iterable: u_rewrapped = self._output_wrap(u, final=False) if u.final: values.append(u.data) yield (values, u_rewrapped) def map(self, fn: Callable[[U], V]) -> "Stream[T, V]": """ Maps the output of the current stream through a function as they arrive. The transform function will receive the current output of the stream and should return a modified version of it. This method is non-blocking and will continue processing the stream in parallel. 
Example: >>> from langstream import Stream, join_final_output >>> import asyncio ... >>> async def example(): ... greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!") ... polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?") ... return await join_final_output(polite_stream("Alice")) ... >>> asyncio.run(example()) 'Hello, Alice! How are you?' Example of processing one token at a time: >>> from langstream import Stream, as_async_generator, join_final_output >>> import asyncio ... >>> async def example(): ... words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time ... accronym_stream = words_stream.map(lambda word: word.upper()[0]) # uppercases each word and take the first letter ... return await join_final_output(accronym_stream("as soon as possible")) ... >>> asyncio.run(example()) 'ASAP' """ next_name = f"{self.name}@map" async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]: # Reyield previous stream so we never block the stream, and at the same time yield mapped values prev_len_values = 0 async for values, to_reyield in self._reyield(self(input)): yield cast(StreamOutput[V], to_reyield) if len(values) > prev_len_values: # as soon as there is a new value prev_len_values = len(values) yield self._output_wrap(fn(values[-1]), name=next_name) return Stream[T, V](next_name, lambda input: map(input)) def filter(self, fn: Callable[[U], bool]) -> "Stream[T, U]": """ Filters the output of the current stream, keeping only the values that return True. This method is non-blocking and expects a function that returns True for keeping the value, or False for dropping it, as they arrive. Example: >>> from langstream import Stream, as_async_generator, collect_final_output >>> import asyncio ... >>> async def example(): ... numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input))) ... 
even_stream = numbers_stream.filter(lambda input: input % 2 == 0) ... return await collect_final_output(even_stream(9)) ... >>> asyncio.run(example()) [0, 2, 4, 6, 8] """ next_name = f"{self.name}@filter" async def filter(input: T) -> AsyncGenerator[StreamOutput[U], Any]: # Reyield previous stream so we never block the stream, and at the same time yield mapped values prev_len_values = 0 async for values, to_reyield in self._reyield(self(input)): yield to_reyield if len(values) > prev_len_values: # as soon as there is a new value prev_len_values = len(values) if fn(values[-1]): yield self._output_wrap(values[-1], name=next_name) return Stream[T, U](next_name, lambda input: filter(input)) def and_then( self, next: Callable[ [Iterable[U]], Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V], ], ) -> "Stream[T, V]": """ Processes the output of the current stream through a transformation function or another stream. Unlike the map method, which applies transformations to outputs as they arrive, the and_then method first collects all the outputs and then passes them to the transformation function or the next stream. This method is blocking and will wait for the entire stream to be processed before applying the transformation. If `transform` is a function, it should accept the list of collected outputs and return a modified version of it. If `transform` is another stream, it is used to process the list of collected outputs. Example using a function: >>> from langstream import Stream, as_async_generator, collect_final_output >>> import asyncio ... >>> async def example(): ... word_stream = Stream[str, str]("WordStream", lambda word: as_async_generator(word, "!")) ... count_stream : Stream[str, int] = word_stream.and_then(lambda outputs: len(list(outputs))) ... return await collect_final_output(count_stream("Hi")) ... 
>>> asyncio.run(example()) [2] Example using another stream: >>> from langstream import Stream, as_async_generator, join_final_output >>> from typing import Iterable >>> import asyncio ... >>> async def example(): ... words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time ... acronym_stream = Stream[Iterable[str], str]("AcronymStream", lambda words: "".join(word.upper()[0] for word in words)) # produces acronym ... composed_stream = words_stream.and_then(acronym_stream) ... return await join_final_output(composed_stream("as soon as possible")) ... >>> asyncio.run(example()) 'ASAP' """ next_name = f"{self.name}@and_then" if hasattr(next, "name"): next_name = next.name async def and_then( input: T, ) -> AsyncGenerator[StreamOutput[V], Any]: # First, reyield previous stream so we never block the stream, and collect the results until they are done iter_u: Iterable[U] = [] async for values, to_reyield in self._reyield(self(input)): yield cast(StreamOutput[V], to_reyield) iter_u = values # Then, call in the next stream iter_v = self._wrap(next(iter_u), name=next_name) async for v in iter_v: yield v return Stream[T, V](next_name, and_then) def pipe( self, fn: Callable[ [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any] ], ) -> "Stream[T, V]": """ Lower level constructor to pipe a stream into another one, giving you the underlying AsyncGenerator. Pipe takes a callback function which should always produce an AsyncGenerator in return, which means you need to declare an async function and your function needs to use `yield` for generating values, the advantage of that is that you have fine control on whether it will be blocking the stream or not. In fact, with pipe you can reconstruct `map` and `and_then`, for example: >>> from langstream import Stream, as_async_generator, collect_final_output >>> from typing import List, AsyncGenerator >>> import asyncio ... 
>>> async def example(items): ... async def mario_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]: ... waiting_for_mushroom = False ... async for item in stream: ... if item == "Mario": ... waiting_for_mushroom = True ... elif item == "Mushroom" and waiting_for_mushroom: ... yield "Super Mario!" ... else: ... yield item + "?" ... ... piped_stream = Stream[List[str], str]( ... "PipedStream", lambda items: as_async_generator(*items) ... ).pipe(mario_pipe) ... ... return await collect_final_output(piped_stream(items)) ... >>> asyncio.run(example(["Mario", "Mushroom"])) ['Super Mario!'] >>> asyncio.run(example(["Luigi"])) ['Luigi?'] >>> asyncio.run(example(["Mario", "Luigi", "Mushroom"])) ['Luigi?', 'Super Mario!'] As you can see this pipe blocks kinda like `and_then` when it sees "Mario", until a mushroom arrives, but for other random items such as "Luigi" it just re-yields it immediately, adding a question mark, non-blocking, like `map`. You can also call another stream from `pipe` directly, just be sure to re-yield its outputs """ next_name = f"{self.name}@pipe" async def filter_final_output( async_iterable: AsyncGenerator[StreamOutput[U], Any] ) -> AsyncGenerator[U, Any]: async for output in async_iterable: if output.final: yield cast(U, output.data) def pipe(input: T) -> AsyncGenerator[StreamOutput[V], Any]: previous, final = asyncstdlib.tee(self(input), n=2, lock=asyncio.Lock()) previous = self._wrap(previous, name=next_name, final=False) previous = cast(AsyncGenerator[StreamOutput[V], Any], previous) final = filter_final_output( cast(AsyncGenerator[StreamOutput[U], Any], final) ) final = cast( AsyncGenerator[StreamOutput[V], Any], self._wrap(fn(final), name=next_name), ) return merge(previous, final) return Stream[T, V](next_name, pipe) def collect(self: "Stream[T, U]") -> "SingleOutputStream[T, List[U]]": """ Collects all the outputs produced by the stream and returns them as a list. 
This method is blocking useful when the next stream or processing step needs to have access to the entire output all at once, rather than processing elements as they arrive. Example: >>> from langstream import Stream, as_async_generator, collect_final_output >>> import asyncio ... >>> async def example(): ... word_stream: Stream[str, List[str]] = Stream[str, str]( ... "WordStream", lambda word: as_async_generator(word, "!") ... ).collect() ... return await collect_final_output(word_stream("Hi")) ... >>> asyncio.run(example()) [['Hi', '!']] """ next_name = f"{self.name}@collect" async def _collect( input: T, ) -> AsyncGenerator[StreamOutput[List[U]], Any]: # First, reyield previous stream so we never block the stream, and collect the results until they are done iter_u: Iterable[U] = [] async for values, to_reyield in self._reyield(self(input)): yield cast(StreamOutput[List[U]], to_reyield) iter_u = values # Then, yield the collected results yield self._output_wrap(iter_u, name=next_name) return SingleOutputStream[T, List[U]](next_name, _collect) def join(self: "Stream[T, str]", separator="") -> "SingleOutputStream[T, str]": """ Joins the output of a string-producing stream into a single string. The `join` method concatenates each item in the output of the stream, using the provided separator between each element. This is particularly useful when working with text, and you want to merge all the generated tokens. Note that this method blocks until all outputs of the stream are available, as it needs to wait for the complete output to perform the join operation. Params ---------- separator : str A string that will be used as a separator between the elements. Default is an empty string. Example: >>> from langstream import Stream, as_async_generator, join_final_output >>> import asyncio ... >>> async def example(): ... words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) ... 
capitalized_stream = words_stream.map(lambda word: word.capitalize()) ... joined_stream = capitalized_stream.join(" ") ... return await join_final_output(joined_stream("this is an example")) ... >>> asyncio.run(example()) 'This Is An Example' """ next_name = f"{self.name}@join" async def _join( input: T, ) -> AsyncGenerator[StreamOutput[str], Any]: # First, reyield previous stream so we never block the stream, and collect the results until they are done iter_u: Iterable[str] = [] async for values, to_reyield in self._reyield(self(input)): yield to_reyield iter_u = values # Then, return the joined result output: str = separator.join(iter_u) yield self._output_wrap(output, name=next_name) return SingleOutputStream[T, str](next_name, _join) def gather( self: "Union[Stream[T, AsyncGenerator[StreamOutput[V], Any]], Stream[T, AsyncGenerator[V, Any]]]", ) -> "SingleOutputStream[T, List[List[V]]]": """ Gathers results from multiple streams and processes them in parallel. The `gather` method is used to process several streams concurrently, and it waits until all of them are complete before continuing. This is similar to `asyncio.gather`, and is useful when you want to run multiple asynchronous tasks in parallel and wait for all of them to complete. Note that the order of results corresponds to the order of streams passed to the `gather` method. >>> from langstream import Stream, as_async_generator, collect_final_output >>> import asyncio ... >>> async def delayed_output(x): ... await asyncio.sleep(0.1) ... yield f"Number: {x}" ... >>> async def example(): ... number_stream = Stream[int, int]( ... "NumberStream", lambda x: as_async_generator(*range(x)) ... ) ... gathered_stream : Stream[int, str] = ( ... number_stream.map(delayed_output) ... .gather() ... .and_then(lambda results: as_async_generator(*(r[0] for r in results))) ... ) ... return await collect_final_output(gathered_stream(3)) ... 
>>> asyncio.run(example()) # will take 0.1s to finish, not 0.3s, because it runs in parallel ['Number: 0', 'Number: 1', 'Number: 2'] """ return self.collect().gather() def on_error( self, handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]], ) -> "Stream[T, Union[U, V]]": """ Handles any uncaught exceptions that might occur during the execution of the current stream. The `handler` function takes an exception as its argument and returns a new value that will be used as the output of the stream instead of the exception. The function can also re-raise the exception or raise a new one, which will then be propagated further up the stream. If an exception occurs in the `handler` function itself, it will be propagated without any further handling. Example: >>> from langstream import Stream, join_final_output >>> import asyncio ... >>> def failed_greeting(name: str): ... raise Exception(f"Giving {name} a cold shoulder") ... >>> async def example(): ... greet_stream = Stream[str, str]( ... "GreetingStream", ... failed_greeting ... ).on_error(lambda e: f"Sorry, an error occurred: {str(e)}") ... ... async for output in greet_stream("Alice"): ... print(output) ... >>> asyncio.run(example()) StreamOutput(stream='GreetingStream', data=Exception('Giving Alice a cold shoulder'), final=False) StreamOutput(stream='GreetingStream@on_error', data='Sorry, an error occurred: ...', final=True) """ next_name = f"{self.name}@on_error" if hasattr(next, "name"): next_name = next.name async def on_error( input: T, ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]: try: async for output in self(input): yield cast(StreamOutput[Union[U, V]], output) except Exception as e: yield cast(StreamOutput[Union[U, V]], self._output_wrap(e, final=False)) async for output in self._wrap(handler(e), name=next_name): yield cast(StreamOutput[Union[U, V]], output) return Stream[T, Union[U, V]](next_name, lambda input: on_error(input))
Ancestors
- typing.Generic
Subclasses
- SingleOutputStream
Methods
def and_then(self, next: Callable[[Iterable[~U]], Union[AsyncGenerator[StreamOutput[~V], Any], AsyncGenerator[~V, Any], ~V]]) ‑> Stream[~T, ~V]
-
Processes the output of the current stream through a transformation function or another stream.
Unlike the map method, which applies transformations to outputs as they arrive, the and_then method first collects all the outputs and then passes them to the transformation function or the next stream. This method is blocking and will wait for the entire stream to be processed before applying the transformation.
If
transform
is a function, it should accept the list of collected outputs and return a modified version of it. If
transform
is another stream, it is used to process the list of collected outputs.
Example using a function:
>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     word_stream = Stream[str, str]("WordStream", lambda word: as_async_generator(word, "!"))
...     count_stream : Stream[str, int] = word_stream.and_then(lambda outputs: len(list(outputs)))
...     return await collect_final_output(count_stream("Hi"))
...
>>> asyncio.run(example())
[2]
Example using another stream:
>>> from langstream import Stream, as_async_generator, join_final_output
>>> from typing import Iterable
>>> import asyncio
...
>>> async def example():
...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))  # produces one word at a time
...     acronym_stream = Stream[Iterable[str], str]("AcronymStream", lambda words: "".join(word.upper()[0] for word in words))  # produces acronym
...     composed_stream = words_stream.and_then(acronym_stream)
...     return await join_final_output(composed_stream("as soon as possible"))
...
>>> asyncio.run(example())
'ASAP'
Expand source code
def and_then( self, next: Callable[ [Iterable[U]], Union[AsyncGenerator[StreamOutput[V], Any], AsyncGenerator[V, Any], V], ], ) -> "Stream[T, V]": """ Processes the output of the current stream through a transformation function or another stream. Unlike the map method, which applies transformations to outputs as they arrive, the and_then method first collects all the outputs and then passes them to the transformation function or the next stream. This method is blocking and will wait for the entire stream to be processed before applying the transformation. If `transform` is a function, it should accept the list of collected outputs and return a modified version of it. If `transform` is another stream, it is used to process the list of collected outputs. Example using a function: >>> from langstream import Stream, as_async_generator, collect_final_output >>> import asyncio ... >>> async def example(): ... word_stream = Stream[str, str]("WordStream", lambda word: as_async_generator(word, "!")) ... count_stream : Stream[str, int] = word_stream.and_then(lambda outputs: len(list(outputs))) ... return await collect_final_output(count_stream("Hi")) ... >>> asyncio.run(example()) [2] Example using another stream: >>> from langstream import Stream, as_async_generator, join_final_output >>> from typing import Iterable >>> import asyncio ... >>> async def example(): ... words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" "))) # produces one word at a time ... acronym_stream = Stream[Iterable[str], str]("AcronymStream", lambda words: "".join(word.upper()[0] for word in words)) # produces acronym ... composed_stream = words_stream.and_then(acronym_stream) ... return await join_final_output(composed_stream("as soon as possible")) ... 
>>> asyncio.run(example()) 'ASAP' """ next_name = f"{self.name}@and_then" if hasattr(next, "name"): next_name = next.name async def and_then( input: T, ) -> AsyncGenerator[StreamOutput[V], Any]: # First, reyield previous stream so we never block the stream, and collect the results until they are done iter_u: Iterable[U] = [] async for values, to_reyield in self._reyield(self(input)): yield cast(StreamOutput[V], to_reyield) iter_u = values # Then, call in the next stream iter_v = self._wrap(next(iter_u), name=next_name) async for v in iter_v: yield v return Stream[T, V](next_name, and_then)
def collect(self: Stream[T, U]) ‑> SingleOutputStream[~T, typing.List[~U]]
-
Collects all the outputs produced by the stream and returns them as a list.
This method is blocking, and is useful when the next stream or processing step needs to have access to the entire output all at once, rather than processing elements as they arrive.
Example:
>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     word_stream: Stream[str, List[str]] = Stream[str, str](
...         "WordStream", lambda word: as_async_generator(word, "!")
...     ).collect()
...     return await collect_final_output(word_stream("Hi"))
...
>>> asyncio.run(example())
[['Hi', '!']]
Expand source code
def collect(self: "Stream[T, U]") -> "SingleOutputStream[T, List[U]]": """ Collects all the outputs produced by the stream and returns them as a list. This method is blocking useful when the next stream or processing step needs to have access to the entire output all at once, rather than processing elements as they arrive. Example: >>> from langstream import Stream, as_async_generator, collect_final_output >>> import asyncio ... >>> async def example(): ... word_stream: Stream[str, List[str]] = Stream[str, str]( ... "WordStream", lambda word: as_async_generator(word, "!") ... ).collect() ... return await collect_final_output(word_stream("Hi")) ... >>> asyncio.run(example()) [['Hi', '!']] """ next_name = f"{self.name}@collect" async def _collect( input: T, ) -> AsyncGenerator[StreamOutput[List[U]], Any]: # First, reyield previous stream so we never block the stream, and collect the results until they are done iter_u: Iterable[U] = [] async for values, to_reyield in self._reyield(self(input)): yield cast(StreamOutput[List[U]], to_reyield) iter_u = values # Then, yield the collected results yield self._output_wrap(iter_u, name=next_name) return SingleOutputStream[T, List[U]](next_name, _collect)
def filter(self, fn: Callable[[~U], bool]) ‑> Stream[~T, ~U]
-
Filters the output of the current stream, keeping only the values that return True.
This method is non-blocking and expects a function that returns True for keeping the value, or False for dropping it, as they arrive.
Example:
>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def example():
...     numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input)))
...     even_stream = numbers_stream.filter(lambda input: input % 2 == 0)
...     return await collect_final_output(even_stream(9))
...
>>> asyncio.run(example())
[0, 2, 4, 6, 8]
Expand source code
def filter(self, fn: Callable[[U], bool]) -> "Stream[T, U]": """ Filters the output of the current stream, keeping only the values that return True. This method is non-blocking and expects a function that returns True for keeping the value, or False for dropping it, as they arrive. Example: >>> from langstream import Stream, as_async_generator, collect_final_output >>> import asyncio ... >>> async def example(): ... numbers_stream = Stream[int, int]("NumbersStream", lambda input: as_async_generator(*range(0, input))) ... even_stream = numbers_stream.filter(lambda input: input % 2 == 0) ... return await collect_final_output(even_stream(9)) ... >>> asyncio.run(example()) [0, 2, 4, 6, 8] """ next_name = f"{self.name}@filter" async def filter(input: T) -> AsyncGenerator[StreamOutput[U], Any]: # Reyield previous stream so we never block the stream, and at the same time yield mapped values prev_len_values = 0 async for values, to_reyield in self._reyield(self(input)): yield to_reyield if len(values) > prev_len_values: # as soon as there is a new value prev_len_values = len(values) if fn(values[-1]): yield self._output_wrap(values[-1], name=next_name) return Stream[T, U](next_name, lambda input: filter(input))
def gather(self: Union[Stream[T, AsyncGenerator[StreamOutput[V], Any]], Stream[T, AsyncGenerator[V, Any]]]) ‑> SingleOutputStream[~T, typing.List[typing.List[~V]]]
-
Gathers results from multiple streams and processes them in parallel.
The gather() method is used to process several streams concurrently, and it waits until all of them are complete before continuing. This is similar to asyncio.gather, and is useful when you want to run multiple asynchronous tasks in parallel and wait for all of them to complete.
Note that the order of results corresponds to the order of streams passed to the gather() method.

>>> from langstream import Stream, as_async_generator, collect_final_output
>>> import asyncio
...
>>> async def delayed_output(x):
...     await asyncio.sleep(0.1)
...     yield f"Number: {x}"
...
>>> async def example():
...     number_stream = Stream[int, int](
...         "NumberStream", lambda x: as_async_generator(*range(x))
...     )
...     gathered_stream : Stream[int, str] = (
...         number_stream.map(delayed_output)
...         .gather()
...         .and_then(lambda results: as_async_generator(*(r[0] for r in results)))
...     )
...     return await collect_final_output(gathered_stream(3))
...
>>> asyncio.run(example())  # will take 0.1s to finish, not 0.3s, because it runs in parallel
['Number: 0', 'Number: 1', 'Number: 2']
Expand source code
def gather(
    self: "Union[Stream[T, AsyncGenerator[StreamOutput[V], Any]], Stream[T, AsyncGenerator[V, Any]]]",
) -> "SingleOutputStream[T, List[List[V]]]":
    """
    Gathers results from multiple streams and processes them in parallel.

    The `gather` method is used to process several streams concurrently, and it waits until
    all of them are complete before continuing. This is similar to `asyncio.gather`, and is
    useful when you want to run multiple asynchronous tasks in parallel and wait for all of
    them to complete.

    Note that the order of results corresponds to the order of streams passed to the `gather` method.

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> import asyncio
    ...
    >>> async def delayed_output(x):
    ...     await asyncio.sleep(0.1)
    ...     yield f"Number: {x}"
    ...
    >>> async def example():
    ...     number_stream = Stream[int, int](
    ...         "NumberStream", lambda x: as_async_generator(*range(x))
    ...     )
    ...     gathered_stream : Stream[int, str] = (
    ...         number_stream.map(delayed_output)
    ...         .gather()
    ...         .and_then(lambda results: as_async_generator(*(r[0] for r in results)))
    ...     )
    ...     return await collect_final_output(gathered_stream(3))
    ...
    >>> asyncio.run(example())  # will take 0.1s to finish, not 0.3s, because it runs in parallel
    ['Number: 0', 'Number: 1', 'Number: 2']
    """
    return self.collect().gather()
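The underlying behavior, waiting on several async generators concurrently while preserving input order, can be sketched with plain asyncio.gather. This stdlib-only sketch (all names here are hypothetical) illustrates the semantics, not LangStream's implementation:

```python
import asyncio
from typing import AsyncGenerator, List


async def delayed_numbers(x: int) -> AsyncGenerator[str, None]:
    # Each generator sleeps, then produces a single value
    await asyncio.sleep(0.05)
    yield f"Number: {x}"


async def drain(gen: AsyncGenerator[str, None]) -> List[str]:
    # Collects one generator's output into a list
    return [item async for item in gen]


async def gather_streams() -> List[List[str]]:
    # Drains several generators concurrently; results keep the input order
    return await asyncio.gather(*(drain(delayed_numbers(x)) for x in range(3)))


results = asyncio.run(gather_streams())
print(results)  # [['Number: 0'], ['Number: 1'], ['Number: 2']]
```

As with `gather()` on a Stream, the total wall time is one sleep, not three, because the generators run in parallel, and the list of lists comes back in the order the generators were passed in.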
def join(self: Stream[T, str], separator='') ‑> SingleOutputStream[~T, str]
-
Joins the output of a string-producing stream into a single string.
The join() method concatenates each item in the output of the stream, using the provided separator between each element. This is particularly useful when working with text, when you want to merge all the generated tokens.
Note that this method blocks until all outputs of the stream are available, as it needs the complete output to perform the join operation.
Params
separator : str
    A string that will be used as a separator between the elements. Default is an empty string.
Example:
>>> from langstream import Stream, as_async_generator, join_final_output
>>> import asyncio
...
>>> async def example():
...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
...     capitalized_stream = words_stream.map(lambda word: word.capitalize())
...     joined_stream = capitalized_stream.join(" ")
...     return await join_final_output(joined_stream("this is an example"))
...
>>> asyncio.run(example())
'This Is An Example'
Expand source code
def join(self: "Stream[T, str]", separator="") -> "SingleOutputStream[T, str]":
    """
    Joins the output of a string-producing stream into a single string.

    The `join` method concatenates each item in the output of the stream, using the
    provided separator between each element. This is particularly useful when working with
    text, and you want to merge all the generated tokens.

    Note that this method blocks until all outputs of the stream are available, as it needs
    to wait for the complete output to perform the join operation.

    Params
    ----------
    separator : str
        A string that will be used as a separator between the elements. Default is an empty string.

    Example:

    >>> from langstream import Stream, as_async_generator, join_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))
    ...     capitalized_stream = words_stream.map(lambda word: word.capitalize())
    ...     joined_stream = capitalized_stream.join(" ")
    ...     return await join_final_output(joined_stream("this is an example"))
    ...
    >>> asyncio.run(example())
    'This Is An Example'
    """
    next_name = f"{self.name}@join"

    async def _join(
        input: T,
    ) -> AsyncGenerator[StreamOutput[str], Any]:
        # First, re-yield the previous stream so we never block it, collecting the results until they are done
        iter_u: Iterable[str] = []
        async for values, to_reyield in self._reyield(self(input)):
            yield to_reyield
            iter_u = values

        # Then, return the joined result
        output: str = separator.join(iter_u)
        yield self._output_wrap(output, name=next_name)

    return SingleOutputStream[T, str](next_name, _join)
def map(self, fn: Callable[[~U], ~V]) ‑> Stream[~T, ~V]
-
Maps the output of the current stream through a function as the values arrive.
The transform function will receive the current output of the stream and should return a modified version of it. This method is non-blocking and will continue processing the stream in parallel.
Example:
>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> async def example():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...     return await join_final_output(polite_stream("Alice"))
...
>>> asyncio.run(example())
'Hello, Alice! How are you?'
Example of processing one token at a time:
>>> from langstream import Stream, as_async_generator, join_final_output
>>> import asyncio
...
>>> async def example():
...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))  # produces one word at a time
...     acronym_stream = words_stream.map(lambda word: word.upper()[0])  # uppercases each word and takes the first letter
...     return await join_final_output(acronym_stream("as soon as possible"))
...
>>> asyncio.run(example())
'ASAP'
Expand source code
def map(self, fn: Callable[[U], V]) -> "Stream[T, V]":
    """
    Maps the output of the current stream through a function as the values arrive.

    The transform function will receive the current output of the stream and should return a
    modified version of it. This method is non-blocking and will continue processing the
    stream in parallel.

    Example:

    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...     return await join_final_output(polite_stream("Alice"))
    ...
    >>> asyncio.run(example())
    'Hello, Alice! How are you?'

    Example of processing one token at a time:

    >>> from langstream import Stream, as_async_generator, join_final_output
    >>> import asyncio
    ...
    >>> async def example():
    ...     words_stream = Stream[str, str]("WordsStream", lambda sentence: as_async_generator(*sentence.split(" ")))  # produces one word at a time
    ...     acronym_stream = words_stream.map(lambda word: word.upper()[0])  # uppercases each word and takes the first letter
    ...     return await join_final_output(acronym_stream("as soon as possible"))
    ...
    >>> asyncio.run(example())
    'ASAP'
    """
    next_name = f"{self.name}@map"

    async def map(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
        # Re-yield the previous stream so we never block it, while yielding mapped values
        prev_len_values = 0
        async for values, to_reyield in self._reyield(self(input)):
            yield cast(StreamOutput[V], to_reyield)
            if len(values) > prev_len_values:  # as soon as there is a new value
                prev_len_values = len(values)
                yield self._output_wrap(fn(values[-1]), name=next_name)

    return Stream[T, V](next_name, lambda input: map(input))
def on_error(self, handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[~V], Any], ~V]]) ‑> Stream[~T, typing.Union[~U, ~V]]
-
Handles any uncaught exceptions that might occur during the execution of the current stream.
The handler function takes an exception as its argument and returns a new value that will be used as the output of the stream instead of the exception. The function can also re-raise the exception or raise a new one, which will then be propagated further up the stream.
If an exception occurs in the handler function itself, it will be propagated without any further handling.
Example:
>>> from langstream import Stream, join_final_output
>>> import asyncio
...
>>> def failed_greeting(name: str):
...     raise Exception(f"Giving {name} a cold shoulder")
...
>>> async def example():
...     greet_stream = Stream[str, str](
...         "GreetingStream",
...         failed_greeting
...     ).on_error(lambda e: f"Sorry, an error occurred: {str(e)}")
...
...     async for output in greet_stream("Alice"):
...         print(output)
...
>>> asyncio.run(example())
StreamOutput(stream='GreetingStream', data=Exception('Giving Alice a cold shoulder'), final=False)
StreamOutput(stream='GreetingStream@on_error', data='Sorry, an error occurred: ...', final=True)
Expand source code
def on_error(
    self,
    handler: Callable[[Exception], Union[AsyncGenerator[StreamOutput[V], Any], V]],
) -> "Stream[T, Union[U, V]]":
    """
    Handles any uncaught exceptions that might occur during the execution of the current stream.

    The `handler` function takes an exception as its argument and returns a new value that
    will be used as the output of the stream instead of the exception. The function can also
    re-raise the exception or raise a new one, which will then be propagated further up the stream.

    If an exception occurs in the `handler` function itself, it will be propagated without
    any further handling.

    Example:

    >>> from langstream import Stream, join_final_output
    >>> import asyncio
    ...
    >>> def failed_greeting(name: str):
    ...     raise Exception(f"Giving {name} a cold shoulder")
    ...
    >>> async def example():
    ...     greet_stream = Stream[str, str](
    ...         "GreetingStream",
    ...         failed_greeting
    ...     ).on_error(lambda e: f"Sorry, an error occurred: {str(e)}")
    ...
    ...     async for output in greet_stream("Alice"):
    ...         print(output)
    ...
    >>> asyncio.run(example())
    StreamOutput(stream='GreetingStream', data=Exception('Giving Alice a cold shoulder'), final=False)
    StreamOutput(stream='GreetingStream@on_error', data='Sorry, an error occurred: ...', final=True)
    """
    next_name = f"{self.name}@on_error"

    async def on_error(
        input: T,
    ) -> AsyncGenerator[StreamOutput[Union[U, V]], Any]:
        try:
            async for output in self(input):
                yield cast(StreamOutput[Union[U, V]], output)
        except Exception as e:
            # Yield the exception itself as a non-final output, then the handler's result
            yield cast(StreamOutput[Union[U, V]], self._output_wrap(e, final=False))
            async for output in self._wrap(handler(e), name=next_name):
                yield cast(StreamOutput[Union[U, V]], output)

    return Stream[T, Union[U, V]](next_name, lambda input: on_error(input))
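The recovery pattern behind on_error, catch an exception raised mid-generation and yield a fallback value in its place, can be sketched with the standard library alone. This is a stdlib-only sketch of the idea (the generators and names are hypothetical), not LangStream's implementation:

```python
import asyncio
from typing import AsyncGenerator, List


async def failing_stream(name: str) -> AsyncGenerator[str, None]:
    # Produces one value, then fails mid-stream
    yield f"Hello, {name}"
    raise Exception(f"Giving {name} a cold shoulder")


async def with_fallback(gen: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    # Re-yields values until an exception occurs, then yields a recovery value
    try:
        async for item in gen:
            yield item
    except Exception as e:
        yield f"Sorry, an error occurred: {e}"


async def main() -> List[str]:
    return [item async for item in with_fallback(failing_stream("Alice"))]


outputs = asyncio.run(main())
print(outputs)
# ['Hello, Alice', 'Sorry, an error occurred: Giving Alice a cold shoulder']
```

Note that values yielded before the failure still flow through untouched; only the point of failure is replaced by the handler's output, matching the non-blocking behavior described above.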
def pipe(self, fn: Callable[[AsyncGenerator[~U, Any]], AsyncGenerator[Union[StreamOutput[~V], ~V], Any]]) ‑> Stream[~T, ~V]
-
Lower-level constructor to pipe a stream into another one, giving you access to the underlying AsyncGenerator. Pipe takes a callback that must return an AsyncGenerator, which means you need to declare an async function that uses yield to produce values. The advantage is that you have fine-grained control over whether it blocks the stream or not.
In fact, with pipe you can reconstruct map and and_then, for example:
>>> from langstream import Stream, as_async_generator, collect_final_output
>>> from typing import List, AsyncGenerator
>>> import asyncio
...
>>> async def example(items):
...     async def mario_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
...         waiting_for_mushroom = False
...         async for item in stream:
...             if item == "Mario":
...                 waiting_for_mushroom = True
...             elif item == "Mushroom" and waiting_for_mushroom:
...                 yield "Super Mario!"
...             else:
...                 yield item + "?"
...
...     piped_stream = Stream[List[str], str](
...         "PipedStream", lambda items: as_async_generator(*items)
...     ).pipe(mario_pipe)
...
...     return await collect_final_output(piped_stream(items))
...
>>> asyncio.run(example(["Mario", "Mushroom"]))
['Super Mario!']
>>> asyncio.run(example(["Luigi"]))
['Luigi?']
>>> asyncio.run(example(["Mario", "Luigi", "Mushroom"]))
['Luigi?', 'Super Mario!']
As you can see, this pipe blocks like and_then when it sees "Mario", holding back until a mushroom arrives, but for other items such as "Luigi" it re-yields them immediately with a question mark appended, non-blocking, like map.
You can also call another stream from pipe directly, just be sure to re-yield its outputs.
Expand source code
def pipe(
    self,
    fn: Callable[
        [AsyncGenerator[U, Any]], AsyncGenerator[Union[StreamOutput[V], V], Any]
    ],
) -> "Stream[T, V]":
    """
    Lower-level constructor to pipe a stream into another one, giving you access to the
    underlying AsyncGenerator.

    Pipe takes a callback that must return an AsyncGenerator, which means you need to
    declare an async function that uses `yield` to produce values. The advantage is that
    you have fine-grained control over whether it blocks the stream or not.

    In fact, with pipe you can reconstruct `map` and `and_then`, for example:

    >>> from langstream import Stream, as_async_generator, collect_final_output
    >>> from typing import List, AsyncGenerator
    >>> import asyncio
    ...
    >>> async def example(items):
    ...     async def mario_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    ...         waiting_for_mushroom = False
    ...         async for item in stream:
    ...             if item == "Mario":
    ...                 waiting_for_mushroom = True
    ...             elif item == "Mushroom" and waiting_for_mushroom:
    ...                 yield "Super Mario!"
    ...             else:
    ...                 yield item + "?"
    ...
    ...     piped_stream = Stream[List[str], str](
    ...         "PipedStream", lambda items: as_async_generator(*items)
    ...     ).pipe(mario_pipe)
    ...
    ...     return await collect_final_output(piped_stream(items))
    ...
    >>> asyncio.run(example(["Mario", "Mushroom"]))
    ['Super Mario!']
    >>> asyncio.run(example(["Luigi"]))
    ['Luigi?']
    >>> asyncio.run(example(["Mario", "Luigi", "Mushroom"]))
    ['Luigi?', 'Super Mario!']

    As you can see, this pipe blocks like `and_then` when it sees "Mario", holding back
    until a mushroom arrives, but for other items such as "Luigi" it re-yields them
    immediately with a question mark appended, non-blocking, like `map`.

    You can also call another stream from `pipe` directly, just be sure to re-yield its outputs.
    """
    next_name = f"{self.name}@pipe"

    async def filter_final_output(
        async_iterable: AsyncGenerator[StreamOutput[U], Any]
    ) -> AsyncGenerator[U, Any]:
        async for output in async_iterable:
            if output.final:
                yield cast(U, output.data)

    def pipe(input: T) -> AsyncGenerator[StreamOutput[V], Any]:
        previous, final = asyncstdlib.tee(self(input), n=2, lock=asyncio.Lock())

        previous = self._wrap(previous, name=next_name, final=False)
        previous = cast(AsyncGenerator[StreamOutput[V], Any], previous)

        final = filter_final_output(
            cast(AsyncGenerator[StreamOutput[U], Any], final)
        )
        final = cast(
            AsyncGenerator[StreamOutput[V], Any],
            self._wrap(fn(final), name=next_name),
        )

        return merge(previous, final)

    return Stream[T, V](next_name, pipe)
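Calling another generator from inside a pipe-style transformer and re-yielding its outputs can be sketched with plain async generators. This stdlib-only sketch (the generators and names are hypothetical) shows the re-yield pattern, not LangStream's implementation:

```python
import asyncio
from typing import AsyncGenerator, List


async def words(sentence: str) -> AsyncGenerator[str, None]:
    # The outer stream: one word at a time
    for word in sentence.split(" "):
        yield word


async def spell_out(word: str) -> AsyncGenerator[str, None]:
    # The inner "stream": one letter at a time
    for letter in word:
        yield letter


async def spelling_pipe(stream: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    # Calls the inner generator for each item and re-yields all of its outputs
    async for word in stream:
        async for letter in spell_out(word):
            yield letter


async def main() -> List[str]:
    return [x async for x in spelling_pipe(words("hi yo"))]


letters = asyncio.run(main())
print(letters)  # ['h', 'i', 'y', 'o']
```

The inner `async for` loop is the re-yield step: without it, the inner generator would be created but its outputs would never reach the outer stream.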
class StreamOutput (stream: str, data: Union[~T, Any], final: bool)
-
StreamOutput is a data class that represents the output of a Stream at each step.
Attributes
stream : str
    The name of the stream that produced this output. This helps in identifying which part of the processing pipeline the output is coming from.
data : Union[T, Any]
    The actual output data produced by the stream. This will be of type T for the final stream output, but can also be of any type produced by any step of the whole stream.
final : bool
    A boolean flag indicating whether this output is the final output of the stream. Only the outputs at the end of the stream are marked as "final".
Example
>>> from langstream import Stream
>>> import asyncio
...
>>> async def example():
...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
...
...     async for output in polite_stream("Alice"):
...         # Output is of type StreamOutput
...         print(output)
...
>>> asyncio.run(example())
StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
Expand source code
@dataclass
class StreamOutput(Generic[T]):
    """
    StreamOutput is a data class that represents the output of a Stream at each step.

    Attributes
    ----------
    stream : str
        The name of the stream that produced this output. This helps in identifying which
        part of the processing pipeline the output is coming from.
    data : Union[T, Any]
        The actual output data produced by the stream. This will be of type T for the final
        stream output, but can also be of any type produced by any step of the whole stream.
    final : bool
        A boolean flag indicating whether this output is the final output of the stream.
        Only the outputs at the end of the stream are marked as "final".

    Example
    -------
    >>> from langstream import Stream
    >>> import asyncio
    ...
    >>> async def example():
    ...     greet_stream = Stream[str, str]("GreetingStream", lambda name: f"Hello, {name}!")
    ...     polite_stream = greet_stream.map(lambda greeting: f"{greeting} How are you?")
    ...
    ...     async for output in polite_stream("Alice"):
    ...         # Output is of type StreamOutput
    ...         print(output)
    ...
    >>> asyncio.run(example())
    StreamOutput(stream='GreetingStream', data='Hello, Alice!', final=False)
    StreamOutput(stream='GreetingStream@map', data='Hello, Alice! How are you?', final=True)
    """

    stream: str
    data: Union[T, Any]
    final: bool
Ancestors
- typing.Generic
Class variables
var data : Union[~T, Any]
var final : bool
var stream : str
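Since every step of a Stream emits these records, downstream code often keeps only the ones flagged final. This stdlib-only sketch uses a minimal stand-in dataclass mirroring the fields above (not the real langstream class) to show that filtering step, which is what helpers like collect_final_output do for real streams:

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class StreamOutput:
    # Minimal stand-in mirroring the fields documented above
    stream: str
    data: Any
    final: bool


outputs = [
    StreamOutput(stream="GreetingStream", data="Hello, Alice!", final=False),
    StreamOutput(stream="GreetingStream@map", data="Hello, Alice! How are you?", final=True),
]

# Keep only the data of outputs marked as final
final_data: List[Any] = [o.data for o in outputs if o.final]
print(final_data)  # ['Hello, Alice! How are you?']
```

The intermediate output is still available for display and debugging; it is simply not part of the final result.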