common: update logging to enforce max_capacity and optimize queue resizing (#24490)

* common: update logging to enforce max_capacity and optimize queue resizing logic

* common/log: remove queue expansion logic
This commit is contained in:
Max Krasnyansky 2026-06-16 23:19:11 -07:00 committed by GitHub
parent 890f1a27ed
commit cda63856b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -11,8 +11,13 @@
#include <sstream> #include <sstream>
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <algorithm>
#if defined(_WIN32) #if defined(_WIN32)
# define WIN32_LEAN_AND_MEAN
# ifndef NOMINMAX
# define NOMINMAX
# endif
# include <io.h> # include <io.h>
# include <windows.h> # include <windows.h>
# define isatty _isatty # define isatty _isatty
@ -62,16 +67,15 @@ static const char* g_col[] = {
}; };
struct common_log_entry { struct common_log_entry {
enum ggml_log_level level; enum ggml_log_level level {GGML_LOG_LEVEL_INFO};
bool prefix;
int64_t timestamp;
std::vector<char> msg; std::vector<char> msg;
// signals the worker thread to stop int64_t timestamp { 0 };
bool is_end; bool is_end { false }; // signals the worker thread to stop
bool prefix { false };
common_log_entry(size_t size = 256) : msg(size) { }
void print(FILE * file = nullptr) const { void print(FILE * file = nullptr) const {
FILE * fcur = file; FILE * fcur = file;
@ -122,22 +126,15 @@ struct common_log_entry {
}; };
struct common_log { struct common_log {
// default capacity - will be expanded if needed // default capacity
common_log() : common_log(256) {} common_log(size_t capacity = 512) {
file = nullptr;
common_log(size_t capacity) { prefix = false;
file = nullptr;
prefix = false;
timestamps = false; timestamps = false;
running = false; running = false;
t_start = t_us(); t_start = t_us();
// initial message size - will be expanded if longer messages arrive
entries.resize(capacity);
for (auto & entry : entries) {
entry.msg.resize(256);
}
queue.resize(capacity, common_log_entry(256));
head = 0; head = 0;
tail = 0; tail = 0;
@ -152,9 +149,10 @@ struct common_log {
} }
private: private:
std::mutex mtx; std::mutex mtx;
std::thread thrd; std::thread thrd;
std::condition_variable cv; std::condition_variable cv_new; // new entry
std::condition_variable cv_full; // wait on full
FILE * file; FILE * file;
@ -164,24 +162,53 @@ private:
int64_t t_start; int64_t t_start;
// ring buffer of entries // queue of entries
std::vector<common_log_entry> entries; std::vector<common_log_entry> queue;
size_t head; size_t head;
size_t tail; size_t tail;
// worker thread copies into this bool print_entry(const common_log_entry & e) const {
common_log_entry cur; if (e.is_end) return true;
e.print();
if (file) {
e.print(file);
}
return false;
}
bool flush_queue(size_t start_head, size_t end_tail, size_t & out_head) const {
bool stop = false;
size_t h = start_head;
while (h != end_tail && !stop) {
stop = print_entry(queue[h]);
h = (h + 1) % queue.size();
}
out_head = h;
return stop;
}
public: public:
bool is_full() const {
return ((tail + 1) % queue.size()) == head;
}
bool is_empty() const {
return head == tail;
}
void add(enum ggml_log_level level, const char * fmt, va_list args) { void add(enum ggml_log_level level, const char * fmt, va_list args) {
std::lock_guard<std::mutex> lock(mtx); std::unique_lock<std::mutex> lock(mtx);
// block if the queue is full
cv_full.wait(lock, [this]() { return !running || !is_full(); });
if (!running) { if (!running) {
// discard messages while the worker thread is paused // discard messages while the worker thread is paused
return; return;
} }
auto & entry = entries[tail]; auto & entry = queue[tail];
{ {
// cannot use args twice, so make a copy in case we need to expand the buffer // cannot use args twice, so make a copy in case we need to expand the buffer
@ -216,38 +243,16 @@ public:
va_end(args_copy); va_end(args_copy);
} }
entry.level = level; entry.is_end = false;
entry.prefix = prefix; entry.level = level;
entry.prefix = prefix;
entry.timestamp = 0; entry.timestamp = 0;
if (timestamps) { if (timestamps) {
entry.timestamp = t_us() - t_start; entry.timestamp = t_us() - t_start;
} }
entry.is_end = false;
tail = (tail + 1) % entries.size(); tail = (tail + 1) % queue.size();
if (tail == head) { cv_new.notify_one();
// expand the buffer
std::vector<common_log_entry> new_entries(2*entries.size());
size_t new_tail = 0;
do {
new_entries[new_tail] = std::move(entries[head]);
head = (head + 1) % entries.size();
new_tail = (new_tail + 1);
} while (head != tail);
head = 0;
tail = new_tail;
for (size_t i = tail; i < new_entries.size(); i++) {
new_entries[i].msg.resize(256);
}
entries = std::move(new_entries);
}
cv.notify_one();
} }
void resume() { void resume() {
@ -261,23 +266,24 @@ public:
thrd = std::thread([this]() { thrd = std::thread([this]() {
while (true) { while (true) {
{ std::unique_lock<std::mutex> lock(mtx);
std::unique_lock<std::mutex> lock(mtx); cv_new.wait(lock, [this]() { return !is_empty(); });
cv.wait(lock, [this]() { return head != tail; });
cur = entries[head];
head = (head + 1) % entries.size(); size_t cached_head = head;
} size_t cached_tail = tail;
if (cur.is_end) { lock.unlock(); // drop the lock during flush
size_t next_head;
bool stop = flush_queue(cached_head, cached_tail, next_head);
lock.lock();
head = next_head;
cv_full.notify_all();
if (stop) {
break; break;
} }
cur.print(); // stdout and stderr
if (file) {
cur.print(file);
}
} }
}); });
} }
@ -293,13 +299,13 @@ public:
running = false; running = false;
// push an entry to signal the worker thread to stop // push an entry to signal the worker thread to stop
{ auto & entry = queue[tail];
auto & entry = entries[tail]; entry.is_end = true;
entry.is_end = true; tail = (tail + 1) % queue.size();
tail = (tail + 1) % entries.size(); // wakeup everyone
} cv_new.notify_one();
cv.notify_one(); cv_full.notify_all();
} }
thrd.join(); thrd.join();