807 lines
32 KiB
C++
807 lines
32 KiB
C++
|
|
#include "ANSRabbitMQ.h"
|
|||
|
|
#include <ctype.h>
|
|||
|
|
#include <stdarg.h>
|
|||
|
|
#include <stdio.h>
|
|||
|
|
#include <stdlib.h>
|
|||
|
|
#include <string.h>
|
|||
|
|
#include <stdint.h>
|
|||
|
|
#include <cstdint>
|
|||
|
|
#include <windows.h>
|
|||
|
|
#include <iostream>
|
|||
|
|
// Prints a printf-style formatted message to stderr and terminates the line
// with a newline. Despite its name, this function returns to the caller; it
// does not end the process.
//
// fmt: printf-style format string; the variadic arguments must match it.
void die(const char* fmt, ...) {
    va_list args;
    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    va_end(args);

    fputc('\n', stderr);
}
|
|||
|
|
void die_on_error(int x, char const* context) {
|
|||
|
|
if (x < 0) {
|
|||
|
|
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
// Prints a diagnostic on stderr describing a non-normal AMQP RPC reply.
//
// x:       reply to inspect.
// context: short description of the operation, printed before the details.
//
// A normal reply prints nothing. The function always returns to the caller;
// it reports only and does not abort.
void die_on_amqp_error(amqp_rpc_reply_t x, char const* context) {
    if (x.reply_type == AMQP_RESPONSE_NORMAL) {
        return;
    }

    if (x.reply_type == AMQP_RESPONSE_NONE) {
        fprintf(stderr, "%s: missing RPC reply type!\n", context);
        return;
    }

    if (x.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
        fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
        return;
    }

    if (x.reply_type != AMQP_RESPONSE_SERVER_EXCEPTION) {
        // Any other reply type is silently ignored.
        return;
    }

    // Server exception: the decoded payload depends on which close method
    // the server sent.
    if (x.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
        auto* closeInfo = static_cast<amqp_connection_close_t*>(x.reply.decoded);
        fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
                context, closeInfo->reply_code,
                static_cast<int>(closeInfo->reply_text.len),
                static_cast<const char*>(closeInfo->reply_text.bytes));
    }
    else if (x.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
        auto* closeInfo = static_cast<amqp_channel_close_t*>(x.reply.decoded);
        fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
                context, closeInfo->reply_code,
                static_cast<int>(closeInfo->reply_text.len),
                static_cast<const char*>(closeInfo->reply_text.bytes));
    }
    else {
        fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
                context, x.reply.id);
    }
}
|
|||
|
|
// Prints one row of a hex dump to stdout.
//
// count:    number of bytes consumed so far, including this row; the row's
//           starting offset is printed as count - numinrow.
// numinrow: number of valid entries in chs (callers pass 0..16).
// chs:      the row's byte values.
//
// Layout: "OFFSET:" followed by up to 16 " XX" hex columns with " :" inserted
// before column 8, then a gap and the printable-ASCII rendering ('.' for
// non-printable bytes). Columns missing from a short row are padded with
// three spaces, the width of one " XX" column, so the ASCII text starts at
// the same position on every row.
static void dump_row(long count, int numinrow, const int* chs) {
    printf("%08lX:", static_cast<unsigned long>(count - numinrow));

    if (numinrow > 0) {
        for (int col = 0; col < numinrow; ++col) {
            if (col == 8) {
                printf(" :");
            }
            printf(" %02X", static_cast<unsigned int>(chs[col]));
        }
        for (int col = numinrow; col < 16; ++col) {
            if (col == 8) {
                printf(" :");
            }
            printf("   ");  // same width as one " XX" column
        }

        printf("  ");
        for (int col = 0; col < numinrow; ++col) {
            putchar(isprint(chs[col]) ? chs[col] : '.');
        }
    }

    putchar('\n');
}
|
|||
|
|
// Compares two 16-entry hex-dump rows.
//
// a, b: pointers to arrays of at least 16 ints each.
// Returns 1 when all 16 entries are equal, 0 otherwise.
static int rows_eq(const int* a, const int* b) {
    // int has no padding bits that could differ between equal values here,
    // so a bytewise comparison is equivalent to comparing element by element.
    return memcmp(a, b, 16 * sizeof(int)) == 0 ? 1 : 0;
}
|
|||
|
|
void amqp_dump(void const* buffer, size_t len) {
|
|||
|
|
const auto* buf = static_cast<const unsigned char*>(buffer);
|
|||
|
|
long count = 0;
|
|||
|
|
int numinrow = 0;
|
|||
|
|
int chs[16];
|
|||
|
|
int oldchs[16] = { 0 };
|
|||
|
|
int showed_dots = 0;
|
|||
|
|
size_t i;
|
|||
|
|
|
|||
|
|
for (i = 0; i < len; i++) {
|
|||
|
|
int ch = buf[i];
|
|||
|
|
|
|||
|
|
if (numinrow == 16) {
|
|||
|
|
int j;
|
|||
|
|
|
|||
|
|
if (rows_eq(oldchs, chs)) {
|
|||
|
|
if (!showed_dots) {
|
|||
|
|
showed_dots = 1;
|
|||
|
|
printf(
|
|||
|
|
" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
else {
|
|||
|
|
showed_dots = 0;
|
|||
|
|
dump_row(count, numinrow, chs);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for (j = 0; j < 16; j++) {
|
|||
|
|
oldchs[j] = chs[j];
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
numinrow = 0;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
count++;
|
|||
|
|
chs[numinrow++] = ch;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
dump_row(count, numinrow, chs);
|
|||
|
|
|
|||
|
|
if (numinrow != 0) {
|
|||
|
|
printf("%08lX:\n", count);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
uint64_t now_microseconds(void) {
|
|||
|
|
FILETIME ft;
|
|||
|
|
GetSystemTimeAsFileTime(&ft);
|
|||
|
|
return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime) /
|
|||
|
|
10;
|
|||
|
|
}
|
|||
|
|
void microsleep(int usec) { Sleep(usec / 1000); }
|
|||
|
|
|
|||
|
|
namespace ANSCENTER {
|
|||
|
|
|
|||
|
|
// Sets the default connection parameters. No network activity happens here.
ANSRABBITMQ::ANSRABBITMQ() {
    // The destructor calls Disconnect(), which tests _conn, so the handles
    // must have a defined "not connected" value from the start.
    _conn = nullptr;
    _socket = nullptr;

    _port = 5672;
    _channel = 1;
    _channelMax = 0;     // 0 = let the server decide
    _frameMax = 131072;  // 128 KiB
    _heartBeat = 60;     // 60 seconds

    // NOTE(review): the previous body ended with `std::string _vhost = "/";`,
    // which declared a local variable that was destroyed immediately and had
    // no effect. It has been removed. If the class has a virtual-host member
    // that should default to "/", assign that member here instead — confirm
    // against the class declaration.
}
|
|||
|
|
|
|||
|
|
// Tears the connection down on destruction by delegating to Disconnect().
ANSRABBITMQ::~ANSRABBITMQ() {
    // The success flag is deliberately ignored: nothing can act on it here.
    static_cast<void>(Disconnect());
}
|
|||
|
|
// Looks up a recorded binding by exchange name.
//
// Returns a copy of the first entry in _bindings whose _exchange_name equals
// exchange_name, or a default-constructed BindingInfo when none matches.
// This function does not take _mutex itself.
BindingInfo ANSRABBITMQ::GetBindingInfo(const std::string& exchange_name) {
    for (auto it = _bindings.begin(); it != _bindings.end(); ++it) {
        if (it->_exchange_name == exchange_name) {
            return *it;
        }
    }
    return BindingInfo{};
}
|
|||
|
|
// Looks up a recorded binding by queue name.
//
// Returns a copy of the first entry in _bindings whose _queue_name equals
// queue_name, or a default-constructed BindingInfo when none matches.
// This function does not take _mutex itself.
BindingInfo ANSRABBITMQ::GetBindingInfoFromQueueName(const std::string& queue_name) {
    for (auto it = _bindings.begin(); it != _bindings.end(); ++it) {
        if (it->_queue_name == queue_name) {
            return *it;
        }
    }
    return BindingInfo{};
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
bool ANSRABBITMQ::ConfigureSSLOptions() {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
if (_caCertPath.empty() && _clientCertPath.empty() && _clientKeyPath.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "No SSL options configured.", __FILE__, __LINE__);
|
|||
|
|
return true; // No SSL options to configure
|
|||
|
|
}
|
|||
|
|
if (!_socket) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Error: SSL socket is not initialized.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// These functions accept `amqp_socket_t*` even though they apply only to SSL sockets
|
|||
|
|
if (!_caCertPath.empty()) {
|
|||
|
|
if (amqp_ssl_socket_set_cacert(_socket, _caCertPath.c_str()) != AMQP_STATUS_OK) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set CA certificate.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (!_clientCertPath.empty() && !_clientKeyPath.empty()) {
|
|||
|
|
if (amqp_ssl_socket_set_key(_socket, _clientCertPath.c_str(), _clientKeyPath.c_str()) != AMQP_STATUS_OK) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set client certificate/key.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_ssl_socket_set_verify_peer(_socket, _verifyPeer ? 1 : 0);
|
|||
|
|
amqp_ssl_socket_set_verify_hostname(_socket, _verifyHostname ? 1 : 0);
|
|||
|
|
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::ConfigureSSLOptions. Error configuring SSL options:",e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::SetupExchange(const std::string & _exchange_name, const std::string& _exchange_type, int _passive, int _durable, int _auto_delete, int _internal) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
if (_conn == nullptr || _socket == nullptr || _exchange_name.empty() || _exchange_type.empty() || _channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::SetupExchange", "Invalid parameters in SetupExchange.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_exchange_declare(
|
|||
|
|
_conn, _channel,
|
|||
|
|
amqp_cstring_bytes(_exchange_name.c_str()),
|
|||
|
|
amqp_cstring_bytes(_exchange_type.c_str()),
|
|||
|
|
_passive,
|
|||
|
|
_durable,
|
|||
|
|
_auto_delete,
|
|||
|
|
_internal,
|
|||
|
|
amqp_empty_table
|
|||
|
|
);
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Declaring exchange");
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::SetupExchange. Error in SetupExchange:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::SetupQueue(const std::string _queue_name, int _passive, int _durable, int _auto_delete, int _internal) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::SetupQueue", "Invalid parameters in SetupQueue.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_queue_declare(
|
|||
|
|
_conn, _channel,
|
|||
|
|
amqp_cstring_bytes(_queue_name.c_str()),
|
|||
|
|
_passive,
|
|||
|
|
_durable,
|
|||
|
|
false, // not exclusive
|
|||
|
|
_auto_delete,
|
|||
|
|
amqp_empty_table
|
|||
|
|
);
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Declaring queue");
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::SetupQueue. Error in SetupQueue:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::BindQueue(const std::string & _exchange_name, const std::string& _exchange_type, const std::string& _queue_name, const std::string& _binding_key) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _exchange_name.empty() || _channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::BindQueue", "Invalid parameters in BindQueue.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_table_t arguments = amqp_empty_table;
|
|||
|
|
|
|||
|
|
if (_exchange_type == "headers") {
|
|||
|
|
// Match messages with header: type = important
|
|||
|
|
static amqp_table_entry_t items[2];
|
|||
|
|
|
|||
|
|
items[0].key = amqp_cstring_bytes("x-match");
|
|||
|
|
items[0].value.kind = AMQP_FIELD_KIND_UTF8;
|
|||
|
|
items[0].value.value.bytes = amqp_cstring_bytes("all");
|
|||
|
|
|
|||
|
|
items[1].key = amqp_cstring_bytes("type");
|
|||
|
|
items[1].value.kind = AMQP_FIELD_KIND_UTF8;
|
|||
|
|
items[1].value.value.bytes = amqp_cstring_bytes("important");
|
|||
|
|
|
|||
|
|
arguments.num_entries = 2;
|
|||
|
|
arguments.entries = items;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_queue_bind(
|
|||
|
|
_conn, _channel,
|
|||
|
|
amqp_cstring_bytes(_queue_name.c_str()),
|
|||
|
|
amqp_cstring_bytes(_exchange_name.c_str()),
|
|||
|
|
amqp_cstring_bytes(_binding_key.c_str()), // required, even for fanout/headers
|
|||
|
|
arguments
|
|||
|
|
);
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Binding queue");
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::BindQueue. Error in BindQueue:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::UnbindQueue(const std::string& _exchange_name) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
if (_conn == nullptr || _socket == nullptr ||
|
|||
|
|
_exchange_name.empty() || _channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::UnbindQueue", "Invalid parameters in UnbindQueue.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
amqp_table_t arguments = amqp_empty_table;
|
|||
|
|
|
|||
|
|
BindingInfo bindingInfo = GetBindingInfo(_exchange_name); // Check if the exchange is set up
|
|||
|
|
if (bindingInfo._exchange_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::UnbindQueue", "Exchange is not bound or set up.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (bindingInfo._exchange_type == "headers") {
|
|||
|
|
// Same header structure used in BindQueue()
|
|||
|
|
static amqp_table_entry_t items[2];
|
|||
|
|
|
|||
|
|
items[0].key = amqp_cstring_bytes("x-match");
|
|||
|
|
items[0].value.kind = AMQP_FIELD_KIND_UTF8;
|
|||
|
|
items[0].value.value.bytes = amqp_cstring_bytes("all");
|
|||
|
|
|
|||
|
|
items[1].key = amqp_cstring_bytes("type");
|
|||
|
|
items[1].value.kind = AMQP_FIELD_KIND_UTF8;
|
|||
|
|
items[1].value.value.bytes = amqp_cstring_bytes("important");
|
|||
|
|
|
|||
|
|
arguments.num_entries = 2;
|
|||
|
|
arguments.entries = items;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_queue_unbind(
|
|||
|
|
_conn, bindingInfo._channel,
|
|||
|
|
amqp_cstring_bytes(bindingInfo._queue_name.c_str()),
|
|||
|
|
amqp_cstring_bytes(bindingInfo._exchange_name.c_str()),
|
|||
|
|
amqp_cstring_bytes(bindingInfo._binding_key.c_str()), // still required by API
|
|||
|
|
arguments
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Unbinding queue");
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::UnbindQueue. Error in UnbindQueue:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::DeleteQueue(const std::string& queue_name) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (_conn == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Connection is not established.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (_socket == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Socket is not initialized.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (queue_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue name is empty.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (_channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Channel is not set.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Only delete if queue is unused and empty <20> set to false for force delete
|
|||
|
|
int if_unused = 0;
|
|||
|
|
int if_empty = 0;
|
|||
|
|
BindingInfo binding = GetBindingInfoFromQueueName(queue_name); // Check if the queue is set up
|
|||
|
|
if (binding._queue_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
amqp_queue_delete(_conn, binding._channel,
|
|||
|
|
amqp_cstring_bytes(queue_name.c_str()),
|
|||
|
|
if_unused,
|
|||
|
|
if_empty);
|
|||
|
|
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Deleting queue");
|
|||
|
|
|
|||
|
|
return true;
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::DeleteQueue. Error in DeleteQueue:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::DeleteExchange(const std::string& exchange_name) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (_conn == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Connection is not established.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (_socket == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Socket is not initialized.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (exchange_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange name is empty.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (_channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Channel is not set.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
// Only delete if exchange is unused (set to 0 to delete regardless)
|
|||
|
|
int if_unused = 0;
|
|||
|
|
|
|||
|
|
BindingInfo bindingInfo = GetBindingInfo(exchange_name); // Check if the exchange is set up
|
|||
|
|
if (bindingInfo._exchange_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange is not bound or set up.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
amqp_exchange_delete(_conn, bindingInfo._channel,
|
|||
|
|
amqp_cstring_bytes(exchange_name.c_str()),
|
|||
|
|
if_unused);
|
|||
|
|
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Deleting exchange");
|
|||
|
|
|
|||
|
|
|
|||
|
|
// remove the binding info from the list
|
|||
|
|
_bindings.erase(
|
|||
|
|
std::remove_if(
|
|||
|
|
_bindings.begin(),
|
|||
|
|
_bindings.end(),
|
|||
|
|
[&](const BindingInfo& b) {
|
|||
|
|
return b._exchange_name == bindingInfo._exchange_name &&
|
|||
|
|
b._exchange_type == bindingInfo._exchange_type &&
|
|||
|
|
b._queue_name == bindingInfo._queue_name &&
|
|||
|
|
b._binding_key == bindingInfo._binding_key;
|
|||
|
|
}
|
|||
|
|
),
|
|||
|
|
_bindings.end()
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::DeleteExchange. Error in DeleteExchange:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::PurgeQueue(const std::string & _queue_name) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
if (_conn == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Connection is not established.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (_socket == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Socket is not initialized.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (_queue_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue name is empty.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (_channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Channel is not set.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
BindingInfo binding = GetBindingInfoFromQueueName(_queue_name); // Check if the queue is set up
|
|||
|
|
if (binding._queue_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Purge the queue: remove all ready messages
|
|||
|
|
amqp_queue_purge(_conn, binding._channel, amqp_cstring_bytes(_queue_name.c_str()));
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Purging queue");
|
|||
|
|
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::PurgeQueue. Error in PurgeQueue:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::ConfigureSSL(const std::string& caCertPath, const std::string& clientCertPath, const std::string& clientKeyPath, bool verifyPeer, bool verifyHostname) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
_caCertPath = caCertPath;
|
|||
|
|
_clientCertPath = clientCertPath;
|
|||
|
|
_clientKeyPath = clientKeyPath;
|
|||
|
|
_verifyPeer = verifyPeer;
|
|||
|
|
_verifyHostname = verifyHostname;
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::ConfigureSSL. Error configuring SSL:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::CreateChannel(const std::string& exchange_name, const std::string& exchange_type, const std::string& queue_name, const std::string& binding_key,
|
|||
|
|
int passiveVal, int durableVal, int autoDelete, int internalVal) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try{
|
|||
|
|
if (exchange_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange name is empty.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (exchange_type.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange type is empty.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (queue_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Queue name is empty.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
amqp_channel_open(_conn, _channel);
|
|||
|
|
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Opening channel");
|
|||
|
|
bool exchangeStatus = SetupExchange(exchange_name, exchange_type, passiveVal, durableVal, autoDelete, internalVal);
|
|||
|
|
bool queueStatus = SetupQueue(queue_name, passiveVal, durableVal, autoDelete, internalVal);
|
|||
|
|
bool bindStatus = BindQueue(exchange_name, exchange_type, queue_name, binding_key);
|
|||
|
|
if (!exchangeStatus || !queueStatus || !bindStatus) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Failed to setup exchange, queue or bind.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
BindingInfo bindingInfo;
|
|||
|
|
bindingInfo._exchange_name = exchange_name;
|
|||
|
|
bindingInfo._exchange_type = exchange_type;
|
|||
|
|
bindingInfo._queue_name = queue_name;
|
|||
|
|
bindingInfo._binding_key = binding_key;
|
|||
|
|
bindingInfo._passive = passiveVal;
|
|||
|
|
bindingInfo._durable = durableVal;
|
|||
|
|
bindingInfo._auto_delete = autoDelete;
|
|||
|
|
bindingInfo._internal = internalVal;
|
|||
|
|
bindingInfo._channel = _channel; // Store the channel used for this binding
|
|||
|
|
_bindings.push_back(bindingInfo);
|
|||
|
|
_channel++; // Increment channel for next use
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::CreateChannel. Error in Setup::", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::Connect(const std::string& hostname, int port, const std::string& vhost,
|
|||
|
|
int channel, const std::string& userName, const std::string& password,
|
|||
|
|
bool useSSL)
|
|||
|
|
{
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
_conn = amqp_new_connection();
|
|||
|
|
if (useSSL) {
|
|||
|
|
_socket = amqp_ssl_socket_new(_conn);
|
|||
|
|
if (!_socket) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create SSL socket", __FILE__, __LINE__);
|
|||
|
|
die("Creating SSL socket");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (!ConfigureSSLOptions()) {
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
else {
|
|||
|
|
_socket = amqp_tcp_socket_new(_conn);
|
|||
|
|
if (!_socket) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create socket", __FILE__, __LINE__);
|
|||
|
|
die("Creating TCP socket");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (!_socket) return false;
|
|||
|
|
try {
|
|||
|
|
int opened = amqp_socket_open(_socket, hostname.c_str(), port);
|
|||
|
|
std::cout << "Opened socket: " << opened << std::endl;
|
|||
|
|
if (opened) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot open socket", __FILE__, __LINE__);
|
|||
|
|
die("Opening socket");
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Use defaults if values are unset
|
|||
|
|
if (_channelMax <= 0) _channelMax = 0; // 0 = unlimited (let server decide)
|
|||
|
|
if (_frameMax <= 0) _frameMax = 131072; // default 128 KB
|
|||
|
|
if (_heartBeat <= 0) _heartBeat = 60; // 60 seconds
|
|||
|
|
|
|||
|
|
die_on_amqp_error(
|
|||
|
|
amqp_login(_conn, vhost.c_str(), _channelMax, _frameMax, _heartBeat,
|
|||
|
|
AMQP_SASL_METHOD_PLAIN, userName.c_str(), password.c_str()),
|
|||
|
|
"Logging in");
|
|||
|
|
_channel = channel;
|
|||
|
|
_hostname = hostname;
|
|||
|
|
_port = port;
|
|||
|
|
_status = 0;
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::Connect. Error in Connect::", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::Connect. Error in Connect::", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
bool ANSRABBITMQ::Disconnect() {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
|
|||
|
|
try {
|
|||
|
|
if (_conn) {
|
|||
|
|
// loop through all bindings and unbind them
|
|||
|
|
for (const auto& binding : _bindings) {
|
|||
|
|
if (binding._channel > 0) {
|
|||
|
|
amqp_rpc_reply_t reply = amqp_channel_close(_conn, binding._channel, AMQP_REPLY_SUCCESS);
|
|||
|
|
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close channel cleanly.", __FILE__, __LINE__);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
amqp_rpc_reply_t reply = amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
|
|||
|
|
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close connection cleanly.", __FILE__, __LINE__);
|
|||
|
|
}
|
|||
|
|
int destroyStatus = amqp_destroy_connection(_conn);
|
|||
|
|
if (destroyStatus != AMQP_STATUS_OK) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Disconnect. Failed to destroy connection:", amqp_error_string2(destroyStatus), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
_conn = nullptr;
|
|||
|
|
_socket = nullptr;
|
|||
|
|
_channel = 1;
|
|||
|
|
_status = -1;
|
|||
|
|
}
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::DisConnect. Error in Disconnect::", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
bool ANSRABBITMQ::Publish(const std::string& exchange_name, const std::string& routing_key, const std::string& message) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
try {
|
|||
|
|
if (_conn == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Publish.", "Connection is not established.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (_socket == nullptr) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Publish.", "Socket is not initialized.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (exchange_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Publish.", "Exchange is not set.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
BindingInfo bindingInfo = GetBindingInfo(exchange_name); // Check if the exchange is set up
|
|||
|
|
if (bindingInfo._exchange_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Publish.", "Exchange name is not set.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
if (bindingInfo._exchange_type != "fanout" && bindingInfo._exchange_type != "headers" && bindingInfo._binding_key.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Publish.", "Routing key is required for this exchange type.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
if (message.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Publish.", "Message is empty.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_bytes_t exchange_bytes = amqp_cstring_bytes(bindingInfo._exchange_name.c_str());
|
|||
|
|
amqp_bytes_t routing_key_bytes = amqp_cstring_bytes(
|
|||
|
|
(bindingInfo._exchange_type == "fanout" || bindingInfo._exchange_type == "headers") ? "" : routing_key.c_str());
|
|||
|
|
amqp_bytes_t message_bytes = amqp_cstring_bytes(message.c_str());
|
|||
|
|
amqp_basic_properties_t props;
|
|||
|
|
amqp_basic_properties_t* pProps = nullptr;
|
|||
|
|
|
|||
|
|
if (bindingInfo._exchange_type == "headers") {
|
|||
|
|
// Example static header: type = important
|
|||
|
|
static amqp_table_entry_t headers[1];
|
|||
|
|
headers[0].key = amqp_cstring_bytes("type");
|
|||
|
|
headers[0].value.kind = AMQP_FIELD_KIND_UTF8;
|
|||
|
|
headers[0].value.value.bytes = amqp_cstring_bytes("important");
|
|||
|
|
props._flags = AMQP_BASIC_HEADERS_FLAG;
|
|||
|
|
props.headers.num_entries = 1;
|
|||
|
|
props.headers.entries = headers;
|
|||
|
|
pProps = &props;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
int ret = amqp_basic_publish(
|
|||
|
|
_conn, bindingInfo._channel,
|
|||
|
|
exchange_bytes,
|
|||
|
|
routing_key_bytes,
|
|||
|
|
0, 0,
|
|||
|
|
pProps,
|
|||
|
|
message_bytes
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
if (ret != AMQP_STATUS_OK) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::Publish. Failed to publish message:", amqp_error_string2(ret), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogFatal("ANSRABBITMQ::Publish. Error in Publish:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
bool ANSRABBITMQ::GetQueueMessage(const std::string& queue_name, std::string& message, int ack_mode) {
|
|||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
|||
|
|
|
|||
|
|
try {
|
|||
|
|
if (_conn == nullptr || _socket == nullptr || queue_name.empty() || _channel <= 0) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Invalid state or parameters in GetQueueMessage.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
BindingInfo binding = GetBindingInfoFromQueueName(queue_name);
|
|||
|
|
if (binding._queue_name.empty()) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Queue is not bound or set up.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
amqp_maybe_release_buffers(_conn);
|
|||
|
|
// Request to get a single message
|
|||
|
|
amqp_basic_get(_conn, binding._channel, amqp_cstring_bytes(binding._queue_name.c_str()), ack_mode); // auto-ack = 1
|
|||
|
|
amqp_rpc_reply_t reply = amqp_get_rpc_reply(_conn);
|
|||
|
|
|
|||
|
|
if (reply.reply_type != AMQP_RESPONSE_NORMAL || reply.reply.id != AMQP_BASIC_GET_OK_METHOD) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "No message available or error getting message.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Cast the method to extract metadata (optional)
|
|||
|
|
[[maybe_unused]] auto* get_ok = static_cast<amqp_basic_get_ok_t*>(reply.reply.decoded);
|
|||
|
|
|
|||
|
|
// Read the content
|
|||
|
|
amqp_frame_t frame;
|
|||
|
|
amqp_bytes_t body;
|
|||
|
|
size_t body_received = 0;
|
|||
|
|
size_t body_size = 0;
|
|||
|
|
|
|||
|
|
// Read next frame (header)
|
|||
|
|
if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_HEADER) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message header.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
body_size = frame.payload.properties.body_size;
|
|||
|
|
|
|||
|
|
// Read body frames
|
|||
|
|
std::string body_data;
|
|||
|
|
while (body_received < body_size) {
|
|||
|
|
if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_BODY) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message body.", __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
body_data.append(static_cast<const char*>(frame.payload.body_fragment.bytes), frame.payload.body_fragment.len);
|
|||
|
|
body_received += frame.payload.body_fragment.len;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
message = body_data;
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
catch (const std::exception& e) {
|
|||
|
|
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage. Exception in GetQueueMessage:", e.what(), __FILE__, __LINE__);
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
}
|