// device_simulator_main.ts - Device simulator manager
//
// Listens for VDA 5050 style `instantActions` messages on MQTT and spawns one
// Web Worker (device_simulator.ts) per device described by a `deviceSetup`
// action. Also offers a small interactive command prompt on stdin.
import { loadConfig, RawConfig } from "./simulator_config.ts";
import { connect, MqttClient } from "npm:mqtt@5.10.1";

/** How long to wait for the initial MQTT `connect` event. */
const MQTT_CONNECT_TIMEOUT_MS = 10000;
/** How long a freshly spawned worker has to answer `init` with `ready`. */
const WORKER_START_TIMEOUT_MS = 15000;
/** How long a worker has to acknowledge `close` before it is force-terminated. */
const WORKER_STOP_TIMEOUT_MS = 5000;

/** Connection details of one simulated device, taken from `actionParameters`. */
interface DeviceInfo {
  ip: string;
  port: string;
  slaveId: string;
  deviceName: string;
  protocolType: string;
  brandName: string;
}

/** Message sent from the manager to a device worker. */
interface DeviceWorkerMessage {
  type: "init" | "close" | "reconnect" | "action";
  data?: any;
}

/** Message sent from a device worker back to the manager. */
interface DeviceMainMessage {
  type: "ready" | "error" | "status" | "log" | "reset";
  data?: any;
}

/** Registry entry describing one running (or starting/failed) device worker. */
interface DeviceWorker {
  worker: Worker;
  deviceId: string;
  status: "starting" | "running" | "error" | "stopped";
  config: any;
  deviceInfo: DeviceInfo;
}

/** Public snapshot of a registry entry, as returned by `getDeviceWorkerStatus`. */
interface DeviceStatusSummary {
  deviceId: string;
  status: DeviceWorker["status"];
  deviceInfo: DeviceInfo;
}

/**
 * Owns the MQTT connection and the registry of device workers.
 *
 * Call `initialize()` once; afterwards workers are created on demand whenever
 * a `deviceSetup` action arrives on `<vdaInterface>/+/instantActions`.
 */
class DeviceSimulatorManager {
  private workers: Map<string, DeviceWorker> = new Map();
  private config!: RawConfig;
  private mqttClient!: MqttClient;
  private isConnected = false;
  /** Guards against re-registering client listeners on every reconnect. */
  private mqttListenersInstalled = false;

  /** Loads the configuration and connects to the MQTT broker. */
  async initialize(): Promise<void> {
    this.config = await loadConfig();
    console.log("📋 Device Simulator Manager initialized");

    // Connect to MQTT and start listening for instantActions messages.
    await this.connectMqtt();
  }

  /** Builds the broker URL from the loaded configuration. */
  private brokerUrl(): string {
    return `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`;
  }

