#ifndef ANSRabbitMQ_H #define ANSRabbitMQ_H #define ANSRabbitMQ_API __declspec(dllexport) #pragma once #include #include #include #include #include #include #include #include "ANSLicense.h" #include "LabVIEWHeader/extcode.h" #define SUMMARY_EVERY_US 1000000 void die(const char* fmt, ...); void die_on_error(int x, char const* context); void die_on_amqp_error(amqp_rpc_reply_t x, char const* context); void amqp_dump(void const* buffer, size_t len); uint64_t now_microseconds(void); void microsleep(int usec); namespace ANSCENTER { struct BindingInfo { std::string _exchange_name; std::string _exchange_type; std::string _queue_name; std::string _binding_key; int _passive = 0; int _durable = 1; int _auto_delete = 0; int _internal = 0; int _channel = 1; }; class ANSRabbitMQ_API ANSRABBITMQ { private: std::string _hostname; int _port = 5672; std::string _vhost = "/"; int _channel = 1; int _status = -1; int _rate_limit = 0; int _message_count = 0; int _channelMax = 0; // no limit, let server decides int _frameMax = 0; // no limit int _heartBeat = 60; // 60 seconds SPDLogger& _logger = SPDLogger::GetInstance("ANSRabbitMQ", true); std::vector _bindings; // SSL cert paths (optional) std::string _caCertPath; std::string _clientCertPath; std::string _clientKeyPath; bool _verifyPeer = true; bool _verifyHostname = true; amqp_socket_t* _socket = nullptr; amqp_connection_state_t _conn = nullptr; std::recursive_mutex _mutex; [[nodiscard]] bool SetupExchange(const std::string& _exchange_name, const std::string& _exchange_type, int _passive, int _durable, int _auto_delete, int _internal); [[nodiscard]] bool SetupQueue(const std::string _queue_name, int _passive, int _durable, int _auto_delete, int _internal); [[nodiscard]] bool BindQueue(const std::string& _exchange_name, const std::string& _exchange_type, const std::string& _queue_name, const std::string& _binding_key); [[nodiscard]] bool ConfigureSSLOptions(); [[nodiscard]] BindingInfo GetBindingInfo(const std::string& exchange_name); [[nodiscard]] BindingInfo GetBindingInfoFromQueueName(const std::string& queue_name); public: ANSRABBITMQ(); ~ANSRABBITMQ(); [[nodiscard]] bool ConfigureSSL(const std::string& caCertPath, const std::string& clientCertPath, const std::string& clientKeyPath, bool verifyPeer, bool verifyHostname); [[nodiscard]] bool Connect(const std::string& hostname, int port, const std::string& vhost,int channel, const std::string& userName, const std::string& password,bool useSSL); [[nodiscard]] bool Disconnect(); [[nodiscard]] bool CreateChannel(const std::string& exchange_name, const std::string& exchange_type, const std::string& queue_name, const std::string& binding_key, int passiveVal, int durableVal, int autoDelete, int internalVal); [[nodiscard]] bool Publish(const std::string& exchange_name, const std::string& routing_key, const std::string& message); [[nodiscard]] bool GetQueueMessage(const std::string& queue_name, std::string& message, int ack_mode); [[nodiscard]] bool UnbindQueue(const std::string& exchange_name); [[nodiscard]] bool DeleteExchange(const std::string& exchange_name); [[nodiscard]] bool DeleteQueue(const std::string& queue_name); [[nodiscard]] bool PurgeQueue(const std::string& queue_name); // Others [[nodiscard]] int GetStatus() const { return _status; } void SetRateLimit(int rate_limit) { _rate_limit = rate_limit; } void SetMessageCount(int message_count) { _message_count = message_count; } [[nodiscard]] int GetMessageCount() const { return _message_count; } [[nodiscard]] std::string GetHostname() const { return _hostname; } [[nodiscard]] int GetPort() const { return _port; } }; } // C API exports extern "C" { ANSRabbitMQ_API int CreateANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle , const char* licenseKey); ANSRabbitMQ_API int ReleaseANSRabbitMQHandle(ANSCENTER::ANSRABBITMQ** Handle); ANSRabbitMQ_API int ConfigureSSL(ANSCENTER::ANSRABBITMQ** Handle,const char* caCertPath, const char* clientCertPath, const char * clientKeyPath, int verifyPeer, int verifyHostname); ANSRabbitMQ_API int CreateChannel(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* exchange_type, const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal); ANSRabbitMQ_API int Connect(ANSCENTER::ANSRABBITMQ** Handle, const char* hostname, int port, const char* vhost,int channel, const char* userName, const char* password, int useSSL); ANSRabbitMQ_API int Disconnect(ANSCENTER::ANSRABBITMQ** Handle); ANSRabbitMQ_API int Publish(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name, const char* routing_key, const char* message); ANSRabbitMQ_API int GetQueueMessage_CPP(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name,int ack_mode, std::string &message); ANSRabbitMQ_API int GetQueueMessage(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name, int ack_mode, LStrHandle strMessage); ANSRabbitMQ_API int UnbindQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name); ANSRabbitMQ_API int DeleteExchange(ANSCENTER::ANSRABBITMQ** Handle, const char* exchange_name); ANSRabbitMQ_API int DeleteQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name); ANSRabbitMQ_API int PurgeQueue(ANSCENTER::ANSRABBITMQ** Handle, const char* queue_name); // V2 entry points — accept uint64_t handleVal by value (LabVIEW concurrency fix) ANSRabbitMQ_API int ConfigureSSL_V2(uint64_t handleVal, const char* caCertPath, const char* clientCertPath, const char* clientKeyPath, int verifyPeer, int verifyHostname); ANSRabbitMQ_API int CreateChannel_V2(uint64_t handleVal, const char* exchange_name, const char* exchange_type, const char* queue_name, const char* routing_name, int passvive, int durable, int auto_delete, int internalVal); ANSRabbitMQ_API int Connect_V2(uint64_t handleVal, const char* hostname, int port, const char* vhost, int channel, const char* userName, const char* password, int useSSL); ANSRabbitMQ_API int Disconnect_V2(uint64_t handleVal); ANSRabbitMQ_API int Publish_V2(uint64_t handleVal, const char* exchange_name, const char* routing_key, const char* message); ANSRabbitMQ_API int GetQueueMessage_V2(uint64_t handleVal, const char* queue_name, int ack_mode, LStrHandle strMessage); ANSRabbitMQ_API int UnbindQueue_V2(uint64_t handleVal, const char* exchange_name); ANSRabbitMQ_API int DeleteExchange_V2(uint64_t handleVal, const char* exchange_name); ANSRabbitMQ_API int DeleteQueue_V2(uint64_t handleVal, const char* queue_name); ANSRabbitMQ_API int PurgeQueue_V2(uint64_t handleVal, const char* queue_name); } #endif