api-amr/simulator_main.ts
2025-06-04 19:15:02 +08:00

1303 lines
43 KiB
TypeScript
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// simulator_main.ts - 主进程管理器
import { loadConfig, RawConfig } from "./simulator_config.ts";
import { createModuleLogger, perfMonitor, logMemoryUsage, setDebugLevel, LogLevel } from "./debug_logger.ts";
interface WorkerMessage {
type: "init" | "close" | "reconnect";
data?: any;
}
interface MainMessage {
type: "ready" | "error" | "status" | "log" | "device_request";
data?: any;
}
interface DeviceWorkerMessage {
type: "init" | "close" | "reconnect" | "action";
data?: any;
}
interface DeviceMainMessage {
type: "ready" | "error" | "status" | "log" | "reset";
data?: any;
}
interface AgvWorker {
worker: Worker;
agvId: string;
status: "starting" | "running" | "error" | "stopped";
config: any;
}
interface DeviceWorker {
worker: Worker;
deviceId: string;
status: "starting" | "running" | "error" | "stopped";
config: any;
deviceInfo: {
ip: string;
port: string;
slaveId: string;
deviceName: string;
protocolType: string;
brandName: string;
};
}
interface DeviceKV {
[deviceId: string]: {
status: string;
createdAt: string;
lastActivity: string;
deviceInfo: any;
// 添加寄存器配置信息
registersConfig?: string;
actionParameters?: Array<{
key: string;
value: string;
}>;
};
}
// 持久化设备信息接口
interface PersistedDeviceInfo {
deviceId: string;
deviceInfo: {
ip: string;
port: string;
slaveId: string;
deviceName: string;
protocolType: string;
brandName: string;
};
createdAt: string;
lastActivity: string;
// 添加寄存器配置信息
registersConfig?: string;
actionParameters?: Array<{
key: string;
value: string;
}>;
}
interface PersistedDevices {
devices: PersistedDeviceInfo[];
version: string;
lastUpdated: string;
}
class DevicePersistence {
private filePath: string;
constructor(filePath: string = "./device_registry.json") {
this.filePath = filePath;
}
async saveDevices(devices: PersistedDeviceInfo[]): Promise<void> {
const data: PersistedDevices = {
devices,
version: "1.0.0",
lastUpdated: new Date().toISOString()
};
try {
await Deno.writeTextFile(this.filePath, JSON.stringify(data, null, 2));
console.log(`📁 Saved ${devices.length} devices to ${this.filePath}`);
} catch (error) {
console.error(`❌ Failed to save devices: ${(error as Error).message}`);
}
}
async loadDevices(): Promise<PersistedDeviceInfo[]> {
try {
const data = await Deno.readTextFile(this.filePath);
const parsed: PersistedDevices = JSON.parse(data);
console.log(`📁 Loaded ${parsed.devices.length} devices from ${this.filePath}`);
return parsed.devices || [];
} catch (error) {
if (error instanceof Deno.errors.NotFound) {
console.log(`📁 No existing device registry found at ${this.filePath}`);
return [];
}
console.error(`❌ Failed to load devices: ${(error as Error).message}`);
return [];
}
}
async deviceExists(deviceId: string): Promise<boolean> {
const devices = await this.loadDevices();
return devices.some(device => device.deviceId === deviceId);
}
async addDevice(deviceInfo: PersistedDeviceInfo): Promise<void> {
const devices = await this.loadDevices();
// 检查是否已存在,如果存在则更新
const existingIndex = devices.findIndex(d => d.deviceId === deviceInfo.deviceId);
if (existingIndex >= 0) {
devices[existingIndex] = deviceInfo;
console.log(`📝 Updated existing device: ${deviceInfo.deviceId}`);
} else {
devices.push(deviceInfo);
console.log(`📝 Added new device: ${deviceInfo.deviceId}`);
}
await this.saveDevices(devices);
}
async removeDevice(deviceId: string): Promise<void> {
const devices = await this.loadDevices();
const filteredDevices = devices.filter(d => d.deviceId !== deviceId);
if (filteredDevices.length < devices.length) {
await this.saveDevices(filteredDevices);
console.log(`📝 Removed device: ${deviceId}`);
}
}
}
class SimulatorManager {
private workers: Map<string, AgvWorker> = new Map();
private deviceWorkers: Map<string, DeviceWorker> = new Map();
private deviceKV: DeviceKV = {};
private config!: RawConfig;
private devicePersistence: DevicePersistence;
private logger = createModuleLogger("SIMULATOR_MAIN");
constructor() {
this.devicePersistence = new DevicePersistence();
this.logger.info("🎯 SimulatorManager initialized");
}
async initialize() {
this.config = await loadConfig();
console.log("📋 Loaded configuration for", this.config.settings.robotCount, "robots");
console.log("🔧 Device simulator management initialized");
// 恢复持久化的设备
await this.restorePersistedDevices();
}
private async restorePersistedDevices() {
console.log("🔄 Restoring persisted devices...");
try {
const persistedDevices = await this.devicePersistence.loadDevices();
if (persistedDevices.length === 0) {
console.log("📱 No persisted devices found");
return;
}
console.log(`🚀 Starting restoration of ${persistedDevices.length} devices...`);
// helper to restore a device with retries on failure
const restoreWithRetry = async (pd: PersistedDeviceInfo) => {
try {
console.log(`🔧 Restoring device: ${pd.deviceId} (${pd.deviceInfo.deviceName})`);
await this.createDeviceWorkerFromPersisted(pd);
this.deviceKV[pd.deviceId] = {
status: "running",
createdAt: pd.createdAt,
lastActivity: new Date().toISOString(),
deviceInfo: pd.deviceInfo,
registersConfig: pd.registersConfig,
actionParameters: pd.actionParameters
};
console.log(`✅ Device ${pd.deviceId} restored successfully`);
} catch (error) {
console.error(`❌ Failed to restore device ${pd.deviceId}:`, (error as Error).message);
setTimeout(() => restoreWithRetry(pd), 5000);
}
};
for (const persistedDevice of persistedDevices) {
restoreWithRetry(persistedDevice);
}
console.log(`🎉 Device restoration tasks scheduled. Active devices: ${this.getDeviceCount()}`);
} catch (error) {
console.error("❌ Failed to restore persisted devices:", (error as Error).message);
}
}
private async createDeviceWorkerFromPersisted(persistedDevice: PersistedDeviceInfo): Promise<void> {
const { deviceId, deviceInfo } = persistedDevice;
// 创建Worker配置
const brokerUrl = `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`;
const workerConfig = {
deviceId,
deviceInfo,
vdaInterface: this.config.mqttBroker.vdaInterface,
mqtt: {
brokerUrl,
options: {
clean: true,
keepalive: 60,
}
},
vehicle: {
manufacturer: this.config.vehicle.manufacturer,
serialNumber: deviceId,
vdaVersion: this.config.vehicle.vdaVersion,
}
};
await this.startDeviceWorker(deviceId, deviceInfo, workerConfig);
}
async startAllWorkers() {
console.log(`🚀 Starting ${this.config.settings.robotCount} AGV workers...`);
const brokerUrl = `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`;
for (let i = 0; i < this.config.settings.robotCount; i++) {
const agvId = `${this.config.vehicle.serialNumber}${i}`;
const workerConfig = {
vdaInterface: this.config.mqttBroker.vdaInterface,
zoneSetId: this.config.settings.mapId,
mapId: this.config.settings.mapId,
vehicle: {
manufacturer: this.config.vehicle.manufacturer,
serialNumber: agvId,
vdaVersion: this.config.vehicle.vdaVersion,
},
mqtt: {
brokerUrl,
options: {
clean: true,
keepalive: 60,
}
},
settings: {
robotCount: this.config.settings.robotCount,
stateFrequency: this.config.settings.stateFrequency,
visualizationFrequency: this.config.settings.visualizationFrequency,
speed: this.config.settings.speed,
},
};
await this.startWorker(agvId, workerConfig);
}
}
private async startWorker(agvId: string, config: any): Promise<void> {
if (this.workers.has(agvId)) {
console.log(`⚠️ Worker ${agvId} already exists, stopping it first`);
await this.stopWorker(agvId);
}
console.log(`🔧 Creating worker for AGV ${agvId}`);
const worker = new Worker(new URL("./simulator.ts", import.meta.url).href, {
type: "module",
});
const agvWorker: AgvWorker = {
worker,
agvId,
status: "starting",
config,
};
this.workers.set(agvId, agvWorker);
this.setupWorkerHandlers(agvWorker);
// 发送初始化消息
worker.postMessage({ type: "init", data: config } as WorkerMessage);
// 等待worker准备就绪
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
agvWorker.status = "error";
reject(new Error(`Worker ${agvId} failed to start within timeout`));
}, 15000); // 15秒超时
const originalHandler = worker.onmessage;
worker.onmessage = (event: MessageEvent<MainMessage>) => {
if (event.data.type === "ready") {
clearTimeout(timeout);
agvWorker.status = "running";
worker.onmessage = originalHandler;
console.log(`✅ Worker ${agvId} started successfully`);
resolve();
} else if (event.data.type === "error") {
clearTimeout(timeout);
agvWorker.status = "error";
worker.onmessage = originalHandler;
reject(new Error(`Worker ${agvId} failed to start: ${event.data.data?.error}`));
}
};
});
}
private setupWorkerHandlers(agvWorker: AgvWorker) {
const { worker, agvId } = agvWorker;
worker.onmessage = (event: MessageEvent<MainMessage>) => {
const { type, data } = event.data;
switch (type) {
case "ready":
agvWorker.status = "running";
console.log(`✅ AGV ${agvId} is ready`);
break;
case "error":
agvWorker.status = "error";
console.error(`❌ AGV ${agvId} error:`, data?.error);
break;
case "status":
console.log(`📊 AGV ${agvId} status:`, data?.status);
if (data?.status === "closed") {
agvWorker.status = "stopped";
}
break;
case "log":
// 转发日志到控制台
console.log(data?.message);
break;
case "device_request":
// 处理设备模拟器请求
this.handleDeviceRequest(agvId, data);
break;
default:
console.warn(`Unknown message type from ${agvId}:`, type);
}
};
worker.onerror = (error) => {
agvWorker.status = "error";
console.error(`❌ Worker ${agvId} error:`, error.message);
};
worker.onmessageerror = (error) => {
console.error(`❌ Message error from worker ${agvId}:`, error);
};
}
async stopWorker(agvId: string): Promise<void> {
const agvWorker = this.workers.get(agvId);
if (!agvWorker) {
console.warn(`⚠️ Worker ${agvId} not found`);
return;
}
console.log(`🛑 Stopping worker ${agvId}`);
return new Promise((resolve) => {
const timeout = setTimeout(() => {
agvWorker.worker.terminate();
this.workers.delete(agvId);
console.log(`🔄 Force terminated worker ${agvId}`);
resolve();
}, 5000);
agvWorker.worker.onmessage = (event: MessageEvent<MainMessage>) => {
if (event.data.type === "status" && event.data.data?.status === "closed") {
clearTimeout(timeout);
agvWorker.worker.terminate();
this.workers.delete(agvId);
console.log(`✅ Worker ${agvId} stopped gracefully`);
resolve();
}
};
agvWorker.worker.postMessage({ type: "close" } as WorkerMessage);
});
}
async stopAllWorkers(): Promise<void> {
console.log("🛑 Stopping all workers...");
const stopPromises = Array.from(this.workers.keys()).map(agvId => this.stopWorker(agvId));
await Promise.all(stopPromises);
// 同时停止所有设备模拟器
await this.stopAllDeviceWorkers();
console.log("✅ All workers stopped");
}
async resetWorker(agvId: string): Promise<void> {
const agvWorker = this.workers.get(agvId);
if (!agvWorker) {
console.error(`❌ Worker ${agvId} not found`);
return;
}
console.log(`🔄 Resetting worker ${agvId} (restarting worker process)`);
const config = agvWorker.config;
await this.stopWorker(agvId);
await this.startWorker(agvId, config);
}
async reconnectWorker(agvId: string): Promise<void> {
const agvWorker = this.workers.get(agvId);
if (!agvWorker) {
console.error(`❌ Worker ${agvId} not found`);
return;
}
console.log(`🔌 Reconnecting worker ${agvId}`);
agvWorker.worker.postMessage({ type: "reconnect" } as WorkerMessage);
}
async restartWorker(agvId: string): Promise<void> {
const agvWorker = this.workers.get(agvId);
if (!agvWorker) {
console.error(`❌ Worker ${agvId} not found`);
return;
}
console.log(`🔄 Restarting worker ${agvId}`);
const config = agvWorker.config;
await this.stopWorker(agvId);
await this.startWorker(agvId, config);
}
getWorkerStatus(agvId?: string) {
if (agvId) {
const worker = this.workers.get(agvId);
return worker ? { agvId, status: worker.status } : null;
}
return Array.from(this.workers.entries()).map(([id, worker]) => ({
agvId: id,
status: worker.status
}));
}
getWorkerCount(): number {
return this.workers.size;
}
// 交互式命令处理
async handleCommand(command: string, args: string[] = []) {
const cmd = command.toLowerCase();
switch (cmd) {
case "status":
if (args[0]) {
const status = this.getWorkerStatus(args[0]);
if (status && !Array.isArray(status)) {
console.log(`📊 AGV ${args[0]}: ${status.status}`);
} else {
console.log(`❌ AGV ${args[0]} not found`);
}
} else {
const statuses = this.getWorkerStatus();
console.log("📊 AGV Worker Status:");
if (Array.isArray(statuses)) {
statuses.forEach((s: any) => console.log(` ${s.agvId}: ${s.status}`));
}
// 显示设备状态
console.log(`📱 Device Workers: ${this.getDeviceCount()}`);
const deviceKV = this.getDeviceKV();
for (const [deviceId, info] of Object.entries(deviceKV)) {
console.log(` ${deviceId}: ${info.status} (${info.deviceInfo.deviceName})`);
}
}
break;
case "devices":{
const deviceKV = this.getDeviceKV();
console.log(`📱 Active Device Simulators (${this.getDeviceCount()}):`);
for (const [deviceId, info] of Object.entries(deviceKV)) {
console.log(` ${deviceId}:`);
console.log(` Name: ${info.deviceInfo.deviceName} (${info.deviceInfo.brandName})`);
console.log(` Address: ${info.deviceInfo.ip}:${info.deviceInfo.port}/${info.deviceInfo.slaveId}`);
console.log(` Protocol: ${info.deviceInfo.protocolType}`);
console.log(` Status: ${info.status}`);
console.log(` Created: ${info.createdAt}`);
console.log(` Last Activity: ${info.lastActivity}`);
// 显示寄存器配置信息
if (info.registersConfig) {
console.log(` 📝 Registers Configuration:`);
try {
const registers = JSON.parse(info.registersConfig);
console.log(` Total Registers: ${registers.length}`);
registers.forEach((reg: any, index: number) => {
console.log(` [${index + 1}] ${reg.name}:`);
console.log(` Function Code: ${reg.fnCode} (${this.getFunctionCodeDescription(reg.fnCode)})`);
console.log(` Address: ${reg.regAddress}`);
console.log(` Count: ${reg.regCount}`);
});
} catch (error) {
console.log(` ❌ Invalid JSON format: ${info.registersConfig}`);
}
} else {
console.log(` 📝 Registers Configuration: None`);
}
// 显示其他动作参数
if (info.actionParameters && info.actionParameters.length > 0) {
console.log(` 🔧 Action Parameters:`);
info.actionParameters.forEach((param: any, index: number) => {
if (param.key !== "registers") { // registers已经单独显示了
const displayValue = param.value.length > 50 ?
param.value.substring(0, 50) + '...' :
param.value;
console.log(` [${index + 1}] ${param.key}: ${displayValue}`);
}
});
}
console.log(); // 空行分隔
}
break;
}
case "devicestop":
if (args[0]) {
await this.stopDeviceSimulator(args[0]);
} else {
console.log("Usage: devicestop <deviceId>");
console.log("Available devices:");
const devices = this.getDeviceKV();
for (const deviceId of Object.keys(devices)) {
console.log(` ${deviceId}`);
}
}
break;
case "persistent":
if (args[0] === "list") {
await this.listPersistedDevices();
} else if (args[0] === "clear") {
await this.clearPersistedDevices();
} else if (args[0] === "restore") {
await this.restorePersistedDevices();
} else {
console.log("Usage: persistent <list|clear|restore>");
console.log(" list - Show all persisted devices");
console.log(" clear - Clear all persisted devices");
console.log(" restore - Restore persisted devices (force reload)");
}
break;
case "start":
await this.startAllWorkers();
break;
case "stop":
if (args[0]) {
await this.stopWorker(args[0]);
} else {
await this.stopAllWorkers();
}
break;
case "reset":
if (args[0]) {
await this.resetWorker(args[0]);
} else {
// 重置所有workers
console.log("🔄 Resetting all workers...");
const workerIds = Array.from(this.workers.keys());
for (const agvId of workerIds) {
await this.resetWorker(agvId);
}
}
break;
case "reconnect":
if (args[0]) {
await this.reconnectWorker(args[0]);
} else {
console.log("Usage: reconnect <agvId>");
}
break;
case "restart":
if (args[0]) {
await this.restartWorker(args[0]);
} else {
console.log("Usage: restart <agvId>");
}
break;
case "count":
console.log(`📊 Active Workers: ${this.getWorkerCount()}`);
console.log(`📱 Active Devices: ${this.getDeviceCount()}`);
break;
case "debug":
if (args[0] === "level") {
if (args[1]) {
const level = args[1].toUpperCase();
const module = args[2];
try {
setDebugLevel(level, module);
} catch (error) {
console.log(`❌ Invalid debug level: ${level}`);
console.log("Valid levels: ERROR, WARN, INFO, DEBUG, TRACE");
}
} else {
console.log("Usage: debug level <LEVEL> [module]");
console.log("Levels: ERROR, WARN, INFO, DEBUG, TRACE");
console.log("Modules: DEVICE_SIMULATOR, MODBUS, MQTT, REGISTER_CONFIG, DEVICE_MANAGER, SIMULATOR_MAIN");
}
} else if (args[0] === "memory") {
logMemoryUsage("SIMULATOR_MAIN");
} else {
console.log(`
🔧 Debug Commands:
debug level <LEVEL> [module] - Set debug level (ERROR/WARN/INFO/DEBUG/TRACE)
debug memory - Show current memory usage
Examples:
debug level DEBUG - Set global debug level
debug level TRACE DEVICE_SIMULATOR - Set module-specific level
debug memory - Show memory usage
`);
}
break;
case "help":
console.log(`
📖 Available Commands:
AGV Management:
status [agvId] - Show AGV worker status
start - Start all AGV workers
stop [agvId] - Stop AGV worker(s)
reset [agvId] - Reset AGV worker(s) (stop and restart)
reconnect <agvId> - Reconnect AGV worker MQTT
restart <agvId> - Restart AGV worker process
count - Show worker and device counts
Device Management:
devices - List all active device simulators
devicestop <id> - Stop specific device simulator
persistent list - Show all persisted devices
persistent clear - Clear all persisted devices
persistent restore - Force restore persisted devices
Debug Commands:
debug level <LEVEL> [module] - Set debug level
debug memory - Show memory usage
General:
help - Show this help
quit/exit - Stop all workers and exit
`);
break;
case "quit":
case "exit":
await this.stopAllWorkers();
Deno.exit(0);
break;
default:
console.log(`❌ Unknown command: ${command}. Type 'help' for available commands.`);
}
}
async startInteractiveMode() {
console.log(`
🎮 Interactive Mode Started
Type 'help' for available commands
Type 'quit' or 'exit' to stop all workers and exit
`);
// 简单的命令行界面
const encoder = new TextEncoder();
const decoder = new TextDecoder();
while (true) {
// 显示提示符
await Deno.stdout.write(encoder.encode("simulator> "));
// 读取用户输入
const buf = new Uint8Array(1024);
const n = await Deno.stdin.read(buf);
if (n === null) break;
const input = decoder.decode(buf.subarray(0, n)).trim();
if (!input) continue;
const [command, ...args] = input.split(/\s+/);
try {
await this.handleCommand(command, args);
} catch (error) {
console.error(`❌ Command failed:`, (error as Error).message);
}
}
}
// 设备模拟器管理方法
private async handleDeviceRequest(agvId: string, data: any): Promise<void> {
const { action, deviceInfo, deviceId, originalAction } = data;
console.log(`🔧 Handling device request from ${agvId}: ${action}`);
switch (action) {
case "create":
await this.createDeviceSimulator(deviceInfo, originalAction);
break;
case "stop":
await this.stopDeviceSimulator(deviceId);
break;
case "delete":
await this.stopDeviceSimulator(deviceId);
break;
case "forward":
await this.forwardActionToDevice(deviceId, originalAction);
break;
default:
console.warn(`Unknown device action: ${action}`);
}
}
private async createDeviceSimulator(deviceInfo: any, originalAction: any): Promise<void> {
const deviceId = `${deviceInfo.ip}-${deviceInfo.port}-${deviceInfo.slaveId}`;
console.log(`🆕 Processing device creation request: ${deviceId}`);
console.log(`📱 Device: ${deviceInfo.deviceName} (${deviceInfo.brandName})`);
console.log(`🔌 Protocol: ${deviceInfo.protocolType}`);
// 检查设备模拟器是否已经在运行中
if (this.deviceWorkers.has(deviceId)) {
console.log(`⚠️ Device ${deviceId} worker is already running, ignoring duplicate creation request`);
console.log(`📡 Forwarding action to existing device instead`);
await this.forwardActionToDevice(deviceId, originalAction);
return;
}
// 检查是否在持久化存储中存在(可能是重启后还未恢复的设备)
const persistedExists = await this.devicePersistence.deviceExists(deviceId);
if (persistedExists) {
console.log(`📁 Device ${deviceId} exists in persistent storage but worker not running`);
console.log(`🚀 Starting worker for existing persisted device`);
} else {
console.log(`🆕 Creating new device: ${deviceId}`);
}
// 创建Worker配置
const brokerUrl = `mqtt://${this.config.mqttBroker.host}:${this.config.mqttBroker.port}`;
const workerConfig = {
deviceId,
deviceInfo,
vdaInterface: this.config.mqttBroker.vdaInterface,
mqtt: {
brokerUrl,
options: {
clean: true,
keepalive: 60,
}
},
vehicle: {
manufacturer: this.config.vehicle.manufacturer,
serialNumber: deviceId,
vdaVersion: this.config.vehicle.vdaVersion,
}
};
// Attempt to start the device worker, but proceed regardless of success or failure
let startError: Error | null = null;
try {
await this.startDeviceWorker(deviceId, deviceInfo, workerConfig);
} catch (error) {
startError = error as Error;
console.error(`❌ Failed to start device worker for ${deviceId}:`, startError.message);
}
// Record and persist the device in all cases
const now = new Date().toISOString();
// 提取寄存器配置
const registersParam = originalAction.actionParameters?.find((p: any) => p.key === "registers");
const registersConfig = registersParam?.value;
// 更新KV存储
this.deviceKV[deviceId] = {
status: startError ? "error" : "running",
createdAt: now,
lastActivity: now,
deviceInfo,
registersConfig,
actionParameters: originalAction.actionParameters
};
// 保存到持久化存储
const persistedDevice: PersistedDeviceInfo = {
deviceId,
deviceInfo,
createdAt: now,
lastActivity: now,
registersConfig,
actionParameters: originalAction.actionParameters
};
await this.devicePersistence.addDevice(persistedDevice);
// 等待设备模拟器启动完成后发送初始action
setTimeout(async () => {
await this.forwardActionToDevice(deviceId, originalAction);
}, 2000);
// Provide a combined status message
if (startError) {
console.log(`⚠ Device simulator ${deviceId} added but worker start failed: ${startError.message}`);
} else {
console.log(`✅ Device simulator ${deviceId} created and persisted successfully`);
}
}
private async startDeviceWorker(deviceId: string, deviceInfo: any, config: any): Promise<void> {
console.log(`🔧 Starting device worker for ${deviceId}`);
const worker = new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
type: "module",
});
const deviceWorker: DeviceWorker = {
worker,
deviceId,
status: "starting",
config,
deviceInfo,
};
this.deviceWorkers.set(deviceId, deviceWorker);
this.setupDeviceWorkerHandlers(deviceWorker);
// 发送初始化消息
worker.postMessage({ type: "init", data: config } as DeviceWorkerMessage);
// 等待worker准备就绪
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
deviceWorker.status = "error";
reject(new Error(`Device worker ${deviceId} failed to start within timeout`));
}, 15000);
const originalHandler = worker.onmessage;
worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
if (event.data.type === "ready") {
clearTimeout(timeout);
deviceWorker.status = "running";
worker.onmessage = originalHandler;
console.log(`✅ Device worker ${deviceId} started successfully`);
resolve();
} else if (event.data.type === "error") {
clearTimeout(timeout);
deviceWorker.status = "error";
worker.onmessage = originalHandler;
reject(new Error(`Device worker ${deviceId} failed to start: ${event.data.data?.error}`));
}
};
});
}
private setupDeviceWorkerHandlers(deviceWorker: DeviceWorker) {
const { worker, deviceId } = deviceWorker;
worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
const { type, data } = event.data;
switch (type) {
case "ready":
deviceWorker.status = "running";
console.log(`✅ Device ${deviceId} is ready`);
break;
case "error":
deviceWorker.status = "error";
console.error(`❌ Device ${deviceId} error:`, data?.error);
break;
case "status":
console.log(`📊 Device ${deviceId} status:`, data?.status);
if (data?.status === "closed") {
deviceWorker.status = "stopped";
// 设备 worker 自我关闭时,需要完整清理所有数据
this.handleDeviceWorkerClosure(deviceId);
}
break;
case "log":
// 转发日志到控制台
console.log(data?.message);
break;
case "reset":
console.log(`🔄 Worker reset requested for device ${deviceId}: ${data?.reason}`);
console.log(`Error: ${data?.error}`);
this.handleDeviceWorkerReset(deviceWorker, data);
break;
default:
console.warn(`Unknown message type from device ${deviceId}:`, type);
}
// 更新最后活动时间
if (this.deviceKV[deviceId]) {
this.deviceKV[deviceId].lastActivity = new Date().toISOString();
}
};
worker.onerror = (error) => {
deviceWorker.status = "error";
console.error(`❌ Device worker ${deviceId} error:`, error.message);
};
worker.onmessageerror = (error) => {
console.error(`❌ Message error from device worker ${deviceId}:`, error);
};
}
private async handleDeviceWorkerReset(deviceWorker: DeviceWorker, resetData: any): Promise<void> {
const { deviceId } = deviceWorker;
console.log(`🔄 Resetting device worker ${deviceId} due to: ${resetData?.reason}`);
try {
// 终止当前worker
deviceWorker.worker.terminate();
// 创建新的worker
const newWorker = new Worker(new URL("./device_simulator.ts", import.meta.url).href, {
type: "module",
});
// 更新worker引用
deviceWorker.worker = newWorker;
deviceWorker.status = "starting";
// 重新设置消息处理
this.setupDeviceWorkerHandlers(deviceWorker);
// 重新初始化
newWorker.postMessage({ type: "init", data: deviceWorker.config } as DeviceWorkerMessage);
console.log(`✅ Device worker ${deviceId} reset completed`);
} catch (error) {
console.error(`❌ Failed to reset device worker ${deviceId}:`, (error as Error).message);
deviceWorker.status = "error";
}
}
private async forwardActionToDevice(deviceId: string, action: any): Promise<void> {
const deviceWorker = this.deviceWorkers.get(deviceId);
if (!deviceWorker) {
console.warn(`⚠️ Device worker ${deviceId} not found, cannot forward action`);
return;
}
console.log(`📡 Forwarding action to device ${deviceId}: ${action.actionType}`);
deviceWorker.worker.postMessage({
type: "action",
data: action
} as DeviceWorkerMessage);
}
// 处理设备 worker 自我关闭(如 deviceDelete 触发的关闭)
private async handleDeviceWorkerClosure(deviceId: string): Promise<void> {
console.log(`🔄 Handling device worker closure for ${deviceId}`);
// 清理内存引用
this.cleanupDeviceReferences(deviceId);
// 检查是否是 deviceDelete 操作导致的关闭,如果是,则清理持久化数据
// 注意:这里我们需要判断是正常删除还是意外关闭
// 由于 deviceDelete 会主动关闭 worker我们检查持久化存储中是否仍然存在此设备
try {
const deviceExists = await this.devicePersistence.deviceExists(deviceId);
if (deviceExists) {
console.log(`🗑️ Device ${deviceId} closure appears to be from deviceDelete, cleaning up persistent data...`);
await this.cleanupDeviceData(deviceId);
} else {
console.log(` Device ${deviceId} closure - no persistent data cleanup needed`);
}
} catch (error) {
console.error(`❌ Error checking device existence during closure: ${(error as Error).message}`);
}
console.log(`✅ Device worker closure handling completed for ${deviceId}`);
}
// 只停止设备worker不删除持久化数据用于应用退出时
private async stopDeviceWorker(deviceId: string): Promise<void> {
const deviceWorker = this.deviceWorkers.get(deviceId);
if (!deviceWorker) {
console.warn(`⚠️ Device worker ${deviceId} not found`);
return;
}
console.log(`🛑 Stopping device worker ${deviceId} (preserving persistence)`);
return new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
deviceWorker.worker.terminate();
this.deviceWorkers.delete(deviceId);
delete this.deviceKV[deviceId];
console.log(`🔄 Force terminated device worker ${deviceId}`);
resolve();
}, 5000);
deviceWorker.worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
if (event.data.type === "status" && event.data.data?.status === "closed") {
clearTimeout(timeout);
deviceWorker.worker.terminate();
this.deviceWorkers.delete(deviceId);
delete this.deviceKV[deviceId];
console.log(`✅ Device worker ${deviceId} stopped gracefully`);
resolve();
}
};
deviceWorker.worker.postMessage({ type: "close" } as DeviceWorkerMessage);
});
}
// 停止设备模拟器并删除持久化数据(用于明确的设备删除)
private async stopDeviceSimulator(deviceId: string): Promise<void> {
const deviceWorker = this.deviceWorkers.get(deviceId);
if (!deviceWorker) {
console.warn(`⚠️ Device worker ${deviceId} not found`);
// 即使worker不存在也要尝试清理持久化数据
await this.cleanupDeviceData(deviceId);
return;
}
console.log(`🛑 Stopping device simulator ${deviceId} (removing all data)`);
return new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
console.log(`⚠️ Timeout waiting for ${deviceId} to close gracefully, forcing termination`);
deviceWorker.worker.terminate();
this.cleanupDeviceReferences(deviceId);
console.log(`🔄 Force terminated device worker ${deviceId}`);
resolve();
}, 5000);
deviceWorker.worker.onmessage = (event: MessageEvent<DeviceMainMessage>) => {
if (event.data.type === "status" && event.data.data?.status === "closed") {
clearTimeout(timeout);
deviceWorker.worker.terminate();
this.cleanupDeviceReferences(deviceId);
console.log(`✅ Device worker ${deviceId} stopped gracefully`);
resolve();
}
};
deviceWorker.worker.postMessage({ type: "close" } as DeviceWorkerMessage);
}).then(async () => {
// 完整清理所有设备数据
await this.cleanupDeviceData(deviceId);
});
}
// 清理内存中的设备引用
private cleanupDeviceReferences(deviceId: string): void {
console.log(`🧹 Cleaning up device references for ${deviceId}`);
// 清理设备Worker映射
if (this.deviceWorkers.has(deviceId)) {
this.deviceWorkers.delete(deviceId);
console.log(` ✅ Removed device worker reference`);
}
// 清理设备KV存储
if (this.deviceKV[deviceId]) {
delete this.deviceKV[deviceId];
console.log(` ✅ Removed device KV data`);
}
}
// 清理持久化的设备数据
private async cleanupDeviceData(deviceId: string): Promise<void> {
console.log(`🗑️ Cleaning up persistent data for ${deviceId}`);
try {
// 从持久化存储中移除设备
await this.devicePersistence.removeDevice(deviceId);
console.log(` ✅ Device ${deviceId} removed from persistent storage`);
// 验证删除是否成功
const stillExists = await this.devicePersistence.deviceExists(deviceId);
if (stillExists) {
console.warn(` ⚠️ Device ${deviceId} still exists in persistent storage after deletion attempt`);
} else {
console.log(` ✅ Verified: Device ${deviceId} successfully removed from persistent storage`);
}
} catch (error) {
console.error(` ❌ Failed to remove device ${deviceId} from persistent storage:`, (error as Error).message);
// 尝试强制清理
console.log(` 🔄 Attempting force cleanup for ${deviceId}`);
try {
const devices = await this.devicePersistence.loadDevices();
const filteredDevices = devices.filter(d => d.deviceId !== deviceId);
await this.devicePersistence.saveDevices(filteredDevices);
console.log(` ✅ Force cleanup completed for ${deviceId}`);
} catch (forceError) {
console.error(` ❌ Force cleanup failed:`, (forceError as Error).message);
}
}
console.log(`🎯 Device ${deviceId} cleanup completed`);
}
private async stopAllDeviceWorkers(): Promise<void> {
console.log("🛑 Stopping all device workers...");
// 使用新的stopDeviceWorker方法只停止worker但保留持久化数据
const stopPromises = Array.from(this.deviceWorkers.keys()).map(deviceId => this.stopDeviceWorker(deviceId));
await Promise.all(stopPromises);
console.log("✅ All device workers stopped");
}
getDeviceKV(): DeviceKV {
return { ...this.deviceKV };
}
getDeviceCount(): number {
return this.deviceWorkers.size;
}
private getFunctionCodeDescription(fnCode: string): string {
const descriptions: Record<string, string> = {
"1": "Read Coils (读线圈)",
"2": "Read Discrete Inputs (读离散输入)",
"3": "Read Holding Registers (读保持寄存器)",
"4": "Read Input Registers (读输入寄存器)",
"5": "Write Single Coil (写单个线圈)",
"6": "Write Single Register (写单个寄存器)",
"15": "Write Multiple Coils (写多个线圈)",
"16": "Write Multiple Registers (写多个寄存器)"
};
return descriptions[fnCode] || `Unknown Function Code (未知功能码 ${fnCode})`;
}
private async listPersistedDevices() {
const devices = await this.devicePersistence.loadDevices();
console.log(`📁 Persisted Devices (${devices.length}):`);
devices.forEach((device, index) => {
console.log(`${index + 1}. ${device.deviceId}:`);
console.log(` Name: ${device.deviceInfo.deviceName} (${device.deviceInfo.brandName})`);
console.log(` Address: ${device.deviceInfo.ip}:${device.deviceInfo.port}/${device.deviceInfo.slaveId}`);
console.log(` Protocol: ${device.deviceInfo.protocolType}`);
console.log(` Created: ${device.createdAt}`);
console.log(` Last Activity: ${device.lastActivity}`);
// 显示寄存器配置信息
if (device.registersConfig) {
console.log(` 📝 Registers Configuration:`);
try {
const registers = JSON.parse(device.registersConfig);
console.log(` Total Registers: ${registers.length}`);
registers.forEach((reg: any, regIndex: number) => {
console.log(` [${regIndex + 1}] ${reg.name}:`);
console.log(` Function Code: ${reg.fnCode} (${this.getFunctionCodeDescription(reg.fnCode)})`);
console.log(` Address: ${reg.regAddress}`);
console.log(` Count: ${reg.regCount}`);
});
} catch (error) {
console.log(` ❌ Invalid JSON format: ${device.registersConfig}`);
}
} else {
console.log(` 📝 Registers Configuration: None`);
}
console.log(); // 空行分隔
});
}
private async clearPersistedDevices() {
await this.devicePersistence.saveDevices([]);
console.log("📁 All persisted devices cleared");
}
}
// 主程序入口
if (import.meta.main) {
const manager = new SimulatorManager();
// 重试配置
const RETRY_CONFIG = {
maxRetries: 10, // 最大重试次数
initialDelay: 1000, // 初始延迟 1秒
maxDelay: 30000, // 最大延迟 30秒
backoffMultiplier: 2 // 指数退避倍数
};
let retryCount = 0;
let currentDelay = RETRY_CONFIG.initialDelay;
// 延迟函数
const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
// 启动函数
const startSimulator = async (): Promise<void> => {
try {
console.log(`🚀 Starting simulator (attempt ${retryCount + 1}/${RETRY_CONFIG.maxRetries + 1})...`);
await manager.initialize();
await manager.startAllWorkers();
console.log(`
🎯 All ${manager.getWorkerCount()} AGV workers started successfully!
Starting interactive mode...
`);
// 重置重试计数器
retryCount = 0;
currentDelay = RETRY_CONFIG.initialDelay;
return; // 成功启动,退出重试循环
} catch (error) {
const errorMessage = (error as Error).message;
console.error(`❌ Startup failed (attempt ${retryCount + 1}):`, errorMessage);
retryCount++;
if (retryCount > RETRY_CONFIG.maxRetries) {
console.error(`💥 Maximum retry attempts (${RETRY_CONFIG.maxRetries}) exceeded. Giving up.`);
console.error("🔧 Please check your configuration and network connectivity.");
Deno.exit(1);
}
console.log(`⏳ Waiting ${currentDelay / 1000}s before retry ${retryCount + 1}...`);
await delay(currentDelay);
// 指数退避,但不超过最大延迟
currentDelay = Math.min(
currentDelay * RETRY_CONFIG.backoffMultiplier,
RETRY_CONFIG.maxDelay
);
// 递归重试
return startSimulator();
}
};
// 启动模拟器
await startSimulator();
// 处理进程信号
Deno.addSignalListener("SIGINT", async () => {
console.log("\n⚠ Received SIGINT, shutting down...");
await manager.stopAllWorkers();
Deno.exit(0);
});
// SIGTERM只在非Windows系统支持
if (Deno.build.os !== "windows") {
Deno.addSignalListener("SIGTERM", async () => {
console.log("\n⚠ Received SIGTERM, shutting down...");
await manager.stopAllWorkers();
Deno.exit(0);
});
}
// 启动交互模式
await manager.startInteractiveMode();
}