server-stream : pimpl

This commit is contained in:
Georgi Gerganov 2026-06-26 11:36:15 +03:00
parent c16c35b814
commit 5004859421
No known key found for this signature in database
GPG Key ID: 449E073F9DC10735
3 changed files with 120 additions and 118 deletions

View File

@ -6,6 +6,7 @@
#include <chrono> #include <chrono>
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <shared_mutex>
namespace { namespace {
constexpr int64_t STREAM_SESSION_TTL_SECONDS = 300; constexpr int64_t STREAM_SESSION_TTL_SECONDS = 300;
@ -21,6 +22,104 @@ int64_t now_seconds() {
} }
} }
// owns all live sessions, runs a periodic GC to evict expired ones.
// the map is keyed by conversation_id, so the invariant "one conv = at most one
// live session" is enforced at the type level
class stream_session_manager {
public:
stream_session_manager();
~stream_session_manager();
stream_session_manager(const stream_session_manager &) = delete;
stream_session_manager & operator=(const stream_session_manager &) = delete;
// install a new session for this conversation, evicting and cancelling any previous one.
// the conversation_id must be non empty, the caller is responsible for that check.
// returns the new session
stream_session_ptr create_or_replace(const std::string & conversation_id);
// lookup, returns null if unknown or already evicted
stream_session_ptr get(const std::string & conversation_id);
// list every live or recently completed session, used by GET /v1/streams without filter
std::vector<stream_session_ptr> list_all() const;
// remove from the map and finalize, wakes any pending readers
void evict(const std::string & conversation_id);
// signal the producer to cancel asap then evict, used by the explicit user Stop path
void evict_and_cancel(const std::string & conversation_id);
void start_gc();
void stop_gc();
private:
void gc_loop();
mutable std::shared_mutex map_mu;
std::unordered_map<std::string, stream_session_ptr> sessions; // key: conversation_id
std::thread gc_thread;
std::atomic<bool> running;
std::mutex gc_wake_mu;
std::condition_variable gc_wake_cv;
};
// process wide manager, lifecycle controlled by llama-server main() via start_gc/stop_gc
static stream_session_manager g_stream_sessions;
void server_stream_session_manager_start() {
g_stream_sessions.start_gc();
}
void server_stream_session_manager_stop() {
g_stream_sessions.stop_gc();
}
struct stream_session {
std::string conversation_id;
int64_t started_ts; // unix seconds at construction, used by /v1/streams listing
stream_session(std::string conversation_id_, size_t max_bytes_);
stream_session(const stream_session &) = delete;
stream_session & operator=(const stream_session &) = delete;
// append raw bytes, drops from the front if the cap is reached.
// returns false if the session is already finalized
bool append(const char * data, size_t len);
// mark the session as complete, wakes all pending readers
void finalize();
// drain bytes from offset, calling sink for each chunk. blocks until more
// bytes arrive or finalize is called. returns OK on clean exit, OFFSET_LOST
// if offset falls below the dropped prefix
stream_read_status read_from(size_t offset,
const std::function<bool(const char *, size_t)> & sink,
const std::function<bool()> & should_stop);
bool is_done() const;
bool is_cancelled() const;
size_t total_size() const; // bytes that ever entered the session
size_t dropped_prefix() const; // bytes evicted from the front due to cap
int64_t completed_at() const; // 0 while alive, unix seconds after finalize
// attach the producer stop hook used to cancel its reader, pass an empty function to detach
void set_stop_producer(std::function<void()> fn);
// signal the producer to abort its inference asap via the stop hook, idempotent
void cancel();
private:
mutable std::mutex mu;
std::condition_variable cv;
std::vector<char> buffer;
size_t prefix_dropped;
size_t cap_bytes;
std::atomic<bool> done;
std::atomic<bool> cancelled;
std::atomic<int64_t> completed_ts;
std::function<void()> stop_producer; // protected by mu
};
stream_session::stream_session(std::string conversation_id_, size_t max_bytes_) stream_session::stream_session(std::string conversation_id_, size_t max_bytes_)
: conversation_id(std::move(conversation_id_)) : conversation_id(std::move(conversation_id_))
, started_ts(now_seconds()) , started_ts(now_seconds())
@ -294,11 +393,23 @@ void stream_session_manager::gc_loop() {
} }
} }
// process wide manager, lifecycle controlled by llama-server main() via start_gc/stop_gc
stream_session_manager g_stream_sessions;
// stream_pipe --------------------------------------------------------------------------------- // stream_pipe ---------------------------------------------------------------------------------
// consumer end: read-only replay of the ring buffer, the destructor does not finalize the session
struct stream_pipe_consumer : stream_pipe {
// drain bytes from offset, calling sink for each available chunk. blocks until more data
// arrives or the session finalizes. should_stop is polled, returns OFFSET_LOST if offset
// fell below the dropped prefix
stream_read_status read(size_t & offset,
const std::function<bool(const char *, size_t)> & sink,
const std::function<bool()> & should_stop);
static std::shared_ptr<stream_pipe_consumer> create(stream_session_ptr session);
private:
explicit stream_pipe_consumer(stream_session_ptr session);
};
stream_pipe::stream_pipe(stream_session_ptr session) stream_pipe::stream_pipe(stream_session_ptr session)
: session_(std::move(session)) { : session_(std::move(session)) {
} }

