Files
ANSCORE/engines/TensorRTAPI/include/engine/EnginePoolManager.h

432 lines
19 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#pragma once
// EnginePoolManager.h — Process-wide cache for shared Engine<T> pool instances.
//
// When multiple AI tasks load the same model (same PoolKey: model path + precision + max batch),
// this manager ensures they share a SINGLE Engine<T> pool instead of each task
// creating its own pool with independent execution contexts and VRAM buffers.
//
// Without sharing: N tasks × ~500 MB = N × 500 MB VRAM (OOM at ~5-8 tasks on 8GB GPU)
// With sharing: 1 pool × ~500 MB = 500 MB total (unlimited tasks, slower via queuing)
//
// Lazy eviction: when refcount drops to 0, the pool is kept alive for
// kEvictGraceSec seconds. If a new task acquires it within that window,
// it gets an instant HIT without rebuilding. This handles the LabView
// edit/duplicate/create cycle (destroy → recreate) gracefully.
//
// Thread-safety: All public methods are mutex-protected.
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <array>
#include <iostream>
#include <functional>
#include <chrono>
#include <thread>
#include <atomic>
#include <cuda_runtime.h>
#include "TRTEngineCache.h" // constructor touches TRTEngineCache::instance() for destruction ordering
#ifdef _WIN32
#include <windows.h>
#endif
// Forward declare Engine<T> to avoid circular includes.
// The header that includes this must also include engine.h.
template <typename T> class Engine;
namespace ANSCENTER { struct Options; }
template <typename T>
class EnginePoolManager {
public:
/// Returns the single EnginePoolManager<T> for this template instantiation.
/// The object is a function-local static, so it is constructed on first use.
static EnginePoolManager& instance() {
    static EnginePoolManager singleton;
    return singleton;
}
// ========================================================================
// Cache key — uniquely identifies a compatible Engine pool.
// ========================================================================
/// Cache key for one shareable Engine pool. Two keys compare equal only when
/// the model path, precision and max batch size all match.
struct PoolKey {
    std::string modelPath;
    int precision = 0; // cast from Precision enum
    int maxBatch = 1;
    bool operator==(const PoolKey& o) const {
        return modelPath == o.modelPath &&
               precision == o.precision &&
               maxBatch == o.maxBatch;
    }
};
/// Hash functor for PoolKey, used as the hasher of the pool map.
///
/// Fields are folded in with an order-sensitive hash-combine step. The
/// previous shifted-XOR scheme produced identical hashes whenever
/// (precision << 16) equalled (maxBatch << 24), and discarded high bits of
/// maxBatch where size_t is 32 bits wide.
struct PoolKeyHash {
    size_t operator()(const PoolKey& k) const {
        size_t h = std::hash<std::string>{}(k.modelPath);
        mix(h, std::hash<int>{}(k.precision));
        mix(h, std::hash<int>{}(k.maxBatch));
        return h;
    }
private:
    // Folds `value` into `seed`; the constant is the 64-bit golden-ratio
    // value (truncated on 32-bit targets), the shifts spread the seed's bits.
    static void mix(size_t& seed, size_t value) {
        seed ^= value + static_cast<size_t>(0x9e3779b97f4a7c15ULL)
                + (seed << 6) + (seed >> 2);
    }
};
// ========================================================================
// acquire() — get or create a shared Engine pool.
//
// On first call for a given key: creates a new Engine<T>, calls
// buildLoadNetwork with the provided parameters, and caches it.
//
// On subsequent calls (or within lazy-eviction grace period):
// returns the existing shared_ptr and increments refcount.
// No VRAM allocated, near-instant.
//
// Returns nullptr if engine creation/loading fails.
// ========================================================================
/// @brief Get the shared Engine<T> pool for `key`, creating it on a cache miss.
///
/// @param key            Cache key; also used (key.modelPath) in log messages.
/// @param options        Passed to the Engine<T> constructor; options.deviceIndex
///                       selects the GPU queried for VRAM diagnostics.
/// @param modelPath      Model path forwarded to Engine<T>::buildLoadNetwork.
/// @param subVals        Forwarded to buildLoadNetwork.
/// @param divVals        Forwarded to buildLoadNetwork.
/// @param normalize      Forwarded to buildLoadNetwork.
/// @param maxSlotsPerGpu 0 = bypass the cache and build a private engine;
///                       1 = pooled, no demand-driven growth on HIT;
///                       any other value = pooled, growth attempted on HIT.
/// @return The engine (shared on the pooled path), or nullptr if every build
///         attempt failed.
///
/// Locking: the bypass path never takes m_mutex. The pooled path holds m_mutex
/// for the whole call, including buildLoadNetwork, except (a) just before the
/// detached growth thread is spawned on a HIT and (b) during the 3 s sleep of
/// the delayed retry.
///
/// Note: return codes of cudaSetDevice / cudaMemGetInfo are ignored throughout;
/// on failure the logged VRAM figures stay at their zero initial values.
std::shared_ptr<Engine<T>> acquire(
const PoolKey& key,
const ANSCENTER::Options& options,
const std::string& modelPath,
const std::array<float, 3>& subVals,
const std::array<float, 3>& divVals,
bool normalize,
int maxSlotsPerGpu)
{
// Optimizer / temporary engines: maxSlotsPerGpu==0 means the caller
// only needs a lightweight, non-shared engine (e.g., OptimizeModelStr).
// Bypass the pool cache entirely:
// - Don't hold m_mutex (which blocks ALL other pool creation)
// - Don't cache the result (temporary engine is destroyed on release)
// - Use the simple 4-param buildLoadNetwork (no pool, no probe, no VRAM measurement)
// Note: maxSlotsPerGpu==1 is now the normal "1 slot per GPU" multi-GPU
// round-robin mode, so it goes through the pool path below.
if (maxSlotsPerGpu == 0) {
logEvent("[EnginePoolManager] BYPASS (maxSlots=0): " + key.modelPath
+ " — creating non-shared engine");
auto engine = std::make_shared<Engine<T>>(options);
bool ok = engine->buildLoadNetwork(modelPath, subVals, divVals, normalize);
return ok ? engine : nullptr;
}
// Pooled path: everything below runs under m_mutex unless explicitly unlocked.
std::unique_lock<std::mutex> lock(m_mutex);
auto it = m_pools.find(key);
if (it != m_pools.end()) {
// Cache HIT: add a reference and reset evictTime to the "not pending" value.
it->second.refcount++;
it->second.evictTime = TimePoint{}; // cancel pending eviction
int refs = it->second.refcount;
// Local shared_ptr copy: stays valid after the lock is released below.
auto engine = it->second.engine;
logEvent("[EnginePoolManager] HIT: " + key.modelPath
+ " refs=" + std::to_string(refs));
// Demand-driven growth: only in elastic mode (maxSlotsPerGpu <= 0
// or > 1). With maxSlotsPerGpu==1 (round-robin default), the pool
// already has the right number of slots (1 per GPU) — tasks queue
// when all slots are busy, which is the intended behavior.
// (maxSlotsPerGpu == 0 never reaches this point; it returned above.)
if (maxSlotsPerGpu != 1 && refs > 1 && engine) {
int alive = engine->getTotalCapacity();
if (alive < refs) {
// Check total GPU VRAM — skip growth on small GPUs
size_t totalVram = 0;
{
size_t freeTmp = 0;
cudaSetDevice(options.deviceIndex);
cudaMemGetInfo(&freeTmp, &totalVram);
}
constexpr size_t kMinVramForGrowth = 6ULL * 1024 * 1024 * 1024; // 6 GB
if (totalVram >= kMinVramForGrowth) {
lock.unlock(); // release PoolManager lock before growing
// Detached worker: captures `engine` by value, so the shared_ptr copy
// keeps the Engine alive for the duration of growPool().
// NOTE(review): logEngineEvent is not declared in this header —
// presumably it comes from engine.h; confirm.
std::thread([engine, alive, refs, modelPath = key.modelPath]() {
int created = engine->growPool(1);
if (created > 0) {
logEngineEvent("[EnginePoolManager] DEMAND GROWTH: " + modelPath
+ " grew from " + std::to_string(alive)
+ " to " + std::to_string(engine->getTotalCapacity())
+ " slots (refs=" + std::to_string(refs) + ")");
}
}).detach();
} else {
logEvent("[EnginePoolManager] SKIP GROWTH: " + key.modelPath
+ " (GPU VRAM " + std::to_string(totalVram >> 20)
+ " MiB < 6 GB threshold, refs=" + std::to_string(refs) + ")");
}
}
}
return engine;
}
// Cache miss — create new Engine pool
logEvent("[EnginePoolManager] MISS: Creating pool for " + key.modelPath + "...");
// Log VRAM before attempting to create probe
{
size_t freeMem = 0, totalMem = 0;
cudaSetDevice(options.deviceIndex);
cudaMemGetInfo(&freeMem, &totalMem);
logEvent("[EnginePoolManager] GPU[" + std::to_string(options.deviceIndex)
+ "] VRAM: " + std::to_string(freeMem >> 20) + " MiB free / "
+ std::to_string(totalMem >> 20) + " MiB total (before probe)");
}
// Attempt 1: 5-argument buildLoadNetwork (pool sized by maxSlotsPerGpu).
auto engine = std::make_shared<Engine<T>>(options);
bool ok = engine->buildLoadNetwork(modelPath, subVals, divVals, normalize, maxSlotsPerGpu);
if (!ok) {
// Step 1: Force-evict all pools with refcount=0 to reclaim VRAM
int evicted = forceEvictPending();
// The same build is retried only if something was actually evicted.
if (evicted > 0) {
size_t freeMem2 = 0, totalMem2 = 0;
cudaSetDevice(options.deviceIndex);
cudaMemGetInfo(&freeMem2, &totalMem2);
logEvent("[EnginePoolManager] RETRY EVICT: Force-evicted " + std::to_string(evicted)
+ " pending pool(s), now " + std::to_string(freeMem2 >> 20)
+ " MiB free. Retrying " + key.modelPath + "...");
engine = std::make_shared<Engine<T>>(options);
ok = engine->buildLoadNetwork(modelPath, subVals, divVals, normalize, maxSlotsPerGpu);
}
// Step 2: If still failing, retry with lightweight mode (no elastic pool).
// The elastic probe does heavy warmup (batch 1-8, 10+ iterations) which
// consumes ~300-500 MB vs ~50-100 MB for a simple loadNetwork.
// Lightweight mode: tasks queue for a single shared slot — slower but works.
if (!ok) {
size_t freeMem3 = 0, totalMem3 = 0;
cudaSetDevice(options.deviceIndex);
cudaMemGetInfo(&freeMem3, &totalMem3);
logEvent("[EnginePoolManager] RETRY LIGHTWEIGHT: Elastic probe failed, "
+ std::to_string(freeMem3 >> 20) + " MiB free. "
"Retrying with single-slot mode for " + key.modelPath + "...");
engine = std::make_shared<Engine<T>>(options);
ok = engine->buildLoadNetwork(modelPath, subVals, divVals, normalize);
}
// Step 3: If still failing, wait briefly and retry.
// Transient failures can occur when:
// - TRT engine file is being written by another build (partial file)
// - CUDA driver has temporary resource contention during multi-pool startup
// - GPU memory fragmentation resolves after previous allocations settle
// Evidence: FireSmoke/detector.onnx failed at 3740 MiB free, then
// succeeded 4 seconds later at 3154 MiB free (less VRAM!).
if (!ok) {
size_t freeMem4 = 0, totalMem4 = 0;
cudaSetDevice(options.deviceIndex);
cudaMemGetInfo(&freeMem4, &totalMem4);
logEvent("[EnginePoolManager] RETRY DELAYED: All attempts failed with "
+ std::to_string(freeMem4 >> 20) + " MiB free. "
"Waiting 3s before final retry for " + key.modelPath + "...");
// Release mutex during sleep so other tasks can proceed
// (they may complete pool creation that resolves our issue)
lock.unlock();
std::this_thread::sleep_for(std::chrono::seconds(3));
lock.lock();
// Check if another thread created this pool while we slept
// (m_pools may have changed while unlocked, so the lookup is repeated).
auto it2 = m_pools.find(key);
if (it2 != m_pools.end()) {
it2->second.refcount++;
it2->second.evictTime = TimePoint{};
logEvent("[EnginePoolManager] HIT (after delay): " + key.modelPath
+ " refs=" + std::to_string(it2->second.refcount));
// Unlike the primary HIT path, no demand-driven growth is attempted here.
return it2->second.engine;
}
// Final retry — try lightweight again after delay
cudaSetDevice(options.deviceIndex);
cudaMemGetInfo(&freeMem4, &totalMem4);
logEvent("[EnginePoolManager] RETRY FINAL: " + std::to_string(freeMem4 >> 20)
+ " MiB free. Last attempt for " + key.modelPath + "...");
engine = std::make_shared<Engine<T>>(options);
ok = engine->buildLoadNetwork(modelPath, subVals, divVals, normalize);
}
if (!ok) {
// Every attempt failed: report on the error stream and give up.
size_t freeMem = 0, totalMem = 0;
cudaMemGetInfo(&freeMem, &totalMem);
logEvent("[EnginePoolManager] FAILED: Could not load engine for "
+ key.modelPath + " | GPU[" + std::to_string(options.deviceIndex)
+ "] VRAM: " + std::to_string(freeMem >> 20) + " MiB free / "
+ std::to_string(totalMem >> 20) + " MiB total"
+ " (after 4 attempts: elastic, evict, lightweight, delayed)", true);
return nullptr;
}
}
// Success: publish the pool with a single reference (m_mutex is held here).
PoolEntry entry;
entry.engine = engine;
entry.refcount = 1;
m_pools.emplace(key, std::move(entry));
// Start the lazy-eviction sweeper if not already running
startSweeperIfNeeded();
logEvent("[EnginePoolManager] CREATED: " + key.modelPath + " refs=1");
return engine;
}
// ========================================================================
// release() — decrement refcount for a shared pool.
//
// When refcount reaches 0, the pool is NOT immediately evicted.
// Instead, it is marked for lazy eviction after kEvictGraceSec.
// This handles the LabView edit cycle (destroy → recreate within
// seconds) without rebuilding the engine from scratch.
// ========================================================================
void release(const PoolKey& key) {
    std::lock_guard<std::mutex> guard(m_mutex);
    const auto found = m_pools.find(key);
    // Unknown key, or a pool already at zero references: nothing to release.
    if (found == m_pools.end() || found->second.refcount <= 0) {
        return;
    }
    auto& pool = found->second;
    --pool.refcount;
    logEvent("[EnginePoolManager] RELEASE: " + key.modelPath
             + " refs=" + std::to_string(pool.refcount));
    if (pool.refcount > 0) {
        return;
    }
    // Last reference dropped: keep the pool cached and only stamp the
    // deadline after which the sweeper is allowed to remove it.
    pool.evictTime = Clock::now() + std::chrono::seconds(kEvictGraceSec);
    logEvent("[EnginePoolManager] PENDING EVICT: " + key.modelPath
             + " (will evict in " + std::to_string(kEvictGraceSec) + "s if not re-acquired)");
}
/// Clear all cached pools (call during DLL_PROCESS_DETACH).
void clearAll() {
    std::unique_lock<std::mutex> guard(m_mutex);
    const auto poolCount = m_pools.size();
    logEvent("[EnginePoolManager] CLEAR ALL (" + std::to_string(poolCount) + " pools)");
    // Dropping the entries releases this manager's shared_ptr to each engine.
    m_pools.clear();
    guard.unlock();
    // The sweeper flag is cleared only after the mutex has been released.
    stopSweeper();
}
/// Number of cached pools (for diagnostics).
size_t size() const {
    // Snapshot taken under the mutex; it may be stale once the lock is released.
    std::lock_guard<std::mutex> guard(m_mutex);
    const size_t poolCount = m_pools.size();
    return poolCount;
}
private:
/// Private constructor: instances are only created through instance().
EnginePoolManager() {
    // CRITICAL ordering dependency. Function-local statics are destroyed in
    // the reverse of their construction order, so forcing TRTEngineCache to
    // be constructed here, before this object finishes constructing, means
    // it is destroyed after this object. Otherwise, during ExitProcess, the
    // cache could be torn down first and ~Engine calling
    // TRTEngineCache::release() would touch a destroyed unordered_map
    // (static destruction order fiasco).
    static_cast<void>(TRTEngineCache::instance());
}
/// Tears down the pool cache. Behaviour depends on whether the process is
/// exiting (g_processExiting() is true) or the library is being unloaded
/// normally.
~EnginePoolManager() {
    if (g_processExiting().load(std::memory_order_relaxed)) {
        // ExitProcess path: worker threads are dead, CUDA/TRT state is
        // unreliable. Don't destroy Engine objects (their destructors
        // call cudaFree, thread::join, etc. which deadlock or crash).
        // The OS reclaims all memory, VRAM, and handles at process exit.
        m_sweeperRunning.store(false);
        // Returning from the destructor body is NOT enough to skip Engine
        // destruction: implicit member destruction still runs afterwards and
        // would destroy m_pools, releasing every cached shared_ptr. Swap the
        // entries into a heap-allocated map that is intentionally never
        // deleted, leaving m_pools empty for its own destructor.
        // m_mutex is deliberately not taken: a terminated thread may have
        // died while holding it.
        try {
            auto* leaked = new decltype(m_pools)();
            leaked->swap(m_pools);
        } catch (...) {
            // Allocation failed; fall back to ordinary member destruction.
        }
        return;
    }
    // Normal FreeLibrary path: threads are alive, safe to clean up.
    // Explicitly clear pools before implicit member destruction.
    // This destroys Engine<T> objects (which call TRTEngineCache::release())
    // while we still hold m_mutex and can log diagnostics.
    try {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_pools.clear();
    } catch (...) {
        // Destructors must not throw; a failed lock leaves cleanup to the
        // implicit member destructors.
    }
    stopSweeper();
}
EnginePoolManager(const EnginePoolManager&) = delete;
EnginePoolManager& operator=(const EnginePoolManager&) = delete;
// Grace period before evicting a pool with refcount=0.
// Covers LabView edit/duplicate/create cycles (destroy → recreate).
static constexpr int kEvictGraceSec = 120; // 2 minutes
// Sweeper interval — how often to check for expired pools.
static constexpr int kSweeperIntervalSec = 30;
using Clock = std::chrono::steady_clock;
using TimePoint = std::chrono::time_point<Clock>;
// Log to stdout/stderr only — no Windows Event Viewer.
// Event Viewer logging is handled by logEngineEvent() in engine.h for
// critical engine-level errors. EnginePoolManager messages are
// informational (HIT/MISS/EVICT) and don't need Event Viewer entries.
static void logEvent(const std::string& msg, bool isError = false) {
    // Errors go to stderr, everything else to stdout; std::endl flushes
    // after every message.
    std::ostream& sink = isError ? std::cerr : std::cout;
    sink << msg << std::endl;
}
/// One cached pool: the shared engine, how many callers currently hold it,
/// and the eviction deadline.
struct PoolEntry {
    std::shared_ptr<Engine<T>> engine;
    int refcount{0};
    // Default-constructed TimePoint means "no eviction pending".
    TimePoint evictTime = TimePoint{};
};
// ========================================================================
// Sweeper thread — periodically checks for pools whose eviction
// grace period has expired and removes them.
// ========================================================================
void startSweeperIfNeeded() {
    // Expected to be called with m_mutex held. exchange() returns the old
    // flag value, so only a false -> true transition launches a thread.
    if (m_sweeperRunning.exchange(true)) {
        return;
    }
    // NOTE(review): the thread is detached and captures `this`; after the
    // flag is cleared it can still be asleep for up to kSweeperIntervalSec
    // before it notices. Confirm the manager (and the module's code) outlive
    // that window, and that a stop followed quickly by a restart cannot leave
    // two sweepers running.
    m_sweeperThread = std::thread([this]() {
        while (m_sweeperRunning.load()) {
            std::this_thread::sleep_for(std::chrono::seconds(kSweeperIntervalSec));
            if (m_sweeperRunning.load()) {
                sweepExpired();
            } else {
                break;
            }
        }
    });
    m_sweeperThread.detach();
}
/// Asks the sweeper loop to exit by clearing its run flag. Does not wait for
/// the thread; the loop only re-reads the flag around its sleep.
void stopSweeper() {
    m_sweeperRunning = false;
}
// Force-evict ALL pools with refcount=0 (regardless of grace period).
// Called when a new pool creation fails due to low VRAM.
// Returns number of pools evicted.
// MUST be called under m_mutex.
int forceEvictPending() {
    int removed = 0;
    auto cur = m_pools.begin();
    while (cur != m_pools.end()) {
        // Pools that still have holders are left untouched.
        if (cur->second.refcount > 0) {
            ++cur;
            continue;
        }
        logEvent("[EnginePoolManager] FORCE EVICT (VRAM recovery): " + cur->first.modelPath);
        // erase() hands back the next valid iterator.
        cur = m_pools.erase(cur);
        ++removed;
    }
    return removed;
}
void sweepExpired() {
std::lock_guard<std::mutex> lock(m_mutex);
auto now = Clock::now();
for (auto it = m_pools.begin(); it != m_pools.end(); ) {
auto& entry = it->second;
// Only evict if refcount is 0 AND evictTime has passed
if (entry.refcount <= 0
&& entry.evictTime != TimePoint{}
&& now >= entry.evictTime)
{
logEvent("[EnginePoolManager] EVICT (expired): " + it->first.modelPath);
it = m_pools.erase(it);
} else {
++it;
}
}
}
std::unordered_map<PoolKey, PoolEntry, PoolKeyHash> m_pools;
mutable std::mutex m_mutex;
std::atomic<bool> m_sweeperRunning{false};
std::thread m_sweeperThread;
};