From 69855b4779e5e2d852de3a8a3d84eeb6f5ff7afd Mon Sep 17 00:00:00 2001
From: Ruben Ortlam
Date: Mon, 25 May 2026 16:27:17 +0200
Subject: [PATCH] add async copy

---
 ggml/src/ggml-vulkan/ggml-vulkan.cpp | 307 ++++++++++++++++++++++++++-
 1 file changed, 306 insertions(+), 1 deletion(-)

diff --git a/ggml/src/ggml-vulkan/ggml-vulkan.cpp b/ggml/src/ggml-vulkan/ggml-vulkan.cpp
index 0603e8ac6a..093d284221 100644
--- a/ggml/src/ggml-vulkan/ggml-vulkan.cpp
+++ b/ggml/src/ggml-vulkan/ggml-vulkan.cpp
@@ -677,6 +677,7 @@ struct vk_device_struct {
     uint64_t min_imported_host_pointer_alignment;
     bool external_memory_host {};
     bool external_memory_dma_buf {};
+    bool external_semaphore_fd {};
     bool fp16;
     bool bf16;
     bool pipeline_robustness;
@@ -1037,6 +1038,20 @@ struct vk_d2d_path {
     vk_buffer buf_b;
     void * host_ptr = nullptr;
     size_t size = 0;
+
+    // Async copy state: one timeline semaphore, visible as sem_src on the source device and sem_dst on the destination
+    bool async_capable = false;
+    vk::Semaphore sem_src = VK_NULL_HANDLE;
+    vk::Semaphore sem_dst = VK_NULL_HANDLE;
+    uint64_t sem_value = 0;
+    vk_device_struct * sem_src_device = nullptr;
+    vk_device_struct * sem_dst_device = nullptr;
+
+    // Hop 1 (source buffer -> shared buffer) submission state, owned by hop1_device
+    vk_command_pool hop1_cmd_pool;
+    vk::Fence hop1_fence = VK_NULL_HANDLE;
+    bool hop1_fence_pending = false;
+    vk_device_struct * hop1_device = nullptr;
 };
 #endif
 
@@ -2279,11 +2294,16 @@ static std::map, vk_d2d_path> vk
 static bool vk_instance_initialized = false;
 static vk_instance_t vk_instance;
 
+#ifdef __linux__
+static void ggml_vk_d2d_destroy_shared_semaphore(vk_d2d_path& path);
+#endif
+
 vk_instance_t::~vk_instance_t() {
 #ifdef __linux__
     {
         std::lock_guard guard(vk_d2d_cache_mutex);
         for (auto& entry : vk_d2d_cache) {
+            ggml_vk_d2d_destroy_shared_semaphore(entry.second);
             if (entry.second.host_ptr) {
                 free(entry.second.host_ptr);
                 entry.second.host_ptr = nullptr;
@@ -2308,6 +2328,7 @@ vk_device_struct::~vk_device_struct() {
     std::lock_guard guard(vk_d2d_cache_mutex);
     for (auto it = vk_d2d_cache.begin(); it != vk_d2d_cache.end(); ) {
         if (it->first.first == this || it->first.second == this) {
+            ggml_vk_d2d_destroy_shared_semaphore(it->second);
             if (it->second.host_ptr) {
                 free(it->second.host_ptr);
                 it->second.host_ptr = nullptr;
@@ -3509,6 +3530,185 @@ static bool ggml_vk_d2d_try_shared_staging(vk_device& dev_a, vk_device& dev_b, s
     out_host_ptr = ptr;
     return true;
 }
+
+// Tears down the async-copy state of a cached d2d path: waits for a pending
+// hop-1 submission, then destroys the fence, command pool and both semaphore
+// handles and resets the bookkeeping fields. No-op unless path.async_capable.
+static void ggml_vk_d2d_destroy_shared_semaphore(vk_d2d_path& path) {
+    if (!path.async_capable) {
+        return;
+    }
+
+    if (path.hop1_fence_pending && path.hop1_device) {
+        // Best effort: this is called from destructors, so a failed wait must not propagate
+        try {
+            VK_CHECK(path.hop1_device->device.waitForFences({ path.hop1_fence }, true, UINT64_MAX),
+                "d2d destroy wait hop1 fence");
+        } catch (...) {}
+        path.hop1_fence_pending = false;
+    }
+
+    if (path.hop1_fence) {
+        path.hop1_device->device.destroyFence(path.hop1_fence);
+        path.hop1_fence = VK_NULL_HANDLE;
+    }
+    if (path.hop1_cmd_pool.pool) {
+        path.hop1_cmd_pool.destroy(path.hop1_device->device);
+    }
+
+    if (path.sem_src) {
+        path.sem_src_device->device.destroySemaphore(path.sem_src);
+        path.sem_src = VK_NULL_HANDLE;
+    }
+    if (path.sem_dst) {
+        path.sem_dst_device->device.destroySemaphore(path.sem_dst);
+        path.sem_dst = VK_NULL_HANDLE;
+    }
+
+    path.async_capable = false;
+    path.sem_value = 0;
+    path.sem_src_device = nullptr;
+    path.sem_dst_device = nullptr;
+    path.hop1_device = nullptr;
+}
+
+// Returns true when the physical device reports timeline semaphores with the
+// opaque-fd handle type as both exportable and importable.
+static bool ggml_vk_d2d_check_timeline_semaphore_export(vk_device& dev) {
+    vk::SemaphoreTypeCreateInfo sem_type;
+    sem_type.semaphoreType = vk::SemaphoreType::eTimeline;
+
+    vk::PhysicalDeviceExternalSemaphoreInfo ext_sem_info;
+    ext_sem_info.handleType = vk::ExternalSemaphoreHandleTypeFlagBits::eOpaqueFd;
+    ext_sem_info.pNext = &sem_type;
+
+    vk::ExternalSemaphoreProperties ext_sem_props = dev->physical_device.getExternalSemaphoreProperties(ext_sem_info);
+
+    return (ext_sem_props.externalSemaphoreFeatures & vk::ExternalSemaphoreFeatureFlagBits::eExportable) &&
+           (ext_sem_props.externalSemaphoreFeatures & vk::ExternalSemaphoreFeatureFlagBits::eImportable);
+}
+
+// Creates an exportable timeline semaphore on src_dev, exports it as an opaque
+// fd and imports that fd into a timeline semaphore on dst_dev. On success the
+// handles plus a hop-1 command pool and fence are stored in `path`,
+// path.async_capable is set and true is returned. On any failure everything
+// created here is released, path.async_capable stays false and false is
+// returned.
+// NOTE(review): the Vulkan external-semaphore compatibility rules require
+// matching driverUUID/deviceUUID for opaque-fd handles; confirm that sharing
+// across two different physical devices is valid on the targeted drivers.
+static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& dst_dev, vk_d2d_path& path) {
+    if (!src_dev->external_semaphore_fd || !dst_dev->external_semaphore_fd) {
+        return false;
+    }
+
+    if (!ggml_vk_d2d_check_timeline_semaphore_export(src_dev) ||
+        !ggml_vk_d2d_check_timeline_semaphore_export(dst_dev)) {
+        VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: timeline semaphore export/import not supported");
+        return false;
+    }
+
+    vk::ExportSemaphoreCreateInfo export_ci;
+    export_ci.handleTypes = vk::ExternalSemaphoreHandleTypeFlagBits::eOpaqueFd;
+
+    vk::SemaphoreTypeCreateInfo type_ci;
+    type_ci.semaphoreType = vk::SemaphoreType::eTimeline;
+    type_ci.initialValue = 0;
+    type_ci.pNext = &export_ci;
+
+    vk::SemaphoreCreateInfo sem_ci;
+    sem_ci.pNext = &type_ci;
+
+    vk::Semaphore src_sem;
+    try {
+        src_sem = src_dev->device.createSemaphore(sem_ci);
+    } catch (const vk::SystemError& e) {
+        VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: createSemaphore on src failed: " << e.what());
+        return false;
+    }
+
+    vk::SemaphoreGetFdInfoKHR get_fd_info;
+    get_fd_info.semaphore = src_sem;
+    get_fd_info.handleType = vk::ExternalSemaphoreHandleTypeFlagBits::eOpaqueFd;
+
+    int fd = -1;
+    try {
+        fd = src_dev->device.getSemaphoreFdKHR(get_fd_info);
+    } catch (const vk::SystemError& e) {
+        VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: getSemaphoreFdKHR failed: " << e.what());
+        src_dev->device.destroySemaphore(src_sem);
+        return false;
+    }
+    if (fd < 0) {
+        VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: getSemaphoreFdKHR returned invalid fd");
+        src_dev->device.destroySemaphore(src_sem);
+        return false;
+    }
+
+    vk::SemaphoreTypeCreateInfo dst_type_ci{ vk::SemaphoreType::eTimeline, 0 };
+
+    vk::SemaphoreCreateInfo dst_sem_ci{};
+    dst_sem_ci.pNext = &dst_type_ci;
+
+    vk::Semaphore dst_sem;
+    try {
+        dst_sem = dst_dev->device.createSemaphore(dst_sem_ci);
+    } catch (const vk::SystemError& e) {
+        VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: createSemaphore on dst failed: " << e.what());
+        close(fd);
+        src_dev->device.destroySemaphore(src_sem);
+        return false;
+    }
+
+    vk::ImportSemaphoreFdInfoKHR import_info;
+    import_info.semaphore = dst_sem;
+    import_info.handleType = vk::ExternalSemaphoreHandleTypeFlagBits::eOpaqueFd;
+    import_info.fd = fd;
+
+    try {
+        dst_dev->device.importSemaphoreFdKHR(import_info);
+    } catch (const vk::SystemError& e) {
+        VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: importSemaphoreFdKHR failed: " << e.what());
+        close(fd);
+        dst_dev->device.destroySemaphore(dst_sem);
+        src_dev->device.destroySemaphore(src_sem);
+        return false;
+    }
+    // fd ownership transferred to driver on successful import
+
+    // Create the hop-1 resources before publishing anything in `path`: if this
+    // throws, the semaphores would otherwise leak, because the destroy helper
+    // ignores paths that are not async_capable.
+    vk::Fence hop1_fence;
+    try {
+        path.hop1_cmd_pool.init(src_dev, &src_dev->transfer_queue);
+        hop1_fence = src_dev->device.createFence({});
+    } catch (const vk::SystemError& e) {
+        VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: hop1 resource creation failed: " << e.what());
+        if (path.hop1_cmd_pool.pool) {
+            path.hop1_cmd_pool.destroy(src_dev->device);
+        }
+        dst_dev->device.destroySemaphore(dst_sem);
+        src_dev->device.destroySemaphore(src_sem);
+        return false;
+    }
+
+    path.sem_src = src_sem;
+    path.sem_dst = dst_sem;
+    path.sem_value = 0;
+    path.sem_src_device = src_dev.get();
+    path.sem_dst_device = dst_dev.get();
+
+    path.hop1_fence = hop1_fence;
+    path.hop1_fence_pending = false;
+    path.hop1_device = src_dev.get();
+
+    path.async_capable = true;
+
+    GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: async semaphore created\n",
+        src_dev->name.c_str(), dst_dev->name.c_str());
+    return true;
+}
 #endif
 
 static vk_subbuffer ggml_vk_subbuffer(const ggml_backend_vk_context* ctx, const vk_buffer& buf, size_t offset = 0) {
@@ -6010,6 +6210,8 @@ static vk_device ggml_vk_get_device(size_t idx) {
 #ifdef __linux__
         bool dma_buf_support = false;
         bool external_memory_fd_support = false;
+        bool external_semaphore_support = false;
+        bool external_semaphore_fd_support = false;
 #endif
 
         for (const auto& properties : ext_props) {
@@ -6068,6 +6270,10 @@ static vk_device ggml_vk_get_device(size_t idx) {
                 dma_buf_support = true;
             } else if (strcmp("VK_KHR_external_memory_fd", properties.extensionName) == 0) {
                 external_memory_fd_support = true;
+            } else if (strcmp("VK_KHR_external_semaphore", properties.extensionName) == 0) {
+                external_semaphore_support = true;
+            } else if (strcmp("VK_KHR_external_semaphore_fd", properties.extensionName) == 0) {
+                external_semaphore_fd_support = true;
 #endif
 #if defined(VK_EXT_shader_64bit_indexing)
             } else if (strcmp("VK_EXT_shader_64bit_indexing", properties.extensionName) == 0) {
@@ -6245,6 +6451,7 @@ static vk_device ggml_vk_get_device(size_t idx) {
 
 #ifdef __linux__
         device->external_memory_dma_buf = dma_buf_support && external_memory_fd_support;
+        device->external_semaphore_fd = external_semaphore_support && external_semaphore_fd_support;
 #endif
 
         device->max_workgroup_size_log2 = uint32_t(log2f(float(device->properties.limits.maxComputeWorkGroupInvocations)));
@@ -6406,6 +6613,10 @@ static vk_device ggml_vk_get_device(size_t idx) {
             device_extensions.push_back("VK_EXT_external_memory_dma_buf");
             device_extensions.push_back("VK_KHR_external_memory_fd");
         }
+        if (device->external_semaphore_fd) {
+            device_extensions.push_back("VK_KHR_external_semaphore");
+            device_extensions.push_back("VK_KHR_external_semaphore_fd");
+        }
 #endif
 
 #if defined(VK_EXT_shader_64bit_indexing)
@@ -8393,6 +8604,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev
                 path.size = VK_D2D_PROBE_SIZE;
                 GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_p2p (src exports VRAM)\n",
                     src_dev->name.c_str(), dst_dev->name.c_str());
+                ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path);
                 return path;
             }
             ggml_vk_destroy_buffer(exp_buf);
@@ -8409,6 +8621,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev
                 path.size = VK_D2D_PROBE_SIZE;
                 GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_p2p (dst exports VRAM)\n",
                     src_dev->name.c_str(), dst_dev->name.c_str());
+                ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path);
                 return path;
             }
             ggml_vk_destroy_buffer(exp_buf);
@@ -8430,6 +8643,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev
                 path.size = VK_D2D_PROBE_SIZE;
                 GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_gtt (src exports GTT)\n",
                     src_dev->name.c_str(), dst_dev->name.c_str());
+                ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path);
                 return path;
             }
             ggml_vk_destroy_buffer(exp_buf);
@@ -8447,6 +8661,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev
                 path.size = VK_D2D_PROBE_SIZE;
                 GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_gtt (dst exports GTT)\n",
                     src_dev->name.c_str(), dst_dev->name.c_str());
+                ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path);
                 return path;
             }
             ggml_vk_destroy_buffer(exp_buf);
