// Source: api-amr/device_simulator_main.ts
// Repository viewer metadata: 538 lines, 16 KiB, TypeScript, 2025-06-04 19:15:02 +08:00
// device_simulator_main.ts - Device simulator manager
import { loadConfig, RawConfig } from "./simulator_config.ts";
import { connect, MqttClient } from "npm:mqtt@5.10.1";
/**
 * Connection and identity details of one simulated device, as extracted from
 * the `actionParameters` of a `deviceSetup` action.
 *
 * `ip`, `port` and `slaveId` are mandatory and are combined into the device ID
 * (`ip-port-slaveId`); the remaining fields fall back to "Unknown ..."
 * placeholders when the action does not supply them.
 */
interface DeviceInfo {
// Network address of the device.
ip: string;
// Port of the device, kept as a string.
port: string;
// Slave/unit identifier of the device, kept as a string.
slaveId: string;
// Human-readable device name (used in log and status output).
deviceName: string;
// Protocol label (only logged by the manager).
protocolType: string;
// Brand label; also passed to the worker as the vehicle manufacturer.
brandName: string;
}
/**
 * Message posted from the manager to a device worker.
 *
 * - `init`: `data` carries the worker configuration.
 * - `action`: `data` carries the action message to execute.
 * - `close`: asks the worker to shut down; sent without `data`.
 * - `reconnect`: declared here but not sent by the manager code in this file.
 */
interface DeviceWorkerMessage {
type: "init" | "close" | "reconnect" | "action";
// Payload whose shape depends on `type` (see above).
data?: any;
}
/**
 * Message received by the manager from a device worker.
 *
 * The manager reads the following optional payload fields depending on `type`:
 * - `error`: `data.error`
 * - `status`: `data.status` (the value "closed" marks the worker as stopped)
 * - `log`: `data.message`
 * - `reset`: `data.reason` and `data.error`
 * - `ready`: no payload is read
 */
interface DeviceMainMessage {
type: "ready" | "error" | "status" | "log" | "reset";
// Payload whose shape depends on `type` (see above).
data?: any;
}
/**
 * Bookkeeping record the manager keeps for every device worker it has spawned.
 */
interface DeviceWorker {
// The Worker instance; replaced with a fresh one when the worker is reset.
worker: Worker;
// Device ID, which is also the key of this record in the manager's worker map.
deviceId: string;
// Lifecycle state as tracked by the manager from worker messages and errors.
status: "starting" | "running" | "error" | "stopped";
// Configuration sent with the "init" message; re-sent unchanged on reset.
config: any;
// Device details this worker was created for.
deviceInfo: DeviceInfo;
}
/**
 * Spawns and supervises one Web Worker per simulated device.
 *
 * The manager subscribes to `<vdaInterface>/+/instantActions` on the MQTT
 * broker. Every `deviceSetup` action either creates a new device worker or is
 * forwarded to the worker that already exists for that device. It also offers
 * a small interactive command line (`status`, `stop`, `list`, `help`, `quit`).
 */
class DeviceSimulatorManager {
  /** Device workers keyed by device ID (`ip-port-slaveId`). */
  private workers: Map<string, DeviceWorker> = new Map();
  private config!: RawConfig;
  private mqttClient!: MqttClient;
  /** Tracks the broker connection; updated on every connect/close event. */
  private isConnected = false;

  /**
   * Loads the simulator configuration and connects to the MQTT broker.
   *
   * @throws If the configuration cannot be loaded or the broker cannot be
   *   reached within the connection timeout.
   */
  async initialize(): Promise<void> {
    this.config = await loadConfig();
    console.log("📋 Device Simulator Manager initialized");
    // Connect to MQTT and start listening for instantActions messages.
    await this.connectMqtt();
  }

