api-amr/device_simulator_main.ts
2025-06-04 19:15:02 +08:00

538 lines
16 KiB
TypeScript
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// device_simulator_main.ts - Device simulator manager
import { loadConfig, RawConfig } from "./simulator_config.ts";
import { connect, MqttClient } from "npm:mqtt@5.10.1";
/**
 * Identity and connection details of one simulated device, collected from the
 * key/value `actionParameters` of a `deviceSetup` action.
 */
interface DeviceInfo {
  /** Required parameter; first segment of the generated device id. */
  ip: string;

  /** Required parameter; second segment of the generated device id. */
  port: string;

  /** Required parameter; third segment of the generated device id. */
  slaveId: string;

  /** Display name; the manager falls back to 'Unknown Device' when absent. */
  deviceName: string;

  /** Protocol label; the manager falls back to 'Unknown Protocol' when absent. */
  protocolType: string;

  /** Brand label; also passed to the worker as the vehicle manufacturer. */
  brandName: string;
}
/** Message posted by the manager to a device worker. */
interface DeviceWorkerMessage {
  /**
   * Command selector:
   * - "init": sent once after the worker is created, with the worker config as `data`
   * - "close": asks the worker to shut down
   * - "reconnect": declared here but not sent anywhere in this file
   * - "action": forwards a received action object as `data`
   */
  type:
    | "init"
    | "close"
    | "reconnect"
    | "action";

  /** Optional payload; its shape depends on `type`. */
  data?: any;
}
/** Message posted by a device worker back to the manager. */
interface DeviceMainMessage {
  /**
   * Event selector:
   * - "ready": the worker finished initializing
   * - "error": the worker reports a failure (`data.error`)
   * - "status": status report (`data.status`; "closed" marks the worker stopped)
   * - "log": a line to print on the manager console (`data.message`)
   * - "reset": the worker asks to be replaced (`data.reason`, `data.error`)
   */
  type:
    | "ready"
    | "error"
    | "status"
    | "log"
    | "reset";

  /** Optional payload; its shape depends on `type`. */
  data?: any;
}
/** Manager-side bookkeeping record for one device simulator worker. */
interface DeviceWorker {
  /** Worker currently running the simulator; swapped for a new instance on reset. */
  worker: Worker;

  /** Key under which this record is stored in the manager's worker map. */
  deviceId: string;

  /** Lifecycle state as last observed by the manager. */
  status:
    | "starting"
    | "running"
    | "error"
    | "stopped";

  /** Payload of the "init" message; kept so a reset can re-send it. */
  config: any;

  /** Device details this worker was created for. */
  deviceInfo: DeviceInfo;
}
/**
 * Manages device simulator workers.
 *
 * The manager keeps one MQTT connection on which it listens for
 * `<vdaInterface>/+/instantActions` messages. Every `deviceSetup` action found
 * in such a message is routed to a Web Worker dedicated to the addressed
 * device (keyed by `ip-port-slaveId`); the worker is created on first use.
 * A small interactive command loop allows inspecting and stopping workers.
 */
class DeviceSimulatorManager {
  /** Device workers keyed by the id produced by `generateDeviceId`. */
  private workers: Map<string, DeviceWorker> = new Map();
  private config!: RawConfig;
  private mqttClient!: MqttClient;
  /** True between a successful (re)connect and the next connection loss. */
  private isConnected = false;
  /** Prevents stacking another set of client listeners on every reconnect. */
  private mqttListenersInstalled = false;

  /**
   * Loads the configuration and connects to the MQTT broker.
   *
   * @throws when the configuration cannot be loaded or the broker connection
   *   fails or times out.
   */
  async initialize() {
    this.config = await loadConfig();
    console.log("📋 Device Simulator Manager initialized");
    // Connect to MQTT and start listening for instantActions messages.
    await this.connectMqtt();
  }

