Files
ANSCORE/ANSRabbitMQ/ANSRabbitMQ.h

125 lines
7.0 KiB
C++

// Include guard. The #ifndef/#define pair below is closed by the #endif on the
// last line of this header; #pragma once is kept as well.
#ifndef ANSRabbitMQ_H
#define ANSRabbitMQ_H
// ANSRabbitMQ_API decorates every class and function this header exposes.
// It defaults to __declspec(dllexport), exactly as before. A translation unit
// that needs a different decoration (for example __declspec(dllimport) when
// consuming the DLL) may define ANSRabbitMQ_API before including this header;
// in that case the pre-existing definition is respected instead of being
// redefined here.
#ifndef ANSRabbitMQ_API
#define ANSRabbitMQ_API __declspec(dllexport)
#endif
#pragma once
#include <amqp.h>
#include <tcp_socket.h>
#include <ssl_socket.h>
#include <cstdint>
#include <string>
#include <mutex>
#include <vector>
#include "ANSLicense.h"
#include "LabVIEWHeader/extcode.h"
#define SUMMARY_EVERY_US 1000000
void die(const char* fmt, ...);
void die_on_error(int x, char const* context);
void die_on_amqp_error(amqp_rpc_reply_t x, char const* context);
void amqp_dump(void const* buffer, size_t len);
uint64_t now_microseconds(void);
void microsleep(int usec);
namespace ANSCENTER {
// Plain aggregate describing one exchange/queue binding. ANSRABBITMQ keeps a
// std::vector of these (_bindings) and hands copies back from its
// GetBindingInfo* lookups.
// NOTE(review): the integer fields look like 0/1 flags matching the
// passive/durable/auto-delete/internal arguments of CreateChannel - confirm
// against the implementation.
struct BindingInfo {
    // Names identifying the binding; all default to the empty string.
    std::string _exchange_name;
    std::string _exchange_type;
    std::string _queue_name;
    std::string _binding_key;

    // Declaration options and their defaults.
    int _passive{0};
    int _durable{1};
    int _auto_delete{0};
    int _internal{0};

