#include "ANSRabbitMQ.h"

// NOTE(review): the header names of the original include list were not
// recoverable from the reviewed text. The list below covers the standard and
// platform names this file uses; the rabbitmq-c declarations (amqp_*) are
// assumed to arrive through ANSRabbitMQ.h -- confirm against the build.
#include <ctype.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>

#include <algorithm>
#include <exception>
#include <iostream>
#include <mutex>
#include <string>

#include <windows.h>

/// Prints a printf-style message followed by a newline to stderr.
/// Despite its name this function does not terminate the process; the caller
/// keeps running and must handle the failure itself.
void die(const char* fmt, ...) {
    va_list ap;
    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
    fprintf(stderr, "\n");
}

/// Prints "<context>: <error text>" to stderr when `x` is a negative
/// rabbitmq-c status code. Does nothing otherwise and never exits.
void die_on_error(int x, char const* context) {
    if (x < 0) {
        fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
    }
}

/// Prints a description of a failed RPC reply to stderr.
/// A normal reply prints nothing. The function only reports; it neither exits
/// nor returns a status, so callers must inspect `x.reply_type` themselves.
void die_on_amqp_error(amqp_rpc_reply_t x, char const* context) {
    switch (x.reply_type) {
    case AMQP_RESPONSE_NORMAL:
        return;

    case AMQP_RESPONSE_NONE:
        fprintf(stderr, "%s: missing RPC reply type!\n", context);
        break;

    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
        fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
        break;

    case AMQP_RESPONSE_SERVER_EXCEPTION:
        switch (x.reply.id) {
        case AMQP_CONNECTION_CLOSE_METHOD: {
            auto* m = static_cast<amqp_connection_close_t*>(x.reply.decoded);
            fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
                    context, m->reply_code,
                    static_cast<int>(m->reply_text.len),
                    static_cast<char*>(m->reply_text.bytes));
            break;
        }
        case AMQP_CHANNEL_CLOSE_METHOD: {
            auto* m = static_cast<amqp_channel_close_t*>(x.reply.decoded);
            fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
                    context, m->reply_code,
                    static_cast<int>(m->reply_text.len),
                    static_cast<char*>(m->reply_text.bytes));
            break;
        }
        default:
            fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
                    context, x.reply.id);
            break;
        }
        break;
    }
}

/// Prints one hexdump row: the offset, up to 16 byte values in hex (with a
/// " :" separator after the eighth) and the printable-ASCII rendering.
/// `count` is the number of bytes consumed so far, `numinrow` how many of
/// them belong to this row, `chs` the byte values of the row.
static void dump_row(long count, int numinrow, int* chs) {
    int i;

    printf("%08lX:", count - numinrow);

    if (numinrow > 0) {
        for (i = 0; i < numinrow; i++) {
            if (i == 8) {
                printf(" :");
            }
            printf(" %02X", chs[i]);
        }
        // Pad a short row with three spaces per missing byte (the width of
        // " %02X") so the ASCII column stays aligned.
        for (i = numinrow; i < 16; i++) {
            if (i == 8) {
                printf(" :");
            }
            printf("   ");
        }
        printf("  ");
        for (i = 0; i < numinrow; i++) {
            if (isprint(chs[i])) {
                printf("%c", chs[i]);
            } else {
                printf(".");
            }
        }
    }
    printf("\n");
}

/// Returns 1 when the two 16-entry rows hold identical values, 0 otherwise.
static int rows_eq(int* a, int* b) {
    int i;

    for (i = 0; i < 16; i++) {
        if (a[i] != b[i]) {
            return 0;
        }
    }
    return 1;
}

/// Writes a hexdump of `len` bytes starting at `buffer` to stdout.
/// A run of rows identical to the previous row is collapsed into a single
/// ".." marker line.
void amqp_dump(void const* buffer, size_t len) {
    const auto* buf = static_cast<const unsigned char*>(buffer);
    long count = 0;
    int numinrow = 0;
    int chs[16];
    int oldchs[16] = { 0 };
    int showed_dots = 0;
    size_t i;

    for (i = 0; i < len; i++) {
        int ch = buf[i];

        if (numinrow == 16) {
            int j;

            if (rows_eq(oldchs, chs)) {
                if (!showed_dots) {
                    showed_dots = 1;
                    // Ten leading spaces: the width of the "%08lX:" offset
                    // column plus the space before the first byte.
                    printf(
                        "          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
                }
            } else {
                showed_dots = 0;
                dump_row(count, numinrow, chs);
            }

            for (j = 0; j < 16; j++) {
                oldchs[j] = chs[j];
            }

            numinrow = 0;
        }

        count++;
        chs[numinrow++] = ch;
    }

    dump_row(count, numinrow, chs);

    if (numinrow != 0) {
        printf("%08lX:\n", count);
    }
}

/// Returns the current system time as read by GetSystemTimeAsFileTime,
/// converted from 100-nanosecond units to microseconds.
uint64_t now_microseconds(void) {
    FILETIME ft;
    GetSystemTimeAsFileTime(&ft);
    return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime) / 10;
}