  /**
   * Opens the MQTT connection and resolves on the first successful connect.
   *
   * Rejects on the first client error or after 10 seconds without a
   * connection. In both failure cases the client is ended so it does not keep
   * retrying in the background after the caller has given up.
   */
  private connectMqtt(): Promise<void> {
    const brokerUrl = `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`;
    console.log(`🔌 Connecting to MQTT broker: ${brokerUrl}`);

    const client = connect(brokerUrl, {
      clean: true,
      keepalive: 60,
    });
    this.mqttClient = client;
    this.mqttListenersInstalled = false;

    return new Promise<void>((resolve, reject) => {
      // Set once the promise has been resolved or rejected.
      let settled = false;

      const timeout = setTimeout(() => {
        fail(new Error("MQTT connection timeout"));
      }, 10000);

      const fail = (reason: Error) => {
        if (settled) return;
        settled = true;
        clearTimeout(timeout);
        client.end(true);
        reject(reason);
      };

      // The client emits "connect" again after every automatic reconnect, so
      // this handler must be safe to run repeatedly.
      client.on("connect", () => {
        this.isConnected = true;
        console.log("✅ Connected to MQTT broker");
        this.setupMqttHandlers();
        if (!settled) {
          settled = true;
          clearTimeout(timeout);
          resolve();
        }
      });

      // Startup-only error reporting. Once settled, errors are reported by the
      // listener installed in setupMqttHandlers; this one stays attached so an
      // "error" event never finds the emitter without a listener.
      client.on("error", (error: Error) => {
        if (settled) return;
        console.error(`❌ MQTT connection error: ${error.message}`);
        fail(error);
      });
    });
  }

  /**
   * Subscribes to the instantActions topic of all devices and, on the first
   * call only, installs the message/disconnect/error listeners.
   */
  private setupMqttHandlers() {
    // Subscribe to the instantActions topic of every device.
    const instantActionsTopic = `${this.config.mqttBroker.vdaInterface}/+/instantActions`;
    this.mqttClient.subscribe(instantActionsTopic, (err: Error | null) => {
      if (err) {
        console.error(`❌ Failed to subscribe to ${instantActionsTopic}: ${err.message}`);
      } else {
        console.log(`📡 Subscribed to ${instantActionsTopic}`);
      }
    });

    // Listeners are registered once; registering them again on each reconnect
    // would process every incoming message multiple times.
    if (this.mqttListenersInstalled) return;
    this.mqttListenersInstalled = true;

    this.mqttClient.on("message", (topic: string, message: Uint8Array) => {
      try {
        const data = JSON.parse(new TextDecoder().decode(message));
        console.log(`📥 Received instantActions message on ${topic}`);
        if (topic.includes("/instantActions")) {
          // The handler reports its own errors, so the promise is not awaited.
          void this.handleInstantActionsMessage(data);
        }
      } catch (error) {
        console.error(`❌ Error parsing MQTT message: ${(error as Error).message}`);
      }
    });

    // "disconnect" is only raised for a broker-sent DISCONNECT packet; "close"
    // covers every other kind of connection loss.
    this.mqttClient.on("disconnect", () => this.markDisconnected());
    this.mqttClient.on("close", () => this.markDisconnected());

    this.mqttClient.on("error", (error: Error) => {
      console.error(`❌ MQTT error: ${error.message}`);
    });
  }

  /** Records a connection loss, logging only on the connected -> disconnected transition. */
  private markDisconnected() {
    if (!this.isConnected) return;
    this.isConnected = false;
    console.log("⚠️ Disconnected from MQTT broker");
  }

  /**
   * Dispatches every `deviceSetup` action contained in an instantActions
   * message. A failure while handling one action is logged and does not stop
   * the remaining actions from being processed.
   *
   * @param message parsed MQTT payload
   */
  private async handleInstantActionsMessage(message: any): Promise<void> {
    console.log("🔍 Processing instantActions message...");
    try {
      // Two payload layouts are accepted: the standard VDA 5050
      // `instantActions` field and the `actions` field used in practice.
      const actions = message.instantActions || message.actions || [];
      if (!Array.isArray(actions)) {
        console.error("❌ Invalid actions format - not an array");
        return;
      }

      for (const action of actions) {
        if (action?.actionType !== "deviceSetup") continue;
        try {
          await this.handleDeviceAction(action);
        } catch (error) {
          console.error(`❌ Error processing instantActions: ${(error as Error).message}`);
        }
      }
    } catch (error) {
      console.error(`❌ Error processing instantActions: ${(error as Error).message}`);
    }
  }