View File

@ -3,17 +3,10 @@
#include "server-http.h" #include "server-http.h"
#include <atomic> #include <atomic>
#include <condition_variable>
#include <cstddef> #include <cstddef>
#include <cstdint>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex>
#include <shared_mutex>
#include <string> #include <string>
#include <thread>
#include <unordered_map>
#include <vector>
enum class stream_read_status { enum class stream_read_status {
OK, OK,
@ -23,51 +16,8 @@ enum class stream_read_status {
// streaming buffer for one generation, survives HTTP disconnect. the producer appends raw SSE // streaming buffer for one generation, survives HTTP disconnect. the producer appends raw SSE
// bytes, readers drain from any offset via read_from and block until more bytes or finalize. // bytes, readers drain from any offset via read_from and block until more bytes or finalize.
// keyed by conversation_id: one conv = at most one live session // keyed by conversation_id: one conv = at most one live session
struct stream_session {
std::string conversation_id;
int64_t started_ts; // unix seconds at construction, used by /v1/streams listing
stream_session(std::string conversation_id_, size_t max_bytes_); struct stream_session;
stream_session(const stream_session &) = delete;
stream_session & operator=(const stream_session &) = delete;
// append raw bytes, drops from the front if the cap is reached.
// returns false if the session is already finalized
bool append(const char * data, size_t len);
// mark the session as complete, wakes all pending readers
void finalize();
// drain bytes from offset, calling sink for each chunk. blocks until more
// bytes arrive or finalize is called. returns OK on clean exit, OFFSET_LOST
// if offset falls below the dropped prefix
stream_read_status read_from(size_t offset,
const std::function<bool(const char *, size_t)> & sink,
const std::function<bool()> & should_stop);
bool is_done() const;
bool is_cancelled() const;
size_t total_size() const; // bytes that ever entered the session
size_t dropped_prefix() const; // bytes evicted from the front due to cap
int64_t completed_at() const; // 0 while alive, unix seconds after finalize
// attach the producer stop hook used to cancel its reader, pass an empty function to detach
void set_stop_producer(std::function<void()> fn);
// signal the producer to abort its inference asap via the stop hook, idempotent
void cancel();
private:
mutable std::mutex mu;
std::condition_variable cv;
std::vector<char> buffer;
size_t prefix_dropped;
size_t cap_bytes;
std::atomic<bool> done;
std::atomic<bool> cancelled;
std::atomic<int64_t> completed_ts;
std::function<void()> stop_producer; // protected by mu
};
using stream_session_ptr = std::shared_ptr<stream_session>; using stream_session_ptr = std::shared_ptr<stream_session>;
@ -121,67 +71,8 @@ private:
server_http_res * res_ = nullptr; server_http_res * res_ = nullptr;
}; };
// consumer end: read-only replay of the ring buffer, the destructor does not finalize the session void server_stream_session_manager_start();
struct stream_pipe_consumer : stream_pipe { void server_stream_session_manager_stop();
// drain bytes from offset, calling sink for each available chunk. blocks until more data
// arrives or the session finalizes. should_stop is polled, returns OFFSET_LOST if offset
// fell below the dropped prefix
stream_read_status read(size_t & offset,
const std::function<bool(const char *, size_t)> & sink,
const std::function<bool()> & should_stop);
static std::shared_ptr<stream_pipe_consumer> create(stream_session_ptr session);
private:
explicit stream_pipe_consumer(stream_session_ptr session);
};
// owns all live sessions, runs a periodic GC to evict expired ones.
// the map is keyed by conversation_id, so the invariant "one conv = at most one
// live session" is enforced at the type level
class stream_session_manager {
public:
stream_session_manager();
~stream_session_manager();
stream_session_manager(const stream_session_manager &) = delete;
stream_session_manager & operator=(const stream_session_manager &) = delete;
// install a new session for this conversation, evicting and cancelling any previous one.
// the conversation_id must be non empty, the caller is responsible for that check.
// returns the new session
stream_session_ptr create_or_replace(const std::string & conversation_id);
// lookup, returns null if unknown or already evicted
stream_session_ptr get(const std::string & conversation_id);
// list every live or recently completed session, used by GET /v1/streams without filter
std::vector<stream_session_ptr> list_all() const;
// remove from the map and finalize, wakes any pending readers
void evict(const std::string & conversation_id);
// signal the producer to cancel asap then evict, used by the explicit user Stop path
void evict_and_cancel(const std::string & conversation_id);
void start_gc();
void stop_gc();
private:
void gc_loop();
mutable std::shared_mutex map_mu;
std::unordered_map<std::string, stream_session_ptr> sessions; // key: conversation_id
std::thread gc_thread;
std::atomic<bool> running;
std::mutex gc_wake_mu;
std::condition_variable gc_wake_cv;
};
// process wide manager, linked by both llama-server and llama-cli. llama-server main() drives
// start_gc/stop_gc, llama-cli leaves it idle. the dtor calls stop_gc() unconditionally so exit
// is safe whether or not the GC thread ran
extern stream_session_manager g_stream_sessions;
// route handler factories operating on g_stream_sessions, wired under /v1/stream/* by server.cpp. // route handler factories operating on g_stream_sessions, wired under /v1/stream/* by server.cpp.
// keeps the resumable stream surface confined to server-stream // keeps the resumable stream surface confined to server-stream

View File

@ -85,7 +85,7 @@ int llama_server(int argc, char ** argv) {
// start the stream session manager GC right after common init, before any HTTP route can // start the stream session manager GC right after common init, before any HTTP route can
// touch it. lifecycle is symmetric, stop_gc() runs in clean_up() before backend free // touch it. lifecycle is symmetric, stop_gc() runs in clean_up() before backend free
g_stream_sessions.start_gc(); server_stream_session_manager_start();
if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_SERVER)) { if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_SERVER)) {
return 1; return 1;
@ -343,7 +343,7 @@ int llama_server(int argc, char ** argv) {
clean_up = [&models_routes]() { clean_up = [&models_routes]() {
SRV_INF("%s: cleaning up before exit...\n", __func__); SRV_INF("%s: cleaning up before exit...\n", __func__);
// stop the session GC first, it finalizes live sessions and wakes pending readers // stop the session GC first, it finalizes live sessions and wakes pending readers
g_stream_sessions.stop_gc(); server_stream_session_manager_stop();
if (models_routes.has_value()) { if (models_routes.has_value()) {
models_routes->stopping.store(true); // maybe redundant, but just to be safe models_routes->stopping.store(true); // maybe redundant, but just to be safe
models_routes->models.unload_all(); models_routes->models.unload_all();
@ -371,7 +371,7 @@ int llama_server(int argc, char ** argv) {
clean_up = [&ctx_http, &ctx_server]() { clean_up = [&ctx_http, &ctx_server]() {
SRV_INF("%s: cleaning up before exit...\n", __func__); SRV_INF("%s: cleaning up before exit...\n", __func__);
// stop the session GC first, it finalizes live sessions and wakes pending readers // stop the session GC first, it finalizes live sessions and wakes pending readers
g_stream_sessions.stop_gc(); server_stream_session_manager_stop();
ctx_http.stop(); ctx_http.stop();
ctx_server.terminate(); ctx_server.terminate();
llama_backend_free(); llama_backend_free();