Refactor project structure

This commit is contained in:
2026-03-28 19:56:39 +11:00
parent 1d267378b2
commit 8a2e721058
511 changed files with 59 additions and 48 deletions

View File

@@ -0,0 +1,807 @@
#include "ANSRabbitMQ.h"
#include <ctype.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <cstdint>
#include <windows.h>
#include <iostream>
void die(const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, "\n");
}
void die_on_error(int x, char const* context) {
if (x < 0) {
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
}
}
void die_on_amqp_error(amqp_rpc_reply_t x, char const* context) {
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf(stderr, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
auto* m =
static_cast<amqp_connection_close_t*>(x.reply.decoded);
fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
context, m->reply_code, static_cast<int>(m->reply_text.len),
static_cast<const char*>(m->reply_text.bytes));
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
auto* m = static_cast<amqp_channel_close_t*>(x.reply.decoded);
fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
context, m->reply_code, static_cast<int>(m->reply_text.len),
static_cast<const char*>(m->reply_text.bytes));
break;
}
default:
fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
break;
}
break;
}
}
static void dump_row(long count, int numinrow, int* chs) {
int i;
printf("%08lX:", count - numinrow);
if (numinrow > 0) {
for (i = 0; i < numinrow; i++) {
if (i == 8) {
printf(" :");
}
printf(" %02X", chs[i]);
}
for (i = numinrow; i < 16; i++) {
if (i == 8) {
printf(" :");
}
printf(" ");
}
printf(" ");
for (i = 0; i < numinrow; i++) {
if (isprint(chs[i])) {
printf("%c", chs[i]);
}
else {
printf(".");
}
}
}
printf("\n");
}
static int rows_eq(int* a, int* b) {
int i;
for (i = 0; i < 16; i++)
if (a[i] != b[i]) {
return 0;
}
return 1;
}
void amqp_dump(void const* buffer, size_t len) {
const auto* buf = static_cast<const unsigned char*>(buffer);
long count = 0;
int numinrow = 0;
int chs[16];
int oldchs[16] = { 0 };
int showed_dots = 0;
size_t i;
for (i = 0; i < len; i++) {
int ch = buf[i];
if (numinrow == 16) {
int j;
if (rows_eq(oldchs, chs)) {
if (!showed_dots) {
showed_dots = 1;
printf(
" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
}
}
else {
showed_dots = 0;
dump_row(count, numinrow, chs);
}
for (j = 0; j < 16; j++) {
oldchs[j] = chs[j];
}
numinrow = 0;
}
count++;
chs[numinrow++] = ch;
}
dump_row(count, numinrow, chs);
if (numinrow != 0) {
printf("%08lX:\n", count);
}
}
uint64_t now_microseconds(void) {
FILETIME ft;
GetSystemTimeAsFileTime(&ft);
return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime) /
10;
}
void microsleep(int usec) { Sleep(usec / 1000); }
namespace ANSCENTER {
ANSRABBITMQ::ANSRABBITMQ() {
_port = 5672;
_channel = 1;
_channelMax = 0; // set to 0 for default, let server decides
_frameMax = 131072; // no limit
_heartBeat = 60; // 60 seconds
std::string _vhost = "/";
}
ANSRABBITMQ::~ANSRABBITMQ() {
Disconnect(); // Safely handles cleanup
}
BindingInfo ANSRABBITMQ::GetBindingInfo(const std::string& exchange_name) {
// Search for the binding info based on the exchange name
for (const auto& binding : _bindings) {
if (binding._exchange_name == exchange_name) {
return binding;
}
}
return BindingInfo{}; // Return default empty BindingInfo if not found
}
BindingInfo ANSRABBITMQ::GetBindingInfoFromQueueName(const std::string& queue_name) {
// Search for the binding info based on the queue name
for (const auto& binding : _bindings) {
if (binding._queue_name == queue_name) {
return binding;
}
}
return BindingInfo{}; // Return default empty BindingInfo if not found
}
bool ANSRABBITMQ::ConfigureSSLOptions() {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_caCertPath.empty() && _clientCertPath.empty() && _clientKeyPath.empty()) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "No SSL options configured.", __FILE__, __LINE__);
return true; // No SSL options to configure
}
if (!_socket) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Error: SSL socket is not initialized.", __FILE__, __LINE__);
return false;
}
// These functions accept `amqp_socket_t*` even though they apply only to SSL sockets
if (!_caCertPath.empty()) {
if (amqp_ssl_socket_set_cacert(_socket, _caCertPath.c_str()) != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set CA certificate.", __FILE__, __LINE__);
return false;
}
}
if (!_clientCertPath.empty() && !_clientKeyPath.empty()) {
if (amqp_ssl_socket_set_key(_socket, _clientCertPath.c_str(), _clientKeyPath.c_str()) != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set client certificate/key.", __FILE__, __LINE__);
return false;
}
}
amqp_ssl_socket_set_verify_peer(_socket, _verifyPeer ? 1 : 0);
amqp_ssl_socket_set_verify_hostname(_socket, _verifyHostname ? 1 : 0);
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::ConfigureSSLOptions. Error configuring SSL options:",e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::SetupExchange(const std::string & _exchange_name, const std::string& _exchange_type, int _passive, int _durable, int _auto_delete, int _internal) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || _exchange_name.empty() || _exchange_type.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::SetupExchange", "Invalid parameters in SetupExchange.", __FILE__, __LINE__);
return false;
}
amqp_exchange_declare(
_conn, _channel,
amqp_cstring_bytes(_exchange_name.c_str()),
amqp_cstring_bytes(_exchange_type.c_str()),
_passive,
_durable,
_auto_delete,
_internal,
amqp_empty_table
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Declaring exchange");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::SetupExchange. Error in SetupExchange:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::SetupQueue(const std::string _queue_name, int _passive, int _durable, int _auto_delete, int _internal) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::SetupQueue", "Invalid parameters in SetupQueue.", __FILE__, __LINE__);
return false;
}
amqp_queue_declare(
_conn, _channel,
amqp_cstring_bytes(_queue_name.c_str()),
_passive,
_durable,
false, // not exclusive
_auto_delete,
amqp_empty_table
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Declaring queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::SetupQueue. Error in SetupQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::BindQueue(const std::string & _exchange_name, const std::string& _exchange_type, const std::string& _queue_name, const std::string& _binding_key) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _exchange_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::BindQueue", "Invalid parameters in BindQueue.", __FILE__, __LINE__);
return false;
}
amqp_table_t arguments = amqp_empty_table;
if (_exchange_type == "headers") {
// Match messages with header: type = important
static amqp_table_entry_t items[2];
items[0].key = amqp_cstring_bytes("x-match");
items[0].value.kind = AMQP_FIELD_KIND_UTF8;
items[0].value.value.bytes = amqp_cstring_bytes("all");
items[1].key = amqp_cstring_bytes("type");
items[1].value.kind = AMQP_FIELD_KIND_UTF8;
items[1].value.value.bytes = amqp_cstring_bytes("important");
arguments.num_entries = 2;
arguments.entries = items;
}
amqp_queue_bind(
_conn, _channel,
amqp_cstring_bytes(_queue_name.c_str()),
amqp_cstring_bytes(_exchange_name.c_str()),
amqp_cstring_bytes(_binding_key.c_str()), // required, even for fanout/headers
arguments
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Binding queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::BindQueue. Error in BindQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::UnbindQueue(const std::string& _exchange_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr ||
_exchange_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::UnbindQueue", "Invalid parameters in UnbindQueue.", __FILE__, __LINE__);
return false;
}
amqp_table_t arguments = amqp_empty_table;
BindingInfo bindingInfo = GetBindingInfo(_exchange_name); // Check if the exchange is set up
if (bindingInfo._exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::UnbindQueue", "Exchange is not bound or set up.", __FILE__, __LINE__);
return false;
}
if (bindingInfo._exchange_type == "headers") {
// Same header structure used in BindQueue()
static amqp_table_entry_t items[2];
items[0].key = amqp_cstring_bytes("x-match");
items[0].value.kind = AMQP_FIELD_KIND_UTF8;
items[0].value.value.bytes = amqp_cstring_bytes("all");
items[1].key = amqp_cstring_bytes("type");
items[1].value.kind = AMQP_FIELD_KIND_UTF8;
items[1].value.value.bytes = amqp_cstring_bytes("important");
arguments.num_entries = 2;
arguments.entries = items;
}
amqp_queue_unbind(
_conn, bindingInfo._channel,
amqp_cstring_bytes(bindingInfo._queue_name.c_str()),
amqp_cstring_bytes(bindingInfo._exchange_name.c_str()),
amqp_cstring_bytes(bindingInfo._binding_key.c_str()), // still required by API
arguments
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Unbinding queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::UnbindQueue. Error in UnbindQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::DeleteQueue(const std::string& queue_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try
{
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue name is empty.", __FILE__, __LINE__);
return false;
}
if (_channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Channel is not set.", __FILE__, __LINE__);
return false;
}
// Only delete if queue is unused and empty <20> set to false for force delete
int if_unused = 0;
int if_empty = 0;
BindingInfo binding = GetBindingInfoFromQueueName(queue_name); // Check if the queue is set up
if (binding._queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
return false;
}
amqp_queue_delete(_conn, binding._channel,
amqp_cstring_bytes(queue_name.c_str()),
if_unused,
if_empty);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Deleting queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::DeleteQueue. Error in DeleteQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::DeleteExchange(const std::string& exchange_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try
{
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange name is empty.", __FILE__, __LINE__);
return false;
}
if (_channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Channel is not set.", __FILE__, __LINE__);
return false;
}
// Only delete if exchange is unused (set to 0 to delete regardless)
int if_unused = 0;
BindingInfo bindingInfo = GetBindingInfo(exchange_name); // Check if the exchange is set up
if (bindingInfo._exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange is not bound or set up.", __FILE__, __LINE__);
return false;
}
amqp_exchange_delete(_conn, bindingInfo._channel,
amqp_cstring_bytes(exchange_name.c_str()),
if_unused);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Deleting exchange");
// remove the binding info from the list
_bindings.erase(
std::remove_if(
_bindings.begin(),
_bindings.end(),
[&](const BindingInfo& b) {
return b._exchange_name == bindingInfo._exchange_name &&
b._exchange_type == bindingInfo._exchange_type &&
b._queue_name == bindingInfo._queue_name &&
b._binding_key == bindingInfo._binding_key;
}
),
_bindings.end()
);
return true;
}
catch (const std::exception& e) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange. Error in DeleteExchange:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::PurgeQueue(const std::string & _queue_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (_queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue name is empty.", __FILE__, __LINE__);
return false;
}
if (_channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Channel is not set.", __FILE__, __LINE__);
return false;
}
BindingInfo binding = GetBindingInfoFromQueueName(_queue_name); // Check if the queue is set up
if (binding._queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
return false;
}
// Purge the queue: remove all ready messages
amqp_queue_purge(_conn, binding._channel, amqp_cstring_bytes(_queue_name.c_str()));
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Purging queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::PurgeQueue. Error in PurgeQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::ConfigureSSL(const std::string& caCertPath, const std::string& clientCertPath, const std::string& clientKeyPath, bool verifyPeer, bool verifyHostname) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
_caCertPath = caCertPath;
_clientCertPath = clientCertPath;
_clientKeyPath = clientKeyPath;
_verifyPeer = verifyPeer;
_verifyHostname = verifyHostname;
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::ConfigureSSL. Error configuring SSL:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::CreateChannel(const std::string& exchange_name, const std::string& exchange_type, const std::string& queue_name, const std::string& binding_key,
int passiveVal, int durableVal, int autoDelete, int internalVal) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try{
if (exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange name is empty.", __FILE__, __LINE__);
return false;
}
if (exchange_type.empty()) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange type is empty.", __FILE__, __LINE__);
return false;
}
if (queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Queue name is empty.", __FILE__, __LINE__);
return false;
}
amqp_channel_open(_conn, _channel);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Opening channel");
bool exchangeStatus = SetupExchange(exchange_name, exchange_type, passiveVal, durableVal, autoDelete, internalVal);
bool queueStatus = SetupQueue(queue_name, passiveVal, durableVal, autoDelete, internalVal);
bool bindStatus = BindQueue(exchange_name, exchange_type, queue_name, binding_key);
if (!exchangeStatus || !queueStatus || !bindStatus) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Failed to setup exchange, queue or bind.", __FILE__, __LINE__);
return false;
}
BindingInfo bindingInfo;
bindingInfo._exchange_name = exchange_name;
bindingInfo._exchange_type = exchange_type;
bindingInfo._queue_name = queue_name;
bindingInfo._binding_key = binding_key;
bindingInfo._passive = passiveVal;
bindingInfo._durable = durableVal;
bindingInfo._auto_delete = autoDelete;
bindingInfo._internal = internalVal;
bindingInfo._channel = _channel; // Store the channel used for this binding
_bindings.push_back(bindingInfo);
_channel++; // Increment channel for next use
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::CreateChannel. Error in Setup::", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::Connect(const std::string& hostname, int port, const std::string& vhost,
int channel, const std::string& userName, const std::string& password,
bool useSSL)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
_conn = amqp_new_connection();
if (useSSL) {
_socket = amqp_ssl_socket_new(_conn);
if (!_socket) {
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create SSL socket", __FILE__, __LINE__);
die("Creating SSL socket");
}
if (!ConfigureSSLOptions()) {
return false;
}
}
else {
_socket = amqp_tcp_socket_new(_conn);
if (!_socket) {
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create socket", __FILE__, __LINE__);
die("Creating TCP socket");
}
}
if (!_socket) return false;
try {
int opened = amqp_socket_open(_socket, hostname.c_str(), port);
std::cout << "Opened socket: " << opened << std::endl;
if (opened) {
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot open socket", __FILE__, __LINE__);
die("Opening socket");
return false;
}
// Use defaults if values are unset
if (_channelMax <= 0) _channelMax = 0; // 0 = unlimited (let server decide)
if (_frameMax <= 0) _frameMax = 131072; // default 128 KB
if (_heartBeat <= 0) _heartBeat = 60; // 60 seconds
die_on_amqp_error(
amqp_login(_conn, vhost.c_str(), _channelMax, _frameMax, _heartBeat,
AMQP_SASL_METHOD_PLAIN, userName.c_str(), password.c_str()),
"Logging in");
_channel = channel;
_hostname = hostname;
_port = port;
_status = 0;
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::Connect. Error in Connect::", e.what(), __FILE__, __LINE__);
return false;
}
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::Connect. Error in Connect::", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::Disconnect() {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn) {
// loop through all bindings and unbind them
for (const auto& binding : _bindings) {
if (binding._channel > 0) {
amqp_rpc_reply_t reply = amqp_channel_close(_conn, binding._channel, AMQP_REPLY_SUCCESS);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
this->_logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close channel cleanly.", __FILE__, __LINE__);
}
}
}
amqp_rpc_reply_t reply = amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
this->_logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close connection cleanly.", __FILE__, __LINE__);
}
int destroyStatus = amqp_destroy_connection(_conn);
if (destroyStatus != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::Disconnect. Failed to destroy connection:", amqp_error_string2(destroyStatus), __FILE__, __LINE__);
return false;
}
_conn = nullptr;
_socket = nullptr;
_channel = 1;
_status = -1;
}
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::DisConnect. Error in Disconnect::", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::Publish(const std::string& exchange_name, const std::string& routing_key, const std::string& message) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Exchange is not set.", __FILE__, __LINE__);
return false;
}
BindingInfo bindingInfo = GetBindingInfo(exchange_name); // Check if the exchange is set up
if (bindingInfo._exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Exchange name is not set.", __FILE__, __LINE__);
return false;
}
if (bindingInfo._exchange_type != "fanout" && bindingInfo._exchange_type != "headers" && bindingInfo._binding_key.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Routing key is required for this exchange type.", __FILE__, __LINE__);
return false;
}
if (message.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Message is empty.", __FILE__, __LINE__);
return false;
}
amqp_bytes_t exchange_bytes = amqp_cstring_bytes(bindingInfo._exchange_name.c_str());
amqp_bytes_t routing_key_bytes = amqp_cstring_bytes(
(bindingInfo._exchange_type == "fanout" || bindingInfo._exchange_type == "headers") ? "" : routing_key.c_str());
amqp_bytes_t message_bytes = amqp_cstring_bytes(message.c_str());
amqp_basic_properties_t props;
amqp_basic_properties_t* pProps = nullptr;
if (bindingInfo._exchange_type == "headers") {
// Example static header: type = important
static amqp_table_entry_t headers[1];
headers[0].key = amqp_cstring_bytes("type");
headers[0].value.kind = AMQP_FIELD_KIND_UTF8;
headers[0].value.value.bytes = amqp_cstring_bytes("important");
props._flags = AMQP_BASIC_HEADERS_FLAG;
props.headers.num_entries = 1;
props.headers.entries = headers;
pProps = &props;
}
int ret = amqp_basic_publish(
_conn, bindingInfo._channel,
exchange_bytes,
routing_key_bytes,
0, 0,
pProps,
message_bytes
);
if (ret != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::Publish. Failed to publish message:", amqp_error_string2(ret), __FILE__, __LINE__);
return false;
}
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::Publish. Error in Publish:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::GetQueueMessage(const std::string& queue_name, std::string& message, int ack_mode) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || queue_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Invalid state or parameters in GetQueueMessage.", __FILE__, __LINE__);
return false;
}
BindingInfo binding = GetBindingInfoFromQueueName(queue_name);
if (binding._queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Queue is not bound or set up.", __FILE__, __LINE__);
return false;
}
amqp_maybe_release_buffers(_conn);
// Request to get a single message
amqp_basic_get(_conn, binding._channel, amqp_cstring_bytes(binding._queue_name.c_str()), ack_mode); // auto-ack = 1
amqp_rpc_reply_t reply = amqp_get_rpc_reply(_conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL || reply.reply.id != AMQP_BASIC_GET_OK_METHOD) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "No message available or error getting message.", __FILE__, __LINE__);
return false;
}
// Cast the method to extract metadata (optional)
[[maybe_unused]] auto* get_ok = static_cast<amqp_basic_get_ok_t*>(reply.reply.decoded);
// Read the content
amqp_frame_t frame;
amqp_bytes_t body;
size_t body_received = 0;
size_t body_size = 0;
// Read next frame (header)
if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_HEADER) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message header.", __FILE__, __LINE__);
return false;
}
body_size = frame.payload.properties.body_size;
// Read body frames
std::string body_data;
while (body_received < body_size) {
if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_BODY) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message body.", __FILE__, __LINE__);
return false;
}
body_data.append(static_cast<const char*>(frame.payload.body_fragment.bytes), frame.payload.body_fragment.len);
body_received += frame.payload.body_fragment.len;
}
message = body_data;
return true;
}
catch (const std::exception& e) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage. Exception in GetQueueMessage:", e.what(), __FILE__, __LINE__);
return false;
}
}
}

