Streaming Response Handler
This module provides streaming response handling for the Venice AI SDK, enabling real-time processing of API responses as they arrive from the server.
The streaming system is designed to handle various types of streaming responses including chat completions, audio generation, and other real-time API outputs. It provides proper resource management, error handling, and type safety.
Key Features:
- Real-time Processing: Stream responses as they arrive
- Resource Management: Automatic cleanup of network resources
- Error Handling: Comprehensive error handling for network issues
- Type Safety: Generic typing for different chunk types
- Consumption Tracking: Prevents multiple iterations over consumed streams
Streaming Types:
- Chat Completions: Real-time text generation
- Audio Responses: Streaming audio data
- Image Generation: Progressive image data (future)
- Custom Responses: Extensible for new streaming endpoints
Example:
>>> from venice_ai import VeniceClient
>>>
>>> client = VeniceClient(api_key="your-api-key")
>>>
>>> # Create a streaming chat completion
>>> stream = await client.chat.completions.create(
...     model="venice-uncensored",
...     messages=[{"role": "user", "content": "Write a story"}],
...     stream=True
... )
>>>
>>> # Process chunks as they arrive
>>> async for chunk in stream:
...     if chunk.choices[0].delta.content:
...         print(chunk.choices[0].delta.content, end="")
>>>
>>> # Stream is automatically closed when iteration completes
venice_ai.streaming.T_co
T_co = TypeVar("T_co", covariant=True)
Covariant type variable used to parameterize the AsyncClosableIterator protocol.
AsyncClosableIterator Objects
class AsyncClosableIterator(Protocol[T_co])
Protocol for async iterators that support resource cleanup.
This protocol extends the standard AsyncIterator to include the aclose() method that many async generators provide for proper resource cleanup. This ensures that network connections, file handles, and other resources are properly released when streaming is complete.
The aclose() method is particularly important for streaming HTTP responses where the underlying connection needs to be closed to free up resources and prevent connection leaks.
Methods:
- aclose(): Async method to close the iterator and release resources
AsyncClosableIterator.aclose
async def aclose() -> None
Close the async iterator and release any associated resources.
This method should be called to ensure proper cleanup of resources such as network connections, file handles, or other system resources associated with the iterator.
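The protocol described above can be sketched as follows. This is a minimal illustration, not the SDK's source: the `__aiter__` and `__anext__` members, the `runtime_checkable` decorator, and the `fake_chunks`, `read_first`, and `demo` helpers are assumptions added for the example. It shows that an ordinary async generator already satisfies the protocol, and that calling aclose() after stopping early finishes the generator.

```python
import asyncio
from typing import AsyncGenerator, List, Protocol, TypeVar, runtime_checkable

T_co = TypeVar("T_co", covariant=True)


@runtime_checkable
class AsyncClosableIterator(Protocol[T_co]):
    """Structural type: an async iterator that also exposes aclose()."""

    def __aiter__(self) -> "AsyncClosableIterator[T_co]": ...

    async def __anext__(self) -> T_co: ...

    async def aclose(self) -> None: ...


async def fake_chunks() -> AsyncGenerator[str, None]:
    # Hypothetical stand-in for a streaming HTTP body.
    for piece in ("Hel", "lo"):
        yield piece


async def read_first(source: AsyncClosableIterator[str]) -> str:
    try:
        return await source.__anext__()
    finally:
        # Release the underlying resource even though we stop early.
        await source.aclose()


async def demo() -> List[object]:
    gen = fake_chunks()
    is_closable = isinstance(gen, AsyncClosableIterator)
    first = await read_first(gen)
    # After aclose() the generator is finished and yields nothing more.
    leftover = [chunk async for chunk in gen]
    return [is_closable, first, leftover]
```

Running `asyncio.run(demo())` returns `[True, "Hel", []]`: the generator matches the protocol structurally, one chunk is read, and nothing remains after the close.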
Stream Objects
class Stream()
Wrapper for handling streaming responses from the Venice AI API.
This class provides a robust interface for iterating over streaming API responses, managing the underlying iterator lifecycle, and ensuring proper resource cleanup. It handles various error conditions and provides consumption tracking to prevent multiple iterations.
The Stream class is generic and can handle different types of streaming responses by parameterizing the ChunkType. It automatically handles network errors, timeouts, and other streaming-related issues.
Type Parameters: ChunkType: The type of chunks yielded by the stream (e.g., Dict[str, Any])
Attributes:
- _iterator: The underlying async iterator that yields response chunks
- _client: Reference to the Venice client for context and resource management
- _consumed: Flag indicating whether the stream has been fully consumed
Error Handling:
- StreamConsumedError: Raised when attempting to iterate a consumed stream
- APITimeoutError: Raised when a streaming operation times out
- APIConnectionError: Raised for network connectivity issues
- VeniceError: Raised for other unexpected streaming errors
Example:
>>> # Basic streaming usage
>>> stream = Stream(response_iterator, client=client)
>>> async for chunk in stream:
...     process_chunk(chunk)
>>>
>>> # Manual cleanup (optional, done automatically)
>>> await stream.close()
>>>
>>> # Error handling
>>> try:
...     async for chunk in stream:
...         print(chunk)
... except StreamConsumedError:
...     print("Stream was already consumed")
... except APITimeoutError:
...     print("Streaming timed out")
Stream.__init__
def __init__(iterator: AsyncIterator[ChunkType], *, client: VeniceClient)
Initialize a new Stream wrapper.
Arguments:
- iterator: The underlying async iterator that yields response chunks. This is typically created by the HTTP client when making streaming API requests.
- client: The Venice client instance for context and resource management. Used for logging, metrics, and potential resource cleanup.
Notes:
The stream starts in an unconsumed state and can be iterated over exactly once. After consumption, attempts to iterate will raise StreamConsumedError.
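The single-consumption rule can be illustrated with a small stand-in class. This is a sketch under stated assumptions, not the SDK's Stream implementation: `OneShotStream`, the local `StreamConsumedError`, and the `numbers` and `demo` helpers are hypothetical names defined only for this example.

```python
import asyncio
from typing import AsyncIterator, Generic, List, Tuple, TypeVar

ChunkT = TypeVar("ChunkT")


class StreamConsumedError(RuntimeError):
    """Local stand-in for the SDK exception of the same name."""


class OneShotStream(Generic[ChunkT]):
    """Hypothetical illustration only, not the SDK's Stream class."""

    def __init__(self, iterator: AsyncIterator[ChunkT]) -> None:
        self._iterator = iterator
        self._consumed = False  # streams start unconsumed

    def __aiter__(self) -> "OneShotStream[ChunkT]":
        if self._consumed:
            raise StreamConsumedError("stream has already been consumed")
        return self

    async def __anext__(self) -> ChunkT:
        try:
            return await self._iterator.__anext__()
        except StopAsyncIteration:
            self._consumed = True  # exhaustion marks the stream consumed
            raise


async def numbers() -> AsyncIterator[int]:
    for n in (1, 2, 3):
        yield n


async def demo() -> Tuple[List[int], bool]:
    stream = OneShotStream(numbers())
    first_pass = [n async for n in stream]
    second_pass_rejected = False
    try:
        async for _ in stream:
            pass
    except StreamConsumedError:
        second_pass_rejected = True
    return first_pass, second_pass_rejected
```

Here `asyncio.run(demo())` returns `([1, 2, 3], True)`: the first pass drains the iterator, and the second attempt is rejected in `__aiter__` before any chunk is requested.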
Stream.__anext__
async def __anext__() -> ChunkType
Get the next chunk from the streaming response.
This method implements the async iterator protocol, returning each chunk as it becomes available from the underlying stream. It handles various error conditions and ensures proper resource cleanup on stream completion.
Returns:
The next chunk from the streaming response. The exact type depends on the ChunkType parameter but is typically a dictionary containing the parsed response data.
Raises:
- StopAsyncIteration: When the stream is exhausted (normal completion)
- StreamConsumedError: When attempting to iterate a consumed stream
- APITimeoutError: When a streaming operation times out
- APIConnectionError: When network connectivity issues occur
- VeniceError: For other unexpected streaming errors
Error Handling:
- Timeouts are converted to APITimeoutError with original exception context
- Network errors are converted to APIConnectionError
- Unexpected errors are logged and wrapped in VeniceError
- CancelledError is always re-raised for graceful shutdown
- Stream is marked as consumed and cleaned up on any error
Notes:
This method automatically marks the stream as consumed and closes resources when the stream is exhausted or encounters an error.
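The error translation listed above might look roughly like the following. This is a hedged sketch, not the SDK's code: the three exception classes are local stand-ins (the real hierarchy may differ), the choice of `asyncio.TimeoutError` and `OSError` as the low-level errors is an assumption, and `next_chunk`, `failing`, and `translate` are hypothetical helpers. Logging and the consumed-flag bookkeeping are omitted.

```python
import asyncio
from typing import AsyncIterator, Tuple


class VeniceError(Exception):
    """Local stand-in for the SDK's general error."""


class APIConnectionError(VeniceError):
    """Local stand-in; the real class hierarchy may differ."""


class APITimeoutError(VeniceError):
    """Local stand-in; the real class hierarchy may differ."""


async def next_chunk(iterator: AsyncIterator[str]) -> str:
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        raise  # normal completion passes through untouched
    except asyncio.CancelledError:
        raise  # cancellation is never wrapped
    except asyncio.TimeoutError as exc:
        # Checked before OSError: on Python 3.11+ TimeoutError subclasses it.
        raise APITimeoutError("stream timed out") from exc
    except OSError as exc:  # ConnectionError is a subclass of OSError
        raise APIConnectionError("connection failed") from exc
    except Exception as exc:
        raise VeniceError("unexpected streaming error") from exc


async def failing(exc: Exception) -> AsyncIterator[str]:
    raise exc
    yield ""  # unreachable, but makes this an async generator


async def translate(exc: Exception) -> Tuple[str, bool]:
    try:
        await next_chunk(failing(exc))
    except VeniceError as translated:
        # Report the translated type and whether the original is chained.
        return type(translated).__name__, translated.__cause__ is exc
    return "no error", False
```

Using `raise ... from exc` keeps the original exception available as `__cause__`, which matches the "original exception context" behaviour described for timeouts.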
Stream.__aiter__
def __aiter__() -> AsyncIterator[ChunkType]
Return the async iterator object for the stream.
This method implements the async iterable protocol, allowing the stream to be used in async for loops. It includes consumption checking to prevent multiple iterations over the same stream.
Returns:
The stream iterator (self) for use in async iteration
Raises:
- StreamConsumedError: If the stream has already been consumed by a previous iteration
Example:
>>> stream = Stream(iterator, client=client)
>>> async for chunk in stream:  # This calls __aiter__()
...     process_chunk(chunk)
>>>
>>> # Attempting to iterate again will raise an error
>>> async for chunk in stream:  # StreamConsumedError
...     pass
Stream.__aenter__
async def __aenter__() -> Self
Enter the async context manager.
Stream.__aexit__
async def __aexit__(*args: object) -> None
Exit the async context manager, closing the stream.
Stream.get_iterator
def get_iterator() -> AsyncIterator[ChunkType]
Return the underlying async iterator and mark this stream as consumed.
The caller takes ownership of the iterator. This stream instance should not be iterated after calling this method.
Stream.close
async def close() -> None
Close the stream and release any underlying resources.
This method ensures proper cleanup of the underlying iterator and any associated network resources. It's designed to be idempotent and safe to call multiple times. The method is automatically called when stream iteration completes or encounters an error.
Resource Cleanup:
- Calls aclose() on the underlying iterator if supported
- Suppresses any errors raised during cleanup so they do not propagate to the caller
- Ensures network connections are properly closed
- Frees any associated memory or file handles
Example:
>>> stream = Stream(iterator, client=client)
>>> try:
...     async for chunk in stream:
...         process_chunk(chunk)
... finally:
...     await stream.close()  # Optional, done automatically
Notes:
This method never raises exceptions. Any errors during cleanup are silently handled to ensure the application can continue running even if resource cleanup fails.
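An idempotent, non-raising close() combined with the async context manager methods could be sketched like this. It is an illustration only: `ClosingStream` and `FlakySource` are hypothetical classes, and the `_closed` flag is an assumed way of making repeated calls safe, not a documented SDK attribute.

```python
import asyncio
from typing import Any


class FlakySource:
    """Hypothetical iterator whose aclose() always fails."""

    def __init__(self) -> None:
        self.close_calls = 0

    def __aiter__(self) -> "FlakySource":
        return self

    async def __anext__(self) -> str:
        raise StopAsyncIteration

    async def aclose(self) -> None:
        self.close_calls += 1
        raise RuntimeError("socket already gone")


class ClosingStream:
    """Hypothetical illustration only, not the SDK's Stream class."""

    def __init__(self, iterator: Any) -> None:
        self._iterator = iterator
        self._closed = False

    async def close(self) -> None:
        if self._closed:
            return  # later calls are no-ops
        self._closed = True
        aclose = getattr(self._iterator, "aclose", None)
        if aclose is None:
            return  # the iterator does not support cleanup
        try:
            await aclose()
        except Exception:
            pass  # cleanup errors are swallowed, never raised

    async def __aenter__(self) -> "ClosingStream":
        return self

    async def __aexit__(self, *args: object) -> None:
        await self.close()


async def demo() -> int:
    source = FlakySource()
    async with ClosingStream(source) as stream:
        pass  # leaving the block closes the stream
    await stream.close()  # second call does nothing
    return source.close_calls
```

In this sketch `asyncio.run(demo())` returns `1`: the failing aclose() is attempted once on context exit, its error is suppressed, and the explicit second close() does not reach the iterator again.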
ChatStream Objects
class ChatStream(Stream["ChatCompletionChunk"])
Enhanced stream for chat completions with convenience accessors.
Wraps a Stream[ChatCompletionChunk] and adds:
- text_deltas: yields only text content strings (display-only)
- collect: silently consumes the stream and returns a full venice_ai.types.api.chat.ChatCompletionResponse
- collect_with_deltas: yields text deltas live and populates final_response once iteration completes; one pass, both signals
Example (display deltas live and use the final aggregated response):
async with await client.chat.completions.stream(
    model=model, messages=messages,
) as s:
    async for text in s.collect_with_deltas():
        print(text, end="", flush=True)
    print()
    assert s.final_response is not None
    print("usage:", s.final_response.usage)
ChatStream.final_response
@property
def final_response() -> ChatCompletionResponse | None
Assembled response after collect or collect_with_deltas.
None until one of those two methods has finished consuming the stream. text_deltas does not populate this; use collect_with_deltas if you need both live deltas and the final aggregated response.
ChatStream.text_deltas
async def text_deltas() -> AsyncIterator[str]
Yield only text content deltas, skipping empty or None values.
Does not populate final_response. If you want both the deltas and the final aggregated response in one pass, use collect_with_deltas instead.
ChatStream.collect
async def collect() -> ChatCompletionResponse
Consume the entire stream and return a ChatCompletionResponse.
Accumulates text, reasoning content, and tool calls from all chunks into a single non-streaming response object.
Notes:
Only the first choice (index 0) is tracked. For n > 1 requests, use the raw Stream iterator and accumulate per-choice yourself. A one-time warning is emitted if choices with a non-zero index are observed.
Raises:
- ValueError: If the stream completes without a finish_reason on choice 0 (typically indicates the stream was interrupted).
ChatStream.collect_with_deltas
async def collect_with_deltas() -> AsyncIterator[str]
Yield each text delta and assemble the final response in one pass.
Avoids the need for two streams, one to display deltas live and one to obtain the final aggregated response, by doing both jobs in a single pass: each choice.delta.content is yielded as it arrives, and once iteration completes final_response holds the assembled ChatCompletionResponse (with usage, tool_calls, reasoning, finish_reason).
Tool-call and reasoning deltas are accumulated into the final response but are not yielded; only text deltas are yielded.
Example:
async with await client.chat.completions.stream(
    model=model, messages=messages,
) as s:
    async for text in s.collect_with_deltas():
        print(text, end="", flush=True)
    print()
    if s.final_response and s.final_response.usage:
        print("tokens:", s.final_response.usage.total_tokens)
Notes:
Only choice 0 is tracked (same constraint as collect). A one-time warning is emitted if multi-choice (n > 1) chunks are observed.
Raises:
- ValueError: If the stream completes without a finish_reason on choice 0 (typically indicates the stream was interrupted before the final chunk arrived).
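The one-pass behaviour of collect_with_deltas can be sketched with plain dictionaries standing in for ChatCompletionChunk objects. Everything here is an assumption made for illustration: `DeltaCollector`, `fake_chunks`, and the dictionary shape of the final response are hypothetical, and tool-call, reasoning, and usage accumulation are left out.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple


class DeltaCollector:
    """Hypothetical stand-in for ChatStream; chunks are plain dicts here."""

    def __init__(self, chunks: AsyncIterator[Dict[str, Any]]) -> None:
        self._chunks = chunks
        self.final_response: Optional[Dict[str, Any]] = None

    async def collect_with_deltas(self) -> AsyncIterator[str]:
        parts: List[str] = []
        finish_reason: Optional[str] = None
        async for chunk in self._chunks:
            for choice in chunk.get("choices", []):
                if choice.get("index", 0) != 0:
                    continue  # only choice 0 is tracked
                text = choice.get("delta", {}).get("content")
                if text:
                    parts.append(text)  # accumulate for the final response
                    yield text  # and hand the delta to the caller right away
                finish_reason = choice.get("finish_reason") or finish_reason
        if finish_reason is None:
            raise ValueError("stream ended without a finish_reason on choice 0")
        self.final_response = {
            "content": "".join(parts),
            "finish_reason": finish_reason,
        }


async def fake_chunks() -> AsyncIterator[Dict[str, Any]]:
    # Hypothetical chunk payloads; the middle one carries no text.
    yield {"choices": [{"index": 0, "delta": {"content": "Hel"}, "finish_reason": None}]}
    yield {"choices": [{"index": 0, "delta": {"content": None}, "finish_reason": None}]}
    yield {"choices": [{"index": 0, "delta": {"content": "lo"}, "finish_reason": "stop"}]}


async def demo() -> Tuple[List[str], Optional[Dict[str, Any]]]:
    stream = DeltaCollector(fake_chunks())
    deltas = [text async for text in stream.collect_with_deltas()]
    return deltas, stream.final_response
```

The final response is assigned only after the loop over chunks ends, which is why final_response stays None until the caller has finished iterating.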
BytesResponse Objects
class BytesResponse()
A wrapper for raw byte responses that includes the original aiohttp.ClientResponse object.
This class provides a way to access the raw byte content of a response while also retaining the original response object for accessing headers and other metadata.
Arguments:
- content (bytes): The raw byte content of the response
- response (Any): The original aiohttp.ClientResponse object
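As a rough picture of how such a wrapper is used, the sketch below pairs the bytes with the response object and reads a header from it. `BytesResponseSketch` and `describe` are hypothetical names, the dataclass layout is an assumption rather than the SDK's definition, and a `SimpleNamespace` stands in for a real aiohttp.ClientResponse (only its `headers` mapping is used).

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class BytesResponseSketch:
    """Hypothetical stand-in for BytesResponse."""

    content: bytes  # raw body bytes
    response: Any  # original HTTP response, kept for headers and metadata


def describe(result: BytesResponseSketch) -> str:
    # Metadata comes from the retained response; data comes from content.
    content_type = result.response.headers.get("Content-Type", "unknown")
    return f"{len(result.content)} bytes of {content_type}"


# Stub response exposing only a headers mapping.
fake_http_response = SimpleNamespace(headers={"Content-Type": "audio/mpeg"})
example = BytesResponseSketch(content=b"ID3", response=fake_http_response)
```

With these stand-ins, `describe(example)` returns `"3 bytes of audio/mpeg"`.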