#ifndef ANSPALSAR_H #define ANSPALSAR_H #define ANSPALSAR_API __declspec(dllexport) #pragma once #include #include #include #include #include "ANSLicense.h" #include "LabVIEWHeader/extcode.h" #include using pulsar::Client; namespace ANSCENTER { class ANSPALSAR_API ANSPULSAR { private: std::string _hostname; int _port = 5672; std::string _jwtToken; bool _useTls = false; std::string _caCertPath; bool _enableBatching = false; int _batchingMaxMessages = 100; int _batchingDelayMs = 1000; bool _allowInsecureConnection = false; bool _verifyHostname = true; bool _procuderInit = false; bool _consumerInit = false; std::string _pubTopicName; std::string _subTopicName; std::string _subscriptionName = "ans_sub"; std::string _fullServiceUrl; pulsar::CompressionType _compressionType = pulsar::CompressionLZ4; std::unique_ptr _client = nullptr; pulsar::Producer _producer; pulsar::Consumer _consumer; pulsar::ClientConfiguration _clientConfig; pulsar::ProducerConfiguration _producerConfig; pulsar::ConsumerConfiguration _consumerConfig; SPDLogger& _logger = SPDLogger::GetInstance("ANSPulsar", true); std::recursive_mutex _mutex; public: ANSPULSAR(); ~ANSPULSAR(); [[nodiscard]] bool Initialize(const std::string& hostname, int port, const std::string& jwtToken, bool useSSL, const std::string& caCertPath, bool enableBatching, int batchingMaxMessages, int batchingDelayMs, bool allowInsecureConnection, bool verifyHostname); [[nodiscard]] bool Connect(const std::string& topicName); [[nodiscard]] bool Disconnect(); [[nodiscard]] bool Publish(const std::string& message, const std::string& properties); [[nodiscard]] bool PublishAsync(const std::string& message, const std::string& properties); [[nodiscard]] bool Subscribe(const std::string& topicName, const std::string& subscriptionName); [[nodiscard]] bool Receive(std::string& messageOut, bool doAcknowledge, int timeoutMs = 1000); [[nodiscard]] bool Close(); }; } // C API exports extern "C" { ANSPALSAR_API int CreateANSPulsarHandle(ANSCENTER::ANSPULSAR** Handle, const char* hostname,int port, const char* jwtToken, int useTls, const char* caCertPath, int enableBatching, int batchingMaxMessages, int batchingDelayMs, int allowInsecureConnection,int verifyHostName); ANSPALSAR_API int ReleasePulsarHandle(ANSCENTER::ANSPULSAR** Handle); ANSPALSAR_API int ConnectANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* topicName); ANSPALSAR_API int DisconnectANSPulsar(ANSCENTER::ANSPULSAR** Handle); ANSPALSAR_API int PublishANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* message, const char* properties); ANSPALSAR_API int PublishAsyncANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* message, const char* properties); ANSPALSAR_API int SubscribeANSPulsar(ANSCENTER::ANSPULSAR** Handle, const char* topicName, const char* subscriptionName); ANSPALSAR_API int ReceiveMessageANSPulsar(ANSCENTER::ANSPULSAR** Handle, int timeoutMs, int doAcknowledge, LStrHandle strMessage); ANSPALSAR_API int ReceiveMessageANSPulsar_CPP(ANSCENTER::ANSPULSAR** Handle, int timeoutMs, int doAcknowledge, std::string& message); // V2 API — handle passed by value (uint64_t) to avoid LabVIEW buffer reuse bug ANSPALSAR_API int ConnectANSPulsar_V2(uint64_t handleVal, const char* topicName); ANSPALSAR_API int DisconnectANSPulsar_V2(uint64_t handleVal); ANSPALSAR_API int PublishANSPulsar_V2(uint64_t handleVal, const char* message, const char* properties); ANSPALSAR_API int PublishAsyncANSPulsar_V2(uint64_t handleVal, const char* message, const char* properties); ANSPALSAR_API int SubscribeANSPulsar_V2(uint64_t handleVal, const char* topicName, const char* subscriptionName); ANSPALSAR_API int ReceiveMessageANSPulsar_V2(uint64_t handleVal, int timeoutMs, int doAcknowledge, LStrHandle strMessage); } #endif