Streaming Chat with SSE + “Stop/Resume” + Conversation Memory
A runnable chat service that streams LLM tokens over SSE, supports cancellation and resume, and persists conversation state safely.
1. Overview
This solution implements a Spring Boot chat service that streams LLM tokens to clients over Server-Sent Events (SSE), supports user-driven cancellation, enables resume after disconnects, and persists conversation state in PostgreSQL with audit-safe logging.
In production, “streaming chat” typically fails due to:
- Fragile connection handling: SSE streams break due to mobile networks, proxies, or idle timeouts; many implementations treat disconnects as errors and lose state.
- Unsafe cancellation: cancelling an HTTP stream does not automatically cancel upstream LLM generation; systems continue consuming tokens and cost unless they propagate cancellation correctly.
- No resumability: clients reconnect after a drop but cannot resume from an exact token boundary; duplicate output, inconsistent state, and user-visible glitches are common.
- Inconsistent persistence: partial assistant messages are stored incorrectly (or not at all), producing corrupted conversation history and poor replay behavior.
- Audit and privacy gaps: logs capture raw prompts/responses, creating risk; audit trails are incomplete for stop/resume events.
This implementation is production-ready because it provides:
- Deterministic streaming state: server assigns monotonically increasing sequence numbers to emitted token chunks and persists them.
- Stop/resume contract: a stable resume cursor (`lastEventId`/`cursor`) allows clients to reconnect and continue without duplication.
- Cancellation propagation: a client cancel triggers server-side cancellation-token propagation to the upstream provider call.
- Safe persistence: conversation, message, and streamed chunk tables allow replay and recovery without corrupting message ordering.
- Audit-safe logs: structured event logs record metadata (conversationId, messageId, event types, counters) without storing raw content in logs.
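To make the deterministic-streaming idea concrete, here is a minimal, self-contained Java sketch of server-assigned sequence numbers; the class and record names are illustrative, not taken from the implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the server assigns a monotonically increasing
// sequence number to every emitted chunk, so each chunk is a stable
// resume point for reconnecting clients.
class ChunkSequencer {
    private final AtomicLong nextSeq = new AtomicLong(1);

    // One SSE data payload; field names are illustrative.
    record Chunk(long seq, String delta) {}

    Chunk next(String delta) {
        return new Chunk(nextSeq.getAndIncrement(), delta);
    }

    public static void main(String[] args) {
        ChunkSequencer s = new ChunkSequencer();
        List<Chunk> out = new ArrayList<>();
        for (String d : List.of("Hel", "lo", "!")) out.add(s.next(d));
        // seq values are strictly increasing: 1, 2, 3
        System.out.println(out.get(0).seq() + "," + out.get(2).seq()); // prints "1,3"
    }
}
```

Because the counter is server-owned and persisted alongside each chunk, a client that last saw `seq = N` can always ask for everything after `N` without risking duplicates.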
2. Architecture
Request flow and components are as follows:
- Client → Chat Service REST API (`POST /api/conversations`, `POST /api/conversations/{id}/messages`)
- Client → Chat Service SSE endpoint (`GET /api/conversations/{id}/stream`)
- Chat Service → PostgreSQL (conversation state, messages, streamed chunks, audit events)
- Chat Service → LLM Provider (OpenAI-compatible streaming endpoint)
Key components:
- Conversation Service: creates conversations, appends user messages, manages assistant message lifecycle.
- Streaming Orchestrator: initiates upstream streaming request, converts upstream deltas into SSE events, assigns sequence numbers, persists chunks.
- Resume Manager: supports replay of missed SSE events from persisted chunks using a cursor.
- Cancellation Manager: maps client cancel requests to server-side cancellation tokens and upstream request abort.
- LLM Streaming Client: OpenAI-compatible streaming client (HTTP) handling incremental deltas, timeouts, and error mapping.
- Audit Logger: records state transitions (STREAM_STARTED, TOKEN_EMITTED counters, STREAM_STOPPED, STREAM_RESUMED, STREAM_COMPLETED, STREAM_FAILED).
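The Streaming Orchestrator's core loop can be sketched in a few lines of plain Java (the in-memory map stands in for the `message_chunks` table; names are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the orchestrator loop: consume upstream deltas,
// assign a sequence number, persist the chunk, then emit it as an SSE frame.
// Persisting *before* emitting is what makes replay-after-disconnect safe:
// anything the client may have seen is already durable.
class StreamingOrchestrator {
    private final Map<Long, String> chunkStore = new LinkedHashMap<>(); // stands in for message_chunks
    private long nextSeq = 1;

    void run(List<String> upstreamDeltas, Consumer<String> sseEmit) {
        for (String delta : upstreamDeltas) {
            long seq = nextSeq++;
            chunkStore.put(seq, delta);                                   // 1. persist (durable resume point)
            sseEmit.accept("id: " + seq + "\ndata: " + delta + "\n\n");   // 2. emit SSE frame with id = seq
        }
    }

    Map<Long, String> persisted() { return chunkStore; }
}
```

Setting the SSE `id:` field to the sequence number is what lets browsers send `Last-Event-ID` automatically on reconnect.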
External dependencies:
- PostgreSQL for durable state and replay
- OpenAI-compatible API for streaming generation
- Docker Compose for local runtime
Trust boundaries:
- Client-to-service boundary: authentication and authorization enforced; SSE stream scoped to the authenticated principal.
- Service-to-database boundary: internal access; sensitive content stored only in DB columns intended for it.
- Service-to-LLM provider boundary: provider credentials held only by the service; upstream traffic is not exposed to clients.
3. Key Design Decisions
Technology stack:
- Spring Boot 3.x + Spring MVC: SSE support via `SseEmitter` and reactive-friendly patterns, while retaining standard MVC deployment and operational familiarity.
- PostgreSQL: durable conversation/message storage and chunk persistence enabling deterministic resume.
- Docker Compose: reproducible local execution and evidence capture.
- OpenAI-compatible streaming: supports incremental token deltas; avoids buffering full responses before emitting.
Data storage model:
- Conversations and messages are stored durably.
- Streamed output is persisted as ordered chunks with monotonically increasing sequence numbers (per assistant message stream). This enables replay after disconnect and prevents duplication.
- Audit events are stored separately from content-bearing tables to keep logs minimal and safe.
Synchrony vs asynchrony:
- The SSE endpoint is long-lived and asynchronous from the client perspective, but the server retains control of the stream lifecycle.
- Upstream streaming is handled in a dedicated execution context to avoid blocking request threads.
- Persistence is performed incrementally (chunk-by-chunk) to ensure resumability even mid-stream.
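A minimal sketch of the dedicated execution context described above, using plain `java.util.concurrent` (no Spring; names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: streaming work runs on a dedicated pool so the
// servlet request thread is released immediately. Spring MVC's SseEmitter
// follows the same pattern, with the emitter held open until completion.
class StreamExecutor {
    private final ExecutorService streamingPool = Executors.newFixedThreadPool(4);

    // perChunkWork stands in for "read one upstream delta, persist it, emit it".
    CompletableFuture<Integer> streamAsync(Runnable perChunkWork, int chunks) {
        return CompletableFuture.supplyAsync(() -> {
            for (int i = 0; i < chunks; i++) perChunkWork.run();
            return chunks; // number of chunks processed
        }, streamingPool);
    }

    void shutdown() { streamingPool.shutdown(); }
}
```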
Error handling and retries:
- Upstream call uses strict timeouts and bounded retries only for connection-establishment errors (not for mid-stream token delivery, where retries would duplicate content).
- Mid-stream failures transition the assistant message state to `FAILED` with an error code; the stream ends with an SSE terminal event.
- Client reconnection triggers resume logic from persisted chunks and then either continues generation (if supported) or restarts generation with an explicit, configurable “resume prompt strategy”.
Idempotency strategy:
- User message creation supports idempotency via an optional `Idempotency-Key` header, preventing duplicate user messages on client retries.
- Streaming resume uses an explicit cursor (`Last-Event-ID` header or query param) to replay already-generated chunks without regenerating them.
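The idempotency check can be sketched as follows; the in-memory map is a stand-in for what is a unique database index in practice, and all names are hypothetical:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the Idempotency-Key contract: the first request
// with a given key creates the message; retries with the same key get the
// same message id back instead of creating a duplicate row.
class IdempotentMessageCreator {
    private final Map<String, UUID> seenKeys = new ConcurrentHashMap<>(); // in production: DB unique index

    UUID createUserMessage(String idempotencyKey, String content) {
        // Content persistence is omitted here; only the dedup logic is shown.
        if (idempotencyKey == null) return UUID.randomUUID(); // key is optional
        return seenKeys.computeIfAbsent(idempotencyKey, k -> UUID.randomUUID());
    }
}
```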
4. Data Model
Core tables and intent:
- `conversations`
  - Purpose: conversation container and ownership.
  - Key columns: `id` (uuid), `owner_user_id`, `title`, `status` (ACTIVE, ARCHIVED), `created_at`, `updated_at`.
  - Indexing: `(owner_user_id, updated_at desc)` for listing.
- `messages`
  - Purpose: user and assistant messages with lifecycle.
  - Key columns: `id` (uuid), `conversation_id`, `role` (USER, ASSISTANT), `content` (text), `status` (FINAL, STREAMING, CANCELLED, FAILED), `created_at`.
  - Indexing: `(conversation_id, created_at asc)` for ordered retrieval.
- `message_streams`
  - Purpose: per-assistant-message stream instance tracking (supports stop/resume and terminal states).
  - Key columns: `id` (uuid), `assistant_message_id`, `state` (OPEN, COMPLETED, CANCELLED, FAILED), `next_seq` (bigint), `started_at`, `ended_at`, `last_error_code`.
  - Indexing: `(assistant_message_id)` unique, plus `(state)` optionally for ops.
- `message_chunks`
  - Purpose: durable token/chunk persistence for replay.
  - Key columns: `stream_id`, `seq` (bigint), `delta` (text), `created_at`.
  - Indexing: primary key `(stream_id, seq)`; this is the core replay index.
- `audit_events`
  - Purpose: audit-safe event trail without raw content.
  - Key columns: `id`, `actor_user_id`, `conversation_id`, `message_id`, `event_type`, `summary`, `details_json`, `created_at`.
  - Indexing: `(conversation_id, created_at desc)`, `(event_type, created_at desc)`.
Design intent:
- Replay is performed by selecting `message_chunks` with `seq > cursor`, ordered by `seq`.
- The current assistant message’s `status` is finalized only when the stream completes; cancellation sets the status to `CANCELLED` and persists the partial content deterministically.
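An in-memory model of the replay semantics makes the cursor contract easy to test; `NavigableMap.tailMap(cursor, false)` mirrors the SQL predicate `seq > cursor ORDER BY seq` (the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of the replay query over message_chunks:
// everything strictly after the cursor, in sequence order, exactly once.
class ChunkReplayer {
    private final NavigableMap<Long, String> chunks = new TreeMap<>();

    void persist(long seq, String delta) { chunks.put(seq, delta); }

    // tailMap(cursor, false) excludes the cursor itself, so a client that
    // acknowledged seq = N never receives chunk N twice.
    List<String> replayAfter(long cursor) {
        return new ArrayList<>(chunks.tailMap(cursor, false).values());
    }
}
```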
5. API Surface
Public (application):
- `POST /api/conversations` — Create a conversation (ROLE_USER)
- `GET /api/conversations` — List user conversations (ROLE_USER)
- `GET /api/conversations/{id}` — Fetch a conversation with messages (ROLE_USER, owner-only)
- `POST /api/conversations/{id}/messages` — Append a user message and create an assistant message in `STREAMING` state (ROLE_USER, owner-only)
- `GET /api/conversations/{id}/stream?messageId={assistantMessageId}` — SSE stream of assistant tokens; supports a resume cursor (ROLE_USER, owner-only)
  - Resume: `Last-Event-ID` header or `cursor` query param representing the last delivered `seq`
- `POST /api/conversations/{id}/cancel?messageId={assistantMessageId}` — Cancel an active stream (ROLE_USER, owner-only)
Operational:
- `GET /health` — Liveness/readiness (no auth, or internal-only, depending on deployment)
Admin/internal (optional for marketplace alignment):
- `GET /admin/audit-events?conversationId=...` — Audit event search (ROLE_ADMIN)
6. Security Model
Authentication:
- Spring Security with your platform’s standard authentication (session or bearer), requiring `ROLE_USER` for all chat APIs.
- The SSE endpoint uses the same authentication context as standard MVC endpoints.
Authorization (roles):
- `ROLE_USER` required for chat APIs.
- Owner enforcement on conversation resources: `conversation.owner_user_id == authenticated user`.
- `ROLE_ADMIN` optional for audit-viewing endpoints.
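The owner check can be sketched as a small guard applied before every read, stream, or cancel operation (the map stands in for a database lookup; names are hypothetical):

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of owner enforcement: every conversation-scoped
// operation verifies conversation.owner_user_id == authenticated user.
class OwnershipGuard {
    private final Map<UUID, UUID> conversationOwners; // conversationId -> owner_user_id

    OwnershipGuard(Map<UUID, UUID> conversationOwners) {
        this.conversationOwners = conversationOwners;
    }

    void requireOwner(UUID conversationId, UUID callerUserId) {
        UUID owner = conversationOwners.get(conversationId);
        if (owner == null || !owner.equals(callerUserId)) {
            // Surfacing "not found" rather than "forbidden" is one common
            // choice: it avoids leaking whether the conversation exists.
            throw new IllegalStateException("404: conversation not found");
        }
    }
}
```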
Paid access enforcement (if applicable):
- If chat is a paid feature, enforce `ROLE_PAID` on write/stream endpoints:
  - allow read-only conversation listing for non-paid users if desired
  - enforce paid gating on `POST /messages`, `/stream`, and `/cancel`
CSRF considerations:
- For cookie-based sessions, keep CSRF enabled for state-changing endpoints (`POST /messages`, `POST /cancel`).
- SSE `GET /stream` is not state-changing, but it should still require authentication and be protected by same-origin policy as appropriate.
Data isolation guarantees:
- Conversation ownership is enforced at the query layer and controller/service layer.
- Stream replay only returns chunks for conversations owned by the caller.
- Audit logging avoids emitting raw message content to logs; DB storage remains the authoritative record.
7. Operational Behavior
Startup behavior:
- Flyway applies migrations on boot.
- Health endpoint reports readiness when PostgreSQL connectivity is available.
Failure modes:
- Client disconnect: the stream ends server-side; the state remains `OPEN` and resumable if upstream generation continues or if sufficient chunks are persisted.
- Upstream provider failure mid-stream: the assistant message transitions to `FAILED`; SSE emits a terminal error event with a `requestId`.
- Cancellation: the assistant message transitions to `CANCELLED`; SSE emits a terminal cancelled event; the upstream request is aborted.
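The failure modes above imply a small state machine over `message_streams.state`, which can be made explicit (a sketch; the enum mirrors the states named in the data model):

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the message_streams state machine: OPEN is the
// only non-terminal state, and each failure mode maps to exactly one
// terminal state; terminal states admit no further transitions.
class StreamStateMachine {
    enum State { OPEN, COMPLETED, CANCELLED, FAILED }

    private static final Map<State, Set<State>> ALLOWED = Map.of(
        State.OPEN,      Set.of(State.COMPLETED, State.CANCELLED, State.FAILED),
        State.COMPLETED, Set.of(),
        State.CANCELLED, Set.of(),
        State.FAILED,    Set.of()
    );

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

Rejecting transitions out of terminal states is what keeps a late-arriving cancel (or a duplicate completion) from corrupting an already-finalized stream record.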
Retry and timeout behavior:
- Upstream connection establishment uses strict connect/read timeouts.
- No mid-stream automatic retry, to avoid duplicate output; resume is the supported recovery mechanism.
- Resume replays persisted chunks first, then either:
  - continues with an upstream “continue” call if supported by provider semantics, or
  - restarts generation using conversation history with a server-side “resume prompt strategy” (explicitly documented and logged).
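The "retry only on connection establishment" policy can be sketched generically; nothing here is Spring-specific, and the class name is hypothetical:

```java
import java.util.function.Supplier;

// Hypothetical sketch of the retry policy: retries are allowed only while
// establishing the upstream connection, before any token has been emitted.
// Once streaming has started, a failure ends the stream and recovery goes
// through the resume path instead, so no output is ever duplicated.
class ConnectRetryPolicy {
    static <T> T connectWithRetry(Supplier<T> connect, int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.get();
            } catch (RuntimeException e) {
                last = e; // nothing emitted yet, so retrying is safe
            }
        }
        throw last; // exhausted: surfaces as STREAM_FAILED before any chunk was sent
    }
}
```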
Observability hooks:
- Structured logs: `conversationId`, `assistantMessageId`, `streamId`, `seq`, `eventType`, `requestId`.
- Metrics: active streams, disconnect count, resume count, cancellation count, average time-to-first-token, upstream latency.
- Traces (optional): spans for `stream.start`, `chunk.persist`, `sse.emit`, `stream.cancel`, `stream.resume`, `llm.upstream.stream`.
8. Local Execution
Prerequisites:
- Docker Desktop / Docker Engine with Docker Compose v2
- An OpenAI-compatible provider base URL and API key
Environment variables:
- `LLM_BASE_URL` — OpenAI-compatible base URL
- `LLM_API_KEY` — provider key
- `SPRING_DATASOURCE_URL` — if overriding compose defaults
- `SPRING_PROFILES_ACTIVE=local`
Run:
```
docker compose up -d --build
```
Verify health:
```
curl -s http://localhost:8080/health
```
Create a conversation and send a message (example assumes your platform auth; replace with your auth headers):
```
curl -X POST http://localhost:8080/api/conversations \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer <token>' \
  -d '{"title":"demo"}'
```
Append user message (returns assistantMessageId):
```
curl -X POST http://localhost:8080/api/conversations/<conversationId>/messages \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer <token>' \
  -d '{"content":"Hello"}'
```
Start SSE stream:
```
curl -N "http://localhost:8080/api/conversations/<conversationId>/stream?messageId=<assistantMessageId>" \
  -H 'Authorization: Bearer <token>'
```
Cancel stream:
```
curl -X POST "http://localhost:8080/api/conversations/<conversationId>/cancel?messageId=<assistantMessageId>" \
  -H 'Authorization: Bearer <token>'
```
Resume stream from a cursor (example uses Last-Event-ID):
```
curl -N "http://localhost:8080/api/conversations/<conversationId>/stream?messageId=<assistantMessageId>" \
  -H 'Authorization: Bearer <token>' \
  -H 'Last-Event-ID: 42'
```
9. Evidence Pack
- [ ] Service startup logs showing Flyway migration and app readiness
- [ ] Health check response (200 OK)
- [ ] Conversation creation request/response logs
- [ ] Message append request/response logs including the returned `assistantMessageId`
- [ ] SSE streaming capture showing incremental events with monotonically increasing `seq`
- [ ] Cancellation demonstration:
  - client cancels mid-stream
  - server emits terminal cancelled event
  - database shows assistant message status `CANCELLED`
- [ ] Resume demonstration:
  - interrupt the SSE connection
  - reconnect with cursor / `Last-Event-ID`
  - server replays missing chunks without duplication
- [ ] Database records after execution:
  - `messages` rows for user + assistant
  - `message_chunks` rows with sequential `seq`
  - `message_streams` state transitions (OPEN → COMPLETED / CANCELLED / FAILED)
- [ ] Failure mode demonstration:
  - simulate upstream provider failure (invalid key or network block)
  - assistant message transitions to `FAILED`
  - SSE terminal error event emitted
- [ ] Audit-safe logging demonstration:
  - logs show IDs and event types without raw prompt/response content
10. Known Limitations
- Does not implement WebSocket transport; SSE only.
- Provider “true resume” of generation depends on upstream semantics; the fallback strategy is to replay persisted chunks and optionally restart generation using conversation history.
- Does not include advanced PII redaction; it focuses on avoiding raw content in logs while storing content in DB for application needs.
- Does not implement multi-region persistence or leader election for stream coordination; deployment model is single-node Docker Compose.
11. Extension Points
- Replace MVC SSE with reactive streaming (WebFlux) if you need higher concurrency; the persistence and cursor model remain valid.
- Add a dedicated streaming coordination layer for multi-instance deployments:
  - store active stream ownership in DB/Redis
  - ensure only one instance streams a given `assistantMessageId` at a time
- Add message encryption-at-rest for sensitive environments and configurable retention policies for message/chunk tables.
- Introduce token accounting and cost controls (reuse the gateway enforcement components) if streaming cost governance is required.
- Add client acknowledgment checkpoints and periodic snapshotting to reduce chunk table size for long streams.
- Integrate an OpenTelemetry collector and provide trace evidence for stream lifecycle and upstream latency.
1.1.0
- Solution write-up + runnable implementation
- Evidence images (when published)
- Code bundle downloads (when enabled)