  /**
   * Builds a DeviceInfo from a list of `{ key, value }` action parameters.
   *
   * String, number, boolean and bigint values are accepted and stored as
   * strings, so a numeric `0` (for example `slaveId: 0`) counts as present.
   * Empty strings, null/undefined and non-scalar values are ignored.
   *
   * @returns the device info, or null when `ip`, `port` or `slaveId` is missing
   */
  private extractDeviceInfo(actionParameters: any[]): DeviceInfo | null {
    const params = new Map<string, string>();
    for (const param of actionParameters) {
      if (!param || !param.key) continue;
      const value: unknown = param.value;
      const isScalar =
        typeof value === "string" ||
        typeof value === "number" ||
        typeof value === "boolean" ||
        typeof value === "bigint";
      if (!isScalar || value === "") continue;
      params.set(String(param.key), String(value));
    }

    const ip = params.get("ip");
    const port = params.get("port");
    const slaveId = params.get("slaveId");
    if (ip === undefined || port === undefined || slaveId === undefined) {
      // Report the first missing field, in the order ip, port, slaveId.
      const field = ip === undefined ? "ip" : port === undefined ? "port" : "slaveId";
      console.error(`❌ Missing required parameter: ${field}`);
      return null;
    }

    return {
      ip,
      port,
      slaveId,
      deviceName: params.get("deviceName") ?? 'Unknown Device',
      protocolType: params.get("protocolType") ?? 'Unknown Protocol',
      brandName: params.get("brandName") ?? 'Unknown Brand'
    };
  }

  /** Builds the worker map key for a device: `ip-port-slaveId`. */
  private generateDeviceId(deviceInfo: DeviceInfo): string {
    return `${deviceInfo.ip}-${deviceInfo.port}-${deviceInfo.slaveId}`;
  }

  /**
   * Handles one device action: forwards it to the existing worker of the
   * addressed device, or creates that worker first.
   *
   * @param actionMessage action object carrying an `actionParameters` array
   * @throws when a newly created worker fails to start
   */
  async handleDeviceAction(actionMessage: any): Promise<void> {
    console.log("🔧 Processing device action:", actionMessage?.actionType);

    const actionParameters = actionMessage?.actionParameters;
    if (!Array.isArray(actionParameters)) {
      console.error("❌ Invalid action parameters");
      return;
    }

    const deviceInfo = this.extractDeviceInfo(actionParameters);
    if (!deviceInfo) {
      console.error("❌ Failed to extract device info");
      return;
    }

    const deviceId = this.generateDeviceId(deviceInfo);

    // Reuse the simulator when one already exists for this device.
    const existing = this.workers.get(deviceId);
    if (existing) {
      console.log(`📱 Device ${deviceId} already exists, sending action to existing worker`);
      const forward: DeviceWorkerMessage = { type: "action", data: actionMessage };
      existing.worker.postMessage(forward);
      return;
    }

    // Otherwise create a new simulator.
    console.log(`🆕 Creating new device simulator for ${deviceId}`);
    console.log(`📱 Device: ${deviceInfo.deviceName} (${deviceInfo.brandName})`);
    console.log(`🔌 Protocol: ${deviceInfo.protocolType}`);
    await this.startDeviceWorker(deviceId, deviceInfo, actionMessage);
  }

