diff --git a/ggml/src/ggml-vulkan/ggml-vulkan.cpp b/ggml/src/ggml-vulkan/ggml-vulkan.cpp index edaaecc51c..e82be4a505 100644 --- a/ggml/src/ggml-vulkan/ggml-vulkan.cpp +++ b/ggml/src/ggml-vulkan/ggml-vulkan.cpp @@ -1017,8 +1017,8 @@ struct vk_buffer_struct { } VK_LOG_DEBUG("~vk_buffer_struct(" << buffer << ", " << size << ")"); - device->device.freeMemory(device_memory); device->device.destroyBuffer(buffer); + device->device.freeMemory(device_memory); } }; @@ -1031,6 +1031,12 @@ enum vk_d2d_method { D2D_STAGING, }; +enum vk_d2d_sync_method { + D2D_SYNC_NONE, + D2D_SYNC_TIMELINE, + D2D_SYNC_SYNCFD, +}; + struct vk_d2d_path { vk_d2d_method method = D2D_UNTESTED; bool reverse_direction = false; @@ -1043,13 +1049,17 @@ struct vk_d2d_path { vk_buffer buf_b; void * host_ptr = nullptr; uint64_t hop2_done = 0; + // sync_fd back-edge: hop2 signals a per-copy semaphore (stored here), + // exported via sync_fd into a per-copy semaphore on src for next hop1 + vk::Semaphore last_back_sem = VK_NULL_HANDLE; + bool back_edge_ready = false; }; slot slots[POOL_SIZE]; size_t num_slots = 0; size_t pool_idx = 0; - bool async_capable = false; + vk_d2d_sync_method sync_method = D2D_SYNC_NONE; vk::Semaphore sem_src = VK_NULL_HANDLE; vk::Semaphore sem_dst = VK_NULL_HANDLE; uint64_t sem_value = 0; @@ -1057,9 +1067,10 @@ struct vk_d2d_path { vk_device_struct * sem_dst_device = nullptr; vk_command_pool hop1_cmd_pool; - vk::Fence hop1_fence = VK_NULL_HANDLE; - bool hop1_fence_pending = false; vk_device_struct * hop1_device = nullptr; + + vk_command_pool hop2_cmd_pool; + vk_device_struct * hop2_device = nullptr; }; #endif @@ -2303,7 +2314,7 @@ static bool vk_instance_initialized = false; static vk_instance_t vk_instance; #ifdef __linux__ -static void ggml_vk_d2d_destroy_shared_semaphore(vk_d2d_path& path); +static void ggml_vk_d2d_destroy_sync(vk_d2d_path& path); static bool ggml_vk_d2d_grow_slot(vk_d2d_path& path, vk_device& src_dev, vk_device& dst_dev, size_t needed, vk_d2d_path::slot& s); #endif @@ -2317,12 +2328,11 @@ vk_instance_t::~vk_instance_t() { // in each device's destructor will implicitly free associated resources. // Explicit Vulkan calls here are unsafe because the validation layer's // static data may already be destroyed. - entry.second.async_capable = false; + entry.second.sync_method = D2D_SYNC_NONE; entry.second.sem_src = VK_NULL_HANDLE; entry.second.sem_dst = VK_NULL_HANDLE; - entry.second.hop1_fence = VK_NULL_HANDLE; - entry.second.hop1_fence_pending = false; entry.second.hop1_cmd_pool.pool = nullptr; + entry.second.hop2_cmd_pool.pool = nullptr; for (auto& s : entry.second.slots) { if (s.host_ptr) { free(s.host_ptr); @@ -2334,6 +2344,8 @@ vk_instance_t::~vk_instance_t() { if (s.buf_b) { s.buf_b->size = 0; } + s.last_back_sem = VK_NULL_HANDLE; + s.back_edge_ready = false; } } vk_d2d_cache.clear(); @@ -2349,7 +2361,7 @@ vk_device_struct::~vk_device_struct() { std::lock_guard guard(vk_d2d_cache_mutex); for (auto it = vk_d2d_cache.begin(); it != vk_d2d_cache.end(); ) { if (it->first.first == this || it->first.second == this) { - ggml_vk_d2d_destroy_shared_semaphore(it->second); + ggml_vk_d2d_destroy_sync(it->second); for (auto& s : it->second.slots) { if (s.host_ptr) { free(s.host_ptr); @@ -3016,9 +3028,17 @@ static vk_context ggml_vk_create_temporary_context(vk_command_pool& p) { return result; } -static vk_semaphore * ggml_vk_create_binary_semaphore(ggml_backend_vk_context * ctx) { - VK_LOG_DEBUG("ggml_vk_create_timeline_semaphore()"); +static vk_semaphore * ggml_vk_create_binary_semaphore(ggml_backend_vk_context * ctx, + vk::ExternalSemaphoreHandleTypeFlags export_handle_types = {}) { + VK_LOG_DEBUG("ggml_vk_create_binary_semaphore()"); + vk::ExportSemaphoreCreateInfo export_ci; + export_ci.handleTypes = export_handle_types; + vk::SemaphoreTypeCreateInfo tci{ vk::SemaphoreType::eBinary, 0 }; + if (export_handle_types) { + tci.pNext = &export_ci; + } + vk::SemaphoreCreateInfo ci{}; ci.setPNext(&tci); vk::Semaphore semaphore = ctx->device->device.createSemaphore(ci); @@ -3554,26 +3574,39 @@ static bool ggml_vk_d2d_try_shared_staging(vk_device& dev_a, vk_device& dev_b, s return true; } -static void ggml_vk_d2d_destroy_shared_semaphore(vk_d2d_path& path) { - if (!path.async_capable) { +static void ggml_vk_d2d_destroy_sync(vk_d2d_path& path) { + if (path.sync_method == D2D_SYNC_NONE) { return; } - if (path.hop1_fence_pending && path.hop1_device) { - try { - VK_CHECK(path.hop1_device->device.waitForFences({ path.hop1_fence }, true, UINT64_MAX), - "d2d destroy wait hop1 fence"); - } catch (...) {} - path.hop1_fence_pending = false; + if (path.sync_method == D2D_SYNC_TIMELINE) { + if (path.sem_value > 0 && path.sem_src_device) { + try { + VkSemaphoreWaitInfo wait_info = {}; + wait_info.sType = VK_STRUCTURE_TYPE_SEMAPHORE_WAIT_INFO; + VkSemaphore sem = path.sem_src; + uint64_t val = path.sem_value; + wait_info.semaphoreCount = 1; + wait_info.pSemaphores = &sem; + wait_info.pValues = &val; + vkWaitSemaphores(path.sem_src_device->device, &wait_info, UINT64_MAX); + } catch (...) {} + } + } else if (path.sync_method == D2D_SYNC_SYNCFD) { + if (path.hop1_device) { + vkDeviceWaitIdle(path.hop1_device->device); + } + if (path.hop2_device) { + vkDeviceWaitIdle(path.hop2_device->device); + } } - if (path.hop1_fence) { - path.hop1_device->device.destroyFence(path.hop1_fence); - path.hop1_fence = VK_NULL_HANDLE; - } if (path.hop1_cmd_pool.pool) { path.hop1_cmd_pool.destroy(path.hop1_device->device); } + if (path.hop2_cmd_pool.pool) { + path.hop2_cmd_pool.destroy(path.hop2_device->device); + } if (path.sem_src) { path.sem_src_device->device.destroySemaphore(path.sem_src); @@ -3583,12 +3616,17 @@ static void ggml_vk_d2d_destroy_shared_semaphore(vk_d2d_path& path) { path.sem_dst_device->device.destroySemaphore(path.sem_dst); path.sem_dst = VK_NULL_HANDLE; } + for (size_t i = 0; i < path.num_slots; i++) { + path.slots[i].last_back_sem = VK_NULL_HANDLE; + path.slots[i].back_edge_ready = false; + } - path.async_capable = false; + path.sync_method = D2D_SYNC_NONE; path.sem_value = 0; path.sem_src_device = nullptr; path.sem_dst_device = nullptr; path.hop1_device = nullptr; + path.hop2_device = nullptr; } static bool ggml_vk_d2d_check_timeline_semaphore_export(vk_device& dev) { @@ -3605,20 +3643,32 @@ static bool ggml_vk_d2d_check_timeline_semaphore_export(vk_device& dev) { (props.externalSemaphoreFeatures & vk::ExternalSemaphoreFeatureFlagBits::eImportable); } -static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& dst_dev, vk_d2d_path& path) { +static bool ggml_vk_d2d_check_sync_fd_support(vk_device& dev) { + if (!dev->external_semaphore_fd) { + return false; + } + + vk::PhysicalDeviceExternalSemaphoreInfo ext_sem_info; + ext_sem_info.handleType = vk::ExternalSemaphoreHandleTypeFlagBits::eSyncFd; + + vk::ExternalSemaphoreProperties props = dev->physical_device.getExternalSemaphoreProperties(ext_sem_info); + + return (props.externalSemaphoreFeatures & vk::ExternalSemaphoreFeatureFlagBits::eExportable) && + (props.externalSemaphoreFeatures & vk::ExternalSemaphoreFeatureFlagBits::eImportable); +} + +static bool ggml_vk_d2d_try_timeline_sync(vk_device& src_dev, vk_device& dst_dev, vk_d2d_path& path) { if (!src_dev->external_semaphore_fd || !dst_dev->external_semaphore_fd) { return false; } - bool src_nvidia = src_dev->vendor_id == VK_VENDOR_ID_NVIDIA; - bool dst_nvidia = dst_dev->vendor_id == VK_VENDOR_ID_NVIDIA; - if (src_nvidia != dst_nvidia) { + if (src_dev->driver_id != dst_dev->driver_id) { return false; } if (!ggml_vk_d2d_check_timeline_semaphore_export(src_dev) || !ggml_vk_d2d_check_timeline_semaphore_export(dst_dev)) { - VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: timeline semaphore export/import not supported"); + VK_LOG_DEBUG("ggml_vk_d2d_try_timeline_sync: timeline semaphore export/import not supported"); return false; } @@ -3635,7 +3685,7 @@ static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& d try { src_sem = src_dev->device.createSemaphore(sem_ci); } catch (const vk::SystemError& e) { - VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: createSemaphore on src failed: " << e.what()); + VK_LOG_DEBUG("ggml_vk_d2d_try_timeline_sync: createSemaphore on src failed: " << e.what()); return false; } @@ -3647,12 +3697,12 @@ static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& d try { fd = src_dev->device.getSemaphoreFdKHR(get_fd_info); } catch (const vk::SystemError& e) { - VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: getSemaphoreFdKHR failed: " << e.what()); + VK_LOG_DEBUG("ggml_vk_d2d_try_timeline_sync: getSemaphoreFdKHR failed: " << e.what()); src_dev->device.destroySemaphore(src_sem); return false; } if (fd < 0) { - VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: getSemaphoreFdKHR returned invalid fd"); + VK_LOG_DEBUG("ggml_vk_d2d_try_timeline_sync: getSemaphoreFdKHR returned invalid fd"); src_dev->device.destroySemaphore(src_sem); return false; } @@ -3666,7 +3716,7 @@ static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& d try { dst_sem = dst_dev->device.createSemaphore(dst_sem_ci); } catch (const vk::SystemError& e) { - VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: createSemaphore on dst failed: " << e.what()); + VK_LOG_DEBUG("ggml_vk_d2d_try_timeline_sync: createSemaphore on dst failed: " << e.what()); close(fd); src_dev->device.destroySemaphore(src_sem); return false; @@ -3680,19 +3730,17 @@ static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& d try { dst_dev->device.importSemaphoreFdKHR(import_info); } catch (const vk::SystemError& e) { - VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: importSemaphoreFdKHR failed: " << e.what()); + VK_LOG_DEBUG("ggml_vk_d2d_try_timeline_sync: importSemaphoreFdKHR failed: " << e.what()); close(fd); dst_dev->device.destroySemaphore(dst_sem); src_dev->device.destroySemaphore(src_sem); return false; } - // fd ownership transferred to driver on successful import try { path.hop1_cmd_pool.init(src_dev, &src_dev->transfer_queue); - path.hop1_fence = src_dev->device.createFence({}); } catch (const vk::SystemError& e) { - VK_LOG_DEBUG("ggml_vk_d2d_create_shared_semaphore: cmd pool/fence creation failed: " << e.what()); + VK_LOG_DEBUG("ggml_vk_d2d_try_timeline_sync: cmd pool/fence creation failed: " << e.what()); if (path.hop1_cmd_pool.pool) { path.hop1_cmd_pool.destroy(src_dev->device); } @@ -3706,12 +3754,9 @@ static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& d path.sem_value = 0; path.sem_src_device = src_dev.get(); path.sem_dst_device = dst_dev.get(); - path.hop1_fence_pending = false; path.hop1_device = src_dev.get(); + path.sync_method = D2D_SYNC_TIMELINE; - path.async_capable = true; - - // Allocate additional pool slots for double buffering for (size_t i = path.num_slots; i < vk_d2d_path::POOL_SIZE; i++) { if (!ggml_vk_d2d_grow_slot(path, src_dev, dst_dev, path.size, path.slots[i])) { break; @@ -3719,10 +3764,80 @@ static bool ggml_vk_d2d_create_shared_semaphore(vk_device& src_dev, vk_device& d path.num_slots = i + 1; } - GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: async semaphore created (%zu pool slots)\n", + GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: timeline sync (same driver), %zu pool slots\n", src_dev->name.c_str(), dst_dev->name.c_str(), path.num_slots); return true; } + +static bool ggml_vk_d2d_try_syncfd_sync(vk_device& src_dev, vk_device& dst_dev, vk_d2d_path& path) { + if (!ggml_vk_d2d_check_sync_fd_support(src_dev) || + !ggml_vk_d2d_check_sync_fd_support(dst_dev)) { + VK_LOG_DEBUG("ggml_vk_d2d_try_syncfd_sync: sync_fd not supported on one or both devices"); + return false; + } + + vk::ExportSemaphoreCreateInfo export_ci; + export_ci.handleTypes = vk::ExternalSemaphoreHandleTypeFlagBits::eSyncFd; + + vk::SemaphoreCreateInfo src_sem_ci; + src_sem_ci.pNext = &export_ci; + + vk::Semaphore src_sem; + try { + src_sem = src_dev->device.createSemaphore(src_sem_ci); + } catch (const vk::SystemError& e) { + VK_LOG_DEBUG("ggml_vk_d2d_try_syncfd_sync: createSemaphore on src failed: " << e.what()); + return false; + } + + try { + path.hop1_cmd_pool.init(src_dev, &src_dev->compute_queue); + path.hop2_cmd_pool.init(dst_dev, &dst_dev->compute_queue); + } catch (const vk::SystemError& e) { + VK_LOG_DEBUG("ggml_vk_d2d_try_syncfd_sync: cmd pool creation failed: " << e.what()); + if (path.hop2_cmd_pool.pool) { + path.hop2_cmd_pool.destroy(dst_dev->device); + } + if (path.hop1_cmd_pool.pool) { + path.hop1_cmd_pool.destroy(src_dev->device); + } + src_dev->device.destroySemaphore(src_sem); + return false; + } + + path.sem_src = src_sem; + path.sem_value = 0; + path.sem_src_device = src_dev.get(); + path.sem_dst_device = dst_dev.get(); + path.hop1_device = src_dev.get(); + path.hop2_device = dst_dev.get(); + path.sync_method = D2D_SYNC_SYNCFD; + + // Grow to double-buffer pool size + for (size_t i = path.num_slots; i < vk_d2d_path::POOL_SIZE; i++) { + if (!ggml_vk_d2d_grow_slot(path, src_dev, dst_dev, path.size, path.slots[i])) { + break; + } + path.num_slots = i + 1; + } + + GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: sync_fd sync (cross-driver), %zu pool slots\n", + src_dev->name.c_str(), dst_dev->name.c_str(), path.num_slots); + return true; +} + +static void ggml_vk_d2d_setup_sync(vk_device& src_dev, vk_device& dst_dev, vk_d2d_path& path) { + if (ggml_vk_d2d_try_timeline_sync(src_dev, dst_dev, path)) { + return; + } + + if (ggml_vk_d2d_try_syncfd_sync(src_dev, dst_dev, path)) { + return; + } + + GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: no async sync available, using CPU fallback\n", + src_dev->name.c_str(), dst_dev->name.c_str()); +} #endif static vk_subbuffer ggml_vk_subbuffer(const ggml_backend_vk_context* ctx, const vk_buffer& buf, size_t offset = 0) { @@ -8619,7 +8734,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev path.size = VK_D2D_PROBE_SIZE; GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_p2p (src exports VRAM)\n", src_dev->name.c_str(), dst_dev->name.c_str()); - ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path); + ggml_vk_d2d_setup_sync(src_dev, dst_dev, path); return path; } ggml_vk_destroy_buffer(exp_buf); @@ -8637,7 +8752,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev path.size = VK_D2D_PROBE_SIZE; GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_p2p (dst exports VRAM)\n", src_dev->name.c_str(), dst_dev->name.c_str()); - ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path); + ggml_vk_d2d_setup_sync(src_dev, dst_dev, path); return path; } ggml_vk_destroy_buffer(exp_buf); @@ -8660,7 +8775,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev path.size = VK_D2D_PROBE_SIZE; GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_gtt (src exports GTT)\n", src_dev->name.c_str(), dst_dev->name.c_str()); - ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path); + ggml_vk_d2d_setup_sync(src_dev, dst_dev, path); return path; } ggml_vk_destroy_buffer(exp_buf); @@ -8679,7 +8794,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev path.size = VK_D2D_PROBE_SIZE; GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: dmabuf_gtt (dst exports GTT)\n", src_dev->name.c_str(), dst_dev->name.c_str()); - ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path); + ggml_vk_d2d_setup_sync(src_dev, dst_dev, path); return path; } ggml_vk_destroy_buffer(exp_buf); @@ -8702,7 +8817,7 @@ static vk_d2d_path ggml_vk_probe_d2d_path(vk_device& src_dev, vk_device& dst_dev path.size = VK_D2D_PROBE_SIZE; GGML_LOG_DEBUG("ggml_vulkan: d2d %s -> %s: shared_staging\n", src_dev->name.c_str(), dst_dev->name.c_str()); - ggml_vk_d2d_create_shared_semaphore(src_dev, dst_dev, path); + ggml_vk_d2d_setup_sync(src_dev, dst_dev, path); return path; } ggml_vk_destroy_buffer(buf_a); @@ -8763,18 +8878,43 @@ static bool ggml_vk_d2d_grow_slot(vk_d2d_path& path, vk_device& src_dev, vk_devi static bool ggml_vk_d2d_grow_path(vk_d2d_path& path, vk_device& src_dev, vk_device& dst_dev, size_t needed) { VK_LOG_DEBUG("ggml_vk_d2d_grow_path(" << needed << ", current=" << path.size << ")"); - // Wait for any in-flight hop1 before destroying old buffers - if (path.hop1_fence_pending && path.hop1_device) { - VK_CHECK(path.hop1_device->device.waitForFences({ path.hop1_fence }, true, UINT64_MAX), - "d2d grow wait hop1 fence"); - path.hop1_device->device.resetFences({ path.hop1_fence }); - path.hop1_fence_pending = false; + // Wait for all in-flight work on both devices, then release all command buffer + // references to the shared buffers before destroying them. + // The shared buffers may be referenced by: hop1_cmd_pool command buffers, + // device queue command buffers (from probe test copies and sync d2d copies), + // and compute context command buffers (from hop2). + if (path.sync_method == D2D_SYNC_TIMELINE && path.sem_value > 0) { + VkSemaphoreWaitInfo wait_info = {}; + wait_info.sType = VK_STRUCTURE_TYPE_SEMAPHORE_WAIT_INFO; + VkSemaphore sem = path.sem_src; + uint64_t val = path.sem_value; + wait_info.semaphoreCount = 1; + wait_info.pSemaphores = &sem; + wait_info.pValues = &val; + vkWaitSemaphores(path.sem_src_device->device, &wait_info, UINT64_MAX); + + path.sem_src_device->device.resetCommandPool(path.hop1_cmd_pool.pool); + for (auto& cb : path.hop1_cmd_pool.cmd_buffers) { + cb.in_use = false; + } + } else if (path.sync_method == D2D_SYNC_SYNCFD) { + // No fences to wait — back-edge semaphores are GPU-only. + // deviceWaitIdle below ensures all work is done. } + src_dev->device.waitIdle(); + dst_dev->device.waitIdle(); + for (size_t i = 0; i < path.num_slots; i++) { if (!ggml_vk_d2d_grow_slot(path, src_dev, dst_dev, needed, path.slots[i])) { return false; } + path.slots[i].back_edge_ready = false; + } + + if (path.sync_method == D2D_SYNC_SYNCFD) { + ggml_vk_command_pool_cleanup(src_dev, path.hop1_cmd_pool); + ggml_vk_command_pool_cleanup(dst_dev, path.hop2_cmd_pool); } path.size = needed; @@ -8818,28 +8958,26 @@ static bool ggml_vk_d2d_is_async_capable(vk_device& src_dev, vk_device& dst_dev) std::lock_guard guard(vk_d2d_cache_mutex); auto key = std::make_pair(src_dev.get(), dst_dev.get()); auto it = vk_d2d_cache.find(key); - return it != vk_d2d_cache.end() && it->second.async_capable; + return it != vk_d2d_cache.end() && it->second.sync_method != D2D_SYNC_NONE; } -static bool ggml_vk_buffer_copy_async_d2d( +static bool ggml_vk_buffer_copy_async_d2d_timeline( vk_context& dst_compute_ctx, vk_buffer& dst, size_t dst_offset, vk_buffer& src, size_t src_offset, - size_t size) { - VK_LOG_DEBUG("ggml_vk_buffer_copy_async_d2d(" << size << ")"); + size_t size, + vk_d2d_path& path) { - vk_d2d_path& path = ggml_vk_get_d2d_path(src->device, dst->device, size); - - if (!path.async_capable || path.method == D2D_STAGING) { - return false; - } - - // Wait for any previous hop1 on this path to complete (command buffer reuse) - if (path.hop1_fence_pending) { - VK_CHECK(path.hop1_device->device.waitForFences({ path.hop1_fence }, true, UINT64_MAX), - "d2d async wait hop1 fence"); - path.hop1_device->device.resetFences({ path.hop1_fence }); - path.hop1_fence_pending = false; + // Periodic command pool cleanup + if (path.hop1_cmd_pool.buffers_in_use() >= 8) { + VkSemaphoreWaitInfo wait_info = {}; + wait_info.sType = VK_STRUCTURE_TYPE_SEMAPHORE_WAIT_INFO; + VkSemaphore sem = path.sem_src; + uint64_t val = path.sem_value; + wait_info.semaphoreCount = 1; + wait_info.pSemaphores = &sem; + wait_info.pValues = &val; + vkWaitSemaphores(path.sem_src_device->device, &wait_info, UINT64_MAX); ggml_vk_command_pool_cleanup(src->device, path.hop1_cmd_pool); } @@ -8851,13 +8989,11 @@ static bool ggml_vk_buffer_copy_async_d2d( vk_buffer& src_side_buf = path.reverse_direction ? slot.buf_b : slot.buf_a; vk_buffer& dst_side_buf = path.reverse_direction ? slot.buf_a : slot.buf_b; - // Two sem values per copy: hop1_signal for hop1→hop2, hop2_signal for hop2→next reuse uint64_t hop1_signal = path.sem_value + 1; uint64_t hop2_signal = path.sem_value + 2; path.sem_value = hop2_signal; // Hop 1: src device copies VRAM -> shared buffer - // Wait for previous hop2 on this slot to finish reading before overwriting { std::lock_guard guard(src->device->mutex); vk_context hop1_ctx = ggml_vk_create_temporary_context(path.hop1_cmd_pool); @@ -8873,11 +9009,10 @@ static bool ggml_vk_buffer_copy_async_d2d( hop1_ctx->s->signal_semaphores.push_back({ path.sem_src, hop1_signal }); ggml_vk_ctx_end(hop1_ctx); - ggml_vk_submit(hop1_ctx, path.hop1_fence); - path.hop1_fence_pending = true; + ggml_vk_submit(hop1_ctx, {}); } - // Hop 2: new submission in dst compute context — wait for hop1, signal when done reading + // Hop 2: deferred in dst compute context ggml_vk_ctx_begin(dst->device, dst_compute_ctx); dst_compute_ctx->s->wait_semaphores.push_back({ path.sem_dst, hop1_signal }); @@ -8889,6 +9024,140 @@ static bool ggml_vk_buffer_copy_async_d2d( return true; } + +static bool ggml_vk_d2d_syncfd_export_import(vk_device& from_dev, vk::Semaphore from_sem, + vk_device& to_dev, vk::Semaphore to_sem) { + vk::SemaphoreGetFdInfoKHR get_fd_info; + get_fd_info.semaphore = from_sem; + get_fd_info.handleType = vk::ExternalSemaphoreHandleTypeFlagBits::eSyncFd; + + int fd; + try { + fd = from_dev->device.getSemaphoreFdKHR(get_fd_info); + } catch (const vk::SystemError& e) { + VK_LOG_DEBUG("ggml_vk_d2d_syncfd_export_import: getSemaphoreFdKHR failed: " << e.what()); + return false; + } + + vk::ImportSemaphoreFdInfoKHR import_info; + import_info.semaphore = to_sem; + import_info.handleType = vk::ExternalSemaphoreHandleTypeFlagBits::eSyncFd; + import_info.fd = fd; + import_info.flags = vk::SemaphoreImportFlagBits::eTemporary; + + try { + to_dev->device.importSemaphoreFdKHR(import_info); + } catch (const vk::SystemError& e) { + VK_LOG_DEBUG("ggml_vk_d2d_syncfd_export_import: importSemaphoreFdKHR failed: " << e.what()); + close(fd); + return false; + } + + return true; +} + +static bool ggml_vk_buffer_copy_async_d2d_syncfd( + ggml_backend_vk_context * src_ctx, + ggml_backend_vk_context * dst_ctx, + vk_buffer& dst, size_t dst_offset, + vk_buffer& src, size_t src_offset, + size_t size, + vk_d2d_path& path) { + + size_t slot_idx = path.pool_idx; + path.pool_idx = (path.pool_idx + 1) % path.num_slots; + vk_d2d_path::slot& slot = path.slots[slot_idx]; + + vk_buffer& src_side_buf = path.reverse_direction ? slot.buf_b : slot.buf_a; + vk_buffer& dst_side_buf = path.reverse_direction ? slot.buf_a : slot.buf_b; + + // Create all per-copy semaphores upfront to avoid vector reallocation + // invalidating pointers (ggml_vk_create_binary_semaphore returns pointer + // into a vector that may reallocate on subsequent push_back) + vk::Semaphore back_wait_sem = VK_NULL_HANDLE; + if (slot.back_edge_ready) { + back_wait_sem = ggml_vk_create_binary_semaphore(src_ctx)->s; + slot.back_edge_ready = false; + } + vk::Semaphore fwd_sem = ggml_vk_create_binary_semaphore(dst_ctx)->s; + vk::Semaphore back_signal_sem = ggml_vk_create_binary_semaphore(dst_ctx, + vk::ExternalSemaphoreHandleTypeFlagBits::eSyncFd)->s; + + // Back edge: export previous hop2's signal into the src-side wait semaphore + if (back_wait_sem) { + if (!ggml_vk_d2d_syncfd_export_import(dst->device, slot.last_back_sem, src->device, back_wait_sem)) { + return false; + } + } + + // Hop 1: src device copies VRAM -> shared buffer + { + std::lock_guard guard(src->device->mutex); + vk_context hop1_ctx = ggml_vk_create_temporary_context(path.hop1_cmd_pool); + ggml_vk_ctx_begin(src->device, hop1_ctx); + + if (back_wait_sem) { + hop1_ctx->s->wait_semaphores.push_back({ back_wait_sem, 0 }); + } + + VkBufferCopy bc{ src_offset, 0, size }; + vkCmdCopyBuffer(hop1_ctx->s->buffer->buf, (VkBuffer)src->buffer, (VkBuffer)src_side_buf->buffer, 1, &bc); + + hop1_ctx->s->signal_semaphores.push_back({ path.sem_src, 0 }); + + ggml_vk_ctx_end(hop1_ctx); + ggml_vk_submit(hop1_ctx, {}); + } + + // Forward edge: export sync_fd from sem_src, import into per-copy dst semaphore + if (!ggml_vk_d2d_syncfd_export_import(src->device, path.sem_src, dst->device, fwd_sem)) { + return false; + } + + // Hop 2: dst device copies shared buffer -> VRAM + { + std::lock_guard guard(dst->device->mutex); + vk_context hop2_ctx = ggml_vk_create_temporary_context(path.hop2_cmd_pool); + ggml_vk_ctx_begin(dst->device, hop2_ctx); + + hop2_ctx->s->wait_semaphores.push_back({ fwd_sem, 0 }); + + VkBufferCopy bc2{ 0, dst_offset, size }; + vkCmdCopyBuffer(hop2_ctx->s->buffer->buf, (VkBuffer)dst_side_buf->buffer, (VkBuffer)dst->buffer, 1, &bc2); + + hop2_ctx->s->signal_semaphores.push_back({ back_signal_sem, 0 }); + + ggml_vk_ctx_end(hop2_ctx); + ggml_vk_submit(hop2_ctx, {}); + } + + slot.last_back_sem = back_signal_sem; + slot.back_edge_ready = true; + + return true; +} + +static bool ggml_vk_buffer_copy_async_d2d( + ggml_backend_vk_context * src_ctx, + ggml_backend_vk_context * dst_ctx, + vk_buffer& dst, size_t dst_offset, + vk_buffer& src, size_t src_offset, + size_t size) { + VK_LOG_DEBUG("ggml_vk_buffer_copy_async_d2d(" << size << ")"); + + vk_d2d_path& path = ggml_vk_get_d2d_path(src->device, dst->device, size); + + if (path.sync_method == D2D_SYNC_NONE || path.method == D2D_STAGING) { + return false; + } + + if (path.sync_method == D2D_SYNC_TIMELINE) { + vk_context compute_ctx = ggml_vk_get_compute_ctx(dst_ctx); + return ggml_vk_buffer_copy_async_d2d_timeline(compute_ctx, dst, dst_offset, src, src_offset, size, path); + } + + return ggml_vk_buffer_copy_async_d2d_syncfd(src_ctx, dst_ctx, dst, dst_offset, src, src_offset, size, path); +} #endif static void ggml_vk_buffer_copy_async(vk_context& ctx, vk_buffer& dst, size_t dst_offset, vk_buffer& src, size_t src_offset, size_t size) { @@ -15876,6 +16145,24 @@ static void ggml_vk_graph_cleanup(ggml_backend_vk_context * ctx) { } ctx->gc.semaphores.clear(); +#ifdef __linux__ + { + std::lock_guard guard(vk_d2d_cache_mutex); + for (auto& entry : vk_d2d_cache) { + vk_d2d_path& path = entry.second; + if (path.sync_method != D2D_SYNC_SYNCFD) { + continue; + } + if (path.sem_dst_device == ctx->device.get() || path.sem_src_device == ctx->device.get()) { + for (size_t i = 0; i < path.num_slots; i++) { + path.slots[i].last_back_sem = VK_NULL_HANDLE; + path.slots[i].back_edge_ready = false; + } + } + } + } +#endif + for (size_t i = 0; i < ctx->gc.tl_semaphores.size(); i++) { ctx->device->device.destroySemaphore({ ctx->gc.tl_semaphores[i].s }); } @@ -15904,6 +16191,22 @@ static void ggml_vk_cleanup(ggml_backend_vk_context * ctx) { ggml_vk_graph_cleanup(ctx); +#ifdef __linux__ + { + vkDeviceWaitIdle(ctx->device->device); + std::lock_guard guard(vk_d2d_cache_mutex); + for (auto& entry : vk_d2d_cache) { + vk_d2d_path& path = entry.second; + if (path.hop1_device == ctx->device.get() && path.hop1_cmd_pool.pool) { + ggml_vk_command_pool_cleanup(ctx->device, path.hop1_cmd_pool); + } + if (path.hop2_device == ctx->device.get() && path.hop2_cmd_pool.pool) { + ggml_vk_command_pool_cleanup(ctx->device, path.hop2_cmd_pool); + } + } + } +#endif + ggml_vk_destroy_buffer(ctx->prealloc_x); ggml_vk_destroy_buffer(ctx->prealloc_y); ggml_vk_destroy_buffer(ctx->prealloc_split_k); @@ -16390,9 +16693,9 @@ static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend_src, ggml_ba if (src_buf_ctx->dev_buffer->device != dst_buf->device) { #ifdef __linux__ if (ggml_vk_d2d_is_async_capable(src_buf_ctx->dev_buffer->device, dst_buf->device)) { - vk_context compute_ctx = ggml_vk_get_compute_ctx(ctx); + ggml_backend_vk_context * src_ctx = (ggml_backend_vk_context *)backend_src->context; if (ggml_vk_buffer_copy_async_d2d( - compute_ctx, + src_ctx, ctx, dst_buf, vk_tensor_offset(dst) + dst->view_offs, src_buf_ctx->dev_buffer, vk_tensor_offset(src) + src->view_offs, ggml_nbytes(src))) {