Files
ANSCORE/ANSRabbitMQ/ANSRabbitMQ.cpp

807 lines
32 KiB
C++
Raw Blame History

#include "ANSRabbitMQ.h"
#include <ctype.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <cstdint>
#include <windows.h>
#include <iostream>
void die(const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, "\n");
}
void die_on_error(int x, char const* context) {
if (x < 0) {
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
}
}
void die_on_amqp_error(amqp_rpc_reply_t x, char const* context) {
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf(stderr, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
auto* m =
static_cast<amqp_connection_close_t*>(x.reply.decoded);
fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
context, m->reply_code, static_cast<int>(m->reply_text.len),
static_cast<const char*>(m->reply_text.bytes));
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
auto* m = static_cast<amqp_channel_close_t*>(x.reply.decoded);
fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
context, m->reply_code, static_cast<int>(m->reply_text.len),
static_cast<const char*>(m->reply_text.bytes));
break;
}
default:
fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
break;
}
break;
}
}
static void dump_row(long count, int numinrow, int* chs) {
int i;
printf("%08lX:", count - numinrow);
if (numinrow > 0) {
for (i = 0; i < numinrow; i++) {
if (i == 8) {
printf(" :");
}
printf(" %02X", chs[i]);
}
for (i = numinrow; i < 16; i++) {
if (i == 8) {
printf(" :");
}
printf(" ");
}
printf(" ");
for (i = 0; i < numinrow; i++) {
if (isprint(chs[i])) {
printf("%c", chs[i]);
}
else {
printf(".");
}
}
}
printf("\n");
}
static int rows_eq(int* a, int* b) {
int i;
for (i = 0; i < 16; i++)
if (a[i] != b[i]) {
return 0;
}
return 1;
}
void amqp_dump(void const* buffer, size_t len) {
const auto* buf = static_cast<const unsigned char*>(buffer);
long count = 0;
int numinrow = 0;
int chs[16];
int oldchs[16] = { 0 };
int showed_dots = 0;
size_t i;
for (i = 0; i < len; i++) {
int ch = buf[i];
if (numinrow == 16) {
int j;
if (rows_eq(oldchs, chs)) {
if (!showed_dots) {
showed_dots = 1;
printf(
" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
}
}
else {
showed_dots = 0;
dump_row(count, numinrow, chs);
}
for (j = 0; j < 16; j++) {
oldchs[j] = chs[j];
}
numinrow = 0;
}
count++;
chs[numinrow++] = ch;
}
dump_row(count, numinrow, chs);
if (numinrow != 0) {
printf("%08lX:\n", count);
}
}
uint64_t now_microseconds(void) {
FILETIME ft;
GetSystemTimeAsFileTime(&ft);
return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime) /
10;
}
void microsleep(int usec) { Sleep(usec / 1000); }
namespace ANSCENTER {
ANSRABBITMQ::ANSRABBITMQ() {
_port = 5672;
_channel = 1;
_channelMax = 0; // set to 0 for default, let server decides
_frameMax = 131072; // no limit
_heartBeat = 60; // 60 seconds
std::string _vhost = "/";
}
ANSRABBITMQ::~ANSRABBITMQ() {
Disconnect(); // Safely handles cleanup
}
BindingInfo ANSRABBITMQ::GetBindingInfo(const std::string& exchange_name) {
// Search for the binding info based on the exchange name
for (const auto& binding : _bindings) {
if (binding._exchange_name == exchange_name) {
return binding;
}
}
return BindingInfo{}; // Return default empty BindingInfo if not found
}
BindingInfo ANSRABBITMQ::GetBindingInfoFromQueueName(const std::string& queue_name) {
// Search for the binding info based on the queue name
for (const auto& binding : _bindings) {
if (binding._queue_name == queue_name) {
return binding;
}
}
return BindingInfo{}; // Return default empty BindingInfo if not found
}
bool ANSRABBITMQ::ConfigureSSLOptions() {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_caCertPath.empty() && _clientCertPath.empty() && _clientKeyPath.empty()) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "No SSL options configured.", __FILE__, __LINE__);
return true; // No SSL options to configure
}
if (!_socket) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Error: SSL socket is not initialized.", __FILE__, __LINE__);
return false;
}
// These functions accept `amqp_socket_t*` even though they apply only to SSL sockets
if (!_caCertPath.empty()) {
if (amqp_ssl_socket_set_cacert(_socket, _caCertPath.c_str()) != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set CA certificate.", __FILE__, __LINE__);
return false;
}
}
if (!_clientCertPath.empty() && !_clientKeyPath.empty()) {
if (amqp_ssl_socket_set_key(_socket, _clientCertPath.c_str(), _clientKeyPath.c_str()) != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set client certificate/key.", __FILE__, __LINE__);
return false;
}
}
amqp_ssl_socket_set_verify_peer(_socket, _verifyPeer ? 1 : 0);
amqp_ssl_socket_set_verify_hostname(_socket, _verifyHostname ? 1 : 0);
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::ConfigureSSLOptions. Error configuring SSL options:",e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::SetupExchange(const std::string & _exchange_name, const std::string& _exchange_type, int _passive, int _durable, int _auto_delete, int _internal) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || _exchange_name.empty() || _exchange_type.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::SetupExchange", "Invalid parameters in SetupExchange.", __FILE__, __LINE__);
return false;
}
amqp_exchange_declare(
_conn, _channel,
amqp_cstring_bytes(_exchange_name.c_str()),
amqp_cstring_bytes(_exchange_type.c_str()),
_passive,
_durable,
_auto_delete,
_internal,
amqp_empty_table
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Declaring exchange");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::SetupExchange. Error in SetupExchange:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::SetupQueue(const std::string _queue_name, int _passive, int _durable, int _auto_delete, int _internal) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::SetupQueue", "Invalid parameters in SetupQueue.", __FILE__, __LINE__);
return false;
}
amqp_queue_declare(
_conn, _channel,
amqp_cstring_bytes(_queue_name.c_str()),
_passive,
_durable,
false, // not exclusive
_auto_delete,
amqp_empty_table
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Declaring queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::SetupQueue. Error in SetupQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::BindQueue(const std::string & _exchange_name, const std::string& _exchange_type, const std::string& _queue_name, const std::string& _binding_key) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _exchange_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::BindQueue", "Invalid parameters in BindQueue.", __FILE__, __LINE__);
return false;
}
amqp_table_t arguments = amqp_empty_table;
if (_exchange_type == "headers") {
// Match messages with header: type = important
static amqp_table_entry_t items[2];
items[0].key = amqp_cstring_bytes("x-match");
items[0].value.kind = AMQP_FIELD_KIND_UTF8;
items[0].value.value.bytes = amqp_cstring_bytes("all");
items[1].key = amqp_cstring_bytes("type");
items[1].value.kind = AMQP_FIELD_KIND_UTF8;
items[1].value.value.bytes = amqp_cstring_bytes("important");
arguments.num_entries = 2;
arguments.entries = items;
}
amqp_queue_bind(
_conn, _channel,
amqp_cstring_bytes(_queue_name.c_str()),
amqp_cstring_bytes(_exchange_name.c_str()),
amqp_cstring_bytes(_binding_key.c_str()), // required, even for fanout/headers
arguments
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Binding queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::BindQueue. Error in BindQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::UnbindQueue(const std::string& _exchange_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr ||
_exchange_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::UnbindQueue", "Invalid parameters in UnbindQueue.", __FILE__, __LINE__);
return false;
}
amqp_table_t arguments = amqp_empty_table;
BindingInfo bindingInfo = GetBindingInfo(_exchange_name); // Check if the exchange is set up
if (bindingInfo._exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::UnbindQueue", "Exchange is not bound or set up.", __FILE__, __LINE__);
return false;
}
if (bindingInfo._exchange_type == "headers") {
// Same header structure used in BindQueue()
static amqp_table_entry_t items[2];
items[0].key = amqp_cstring_bytes("x-match");
items[0].value.kind = AMQP_FIELD_KIND_UTF8;
items[0].value.value.bytes = amqp_cstring_bytes("all");
items[1].key = amqp_cstring_bytes("type");
items[1].value.kind = AMQP_FIELD_KIND_UTF8;
items[1].value.value.bytes = amqp_cstring_bytes("important");
arguments.num_entries = 2;
arguments.entries = items;
}
amqp_queue_unbind(
_conn, bindingInfo._channel,
amqp_cstring_bytes(bindingInfo._queue_name.c_str()),
amqp_cstring_bytes(bindingInfo._exchange_name.c_str()),
amqp_cstring_bytes(bindingInfo._binding_key.c_str()), // still required by API
arguments
);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Unbinding queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::UnbindQueue. Error in UnbindQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::DeleteQueue(const std::string& queue_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try
{
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue name is empty.", __FILE__, __LINE__);
return false;
}
if (_channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Channel is not set.", __FILE__, __LINE__);
return false;
}
// Only delete if queue is unused and empty <20> set to false for force delete
int if_unused = 0;
int if_empty = 0;
BindingInfo binding = GetBindingInfoFromQueueName(queue_name); // Check if the queue is set up
if (binding._queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
return false;
}
amqp_queue_delete(_conn, binding._channel,
amqp_cstring_bytes(queue_name.c_str()),
if_unused,
if_empty);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Deleting queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::DeleteQueue. Error in DeleteQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::DeleteExchange(const std::string& exchange_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try
{
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange name is empty.", __FILE__, __LINE__);
return false;
}
if (_channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Channel is not set.", __FILE__, __LINE__);
return false;
}
// Only delete if exchange is unused (set to 0 to delete regardless)
int if_unused = 0;
BindingInfo bindingInfo = GetBindingInfo(exchange_name); // Check if the exchange is set up
if (bindingInfo._exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange is not bound or set up.", __FILE__, __LINE__);
return false;
}
amqp_exchange_delete(_conn, bindingInfo._channel,
amqp_cstring_bytes(exchange_name.c_str()),
if_unused);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Deleting exchange");
// remove the binding info from the list
_bindings.erase(
std::remove_if(
_bindings.begin(),
_bindings.end(),
[&](const BindingInfo& b) {
return b._exchange_name == bindingInfo._exchange_name &&
b._exchange_type == bindingInfo._exchange_type &&
b._queue_name == bindingInfo._queue_name &&
b._binding_key == bindingInfo._binding_key;
}
),
_bindings.end()
);
return true;
}
catch (const std::exception& e) {
this->_logger.LogError("ANSRABBITMQ::DeleteExchange. Error in DeleteExchange:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::PurgeQueue(const std::string & _queue_name) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (_queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue name is empty.", __FILE__, __LINE__);
return false;
}
if (_channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Channel is not set.", __FILE__, __LINE__);
return false;
}
BindingInfo binding = GetBindingInfoFromQueueName(_queue_name); // Check if the queue is set up
if (binding._queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
return false;
}
// Purge the queue: remove all ready messages
amqp_queue_purge(_conn, binding._channel, amqp_cstring_bytes(_queue_name.c_str()));
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Purging queue");
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::PurgeQueue. Error in PurgeQueue:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::ConfigureSSL(const std::string& caCertPath, const std::string& clientCertPath, const std::string& clientKeyPath, bool verifyPeer, bool verifyHostname) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
_caCertPath = caCertPath;
_clientCertPath = clientCertPath;
_clientKeyPath = clientKeyPath;
_verifyPeer = verifyPeer;
_verifyHostname = verifyHostname;
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::ConfigureSSL. Error configuring SSL:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::CreateChannel(const std::string& exchange_name, const std::string& exchange_type, const std::string& queue_name, const std::string& binding_key,
int passiveVal, int durableVal, int autoDelete, int internalVal) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try{
if (exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange name is empty.", __FILE__, __LINE__);
return false;
}
if (exchange_type.empty()) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange type is empty.", __FILE__, __LINE__);
return false;
}
if (queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Queue name is empty.", __FILE__, __LINE__);
return false;
}
amqp_channel_open(_conn, _channel);
die_on_amqp_error(amqp_get_rpc_reply(_conn), "Opening channel");
bool exchangeStatus = SetupExchange(exchange_name, exchange_type, passiveVal, durableVal, autoDelete, internalVal);
bool queueStatus = SetupQueue(queue_name, passiveVal, durableVal, autoDelete, internalVal);
bool bindStatus = BindQueue(exchange_name, exchange_type, queue_name, binding_key);
if (!exchangeStatus || !queueStatus || !bindStatus) {
this->_logger.LogError("ANSRABBITMQ::CreateChannel", "Failed to setup exchange, queue or bind.", __FILE__, __LINE__);
return false;
}
BindingInfo bindingInfo;
bindingInfo._exchange_name = exchange_name;
bindingInfo._exchange_type = exchange_type;
bindingInfo._queue_name = queue_name;
bindingInfo._binding_key = binding_key;
bindingInfo._passive = passiveVal;
bindingInfo._durable = durableVal;
bindingInfo._auto_delete = autoDelete;
bindingInfo._internal = internalVal;
bindingInfo._channel = _channel; // Store the channel used for this binding
_bindings.push_back(bindingInfo);
_channel++; // Increment channel for next use
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::CreateChannel. Error in Setup::", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::Connect(const std::string& hostname, int port, const std::string& vhost,
int channel, const std::string& userName, const std::string& password,
bool useSSL)
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
_conn = amqp_new_connection();
if (useSSL) {
_socket = amqp_ssl_socket_new(_conn);
if (!_socket) {
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create SSL socket", __FILE__, __LINE__);
die("Creating SSL socket");
}
if (!ConfigureSSLOptions()) {
return false;
}
}
else {
_socket = amqp_tcp_socket_new(_conn);
if (!_socket) {
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create socket", __FILE__, __LINE__);
die("Creating TCP socket");
}
}
if (!_socket) return false;
try {
int opened = amqp_socket_open(_socket, hostname.c_str(), port);
std::cout << "Opened socket: " << opened << std::endl;
if (opened) {
this->_logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot open socket", __FILE__, __LINE__);
die("Opening socket");
return false;
}
// Use defaults if values are unset
if (_channelMax <= 0) _channelMax = 0; // 0 = unlimited (let server decide)
if (_frameMax <= 0) _frameMax = 131072; // default 128 KB
if (_heartBeat <= 0) _heartBeat = 60; // 60 seconds
die_on_amqp_error(
amqp_login(_conn, vhost.c_str(), _channelMax, _frameMax, _heartBeat,
AMQP_SASL_METHOD_PLAIN, userName.c_str(), password.c_str()),
"Logging in");
_channel = channel;
_hostname = hostname;
_port = port;
_status = 0;
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::Connect. Error in Connect::", e.what(), __FILE__, __LINE__);
return false;
}
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::Connect. Error in Connect::", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::Disconnect() {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn) {
// loop through all bindings and unbind them
for (const auto& binding : _bindings) {
if (binding._channel > 0) {
amqp_rpc_reply_t reply = amqp_channel_close(_conn, binding._channel, AMQP_REPLY_SUCCESS);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
this->_logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close channel cleanly.", __FILE__, __LINE__);
}
}
}
amqp_rpc_reply_t reply = amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
this->_logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close connection cleanly.", __FILE__, __LINE__);
}
int destroyStatus = amqp_destroy_connection(_conn);
if (destroyStatus != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::Disconnect. Failed to destroy connection:", amqp_error_string2(destroyStatus), __FILE__, __LINE__);
return false;
}
_conn = nullptr;
_socket = nullptr;
_channel = 1;
_status = -1;
}
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::DisConnect. Error in Disconnect::", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::Publish(const std::string& exchange_name, const std::string& routing_key, const std::string& message) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Connection is not established.", __FILE__, __LINE__);
return false;
}
if (_socket == nullptr) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Socket is not initialized.", __FILE__, __LINE__);
return false;
}
if (exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Exchange is not set.", __FILE__, __LINE__);
return false;
}
BindingInfo bindingInfo = GetBindingInfo(exchange_name); // Check if the exchange is set up
if (bindingInfo._exchange_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Exchange name is not set.", __FILE__, __LINE__);
return false;
}
if (bindingInfo._exchange_type != "fanout" && bindingInfo._exchange_type != "headers" && bindingInfo._binding_key.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Routing key is required for this exchange type.", __FILE__, __LINE__);
return false;
}
if (message.empty()) {
this->_logger.LogError("ANSRABBITMQ::Publish.", "Message is empty.", __FILE__, __LINE__);
return false;
}
amqp_bytes_t exchange_bytes = amqp_cstring_bytes(bindingInfo._exchange_name.c_str());
amqp_bytes_t routing_key_bytes = amqp_cstring_bytes(
(bindingInfo._exchange_type == "fanout" || bindingInfo._exchange_type == "headers") ? "" : routing_key.c_str());
amqp_bytes_t message_bytes = amqp_cstring_bytes(message.c_str());
amqp_basic_properties_t props;
amqp_basic_properties_t* pProps = nullptr;
if (bindingInfo._exchange_type == "headers") {
// Example static header: type = important
static amqp_table_entry_t headers[1];
headers[0].key = amqp_cstring_bytes("type");
headers[0].value.kind = AMQP_FIELD_KIND_UTF8;
headers[0].value.value.bytes = amqp_cstring_bytes("important");
props._flags = AMQP_BASIC_HEADERS_FLAG;
props.headers.num_entries = 1;
props.headers.entries = headers;
pProps = &props;
}
int ret = amqp_basic_publish(
_conn, bindingInfo._channel,
exchange_bytes,
routing_key_bytes,
0, 0,
pProps,
message_bytes
);
if (ret != AMQP_STATUS_OK) {
this->_logger.LogError("ANSRABBITMQ::Publish. Failed to publish message:", amqp_error_string2(ret), __FILE__, __LINE__);
return false;
}
return true;
}
catch (const std::exception& e) {
this->_logger.LogFatal("ANSRABBITMQ::Publish. Error in Publish:", e.what(), __FILE__, __LINE__);
return false;
}
}
bool ANSRABBITMQ::GetQueueMessage(const std::string& queue_name, std::string& message, int ack_mode) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
try {
if (_conn == nullptr || _socket == nullptr || queue_name.empty() || _channel <= 0) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Invalid state or parameters in GetQueueMessage.", __FILE__, __LINE__);
return false;
}
BindingInfo binding = GetBindingInfoFromQueueName(queue_name);
if (binding._queue_name.empty()) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Queue is not bound or set up.", __FILE__, __LINE__);
return false;
}
amqp_maybe_release_buffers(_conn);
// Request to get a single message
amqp_basic_get(_conn, binding._channel, amqp_cstring_bytes(binding._queue_name.c_str()), ack_mode); // auto-ack = 1
amqp_rpc_reply_t reply = amqp_get_rpc_reply(_conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL || reply.reply.id != AMQP_BASIC_GET_OK_METHOD) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "No message available or error getting message.", __FILE__, __LINE__);
return false;
}
// Cast the method to extract metadata (optional)
[[maybe_unused]] auto* get_ok = static_cast<amqp_basic_get_ok_t*>(reply.reply.decoded);
// Read the content
amqp_frame_t frame;
amqp_bytes_t body;
size_t body_received = 0;
size_t body_size = 0;
// Read next frame (header)
if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_HEADER) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message header.", __FILE__, __LINE__);
return false;
}
body_size = frame.payload.properties.body_size;
// Read body frames
std::string body_data;
while (body_received < body_size) {
if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_BODY) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message body.", __FILE__, __LINE__);
return false;
}
body_data.append(static_cast<const char*>(frame.payload.body_fragment.bytes), frame.payload.body_fragment.len);
body_received += frame.payload.body_fragment.len;
}
message = body_data;
return true;
}
catch (const std::exception& e) {
this->_logger.LogError("ANSRABBITMQ::GetQueueMessage. Exception in GetQueueMessage:", e.what(), __FILE__, __LINE__);
return false;
}
}
}