View File

@@ -0,0 +1,124 @@
#ifndef ANSRabbitMQ_H
#define ANSRabbitMQ_H
#define ANSRabbitMQ_API __declspec(dllexport)
#pragma once
#include <amqp.h>
#include <tcp_socket.h>
#include <ssl_socket.h>
#include <cstdint>
#include <string>
#include <mutex>
#include <vector>
#include "ANSLicense.h"
#include "LabVIEWHeader/extcode.h"
#define SUMMARY_EVERY_US 1000000
void die(const char* fmt, ...);
void die_on_error(int x, char const* context);
void die_on_amqp_error(amqp_rpc_reply_t x, char const* context);
void amqp_dump(void const* buffer, size_t len);
uint64_t now_microseconds(void);
void microsleep(int usec);
namespace ANSCENTER {
struct BindingInfo {
std::string _exchange_name;
std::string _exchange_type;
std::string _queue_name;
std::string _binding_key;
int _passive = 0;
int _durable = 1;
int _auto_delete = 0;
int _internal = 0;
int _channel = 1;
};
class ANSRabbitMQ_API ANSRABBITMQ {
private:
std::string _hostname;
int _port = 5672;
std::string _vhost = "/";
int _channel = 1;
int _status = -1;
int _rate_limit = 0;
int _message_count = 0;
int _channelMax = 0; // no limit, let server decides
int _frameMax = 0; // no limit
int _heartBeat = 60; // 60 seconds
SPDLogger& _logger = SPDLogger::GetInstance("ANSRabbitMQ", true);
std::vector<BindingInfo> _bindings;
// SSL cert paths (optional)
std::string _caCertPath;
std::string _clientCertPath;
std::string _clientKeyPath;
bool _verifyPeer = true;
bool _verifyHostname = true;
amqp_socket_t* _socket = nullptr;
amqp_connection_state_t _conn = nullptr;
std::recursive_mutex _mutex;
[[nodiscard]] bool SetupExchange(const std::string& _exchange_name, const std::string& _exchange_type, int _passive, int _durable, int _auto_delete, int _internal);
[[nodiscard]] bool SetupQueue(const std::string _queue_name, int _passive, int _durable, int _auto_delete, int _internal);
[[nodiscard]] bool BindQueue(const std::string& _exchange_name, const std::string& _exchange_type, const std::string& _queue_name, const std::string& _binding_key);
[[nodiscard]] bool ConfigureSSLOptions();
[[nodiscard]] BindingInfo GetBindingInfo(const std::string& exchange_name);
[[nodiscard]] BindingInfo GetBindingInfoFromQueueName(const std::string& queue_name);
public:
ANSRABBITMQ();
~ANSRABBITMQ();
[[nodiscard]] bool ConfigureSSL(const std::string& caCertPath, const std::string& clientCertPath, const std::string& clientKeyPath, bool verifyPeer, bool verifyHostname);
[[nodiscard]] bool Connect(const std::string& hostname, int port, const std::string& vhost,int channel, const std::string& userName, const std::string& password,bool useSSL);
[[nodiscard]] bool Disconnect();
[[nodiscard]] bool CreateChannel(const std::string& exchange_name, const std::string& exchange_type, const std::string& queue_name, const std::string& binding_key, int passiveVal, int durableVal, int autoDelete, int internalVal);
[[nodiscard]] bool Publish(const std::string& exchange_name, const std::string& routing_key, const std::string& message);
[[nodiscard]] bool GetQueueMessage(const std::string& queue_name, std::string& message, int ack_mode);
[[nodiscard]] bool UnbindQueue(const std::string& exchange_name);
[[nodiscard]] bool DeleteExchange(const std::string& exchange_name);
[[nodiscard]] bool DeleteQueue(const std::string& queue_name);
[[nodiscard]] bool PurgeQueue(const std::string& queue_name);
// Others
[[nodiscard]] int GetStatus() const { return _status; }
void SetRateLimit(int rate_limit) { _rate_limit = rate_limit; }
void SetMessageCount(int message_count) { _message_count = message_count; }
[[nodiscard]] int GetMessageCount() const { return _message_count; }
[[nodiscard]] std::string GetHostname() const { return _hostname; }
[[nodiscard]] int GetPort() const { return _port; }
};
}
// C API exports
extern "C" {
ANSRabbitMQ_API int CreateANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle , const char* licenseKey);
ANSRabbitMQ_API int ReleaseANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle);
ANSRabbitMQ_API int ConfigureSSL(ANSCENTER::ANSRABBITMQ** Handle,const char* caCertPath, const char* clientCertPath, const char * clientKeyPath, int verifyPeer, int verifyHostname);
ANSRabbitMQ_API int CreateChannel(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* exchange_type, const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal);
ANSRabbitMQ_API int Connect(ANSCENTER::ANSRABBITMQ** Handle, const char* hostname, int port, const char* vhost,int channel, const char* userName, const char* password, int useSSL);
ANSRabbitMQ_API int Disconnect(ANSCENTER::ANSRABBITMQ** Handle);
ANSRabbitMQ_API int Publish(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* routing_key, const char* message);
ANSRabbitMQ_API int GetQueueMessage_CPP(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name,int ack_mode, std::string &message);
ANSRabbitMQ_API int GetQueueMessage(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, LStrHandle strMessage);
ANSRabbitMQ_API int UnbindQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name);
ANSRabbitMQ_API int DeleteExchange(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name);
ANSRabbitMQ_API int DeleteQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name);
ANSRabbitMQ_API int PurgeQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name);
// V2 entry points — accept uint64_t handleVal by value (LabVIEW concurrency fix)
ANSRabbitMQ_API int ConfigureSSL_V2(uint64_t handleVal, const char* caCertPath, const char* clientCertPath, const char* clientKeyPath, int verifyPeer, int verifyHostname);
ANSRabbitMQ_API int CreateChannel_V2(uint64_t handleVal, const char* exchange_name, const char* exchange_type, const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal);
ANSRabbitMQ_API int Connect_V2(uint64_t handleVal, const char* hostname, int port, const char* vhost, int channel, const char* userName, const char* password, int useSSL);
ANSRabbitMQ_API int Disconnect_V2(uint64_t handleVal);
ANSRabbitMQ_API int Publish_V2(uint64_t handleVal, const char* exchange_name, const char* routing_key, const char* message);
ANSRabbitMQ_API int GetQueueMessage_V2(uint64_t handleVal, const char* queue_name, int ack_mode, LStrHandle strMessage);
ANSRabbitMQ_API int UnbindQueue_V2(uint64_t handleVal, const char* exchange_name);
ANSRabbitMQ_API int DeleteExchange_V2(uint64_t handleVal, const char* exchange_name);
ANSRabbitMQ_API int DeleteQueue_V2(uint64_t handleVal, const char* queue_name);
ANSRabbitMQ_API int PurgeQueue_V2(uint64_t handleVal, const char* queue_name);
}
#endif