  /** Returns a printable message for any thrown value. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Opens the MQTT connection and resolves once the first CONNACK arrives.
   *
   * The bootstrap listeners are used for the first connection attempt only.
   * The MQTT client reconnects automatically and emits "connect" again each
   * time, so the long-lived handlers are installed exactly once here; doing it
   * on every "connect" would stack duplicate "message" listeners.
   *
   * @throws On the first connection error or after a 10 second timeout. In
   *   both cases the client is force-closed so it stops retrying.
   */
  private async connectMqtt(): Promise<void> {
    const brokerUrl = `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`;
    console.log(`🔌 Connecting to MQTT broker: ${brokerUrl}`);
    const client = connect(brokerUrl, {
      clean: true,
      keepalive: 60,
    });
    this.mqttClient = client;

    return new Promise<void>((resolve, reject) => {
      let settled = false;

      const fail = (error: Error) => {
        if (settled) return;
        settled = true;
        clearTimeout(timeout);
        client.removeListener("connect", onConnect);
        // Stop background reconnect attempts. The error listener is left
        // attached on purpose so late errors are logged instead of thrown.
        client.end(true);
        reject(error);
      };

      const onError = (error: Error) => {
        console.error(`❌ MQTT connection error: ${error.message}`);
        fail(error);
      };

      const onConnect = () => {
        if (settled) return;
        settled = true;
        clearTimeout(timeout);
        this.isConnected = true;
        console.log("✅ Connected to MQTT broker");
        // Install the runtime handlers first so an "error" listener is always
        // present, then drop the bootstrap one to avoid double logging.
        this.setupMqttHandlers();
        client.removeListener("error", onError);
        resolve();
      };

      const timeout = setTimeout(() => {
        fail(new Error("MQTT connection timeout"));
      }, 10000);

      client.once("connect", onConnect);
      client.on("error", onError);
    });
  }

  /**
   * Subscribes to the instantActions topic of every device and installs the
   * long-lived client handlers. Must be called once per client.
   */
  private setupMqttHandlers(): void {
    const client = this.mqttClient;

    // Subscribe to the instantActions topic of all devices.
    const instantActionsTopic = `${this.config.mqttBroker.vdaInterface}/+/instantActions`;
    client.subscribe(instantActionsTopic, (err: Error | null) => {
      if (err) {
        console.error(`❌ Failed to subscribe to ${instantActionsTopic}: ${err.message}`);
      } else {
        console.log(`📡 Subscribed to ${instantActionsTopic}`);
      }
    });

    client.on("message", (topic: string, message: Uint8Array) => {
      if (!topic.includes("/instantActions")) return;

      let payload: unknown;
      try {
        payload = JSON.parse(new TextDecoder().decode(message));
      } catch (error) {
        console.error(`❌ Error parsing MQTT message: ${this.describeError(error)}`);
        return;
      }
      console.log(`📥 Received instantActions message on ${topic}`);
      // The handler catches its own errors, so fire-and-forget is safe here.
      void this.handleInstantActionsMessage(payload);
    });

    // "connect" fires again after every automatic reconnect.
    client.on("connect", () => {
      if (!this.isConnected) {
        this.isConnected = true;
        console.log("✅ Reconnected to MQTT broker");
      }
    });

    // "close" is emitted for any loss of connection, whereas "disconnect" is
    // only emitted when the broker sends an MQTT 5 DISCONNECT packet.
    client.on("close", () => {
      if (this.isConnected) {
        this.isConnected = false;
        console.log("⚠️ Disconnected from MQTT broker");
      }
    });

    client.on("error", (error: Error) => {
      console.error(`❌ MQTT error: ${error.message}`);
    });
  }

  /**
   * Processes one decoded instantActions payload and runs every `deviceSetup`
   * action it contains.
   *
   * Accepts both the standard VDA 5050 field `instantActions` and the `actions`
   * field used in practice. Each action is handled in isolation so one device
   * that fails to start does not prevent the remaining actions from running.
   *
   * @param message Decoded JSON payload of the MQTT message.
   */
  private async handleInstantActionsMessage(message: any): Promise<void> {
    console.log("🔍 Processing instantActions message...");
    const actions = message?.instantActions || message?.actions || [];
    if (!Array.isArray(actions)) {
      console.error("❌ Invalid actions format - not an array");
      return;
    }

    for (const action of actions) {
      if (action?.actionType !== "deviceSetup") continue;
      try {
        await this.handleDeviceAction(action);
      } catch (error) {
        console.error(`❌ Error processing instantActions: ${this.describeError(error)}`);
      }
    }
  }

