Streaming Chat with SSE, Stop/Resume, and Conversation Memory

A runnable chat service that streams LLM tokens over SSE, supports cancellation and resume, and persists conversation state safely.

Verified v1.1.0 · RHEL 8/9 / Ubuntu / macOS / Windows (Docker) · Java 17 · Spring Boot 3.x · Spring MVC · PostgreSQL · Docker Compose
This solution includes runnable code bundles and full implementation details intended for production use.

1. Overview

This solution implements a Spring Boot chat service that streams LLM tokens to clients over Server-Sent Events (SSE), supports user-driven cancellation, enables resume after disconnects, and persists conversation state in PostgreSQL with audit-safe logging.

In production, “streaming chat” typically fails due to:

  • Fragile connection handling: SSE streams break due to mobile networks, proxies, or idle timeouts; many implementations treat disconnects as errors and lose state.
  • Unsafe cancellation: cancelling an HTTP stream does not automatically cancel upstream LLM generation; systems continue consuming tokens and cost unless they propagate cancellation correctly.
  • No resumability: clients reconnect after a drop but cannot resume from an exact token boundary; duplicate output, inconsistent state, and user-visible glitches are common.
  • Inconsistent persistence: partial assistant messages are stored incorrectly (or not at all), producing corrupted conversation history and poor replay behavior.
  • Audit and privacy gaps: logs capture raw prompts/responses, creating risk; audit trails are incomplete for stop/resume events.

This implementation is production-ready because it provides:

  • Deterministic streaming state: server assigns monotonically increasing sequence numbers to emitted token chunks and persists them.
  • Stop/resume contract: a stable resume cursor (lastEventId / cursor) allows clients to reconnect and continue without duplication.
  • Cancellation propagation: client cancel triggers server-side cancellation token propagation to the upstream provider call.
  • Safe persistence: conversation, message, and streamed chunk tables allow replay and recovery without corrupting message ordering.
  • Audit-safe logs: structured event logs record metadata (conversationId, messageId, event types, counters) without storing raw content in logs.

2. Architecture

Request flow and components are as follows:

  • Client → Chat Service REST API (POST /api/conversations, POST /api/conversations/{id}/messages)
  • Client → Chat Service SSE endpoint (GET /api/conversations/{id}/stream)
  • Chat Service → PostgreSQL (conversation state, messages, streamed chunks, audit events)
  • Chat Service → LLM Provider (OpenAI-compatible streaming endpoint)

Key components:

  • Conversation Service: creates conversations, appends user messages, manages assistant message lifecycle.
  • Streaming Orchestrator: initiates upstream streaming request, converts upstream deltas into SSE events, assigns sequence numbers, persists chunks.
  • Resume Manager: supports replay of missed SSE events from persisted chunks using a cursor.
  • Cancellation Manager: maps client cancel requests to server-side cancellation tokens and upstream request abort.
  • LLM Streaming Client: OpenAI-compatible streaming client (HTTP) handling incremental deltas, timeouts, and error mapping.
  • Audit Logger: records state transitions (STREAM_STARTED, TOKEN_EMITTED counters, STREAM_STOPPED, STREAM_RESUMED, STREAM_COMPLETED, STREAM_FAILED).
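
Cancellation propagation can be illustrated with a minimal, framework-free sketch. The class and method names below are hypothetical; the real Cancellation Manager would additionally abort the upstream HTTP request to the provider.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: a cancellation flag shared between the cancel
 *  endpoint handler and the streaming loop. The production component would
 *  also abort the in-flight upstream provider request. */
public class CancellationManagerSketch {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /** Called when the client hits POST /cancel. */
    public void cancel() {
        cancelled.set(true);
    }

    /** Streaming loop: stops emitting (and consuming upstream tokens)
     *  as soon as cancellation is observed. */
    public List<String> drain(List<String> upstreamDeltas) {
        List<String> emitted = new ArrayList<>();
        for (String delta : upstreamDeltas) {
            if (cancelled.get()) break; // stop consuming upstream cost
            emitted.add(delta);
        }
        return emitted;
    }
}
```

The key property is that cancellation is checked on every delta, so at most one chunk is emitted after the cancel request lands.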

External dependencies:

  • PostgreSQL for durable state and replay
  • OpenAI-compatible API for streaming generation
  • Docker Compose for local runtime

Trust boundaries:

  • Client-to-service boundary: authentication and authorization enforced; SSE stream scoped to the authenticated principal.
  • Service-to-database boundary: internal access; sensitive content stored only in DB columns intended for it.
  • Service-to-LLM provider boundary: provider credentials held only by the service; upstream traffic is not exposed to clients.

3. Key Design Decisions

Technology stack:

  • Spring Boot 3.x + Spring MVC: SSE support via SseEmitter, retaining standard MVC deployment and operational familiarity.
  • PostgreSQL: durable conversation/message storage and chunk persistence enabling deterministic resume.
  • Docker Compose: reproducible local execution and evidence capture.
  • OpenAI-compatible streaming: supports incremental token deltas; avoids buffering full responses before emitting.

Data storage model:

  • Conversations and messages are stored durably.
  • Streamed output is persisted as ordered chunks with monotonically increasing sequence numbers (per assistant message stream). This enables replay after disconnect and prevents duplication.
  • Audit events are stored separately from content-bearing tables to keep logs minimal and safe.
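
The per-stream monotonic sequence numbering can be sketched as follows. This is a hypothetical in-memory illustration; in the service itself, next_seq lives in the message_streams row and is advanced in the same transaction that inserts the chunk.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: monotonically increasing sequence numbers for the
 *  chunks of one assistant-message stream. The durable equivalent is the
 *  next_seq column on message_streams, advanced transactionally. */
public class ChunkSequencer {
    private final AtomicLong nextSeq = new AtomicLong(0);

    /** Returns the seq to attach to the next persisted chunk. */
    public long allocate() {
        return nextSeq.getAndIncrement();
    }
}
```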

Synchrony vs asynchrony:

  • The SSE endpoint is long-lived and asynchronous from the client perspective, but the server retains control of the stream lifecycle.
  • Upstream streaming is handled in a dedicated execution context to avoid blocking request threads.
  • Persistence is performed incrementally (chunk-by-chunk) to ensure resumability even mid-stream.
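
The "dedicated execution context" can be sketched without Spring wiring as a plain executor. Class and method names are hypothetical; in the real service, the task would receive the SseEmitter and persist each chunk before emitting it.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: upstream streaming runs on a dedicated pool so the
 *  servlet request thread returns immediately. */
public class StreamingExecutorSketch {
    private final ExecutorService streamPool = Executors.newFixedThreadPool(4);

    /** Streams deltas off the caller's thread; returns the emitted count. */
    public Future<Integer> startStream(List<String> deltas) {
        return streamPool.submit(() -> {
            int emitted = 0;
            for (String delta : deltas) {
                // persist chunk, then emit the SSE event (omitted in sketch)
                emitted++;
            }
            return emitted;
        });
    }

    /** Joins the future without surfacing checked exceptions. */
    public int await(Future<Integer> f) {
        try { return f.get(); } catch (Exception e) { throw new RuntimeException(e); }
    }

    public void shutdown() {
        streamPool.shutdown();
    }
}
```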

Error handling and retries:

  • Upstream call uses strict timeouts and bounded retries only for connection-establishment errors (not for mid-stream token delivery, where retries would duplicate content).
  • Mid-stream failures transition the assistant message state to FAILED with an error code; the stream ends with an SSE terminal event.
  • Client reconnection triggers resume logic from persisted chunks and then either continues generation (if supported) or restarts generation with an explicit “resume prompt strategy” (configurable).

Idempotency strategy:

  • User message creation supports idempotency via an optional Idempotency-Key header, preventing duplicate user messages on client retries.
  • Streaming resume uses an explicit cursor (Last-Event-ID or query param) to replay already-generated chunks without re-generating them.
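
The Idempotency-Key behavior can be sketched with an in-memory map; this is a hypothetical illustration, and the production version would rely on a unique database index on the key (e.g. per conversation) rather than process-local state.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: duplicate POST /messages calls carrying the same
 *  Idempotency-Key return the originally created messageId instead of
 *  creating a second user message. */
public class IdempotentMessageStore {
    private final Map<String, String> messageIdByKey = new ConcurrentHashMap<>();

    /** Returns the existing messageId when the key was already used. */
    public String append(String idempotencyKey, String content) {
        // First writer wins; retries observe the same id.
        return messageIdByKey.computeIfAbsent(
                idempotencyKey, k -> UUID.randomUUID().toString());
    }
}
```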

4. Data Model

Core tables and intent:

  • conversations

    • Purpose: conversation container and ownership.
    • Key columns: id (uuid), owner_user_id, title, status (ACTIVE, ARCHIVED), created_at, updated_at.
    • Indexing: (owner_user_id, updated_at desc) for listing.
  • messages

    • Purpose: user and assistant messages with lifecycle.
    • Key columns: id (uuid), conversation_id, role (USER, ASSISTANT), content (text), status (FINAL, STREAMING, CANCELLED, FAILED), created_at.
    • Indexing: (conversation_id, created_at asc) for ordered retrieval.
  • message_streams

    • Purpose: per-assistant-message stream instance tracking (supports stop/resume and terminal states).
    • Key columns: id (uuid), assistant_message_id, state (OPEN, COMPLETED, CANCELLED, FAILED), next_seq (bigint), started_at, ended_at, last_error_code.
    • Indexing: (assistant_message_id) unique, plus (state) optional for ops.
  • message_chunks

    • Purpose: durable token/chunk persistence for replay.
    • Key columns: stream_id, seq (bigint), delta (text), created_at.
    • Indexing: primary key (stream_id, seq); this is the core replay index.
  • audit_events

    • Purpose: audit-safe event trail without raw content.
    • Key columns: id, actor_user_id, conversation_id, message_id, event_type, summary, details_json, created_at.
    • Indexing: (conversation_id, created_at desc), (event_type, created_at desc).

Design intent:

  • Replay is performed by selecting message_chunks with seq > cursor ordered by seq.
  • The current assistant message’s status is finalized only when the stream completes; cancellation sets status to CANCELLED and persists the partial content deterministically.
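
The replay rule (seq > cursor, ordered by seq) can be expressed as a small hypothetical sketch; in production it is a single indexed SELECT against message_chunks using the (stream_id, seq) primary key.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch of chunk replay: return persisted chunks strictly
 *  after the client's cursor, in sequence order, so reconnects never
 *  duplicate already-delivered output. */
public class ReplaySketch {
    public record Chunk(long seq, String delta) {}

    public static List<Chunk> replayAfter(List<Chunk> persisted, long cursor) {
        return persisted.stream()
                .filter(c -> c.seq() > cursor)
                .sorted(Comparator.comparingLong(Chunk::seq))
                .collect(Collectors.toList());
    }
}
```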

5. API Surface

Public (application):

  • POST /api/conversations — Create a conversation (ROLE_USER)

  • GET /api/conversations — List user conversations (ROLE_USER)

  • GET /api/conversations/{id} — Fetch conversation with messages (ROLE_USER, owner-only)

  • POST /api/conversations/{id}/messages — Append a user message and create an assistant message in STREAMING state (ROLE_USER, owner-only)

  • GET /api/conversations/{id}/stream?messageId={assistantMessageId} — SSE stream of assistant tokens; supports resume cursor (ROLE_USER, owner-only)

    • Resume: Last-Event-ID header or cursor query param representing last delivered seq
  • POST /api/conversations/{id}/cancel?messageId={assistantMessageId} — Cancel active stream (ROLE_USER, owner-only)
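
Resolving the resume cursor from either the Last-Event-ID header or the cursor query param can be sketched as below. The class name and the -1 "replay everything" default are assumptions for illustration.

```java
import java.util.Optional;

/** Hypothetical sketch: prefer the Last-Event-ID header, fall back to the
 *  cursor query param, and default to -1 (replay from the beginning) when
 *  neither parses as a sequence number. */
public class ResumeCursor {
    public static long resolve(String lastEventId, String cursorParam) {
        return parse(lastEventId)
                .orElseGet(() -> parse(cursorParam).orElse(-1L));
    }

    private static Optional<Long> parse(String raw) {
        if (raw == null || raw.isBlank()) return Optional.empty();
        try {
            return Optional.of(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```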

Operational:

  • GET /health — Liveness/readiness (no auth or internal-only depending on deployment)

Admin/internal (optional for marketplace alignment):

  • GET /admin/audit-events?conversationId=... — Audit event search (ROLE_ADMIN)

6. Security Model

Authentication:

  • Spring Security with your platform’s standard authentication (session or bearer), requiring ROLE_USER for all chat APIs.
  • SSE endpoint uses the same authentication context as standard MVC endpoints.

Authorization (roles):

  • ROLE_USER required for chat APIs.
  • Owner enforcement on conversation resources: conversation.owner_user_id == authenticated user.
  • ROLE_ADMIN optional for audit viewing endpoints.

Paid access enforcement (if applicable):

  • If chat is a paid feature, enforce ROLE_PAID on write/stream endpoints:

    • allow read-only conversation listing for non-paid users if desired
    • enforce paid gating on POST /messages, /stream, and /cancel

CSRF considerations:

  • For cookie-based sessions, keep CSRF enabled for state-changing endpoints (POST /messages, POST /cancel).
  • SSE GET /stream is not state-changing, but it should still require authentication and be protected by same-origin policy as appropriate.

Data isolation guarantees:

  • Conversation ownership is enforced at the query layer and controller/service layer.
  • Stream replay only returns chunks for conversations owned by the caller.
  • Audit logging avoids emitting raw message content to logs; DB storage remains the authoritative record.
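
The "IDs and event types only" logging rule can be sketched as a structured event builder; names here are hypothetical, and a real implementation would hand the map to the structured-logging layer.

```java
import java.util.Map;

/** Hypothetical sketch: an audit log entry built from identifiers and
 *  counters only. Message content never enters the log line; the database
 *  remains the authoritative store for content. */
public class AuditEventSketch {
    public static Map<String, Object> streamStarted(String conversationId,
                                                    String assistantMessageId,
                                                    long firstSeq) {
        return Map.of(
                "eventType", "STREAM_STARTED",
                "conversationId", conversationId,
                "messageId", assistantMessageId,
                "firstSeq", firstSeq);
    }
}
```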

7. Operational Behavior

Startup behavior:

  • Flyway applies migrations on boot.
  • Health endpoint reports readiness when PostgreSQL connectivity is available.

Failure modes:

  • Client disconnect: stream ends server-side; state remains OPEN and resumable if upstream generation continues or if sufficient chunks are persisted.
  • Upstream provider failure mid-stream: assistant message transitions to FAILED; SSE emits a terminal error event with requestId.
  • Cancellation: assistant message transitions to CANCELLED; SSE emits a terminal cancelled event; upstream request is aborted.

Retry and timeout behavior:

  • Upstream connection establishment uses strict connect/read timeouts.

  • No mid-stream automatic retry to avoid duplicate output. Resume is the supported recovery mechanism.

  • Resume replays persisted chunks first, then either:

    • continues with an upstream “continue” call if supported by provider semantics, or
    • restarts generation using conversation history with a server-side “resume prompt strategy” (explicitly documented and logged).

Observability hooks:

  • Structured logs: conversationId, assistantMessageId, streamId, seq, eventType, requestId.
  • Metrics: active streams, disconnect count, resume count, cancellation count, average time-to-first-token, upstream latency.
  • Traces (optional): spans for stream.start, chunk.persist, sse.emit, stream.cancel, stream.resume, llm.upstream.stream.

8. Local Execution

Prerequisites:

  • Docker Desktop / Docker Engine with Docker Compose v2
  • An OpenAI-compatible provider base URL and API key

Environment variables:

  • LLM_BASE_URL — OpenAI-compatible base URL
  • LLM_API_KEY — provider key
  • SPRING_DATASOURCE_URL (if overriding compose defaults)
  • SPRING_PROFILES_ACTIVE=local

Run:

docker compose up -d --build

Verify health:

curl -s http://localhost:8080/health

Create a conversation and send a message (example assumes your platform auth; replace with your auth headers):

curl -X POST http://localhost:8080/api/conversations \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer <token>' \
  -d '{"title":"demo"}'

Append user message (returns assistantMessageId):

curl -X POST http://localhost:8080/api/conversations/<conversationId>/messages \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer <token>' \
  -d '{"content":"Hello"}'

Start SSE stream:

curl -N "http://localhost:8080/api/conversations/<conversationId>/stream?messageId=<assistantMessageId>" \
  -H 'Authorization: Bearer <token>'

Cancel stream:

curl -X POST "http://localhost:8080/api/conversations/<conversationId>/cancel?messageId=<assistantMessageId>" \
  -H 'Authorization: Bearer <token>'

Resume stream from a cursor (example uses Last-Event-ID):

curl -N "http://localhost:8080/api/conversations/<conversationId>/stream?messageId=<assistantMessageId>" \
  -H 'Authorization: Bearer <token>' \
  -H 'Last-Event-ID: 42'

9. Evidence Pack

  • [ ] Service startup logs showing Flyway migration and app readiness

  • [ ] Health check response (200 OK)

  • [ ] Conversation creation request/response logs

  • [ ] Message append request/response logs including returned assistantMessageId

  • [ ] SSE streaming capture showing incremental events with monotonically increasing seq

  • [ ] Cancellation demonstration:

    • client cancels mid-stream
    • server emits terminal cancelled event
    • database shows assistant message status CANCELLED
  • [ ] Resume demonstration:

    • interrupt SSE connection
    • reconnect with cursor / Last-Event-ID
    • server replays missing chunks without duplication
  • [ ] Database records after execution:

    • messages rows for user + assistant
    • message_chunks rows with sequential seq
    • message_streams state transitions (OPEN → COMPLETED / CANCELLED / FAILED)
  • [ ] Failure mode demonstration:

    • simulate upstream provider failure (invalid key or network block)
    • assistant message transitions to FAILED
    • SSE terminal error event emitted
  • [ ] Audit-safe logging demonstration:

    • logs show IDs and event types without raw prompt/response content

10. Known Limitations

  • Does not implement WebSocket transport; SSE only.
  • Provider “true resume” of generation depends on upstream semantics; the fallback strategy is to replay persisted chunks and optionally restart generation using conversation history.
  • Does not include advanced PII redaction; it focuses on avoiding raw content in logs while storing content in DB for application needs.
  • Does not implement multi-region persistence or leader election for stream coordination; deployment model is single-node Docker Compose.

11. Extension Points

  • Replace MVC SSE with reactive streaming (WebFlux) if you need higher concurrency; the persistence and cursor model remains valid.

  • Add a dedicated streaming coordination layer for multi-instance deployments:

    • store active stream ownership in DB/Redis
    • ensure only one instance streams a given assistantMessageId at a time
  • Add message encryption-at-rest for sensitive environments and configurable retention policies for message/chunk tables.

  • Introduce token accounting and cost controls (reuse the gateway enforcement components) if streaming cost governance is required.

  • Add client acknowledgment checkpoints and periodic snapshotting to reduce chunk table size for long streams.

  • Integrate OpenTelemetry collector and provide trace evidence for stream lifecycle and upstream latency.