View File

@@ -0,0 +1,30 @@
# ANSRabbitMQ — RabbitMQ messaging DLL
add_library(ANSRabbitMQ SHARED
ANSRabbitMQ.cpp
ANSRabbitMQ.h
dllmain.cpp
pch.cpp
pch.h
framework.h
)
target_include_directories(ANSRabbitMQ PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
${SHARED_INCLUDE_DIR}
)
target_include_directories(ANSRabbitMQ PRIVATE
${ANSLIBS_DIR}/RabbitMQ/include
)
target_link_directories(ANSRabbitMQ PRIVATE ${ANSLIBS_DIR}/RabbitMQ/libs)
target_link_libraries(ANSRabbitMQ
PRIVATE ANSLicensingSystem
PRIVATE anslicensing
PRIVATE labview
PRIVATE rabbitmq.4.lib
)
target_compile_definitions(ANSRabbitMQ PRIVATE UNICODE _UNICODE ANSRABBITMQ_EXPORTS _USRDLL)
target_precompile_headers(ANSRabbitMQ PRIVATE pch.h)

View File

@@ -0,0 +1,558 @@
// dllmain.cpp : Defines the entry point for the DLL application.
#include "pch.h"
#include "ANSRabbitMQ.h"
#include <cstdint>
#include <unordered_map>
#include <condition_variable>
#include <mutex>
// Handle registry with refcount — prevents use-after-free when
// ReleaseANSRabbitMQHandle is called while an operation is still running.
static std::unordered_map<ANSCENTER::ANSRABBITMQ*, int>& RMQHandleRegistry() {
static std::unordered_map<ANSCENTER::ANSRABBITMQ*, int> s;
return s;
}
static std::mutex& RMQHandleRegistryMutex() {
static std::mutex m;
return m;
}
static std::condition_variable& RMQHandleRegistryCV() {
static std::condition_variable cv;
return cv;
}
static void RegisterRMQHandle(ANSCENTER::ANSRABBITMQ* h) {
std::lock_guard<std::mutex> lk(RMQHandleRegistryMutex());
RMQHandleRegistry()[h] = 1;
}
static ANSCENTER::ANSRABBITMQ* AcquireRMQHandle(ANSCENTER::ANSRABBITMQ* h) {
std::lock_guard<std::mutex> lk(RMQHandleRegistryMutex());
auto it = RMQHandleRegistry().find(h);
if (it == RMQHandleRegistry().end()) return nullptr;
it->second++;
return h;
}
static bool ReleaseRMQHandleRef(ANSCENTER::ANSRABBITMQ* h) {
std::lock_guard<std::mutex> lk(RMQHandleRegistryMutex());
auto it = RMQHandleRegistry().find(h);
if (it == RMQHandleRegistry().end()) return false;
it->second--;
if (it->second <= 0) {
RMQHandleRegistry().erase(it);
RMQHandleRegistryCV().notify_all();
return true;
}
return false;
}
static bool UnregisterRMQHandle(ANSCENTER::ANSRABBITMQ* h) {
std::unique_lock<std::mutex> lk(RMQHandleRegistryMutex());
auto it = RMQHandleRegistry().find(h);
if (it == RMQHandleRegistry().end()) return false;
it->second--;
bool ok = RMQHandleRegistryCV().wait_for(lk, std::chrono::seconds(30), [&]() {
auto it2 = RMQHandleRegistry().find(h);
return it2 == RMQHandleRegistry().end() || it2->second <= 0;
});
if (!ok) {
OutputDebugStringA("WARNING: UnregisterRMQHandle timed out waiting for in-flight operations\n");
}
RMQHandleRegistry().erase(h);
return true;
}
// RAII guard — ensures ReleaseRMQHandleRef is always called
class RMQHandleGuard {
ANSCENTER::ANSRABBITMQ* engine;
public:
explicit RMQHandleGuard(ANSCENTER::ANSRABBITMQ* e) : engine(e) {}
~RMQHandleGuard() { if (engine) ReleaseRMQHandleRef(engine); }
ANSCENTER::ANSRABBITMQ* get() const { return engine; }
explicit operator bool() const { return engine != nullptr; }
RMQHandleGuard(const RMQHandleGuard&) = delete;
RMQHandleGuard& operator=(const RMQHandleGuard&) = delete;
};
BOOL APIENTRY DllMain( HMODULE hModule,
DWORD ul_reason_for_call,
LPVOID lpReserved
)
{
switch (ul_reason_for_call)
{
case DLL_PROCESS_ATTACH:
case DLL_THREAD_ATTACH:
case DLL_THREAD_DETACH:
case DLL_PROCESS_DETACH:
break;
}
return TRUE;
}
extern "C" ANSRabbitMQ_API int CreateANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle, const char* licenseKey) {
if (Handle == nullptr || licenseKey == nullptr) return 0;
try {
// Release existing handle if called twice (prevents leak from LabVIEW)
if (*Handle) {
if (UnregisterRMQHandle(*Handle)) {
delete *Handle;
}
*Handle = nullptr;
}
*Handle = new ANSCENTER::ANSRABBITMQ();
if (*Handle == nullptr) {
return 0; // Memory allocation failed
}
RegisterRMQHandle(*Handle);
return 1;
}
catch (std::exception& e) {
if (*Handle != nullptr) { delete *Handle; *Handle = nullptr; }
return 0;
}
catch (...) {
if (*Handle != nullptr) { delete *Handle; *Handle = nullptr; }
return 0;
}
}
static int ReleaseANSRabbitMQHandle_Impl(ANSCENTER::ANSRABBITMQ** Handle) {
try {
if (!Handle || !*Handle) return 1;
if (!UnregisterRMQHandle(*Handle)) {
*Handle = nullptr;
return 1;
}
delete *Handle;
*Handle = nullptr;
return 1;
}
catch (...) {
if (Handle) *Handle = nullptr;
return 0;
}
}
extern "C" ANSRabbitMQ_API int ReleaseANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle) {
__try {
return ReleaseANSRabbitMQHandle_Impl(Handle);
}
__except (EXCEPTION_EXECUTE_HANDLER) {
if (Handle) *Handle = nullptr;
return 0;
}
}
extern "C" ANSRabbitMQ_API int Connect(ANSCENTER::ANSRABBITMQ** Handle, const char* hostname, int port, const char* vhost,int channel, const char* userName, const char* password, int useSSL) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (hostname == nullptr || vhost == nullptr || userName == nullptr || password == nullptr) {
return -1; // Invalid parameters
}
bool SSL = false;
if (useSSL == 1) {
SSL = true;
}
if (guard.get()->Connect(hostname, port, vhost,channel, userName, password, SSL)) {
return 1; // Success
}
else {
return 0; // Connection failed
}
}
extern "C" ANSRabbitMQ_API int Disconnect(ANSCENTER::ANSRABBITMQ** Handle) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (guard.get()->Disconnect()) {
return 1; // Success
}
else {
return 0; // Disconnection failed
}
}
extern "C" ANSRabbitMQ_API int Publish(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* routing_key, const char* message) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (exchange_name == nullptr || routing_key == nullptr || message == nullptr) {
return -1; // Invalid parameters
}
if (guard.get()->Publish(exchange_name, routing_key,message)) {
return 1; // Success
}
else {
return 0; // Publish failed
}
}
extern "C" ANSRabbitMQ_API int CreateChannel(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* exchange_type,
const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal)
{
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (exchange_name == nullptr || exchange_type == nullptr || queue_name == nullptr || routing_name == nullptr) {
return -1; // Invalid parameters
}
if (guard.get()->CreateChannel(exchange_name, exchange_type, queue_name, routing_name, passvive, durable, auto_delete, internalVal)) {
return 1; // Success
}
else {
return 0; // Setup failed
}
}
extern "C" ANSRabbitMQ_API int ConfigureSSL(ANSCENTER::ANSRABBITMQ** Handle, const char* caCertPath, const char* clientCertPath, const char* clientKeyPath, int verifyPeer, int verifyHostname) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (caCertPath == nullptr || clientCertPath == nullptr || clientKeyPath == nullptr) {
return -1; // Invalid parameters
}
bool verifyPeerBool = verifyPeer != 0;
bool verifyHostnameBool = verifyHostname != 0;
if (guard.get()->ConfigureSSL(caCertPath, clientCertPath, clientKeyPath, verifyPeerBool, verifyHostnameBool)) {
return 1; // Success
}
else {
return 0; // SSL configuration failed
}
}
extern "C" ANSRabbitMQ_API int UnbindQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (exchange_name == nullptr) {
return -1; // Invalid parameters
}
if (guard.get()->UnbindQueue(exchange_name)) {
return 1; // Success
}
else {
return 0; // Unbind failed
}
}
extern "C" ANSRabbitMQ_API int DeleteQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (queue_name == nullptr) {
return -1; // Invalid parameters
}
if (guard.get()->DeleteQueue(queue_name)) {
return 1; // Success
}
else {
return 0; // Delete failed
}
}
extern "C" ANSRabbitMQ_API int DeleteExchange(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (exchange_name == nullptr) {
return -1; // Invalid parameters
}
if (guard.get()->DeleteExchange(exchange_name)) {
return 1; // Success
}
else {
return 0; // Delete failed
}
}
extern "C" ANSRabbitMQ_API int PurgeQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (queue_name == nullptr) {
return -1; // Invalid parameters
}
if (guard.get()->PurgeQueue(queue_name)) {
return 1; // Success
}
else {
return 0; // Purge failed
}
}
extern "C" ANSRabbitMQ_API int GetQueueMessage_CPP(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, std::string& message) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (queue_name == nullptr) {
return -1; // Invalid parameters
}
if (guard.get()->GetQueueMessage(queue_name, message, ack_mode)) {
return 1; // Success
}
else {
return 0; // Get message failed
}
}
extern "C" ANSRabbitMQ_API int GetQueueMessage(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, LStrHandle strMessage) {
if (Handle == nullptr || *Handle == nullptr) {
return -1; // Invalid handle
}
RMQHandleGuard guard(AcquireRMQHandle(*Handle));
if (!guard) return -1;
if (queue_name == nullptr || strMessage == nullptr) {
return -1; // Invalid parameters
}
std::string message;
if (guard.get()->GetQueueMessage(queue_name, message, ack_mode)) {
if (message.empty()) return 0;
int size = static_cast<int>(message.length());
MgErr error;
error = DSSetHandleSize(strMessage, sizeof(int32) + size * sizeof(uChar));
if (error == noErr)
{
(*strMessage)->cnt = size;
memcpy((*strMessage)->str, message.c_str(), size);
return 1;
}
else return 0;
}
else {
return 0; // Get message failed
}
}
// ============================================================================
// V2 entry points — accept uint64_t handleVal by value instead of Handle**
// Eliminates LabVIEW buffer reuse bug when concurrent calls share the same
// Handle** buffer address.
// ============================================================================
extern "C" ANSRabbitMQ_API int ConfigureSSL_V2(uint64_t handleVal, const char* caCertPath, const char* clientCertPath, const char* clientKeyPath, int verifyPeer, int verifyHostname) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (caCertPath == nullptr || clientCertPath == nullptr || clientKeyPath == nullptr) {
return -1;
}
bool verifyPeerBool = verifyPeer != 0;
bool verifyHostnameBool = verifyHostname != 0;
if (guard.get()->ConfigureSSL(caCertPath, clientCertPath, clientKeyPath, verifyPeerBool, verifyHostnameBool)) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int CreateChannel_V2(uint64_t handleVal, const char* exchange_name, const char* exchange_type,
const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal)
{
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (exchange_name == nullptr || exchange_type == nullptr || queue_name == nullptr || routing_name == nullptr) {
return -1;
}
if (guard.get()->CreateChannel(exchange_name, exchange_type, queue_name, routing_name, passvive, durable, auto_delete, internalVal)) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int Connect_V2(uint64_t handleVal, const char* hostname, int port, const char* vhost, int channel, const char* userName, const char* password, int useSSL) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (hostname == nullptr || vhost == nullptr || userName == nullptr || password == nullptr) {
return -1;
}
bool SSL = false;
if (useSSL == 1) {
SSL = true;
}
if (guard.get()->Connect(hostname, port, vhost, channel, userName, password, SSL)) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int Disconnect_V2(uint64_t handleVal) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (guard.get()->Disconnect()) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int Publish_V2(uint64_t handleVal, const char* exchange_name, const char* routing_key, const char* message) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (exchange_name == nullptr || routing_key == nullptr || message == nullptr) {
return -1;
}
if (guard.get()->Publish(exchange_name, routing_key, message)) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int GetQueueMessage_V2(uint64_t handleVal, const char* queue_name, int ack_mode, LStrHandle strMessage) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (queue_name == nullptr || strMessage == nullptr) {
return -1;
}
std::string message;
if (guard.get()->GetQueueMessage(queue_name, message, ack_mode)) {
if (message.empty()) return 0;
int size = static_cast<int>(message.length());
MgErr error;
error = DSSetHandleSize(strMessage, sizeof(int32) + size * sizeof(uChar));
if (error == noErr)
{
(*strMessage)->cnt = size;
memcpy((*strMessage)->str, message.c_str(), size);
return 1;
}
else return 0;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int UnbindQueue_V2(uint64_t handleVal, const char* exchange_name) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (exchange_name == nullptr) {
return -1;
}
if (guard.get()->UnbindQueue(exchange_name)) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int DeleteExchange_V2(uint64_t handleVal, const char* exchange_name) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (exchange_name == nullptr) {
return -1;
}
if (guard.get()->DeleteExchange(exchange_name)) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int DeleteQueue_V2(uint64_t handleVal, const char* queue_name) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (queue_name == nullptr) {
return -1;
}
if (guard.get()->DeleteQueue(queue_name)) {
return 1;
}
else {
return 0;
}
}
extern "C" ANSRabbitMQ_API int PurgeQueue_V2(uint64_t handleVal, const char* queue_name) {
auto* _v2h = reinterpret_cast<ANSCENTER::ANSRABBITMQ*>(handleVal);
if (!_v2h) return 0;
RMQHandleGuard guard(AcquireRMQHandle(_v2h));
if (!guard) return -1;
if (queue_name == nullptr) {
return -1;
}
if (guard.get()->PurgeQueue(queue_name)) {
return 1;
}
else {
return 0;
}
}

View File

@@ -0,0 +1,7 @@
#pragma once
#define WIN32_LEAN_AND_MEAN // Exclude rarely-used stuff from Windows headers
#define NOMINMAX // Prevent windows.h from defining min/max macros
// which break std::min / std::max (C2589)
// Windows Header Files
#include <windows.h>

View File

@@ -0,0 +1,5 @@
// pch.cpp: source file corresponding to the pre-compiled header
#include "pch.h"
// When you are using pre-compiled headers, this source file is necessary for compilation to succeed.

View File

@@ -0,0 +1,13 @@
// pch.h: This is a precompiled header file.
// Files listed below are compiled only once, improving build performance for future builds.
// This also affects IntelliSense performance, including code completion and many code browsing features.
// However, files listed here are ALL re-compiled if any one of them is updated between builds.
// Do not add files here that you will be updating frequently as this negates the performance advantage.
#ifndef PCH_H
#define PCH_H
// add headers that you want to pre-compile here
#include "framework.h"
#endif //PCH_H