#include "ANSPulsar.h"

// NOTE(review): the second #include in the original had no header name; the
// standard headers this translation unit relies on are listed explicitly.
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>

namespace ANSCENTER {

namespace {

/// Producer send timeout, in milliseconds, applied in Connect().
constexpr int kSendTimeoutMs = 5000;

/// Receiver queue size applied to the consumer in Subscribe().
constexpr int kReceiverQueueSize = 1000;

/// Characters stripped from both ends of property keys and values.
constexpr const char* kWhitespace = " \t\r\n";

/// Returns a copy of `text` with leading and trailing whitespace removed.
/// A string made only of whitespace yields an empty string.
std::string Trim(const std::string& text) {
    const std::size_t first = text.find_first_not_of(kWhitespace);
    if (first == std::string::npos) {
        return std::string();
    }
    const std::size_t last = text.find_last_not_of(kWhitespace);
    return text.substr(first, last - first + 1);
}

/// Walks a "key:value;key:value" list.
///
/// Each ';'-separated segment is split on its first ':'; key and value are
/// trimmed and passed to `onProperty(key, value)`. Segments without a ':'
/// are passed untouched to `onInvalid(segment)`. Segments are visited in
/// order, so a later duplicate key is reported after the earlier one.
template <typename OnProperty, typename OnInvalid>
void ForEachProperty(const std::string& properties, OnProperty&& onProperty, OnInvalid&& onInvalid) {
    if (properties.empty()) {
        return;
    }
    std::istringstream stream(properties);
    std::string segment;
    while (std::getline(stream, segment, ';')) {
        const std::size_t colon = segment.find(':');
        if (colon == std::string::npos) {
            onInvalid(segment);
            continue;
        }
        onProperty(Trim(segment.substr(0, colon)), Trim(segment.substr(colon + 1)));
    }
}

/// Builds the outgoing message shared by Publish() and PublishAsync():
/// payload = `message`, property "content-type" = "application/json", plus
/// every well-formed entry of `properties`. Malformed entries are reported
/// through `onInvalid` and otherwise skipped.
template <typename OnInvalid>
pulsar::Message BuildMessage(const std::string& message, const std::string& properties, OnInvalid&& onInvalid) {
    pulsar::MessageBuilder builder;
    builder.setContent(message);
    builder.setProperty("content-type", "application/json");
    ForEachProperty(
        properties,
        [&builder](const std::string& key, const std::string& value) { builder.setProperty(key, value); },
        onInvalid);
    return builder.build();
}

}  // namespace

/// Completion callback handed to Producer::sendAsync() by PublishAsync().
/// Prints the result code and the message id to standard output.
void Callback(pulsar::Result code, const pulsar::MessageId& msgId) {
    std::cout << "Received code:" << code << " -- MsgID: " << msgId << std::endl;
}

/// Sets every connection option to its default; no network activity happens
/// until Connect() is called.
ANSPULSAR::ANSPULSAR() {
    _hostname = "localhost";
    // NOTE(review): 5672 is the conventional AMQP port; Pulsar brokers
    // conventionally listen on 6650 (6651 with TLS). Confirm this default.
    _port = 5672;
    _jwtToken = "";
    _useTls = false;
    _caCertPath = "";
    _enableBatching = false;
    _batchingMaxMessages = 100;
    _batchingDelayMs = 1000;
    _allowInsecureConnection = false;
    _verifyHostname = true;
    _procuderInit = false;
    _consumerInit = false;
    _compressionType = pulsar::CompressionLZ4;
}

/// Closes the client (if any) via Close().
ANSPULSAR::~ANSPULSAR() {
    Close();
}

/// Stores the connection settings used by the next Connect() call.
///
/// The values are copied as given, without validation, and nothing is
/// contacted over the network.
///
/// @return Always true.
bool ANSPULSAR::Initialize(const std::string& hostname, int port, const std::string& jwtToken, bool useTls,
                           const std::string& caCertPath, bool enableBatching, int batchingMaxMessages,
                           int batchingDelayMs, bool allowInsecureConnection, bool verifyHostname) {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    _hostname = hostname;
    _port = port;
    _jwtToken = jwtToken;
    _useTls = useTls;
    _caCertPath = caCertPath;
    _enableBatching = enableBatching;
    _batchingMaxMessages = batchingMaxMessages;
    _batchingDelayMs = batchingDelayMs;
    _allowInsecureConnection = allowInsecureConnection;
    _verifyHostname = verifyHostname;
    return true;
}

/// Creates the Pulsar client and a producer for `topicName`.
///
/// Any "scheme://" prefix on the configured hostname is dropped and replaced
/// by "pulsar://" or "pulsar+ssl://" according to the TLS setting; the
/// configured port is always appended.
///
/// @param topicName Topic the producer publishes to.
/// @return true when the producer was created; false (after logging) when
///         producer creation fails or an exception is thrown.
bool ANSPULSAR::Connect(const std::string& topicName) {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    try {
        _pubTopicName = topicName;

        // Strip a caller-supplied scheme so the one matching _useTls is used.
        std::string hostname = _hostname;
        const std::size_t schemeEnd = hostname.find("://");
        if (schemeEnd != std::string::npos) {
            hostname = hostname.substr(schemeEnd + 3);
        }
        _fullServiceUrl = (_useTls ? "pulsar+ssl://" : "pulsar://") + hostname + ":" + std::to_string(_port);

        if (_useTls) {
            _clientConfig.setUseTls(true);
            _clientConfig.setTlsTrustCertsFilePath(_caCertPath);
            _clientConfig.setTlsAllowInsecureConnection(_allowInsecureConnection);
            _clientConfig.setValidateHostName(_verifyHostname);
        }
        if (!_jwtToken.empty()) {
            _clientConfig.setAuth(pulsar::AuthToken::createWithToken(_jwtToken));
        }

        _producerConfig.setSendTimeout(kSendTimeoutMs);
        _producerConfig.setCompressionType(_compressionType);
        _producerConfig.setBatchingEnabled(_enableBatching);
        if (_enableBatching) {
            _producerConfig.setBatchingMaxMessages(_batchingMaxMessages);
            _producerConfig.setBatchingMaxPublishDelayMs(_batchingDelayMs);
        }

        // NOTE(review): a previously created client/producer is replaced here
        // without being closed first; confirm whether re-connecting is expected.
        _client = std::make_unique<pulsar::Client>(_fullServiceUrl, _clientConfig);

        const auto result = _client->createProducer(_pubTopicName, _producerConfig, _producer);
        if (result != pulsar::ResultOk) {
            _logger.LogError("ANSPULSAR::Connect", "Failed to create producer: " + std::to_string(result), __FILE__, __LINE__);
            return false;
        }
        _procuderInit = true;
        return true;
    }
    catch (const std::exception& e) {
        _logger.LogFatal("ANSPULSAR::Connect", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
        return false;
    }
    catch (...) {
        _logger.LogFatal("ANSPULSAR::Connect", "Unknown exception occurred", __FILE__, __LINE__);
        return false;
    }
}

/// Closes the producer and the consumer if they were created, leaving the
/// client itself alive. The result codes of the close() calls are not
/// inspected.
///
/// @return true unless an exception was thrown (which is logged).
bool ANSPULSAR::Disconnect() {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    try {
        if (_procuderInit) {
            _producer.close();
            _procuderInit = false;
        }
        if (_consumerInit) {
            _consumer.close();
            _consumerInit = false;
        }
    }
    catch (const std::exception& e) {
        _logger.LogError("ANSPULSAR::Disconnect", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
        return false;
    }
    catch (...) {
        _logger.LogError("ANSPULSAR::Disconnect", "Unknown exception occurred", __FILE__, __LINE__);
        return false;
    }
    return true;
}

/// Closes and destroys the client.
///
/// Takes the same mutex as every other method so it cannot run concurrently
/// with a publish/receive, and clears the producer/consumer flags because
/// their handles were created from the client that is being destroyed.
///
/// @return true unless an exception was thrown (which is logged).
bool ANSPULSAR::Close() {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    try {
        if (_client) {
            _client->close();
            _client.reset();
        }
        _procuderInit = false;
        _consumerInit = false;
        return true;
    }
    catch (const std::exception& e) {
        _logger.LogError("ANSPULSAR::Close", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
        return false;
    }
    catch (...) {
        _logger.LogError("ANSPULSAR::Close", "Unknown exception occurred", __FILE__, __LINE__);
        return false;
    }
}

/// Sends `message` synchronously on the producer created by Connect().
///
/// @param message    Payload; must not be empty.
/// @param properties Optional "key:value;key:value" list added as message
///                   properties. Entries without ':' are logged and skipped.
/// @return true when the broker accepted the message; false (after logging)
///         for an empty payload, a missing producer, a send error or an
///         exception.
bool ANSPULSAR::Publish(const std::string& message, const std::string& properties) {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    try {
        if (message.empty()) {
            _logger.LogError("ANSPULSAR::Publish", "Cannot send empty message", __FILE__, __LINE__);
            return false;
        }
        if (!_procuderInit) {
            _logger.LogError("ANSPULSAR::Publish", "Producer not initialized", __FILE__, __LINE__);
            return false;
        }

        const pulsar::Message msg = BuildMessage(message, properties, [this](const std::string& segment) {
            _logger.LogError("ANSPULSAR::Publish", "Invalid property: " + segment, __FILE__, __LINE__);
        });

        const auto result = _producer.send(msg);
        if (result != pulsar::ResultOk) {
            _logger.LogError("ANSPULSAR::Publish", "Send failed: " + std::to_string(result), __FILE__, __LINE__);
            return false;
        }
        return true;
    }
    catch (const std::exception& e) {
        _logger.LogFatal("ANSPULSAR::Publish", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
        return false;
    }
    catch (...) {
        _logger.LogFatal("ANSPULSAR::Publish", "Unknown exception occurred", __FILE__, __LINE__);
        return false;
    }
}

/// Queues `message` for asynchronous sending; the outcome is reported to
/// Callback() rather than through the return value.
///
/// @param message    Payload; must not be empty.
/// @param properties Optional "key:value;key:value" list added as message
///                   properties. Entries without ':' are logged and skipped.
/// @return true when the message was handed to the producer; false (after
///         logging) for an empty payload, a missing producer or an exception.
bool ANSPULSAR::PublishAsync(const std::string& message, const std::string& properties) {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    try {
        if (message.empty()) {
            _logger.LogError("ANSPULSAR::PublishAsync", "Cannot send empty message", __FILE__, __LINE__);
            return false;
        }
        if (!_procuderInit) {
            _logger.LogError("ANSPULSAR::PublishAsync", "Producer not initialized", __FILE__, __LINE__);
            return false;
        }

        const pulsar::Message msg = BuildMessage(message, properties, [this](const std::string& segment) {
            _logger.LogError("ANSPULSAR::PublishAsync", "Invalid property: " + segment, __FILE__, __LINE__);
        });

        _producer.sendAsync(msg, Callback);
        return true;
    }
    catch (const std::exception& e) {
        _logger.LogFatal("ANSPULSAR::PublishAsync", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
        return false;
    }
    catch (...) {
        _logger.LogFatal("ANSPULSAR::PublishAsync", "Unknown exception occurred", __FILE__, __LINE__);
        return false;
    }
}

/// Creates an exclusive consumer on `topicName` using the client created by
/// Connect().
///
/// @param topicName        Topic to consume from.
/// @param subscriptionName Name of the subscription.
/// @return true when the subscription was established; false (after
///         logging) when there is no client, the subscribe call fails or an
///         exception is thrown.
bool ANSPULSAR::Subscribe(const std::string& topicName, const std::string& subscriptionName) {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    try {
        if (!_client) {
            _logger.LogError("ANSPULSAR::Subscribe", "Client not initialized", __FILE__, __LINE__);
            return false;
        }

        _consumerConfig.setConsumerType(pulsar::ConsumerExclusive);
        _consumerConfig.setReceiverQueueSize(kReceiverQueueSize);
        _subTopicName = topicName;
        _subscriptionName = subscriptionName;

        const auto result = _client->subscribe(_subTopicName, _subscriptionName, _consumerConfig, _consumer);
        if (result != pulsar::ResultOk) {
            _logger.LogError("ANSPULSAR::Subscribe", "Subscribe failed: " + std::to_string(result), __FILE__, __LINE__);
            return false;
        }
        _consumerInit = true;
        return true;
    }
    catch (const std::exception& e) {
        _logger.LogFatal("ANSPULSAR::Subscribe", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
        return false;
    }
    catch (...) {
        _logger.LogFatal("ANSPULSAR::Subscribe", "Unknown exception occurred", __FILE__, __LINE__);
        return false;
    }
}

/// Waits up to `timeoutMs` for one message on the consumer created by
/// Subscribe().
///
/// @param messageOut    Receives the message payload as a string.
/// @param doAcknowledge When true the message is acknowledged before
///                      returning.
/// @param timeoutMs     Timeout passed to the consumer's receive call.
/// @return true when a message was received (and acknowledged if requested).
///         false on timeout (not logged), or after logging when the
///         client/consumer is missing, receive fails, acknowledge fails or
///         an exception is thrown. When only the acknowledge fails,
///         `messageOut` has already been filled.
bool ANSPULSAR::Receive(std::string& messageOut, bool doAcknowledge, int timeoutMs) {
    std::lock_guard<decltype(_mutex)> lock(_mutex);
    try {
        if (!_client) {
            _logger.LogError("ANSPULSAR::Receive", "Client not initialized", __FILE__, __LINE__);
            return false;
        }
        if (!_consumerInit) {
            _logger.LogError("ANSPULSAR::Receive", "Consumer not initialized", __FILE__, __LINE__);
            return false;
        }

        pulsar::Message msg;
        pulsar::Result result = _consumer.receive(msg, timeoutMs);
        if (result != pulsar::ResultOk) {
            // A timeout simply means no message was available; stay quiet.
            if (result != pulsar::ResultTimeout) {
                _logger.LogError("ANSPULSAR::Receive", "Receive failed: " + std::to_string(result), __FILE__, __LINE__);
            }
            return false;
        }

        messageOut = msg.getDataAsString();

        if (doAcknowledge) {
            result = _consumer.acknowledge(msg);
            if (result != pulsar::ResultOk) {
                _logger.LogError("ANSPULSAR::Receive", "Acknowledge failed: " + std::to_string(result), __FILE__, __LINE__);
                return false;
            }
        }
        return true;
    }
    catch (const std::exception& e) {
        _logger.LogFatal("ANSPULSAR::Receive", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
        return false;
    }
    catch (...) {
        _logger.LogFatal("ANSPULSAR::Receive", "Unknown exception occurred", __FILE__, __LINE__);
        return false;
    }
}

}  // namespace ANSCENTER