Module langstream.utils.async_generator
Utils for working with Python's AsyncGenerator with the same primitives as streams
Expand source code
"""
Utils for working with Python's AsyncGenerator with the same primitives as streams
"""
import asyncio
from typing import Any, AsyncGenerator, List, TypeVar, Union
T = TypeVar("T")
U = TypeVar("U")
async def as_async_generator(*values: T) -> AsyncGenerator[T, Any]:
"""
Creates an asynchronous generator out of simple values, it's useful for
converting a single value or a list of values to a streamed output of a Stream
Example
-------
>>> import asyncio
>>> async def run_example():
... async for value in as_async_generator(1, 2, 3):
... print(value)
...
>>> asyncio.run(run_example())
1
2
3
"""
for item in values:
yield item
async def collect(async_generator: AsyncGenerator[T, Any]) -> List[T]:
"""
Collect items from an async generator into a list.
>>> import asyncio
>>> async def async_gen():
... yield "hello"
... yield "how"
... yield "can"
... yield "I"
... yield "assist"
... yield "you"
... yield "today"
>>> asyncio.run(collect(async_gen()))
['hello', 'how', 'can', 'I', 'assist', 'you', 'today']
"""
return [item async for item in async_generator]
async def join(async_generator: AsyncGenerator[str, Any], separator="") -> str:
"""
Collect items from an async generator and join them in a string.
>>> import asyncio
>>> async def async_gen():
... yield "hello "
... yield "how "
... yield "can "
... yield "I "
... yield "assist "
... yield "you "
... yield "today"
>>> asyncio.run(join(async_gen()))
'hello how can I assist you today'
"""
lst = await collect(async_generator)
return separator.join(lst)
async def gather(async_generators: List[AsyncGenerator[T, Any]]) -> List[List[T]]:
"""
Gather items from a list of async generators into a list of lists.
>>> import asyncio
>>> async def async_gen1():
... yield "hello"
... yield "how"
... yield "can"
>>> async def async_gen2():
... yield "I"
... yield "assist"
... yield "you"
... yield "today"
>>> asyncio.run(gather([async_gen1(), async_gen2()]))
[['hello', 'how', 'can'], ['I', 'assist', 'you', 'today']]
"""
return await asyncio.gather(*(collect(generator) for generator in async_generators))
async def next_item(async_generator: AsyncGenerator[T, Any]) -> T:
"""
Takes the next item of an AsyncGenerator, can result in exception if there is no items left to be taken
>>> import asyncio
>>> async def async_gen():
... yield "hello"
... yield "how"
... yield "are"
>>> asyncio.run(next_item(async_gen()))
'hello'
"""
return await async_generator.__aiter__().__anext__()
# From: https://stackoverflow.com/a/55317623
def merge(
async_generator_a: AsyncGenerator[T, Any], async_generator_b: AsyncGenerator[U, Any]
) -> AsyncGenerator[Union[T, U], Any]:
"""
Merges two AsyncGenerators into one, taking values from both generators as soon as they arrive.
>>> import asyncio
>>> async def async_gen1():
... yield "hello"
... yield "how"
... yield "can"
>>> async def async_gen2():
... yield "I"
... yield "assist"
... yield "you"
... yield "today"
>>> async def example():
... return await collect(merge(async_gen1(), async_gen2()))
>>> asyncio.run(example())
['hello', 'how', 'I', 'can', 'assist', 'you', 'today']
"""
aiters = [async_generator_a, async_generator_b]
queue = asyncio.Queue(1)
run_count = len(aiters)
cancelling = False
async def drain(aiter):
nonlocal run_count
try:
async for item in aiter:
await queue.put((False, item))
except Exception as e:
if not cancelling:
await queue.put((True, e))
else:
raise
finally:
run_count -= 1
async def merged():
try:
while run_count:
raised, next_item = await queue.get()
if raised:
cancel_tasks()
raise next_item
yield next_item
finally:
cancel_tasks()
def cancel_tasks():
nonlocal cancelling
cancelling = True
for t in tasks:
t.cancel()
tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters]
return merged()
Functions
async def as_async_generator(*values: ~T) ‑> AsyncGenerator[~T, Any]
-
Creates an asynchronous generator out of simple values, it's useful for converting a single value or a list of values to a streamed output of a Stream
Example
>>> import asyncio >>> async def run_example(): ... async for value in as_async_generator(1, 2, 3): ... print(value) ... >>> asyncio.run(run_example()) 1 2 3
Expand source code
async def as_async_generator(*values: T) -> AsyncGenerator[T, Any]: """ Creates an asynchronous generator out of simple values, it's useful for converting a single value or a list of values to a streamed output of a Stream Example ------- >>> import asyncio >>> async def run_example(): ... async for value in as_async_generator(1, 2, 3): ... print(value) ... >>> asyncio.run(run_example()) 1 2 3 """ for item in values: yield item
async def collect(async_generator: AsyncGenerator[~T, Any]) ‑> List[~T]
-
Collect items from an async generator into a list.
>>> import asyncio >>> async def async_gen(): ... yield "hello" ... yield "how" ... yield "can" ... yield "I" ... yield "assist" ... yield "you" ... yield "today" >>> asyncio.run(collect(async_gen())) ['hello', 'how', 'can', 'I', 'assist', 'you', 'today']
Expand source code
async def collect(async_generator: AsyncGenerator[T, Any]) -> List[T]: """ Collect items from an async generator into a list. >>> import asyncio >>> async def async_gen(): ... yield "hello" ... yield "how" ... yield "can" ... yield "I" ... yield "assist" ... yield "you" ... yield "today" >>> asyncio.run(collect(async_gen())) ['hello', 'how', 'can', 'I', 'assist', 'you', 'today'] """ return [item async for item in async_generator]
async def gather(async_generators: List[AsyncGenerator[~T, Any]]) ‑> List[List[~T]]
-
Gather items from a list of async generators into a list of lists.
>>> import asyncio >>> async def async_gen1(): ... yield "hello" ... yield "how" ... yield "can" >>> async def async_gen2(): ... yield "I" ... yield "assist" ... yield "you" ... yield "today" >>> asyncio.run(gather([async_gen1(), async_gen2()])) [['hello', 'how', 'can'], ['I', 'assist', 'you', 'today']]
Expand source code
async def gather(async_generators: List[AsyncGenerator[T, Any]]) -> List[List[T]]: """ Gather items from a list of async generators into a list of lists. >>> import asyncio >>> async def async_gen1(): ... yield "hello" ... yield "how" ... yield "can" >>> async def async_gen2(): ... yield "I" ... yield "assist" ... yield "you" ... yield "today" >>> asyncio.run(gather([async_gen1(), async_gen2()])) [['hello', 'how', 'can'], ['I', 'assist', 'you', 'today']] """ return await asyncio.gather(*(collect(generator) for generator in async_generators))
async def join(async_generator: AsyncGenerator[str, Any], separator='') ‑> str
-
Collect items from an async generator and join them in a string.
>>> import asyncio >>> async def async_gen(): ... yield "hello " ... yield "how " ... yield "can " ... yield "I " ... yield "assist " ... yield "you " ... yield "today" >>> asyncio.run(join(async_gen())) 'hello how can I assist you today'
Expand source code
async def join(async_generator: AsyncGenerator[str, Any], separator="") -> str: """ Collect items from an async generator and join them in a string. >>> import asyncio >>> async def async_gen(): ... yield "hello " ... yield "how " ... yield "can " ... yield "I " ... yield "assist " ... yield "you " ... yield "today" >>> asyncio.run(join(async_gen())) 'hello how can I assist you today' """ lst = await collect(async_generator) return separator.join(lst)
def merge(async_generator_a: AsyncGenerator[~T, Any], async_generator_b: AsyncGenerator[~U, Any]) ‑> AsyncGenerator[Union[~T, ~U], Any]
-
Merges two AsyncGenerators into one, taking values from both generators as soon as they arrive.
>>> import asyncio >>> async def async_gen1(): ... yield "hello" ... yield "how" ... yield "can" >>> async def async_gen2(): ... yield "I" ... yield "assist" ... yield "you" ... yield "today" >>> async def example(): ... return await collect(merge(async_gen1(), async_gen2())) >>> asyncio.run(example()) ['hello', 'how', 'I', 'can', 'assist', 'you', 'today']
Expand source code
def merge( async_generator_a: AsyncGenerator[T, Any], async_generator_b: AsyncGenerator[U, Any] ) -> AsyncGenerator[Union[T, U], Any]: """ Merges two AsyncGenerators into one, taking values from both generators as soon as they arrive. >>> import asyncio >>> async def async_gen1(): ... yield "hello" ... yield "how" ... yield "can" >>> async def async_gen2(): ... yield "I" ... yield "assist" ... yield "you" ... yield "today" >>> async def example(): ... return await collect(merge(async_gen1(), async_gen2())) >>> asyncio.run(example()) ['hello', 'how', 'I', 'can', 'assist', 'you', 'today'] """ aiters = [async_generator_a, async_generator_b] queue = asyncio.Queue(1) run_count = len(aiters) cancelling = False async def drain(aiter): nonlocal run_count try: async for item in aiter: await queue.put((False, item)) except Exception as e: if not cancelling: await queue.put((True, e)) else: raise finally: run_count -= 1 async def merged(): try: while run_count: raised, next_item = await queue.get() if raised: cancel_tasks() raise next_item yield next_item finally: cancel_tasks() def cancel_tasks(): nonlocal cancelling cancelling = True for t in tasks: t.cancel() tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters] return merged()
async def next_item(async_generator: AsyncGenerator[~T, Any]) ‑> ~T
-
Takes the next item of an AsyncGenerator, can result in exception if there is no items left to be taken
>>> import asyncio >>> async def async_gen(): ... yield "hello" ... yield "how" ... yield "are" >>> asyncio.run(next_item(async_gen())) 'hello'
Expand source code
async def next_item(async_generator: AsyncGenerator[T, Any]) -> T: """ Takes the next item of an AsyncGenerator, can result in exception if there is no items left to be taken >>> import asyncio >>> async def async_gen(): ... yield "hello" ... yield "how" ... yield "are" >>> asyncio.run(next_item(async_gen())) 'hello' """ return await async_generator.__aiter__().__anext__()