// dllmain.cpp : Defines the entry point for the DLL application. #include "pch.h" #include "ANSPulsar.h" #include #include #include #include // Handle registry with refcount — prevents use-after-free when // ReleasePulsarHandle is called while an operation is still running. static std::unordered_map& PulsarHandleRegistry() { static std::unordered_map s; return s; } static std::mutex& PulsarHandleRegistryMutex() { static std::mutex m; return m; } static std::condition_variable& PulsarHandleRegistryCV() { static std::condition_variable cv; return cv; } static void RegisterPulsarHandle(ANSCENTER::ANSPULSAR* h) { std::lock_guard lk(PulsarHandleRegistryMutex()); PulsarHandleRegistry()[h] = 1; } static ANSCENTER::ANSPULSAR* AcquirePulsarHandle(ANSCENTER::ANSPULSAR* h) { std::lock_guard lk(PulsarHandleRegistryMutex()); auto it = PulsarHandleRegistry().find(h); if (it == PulsarHandleRegistry().end()) return nullptr; it->second++; return h; } static bool ReleasePulsarHandleRef(ANSCENTER::ANSPULSAR* h) { std::lock_guard lk(PulsarHandleRegistryMutex()); auto it = PulsarHandleRegistry().find(h); if (it == PulsarHandleRegistry().end()) return false; it->second--; if (it->second <= 0) { PulsarHandleRegistry().erase(it); PulsarHandleRegistryCV().notify_all(); return true; } return false; } static bool UnregisterPulsarHandle(ANSCENTER::ANSPULSAR* h) { std::unique_lock lk(PulsarHandleRegistryMutex()); auto it = PulsarHandleRegistry().find(h); if (it == PulsarHandleRegistry().end()) return false; it->second--; bool ok = PulsarHandleRegistryCV().wait_for(lk, std::chrono::seconds(30), [&]() { auto it2 = PulsarHandleRegistry().find(h); return it2 == PulsarHandleRegistry().end() || it2->second <= 0; }); if (!ok) { OutputDebugStringA("WARNING: UnregisterPulsarHandle timed out waiting for in-flight operations\n"); } PulsarHandleRegistry().erase(h); return true; } // RAII guard — ensures ReleasePulsarHandleRef is always called class PulsarHandleGuard { ANSCENTER::ANSPULSAR* engine; public: explicit PulsarHandleGuard(ANSCENTER::ANSPULSAR* e) : engine(e) {} ~PulsarHandleGuard() { if (engine) ReleasePulsarHandleRef(engine); } ANSCENTER::ANSPULSAR* get() const { return engine; } explicit operator bool() const { return engine != nullptr; } PulsarHandleGuard(const PulsarHandleGuard&) = delete; PulsarHandleGuard& operator=(const PulsarHandleGuard&) = delete; }; BOOL APIENTRY DllMain( HMODULE hModule, DWORD ul_reason_for_call, LPVOID lpReserved ) { switch (ul_reason_for_call) { case DLL_PROCESS_ATTACH: case DLL_THREAD_ATTACH: case DLL_THREAD_DETACH: case DLL_PROCESS_DETACH: break; } return TRUE; } extern "C" ANSPALSAR_API int CreateANSPulsarHandle(ANSCENTER::ANSPULSAR** Handle, const char* hostname, int port, const char* jwtToken, int useTls, const char* caCertPath, int enableBatching, int batchingMaxMessages, int batchingDelayMs, int allowInsecureConnection, int verifyHostName) { if (Handle == nullptr) { return 0; // Invalid handle } try { // Release existing handle if called twice (prevents leak from LabVIEW) if (*Handle) { if (UnregisterPulsarHandle(*Handle)) { delete *Handle; } *Handle = nullptr; } *Handle = new ANSCENTER::ANSPULSAR(); if (*Handle == nullptr) { return 0; // Memory allocation failed } std::string host = hostname ? hostname : ""; std::string token = jwtToken ? jwtToken : ""; std::string certPath = caCertPath ? caCertPath : ""; bool _useTls = (useTls == 1); bool _enableBatching = (enableBatching == 1); bool _allowInsecureConnection = (allowInsecureConnection == 1); bool _verifyHostname = (verifyHostName == 1); if (!(*Handle)->Initialize(host, port, token, _useTls, certPath, _enableBatching, batchingMaxMessages, batchingDelayMs, _allowInsecureConnection, _verifyHostname)) { delete *Handle; *Handle = nullptr; return 0; } RegisterPulsarHandle(*Handle); return 1; } catch (std::exception& e) { if (*Handle != nullptr) { delete *Handle; *Handle = nullptr; } return 0; } catch (...) { if (*Handle != nullptr) { delete *Handle; *Handle = nullptr; } return 0; } } static int ReleasePulsarHandle_Impl(ANSCENTER::ANSPULSAR** Handle) { try { if (!Handle || !*Handle) return 1; if (!UnregisterPulsarHandle(*Handle)) { *Handle = nullptr; return 1; } delete *Handle; *Handle = nullptr; return 1; } catch (...) { if (Handle) *Handle = nullptr; return 0; } } extern "C" ANSPALSAR_API int ReleasePulsarHandle(ANSCENTER::ANSPULSAR** Handle) { __try { return ReleasePulsarHandle_Impl(Handle); } __except (EXCEPTION_EXECUTE_HANDLER) { if (Handle) *Handle = nullptr; return 0; } } extern "C" ANSPALSAR_API int ConnectANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* topicName) { if (Handle == nullptr || *Handle == nullptr) { return -1; // Invalid handle } PulsarHandleGuard guard(AcquirePulsarHandle(*Handle)); if (!guard) return -1; if (topicName == nullptr) { return -1; // Invalid parameters } std::string topic = topicName; if (guard.get()->Connect(topic)) { return 1; // Success } else { return 0; // Connection failed } } extern "C" ANSPALSAR_API int DisconnectANSPulsar(ANSCENTER::ANSPULSAR** Handle) { if (Handle == nullptr || *Handle == nullptr) { return -1; // Invalid handle } PulsarHandleGuard guard(AcquirePulsarHandle(*Handle)); if (!guard) return -1; if (guard.get()->Disconnect()) { return 1; // Success } else { return 0; // Disconnection failed } } extern "C" ANSPALSAR_API int PublishANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* message, const char* properties) { if (Handle == nullptr || *Handle == nullptr) { return -1; // Invalid handle } PulsarHandleGuard guard(AcquirePulsarHandle(*Handle)); if (!guard) return -1; if (message == nullptr) { return -1; // Invalid parameters } std::string msg = message ? message : ""; std::string props = properties ? properties : ""; if (guard.get()->Publish(msg, props)) { return 1; // Success } else { return 0; // Publish failed } } extern "C" ANSPALSAR_API int PublishAsyncANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* message, const char* properties) { if (Handle == nullptr || *Handle == nullptr) { return -1; // Invalid handle } PulsarHandleGuard guard(AcquirePulsarHandle(*Handle)); if (!guard) return -1; if (message == nullptr) { return -1; // Invalid parameters } std::string msg = message ? message : ""; std::string props = properties ? properties : ""; if (guard.get()->PublishAsync(msg, props)) { return 1; // Success } else { return 0; // Publish failed } } extern "C" ANSPALSAR_API int SubscribeANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* topicName, const char* subscriptionName) { if (Handle == nullptr || *Handle == nullptr) { return -1; // Invalid handle } PulsarHandleGuard guard(AcquirePulsarHandle(*Handle)); if (!guard) return -1; if (topicName == nullptr || subscriptionName == nullptr) { return -1; // Invalid parameters } std::string topic = topicName ? topicName : ""; std::string subName = subscriptionName ? subscriptionName : ""; if (guard.get()->Subscribe(topic, subName)) { return 1; // Success } else { return 0; // Subscribe failed } } extern "C" ANSPALSAR_API int ReceiveMessageANSPulsar_CPP(ANSCENTER::ANSPULSAR** Handle, int timeoutMs, int doAcknowledge, std::string& message) { if (Handle == nullptr || *Handle == nullptr) { return -1; // Invalid handle } PulsarHandleGuard guard(AcquirePulsarHandle(*Handle)); if (!guard) return -1; if (timeoutMs < 0) { return -1; // Invalid parameters } bool _doAcknowledge = (doAcknowledge == 1); if (guard.get()->Receive(message, _doAcknowledge, timeoutMs)) { return 1; // Success } else { return 0; // Receive failed } } extern "C" ANSPALSAR_API int ReceiveMessageANSPulsar(ANSCENTER::ANSPULSAR** Handle, int timeoutMs, int doAcknowledge, LStrHandle strMessage) { if (Handle == nullptr || *Handle == nullptr) { return -1; // Invalid handle } PulsarHandleGuard guard(AcquirePulsarHandle(*Handle)); if (!guard) return -1; if (timeoutMs < 0 || strMessage == nullptr) { return -1; // Invalid parameters } std::string message; bool _doAcknowledge = (doAcknowledge == 1); if (guard.get()->Receive(message, _doAcknowledge, timeoutMs)) { if (message.empty()) return 0; int size = static_cast(message.length()); MgErr error; error = DSSetHandleSize(strMessage, sizeof(int32) + size * sizeof(uChar)); if (error == noErr) { (*strMessage)->cnt = size; memcpy((*strMessage)->str, message.c_str(), size); return 1; } else return 0; } else { return 0; // Receive failed } } // ============================================================================ // V2 API — handle passed by value (uint64_t) to avoid LabVIEW buffer reuse bug // ============================================================================ extern "C" ANSPALSAR_API int ConnectANSPulsar_V2(uint64_t handleVal, const char* topicName) { auto* _v2h = reinterpret_cast(handleVal); if (!_v2h) return -1; PulsarHandleGuard guard(AcquirePulsarHandle(_v2h)); if (!guard) return -1; if (topicName == nullptr) { return -1; } std::string topic = topicName; if (guard.get()->Connect(topic)) { return 1; } else { return 0; } } extern "C" ANSPALSAR_API int DisconnectANSPulsar_V2(uint64_t handleVal) { auto* _v2h = reinterpret_cast(handleVal); if (!_v2h) return -1; PulsarHandleGuard guard(AcquirePulsarHandle(_v2h)); if (!guard) return -1; if (guard.get()->Disconnect()) { return 1; } else { return 0; } } extern "C" ANSPALSAR_API int PublishANSPulsar_V2(uint64_t handleVal, const char* message, const char* properties) { auto* _v2h = reinterpret_cast(handleVal); if (!_v2h) return -1; PulsarHandleGuard guard(AcquirePulsarHandle(_v2h)); if (!guard) return -1; if (message == nullptr) { return -1; } std::string msg = message ? message : ""; std::string props = properties ? properties : ""; if (guard.get()->Publish(msg, props)) { return 1; } else { return 0; } } extern "C" ANSPALSAR_API int PublishAsyncANSPulsar_V2(uint64_t handleVal, const char* message, const char* properties) { auto* _v2h = reinterpret_cast(handleVal); if (!_v2h) return -1; PulsarHandleGuard guard(AcquirePulsarHandle(_v2h)); if (!guard) return -1; if (message == nullptr) { return -1; } std::string msg = message ? message : ""; std::string props = properties ? properties : ""; if (guard.get()->PublishAsync(msg, props)) { return 1; } else { return 0; } } extern "C" ANSPALSAR_API int SubscribeANSPulsar_V2(uint64_t handleVal, const char* topicName, const char* subscriptionName) { auto* _v2h = reinterpret_cast(handleVal); if (!_v2h) return -1; PulsarHandleGuard guard(AcquirePulsarHandle(_v2h)); if (!guard) return -1; if (topicName == nullptr || subscriptionName == nullptr) { return -1; } std::string topic = topicName ? topicName : ""; std::string subName = subscriptionName ? subscriptionName : ""; if (guard.get()->Subscribe(topic, subName)) { return 1; } else { return 0; } } extern "C" ANSPALSAR_API int ReceiveMessageANSPulsar_V2(uint64_t handleVal, int timeoutMs, int doAcknowledge, LStrHandle strMessage) { auto* _v2h = reinterpret_cast(handleVal); if (!_v2h) return -1; PulsarHandleGuard guard(AcquirePulsarHandle(_v2h)); if (!guard) return -1; if (timeoutMs < 0 || strMessage == nullptr) { return -1; } std::string message; bool _doAcknowledge = (doAcknowledge == 1); if (guard.get()->Receive(message, _doAcknowledge, timeoutMs)) { if (message.empty()) return 0; int size = static_cast(message.length()); MgErr error; error = DSSetHandleSize(strMessage, sizeof(int32) + size * sizeof(uChar)); if (error == noErr) { (*strMessage)->cnt = size; memcpy((*strMessage)->str, message.c_str(), size); return 1; } else return 0; } else { return 0; } }