From a63feab0fff4fee77a1909d68ec60b7cc3aabd2a Mon Sep 17 00:00:00 2001 From: Tuan Nghia Nguyen Date: Fri, 17 Apr 2026 07:03:03 +1000 Subject: [PATCH] Ensure the threadsafe for video upload --- modules/ANSCV/ANSOpenCV.cpp | 94 +- modules/ANSUtilities/ANSAWSS3.cpp | 2132 +++++++++++++-------------- modules/ANSUtilities/ANSUtilities.h | 45 +- 3 files changed, 1117 insertions(+), 1154 deletions(-) diff --git a/modules/ANSCV/ANSOpenCV.cpp b/modules/ANSCV/ANSOpenCV.cpp index edd2eb1..612729b 100644 --- a/modules/ANSCV/ANSOpenCV.cpp +++ b/modules/ANSCV/ANSOpenCV.cpp @@ -2114,24 +2114,6 @@ namespace ANSCENTER constexpr const char* kX264WriterOpts = "video_codec;libx264|crf;26|preset;slow|tune;stillimage|movflags;+faststart"; - std::string prevWriterOpts; - bool hadPrevWriterOpts = false; - if (const char* prev = std::getenv(kWriterOptsEnv)) { - prevWriterOpts = prev; - hadPrevWriterOpts = true; - } - - auto setX264Opts = [&]() { - _putenv_s(kWriterOptsEnv, kX264WriterOpts); - }; - auto restoreOpts = [&]() { - if (hadPrevWriterOpts) { - _putenv_s(kWriterOptsEnv, prevWriterOpts.c_str()); - } else { - _putenv_s(kWriterOptsEnv, ""); - } - }; - // Try codecs in order of preference. // avc1 / H264 / x264 route to libx264 via FFmpeg when forced via the // video_codec option above. MP4V and MJPG are last-resort fallbacks — @@ -2146,34 +2128,68 @@ namespace ANSCENTER bool codecFound = false; std::string usedCodec; - for (const auto& [name, fourcc] : codecs) { - const bool isH264Family = - (name == "avc1" || name == "H264" || name == "x264"); - if (isH264Family) { - setX264Opts(); - } else { - restoreOpts(); + // ────────────────────────────────────────────────────────────── + // PARALLEL-SAFETY: OPENCV_FFMPEG_WRITER_OPTIONS is process-wide. + // Two workers racing through this block would clobber each + // other's env var between setX264Opts() and VideoWriter::open(), + // silently mis-tuning or mis-picking a codec for one of them. + // Serialize the env-var + open() window with a function-local + // static mutex. The frame-write loop after this block is fully + // parallel because `videoWriter` is self-contained once opened. + // ────────────────────────────────────────────────────────────── + { + static std::mutex g_writerOptsMutex; + std::lock_guard optsLock(g_writerOptsMutex); + + std::string prevWriterOpts; + bool hadPrevWriterOpts = false; + if (const char* prev = std::getenv(kWriterOptsEnv)) { + prevWriterOpts = prev; + hadPrevWriterOpts = true; } - videoWriter.open(mp4OutputPath, fourcc, fps, - cv::Size(videoWidth, videoHeight), true); + auto setX264Opts = [&]() { + _putenv_s(kWriterOptsEnv, kX264WriterOpts); + }; + auto restoreOpts = [&]() { + if (hadPrevWriterOpts) { + _putenv_s(kWriterOptsEnv, prevWriterOpts.c_str()); + } else { + _putenv_s(kWriterOptsEnv, ""); + } + }; - if (videoWriter.isOpened()) { - std::cout << "Using codec: " << name - << (isH264Family ? " (libx264 forced, crf=26, preset=slow, tune=stillimage)" : "") - << std::endl; - usedCodec = name; - codecFound = true; - break; + for (const auto& [name, fourcc] : codecs) { + const bool isH264Family = + (name == "avc1" || name == "H264" || name == "x264"); + + if (isH264Family) { + setX264Opts(); + } else { + restoreOpts(); + } + + videoWriter.open(mp4OutputPath, fourcc, fps, + cv::Size(videoWidth, videoHeight), true); + + if (videoWriter.isOpened()) { + std::cout << "Using codec: " << name + << (isH264Family ? " (libx264 forced, crf=26, preset=slow, tune=stillimage)" : "") + << std::endl; + usedCodec = name; + codecFound = true; + break; + } + videoWriter.release(); } - videoWriter.release(); + + // Always restore the env var before releasing the mutex + // — don't leak the libx264 override into the rest of the + // process. + restoreOpts(); } - // Always restore the env var after we're done — don't leak the - // libx264 override into the rest of the process. - restoreOpts(); - if (!codecFound) { std::cerr << "Error: Could not open video writer with any codec!" << std::endl; return false; diff --git a/modules/ANSUtilities/ANSAWSS3.cpp b/modules/ANSUtilities/ANSAWSS3.cpp index d8c7ff0..93566b4 100644 --- a/modules/ANSUtilities/ANSAWSS3.cpp +++ b/modules/ANSUtilities/ANSAWSS3.cpp @@ -12,6 +12,10 @@ #include #include #include +#include +#include +#include +#include static bool ansawss3LicenceValid = false; static std::once_flag ansawss3LicenseOnceFlag; @@ -43,8 +47,23 @@ namespace ANSCENTER } } - // Private helper function to extract file name from a path - // Helper function to extract filename from path + // Builds a per-call unique suffix for scratch file names used by + // multipart uploads. Combining thread id, instance address, and a + // monotonically-increasing counter guarantees no collision between + // parallel uploads — including two threads uploading the same source + // file at the same time (Fix 2). + static std::string MakeUniqueMultipartToken(const void* instancePtr) { + static std::atomic counter{0}; + std::ostringstream oss; + oss << std::hex + << std::hash{}(std::this_thread::get_id()) << "_" + << reinterpret_cast(instancePtr) << "_" + << counter.fetch_add(1, std::memory_order_relaxed); + return oss.str(); + } + + // Private helper function to extract file name from a path + // Helper function to extract filename from path std::string ANSAWSS3::ExtractFileName(const std::string& filePath) { size_t pos = filePath.find_last_of("/\\"); if (pos != std::string::npos) { @@ -222,6 +241,12 @@ namespace ANSCENTER std::unique_ptr ANSAWSS3::CreateConnection() { auto conn = std::make_unique(); + // Apply request timeouts BEFORE Connect() so ConnectTimeoutMs takes + // effect on this very handshake. IdleTimeoutMs is sticky and applies + // to every subsequent request on this CkRest. + conn->rest.put_ConnectTimeoutMs(_connectTimeoutMs.load()); + conn->rest.put_IdleTimeoutMs(_idleTimeoutMs.load()); + // Connect if (!conn->rest.Connect(_fullAWSURL.c_str(), _port, _bTls, _bAutoReconnect)) { _logger.LogError("ANSAWSS3::CreateConnection", conn->rest.lastErrorText(), __FILE__, __LINE__); @@ -278,6 +303,148 @@ namespace ANSCENTER _pool.push_back(std::move(conn)); } + // Pre-populate the pool with `count` ready-to-use connections. Each + // connection performs a TLS handshake; doing `count` of them in parallel + // (rather than serially as AcquireConnection would on a cold pool) cuts + // the first-upload latency for workloads that fan out right after + // Connect() / SetAuthentication(). + int ANSAWSS3::PrewarmConnectionPool(int count) { + if (count <= 0) return 0; + if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { + _logger.LogError("ANSAWSS3::PrewarmConnectionPool", + "Not ready: call Connect() and SetAuthentication() first", + __FILE__, __LINE__); + return 0; + } + + // Hold _configMutex once across the whole prewarm. It blocks config + // writes for the duration, but does NOT block individual worker + // threads since CreateConnection() itself does not re-acquire it. + std::lock_guard cfgLk(_configMutex); + + std::vector> built(count); + std::vector workers; + workers.reserve(count); + for (int i = 0; i < count; ++i) { + workers.emplace_back([this, i, &built]() { + try { + built[i] = CreateConnection(); + } catch (...) { + built[i] = nullptr; + } + }); + } + for (auto& t : workers) t.join(); + + int added = 0; + { + std::lock_guard lk(_poolMutex); + for (auto& conn : built) { + if (conn) { + _pool.push_back(std::move(conn)); + ++added; + } + } + } + _logger.LogDebug("ANSAWSS3::PrewarmConnectionPool", + "Added " + std::to_string(added) + "/" + std::to_string(count) + " connections to pool", + __FILE__, __LINE__); + return added; + } + + // Update the per-connection request timeouts. New values are applied + // immediately to all pooled connections (IdleTimeoutMs is picked up on + // the next request; ConnectTimeoutMs is harmless on already-connected + // CkRest instances but takes effect if they ever reconnect) and baked + // into the defaults used by future CreateConnection() calls. + bool ANSAWSS3::SetTimeouts(int connectMs, int idleMs) { + if (connectMs < 1000 || idleMs < 1000) { + _logger.LogError("ANSAWSS3::SetTimeouts", + "Timeout values must be >= 1000 ms (got connect=" + + std::to_string(connectMs) + ", idle=" + std::to_string(idleMs) + ")", + __FILE__, __LINE__); + return false; + } + + _connectTimeoutMs.store(connectMs); + _idleTimeoutMs.store(idleMs); + + // Propagate to existing pooled connections so already-warm workers + // start using the new values on their next request. + { + std::lock_guard lk(_poolMutex); + for (auto& conn : _pool) { + if (conn) { + conn->rest.put_ConnectTimeoutMs(connectMs); + conn->rest.put_IdleTimeoutMs(idleMs); + } + } + } + + _logger.LogDebug("ANSAWSS3::SetTimeouts", + "connect=" + std::to_string(connectMs) + "ms, idle=" + std::to_string(idleMs) + "ms", + __FILE__, __LINE__); + return true; + } + + // Drives a retry loop for a single upload operation. See the declaration + // comment in ANSUtilities.h for the AttemptResult contract. + // + // Retry policy (shared across all upload entry points): + // - kUploadMaxAttempts attempts total + // - kUploadRetryDelayMs between attempts (no delay after the last one) + // - std::exception thrown inside attemptFn is treated as Transient + // - Any other exception is treated as Permanent (bail immediately) + bool ANSAWSS3::UploadWithRetry( + const std::string& opName, + const std::function& attemptFn) { + + std::string lastError; + + for (int attempt = 1; attempt <= kUploadMaxAttempts; ++attempt) { + AttemptResult result = AttemptResult::Transient; + try { + result = attemptFn(lastError); + } + catch (const std::exception& e) { + lastError = std::string("Exception: ") + e.what(); + result = AttemptResult::Transient; + } + catch (...) { + lastError = "Unknown exception"; + result = AttemptResult::Permanent; + } + + if (result == AttemptResult::Success) { + return true; + } + + if (result == AttemptResult::Permanent) { + _logger.LogError(opName, + "Permanent failure (no retry): " + lastError, + __FILE__, __LINE__); + return false; + } + + // Transient — retry if attempts remain. + if (attempt < kUploadMaxAttempts) { + _logger.LogDebug(opName, + "Attempt " + std::to_string(attempt) + "/" + + std::to_string(kUploadMaxAttempts) + " failed: " + lastError + + " — retrying in " + std::to_string(kUploadRetryDelayMs) + "ms", + __FILE__, __LINE__); + std::this_thread::sleep_for( + std::chrono::milliseconds(kUploadRetryDelayMs)); + } + } + + _logger.LogError(opName, + "Upload failed after " + std::to_string(kUploadMaxAttempts) + + " attempts. Last error: " + lastError, + __FILE__, __LINE__); + return false; + } + void ANSAWSS3::CheckLicense() { // Note: caller (Initialize) already holds _configMutex try { @@ -438,24 +605,37 @@ namespace ANSCENTER __FILE__, __LINE__); return false; } - std::lock_guard lock(_configMutex); - try { - _accessKey = accessKey; - _secretKey = secretKey; - _authReady = true; - // Clear pool so new connections pick up new credentials - { + // Write credentials + clear the pool under _configMutex. The lock is + // released before the auto-prewarm below so PrewarmConnectionPool + // (which also takes _configMutex) doesn't deadlock. + { + std::lock_guard lock(_configMutex); + try { + _accessKey = accessKey; + _secretKey = secretKey; + _authReady = true; + + // Clear pool so new connections pick up new credentials. std::lock_guard lk(_poolMutex); _pool.clear(); } + catch (const std::exception& e) { + _logger.LogFatal("ANSAWSS3::SetAuthentication", e.what(), __FILE__, __LINE__); + return false; + } + } - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::SetAuthentication", e.what(), __FILE__, __LINE__); - return false; - } + // Auto-prewarm the connection pool so the first N concurrent uploads + // don't serialize on TLS handshake. No-op if not connected yet. + // Callers wanting a different size can call PrewarmConnectionPool() + // directly at any time — this is just a sensible default that keeps + // existing LabVIEW callers (which only invoke SetAuthentication via + // the C wrapper) warm without any API change on their side. + static constexpr int kDefaultPrewarmCount = 8; + PrewarmConnectionPool(kDefaultPrewarmCount); + + return true; } std::vector ANSAWSS3::ListBuckets() { // Early validation checks @@ -1345,1268 +1525,992 @@ namespace ANSCENTER // Uploads text data from a file to the specified S3 bucket bool ANSAWSS3::UploadTextData(const std::string& bucketName, const std::string& textFilePath, std::string& uploadedFilePath) { - // Early validation checks + const std::string kOp = "ANSAWSS3::UploadTextData"; + + // Early validation (permanent — do NOT retry) if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadTextData", + _logger.LogError(kOp, !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", __FILE__, __LINE__); return false; } if (bucketName.empty() || textFilePath.empty()) { - _logger.LogError("ANSAWSS3::UploadTextData", - "Bucket name or file path is empty", - __FILE__, __LINE__); + _logger.LogError(kOp, "Bucket name or file path is empty", __FILE__, __LINE__); return false; } - try { - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadTextData", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; - } - - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); - } - - // Read text file - CkFileAccess fac; - const char* fileContents = fac.readEntireTextFile(textFilePath.c_str(), "utf-8"); - if (!fac.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadTextData - Failed to read file", - fac.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Validate file contents - if (fileContents == nullptr || fileContents[0] == '\0') { - _logger.LogError("ANSAWSS3::UploadTextData", - "File is empty: " + textFilePath, - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Extract filename - std::string fileName = ExtractFileName(textFilePath); - if (fileName.empty()) { - _logger.LogError("ANSAWSS3::UploadTextData", - "Failed to extract filename from path: " + textFilePath, - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Determine content type - std::string contentType = GetContentType(textFilePath); - - // Set headers - conn->rest.AddHeader("Content-Type", contentType.c_str()); - conn->rest.AddHeader("Content-Encoding", "gzip"); - conn->rest.AddHeader("Expect", "100-continue"); - - // Construct S3 object path - std::string objectPath = _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - - _logger.LogDebug("ANSAWSS3::UploadTextData", - "Uploading file: " + fileName + " (" + contentType + ")", + // Filename and content type are attempt-independent. + const std::string fileName = ExtractFileName(textFilePath); + if (fileName.empty()) { + _logger.LogError(kOp, + "Failed to extract filename from path: " + textFilePath, __FILE__, __LINE__); + return false; + } + const std::string contentType = GetContentType(textFilePath); + const std::string objectPath = + _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - // Upload to S3 - const char* responseBodyStr = conn->rest.fullRequestString("PUT", objectPath.c_str(), fileContents); - - if (!conn->rest.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadTextData - Upload failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Check HTTP status code - int statusCode = conn->rest.get_ResponseStatusCode(); - if (statusCode != 200) { - std::string errorMsg = "HTTP " + std::to_string(statusCode); - if (responseBodyStr != nullptr && responseBodyStr[0] != '\0') { - errorMsg += " - " + std::string(responseBodyStr); + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; } - _logger.LogError("ANSAWSS3::UploadTextData", errorMsg, __FILE__, __LINE__); + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } + + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); + + // Read the file on each attempt (cheap; keeps the body fresh). + CkFileAccess fac; + const char* fileContents = fac.readEntireTextFile(textFilePath.c_str(), "utf-8"); + if (!fac.get_LastMethodSuccess()) { + // File read failure is permanent (disk/IO, not network). + lastError = std::string("Failed to read file: ") + fac.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Permanent; + } + if (fileContents == nullptr || fileContents[0] == '\0') { + lastError = "File is empty: " + textFilePath; + ReleaseConnection(std::move(conn)); + return AttemptResult::Permanent; + } + + conn->rest.AddHeader("Content-Type", contentType.c_str()); + conn->rest.AddHeader("Content-Encoding", "gzip"); + conn->rest.AddHeader("Expect", "100-continue"); + + const char* responseBodyStr = + conn->rest.fullRequestString("PUT", objectPath.c_str(), fileContents); + + if (!conn->rest.get_LastMethodSuccess()) { + lastError = conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + int statusCode = conn->rest.get_ResponseStatusCode(); + if (statusCode == 200) { + std::string scheme = _bTls ? "https://" : "http://"; + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; + _logger.LogDebug(kOp, + "Successfully uploaded: " + fileName + " | URL: " + uploadedFilePath, + __FILE__, __LINE__); + ReleaseConnection(std::move(conn)); + return AttemptResult::Success; + } + + lastError = "HTTP " + std::to_string(statusCode); + if (responseBodyStr != nullptr && responseBodyStr[0] != '\0') { + lastError += " - " + std::string(responseBodyStr); + } ReleaseConnection(std::move(conn)); - return false; - } - { - std::string scheme = _bTls ? "https://" : "http://"; - std::string uploadedUrl = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; - uploadedFilePath = uploadedUrl; - _logger.LogDebug("ANSAWSS3::UploadTextData", - "Successfully uploaded: " + fileName + " to bucket: " + bucketName + " | URL: " + uploadedUrl, - __FILE__, __LINE__); - } - - ReleaseConnection(std::move(conn)); - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadTextData", e.what(), __FILE__, __LINE__); - return false; - } + bool retryable = (statusCode >= 500 && statusCode <= 599) + || statusCode == 408 || statusCode == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + }); } bool ANSAWSS3::UploadBinaryData(const std::string& bucketName, const std::string& dataFilePath, std::string& uploadedFilePath) { - // Early validation checks + const std::string kOp = "ANSAWSS3::UploadBinaryData"; + + // Early validation (permanent — do NOT retry) if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadBinaryData", + _logger.LogError(kOp, !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", __FILE__, __LINE__); return false; } if (bucketName.empty() || dataFilePath.empty()) { - _logger.LogError("ANSAWSS3::UploadBinaryData", - "Bucket name or file path is empty", - __FILE__, __LINE__); + _logger.LogError(kOp, "Bucket name or file path is empty", __FILE__, __LINE__); return false; } - try { - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadBinaryData", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; - } - - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); - } - - // Load binary file - CkBinData binData; - if (!binData.LoadFile(dataFilePath.c_str())) { - _logger.LogError("ANSAWSS3::UploadBinaryData - Failed to load file", - binData.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Extract filename from path - std::string fileName = ExtractFileName(dataFilePath); - if (fileName.empty()) { - _logger.LogError("ANSAWSS3::UploadBinaryData", - "Failed to extract filename from path: " + dataFilePath, - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Determine content type from file extension - std::string contentType = GetContentType(dataFilePath); - - // Set headers - conn->rest.AddHeader("Content-Type", contentType.c_str()); - conn->rest.AddHeader("Expect", "100-continue"); - - // Construct S3 object path - std::string objectPath = _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - - _logger.LogDebug("ANSAWSS3::UploadBinaryData", - "Uploading file: " + fileName + " (" + contentType + ")", + const std::string fileName = ExtractFileName(dataFilePath); + if (fileName.empty()) { + _logger.LogError(kOp, + "Failed to extract filename from path: " + dataFilePath, __FILE__, __LINE__); + return false; + } + const std::string contentType = GetContentType(dataFilePath); + const std::string objectPath = + _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - // Upload to S3 - CkStringBuilder sbResponse; - if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { - _logger.LogError("ANSAWSS3::UploadBinaryData - Upload failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Check HTTP status code - int statusCode = conn->rest.get_ResponseStatusCode(); - if (statusCode != 200) { - std::string errorMsg = "HTTP " + std::to_string(statusCode); - std::string response = sbResponse.getAsString(); - if (!response.empty()) { - errorMsg += " - " + response; + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; } - _logger.LogError("ANSAWSS3::UploadBinaryData", errorMsg, __FILE__, __LINE__); + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } + + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); + + CkBinData binData; + if (!binData.LoadFile(dataFilePath.c_str())) { + lastError = std::string("Failed to load file: ") + binData.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Permanent; // disk/IO, not network + } + + conn->rest.AddHeader("Content-Type", contentType.c_str()); + conn->rest.AddHeader("Expect", "100-continue"); + + CkStringBuilder sbResponse; + if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { + lastError = conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + int statusCode = conn->rest.get_ResponseStatusCode(); + if (statusCode == 200) { + std::string scheme = _bTls ? "https://" : "http://"; + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; + _logger.LogDebug(kOp, + "Successfully uploaded: " + fileName + " | URL: " + uploadedFilePath, + __FILE__, __LINE__); + ReleaseConnection(std::move(conn)); + return AttemptResult::Success; + } + + lastError = "HTTP " + std::to_string(statusCode); + std::string body = sbResponse.getAsString(); + if (!body.empty()) lastError += " - " + body; ReleaseConnection(std::move(conn)); - return false; - } - { - std::string scheme = _bTls ? "https://" : "http://"; - std::string uploadedUrl = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; - uploadedFilePath = uploadedUrl; - _logger.LogDebug("ANSAWSS3::UploadBinaryData", - "Successfully uploaded: " + fileName + " to bucket: " + bucketName + " | URL: " + uploadedUrl, - __FILE__, __LINE__); - } - - ReleaseConnection(std::move(conn)); - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadBinaryData", e.what(), __FILE__, __LINE__); - return false; - } + bool retryable = (statusCode >= 500 && statusCode <= 599) + || statusCode == 408 || statusCode == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + }); } bool ANSAWSS3::UploadPrefixBinaryData(const std::string& bucketName, const std::string& prefix, const std::string& dataFilePath, const std::string& objectName, std::string& uploadedFilePath) { - // Early validation checks + const std::string kOp = "ANSAWSS3::UploadPrefixBinaryData"; + + // Early validation (permanent — do NOT retry) if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadPrefixBinaryData", + _logger.LogError(kOp, !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", __FILE__, __LINE__); return false; } if (bucketName.empty() || dataFilePath.empty()) { - _logger.LogError("ANSAWSS3::UploadPrefixBinaryData", - "Bucket name or file path is empty", - __FILE__, __LINE__); + _logger.LogError(kOp, "Bucket name or file path is empty", __FILE__, __LINE__); return false; } - try { - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadPrefixBinaryData", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; - } - - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); - } - - // Load binary file - CkBinData binData; - if (!binData.LoadFile(dataFilePath.c_str())) { - _logger.LogError("ANSAWSS3::UploadPrefixBinaryData - Failed to load file", - binData.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Extract filename from path - std::string fileName = objectName; + // Resolve filename (from objectName, fall back to path). + std::string fileName = objectName; + if (fileName.empty()) { + fileName = ExtractFileName(dataFilePath); if (fileName.empty()) { - fileName = ExtractFileName(dataFilePath); - if (fileName.empty()) { - _logger.LogError("ANSAWSS3::UploadPrefixBinaryData", - "Failed to extract filename from path: " + dataFilePath, + _logger.LogError(kOp, + "Failed to extract filename from path: " + dataFilePath, + __FILE__, __LINE__); + return false; + } + } + + // Build once: the object key and the HTTP request path. Keeping + // them separate avoids the path-style bucket-duplication bug in + // the returned URL. + std::string objectKey; // "/" — no leading '/' + if (!prefix.empty()) { + std::string normalizedPrefix = prefix; + if (!normalizedPrefix.empty() && normalizedPrefix.front() == '/') { + normalizedPrefix = normalizedPrefix.substr(1); + } + if (!normalizedPrefix.empty() && normalizedPrefix.back() != '/') { + normalizedPrefix += '/'; + } + objectKey = normalizedPrefix; + } + objectKey += fileName; + const std::string objectPath = _bAwsPath + ? ("/" + objectKey) + : ("/" + bucketName + "/" + objectKey); + + const std::string contentType = GetContentType(dataFilePath); + + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; + } + + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } + + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); + + CkBinData binData; + if (!binData.LoadFile(dataFilePath.c_str())) { + lastError = std::string("Failed to load file: ") + binData.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Permanent; + } + + conn->rest.AddHeader("Content-Type", contentType.c_str()); + conn->rest.AddHeader("Expect", "100-continue"); + + CkStringBuilder sbResponse; + if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { + lastError = conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + int statusCode = conn->rest.get_ResponseStatusCode(); + if (statusCode == 200) { + // Build canonical path-style URL from parts — independent + // of the addressing style we actually used. + std::string scheme = _bTls ? "https://" : "http://"; + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + objectKey; + _logger.LogDebug(kOp, + "Successfully uploaded to: " + objectKey + " | URL: " + uploadedFilePath, __FILE__, __LINE__); ReleaseConnection(std::move(conn)); - return false; - } - } - - // Construct object path with optional prefix - std::string objectPath; - if (!prefix.empty()) { - // Normalize prefix: ensure it doesn't start with / but ends with / - std::string normalizedPrefix = prefix; - - // Remove leading slash if present - if (normalizedPrefix[0] == '/') { - normalizedPrefix = normalizedPrefix.substr(1); + return AttemptResult::Success; } - // Add trailing slash if not present - if (!normalizedPrefix.empty() && normalizedPrefix.back() != '/') { - normalizedPrefix += '/'; - } - - objectPath = _bAwsPath ? ("/" + normalizedPrefix + fileName) : ("/" + bucketName + "/" + normalizedPrefix + fileName); - } - else { - objectPath = _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - } - - // Determine content type from file extension - std::string contentType = GetContentType(dataFilePath); - - // Set headers - conn->rest.AddHeader("Content-Type", contentType.c_str()); - conn->rest.AddHeader("Expect", "100-continue"); - - _logger.LogDebug("ANSAWSS3::UploadPrefixBinaryData", - "Uploading file: " + fileName + " (" + contentType + ") to: " + objectPath, - __FILE__, __LINE__); - - // Upload to S3 - CkStringBuilder sbResponse; - if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { - _logger.LogError("ANSAWSS3::UploadPrefixBinaryData - Upload failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); + lastError = "HTTP " + std::to_string(statusCode); + std::string body = sbResponse.getAsString(); + if (!body.empty()) lastError += " - " + body; ReleaseConnection(std::move(conn)); - return false; - } - // Check HTTP status code - int statusCode = conn->rest.get_ResponseStatusCode(); - if (statusCode != 200) { - std::string errorMsg = "HTTP " + std::to_string(statusCode); - std::string response = sbResponse.getAsString(); - if (!response.empty()) { - errorMsg += " - " + response; - } - - _logger.LogError("ANSAWSS3::UploadPrefixBinaryData", errorMsg, __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - { - std::string scheme = _bTls ? "https://" : "http://"; - std::string uploadedUrl = scheme + _fullAWSURL + "/" + bucketName + objectPath; - uploadedFilePath = uploadedUrl; - _logger.LogDebug("ANSAWSS3::UploadPrefixBinaryData", - "Successfully uploaded to: " + objectPath + " in bucket: " + bucketName + " | URL: " + uploadedUrl, - __FILE__, __LINE__); - } - - ReleaseConnection(std::move(conn)); - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadPrefixBinaryData", e.what(), __FILE__, __LINE__); - return false; - } + bool retryable = (statusCode >= 500 && statusCode <= 599) + || statusCode == 408 || statusCode == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + }); } bool ANSAWSS3::UploadFileStream(const std::string& bucketName, const std::string& dataFilePath, std::string& uploadedFilePath) { - // Early validation checks + const std::string kOp = "ANSAWSS3::UploadFileStream"; + + // Early validation (permanent — do NOT retry) if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadFileStream", + _logger.LogError(kOp, !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", __FILE__, __LINE__); return false; } if (bucketName.empty() || dataFilePath.empty()) { - _logger.LogError("ANSAWSS3::UploadFileStream", - "Bucket name or file path is empty", - __FILE__, __LINE__); + _logger.LogError(kOp, "Bucket name or file path is empty", __FILE__, __LINE__); return false; } - bool uploadSuccess = false; + const std::string fileName = ExtractFileName(dataFilePath); + const std::string objectPath = + _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - try { - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadFileStream", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; - } - - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); - } - - // Set up file stream - CkStream fileStream; - fileStream.put_SourceFile(dataFilePath.c_str()); - - // Extract filename for S3 object key - std::string fileName = ExtractFileName(dataFilePath); - std::string objectPath = _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - - // Upload to S3 - // If the application provided the SHA-256 hash of the file contents (as shown above) - // then file is streamed and never has to completely reside in memory. - const char* responseStr = conn->rest.fullRequestStream("PUT", objectPath.c_str(), fileStream); - - if (conn->rest.get_LastMethodSuccess() != true) { - _logger.LogError("ANSAWSS3::UploadFileStream - Upload failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); - } - else if (conn->rest.get_ResponseStatusCode() != 200) { - // Examine the request/response to see what happened. - std::string errorMsg = "HTTP " + std::to_string(conn->rest.get_ResponseStatusCode()); - if (responseStr != nullptr && responseStr[0] != '\0') { - errorMsg += " - " + std::string(responseStr); + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; } - _logger.LogError("ANSAWSS3::UploadFileStream", errorMsg, __FILE__, __LINE__); - } - else { - // Success - { + + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } + + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); + + // Fresh stream per attempt (it holds OS file handle state). + CkStream fileStream; + fileStream.put_SourceFile(dataFilePath.c_str()); + + const char* responseStr = + conn->rest.fullRequestStream("PUT", objectPath.c_str(), fileStream); + + if (conn->rest.get_LastMethodSuccess() != true) { + lastError = conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + int statusCode = conn->rest.get_ResponseStatusCode(); + if (statusCode == 200) { std::string scheme = _bTls ? "https://" : "http://"; - std::string uploadedUrl = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; - uploadedFilePath = uploadedUrl; - _logger.LogDebug("ANSAWSS3::UploadFileStream", - "Successfully uploaded file: " + fileName + " to bucket: " + bucketName + " | URL: " + uploadedUrl, - __FILE__, __LINE__); - } - uploadSuccess = true; - } - - ReleaseConnection(std::move(conn)); - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadFileStream", - std::string("Exception: ") + e.what(), - __FILE__, __LINE__); - } - - return uploadSuccess; - } - bool ANSAWSS3::UploadMultipartData(const std::string& bucketName,const std::string& dataFilePath, std::string& uploadedFilePath, int partSize) { - // Early validation checks - if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadMultipartData", - !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", - __FILE__, __LINE__); - return false; - } - - if (bucketName.empty() || dataFilePath.empty()) { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "Bucket name or file path is empty", - __FILE__, __LINE__); - return false; - } - - // Validate part size (minimum 5MB enforced by AWS) - if (partSize < 5242880) { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "Part size must be at least 5MB (5242880 bytes)", - __FILE__, __LINE__); - return false; - } - - try { - // Check if file exists - CkFileAccess fac; - if (!fac.FileExists(dataFilePath.c_str())) { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "File not found: " + dataFilePath, - __FILE__, __LINE__); - return false; - } - - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadMultipartData", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; - } - - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); - } - - // Extract filename and construct object path - std::string fileName = ExtractFileName(dataFilePath); - if (fileName.empty()) { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "Failed to extract filename from path: " + dataFilePath, - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - std::string objectPath = _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - - // ==================================================================== - // STEP 1: INITIATE MULTIPART UPLOAD - // ==================================================================== - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Step 1: Initiating multipart upload for: " + fileName, - __FILE__, __LINE__); - - conn->rest.ClearAllQueryParams(); - conn->rest.AddQueryParam("uploads", ""); - - const char* responseXml = conn->rest.fullRequestNoBody("POST", objectPath.c_str()); - - if (!conn->rest.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadMultipartData - Initiation failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - if (conn->rest.get_ResponseStatusCode() != 200) { - std::string errorMsg = "HTTP " + std::to_string(conn->rest.get_ResponseStatusCode()); - if (responseXml != nullptr && responseXml[0] != '\0') { - errorMsg += " - " + std::string(responseXml); - } - _logger.LogError("ANSAWSS3::UploadMultipartData - Initiation failed", - errorMsg, __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Parse response to get UploadId - CkXml xmlInit; - if (!xmlInit.LoadXml(responseXml)) { - _logger.LogError("ANSAWSS3::UploadMultipartData - XML parse error", - xmlInit.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - const char* uploadId = xmlInit.getChildContent("UploadId"); - if (uploadId == nullptr || uploadId[0] == '\0') { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "UploadId not found in response", - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Multipart upload initiated. UploadId: " + std::string(uploadId), - __FILE__, __LINE__); - - // ==================================================================== - // STEP 2: UPLOAD PARTS - // ==================================================================== - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Step 2: Uploading parts", - __FILE__, __LINE__); - - // Calculate number of parts - fac.OpenForRead(dataFilePath.c_str()); - int numParts = fac.GetNumBlocks(partSize); - fac.FileClose(); - - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "File will be uploaded in " + std::to_string(numParts) + " parts", - __FILE__, __LINE__); - - // Setup parts list XML file path - std::string fileFolderPath = dataFilePath.substr(0, dataFilePath.find_last_of("/\\")); - std::string partsListFilePath = fileFolderPath + "/partsList_" + fileName + ".xml"; - - // Load or create parts list XML - CkXml partsListXml; - if (fac.FileExists(partsListFilePath.c_str())) { - if (!partsListXml.LoadXmlFile(partsListFilePath.c_str())) { - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Failed to load existing parts list, creating new one", - __FILE__, __LINE__); - } - } - - partsListXml.put_Tag("CompleteMultipartUpload"); - - // Upload each part - CkStringBuilder sbPartNumber; - int successfulParts = 0; - - for (int partNumber = 1; partNumber <= numParts; ++partNumber) { - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Processing part " + std::to_string(partNumber) + " of " + - std::to_string(numParts), - __FILE__, __LINE__); - - // Convert part number to string - sbPartNumber.Clear(); - sbPartNumber.AppendInt(partNumber); - - // Check if this part was already uploaded - bool partAlreadyUploaded = false; - int numUploadedParts = partsListXml.get_NumChildren(); - - if (numUploadedParts > 0) { - CkXml* xRec0 = partsListXml.GetChild(0); - CkXml* foundRec = xRec0->FindNextRecord("PartNumber", sbPartNumber.getAsString()); - - if (xRec0->get_LastMethodSuccess()) { - partAlreadyUploaded = true; - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Part " + std::to_string(partNumber) + " already uploaded, skipping", - __FILE__, __LINE__); - successfulParts++; - delete foundRec; - } - delete xRec0; - } - - if (!partAlreadyUploaded) { - // Setup stream for this part - CkStream fileStream; - fileStream.put_SourceFile(dataFilePath.c_str()); - fileStream.put_SourceFilePartSize(partSize); - fileStream.put_SourceFilePart(partNumber - 1); // 0-based index - - // Set query parameters for this part - conn->rest.ClearAllQueryParams(); - conn->rest.AddQueryParam("partNumber", sbPartNumber.getAsString()); - conn->rest.AddQueryParam("uploadId", uploadId); - - // Upload the part - CRITICAL FIX: Use objectPath instead of hardcoded name - const char* responseStr = conn->rest.fullRequestStream("PUT", objectPath.c_str(), fileStream); - - if (!conn->rest.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "Failed to upload part " + std::to_string(partNumber) + ": " + - conn->rest.lastErrorText(), - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - if (conn->rest.get_ResponseStatusCode() != 200) { - std::string errorMsg = "Part " + std::to_string(partNumber) + - " - HTTP " + std::to_string(conn->rest.get_ResponseStatusCode()); - if (responseStr != nullptr && responseStr[0] != '\0') { - errorMsg += " - " + std::string(responseStr); - } - - _logger.LogError("ANSAWSS3::UploadMultipartData - Upload failed", - errorMsg, __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Get ETag from response header - const char* etag = conn->rest.responseHdrByName("ETag"); - if (!conn->rest.get_LastMethodSuccess() || etag == nullptr || etag[0] == '\0') { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "ETag not found in response for part " + std::to_string(partNumber), - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Add part to XML list - CkXml* xPart = partsListXml.NewChild("Part", ""); - xPart->NewChildInt2("PartNumber", partNumber); - xPart->NewChild2("ETag", etag); - delete xPart; - - // Save parts list after each successful upload - if (!partsListXml.SaveXml(partsListFilePath.c_str())) { - _logger.LogError("ANSAWSS3::UploadMultipartData", - "Failed to save parts list: " + std::string(partsListXml.lastErrorText()), - __FILE__, __LINE__); - } - - successfulParts++; - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Part " + std::to_string(partNumber) + " uploaded successfully", - __FILE__, __LINE__); - } - } - - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "All parts uploaded (" + std::to_string(successfulParts) + "/" + - std::to_string(numParts) + ")", - __FILE__, __LINE__); - - // ==================================================================== - // STEP 3: COMPLETE MULTIPART UPLOAD - // ==================================================================== - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Step 3: Completing multipart upload", - __FILE__, __LINE__); - - conn->rest.ClearAllQueryParams(); - conn->rest.AddQueryParam("uploadId", uploadId); - - responseXml = conn->rest.fullRequestString("POST", objectPath.c_str(), partsListXml.getXml()); - - if (!conn->rest.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadMultipartData - Completion failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - if (conn->rest.get_ResponseStatusCode() != 200) { - std::string errorMsg = "HTTP " + std::to_string(conn->rest.get_ResponseStatusCode()); - if (responseXml != nullptr && responseXml[0] != '\0') { - errorMsg += " - " + std::string(responseXml); - } - - _logger.LogError("ANSAWSS3::UploadMultipartData - Completion failed", - errorMsg, __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Parse completion response - CkXml xmlComplete; - if (xmlComplete.LoadXml(responseXml)) { - const char* location = xmlComplete.getChildContent("Location"); - const char* eTag = xmlComplete.getChildContent("ETag"); - - if (location != nullptr) { - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Upload completed. Location: " + std::string(location), - __FILE__, __LINE__); - } - } - - // Clean up parts list file on success - if (fac.FileExists(partsListFilePath.c_str())) { - fac.FileDelete(partsListFilePath.c_str()); - } - - std::string scheme = _bTls ? "https://" : "http://"; - uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; - _logger.LogDebug("ANSAWSS3::UploadMultipartData", - "Multipart upload completed successfully for: " + fileName, - __FILE__, __LINE__); - - ReleaseConnection(std::move(conn)); - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadMultipartData", e.what(), __FILE__, __LINE__); - return false; - } - } - bool ANSAWSS3::UploadPrefixMultipartData(const std::string& bucketName, const std::string& prefix,const std::string& dataFilePath, const std::string& objectName, std::string& uploadedFilePath, int partSize) { - // Early validation checks - if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", - __FILE__, __LINE__); - return false; - } - - if (bucketName.empty() || dataFilePath.empty()) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "Bucket name or file path is empty", - __FILE__, __LINE__); - return false; - } - - // Validate part size (minimum 5MB enforced by AWS) - if (partSize < 5242880) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "Part size must be at least 5MB (5242880 bytes)", - __FILE__, __LINE__); - return false; - } - - try { - // Check if file exists - CkFileAccess fac; - if (!fac.FileExists(dataFilePath.c_str())) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "File not found: " + dataFilePath, - __FILE__, __LINE__); - return false; - } - - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadPrefixMultipartData", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; - } - - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); - } - - // Extract filename and construct object path with optional prefix - std::string fileName = objectName; - - if (fileName.empty()) { - fileName=ExtractFileName(dataFilePath); - if (fileName.empty()) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "Failed to extract filename from path: " + dataFilePath, + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; + _logger.LogDebug(kOp, + "Successfully uploaded file: " + fileName + " | URL: " + uploadedFilePath, __FILE__, __LINE__); ReleaseConnection(std::move(conn)); - return false; - } - } - - // Construct object path with prefix support - std::string objectPath; - if (!prefix.empty()) { - // Normalize prefix: ensure it doesn't start with / but ends with / - std::string normalizedPrefix = prefix; - - // Remove leading slash if present - if (normalizedPrefix[0] == '/') { - normalizedPrefix = normalizedPrefix.substr(1); + return AttemptResult::Success; } - // Add trailing slash if not present - if (!normalizedPrefix.empty() && normalizedPrefix.back() != '/') { - normalizedPrefix += '/'; + lastError = "HTTP " + std::to_string(statusCode); + if (responseStr != nullptr && responseStr[0] != '\0') { + lastError += " - " + std::string(responseStr); } - - objectPath = _bAwsPath ? ("/" + normalizedPrefix + fileName) : ("/" + bucketName + "/" + normalizedPrefix + fileName); - } - else { - objectPath = _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - } - - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Object path: " + objectPath, - __FILE__, __LINE__); - - // ==================================================================== - // STEP 1: INITIATE MULTIPART UPLOAD - // ==================================================================== - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Step 1: Initiating multipart upload for: " + fileName, - __FILE__, __LINE__); - - conn->rest.ClearAllQueryParams(); - conn->rest.AddQueryParam("uploads", ""); - - const char* responseXml = conn->rest.fullRequestNoBody("POST", objectPath.c_str()); - - if (!conn->rest.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData - Initiation failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); ReleaseConnection(std::move(conn)); + + bool retryable = (statusCode >= 500 && statusCode <= 599) + || statusCode == 408 || statusCode == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + }); + } + bool ANSAWSS3::UploadMultipartData(const std::string& bucketName,const std::string& dataFilePath, std::string& uploadedFilePath, int partSize) { + const std::string kOp = "ANSAWSS3::UploadMultipartData"; + + // Early validation (permanent — do NOT retry) + if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { + _logger.LogError(kOp, + !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", + __FILE__, __LINE__); + return false; + } + + if (bucketName.empty() || dataFilePath.empty()) { + _logger.LogError(kOp, "Bucket name or file path is empty", __FILE__, __LINE__); + return false; + } + + if (partSize < 5242880) { + _logger.LogError(kOp, + "Part size must be at least 5MB (5242880 bytes)", + __FILE__, __LINE__); + return false; + } + + { + CkFileAccess fac; + if (!fac.FileExists(dataFilePath.c_str())) { + _logger.LogError(kOp, "File not found: " + dataFilePath, __FILE__, __LINE__); return false; } + } - if (conn->rest.get_ResponseStatusCode() != 200) { - std::string errorMsg = "HTTP " + std::to_string(conn->rest.get_ResponseStatusCode()); - if (responseXml != nullptr && responseXml[0] != '\0') { - errorMsg += " - " + std::string(responseXml); + const std::string fileName = ExtractFileName(dataFilePath); + if (fileName.empty()) { + _logger.LogError(kOp, + "Failed to extract filename from path: " + dataFilePath, + __FILE__, __LINE__); + return false; + } + + const std::string objectPath = + _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); + const std::string fileFolderPath = + dataFilePath.substr(0, dataFilePath.find_last_of("/\\")); + // Fix 2: include a per-call unique token in the scratch-file name so + // two parallel multipart uploads of the same local file don't share + // the same partsList XML (which would race on create/delete). + const std::string partsListFilePath = + fileFolderPath + "/partsList_" + fileName + "_" + + MakeUniqueMultipartToken(this) + ".xml"; + + // Note on retry semantics for multipart: + // Each attempt performs a fresh Initiate → Parts → Complete cycle. We + // reset the on-disk partsList XML at the start of each attempt because + // ETags returned by a prior attempt are bound to that attempt's + // UploadId and are not valid for a new Initiate. Parts uploaded during + // a failed attempt therefore become orphans on S3 until the service's + // multipart-lifecycle rule cleans them up (default 7 days, configurable). + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + CkFileAccess fac; + + // Ensure no stale parts list from a prior attempt. + if (fac.FileExists(partsListFilePath.c_str())) { + fac.FileDelete(partsListFilePath.c_str()); } - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData - Initiation failed", - errorMsg, __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - // Parse response to get UploadId - CkXml xmlInit; - if (!xmlInit.LoadXml(responseXml)) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData - XML parse error", - xmlInit.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - const char* uploadId = xmlInit.getChildContent("UploadId"); - if (uploadId == nullptr || uploadId[0] == '\0') { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "UploadId not found in response", - __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Multipart upload initiated. UploadId: " + std::string(uploadId), - __FILE__, __LINE__); - - // ==================================================================== - // STEP 2: UPLOAD PARTS - // ==================================================================== - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Step 2: Uploading parts", - __FILE__, __LINE__); - - // Calculate number of parts - fac.OpenForRead(dataFilePath.c_str()); - int numParts = fac.GetNumBlocks(partSize); - fac.FileClose(); - - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "File will be uploaded in " + std::to_string(numParts) + " parts", - __FILE__, __LINE__); - - // Setup parts list XML file path (use sanitized filename for local storage) - std::string fileFolderPath = dataFilePath.substr(0, dataFilePath.find_last_of("/\\")); - - // Create a safe filename for the parts list (replace path separators) - std::string safeFileName = fileName; - std::replace(safeFileName.begin(), safeFileName.end(), '/', '_'); - std::replace(safeFileName.begin(), safeFileName.end(), '\\', '_'); - - std::string partsListFilePath = fileFolderPath + "/partsList_" + safeFileName + ".xml"; - - // Load or create parts list XML - CkXml partsListXml; - if (fac.FileExists(partsListFilePath.c_str())) { - if (!partsListXml.LoadXmlFile(partsListFilePath.c_str())) { - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Failed to load existing parts list, creating new one", - __FILE__, __LINE__); + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; } - } - partsListXml.put_Tag("CompleteMultipartUpload"); + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } - // Upload each part - CkStringBuilder sbPartNumber; - int successfulParts = 0; + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); - for (int partNumber = 1; partNumber <= numParts; ++partNumber) { - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Processing part " + std::to_string(partNumber) + " of " + - std::to_string(numParts), + // ---- STEP 1: INITIATE ---- + conn->rest.ClearAllQueryParams(); + conn->rest.AddQueryParam("uploads", ""); + const char* responseXml = conn->rest.fullRequestNoBody("POST", objectPath.c_str()); + + if (!conn->rest.get_LastMethodSuccess()) { + lastError = std::string("Initiate failed: ") + conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + int initStatus = conn->rest.get_ResponseStatusCode(); + if (initStatus != 200) { + lastError = "Initiate HTTP " + std::to_string(initStatus); + if (responseXml != nullptr && responseXml[0] != '\0') lastError += " - " + std::string(responseXml); + ReleaseConnection(std::move(conn)); + bool retryable = (initStatus >= 500 && initStatus <= 599) || initStatus == 408 || initStatus == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + } + CkXml xmlInit; + if (!xmlInit.LoadXml(responseXml)) { + lastError = std::string("Initiate XML parse error: ") + xmlInit.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + const char* uploadId = xmlInit.getChildContent("UploadId"); + if (uploadId == nullptr || uploadId[0] == '\0') { + lastError = "UploadId not found in Initiate response"; + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + _logger.LogDebug(kOp, + "Multipart initiated. UploadId: " + std::string(uploadId), __FILE__, __LINE__); - // Convert part number to string - sbPartNumber.Clear(); - sbPartNumber.AppendInt(partNumber); + // ---- STEP 2: UPLOAD PARTS ---- + fac.OpenForRead(dataFilePath.c_str()); + int numParts = fac.GetNumBlocks(partSize); + fac.FileClose(); - // Check if this part was already uploaded - bool partAlreadyUploaded = false; - int numUploadedParts = partsListXml.get_NumChildren(); + CkXml partsListXml; + partsListXml.put_Tag("CompleteMultipartUpload"); - if (numUploadedParts > 0) { - CkXml* xRec0 = partsListXml.GetChild(0); - CkXml* foundRec = xRec0->FindNextRecord("PartNumber", sbPartNumber.getAsString()); + CkStringBuilder sbPartNumber; + for (int partNumber = 1; partNumber <= numParts; ++partNumber) { + sbPartNumber.Clear(); + sbPartNumber.AppendInt(partNumber); - if (xRec0->get_LastMethodSuccess()) { - partAlreadyUploaded = true; - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Part " + std::to_string(partNumber) + " already uploaded, skipping", - __FILE__, __LINE__); - successfulParts++; - delete foundRec; - } - delete xRec0; - } - - if (!partAlreadyUploaded) { - // Setup stream for this part CkStream fileStream; fileStream.put_SourceFile(dataFilePath.c_str()); fileStream.put_SourceFilePartSize(partSize); - fileStream.put_SourceFilePart(partNumber - 1); // 0-based index + fileStream.put_SourceFilePart(partNumber - 1); - // Set query parameters for this part conn->rest.ClearAllQueryParams(); conn->rest.AddQueryParam("partNumber", sbPartNumber.getAsString()); conn->rest.AddQueryParam("uploadId", uploadId); - // Upload the part using the full object path (with prefix) const char* responseStr = conn->rest.fullRequestStream("PUT", objectPath.c_str(), fileStream); if (!conn->rest.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "Failed to upload part " + std::to_string(partNumber) + ": " + - conn->rest.lastErrorText(), - __FILE__, __LINE__); + lastError = "Part " + std::to_string(partNumber) + " failed: " + conn->rest.lastErrorText(); ReleaseConnection(std::move(conn)); - return false; + return AttemptResult::Transient; } - - if (conn->rest.get_ResponseStatusCode() != 200) { - std::string errorMsg = "Part " + std::to_string(partNumber) + - " - HTTP " + std::to_string(conn->rest.get_ResponseStatusCode()); - if (responseStr != nullptr && responseStr[0] != '\0') { - errorMsg += " - " + std::string(responseStr); - } - - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData - Upload failed", - errorMsg, __FILE__, __LINE__); + int partStatus = conn->rest.get_ResponseStatusCode(); + if (partStatus != 200) { + lastError = "Part " + std::to_string(partNumber) + " HTTP " + std::to_string(partStatus); + if (responseStr != nullptr && responseStr[0] != '\0') lastError += " - " + std::string(responseStr); ReleaseConnection(std::move(conn)); - return false; + bool retryable = (partStatus >= 500 && partStatus <= 599) || partStatus == 408 || partStatus == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; } - - // Get ETag from response header const char* etag = conn->rest.responseHdrByName("ETag"); if (!conn->rest.get_LastMethodSuccess() || etag == nullptr || etag[0] == '\0') { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "ETag not found in response for part " + std::to_string(partNumber), - __FILE__, __LINE__); + lastError = "Part " + std::to_string(partNumber) + ": ETag missing from response"; ReleaseConnection(std::move(conn)); - return false; + return AttemptResult::Transient; } - // Add part to XML list CkXml* xPart = partsListXml.NewChild("Part", ""); xPart->NewChildInt2("PartNumber", partNumber); xPart->NewChild2("ETag", etag); delete xPart; - // Save parts list after each successful upload - if (!partsListXml.SaveXml(partsListFilePath.c_str())) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData", - "Failed to save parts list: " + std::string(partsListXml.lastErrorText()), - __FILE__, __LINE__); - } - - successfulParts++; - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Part " + std::to_string(partNumber) + " uploaded successfully", - __FILE__, __LINE__); + // Persist partsList for diagnostics; not used for resume in this attempt. + partsListXml.SaveXml(partsListFilePath.c_str()); } - } - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "All parts uploaded (" + std::to_string(successfulParts) + "/" + - std::to_string(numParts) + ")", - __FILE__, __LINE__); + // ---- STEP 3: COMPLETE ---- + conn->rest.ClearAllQueryParams(); + conn->rest.AddQueryParam("uploadId", uploadId); + responseXml = conn->rest.fullRequestString("POST", objectPath.c_str(), partsListXml.getXml()); - // ==================================================================== - // STEP 3: COMPLETE MULTIPART UPLOAD - // ==================================================================== - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Step 3: Completing multipart upload", - __FILE__, __LINE__); + if (!conn->rest.get_LastMethodSuccess()) { + lastError = std::string("Complete failed: ") + conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + int completeStatus = conn->rest.get_ResponseStatusCode(); + if (completeStatus != 200) { + lastError = "Complete HTTP " + std::to_string(completeStatus); + if (responseXml != nullptr && responseXml[0] != '\0') lastError += " - " + std::string(responseXml); + ReleaseConnection(std::move(conn)); + bool retryable = (completeStatus >= 500 && completeStatus <= 599) || completeStatus == 408 || completeStatus == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + } - conn->rest.ClearAllQueryParams(); - conn->rest.AddQueryParam("uploadId", uploadId); + // Cleanup parts list on success. + if (fac.FileExists(partsListFilePath.c_str())) { + fac.FileDelete(partsListFilePath.c_str()); + } - responseXml = conn->rest.fullRequestString("POST", objectPath.c_str(), partsListXml.getXml()); + std::string scheme = _bTls ? "https://" : "http://"; + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; + _logger.LogDebug(kOp, + "Multipart upload completed for: " + fileName + " | URL: " + uploadedFilePath, + __FILE__, __LINE__); - if (!conn->rest.get_LastMethodSuccess()) { - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData - Completion failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); ReleaseConnection(std::move(conn)); - return false; - } + return AttemptResult::Success; + }); + } + bool ANSAWSS3::UploadPrefixMultipartData(const std::string& bucketName, const std::string& prefix,const std::string& dataFilePath, const std::string& objectName, std::string& uploadedFilePath, int partSize) { + const std::string kOp = "ANSAWSS3::UploadPrefixMultipartData"; - if (conn->rest.get_ResponseStatusCode() != 200) { - std::string errorMsg = "HTTP " + std::to_string(conn->rest.get_ResponseStatusCode()); - if (responseXml != nullptr && responseXml[0] != '\0') { - errorMsg += " - " + std::string(responseXml); - } - - _logger.LogError("ANSAWSS3::UploadPrefixMultipartData - Completion failed", - errorMsg, __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Parse completion response - CkXml xmlComplete; - if (xmlComplete.LoadXml(responseXml)) { - const char* location = xmlComplete.getChildContent("Location"); - const char* eTag = xmlComplete.getChildContent("ETag"); - - if (location != nullptr) { - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Upload completed. Location: " + std::string(location), - __FILE__, __LINE__); - } - } - - // Clean up parts list file on success - if (fac.FileExists(partsListFilePath.c_str())) { - fac.FileDelete(partsListFilePath.c_str()); - } - - std::string scheme = _bTls ? "https://" : "http://"; - uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + objectPath; - _logger.LogDebug("ANSAWSS3::UploadPrefixMultipartData", - "Multipart upload completed successfully. S3 path: " + objectPath, + // Early validation (permanent — do NOT retry) + if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { + _logger.LogError(kOp, + !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", __FILE__, __LINE__); - - ReleaseConnection(std::move(conn)); - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadPrefixMultipartData", e.what(), __FILE__, __LINE__); return false; } + + if (bucketName.empty() || dataFilePath.empty()) { + _logger.LogError(kOp, "Bucket name or file path is empty", __FILE__, __LINE__); + return false; + } + + if (partSize < 5242880) { + _logger.LogError(kOp, + "Part size must be at least 5MB (5242880 bytes)", + __FILE__, __LINE__); + return false; + } + + { + CkFileAccess fac; + if (!fac.FileExists(dataFilePath.c_str())) { + _logger.LogError(kOp, "File not found: " + dataFilePath, __FILE__, __LINE__); + return false; + } + } + + std::string fileName = objectName; + if (fileName.empty()) { + fileName = ExtractFileName(dataFilePath); + if (fileName.empty()) { + _logger.LogError(kOp, + "Failed to extract filename from path: " + dataFilePath, + __FILE__, __LINE__); + return false; + } + } + + // Build once: the object key and the HTTP request path. Keeping + // them separate avoids the path-style bucket-duplication bug in + // the returned URL. + std::string objectKey; // "/" — no leading '/' + if (!prefix.empty()) { + std::string normalizedPrefix = prefix; + if (!normalizedPrefix.empty() && normalizedPrefix.front() == '/') { + normalizedPrefix = normalizedPrefix.substr(1); + } + if (!normalizedPrefix.empty() && normalizedPrefix.back() != '/') { + normalizedPrefix += '/'; + } + objectKey = normalizedPrefix; + } + objectKey += fileName; + const std::string objectPath = _bAwsPath + ? ("/" + objectKey) + : ("/" + bucketName + "/" + objectKey); + + // Parts-list file uses a sanitized filename for safe local storage. + // Fix 2: also append a per-call unique token so parallel multipart + // uploads of the same local file don't share the same scratch file. + const std::string fileFolderPath = + dataFilePath.substr(0, dataFilePath.find_last_of("/\\")); + std::string safeFileName = fileName; + std::replace(safeFileName.begin(), safeFileName.end(), '/', '_'); + std::replace(safeFileName.begin(), safeFileName.end(), '\\', '_'); + const std::string partsListFilePath = + fileFolderPath + "/partsList_" + safeFileName + "_" + + MakeUniqueMultipartToken(this) + ".xml"; + + // See note in UploadMultipartData regarding multipart retry semantics. + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + CkFileAccess fac; + + if (fac.FileExists(partsListFilePath.c_str())) { + fac.FileDelete(partsListFilePath.c_str()); + } + + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; + } + + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } + + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); + + // ---- STEP 1: INITIATE ---- + conn->rest.ClearAllQueryParams(); + conn->rest.AddQueryParam("uploads", ""); + const char* responseXml = conn->rest.fullRequestNoBody("POST", objectPath.c_str()); + + if (!conn->rest.get_LastMethodSuccess()) { + lastError = std::string("Initiate failed: ") + conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + int initStatus = conn->rest.get_ResponseStatusCode(); + if (initStatus != 200) { + lastError = "Initiate HTTP " + std::to_string(initStatus); + if (responseXml != nullptr && responseXml[0] != '\0') lastError += " - " + std::string(responseXml); + ReleaseConnection(std::move(conn)); + bool retryable = (initStatus >= 500 && initStatus <= 599) || initStatus == 408 || initStatus == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + } + CkXml xmlInit; + if (!xmlInit.LoadXml(responseXml)) { + lastError = std::string("Initiate XML parse error: ") + xmlInit.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + const char* uploadId = xmlInit.getChildContent("UploadId"); + if (uploadId == nullptr || uploadId[0] == '\0') { + lastError = "UploadId not found in Initiate response"; + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + _logger.LogDebug(kOp, + "Multipart initiated. UploadId: " + std::string(uploadId), + __FILE__, __LINE__); + + // ---- STEP 2: UPLOAD PARTS ---- + fac.OpenForRead(dataFilePath.c_str()); + int numParts = fac.GetNumBlocks(partSize); + fac.FileClose(); + + CkXml partsListXml; + partsListXml.put_Tag("CompleteMultipartUpload"); + + CkStringBuilder sbPartNumber; + for (int partNumber = 1; partNumber <= numParts; ++partNumber) { + sbPartNumber.Clear(); + sbPartNumber.AppendInt(partNumber); + + CkStream fileStream; + fileStream.put_SourceFile(dataFilePath.c_str()); + fileStream.put_SourceFilePartSize(partSize); + fileStream.put_SourceFilePart(partNumber - 1); + + conn->rest.ClearAllQueryParams(); + conn->rest.AddQueryParam("partNumber", sbPartNumber.getAsString()); + conn->rest.AddQueryParam("uploadId", uploadId); + + const char* responseStr = conn->rest.fullRequestStream("PUT", objectPath.c_str(), fileStream); + + if (!conn->rest.get_LastMethodSuccess()) { + lastError = "Part " + std::to_string(partNumber) + " failed: " + conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + int partStatus = conn->rest.get_ResponseStatusCode(); + if (partStatus != 200) { + lastError = "Part " + std::to_string(partNumber) + " HTTP " + std::to_string(partStatus); + if (responseStr != nullptr && responseStr[0] != '\0') lastError += " - " + std::string(responseStr); + ReleaseConnection(std::move(conn)); + bool retryable = (partStatus >= 500 && partStatus <= 599) || partStatus == 408 || partStatus == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + } + const char* etag = conn->rest.responseHdrByName("ETag"); + if (!conn->rest.get_LastMethodSuccess() || etag == nullptr || etag[0] == '\0') { + lastError = "Part " + std::to_string(partNumber) + ": ETag missing from response"; + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + CkXml* xPart = partsListXml.NewChild("Part", ""); + xPart->NewChildInt2("PartNumber", partNumber); + xPart->NewChild2("ETag", etag); + delete xPart; + + partsListXml.SaveXml(partsListFilePath.c_str()); + } + + // ---- STEP 3: COMPLETE ---- + conn->rest.ClearAllQueryParams(); + conn->rest.AddQueryParam("uploadId", uploadId); + responseXml = conn->rest.fullRequestString("POST", objectPath.c_str(), partsListXml.getXml()); + + if (!conn->rest.get_LastMethodSuccess()) { + lastError = std::string("Complete failed: ") + conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + int completeStatus = conn->rest.get_ResponseStatusCode(); + if (completeStatus != 200) { + lastError = "Complete HTTP " + std::to_string(completeStatus); + if (responseXml != nullptr && responseXml[0] != '\0') lastError += " - " + std::string(responseXml); + ReleaseConnection(std::move(conn)); + bool retryable = (completeStatus >= 500 && completeStatus <= 599) || completeStatus == 408 || completeStatus == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + } + + if (fac.FileExists(partsListFilePath.c_str())) { + fac.FileDelete(partsListFilePath.c_str()); + } + + // Build canonical path-style URL from parts — independent + // of the addressing style we actually used. + std::string scheme = _bTls ? "https://" : "http://"; + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + objectKey; + _logger.LogDebug(kOp, + "Multipart upload completed. S3 key: " + objectKey + " | URL: " + uploadedFilePath, + __FILE__, __LINE__); + + ReleaseConnection(std::move(conn)); + return AttemptResult::Success; + }); } // Upload jpeg data bool ANSAWSS3::UploadJpegImage(const std::string& bucketName, unsigned char* jpeg_string, int32 bufferLength, const std::string& fileName, std::string& uploadedFilePath) { - // Early validation checks + const std::string kOp = "ANSAWSS3::UploadJpegImage"; + + // Early validation checks (permanent — do NOT retry) if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadJpegImage", + _logger.LogError(kOp, !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", __FILE__, __LINE__); return false; } if (bucketName.empty() || bufferLength <= 0) { - _logger.LogError("ANSAWSS3::UploadJpegImage", + _logger.LogError(kOp, "Bucket name or buffer length is invalid", __FILE__, __LINE__); return false; } - try { - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadJpegImage", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; - } + // Object path is attempt-independent. + const std::string objectPath = + _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); - } - - // Load binary file - CkBinData binData; - CkByteData jpegBytes; - jpegBytes.append2(jpeg_string, static_cast(bufferLength)); - binData.AppendBinary(jpegBytes); - - // Determine content type from file extension - std::string contentType = "image/jpeg"; - - // Set headers - conn->rest.AddHeader("Content-Type", contentType.c_str()); - conn->rest.AddHeader("Expect", "100-continue"); - - // Construct S3 object path - std::string objectPath = _bAwsPath ? ("/" + fileName) : ("/" + bucketName + "/" + fileName); - - // Upload to S3 - CkStringBuilder sbResponse; - if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { - _logger.LogError("ANSAWSS3::UploadJpegImage - Upload failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Check HTTP status code - int statusCode = conn->rest.get_ResponseStatusCode(); - if (statusCode != 200) { - std::string errorMsg = "HTTP " + std::to_string(statusCode); - std::string response = sbResponse.getAsString(); - if (!response.empty()) { - errorMsg += " - " + response; + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; } - _logger.LogError("ANSAWSS3::UploadJpegImage", errorMsg, __FILE__, __LINE__); + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } + + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); + + CkBinData binData; + CkByteData jpegBytes; + jpegBytes.append2(jpeg_string, static_cast(bufferLength)); + binData.AppendBinary(jpegBytes); + + conn->rest.AddHeader("Content-Type", "image/jpeg"); + conn->rest.AddHeader("Expect", "100-continue"); + + CkStringBuilder sbResponse; + if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { + lastError = conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + int statusCode = conn->rest.get_ResponseStatusCode(); + if (statusCode == 200) { + std::string scheme = _bTls ? "https://" : "http://"; + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; + _logger.LogDebug(kOp, + "Successfully uploaded: " + fileName + " | URL: " + uploadedFilePath, + __FILE__, __LINE__); + ReleaseConnection(std::move(conn)); + return AttemptResult::Success; + } + + lastError = "HTTP " + std::to_string(statusCode); + std::string body = sbResponse.getAsString(); + if (!body.empty()) lastError += " - " + body; ReleaseConnection(std::move(conn)); - return false; - } - { - std::string scheme = _bTls ? "https://" : "http://"; - std::string uploadedUrl = scheme + _fullAWSURL + "/" + bucketName + "/" + fileName; - uploadedFilePath = uploadedUrl; - _logger.LogDebug("ANSAWSS3::UploadJpegImage", - "Successfully uploaded: " + fileName + " to bucket: " + bucketName + " | URL: " + uploadedUrl, - __FILE__, __LINE__); - } - - ReleaseConnection(std::move(conn)); - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadJpegImage", e.what(), __FILE__, __LINE__); - return false; - } + bool retryable = (statusCode >= 500 && statusCode <= 599) + || statusCode == 408 || statusCode == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + }); } bool ANSAWSS3::UploadPrefixJpegImage(const std::string& bucketName, const std::string& prefix,unsigned char* jpeg_string,int32 bufferLength,const std::string& fileName, std::string& uploadedFilePath) { - // Early validation checks + const std::string kOp = "ANSAWSS3::UploadPrefixJpegImage"; + + // Early validation checks (permanent failures — do NOT retry) if (!_isLicenseValid || !_isUnlockCodeValid || !_bConnected) { - _logger.LogError("ANSAWSS3::UploadJpegImage", + _logger.LogError(kOp, !_isLicenseValid ? "Invalid license" : !_isUnlockCodeValid ? "Invalid unlock code" : _retryInProgress.load() ? "Not connected (waiting for internet, retrying in background)" : "Not connected (no internet or connection not established)", __FILE__, __LINE__); return false; } if (bucketName.empty() || bufferLength <= 0 || fileName.empty()) { - _logger.LogError("ANSAWSS3::UploadJpegImage", + _logger.LogError(kOp, "Bucket name, buffer length, or filename is invalid", __FILE__, __LINE__); return false; } - try { - auto conn = AcquireConnection(); - if (!conn) { - _logger.LogError("UploadPrefixJpegImage", "Failed to acquire S3 connection", __FILE__, __LINE__); - return false; + // Build once: the object key (path within the bucket) and the HTTP + // request path (which depends on addressing style). Keeping these + // separate avoids the path-style bucket-duplication bug in the + // returned URL. + std::string objectKey; // "/" — no leading '/' + if (!prefix.empty()) { + std::string normalizedPrefix = prefix; + if (!normalizedPrefix.empty() && normalizedPrefix.front() == '/') { + normalizedPrefix = normalizedPrefix.substr(1); } - - // Set bucket-specific endpoint - if (_bAwsPath) { - conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); - } else { - conn->rest.put_Host(_fullAWSURL.c_str()); + if (!normalizedPrefix.empty() && normalizedPrefix.back() != '/') { + normalizedPrefix += '/'; } - - // Load binary file - CkBinData binData; - CkByteData jpegBytes; - jpegBytes.append2(jpeg_string, static_cast(bufferLength)); - binData.AppendBinary(jpegBytes); - - // Set headers - conn->rest.AddHeader("Content-Type", "image/jpeg"); - conn->rest.AddHeader("Expect", "100-continue"); - - // Construct S3 object path with prefix - std::string objectPath = _bAwsPath ? "/" : ("/" + bucketName + "/"); - if (!prefix.empty()) { - // Ensure prefix ends with "/" if it doesn't already - std::string normalizedPrefix = prefix; - if (normalizedPrefix.back() != '/') { - normalizedPrefix += '/'; - } - objectPath += normalizedPrefix; - } - objectPath += fileName; - - _logger.LogDebug("ANSAWSS3::UploadJpegImage", - "Uploading to: " + objectPath, - __FILE__, __LINE__); - - // Upload to S3 - CkStringBuilder sbResponse; - if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { - _logger.LogError("ANSAWSS3::UploadJpegImage - Upload failed", - conn->rest.lastErrorText(), __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - // Check HTTP status code - int statusCode = conn->rest.get_ResponseStatusCode(); - if (statusCode != 200) { - std::string errorMsg = "HTTP " + std::to_string(statusCode); - std::string response = sbResponse.getAsString(); - if (!response.empty()) { - errorMsg += " - " + response; - } - _logger.LogError("ANSAWSS3::UploadJpegImage", errorMsg, __FILE__, __LINE__); - ReleaseConnection(std::move(conn)); - return false; - } - - { - std::string scheme = _bTls ? "https://" : "http://"; - std::string uploadedUrl = scheme + _fullAWSURL + "/" + bucketName + objectPath; - uploadedFilePath = uploadedUrl; - _logger.LogDebug("ANSAWSS3::UploadPrefixJpegImage", - "Successfully uploaded: " + objectPath + " (" + std::to_string(bufferLength) + " bytes) to bucket: " + bucketName + " | URL: " + uploadedUrl, - __FILE__, __LINE__); - } - - ReleaseConnection(std::move(conn)); - return true; - } - catch (const std::exception& e) { - _logger.LogFatal("ANSAWSS3::UploadJpegImage", - std::string("Exception: ") + e.what(), - __FILE__, __LINE__); - return false; + objectKey = normalizedPrefix; } + objectKey += fileName; + const std::string objectPath = _bAwsPath + ? ("/" + objectKey) + : ("/" + bucketName + "/" + objectKey); + + return UploadWithRetry(kOp, + [&](std::string& lastError) -> AttemptResult { + auto conn = AcquireConnection(); + if (!conn) { + lastError = "Failed to acquire S3 connection"; + return AttemptResult::Transient; + } + + if (_bAwsPath) { + conn->rest.put_Host((bucketName + "." + _fullAWSURL).c_str()); + } else { + conn->rest.put_Host(_fullAWSURL.c_str()); + } + + // Fix 1: the CkRest inside a pooled S3Connection accumulates + // headers and query params across requests. Clear them at the + // start of every attempt so a request can't inherit stale + // state (e.g. Content-Encoding:gzip) from a prior upload on + // this same connection. + conn->rest.ClearAllHeaders(); + conn->rest.ClearAllQueryParams(); + + CkBinData binData; + CkByteData jpegBytes; + jpegBytes.append2(jpeg_string, static_cast(bufferLength)); + binData.AppendBinary(jpegBytes); + + conn->rest.AddHeader("Content-Type", "image/jpeg"); + conn->rest.AddHeader("Expect", "100-continue"); + + CkStringBuilder sbResponse; + if (!conn->rest.FullRequestBd("PUT", objectPath.c_str(), binData, sbResponse)) { + lastError = conn->rest.lastErrorText(); + ReleaseConnection(std::move(conn)); + return AttemptResult::Transient; + } + + int statusCode = conn->rest.get_ResponseStatusCode(); + if (statusCode == 200) { + // Build the returned URL from parts (independent of + // addressing style) — always the canonical path-style URL. + std::string scheme = _bTls ? "https://" : "http://"; + uploadedFilePath = scheme + _fullAWSURL + "/" + bucketName + "/" + objectKey; + _logger.LogDebug(kOp, + "Successfully uploaded: " + objectKey + " (" + std::to_string(bufferLength) + " bytes) | URL: " + uploadedFilePath, + __FILE__, __LINE__); + ReleaseConnection(std::move(conn)); + return AttemptResult::Success; + } + + lastError = "HTTP " + std::to_string(statusCode); + std::string body = sbResponse.getAsString(); + if (!body.empty()) lastError += " - " + body; + ReleaseConnection(std::move(conn)); + + bool retryable = (statusCode >= 500 && statusCode <= 599) + || statusCode == 408 || statusCode == 429; + return retryable ? AttemptResult::Transient : AttemptResult::Permanent; + }); } // Downloads bool ANSAWSS3::DownloadFile(const std::string& bucketName, diff --git a/modules/ANSUtilities/ANSUtilities.h b/modules/ANSUtilities/ANSUtilities.h index 94218ff..3316be5 100644 --- a/modules/ANSUtilities/ANSUtilities.h +++ b/modules/ANSUtilities/ANSUtilities.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace ANSCENTER { class ANSULT_API ANSUtilities { @@ -155,7 +156,9 @@ namespace ANSCENTER { private: bool _isUnlockCodeValid{ false }; bool _isLicenseValid{ false }; - bool _bConnected{ false }; + // Atomic because upload threads read _bConnected without a lock + // while Connect() / RetryLoop() can write it. + std::atomic _bConnected{ false }; bool _bAwsPath{ true }; // true = virtual-hosted (AWS), false = path-style (MinIO) std::string _licenseKey; std::string _unlockCode; @@ -178,6 +181,13 @@ namespace ANSCENTER { std::string _secretKey; bool _authReady{ false }; + // Request timeouts applied to every new S3Connection. + // _connectTimeoutMs : max time to wait for TCP/TLS connect. + // _idleTimeoutMs : max gap between bytes on a send/recv. + // Atomic so SetTimeouts can update them without holding _configMutex. + std::atomic _connectTimeoutMs{ 10000 }; // 10 s + std::atomic _idleTimeoutMs { 20000 }; // 20 s + // Proxy settings std::string _proxyHost; int _proxyPort{ 0 }; @@ -194,6 +204,21 @@ namespace ANSCENTER { std::string ExtractFileName(const std::string& filePath); std::string GetContentType(const std::string& filePath); + // ---- Retry helper for transient / network-stability failures ---- + // Each upload function delegates its per-attempt body to a lambda. + // The lambda returns one of these outcomes: + // Success -> stop, UploadWithRetry returns true + // Transient -> retry (up to kUploadMaxAttempts total) + // Permanent -> stop, UploadWithRetry returns false + // The lambda should write a short human-readable reason into + // `lastError` on any non-Success outcome so it can be logged. + enum class AttemptResult { Success, Transient, Permanent }; + static constexpr int kUploadMaxAttempts = 20; + static constexpr int kUploadRetryDelayMs = 100; + bool UploadWithRetry( + const std::string& opName, + const std::function& attemptFn); + // Background retry std::thread _retryThread; std::atomic _stopRetry{ false }; @@ -212,6 +237,24 @@ namespace ANSCENTER { // Returns: 1 = connected, 0 = failed (bad auth/URL), 2 = no internet (background retry started) [[nodiscard]] int Connect(const std::string& baseDomain, const std::string& bucketRegion, const std::string& serviceName, int port, bool bTls, bool autoReconnect, bool& awsPath); [[nodiscard]] bool SetAuthentication(const std::string& accessKey, const std::string& secretKey); + + // Pre-populate the connection pool with `count` ready-to-use + // S3Connections. Creating a connection performs a TLS handshake + // inside CreateConnection(); doing it up-front in parallel keeps + // the first `count` concurrent uploads from serializing on handshake. + // Safe to call after Connect() + SetAuthentication() have succeeded. + // Returns the number of connections actually added to the pool + // (may be less than `count` on transient network issues). + int PrewarmConnectionPool(int count); + + // Update request timeouts used by every new connection, and + // propagate them to all already-pooled connections so the new + // values take effect on the very next request. + // connectMs: max time to wait for TCP/TLS connect (>= 1000 ms) + // idleMs : max gap between bytes on send/recv (>= 1000 ms) + // Returns false on invalid input (values are left unchanged). + bool SetTimeouts(int connectMs, int idleMs); + [[nodiscard]] std::vector ListBuckets(); [[nodiscard]] std::vector ListBucketObjects(const std::string& bucketName); [[nodiscard]] std::vector ListBucketObjectsWithPrefix(const std::string& bucketName, const std::string& prefix);