// ============================================================================
// EngineMultiGpu.inl
//
// Multi-GPU inference pool -- merged from MultiGpuEngineManager.h
//
// This file is #included at the bottom of engine.h and must not be compiled
// independently. It provides implementations for the pool-management methods
// declared inside Engine<T>:
//
//   initializePool()           -- build from ONNX, create pool
//   initializePoolFromEngine() -- load pre-built TRT engine, create pool
//   enumerateDevices()         -- static CUDA device enumeration
//   loadSlots()                -- core pool allocation logic (private)
//   tryGrowPool()              -- on-demand slot creation (private)
//   growPool()                 -- public demand-driven growth
//   runInferenceFromPool()     -- thread-safe slot dispatch (private)
//   releaseIdleSlots()         -- VRAM reclamation for elastic pools
//   startIdleTimer()           -- automatic idle-slot cleanup (with stopIdleTimer())
//   getTotalCapacity()         -- inline in engine.h
//   getActiveInferences()      -- inline in engine.h
//   getAvailableSlots()        -- inline in engine.h
//   isAtCapacity()             -- inline in engine.h
//   printCapacityReport()      -- human-readable pool status
// ============================================================================
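//
// Illustrative pool setup (a sketch only; the options values, model path, and
// memSafetyFactor below are hypothetical examples, not project defaults):
//
//   Engine<float> engine;
//   ANSCENTER::Options opts;                 // precision, batch sizes, ...
//   std::array<float, 3> sub{0.f, 0.f, 0.f};
//   std::array<float, 3> div{1.f, 1.f, 1.f};
//   if (engine.initializePool(opts, "detector.onnx", sub, div,
//                             /*normalize=*/true,
//                             /*maxSlotsPerGpu=*/1,
//                             /*memSafetyFactor=*/0.8)) {
//       engine.printCapacityReport();
//   }
//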
// -- Static member definitions for global elastic slot cap --------------------
template <typename T>
std::atomic<int> Engine<T>::s_globalElasticCount{0};
template <typename T>
std::atomic<int> Engine<T>::s_globalElasticMax{32}; // safe default, overwritten on first pool init
template <typename T>
std::once_flag Engine<T>::s_globalCapInitFlag;
template <typename T>
std::atomic<int64_t> Engine<T>::s_lastPoolCreatedMs{0};
// ----------------------------------------------------------------------------
// enumerateDevices -- static, no model required
// ----------------------------------------------------------------------------
template <typename T>
/*static*/ std::vector<GpuDeviceInfo>
Engine<T>::enumerateDevices()
{
    int count = 0;
    cudaGetDeviceCount(&count);

    std::vector<GpuDeviceInfo> devices;
    devices.reserve(static_cast<size_t>(count));

    for (int i = 0; i < count; ++i) {
        cudaDeviceProp prop;
        cudaGetDeviceProperties(&prop, i);
        cudaSetDevice(i);

        size_t freeBytes = 0, totalBytes = 0;
        cudaMemGetInfo(&freeBytes, &totalBytes);

        GpuDeviceInfo info;
        info.index                 = i;
        info.name                  = prop.name;
        info.totalMemoryBytes      = prop.totalGlobalMem;
        info.freeMemoryAtInitBytes = freeBytes;
        info.computeMajor          = prop.major;
        info.computeMinor          = prop.minor;
        info.slotsAllocated        = 0;
        info.memoryPerSlotBytes    = 0;
        devices.push_back(std::move(info));
    }
    return devices;
}
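
// Illustrative use (sketch): inspect devices before choosing a pool mode.
// GpuDeviceInfo fields are those populated above.
//
//   for (const auto& d : Engine<float>::enumerateDevices()) {
//       std::cout << "GPU[" << d.index << "] " << d.name << " free "
//                 << d.freeMemoryAtInitBytes / 1048576 << " MiB\n";
//   }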
// ----------------------------------------------------------------------------
// Public pool-init wrappers
// ----------------------------------------------------------------------------
template <typename T>
bool Engine<T>::initializePool(
    const ANSCENTER::Options& baseOptions,
    const std::string& onnxModelPath,
    const std::array<float, 3>& subVals,
    const std::array<float, 3>& divVals,
    bool normalize,
    int maxSlotsPerGpu,
    double memSafetyFactor)
{
    // Apply baseOptions to *this* so that m_options is consistent whether
    // the user goes through initializePool() or the 6-param buildLoadNetwork().
    m_options = baseOptions;
    return buildLoadNetwork(onnxModelPath, subVals, divVals, normalize,
                            maxSlotsPerGpu, memSafetyFactor);
}

template <typename T>
bool Engine<T>::initializePoolFromEngine(
    const ANSCENTER::Options& baseOptions,
    const std::string& trtEnginePath,
    const std::array<float, 3>& subVals,
    const std::array<float, 3>& divVals,
    bool normalize,
    int maxSlotsPerGpu,
    double memSafetyFactor)
{
    m_options = baseOptions;
    return loadNetwork(trtEnginePath, subVals, divVals, normalize,
                       maxSlotsPerGpu, memSafetyFactor);
}
// ----------------------------------------------------------------------------
// loadSlots -- core pool allocation logic
//
// Three modes, selected by maxSlotsPerGpu:
//
//    1 => ROUND-ROBIN (default)
//         1 slot per GPU, created at init. Tasks queue when all slots are
//         busy. Best balance of VRAM usage and multi-GPU utilisation.
//         Example: 3 GPUs → 3 slots, round-robin dispatch.
//
//   -1 => ELASTIC MODE
//         Only the probe slot is pre-loaded. Additional slots are created
//         on-demand by tryGrowPool() when concurrent requests arrive, and
//         released by releaseIdleSlots() when idle. Higher throughput but
//         higher VRAM usage -- only recommended for large GPUs (≥ 8 GB).
//
//   >1 => PRE-ALLOCATED MODE (explicit cap)
//         Slots are created upfront, capped at maxSlotsPerGpu per GPU.
//         Useful when the caller knows the required concurrency level.
//
// (Illustrative calls for each mode follow this comment block.)
// ----------------------------------------------------------------------------
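// Illustrative mode selection (sketch; `opts`, `path`, `sub`, `div`, and the
// 0.8 safety factor are hypothetical caller-side values):
//
//   engine.initializePool(opts, path, sub, div, true, /*maxSlotsPerGpu=*/ 1, 0.8); // round-robin
//   engine.initializePool(opts, path, sub, div, true, /*maxSlotsPerGpu=*/-1, 0.8); // elastic
//   engine.initializePool(opts, path, sub, div, true, /*maxSlotsPerGpu=*/ 4, 0.8); // pre-allocated, <=4/GPU
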
template <typename T>
bool Engine<T>::loadSlots(
    const ANSCENTER::Options& baseOptions,
    const std::string& modelPath,
    const std::array<float, 3>& subVals,
    const std::array<float, 3>& divVals,
    bool normalize,
    bool fromOnnx,
    int maxSlotsPerGpu,
    double memSafetyFactor)
{
    // -- 1. Enumerate GPUs ---------------------------------------------------
    m_deviceInfos = enumerateDevices();
    if (m_deviceInfos.empty()) {
        std::cout << "Error [Pool]: No CUDA-capable GPUs detected" << std::endl;
        return false;
    }
    const bool elastic = (maxSlotsPerGpu <= 0);
    m_elasticMode = elastic;

    // Set the global elastic slot cap ONCE, based on total GPU VRAM.
    // Budget: ~4 slots per GiB (integer-truncated, so a nominal 4 GB card
    // counts as 3 GiB). The cap is shared across ALL pools to prevent a
    // CUDA driver SRW lock convoy (30+ threads deadlocked).
    // 4 GB → 12, 6 GB → 24, 8 GB → 32, 12 GB → 48, 24 GB → 96
    if (elastic) {
        std::call_once(s_globalCapInitFlag, [this]() {
            int totalGB = 0;
            for (const auto& dev : m_deviceInfos)
                totalGB += static_cast<int>(dev.totalMemoryBytes / (1024ULL * 1024ULL * 1024ULL));
            int cap = std::max(8, totalGB * 4); // minimum 8
            s_globalElasticMax.store(cap);
            std::cout << "Info [Pool]: Global elastic slot cap = "
                      << cap << " (total " << totalGB << " GB VRAM x4)" << std::endl;
        });
    }

    std::cout << "\n====================================================" << std::endl;
    std::cout << "Engine Pool Initialization"
              << (elastic ? " [ELASTIC]" : " [PRE-ALLOCATED]") << std::endl;
    std::cout << "====================================================" << std::endl;
    std::cout << "Found " << m_deviceInfos.size() << " GPU(s):" << std::endl;
    for (const auto& d : m_deviceInfos) {
        std::cout << " GPU[" << d.index << "] " << d.name
                  << " | SM " << d.computeMajor << "." << d.computeMinor
                  << " | Total " << d.totalMemoryBytes / 1048576 << " MiB"
                  << " | Free " << d.freeMemoryAtInitBytes / 1048576 << " MiB"
                  << std::endl;
    }

    // Warn about heterogeneous GPUs -- the TRT engine may not be compatible.
    for (size_t i = 1; i < m_deviceInfos.size(); ++i) {
        if (m_deviceInfos[i].name != m_deviceInfos[0].name) {
            std::cout << "Warning [Pool]: GPU[" << i << "] '" << m_deviceInfos[i].name
                      << "' differs from GPU[0] '" << m_deviceInfos[0].name
                      << "'. TRT engine binary may be incompatible with dissimilar GPUs."
                      << std::endl;
        }
    }
    // -- 2. Probe engine: measure per-slot VRAM footprint ----------------------
    //
    // Memory delta = freeBeforeLoad - freeAfterLoad
    // Includes: TRT runtime buffers, CUDA context overhead, I/O buffers,
    // stream memory, and workspace allocated by Engine<T>.
    //
    // MULTI-GPU BALANCING: place the probe on the GPU with the most free
    // VRAM. This naturally distributes engines across GPUs as each pool
    // init consumes VRAM from its chosen GPU, making the *other* GPU
    // the best candidate for the next pool.
    int probeGpuIdx = 0;
    {
        size_t bestFree = 0;
        for (const auto& d : m_deviceInfos) {
            cudaSetDevice(d.index);
            size_t freeNow = 0, totalNow = 0;
            cudaMemGetInfo(&freeNow, &totalNow);
            std::cout << " GPU[" << d.index << "] free VRAM: " << freeNow / 1048576 << " MiB" << std::endl;
            if (freeNow > bestFree) {
                bestFree = freeNow;
                probeGpuIdx = d.index;
            }
        }
    }
    std::cout << "\nLoading probe engine on GPU[" << probeGpuIdx
              << "] (most free VRAM) to measure per-slot memory..." << std::endl;

    cudaSetDevice(probeGpuIdx);
    size_t freeBefore = 0, tmp = 0;
    cudaMemGetInfo(&freeBefore, &tmp);

    ANSCENTER::Options opts0 = baseOptions;
    opts0.deviceIndex = probeGpuIdx;
    auto probeEngine = std::make_unique<Engine<T>>(opts0);
    const bool probeOk = fromOnnx
        ? probeEngine->buildLoadNetwork(modelPath, subVals, divVals, normalize)
        : probeEngine->loadNetwork(modelPath, subVals, divVals, normalize);
    if (!probeOk) {
        return false;
    }

    size_t freeAfter = 0;
    cudaMemGetInfo(&freeAfter, &tmp);

    // Floor the delta at 64 MiB to guard against measurement noise.
    constexpr size_t kMinSlotMemBytes = 64ULL * 1024 * 1024;
    const size_t rawDelta   = (freeBefore > freeAfter) ? (freeBefore - freeAfter) : 0ULL;
    const size_t memPerSlot = std::max(rawDelta, kMinSlotMemBytes);
    std::cout << "Info [Pool]: Memory per slot = " << memPerSlot / 1048576
              << " MiB (measured delta = " << rawDelta / 1048576 << " MiB)" << std::endl;

    // Cache input/output tensor dims on *this* Engine so getInputDims() /
    // getOutputDims() work correctly when the pool is the active code path.
    m_inputDims  = probeEngine->getInputDims();
    m_outputDims = probeEngine->getOutputDims();

    // Sync GPU-capped batch sizes from the probe engine. The build() function
    // may have reduced maxBatchSize based on GPU VRAM tier; propagate that to
    // the pool manager so callers see the actual runtime limits.
    m_options.maxBatchSize = probeEngine->getOptions().maxBatchSize;
    m_options.optBatchSize = probeEngine->getOptions().optBatchSize;

    // Store the per-slot measurement for on-demand growth.
    m_memPerSlot = memPerSlot;

    // Promote the probe engine into the first slot on the chosen GPU.
    {
        InferenceSlot s;
        s.deviceIndex = probeGpuIdx;
        s.busy        = false;
        s.memUsed     = memPerSlot;
        s.engine      = std::move(probeEngine);
        m_slots.push_back(std::move(s));
    }
    m_deviceInfos[probeGpuIdx].slotsAllocated     = 1;
    m_deviceInfos[probeGpuIdx].memoryPerSlotBytes = memPerSlot;
    // -- 3. Store config for on-demand growth (elastic mode) -------------------
    m_poolModelPath    = modelPath;
    m_poolSubVals      = subVals;
    m_poolDivVals      = divVals;
    m_poolNormalize    = normalize;
    m_poolFromOnnx     = fromOnnx;
    m_poolSafetyFactor = memSafetyFactor;

    if (elastic) {
        // -- ELASTIC: only the probe slot is pre-loaded -------------------------
        std::cout << "Info [Pool]: Elastic mode -- starting with 1 probe slot."
                  << " Additional slots will be created on-demand as concurrent"
                  << " requests arrive and released when idle." << std::endl;
        m_totalCapacity = 1;

        // Mark the creation time -- elastic growth is deferred for
        // s_elasticGraceSec to let other models create their probe engines first.
        {
            using namespace std::chrono;
            auto now = duration_cast<milliseconds>(
                steady_clock::now().time_since_epoch()).count();
            s_lastPoolCreatedMs.store(now);
        }
        printCapacityReport();
        startIdleTimer(); // auto-cleanup of idle slots, periodically
        return true;
    }
    // -- 4. PRE-ALLOCATED: compute per-GPU capacity, then interleave -----------
    //
    // Phase A: determine how many slots each GPU can hold.
    // Phase B: create slots in round-robin order across GPUs so that the
    //          linear m_nextSlotHint scan naturally distributes consecutive
    //          requests across GPUs:
    //            m_slots = [GPU0-s0, GPU1-s0, GPU2-s0, GPU0-s1, GPU1-s1, ...]
    //          This gives: Task1→GPU0, Task2→GPU1, Task3→GPU2, Task4→GPU0 ...
    const int numGpus = static_cast<int>(m_deviceInfos.size());

    // Phase A: compute slotsToAdd per GPU.
    std::vector<int> slotsPerGpu(numGpus, 0);
    int maxSlotsAny = 0;
    for (int di = 0; di < numGpus; ++di) {
        cudaSetDevice(di);
        size_t freeNow = 0, totalNow = 0;
        cudaMemGetInfo(&freeNow, &totalNow);
        const size_t usableBytes = static_cast<size_t>(
            static_cast<double>(freeNow) * memSafetyFactor);
        int slotsToAdd = (memPerSlot > 0)
            ? static_cast<int>(usableBytes / memPerSlot) : 0;

        // Apply the explicit per-GPU cap; the probe GPU already has the probe slot.
        if (maxSlotsPerGpu > 0) {
            const int budget = (di == probeGpuIdx)
                ? (maxSlotsPerGpu - 1)
                : maxSlotsPerGpu;
            slotsToAdd = std::min(slotsToAdd, budget);
        }
        slotsPerGpu[di] = slotsToAdd;
        if (slotsToAdd > maxSlotsAny) maxSlotsAny = slotsToAdd;
        m_deviceInfos[di].memoryPerSlotBytes = memPerSlot;

        std::cout << "Info [Pool]: GPU[" << di << "] " << m_deviceInfos[di].name
                  << " -- free " << freeNow / 1048576 << " MiB"
                  << ", usable " << usableBytes / 1048576 << " MiB"
                  << " => will add " << slotsToAdd << " slot(s)" << std::endl;
    }

    // Phase B: create slots interleaved across GPUs.
    //   Round 0: GPU0-slot0, GPU1-slot0, GPU2-slot0
    //   Round 1: GPU0-slot1, GPU1-slot1, GPU2-slot1
    //   ...
    std::vector<int>  slotsCreated(numGpus, 0);  // track actual successes per GPU
    std::vector<bool> gpuFailed(numGpus, false); // stop trying failed GPUs
    for (int round = 0; round < maxSlotsAny; ++round) {
        for (int di = 0; di < numGpus; ++di) {
            if (gpuFailed[di]) continue;
            if (slotsCreated[di] >= slotsPerGpu[di]) continue;

            cudaSetDevice(di);
            ANSCENTER::Options opts = baseOptions;
            opts.deviceIndex = di;
            auto eng = std::make_unique<Engine<T>>(opts);
            eng->setVerbose(false);
            eng->setDisableGraphs(true);                // concurrent graph captures corrupt the CUDA context
            eng->m_skipEngineCache = m_skipEngineCache; // propagate to pool slots
            const bool ok = fromOnnx
                ? eng->buildLoadNetwork(modelPath, subVals, divVals, normalize)
                : eng->loadNetwork(modelPath, subVals, divVals, normalize);
            if (!ok) {
                std::cout << "Warning [Pool]: GPU[" << di << "] slot "
                          << (slotsCreated[di] + 1) << "/" << slotsPerGpu[di]
                          << " failed to load; halting allocation on this device." << std::endl;
                gpuFailed[di] = true;
                continue;
            }
            InferenceSlot slot;
            slot.deviceIndex = di;
            slot.busy        = false;
            slot.memUsed     = memPerSlot;
            slot.engine      = std::move(eng);
            m_slots.push_back(std::move(slot));
            m_deviceInfos[di].slotsAllocated++;
            slotsCreated[di]++;
        }
    }

    m_totalCapacity = static_cast<int>(m_slots.size());
    printCapacityReport();
    if (m_totalCapacity == 0) {
        std::cout << "Error [Pool]: Zero inference slots allocated -- "
                     "check available GPU memory." << std::endl;
        return false;
    }
    return true;
}
// ----------------------------------------------------------------------------
// tryGrowPool -- on-demand slot creation (elastic mode)
//
// Called by runInferenceFromPool when every alive slot is busy.
// Creates ONE new engine, trying candidate GPUs in descending order of free
// VRAM, so growth naturally balances load across GPUs instead of always
// filling GPU 0 first.
//
// Returns a pointer to the new slot (already marked busy) or nullptr if
// no GPU has enough VRAM.
//
// Thread-safety: m_growMutex serialises growth so only one thread creates
// a slot at a time. m_slotMutex is acquired briefly to push the new slot
// into the deque. The calling thread waits (engine deserialisation takes
// ~0.5-3 s), but that is far better than rejecting the request entirely.
// ----------------------------------------------------------------------------
template <typename T>
typename Engine<T>::InferenceSlot*
Engine<T>::tryGrowPool(bool bypassGrace)
{
    std::lock_guard<std::mutex> growLock(m_growMutex);

    // Grace period: defer elastic growth for s_elasticGraceSec after the most
    // recent pool creation. This reserves VRAM for probe engines that haven't
    // been created yet (e.g., 10 models loading sequentially -- early pools
    // shouldn't grow elastic slots while later probes still need VRAM).
    // Bypassed for demand-driven growth (a new consumer explicitly joined the
    // pool, so we KNOW more slots are needed).
    if (!bypassGrace) {
        using namespace std::chrono;
        auto now = duration_cast<milliseconds>(
            steady_clock::now().time_since_epoch()).count();
        int64_t lastCreated = s_lastPoolCreatedMs.load();
        int64_t elapsedSec  = (now - lastCreated) / 1000;
        if (lastCreated > 0 && elapsedSec < s_elasticGraceSec) {
            // Silently skip -- don't spam logs during the grace period.
            return nullptr;
        }
    }

    // Global cap: prevent too many concurrent CUDA operations across ALL pools.
    // With shared engine pools, unlimited elastic growth causes a CUDA driver
    // SRW lock convoy (30+ threads all blocked on nvcuda64 internal locks).
    const int currentGlobal = s_globalElasticCount.load();
    const int maxGlobal     = s_globalElasticMax.load();
    if (currentGlobal >= maxGlobal) {
        std::cout << "Info [Pool]: tryGrowPool -- global cap reached ("
                  << currentGlobal << "/" << maxGlobal
                  << " total slots), not growing" << std::endl;
        return nullptr;
    }

    // Find the GPU with the most free VRAM that has enough for one more slot.
    // This naturally balances load across GPUs instead of always filling GPU 0.
    const size_t requiredBytes = (m_poolSafetyFactor > 0.0)
        ? static_cast<size_t>(static_cast<double>(m_memPerSlot) / m_poolSafetyFactor)
        : m_memPerSlot;
    std::cout << "Info [Pool]: tryGrowPool called -- need " << (requiredBytes >> 20)
              << " MiB per slot, scanning " << m_deviceInfos.size() << " GPU(s)..."
              << std::endl;

    std::vector<std::pair<size_t, int>> gpuByFreeVram; // {freeBytes, deviceIndex}
    for (const auto& dev : m_deviceInfos) {
        cudaSetDevice(dev.index);
        size_t freeNow = 0, totalNow = 0;
        cudaMemGetInfo(&freeNow, &totalNow);
        std::cout << "Info [Pool]: GPU[" << dev.index << "] free=" << (freeNow >> 20)
                  << " MiB, required=" << (requiredBytes >> 20) << " MiB"
                  << (freeNow >= requiredBytes ? " -> CANDIDATE" : " -> SKIP (not enough)")
                  << std::endl;
        if (freeNow >= requiredBytes) {
            gpuByFreeVram.push_back({freeNow, dev.index});
        }
    }
    // Sort device candidates by free VRAM descending (most free first).
    std::sort(gpuByFreeVram.begin(), gpuByFreeVram.end(),
              [](const auto& a, const auto& b) { return a.first > b.first; });
    if (gpuByFreeVram.empty()) {
        std::cout << "Warning [Pool]: tryGrowPool -- no GPU has enough free VRAM ("
                  << (requiredBytes >> 20) << " MiB), cannot grow" << std::endl;
        return nullptr;
    }
    for (const auto& [freeVram, devIdx] : gpuByFreeVram) {
        auto& dev = m_deviceInfos[devIdx];
        std::cout << "Info [Pool]: Creating on-demand slot on GPU[" << dev.index
                  << "] (free=" << (freeVram >> 20) << " MiB)..." << std::endl;

        // Create a new engine on this candidate GPU.
        cudaSetDevice(dev.index);
        ANSCENTER::Options opts = m_options;
        opts.deviceIndex = dev.index;
        auto eng = std::make_unique<Engine<T>>(opts);
        eng->setVerbose(false);
        eng->setDisableGraphs(true);                // concurrent graph captures corrupt the CUDA context
        eng->m_skipEngineCache = m_skipEngineCache; // propagate to on-demand slots
        eng->m_skipOnnxRebuild = true;              // elastic growth must NOT delete/rebuild engine files
        eng->m_skipOnnxBuild   = bypassGrace;       // demand-driven growth: skip ONNX→TRT if no cached engine
        const bool ok = m_poolFromOnnx
            ? eng->buildLoadNetwork(m_poolModelPath, m_poolSubVals,
                                    m_poolDivVals, m_poolNormalize)
            : eng->loadNetwork(m_poolModelPath, m_poolSubVals,
                               m_poolDivVals, m_poolNormalize);
        if (!ok) {
            std::cout << "Warning [Pool]: On-demand slot creation FAILED on GPU["
                      << dev.index << "]" << std::endl;
            continue; // try the next GPU
        }
        std::cout << "Info [Pool]: On-demand slot engine loaded OK on GPU["
                  << dev.index << "]" << std::endl;

        {
            std::lock_guard<std::mutex> slotLock(m_slotMutex);

            // Reuse a dead slot entry (engine == nullptr) if one exists.
            for (auto& s : m_slots) {
                if (!s.engine) { // dead entry -- recycle it
                    s.deviceIndex  = dev.index;
                    s.busy         = true;
                    s.memUsed      = m_memPerSlot;
                    s.engine       = std::move(eng);
                    s.lastUsedTime = std::chrono::steady_clock::now();
                    dev.slotsAllocated++;

                    // Recount alive slots.
                    int alive = 0;
                    for (const auto& x : m_slots) { if (x.engine) ++alive; }
                    m_totalCapacity = alive;
                    s_globalElasticCount++;

                    std::cout << "Info [Pool]: On-demand slot recycled on GPU["
                              << dev.index << "] -- pool now " << m_totalCapacity
                              << " slot(s) (global " << s_globalElasticCount.load()
                              << "/" << s_globalElasticMax.load() << ")" << std::endl;
                    return &s;
                }
            }

            // No dead entries to recycle -- push a new one.
            // std::deque::push_back does NOT invalidate references to existing
            // elements, so pointers held by other threads remain valid.
            InferenceSlot newSlot;
            newSlot.deviceIndex  = dev.index;
            newSlot.busy         = true;
            newSlot.memUsed      = m_memPerSlot;
            newSlot.engine       = std::move(eng);
            newSlot.lastUsedTime = std::chrono::steady_clock::now();
            m_slots.push_back(std::move(newSlot));
            dev.slotsAllocated++;
            m_totalCapacity = static_cast<int>(m_slots.size()); // all alive here
            s_globalElasticCount++;

            std::cout << "Info [Pool]: On-demand slot created on GPU["
                      << dev.index << "] -- pool now " << m_totalCapacity
                      << " slot(s) (global " << s_globalElasticCount.load()
                      << "/" << s_globalElasticMax.load() << ")" << std::endl;
            return &m_slots.back();
        }
    }
    return nullptr; // every GPU is full
}
// ----------------------------------------------------------------------------
// growPool -- public demand-driven growth (bypasses grace period)
// ----------------------------------------------------------------------------
template <typename T>
int Engine<T>::growPool(int count)
{
    int created = 0;
    for (int i = 0; i < count; ++i) {
        auto* slot = tryGrowPool(/*bypassGrace=*/true);
        if (!slot) break;
        // Release the slot so inference threads can claim it.
        {
            std::lock_guard<std::mutex> lk(m_slotMutex);
            slot->busy = false;
            slot->lastUsedTime = std::chrono::steady_clock::now();
        }
        m_slotFreeCv.notify_one();
        ++created;
    }
    return created;
}
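
// Illustrative use (sketch): a caller expecting a burst of concurrent
// requests can warm the pool up-front instead of paying the grow latency on
// the first requests. The count of 2 is an arbitrary example.
//
//   int added = engine->growPool(2); // returns how many slots were actually created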
// ----------------------------------------------------------------------------
// runInferenceFromPool -- thread-safe slot dispatch
// ----------------------------------------------------------------------------
template <typename T>
bool Engine<T>::runInferenceFromPool(
    const std::vector<std::vector<cv::cuda::GpuMat>>& inputs,
    std::vector<std::vector<std::vector<T>>>& featureVectors)
{
    // -- 1. Acquire an idle, alive slot (round-robin) ---------------------------
    //
    // The round-robin starting point avoids always favouring GPU 0. Each call
    // advances m_nextSlotHint so consecutive requests spread across GPUs.
    // The mutex is held only for the O(N) scan + flag flip -- NOT during GPU
    // execution -- so threads using different slots proceed in parallel.
    //
    // PROACTIVE GROWTH (elastic mode):
    // If all alive slots are busy when a request arrives, the pool is
    // undersized for the current concurrency level. We kick off pool
    // growth (tryGrowPool) in a detached background thread while we
    // wait for a current slot to free. This ensures multi-GPU
    // utilisation: the new slot lands on the GPU with the most free
    // VRAM (typically GPU[1]) and is ready for the *next* request.
    // Growth is serialised by m_growMutex, so duplicate kicks are
    // harmless -- the second one finds a fresh slot immediately.
    InferenceSlot* slot = nullptr;
    bool kickedGrowth = false;
    auto _poolAcquireStart = std::chrono::steady_clock::now();
    {
        std::unique_lock<std::mutex> lock(m_slotMutex);
        const auto deadline = std::chrono::steady_clock::now()
                            + std::chrono::milliseconds(2000);
        while (!slot) {
            const size_t n = m_slots.size();
            if (n > 0) {
                const size_t start = m_nextSlotHint.load() % n;
                for (size_t i = 0; i < n; ++i) {
                    auto& s = m_slots[(start + i) % n];
                    if (!s.busy && s.engine) { // alive and idle
                        s.busy = true;
                        slot = &s;
                        m_nextSlotHint = (start + i + 1) % n;
                        break;
                    }
                }
            }
            if (!slot) {
                ANS_DBG("TRT_Pool", "ALL SLOTS BUSY: %zu slots, active=%d -- waiting for free slot",
                        n, m_activeCount.load());
                // All slots busy. In elastic mode, proactively grow the
                // pool in the background so the next request has a slot
                // on a different GPU. We only kick once per wait cycle.
                if (m_elasticMode && !kickedGrowth
                    && s_globalElasticCount.load() < s_globalElasticMax.load()) {
                    kickedGrowth = true;
                    std::cout << "Info [Pool]: All slots busy -- kicking background growth thread"
                              << std::endl;
                    // Fire-and-forget: tryGrowPool is serialised by
                    // m_growMutex, so concurrent kicks are safe.
                    std::thread([this]() {
                        std::cout << "Info [Pool]: Background growth thread started" << std::endl;
                        auto* newSlot = this->tryGrowPool();
                        if (newSlot) {
                            // The slot was created pre-marked busy; release it
                            // so the next requester can claim it.
                            {
                                std::lock_guard<std::mutex> lk(m_slotMutex);
                                newSlot->busy = false;
                                newSlot->lastUsedTime = std::chrono::steady_clock::now();
                            }
                            m_slotFreeCv.notify_all();
                            std::cout << "Info [Pool]: Background growth SUCCEEDED -- new slot on GPU["
                                      << newSlot->deviceIndex << "], pool now "
                                      << m_totalCapacity << " slot(s)" << std::endl;
                        } else {
                            std::cout << "Warning [Pool]: Background growth FAILED -- no slot created"
                                      << std::endl;
                        }
                    }).detach();
                }
                // Wait for a running slot to finish and signal us.
                if (m_slotFreeCv.wait_until(lock, deadline)
                    == std::cv_status::timeout) {
                    break; // fall through to reject
                }
            }
        }
    }
    // -- 2. Still no slot => reject ----------------------------------------------
    {
        double _acquireMs = std::chrono::duration<double, std::milli>(
            std::chrono::steady_clock::now() - _poolAcquireStart).count();
        if (_acquireMs > 100.0) {
            ANS_DBG("TRT_Pool", "SLOW slot acquire: %.1fms slot=%p gpu=%d active=%d/%zu",
                    _acquireMs, (void*)slot, slot ? slot->deviceIndex : -1,
                    m_activeCount.load(), m_slots.size());
        }
    }
    if (!slot) {
        ANS_DBG("TRT_Pool", "ERROR: No slot available -- all %zu slots busy, timeout", m_slots.size());
        return false;
    }
    ++m_activeCount;

    // -- 3. Run inference; guarantee busy-flag and activeCount are restored ------
    // If runInference() throws (cv::Exception, std::bad_alloc, ...) the slot
    // must be released and the counter decremented -- otherwise the slot is
    // permanently lost and capacity shrinks with every exception.
    bool result = false;
    try {
        // Match the calling thread's CUDA context to the slot's device.
        // Skip the call if the thread is already on the correct device
        // (cudaSetDevice under WDDM can cost 1-5 ms per call).
        int currentDev = -1;
        cudaGetDevice(&currentDev);
        if (currentDev != slot->deviceIndex) {
            cudaSetDevice(slot->deviceIndex);
        }
        ANS_DBG("TRT_Pool", "Slot dispatch: gpu=%d active=%d/%zu",
                slot->deviceIndex, m_activeCount.load(), m_slots.size());
        auto _slotStart = std::chrono::steady_clock::now();

        result = slot->engine->runInference(inputs, featureVectors);

        auto _slotEnd = std::chrono::steady_clock::now();
        double _slotMs = std::chrono::duration<double, std::milli>(_slotEnd - _slotStart).count();
        if (_slotMs > 500.0) {
            ANS_DBG("TRT_Pool", "SLOW slot inference: %.1fms gpu=%d active=%d/%zu",
                    _slotMs, slot->deviceIndex, m_activeCount.load(), m_slots.size());
        }
    }
    catch (const std::exception& ex) {
        ANS_DBG("TRT_Pool", "ERROR: runInference threw: %s", ex.what());
        std::cout << "Error [Pool]: runInference threw: " << ex.what() << std::endl;
    }
    catch (...) {
        ANS_DBG("TRT_Pool", "ERROR: runInference threw unknown exception");
        std::cout << "Error [Pool]: runInference threw unknown exception" << std::endl;
    }

    {
        std::lock_guard<std::mutex> lock(m_slotMutex);
        slot->busy = false;
        slot->lastUsedTime = std::chrono::steady_clock::now();
    }
    --m_activeCount;
    m_slotFreeCv.notify_one(); // wake one thread waiting for a free slot
    return result;
}
// ----------------------------------------------------------------------------
// releaseIdleSlots -- VRAM reclamation for elastic pools
//
// Destroys engine instances that have been idle for at least `idleSeconds`.
// The first slot (probe, index 0) is never released so the model remains
// instantly usable without re-measurement.
//
// Dead slots are NOT erased from the deque (to avoid invalidating pointers);
// their engine is reset to nullptr and they are recycled by tryGrowPool().
//
// Call from a periodic background timer, e.g. every 10-30 seconds:
// engine->releaseIdleSlots(30.0);
// ----------------------------------------------------------------------------
template <typename T>
int Engine<T>::releaseIdleSlots(double idleSeconds)
{
    std::lock_guard<std::mutex> growLock(m_growMutex);
    std::lock_guard<std::mutex> slotLock(m_slotMutex);
    const auto now = std::chrono::steady_clock::now();
    int released = 0;

    // Skip index 0 -- that's the probe slot, always kept alive.
    for (size_t i = 1; i < m_slots.size(); ++i) {
        auto& s = m_slots[i];
        if (!s.busy && s.engine) { // alive and idle
            const double idle = std::chrono::duration<double>(
                now - s.lastUsedTime).count();
            if (idle >= idleSeconds) {
                // Update device info.
                for (auto& dev : m_deviceInfos) {
                    if (dev.index == s.deviceIndex) {
                        if (dev.slotsAllocated > 0) dev.slotsAllocated--;
                        break;
                    }
                }
                std::cout << "Info [Pool]: Releasing idle slot on GPU["
                          << s.deviceIndex << "] (idle "
                          << static_cast<int>(idle) << "s)" << std::endl;
                // Destroy the engine -- frees GPU memory. The InferenceSlot
                // entry stays in the deque (dead) for reuse.
                s.engine.reset();
                s.memUsed = 0;
                released++;
                s_globalElasticCount--;
            }
        }
    }

    // Recount alive slots.
    int alive = 0;
    for (const auto& s : m_slots) { if (s.engine) ++alive; }
    m_totalCapacity = alive;

    if (released > 0) {
        std::cout << "Info [Pool]: Released " << released << " idle slot(s)"
                  << " -- pool now " << m_totalCapacity << " alive slot(s)"
                  << std::endl;
    }
    return released;
}
// ----------------------------------------------------------------------------
// printCapacityReport
// ----------------------------------------------------------------------------
template <typename T>
void Engine<T>::printCapacityReport() const
{
    // Count alive vs dead -- the lock protects against concurrent tryGrowPool.
    std::lock_guard<std::mutex> lock(m_slotMutex);
    int alive = 0, dead = 0;
    for (const auto& s : m_slots) {
        if (s.engine) ++alive; else ++dead;
    }

    std::cout << "\n=====================================================" << std::endl;
    std::cout << " Engine Pool -- Capacity Report"
              << (m_elasticMode ? " [ELASTIC]" : " [PRE-ALLOCATED]") << std::endl;
    std::cout << "=====================================================" << std::endl;
    std::cout << " Alive inference slots : " << alive << std::endl;
    if (dead > 0)
        std::cout << " Dead (recyclable)     : " << dead << std::endl;
    std::cout << " Active inferences     : " << m_activeCount.load() << std::endl;
    std::cout << " Available slots       : "
              << (alive - m_activeCount.load())
              << (m_elasticMode ? " (+ on-demand)" : "")
              << std::endl;
    if (m_elasticMode) {
        std::cout << " Global slot usage     : "
                  << s_globalElasticCount.load() << "/" << s_globalElasticMax.load()
                  << " (across all pools)" << std::endl;
    }
    std::cout << " Memory per slot       : " << m_memPerSlot / 1048576 << " MiB" << std::endl;
    std::cout << "-----------------------------------------------------" << std::endl;
    for (const auto& d : m_deviceInfos) {
        std::cout << " GPU[" << d.index << "] " << d.name
                  << " | SM " << d.computeMajor << "." << d.computeMinor
                  << " | Total " << d.totalMemoryBytes / 1048576 << " MiB"
                  << " | Slots: " << d.slotsAllocated
                  << " | Mem/slot: " << d.memoryPerSlotBytes / 1048576 << " MiB"
                  << std::endl;
    }
    std::cout << "=====================================================" << std::endl;
}
// ----------------------------------------------------------------------------
// startIdleTimer / stopIdleTimer -- automatic idle-slot cleanup
//
// A background thread wakes every m_idleTimerIntervalSec seconds and calls
// releaseIdleSlots(m_idleTimerThresholdSec). The thread uses a
// condition_variable with a timed wait so that stopIdleTimer() can wake it
// immediately for a clean shutdown (no dangling sleeps).
//
// Only active in elastic mode -- pre-allocated pools have fixed capacity.
// ----------------------------------------------------------------------------
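// Manual lifecycle (sketch; loadSlots() already starts the timer for elastic
// pools, so explicit calls are only needed for custom setups):
//
//   engine->startIdleTimer(); // begin periodic releaseIdleSlots() sweeps
//   // ... serve inference traffic ...
//   engine->stopIdleTimer();  // wake the timer thread and join (or detach at process exit)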
template <typename T>
void Engine<T>::startIdleTimer()
{
    if (!m_elasticMode) return;               // no-op for pre-allocated pools
    if (m_idleTimerThread.joinable()) return; // already running

    m_idleTimerStop = false;
    m_idleTimerThread = std::thread([this]() {
        std::cout << "Info [Pool]: Idle-slot cleanup timer started "
                  << "(interval=" << m_idleTimerIntervalSec << "s, threshold="
                  << m_idleTimerThresholdSec << "s)" << std::endl;
        while (!m_idleTimerStop.load()) {
            // Sleep for the interval, but wake early if stop is signalled.
            {
                std::unique_lock<std::mutex> lk(m_idleTimerMutex);
                m_idleTimerCv.wait_for(lk,
                    std::chrono::duration<double>(m_idleTimerIntervalSec),
                    [this]() { return m_idleTimerStop.load(); });
            }
            if (m_idleTimerStop.load()) break;
            releaseIdleSlots(m_idleTimerThresholdSec);
        }
        std::cout << "Info [Pool]: Idle-slot cleanup timer stopped." << std::endl;
    });
}
template <typename T>
void Engine<T>::stopIdleTimer()
{
    if (!m_idleTimerThread.joinable()) return; // not running
    m_idleTimerStop = true;
    m_idleTimerCv.notify_all(); // wake the sleeping thread immediately

    // During ExitProcess, worker threads are already killed by the OS.
    // Calling join() on a dead thread deadlocks or causes std::terminate.
    // Detach instead -- the OS will reclaim everything momentarily.
    if (g_processExiting().load(std::memory_order_relaxed)) {
        m_idleTimerThread.detach();
    } else {
        m_idleTimerThread.join(); // normal path: wait for a clean exit
    }
}