/// Sleeps for `usec` microseconds, truncated to whole milliseconds because
/// Sleep() takes milliseconds.
void microsleep(int usec) {
    Sleep(usec / 1000);
}

namespace ANSCENTER {

    namespace {

        /// Reports `reply` through die_on_amqp_error() and tells the caller
        /// whether the RPC completed normally.
        bool rpc_ok(amqp_rpc_reply_t reply, const char* context) {
            die_on_amqp_error(reply, context);
            return reply.reply_type == AMQP_RESPONSE_NORMAL;
        }

        /// Fills `items` with the fixed arguments this class uses for
        /// "headers" exchanges (x-match = all, type = important) and returns
        /// a table that points at them. `items` must outlive the table.
        amqp_table_t make_headers_match_table(amqp_table_entry_t (&items)[2]) {
            items[0].key = amqp_cstring_bytes("x-match");
            items[0].value.kind = AMQP_FIELD_KIND_UTF8;
            items[0].value.value.bytes = amqp_cstring_bytes("all");

            items[1].key = amqp_cstring_bytes("type");
            items[1].value.kind = AMQP_FIELD_KIND_UTF8;
            items[1].value.value.bytes = amqp_cstring_bytes("important");

            amqp_table_t table;
            table.num_entries = 2;
            table.entries = items;
            return table;
        }

        /// Wraps the contents of `text` as amqp_bytes_t without copying.
        /// The length comes from size(), so embedded NUL bytes are kept.
        amqp_bytes_t to_amqp_bytes(const std::string& text) {
            amqp_bytes_t bytes;
            bytes.len = text.size();
            bytes.bytes = const_cast<char*>(text.data());
            return bytes;
        }

    } // namespace

    // NOTE(review): several methods below take _mutex and then call other
    // methods that take it again (CreateChannel -> SetupExchange, Connect ->
    // ConfigureSSLOptions, ...). That only works if _mutex is a recursive
    // mutex -- confirm in the class declaration.

    /// Sets the connection defaults. No connection is opened here.
    ANSRABBITMQ::ANSRABBITMQ() {
        _conn = nullptr;    // the destructor's Disconnect() tests this
        _socket = nullptr;
        _port = 5672;
        _channel = 1;
        _channelMax = 0;    // 0 lets the server decide
        _frameMax = 131072; // 128 KB
        _heartBeat = 60;    // seconds
        // NOTE(review): the original declared a local `std::string _vhost = "/";`
        // here, which had no effect. If the class has a _vhost member that
        // should default to "/", assign it here.
    }

    /// Closes the connection, if any, through Disconnect().
    ANSRABBITMQ::~ANSRABBITMQ() {
        Disconnect();
    }

    /// Returns the first stored binding whose exchange name equals
    /// `exchange_name`, or a default-constructed BindingInfo when none matches.
    BindingInfo ANSRABBITMQ::GetBindingInfo(const std::string& exchange_name) {
        const auto found = std::find_if(
            _bindings.begin(), _bindings.end(),
            [&exchange_name](const BindingInfo& b) { return b._exchange_name == exchange_name; });
        return found != _bindings.end() ? *found : BindingInfo{};
    }

    /// Returns the first stored binding whose queue name equals `queue_name`,
    /// or a default-constructed BindingInfo when none matches.
    BindingInfo ANSRABBITMQ::GetBindingInfoFromQueueName(const std::string& queue_name) {
        const auto found = std::find_if(
            _bindings.begin(), _bindings.end(),
            [&queue_name](const BindingInfo& b) { return b._queue_name == queue_name; });
        return found != _bindings.end() ? *found : BindingInfo{};
    }

