diff --git a/tools/server/server-stream.cpp b/tools/server/server-stream.cpp index 757c36ad25..688633ed65 100644 --- a/tools/server/server-stream.cpp +++ b/tools/server/server-stream.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace { constexpr int64_t STREAM_SESSION_TTL_SECONDS = 300; @@ -21,6 +22,104 @@ int64_t now_seconds() { } } +// owns all live sessions, runs a periodic GC to evict expired ones. +// the map is keyed by conversation_id, so the invariant "one conv = at most one +// live session" is enforced at the type level +class stream_session_manager { +public: + stream_session_manager(); + ~stream_session_manager(); + + stream_session_manager(const stream_session_manager &) = delete; + stream_session_manager & operator=(const stream_session_manager &) = delete; + + // install a new session for this conversation, evicting and cancelling any previous one. + // the conversation_id must be non empty, the caller is responsible for that check. + // returns the new session + stream_session_ptr create_or_replace(const std::string & conversation_id); + + // lookup, returns null if unknown or already evicted + stream_session_ptr get(const std::string & conversation_id); + + // list every live or recently completed session, used by GET /v1/streams without filter + std::vector list_all() const; + + // remove from the map and finalize, wakes any pending readers + void evict(const std::string & conversation_id); + + // signal the producer to cancel asap then evict, used by the explicit user Stop path + void evict_and_cancel(const std::string & conversation_id); + + void start_gc(); + void stop_gc(); + +private: + void gc_loop(); + + mutable std::shared_mutex map_mu; + std::unordered_map sessions; // key: conversation_id + std::thread gc_thread; + std::atomic running; + std::mutex gc_wake_mu; + std::condition_variable gc_wake_cv; +}; + +// process wide manager, lifecycle controlled by llama-server main() via start_gc/stop_gc +static stream_session_manager g_stream_sessions; + +void server_stream_session_manager_start() { + g_stream_sessions.start_gc(); +} + +void server_stream_session_manager_stop() { + g_stream_sessions.stop_gc(); +} + +struct stream_session { + std::string conversation_id; + int64_t started_ts; // unix seconds at construction, used by /v1/streams listing + + stream_session(std::string conversation_id_, size_t max_bytes_); + stream_session(const stream_session &) = delete; + stream_session & operator=(const stream_session &) = delete; + + // append raw bytes, drops from the front if the cap is reached. + // returns false if the session is already finalized + bool append(const char * data, size_t len); + + // mark the session as complete, wakes all pending readers + void finalize(); + + // drain bytes from offset, calling sink for each chunk. blocks until more + // bytes arrive or finalize is called. returns OK on clean exit, OFFSET_LOST + // if offset falls below the dropped prefix + stream_read_status read_from(size_t offset, + const std::function & sink, + const std::function & should_stop); + + bool is_done() const; + bool is_cancelled() const; + size_t total_size() const; // bytes that ever entered the session + size_t dropped_prefix() const; // bytes evicted from the front due to cap + int64_t completed_at() const; // 0 while alive, unix seconds after finalize + + // attach the producer stop hook used to cancel its reader, pass an empty function to detach + void set_stop_producer(std::function fn); + + // signal the producer to abort its inference asap via the stop hook, idempotent + void cancel(); + +private: + mutable std::mutex mu; + std::condition_variable cv; + std::vector buffer; + size_t prefix_dropped; + size_t cap_bytes; + std::atomic done; + std::atomic cancelled; + std::atomic completed_ts; + std::function stop_producer; // protected by mu +}; stream_session::stream_session(std::string conversation_id_, size_t max_bytes_) : conversation_id(std::move(conversation_id_)) , started_ts(now_seconds()) @@ -294,11 +393,23 @@ void stream_session_manager::gc_loop() { } } -// process wide manager, lifecycle controlled by llama-server main() via start_gc/stop_gc -stream_session_manager g_stream_sessions; - // stream_pipe --------------------------------------------------------------------------------- +// consumer end: read-only replay of the ring buffer, the destructor does not finalize the session +struct stream_pipe_consumer : stream_pipe { + // drain bytes from offset, calling sink for each available chunk. blocks until more data + // arrives or the session finalizes. should_stop is polled, returns OFFSET_LOST if offset + // fell below the dropped prefix + stream_read_status read(size_t & offset, + const std::function & sink, + const std::function & should_stop); + + static std::shared_ptr create(stream_session_ptr session); + +private: + explicit stream_pipe_consumer(stream_session_ptr session); +}; + stream_pipe::stream_pipe(stream_session_ptr session) : session_(std::move(session)) { } diff --git a/tools/server/server-stream.h b/tools/server/server-stream.h index ff363bb4cd..6faaa14ce4 100644 --- a/tools/server/server-stream.h +++ b/tools/server/server-stream.h @@ -3,17 +3,10 @@ #include "server-http.h" #include -#include #include -#include #include #include -#include -#include #include -#include -#include -#include enum class stream_read_status { OK, @@ -23,51 +16,8 @@ enum class stream_read_status { // streaming buffer for one generation, survives HTTP disconnect. the producer appends raw SSE // bytes, readers drain from any offset via read_from and block until more bytes or finalize. // keyed by conversation_id: one conv = at most one live session -struct stream_session { - std::string conversation_id; - int64_t started_ts; // unix seconds at construction, used by /v1/streams listing - stream_session(std::string conversation_id_, size_t max_bytes_); - stream_session(const stream_session &) = delete; - stream_session & operator=(const stream_session &) = delete; - - // append raw bytes, drops from the front if the cap is reached. - // returns false if the session is already finalized - bool append(const char * data, size_t len); - - // mark the session as complete, wakes all pending readers - void finalize(); - - // drain bytes from offset, calling sink for each chunk. blocks until more - // bytes arrive or finalize is called. returns OK on clean exit, OFFSET_LOST - // if offset falls below the dropped prefix - stream_read_status read_from(size_t offset, - const std::function & sink, - const std::function & should_stop); - - bool is_done() const; - bool is_cancelled() const; - size_t total_size() const; // bytes that ever entered the session - size_t dropped_prefix() const; // bytes evicted from the front due to cap - int64_t completed_at() const; // 0 while alive, unix seconds after finalize - - // attach the producer stop hook used to cancel its reader, pass an empty function to detach - void set_stop_producer(std::function fn); - - // signal the producer to abort its inference asap via the stop hook, idempotent - void cancel(); - -private: - mutable std::mutex mu; - std::condition_variable cv; - std::vector buffer; - size_t prefix_dropped; - size_t cap_bytes; - std::atomic done; - std::atomic cancelled; - std::atomic completed_ts; - std::function stop_producer; // protected by mu -}; +struct stream_session; using stream_session_ptr = std::shared_ptr; @@ -121,67 +71,8 @@ private: server_http_res * res_ = nullptr; }; -// consumer end: read-only replay of the ring buffer, the destructor does not finalize the session -struct stream_pipe_consumer : stream_pipe { - // drain bytes from offset, calling sink for each available chunk. blocks until more data - // arrives or the session finalizes. should_stop is polled, returns OFFSET_LOST if offset - // fell below the dropped prefix - stream_read_status read(size_t & offset, - const std::function & sink, - const std::function & should_stop); - - static std::shared_ptr create(stream_session_ptr session); - -private: - explicit stream_pipe_consumer(stream_session_ptr session); -}; - -// owns all live sessions, runs a periodic GC to evict expired ones. -// the map is keyed by conversation_id, so the invariant "one conv = at most one -// live session" is enforced at the type level -class stream_session_manager { -public: - stream_session_manager(); - ~stream_session_manager(); - - stream_session_manager(const stream_session_manager &) = delete; - stream_session_manager & operator=(const stream_session_manager &) = delete; - - // install a new session for this conversation, evicting and cancelling any previous one. - // the conversation_id must be non empty, the caller is responsible for that check. - // returns the new session - stream_session_ptr create_or_replace(const std::string & conversation_id); - - // lookup, returns null if unknown or already evicted - stream_session_ptr get(const std::string & conversation_id); - - // list every live or recently completed session, used by GET /v1/streams without filter - std::vector list_all() const; - - // remove from the map and finalize, wakes any pending readers - void evict(const std::string & conversation_id); - - // signal the producer to cancel asap then evict, used by the explicit user Stop path - void evict_and_cancel(const std::string & conversation_id); - - void start_gc(); - void stop_gc(); - -private: - void gc_loop(); - - mutable std::shared_mutex map_mu; - std::unordered_map sessions; // key: conversation_id - std::thread gc_thread; - std::atomic running; - std::mutex gc_wake_mu; - std::condition_variable gc_wake_cv; -}; - -// process wide manager, linked by both llama-server and llama-cli. llama-server main() drives -// start_gc/stop_gc, llama-cli leaves it idle. the dtor calls stop_gc() unconditionally so exit -// is safe whether or not the GC thread ran -extern stream_session_manager g_stream_sessions; +void server_stream_session_manager_start(); +void server_stream_session_manager_stop(); // route handler factories operating on g_stream_sessions, wired under /v1/stream/* by server.cpp. // keeps the resumable stream surface confined to server-stream diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 1bbc99d890..dc83b32db4 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -85,7 +85,7 @@ int llama_server(int argc, char ** argv) { // start the stream session manager GC right after common init, before any HTTP route can // touch it. lifecycle is symmetric, stop_gc() runs in clean_up() before backend free - g_stream_sessions.start_gc(); + server_stream_session_manager_start(); if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_SERVER)) { return 1; @@ -343,7 +343,7 @@ int llama_server(int argc, char ** argv) { clean_up = [&models_routes]() { SRV_INF("%s: cleaning up before exit...\n", __func__); // stop the session GC first, it finalizes live sessions and wakes pending readers - g_stream_sessions.stop_gc(); + server_stream_session_manager_stop(); if (models_routes.has_value()) { models_routes->stopping.store(true); // maybe redundant, but just to be safe models_routes->models.unload_all(); @@ -371,7 +371,7 @@ int llama_server(int argc, char ** argv) { clean_up = [&ctx_http, &ctx_server]() { SRV_INF("%s: cleaning up before exit...\n", __func__); // stop the session GC first, it finalizes live sessions and wakes pending readers - g_stream_sessions.stop_gc(); + server_stream_session_manager_stop(); ctx_http.stop(); ctx_server.terminate(); llama_backend_free();