  /**
   * Extracts the device information from an action's key/value parameter list.
   *
   * String and numeric values are accepted and normalised to strings, so a
   * numeric `port` or a `slaveId` of `0` is valid. Malformed entries are
   * ignored. Empty strings count as missing.
   *
   * @param actionParameters List of `{ key, value }` entries.
   * @returns The device information, or `null` when `ip`, `port` or `slaveId`
   *   is missing.
   */
  private extractDeviceInfo(actionParameters: any[]): DeviceInfo | null {
    const params: Record<string, string> = {};
    for (const param of actionParameters) {
      const key: unknown = param?.key;
      const value: unknown = param?.value;
      if (typeof key !== "string" || key === "") continue;
      if (typeof value === "string" || typeof value === "number") {
        params[key] = String(value);
      }
    }

    const required = ["ip", "port", "slaveId"];
    for (const field of required) {
      if (!params[field]) {
        console.error(`❌ Missing required parameter: ${field}`);
        return null;
      }
    }

    return {
      ip: params.ip,
      port: params.port,
      slaveId: params.slaveId,
      deviceName: params.deviceName || "Unknown Device",
      protocolType: params.protocolType || "Unknown Protocol",
      brandName: params.brandName || "Unknown Brand",
    };
  }

  /** Builds the device ID (`ip-port-slaveId`) used as the worker map key. */
  private generateDeviceId(deviceInfo: DeviceInfo): string {
    return `${deviceInfo.ip}-${deviceInfo.port}-${deviceInfo.slaveId}`;
  }

  /**
   * Handles one device action: forwards it to the existing worker for that
   * device, or creates a new device simulator and hands it the action once the
   * worker is ready.
   *
   * @param actionMessage Action object with `actionType` and `actionParameters`.
   * @throws If a new device worker fails to start.
   */
  async handleDeviceAction(actionMessage: any): Promise<void> {
    console.log("🔧 Processing device action:", actionMessage?.actionType);
    if (!actionMessage?.actionParameters || !Array.isArray(actionMessage.actionParameters)) {
      console.error("❌ Invalid action parameters");
      return;
    }

    const deviceInfo = this.extractDeviceInfo(actionMessage.actionParameters);
    if (!deviceInfo) {
      console.error("❌ Failed to extract device info");
      return;
    }

    const deviceId = this.generateDeviceId(deviceInfo);

    // Reuse the simulator if one already exists for this device.
    const existing = this.workers.get(deviceId);
    if (existing) {
      console.log(`📱 Device ${deviceId} already exists, sending action to existing worker`);
      const forwarded: DeviceWorkerMessage = { type: "action", data: actionMessage };
      existing.worker.postMessage(forwarded);
      return;
    }

    // Otherwise create a new device simulator.
    console.log(`🆕 Creating new device simulator for ${deviceId}`);
    console.log(`📱 Device: ${deviceInfo.deviceName} (${deviceInfo.brandName})`);
    console.log(`🔌 Protocol: ${deviceInfo.protocolType}`);
    await this.startDeviceWorker(deviceId, deviceInfo, actionMessage);
  }

