mirror of
https://github.com/ggml-org/llama.cpp.git
synced 2026-06-27 23:50:20 -05:00
poc: threadpool sampling
This commit is contained in:
parent
a527509d0f
commit
447b0c3646
@ -24,6 +24,11 @@
|
|||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
#include <thread>
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <queue>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
// fix problem with std::min and std::max
|
// fix problem with std::min and std::max
|
||||||
#if defined(_WIN32)
|
#if defined(_WIN32)
|
||||||
@ -3616,6 +3621,67 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct sampler_task {
|
||||||
|
server_slot * slot;
|
||||||
|
int32_t tok_idx;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Small fixed-size worker pool used to run independent jobs concurrently.
//
// Usage: call init() once, enqueue() any number of jobs, then wait_all() to
// block until every job enqueued so far has finished. The destructor lets the
// workers drain whatever is still queued and then joins them.
//
// Jobs must not throw: an exception escaping a std::thread entry function
// terminates the process.
struct sampler_threadpool {
    std::vector<std::thread>          threads;  // worker threads, joined in the destructor
    std::queue<std::function<void()>> tasks;    // jobs not yet picked up by a worker (guarded by mtx)
    std::mutex                        mtx;      // guards tasks, pending and stop
    std::condition_variable           cv;       // signalled when a job is queued or stop is set
    std::condition_variable           cv_done;  // signalled when pending drops to zero

    int  pending = 0;     // number of jobs that are queued or currently running
    bool stop    = false; // set by the destructor to ask the workers to exit

    ~sampler_threadpool() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (auto & t : threads) {
            if (t.joinable()) {
                t.join();
            }
        }
    }

    // Start the worker threads.
    //
    // n: requested number of workers. Values below 1 are clamped to 1, because
    //    a pool without workers would make wait_all() block forever as soon as
    //    a job is enqueued.
    void init(int n) {
        const int n_workers = n > 0 ? n : 1;
        for (int i = 0; i < n_workers; i++) {
            threads.emplace_back([this]() { worker_loop(); });
        }
    }

    // Queue a job for execution on one of the workers.
    void enqueue(std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            tasks.push(std::move(fn));
            pending++;
        }
        cv.notify_one();
    }

    // Block until every job enqueued so far has finished running.
    void wait_all() {
        std::unique_lock<std::mutex> lock(mtx);
        cv_done.wait(lock, [this]() { return pending == 0; });
    }

    // Body of each worker thread: pop and run jobs until stop is set and the
    // queue has been drained.
    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mtx);
                cv.wait(lock, [this]() { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) {
                    return;
                }
                task = std::move(tasks.front());
                tasks.pop();
            }

            // run the job outside the lock so other workers can make progress
            task();

            bool all_done = false;
            {
                std::lock_guard<std::mutex> lock(mtx);
                pending--;
                all_done = pending == 0;
            }
            // wake every waiter, and only once there is nothing left to wait for;
            // notify_one could leave a second wait_all() caller asleep
            if (all_done) {
                cv_done.notify_all();
            }
        }
    }
};
|
||||||
|
|
||||||
void post_decode(int32_t n_batch_tokens, int32_t off, llama_batch & batch_view) {
|
void post_decode(int32_t n_batch_tokens, int32_t off, llama_batch & batch_view) {
|
||||||
// for checking if a given batch index is inside batch_view
|
// for checking if a given batch index is inside batch_view
|
||||||
auto is_inside_view = [&](int32_t idx) {
|
auto is_inside_view = [&](int32_t idx) {
|
||||||
@ -3637,6 +3703,8 @@ private:
|
|||||||
slot.task->params.sampling.preserved_tokens.find(token) != slot.task->params.sampling.preserved_tokens.end();
|
slot.task->params.sampling.preserved_tokens.find(token) != slot.task->params.sampling.preserved_tokens.end();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
std::vector<sampler_task> smpl_tasks;
|
||||||
|
|
||||||
iterate(slots, [&](server_slot & slot) {
|
iterate(slots, [&](server_slot & slot) {
|
||||||
// optionally send prompt processing progress
|
// optionally send prompt processing progress
|
||||||
if (slot.state == SLOT_STATE_PROCESSING_PROMPT || slot.state == SLOT_STATE_DONE_PROMPT) {
|
if (slot.state == SLOT_STATE_PROCESSING_PROMPT || slot.state == SLOT_STATE_DONE_PROMPT) {
|
||||||
@ -3684,13 +3752,42 @@ private:
|
|||||||
|
|
||||||
// shifted according to the current sub-batch
|
// shifted according to the current sub-batch
|
||||||
const int tok_idx = slot.i_batch - off;
|
const int tok_idx = slot.i_batch - off;
|
||||||
|
smpl_tasks.push_back({&slot, tok_idx});
|
||||||
|
});
|
||||||
|
|
||||||
llama_token id;
|
std::unordered_map<server_slot *, llama_token> sampled_token;
|
||||||
{
|
|
||||||
scoped_timer timer(t_sampl, n_sampl);
|
// run common_sampler_sample in a thread pool to sample all tokens in parallel
|
||||||
id = common_sampler_sample(slot.smpl.get(), slot.ctx_tgt, tok_idx);
|
if (!smpl_tasks.empty()) {
|
||||||
|
scoped_timer timer(t_sampl, n_sampl);
|
||||||
|
static sampler_threadpool pool;
|
||||||
|
if (pool.threads.empty()) {
|
||||||
|
pool.init(params_base.n_parallel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::vector<std::pair<server_slot *, llama_token>> results(smpl_tasks.size());
|
||||||
|
for (size_t i = 0; i < smpl_tasks.size(); i++) {
|
||||||
|
const auto & task = smpl_tasks[i];
|
||||||
|
pool.enqueue([&results, i, slot_ptr = task.slot, tok_idx = task.tok_idx]() {
|
||||||
|
results[i] = {slot_ptr, common_sampler_sample(slot_ptr->smpl.get(), slot_ptr->ctx_tgt, tok_idx)};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
pool.wait_all();
|
||||||
|
|
||||||
|
for (const auto & [slot_ptr, id] : results) {
|
||||||
|
sampled_token[slot_ptr] = id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
iterate(slots, [&](server_slot & slot) {
|
||||||
|
const int tok_idx = slot.i_batch - off;
|
||||||
|
auto it = sampled_token.find(&slot);
|
||||||
|
if (it == sampled_token.end()) {
|
||||||
|
// no token sampled for this slot, skip
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto id = it->second;
|
||||||
|
|
||||||
slot.i_batch = -1;
|
slot.i_batch = -1;
|
||||||
|
|
||||||
common_sampler_accept(slot.smpl.get(), id, true);
|
common_sampler_accept(slot.smpl.get(), id, true);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user