  /**
   * Creates and initializes the worker for a device, then sends it the action
   * that triggered its creation.
   *
   * Resolves when the worker reports "ready". Rejects when the worker reports
   * "error" during startup or stays silent for 15 seconds; in that case the
   * worker is terminated and unregistered so a later `deviceSetup` can retry.
   * Any other message received during startup is passed to the regular
   * handler rather than dropped.
   */
  private async startDeviceWorker(deviceId: string, deviceInfo: DeviceInfo, initialAction: any): Promise<void> {
    console.log(`🔧 Creating device worker for ${deviceId}`);

    const worker = new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
      type: "module",
    });

    const workerConfig = {
      deviceId,
      deviceInfo,
      vdaInterface: this.config.mqttBroker.vdaInterface,
      mqtt: {
        brokerUrl: `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`,
        options: {
          clean: true,
          keepalive: 60,
        }
      },
      vehicle: {
        manufacturer: deviceInfo.brandName,
        serialNumber: deviceId,
        vdaVersion: this.config.vehicle.vdaVersion,
      }
    };

    const deviceWorker: DeviceWorker = {
      worker,
      deviceId,
      status: "starting",
      config: workerConfig,
      deviceInfo,
    };
    this.workers.set(deviceId, deviceWorker);
    this.setupDeviceWorkerHandlers(deviceWorker);

    // Send the initialization message.
    const initMessage: DeviceWorkerMessage = { type: "init", data: workerConfig };
    worker.postMessage(initMessage);

    // Wait for the worker to become ready, then send the initial action.
    return new Promise<void>((resolve, reject) => {
      // Regular handler installed by setupDeviceWorkerHandlers.
      const steadyHandler = worker.onmessage;

      const timeout = setTimeout(() => {
        abandon(new Error(`Device worker ${deviceId} failed to start within timeout`));
      }, 15000);

      const abandon = (reason: Error) => {
        clearTimeout(timeout);
        // If a reset already replaced the worker, the record now belongs to
        // the replacement and must be left alone.
        if (deviceWorker.worker === worker) {
          deviceWorker.status = "error";
          worker.terminate();
          if (this.workers.get(deviceId) === deviceWorker) {
            this.workers.delete(deviceId);
          }
        }
        reject(reason);
      };

      worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
        switch (event.data.type) {
          case "ready": {
            clearTimeout(timeout);
            deviceWorker.status = "running";
            worker.onmessage = steadyHandler;
            console.log(`✅ Device worker ${deviceId} started successfully`);
            // Send the initial action.
            const firstAction: DeviceWorkerMessage = { type: "action", data: initialAction };
            worker.postMessage(firstAction);
            resolve();
            break;
          }
          case "error":
            abandon(new Error(`Device worker ${deviceId} failed to start: ${event.data.data?.error}`));
            break;
          default:
            steadyHandler?.call(worker, event);
        }
      };
    });
  }

  /**
   * Installs the regular message/error handlers on the record's current
   * worker. They keep `status` up to date, print worker logs and trigger a
   * reset when the worker asks for one.
   */
  private setupDeviceWorkerHandlers(deviceWorker: DeviceWorker) {
    const { worker, deviceId } = deviceWorker;

    worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
      const { type, data } = event.data;
      switch (type) {
        case "ready":
          deviceWorker.status = "running";
          console.log(`✅ Device ${deviceId} is ready`);
          break;
        case "error":
          deviceWorker.status = "error";
          console.error(`❌ Device ${deviceId} error:`, data?.error);
          break;
        case "status":
          console.log(`📊 Device ${deviceId} status:`, data?.status);
          if (data?.status === "closed") {
            deviceWorker.status = "stopped";
          }
          break;
        case "log":
          // Forward worker log lines to the console.
          console.log(data?.message);
          break;
        case "reset":
          console.log(`🔄 Worker reset requested for device ${deviceId}: ${data?.reason}`);
          console.log(`Error: ${data?.error}`);
          // The reset routine reports its own errors, so it is not awaited.
          void this.handleDeviceWorkerReset(deviceWorker, data);
          break;
        default:
          console.warn(`Unknown message type from device ${deviceId}:`, type);
      }
    };

    worker.onerror = (error) => {
      // Mark the event handled so an uncaught error inside one worker is not
      // escalated to the manager as an unhandled worker error.
      error.preventDefault();
      deviceWorker.status = "error";
      console.error(`❌ Device worker ${deviceId} error:`, error.message);
    };

    worker.onmessageerror = (error) => {
      console.error(`❌ Message error from device worker ${deviceId}:`, error);
    };
  }

  /**
   * Replaces the worker of a device with a new one and re-sends the stored
   * init config. Actions sent to the old worker are not replayed.
   *
   * Nothing is done when the record is no longer registered, which avoids
   * recreating a worker for a device that was stopped in the meantime.
   */
  private async handleDeviceWorkerReset(deviceWorker: DeviceWorker, resetData: any): Promise<void> {
    const { deviceId } = deviceWorker;
    console.log(`🔄 Resetting device worker ${deviceId} due to: ${resetData?.reason}`);

    if (this.workers.get(deviceId) !== deviceWorker) {
      console.warn(`⚠️ Device worker ${deviceId} not found`);
      return;
    }

    try {
      // Terminate the current worker.
      deviceWorker.worker.terminate();

      // Create its replacement.
      const newWorker = new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
        type: "module",
      });

      // Point the record at the new worker.
      deviceWorker.worker = newWorker;
      deviceWorker.status = "starting";

      // Re-install the handlers and initialize again.
      this.setupDeviceWorkerHandlers(deviceWorker);
      const initMessage: DeviceWorkerMessage = { type: "init", data: deviceWorker.config };
      newWorker.postMessage(initMessage);

      console.log(`✅ Device worker ${deviceId} reset completed`);
    } catch (error) {
      console.error(`❌ Failed to reset device worker ${deviceId}:`, (error as Error).message);
      deviceWorker.status = "error";
    }
  }

  /**
   * Stops one device worker. The worker is asked to close and is terminated
   * as soon as it reports status "closed", or forcibly after 5 seconds.
   * Unknown ids only produce a warning. The returned promise never rejects.
   */
  async stopDeviceWorker(deviceId: string): Promise<void> {
    const deviceWorker = this.workers.get(deviceId);
    if (!deviceWorker) {
      console.warn(`⚠️ Device worker ${deviceId} not found`);
      return;
    }

    console.log(`🛑 Stopping device worker ${deviceId}`);

    return new Promise<void>((resolve) => {
      const timeout = setTimeout(() => {
        finish(`🔄 Force terminated device worker ${deviceId}`);
      }, 5000);

      const finish = (notice: string) => {
        clearTimeout(timeout);
        deviceWorker.worker.terminate();
        this.workers.delete(deviceId);
        console.log(notice);
        resolve();
      };

      // From here on only the close confirmation is of interest.
      deviceWorker.worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
        if (event.data.type === "status" && event.data.data?.status === "closed") {
          finish(`✅ Device worker ${deviceId} stopped gracefully`);
        }
      };

      const closeMessage: DeviceWorkerMessage = { type: "close" };
      deviceWorker.worker.postMessage(closeMessage);
    });
  }

  /** Stops every device worker in parallel, then closes the MQTT connection if it is up. */
  async stopAllDeviceWorkers(): Promise<void> {
    console.log("🛑 Stopping all device workers...");

    const stopPromises = Array.from(this.workers.keys()).map((deviceId) => this.stopDeviceWorker(deviceId));
    await Promise.all(stopPromises);
    console.log("✅ All device workers stopped");

    // Close the MQTT connection.
    if (this.mqttClient && this.isConnected) {
      // Cleared first so the "close" event caused by this deliberate shutdown
      // is not reported as a lost connection.
      this.isConnected = false;
      try {
        await this.mqttClient.endAsync();
        console.log("✅ MQTT connection closed");
      } catch (error) {
        console.error(`❌ Error closing MQTT connection: ${(error as Error).message}`);
      }
    }
  }

  /**
   * Reports worker state.
   *
   * @param deviceId when given, only that device is looked up
   * @returns a single summary (or null if unknown) when `deviceId` is given,
   *   otherwise an array with one summary per registered worker
   */
  getDeviceWorkerStatus(deviceId?: string) {
    if (deviceId) {
      const worker = this.workers.get(deviceId);
      return worker ? { deviceId, status: worker.status, deviceInfo: worker.deviceInfo } : null;
    }

    return Array.from(this.workers.entries()).map(([id, worker]) => ({
      deviceId: id,
      status: worker.status,
      deviceInfo: worker.deviceInfo
    }));
  }

  /**
   * Executes one command of the interactive command-line interface.
   *
   * @param command command name, matched case-insensitively
   * @param args remaining whitespace-separated words of the input line
   */
  async handleCommand(command: string, args: string[] = []) {
    const cmd = command.toLowerCase();

    switch (cmd) {
      case "status": {
        // An optional device id narrows the report to a single worker.
        const report = args[0] ? this.getDeviceWorkerStatus(args[0]) : this.getDeviceWorkerStatus();
        if (report === null) {
          console.warn(`⚠️ Device worker ${args[0]} not found`);
          break;
        }
        console.log("📊 Device Worker Status:");
        const entries = Array.isArray(report) ? report : [report];
        for (const s of entries) {
          console.log(` ${s.deviceId}: ${s.status}`);
          console.log(` Device: ${s.deviceInfo.deviceName} (${s.deviceInfo.brandName})`);
          console.log(` Address: ${s.deviceInfo.ip}:${s.deviceInfo.port}/${s.deviceInfo.slaveId}`);
        }
        break;
      }
      case "stop":
        if (args[0]) {
          await this.stopDeviceWorker(args[0]);
        } else {
          await this.stopAllDeviceWorkers();
        }
        break;
      case "list": {
        const devices = this.getDeviceWorkerStatus();
        console.log("📱 Active Device Simulators:");
        if (Array.isArray(devices)) {
          for (const d of devices) {
            console.log(` ${d.deviceId} - ${d.deviceInfo.deviceName}`);
          }
        }
        break;
      }
      case "help":
        // Leading and trailing empty entries reproduce the blank lines that
        // surround the help text.
        console.log([
          "",
          "📖 Device Simulator Commands:",
          "status [deviceId] - Show device worker status",
          "stop [deviceId] - Stop device worker(s)",
          "list - List all active devices",
          "quit/exit - Exit the manager",
          "help - Show this help",
          "",
        ].join("\n"));
        break;
      case "quit":
      case "exit":
        await this.stopAllDeviceWorkers();
        Deno.exit(0);
        break;
      default:
        console.log(`❌ Unknown command: ${command}. Type 'help' for available commands.`);
    }
  }

  /**
   * Runs the interactive prompt: reads a line from stdin, splits it on
   * whitespace and executes it, until stdin reaches end of input. Command
   * failures are logged and do not end the loop.
   */
  async startInteractiveMode() {
    console.log([
      "",
      "🎮 Device Simulator Manager - Interactive Mode",
      "Type 'help' for available commands",
      "Type 'quit' or 'exit' to stop all workers and exit",
      "",
    ].join("\n"));

    const encoder = new TextEncoder();
    const decoder = new TextDecoder();

    while (true) {
      await Deno.stdout.write(encoder.encode("device-sim> "));

      const buf = new Uint8Array(1024);
      const n = await Deno.stdin.read(buf);
      if (n === null) break; // end of input

      const input = decoder.decode(buf.subarray(0, n)).trim();
      if (!input) continue;

      const [command, ...args] = input.split(/\s+/);
      try {
        await this.handleCommand(command, args);
      } catch (error) {
        console.error(`❌ Command failed:`, (error as Error).message);
      }
    }
  }
}
// Program entry point: runs only when this module is executed directly.
if (import.meta.main) {
  const manager = new DeviceSimulatorManager();

  // Shared shutdown path for the signal listeners: announce, stop all
  // workers (which also closes MQTT), then exit successfully.
  const shutdown = async (notice: string): Promise<void> => {
    console.log(notice);
    await manager.stopAllDeviceWorkers();
    Deno.exit(0);
  };

  try {
    await manager.initialize();
    console.log(`
🎯 Device Simulator Manager started!
📡 Listening for instantActions messages on MQTT
🔧 Will automatically create device simulators when deviceSetup actions are received
`);

    // Process signal handling. SIGTERM is not registered on Windows.
    Deno.addSignalListener("SIGINT", () => shutdown("\n⚠ Received SIGINT, shutting down..."));
    if (Deno.build.os !== "windows") {
      Deno.addSignalListener("SIGTERM", () => shutdown("\n⚠ Received SIGTERM, shutting down..."));
    }

    // Hand control to the interactive prompt.
    await manager.startInteractiveMode();
  } catch (error) {
    console.error("❌ Failed to start device simulator manager:", (error as Error).message);
    Deno.exit(1);
  }
}