559 lines
16 KiB
C++
559 lines
16 KiB
C++
// dllmain.cpp : Defines the entry point for the DLL application.
|
|
#include "pch.h"
|
|
#include "ANSRabbitMQ.h"
|
|
#include <cstdint>
|
|
#include <unordered_map>
|
|
#include <condition_variable>
|
|
#include <mutex>
|
|
|
|
// Handle registry with refcount — prevents use-after-free when
|
|
// ReleaseANSRabbitMQHandle is called while an operation is still running.
|
|
static std::unordered_map<ANSCENTER::ANSRABBITMQ*, int>& RMQHandleRegistry() {
|
|
static std::unordered_map<ANSCENTER::ANSRABBITMQ*, int> s;
|
|
return s;
|
|
}
|
|
// Returns the single mutex that serialises access to the handle registry.
// Function-local static: constructed on first call, same object every call.
static std::mutex& RMQHandleRegistryMutex() {
    static std::mutex registryMutex;
    return registryMutex;
}
|
|
// Returns the condition variable that is signalled when a handle's reference
// count drops to zero; UnregisterRMQHandle waits on it.
// Function-local static: constructed on first call, same object every call.
static std::condition_variable& RMQHandleRegistryCV() {
    static std::condition_variable refsDrained;
    return refsDrained;
}
|
|
|
|
// Adds `h` to the registry with a reference count of 1 (the owner's
// reference). If `h` is already present its count is reset to 1.
static void RegisterRMQHandle(ANSCENTER::ANSRABBITMQ* h) {
    std::lock_guard<std::mutex> lock(RMQHandleRegistryMutex());
    auto& registry = RMQHandleRegistry();
    registry[h] = 1;
}
|
|
|
|
// Takes one extra reference on `h` if it is currently registered.
// Returns `h` on success, or nullptr when `h` is not in the registry.
// Every successful acquire must be balanced by ReleaseRMQHandleRef.
static ANSCENTER::ANSRABBITMQ* AcquireRMQHandle(ANSCENTER::ANSRABBITMQ* h) {
    std::lock_guard<std::mutex> lock(RMQHandleRegistryMutex());
    auto& registry = RMQHandleRegistry();
    const auto entry = registry.find(h);
    if (entry != registry.end()) {
        ++entry->second;
        return h;
    }
    return nullptr;
}
|
|
|
|
// Drops one reference on `h`.
// Returns true only when this call brought the count to zero (or below); in
// that case the entry is removed and all waiters on the registry condition
// variable are woken. Returns false if `h` is unknown or still referenced.
static bool ReleaseRMQHandleRef(ANSCENTER::ANSRABBITMQ* h) {
    std::lock_guard<std::mutex> lock(RMQHandleRegistryMutex());
    auto& registry = RMQHandleRegistry();

    const auto entry = registry.find(h);
    if (entry == registry.end()) return false;

    if (--entry->second > 0) return false;   // other references remain

    registry.erase(entry);
    RMQHandleRegistryCV().notify_all();
    return true;
}
|
|
|
|
// Removes `h` from the registry on behalf of its owner.
//
// Drops the owner's reference, then waits up to 30 seconds for the remaining
// references to be released before erasing the entry.
// Returns false if `h` was not registered (the caller must not delete it),
// true otherwise.
//
// NOTE(review): on timeout this still erases the entry and returns true, so
// the callers in this file go on to delete the object while operations may
// still be running. Confirm that is the intended trade-off.
static bool UnregisterRMQHandle(ANSCENTER::ANSRABBITMQ* h) {
    std::unique_lock<std::mutex> lock(RMQHandleRegistryMutex());
    auto& registry = RMQHandleRegistry();

    const auto entry = registry.find(h);
    if (entry == registry.end()) return false;
    --entry->second;   // give up the reference taken by RegisterRMQHandle

    // True once nobody else holds the handle (or the entry is already gone,
    // which happens when the last ReleaseRMQHandleRef erased it).
    const auto drained = [&registry, h]() {
        const auto current = registry.find(h);
        return current == registry.end() || current->second <= 0;
    };

    const bool finishedInTime =
        RMQHandleRegistryCV().wait_for(lock, std::chrono::seconds(30), drained);
    if (!finishedInTime) {
        OutputDebugStringA("WARNING: UnregisterRMQHandle timed out waiting for in-flight operations\n");
    }

    registry.erase(h);
    return true;
}
|
|
|
|
// RAII guard — ensures ReleaseRMQHandleRef is always called
|
|
class RMQHandleGuard {
|
|
ANSCENTER::ANSRABBITMQ* engine;
|
|
public:
|
|
explicit RMQHandleGuard(ANSCENTER::ANSRABBITMQ* e) : engine(e) {}
|
|
~RMQHandleGuard() { if (engine) ReleaseRMQHandleRef(engine); }
|
|
ANSCENTER::ANSRABBITMQ* get() const { return engine; }
|
|
explicit operator bool() const { return engine != nullptr; }
|
|
RMQHandleGuard(const RMQHandleGuard&) = delete;
|
|
RMQHandleGuard& operator=(const RMQHandleGuard&) = delete;
|
|
};
|
|
|
|
// DLL entry point. No per-process or per-thread work is done for any
// notification reason; it always returns TRUE.
BOOL APIENTRY DllMain(HMODULE hModule, DWORD ul_reason_for_call, LPVOID lpReserved)
{
    switch (ul_reason_for_call) {
    case DLL_PROCESS_ATTACH:
    case DLL_THREAD_ATTACH:
    case DLL_THREAD_DETACH:
    case DLL_PROCESS_DETACH:
    default:
        break;   // nothing to set up or tear down
    }
    return TRUE;
}
|
|
|
|
// Creates a new RabbitMQ client and stores it in *Handle.
//
// Parameters:
//   Handle     - in/out slot for the client pointer. If it already holds a
//                registered client, that client is unregistered and deleted
//                first (prevents a leak when LabVIEW calls this twice).
//   licenseKey - must be non-null; this function only null-checks it.
// Returns 1 on success, 0 on invalid arguments or any failure. On failure
// *Handle is left as nullptr.
extern "C" ANSRabbitMQ_API int CreateANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle, const char* licenseKey) {
    if (Handle == nullptr || licenseKey == nullptr) return 0;

    // Tracked separately from *Handle so the handler below only ever deletes
    // the object allocated by this call.
    ANSCENTER::ANSRABBITMQ* created = nullptr;
    try {
        if (ANSCENTER::ANSRABBITMQ* previous = *Handle) {
            // Clear the slot BEFORE releasing the old client: if the release
            // fails part-way, the handler must not delete it a second time.
            *Handle = nullptr;
            if (UnregisterRMQHandle(previous)) {
                delete previous;
            }
        }

        // Plain `new` throws on allocation failure, so no null check is needed.
        created = new ANSCENTER::ANSRABBITMQ();
        RegisterRMQHandle(created);
        *Handle = created;
        return 1;
    }
    catch (...) {
        // `created` is either nullptr or an object that never reached the
        // caller, so it is safe to delete here.
        delete created;
        *Handle = nullptr;
        return 0;
    }
}
|
|
|
|
// C++ half of ReleaseANSRabbitMQHandle (the exported function wraps this call
// in a structured-exception handler).
//
// Unregisters *Handle and deletes it if it was registered; an unregistered
// pointer is not deleted. *Handle is cleared in every case.
// Returns 1 normally (including for a null Handle or *Handle), 0 if a C++
// exception was thrown.
static int ReleaseANSRabbitMQHandle_Impl(ANSCENTER::ANSRABBITMQ** Handle) {
    try {
        if (Handle == nullptr || *Handle == nullptr) return 1;

        const bool wasRegistered = UnregisterRMQHandle(*Handle);
        if (wasRegistered) {
            delete *Handle;
        }
        *Handle = nullptr;
        return 1;
    }
    catch (...) {
        if (Handle) *Handle = nullptr;
        return 0;
    }
}
|
|
|
|
// Exported release entry point. Delegates to ReleaseANSRabbitMQHandle_Impl
// inside a structured-exception handler. If a structured exception is
// raised, *Handle is cleared and 0 is returned; otherwise the
// implementation's result is returned.
extern "C" ANSRabbitMQ_API int ReleaseANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle) {
    int status = 0;
    __try {
        status = ReleaseANSRabbitMQHandle_Impl(Handle);
    }
    __except (EXCEPTION_EXECUTE_HANDLER) {
        if (Handle) *Handle = nullptr;
        status = 0;
    }
    return status;
}
|
|
// Connects the client behind *Handle using the given host, port, vhost,
// channel and credentials.
// Returns 1 on success, 0 if the connection failed, -1 for an invalid or
// unregistered handle or a null string argument.
extern "C" ANSRabbitMQ_API int Connect(ANSCENTER::ANSRABBITMQ** Handle, const char* hostname, int port, const char* vhost,int channel, const char* userName, const char* password, int useSSL) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    const bool haveAllStrings = hostname && vhost && userName && password;
    if (!haveAllStrings) {
        return -1; // Invalid parameters
    }

    bool secure = (useSSL == 1);   // only the exact value 1 enables SSL
    return guard.get()->Connect(hostname, port, vhost, channel, userName, password, secure)
        ? 1    // Success
        : 0;   // Connection failed
}
|
|
|
|
// Disconnects the client behind *Handle.
// Returns 1 on success, 0 if disconnection failed, -1 for an invalid or
// unregistered handle.
extern "C" ANSRabbitMQ_API int Disconnect(ANSCENTER::ANSRABBITMQ** Handle) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    return guard.get()->Disconnect()
        ? 1    // Success
        : 0;   // Disconnection failed
}
|
|
|
|
// Publishes `message` to `exchange_name` with `routing_key` through the
// client behind *Handle.
// Returns 1 on success, 0 if the publish failed, -1 for an invalid or
// unregistered handle or a null string argument.
extern "C" ANSRabbitMQ_API int Publish(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* routing_key, const char* message) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!exchange_name || !routing_key || !message) {
        return -1; // Invalid parameters
    }

    return guard.get()->Publish(exchange_name, routing_key, message)
        ? 1    // Success
        : 0;   // Publish failed
}
|
|
|
|
// Forwards exchange/queue/routing names and the four integer flags to the
// client's CreateChannel.
// Returns 1 on success, 0 if setup failed, -1 for an invalid or unregistered
// handle or a null string argument.
extern "C" ANSRabbitMQ_API int CreateChannel(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* exchange_type,
    const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal)
{
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    const bool haveAllNames = exchange_name && exchange_type && queue_name && routing_name;
    if (!haveAllNames) {
        return -1; // Invalid parameters
    }

    return guard.get()->CreateChannel(exchange_name, exchange_type, queue_name, routing_name, passvive, durable, auto_delete, internalVal)
        ? 1    // Success
        : 0;   // Setup failed
}
|
|
|
|
// Passes certificate/key paths and the two verification switches to the
// client's ConfigureSSL. Any non-zero verifyPeer / verifyHostname counts as
// true.
// Returns 1 on success, 0 if configuration failed, -1 for an invalid or
// unregistered handle or a null path.
extern "C" ANSRabbitMQ_API int ConfigureSSL(ANSCENTER::ANSRABBITMQ** Handle, const char* caCertPath, const char* clientCertPath, const char* clientKeyPath, int verifyPeer, int verifyHostname) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!caCertPath || !clientCertPath || !clientKeyPath) {
        return -1; // Invalid parameters
    }

    bool checkPeer = (verifyPeer != 0);
    bool checkHostname = (verifyHostname != 0);

    return guard.get()->ConfigureSSL(caCertPath, clientCertPath, clientKeyPath, checkPeer, checkHostname)
        ? 1    // Success
        : 0;   // SSL configuration failed
}
|
|
|
|
|
|
// Calls the client's UnbindQueue for `exchange_name`.
// Returns 1 on success, 0 if the unbind failed, -1 for an invalid or
// unregistered handle or a null name.
extern "C" ANSRabbitMQ_API int UnbindQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!exchange_name) {
        return -1; // Invalid parameters
    }

    return guard.get()->UnbindQueue(exchange_name)
        ? 1    // Success
        : 0;   // Unbind failed
}
|
|
// Calls the client's DeleteQueue for `queue_name`.
// Returns 1 on success, 0 if the delete failed, -1 for an invalid or
// unregistered handle or a null name.
extern "C" ANSRabbitMQ_API int DeleteQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!queue_name) {
        return -1; // Invalid parameters
    }

    return guard.get()->DeleteQueue(queue_name)
        ? 1    // Success
        : 0;   // Delete failed
}
|
|
// Calls the client's DeleteExchange for `exchange_name`.
// Returns 1 on success, 0 if the delete failed, -1 for an invalid or
// unregistered handle or a null name.
extern "C" ANSRabbitMQ_API int DeleteExchange(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!exchange_name) {
        return -1; // Invalid parameters
    }

    return guard.get()->DeleteExchange(exchange_name)
        ? 1    // Success
        : 0;   // Delete failed
}
|
|
// Calls the client's PurgeQueue for `queue_name`.
// Returns 1 on success, 0 if the purge failed, -1 for an invalid or
// unregistered handle or a null name.
extern "C" ANSRabbitMQ_API int PurgeQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!queue_name) {
        return -1; // Invalid parameters
    }

    return guard.get()->PurgeQueue(queue_name)
        ? 1    // Success
        : 0;   // Purge failed
}
|
|
|
|
// C++-caller variant of GetQueueMessage: the client writes the fetched
// message into `message`.
// Returns 1 on success, 0 if the fetch failed, -1 for an invalid or
// unregistered handle or a null queue name.
extern "C" ANSRabbitMQ_API int GetQueueMessage_CPP(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, std::string& message) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!queue_name) {
        return -1; // Invalid parameters
    }

    return guard.get()->GetQueueMessage(queue_name, message, ack_mode)
        ? 1    // Success
        : 0;   // Get message failed
}
|
|
// LabVIEW variant: fetches one message from `queue_name` and copies it into
// the LabVIEW string handle `strMessage`, resizing the handle to fit.
// Returns 1 when a non-empty message was copied, 0 when the fetch failed,
// the message was empty, or the resize failed, and -1 for an invalid or
// unregistered handle or a null argument.
extern "C" ANSRabbitMQ_API int GetQueueMessage(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, LStrHandle strMessage) {
    if (!Handle || !*Handle) {
        return -1; // Invalid handle
    }

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(*Handle));
    if (!guard) return -1;

    if (!queue_name || !strMessage) {
        return -1; // Invalid parameters
    }

    std::string payload;
    if (!guard.get()->GetQueueMessage(queue_name, payload, ack_mode)) {
        return 0; // Get message failed
    }
    if (payload.empty()) return 0;   // empty message: handle left untouched

    const int byteCount = static_cast<int>(payload.length());

    // Room for the length prefix plus the raw bytes.
    const MgErr resizeStatus = DSSetHandleSize(strMessage, sizeof(int32) + byteCount * sizeof(uChar));
    if (resizeStatus != noErr) return 0;

    (*strMessage)->cnt = byteCount;
    memcpy((*strMessage)->str, payload.c_str(), byteCount);
    return 1;
}
|
|
|
|
// ============================================================================
|
|
// V2 entry points — accept uint64_t handleVal by value instead of Handle**
|
|
// Eliminates LabVIEW buffer reuse bug when concurrent calls share the same
|
|
// Handle** buffer address.
|
|
// ============================================================================
|
|
|
|
// V2 of ConfigureSSL: the handle arrives by value as an integer and is
// reinterpreted as the client pointer. Any non-zero verifyPeer /
// verifyHostname counts as true.
// Returns 1 on success, 0 if configuration failed or handleVal is 0,
// -1 for an unregistered handle or a null path.
// NOTE(review): a zero handle yields 0 here, whereas ConfigureSSL returns -1
// for a null handle.
extern "C" ANSRabbitMQ_API int ConfigureSSL_V2(uint64_t handleVal, const char* caCertPath, const char* clientCertPath, const char* clientKeyPath, int verifyPeer, int verifyHostname) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    if (!caCertPath || !clientCertPath || !clientKeyPath) {
        return -1;
    }

    bool checkPeer = (verifyPeer != 0);
    bool checkHostname = (verifyHostname != 0);

    return guard.get()->ConfigureSSL(caCertPath, clientCertPath, clientKeyPath, checkPeer, checkHostname) ? 1 : 0;
}
|
|
|
|
// V2 of CreateChannel: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if setup failed or handleVal is 0, -1 for an
// unregistered handle or a null string argument.
// NOTE(review): a zero handle yields 0 here, whereas CreateChannel returns -1
// for a null handle.
extern "C" ANSRabbitMQ_API int CreateChannel_V2(uint64_t handleVal, const char* exchange_name, const char* exchange_type,
    const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal)
{
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    const bool haveAllNames = exchange_name && exchange_type && queue_name && routing_name;
    if (!haveAllNames) {
        return -1;
    }

    return guard.get()->CreateChannel(exchange_name, exchange_type, queue_name, routing_name, passvive, durable, auto_delete, internalVal) ? 1 : 0;
}
|
|
|
|
// V2 of Connect: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if the connection failed or handleVal is 0,
// -1 for an unregistered handle or a null string argument.
// NOTE(review): a zero handle yields 0 here, whereas Connect returns -1 for
// a null handle.
extern "C" ANSRabbitMQ_API int Connect_V2(uint64_t handleVal, const char* hostname, int port, const char* vhost, int channel, const char* userName, const char* password, int useSSL) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    const bool haveAllStrings = hostname && vhost && userName && password;
    if (!haveAllStrings) {
        return -1;
    }

    bool secure = (useSSL == 1);   // only the exact value 1 enables SSL
    return guard.get()->Connect(hostname, port, vhost, channel, userName, password, secure) ? 1 : 0;
}
|
|
|
|
// V2 of Disconnect: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if disconnection failed or handleVal is 0,
// -1 for an unregistered handle.
// NOTE(review): a zero handle yields 0 here, whereas Disconnect returns -1
// for a null handle.
extern "C" ANSRabbitMQ_API int Disconnect_V2(uint64_t handleVal) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    return guard.get()->Disconnect() ? 1 : 0;
}
|
|
|
|
// V2 of Publish: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if the publish failed or handleVal is 0,
// -1 for an unregistered handle or a null string argument.
// NOTE(review): a zero handle yields 0 here, whereas Publish returns -1 for
// a null handle.
extern "C" ANSRabbitMQ_API int Publish_V2(uint64_t handleVal, const char* exchange_name, const char* routing_key, const char* message) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    if (!exchange_name || !routing_key || !message) {
        return -1;
    }

    return guard.get()->Publish(exchange_name, routing_key, message) ? 1 : 0;
}
|
|
|
|
// V2 of GetQueueMessage: the handle arrives by value as an integer and is
// reinterpreted as the client pointer. The fetched message is copied into
// the LabVIEW string handle `strMessage`, which is resized to fit.
// Returns 1 when a non-empty message was copied, 0 when handleVal is 0, the
// fetch failed, the message was empty, or the resize failed, and -1 for an
// unregistered handle or a null argument.
// NOTE(review): a zero handle yields 0 here, whereas GetQueueMessage returns
// -1 for a null handle.
extern "C" ANSRabbitMQ_API int GetQueueMessage_V2(uint64_t handleVal, const char* queue_name, int ack_mode, LStrHandle strMessage) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    if (!queue_name || !strMessage) {
        return -1;
    }

    std::string payload;
    if (!guard.get()->GetQueueMessage(queue_name, payload, ack_mode)) {
        return 0;
    }
    if (payload.empty()) return 0;   // empty message: handle left untouched

    const int byteCount = static_cast<int>(payload.length());

    // Room for the length prefix plus the raw bytes.
    const MgErr resizeStatus = DSSetHandleSize(strMessage, sizeof(int32) + byteCount * sizeof(uChar));
    if (resizeStatus != noErr) return 0;

    (*strMessage)->cnt = byteCount;
    memcpy((*strMessage)->str, payload.c_str(), byteCount);
    return 1;
}
|
|
|
|
// V2 of UnbindQueue: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if the unbind failed or handleVal is 0,
// -1 for an unregistered handle or a null name.
// NOTE(review): a zero handle yields 0 here, whereas UnbindQueue returns -1
// for a null handle.
extern "C" ANSRabbitMQ_API int UnbindQueue_V2(uint64_t handleVal, const char* exchange_name) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    if (!exchange_name) {
        return -1;
    }

    return guard.get()->UnbindQueue(exchange_name) ? 1 : 0;
}
|
|
|
|
// V2 of DeleteExchange: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if the delete failed or handleVal is 0,
// -1 for an unregistered handle or a null name.
// NOTE(review): a zero handle yields 0 here, whereas DeleteExchange returns
// -1 for a null handle.
extern "C" ANSRabbitMQ_API int DeleteExchange_V2(uint64_t handleVal, const char* exchange_name) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    if (!exchange_name) {
        return -1;
    }

    return guard.get()->DeleteExchange(exchange_name) ? 1 : 0;
}
|
|
|
|
// V2 of DeleteQueue: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if the delete failed or handleVal is 0,
// -1 for an unregistered handle or a null name.
// NOTE(review): a zero handle yields 0 here, whereas DeleteQueue returns -1
// for a null handle.
extern "C" ANSRabbitMQ_API int DeleteQueue_V2(uint64_t handleVal, const char* queue_name) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    if (!queue_name) {
        return -1;
    }

    return guard.get()->DeleteQueue(queue_name) ? 1 : 0;
}
|
|
|
|
// V2 of PurgeQueue: the handle arrives by value as an integer and is
// reinterpreted as the client pointer.
// Returns 1 on success, 0 if the purge failed or handleVal is 0,
// -1 for an unregistered handle or a null name.
// NOTE(review): a zero handle yields 0 here, whereas PurgeQueue returns -1
// for a null handle.
extern "C" ANSRabbitMQ_API int PurgeQueue_V2(uint64_t handleVal, const char* queue_name) {
    ANSCENTER::ANSRABBITMQ* const client = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
    if (client == nullptr) return 0;

    // Hold a registry reference for the duration of the call.
    RMQHandleGuard guard(AcquireRMQHandle(client));
    if (!guard) return -1;

    if (!queue_name) {
        return -1;
    }

    return guard.get()->PurgeQueue(queue_name) ? 1 : 0;
}
|