  /** Spawns a fresh, uninitialised device simulator worker. */
  private spawnWorker(): Worker {
    return new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
      type: "module",
    });
  }

  /**
   * Opens the MQTT connection.
   *
   * Resolves on the first `connect` event. Rejects (and ends the client so it
   * stops retrying in the background) on the first error or after
   * `MQTT_CONNECT_TIMEOUT_MS`.
   */
  private async connectMqtt(): Promise<void> {
    const brokerUrl = this.brokerUrl();
    console.log(`🔌 Connecting to MQTT broker: ${brokerUrl}`);

    this.mqttClient = connect(brokerUrl, {
      clean: true,
      keepalive: 60,
    });

    return new Promise<void>((resolve, reject) => {
      const timeout = setTimeout(() => {
        this.mqttClient.end(true);
        reject(new Error("MQTT connection timeout"));
      }, MQTT_CONNECT_TIMEOUT_MS);

      // Only relevant until the first successful connect; it is deliberately
      // left attached on failure so late `error` events never go unhandled.
      const onStartupError = (error: Error) => {
        clearTimeout(timeout);
        console.error(`❌ MQTT connection error: ${error.message}`);
        this.mqttClient.end(true);
        reject(error);
      };
      this.mqttClient.on("error", onStartupError);

      // `connect` fires again after every automatic reconnect.
      this.mqttClient.on("connect", () => {
        clearTimeout(timeout);
        this.isConnected = true;
        console.log("✅ Connected to MQTT broker");
        this.setupMqttHandlers();
        // The persistent error listener is installed now; drop the startup
        // one so runtime errors are not logged twice.
        this.mqttClient.removeListener("error", onStartupError);
        resolve();
      });
    });
  }

  /**
   * Subscribes to the instantActions topic and, the first time it is called,
   * installs the long-lived client listeners.
   *
   * It is invoked on every `connect` event, so listener registration must be
   * guarded; otherwise each reconnect would add another `message` listener
   * and every message would be processed multiple times.
   */
  private setupMqttHandlers(): void {
    // Subscribe to the instantActions topic of every device.
    const instantActionsTopic = `${this.config.mqttBroker.vdaInterface}/+/instantActions`;

    this.mqttClient.subscribe(instantActionsTopic, (err: Error | null) => {
      if (err) {
        console.error(`❌ Failed to subscribe to ${instantActionsTopic}: ${err.message}`);
      } else {
        console.log(`📡 Subscribed to ${instantActionsTopic}`);
      }
    });

    if (this.mqttListenersInstalled) {
      return;
    }
    this.mqttListenersInstalled = true;

    this.mqttClient.on("message", (topic: string, message: Uint8Array) => {
      try {
        const data = JSON.parse(new TextDecoder().decode(message));
        console.log(`📥 Received instantActions message on ${topic}`);

        if (topic.includes("/instantActions")) {
          // Errors are handled inside; nothing to await here.
          void this.handleInstantActionsMessage(data);
        }
      } catch (error) {
        console.error(`❌ Error parsing MQTT message: ${(error as Error).message}`);
      }
    });

    // `disconnect` is only emitted for a broker-sent DISCONNECT packet; plain
    // connection loss is reported via `close`, so both must clear the flag.
    const markDisconnected = () => {
      if (!this.isConnected) {
        return;
      }
      this.isConnected = false;
      console.log("⚠️ Disconnected from MQTT broker");
    };
    this.mqttClient.on("disconnect", markDisconnected);
    this.mqttClient.on("close", markDisconnected);

    this.mqttClient.on("error", (error: Error) => {
      console.error(`❌ MQTT error: ${error.message}`);
    });
  }

  /**
   * Processes one decoded instantActions payload.
   *
   * Every `deviceSetup` action is handled in order; a failure in one action
   * is logged and does not prevent the remaining actions from running.
   */
  private async handleInstantActionsMessage(message: any): Promise<void> {
    console.log("🔍 Processing instantActions message...");

    try {
      // Two layouts are accepted: the standard VDA 5050 `instantActions`
      // array and the `actions` array that is used in practice.
      const actions = message?.instantActions || message?.actions || [];

      if (!Array.isArray(actions)) {
        console.error("❌ Invalid actions format - not an array");
        return;
      }

      for (const action of actions) {
        if (action?.actionType !== "deviceSetup") {
          continue;
        }
        try {
          await this.handleDeviceAction(action);
        } catch (error) {
          console.error(`❌ Error processing instantActions: ${(error as Error).message}`);
        }
      }
    } catch (error) {
      console.error(`❌ Error processing instantActions: ${(error as Error).message}`);
    }
  }

  /**
   * Extracts the device description from a list of `{ key, value }` pairs.
   *
   * Values are normalised to strings so that numeric parameters (for example
   * a `slaveId` of `0`) are accepted instead of being dropped as falsy.
   *
   * @returns The device info, or `null` when `ip`, `port` or `slaveId` is
   *          missing or empty.
   */
  private extractDeviceInfo(actionParameters: any[]): DeviceInfo | null {
    const params: Record<string, string> = {};

    for (const param of actionParameters) {
      const value = param?.value;
      if (param?.key && value !== undefined && value !== null && value !== "") {
        params[param.key] = String(value);
      }
    }

    const required = ['ip', 'port', 'slaveId'];
    for (const field of required) {
      if (!params[field]) {
        console.error(`❌ Missing required parameter: ${field}`);
        return null;
      }
    }

    return {
      ip: params.ip,
      port: params.port,
      slaveId: params.slaveId,
      deviceName: params.deviceName || 'Unknown Device',
      protocolType: params.protocolType || 'Unknown Protocol',
      brandName: params.brandName || 'Unknown Brand'
    };
  }

  /** Builds the registry key for a device: `<ip>-<port>-<slaveId>`. */
  private generateDeviceId(deviceInfo: DeviceInfo): string {
    return `${deviceInfo.ip}-${deviceInfo.port}-${deviceInfo.slaveId}`;
  }

  /**
   * Handles one device action: forwards it to the existing worker for that
   * device, or creates a new worker and hands it the action once it is ready.
   *
   * @throws If a new worker has to be created and fails to start.
   */
  async handleDeviceAction(actionMessage: any): Promise<void> {
    console.log("🔧 Processing device action:", actionMessage.actionType);

    if (!actionMessage.actionParameters || !Array.isArray(actionMessage.actionParameters)) {
      console.error("❌ Invalid action parameters");
      return;
    }

    const deviceInfo = this.extractDeviceInfo(actionMessage.actionParameters);
    if (!deviceInfo) {
      console.error("❌ Failed to extract device info");
      return;
    }

    const deviceId = this.generateDeviceId(deviceInfo);

    // Reuse the simulator if one is already registered for this device.
    const existing = this.workers.get(deviceId);
    if (existing) {
      console.log(`📱 Device ${deviceId} already exists, sending action to existing worker`);
      existing.worker.postMessage({
        type: "action",
        data: actionMessage
      } as DeviceWorkerMessage);
      return;
    }

    // Otherwise create a new device simulator.
    console.log(`🆕 Creating new device simulator for ${deviceId}`);
    console.log(`📱 Device: ${deviceInfo.deviceName} (${deviceInfo.brandName})`);
    console.log(`🔌 Protocol: ${deviceInfo.protocolType}`);

    await this.startDeviceWorker(deviceId, deviceInfo, actionMessage);
  }

  /**
   * Spawns and initialises a worker for `deviceId`, then sends it
   * `initialAction` once it reports `ready`.
   *
   * While waiting, `ready`/`error` are consumed here and every other message
   * is forwarded to the permanent handler so logs and status updates emitted
   * during startup are not lost.
   *
   * If the worker reports an error or does not become ready within
   * `WORKER_START_TIMEOUT_MS`, it is terminated and removed from the registry
   * so that a later `deviceSetup` can retry with a fresh worker.
   *
   * @throws (rejects) when the worker fails to start.
   */
  private async startDeviceWorker(
    deviceId: string,
    deviceInfo: DeviceInfo,
    initialAction: any,
  ): Promise<void> {
    console.log(`🔧 Creating device worker for ${deviceId}`);

    const worker = this.spawnWorker();

    const workerConfig = {
      deviceId,
      deviceInfo,
      vdaInterface: this.config.mqttBroker.vdaInterface,
      mqtt: {
        brokerUrl: this.brokerUrl(),
        options: {
          clean: true,
          keepalive: 60,
        }
      },
      vehicle: {
        manufacturer: deviceInfo.brandName,
        serialNumber: deviceId,
        vdaVersion: this.config.vehicle.vdaVersion,
      }
    };

    const deviceWorker: DeviceWorker = {
      worker,
      deviceId,
      status: "starting",
      config: workerConfig,
      deviceInfo,
    };

    this.workers.set(deviceId, deviceWorker);
    this.setupDeviceWorkerHandlers(deviceWorker);

    // Send the initialisation message.
    worker.postMessage({ type: "init", data: workerConfig } as DeviceWorkerMessage);

    // Wait until the worker is ready, then deliver the initial action.
    return new Promise<void>((resolve, reject) => {
      const permanentHandler = worker.onmessage;

      const fail = (error: Error) => {
        clearTimeout(timeout);
        // If a `reset` already swapped in a replacement worker, leave the
        // replacement alone and only report the failed start.
        if (deviceWorker.worker === worker) {
          deviceWorker.status = "error";
          worker.onmessage = permanentHandler;
          worker.terminate();
          if (this.workers.get(deviceId) === deviceWorker) {
            this.workers.delete(deviceId);
          }
        }
        reject(error);
      };

      const timeout = setTimeout(() => {
        fail(new Error(`Device worker ${deviceId} failed to start within timeout`));
      }, WORKER_START_TIMEOUT_MS);

      worker.onmessage = (event: MessageEvent) => {
        const type = event.data?.type;

        if (type === "ready") {
          clearTimeout(timeout);
          deviceWorker.status = "running";
          worker.onmessage = permanentHandler;
          console.log(`✅ Device worker ${deviceId} started successfully`);

          // Deliver the action that triggered the creation of this worker.
          worker.postMessage({
            type: "action",
            data: initialAction
          } as DeviceWorkerMessage);

          resolve();
        } else if (type === "error") {
          fail(new Error(`Device worker ${deviceId} failed to start: ${event.data.data?.error}`));
        } else {
          permanentHandler?.call(worker, event);
        }
      };
    });
  }

  /**
   * Installs the permanent `onmessage` / `onerror` / `onmessageerror`
   * handlers on the worker currently referenced by `deviceWorker`.
   */
  private setupDeviceWorkerHandlers(deviceWorker: DeviceWorker): void {
    const { worker, deviceId } = deviceWorker;

    worker.onmessage = (event: MessageEvent) => {
      const { type, data } = event.data ?? {};

      switch (type) {
        case "ready":
          deviceWorker.status = "running";
          console.log(`✅ Device ${deviceId} is ready`);
          break;

        case "error":
          deviceWorker.status = "error";
          console.error(`❌ Device ${deviceId} error:`, data?.error);
          break;

        case "status":
          console.log(`📊 Device ${deviceId} status:`, data?.status);
          if (data?.status === "closed") {
            deviceWorker.status = "stopped";
          }
          break;

        case "log":
          // Relay worker log lines to the console.
          console.log(data?.message);
          break;

        case "reset":
          console.log(`🔄 Worker reset requested for device ${deviceId}: ${data?.reason}`);
          console.log(`Error: ${data?.error}`);
          // Errors are handled inside; nothing to await here.
          void this.handleDeviceWorkerReset(deviceWorker, data);
          break;

        default:
          console.warn(`Unknown message type from device ${deviceId}:`, type);
      }
    };

    worker.onerror = (error) => {
      // Mark the error as handled so it is not re-raised as an uncaught
      // error in the manager (Deno propagates unprevented worker errors to
      // the parent).
      error.preventDefault();
      deviceWorker.status = "error";
      console.error(`❌ Device worker ${deviceId} error:`, error.message);
    };

    worker.onmessageerror = (error) => {
      console.error(`❌ Message error from device worker ${deviceId}:`, error);
    };
  }

  /**
   * Replaces a worker that asked to be reset: terminates it, spawns a new
   * one under the same registry entry and re-sends the stored `init` config.
   */
  private async handleDeviceWorkerReset(deviceWorker: DeviceWorker, resetData: any): Promise<void> {
    const { deviceId } = deviceWorker;

    console.log(`🔄 Resetting device worker ${deviceId} due to: ${resetData?.reason}`);

    try {
      // Terminate the current worker.
      deviceWorker.worker.terminate();

      // Create the replacement and swap the reference.
      const newWorker = this.spawnWorker();
      deviceWorker.worker = newWorker;
      deviceWorker.status = "starting";

      // Re-attach the message handlers to the new worker.
      this.setupDeviceWorkerHandlers(deviceWorker);

      // Re-initialise with the original configuration.
      newWorker.postMessage({ type: "init", data: deviceWorker.config } as DeviceWorkerMessage);

      console.log(`✅ Device worker ${deviceId} reset completed`);
    } catch (error) {
      console.error(`❌ Failed to reset device worker ${deviceId}:`, (error as Error).message);
      deviceWorker.status = "error";
    }
  }

  /**
   * Stops one worker and removes it from the registry.
   *
   * Sends `close` and waits for a `status: "closed"` acknowledgement; if none
   * arrives within `WORKER_STOP_TIMEOUT_MS` the worker is force-terminated.
   * Never rejects.
   */
  async stopDeviceWorker(deviceId: string): Promise<void> {
    const deviceWorker = this.workers.get(deviceId);
    if (!deviceWorker) {
      console.warn(`⚠️ Device worker ${deviceId} not found`);
      return;
    }

    console.log(`🛑 Stopping device worker ${deviceId}`);

    return new Promise<void>((resolve) => {
      const finish = (logLine: string) => {
        deviceWorker.worker.terminate();
        this.workers.delete(deviceId);
        console.log(logLine);
        resolve();
      };

      const timeout = setTimeout(() => {
        finish(`🔄 Force terminated device worker ${deviceId}`);
      }, WORKER_STOP_TIMEOUT_MS);

      deviceWorker.worker.onmessage = (event: MessageEvent) => {
        if (event.data?.type === "status" && event.data.data?.status === "closed") {
          clearTimeout(timeout);
          finish(`✅ Device worker ${deviceId} stopped gracefully`);
        }
      };

      deviceWorker.worker.postMessage({ type: "close" } as DeviceWorkerMessage);
    });
  }

  /**
   * Stops every registered worker.
   *
   * @param closeMqtt When `true` (the default) the MQTT client is ended as
   *   well, which is what a full shutdown needs. Pass `false` to keep
   *   listening for new `deviceSetup` actions after the workers are gone.
   */
  async stopAllDeviceWorkers(closeMqtt = true): Promise<void> {
    console.log("🛑 Stopping all device workers...");

    const stopPromises = Array.from(this.workers.keys()).map((deviceId) =>
      this.stopDeviceWorker(deviceId)
    );

    await Promise.all(stopPromises);
    console.log("✅ All device workers stopped");

    // Close the MQTT connection. The client is ended even when it is
    // currently offline so that its reconnect loop is stopped too.
    if (closeMqtt && this.mqttClient) {
      try {
        this.isConnected = false;
        await this.mqttClient.endAsync();
        console.log("✅ MQTT connection closed");
      } catch (error) {
        console.error(`❌ Error closing MQTT connection: ${(error as Error).message}`);
      }
    }
  }

  /**
   * Returns the status of one worker (or `null` if unknown) when `deviceId`
   * is given, otherwise a list with the status of every registered worker.
   */
  getDeviceWorkerStatus(deviceId?: string): DeviceStatusSummary | DeviceStatusSummary[] | null {
    if (deviceId) {
      const worker = this.workers.get(deviceId);
      return worker ? {
        deviceId,
        status: worker.status,
        deviceInfo: worker.deviceInfo
      } : null;
    }

    return Array.from(this.workers.entries()).map(([id, worker]) => ({
      deviceId: id,
      status: worker.status,
      deviceInfo: worker.deviceInfo
    }));
  }

  /**
   * Executes one interactive command.
   *
   * Supported commands: `status [deviceId]`, `stop [deviceId]`, `list`,
   * `help`, `quit` / `exit`.
   */
  async handleCommand(command: string, args: string[] = []): Promise<void> {
    const cmd = command.toLowerCase();

    switch (cmd) {
      case "status": {
        const requestedId = args[0];
        const result = requestedId
          ? this.getDeviceWorkerStatus(requestedId)
          : this.getDeviceWorkerStatus();

        if (result === null) {
          console.warn(`⚠️ Device worker ${requestedId} not found`);
          break;
        }

        const statuses = Array.isArray(result) ? result : [result];
        console.log("📊 Device Worker Status:");
        for (const s of statuses) {
          console.log(` ${s.deviceId}: ${s.status}`);
          console.log(` Device: ${s.deviceInfo.deviceName} (${s.deviceInfo.brandName})`);
          console.log(` Address: ${s.deviceInfo.ip}:${s.deviceInfo.port}/${s.deviceInfo.slaveId}`);
        }
        break;
      }

      case "stop":
        if (args[0]) {
          await this.stopDeviceWorker(args[0]);
        } else {
          // Keep the MQTT connection: the manager stays alive afterwards and
          // must still be able to receive new deviceSetup actions.
          await this.stopAllDeviceWorkers(false);
        }
        break;

      case "list": {
        const devices = this.getDeviceWorkerStatus();
        console.log("📱 Active Device Simulators:");
        if (Array.isArray(devices)) {
          for (const d of devices) {
            console.log(` ${d.deviceId} - ${d.deviceInfo.deviceName}`);
          }
        }
        break;
      }

      case "help":
        console.log(`
📖 Device Simulator Commands:
  status [deviceId]  - Show device worker status
  stop [deviceId]    - Stop device worker(s)
  list               - List all active devices
  quit/exit          - Exit the manager
  help               - Show this help
`);
        break;

      case "quit":
      case "exit":
        await this.stopAllDeviceWorkers();
        Deno.exit(0);
        break;

      default:
        console.log(`❌ Unknown command: ${command}. Type 'help' for available commands.`);
    }
  }

  /**
   * Runs the stdin prompt until EOF. Each non-empty line is split on
   * whitespace into a command and its arguments; command failures are logged
   * and do not end the loop.
   */
  async startInteractiveMode(): Promise<void> {
    console.log(`
🎮 Device Simulator Manager - Interactive Mode
Type 'help' for available commands
Type 'quit' or 'exit' to stop all workers and exit
`);

    const encoder = new TextEncoder();
    const decoder = new TextDecoder();

    while (true) {
      await Deno.stdout.write(encoder.encode("device-sim> "));

      const buf = new Uint8Array(1024);
      const n = await Deno.stdin.read(buf);
      if (n === null) break; // stdin closed

      const input = decoder.decode(buf.subarray(0, n)).trim();
      if (!input) continue;

      const [command, ...args] = input.split(/\s+/);

      try {
        await this.handleCommand(command, args);
      } catch (error) {
        console.error(`❌ Command failed:`, (error as Error).message);
      }
    }
  }
}

// Program entry point.
if (import.meta.main) {
  const manager = new DeviceSimulatorManager();

  try {
    await manager.initialize();

    console.log(`
🎯 Device Simulator Manager started!
📡 Listening for instantActions messages on MQTT
🔧 Will automatically create device simulators when deviceSetup actions are received
`);

    // Process signal handling. A repeated signal while a shutdown is already
    // in progress is ignored instead of starting a second shutdown.
    let shuttingDown = false;
    const shutdown = async (signal: string) => {
      if (shuttingDown) return;
      shuttingDown = true;
      console.log(`\n⚠️ Received ${signal}, shutting down...`);
      await manager.stopAllDeviceWorkers();
      Deno.exit(0);
    };

    Deno.addSignalListener("SIGINT", () => {
      void shutdown("SIGINT");
    });

    // SIGTERM listeners are not supported on Windows.
    if (Deno.build.os !== "windows") {
      Deno.addSignalListener("SIGTERM", () => {
        void shutdown("SIGTERM");
      });
    }

    // Start the interactive prompt.
    await manager.startInteractiveMode();
  } catch (error) {
    console.error("❌ Failed to start device simulator manager:", (error as Error).message);
    Deno.exit(1);
  }
}