import os R = r"C:\Projects\CMSCore" def w(rel, txt): p = os.path.join(R, rel) os.makedirs(os.path.dirname(p), exist_ok=True) with open(p, "w", encoding="utf-8", newline="\r\n") as f: f.write(txt.lstrip("\n")) print(f" OK: {rel}") # ============================================================================ # 8. device_agent.cpp # ============================================================================ w("anscloud-device/src/device_agent.cpp", r'''#include #include #include #include #include namespace anscloud { // ========================================================================== // DeviceAgent::Impl (PIMPL) // ========================================================================== struct DeviceAgent::Impl { IMessageBroker* broker = nullptr; DeviceAgentConfig config; // Callbacks CommandCallback cmd_handler; MetricsProvider metrics_provider; CameraListProvider camera_provider; InferenceProvider inference_provider; ConnectionCallback connection_cb; ErrorCallback error_cb; BroadcastCallback broadcast_cb; // State std::atomic running{false}; std::atomic connected{false}; std::chrono::steady_clock::time_point start_time; // Stats std::atomic commands_received{0}; std::atomic commands_executed{0}; std::atomic heartbeats_sent{0}; std::atomic telemetry_sent{0}; std::atomic events_sent{0}; std::atomic reconnect_count{0}; std::string last_error; mutable std::mutex error_mutex; // Threads std::thread message_thread; std::thread heartbeat_thread; std::thread metrics_thread; // Consumer tags std::string cmd_consumer_tag; std::string broadcast_consumer_tag; void set_error(const std::string& err) { std::lock_guard lk(error_mutex); last_error = err; if (error_cb) error_cb(-1, err); } void log(const std::string& msg) { if (config.verbose_logging) { std::cout << "[DeviceAgent:" << config.device_id << "] " << msg << std::endl; } } // --- Boot sequence --- bool boot() { log("Boot: connecting to broker..."); if (!broker->is_connected()) { if (!broker->connect()) { set_error("Failed to connect to broker"); return false; } } connected = true; log("Boot: connected."); // Declare exchanges (idempotent) broker->declare_exchange(exchange::DEVICE_TELEMETRY, "topic"); broker->declare_exchange(exchange::DEVICE_STATUS, "topic"); broker->declare_exchange(exchange::DEVICE_EVENTS, "topic"); broker->declare_exchange(exchange::COMMAND, "topic"); broker->declare_exchange(exchange::COMMAND_RESPONSE, "topic"); broker->declare_exchange(exchange::BROADCAST, "fanout"); log("Boot: exchanges declared."); // Declare device command queue std::string cmd_q = queue::device_command(config.device_id); broker->declare_queue(cmd_q, true, false, false); broker->bind_queue(cmd_q, exchange::COMMAND, config.device_id); log("Boot: command queue " + cmd_q + " bound."); // Start consuming commands cmd_consumer_tag = broker->start_consuming(cmd_q, [this](const std::string&, const std::string&, const std::string&, const std::string& corr_id, const std::string&, const std::string& body) { handle_command(body, corr_id); }); // Declare broadcast queue std::string bcast_q = queue::device_broadcast(config.device_id); broker->declare_queue(bcast_q, false, true, true); // exclusive, auto-delete broker->bind_queue(bcast_q, exchange::BROADCAST, ""); broadcast_consumer_tag = broker->start_consuming(bcast_q, [this](const std::string&, const std::string&, const std::string&, const std::string&, const std::string&, const std::string& body) { if (broadcast_cb) broadcast_cb(body); }); log("Boot: consumers started."); // Publish online status publish_status(DeviceStatus::Online, "Device started"); // Send initial heartbeat send_heartbeat_impl(); log("Boot: COMPLETE"); return true; } // --- Command handler --- void handle_command(const std::string& body, const std::string& corr_id) { commands_received++; log("Command received: " + body.substr(0, 80) + "..."); Command cmd = json::deserialize_command(body); cmd.correlation_id = corr_id; CommandResponse resp; resp.command_id = cmd.command_id; resp.device_id = config.device_id; resp.correlation_id = corr_id; resp.timestamp = json::now_iso8601(); if (cmd_handler) { auto t0 = std::chrono::steady_clock::now(); resp = cmd_handler(cmd); auto t1 = std::chrono::steady_clock::now(); resp.execution_time_ms = std::chrono::duration(t1 - t0).count(); resp.correlation_id = corr_id; commands_executed++; } else { resp.status = CommandStatus::Rejected; resp.error_message = "No command handler registered"; } // Publish response std::string resp_json = json::serialize(resp); broker->publish(exchange::COMMAND_RESPONSE, config.device_id, resp_json, corr_id); log("Command response sent."); } // --- Publishing helpers --- void publish_status(DeviceStatus status, const std::string& msg) { DeviceStatusMessage sm; sm.device_id = config.device_id; sm.status = status; sm.timestamp = json::now_iso8601(); sm.firmware_version = config.firmware_version; sm.ansvis_version = config.ansvis_version; sm.message = msg; broker->publish(exchange::DEVICE_STATUS, config.device_id, json::serialize(sm)); } bool send_heartbeat_impl() { Heartbeat hb; hb.device_id = config.device_id; hb.timestamp = json::now_iso8601(); hb.status = DeviceStatus::Online; auto elapsed = std::chrono::steady_clock::now() - start_time; hb.uptime_seconds = (uint64_t)std::chrono::duration_cast(elapsed).count(); hb.firmware_version = config.firmware_version; hb.ansvis_version = config.ansvis_version; bool ok = broker->publish(exchange::DEVICE_TELEMETRY, config.device_id + ".heartbeat", json::serialize(hb)); if (ok) heartbeats_sent++; return ok; } bool send_telemetry_impl() { DeviceTelemetry tel; tel.device_id = config.device_id; tel.timestamp = json::now_iso8601(); if (metrics_provider) tel.metrics = metrics_provider(); if (camera_provider) tel.cameras = camera_provider(); if (inference_provider) tel.inference = inference_provider(); bool ok = broker->publish(exchange::DEVICE_TELEMETRY, config.device_id + ".metrics", json::serialize(tel)); if (ok) telemetry_sent++; return ok; } // --- Background threads --- void message_loop() { while (running) { if (broker && broker->is_connected()) { broker->process_messages(100); } else { std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } } void heartbeat_loop() { while (running) { std::this_thread::sleep_for(std::chrono::seconds(config.heartbeat_interval_sec)); if (running && connected) { send_heartbeat_impl(); } } } void metrics_loop() { while (running) { std::this_thread::sleep_for(std::chrono::seconds(config.metrics_interval_sec)); if (running && connected) { send_telemetry_impl(); } } } }; // ========================================================================== // DeviceAgent public methods // ========================================================================== DeviceAgent::DeviceAgent() : m_impl(std::make_unique()) {} DeviceAgent::~DeviceAgent() { stop(); } void DeviceAgent::configure(IMessageBroker* broker, const DeviceAgentConfig& config) { m_impl->broker = broker; m_impl->config = config; } void DeviceAgent::set_command_handler(CommandCallback handler) { m_impl->cmd_handler = std::move(handler); } void DeviceAgent::set_metrics_provider(MetricsProvider provider) { m_impl->metrics_provider = std::move(provider); } void DeviceAgent::set_camera_list_provider(CameraListProvider prov) { m_impl->camera_provider = std::move(prov); } void DeviceAgent::set_inference_provider(InferenceProvider provider) { m_impl->inference_provider = std::move(provider); } bool DeviceAgent::start() { if (m_impl->running) return true; m_impl->start_time = std::chrono::steady_clock::now(); m_impl->running = true; if (!m_impl->boot()) { m_impl->running = false; return false; } // Start background threads m_impl->message_thread = std::thread([this] { m_impl->message_loop(); }); m_impl->heartbeat_thread = std::thread([this] { m_impl->heartbeat_loop(); }); m_impl->metrics_thread = std::thread([this] { m_impl->metrics_loop(); }); return true; } void DeviceAgent::stop() { if (!m_impl->running) return; m_impl->running = false; // Publish offline if (m_impl->broker && m_impl->broker->is_connected()) { m_impl->publish_status(DeviceStatus::Offline, "Device stopping"); if (!m_impl->cmd_consumer_tag.empty()) m_impl->broker->cancel_consumer(m_impl->cmd_consumer_tag); if (!m_impl->broadcast_consumer_tag.empty()) m_impl->broker->cancel_consumer(m_impl->broadcast_consumer_tag); } // Join threads if (m_impl->message_thread.joinable()) m_impl->message_thread.join(); if (m_impl->heartbeat_thread.joinable()) m_impl->heartbeat_thread.join(); if (m_impl->metrics_thread.joinable()) m_impl->metrics_thread.join(); m_impl->connected = false; } bool DeviceAgent::is_running() const { return m_impl->running; } bool DeviceAgent::is_connected() const { return m_impl->connected && m_impl->broker && m_impl->broker->is_connected(); } bool DeviceAgent::publish_event(const DeviceEvent& evt) { if (!m_impl->connected) return false; std::string routing = m_impl->config.device_id + "." + to_string(evt.type); bool ok = m_impl->broker->publish(exchange::DEVICE_EVENTS, routing, json::serialize(evt)); if (ok) m_impl->events_sent++; return ok; } bool DeviceAgent::publish_telemetry() { return m_impl->send_telemetry_impl(); } bool DeviceAgent::send_heartbeat() { return m_impl->send_heartbeat_impl(); } bool DeviceAgent::send_metrics() { return m_impl->send_telemetry_impl(); } void DeviceAgent::on_connection_changed(ConnectionCallback cb) { m_impl->connection_cb = std::move(cb); } void DeviceAgent::on_error(ErrorCallback cb) { m_impl->error_cb = std::move(cb); } void DeviceAgent::on_broadcast(BroadcastCallback cb) { m_impl->broadcast_cb = std::move(cb); } DeviceAgentStatus DeviceAgent::get_status() const { DeviceAgentStatus s; s.connected = m_impl->connected; s.running = m_impl->running; if (m_impl->running) { auto elapsed = std::chrono::steady_clock::now() - m_impl->start_time; s.uptime_seconds = (uint64_t)std::chrono::duration_cast(elapsed).count(); } s.commands_received = m_impl->commands_received; s.commands_executed = m_impl->commands_executed; s.heartbeats_sent = m_impl->heartbeats_sent; s.telemetry_sent = m_impl->telemetry_sent; s.events_sent = m_impl->events_sent; s.reconnect_count = m_impl->reconnect_count; { std::lock_guard lk(m_impl->error_mutex); s.last_error = m_impl->last_error; } return s; } } // namespace anscloud ''') print(" [8] device_agent.cpp done")