// device_simulator_main.ts - Device simulator manager
import { loadConfig, RawConfig } from "./simulator_config.ts";
|
||
import { connect, MqttClient } from "npm:mqtt@5.10.1";
|
||
|
||
/**
 * Connection and identity details of one simulated device.
 *
 * In this file the values are extracted from the `actionParameters` of a
 * `deviceSetup` action; `ip`, `port` and `slaveId` are required there and are
 * joined into the device ID, while the remaining fields fall back to
 * "Unknown ..." placeholders when absent.
 */
interface DeviceInfo {
  /** Device address (required). */
  ip: string;
  /** Device port, kept as text (required). */
  port: string;
  /** Slave/unit identifier, kept as text (required). */
  slaveId: string;
  /** Human-readable device name used in log and status output. */
  deviceName: string;
  /** Name of the protocol the device speaks (logged when a simulator is created). */
  protocolType: string;
  /** Brand name; also passed to the worker as the vehicle manufacturer. */
  brandName: string;
}
|
||
|
||
/**
 * Message posted from the manager to a device worker.
 *
 * As used in this file:
 * - `"init"`   carries the worker configuration in `data`.
 * - `"action"` carries the received action object in `data`.
 * - `"close"`  asks the worker to shut down and carries no payload.
 * - `"reconnect"` is declared but never sent from this file
 *   (NOTE(review): presumably handled by the worker script — confirm).
 */
interface DeviceWorkerMessage {
  type: "init" | "close" | "reconnect" | "action";
  /** Payload whose shape depends on `type`. */
  data?: any;
}
|
||
|
||
/**
 * Message posted from a device worker back to the manager.
 *
 * Payload fields read by the manager in this file:
 * - `"ready"`  no payload; the worker finished initialising.
 * - `"error"`  `data.error` describes the failure.
 * - `"status"` `data.status`; the value `"closed"` marks the worker as stopped.
 * - `"log"`    `data.message` is printed to the console.
 * - `"reset"`  `data.reason` and `data.error`; the manager replaces the worker.
 */
interface DeviceMainMessage {
  type: "ready" | "error" | "status" | "log" | "reset";
  /** Payload whose shape depends on `type`. */
  data?: any;
}
|
||
|
||
/** Bookkeeping record the manager keeps for each running device simulator. */
interface DeviceWorker {
  /** Worker currently simulating the device; replaced when the worker is reset. */
  worker: Worker;
  /** Key under which this record is stored in the manager's worker map. */
  deviceId: string;
  /** Lifecycle state as tracked by the manager. */
  status: "starting" | "running" | "error" | "stopped";
  /** Configuration sent with the `"init"` message; reused when the worker is reset. */
  config: any;
  /** Device details the simulator was created for. */
  deviceInfo: DeviceInfo;
}
|
||
|
||
/**
 * Manages one Web Worker per simulated device.
 *
 * The manager connects to the configured MQTT broker, subscribes to
 * `<vdaInterface>/+/instantActions`, and for every `deviceSetup` action either
 * starts a new `device_simulator.ts` worker or forwards the action to the
 * worker already registered for that device. It also provides a small
 * interactive command line for inspecting and stopping workers.
 */
class DeviceSimulatorManager {
  /** Milliseconds to wait for the first MQTT `connect` event. */
  private static readonly MQTT_CONNECT_TIMEOUT_MS = 10000;
  /** Milliseconds a new worker has to report `ready`. */
  private static readonly WORKER_START_TIMEOUT_MS = 15000;
  /** Milliseconds a worker has to confirm `closed` before it is force-terminated. */
  private static readonly WORKER_STOP_TIMEOUT_MS = 5000;

  private workers: Map<string, DeviceWorker> = new Map();
  private config!: RawConfig;
  private mqttClient!: MqttClient;
  private isConnected = false;

  /** Loads the configuration and connects to the MQTT broker. */
  async initialize(): Promise<void> {
    this.config = await loadConfig();
    console.log("📋 Device Simulator Manager initialized");

    // Connect to MQTT and start listening for instantActions messages.
    await this.connectMqtt();
  }

  /** Broker URL built from the loaded configuration. */
  private get brokerUrl(): string {
    return `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`;
  }

