diff --git a/common/log.cpp b/common/log.cpp index bd62616d8a..2d1e74ad1f 100644 --- a/common/log.cpp +++ b/common/log.cpp @@ -11,8 +11,13 @@ #include #include #include +#include #if defined(_WIN32) +# define WIN32_LEAN_AND_MEAN +# ifndef NOMINMAX +# define NOMINMAX +# endif # include # include # define isatty _isatty @@ -62,16 +67,15 @@ static const char* g_col[] = { }; struct common_log_entry { - enum ggml_log_level level; - - bool prefix; - - int64_t timestamp; + enum ggml_log_level level {GGML_LOG_LEVEL_INFO}; std::vector msg; - // signals the worker thread to stop - bool is_end; + int64_t timestamp { 0 }; + bool is_end { false }; // signals the worker thread to stop + bool prefix { false }; + + common_log_entry(size_t size = 256) : msg(size) { } void print(FILE * file = nullptr) const { FILE * fcur = file; @@ -122,22 +126,15 @@ struct common_log_entry { }; struct common_log { - // default capacity - will be expanded if needed - common_log() : common_log(256) {} - - common_log(size_t capacity) { - file = nullptr; - prefix = false; + // default capacity + common_log(size_t capacity = 512) { + file = nullptr; + prefix = false; timestamps = false; - running = false; - t_start = t_us(); - - // initial message size - will be expanded if longer messages arrive - entries.resize(capacity); - for (auto & entry : entries) { - entry.msg.resize(256); - } + running = false; + t_start = t_us(); + queue.resize(capacity, common_log_entry(256)); head = 0; tail = 0; @@ -152,9 +149,10 @@ struct common_log { } private: - std::mutex mtx; - std::thread thrd; - std::condition_variable cv; + std::mutex mtx; + std::thread thrd; + std::condition_variable cv_new; // new entry + std::condition_variable cv_full; // wait on full FILE * file; @@ -164,24 +162,53 @@ private: int64_t t_start; - // ring buffer of entries - std::vector entries; + // queue of entries + std::vector queue; size_t head; size_t tail; - // worker thread copies into this - common_log_entry cur; + bool print_entry(const common_log_entry & e) const { + if (e.is_end) return true; + + e.print(); + if (file) { + e.print(file); + } + return false; + } + + bool flush_queue(size_t start_head, size_t end_tail, size_t & out_head) const { + bool stop = false; + size_t h = start_head; + while (h != end_tail && !stop) { + stop = print_entry(queue[h]); + h = (h + 1) % queue.size(); + } + out_head = h; + return stop; + } public: + bool is_full() const { + return ((tail + 1) % queue.size()) == head; + } + + bool is_empty() const { + return head == tail; + } + void add(enum ggml_log_level level, const char * fmt, va_list args) { - std::lock_guard lock(mtx); + std::unique_lock lock(mtx); + + // block if the queue is full + cv_full.wait(lock, [this]() { return !running || !is_full(); }); if (!running) { // discard messages while the worker thread is paused return; } - auto & entry = entries[tail]; + auto & entry = queue[tail]; { // cannot use args twice, so make a copy in case we need to expand the buffer @@ -216,38 +243,16 @@ public: va_end(args_copy); } - entry.level = level; - entry.prefix = prefix; + entry.is_end = false; + entry.level = level; + entry.prefix = prefix; entry.timestamp = 0; if (timestamps) { entry.timestamp = t_us() - t_start; } - entry.is_end = false; - tail = (tail + 1) % entries.size(); - if (tail == head) { - // expand the buffer - std::vector new_entries(2*entries.size()); - - size_t new_tail = 0; - - do { - new_entries[new_tail] = std::move(entries[head]); - - head = (head + 1) % entries.size(); - new_tail = (new_tail + 1); - } while (head != tail); - - head = 0; - tail = new_tail; - - for (size_t i = tail; i < new_entries.size(); i++) { - new_entries[i].msg.resize(256); - } - - entries = std::move(new_entries); - } - cv.notify_one(); + tail = (tail + 1) % queue.size(); + cv_new.notify_one(); } void resume() { @@ -261,23 +266,24 @@ public: thrd = std::thread([this]() { while (true) { - { - std::unique_lock lock(mtx); - cv.wait(lock, [this]() { return head != tail; }); - cur = entries[head]; + std::unique_lock lock(mtx); + cv_new.wait(lock, [this]() { return !is_empty(); }); - head = (head + 1) % entries.size(); - } + size_t cached_head = head; + size_t cached_tail = tail; - if (cur.is_end) { + lock.unlock(); // drop the lock during flush + + size_t next_head; + bool stop = flush_queue(cached_head, cached_tail, next_head); + + lock.lock(); + head = next_head; + cv_full.notify_all(); + + if (stop) { break; } - - cur.print(); // stdout and stderr - - if (file) { - cur.print(file); - } } }); } @@ -293,13 +299,13 @@ public: running = false; // push an entry to signal the worker thread to stop - { - auto & entry = entries[tail]; - entry.is_end = true; + auto & entry = queue[tail]; + entry.is_end = true; + tail = (tail + 1) % queue.size(); - tail = (tail + 1) % entries.size(); - } - cv.notify_one(); + // wakeup everyone + cv_new.notify_one(); + cv_full.notify_all(); } thrd.join();