
Streaming Response Handler

This module provides streaming response handling for the Venice AI SDK, enabling real-time processing of API responses as they arrive from the server.

The streaming system is designed to handle various types of streaming responses including chat completions, audio generation, and other real-time API outputs. It provides proper resource management, error handling, and type safety.

Key Features:

  • Real-time Processing: Stream responses as they arrive
  • Resource Management: Automatic cleanup of network resources
  • Error Handling: Comprehensive error handling for network issues
  • Type Safety: Generic typing for different chunk types
  • Consumption Tracking: Prevents multiple iterations over consumed streams

Streaming Types:

  • Chat Completions: Real-time text generation
  • Audio Responses: Streaming audio data
  • Image Generation: Progressive image data (future)
  • Custom Responses: Extensible for new streaming endpoints

Example:

>>> from venice_ai import VeniceClient
>>>
>>> client = VeniceClient(api_key="your-api-key")
>>>
>>> # Create a streaming chat completion
>>> stream = await client.chat.completions.create(
...     model="venice-uncensored",
...     messages=[{"role": "user", "content": "Write a story"}],
...     stream=True,
... )
>>>
>>> # Process chunks as they arrive
>>> async for chunk in stream:
...     if chunk.choices[0].delta.content:
...         print(chunk.choices[0].delta.content, end="")
...
>>> # Stream is automatically closed when iteration completes

venice_ai.streaming.T_co

T_co = TypeVar("T_co", covariant=True)

Covariant type variable for the async iterator protocol.

AsyncClosableIterator Objects

class AsyncClosableIterator(Protocol[T_co])

Protocol for async iterators that support resource cleanup.

This protocol extends the standard AsyncIterator with the aclose() method that async generators (and many other async iterators) provide for resource cleanup. This ensures that network connections, file handles, and other resources are released when streaming is complete.

The aclose() method is particularly important for streaming HTTP responses where the underlying connection needs to be closed to free up resources and prevent connection leaks.

Methods:

  • aclose() - Async method to close the iterator and release resources
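A structural protocol of this shape can be declared with typing.Protocol. The sketch below is an assumption about the shape (the SDK's actual definition may differ, for example in whether it is marked runtime_checkable); it shows that a plain async generator already satisfies the protocol, because async generators provide aclose(). The fake_chunks() generator is a hypothetical stand-in for an HTTP stream.

```python
import asyncio
from typing import AsyncGenerator, Protocol, TypeVar, runtime_checkable

T_co = TypeVar("T_co", covariant=True)


@runtime_checkable
class AsyncClosableIterator(Protocol[T_co]):
    """Sketch: an async iterator that also exposes aclose()."""

    def __aiter__(self) -> "AsyncClosableIterator[T_co]": ...

    async def __anext__(self) -> T_co: ...

    async def aclose(self) -> None: ...


events: list[str] = []


async def fake_chunks() -> AsyncGenerator[str, None]:
    # Hypothetical stream; the finally block models releasing the connection.
    try:
        for part in ("Hel", "lo"):
            yield part
    finally:
        events.append("connection released")


async def first_chunk(it: AsyncClosableIterator[str]) -> str:
    # Read one chunk, then close early so the producer can clean up.
    try:
        return await it.__anext__()
    finally:
        await it.aclose()


async def main() -> str:
    gen = fake_chunks()
    # runtime_checkable only checks that the three methods exist.
    assert isinstance(gen, AsyncClosableIterator)
    return await first_chunk(gen)


result = asyncio.run(main())
print(result, events)  # Hel ['connection released']
```

Because the check is structural, any object with these three methods qualifies; no inheritance from a base class is required.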

AsyncClosableIterator.aclose

async def aclose() -> None

Close the async iterator and release any associated resources.

This method should be called to ensure proper cleanup of resources such as network connections, file handles, or other system resources associated with the iterator.

Stream Objects

class Stream()

Wrapper for handling streaming responses from the Venice AI API.

This class provides a robust interface for iterating over streaming API responses, managing the underlying iterator lifecycle, and ensuring proper resource cleanup. It handles various error conditions and provides consumption tracking to prevent multiple iterations.

The Stream class is generic and can handle different types of streaming responses by parameterizing the ChunkType. It translates network errors, timeouts, and other streaming failures into the SDK exceptions listed under Error Handling.

Type Parameters: ChunkType: The type of chunks yielded by the stream (e.g., Dict[str, Any])

Attributes:

  • _iterator - The underlying async iterator that yields response chunks
  • _client - Reference to the Venice client for context and resource management
  • _consumed - Flag indicating whether the stream has been fully consumed

Error Handling:

  • StreamConsumedError: Raised when attempting to iterate a consumed stream
  • APITimeoutError: Raised when streaming operations time out
  • APIConnectionError: Raised for network connectivity issues
  • VeniceError: Raised for other unexpected streaming errors

Example:

>>> # Basic streaming usage
>>> stream = Stream(response_iterator, client=client)
>>> async for chunk in stream:
...     process_chunk(chunk)
...
>>> # Manual cleanup (optional; done automatically)
>>> await stream.close()

>>> # Error handling
>>> try:
...     async for chunk in stream:
...         print(chunk)
... except StreamConsumedError:
...     print("Stream was already consumed")
... except APITimeoutError:
...     print("Streaming timed out")

Stream.__init__

def __init__(iterator: AsyncIterator[ChunkType], *, client: VeniceClient)

Initialize a new Stream wrapper.

Arguments:

  • iterator - The underlying async iterator that yields response chunks. This is typically created by the HTTP client when making streaming API requests.
  • client - The Venice client instance for context and resource management. Used for logging, metrics, and potential resource cleanup.

Notes:

The stream starts in an unconsumed state and can be iterated over exactly once. After consumption, attempts to iterate will raise StreamConsumedError.

Stream.__anext__

async def __anext__() -> ChunkType

Get the next chunk from the streaming response.

This method implements the async iterator protocol, returning each chunk as it becomes available from the underlying stream. It handles various error conditions and ensures proper resource cleanup on stream completion.

Returns:

The next chunk from the streaming response. The exact type depends on the ChunkType parameter but is typically a dictionary containing the parsed response data.

Raises:

  • StopAsyncIteration - When the stream is exhausted (normal completion)
  • StreamConsumedError - When attempting to iterate a consumed stream
  • APITimeoutError - When streaming operations time out
  • APIConnectionError - When network connectivity issues occur
  • VeniceError - For other unexpected streaming errors

Error Handling:

  • Timeouts are converted to APITimeoutError, with the original exception kept as context
  • Network errors are converted to APIConnectionError
  • Unexpected errors are logged and wrapped in VeniceError
  • CancelledError is always re-raised for graceful shutdown
  • The stream is marked as consumed and cleaned up on any error

Notes:

This method automatically marks the stream as consumed and closes resources when the stream is exhausted or encounters an error.
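The conversion rules above can be sketched as a wrapper around the underlying iterator's __anext__(). The exception classes here are local, hypothetical stand-ins for the SDK's (the hierarchy, with everything deriving from VeniceError, is an assumption), OSError stands in for the HTTP client's network errors, and the mark-consumed-and-close step is omitted to keep the focus on translation.

```python
import asyncio
import logging
from typing import AsyncGenerator, AsyncIterator, TypeVar

T = TypeVar("T")
log = logging.getLogger("stream_sketch")


# Hypothetical stand-ins for the SDK's exception classes.
class VeniceError(Exception): ...
class APITimeoutError(VeniceError): ...
class APIConnectionError(VeniceError): ...


async def next_chunk(iterator: AsyncIterator[T]) -> T:
    """Fetch one chunk, converting low-level failures into SDK-style errors."""
    try:
        return await iterator.__anext__()
    except (StopAsyncIteration, asyncio.CancelledError):
        # Normal exhaustion and cancellation pass through untouched.
        raise
    except (asyncio.TimeoutError, TimeoutError) as exc:
        # Checked before OSError: the builtin TimeoutError subclasses OSError.
        raise APITimeoutError("stream timed out") from exc
    except OSError as exc:
        # Stand-in for network errors; ConnectionError derives from OSError.
        raise APIConnectionError("network error while streaming") from exc
    except Exception as exc:
        log.exception("unexpected streaming error")
        raise VeniceError("unexpected streaming error") from exc


async def _failing(exc: BaseException) -> AsyncGenerator[str, None]:
    raise exc
    yield ""  # unreachable; its presence makes this an async generator


async def translated(exc: BaseException) -> str:
    """Return the name of the SDK-style error that next_chunk raises for exc."""
    try:
        await next_chunk(_failing(exc))
    except VeniceError as err:
        return type(err).__name__
    return "no error"


print(asyncio.run(translated(ConnectionResetError())))  # APIConnectionError
```

The ordering of the except clauses carries the logic: pass-through cases first, then the most specific translation, then the catch-all.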

Stream.__aiter__

def __aiter__() -> AsyncIterator[ChunkType]

Return the async iterator object for the stream.

This method implements the async iterable protocol, allowing the stream to be used in async for loops. It includes consumption checking to prevent multiple iterations over the same stream.

Returns:

The stream iterator (self) for use in async iteration

Raises:

  • StreamConsumedError - If the stream has already been consumed by a previous iteration

Example:

>>> stream = Stream(iterator, client=client)
>>> async for chunk in stream:  # This calls __aiter__()
...     process_chunk(chunk)
...
>>> # Attempting to iterate again will raise an error
>>> async for chunk in stream:  # StreamConsumedError
...     pass
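The single-use rule can be sketched with a consumed flag that __aiter__ checks and __anext__ sets when the stream ends or fails. OnceStream and the local StreamConsumedError are hypothetical names for illustration; the real Stream also translates errors and closes resources, which this sketch omits.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, Generic, TypeVar

ChunkType = TypeVar("ChunkType")


class StreamConsumedError(Exception):
    """Hypothetical stand-in for the SDK's exception of the same name."""


class OnceStream(Generic[ChunkType]):
    """Sketch of a single-use async iterable around another async iterator."""

    def __init__(self, iterator: AsyncIterator[ChunkType]) -> None:
        self._iterator = iterator
        self._consumed = False

    def __aiter__(self) -> "OnceStream[ChunkType]":
        # Refuse a second `async for` once the stream has ended.
        if self._consumed:
            raise StreamConsumedError("stream has already been consumed")
        return self

    async def __anext__(self) -> ChunkType:
        try:
            return await self._iterator.__anext__()
        except BaseException:
            # Exhaustion (StopAsyncIteration) or any error ends the stream for good.
            self._consumed = True
            raise


async def numbers() -> AsyncGenerator[int, None]:
    for n in (1, 2, 3):
        yield n


async def main() -> tuple[list[int], bool]:
    stream = OnceStream(numbers())
    first_pass = [n async for n in stream]
    try:
        async for _ in stream:
            pass
        second_pass_rejected = False
    except StreamConsumedError:
        second_pass_rejected = True
    return first_pass, second_pass_rejected


first_pass, rejected = asyncio.run(main())
print(first_pass, rejected)  # [1, 2, 3] True
```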

Stream.__aenter__

async def __aenter__() -> Self

Enter the async context manager.

Stream.__aexit__

async def __aexit__(*args: object) -> None

Exit the async context manager, closing the stream.
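The context-manager pair can be as small as returning self and delegating to close(). A minimal sketch, assuming close() simply awaits the underlying iterator's aclose(); ManagedStream and source() are hypothetical names.

```python
import asyncio
from typing import AsyncGenerator

closed: list[bool] = []


async def source() -> AsyncGenerator[str, None]:
    # Hypothetical stream whose finally block models releasing a connection.
    try:
        yield "a"
        yield "b"
    finally:
        closed.append(True)


class ManagedStream:
    def __init__(self, iterator: AsyncGenerator[str, None]) -> None:
        self._iterator = iterator

    def __aiter__(self) -> "ManagedStream":
        return self

    async def __anext__(self) -> str:
        return await self._iterator.__anext__()

    async def close(self) -> None:
        await self._iterator.aclose()

    async def __aenter__(self) -> "ManagedStream":
        return self

    async def __aexit__(self, *args: object) -> None:
        # Runs on normal exit, on `break`/`return`, and when an exception escapes.
        await self.close()


async def main() -> str:
    async with ManagedStream(source()) as stream:
        async for chunk in stream:
            return chunk  # leave after the first chunk
    return ""


first = asyncio.run(main())
print(first, closed)  # a [True]
```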

Stream.get_iterator

def get_iterator() -> AsyncIterator[ChunkType]

Return the underlying async iterator and mark this stream as consumed.

The caller takes ownership of the iterator. This stream instance should not be iterated after calling this method.

Stream.close

async def close() -> None

Close the stream and release any underlying resources.

This method ensures proper cleanup of the underlying iterator and any associated network resources. It's designed to be idempotent and safe to call multiple times. The method is automatically called when stream iteration completes or encounters an error.

Resource Cleanup:

  • Calls aclose() on the underlying iterator if supported
  • Suppresses any errors raised during cleanup so they do not propagate to the caller
  • Ensures network connections are properly closed
  • Frees any associated memory or file handles

Example:

>>> stream = Stream(iterator, client=client)
>>> try:
...     async for chunk in stream:
...         process_chunk(chunk)
... finally:
...     await stream.close()  # Optional, done automatically

Notes:

This method never raises exceptions. Any errors during cleanup are silently handled to ensure the application can continue running even if resource cleanup fails.
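The idempotent, never-raising contract can be sketched with a _closed flag, a getattr probe for aclose(), and contextlib.suppress around the call. SafeCloser and BrokenIterator are hypothetical names; this illustrates the contract, not the SDK's code.

```python
import asyncio
from contextlib import suppress
from typing import Any


class SafeCloser:
    """Sketch of an idempotent close() that never raises."""

    def __init__(self, iterator: Any) -> None:
        self._iterator = iterator
        self._closed = False
        self.close_calls = 0  # counts real aclose() attempts, for the demo

    async def close(self) -> None:
        if self._closed:
            return  # second and later calls are no-ops
        self._closed = True  # set first so a failing aclose() is not retried
        aclose = getattr(self._iterator, "aclose", None)  # plain iterators may lack it
        if aclose is None:
            return
        self.close_calls += 1
        # Exception excludes CancelledError (a BaseException since Python 3.8),
        # so cancellation still propagates.
        with suppress(Exception):
            await aclose()


class BrokenIterator:
    """An iterator whose cleanup always fails."""

    async def aclose(self) -> None:
        raise RuntimeError("socket already gone")


async def main() -> SafeCloser:
    closer = SafeCloser(BrokenIterator())
    await closer.close()  # the RuntimeError is swallowed
    await closer.close()  # no second aclose() attempt
    await SafeCloser(iter([])).close()  # no aclose(): nothing to do
    return closer


closer = asyncio.run(main())
print(closer.close_calls)  # 1
```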

ChatStream Objects

class ChatStream(Stream["ChatCompletionChunk"])

Enhanced stream for chat completions with convenience accessors.

Extends Stream[ChatCompletionChunk] and adds:

  • :meth:`text_deltas`: yields only text content strings (display-only)
  • :meth:`collect`: silently consumes the stream and returns a full :class:`~venice_ai.types.api.chat.ChatCompletionResponse`
  • :meth:`collect_with_deltas`: yields text deltas live AND populates :attr:`final_response` once iteration completes; one pass, both signals

Example (display deltas live and use the final aggregated response):

async with await client.chat.completions.stream(
    model=model, messages=messages,
) as s:
    async for text in s.collect_with_deltas():
        print(text, end="", flush=True)
    print()
    assert s.final_response is not None
    print("usage:", s.final_response.usage)

ChatStream.final_response

@property
def final_response() -> ChatCompletionResponse | None

Assembled response after :meth:`collect` or :meth:`collect_with_deltas`.

None until one of those two methods has finished consuming the stream. :meth:`text_deltas` does not populate this; use :meth:`collect_with_deltas` if you need both live deltas and the final aggregated response.

ChatStream.text_deltas

async def text_deltas() -> AsyncIterator[str]

Yield only text content deltas, filtering empty/None.

Does not populate :attr:`final_response`. If you want both the deltas and the final aggregated response in one pass, use :meth:`collect_with_deltas` instead.
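Filtering out empty and None deltas amounts to an async generator over the chunks. A sketch assuming the chunk shape used in the module example (chunk.choices[0].delta.content); SimpleNamespace objects stand in for ChatCompletionChunk, and skipping chunks with no choices (for example a trailing usage-only chunk) is an assumption.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Iterable


def chunk(content: Any) -> SimpleNamespace:
    # Hypothetical stand-in for a ChatCompletionChunk with a single choice.
    return SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content=content))])


async def from_list(items: Iterable[SimpleNamespace]) -> AsyncIterator[SimpleNamespace]:
    for item in items:
        yield item


async def text_deltas(chunks: AsyncIterator[SimpleNamespace]) -> AsyncIterator[str]:
    async for c in chunks:
        if not c.choices:  # assumed: usage-only chunks carry no choices
            continue
        content = c.choices[0].delta.content
        if content:  # drops both None and ""
            yield content


async def main() -> list[str]:
    stream = from_list([chunk(None), chunk("Hi"), chunk(""), chunk(" there"),
                        SimpleNamespace(choices=[])])
    return [t async for t in text_deltas(stream)]


print(asyncio.run(main()))  # ['Hi', ' there']
```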

ChatStream.collect

async def collect() -> ChatCompletionResponse

Consume the entire stream and return a ChatCompletionResponse.

Accumulates text, reasoning content, and tool calls from all chunks into a single non-streaming response object.

Notes:

Only the first choice (index 0) is tracked. For n > 1 requests, use the raw Stream iterator and accumulate per-choice yourself. A one-time warning is emitted if chunks for choices with a non-zero index are observed.

Raises:

  • ValueError: If the stream completes without a finish_reason on choice 0 (typically indicates the stream was interrupted).
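The accumulation can be sketched for text and finish_reason on choice 0. This is a simplified, hypothetical version: it returns a plain dict instead of a ChatCompletionResponse, omits tool calls and reasoning content, assumes each choice carries an index attribute, and uses warnings.warn for the one-time multi-choice warning (the SDK's warning mechanism is not specified here).

```python
import asyncio
import warnings
from types import SimpleNamespace
from typing import Any, AsyncIterator, Iterable, Optional


def chunk(index: int, content: Optional[str] = None,
          finish_reason: Optional[str] = None) -> SimpleNamespace:
    # Hypothetical stand-in for a ChatCompletionChunk carrying one choice.
    choice = SimpleNamespace(index=index, delta=SimpleNamespace(content=content),
                             finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])


async def from_list(items: Iterable[SimpleNamespace]) -> AsyncIterator[SimpleNamespace]:
    for item in items:
        yield item


async def collect(chunks: AsyncIterator[SimpleNamespace]) -> dict[str, Any]:
    parts: list[str] = []
    finish_reason: Optional[str] = None
    warned = False
    async for c in chunks:
        for choice in c.choices:
            if choice.index != 0:
                if not warned:  # warn once, then ignore other choices
                    warnings.warn("only choice 0 is collected", stacklevel=2)
                    warned = True
                continue
            if choice.delta.content:
                parts.append(choice.delta.content)
            if choice.finish_reason is not None:
                finish_reason = choice.finish_reason
    if finish_reason is None:
        raise ValueError("stream ended without a finish_reason on choice 0")
    return {"content": "".join(parts), "finish_reason": finish_reason}


async def main() -> dict[str, Any]:
    return await collect(from_list([
        chunk(0, "Hello"), chunk(1, "ignored"), chunk(0, ", world"),
        chunk(0, finish_reason="stop"),
    ]))


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = asyncio.run(main())
multi_choice_warnings = [w for w in caught if "only choice 0" in str(w.message)]
print(result, len(multi_choice_warnings))
# {'content': 'Hello, world', 'finish_reason': 'stop'} 1
```

The missing finish_reason check is what turns a silently truncated stream into an explicit ValueError.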

ChatStream.collect_with_deltas

async def collect_with_deltas() -> AsyncIterator[str]

Yield each text delta and assemble the final response in one pass.

Removes the need for two streams (one to display deltas live, another to obtain the final aggregated response) by doing both jobs in a single consumption: each choice.delta.content is yielded as it arrives, and once iteration completes :attr:`final_response` holds the assembled :class:`ChatCompletionResponse` (with usage, tool_calls, reasoning, and finish_reason).

Tool-call and reasoning deltas are accumulated into the final response but not yielded — only text deltas are yielded.

Example:

async with await client.chat.completions.stream(
    model=model, messages=messages,
) as s:
    async for text in s.collect_with_deltas():
        print(text, end="", flush=True)
    print()
    if s.final_response and s.final_response.usage:
        print("tokens:", s.final_response.usage.total_tokens)

Notes:

Only choice 0 is tracked (the same constraint as :meth:`collect`). A one-time warning is emitted if multi-choice (n > 1) chunks are observed.

Raises:

  • ValueError: If the stream completes without a finish_reason on choice 0 (typically indicates the stream was interrupted before the final chunk arrived).
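The one-pass pattern works by having the delta generator do the accumulation itself and publish the result only after its loop finishes. A hypothetical sketch (DeltaCollector is an illustrative name) that tracks text and finish_reason only and stores a plain dict in final_response:

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Iterable, Optional


def chunk(content: Optional[str] = None, finish_reason: Optional[str] = None) -> SimpleNamespace:
    # Hypothetical stand-in for a single-choice ChatCompletionChunk.
    return SimpleNamespace(choices=[SimpleNamespace(
        delta=SimpleNamespace(content=content), finish_reason=finish_reason)])


async def from_list(items: Iterable[SimpleNamespace]) -> AsyncIterator[SimpleNamespace]:
    for item in items:
        yield item


class DeltaCollector:
    def __init__(self, chunks: AsyncIterator[SimpleNamespace]) -> None:
        self._chunks = chunks
        self.final_response: Optional[dict[str, Any]] = None  # None until fully consumed

    async def collect_with_deltas(self) -> AsyncIterator[str]:
        parts: list[str] = []
        finish_reason: Optional[str] = None
        async for c in self._chunks:
            choice = c.choices[0]
            if choice.delta.content:
                parts.append(choice.delta.content)
                yield choice.delta.content  # live display signal
            if choice.finish_reason is not None:
                finish_reason = choice.finish_reason
        if finish_reason is None:
            raise ValueError("stream ended without a finish_reason on choice 0")
        # Published only after the loop completes, so readers never see a partial result.
        self.final_response = {"content": "".join(parts), "finish_reason": finish_reason}


async def main() -> tuple[list[str], DeltaCollector]:
    s = DeltaCollector(from_list([chunk("Once"), chunk(" upon"), chunk(finish_reason="stop")]))
    shown = [text async for text in s.collect_with_deltas()]
    return shown, s


shown, s = asyncio.run(main())
print(shown, s.final_response)
# ['Once', ' upon'] {'content': 'Once upon', 'finish_reason': 'stop'}
```

A consequence of this design: if the caller breaks out of the loop early, the code after the loop never runs and final_response stays None.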

BytesResponse Objects

class BytesResponse()

A wrapper for raw byte responses that includes the original aiohttp.ClientResponse object.

This class provides a way to access the raw byte content of a response while also retaining the original response object for accessing headers and other metadata.

Arguments:

  • content (bytes): The raw byte content of the response
  • response (Any): The original aiohttp.ClientResponse object
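Structurally, the wrapper pairs the payload with the response object it came from. A minimal dataclass sketch, assuming nothing beyond the two documented arguments; a SimpleNamespace stands in for aiohttp.ClientResponse, and its header and status values are made up for the demo.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class BytesResponse:
    content: bytes  # raw body, e.g. generated audio
    response: Any   # original aiohttp.ClientResponse, kept for headers and status


# Hypothetical stand-in for an aiohttp.ClientResponse.
fake_response = SimpleNamespace(status=200, headers={"Content-Type": "audio/mpeg"})
audio = BytesResponse(content=b"ID3\x04", response=fake_response)
print(len(audio.content), audio.response.headers["Content-Type"], audio.response.status)
# 4 audio/mpeg 200
```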