337 lines
12 KiB
Python
337 lines
12 KiB
Python
|
|
import os
|
||
|
|
R = r"C:\Projects\CMSCore"
|
||
|
|
def w(rel, txt):
|
||
|
|
p = os.path.join(R, rel)
|
||
|
|
os.makedirs(os.path.dirname(p), exist_ok=True)
|
||
|
|
with open(p, "w", encoding="utf-8", newline="\r\n") as f:
|
||
|
|
f.write(txt.lstrip("\n"))
|
||
|
|
print(f" OK: {rel}")
|
||
|
|
|
||
|
|
# ============================================================================
|
||
|
|
# 8. device_agent.cpp
|
||
|
|
# ============================================================================
|
||
|
|
w("anscloud-device/src/device_agent.cpp", r'''#include <anscloud/device/device_agent.h>
|
||
|
|
#include <anscloud/common/json_serializer.h>
|
||
|
|
#include <iostream>
|
||
|
|
#include <thread>
|
||
|
|
#include <chrono>
|
||
|
|
|
||
|
|
namespace anscloud {
|
||
|
|
|
||
|
|
// ==========================================================================
|
||
|
|
// DeviceAgent::Impl (PIMPL)
|
||
|
|
// ==========================================================================
|
||
|
|
struct DeviceAgent::Impl {
|
||
|
|
IMessageBroker* broker = nullptr;
|
||
|
|
DeviceAgentConfig config;
|
||
|
|
|
||
|
|
// Callbacks
|
||
|
|
CommandCallback cmd_handler;
|
||
|
|
MetricsProvider metrics_provider;
|
||
|
|
CameraListProvider camera_provider;
|
||
|
|
InferenceProvider inference_provider;
|
||
|
|
ConnectionCallback connection_cb;
|
||
|
|
ErrorCallback error_cb;
|
||
|
|
BroadcastCallback broadcast_cb;
|
||
|
|
|
||
|
|
// State
|
||
|
|
std::atomic<bool> running{false};
|
||
|
|
std::atomic<bool> connected{false};
|
||
|
|
std::chrono::steady_clock::time_point start_time;
|
||
|
|
|
||
|
|
// Stats
|
||
|
|
std::atomic<uint64_t> commands_received{0};
|
||
|
|
std::atomic<uint64_t> commands_executed{0};
|
||
|
|
std::atomic<uint64_t> heartbeats_sent{0};
|
||
|
|
std::atomic<uint64_t> telemetry_sent{0};
|
||
|
|
std::atomic<uint64_t> events_sent{0};
|
||
|
|
std::atomic<uint64_t> reconnect_count{0};
|
||
|
|
std::string last_error;
|
||
|
|
mutable std::mutex error_mutex;
|
||
|
|
|
||
|
|
// Threads
|
||
|
|
std::thread message_thread;
|
||
|
|
std::thread heartbeat_thread;
|
||
|
|
std::thread metrics_thread;
|
||
|
|
|
||
|
|
// Consumer tags
|
||
|
|
std::string cmd_consumer_tag;
|
||
|
|
std::string broadcast_consumer_tag;
|
||
|
|
|
||
|
|
void set_error(const std::string& err) {
|
||
|
|
std::lock_guard<std::mutex> lk(error_mutex);
|
||
|
|
last_error = err;
|
||
|
|
if (error_cb) error_cb(-1, err);
|
||
|
|
}
|
||
|
|
|
||
|
|
void log(const std::string& msg) {
|
||
|
|
if (config.verbose_logging) {
|
||
|
|
std::cout << "[DeviceAgent:" << config.device_id << "] " << msg << std::endl;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Boot sequence ---
|
||
|
|
bool boot() {
|
||
|
|
log("Boot: connecting to broker...");
|
||
|
|
|
||
|
|
if (!broker->is_connected()) {
|
||
|
|
if (!broker->connect()) {
|
||
|
|
set_error("Failed to connect to broker");
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
connected = true;
|
||
|
|
log("Boot: connected.");
|
||
|
|
|
||
|
|
// Declare exchanges (idempotent)
|
||
|
|
broker->declare_exchange(exchange::DEVICE_TELEMETRY, "topic");
|
||
|
|
broker->declare_exchange(exchange::DEVICE_STATUS, "topic");
|
||
|
|
broker->declare_exchange(exchange::DEVICE_EVENTS, "topic");
|
||
|
|
broker->declare_exchange(exchange::COMMAND, "topic");
|
||
|
|
broker->declare_exchange(exchange::COMMAND_RESPONSE, "topic");
|
||
|
|
broker->declare_exchange(exchange::BROADCAST, "fanout");
|
||
|
|
log("Boot: exchanges declared.");
|
||
|
|
|
||
|
|
// Declare device command queue
|
||
|
|
std::string cmd_q = queue::device_command(config.device_id);
|
||
|
|
broker->declare_queue(cmd_q, true, false, false);
|
||
|
|
broker->bind_queue(cmd_q, exchange::COMMAND, config.device_id);
|
||
|
|
log("Boot: command queue " + cmd_q + " bound.");
|
||
|
|
|
||
|
|
// Start consuming commands
|
||
|
|
cmd_consumer_tag = broker->start_consuming(cmd_q,
|
||
|
|
[this](const std::string&, const std::string&, const std::string&,
|
||
|
|
const std::string& corr_id, const std::string&, const std::string& body)
|
||
|
|
{
|
||
|
|
handle_command(body, corr_id);
|
||
|
|
});
|
||
|
|
|
||
|
|
// Declare broadcast queue
|
||
|
|
std::string bcast_q = queue::device_broadcast(config.device_id);
|
||
|
|
broker->declare_queue(bcast_q, false, true, true); // exclusive, auto-delete
|
||
|
|
broker->bind_queue(bcast_q, exchange::BROADCAST, "");
|
||
|
|
broadcast_consumer_tag = broker->start_consuming(bcast_q,
|
||
|
|
[this](const std::string&, const std::string&, const std::string&,
|
||
|
|
const std::string&, const std::string&, const std::string& body)
|
||
|
|
{
|
||
|
|
if (broadcast_cb) broadcast_cb(body);
|
||
|
|
});
|
||
|
|
log("Boot: consumers started.");
|
||
|
|
|
||
|
|
// Publish online status
|
||
|
|
publish_status(DeviceStatus::Online, "Device started");
|
||
|
|
|
||
|
|
// Send initial heartbeat
|
||
|
|
send_heartbeat_impl();
|
||
|
|
|
||
|
|
log("Boot: COMPLETE");
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Command handler ---
|
||
|
|
void handle_command(const std::string& body, const std::string& corr_id) {
|
||
|
|
commands_received++;
|
||
|
|
log("Command received: " + body.substr(0, 80) + "...");
|
||
|
|
|
||
|
|
Command cmd = json::deserialize_command(body);
|
||
|
|
cmd.correlation_id = corr_id;
|
||
|
|
|
||
|
|
CommandResponse resp;
|
||
|
|
resp.command_id = cmd.command_id;
|
||
|
|
resp.device_id = config.device_id;
|
||
|
|
resp.correlation_id = corr_id;
|
||
|
|
resp.timestamp = json::now_iso8601();
|
||
|
|
|
||
|
|
if (cmd_handler) {
|
||
|
|
auto t0 = std::chrono::steady_clock::now();
|
||
|
|
resp = cmd_handler(cmd);
|
||
|
|
auto t1 = std::chrono::steady_clock::now();
|
||
|
|
resp.execution_time_ms = std::chrono::duration<double, std::milli>(t1 - t0).count();
|
||
|
|
resp.correlation_id = corr_id;
|
||
|
|
commands_executed++;
|
||
|
|
} else {
|
||
|
|
resp.status = CommandStatus::Rejected;
|
||
|
|
resp.error_message = "No command handler registered";
|
||
|
|
}
|
||
|
|
|
||
|
|
// Publish response
|
||
|
|
std::string resp_json = json::serialize(resp);
|
||
|
|
broker->publish(exchange::COMMAND_RESPONSE, config.device_id, resp_json, corr_id);
|
||
|
|
log("Command response sent.");
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Publishing helpers ---
|
||
|
|
void publish_status(DeviceStatus status, const std::string& msg) {
|
||
|
|
DeviceStatusMessage sm;
|
||
|
|
sm.device_id = config.device_id;
|
||
|
|
sm.status = status;
|
||
|
|
sm.timestamp = json::now_iso8601();
|
||
|
|
sm.firmware_version = config.firmware_version;
|
||
|
|
sm.ansvis_version = config.ansvis_version;
|
||
|
|
sm.message = msg;
|
||
|
|
broker->publish(exchange::DEVICE_STATUS, config.device_id, json::serialize(sm));
|
||
|
|
}
|
||
|
|
|
||
|
|
bool send_heartbeat_impl() {
|
||
|
|
Heartbeat hb;
|
||
|
|
hb.device_id = config.device_id;
|
||
|
|
hb.timestamp = json::now_iso8601();
|
||
|
|
hb.status = DeviceStatus::Online;
|
||
|
|
auto elapsed = std::chrono::steady_clock::now() - start_time;
|
||
|
|
hb.uptime_seconds = (uint64_t)std::chrono::duration_cast<std::chrono::seconds>(elapsed).count();
|
||
|
|
hb.firmware_version = config.firmware_version;
|
||
|
|
hb.ansvis_version = config.ansvis_version;
|
||
|
|
|
||
|
|
bool ok = broker->publish(exchange::DEVICE_TELEMETRY,
|
||
|
|
config.device_id + ".heartbeat",
|
||
|
|
json::serialize(hb));
|
||
|
|
if (ok) heartbeats_sent++;
|
||
|
|
return ok;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool send_telemetry_impl() {
|
||
|
|
DeviceTelemetry tel;
|
||
|
|
tel.device_id = config.device_id;
|
||
|
|
tel.timestamp = json::now_iso8601();
|
||
|
|
if (metrics_provider) tel.metrics = metrics_provider();
|
||
|
|
if (camera_provider) tel.cameras = camera_provider();
|
||
|
|
if (inference_provider) tel.inference = inference_provider();
|
||
|
|
|
||
|
|
bool ok = broker->publish(exchange::DEVICE_TELEMETRY,
|
||
|
|
config.device_id + ".metrics",
|
||
|
|
json::serialize(tel));
|
||
|
|
if (ok) telemetry_sent++;
|
||
|
|
return ok;
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Background threads ---
|
||
|
|
void message_loop() {
|
||
|
|
while (running) {
|
||
|
|
if (broker && broker->is_connected()) {
|
||
|
|
broker->process_messages(100);
|
||
|
|
} else {
|
||
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void heartbeat_loop() {
|
||
|
|
while (running) {
|
||
|
|
std::this_thread::sleep_for(std::chrono::seconds(config.heartbeat_interval_sec));
|
||
|
|
if (running && connected) {
|
||
|
|
send_heartbeat_impl();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void metrics_loop() {
|
||
|
|
while (running) {
|
||
|
|
std::this_thread::sleep_for(std::chrono::seconds(config.metrics_interval_sec));
|
||
|
|
if (running && connected) {
|
||
|
|
send_telemetry_impl();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
// ==========================================================================
|
||
|
|
// DeviceAgent public methods
|
||
|
|
// ==========================================================================
|
||
|
|
|
||
|
|
DeviceAgent::DeviceAgent() : m_impl(std::make_unique<Impl>()) {}
|
||
|
|
DeviceAgent::~DeviceAgent() { stop(); }
|
||
|
|
|
||
|
|
void DeviceAgent::configure(IMessageBroker* broker, const DeviceAgentConfig& config) {
|
||
|
|
m_impl->broker = broker;
|
||
|
|
m_impl->config = config;
|
||
|
|
}
|
||
|
|
|
||
|
|
void DeviceAgent::set_command_handler(CommandCallback handler) { m_impl->cmd_handler = std::move(handler); }
|
||
|
|
void DeviceAgent::set_metrics_provider(MetricsProvider provider) { m_impl->metrics_provider = std::move(provider); }
|
||
|
|
void DeviceAgent::set_camera_list_provider(CameraListProvider prov) { m_impl->camera_provider = std::move(prov); }
|
||
|
|
void DeviceAgent::set_inference_provider(InferenceProvider provider) { m_impl->inference_provider = std::move(provider); }
|
||
|
|
|
||
|
|
bool DeviceAgent::start() {
|
||
|
|
if (m_impl->running) return true;
|
||
|
|
m_impl->start_time = std::chrono::steady_clock::now();
|
||
|
|
m_impl->running = true;
|
||
|
|
|
||
|
|
if (!m_impl->boot()) {
|
||
|
|
m_impl->running = false;
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
// Start background threads
|
||
|
|
m_impl->message_thread = std::thread([this] { m_impl->message_loop(); });
|
||
|
|
m_impl->heartbeat_thread = std::thread([this] { m_impl->heartbeat_loop(); });
|
||
|
|
m_impl->metrics_thread = std::thread([this] { m_impl->metrics_loop(); });
|
||
|
|
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
void DeviceAgent::stop() {
|
||
|
|
if (!m_impl->running) return;
|
||
|
|
m_impl->running = false;
|
||
|
|
|
||
|
|
// Publish offline
|
||
|
|
if (m_impl->broker && m_impl->broker->is_connected()) {
|
||
|
|
m_impl->publish_status(DeviceStatus::Offline, "Device stopping");
|
||
|
|
if (!m_impl->cmd_consumer_tag.empty())
|
||
|
|
m_impl->broker->cancel_consumer(m_impl->cmd_consumer_tag);
|
||
|
|
if (!m_impl->broadcast_consumer_tag.empty())
|
||
|
|
m_impl->broker->cancel_consumer(m_impl->broadcast_consumer_tag);
|
||
|
|
}
|
||
|
|
|
||
|
|
// Join threads
|
||
|
|
if (m_impl->message_thread.joinable()) m_impl->message_thread.join();
|
||
|
|
if (m_impl->heartbeat_thread.joinable()) m_impl->heartbeat_thread.join();
|
||
|
|
if (m_impl->metrics_thread.joinable()) m_impl->metrics_thread.join();
|
||
|
|
|
||
|
|
m_impl->connected = false;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool DeviceAgent::is_running() const { return m_impl->running; }
|
||
|
|
bool DeviceAgent::is_connected() const { return m_impl->connected && m_impl->broker && m_impl->broker->is_connected(); }
|
||
|
|
|
||
|
|
bool DeviceAgent::publish_event(const DeviceEvent& evt) {
|
||
|
|
if (!m_impl->connected) return false;
|
||
|
|
std::string routing = m_impl->config.device_id + "." + to_string(evt.type);
|
||
|
|
bool ok = m_impl->broker->publish(exchange::DEVICE_EVENTS, routing, json::serialize(evt));
|
||
|
|
if (ok) m_impl->events_sent++;
|
||
|
|
return ok;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool DeviceAgent::publish_telemetry() { return m_impl->send_telemetry_impl(); }
|
||
|
|
bool DeviceAgent::send_heartbeat() { return m_impl->send_heartbeat_impl(); }
|
||
|
|
bool DeviceAgent::send_metrics() { return m_impl->send_telemetry_impl(); }
|
||
|
|
|
||
|
|
void DeviceAgent::on_connection_changed(ConnectionCallback cb) { m_impl->connection_cb = std::move(cb); }
|
||
|
|
void DeviceAgent::on_error(ErrorCallback cb) { m_impl->error_cb = std::move(cb); }
|
||
|
|
void DeviceAgent::on_broadcast(BroadcastCallback cb) { m_impl->broadcast_cb = std::move(cb); }
|
||
|
|
|
||
|
|
DeviceAgentStatus DeviceAgent::get_status() const {
|
||
|
|
DeviceAgentStatus s;
|
||
|
|
s.connected = m_impl->connected;
|
||
|
|
s.running = m_impl->running;
|
||
|
|
if (m_impl->running) {
|
||
|
|
auto elapsed = std::chrono::steady_clock::now() - m_impl->start_time;
|
||
|
|
s.uptime_seconds = (uint64_t)std::chrono::duration_cast<std::chrono::seconds>(elapsed).count();
|
||
|
|
}
|
||
|
|
s.commands_received = m_impl->commands_received;
|
||
|
|
s.commands_executed = m_impl->commands_executed;
|
||
|
|
s.heartbeats_sent = m_impl->heartbeats_sent;
|
||
|
|
s.telemetry_sent = m_impl->telemetry_sent;
|
||
|
|
s.events_sent = m_impl->events_sent;
|
||
|
|
s.reconnect_count = m_impl->reconnect_count;
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lk(m_impl->error_mutex);
|
||
|
|
s.last_error = m_impl->last_error;
|
||
|
|
}
|
||
|
|
return s;
|
||
|
|
}
|
||
|
|
|
||
|
|
} // namespace anscloud
|
||
|
|
''')
|
||
|
|
|
||
|
|
print(" [8] device_agent.cpp done")
|