    /// Applies the stored certificate paths and verification flags to _socket.
    /// @return true when nothing is configured or every option was applied;
    ///         false when the socket is missing or an option was rejected.
    bool ANSRABBITMQ::ConfigureSSLOptions() {
        std::lock_guard lock(_mutex);
        try {
            if (_caCertPath.empty() && _clientCertPath.empty() && _clientKeyPath.empty()) {
                _logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "No SSL options configured.", __FILE__, __LINE__);
                return true; // nothing to configure
            }
            if (!_socket) {
                _logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Error: SSL socket is not initialized.", __FILE__, __LINE__);
                return false;
            }

            // These functions accept amqp_socket_t* even though they apply only to SSL sockets.
            if (!_caCertPath.empty()) {
                if (amqp_ssl_socket_set_cacert(_socket, _caCertPath.c_str()) != AMQP_STATUS_OK) {
                    _logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set CA certificate.", __FILE__, __LINE__);
                    return false;
                }
            }

            // The client identity is only set when both certificate and key are given.
            if (!_clientCertPath.empty() && !_clientKeyPath.empty()) {
                if (amqp_ssl_socket_set_key(_socket, _clientCertPath.c_str(), _clientKeyPath.c_str()) != AMQP_STATUS_OK) {
                    _logger.LogError("ANSRABBITMQ::ConfigureSSLOptions", "Failed to set client certificate/key.", __FILE__, __LINE__);
                    return false;
                }
            }

            amqp_ssl_socket_set_verify_peer(_socket, _verifyPeer ? 1 : 0);
            amqp_ssl_socket_set_verify_hostname(_socket, _verifyHostname ? 1 : 0);
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::ConfigureSSLOptions. Error configuring SSL options:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Declares an exchange on the current channel (_channel).
    /// @return true only when the broker acknowledged the declaration.
    bool ANSRABBITMQ::SetupExchange(const std::string& _exchange_name, const std::string& _exchange_type, int _passive, int _durable, int _auto_delete, int _internal) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr || _socket == nullptr || _exchange_name.empty() || _exchange_type.empty() || _channel <= 0) {
                _logger.LogError("ANSRABBITMQ::SetupExchange", "Invalid parameters in SetupExchange.", __FILE__, __LINE__);
                return false;
            }

