diff --git a/tools/server/CMakeLists.txt b/tools/server/CMakeLists.txt index 47bb582c30..b5c40884fd 100644 --- a/tools/server/CMakeLists.txt +++ b/tools/server/CMakeLists.txt @@ -15,6 +15,8 @@ add_library(${TARGET} STATIC server-common.h server-context.cpp server-context.h + server-stream.cpp + server-stream.h server-tools.cpp server-tools.h server-schema.cpp diff --git a/tools/server/README-dev.md b/tools/server/README-dev.md index 5959745e47..dfc9004de5 100644 --- a/tools/server/README-dev.md +++ b/tools/server/README-dev.md @@ -57,6 +57,7 @@ The core architecture consists of the following components: - `server_tokens`: Unified representation of token sequences (supports both text and multimodal tokens); used by `server_task` and `server_slot`. - `server_prompt_checkpoint`: For recurrent (e.g., RWKV) and SWA models, stores snapshots of KV cache state. Enables reuse when subsequent requests share the same prompt prefix, saving redundant computation. - `server_models`: Standalone component for managing multiple backend instances (used in router mode). It is completely independent of `server_context`. +- `stream_session_manager`: Process wide owner of resumable SSE stream sessions (`g_stream_sessions`), keyed by conversation id. Backs the replay buffer that lets a client reattach to a generation after an HTTP disconnect. See the "Resumable streaming" section below. ```mermaid graph TD @@ -117,6 +118,58 @@ Here is an example trace of an API request for text completion: - As the response is stateless, `server_res_generator` calls `response->update()` to update the response with the current state. - `server_res_generator` then calls `response->to_json()` and passes the response to the HTTP layer. +### Resumable streaming (SSE replay buffer) + +By default a streaming generation is bound to its HTTP socket: when the socket drops (refresh, tab close, mobile background, transient network) the generation aborts and the live stream is lost. This feature keeps the generation running server side and lets a client reattach. + +It is opt in via the `X-Conversation-Id` header on `POST /v1/chat/completions`. Without the header the OAI strict path is unchanged. The conversation id is the only identity end to end (server map key, client localStorage key, route path), with an optional `::model` suffix for direct routing in router mode. + +The feature lives entirely in `server-stream.{h,cpp}` and rests on three types: + +- `stream_session`: a bounded ring buffer (4 MiB cap, oldest bytes drop first) plus a condvar. `append` pushes raw SSE bytes, `read_from` drains from any offset and blocks for live bytes or finalize, `finalize` wakes readers, `cancel` stops the producer. One conv maps to at most one live session. +- `stream_session_manager` (`g_stream_sessions`): owns all sessions keyed by conv id, enforces the one conv one session invariant via `create_or_replace`, and runs a GC thread that drops completed sessions past their TTL. +- `stream_pipe_producer` / `stream_pipe_consumer`: the write and read ends. The producer owns the session lifetime and finalizes it on destruction; the consumer is read only and never finalizes, so a reader detaching cannot kill a running generation. + +Producer side: `server_res_generator` attaches a producer pipe when the header is present. The HTTP content provider mirrors every chunk into the ring before writing it to the socket. While a pipe is attached, `stream_aware_should_stop` ignores peer disconnect, so a dropped socket does not stop generation: only an explicit `DELETE` does. When the peer leaves early, `on_complete` calls `close()`, which drains the rest of the generation into the ring on the http worker. + +Lifetime safety: the producer pipe holds a shared `alive` flag also captured by the session cancel hook. `~server_res_generator` calls `cleanup()` to clear that hook while the reader is still alive, so a `cancel` arriving during teardown can never call `stop()` on a freed response. This ordering is the most fragile part of the feature: finalizing or destroying the producer before `cleanup()` runs reintroduces a use after free. + +Consumer side: `GET /v1/stream/?from=N` opens a `text/event-stream` that replays buffered bytes from offset `N` and blocks for live bytes, so the browser reattaches like a fresh EventSource. An offset below the dropped prefix returns 400. + +Routes: + +- `GET /v1/stream/:conv_id?from=N`: replay or live reattach. +- `POST /v1/streams/lookup` with `{"conversation_ids": [...]}`: returns session status only for ids the caller already owns. There is no listing route, so live sessions cannot be enumerated (an earlier `GET /v1/streams` was removed for exactly this reason). +- `DELETE /v1/stream/:conv_id`: explicit Stop, idempotent (`evict_and_cancel`). + +Router mode binds the same paths to proxy handlers. A `conv_id -> child` map (`conv_models`), populated when a POST is routed, resolves the owning child in one lookup with no polling. The lookup groups ids per child; GET and DELETE proxy straight to the owner. This loopback REST hop is expected to move to a websocket IPC later, swapping only the transport. + +Lifecycle: `g_stream_sessions.start_gc()` runs in main after common init, `stop_gc()` runs first in `clean_up()` and finalizes every live session so no reader hangs. Reader blocking and the post drop drain both run on httplib worker threads, which block on a condvar rather than spin. + +| Constant | Value | Role | +| --- | --- | --- | +| `STREAM_SESSION_TTL_SECONDS` | 300 | retention of a completed session before GC | +| `STREAM_SESSION_MAX_BYTES` | 4 MiB | ring cap per session | +| `STREAM_SESSION_GC_INTERVAL_SECONDS` | 60 | GC tick | +| `STREAM_READ_WAKE_INTERVAL_MS` | 200 | read_from wake to recheck should_stop | +| `STREAM_LOOKUP_TIMEOUT_MS` | 250 | router to child loopback budget | + +```mermaid +graph TD + Client -- "POST + X-Conversation-Id" --> RG[server_res_generator] + RG -- attach --> Prod[stream_pipe_producer] + Prod -- "write, drain on peer drop" --> Sess + subgraph g_stream_sessions + Sess[stream_session: ring buffer, 4 MiB] + GC[GC thread] -- drop after TTL --> Sess + end + Sess -- read_from offset --> Cons[stream_pipe_consumer] + Cons -- "GET /v1/stream/:id?from=N" --> Client + DEL[DELETE /v1/stream/:id] -- evict_and_cancel --> Sess +``` + +The diagram shows the buffer touch points. The live wire (chunks streamed to the original client during a normal generation) is the producer's default output, described under "Producer side" above. + ### Testing `llama-server` includes an automated test suite based on `pytest`. @@ -223,6 +276,7 @@ The flow for downloading a new model: - Speculative decoding: https://github.com/ggml-org/llama.cpp/pull/17808 and rework in https://github.com/ggml-org/llama.cpp/pull/17808 - INI presets: https://github.com/ggml-org/llama.cpp/pull/17859 (+ refactoring: https://github.com/ggml-org/llama.cpp/pull/18169) - Sleeping mode: https://github.com/ggml-org/llama.cpp/pull/18228 +- Resumable streaming (SSE replay buffer): https://github.com/ggml-org/llama.cpp/pull/23226 diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 39b7eb218e..5c33a418f5 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -5,6 +5,7 @@ #include "server-task.h" #include "server-queue.h" #include "server-schema.h" +#include "server-stream.h" #include "build-info.h" #include "common.h" @@ -4022,6 +4023,15 @@ struct server_res_generator : server_http_res { queue_tasks.wait_until_no_sleep(); } } + ~server_res_generator() override { + // cleanup() must run while rd is still alive (rd is destroyed after this body returns) + if (spipe) { + spipe->cleanup(); + } + } + void stop() override { + rd.stop(); + } void ok(const json & response_data) { status = 200; data = safe_json_to_str(response_data); @@ -4210,8 +4220,10 @@ std::unique_ptr server_routes::handle_completions_impl( } }; + auto effective_should_stop = stream_aware_should_stop(res_this, req.should_stop); + try { - if (req.should_stop()) { + if (effective_should_stop()) { SRV_DBG("%s", "stopping streaming due to should_stop condition\n"); return false; // should_stop condition met } @@ -4245,8 +4257,8 @@ std::unique_ptr server_routes::handle_completions_impl( // receive subsequent results bool timeout = false; int64_t start_time = ggml_time_ms(); - auto result = rd.next([&timeout, &req, &start_time, ¶ms]() { - if (req.should_stop()) { + auto result = rd.next([&timeout, &start_time, ¶ms, &effective_should_stop]() { + if (effective_should_stop()) { return true; // should_stop condition met } else if (params.sse_ping_interval > 0 && ggml_time_ms() - start_time > (int64_t)params.sse_ping_interval * 1000) { timeout = true; @@ -4264,7 +4276,7 @@ std::unique_ptr server_routes::handle_completions_impl( if (result == nullptr) { SRV_DBG("%s", "stopping streaming due to should_stop condition\n"); - GGML_ASSERT(req.should_stop()); + GGML_ASSERT(effective_should_stop()); return false; // should_stop condition met } @@ -4302,6 +4314,10 @@ std::unique_ptr server_routes::handle_completions_impl( }; } + // attach a producer pipe to the response when X-Conversation-Id is present. + // the pipe mirrors SSE chunks into the ring buffer and wires up the cancel hook. + stream_session_attach_pipe(*res, req.headers); + return res; } diff --git a/tools/server/server-http.cpp b/tools/server/server-http.cpp index 4f2abab00c..82f34edac0 100644 --- a/tools/server/server-http.cpp +++ b/tools/server/server-http.cpp @@ -1,5 +1,6 @@ #include "common.h" #include "server-http.h" +#include "server-stream.h" #include "server-common.h" #include "ui.h" @@ -456,13 +457,40 @@ static void set_headers(httplib::Response & res, const std::map int { + if (c >= '0' && c <= '9') return c - '0'; + if (c >= 'a' && c <= 'f') return c - 'a' + 10; + if (c >= 'A' && c <= 'F') return c - 'A' + 10; + return -1; + }; + int hi = hex(in[i + 1]); + int lo = hex(in[i + 2]); + if (hi >= 0 && lo >= 0) { + out.push_back(char((hi << 4) | lo)); + i += 2; + continue; + } + } + out.push_back(in[i]); + } + return out; +} + static std::map get_params(const httplib::Request & req) { std::map params; for (const auto & [key, value] : req.params) { params[key] = value; } for (const auto & [key, value] : req.path_params) { - params[key] = value; + params[key] = decode_path_component(value); } return params; } @@ -497,26 +525,41 @@ static void process_handler_response(server_http_req_ptr && request, server_http set_headers(res, response->headers); const std::string content_type = response->content_type; // convert to shared_ptr as both chunked_content_provider() and on_complete() need to use it - std::shared_ptr q_ptr = std::move(request); - std::shared_ptr r_ptr = std::move(response); - const auto chunked_content_provider = [response = r_ptr](size_t, const httplib::DataSink & sink) -> bool { + std::shared_ptr q_ptr = std::move(request); + std::shared_ptr r_ptr = std::move(response); + + const auto chunked_content_provider = [response = r_ptr](size_t, httplib::DataSink & sink) -> bool { std::string chunk; const bool has_next = response->next(chunk); if (!chunk.empty()) { + // mirror into the ring buffer first, the session must reflect every SSE chunk + // whether or not the wire write below succeeds + if (response->spipe) { + response->spipe->write(chunk.data(), chunk.size()); + } if (!sink.write(chunk.data(), chunk.size())) { + // peer is gone, stop the wire path here return false; } SRV_DBG("http: streamed chunk: %s\n", chunk.c_str()); } if (!has_next) { + // producer reached its natural end on the wire, a later close() skips the drain + if (response->spipe) { + response->spipe->done(); + } sink.done(); SRV_DBG("%s", "http: stream ended\n"); } return has_next; }; const auto on_complete = [request = q_ptr, response = r_ptr](bool) mutable { - response.reset(); // trigger the destruction of the response object - request.reset(); // trigger the destruction of the request object + // on a dropped peer, close() drains the rest of the generation into the ring buffer + if (response->spipe) { + response->spipe->close(); + } + response.reset(); // spipe destructor finalizes the session if attached + request.reset(); }; res.set_chunked_content_provider(content_type, chunked_content_provider, on_complete); } else { diff --git a/tools/server/server-http.h b/tools/server/server-http.h index 6b4a4b87a6..3508131836 100644 --- a/tools/server/server-http.h +++ b/tools/server/server-http.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -10,6 +11,7 @@ #include struct common_params; +struct stream_pipe_producer; // defined in server-stream.h // generator-like API for HTTP response generation // this object response with one of the 2 modes: @@ -23,12 +25,20 @@ struct server_http_res { std::string data; std::map headers; - // TODO: move this to a virtual function once we have proper polymorphism support + // if set, the stream survives a client disconnect: the producer pipe keeps draining into the + // ring buffer and finalizes the session on destruction, so no explicit on_stream_end is needed. + // shared_ptr (not unique_ptr) so the forward-declared type is safe to delete here. + std::shared_ptr spipe; + std::function next = nullptr; bool is_stream() const { return next != nullptr; } + // called when the session is cancelled (e.g. DELETE /v1/stream/). + // server_res_generator overrides this to stop its reader; the default is a no-op. + virtual void stop() {} + virtual ~server_http_res() = default; }; diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index bb2f43a10d..0380f98a36 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -1,12 +1,14 @@ #include "server-common.h" #include "server-models.h" #include "server-context.h" +#include "server-stream.h" #include "build-info.h" #include "preset.h" #include "download.h" #include // TODO: remove this once we use HTTP client from download.h +#include #include #include @@ -92,6 +94,9 @@ struct server_subproc { } }; +// short loopback budget for the resumable stream router to child JSON calls (probe, lookup, +// delete). distinct from params.timeout_read/write which only applies to the generation proxy +static constexpr int STREAM_LOOKUP_TIMEOUT_MS = 250; static std::filesystem::path get_server_exec_path() { #if defined(_WIN32) @@ -1580,6 +1585,45 @@ static bool is_autoload(const common_params & params, const server_http_req & re } } +// percent encode one query or path component, covers reserved chars without pulling in +// httplib::detail. used by the stream routes to forward conversation_id to children safely +static std::string encode_qs(const std::string & in) { + std::string out; + out.reserve(in.size() * 3); + for (unsigned char c : in) { + bool safe = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') + || c == '-' || c == '_' || c == '.' || c == '~'; + if (safe) { + out.push_back(char(c)); + } else { + char buf[4]; + std::snprintf(buf, sizeof(buf), "%%%02X", c); + out.append(buf, 3); + } + } + return out; +} + +// resolve the child that owns a conversation's stream session via the conv_id -> model map +// populated when the POST was routed. single map lookup then a meta lookup, no polling, no +// parsing of the conv id. returns nullopt when nothing maps, the caller answers not found and +// the client recovers +static std::optional resolve_child_for_conv( + server_models & models, const std::string & conversation_id) { + if (conversation_id.empty()) { + return std::nullopt; + } + auto tracked = models.conv_models.lookup(conversation_id); + if (!tracked.has_value()) { + return std::nullopt; + } + auto meta = models.get_meta(*tracked); + if (meta.has_value() && meta->is_ready()) { + return meta; + } + return std::nullopt; +} + void server_models_routes::init_routes() { this->get_router_props = [this](const server_http_req & req) { std::string name = req.get_param("model"); @@ -1628,6 +1672,12 @@ void server_models_routes::init_routes() { if (!router_validate_model(name, models, autoload, error_res)) { return error_res; } + // remember which child serves this conversation so the stream routes can route straight + // to it without polling, keyed on the exact conv id from the header + std::string conv_id = stream_conv_id_from_headers(req.headers); + if (!conv_id.empty()) { + models.conv_models.remember(conv_id, name); + } return models.proxy_request(req, method, name, true); // update last usage for POST request only }; @@ -1819,6 +1869,128 @@ void server_models_routes::init_routes() { res_ok(res, {{"success", true}}); return res; }; + + this->router_stream_get = [this](const server_http_req & req) { + // GET /v1/stream/?from=N. resolve the owning child from the conv_id -> model + // map, 404 when nothing maps + auto res = std::make_unique(); + std::string conv_id = req.get_param("conv_id"); + if (conv_id.empty()) { + res_err(res, format_error_response("Missing conversation id in path", ERROR_TYPE_INVALID_REQUEST)); + return res; + } + std::optional owner = resolve_child_for_conv(models, conv_id); + if (!owner.has_value()) { + res_err(res, format_error_response("Stream not found or expired", ERROR_TYPE_NOT_FOUND)); + return res; + } + std::string from = req.get_param("from"); + std::string child_path = "/v1/stream/" + encode_qs(conv_id); + if (!from.empty()) { + child_path += "?from=" + from; + } + SRV_INF("proxying stream resume to model %s on port %d, path=%s\n", + owner->name.c_str(), owner->port, child_path.c_str()); + auto proxy = std::make_unique( + "GET", + "http", + CHILD_ADDR, + owner->port, + child_path, + req.headers, + req.body, + req.files, + req.should_stop, + params.timeout_read, + params.timeout_write); + return std::unique_ptr(std::move(proxy)); + }; + + this->router_streams_lookup = [this](const server_http_req & req) { + // POST /v1/streams/lookup. resolve each requested conv id to its owning child via the + // map, group the ids per child, and query only the children that actually own some of + // them instead of fanning out to every ready child. a child only answers for the ids + // it owns, never lists anything else + auto res = std::make_unique(); + std::vector requested; + try { + json body = json::parse(req.body); + if (body.contains("conversation_ids") && body["conversation_ids"].is_array()) { + for (const auto & v : body["conversation_ids"]) { + if (v.is_string() && !v.get().empty()) { + requested.push_back(v.get()); + } + } + } + } catch (const std::exception &) { + res_ok(res, json::array()); + return res; + } + + // group requested ids by the child port that owns them, drop ids that map to nothing + std::unordered_map per_child; + for (const auto & cid : requested) { + auto owner = resolve_child_for_conv(models, cid); + if (!owner.has_value()) { + continue; + } + per_child[owner->port].push_back(cid); + } + + json aggregated = json::array(); + for (auto & [port, ids] : per_child) { + json child_body = {{"conversation_ids", ids}}; + httplib::Client cli(CHILD_ADDR, port); + cli.set_connection_timeout(0, STREAM_LOOKUP_TIMEOUT_MS * 1000); + cli.set_read_timeout(0, STREAM_LOOKUP_TIMEOUT_MS * 1000); + cli.set_write_timeout(0, STREAM_LOOKUP_TIMEOUT_MS * 1000); + auto resp = cli.Post("/v1/streams/lookup", child_body.dump(), "application/json"); + if (!resp || resp->status != 200) { + continue; + } + try { + json child_arr = json::parse(resp->body); + if (!child_arr.is_array()) { + continue; + } + for (auto & entry : child_arr) { + if (entry.is_object()) { + aggregated.push_back(entry); + } + } + } catch (const std::exception &) { + continue; + } + } + res_ok(res, aggregated); + return res; + }; + + this->router_stream_delete = [this](const server_http_req & req) { + // DELETE /v1/stream/. resolve the owning child via the map and forward only to + // it, evict_and_cancel is idempotent on the child + auto res = std::make_unique(); + std::string conv_id = req.get_param("conv_id"); + if (conv_id.empty()) { + res_err(res, format_error_response("Missing conversation id in path", ERROR_TYPE_INVALID_REQUEST)); + return res; + } + std::string child_path = "/v1/stream/" + encode_qs(conv_id); + auto owner = resolve_child_for_conv(models, conv_id); + if (owner.has_value()) { + httplib::Client cli(CHILD_ADDR, owner->port); + cli.set_connection_timeout(0, STREAM_LOOKUP_TIMEOUT_MS * 1000); + cli.set_read_timeout(0, STREAM_LOOKUP_TIMEOUT_MS * 1000); + cli.set_write_timeout(0, STREAM_LOOKUP_TIMEOUT_MS * 1000); + auto resp = cli.Delete(child_path.c_str()); + (void) resp; // best effort, 404 and network errors are equivalent to no op + } + // drop the tracking entry, the session is being torn down + models.conv_models.forget(conv_id); + res->status = 204; + res->content_type = "application/json"; + return res; + }; } diff --git a/tools/server/server-models.h b/tools/server/server-models.h index 9ed4aeead0..62bed8725b 100644 --- a/tools/server/server-models.h +++ b/tools/server/server-models.h @@ -11,7 +11,10 @@ #include #include #include +#include #include +#include +#include /** * state diagram: @@ -126,6 +129,44 @@ private: // if true, the next get_meta() will trigger a reload of model list bool need_reload = false; + // conv_id -> model name that currently serves its stream session, lets the resumable stream + // routes go straight to the owning child instead of polling every one. populated when + // proxy_request forwards a POST carrying an X-Conversation-Id. best effort: a stale entry just + // makes the child answer not found and the client recovers. owns its lock, one mutex per struct + struct conv_model_tracker { + void remember(const std::string & conv_id, const std::string & model) { + if (conv_id.empty() || model.empty()) { + return; + } + std::lock_guard lock(mu); + map[conv_id] = model; + } + + std::optional lookup(const std::string & conv_id) { + if (conv_id.empty()) { + return std::nullopt; + } + std::lock_guard lock(mu); + auto it = map.find(conv_id); + if (it == map.end()) { + return std::nullopt; + } + return it->second; + } + + void forget(const std::string & conv_id) { + if (conv_id.empty()) { + return; + } + std::lock_guard lock(mu); + map.erase(conv_id); + } + + private: + std::mutex mu; + std::unordered_map map; + }; + common_preset_context ctx_preset; common_params base_params; @@ -145,6 +186,9 @@ private: void notify_sse(const std::string & event, const std::string & model_id, const json & data = nullptr); public: + // conv_id -> model tracker for the resumable stream routes, owns its lock + conv_model_tracker conv_models; + server_models(const common_params & params, int argc, char ** argv); server_response sse; // for real-time updates via SSE endpoint @@ -268,6 +312,12 @@ struct server_models_routes { server_http_context::handler_t get_router_models_sse; server_http_context::handler_t post_router_models; server_http_context::handler_t del_router_models; + + // router side handlers for the resumable streaming routes. each resolves the child that owns + // a conversation through the conv_id -> model map, no probing or fan out + server_http_context::handler_t router_stream_get; + server_http_context::handler_t router_streams_lookup; + server_http_context::handler_t router_stream_delete; }; /** diff --git a/tools/server/server-stream.cpp b/tools/server/server-stream.cpp new file mode 100644 index 0000000000..757c36ad25 --- /dev/null +++ b/tools/server/server-stream.cpp @@ -0,0 +1,569 @@ +#include "server-stream.h" +#include "server-common.h" +#include "server-http.h" +#include "server-queue.h" + +#include +#include +#include + +namespace { +constexpr int64_t STREAM_SESSION_TTL_SECONDS = 300; +constexpr size_t STREAM_SESSION_MAX_BYTES = 4 * 1024 * 1024; +constexpr int64_t STREAM_SESSION_GC_INTERVAL_SECONDS = 60; +constexpr int64_t STREAM_READ_WAKE_INTERVAL_MS = 200; + +// returns unix time in seconds +int64_t now_seconds() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch() + ).count(); +} +} + +stream_session::stream_session(std::string conversation_id_, size_t max_bytes_) + : conversation_id(std::move(conversation_id_)) + , started_ts(now_seconds()) + , prefix_dropped(0) + , cap_bytes(max_bytes_) + , done(false) + , cancelled(false) + , completed_ts(0) { + buffer.reserve(64 * 1024); +} + +bool stream_session::append(const char * data, size_t len) { + if (len == 0) { + return true; + } + { + std::lock_guard lock(mu); + if (done.load(std::memory_order_relaxed)) { + return false; + } + if (len >= cap_bytes) { + // single chunk bigger than the cap, keep only the tail that fits + size_t skip = len - cap_bytes; + prefix_dropped += buffer.size() + skip; + buffer.clear(); + buffer.insert(buffer.end(), data + skip, data + len); + } else { + size_t needed = buffer.size() + len; + if (needed > cap_bytes) { + size_t to_drop = needed - cap_bytes; + buffer.erase(buffer.begin(), buffer.begin() + to_drop); + prefix_dropped += to_drop; + } + buffer.insert(buffer.end(), data, data + len); + } + } + cv.notify_all(); + return true; +} + +void stream_session::finalize() { + bool was_done = done.exchange(true, std::memory_order_acq_rel); + if (was_done) { + return; + } + completed_ts.store(now_seconds(), std::memory_order_release); + cv.notify_all(); +} + +stream_read_status stream_session::read_from(size_t offset, + const std::function & sink, + const std::function & should_stop) { + std::unique_lock lock(mu); + while (true) { + if (should_stop && should_stop()) { + return stream_read_status::OK; + } + if (offset < prefix_dropped) { + return stream_read_status::OFFSET_LOST; + } + size_t logical_end = prefix_dropped + buffer.size(); + if (offset < logical_end) { + size_t local_off = offset - prefix_dropped; + size_t n = buffer.size() - local_off; + // copy the available chunk under the lock, release before calling the sink + std::vector chunk(buffer.begin() + local_off, buffer.begin() + local_off + n); + offset += n; + lock.unlock(); + bool keep_going = sink(chunk.data(), chunk.size()); + if (!keep_going) { + return stream_read_status::OK; + } + lock.lock(); + continue; + } + if (done.load(std::memory_order_acquire)) { + return stream_read_status::OK; + } + // wait for new bytes, finalize, or a periodic wake to re check should_stop + cv.wait_for(lock, std::chrono::milliseconds(STREAM_READ_WAKE_INTERVAL_MS)); + } +} + +bool stream_session::is_done() const { + return done.load(std::memory_order_acquire); +} + +size_t stream_session::total_size() const { + std::lock_guard lock(mu); + return prefix_dropped + buffer.size(); +} + +size_t stream_session::dropped_prefix() const { + std::lock_guard lock(mu); + return prefix_dropped; +} + +int64_t stream_session::completed_at() const { + return completed_ts.load(std::memory_order_acquire); +} + +void stream_session::set_stop_producer(std::function fn) { + std::lock_guard lock(mu); + stop_producer = std::move(fn); +} + +void stream_session::cancel() { + // flip cancelled first so the producer-side stream_aware_should_stop can break out of the + // recv() wait even if remove_waiting_task_ids does not notify the condvar (the cancel task + // posted by rd.stop() will eventually notify, but we do not want to depend on that timing) + cancelled.store(true, std::memory_order_release); + // copy the hook under the lock then invoke outside, the producer side may grab queue locks + // and we do not want to hold our mu across that path + std::function fn; + { + std::lock_guard lock(mu); + fn = stop_producer; + } + if (fn) { + fn(); + } +} + +bool stream_session::is_cancelled() const { + return cancelled.load(std::memory_order_acquire); +} + +stream_session_manager::stream_session_manager() + : running(false) { +} + +stream_session_manager::~stream_session_manager() { + stop_gc(); +} + +stream_session_ptr stream_session_manager::create_or_replace(const std::string & conversation_id) { + // evict any previous session on the same conv, this guarantees the invariant + // "one conv = at most one live session" and propagates cancel to its producer + stream_session_ptr previous; + auto fresh = std::make_shared(conversation_id, STREAM_SESSION_MAX_BYTES); + { + std::unique_lock lock(map_mu); + auto it = sessions.find(conversation_id); + if (it != sessions.end()) { + previous = it->second; + it->second = fresh; + } else { + sessions.emplace(conversation_id, fresh); + } + } + if (previous) { + previous->cancel(); + previous->finalize(); + } + return fresh; +} + +stream_session_ptr stream_session_manager::get(const std::string & conversation_id) { + std::shared_lock lock(map_mu); + auto it = sessions.find(conversation_id); + if (it == sessions.end()) { + return nullptr; + } + return it->second; +} + +std::vector stream_session_manager::list_all() const { + std::vector out; + std::shared_lock lock(map_mu); + out.reserve(sessions.size()); + for (auto & kv : sessions) { + out.push_back(kv.second); + } + return out; +} + +void stream_session_manager::evict(const std::string & conversation_id) { + stream_session_ptr s; + { + std::unique_lock lock(map_mu); + auto it = sessions.find(conversation_id); + if (it == sessions.end()) { + return; + } + s = it->second; + sessions.erase(it); + } + // finalize outside the map lock so any pending readers wake up and exit + s->finalize(); +} + +void stream_session_manager::evict_and_cancel(const std::string & conversation_id) { + stream_session_ptr s; + { + std::unique_lock lock(map_mu); + auto it = sessions.find(conversation_id); + if (it == sessions.end()) { + return; + } + s = it->second; + sessions.erase(it); + } + // signal the producer side first so the inference is cancelled at the queue level, + // then finalize, which wakes any pending HTTP reader and lets the drain exit naturally + s->cancel(); + s->finalize(); +} + +void stream_session_manager::start_gc() { + if (running.exchange(true)) { + return; + } + gc_thread = std::thread([this] { gc_loop(); }); +} + +void stream_session_manager::stop_gc() { + bool was_running = running.exchange(false); + if (was_running) { + { + std::lock_guard lock(gc_wake_mu); + } + gc_wake_cv.notify_all(); + if (gc_thread.joinable()) { + gc_thread.join(); + } + } + // finalize all live sessions so no reader ever hangs + std::vector snapshot; + { + std::unique_lock lock(map_mu); + snapshot.reserve(sessions.size()); + for (auto & kv : sessions) { + snapshot.push_back(kv.second); + } + sessions.clear(); + } + for (auto & s : snapshot) { + s->finalize(); + } +} + +void stream_session_manager::gc_loop() { + while (running.load(std::memory_order_acquire)) { + { + std::unique_lock lock(gc_wake_mu); + gc_wake_cv.wait_for(lock, + std::chrono::seconds(STREAM_SESSION_GC_INTERVAL_SECONDS), + [this] { return !running.load(std::memory_order_acquire); }); + } + if (!running.load(std::memory_order_acquire)) { + return; + } + int64_t cutoff = now_seconds() - STREAM_SESSION_TTL_SECONDS; + std::vector to_drop; + { + std::unique_lock lock(map_mu); + for (auto it = sessions.begin(); it != sessions.end(); ) { + int64_t completed = it->second->completed_at(); + if (completed != 0 && completed <= cutoff) { + to_drop.push_back(it->second); + it = sessions.erase(it); + } else { + ++it; + } + } + } + // finalize outside the map lock, idempotent if the session was already done + for (auto & s : to_drop) { + s->finalize(); + } + } +} + +// process wide manager, lifecycle controlled by llama-server main() via start_gc/stop_gc +stream_session_manager g_stream_sessions; + +// stream_pipe --------------------------------------------------------------------------------- + +stream_pipe::stream_pipe(stream_session_ptr session) + : session_(std::move(session)) { +} + +bool stream_pipe::is_cancelled() const { + return session_->is_cancelled(); +} + +// stream_pipe_producer + +stream_pipe_producer::stream_pipe_producer(stream_session_ptr session) + : stream_pipe(std::move(session)) { +} + +stream_pipe_producer::~stream_pipe_producer() { + cleanup(); + session_->finalize(); +} + +void stream_pipe_producer::cleanup() { + if (!alive_) { + return; + } + alive_->store(false, std::memory_order_release); + session_->set_stop_producer(nullptr); + alive_.reset(); +} + +bool stream_pipe_producer::write(const char * data, size_t len) { + return session_->append(data, len); +} + +void stream_pipe_producer::done() { + done_ = true; +} + +void stream_pipe_producer::close() { + // httplib bails its content provider the moment is_peer_alive() goes false, so pump the rest + // of the generation into the ring buffer here. a DELETE flips is_cancelled and cuts it short + if (done_ || session_->is_cancelled()) { + SRV_INF("stream_pipe close: skip drain (done=%d cancelled=%d) conv=%s\n", + done_ ? 1 : 0, session_->is_cancelled() ? 1 : 0, session_->conversation_id.c_str()); + return; + } + SRV_INF("stream_pipe close: draining conv=%s\n", session_->conversation_id.c_str()); + size_t drained = 0; + std::string chunk; + while (true) { + chunk.clear(); + bool has_next = res_->next(chunk); + if (!chunk.empty()) { + write(chunk.data(), chunk.size()); + drained += chunk.size(); + } + if (!has_next) { + break; + } + } + SRV_INF("stream_pipe close: drain ended conv=%s bytes=%zu\n", session_->conversation_id.c_str(), drained); +} + +std::shared_ptr stream_pipe_producer::create(stream_session_ptr session, + server_http_res & res) { + auto alive = std::make_shared>(true); + auto * res_ptr = &res; + session->set_stop_producer([alive, res_ptr]() { + if (alive->load(std::memory_order_acquire)) { + res_ptr->stop(); + } + }); + auto pipe = std::shared_ptr(new stream_pipe_producer(std::move(session))); + pipe->alive_ = std::move(alive); + pipe->res_ = res_ptr; + return pipe; +} + +// stream_pipe_consumer + +stream_pipe_consumer::stream_pipe_consumer(stream_session_ptr session) + : stream_pipe(std::move(session)) { +} + +stream_read_status stream_pipe_consumer::read(size_t & offset, + const std::function & sink, + const std::function & should_stop) { + return session_->read_from(offset, sink, should_stop); +} + +std::shared_ptr stream_pipe_consumer::create(stream_session_ptr session) { + return std::shared_ptr(new stream_pipe_consumer(std::move(session))); +} + +// helper, builds the standard error response and assigns it to a brand new http_res +static server_http_res_ptr make_error_response(int status, const std::string & message, error_type type) { + auto res = std::make_unique(); + json err = format_error_response(message, type); + res->status = json_value(err, "code", status); + res->content_type = "application/json; charset=utf-8"; + res->data = safe_json_to_str({{"error", err}}); + return res; +} + +server_http_context::handler_t make_stream_get_handler() { + return [](const server_http_req & req) -> server_http_res_ptr { + // GET /v1/stream/?from=N replays the SSE bytes already buffered for the + // session, blocks for more bytes when the session is still running, returns when + // the session is finalized. the body is streamed back as text/event-stream so the + // browser EventSource can attach to it like a fresh request + std::string conv_id = req.get_param("conv_id"); + if (conv_id.empty()) { + return make_error_response(400, "Missing conversation id in path", ERROR_TYPE_INVALID_REQUEST); + } + auto session = g_stream_sessions.get(conv_id); + if (!session) { + return make_error_response(404, "Stream not found or expired", ERROR_TYPE_NOT_FOUND); + } + size_t from = 0; + std::string from_str = req.get_param("from"); + if (!from_str.empty()) { + try { + from = static_cast(std::stoull(from_str)); + } catch (const std::exception &) { + return make_error_response(400, "Invalid 'from' offset", ERROR_TYPE_INVALID_REQUEST); + } + } + if (from < session->dropped_prefix()) { + return make_error_response(400, "Stream offset lost, please restart", ERROR_TYPE_INVALID_REQUEST); + } + auto res = std::make_unique(); + res->status = 200; + res->content_type = "text/event-stream"; + // the next closure reads from the ring buffer at the requested offset, blocks until + // bytes arrive or the session finalizes. exit each call after draining the available + // chunk so set_chunked_content_provider gets a chance to flush to the socket + auto offset_ptr = std::make_shared(from); + // consumer pipe: read-only, does not finalize the session on destruction + auto pipe = stream_pipe_consumer::create(session); + res->next = [pipe, offset_ptr, &req](std::string & output) -> bool { + bool got_any = false; + pipe->read(*offset_ptr, + [&](const char * d, size_t n) { + output.append(d, n); + *offset_ptr += n; + got_any = true; + return false; + }, + req.should_stop); + return got_any; + }; + return res; + }; +} + +server_http_context::handler_t make_streams_lookup_handler() { + return [](const server_http_req & req) -> server_http_res_ptr { + // POST /v1/streams/lookup with body {"conversation_ids": ["X", "Y", ...]} returns the + // matching sessions, only for ids the caller already knows. each id matches the exact key + // and any "::" variant, so one lookup covers every per model session for a conv + std::vector requested; + try { + json body = json::parse(req.body); + if (body.contains("conversation_ids") && body["conversation_ids"].is_array()) { + for (const auto & v : body["conversation_ids"]) { + if (v.is_string()) { + std::string id = v.get(); + if (!id.empty()) { + requested.push_back(std::move(id)); + } + } + } + } + } catch (const std::exception & e) { + auto res = std::make_unique(); + res->status = 400; + res->content_type = "application/json; charset=utf-8"; + res->data = safe_json_to_str({{"error", {{"message", std::string("invalid body: ") + e.what()}, + {"type", "invalid_request_error"}}}}); + return res; + } + + std::vector sessions; + if (!requested.empty()) { + auto all = g_stream_sessions.list_all(); + for (const auto & rid : requested) { + const std::string with_sep = rid + "::"; + for (auto & s : all) { + if (s->conversation_id == rid || + s->conversation_id.compare(0, with_sep.size(), with_sep) == 0) { + sessions.push_back(s); + } + } + } + } + + json arr = json::array(); + for (auto & s : sessions) { + arr.push_back({ + {"conversation_id", s->conversation_id}, + {"is_done", s->is_done()}, + {"total_bytes", s->total_size()}, + {"started_at", s->started_ts}, + {"completed_at", s->completed_at()}, + }); + } + auto res = std::make_unique(); + res->status = 200; + res->content_type = "application/json; charset=utf-8"; + res->data = safe_json_to_str(arr); + return res; + }; +} + +server_http_context::handler_t make_stream_delete_handler() { + return [](const server_http_req & req) -> server_http_res_ptr { + // DELETE /v1/stream/ is the explicit user Stop, cancels the producer hook + // wired by handle_completions_impl and evicts the buffer. idempotent, a session that + // already finalized or was never created returns 204 either way + std::string conv_id = req.get_param("conv_id"); + if (conv_id.empty()) { + return make_error_response(400, "Missing conversation id in path", ERROR_TYPE_INVALID_REQUEST); + } + SRV_INF("DELETE /v1/stream/%s -> evict_and_cancel\n", conv_id.c_str()); + g_stream_sessions.evict_and_cancel(conv_id); + auto res = std::make_unique(); + res->status = 204; + res->content_type = "application/json"; + return res; + }; +} + +std::string stream_conv_id_from_headers(const std::map & headers) { + // case-insensitive scan for x-conversation-id + static constexpr char target[] = "x-conversation-id"; + static constexpr size_t target_len = sizeof(target) - 1; + for (const auto & [hk, hv] : headers) { + if (hk.size() != target_len) continue; + bool match = true; + for (size_t i = 0; i < target_len; ++i) { + char c = hk[i]; + if (c >= 'A' && c <= 'Z') c = char(c + 32); + if (c != target[i]) { match = false; break; } + } + if (match) { + return hv; + } + } + return std::string(); +} + +void stream_session_attach_pipe(server_http_res & res, const std::map & headers) { + std::string conversation_id = stream_conv_id_from_headers(headers); + SRV_INF("stream_session_attach_pipe: conv_id=%s (empty=%d)\n", + conversation_id.c_str(), conversation_id.empty() ? 1 : 0); + if (conversation_id.empty()) { + return; + } + auto session = g_stream_sessions.create_or_replace(conversation_id); + res.spipe = stream_pipe_producer::create(session, res); +} + +std::function stream_aware_should_stop(server_http_res * res, std::function fallback) { + return [res, fallback = std::move(fallback)]() -> bool { + if (res->spipe) { + return res->spipe->is_cancelled(); + } + return fallback(); + }; +} diff --git a/tools/server/server-stream.h b/tools/server/server-stream.h new file mode 100644 index 0000000000..ff363bb4cd --- /dev/null +++ b/tools/server/server-stream.h @@ -0,0 +1,203 @@ +#pragma once + +#include "server-http.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +enum class stream_read_status { + OK, + OFFSET_LOST, +}; + +// streaming buffer for one generation, survives HTTP disconnect. the producer appends raw SSE +// bytes, readers drain from any offset via read_from and block until more bytes or finalize. +// keyed by conversation_id: one conv = at most one live session +struct stream_session { + std::string conversation_id; + int64_t started_ts; // unix seconds at construction, used by /v1/streams listing + + stream_session(std::string conversation_id_, size_t max_bytes_); + stream_session(const stream_session &) = delete; + stream_session & operator=(const stream_session &) = delete; + + // append raw bytes, drops from the front if the cap is reached. + // returns false if the session is already finalized + bool append(const char * data, size_t len); + + // mark the session as complete, wakes all pending readers + void finalize(); + + // drain bytes from offset, calling sink for each chunk. blocks until more + // bytes arrive or finalize is called. returns OK on clean exit, OFFSET_LOST + // if offset falls below the dropped prefix + stream_read_status read_from(size_t offset, + const std::function & sink, + const std::function & should_stop); + + bool is_done() const; + bool is_cancelled() const; + size_t total_size() const; // bytes that ever entered the session + size_t dropped_prefix() const; // bytes evicted from the front due to cap + int64_t completed_at() const; // 0 while alive, unix seconds after finalize + + // attach the producer stop hook used to cancel its reader, pass an empty function to detach + void set_stop_producer(std::function fn); + + // signal the producer to abort its inference asap via the stop hook, idempotent + void cancel(); + +private: + mutable std::mutex mu; + std::condition_variable cv; + std::vector buffer; + size_t prefix_dropped; + size_t cap_bytes; + std::atomic done; + std::atomic cancelled; + std::atomic completed_ts; + std::function stop_producer; // protected by mu +}; + +using stream_session_ptr = std::shared_ptr; + +// one end of a stream_session pipe. the base holds the session and the shared query, the +// producer and consumer ends derive from it. virtual dtor so each end runs its own teardown: +// the producer finalizes the session, the consumer leaves it untouched +struct stream_pipe { + virtual ~stream_pipe() = default; + + // true if the session was cancelled (e.g. via DELETE /v1/stream/) + bool is_cancelled() const; + +protected: + explicit stream_pipe(stream_session_ptr session); + + stream_session_ptr session_; +}; + +// producer end: writes chunks into the ring buffer and owns the session lifetime, finalizing it +// on destruction. +// +// lifetime safety: holds a shared_ptr> alive also captured by the session's +// stop_producer hook. cleanup() sets alive=false and clears the hook; it must run while the +// response the hook calls stop() on is still alive. ~server_res_generator() does this explicitly. +struct stream_pipe_producer : stream_pipe { + ~stream_pipe_producer() override; + + // append raw bytes to the session's ring buffer, returns false if already finalized + bool write(const char * data, size_t len); + + // mark the natural end on the wire so a later close() is a no-op + void done(); + + // on a peer drop, pump the response next() into the ring buffer until done. runs on the http + // worker from on_complete, no-op after done() or cancel + void close(); + + // disarm the stop hook and drop the alive guard, must run while the response the hook + // references is still alive. idempotent, the destructor calls it too + void cleanup(); + + // res.stop() is invoked when the session is cancelled, the alive guard ensures stop() is not + // called after cleanup() has run + static std::shared_ptr create(stream_session_ptr session, server_http_res & res); + +private: + explicit stream_pipe_producer(stream_session_ptr session); + + bool done_ = false; + std::shared_ptr> alive_; + server_http_res * res_ = nullptr; +}; + +// consumer end: read-only replay of the ring buffer, the destructor does not finalize the session +struct stream_pipe_consumer : stream_pipe { + // drain bytes from offset, calling sink for each available chunk. blocks until more data + // arrives or the session finalizes. should_stop is polled, returns OFFSET_LOST if offset + // fell below the dropped prefix + stream_read_status read(size_t & offset, + const std::function & sink, + const std::function & should_stop); + + static std::shared_ptr create(stream_session_ptr session); + +private: + explicit stream_pipe_consumer(stream_session_ptr session); +}; + +// owns all live sessions, runs a periodic GC to evict expired ones. +// the map is keyed by conversation_id, so the invariant "one conv = at most one +// live session" is enforced at the type level +class stream_session_manager { +public: + stream_session_manager(); + ~stream_session_manager(); + + stream_session_manager(const stream_session_manager &) = delete; + stream_session_manager & operator=(const stream_session_manager &) = delete; + + // install a new session for this conversation, evicting and cancelling any previous one. + // the conversation_id must be non empty, the caller is responsible for that check. + // returns the new session + stream_session_ptr create_or_replace(const std::string & conversation_id); + + // lookup, returns null if unknown or already evicted + stream_session_ptr get(const std::string & conversation_id); + + // list every live or recently completed session, used by GET /v1/streams without filter + std::vector list_all() const; + + // remove from the map and finalize, wakes any pending readers + void evict(const std::string & conversation_id); + + // signal the producer to cancel asap then evict, used by the explicit user Stop path + void evict_and_cancel(const std::string & conversation_id); + + void start_gc(); + void stop_gc(); + +private: + void gc_loop(); + + mutable std::shared_mutex map_mu; + std::unordered_map sessions; // key: conversation_id + std::thread gc_thread; + std::atomic running; + std::mutex gc_wake_mu; + std::condition_variable gc_wake_cv; +}; + +// process wide manager, linked by both llama-server and llama-cli. llama-server main() drives +// start_gc/stop_gc, llama-cli leaves it idle. the dtor calls stop_gc() unconditionally so exit +// is safe whether or not the GC thread ran +extern stream_session_manager g_stream_sessions; + +// route handler factories operating on g_stream_sessions, wired under /v1/stream/* by server.cpp. +// keeps the resumable stream surface confined to server-stream +server_http_context::handler_t make_stream_get_handler(); +server_http_context::handler_t make_streams_lookup_handler(); +server_http_context::handler_t make_stream_delete_handler(); + +// extract the X-Conversation-Id header value (case-insensitive), empty when absent. exposed so +// the router can track which child serves a forwarded POST +std::string stream_conv_id_from_headers(const std::map & headers); + +// on an X-Conversation-Id header, create or replace the session and attach a producer pipe to +// res. no-op when absent, called from the server_res_generator constructor +void stream_session_attach_pipe(server_http_res & res, const std::map & headers); + +// should_stop closure that ignores peer disconnect when a pipe is attached, so only an explicit +// DELETE stops the producer and generation keeps flowing into the ring buffer. without a pipe it +// delegates to fallback, the legacy non-resumable flow +std::function stream_aware_should_stop(server_http_res * res, std::function fallback); diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 0a1947faf5..1bbc99d890 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -2,6 +2,7 @@ #include "server-http.h" #include "server-models.h" #include "server-cors-proxy.h" +#include "server-stream.h" #include "server-tools.h" #include "arg.h" @@ -82,6 +83,10 @@ int llama_server(int argc, char ** argv) { common_init(); + // start the stream session manager GC right after common init, before any HTTP route can + // touch it. lifecycle is symmetric, stop_gc() runs in clean_up() before backend free + g_stream_sessions.start_gc(); + if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_SERVER)) { return 1; } @@ -239,6 +244,29 @@ int llama_server(int argc, char ** argv) { ctx_http.get ("/slots", ex_wrapper(routes.get_slots)); ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots)); + // resumable streaming, the conversation_id is the session identity end to end. router and + // child wire different handlers under the same paths: a child binds the local g_stream_sessions + // backed factories, the router binds proxies that resolve the owning child through the + // conv_id -> model map + server_http_context::handler_t stream_get_h; + server_http_context::handler_t streams_lookup_h; + server_http_context::handler_t stream_delete_h; + if (is_router_server) { + stream_get_h = models_routes->router_stream_get; + streams_lookup_h = models_routes->router_streams_lookup; + stream_delete_h = models_routes->router_stream_delete; + } else { + stream_get_h = make_stream_get_handler(); + streams_lookup_h = make_streams_lookup_handler(); + stream_delete_h = make_stream_delete_handler(); + } + ctx_http.get ("/v1/stream/:conv_id", ex_wrapper(stream_get_h)); + // POST /v1/streams/lookup with body {"conversation_ids": [...]}. you can only ask for ids + // you already own (the WebUI passes the convs visible in its sidebar). the server never + // lists ids it has not been asked about, so a random caller cannot enumerate live sessions + ctx_http.post("/v1/streams/lookup", ex_wrapper(streams_lookup_h)); + ctx_http.del ("/v1/stream/:conv_id", ex_wrapper(stream_delete_h)); + // Google Cloud Platform (Vertex AI) compat ctx_http.register_gcp_compat(); @@ -314,6 +342,8 @@ int llama_server(int argc, char ** argv) { clean_up = [&models_routes]() { SRV_INF("%s: cleaning up before exit...\n", __func__); + // stop the session GC first, it finalizes live sessions and wakes pending readers + g_stream_sessions.stop_gc(); if (models_routes.has_value()) { models_routes->stopping.store(true); // maybe redundant, but just to be safe models_routes->models.unload_all(); @@ -340,6 +370,8 @@ int llama_server(int argc, char ** argv) { // setup clean up function, to be called before exit clean_up = [&ctx_http, &ctx_server]() { SRV_INF("%s: cleaning up before exit...\n", __func__); + // stop the session GC first, it finalizes live sessions and wakes pending readers + g_stream_sessions.stop_gc(); ctx_http.stop(); ctx_server.terminate(); llama_backend_free(); diff --git a/tools/ui/src/lib/components/app/chat/ChatScreen/ChatScreen.svelte b/tools/ui/src/lib/components/app/chat/ChatScreen/ChatScreen.svelte index 18635ba392..d2fba5a932 100644 --- a/tools/ui/src/lib/components/app/chat/ChatScreen/ChatScreen.svelte +++ b/tools/ui/src/lib/components/app/chat/ChatScreen/ChatScreen.svelte @@ -5,6 +5,7 @@ ChatMessages, ChatScreenDragOverlay, ChatScreenProcessingInfo, + ChatScreenStreamResumeStatus, ServerLoadingSplash, ChatScreenServerError } from '$lib/components/app'; @@ -281,6 +282,10 @@ + {#if page.params.id} + + {/if} +
{#if (isMobile.current ? mobileScrollDownHint || isMobileUserScrolledUp : autoScroll.userScrolledUp) && page.url.hash.includes(ROUTES.CHAT) && page.params.id} + import { chatStore } from '$lib/stores/chat.svelte'; + import { StreamConnectionState } from '$lib/enums'; + import { Loader2 } from '@lucide/svelte'; + + let state = $derived(chatStore.streamConnectionState); + + +{#if state === StreamConnectionState.RESUMING} +
+ + Reconnecting to the stream... +
+{/if} diff --git a/tools/ui/src/lib/components/app/chat/index.ts b/tools/ui/src/lib/components/app/chat/index.ts index 517f24d740..d7004d3ae3 100644 --- a/tools/ui/src/lib/components/app/chat/index.ts +++ b/tools/ui/src/lib/components/app/chat/index.ts @@ -683,3 +683,11 @@ export { default as ChatScreenProcessingInfo } from './ChatScreen/ChatScreenProc * Rendered inside ChatScreen when `serverError` store has a value. */ export { default as ChatScreenServerError } from './ChatScreen/ChatScreenServerError.svelte'; + +/** + * Stream resume status indicator. Shows a small "Reconnecting to the stream..." + * banner with a spinner while `chatStore.streamConnectionState` is `resuming`, + * i.e. after a dropped connection is reattaching to the live SSE replay buffer. + * Renders nothing otherwise. Shown inside ChatScreen only on an active conversation route. + */ +export { default as ChatScreenStreamResumeStatus } from './ChatScreen/ChatScreenStreamResumeStatus.svelte'; diff --git a/tools/ui/src/lib/constants/api-endpoints.ts b/tools/ui/src/lib/constants/api-endpoints.ts index a410905057..37137c1c76 100644 --- a/tools/ui/src/lib/constants/api-endpoints.ts +++ b/tools/ui/src/lib/constants/api-endpoints.ts @@ -21,5 +21,11 @@ export const API_TOOLS = { EXECUTE: '/tools' }; +// resumable stream routes, the conv::model identity is appended as a path segment +export const API_STREAM = { + BASE: './v1/stream', + LOOKUP: './v1/streams/lookup' +}; + /** CORS proxy endpoint path */ export const CORS_PROXY_ENDPOINT = '/cors-proxy'; diff --git a/tools/ui/src/lib/constants/index.ts b/tools/ui/src/lib/constants/index.ts index 4993ab647a..b982a59072 100644 --- a/tools/ui/src/lib/constants/index.ts +++ b/tools/ui/src/lib/constants/index.ts @@ -46,6 +46,7 @@ export * from './routes'; export * from './sandbox'; export * from './settings-keys'; export * from './settings-registry'; +export * from './stream'; export * from './supported-file-types'; export * from './table-html-restorer'; export * from './title-generation'; diff --git a/tools/ui/src/lib/constants/storage.ts b/tools/ui/src/lib/constants/storage.ts index 8d425b96b7..0180a76fb6 100644 --- a/tools/ui/src/lib/constants/storage.ts +++ b/tools/ui/src/lib/constants/storage.ts @@ -26,6 +26,9 @@ export const THINKING_ENABLED_DEFAULT_LOCALSTORAGE_KEY = `${STORAGE_APP_NAME}.th export const REASONING_EFFORT_DEFAULT_LOCALSTORAGE_KEY = `${STORAGE_APP_NAME}.reasoningEffortDefault`; export const USER_OVERRIDES_LOCALSTORAGE_KEY = `${STORAGE_APP_NAME}.userOverrides`; +/** Key prefix for per-conversation resumable stream state, conversationId is appended */ +export const STREAM_RESUME_LOCALSTORAGE_KEY_PREFIX = `${STORAGE_APP_NAME}.streamResume.`; + // Deprecated old key names (kept for backward compat while users migrate) /** @deprecated Use {@link ALWAYS_ALLOWED_TOOLS_LOCALSTORAGE_KEY} instead */ export const DEPRECATED_ALWAYS_ALLOWED_TOOLS_LOCALSTORAGE_KEY = `${STORAGE_APP_NAME_DEPRECATED}.alwaysAllowedTools`; diff --git a/tools/ui/src/lib/constants/stream.ts b/tools/ui/src/lib/constants/stream.ts new file mode 100644 index 0000000000..3d042451fc --- /dev/null +++ b/tools/ui/src/lib/constants/stream.ts @@ -0,0 +1,3 @@ +// grace window after a visibilitychange before we kick a reader whose socket likely died +// while the tab was hidden. covers brief background pauses without thrashing live streams +export const STREAM_VISIBILITY_KICK_MS = 1000; diff --git a/tools/ui/src/lib/enums/chat.enums.ts b/tools/ui/src/lib/enums/chat.enums.ts index 798fb1bf78..278e4af84e 100644 --- a/tools/ui/src/lib/enums/chat.enums.ts +++ b/tools/ui/src/lib/enums/chat.enums.ts @@ -5,6 +5,15 @@ export enum ChatMessageStatsView { SUMMARY = 'summary' } +/** + * Connection state of a streamed completion, drives the resume status indicator. + */ +export enum StreamConnectionState { + STREAMING = 'streaming', + RESUMING = 'resuming', + LOST = 'lost' +} + /** * Reasoning format options for API requests. */ diff --git a/tools/ui/src/lib/enums/index.ts b/tools/ui/src/lib/enums/index.ts index 811744fd9a..d8e47958bb 100644 --- a/tools/ui/src/lib/enums/index.ts +++ b/tools/ui/src/lib/enums/index.ts @@ -10,6 +10,7 @@ export { AgenticSectionType, ContinueIntentKind, ToolCallType } from './agentic. export { ChatMessageStatsView, + StreamConnectionState, ContentPartType, ConversationSelectionMode, ErrorDialogType, diff --git a/tools/ui/src/lib/services/chat.service.ts b/tools/ui/src/lib/services/chat.service.ts index 9001c9572f..7dfee37731 100644 --- a/tools/ui/src/lib/services/chat.service.ts +++ b/tools/ui/src/lib/services/chat.service.ts @@ -1,6 +1,7 @@ -import { getJsonHeaders } from '$lib/utils/api-headers'; +import { getAuthHeaders, getJsonHeaders } from '$lib/utils/api-headers'; import { formatAttachmentText } from '$lib/utils/formatters'; import { isAbortError } from '$lib/utils/abort'; +import { streamIdentity } from '$lib/utils/stream-identity'; import { ATTACHMENT_LABEL_PDF_FILE, ATTACHMENT_LABEL_MCP_PROMPT, @@ -13,7 +14,10 @@ import { CONTROL_ACTION, SSE_LINE_SEPARATOR, SSE_DATA_PREFIX, - SSE_DONE_MARKER + SSE_DONE_MARKER, + STREAM_VISIBILITY_KICK_MS, + STREAM_RESUME_LOCALSTORAGE_KEY_PREFIX, + API_STREAM } from '$lib/constants'; import { AttachmentType, @@ -21,12 +25,14 @@ import { FileTypeAudio, MessageRole, MimeTypeAudio, - ReasoningFormat + ReasoningFormat, + StreamConnectionState } from '$lib/enums'; import type { ApiChatMessageContentPart, ApiChatMessageData, - ApiChatCompletionToolCall + ApiChatCompletionToolCall, + ApiStreamSession } from '$lib/types/api'; import type { AudioInputFormat, @@ -54,6 +60,19 @@ function getAudioInputFormat(mimeType: string): AudioInputFormat { return FileTypeAudio.MP3; } +interface ResumableStreamState { + bytesReceived: number; + updatedAt: number; + + // model frozen at POST time, lets a reload rebuild the exact conv::model identity the + // server keyed the session under. null when the POST carried no explicit model + model?: string | null; +} + +function streamStorageKey(conversationId: string): string { + return STREAM_RESUME_LOCALSTORAGE_KEY_PREFIX + conversationId; +} + export class ChatService { /** * @@ -128,6 +147,7 @@ export class ChatService { onChunk, onComplete, onError, + onConnectionState, onReasoningChunk, onToolCallChunk, onModel, @@ -312,9 +332,16 @@ export class ChatService { } try { + const headers: Record = { ...getJsonHeaders() }; + // tag streaming requests with the conversation id, this single header is the opt in for the + // server side replay buffer and powers discoverActiveStream on tab reopen. with an explicit + // model the ::model suffix keeps the per model session distinct + if (stream && conversationId) { + headers['X-Conversation-Id'] = streamIdentity(conversationId, options.model); + } const response = await fetch(API_CHAT.COMPLETIONS, { method: 'POST', - headers: getJsonHeaders(), + headers, body: JSON.stringify(requestBody), signal }); @@ -341,7 +368,9 @@ export class ChatService { onCompletionId, onTimings, conversationId, - signal + signal, + onConnectionState, + options.model ); return; @@ -473,6 +502,116 @@ export class ChatService { * @param excludeReasoning - Whether to strip reasoning content (should match excludeReasoningFromContext setting) * @param signal - Optional AbortSignal to cancel the pre-encode request */ + static async cancelServerStream(conversationId: string, model?: string | null): Promise { + if (!conversationId) return; + try { + const id = streamIdentity(conversationId, model); + await fetch(`${API_STREAM.BASE}/${encodeURIComponent(id)}`, { + method: 'DELETE', + headers: getAuthHeaders() + }); + } catch (e) { + console.warn('cancelServerStream failed:', e); + } + } + + /** + * Pick the running session to splice into when discoverActiveStream lists candidates for a + * conversation. Finalized sessions are not candidates: their final content was already written + * to the DB by the original onComplete handler, so attaching to them would replay a buffer that + * may not match what the DB holds. A continue session's buffer holds only the appended deltas, + * not the pre continue prefix, so replaying it as a fresh generation would erase the original. + * + * Among running sessions we tie break on the most recent started_at, which covers the case of + * multiple inferences left running on the same conversation. + */ + static selectActiveStream( + sessions: ApiStreamSession[] | null | undefined + ): ApiStreamSession | null { + if (!Array.isArray(sessions) || sessions.length === 0) { + return null; + } + const running = sessions.filter((s) => !s.is_done); + if (running.length === 0) { + return null; + } + return running.reduce((best, cur) => (cur.started_at > best.started_at ? cur : best)); + } + + // persist the running byte count and the frozen model for a conversation, a later visit + // resumes the SSE replay at the right offset under the same conv::model identity + static saveStreamState( + conversationId: string, + bytesReceived: number, + model?: string | null + ): void { + if (!conversationId) return; + try { + const state: ResumableStreamState = { + bytesReceived, + updatedAt: Date.now(), + model: model ?? null + }; + localStorage.setItem(streamStorageKey(conversationId), JSON.stringify(state)); + } catch { + // localStorage may be full or disabled, silently ignore + } + } + + static getStreamState(conversationId: string): ResumableStreamState | null { + if (!conversationId) return null; + try { + const raw = localStorage.getItem(streamStorageKey(conversationId)); + if (!raw) return null; + const parsed = JSON.parse(raw) as ResumableStreamState; + if (!parsed || typeof parsed.bytesReceived !== 'number') return null; + return parsed; + } catch { + return null; + } + } + + static clearStreamState(conversationId: string): void { + if (!conversationId) return; + try { + localStorage.removeItem(streamStorageKey(conversationId)); + } catch { + // nothing to do + } + } + + /** + * Rebuild the stream identity for a resume. The model persisted at POST time wins, including a + * stored null which means the POST carried no explicit model so the identity stays the bare conv + * id. Only fall back to the caller supplied current model when nothing was persisted. + */ + static resumeStreamIdentity( + conversationId: string, + state: ResumableStreamState | null, + fallbackModel: string | null + ): string { + const model = state && state.model !== undefined ? state.model : fallbackModel; + return streamIdentity(conversationId, model); + } + + /** + * Reconnect to an interrupted stream for this conversation. Returns the fetch Response so the + * existing SSE parser drains it like a fresh stream. The server returns 200 on success, 404 if + * no session exists for the conv_id, and 400 if the offset is below the dropped prefix. + */ + static async resumeStream( + conversationId: string, + signal?: AbortSignal, + model?: string | null + ): Promise { + if (!conversationId) return null; + const state = ChatService.getStreamState(conversationId); + const from = state?.bytesReceived ?? 0; + const id = streamIdentity(conversationId, model); + const url = `${API_STREAM.BASE}/${encodeURIComponent(id)}?from=${from}`; + return await fetch(url, { method: 'GET', signal, headers: getAuthHeaders() }); + } + static async preEncode( messages: ApiChatMessageData[] | (DatabaseMessage & { extra?: DatabaseMessageExtra[] })[], model?: string | null, @@ -557,7 +696,7 @@ export class ChatService { * @returns {Promise} Promise that resolves when streaming is complete * @throws {Error} if the stream cannot be read or parsed */ - private static async handleStreamResponse( + static async handleStreamResponse( response: Response, onChunk?: (chunk: string) => void, onComplete?: ( @@ -573,15 +712,34 @@ export class ChatService { onCompletionId?: (id: string) => void, onTimings?: (timings?: ChatMessageTimings, promptProgress?: ChatMessagePromptProgress) => void, conversationId?: string, - abortSignal?: AbortSignal + abortSignal?: AbortSignal, + onConnectionState?: (state: StreamConnectionState) => void, + streamModel?: string | null ): Promise { - const reader = response.body?.getReader(); + let reader = response.body?.getReader(); if (!reader) { throw new Error('No response body'); } - const decoder = new TextDecoder(); + // bytesParsed is the absolute server side buffer offset of the next byte to parse + // segmentStartOffset is the absolute offset where the current reader started, reset on resume + // segmentBytesRead is wire bytes read by the current reader + let bytesParsed = 0; + let segmentStartOffset = 0; + let segmentBytesRead = 0; + let lastByteAt = Date.now(); + // each resume must produce at least one byte to be retried again + // if a resume returns 200 but yields nothing, we abandon + // since the session has a bounded size, the total number of retries is bounded by construction + let madeProgress = true; + const encoder = new TextEncoder(); + if (conversationId) { + ChatService.saveStreamState(conversationId, 0, streamModel); + } + onConnectionState?.(StreamConnectionState.STREAMING); + + let decoder = new TextDecoder(); let aggregatedContent = ''; let fullReasoningContent = ''; let aggregatedToolCalls: ApiChatCompletionToolCall[] = []; @@ -633,84 +791,180 @@ export class ChatService { } }; + const onVisibilityChange = () => { + if (typeof document === 'undefined') return; + if (document.visibilityState !== 'visible') return; + if (streamFinished) return; + if (!conversationId) return; + // the bytes have been quiet for too long, the OS likely killed the socket + // kicking the reader unblocks reader.read with done=true so the outer loop can resume + if (Date.now() - lastByteAt > STREAM_VISIBILITY_KICK_MS) { + reader!.cancel().catch(() => {}); + } + }; + if (typeof document !== 'undefined') { + document.addEventListener('visibilitychange', onVisibilityChange); + } + try { let chunk = ''; + // outer loop drives the resume cycle, swaps reader on premature end of stream while (true) { - if (abortSignal?.aborted) break; - - const { done, value } = await reader.read(); - if (done) break; - - if (abortSignal?.aborted) break; - - chunk += decoder.decode(value, { stream: true }); - const lines = chunk.split(SSE_LINE_SEPARATOR); - chunk = lines.pop() || ''; - - for (const line of lines) { + while (true) { if (abortSignal?.aborted) break; - if (line.startsWith(SSE_DATA_PREFIX)) { - const data = line.slice(SSE_DATA_PREFIX.length).trim(); - if (data === SSE_DONE_MARKER) { - streamFinished = true; - - continue; + let done: boolean; + let value: Uint8Array | undefined; + try { + const r = await reader.read(); + done = r.done; + value = r.value; + } catch (readErr) { + // reader.read() rejects with TypeError when the underlying connection drops + // instead of just resolving with done=true. treat it like done so the outer + // loop swaps reader via the resume path + if (isAbortError(readErr)) { + throw readErr; } + console.warn('reader.read() rejected, treating as premature end:', readErr); + done = true; + value = undefined; + } + if (done) break; - try { - const parsed: ApiChatCompletionStreamChunk = JSON.parse(data); - const choice = parsed.choices?.[0]; - const content = choice?.delta?.content; - const reasoningContent = choice?.delta?.reasoning_content; - const toolCalls = choice?.delta?.tool_calls; - const timings = parsed.timings; - const promptProgress = parsed.prompt_progress; + if (abortSignal?.aborted) break; - const chunkModel = ChatService.extractModelName(parsed); - if (chunkModel && !modelEmitted) { - modelEmitted = true; - onModel?.(chunkModel); - } - - if (parsed.id && !idEmitted) { - idEmitted = true; - onCompletionId?.(parsed.id); - } - - if (promptProgress) { - ChatService.notifyTimings(undefined, promptProgress, onTimings); - } - - if (timings) { - ChatService.notifyTimings(timings, promptProgress, onTimings); - lastTimings = timings; - } - - if (content) { - finalizeOpenToolCallBatch(); - aggregatedContent += content; - if (!abortSignal?.aborted) { - onChunk?.(content); - } - } - - if (reasoningContent) { - finalizeOpenToolCallBatch(); - fullReasoningContent += reasoningContent; - if (!abortSignal?.aborted) { - onReasoningChunk?.(reasoningContent); - } - } - - processToolCallDelta(toolCalls); - } catch (e) { - console.error('Error parsing JSON chunk:', e); + if (value && value.byteLength > 0) { + segmentBytesRead += value.byteLength; + lastByteAt = Date.now(); + if (!madeProgress) { + madeProgress = true; + onConnectionState?.(StreamConnectionState.STREAMING); } } + + chunk += decoder.decode(value, { stream: true }); + const lines = chunk.split(SSE_LINE_SEPARATOR); + chunk = lines.pop() || ''; + + // the persisted offset must point right after the last fully parsed line, + // the trailing `chunk` is partial bytes still waiting for a newline + if (conversationId) { + const tailBytes = encoder.encode(chunk).byteLength; + bytesParsed = segmentStartOffset + segmentBytesRead - tailBytes; + ChatService.saveStreamState(conversationId, bytesParsed, streamModel); + } + + for (const line of lines) { + if (abortSignal?.aborted) break; + + if (line.startsWith(SSE_DATA_PREFIX)) { + const data = line.slice(SSE_DATA_PREFIX.length).trim(); + if (data === SSE_DONE_MARKER) { + streamFinished = true; + + continue; + } + + try { + const parsed: ApiChatCompletionStreamChunk = JSON.parse(data); + const choice = parsed.choices?.[0]; + const content = choice?.delta?.content; + const reasoningContent = choice?.delta?.reasoning_content; + const toolCalls = choice?.delta?.tool_calls; + const timings = parsed.timings; + const promptProgress = parsed.prompt_progress; + + const chunkModel = ChatService.extractModelName(parsed); + if (chunkModel && !modelEmitted) { + modelEmitted = true; + onModel?.(chunkModel); + } + + if (parsed.id && !idEmitted) { + idEmitted = true; + onCompletionId?.(parsed.id); + } + + if (promptProgress) { + ChatService.notifyTimings(undefined, promptProgress, onTimings); + } + + if (timings) { + ChatService.notifyTimings(timings, promptProgress, onTimings); + lastTimings = timings; + } + + if (content) { + finalizeOpenToolCallBatch(); + aggregatedContent += content; + if (!abortSignal?.aborted) { + onChunk?.(content); + } + } + + if (reasoningContent) { + finalizeOpenToolCallBatch(); + fullReasoningContent += reasoningContent; + if (!abortSignal?.aborted) { + onReasoningChunk?.(reasoningContent); + } + } + + processToolCallDelta(toolCalls); + } catch (e) { + console.error('Error parsing JSON chunk:', e); + } + } + } + + if (abortSignal?.aborted) break; + if (streamFinished) break; } + // inner reader done, decide whether to try a resume if (abortSignal?.aborted) break; + if (streamFinished) break; + if (!conversationId) break; + + if (!madeProgress) { + onConnectionState?.(StreamConnectionState.LOST); + onError?.(new Error('Stream resume produced no new bytes, giving up')); + break; + } + + onConnectionState?.(StreamConnectionState.RESUMING); + madeProgress = false; + + // the server resends starting at bytesParsed, discard any partial line we held, it + // will be retransmitted from a clean line boundary. reuse the frozen model, not the + // live dropdown + const resumeResp = await ChatService.resumeStream( + conversationId, + abortSignal, + streamModel + ).catch(() => null); + // an abort landing during the resume request is intentional, not a lost connection + if (abortSignal?.aborted) break; + if (!resumeResp || resumeResp.status !== 200) { + onConnectionState?.(StreamConnectionState.LOST); + onError?.(new Error('Stream connection lost and could not be resumed')); + break; + } + const newReader = resumeResp.body?.getReader(); + if (!newReader) break; + + try { + reader.releaseLock(); + } catch { + /* ignore */ + } + reader = newReader; + decoder = new TextDecoder(); + chunk = ''; + segmentStartOffset = bytesParsed; + segmentBytesRead = 0; + lastByteAt = Date.now(); } if (abortSignal?.aborted) return; @@ -718,6 +972,10 @@ export class ChatService { if (streamFinished) { finalizeOpenToolCallBatch(); + if (conversationId) { + ChatService.clearStreamState(conversationId); + } + const finalToolCalls = aggregatedToolCalls.length > 0 ? JSON.stringify(aggregatedToolCalls) : undefined; @@ -735,7 +993,14 @@ export class ChatService { throw err; } finally { - reader.releaseLock(); + if (typeof document !== 'undefined') { + document.removeEventListener('visibilitychange', onVisibilityChange); + } + try { + reader.releaseLock(); + } catch { + /* ignore */ + } } } diff --git a/tools/ui/src/lib/services/mcp.service.ts b/tools/ui/src/lib/services/mcp.service.ts index 90de0d5d88..4432989171 100644 --- a/tools/ui/src/lib/services/mcp.service.ts +++ b/tools/ui/src/lib/services/mcp.service.ts @@ -628,19 +628,20 @@ export class MCPService { ); const runtimeErrorHandler = (error: Error) => { - // Ignore errors that are expected when the SDK's transport is closed, - // or when connecting to servers that don't support SSE (stateless-only - // endpoints returning 405). The SDK wraps the original AbortError in - // a new Error with the message "SSE stream disconnected: AbortError", - // and also produces "Cannot cancel a stream locked by a reader". - // DOMException is thrown by the browser when aborting fetch requests. - const msg = error.message || String(error); + // the SDK reports any post initialize error here, including the abort we trigger + // ourselves on the next health check cycle, on tab unload, or on server teardown. + // these are lifecycle aborts, not actionable errors, so we keep them out of the red console. + // the SDK wraps the original AbortError in a generic Error like + // "SSE stream disconnected: AbortError: The operation was aborted." + // which isAbortError cannot recognize by name alone, so we also pattern match on the message + if (isAbortError(error)) { + return; + } + const msg = error?.message ?? ''; if ( - error.name === 'AbortError' || - error instanceof DOMException || - msg.includes('SSE stream disconnected') || - msg.includes('stream locked by a reader') || - msg.includes('The operation was aborted') + /SSE stream disconnected:.*AbortError/i.test(msg) || + /AbortError: .*aborted/i.test(msg) || + /stream locked by a reader/i.test(msg) ) { return; } diff --git a/tools/ui/src/lib/stores/agentic.svelte.ts b/tools/ui/src/lib/stores/agentic.svelte.ts index 5579cc1e5a..27491257ac 100644 --- a/tools/ui/src/lib/stores/agentic.svelte.ts +++ b/tools/ui/src/lib/stores/agentic.svelte.ts @@ -614,7 +614,7 @@ class AgenticStore { throw error; } }, - undefined, + conversationId, signal ); diff --git a/tools/ui/src/lib/stores/chat.svelte.ts b/tools/ui/src/lib/stores/chat.svelte.ts index b899130e50..faaaa9755e 100644 --- a/tools/ui/src/lib/stores/chat.svelte.ts +++ b/tools/ui/src/lib/stores/chat.svelte.ts @@ -11,9 +11,11 @@ * @see ChatService in services/chat.service.ts for API operations */ -import { SvelteMap } from 'svelte/reactivity'; +import { SvelteMap, SvelteSet } from 'svelte/reactivity'; import { DatabaseService } from '$lib/services/database.service'; import { ChatService } from '$lib/services/chat.service'; +import { streamIdentity } from '$lib/utils/stream-identity'; +import { getAuthHeaders } from '$lib/utils/api-headers'; import { conversationsStore } from '$lib/stores/conversations.svelte'; import { config } from '$lib/stores/settings.svelte'; import { agenticStore } from '$lib/stores/agentic.svelte'; @@ -49,10 +51,17 @@ import type { import type { ApiChatMessageData, ApiProcessingState, + ApiStreamSession, DatabaseMessage, DatabaseMessageExtra } from '$lib/types'; -import { ContinueIntentKind, ErrorDialogType, MessageRole, MessageType } from '$lib/enums'; +import { + ContinueIntentKind, + ErrorDialogType, + MessageRole, + MessageType, + StreamConnectionState +} from '$lib/enums'; interface ConversationStateEntry { lastAccessed: number; @@ -65,9 +74,25 @@ class ChatStore { isLoading = $state(false); // true while the active conversation streams reasoning content but no visible content yet isReasoning = $state(false); + // resumable stream connection state for the active conversation + // streaming -> bytes flowing normally, resuming -> waiting on /v1/stream/:id reconnect, lost -> unrecoverable + streamConnectionState = $state(StreamConnectionState.STREAMING); chatLoadingStates = new SvelteMap(); chatReasoningStates = new SvelteMap(); - chatStreamingStates = new SvelteMap(); + chatStreamingStates = new SvelteMap< + string, + { response: string; messageId: string; model?: string | null } + >(); + // convs that the backend reports as having a running session, populated by the global sync + // at app mount and on visibilitychange. it does not overlap with chatLoadingStates which + // tracks inferences driven by this browser, both are unioned to feed the sidebar spinners + private remoteRunningConvs = new SvelteSet(); + // per conv attach lifecycle, used to derive the global streaming flag without flipping it + // off when one conv finishes while another is still streaming. mirrors chatLoadingStates + // in scope but tracks the attach + tee replay path specifically + private attachingConvs = new SvelteSet(); + // in-flight discoverActiveStream guard, keyed by conv id + private discoveringConvs = new SvelteSet(); private abortControllers = new SvelteMap(); private preEncodeAbortController: AbortController | null = null; private processingStates = new SvelteMap(); @@ -98,6 +123,11 @@ class ChatStore { this.chatLoadingStates.delete(convId); if (convId === conversationsStore.activeConversation?.id) this.isLoading = false; this.setChatReasoning(convId, false); + // the local pipe is the authoritative observer of session end: when it finishes (clean + // onComplete or explicit Stop), the backend session is finalized too, so we drop the + // sidebar hint for this conv right away instead of waiting for the next visibilitychange + // snapshot. without this the spinner ghosts until the user toggles the tab + this.remoteRunningConvs.delete(convId); } } @@ -110,9 +140,18 @@ class ChatStore { if (convId === conversationsStore.activeConversation?.id) this.isReasoning = false; } } - private setChatStreaming(convId: string, response: string, messageId: string): void { + private setChatStreaming( + convId: string, + response: string, + messageId: string, + model?: string | null + ): void { this.touchConversationState(convId); - this.chatStreamingStates.set(convId, { response, messageId }); + this.chatStreamingStates.set(convId, { + response, + messageId, + model: model ?? this.chatStreamingStates.get(convId)?.model + }); if (convId === conversationsStore.activeConversation?.id) this.currentResponse = response; } private clearChatStreaming(convId: string): void { @@ -137,6 +176,314 @@ class ChatStore { } } } + /** + * Server side stream discovery, split in three pieces: + * + * probeServerStream(convId) -> hits POST /v1/streams/lookup with the conv id, returns the session to attach + * to or null. Pure read, no side effect, no UI lock. Safe to fire in parallel with anything. + * + * attachServerStream(convId) -> flips the spinner immediately, fetches the replay stream + * from byte 0, finds the assistant slot to splice into (creates a placeholder if the conv has + * no assistant message yet, for cross device or fresh local DB cases), and pipes the SSE bytes + * into the message via handleStreamResponse. + * + * discoverActiveStream(convId) -> probe + attach in one call. Used by callers that do not need + * to overlap the probe with other async work. + * + * The mount of the chat page in +page.svelte calls probeServerStream in parallel with + * loadConversation, then attachServerStream once both have settled. This gives the earliest + * possible time to spinner and avoids racing against an empty activeMessages array. + */ + async probeServerStream(convId: string): Promise { + if (!convId) return null; + let listResp: Response; + try { + // POST the one conv id we are probing + listResp = await fetch(`./v1/streams/lookup`, { + method: 'POST', + headers: { ...getAuthHeaders(), 'Content-Type': 'application/json' }, + body: JSON.stringify({ conversation_ids: [convId] }) + }); + } catch (e) { + console.warn('probeServerStream fetch failed:', e); + return null; + } + if (!listResp.ok) { + console.warn(`probeServerStream got HTTP ${listResp.status} for conv ${convId}`); + return null; + } + let sessions: ApiStreamSession[]; + try { + sessions = (await listResp.json()) as ApiStreamSession[]; + } catch (e) { + console.warn('probeServerStream JSON parse failed:', e); + return null; + } + return ChatService.selectActiveStream(sessions); + } + + async attachServerStream(convId: string, streamId?: string): Promise { + if (!convId) return; + if (this.chatStreamingStates.has(convId)) return; + + // flip the spinner immediately, the user sees activity as soon as the conv becomes active. + // the global isStreamingActive flag is derived from attachingConvs.size, so adding here + // turns it on, and removing in unlock only turns it off when this is the last attach + this.setChatLoading(convId, true); + this.attachingConvs.add(convId); + this.setStreamingActive(true); + // only set the active processing conv if we are looking at it, otherwise a background + // attach would steal the indicator from the conv the user is currently viewing + if (convId === conversationsStore.activeConversation?.id) { + this.setActiveProcessingConversation(convId); + } + + const unlock = () => { + this.attachingConvs.delete(convId); + // flip the global flag off only when no other conv is still attaching + if (this.attachingConvs.size === 0) { + this.setStreamingActive(false); + } + this.setChatLoading(convId, false); + this.clearChatStreaming(convId); + }; + + // fetch the replay stream from byte 0, rebuild the assistant message from scratch. + // resolve the server side identity, fall back to streamIdentity when the caller does not + // pass a streamId. probeServerStream returns the full id (with ::model suffix when present) + const id = streamId || streamIdentity(convId, selectedModelName()); + let response: Response; + try { + response = await fetch(`./v1/stream/${encodeURIComponent(id)}?from=0`, { + headers: getAuthHeaders() + }); + } catch (e) { + console.error('attachServerStream replay fetch failed:', e); + unlock(); + return; + } + if (!response.ok) { + console.warn(`attachServerStream replay got HTTP ${response.status} for conv ${convId}`); + unlock(); + return; + } + + // load the target conversation messages by id, not via the active store. when multiple + // attaches run in parallel the active store may reflect another conv and writing through + // its index mixes content across convs (CoT flicker, message bleed). by going through the + // DB we stay isolated, and only mirror into the active store when the attached conv is + // the one currently displayed + let messages: DatabaseMessage[]; + try { + messages = await DatabaseService.getConversationMessages(convId); + } catch (e) { + console.error('attachServerStream load messages failed:', e); + unlock(); + return; + } + + // locate the slot to splice into, create a placeholder assistant message if there is none. + // we use the conv-scoped findLastAssistantIdx helpers, they only depend on the array + let targetIdx = this.findLastAssistantIdx(messages); + if (targetIdx === -1) { + const lastUserIdx = this.findLastUserIdx(messages); + if (lastUserIdx === -1) { + console.warn( + `attachServerStream: conv ${convId} has no user or assistant message, cannot splice` + ); + unlock(); + return; + } + try { + const placeholder = await DatabaseService.createMessageBranch( + { + convId, + role: MessageRole.ASSISTANT, + content: '', + type: MessageType.TEXT, + timestamp: Date.now(), + parent: messages[lastUserIdx].id, + children: [], + toolCalls: '' + } as Omit, + messages[lastUserIdx].id + ); + messages = [...messages, placeholder]; + targetIdx = messages.length - 1; + // only push into the active store when this conv is the one displayed right now + if (convId === conversationsStore.activeConversation?.id) { + conversationsStore.addMessageToActive(placeholder); + } + } catch (e) { + console.error('attachServerStream placeholder creation failed:', e); + unlock(); + return; + } + } + if (targetIdx === -1) { + unlock(); + return; + } + const targetMessage = messages[targetIdx]; + const targetMessageId = targetMessage.id; + // when the assistant slot already has content, the running session is a continue or + // another append flow and its buffer holds only the appended deltas. preserve the prefix + // and let the replay add to it. when the slot is empty the session buffer holds the whole + // message so we wipe and rebuild from byte 0 + const existingContent = targetMessage.content ?? ''; + const existingReasoning = targetMessage.reasoningContent ?? ''; + const isAppendMode = existingContent.length > 0; + + // helper: write to the active store only when the attached conv is currently displayed. + // the lookup by message id is robust to reordering of activeMessages, two parallel attaches + // can no longer step on each other's indices + const writeActive = (updates: Partial) => { + if (convId !== conversationsStore.activeConversation?.id) { + return; + } + const liveIdx = conversationsStore.findMessageIndex(targetMessageId); + if (liveIdx === -1) return; + conversationsStore.updateMessageAtIndex(liveIdx, updates); + }; + + if (!isAppendMode) { + writeActive({ content: '', reasoningContent: undefined }); + } + + // extract the model suffix, the resume calls in handleStreamResponse must reuse the model + // the session was tagged with, not the live dropdown + const sepIdx = id.indexOf('::'); + const attachedModel: string | null = sepIdx === -1 ? null : id.slice(sepIdx + 2); + this.setChatStreaming(convId, existingContent, targetMessageId, attachedModel); + const abortController = this.getOrCreateAbortController(convId); + + let streamedContent = ''; + let streamedReasoningContent = ''; + + const cleanup = () => { + unlock(); + this.setProcessingState(convId, null); + }; + + try { + await ChatService.handleStreamResponse( + response, + (chunk: string) => { + streamedContent += chunk; + const displayed = isAppendMode ? existingContent + streamedContent : streamedContent; + writeActive({ content: displayed }); + this.setChatStreaming(convId, displayed, targetMessageId); + }, + async ( + finalContent?: string, + reasoningContent?: string, + timings?: ChatMessageTimings, + toolCalls?: string + ) => { + const streamed = streamedContent || finalContent || ''; + const streamedR = streamedReasoningContent || reasoningContent || ''; + const content = isAppendMode ? existingContent + streamed : streamed; + const reasoning = isAppendMode ? existingReasoning + streamedR : streamedR; + // the DB write is the source of truth, mirror to the active store only when + // the conv is currently displayed + await DatabaseService.updateMessage(targetMessageId, { + content, + reasoningContent: reasoning || undefined, + toolCalls: toolCalls || '', + timings + }); + writeActive({ + content, + reasoningContent: reasoning || undefined, + timings + }); + cleanup(); + }, + (err: Error) => { + console.error('attachServerStream pipe error:', err); + cleanup(); + }, + (chunk: string) => { + streamedReasoningContent += chunk; + const displayed = isAppendMode + ? existingReasoning + streamedReasoningContent + : streamedReasoningContent; + writeActive({ reasoningContent: displayed }); + }, + undefined, + undefined, + undefined, + undefined, + convId, + abortController.signal, + (connState: StreamConnectionState) => { + if (convId === conversationsStore.activeConversation?.id) { + this.streamConnectionState = connState; + } + }, + attachedModel + ); + } catch (e) { + console.error('attachServerStream pipe crashed:', e); + cleanup(); + } + } + + async discoverActiveStream(convId: string): Promise { + if (!convId) return; + if (this.chatStreamingStates.has(convId)) return; + if (this.chatLoadingStates.get(convId)) return; + // concurrency guard: another discover may already be running for this conv (typical race + // between mount and visibilitychange on tab switch). a second concurrent fetch on the same + // /v1/stream/ would duplicate every byte into the DB message, this guard bounces it + if (this.discoveringConvs.has(convId)) return; + this.discoveringConvs.add(convId); + + try { + // the model is frozen at POST time, rebuild the exact conv::model identity from the + // persisted state so the lookup key matches what the server stored. null means a single + // model conv with no ::suffix, only guess from the dropdown with no persisted state + const localState = ChatService.getStreamState(convId); + const streamId = ChatService.resumeStreamIdentity(convId, localState, selectedModelName()); + + // primary path: ask the server which sessions exist for this identity + const serverTarget = await this.probeServerStream(streamId); + if (serverTarget) { + // pass the full server side identity (may carry a ::model suffix) so the GET routes + // straight to the owning session, no probe or fan out + await this.attachServerStream(convId, serverTarget.conversation_id); + return; + } + + // fallback: local state remembers an interrupted byte offset for this conv, the server may + // still have a live session matching that identity (we just lost the bytes mid stream). retry + // with the frozen identity, the server probe inside attachServerStream tells us if it exists + if (!localState) { + return; + } + await this.attachServerStream(convId, streamId); + // if attachServerStream failed (session gone, TTL expired), clear the local state to avoid retrying forever + if (!this.chatStreamingStates.has(convId) && !this.chatLoadingStates.get(convId)) { + ChatService.clearStreamState(convId); + } + } finally { + this.discoveringConvs.delete(convId); + } + } + + private findLastAssistantIdx(messages: DatabaseMessage[]): number { + for (let i = messages.length - 1; i >= 0; i--) { + if (messages[i].role === MessageRole.ASSISTANT) return i; + } + return -1; + } + + private findLastUserIdx(messages: DatabaseMessage[]): number { + for (let i = messages.length - 1; i >= 0; i--) { + if (messages[i].role === MessageRole.USER) return i; + } + return -1; + } clearUIState(): void { this.isLoading = false; @@ -265,13 +612,83 @@ class ChatStore { } getAllLoadingChats(): string[] { - return Array.from(this.chatLoadingStates.keys()); + // union of local (this browser is piping) and remote (backend reports a running session + // for this conv but no local pipe yet) sources. the sidebar shows one spinner per entry + const out = new SvelteSet(this.chatLoadingStates.keys()); + for (const id of this.remoteRunningConvs) { + out.add(id); + } + return Array.from(out); } getAllStreamingChats(): string[] { return Array.from(this.chatStreamingStates.keys()); } + /** + * Resync the remote running convs set from the backend. Called by the layout at mount and on + * visibilitychange, no polling. A snapshot semantic: the set is replaced wholesale, stale entries + * for sessions that finalized while the browser was elsewhere are dropped naturally. + */ + async syncRemoteRunningStreams(): Promise { + // the conversations store loads from IndexedDB asynchronously, the +layout onMount caller + // fires before that finishes. read ids straight from the DB so the result does not depend + // on the store init race, and the sidebar spinners light up at first paint for every conv + // the user owns even if it has not been hydrated into the store yet + let ids: string[]; + try { + const all = await DatabaseService.getAllConversations(); + ids = all.map((c) => c.id).filter((id) => !!id); + } catch (e) { + console.warn('syncRemoteRunningStreams DB read failed:', e); + return; + } + // only ask about conv ids the user already owns + if (ids.length === 0) { + for (const id of Array.from(this.remoteRunningConvs)) { + this.remoteRunningConvs.delete(id); + } + return; + } + // rebuild the frozen conv::model identity per conv so a session started with a model still + // matches. the server response is mapped back to the bare id below for the sidebar set + const lookupIds = ids.map((id) => + ChatService.resumeStreamIdentity(id, ChatService.getStreamState(id), null) + ); + let sessions: ApiStreamSession[]; + try { + const resp = await fetch('./v1/streams/lookup', { + method: 'POST', + headers: { ...getAuthHeaders(), 'Content-Type': 'application/json' }, + body: JSON.stringify({ conversation_ids: lookupIds }) + }); + if (!resp.ok) return; + const body = (await resp.json()) as unknown; + if (!Array.isArray(body)) return; + sessions = body as ApiStreamSession[]; + } catch (e) { + console.warn('syncRemoteRunningStreams fetch failed:', e); + return; + } + const running = new SvelteSet(); + for (const s of sessions) { + if (s && !s.is_done && typeof s.conversation_id === 'string' && s.conversation_id) { + // strip the optional ::model suffix, the sidebar set is keyed by the bare conv id + const sepIdx = s.conversation_id.indexOf('::'); + const bareId = sepIdx === -1 ? s.conversation_id : s.conversation_id.slice(0, sepIdx); + running.add(bareId); + } + } + for (const id of Array.from(this.remoteRunningConvs)) { + if (!running.has(id)) { + this.remoteRunningConvs.delete(id); + } + } + for (const id of running) { + this.remoteRunningConvs.add(id); + } + } + getChatStreamingPublic(convId: string): { response: string; messageId: string } | undefined { return this.getChatStreaming(convId); } @@ -922,6 +1339,11 @@ class ChatStore { onModel: streamCallbacks.onModel, onCompletionId: streamCallbacks.onCompletionId, onTimings: streamCallbacks.onTimings, + onConnectionState: (state: StreamConnectionState) => { + if (convId === conversationsStore.activeConversation?.id) { + this.streamConnectionState = state; + } + }, onComplete: async ( finalContent?: string, reasoningContent?: string, @@ -979,6 +1401,12 @@ class ChatStore { async stopGenerationForChat(convId: string): Promise { await this.savePartialResponseIfNeeded(convId); this.setStreamingActive(false); + // tell the server to stop the generation, not just drop the HTTP socket. without this the + // detached drain keeps producing tokens until eos or max_tokens. use the frozen identity + // captured when the session started, not the live dropdown + const streamStateForStop = this.chatStreamingStates.get(convId); + const modelForStop = streamStateForStop?.model ?? selectedModelName(); + void ChatService.cancelServerStream(convId, modelForStop); this.abortRequest(convId); this.setChatLoading(convId, false); this.clearChatStreaming(convId); @@ -1393,7 +1821,11 @@ class ChatStore { const updateStreamingContent = (fullContent: string) => { this.setChatStreaming(msg.convId, fullContent, msg.id); - conversationsStore.updateMessageAtIndex(idx, { content: fullContent }); + // resolve the row by id on every write, switching to another conv mid continue makes + // this a no op instead of writing positionally into the now displayed conversation + conversationsStore.updateMessageAtIndex(conversationsStore.findMessageIndex(msg.id), { + content: fullContent + }); }; const abortController = this.getOrCreateAbortController(msg.convId); @@ -1403,6 +1835,11 @@ class ChatStore { { ...this.getApiOptions(), continueFinalMessage: true, + onConnectionState: (state: StreamConnectionState) => { + if (msg.convId === conversationsStore.activeConversation?.id) { + this.streamConnectionState = state; + } + }, onChunk: (chunk: string) => { appendedContent += chunk; hasReceivedContent = true; @@ -1414,7 +1851,7 @@ class ChatStore { hasReceivedContent = true; // mark streaming state so a stop mid-thinking can persist the partial reasoning this.setChatStreaming(msg.convId, originalContent + appendedContent, msg.id); - conversationsStore.updateMessageAtIndex(idx, { + conversationsStore.updateMessageAtIndex(conversationsStore.findMessageIndex(msg.id), { reasoningContent: originalReasoning + appendedReasoning }); this.setChatReasoning(msg.convId, true); @@ -1455,7 +1892,7 @@ class ChatStore { timings }); - conversationsStore.updateMessageAtIndex(idx, { + conversationsStore.updateMessageAtIndex(conversationsStore.findMessageIndex(msg.id), { content: fullContent, reasoningContent: fullReasoning, timestamp: Date.now(), @@ -1477,11 +1914,14 @@ class ChatStore { timestamp: Date.now() }); - conversationsStore.updateMessageAtIndex(idx, { - content: originalContent + appendedContent, - reasoningContent: originalReasoning + appendedReasoning || undefined, - timestamp: Date.now() - }); + conversationsStore.updateMessageAtIndex( + conversationsStore.findMessageIndex(msg.id), + { + content: originalContent + appendedContent, + reasoningContent: originalReasoning + appendedReasoning || undefined, + timestamp: Date.now() + } + ); } this.setChatLoading(msg.convId, false); @@ -1498,7 +1938,7 @@ class ChatStore { reasoningContent: originalReasoning + appendedReasoning || undefined, timestamp: Date.now() }); - conversationsStore.updateMessageAtIndex(idx, { + conversationsStore.updateMessageAtIndex(conversationsStore.findMessageIndex(msg.id), { content: originalContent + appendedContent, reasoningContent: originalReasoning + appendedReasoning || undefined, timestamp: Date.now() diff --git a/tools/ui/src/lib/types/api.d.ts b/tools/ui/src/lib/types/api.d.ts index 2a2524d002..ec695ac61c 100644 --- a/tools/ui/src/lib/types/api.d.ts +++ b/tools/ui/src/lib/types/api.d.ts @@ -512,3 +512,18 @@ export interface ApiRouterModelsUnloadResponse { success: boolean; error?: string; } + +/** + * Entry returned by POST /v1/streams/lookup. The client passes the conv ids it owns in the body + * and the server returns one entry per matching live or recently completed background streaming + * session, keyed by conversation_id. The WebUI uses this at mount and on visibilitychange to + * populate sidebar spinners and to reattach to an ongoing inference for the active conversation. + * The server never lists ids the client did not ask about, so foreign random UUIDs stay private. + */ +export interface ApiStreamSession { + conversation_id: string; + is_done: boolean; + total_bytes: number; + started_at: number; + completed_at: number; +} diff --git a/tools/ui/src/lib/types/index.ts b/tools/ui/src/lib/types/index.ts index 9b0b118045..cbe0538be6 100644 --- a/tools/ui/src/lib/types/index.ts +++ b/tools/ui/src/lib/types/index.ts @@ -34,7 +34,8 @@ export type { ApiRouterModelsListResponse, ApiRouterModelsUnloadRequest, ApiRouterModelsUnloadResponse, - AudioInputFormat + AudioInputFormat, + ApiStreamSession } from './api'; // Chat types diff --git a/tools/ui/src/lib/types/settings.d.ts b/tools/ui/src/lib/types/settings.d.ts index d1cdca957c..9c787cd958 100644 --- a/tools/ui/src/lib/types/settings.d.ts +++ b/tools/ui/src/lib/types/settings.d.ts @@ -4,9 +4,10 @@ import type { OpenAIToolDefinition } from './mcp'; import type { DatabaseMessageExtra } from './database'; import type { ParameterSource, - ReasoningEffort, SyncableParameterType, - SettingsFieldType + SettingsFieldType, + StreamConnectionState, + ReasoningEffort } from '$lib/enums'; import type { Icon } from '@lucide/svelte'; import type { Component } from 'svelte'; @@ -119,6 +120,7 @@ export interface SettingsChatServiceOptions { toolCalls?: string ) => void; onError?: (error: Error) => void; + onConnectionState?: (state: StreamConnectionState) => void; } export type SettingsConfigType = typeof SETTING_CONFIG_DEFAULT & { diff --git a/tools/ui/src/lib/utils/abort.ts b/tools/ui/src/lib/utils/abort.ts index fc4f31ec69..135ef087a0 100644 --- a/tools/ui/src/lib/utils/abort.ts +++ b/tools/ui/src/lib/utils/abort.ts @@ -6,6 +6,17 @@ * when needed (e.g., user stops generation, navigates away, etc.). */ +// the standard DOMException name for a cancelled operation +const ABORT_ERROR_NAME = 'AbortError'; + +// browser specific TypeError messages emitted when a fetch reader is cut by page unload, +// navigation, or a transient network drop. functionally aborts, not actionable errors +const ABORT_LIKE_MESSAGE_PATTERNS = [ + /input stream/i, // Firefox: stream cut at unload + /network connection was lost/i, // Safari: transient network drop + /load failed/i // Safari: page navigation during fetch +]; + /** * Throws an AbortError if the signal is aborted. * Use this at the start of async operations to fail fast. @@ -23,7 +34,7 @@ */ export function throwIfAborted(signal?: AbortSignal): void { if (signal?.aborted) { - throw new DOMException('Operation was aborted', 'AbortError'); + throw new DOMException('Operation was aborted', ABORT_ERROR_NAME); } } @@ -48,11 +59,18 @@ export function throwIfAborted(signal?: AbortSignal): void { * ``` */ export function isAbortError(error: unknown): boolean { - if (error instanceof DOMException && error.name === 'AbortError') { + if (error instanceof DOMException && error.name === ABORT_ERROR_NAME) { return true; } - if (error instanceof Error && error.name === 'AbortError') { - return true; + if (error instanceof Error) { + if (error.name === ABORT_ERROR_NAME) { + return true; + } + // these patterns are functionally aborts, keep them out of the red console + if (error instanceof TypeError) { + const msg = error.message ?? ''; + if (ABORT_LIKE_MESSAGE_PATTERNS.some((re) => re.test(msg))) return true; + } } return false; } @@ -133,7 +151,7 @@ export async function withAbortSignal(promise: Promise, signal?: AbortSign return new Promise((resolve, reject) => { const abortHandler = () => { - reject(new DOMException('Operation was aborted', 'AbortError')); + reject(new DOMException('Operation was aborted', ABORT_ERROR_NAME)); }; signal.addEventListener('abort', abortHandler, { once: true }); diff --git a/tools/ui/src/lib/utils/stream-identity.ts b/tools/ui/src/lib/utils/stream-identity.ts new file mode 100644 index 0000000000..ce88df0074 --- /dev/null +++ b/tools/ui/src/lib/utils/stream-identity.ts @@ -0,0 +1,13 @@ +/** + * Build the conversation identity used by the server side replay buffer. + * + * The server identifies a stream session by a conversation id sent in the + * X-Conversation-Id header. When the user has explicitly picked a model the + * client appends ::modelName, so a per model session stays distinct and the + * router resolves the owning child through its conv_id -> model map. + */ +export function streamIdentity(conversationId: string, model?: string | null): string { + if (!conversationId) return ''; + if (!model) return conversationId; + return `${conversationId}::${model}`; +} diff --git a/tools/ui/src/routes/(chat)/chat/[id]/+page.svelte b/tools/ui/src/routes/(chat)/chat/[id]/+page.svelte index e31d4443ef..f14553c90a 100644 --- a/tools/ui/src/routes/(chat)/chat/[id]/+page.svelte +++ b/tools/ui/src/routes/(chat)/chat/[id]/+page.svelte @@ -4,7 +4,7 @@ import { afterNavigate } from '$app/navigation'; import { DialogModelNotAvailable } from '$lib/components/app'; import { APP_NAME, ROUTES } from '$lib/constants'; - import { chatStore, isLoading } from '$lib/stores/chat.svelte'; + import { chatStore } from '$lib/stores/chat.svelte'; import { conversationsStore, activeConversation } from '$lib/stores/conversations.svelte'; import { modelsStore, modelOptions } from '$lib/stores/models.svelte'; @@ -83,7 +83,7 @@ // Skip loading if this conversation is already active (e.g., just created) if (activeConversation()?.id === chatId) { - // Still handle URL params even if conversation is active + void chatStore.discoverActiveStream(chatId); if ((qParam !== null || modelParam !== null) && !urlParamsProcessed) { handleUrlParams(); } @@ -92,35 +92,33 @@ (async () => { const success = await conversationsStore.loadConversation(chatId); - if (success) { - chatStore.syncLoadingStateForChat(chatId); - - // Handle URL params after conversation is loaded - if ((qParam !== null || modelParam !== null) && !urlParamsProcessed) { - await handleUrlParams(); - } - } else { + if (!success) { await goto(ROUTES.START); + return; + } + chatStore.syncLoadingStateForChat(chatId); + // server probe (with localStorage fallback) and attach + await chatStore.discoverActiveStream(chatId); + + if ((qParam !== null || modelParam !== null) && !urlParamsProcessed) { + await handleUrlParams(); } })(); } }); $effect(() => { - if (typeof window !== 'undefined') { - const handleBeforeUnload = () => { - if (isLoading()) { - console.log('Page unload detected while streaming - aborting stream'); - chatStore.stopGeneration(); - } - }; + if (typeof window === 'undefined' || typeof document === 'undefined') return; - window.addEventListener('beforeunload', handleBeforeUnload); - - return () => { - window.removeEventListener('beforeunload', handleBeforeUnload); - }; - } + // when the tab comes back to the foreground, re-run discovery to catch any race + // where the initial mount probe missed an active session + const onVisibility = () => { + if (document.visibilityState !== 'visible') return; + if (!chatId) return; + void chatStore.discoverActiveStream(chatId); + }; + document.addEventListener('visibilitychange', onVisibility); + return () => document.removeEventListener('visibilitychange', onVisibility); }); diff --git a/tools/ui/src/routes/+layout.svelte b/tools/ui/src/routes/+layout.svelte index 339b067dd2..38848786e9 100644 --- a/tools/ui/src/routes/+layout.svelte +++ b/tools/ui/src/routes/+layout.svelte @@ -11,6 +11,7 @@ import { PwaMetaTags, PwaRefreshAlert } from '$lib/components/pwa'; import { pwaAssetsHead } from 'virtual:pwa-assets/head'; + import { chatStore } from '$lib/stores/chat.svelte'; import { conversationsStore } from '$lib/stores/conversations.svelte'; import * as Tooltip from '$lib/components/ui/tooltip'; import { isRouterMode, serverStore } from '$lib/stores/server.svelte'; @@ -154,8 +155,18 @@ onMount(() => { updateFavicon(); + // snapshot of every backend running stream on first load, populates the sidebar spinners + // so the user sees each conv that has a live inference, even ones not opened yet + void chatStore.syncRemoteRunningStreams(); }); + // refresh that snapshot when the tab returns to the foreground, a stream may have advanced + // or ended while it was hidden. snapshot only, no polling + function handleVisibilityChange() { + if (document.visibilityState !== 'visible') return; + void chatStore.syncRemoteRunningStreams(); + } + $effect(() => { void theme.isSystemDark; @@ -280,6 +291,7 @@ +
diff --git a/tools/ui/tests/unit/abort.test.ts b/tools/ui/tests/unit/abort.test.ts new file mode 100644 index 0000000000..306e71ca15 --- /dev/null +++ b/tools/ui/tests/unit/abort.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from 'vitest'; +import { isAbortError } from '$lib/utils/abort'; + +describe('isAbortError', () => { + it('returns false for null, undefined and non-error values', () => { + expect(isAbortError(null)).toBe(false); + expect(isAbortError(undefined)).toBe(false); + expect(isAbortError('string error')).toBe(false); + expect(isAbortError({ name: 'AbortError' })).toBe(false); + expect(isAbortError(42)).toBe(false); + }); + + it('returns true for DOMException with AbortError name', () => { + const err = new DOMException('Operation was aborted', 'AbortError'); + expect(isAbortError(err)).toBe(true); + }); + + it('returns true for plain Error with AbortError name', () => { + const err = new Error('aborted'); + err.name = 'AbortError'; + expect(isAbortError(err)).toBe(true); + }); + + it('returns false for unrelated Error instances', () => { + expect(isAbortError(new Error('something failed'))).toBe(false); + expect(isAbortError(new TypeError('not related'))).toBe(false); + expect(isAbortError(new RangeError('out of range'))).toBe(false); + }); + + it('recognizes Firefox TypeError "Error in input stream" emitted at page unload', () => { + expect(isAbortError(new TypeError('Error in input stream'))).toBe(true); + expect(isAbortError(new TypeError('TypeError: Error in input stream'))).toBe(true); + }); + + it('recognizes Safari "The network connection was lost" during transient drop', () => { + expect(isAbortError(new TypeError('The network connection was lost.'))).toBe(true); + }); + + it('recognizes Safari "Load failed" during page navigation', () => { + expect(isAbortError(new TypeError('Load failed'))).toBe(true); + }); + + it('does NOT recognize generic TypeError messages as aborts', () => { + // matching too broadly would hide real bugs, the predicate must stay conservative + expect(isAbortError(new TypeError('Failed to fetch'))).toBe(false); + expect(isAbortError(new TypeError('Cannot read property of undefined'))).toBe(false); + expect(isAbortError(new TypeError('NetworkError when attempting to fetch resource'))).toBe( + false + ); + }); + + it('is case insensitive on the matched substrings', () => { + expect(isAbortError(new TypeError('error in INPUT STREAM'))).toBe(true); + expect(isAbortError(new TypeError('the network connection WAS LOST'))).toBe(true); + }); +}); diff --git a/tools/ui/tests/unit/stream-discovery.test.ts b/tools/ui/tests/unit/stream-discovery.test.ts new file mode 100644 index 0000000000..a428e5df8a --- /dev/null +++ b/tools/ui/tests/unit/stream-discovery.test.ts @@ -0,0 +1,74 @@ +import { describe, expect, it } from 'vitest'; +import { ChatService } from '$lib/services/chat.service'; +import type { ApiStreamSession } from '$lib/types'; + +function makeSession(overrides: Partial): ApiStreamSession { + return { + conversation_id: 'conv', + is_done: true, + total_bytes: 0, + started_at: 0, + completed_at: 0, + ...overrides + }; +} + +describe('selectActiveStream', () => { + it('returns null on empty input', () => { + expect(ChatService.selectActiveStream([])).toBeNull(); + }); + + it('returns null on null or undefined input', () => { + expect(ChatService.selectActiveStream(null)).toBeNull(); + expect(ChatService.selectActiveStream(undefined)).toBeNull(); + }); + + it('returns the single session when it is running', () => { + const s = makeSession({ conversation_id: 'only', is_done: false, started_at: 42 }); + expect(ChatService.selectActiveStream([s])).toBe(s); + }); + + it('returns null when the single session is finalized', () => { + const s = makeSession({ conversation_id: 'only', is_done: true, started_at: 42 }); + expect(ChatService.selectActiveStream([s])).toBeNull(); + }); + + it('prefers a still running session over a finalized one regardless of started_at', () => { + const finalized = makeSession({ conversation_id: 'old', is_done: true, started_at: 1000 }); + const running = makeSession({ conversation_id: 'new', is_done: false, started_at: 10 }); + expect(ChatService.selectActiveStream([finalized, running])?.conversation_id).toBe('new'); + expect(ChatService.selectActiveStream([running, finalized])?.conversation_id).toBe('new'); + }); + + it('among running sessions, picks the most recently started one', () => { + const a = makeSession({ conversation_id: 'a', is_done: false, started_at: 100 }); + const b = makeSession({ conversation_id: 'b', is_done: false, started_at: 200 }); + const c = makeSession({ conversation_id: 'c', is_done: false, started_at: 150 }); + expect(ChatService.selectActiveStream([a, b, c])?.conversation_id).toBe('b'); + expect(ChatService.selectActiveStream([c, a, b])?.conversation_id).toBe('b'); + }); + + it('returns null when all sessions are finalized, the DB already holds the content', () => { + const a = makeSession({ conversation_id: 'a', is_done: true, started_at: 10 }); + const b = makeSession({ conversation_id: 'b', is_done: true, started_at: 30 }); + const c = makeSession({ conversation_id: 'c', is_done: true, started_at: 20 }); + expect(ChatService.selectActiveStream([a, b, c])).toBeNull(); + }); + + it('keeps the first match on ties when both are running with identical started_at', () => { + // reduce visits left to right, the initial accumulator stays unless a strictly greater value appears + const a = makeSession({ conversation_id: 'first', is_done: false, started_at: 50 }); + const b = makeSession({ conversation_id: 'second', is_done: false, started_at: 50 }); + expect(ChatService.selectActiveStream([a, b])?.conversation_id).toBe('first'); + }); + + it('handles a typical realistic mix: two finalized old, one freshly running, one freshly finalized', () => { + const old1 = makeSession({ conversation_id: 'old1', is_done: true, started_at: 100 }); + const old2 = makeSession({ conversation_id: 'old2', is_done: true, started_at: 200 }); + const freshFin = makeSession({ conversation_id: 'freshFin', is_done: true, started_at: 500 }); + const running = makeSession({ conversation_id: 'running', is_done: false, started_at: 400 }); + expect(ChatService.selectActiveStream([old1, old2, freshFin, running])?.conversation_id).toBe( + 'running' + ); + }); +}); diff --git a/tools/ui/tests/unit/stream-resume.test.ts b/tools/ui/tests/unit/stream-resume.test.ts new file mode 100644 index 0000000000..f52f2cabd5 --- /dev/null +++ b/tools/ui/tests/unit/stream-resume.test.ts @@ -0,0 +1,128 @@ +import { afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'; + +// node env unit project has no DOM, install a minimal localStorage backed by a Map +beforeAll(() => { + const store = new Map(); + const polyfill: Storage = { + get length() { + return store.size; + }, + clear: () => store.clear(), + getItem: (k) => (store.has(k) ? store.get(k)! : null), + key: (i) => Array.from(store.keys())[i] ?? null, + removeItem: (k) => { + store.delete(k); + }, + setItem: (k, v) => { + store.set(k, String(v)); + } + }; + (globalThis as unknown as { localStorage: Storage }).localStorage = polyfill; +}); + +import { ChatService } from '$lib/services/chat.service'; +import { STREAM_RESUME_LOCALSTORAGE_KEY_PREFIX } from '$lib/constants'; + +describe('ChatService stream resume', () => { + beforeEach(() => { + localStorage.clear(); + }); + afterEach(() => { + localStorage.clear(); + }); + + it('returns null when no state exists for the conversation', () => { + expect(ChatService.getStreamState('conv-a')).toBeNull(); + }); + + it('saves and reads back the byte count', () => { + ChatService.saveStreamState('conv-a', 4242); + const got = ChatService.getStreamState('conv-a'); + expect(got).not.toBeNull(); + expect(got!.bytesReceived).toBe(4242); + expect(typeof got!.updatedAt).toBe('number'); + }); + + it('overwrites the previous byte count on a new save for the same conversation', () => { + ChatService.saveStreamState('conv-a', 100); + ChatService.saveStreamState('conv-a', 200); + const got = ChatService.getStreamState('conv-a'); + expect(got!.bytesReceived).toBe(200); + }); + + it('keeps states for distinct conversations isolated', () => { + ChatService.saveStreamState('conv-a', 10); + ChatService.saveStreamState('conv-b', 20); + expect(ChatService.getStreamState('conv-a')!.bytesReceived).toBe(10); + expect(ChatService.getStreamState('conv-b')!.bytesReceived).toBe(20); + }); + + it('clears the state for a given conversation', () => { + ChatService.saveStreamState('conv-a', 10); + ChatService.clearStreamState('conv-a'); + expect(ChatService.getStreamState('conv-a')).toBeNull(); + }); + + it('ignores empty conversation id on save', () => { + ChatService.saveStreamState('', 1); + expect(ChatService.getStreamState('')).toBeNull(); + }); + + it('returns null on corrupted storage payload', () => { + localStorage.setItem(`${STREAM_RESUME_LOCALSTORAGE_KEY_PREFIX}conv-a`, '{not-json'); + expect(ChatService.getStreamState('conv-a')).toBeNull(); + }); + + it('persists the model alongside the byte count', () => { + ChatService.saveStreamState('conv-a', 10, 'model-x'); + expect(ChatService.getStreamState('conv-a')!.model).toBe('model-x'); + }); + + it('stores a null model when none is provided', () => { + ChatService.saveStreamState('conv-a', 10); + expect(ChatService.getStreamState('conv-a')!.model).toBeNull(); + }); + + it('overwrites the model on a new save for the same conversation', () => { + ChatService.saveStreamState('conv-a', 10, 'model-x'); + ChatService.saveStreamState('conv-a', 20, 'model-y'); + expect(ChatService.getStreamState('conv-a')!.model).toBe('model-y'); + }); + + describe('resumeStreamIdentity', () => { + it('appends the persisted model so the resume key matches the frozen POST identity', () => { + ChatService.saveStreamState('conv-a', 10, 'model-x'); + expect( + ChatService.resumeStreamIdentity('conv-a', ChatService.getStreamState('conv-a'), 'dropdown') + ).toBe('conv-a::model-x'); + }); + + it('keeps the bare conv id when the persisted model is null', () => { + ChatService.saveStreamState('conv-a', 10); + expect( + ChatService.resumeStreamIdentity('conv-a', ChatService.getStreamState('conv-a'), 'dropdown') + ).toBe('conv-a'); + }); + + it('falls back to the current model only when no state is persisted', () => { + expect(ChatService.resumeStreamIdentity('conv-a', null, 'dropdown')).toBe('conv-a::dropdown'); + }); + + it('ignores the fallback when a state exists, the persisted value is authoritative', () => { + ChatService.saveStreamState('conv-a', 10, 'model-x'); + expect( + ChatService.resumeStreamIdentity('conv-a', ChatService.getStreamState('conv-a'), 'dropdown') + ).toBe('conv-a::model-x'); + }); + + it('falls back when a legacy state has no model field', () => { + localStorage.setItem( + `${STREAM_RESUME_LOCALSTORAGE_KEY_PREFIX}conv-a`, + JSON.stringify({ bytesReceived: 10, updatedAt: 1 }) + ); + expect( + ChatService.resumeStreamIdentity('conv-a', ChatService.getStreamState('conv-a'), 'dropdown') + ).toBe('conv-a::dropdown'); + }); + }); +});