301 lines
12 KiB
C++
301 lines
12 KiB
C++
|
|
#include "ANSPulsar.h"
|
||
|
|
#include <sstream>
|
||
|
|
|
||
|
|
namespace ANSCENTER {
|
||
|
|
|
||
|
|
void Callback(pulsar::Result code, const pulsar::MessageId& msgId) {
|
||
|
|
std::cout << "Received code:" << code << " -- MsgID: " << msgId << std::endl;
|
||
|
|
}
|
||
|
|
ANSPULSAR::ANSPULSAR() {
|
||
|
|
// Initialize the Pulsar client with default values
|
||
|
|
_hostname = "localhost";
|
||
|
|
_port = 5672;
|
||
|
|
_jwtToken = "";
|
||
|
|
_useTls = false;
|
||
|
|
_caCertPath = "";
|
||
|
|
_enableBatching = false;
|
||
|
|
_batchingMaxMessages = 100;
|
||
|
|
_batchingDelayMs = 1000;
|
||
|
|
_allowInsecureConnection = false;
|
||
|
|
_verifyHostname = true;
|
||
|
|
_procuderInit = false;
|
||
|
|
_consumerInit = false;
|
||
|
|
_compressionType = pulsar::CompressionLZ4;
|
||
|
|
}
|
||
|
|
ANSPULSAR::~ANSPULSAR() {
|
||
|
|
Close();
|
||
|
|
}
|
||
|
|
bool ANSPULSAR::Initialize(const std::string& hostname, int port, const std::string& jwtToken, bool useTls, const std::string& caCertPath,
|
||
|
|
bool enableBatching, int batchingMaxMessages, int batchingDelayMs,
|
||
|
|
bool allowInsecureConnection, bool verifyHostname) {
|
||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
||
|
|
_hostname = hostname;
|
||
|
|
_port = port;
|
||
|
|
_jwtToken = jwtToken;
|
||
|
|
_useTls = useTls;
|
||
|
|
_caCertPath = caCertPath;
|
||
|
|
_enableBatching = enableBatching;
|
||
|
|
_batchingMaxMessages = batchingMaxMessages;
|
||
|
|
_batchingDelayMs = batchingDelayMs;
|
||
|
|
_allowInsecureConnection = allowInsecureConnection;
|
||
|
|
_verifyHostname = verifyHostname;
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool ANSPULSAR::Connect(const std::string& topicName) {
|
||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
||
|
|
try {
|
||
|
|
_pubTopicName = topicName;
|
||
|
|
|
||
|
|
std::string hostname = _hostname;
|
||
|
|
size_t pos = hostname.find("://");
|
||
|
|
if (pos != std::string::npos) {
|
||
|
|
hostname = hostname.substr(pos + 3);
|
||
|
|
}
|
||
|
|
|
||
|
|
_fullServiceUrl = (_useTls ? "pulsar+ssl://" : "pulsar://") + hostname + ":" + std::to_string(_port);
|
||
|
|
|
||
|
|
if (_useTls) {
|
||
|
|
_clientConfig.setUseTls(true);
|
||
|
|
_clientConfig.setTlsTrustCertsFilePath(_caCertPath);
|
||
|
|
_clientConfig.setTlsAllowInsecureConnection(_allowInsecureConnection);
|
||
|
|
_clientConfig.setValidateHostName(_verifyHostname);
|
||
|
|
}
|
||
|
|
|
||
|
|
if (!_jwtToken.empty()) {
|
||
|
|
_clientConfig.setAuth(pulsar::AuthToken::createWithToken(_jwtToken));
|
||
|
|
}
|
||
|
|
|
||
|
|
_producerConfig.setSendTimeout(5000);
|
||
|
|
_producerConfig.setCompressionType(_compressionType);
|
||
|
|
_producerConfig.setBatchingEnabled(_enableBatching);
|
||
|
|
if (_enableBatching) {
|
||
|
|
_producerConfig.setBatchingMaxMessages(_batchingMaxMessages);
|
||
|
|
_producerConfig.setBatchingMaxPublishDelayMs(_batchingDelayMs);
|
||
|
|
}
|
||
|
|
|
||
|
|
_client = std::make_unique<pulsar::Client>(_fullServiceUrl, _clientConfig);
|
||
|
|
auto result = _client->createProducer(_pubTopicName, _producerConfig, _producer);
|
||
|
|
if (result != pulsar::ResultOk) {
|
||
|
|
_logger.LogError("ANSPULSAR::Connect", "Failed to create producer: " + std::to_string(result), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
_procuderInit = true;
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
catch (const std::exception& e) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Connect", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
catch (...) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Connect", "Unknown exception occurred", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
bool ANSPULSAR::Disconnect() {
|
||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
||
|
|
try {
|
||
|
|
if (_procuderInit) {
|
||
|
|
_producer.close();
|
||
|
|
_procuderInit = false;
|
||
|
|
}
|
||
|
|
if (_consumerInit) {
|
||
|
|
_consumer.close();
|
||
|
|
_consumerInit=false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
catch (const std::exception& e) {
|
||
|
|
_logger.LogError("ANSPULSAR::Disconnect", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
catch (...) {
|
||
|
|
_logger.LogError("ANSPULSAR::Disconnect", "Unknown exception occurred", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool ANSPULSAR::Close() {
|
||
|
|
try {
|
||
|
|
if (_client) {
|
||
|
|
_client->close();
|
||
|
|
_client.reset();
|
||
|
|
}
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
catch (const std::exception& e) {
|
||
|
|
_logger.LogError("ANSPULSAR::Close", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
catch (...) {
|
||
|
|
_logger.LogError("ANSPULSAR::Close", "Unknown exception occurred", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
bool ANSPULSAR::Publish(const std::string& message, const std::string& properties) {
|
||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
||
|
|
try {
|
||
|
|
if (message.empty()) {
|
||
|
|
_logger.LogError("ANSPULSAR::Publish", "Cannot send empty message", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
pulsar::MessageBuilder builder;
|
||
|
|
builder.setContent(message);
|
||
|
|
builder.setProperty("content-type", "application/json");
|
||
|
|
|
||
|
|
if (!properties.empty()) {
|
||
|
|
std::istringstream iss(properties);
|
||
|
|
std::string pair;
|
||
|
|
while (std::getline(iss, pair, ';')) {
|
||
|
|
size_t pos = pair.find(':');
|
||
|
|
if (pos != std::string::npos) {
|
||
|
|
std::string key = pair.substr(0, pos);
|
||
|
|
std::string value = pair.substr(pos + 1);
|
||
|
|
key.erase(0, key.find_first_not_of(" \t\r\n"));
|
||
|
|
key.erase(key.find_last_not_of(" \t\r\n") + 1);
|
||
|
|
value.erase(0, value.find_first_not_of(" \t\r\n"));
|
||
|
|
value.erase(value.find_last_not_of(" \t\r\n") + 1);
|
||
|
|
builder.setProperty(key, value);
|
||
|
|
}
|
||
|
|
else {
|
||
|
|
_logger.LogError("ANSPULSAR::Publish", "Invalid property: " + pair, __FILE__, __LINE__);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
auto msg = builder.build();
|
||
|
|
auto result = _producer.send(msg);
|
||
|
|
if (result != pulsar::ResultOk) {
|
||
|
|
_logger.LogError("ANSPULSAR::Publish", "Send failed: " + std::to_string(result), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
catch (const std::exception& e) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Publish", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
catch (...) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Publish", "Unknown exception occurred", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
bool ANSPULSAR::PublishAsync(const std::string& message, const std::string& properties) {
|
||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
||
|
|
try {
|
||
|
|
if (message.empty()) {
|
||
|
|
_logger.LogError("ANSPULSAR::PublishAsync", "Cannot send empty message", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
pulsar::MessageBuilder builder;
|
||
|
|
builder.setContent(message);
|
||
|
|
builder.setProperty("content-type", "application/json");
|
||
|
|
|
||
|
|
if (!properties.empty()) {
|
||
|
|
std::istringstream iss(properties);
|
||
|
|
std::string pair;
|
||
|
|
while (std::getline(iss, pair, ';')) {
|
||
|
|
size_t pos = pair.find(':');
|
||
|
|
if (pos != std::string::npos) {
|
||
|
|
std::string key = pair.substr(0, pos);
|
||
|
|
std::string value = pair.substr(pos + 1);
|
||
|
|
key.erase(0, key.find_first_not_of(" \t\r\n"));
|
||
|
|
key.erase(key.find_last_not_of(" \t\r\n") + 1);
|
||
|
|
value.erase(0, value.find_first_not_of(" \t\r\n"));
|
||
|
|
value.erase(value.find_last_not_of(" \t\r\n") + 1);
|
||
|
|
builder.setProperty(key, value);
|
||
|
|
}
|
||
|
|
else {
|
||
|
|
_logger.LogError("ANSPULSAR::PublishAsync", "Invalid property: " + pair, __FILE__, __LINE__);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
auto msg = builder.build();
|
||
|
|
_producer.sendAsync(msg, Callback);
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
catch (const std::exception& e) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::PublishAsync", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
catch (...) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::PublishAsync", "Unknown exception occurred", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
bool ANSPULSAR::Subscribe(const std::string& topicName, const std::string& subscriptionName) {
|
||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
||
|
|
try {
|
||
|
|
if (!_client) {
|
||
|
|
_logger.LogError("ANSPULSAR::Subscribe", "Client not initialized", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
_consumerConfig.setConsumerType(pulsar::ConsumerExclusive);
|
||
|
|
_consumerConfig.setReceiverQueueSize(1000);
|
||
|
|
_subTopicName = topicName;
|
||
|
|
_subscriptionName = subscriptionName;
|
||
|
|
auto result = _client->subscribe(_subTopicName, _subscriptionName, _consumerConfig, _consumer);
|
||
|
|
if (result != pulsar::ResultOk) {
|
||
|
|
_logger.LogError("ANSPULSAR::Subscribe", "Subscribe failed: " + std::to_string(result), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
_consumerInit = true;
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
catch (const std::exception& e) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Subscribe", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
catch (...) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Subscribe", "Unknown exception occurred", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
bool ANSPULSAR::Receive(std::string& messageOut, bool doAcknowledge, int timeoutMs) {
|
||
|
|
std::lock_guard<std::recursive_mutex> lock(_mutex);
|
||
|
|
try {
|
||
|
|
if (!_client) {
|
||
|
|
_logger.LogError("ANSPULSAR::Receive", "Client not initialized", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
pulsar::Message msg;
|
||
|
|
pulsar::Result result = _consumer.receive(msg, timeoutMs);
|
||
|
|
|
||
|
|
if (result != pulsar::ResultOk) {
|
||
|
|
if (result != pulsar::ResultTimeout) {
|
||
|
|
_logger.LogError("ANSPULSAR::Receive", "Receive failed: " + std::to_string(result), __FILE__, __LINE__);
|
||
|
|
}
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
messageOut = msg.getDataAsString();
|
||
|
|
|
||
|
|
if (doAcknowledge) {
|
||
|
|
result = _consumer.acknowledge(msg);
|
||
|
|
if (result != pulsar::ResultOk) {
|
||
|
|
_logger.LogError("ANSPULSAR::Receive", "Acknowledge failed: " + std::to_string(result), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
catch (const std::exception& e) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Receive", std::string("Exception: ") + e.what(), __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
catch (...) {
|
||
|
|
_logger.LogFatal("ANSPULSAR::Receive", "Unknown exception occurred", __FILE__, __LINE__);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
} // namespace ANSCENTER
|
||
|
|
|