Ensure the threadsafe for video upload
This commit is contained in:
@@ -2114,24 +2114,6 @@ namespace ANSCENTER
|
||||
constexpr const char* kX264WriterOpts =
|
||||
"video_codec;libx264|crf;26|preset;slow|tune;stillimage|movflags;+faststart";
|
||||
|
||||
std::string prevWriterOpts;
|
||||
bool hadPrevWriterOpts = false;
|
||||
if (const char* prev = std::getenv(kWriterOptsEnv)) {
|
||||
prevWriterOpts = prev;
|
||||
hadPrevWriterOpts = true;
|
||||
}
|
||||
|
||||
auto setX264Opts = [&]() {
|
||||
_putenv_s(kWriterOptsEnv, kX264WriterOpts);
|
||||
};
|
||||
auto restoreOpts = [&]() {
|
||||
if (hadPrevWriterOpts) {
|
||||
_putenv_s(kWriterOptsEnv, prevWriterOpts.c_str());
|
||||
} else {
|
||||
_putenv_s(kWriterOptsEnv, "");
|
||||
}
|
||||
};
|
||||
|
||||
// Try codecs in order of preference.
|
||||
// avc1 / H264 / x264 route to libx264 via FFmpeg when forced via the
|
||||
// video_codec option above. MP4V and MJPG are last-resort fallbacks —
|
||||
@@ -2146,34 +2128,68 @@ namespace ANSCENTER
|
||||
|
||||
bool codecFound = false;
|
||||
std::string usedCodec;
|
||||
for (const auto& [name, fourcc] : codecs) {
|
||||
const bool isH264Family =
|
||||
(name == "avc1" || name == "H264" || name == "x264");
|
||||
|
||||
if (isH264Family) {
|
||||
setX264Opts();
|
||||
} else {
|
||||
restoreOpts();
|
||||
// ──────────────────────────────────────────────────────────────
|
||||
// PARALLEL-SAFETY: OPENCV_FFMPEG_WRITER_OPTIONS is process-wide.
|
||||
// Two workers racing through this block would clobber each
|
||||
// other's env var between setX264Opts() and VideoWriter::open(),
|
||||
// silently mis-tuning or mis-picking a codec for one of them.
|
||||
// Serialize the env-var + open() window with a function-local
|
||||
// static mutex. The frame-write loop after this block is fully
|
||||
// parallel because `videoWriter` is self-contained once opened.
|
||||
// ──────────────────────────────────────────────────────────────
|
||||
{
|
||||
static std::mutex g_writerOptsMutex;
|
||||
std::lock_guard<std::mutex> optsLock(g_writerOptsMutex);
|
||||
|
||||
std::string prevWriterOpts;
|
||||
bool hadPrevWriterOpts = false;
|
||||
if (const char* prev = std::getenv(kWriterOptsEnv)) {
|
||||
prevWriterOpts = prev;
|
||||
hadPrevWriterOpts = true;
|
||||
}
|
||||
|
||||
videoWriter.open(mp4OutputPath, fourcc, fps,
|
||||
cv::Size(videoWidth, videoHeight), true);
|
||||
auto setX264Opts = [&]() {
|
||||
_putenv_s(kWriterOptsEnv, kX264WriterOpts);
|
||||
};
|
||||
auto restoreOpts = [&]() {
|
||||
if (hadPrevWriterOpts) {
|
||||
_putenv_s(kWriterOptsEnv, prevWriterOpts.c_str());
|
||||
} else {
|
||||
_putenv_s(kWriterOptsEnv, "");
|
||||
}
|
||||
};
|
||||
|
||||
if (videoWriter.isOpened()) {
|
||||
std::cout << "Using codec: " << name
|
||||
<< (isH264Family ? " (libx264 forced, crf=26, preset=slow, tune=stillimage)" : "")
|
||||
<< std::endl;
|
||||
usedCodec = name;
|
||||
codecFound = true;
|
||||
break;
|
||||
for (const auto& [name, fourcc] : codecs) {
|
||||
const bool isH264Family =
|
||||
(name == "avc1" || name == "H264" || name == "x264");
|
||||
|
||||
if (isH264Family) {
|
||||
setX264Opts();
|
||||
} else {
|
||||
restoreOpts();
|
||||
}
|
||||
|
||||
videoWriter.open(mp4OutputPath, fourcc, fps,
|
||||
cv::Size(videoWidth, videoHeight), true);
|
||||
|
||||
if (videoWriter.isOpened()) {
|
||||
std::cout << "Using codec: " << name
|
||||
<< (isH264Family ? " (libx264 forced, crf=26, preset=slow, tune=stillimage)" : "")
|
||||
<< std::endl;
|
||||
usedCodec = name;
|
||||
codecFound = true;
|
||||
break;
|
||||
}
|
||||
videoWriter.release();
|
||||
}
|
||||
videoWriter.release();
|
||||
|
||||
// Always restore the env var before releasing the mutex
|
||||
// — don't leak the libx264 override into the rest of the
|
||||
// process.
|
||||
restoreOpts();
|
||||
}
|
||||
|
||||
// Always restore the env var after we're done — don't leak the
|
||||
// libx264 override into the rest of the process.
|
||||
restoreOpts();
|
||||
|
||||
if (!codecFound) {
|
||||
std::cerr << "Error: Could not open video writer with any codec!" << std::endl;
|
||||
return false;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -17,6 +17,7 @@
|
||||
#include <opencv2/opencv.hpp>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
|
||||
namespace ANSCENTER {
|
||||
class ANSULT_API ANSUtilities {
|
||||
@@ -155,7 +156,9 @@ namespace ANSCENTER {
|
||||
private:
|
||||
bool _isUnlockCodeValid{ false };
|
||||
bool _isLicenseValid{ false };
|
||||
bool _bConnected{ false };
|
||||
// Atomic because upload threads read _bConnected without a lock
|
||||
// while Connect() / RetryLoop() can write it.
|
||||
std::atomic<bool> _bConnected{ false };
|
||||
bool _bAwsPath{ true }; // true = virtual-hosted (AWS), false = path-style (MinIO)
|
||||
std::string _licenseKey;
|
||||
std::string _unlockCode;
|
||||
@@ -178,6 +181,13 @@ namespace ANSCENTER {
|
||||
std::string _secretKey;
|
||||
bool _authReady{ false };
|
||||
|
||||
// Request timeouts applied to every new S3Connection.
|
||||
// _connectTimeoutMs : max time to wait for TCP/TLS connect.
|
||||
// _idleTimeoutMs : max gap between bytes on a send/recv.
|
||||
// Atomic so SetTimeouts can update them without holding _configMutex.
|
||||
std::atomic<int> _connectTimeoutMs{ 10000 }; // 10 s
|
||||
std::atomic<int> _idleTimeoutMs { 20000 }; // 20 s
|
||||
|
||||
// Proxy settings
|
||||
std::string _proxyHost;
|
||||
int _proxyPort{ 0 };
|
||||
@@ -194,6 +204,21 @@ namespace ANSCENTER {
|
||||
std::string ExtractFileName(const std::string& filePath);
|
||||
std::string GetContentType(const std::string& filePath);
|
||||
|
||||
// ---- Retry helper for transient / network-stability failures ----
|
||||
// Each upload function delegates its per-attempt body to a lambda.
|
||||
// The lambda returns one of these outcomes:
|
||||
// Success -> stop, UploadWithRetry returns true
|
||||
// Transient -> retry (up to kUploadMaxAttempts total)
|
||||
// Permanent -> stop, UploadWithRetry returns false
|
||||
// The lambda should write a short human-readable reason into
|
||||
// `lastError` on any non-Success outcome so it can be logged.
|
||||
enum class AttemptResult { Success, Transient, Permanent };
|
||||
static constexpr int kUploadMaxAttempts = 20;
|
||||
static constexpr int kUploadRetryDelayMs = 100;
|
||||
bool UploadWithRetry(
|
||||
const std::string& opName,
|
||||
const std::function<AttemptResult(std::string& lastError)>& attemptFn);
|
||||
|
||||
// Background retry
|
||||
std::thread _retryThread;
|
||||
std::atomic<bool> _stopRetry{ false };
|
||||
@@ -212,6 +237,24 @@ namespace ANSCENTER {
|
||||
// Returns: 1 = connected, 0 = failed (bad auth/URL), 2 = no internet (background retry started)
|
||||
[[nodiscard]] int Connect(const std::string& baseDomain, const std::string& bucketRegion, const std::string& serviceName, int port, bool bTls, bool autoReconnect, bool& awsPath);
|
||||
[[nodiscard]] bool SetAuthentication(const std::string& accessKey, const std::string& secretKey);
|
||||
|
||||
// Pre-populate the connection pool with `count` ready-to-use
|
||||
// S3Connections. Creating a connection performs a TLS handshake
|
||||
// inside CreateConnection(); doing it up-front in parallel keeps
|
||||
// the first `count` concurrent uploads from serializing on handshake.
|
||||
// Safe to call after Connect() + SetAuthentication() have succeeded.
|
||||
// Returns the number of connections actually added to the pool
|
||||
// (may be less than `count` on transient network issues).
|
||||
int PrewarmConnectionPool(int count);
|
||||
|
||||
// Update request timeouts used by every new connection, and
|
||||
// propagate them to all already-pooled connections so the new
|
||||
// values take effect on the very next request.
|
||||
// connectMs: max time to wait for TCP/TLS connect (>= 1000 ms)
|
||||
// idleMs : max gap between bytes on send/recv (>= 1000 ms)
|
||||
// Returns false on invalid input (values are left unchanged).
|
||||
bool SetTimeouts(int connectMs, int idleMs);
|
||||
|
||||
[[nodiscard]] std::vector<std::string> ListBuckets();
|
||||
[[nodiscard]] std::vector<std::string> ListBucketObjects(const std::string& bucketName);
|
||||
[[nodiscard]] std::vector<std::string> ListBucketObjectsWithPrefix(const std::string& bucketName, const std::string& prefix);
|
||||
|
||||
Reference in New Issue
Block a user