@@ -8468,6 +8683,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev
             path.size = VK_D2D_PROBE_SIZE;
             GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: shared_staging\n",
                 src_dev->name.c_str(), dst_dev->name.c_str());
+            ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path);
             return path;
         }
         ggml_vk_destroy_buffer(buf_a);
@@ -8514,6 +8730,14 @@ static bool ggml_vk_d2d_grow_path(vk_d2d_path& path, vk_device& src_dev, vk_devi
         return false;
     }
 
+    // Wait for any in-flight hop1 before destroying old buffers
+    if (path.hop1_fence_pending && path.hop1_device) {
+        VK_CHECK(path.hop1_device->device.waitForFences({ path.hop1_fence }, true, UINT64_MAX),
+            "d2d grow wait hop1 fence");
+        path.hop1_device->device.resetFences({ path.hop1_fence });
+        path.hop1_fence_pending = false;
+    }
+
     // Destroy old buffers
     ggml_vk_destroy_buffer(path.buf_a);
     ggml_vk_destroy_buffer(path.buf_b);
@@ -8557,6 +8781,76 @@ static vk_d2d_path& ggml_vk_get_d2d_path(vk_device& src_dev, vk_device& dst_dev,
 
     return path;
 }
+
+// Reports whether a cached d2d path for (src_dev, dst_dev) exists and has async
+// semaphore support. Only consults the cache; it never probes a new path.
+static bool ggml_vk_d2d_is_async_capable(vk_device& src_dev, vk_device& dst_dev) {
+    std::lock_guard guard(vk_d2d_cache_mutex);
+    auto key = std::make_pair(src_dev.get(), dst_dev.get());
+    auto it = vk_d2d_cache.find(key);
+    return it != vk_d2d_cache.end() && it->second.async_capable;
+}
+
+// Cross-device copy in two hops through the path's shared buffer. Hop 1 is
+// submitted right away from the path's hop1 command pool and signals the
+// path's timeline semaphore; hop 2 is only recorded into dst_compute_ctx, which
+// waits on the imported semaphore before copying into dst.
+// Returns false, without recording anything, when the path cannot do async
+// copies or uses the D2D_STAGING method.
+// NOTE(review): only the previous hop 1 is waited for before the shared buffer
+// is overwritten. If dst_compute_ctx has not executed the previous hop 2 yet,
+// that copy may read the newer data - confirm callers serialize copies per path.
+static bool ggml_vk_buffer_copy_async_d2d(
+        vk_context& dst_compute_ctx,
+        vk_buffer& dst, size_t dst_offset,
+        vk_buffer& src, size_t src_offset,
+        size_t size) {
+    VK_LOG_DEBUG("ggml_vk_buffer_copy_async_d2d(" << size << ")");
+
+    vk_d2d_path& path = ggml_vk_get_d2d_path(src->device, dst->device, size);
+
+    if (!path.async_capable || path.method == D2D_STAGING) {
+        return false;
+    }
+
+    vk_buffer& src_side_buf = path.reverse_direction ? path.buf_b : path.buf_a;
+    vk_buffer& dst_side_buf = path.reverse_direction ? path.buf_a : path.buf_b;
+
+    // Wait for any previous hop1 on this path to complete (command buffer reuse)
+    if (path.hop1_fence_pending) {
+        VK_CHECK(path.hop1_device->device.waitForFences({ path.hop1_fence }, true, UINT64_MAX),
+            "d2d async wait hop1 fence");
+        path.hop1_device->device.resetFences({ path.hop1_fence });
+        path.hop1_fence_pending = false;
+        ggml_vk_command_pool_cleanup(src->device, path.hop1_cmd_pool);
+    }
+
+    uint64_t signal_value = ++path.sem_value;
+
+    // Hop 1: src device copies VRAM -> shared buffer, signals semaphore
+    {
+        std::lock_guard guard(src->device->mutex);
+        vk_context hop1_ctx = ggml_vk_create_temporary_context(path.hop1_cmd_pool);
+        ggml_vk_ctx_begin(src->device, hop1_ctx);
+
+        VkBufferCopy bc{ src_offset, 0, size };
+        vkCmdCopyBuffer(hop1_ctx->s->buffer->buf, (VkBuffer)src->buffer, (VkBuffer)src_side_buf->buffer, 1, &bc);
+
+        hop1_ctx->s->signal_semaphores.push_back({ path.sem_src, signal_value });
+
+        ggml_vk_ctx_end(hop1_ctx);
+        ggml_vk_submit(hop1_ctx, path.hop1_fence);
+        path.hop1_fence_pending = true;
+    }
+
+    // Hop 2: record into dst compute context — waits on semaphore, copies shared buffer -> VRAM
+    dst_compute_ctx->s->wait_semaphores.push_back({ path.sem_dst, signal_value });
+
+    VkBufferCopy bc2{ 0, dst_offset, size };
+    vkCmdCopyBuffer(dst_compute_ctx->s->buffer->buf, (VkBuffer)dst_side_buf->buffer, (VkBuffer)dst->buffer, 1, &bc2);
+
+    return true;
+}
 #endif
 
 static void ggml_vk_buffer_copy_async(vk_context& ctx, vk_buffer& dst, size_t dst_offset, vk_buffer& src, size_t src_offset, size_t size) {
@@ -16057,8 +16351,19 @@ static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend_src, ggml_ba
     if (ggml_backend_buffer_is_vk(src->buffer)) {
         ggml_backend_vk_buffer_context * src_buf_ctx = (ggml_backend_vk_buffer_context *)src->buffer->context;
 
-        // Async copy only works within the same device
         if (src_buf_ctx->dev_buffer->device != dst_buf->device) {
+#ifdef __linux__
+            if (ggml_vk_d2d_is_async_capable(src_buf_ctx->dev_buffer->device, dst_buf->device)) {
+                vk_context compute_ctx = ggml_vk_get_compute_ctx(ctx);
+                if (ggml_vk_buffer_copy_async_d2d(
+                        compute_ctx,
+                        dst_buf, vk_tensor_offset(dst) + dst->view_offs,
+                        src_buf_ctx->dev_buffer, vk_tensor_offset(src) + src->view_offs,
+                        ggml_nbytes(src))) {
+                    return true;
+                }
+            }
+#endif
             return false;
         }
 