    // Channel number associated with this binding; defaults to 1.
    int _channel{1};
};
class ANSRabbitMQ_API ANSRABBITMQ {
private:
std::string _hostname;
int _port = 5672;
std::string _vhost = "/";
int _channel = 1;
int _status = -1;
int _rate_limit = 0;
int _message_count = 0;
int _channelMax = 0; // no limit, let server decides
int _frameMax = 0; // no limit
int _heartBeat = 60; // 60 seconds
SPDLogger& _logger = SPDLogger::GetInstance("ANSRabbitMQ", true);
std::vector<BindingInfo> _bindings;
// SSL cert paths (optional)
std::string _caCertPath;
std::string _clientCertPath;
std::string _clientKeyPath;
bool _verifyPeer = true;
bool _verifyHostname = true;
amqp_socket_t* _socket = nullptr;
amqp_connection_state_t _conn = nullptr;
std::recursive_mutex _mutex;
[[nodiscard]] bool SetupExchange(const std::string& _exchange_name, const std::string& _exchange_type, int _passive, int _durable, int _auto_delete, int _internal);
[[nodiscard]] bool SetupQueue(const std::string _queue_name, int _passive, int _durable, int _auto_delete, int _internal);
[[nodiscard]] bool BindQueue(const std::string& _exchange_name, const std::string& _exchange_type, const std::string& _queue_name, const std::string& _binding_key);
[[nodiscard]] bool ConfigureSSLOptions();
[[nodiscard]] BindingInfo GetBindingInfo(const std::string& exchange_name);
[[nodiscard]] BindingInfo GetBindingInfoFromQueueName(const std::string& queue_name);
public:
ANSRABBITMQ();
~ANSRABBITMQ();
[[nodiscard]] bool ConfigureSSL(const std::string& caCertPath, const std::string& clientCertPath, const std::string& clientKeyPath, bool verifyPeer, bool verifyHostname);
[[nodiscard]] bool Connect(const std::string& hostname, int port, const std::string& vhost,int channel, const std::string& userName, const std::string& password,bool useSSL);
[[nodiscard]] bool Disconnect();
[[nodiscard]] bool CreateChannel(const std::string& exchange_name, const std::string& exchange_type, const std::string& queue_name, const std::string& binding_key, int passiveVal, int durableVal, int autoDelete, int internalVal);
[[nodiscard]] bool Publish(const std::string& exchange_name, const std::string& routing_key, const std::string& message);
[[nodiscard]] bool GetQueueMessage(const std::string& queue_name, std::string& message, int ack_mode);
[[nodiscard]] bool UnbindQueue(const std::string& exchange_name);
[[nodiscard]] bool DeleteExchange(const std::string& exchange_name);
[[nodiscard]] bool DeleteQueue(const std::string& queue_name);
[[nodiscard]] bool PurgeQueue(const std::string& queue_name);
// Others
[[nodiscard]] int GetStatus() const { return _status; }
void SetRateLimit(int rate_limit) { _rate_limit = rate_limit; }
void SetMessageCount(int message_count) { _message_count = message_count; }
[[nodiscard]] int GetMessageCount() const { return _message_count; }
[[nodiscard]] std::string GetHostname() const { return _hostname; }
[[nodiscard]] int GetPort() const { return _port; }
};
}
// C API exports
//
// Flat, C-linkage entry points over ANSCENTER::ANSRABBITMQ. Two families are
// declared:
//   * handle-pointer functions taking ANSCENTER::ANSRABBITMQ** Handle, and
//   * *_V2 functions taking the handle as a uint64_t passed by value.
// Every function returns int.
// NOTE(review): the meaning of the int return codes and the exact encoding of
// handleVal (presumably the numeric value of an ANSRABBITMQ pointer) are
// defined by the implementation - confirm there before relying on them.
extern "C" {
    // ---- Handle lifetime -----------------------------------------------
    ANSRabbitMQ_API int CreateANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle, const char* licenseKey);
    ANSRabbitMQ_API int ReleaseANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle);

    // ---- Handle-pointer entry points -----------------------------------
    ANSRabbitMQ_API int ConfigureSSL(ANSCENTER::ANSRABBITMQ** Handle, const char* caCertPath, const char* clientCertPath,
                                     const char* clientKeyPath, int verifyPeer, int verifyHostname);
    ANSRabbitMQ_API int CreateChannel(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* exchange_type,
                                      const char* queue_name, const char* routing_name,
                                      int passive, int durable, int auto_delete, int internalVal);
    ANSRabbitMQ_API int Connect(ANSCENTER::ANSRABBITMQ** Handle, const char* hostname, int port, const char* vhost, int channel,
                                const char* userName, const char* password, int useSSL);
    ANSRabbitMQ_API int Disconnect(ANSCENTER::ANSRABBITMQ** Handle);
    ANSRabbitMQ_API int Publish(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* routing_key, const char* message);
    // Takes a std::string& output, so despite its C linkage this overload is
    // only usable from C++ callers.
    ANSRabbitMQ_API int GetQueueMessage_CPP(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, std::string& message);
    // LabVIEW variant: the message is delivered through a LabVIEW string handle.
    ANSRabbitMQ_API int GetQueueMessage(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, LStrHandle strMessage);
    ANSRabbitMQ_API int UnbindQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name);
    ANSRabbitMQ_API int DeleteExchange(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name);
    ANSRabbitMQ_API int DeleteQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name);
    ANSRabbitMQ_API int PurgeQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name);

    // ---- V2 entry points — accept uint64_t handleVal by value (LabVIEW concurrency fix)
    ANSRabbitMQ_API int ConfigureSSL_V2(uint64_t handleVal, const char* caCertPath, const char* clientCertPath,
                                        const char* clientKeyPath, int verifyPeer, int verifyHostname);
    ANSRabbitMQ_API int CreateChannel_V2(uint64_t handleVal, const char* exchange_name, const char* exchange_type,
                                         const char* queue_name, const char* routing_name,
                                         int passive, int durable, int auto_delete, int internalVal);
    ANSRabbitMQ_API int Connect_V2(uint64_t handleVal, const char* hostname, int port, const char* vhost, int channel,
                                   const char* userName, const char* password, int useSSL);
    ANSRabbitMQ_API int Disconnect_V2(uint64_t handleVal);
    ANSRabbitMQ_API int Publish_V2(uint64_t handleVal, const char* exchange_name, const char* routing_key, const char* message);
    ANSRabbitMQ_API int GetQueueMessage_V2(uint64_t handleVal, const char* queue_name, int ack_mode, LStrHandle strMessage);
    ANSRabbitMQ_API int UnbindQueue_V2(uint64_t handleVal, const char* exchange_name);
    ANSRabbitMQ_API int DeleteExchange_V2(uint64_t handleVal, const char* exchange_name);
    ANSRabbitMQ_API int DeleteQueue_V2(uint64_t handleVal, const char* queue_name);
    ANSRabbitMQ_API int PurgeQueue_V2(uint64_t handleVal, const char* queue_name);
}
#endif