  /** Creates a fresh module worker running the device simulator script. */
  private createWorker(): Worker {
    return new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
      type: "module",
    });
  }

  /**
   * Connects to the broker and resolves once the first `connect` event fires.
   *
   * The long-lived handlers are installed exactly once, on the first
   * successful connection; later automatic reconnects must not add them again,
   * otherwise every message would be processed several times.
   *
   * @throws Rejects with the client error, or with "MQTT connection timeout"
   *   when no connection is established in time. In both cases the client is
   *   ended so it does not keep reconnecting in the background.
   */
  private async connectMqtt(): Promise<void> {
    const brokerUrl = this.brokerUrl;
    console.log(`🔌 Connecting to MQTT broker: ${brokerUrl}`);

    const client = connect(brokerUrl, {
      clean: true,
      keepalive: 60,
    });
    this.mqttClient = client;

    return new Promise<void>((resolve, reject) => {
      let settled = false;

      const fail = (error: Error) => {
        if (settled) return;
        settled = true;
        clearTimeout(timer);
        client.removeListener("connect", onConnect);
        // Stop the automatic reconnect loop of a client nobody will use.
        client.end(true);
        reject(error);
      };

      const onConnect = () => {
        if (settled) return;
        settled = true;
        clearTimeout(timer);
        this.isConnected = true;
        console.log("✅ Connected to MQTT broker");
        this.setupMqttHandlers();
        // The persistent error logger is installed now; drop the startup one.
        client.removeListener("error", onError);
        resolve();
      };

      // Deliberately left attached on failure so a late "error" event always
      // has a listener (an unhandled "error" event would throw).
      const onError = (error: Error) => {
        console.error(`❌ MQTT connection error: ${error.message}`);
        fail(error);
      };

      const timer = setTimeout(
        () => fail(new Error("MQTT connection timeout")),
        DeviceSimulatorManager.MQTT_CONNECT_TIMEOUT_MS,
      );

      client.once("connect", onConnect);
      client.on("error", onError);
    });
  }

  /**
   * Subscribes to the instantActions topic of all devices and installs the
   * long-lived client listeners. Must be called once per client.
   */
  private setupMqttHandlers(): void {
    const client = this.mqttClient;

    // Subscribe to the instantActions topic of every device.
    const instantActionsTopic = `${this.config.mqttBroker.vdaInterface}/+/instantActions`;
    client.subscribe(instantActionsTopic, (err: Error | null) => {
      if (err) {
        console.error(`❌ Failed to subscribe to ${instantActionsTopic}: ${err.message}`);
      } else {
        console.log(`📡 Subscribed to ${instantActionsTopic}`);
      }
    });

    const decoder = new TextDecoder();
    client.on("message", (topic: string, message: Uint8Array) => {
      if (!topic.includes("/instantActions")) return;

      let payload: unknown;
      try {
        payload = JSON.parse(decoder.decode(message));
      } catch (error) {
        console.error(`❌ Error parsing MQTT message: ${(error as Error).message}`);
        return;
      }

      console.log(`📥 Received instantActions message on ${topic}`);
      // handleInstantActionsMessage reports its own errors and never rejects.
      void this.handleInstantActionsMessage(payload);
    });

    const markDisconnected = () => {
      if (!this.isConnected) return;
      this.isConnected = false;
      console.log("⚠️ Disconnected from MQTT broker");
    };

    // "close" fires when the connection drops; "disconnect" only fires when
    // the broker sends a DISCONNECT packet, so both are tracked.
    client.on("close", markDisconnected);
    client.on("disconnect", markDisconnected);

    // Fires again after each automatic reconnect.
    client.on("connect", () => {
      if (this.isConnected) return;
      this.isConnected = true;
      console.log("✅ Reconnected to MQTT broker");
    });

    client.on("error", (error: Error) => {
      console.error(`❌ MQTT error: ${error.message}`);
    });
  }

  /**
   * Dispatches every `deviceSetup` action of a message to `handleDeviceAction`.
   *
   * Both the `instantActions` and the `actions` property are accepted as the
   * action list. Actions are processed one after another; a failing action is
   * logged and does not prevent the remaining ones from running.
   *
   * @param message Parsed JSON payload of the MQTT message.
   */
  private async handleInstantActionsMessage(message: any): Promise<void> {
    console.log("🔍 Processing instantActions message...");

    const actions = message?.instantActions || message?.actions || [];
    if (!Array.isArray(actions)) {
      console.error("❌ Invalid actions format - not an array");
      return;
    }

    for (const action of actions) {
      if (action?.actionType !== "deviceSetup") continue;

      try {
        await this.handleDeviceAction(action);
      } catch (error) {
        console.error(`❌ Error processing instantActions: ${(error as Error).message}`);
      }
    }
  }

  /**
   * Builds a `DeviceInfo` from a list of `{ key, value }` action parameters.
   *
   * Only non-empty strings and finite numbers are accepted as values (numbers
   * are converted to text, so a numeric `0` is kept); malformed entries are
   * ignored.
   *
   * @param actionParameters The action's parameter list.
   * @returns The device info, or `null` when `ip`, `port` or `slaveId` is missing.
   */
  private extractDeviceInfo(actionParameters: any[]): DeviceInfo | null {
    const params = new Map<string, string>();

    for (const param of actionParameters) {
      const key = param?.key;
      const value = param?.value;
      if (typeof key !== "string" || key === "") continue;

      if (typeof value === "string") {
        if (value !== "") params.set(key, value);
      } else if (typeof value === "number" && Number.isFinite(value)) {
        params.set(key, String(value));
      }
    }

    const missing = ["ip", "port", "slaveId"].find((field) => !params.has(field));
    if (missing !== undefined) {
      console.error(`❌ Missing required parameter: ${missing}`);
      return null;
    }

    return {
      ip: params.get("ip") as string,
      port: params.get("port") as string,
      slaveId: params.get("slaveId") as string,
      deviceName: params.get("deviceName") ?? "Unknown Device",
      protocolType: params.get("protocolType") ?? "Unknown Protocol",
      brandName: params.get("brandName") ?? "Unknown Brand",
    };
  }

  /** Builds the device ID as `<ip>-<port>-<slaveId>`. */
  private generateDeviceId(deviceInfo: DeviceInfo): string {
    return `${deviceInfo.ip}-${deviceInfo.port}-${deviceInfo.slaveId}`;
  }

  /**
   * Handles one action: forwards it to the worker already registered for the
   * device, or starts a new worker and hands it the action once it is ready.
   *
   * @param actionMessage Action object with an `actionParameters` array.
   * @throws Rejects when a newly created worker fails to start.
   */
  async handleDeviceAction(actionMessage: any): Promise<void> {
    console.log("🔧 Processing device action:", actionMessage?.actionType);

    if (!Array.isArray(actionMessage?.actionParameters)) {
      console.error("❌ Invalid action parameters");
      return;
    }

    const deviceInfo = this.extractDeviceInfo(actionMessage.actionParameters);
    if (!deviceInfo) {
      console.error("❌ Failed to extract device info");
      return;
    }

    const deviceId = this.generateDeviceId(deviceInfo);

    // Reuse the simulator when one already exists for this device.
    const existing = this.workers.get(deviceId);
    if (existing) {
      console.log(`📱 Device ${deviceId} already exists, sending action to existing worker`);
      const forwarded: DeviceWorkerMessage = { type: "action", data: actionMessage };
      existing.worker.postMessage(forwarded);
      return;
    }

    // Otherwise create a new device simulator.
    console.log(`🆕 Creating new device simulator for ${deviceId}`);
    console.log(`📱 Device: ${deviceInfo.deviceName} (${deviceInfo.brandName})`);
    console.log(`🔌 Protocol: ${deviceInfo.protocolType}`);

    await this.startDeviceWorker(deviceId, deviceInfo, actionMessage);
  }

  /**
   * Creates, registers and initialises a worker, then sends it the initial
   * action once it reports `ready`.
   *
   * While the worker is starting, `ready` and `error` messages are consumed
   * here and every other message is passed on to the regular handler. If the
   * worker reports an error, crashes or does not become ready in time, it is
   * terminated and unregistered so that a later action can create it again.
   *
   * @throws Rejects when the worker fails to start.
   */
  private async startDeviceWorker(
    deviceId: string,
    deviceInfo: DeviceInfo,
    initialAction: any,
  ): Promise<void> {
    console.log(`🔧 Creating device worker for ${deviceId}`);

    const worker = this.createWorker();

    const workerConfig = {
      deviceId,
      deviceInfo,
      vdaInterface: this.config.mqttBroker.vdaInterface,
      mqtt: {
        brokerUrl: this.brokerUrl,
        options: {
          clean: true,
          keepalive: 60,
        },
      },
      vehicle: {
        manufacturer: deviceInfo.brandName,
        serialNumber: deviceId,
        vdaVersion: this.config.vehicle.vdaVersion,
      },
    };

    const deviceWorker: DeviceWorker = {
      worker,
      deviceId,
      status: "starting",
      config: workerConfig,
      deviceInfo,
    };

    this.workers.set(deviceId, deviceWorker);
    this.setupDeviceWorkerHandlers(deviceWorker);

    return new Promise<void>((resolve, reject) => {
      const steadyStateHandler = worker.onmessage;

      const abort = (error: Error) => {
        clearTimeout(timer);
        worker.removeEventListener("error", onStartupError);
        deviceWorker.status = "error";
        // Terminate whichever worker the record currently points at; a reset
        // during startup may have replaced the original one.
        deviceWorker.worker.terminate();
        if (this.workers.get(deviceId) === deviceWorker) {
          this.workers.delete(deviceId);
        }
        reject(error);
      };

      const onStartupError = (event: ErrorEvent) => {
        abort(new Error(`Device worker ${deviceId} failed to start: ${event.message}`));
      };

      const timer = setTimeout(
        () => abort(new Error(`Device worker ${deviceId} failed to start within timeout`)),
        DeviceSimulatorManager.WORKER_START_TIMEOUT_MS,
      );

      worker.addEventListener("error", onStartupError);

      worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
        const { type, data } = event.data;

        if (type === "ready") {
          clearTimeout(timer);
          worker.removeEventListener("error", onStartupError);
          worker.onmessage = steadyStateHandler;
          deviceWorker.status = "running";
          console.log(`✅ Device worker ${deviceId} started successfully`);

          // Hand over the action that triggered the creation of this worker.
          const first: DeviceWorkerMessage = { type: "action", data: initialAction };
          worker.postMessage(first);
          resolve();
        } else if (type === "error") {
          abort(new Error(`Device worker ${deviceId} failed to start: ${data?.error}`));
        } else {
          // Keep log/status/reset messages flowing during startup.
          steadyStateHandler?.call(worker, event);
        }
      };

      // Send the initialisation message.
      const init: DeviceWorkerMessage = { type: "init", data: workerConfig };
      worker.postMessage(init);
    });
  }

  /** Installs the regular message and error handlers on the record's current worker. */
  private setupDeviceWorkerHandlers(deviceWorker: DeviceWorker): void {
    const { worker, deviceId } = deviceWorker;

    worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
      const { type, data } = event.data;

      switch (type) {
        case "ready":
          deviceWorker.status = "running";
          console.log(`✅ Device ${deviceId} is ready`);
          break;

        case "error":
          deviceWorker.status = "error";
          console.error(`❌ Device ${deviceId} error:`, data?.error);
          break;

        case "status":
          console.log(`📊 Device ${deviceId} status:`, data?.status);
          if (data?.status === "closed") {
            deviceWorker.status = "stopped";
          }
          break;

        case "log":
          // Forward worker log lines to the console.
          console.log(data?.message);
          break;

        case "reset":
          console.log(`🔄 Worker reset requested for device ${deviceId}: ${data?.reason}`);
          console.log(`Error: ${data?.error}`);
          // handleDeviceWorkerReset reports its own errors and never rejects.
          void this.handleDeviceWorkerReset(deviceWorker, data);
          break;

        default:
          console.warn(`Unknown message type from device ${deviceId}:`, type);
      }
    };

    worker.onerror = (error) => {
      // Mark the event as handled so a crashing simulator is not also
      // reported to the host as an unhandled worker error.
      error.preventDefault();
      deviceWorker.status = "error";
      console.error(`❌ Device worker ${deviceId} error:`, error.message);
    };

    worker.onmessageerror = (error) => {
      console.error(`❌ Message error from device worker ${deviceId}:`, error);
    };
  }

  /**
   * Replaces a worker that asked to be reset with a fresh one initialised
   * from the stored configuration. Does nothing when the device has been
   * unregistered in the meantime.
   *
   * @param resetData Payload of the `reset` message (`reason` is logged).
   */
  private async handleDeviceWorkerReset(deviceWorker: DeviceWorker, resetData: any): Promise<void> {
    const { deviceId } = deviceWorker;

    if (this.workers.get(deviceId) !== deviceWorker) {
      return;
    }

    console.log(`🔄 Resetting device worker ${deviceId} due to: ${resetData?.reason}`);

    try {
      // Terminate the current worker.
      deviceWorker.worker.terminate();

      // Swap in a new worker and install the handlers on it.
      const newWorker = this.createWorker();
      deviceWorker.worker = newWorker;
      deviceWorker.status = "starting";
      this.setupDeviceWorkerHandlers(deviceWorker);

      // Initialise it with the same configuration as before.
      const init: DeviceWorkerMessage = { type: "init", data: deviceWorker.config };
      newWorker.postMessage(init);

      console.log(`✅ Device worker ${deviceId} reset completed`);
    } catch (error) {
      console.error(`❌ Failed to reset device worker ${deviceId}:`, (error as Error).message);
      deviceWorker.status = "error";
    }
  }

  /**
   * Asks a worker to close and terminates it once it confirms with a
   * `closed` status, or after a grace period if it does not answer.
   * Always resolves; an unknown device ID only produces a warning.
   */
  async stopDeviceWorker(deviceId: string): Promise<void> {
    const deviceWorker = this.workers.get(deviceId);
    if (!deviceWorker) {
      console.warn(`⚠️ Device worker ${deviceId} not found`);
      return;
    }

    console.log(`🛑 Stopping device worker ${deviceId}`);

    return new Promise<void>((resolve) => {
      const finish = (logLine: string) => {
        clearTimeout(forceTimer);
        deviceWorker.worker.terminate();
        deviceWorker.status = "stopped";
        if (this.workers.get(deviceId) === deviceWorker) {
          this.workers.delete(deviceId);
        }
        console.log(logLine);
        resolve();
      };

      const forceTimer = setTimeout(
        () => finish(`🔄 Force terminated device worker ${deviceId}`),
        DeviceSimulatorManager.WORKER_STOP_TIMEOUT_MS,
      );

      // From here on only the close confirmation matters.
      deviceWorker.worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
        if (event.data.type === "status" && event.data.data?.status === "closed") {
          finish(`✅ Device worker ${deviceId} stopped gracefully`);
        }
      };

      const close: DeviceWorkerMessage = { type: "close" };
      deviceWorker.worker.postMessage(close);
    });
  }

  /**
   * Stops every device worker in parallel and, by default, closes the MQTT
   * connection afterwards.
   *
   * @param closeMqtt Pass `false` to keep the MQTT connection open so the
   *   manager can still receive new actions. Defaults to `true`.
   */
  async stopAllDeviceWorkers(closeMqtt = true): Promise<void> {
    console.log("🛑 Stopping all device workers...");
    const deviceIds = Array.from(this.workers.keys());
    await Promise.all(deviceIds.map((deviceId) => this.stopDeviceWorker(deviceId)));
    console.log("✅ All device workers stopped");

    if (!closeMqtt || !this.mqttClient) {
      return;
    }

    // Close the MQTT connection; force the close when the link is already
    // down so the client stops its reconnect attempts without waiting.
    try {
      await this.mqttClient.endAsync(!this.isConnected);
      this.isConnected = false;
      console.log("✅ MQTT connection closed");
    } catch (error) {
      console.error(`❌ Error closing MQTT connection: ${(error as Error).message}`);
    }
  }

  /**
   * Reports worker status.
   *
   * @param deviceId When given, only that device is looked up.
   * @returns With `deviceId`: one status object, or `null` if the device is
   *   unknown. Without it: an array with one status object per worker.
   */
  getDeviceWorkerStatus(deviceId?: string) {
    if (deviceId) {
      const worker = this.workers.get(deviceId);
      return worker ? { deviceId, status: worker.status, deviceInfo: worker.deviceInfo } : null;
    }

    return Array.from(this.workers.entries()).map(([id, worker]) => ({
      deviceId: id,
      status: worker.status,
      deviceInfo: worker.deviceInfo,
    }));
  }

  /** Prints the status lines of one worker for the `status` command. */
  private printWorkerStatus(deviceWorker: DeviceWorker): void {
    const { deviceId, status, deviceInfo } = deviceWorker;
    console.log(`  ${deviceId}: ${status}`);
    console.log(`    Device: ${deviceInfo.deviceName} (${deviceInfo.brandName})`);
    console.log(`    Address: ${deviceInfo.ip}:${deviceInfo.port}/${deviceInfo.slaveId}`);
  }

  /**
   * Executes one command of the interactive command line.
   *
   * @param command Command name, matched case-insensitively.
   * @param args Remaining whitespace-separated words of the input line.
   */
  async handleCommand(command: string, args: string[] = []) {
    const cmd = command.toLowerCase();

    switch (cmd) {
      case "status": {
        console.log("📊 Device Worker Status:");
        const requestedId = args[0];
        if (requestedId) {
          const deviceWorker = this.workers.get(requestedId);
          if (deviceWorker) {
            this.printWorkerStatus(deviceWorker);
          } else {
            console.warn(`⚠️ Device worker ${requestedId} not found`);
          }
        } else {
          for (const deviceWorker of this.workers.values()) {
            this.printWorkerStatus(deviceWorker);
          }
        }
        break;
      }

      case "stop":
        if (args[0]) {
          await this.stopDeviceWorker(args[0]);
        } else {
          // Keep MQTT open: the manager keeps running and must still be
          // able to receive deviceSetup actions.
          await this.stopAllDeviceWorkers(false);
        }
        break;

      case "list": {
        console.log("📱 Active Device Simulators:");
        for (const [deviceId, deviceWorker] of this.workers) {
          console.log(`  ${deviceId} - ${deviceWorker.deviceInfo.deviceName}`);
        }
        break;
      }

      case "help":
        console.log(`
📖 Device Simulator Commands:
  status [deviceId]  - Show device worker status
  stop [deviceId]    - Stop device worker(s)
  list               - List all active devices
  quit/exit          - Exit the manager
  help               - Show this help
`);
        break;

      case "quit":
      case "exit":
        await this.stopAllDeviceWorkers();
        Deno.exit(0);
        break;

      default:
        console.log(`❌ Unknown command: ${command}. Type 'help' for available commands.`);
    }
  }

  /**
   * Runs the interactive prompt until stdin reaches end-of-file.
   *
   * Each chunk read from stdin is split into lines and every non-empty line
   * is executed as its own command, so pasted multi-line input is not
   * mistaken for one command with arguments.
   */
  async startInteractiveMode() {
    console.log(`
🎮 Device Simulator Manager - Interactive Mode
Type 'help' for available commands
Type 'quit' or 'exit' to stop all workers and exit
`);

    const encoder = new TextEncoder();
    const decoder = new TextDecoder();

    while (true) {
      await Deno.stdout.write(encoder.encode("device-sim> "));

      const buf = new Uint8Array(1024);
      const n = await Deno.stdin.read(buf);
      if (n === null) break;

      const lines = decoder.decode(buf.subarray(0, n)).split(/\r?\n/);
      for (const line of lines) {
        const input = line.trim();
        if (!input) continue;

        const [command, ...args] = input.split(/\s+/);

        try {
          await this.handleCommand(command, args);
        } catch (error) {
          console.error(`❌ Command failed:`, (error as Error).message);
        }
      }
    }
  }
}
|
||
|
||
// Program entry point: runs only when this module is executed directly.
if (import.meta.main) {
  const manager = new DeviceSimulatorManager();

  try {
    await manager.initialize();

    console.log(`
🎯 Device Simulator Manager started!
📡 Listening for instantActions messages on MQTT
🔧 Will automatically create device simulators when deviceSetup actions are received
`);

    // Shared shutdown routine for all process signals. The first signal
    // starts a graceful shutdown; a second one while it is still in progress
    // exits immediately instead of starting a concurrent shutdown.
    let shuttingDown = false;
    const shutdown = async (signal: string): Promise<void> => {
      if (shuttingDown) {
        console.log(`\n⚠️ Received ${signal} again, forcing exit`);
        Deno.exit(1);
      }
      shuttingDown = true;

      console.log(`\n⚠️ Received ${signal}, shutting down...`);
      let exitCode = 0;
      try {
        await manager.stopAllDeviceWorkers();
      } catch (error) {
        console.error("❌ Error during shutdown:", (error as Error).message);
        exitCode = 1;
      }
      Deno.exit(exitCode);
    };

    // Handle process signals.
    Deno.addSignalListener("SIGINT", () => void shutdown("SIGINT"));

    // SIGTERM listeners are not registered on Windows.
    if (Deno.build.os !== "windows") {
      Deno.addSignalListener("SIGTERM", () => void shutdown("SIGTERM"));
    }

    // Start the interactive prompt.
    await manager.startInteractiveMode();
  } catch (error) {
    console.error("❌ Failed to start device simulator manager:", (error as Error).message);
    Deno.exit(1);
  }
}
|