  /**
   * Spawns the worker for a device, initialises it and, once it reports
   * "ready", sends it the action that triggered its creation.
   *
   * While waiting for "ready", every other message type is passed on to the
   * regular handler so worker logs and status updates are not lost. If the
   * worker reports an error or does not become ready within 15 seconds it is
   * terminated and unregistered, so a later `deviceSetup` can retry the device.
   *
   * @throws If the worker reports an error during startup or times out.
   */
  private async startDeviceWorker(deviceId: string, deviceInfo: DeviceInfo, initialAction: any): Promise<void> {
    console.log(`🔧 Creating device worker for ${deviceId}`);
    const worker = new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
      type: "module",
    });

    const workerConfig = {
      deviceId,
      deviceInfo,
      vdaInterface: this.config.mqttBroker.vdaInterface,
      mqtt: {
        brokerUrl: `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`,
        options: {
          clean: true,
          keepalive: 60,
        },
      },
      vehicle: {
        manufacturer: deviceInfo.brandName,
        serialNumber: deviceId,
        vdaVersion: this.config.vehicle.vdaVersion,
      },
    };

    const deviceWorker: DeviceWorker = {
      worker,
      deviceId,
      status: "starting",
      config: workerConfig,
      deviceInfo,
    };
    this.workers.set(deviceId, deviceWorker);
    this.setupDeviceWorkerHandlers(deviceWorker);

    // Send the initialisation message.
    const initMessage: DeviceWorkerMessage = { type: "init", data: workerConfig };
    worker.postMessage(initMessage);

    // Wait for the worker to become ready, then send the initial action.
    return new Promise<void>((resolve, reject) => {
      const steadyStateHandler = worker.onmessage;
      let settled = false;

      const fail = (error: Error) => {
        if (settled) return;
        settled = true;
        clearTimeout(timeout);
        worker.terminate();
        // Only touch the registry entry if it still refers to the worker this
        // call created; a reset may have swapped in a replacement meanwhile.
        if (deviceWorker.worker === worker && this.workers.get(deviceId) === deviceWorker) {
          deviceWorker.status = "error";
          this.workers.delete(deviceId);
        }
        reject(error);
      };

      const timeout = setTimeout(() => {
        fail(new Error(`Device worker ${deviceId} failed to start within timeout`));
      }, 15000);

      worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
        switch (event.data.type) {
          case "ready": {
            if (settled) return;
            settled = true;
            clearTimeout(timeout);
            deviceWorker.status = "running";
            worker.onmessage = steadyStateHandler;
            console.log(`✅ Device worker ${deviceId} started successfully`);
            // Send the action that triggered the creation of this worker.
            const firstAction: DeviceWorkerMessage = { type: "action", data: initialAction };
            worker.postMessage(firstAction);
            resolve();
            break;
          }
          case "error":
            fail(new Error(`Device worker ${deviceId} failed to start: ${event.data.data?.error}`));
            break;
          default:
            // Logs, status updates and reset requests keep their normal handling.
            steadyStateHandler?.call(worker, event);
        }
      };
    });
  }

  /**
   * Installs the steady-state message and error handlers on a device worker.
   * Reads `deviceWorker.worker` at call time, so it must be called again after
   * the worker instance has been replaced.
   */
  private setupDeviceWorkerHandlers(deviceWorker: DeviceWorker): void {
    const { worker, deviceId } = deviceWorker;

    worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
      const { type, data } = event.data;
      switch (type) {
        case "ready":
          deviceWorker.status = "running";
          console.log(`✅ Device ${deviceId} is ready`);
          break;
        case "error":
          deviceWorker.status = "error";
          console.error(`❌ Device ${deviceId} error:`, data?.error);
          break;
        case "status":
          console.log(`📊 Device ${deviceId} status:`, data?.status);
          if (data?.status === "closed") {
            deviceWorker.status = "stopped";
          }
          break;
        case "log":
          // Forward worker logs to the console.
          console.log(data?.message);
          break;
        case "reset":
          console.log(`🔄 Worker reset requested for device ${deviceId}: ${data?.reason}`);
          console.log(`Error: ${data?.error}`);
          // The reset routine handles its own errors.
          void this.handleDeviceWorkerReset(deviceWorker, data);
          break;
        default:
          console.warn(`Unknown message type from device ${deviceId}:`, type);
      }
    };

    worker.onerror = (error) => {
      // Mark the error as handled so a crash inside one device worker is not
      // re-raised in the manager and does not take down the other devices.
      error.preventDefault();
      deviceWorker.status = "error";
      console.error(`❌ Device worker ${deviceId} error:`, error.message);
    };

    worker.onmessageerror = (error) => {
      console.error(`❌ Message error from device worker ${deviceId}:`, error);
    };
  }

  /**
   * Replaces a device worker with a fresh instance that is re-initialised with
   * the same configuration. The registry entry is reused, so the device keeps
   * its ID.
   *
   * @param resetData Payload of the worker's "reset" message (`reason`, `error`).
   */
  private async handleDeviceWorkerReset(deviceWorker: DeviceWorker, resetData: any): Promise<void> {
    const { deviceId } = deviceWorker;
    console.log(`🔄 Resetting device worker ${deviceId} due to: ${resetData?.reason}`);
    try {
      // Terminate the current worker.
      deviceWorker.worker.terminate();

      // Create the replacement and point the registry entry at it.
      const replacement = new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
        type: "module",
      });
      deviceWorker.worker = replacement;
      deviceWorker.status = "starting";

      // Re-install the message handlers and initialise the new worker.
      this.setupDeviceWorkerHandlers(deviceWorker);
      const initMessage: DeviceWorkerMessage = { type: "init", data: deviceWorker.config };
      replacement.postMessage(initMessage);
      console.log(`✅ Device worker ${deviceId} reset completed`);
    } catch (error) {
      console.error(`❌ Failed to reset device worker ${deviceId}:`, this.describeError(error));
      deviceWorker.status = "error";
    }
  }

  /**
   * Stops one device worker. Asks it to close and waits up to 5 seconds for the
   * "closed" status before terminating it; the worker is terminated and
   * unregistered in either case. Never rejects.
   *
   * @param deviceId ID of the device worker to stop; unknown IDs only log a warning.
   */
  async stopDeviceWorker(deviceId: string): Promise<void> {
    const deviceWorker = this.workers.get(deviceId);
    if (!deviceWorker) {
      console.warn(`⚠️ Device worker ${deviceId} not found`);
      return;
    }
    console.log(`🛑 Stopping device worker ${deviceId}`);
    const { worker } = deviceWorker;

    return new Promise<void>((resolve) => {
      const finish = (logLine: string) => {
        clearTimeout(forceTimer);
        worker.terminate();
        deviceWorker.status = "stopped";
        // Do not remove a newer worker that reused the same device ID.
        if (this.workers.get(deviceId) === deviceWorker) {
          this.workers.delete(deviceId);
        }
        console.log(logLine);
        resolve();
      };

      const forceTimer = setTimeout(() => {
        finish(`🔄 Force terminated device worker ${deviceId}`);
      }, 5000);

      worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
        if (event.data.type === "status" && event.data.data?.status === "closed") {
          finish(`✅ Device worker ${deviceId} stopped gracefully`);
        }
      };

      const closeMessage: DeviceWorkerMessage = { type: "close" };
      worker.postMessage(closeMessage);
    });
  }

  /**
   * Stops every device worker in parallel and, by default, closes the MQTT
   * connection afterwards.
   *
   * @param closeMqtt Pass `false` to keep the broker connection open so the
   *   manager can keep receiving `deviceSetup` actions. Defaults to `true`.
   */
  async stopAllDeviceWorkers(closeMqtt = true): Promise<void> {
    console.log("🛑 Stopping all device workers...");
    const deviceIds = Array.from(this.workers.keys());
    await Promise.all(deviceIds.map((deviceId) => this.stopDeviceWorker(deviceId)));
    console.log("✅ All device workers stopped");

    // Close the MQTT connection.
    if (closeMqtt && this.mqttClient) {
      try {
        await this.mqttClient.endAsync();
        console.log("✅ MQTT connection closed");
      } catch (error) {
        console.error(`❌ Error closing MQTT connection: ${this.describeError(error)}`);
      }
    }
  }

  /**
   * Reports worker status.
   *
   * @param deviceId When given, only that device is reported.
   * @returns With `deviceId`: the device's status record, or `null` if unknown.
   *   Without: an array with one status record per registered worker.
   */
  getDeviceWorkerStatus(deviceId?: string) {
    if (deviceId) {
      const worker = this.workers.get(deviceId);
      return worker ? { deviceId, status: worker.status, deviceInfo: worker.deviceInfo } : null;
    }
    return Array.from(this.workers.entries()).map(([id, worker]) => ({
      deviceId: id,
      status: worker.status,
      deviceInfo: worker.deviceInfo,
    }));
  }

  /** Prints the status lines of one device worker. */
  private printWorkerStatus(entry: DeviceWorker): void {
    const { deviceId, status, deviceInfo } = entry;
    console.log(` ${deviceId}: ${status}`);
    console.log(` Device: ${deviceInfo.deviceName} (${deviceInfo.brandName})`);
    console.log(` Address: ${deviceInfo.ip}:${deviceInfo.port}/${deviceInfo.slaveId}`);
  }

  /**
   * Executes one command of the interactive command line.
   *
   * @param command Command name, case-insensitive.
   * @param args Command arguments. `status` and `stop` take an optional device ID.
   */
  async handleCommand(command: string, args: string[] = []): Promise<void> {
    const cmd = command.toLowerCase();
    switch (cmd) {
      case "status": {
        console.log("📊 Device Worker Status:");
        const requestedId = args[0];
        if (requestedId) {
          const entry = this.workers.get(requestedId);
          if (entry) {
            this.printWorkerStatus(entry);
          } else {
            console.warn(`⚠️ Device worker ${requestedId} not found`);
          }
        } else {
          for (const entry of this.workers.values()) {
            this.printWorkerStatus(entry);
          }
        }
        break;
      }
      case "stop":
        if (args[0]) {
          await this.stopDeviceWorker(args[0]);
        } else {
          // Keep the MQTT connection open: the manager keeps running and must
          // still be able to create simulators for new deviceSetup actions.
          await this.stopAllDeviceWorkers(false);
        }
        break;
      case "list": {
        console.log("📱 Active Device Simulators:");
        for (const entry of this.workers.values()) {
          console.log(` ${entry.deviceId} - ${entry.deviceInfo.deviceName}`);
        }
        break;
      }
      case "help":
        console.log(`
📖 Device Simulator Commands:
status [deviceId] - Show device worker status
stop [deviceId] - Stop device worker(s)
list - List all active devices
quit/exit - Exit the manager
help - Show this help
`);
        break;
      case "quit":
      case "exit":
        await this.stopAllDeviceWorkers();
        Deno.exit(0);
        break;
      default:
        console.log(`❌ Unknown command: ${command}. Type 'help' for available commands.`);
    }
  }

  /**
   * Runs the interactive prompt until stdin reaches end-of-file.
   *
   * Input is buffered and split on newlines, so several commands arriving in
   * one read (piped input) run one after another and a command split across two
   * reads is reassembled. A failing command is logged and does not end the loop.
   */
  async startInteractiveMode(): Promise<void> {
    console.log(`
🎮 Device Simulator Manager - Interactive Mode
Type 'help' for available commands
Type 'quit' or 'exit' to stop all workers and exit
`);
    const encoder = new TextEncoder();
    const decoder = new TextDecoder();
    const chunk = new Uint8Array(1024);
    let pending = "";
    let reachedEof = false;

    while (!reachedEof) {
      // Only prompt when no partially typed line is waiting for its newline.
      if (pending === "") {
        await Deno.stdout.write(encoder.encode("device-sim> "));
      }

      const bytesRead = await Deno.stdin.read(chunk);
      if (bytesRead === null) {
        reachedEof = true;
        pending += decoder.decode();
        // Terminate a final line that had no trailing newline so it still runs.
        if (pending !== "") pending += "\n";
      } else {
        // Streaming mode keeps multi-byte characters intact across reads.
        pending += decoder.decode(chunk.subarray(0, bytesRead), { stream: true });
      }

      let newlineAt = pending.indexOf("\n");
      while (newlineAt !== -1) {
        const line = pending.slice(0, newlineAt).trim();
        pending = pending.slice(newlineAt + 1);
        newlineAt = pending.indexOf("\n");
        if (!line) continue;

        const [command, ...args] = line.split(/\s+/);
        try {
          await this.handleCommand(command, args);
        } catch (error) {
          console.error(`❌ Command failed:`, this.describeError(error));
        }
      }
    }
  }
}
// Program entry point: runs only when this module is executed directly.
if (import.meta.main) {
  const manager = new DeviceSimulatorManager();

  // Builds the handler for one OS signal: announce it, stop every worker
  // (which also closes the MQTT connection) and exit with success.
  const shutdownOn = (signal: "SIGINT" | "SIGTERM") => async () => {
    console.log(`\n⚠ Received ${signal}, shutting down...`);
    await manager.stopAllDeviceWorkers();
    Deno.exit(0);
  };

  try {
    await manager.initialize();
    console.log(`
🎯 Device Simulator Manager started!
📡 Listening for instantActions messages on MQTT
🔧 Will automatically create device simulators when deviceSetup actions are received
`);

    // Process signals: SIGTERM is not registered on Windows.
    Deno.addSignalListener("SIGINT", shutdownOn("SIGINT"));
    const isWindows = Deno.build.os === "windows";
    if (!isWindows) {
      Deno.addSignalListener("SIGTERM", shutdownOn("SIGTERM"));
    }

    // Hand control to the interactive prompt.
    await manager.startInteractiveMode();
  } catch (error) {
    console.error("❌ Failed to start device simulator manager:", (error as Error).message);
    Deno.exit(1);
  }
}