Skip to content

Streaming

For long or open-ended responses, return one of three streaming wrappers instead of a buffered body. Each is typed end-to-end — the generic T carries the per-item schema (which the OpenAPI work will read), and an optional H carries typed response headers just like the buffered responses.

Wrapper Content type Yields
StreamingResponse application/octet-stream bytes chunks
NDJSONStreamingResponse[T] application/x-ndjson one T Struct per line
SSEResponse[T] text/event-stream Server-Sent Events (GET-only)

Two of these run for real in the demo_app/ package: an NDJSON /questions endpoint that proxies the OpenAI streaming API (one answer chunk per line), and an SSE /notifications feed. Both live in demo_app/operations/streaming_operations.py.

NDJSON

Stream one JSON Struct per line — ideal for large result sets a client consumes incrementally:

from collections.abc import AsyncIterator

from msgspec import Struct

from jero import BaseApp, Endpoint, NDJSONStreamingResponse


class Movie(Struct):
    title: str


class MoviesEndpoint(Endpoint, path="/movies"):
    async def _movies(self) -> AsyncIterator[Movie]:
        for title in ("first", "second"):     # stream rows from a DB cursor, etc.
            yield Movie(title=title)

    async def get(self) -> NDJSONStreamingResponse[Movie]:
        return NDJSONStreamingResponse(stream=self._movies())


class App(BaseApp):
    async def _wire(self) -> None:
        self._include_endpoint(MoviesEndpoint())


app = App()

Server-Sent Events

SSEResponse is GET-only. Yield a Struct/str (sent as data) or a ServerSentEvent to control the event / id / retry fields:

from collections.abc import AsyncIterator

from msgspec import Struct

from jero import BaseApp, Endpoint, SSEResponse, ServerSentEvent


class Movie(Struct):
    title: str


class EventsEndpoint(Endpoint, path="/events"):
    async def _events(self) -> AsyncIterator[Movie | ServerSentEvent[Movie]]:
        yield Movie(title="first")                                    # data: {...}
        yield ServerSentEvent(data=Movie(title="second"), event="added", id="2")

    async def get(self) -> SSEResponse[Movie]:
        return SSEResponse(stream=self._events())


class App(BaseApp):
    async def _wire(self) -> None:
        self._include_endpoint(EventsEndpoint())


app = App()

Keepalive

Set keepalive to emit a comment ping every N idle seconds, so proxies don't drop an idle connection:

SSEResponse(stream=self._events(), keepalive=15.0)   # ": ping" every 15s idle

Raw bytes

For anything else — CSV, a proxied download — stream bytes:

from collections.abc import AsyncIterator

from jero import BaseApp, Endpoint, StreamingResponse


class CSVEndpoint(Endpoint, path="/export"):
    async def _chunks(self) -> AsyncIterator[bytes]:
        yield b"id,name\n"
        yield b"1,gizmo\n"

    async def get(self) -> StreamingResponse:
        return StreamingResponse(stream=self._chunks(), raw_headers={"content-type": "text/csv"})


class App(BaseApp):
    async def _wire(self) -> None:
        self._include_endpoint(CSVEndpoint())


app = App()

Setup & teardown (lifecycle)

A plain async iterable is enough when there's nothing to clean up. When the stream holds a resource — a DB cursor, an upstream connection — give it a one-yield lifecycle generator: yield the stream once, then do teardown after. The framework guarantees the teardown runs, even on client disconnect or a mid-stream error:

from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator

from msgspec import Struct

from jero import BaseApp, Endpoint, NDJSONStreamingResponse


class Movie(Struct):
    title: str


class Cursor:
    """A stand-in for a real resource (a DB cursor, an upstream connection): it must be
    opened before use and closed afterwards. Implementing the async context manager
    protocol — `__aenter__` / `__aexit__` — is what lets `async with` drive it."""

    async def __aenter__(self) -> Cursor:
        print("cursor opened")   # illustrative side effect: acquire the resource here
        return self

    async def __aexit__(self, *exc: object) -> None:
        print("cursor closed")   # proof the teardown really runs: release it here

    async def rows_iterator(self) -> AsyncIterator[Movie]:
        yield Movie(title="first")
        yield Movie(title="second")


class ExportEndpoint(Endpoint, path="/movies/export"):
    async def _lifecycle(self) -> AsyncGenerator[AsyncIterable[Movie]]:
        async with Cursor() as cursor:  # "cursor opened" — before streaming starts
            yield cursor.rows_iterator()
        # control returns here once the stream is drained (or abandoned), so the
        # `async with` exits and prints "cursor closed" — even on disconnect or error

    async def get(self) -> NDJSONStreamingResponse[Movie]:
        return NDJSONStreamingResponse(stream=self._lifecycle())


class App(BaseApp):
    async def _wire(self) -> None:
        self._include_endpoint(ExportEndpoint())


app = App()

Hit GET /movies/export and the worker logs cursor opened before the rows stream and cursor closed once it's done — the teardown half of the one-yield generator running for real.

This is the one blessed way to scope a resource to a stream: a simple stream if you don't need lifecycle, a one-yield generator if you do.

Disconnect handling

jero watches the client connection while it streams. If the client disconnects, it stops pulling from your iterator and runs the lifecycle teardown — you don't write any of that bookkeeping. Errors raised inside the stream are swallowed after teardown so a broken stream can't crash the worker.

Status & headers

Streaming wrappers carry status_code, typed headers, and raw_headers, exactly like the buffered responses. HEAD requests return the headers with no body and never iterate the stream.