            amqp_exchange_declare(
                _conn,
                _channel,
                amqp_cstring_bytes(_exchange_name.c_str()),
                amqp_cstring_bytes(_exchange_type.c_str()),
                _passive,
                _durable,
                _auto_delete,
                _internal,
                amqp_empty_table
            );
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Declaring exchange")) {
                _logger.LogError("ANSRABBITMQ::SetupExchange", "Declaring exchange failed.", __FILE__, __LINE__);
                return false;
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::SetupExchange. Error in SetupExchange:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Declares a non-exclusive queue on the current channel (_channel).
    /// `_internal` is accepted for signature symmetry but is not used.
    /// @return true only when the broker acknowledged the declaration.
    bool ANSRABBITMQ::SetupQueue(const std::string _queue_name, int _passive, int _durable, int _auto_delete, int _internal) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _channel <= 0) {
                _logger.LogError("ANSRABBITMQ::SetupQueue", "Invalid parameters in SetupQueue.", __FILE__, __LINE__);
                return false;
            }

            amqp_queue_declare(
                _conn,
                _channel,
                amqp_cstring_bytes(_queue_name.c_str()),
                _passive,
                _durable,
                false, // not exclusive
                _auto_delete,
                amqp_empty_table
            );
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Declaring queue")) {
                _logger.LogError("ANSRABBITMQ::SetupQueue", "Declaring queue failed.", __FILE__, __LINE__);
                return false;
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::SetupQueue. Error in SetupQueue:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Binds a queue to an exchange on the current channel (_channel).
    /// For a "headers" exchange the fixed match table (x-match = all,
    /// type = important) is sent as binding arguments.
    /// @return true only when the broker acknowledged the binding.
    bool ANSRABBITMQ::BindQueue(const std::string& _exchange_name, const std::string& _exchange_type, const std::string& _queue_name, const std::string& _binding_key) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr || _socket == nullptr || _queue_name.empty() || _exchange_name.empty() || _channel <= 0) {
                _logger.LogError("ANSRABBITMQ::BindQueue", "Invalid parameters in BindQueue.", __FILE__, __LINE__);
                return false;
            }

            amqp_table_entry_t items[2] = {};
            amqp_table_t arguments = amqp_empty_table;
            if (_exchange_type == "headers") {
                arguments = make_headers_match_table(items);
            }

            amqp_queue_bind(
                _conn,
                _channel,
                amqp_cstring_bytes(_queue_name.c_str()),
                amqp_cstring_bytes(_exchange_name.c_str()),
                amqp_cstring_bytes(_binding_key.c_str()), // required, even for fanout/headers
                arguments
            );
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Binding queue")) {
                _logger.LogError("ANSRABBITMQ::BindQueue", "Binding queue failed.", __FILE__, __LINE__);
                return false;
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::BindQueue. Error in BindQueue:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Removes the binding recorded for `_exchange_name`, using the channel,
    /// queue and binding key stored when the binding was created.
    /// The stored BindingInfo itself is left in _bindings.
    /// @return true only when the broker acknowledged the unbind.
    bool ANSRABBITMQ::UnbindQueue(const std::string& _exchange_name) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr || _socket == nullptr || _exchange_name.empty() || _channel <= 0) {
                _logger.LogError("ANSRABBITMQ::UnbindQueue", "Invalid parameters in UnbindQueue.", __FILE__, __LINE__);
                return false;
            }

            const BindingInfo bindingInfo = GetBindingInfo(_exchange_name);
            if (bindingInfo._exchange_name.empty()) {
                _logger.LogError("ANSRABBITMQ::UnbindQueue", "Exchange is not bound or set up.", __FILE__, __LINE__);
                return false;
            }

            // Must match the arguments BindQueue() sent for a headers exchange.
            amqp_table_entry_t items[2] = {};
            amqp_table_t arguments = amqp_empty_table;
            if (bindingInfo._exchange_type == "headers") {
                arguments = make_headers_match_table(items);
            }

            amqp_queue_unbind(
                _conn,
                bindingInfo._channel,
                amqp_cstring_bytes(bindingInfo._queue_name.c_str()),
                amqp_cstring_bytes(bindingInfo._exchange_name.c_str()),
                amqp_cstring_bytes(bindingInfo._binding_key.c_str()), // still required by API
                arguments
            );
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Unbinding queue")) {
                _logger.LogError("ANSRABBITMQ::UnbindQueue", "Unbinding queue failed.", __FILE__, __LINE__);
                return false;
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::UnbindQueue. Error in UnbindQueue:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Deletes a queue that was set up through CreateChannel(), on the
    /// channel recorded for it. The queue is deleted even if it is in use or
    /// not empty.
    /// @return true only when the broker acknowledged the deletion.
    bool ANSRABBITMQ::DeleteQueue(const std::string& queue_name) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr) {
                _logger.LogError("ANSRABBITMQ::DeleteQueue", "Connection is not established.", __FILE__, __LINE__);
                return false;
            }
            if (_socket == nullptr) {
                _logger.LogError("ANSRABBITMQ::DeleteQueue", "Socket is not initialized.", __FILE__, __LINE__);
                return false;
            }
            if (queue_name.empty()) {
                _logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue name is empty.", __FILE__, __LINE__);
                return false;
            }
            if (_channel <= 0) {
                _logger.LogError("ANSRABBITMQ::DeleteQueue", "Channel is not set.", __FILE__, __LINE__);
                return false;
            }

            // Both flags are 0 to force the delete; set them to 1 to delete
            // only when the queue is unused / empty.
            const int if_unused = 0;
            const int if_empty = 0;

            const BindingInfo binding = GetBindingInfoFromQueueName(queue_name);
            if (binding._queue_name.empty()) {
                _logger.LogError("ANSRABBITMQ::DeleteQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
                return false;
            }

            amqp_queue_delete(_conn, binding._channel, amqp_cstring_bytes(queue_name.c_str()), if_unused, if_empty);
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Deleting queue")) {
                _logger.LogError("ANSRABBITMQ::DeleteQueue", "Deleting queue failed.", __FILE__, __LINE__);
                return false;
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::DeleteQueue. Error in DeleteQueue:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Deletes an exchange that was set up through CreateChannel() and, on
    /// success, drops the matching entries from _bindings. The exchange is
    /// deleted even if it is still in use.
    /// @return true only when the broker acknowledged the deletion.
    bool ANSRABBITMQ::DeleteExchange(const std::string& exchange_name) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr) {
                _logger.LogError("ANSRABBITMQ::DeleteExchange", "Connection is not established.", __FILE__, __LINE__);
                return false;
            }
            if (_socket == nullptr) {
                _logger.LogError("ANSRABBITMQ::DeleteExchange", "Socket is not initialized.", __FILE__, __LINE__);
                return false;
            }
            if (exchange_name.empty()) {
                _logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange name is empty.", __FILE__, __LINE__);
                return false;
            }
            if (_channel <= 0) {
                _logger.LogError("ANSRABBITMQ::DeleteExchange", "Channel is not set.", __FILE__, __LINE__);
                return false;
            }

            // 0 deletes regardless of use; 1 would delete only an unused exchange.
            const int if_unused = 0;

            const BindingInfo bindingInfo = GetBindingInfo(exchange_name);
            if (bindingInfo._exchange_name.empty()) {
                _logger.LogError("ANSRABBITMQ::DeleteExchange", "Exchange is not bound or set up.", __FILE__, __LINE__);
                return false;
            }

            amqp_exchange_delete(_conn, bindingInfo._channel, amqp_cstring_bytes(exchange_name.c_str()), if_unused);
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Deleting exchange")) {
                _logger.LogError("ANSRABBITMQ::DeleteExchange", "Deleting exchange failed.", __FILE__, __LINE__);
                return false;
            }

            // Forget the binding only once the broker confirmed the delete.
            const auto same_binding = [&bindingInfo](const BindingInfo& b) {
                return b._exchange_name == bindingInfo._exchange_name &&
                       b._exchange_type == bindingInfo._exchange_type &&
                       b._queue_name == bindingInfo._queue_name &&
                       b._binding_key == bindingInfo._binding_key;
            };
            _bindings.erase(std::remove_if(_bindings.begin(), _bindings.end(), same_binding), _bindings.end());
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::DeleteExchange. Error in DeleteExchange:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Removes all ready messages from a queue that was set up through
    /// CreateChannel(), on the channel recorded for it.
    /// @return true only when the broker acknowledged the purge.
    bool ANSRABBITMQ::PurgeQueue(const std::string& _queue_name) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr) {
                _logger.LogError("ANSRABBITMQ::PurgeQueue", "Connection is not established.", __FILE__, __LINE__);
                return false;
            }
            if (_socket == nullptr) {
                _logger.LogError("ANSRABBITMQ::PurgeQueue", "Socket is not initialized.", __FILE__, __LINE__);
                return false;
            }
            if (_queue_name.empty()) {
                _logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue name is empty.", __FILE__, __LINE__);
                return false;
            }
            if (_channel <= 0) {
                _logger.LogError("ANSRABBITMQ::PurgeQueue", "Channel is not set.", __FILE__, __LINE__);
                return false;
            }

            const BindingInfo binding = GetBindingInfoFromQueueName(_queue_name);
            if (binding._queue_name.empty()) {
                _logger.LogError("ANSRABBITMQ::PurgeQueue", "Queue is not bound or set up.", __FILE__, __LINE__);
                return false;
            }

            amqp_queue_purge(_conn, binding._channel, amqp_cstring_bytes(_queue_name.c_str()));
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Purging queue")) {
                _logger.LogError("ANSRABBITMQ::PurgeQueue", "Purging queue failed.", __FILE__, __LINE__);
                return false;
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::PurgeQueue. Error in PurgeQueue:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Stores the SSL settings; they are applied to the socket later by
    /// ConfigureSSLOptions(), which Connect() calls when useSSL is true.
    /// @return true unless storing the values throws.
    bool ANSRABBITMQ::ConfigureSSL(const std::string& caCertPath, const std::string& clientCertPath, const std::string& clientKeyPath, bool verifyPeer, bool verifyHostname) {
        std::lock_guard lock(_mutex);
        try {
            _caCertPath = caCertPath;
            _clientCertPath = clientCertPath;
            _clientKeyPath = clientKeyPath;
            _verifyPeer = verifyPeer;
            _verifyHostname = verifyHostname;
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::ConfigureSSL. Error configuring SSL:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Opens channel number _channel, declares the exchange and the queue on
    /// it and binds them. On success the binding is recorded in _bindings
    /// together with the channel number, and _channel is incremented so the
    /// next call uses a fresh channel.
    /// @return true only when every step was acknowledged by the broker.
    bool ANSRABBITMQ::CreateChannel(const std::string& exchange_name, const std::string& exchange_type, const std::string& queue_name, const std::string& binding_key, int passiveVal, int durableVal, int autoDelete, int internalVal) {
        std::lock_guard lock(_mutex);
        try {
            if (exchange_name.empty()) {
                _logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange name is empty.", __FILE__, __LINE__);
                return false;
            }
            if (exchange_type.empty()) {
                _logger.LogError("ANSRABBITMQ::CreateChannel", "Exchange type is empty.", __FILE__, __LINE__);
                return false;
            }
            if (queue_name.empty()) {
                _logger.LogError("ANSRABBITMQ::CreateChannel", "Queue name is empty.", __FILE__, __LINE__);
                return false;
            }
            if (_conn == nullptr) {
                // amqp_channel_open must not be called without a connection.
                _logger.LogError("ANSRABBITMQ::CreateChannel", "Connection is not established.", __FILE__, __LINE__);
                return false;
            }

            amqp_channel_open(_conn, _channel);
            if (!rpc_ok(amqp_get_rpc_reply(_conn), "Opening channel")) {
                _logger.LogError("ANSRABBITMQ::CreateChannel", "Opening channel failed.", __FILE__, __LINE__);
                return false;
            }

            // Stop at the first failing step; later steps depend on the earlier ones.
            const bool ready =
                SetupExchange(exchange_name, exchange_type, passiveVal, durableVal, autoDelete, internalVal) &&
                SetupQueue(queue_name, passiveVal, durableVal, autoDelete, internalVal) &&
                BindQueue(exchange_name, exchange_type, queue_name, binding_key);
            if (!ready) {
                _logger.LogError("ANSRABBITMQ::CreateChannel", "Failed to setup exchange, queue or bind.", __FILE__, __LINE__);
                return false;
            }

            BindingInfo bindingInfo;
            bindingInfo._exchange_name = exchange_name;
            bindingInfo._exchange_type = exchange_type;
            bindingInfo._queue_name = queue_name;
            bindingInfo._binding_key = binding_key;
            bindingInfo._passive = passiveVal;
            bindingInfo._durable = durableVal;
            bindingInfo._auto_delete = autoDelete;
            bindingInfo._internal = internalVal;
            bindingInfo._channel = _channel; // channel used for this binding
            _bindings.push_back(bindingInfo);

            _channel++; // next CreateChannel() opens a new channel
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::CreateChannel. Error in Setup::", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Opens a TCP or SSL socket to `hostname:port` and logs in to `vhost`
    /// with PLAIN authentication. An existing connection is closed first.
    /// On any failure the half-built connection is destroyed and the object
    /// is left disconnected.
    /// @param channel first channel number CreateChannel() will use.
    /// @return true only when the socket opened and the login succeeded.
    bool ANSRABBITMQ::Connect(const std::string& hostname, int port, const std::string& vhost, int channel, const std::string& userName, const std::string& password, bool useSSL) {
        std::lock_guard lock(_mutex);
        try {
            // Release a previous connection instead of overwriting (leaking) it.
            if (_conn != nullptr) {
                Disconnect();
            }

            // Destroys the connection state (which owns the socket) after a failed attempt.
            const auto abandon = [this]() {
                amqp_destroy_connection(_conn);
                _conn = nullptr;
                _socket = nullptr;
            };

            _conn = amqp_new_connection();
            if (_conn == nullptr) {
                _logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create connection", __FILE__, __LINE__);
                return false;
            }

            if (useSSL) {
                _socket = amqp_ssl_socket_new(_conn);
                if (!_socket) {
                    _logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create SSL socket", __FILE__, __LINE__);
                    die("Creating SSL socket");
                    abandon();
                    return false;
                }
                if (!ConfigureSSLOptions()) {
                    abandon();
                    return false;
                }
            }
            else {
                _socket = amqp_tcp_socket_new(_conn);
                if (!_socket) {
                    _logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot create socket", __FILE__, __LINE__);
                    die("Creating TCP socket");
                    abandon();
                    return false;
                }
            }

            const int opened = amqp_socket_open(_socket, hostname.c_str(), port);
            std::cout << "Opened socket: " << opened << std::endl;
            if (opened) {
                _logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Cannot open socket", __FILE__, __LINE__);
                die("Opening socket");
                abandon();
                return false;
            }

            // Fall back to defaults when the tuning values are unset.
            if (_channelMax <= 0) _channelMax = 0;   // 0 = let the server decide
            if (_frameMax <= 0) _frameMax = 131072;  // 128 KB
            if (_heartBeat <= 0) _heartBeat = 60;    // seconds

            const amqp_rpc_reply_t login = amqp_login(_conn, vhost.c_str(), _channelMax, _frameMax, _heartBeat,
                                                      AMQP_SASL_METHOD_PLAIN, userName.c_str(), password.c_str());
            if (!rpc_ok(login, "Logging in")) {
                _logger.LogError("ANSRABBITMQ::Connect. Error in Setup::", "Login failed", __FILE__, __LINE__);
                abandon();
                return false;
            }

            _channel = channel;
            _hostname = hostname;
            _port = port;
            _status = 0;
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::Connect. Error in Connect::", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Closes every channel recorded in _bindings, closes the connection and
    /// destroys the connection state. Afterwards the object is reset to its
    /// disconnected state (no connection, no bindings, _channel = 1,
    /// _status = -1), also when the final destroy step reports an error.
    /// @return true when there was nothing to close or the connection state
    ///         was destroyed without error; false otherwise.
    bool ANSRABBITMQ::Disconnect() {
        std::lock_guard lock(_mutex);
        try {
            if (_conn) {
                for (const auto& binding : _bindings) {
                    if (binding._channel > 0) {
                        const amqp_rpc_reply_t reply = amqp_channel_close(_conn, binding._channel, AMQP_REPLY_SUCCESS);
                        if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
                            _logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close channel cleanly.", __FILE__, __LINE__);
                        }
                    }
                }

                const amqp_rpc_reply_t reply = amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
                if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
                    _logger.LogError("ANSRABBITMQ::Disconnect.", "Failed to close connection cleanly.", __FILE__, __LINE__);
                }

                const int destroyStatus = amqp_destroy_connection(_conn);

                // The connection state is released by amqp_destroy_connection
                // whatever status it returns, so the handles must be dropped
                // before reporting the status; keeping them would leave the
                // destructor with a dangling pointer.
                _conn = nullptr;
                _socket = nullptr;
                _channel = 1;
                _status = -1;
                _bindings.clear(); // their channels no longer exist

                if (destroyStatus != AMQP_STATUS_OK) {
                    _logger.LogError("ANSRABBITMQ::Disconnect. Failed to destroy connection:", amqp_error_string2(destroyStatus), __FILE__, __LINE__);
                    return false;
                }
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::DisConnect. Error in Disconnect::", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Publishes `message` to an exchange that was set up through
    /// CreateChannel(), on the channel recorded for it.
    /// For "fanout" and "headers" exchanges the routing key is sent empty;
    /// for "headers" the fixed header type = important is attached.
    /// The payload length is taken from message.size(), so embedded NUL
    /// bytes are sent intact.
    /// @return true when the library accepted the publish request.
    bool ANSRABBITMQ::Publish(const std::string& exchange_name, const std::string& routing_key, const std::string& message) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr) {
                _logger.LogError("ANSRABBITMQ::Publish.", "Connection is not established.", __FILE__, __LINE__);
                return false;
            }
            if (_socket == nullptr) {
                _logger.LogError("ANSRABBITMQ::Publish.", "Socket is not initialized.", __FILE__, __LINE__);
                return false;
            }
            if (exchange_name.empty()) {
                _logger.LogError("ANSRABBITMQ::Publish.", "Exchange is not set.", __FILE__, __LINE__);
                return false;
            }

            const BindingInfo bindingInfo = GetBindingInfo(exchange_name);
            if (bindingInfo._exchange_name.empty()) {
                _logger.LogError("ANSRABBITMQ::Publish.", "Exchange name is not set.", __FILE__, __LINE__);
                return false;
            }

            const bool ignores_routing_key =
                bindingInfo._exchange_type == "fanout" || bindingInfo._exchange_type == "headers";

            // NOTE(review): this validates the stored binding key, not the
            // routing_key argument that is actually sent -- confirm intent.
            if (!ignores_routing_key && bindingInfo._binding_key.empty()) {
                _logger.LogError("ANSRABBITMQ::Publish.", "Routing key is required for this exchange type.", __FILE__, __LINE__);
                return false;
            }
            if (message.empty()) {
                _logger.LogError("ANSRABBITMQ::Publish.", "Message is empty.", __FILE__, __LINE__);
                return false;
            }

            const amqp_bytes_t exchange_bytes = amqp_cstring_bytes(bindingInfo._exchange_name.c_str());
            const amqp_bytes_t routing_key_bytes = amqp_cstring_bytes(ignores_routing_key ? "" : routing_key.c_str());
            const amqp_bytes_t message_bytes = to_amqp_bytes(message);

            amqp_basic_properties_t props{};
            amqp_table_entry_t headers[1] = {};
            amqp_basic_properties_t* pProps = nullptr;

            if (bindingInfo._exchange_type == "headers") {
                // Fixed header matching the binding arguments: type = important
                headers[0].key = amqp_cstring_bytes("type");
                headers[0].value.kind = AMQP_FIELD_KIND_UTF8;
                headers[0].value.value.bytes = amqp_cstring_bytes("important");

                props._flags = AMQP_BASIC_HEADERS_FLAG;
                props.headers.num_entries = 1;
                props.headers.entries = headers;
                pProps = &props;
            }

            const int ret = amqp_basic_publish(
                _conn,
                bindingInfo._channel,
                exchange_bytes,
                routing_key_bytes,
                0, // mandatory
                0, // immediate
                pProps,
                message_bytes
            );
            if (ret != AMQP_STATUS_OK) {
                _logger.LogError("ANSRABBITMQ::Publish. Failed to publish message:", amqp_error_string2(ret), __FILE__, __LINE__);
                return false;
            }
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::Publish. Error in Publish:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }

    /// Fetches at most one message from a queue that was set up through
    /// CreateChannel(), using basic.get on the channel recorded for it.
    /// @param message  receives the message body on success; untouched otherwise.
    /// @param ack_mode passed to amqp_basic_get as its no_ack argument
    ///                 (1 = the broker treats the message as acknowledged).
    /// @return true when a message was read completely; false when the queue
    ///         was empty or any step failed.
    bool ANSRABBITMQ::GetQueueMessage(const std::string& queue_name, std::string& message, int ack_mode) {
        std::lock_guard lock(_mutex);
        try {
            if (_conn == nullptr || _socket == nullptr || queue_name.empty() || _channel <= 0) {
                _logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Invalid state or parameters in GetQueueMessage.", __FILE__, __LINE__);
                return false;
            }

            const BindingInfo binding = GetBindingInfoFromQueueName(queue_name);
            if (binding._queue_name.empty()) {
                _logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Queue is not bound or set up.", __FILE__, __LINE__);
                return false;
            }

            amqp_maybe_release_buffers(_conn);

            // Ask the broker for a single message.
            amqp_basic_get(_conn, binding._channel, amqp_cstring_bytes(binding._queue_name.c_str()), ack_mode);
            const amqp_rpc_reply_t reply = amqp_get_rpc_reply(_conn);
            if (reply.reply_type != AMQP_RESPONSE_NORMAL || reply.reply.id != AMQP_BASIC_GET_OK_METHOD) {
                _logger.LogError("ANSRABBITMQ::GetQueueMessage.", "No message available or error getting message.", __FILE__, __LINE__);
                return false;
            }

            // The content header frame carries the total body size.
            amqp_frame_t frame;
            if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_HEADER) {
                _logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message header.", __FILE__, __LINE__);
                return false;
            }
            const size_t body_size = frame.payload.properties.body_size;

            // Collect body frames until the announced size has arrived.
            std::string body_data;
            size_t body_received = 0;
            while (body_received < body_size) {
                if (amqp_simple_wait_frame(_conn, &frame) != AMQP_STATUS_OK || frame.frame_type != AMQP_FRAME_BODY) {
                    _logger.LogError("ANSRABBITMQ::GetQueueMessage.", "Failed to receive message body.", __FILE__, __LINE__);
                    return false;
                }
                body_data.append(static_cast<const char*>(frame.payload.body_fragment.bytes), frame.payload.body_fragment.len);
                body_received += frame.payload.body_fragment.len;
            }

            message = std::move(body_data);
            return true;
        }
        catch (const std::exception& e) {
            _logger.LogFatal("ANSRABBITMQ::GetQueueMessage. Exception in GetQueueMessage:", e.what(), __FILE__, __LINE__);
            return false;
        }
    }
}