// api-amr/device_simulator.ts
// Repository viewer metadata: 1374 lines, 46 KiB, TypeScript; last change 2025-06-04 19:15:02 +08:00.
// device_simulator.ts - device simulator Worker
import { connect, MqttClient } from "npm:mqtt@5.10.1";
import { ModbusManager } from "./modbus_manager.ts";
import { getDefaultPollInterval, parseRegistersFromString, createModbusPollConfigFromRegisters } from "./device_protocol_config.ts";
import { createModuleLogger, perfMonitor, logMemoryUsage } from "./debug_logger.ts";
/**
 * Identity and connection details for a single device.
 *
 * Every field is carried as a string. `setupModbus` parses `port` and
 * `slaveId` with `parseInt` and treats `registers` as a JSON-encoded list of
 * register definitions.
 */
interface DeviceInfo {
ip: string;
port: string;
slaveId: string;
deviceName: string;
protocolType: string;
brandName: string;
// JSON string describing the registers to poll (validated with JSON.parse in setupModbus).
registers: string;
}
/**
 * Message envelope with a command-style `type` and an optional payload.
 *
 * NOTE(review): presumably sent from the main thread to this worker; the
 * receiving handler is not visible in this section, so confirm there.
 */
interface DeviceWorkerMessage {
type: "init" | "close" | "reconnect" | "action";
data?: any;
}
/**
 * Message envelope this worker posts through `self.postMessage`
 * (see `log` and `sendMessage` in `DeviceSimulator`).
 */
interface DeviceMainMessage {
type: "ready" | "error" | "status" | "log" | "reset";
data?: any;
}
/**
 * Full configuration handed to `DeviceSimulator.initialize`.
 *
 * `vdaInterface` and the `vehicle` fields are concatenated to build every
 * MQTT topic the simulator uses. `modbus` is optional on input and is
 * overwritten by `setupModbus` with values derived from `deviceInfo`.
 */
interface DeviceConfig {
deviceId: string;
deviceInfo: DeviceInfo;
// First segment of every MQTT topic built by the simulator.
vdaInterface: string;
mqtt: {
brokerUrl: string;
// Passed unchanged as the second argument of mqtt `connect`.
options: any;
};
vehicle: {
manufacturer: string;
serialNumber: string;
vdaVersion: string;
};
// Action parameters supplied when the device was created.
// setupModbus falls back to the entry with key "registers" when deviceInfo.registers is empty.
actionParameters?: Array<{
key: string;
value: string;
}>;
modbus?: {
host: string;
port: number;
unitId: number;
// One entry per register group read on each polling cycle.
poll: Array<{
fn: "readHoldingRegisters" | "readInputRegisters" | "readCoils" | "readDiscreteInputs";
start: number;
len: number;
// The last "/"-separated segment is used as the register name when storing results.
mqttKey: string;
// Optional binding label; the register name is used when it is absent.
bind?: string;
}>;
// Passed to setInterval as the polling period (milliseconds).
pollInterval: number;
};
}
/**
 * A pose on a named map, used for node positions and edge trajectories.
 * NOTE(review): units of `x`, `y` and `theta` are not established in this file.
 */
interface Position {
x: number;
y: number;
theta: number;
mapId: string;
mapDescription?: string;
}
/**
 * Entry of `AgvState.nodeStates`. The simulator only ever publishes an empty
 * list of these in the code visible here.
 */
interface NodeState {
nodeId: string;
sequenceId: number;
nodeDescription?: string;
nodePosition?: Position;
released: boolean;
}
/**
 * Entry of `AgvState.edgeStates`. The simulator only ever publishes an empty
 * list of these in the code visible here.
 */
interface EdgeState {
edgeId: string;
sequenceId: number;
edgeDescription?: string;
released: boolean;
trajectory?: Position[];
}
/**
 * Reported pose of the simulated unit plus localization quality fields.
 * `initializeState` fills it with a fixed origin pose on map "map_1".
 */
interface AgvPosition {
x: number;
y: number;
theta: number;
mapId: string;
mapDescription?: string;
positionInitialized: boolean;
localizationScore: number;
deviationRange: number;
}
/**
 * Velocity components reported in the state message; `initializeState` sets
 * all three to zero.
 */
interface Velocity {
vx: number;
vy: number;
omega: number;
}
/**
 * Battery section of the state message. The simulator publishes fixed
 * placeholder values assigned in `initializeState`.
 */
interface BatteryState {
batteryCharge: number;
batteryVoltage: number;
batteryHealth: number;
charging: boolean;
reach: number;
}
/**
 * Entry of `AgvState.errors`, with a severity level and optional key/value
 * references.
 */
interface ErrorState {
errorType: string;
errorLevel: "WARNING" | "FATAL";
errorDescription: string;
errorHint?: string;
errorReferences?: Array<{ referenceKey: string; referenceValue: string }>;
}
/**
 * Safety section of the state message; initialized to `eStop: "NONE"` and
 * `fieldViolation: false`.
 */
interface SafetyState {
eStop: string;
fieldViolation: boolean;
}
/**
 * Progress record for one received action.
 *
 * Instances are appended to `DeviceSimulator.deviceActions` and copied into
 * `AgvState.actionStates` each time the state is published.
 */
interface ActionState {
actionType: string;
actionId: string;
actionDescription?: string;
actionStatus: "WAITING" | "INITIALIZING" | "RUNNING" | "PAUSED" | "FINISHED" | "FAILED";
// Human-readable outcome, set when the action finishes or fails.
resultDescription?: string;
actionParameters?: Array<{ key: string; value: string }>;
blockingType: "NONE" | "SOFT" | "HARD";
}
/**
 * State document serialized by `publishState` onto the `.../state` topic.
 *
 * NOTE(review): the field names appear to follow the VDA 5050 state message
 * (the file's comments mention VDA 5050); confirm against the spec version in use.
 */
interface AgvState {
// Taken from the simulator's shared header counter on every publish.
headerId: number;
// ISO-8601 string produced by Date.prototype.toISOString.
timestamp: string;
version: string;
manufacturer: string;
serialNumber: string;
orderId: string;
orderUpdateId: number;
zoneSetId: string;
lastNodeId: string;
lastNodeSequenceId: number;
driving: boolean;
paused: boolean;
newBaseRequest: boolean;
distanceSinceLastNode: number;
operatingMode: "AUTOMATIC" | "SEMIAUTOMATIC" | "MANUAL" | "SERVICE" | "TEACHIN";
nodeStates: NodeState[];
edgeStates: EdgeState[];
agvPosition: AgvPosition;
velocity: Velocity;
loads: any[];
batteryState: BatteryState;
errors: ErrorState[];
// Holds one item per polled Modbus register (see updateRegisterInformation).
information: any[];
safetyState: SafetyState;
// Replaced with a copy of DeviceSimulator.deviceActions on every publish.
actionStates: ActionState[];
waitingForInteractionZoneRelease: boolean;
forkState?: any;
}
/**
 * Connection document serialized by `publishConnection` onto the
 * `.../connection` topic. `initializeConnection` always starts it as "ONLINE".
 */
interface Connection {
// Taken from the simulator's shared header counter on every publish.
headerId: number;
// ISO-8601 string produced by Date.prototype.toISOString.
timestamp: string;
version: string;
manufacturer: string;
serialNumber: string;
connectionState: "ONLINE" | "OFFLINE" | "CONNECTIONBROKEN";
}
class DeviceSimulator {
// MQTT client; assigned in connectMqtt.
private client!: MqttClient;
// Configuration received by initialize.
private config!: DeviceConfig;
// State document; built by initializeState and mutated on every publish/poll.
private state!: AgvState;
// Connection document; built by initializeConnection.
private connection!: Connection;
// Handles of the periodic publish timers started in startPeriodicReporting.
private stateInterval?: number;
private connectionInterval?: number;
// Counter shared by state and connection messages; post-incremented on each use.
private headerId = 1;
// Set to true on the MQTT "connect" event and to false on "disconnect"; gates publishing.
private isConnected = false;
// Every action received so far; copied into state.actionStates when publishing.
private deviceActions: ActionState[] = [];
// Modbus client wrapper; created in setupModbus.
private modbus!: ModbusManager;
// Handle of the Modbus polling timer; only set when at least one poll group exists.
private modbusPollTimer?: number;
private logger = createModuleLogger("DEVICE_SIMULATOR");
// When false, log() is a no-op (the structured logger above is unaffected).
private enableLog = false;
/**
 * Write a timestamped, device-tagged line to the console and mirror it to the
 * worker's owner as a "log" message. Does nothing while `enableLog` is false.
 *
 * @param message Text to log.
 */
private log(message: string) {
  if (!this.enableLog) {
    return;
  }
  const stamp = new Date().toISOString();
  const line = `[${stamp}] [Device ${this.config.deviceId}] ${message}`;
  console.log(line);
  // Forward the same line to the main process.
  const envelope = { type: "log", data: { message: line } } as DeviceMainMessage;
  self.postMessage(envelope);
}
/**
 * Post a `{ type, data }` envelope to the worker's owner.
 *
 * @param type Message kind (e.g. "ready", "error", "reset").
 * @param data Optional payload forwarded as-is.
 */
private sendMessage(type: string, data?: any) {
  const envelope = { type, data } as DeviceMainMessage;
  self.postMessage(envelope);
}
/**
 * Translate a Modbus function code into a bilingual human-readable label.
 *
 * A `Map` is used instead of a plain object so that inputs such as
 * "constructor" or "toString" cannot resolve to inherited `Object.prototype`
 * members and return a function instead of a string.
 *
 * @param fnCode Function code as a decimal string (e.g. "3").
 * @returns The label, or an "Unknown Function Code" message embedding `fnCode`.
 */
private getFunctionCodeDescription(fnCode: string): string {
  const descriptions = new Map<string, string>([
    ["1", "Read Coils (读线圈)"],
    ["2", "Read Discrete Inputs (读离散输入)"],
    ["3", "Read Holding Registers (读保持寄存器)"],
    ["4", "Read Input Registers (读输入寄存器)"],
    ["5", "Write Single Coil (写单个线圈)"],
    ["6", "Write Single Register (写单个寄存器)"],
    ["15", "Write Multiple Coils (写多个线圈)"],
    ["16", "Write Multiple Registers (写多个寄存器)"]
  ]);
  return descriptions.get(fnCode) ?? `Unknown Function Code (未知功能码 ${fnCode})`;
}
/**
 * Map a Modbus read-function name to its numeric function code.
 *
 * A `Map` is used instead of a plain object so that names colliding with
 * `Object.prototype` members (e.g. "constructor") fall through to the default
 * instead of returning a function.
 *
 * @param modbusFunction One of the read function names used in poll configs.
 * @returns The function code as a string; "3" (read holding registers) when
 *   the name is not recognized.
 */
private getFunctionCodeFromModbusFunction(modbusFunction: string): string {
  const functionMap = new Map<string, string>([
    ["readCoils", "1"],
    ["readDiscreteInputs", "2"],
    ["readHoldingRegisters", "3"],
    ["readInputRegisters", "4"]
  ]);
  // Default to read holding registers.
  return functionMap.get(modbusFunction) ?? "3";
}
/**
 * Store the latest reading of one register in `state.information`.
 *
 * The reading is wrapped in an information item whose `infoReferences` carry
 * the register details. An existing item with the same `infoDescription` is
 * replaced in place; otherwise the item is appended. When the list exceeds
 * 100 items, the oldest ones are dropped.
 *
 * Fix: the pruning step used to sort on `timeOfOccurrence`, a field these
 * items never carry, so the comparator returned NaN and eviction was not by
 * age. The age is now read from the "timestamp" reference (falling back to
 * `timeOfOccurrence` for items written by other code); items with no usable
 * time sort first and are evicted first.
 *
 * @param registerName Name used to build the item's `infoDescription`.
 * @param bind Binding label stored alongside the reading.
 * @param values Raw values returned by the read.
 * @param address Start address of the read.
 * @param count Number of registers/coils requested.
 * @param modbusFunction Name of the Modbus read function used.
 * @param timestamp ISO timestamp of the reading.
 */
private updateRegisterInformation(registerName: string, bind: string, values: number[], address: number, count: number, modbusFunction: string, timestamp: string): void {
  // Build the information item; the register details live in infoReferences.
  const infoType = `CUSTOM_REGISTER`;
  const infoDescription = `REGISTER_${registerName.toUpperCase()}`;
  const informationItem = {
    infoType: infoType,
    infoDescription: infoDescription,
    infoLevel: "INFO",
    infoReferences: [
      { referenceKey: "name", referenceValue: registerName },
      { referenceKey: "bind", referenceValue: bind },
      { referenceKey: "values", referenceValue: JSON.stringify(values) },
      { referenceKey: "address", referenceValue: address.toString() },
      { referenceKey: "count", referenceValue: count.toString() },
      { referenceKey: "function", referenceValue: modbusFunction },
      { referenceKey: "timestamp", referenceValue: timestamp }
    ]
  };

  // Replace the previous reading of the same register, if any.
  const existingIndex = this.state.information.findIndex(info =>
    info.infoDescription === infoDescription
  );
  if (existingIndex >= 0) {
    this.state.information[existingIndex] = informationItem;
    this.logger.trace("🔄 Updated register information", {
      registerName,
      infoType,
      address,
      newValues: values,
      timestamp
    });
  } else {
    this.state.information.push(informationItem);
    this.logger.trace(" Added new register information", {
      registerName,
      infoType,
      address,
      values,
      totalInformation: this.state.information.length
    });
  }

  // Cap the list so it cannot grow without bound.
  const maxInformation = 100;
  if (this.state.information.length > maxInformation) {
    // Resolve an item's age in epoch milliseconds; 0 when no valid time is present.
    const occurredAt = (info: any): number => {
      const refs: any[] = Array.isArray(info?.infoReferences) ? info.infoReferences : [];
      const raw = info?.timeOfOccurrence
        ?? refs.find((ref: any) => ref?.referenceKey === "timestamp")?.referenceValue;
      const ms = raw === undefined || raw === null ? NaN : new Date(raw).getTime();
      return Number.isNaN(ms) ? 0 : ms;
    };
    // Oldest first, then drop the surplus from the front.
    this.state.information.sort((a, b) => occurredAt(a) - occurredAt(b));
    const removedItems = this.state.information.splice(0, this.state.information.length - maxInformation);
    this.logger.debug("🗑️ Removed old information items", {
      removedCount: removedItems.length,
      remainingCount: this.state.information.length
    });
  }
}
/**
 * Bring the simulator up: build the state and connection documents, connect
 * to MQTT, set up Modbus, and start periodic publishing.
 *
 * Posts "ready" on success. On failure posts "reset" when the error text
 * mentions "MQTT" or "timeout", otherwise "error". This method does not
 * rethrow.
 *
 * Changes from the previous version:
 * - The failure path no longer assumes the rejection is an `Error`; calling
 *   `.message.includes` on a non-Error value used to throw inside the catch
 *   block, so no failure message was posted.
 * - State and connection documents are created before any network setup, so
 *   `this.state` exists before the Modbus poll timer is armed.
 *
 * @param config Full device configuration; stored on the instance.
 */
async initialize(config: DeviceConfig) {
  this.config = config;
  this.logger.info("🚀 Initializing device simulator", {
    deviceId: config.deviceId,
    deviceName: config.deviceInfo?.deviceName,
    protocol: config.deviceInfo?.protocolType
  });
  logMemoryUsage("DEVICE_SIMULATOR");
  try {
    await perfMonitor.measureAsync("device_initialization", async () => {
      // Pure in-memory setup first: polling writes into this.state.
      this.initializeState();
      this.initializeConnection();
      await this.connectMqtt();
      await this.setupModbus();
      this.startPeriodicReporting();
    });
    this.logger.info("✅ Device simulator initialized successfully");
    this.log("Device simulator initialized successfully");
    this.sendMessage("ready");
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    this.logger.error("❌ Failed to initialize device simulator", error);
    this.log(`Failed to initialize: ${reason}`);
    // MQTT-looking failures ask for a worker restart; everything else is reported as an error.
    if (reason.includes("MQTT") || reason.includes("timeout")) {
      this.log(`❌ Initialization failed due to MQTT issue, requesting worker reset`);
      this.sendMessage("reset", { reason: "init_failure", error: reason });
    } else {
      this.sendMessage("error", { error: reason });
    }
  }
}
/**
 * Create the MQTT client and wait for its first successful connection.
 *
 * Resolves on the first "connect" event. Rejects, after posting a "reset"
 * request, on a startup error or when no connection is made within 10 seconds.
 *
 * Changes from the previous version:
 * - `setupMqttHandlers` runs only for the first "connect" event. It used to
 *   run on every reconnect, stacking duplicate "message", "disconnect" and
 *   "error" listeners so each instant action was processed several times.
 * - The startup "error" listener is detached once the runtime handlers are
 *   installed, so a runtime error no longer posts two reset requests. It is
 *   only detached after the runtime "error" handler exists, so the emitter
 *   always has an "error" listener.
 *
 * NOTE(review): after a reconnect the topic subscription now relies on the
 * client library's automatic resubscribe behaviour; confirm `resubscribe` is
 * not disabled in `config.mqtt.options`.
 *
 * @returns Promise settled by the first connect, startup error, or timeout.
 */
private async connectMqtt() {
  this.log(`Connecting to MQTT broker: ${this.config.mqtt.brokerUrl}`);
  const client = connect(this.config.mqtt.brokerUrl, this.config.mqtt.options);
  this.client = client;
  return new Promise<void>((resolve, reject) => {
    let handlersInstalled = false;

    const timeout = setTimeout(() => {
      const error = new Error("MQTT connection timeout");
      this.log(`❌ MQTT connection timeout, requesting worker reset`);
      this.sendMessage("reset", { reason: "mqtt_timeout", error: error.message });
      reject(error);
    }, 10000);

    // Active only until the runtime handlers take over.
    const onStartupError = (error: Error) => {
      clearTimeout(timeout);
      this.log(`❌ MQTT connection error: ${error.message}, requesting worker reset`);
      this.sendMessage("reset", { reason: "mqtt_error", error: error.message });
      reject(error);
    };

    client.on("connect", () => {
      clearTimeout(timeout);
      this.isConnected = true;
      this.log("Connected to MQTT broker");
      if (!handlersInstalled) {
        handlersInstalled = true;
        this.setupMqttHandlers();
        client.removeListener("error", onStartupError);
      }
      // No-op on later reconnects; the promise is already settled.
      resolve();
    });
    client.on("error", onStartupError);
  });
}
/**
 * Derive the Modbus configuration from `config.deviceInfo`, connect, and start polling.
 *
 * Steps, in order:
 * 1. Parse host/port/unit id from `deviceInfo` (port defaults to 502, unit id to 1).
 * 2. Look up the poll interval for the device brand.
 * 3. Locate the register definitions: `deviceInfo.registers` first, then the
 *    "registers" entry of `config.actionParameters`.
 * 4. Validate the JSON, parse the registers, and build the poll configuration.
 * 5. Overwrite `config.modbus`, create the `ModbusManager`, and call `init()`.
 * 6. Start the polling timer when at least one poll group exists.
 *
 * Posts an "error" message and rethrows when `ModbusManager.init()` fails.
 *
 * NOTE(review): when the registers JSON is invalid the method returns early,
 * so `this.modbus` is never created and later write actions will fail on it.
 * The `pollConfig = []` assignment just before that `return` has no effect,
 * which suggests falling through with an empty poll list may have been
 * intended; confirm.
 */
private async setupModbus() {
this.logger.info("🔧 Setting up Modbus connection...");
// Build the Modbus connection parameters from the device info.
const host = this.config.deviceInfo.ip;
// parseInt yields NaN for non-numeric text, which `||` turns into the default.
const port = parseInt(this.config.deviceInfo.port) || 502;
const unitId = parseInt(this.config.deviceInfo.slaveId) || 1;
this.logger.debug("📋 Modbus connection parameters", {
host,
port,
unitId,
deviceName: this.config.deviceInfo.deviceName,
brandName: this.config.deviceInfo.brandName,
protocolType: this.config.deviceInfo.protocolType
});
this.log(`Creating Modbus config from device info: ${host}:${port}, unitId: ${unitId}`);
this.log(`Device: ${this.config.deviceInfo.deviceName} (${this.config.deviceInfo.brandName})`);
this.log(`Protocol: ${this.config.deviceInfo.protocolType}`);
// Look up the brand-specific polling interval.
const pollInterval = getDefaultPollInterval(this.config.deviceInfo.brandName);
this.logger.debug("⏱️ Poll interval configuration", {
brandName: this.config.deviceInfo.brandName,
pollInterval: `${pollInterval}ms`
});
this.log(`Using poll interval for ${this.config.deviceInfo.brandName || "通用"}: ${pollInterval}ms`);
let pollConfig: Array<{
fn: "readHoldingRegisters" | "readInputRegisters" | "readCoils" | "readDiscreteInputs";
start: number;
len: number;
mqttKey: string;
bind?: string;
}> = [];
// Check whether a registers configuration is present.
this.logger.trace("🔍 Checking for registers configuration in device info...");
// Prefer deviceInfo.registers.
let registersConfigString: string | undefined;
if (this.config.deviceInfo.registers) {
registersConfigString = this.config.deviceInfo.registers;
this.logger.info("✅ Found registers configuration in deviceInfo.registers");
this.logger.trace("📝 Raw registers config from deviceInfo", { config: registersConfigString });
this.log("✅ Found registers configuration in deviceInfo.registers");
} else {
// Otherwise fall back to the "registers" action parameter.
const registersParam = this.config.actionParameters?.find(param => param.key === "registers");
if (registersParam && registersParam.value) {
registersConfigString = registersParam.value;
this.logger.info("✅ Found registers configuration in action parameters");
this.logger.trace("📝 Raw registers config from actionParameters", { config: registersConfigString });
this.log("✅ Found registers configuration in action parameters");
}
}
if (registersConfigString) {
this.log(`📝 Raw registers config: ${registersConfigString}`);
// Validate the JSON syntax before handing the string to the parser.
try {
const parsedConfig = JSON.parse(registersConfigString);
this.logger.debug("✅ Registers configuration JSON validation passed", {
registersCount: Array.isArray(parsedConfig) ? parsedConfig.length : 'not_array',
structure: parsedConfig
});
this.log("✅ Registers configuration is valid JSON");
} catch (error) {
this.logger.error("❌ Invalid JSON in registers configuration", {
error: (error as Error).message,
rawConfig: registersConfigString
});
this.log(`❌ Invalid JSON in registers configuration: ${(error as Error).message}`);
pollConfig = [];
// NOTE(review): returning here skips ModbusManager creation entirely (see the method header).
return;
}
// Parse the register definitions.
const registers = perfMonitor.measure("register_parsing", () =>
parseRegistersFromString(registersConfigString!)
);
this.logger.info("📊 Register parsing completed", {
totalRegisters: registers.length,
registers: registers.map(reg => ({
name: reg.name,
fnCode: reg.fnCode,
address: reg.regAddress,
count: reg.regCount
}))
});
this.log(`📊 Parsed ${registers.length} register definitions:`);
if (registers.length === 0) {
this.logger.warn("⚠️ No valid register definitions found after parsing");
this.log("⚠️ No valid register definitions found after parsing");
}
registers.forEach((reg, index) => {
this.logger.trace(`Register definition [${index + 1}]`, {
name: reg.name,
fnCode: reg.fnCode,
fnDescription: this.getFunctionCodeDescription(reg.fnCode),
address: reg.regAddress,
count: reg.regCount
});
this.log(` [${index + 1}] ${reg.name}:`);
this.log(` - Function Code: ${reg.fnCode} (${this.getFunctionCodeDescription(reg.fnCode)})`);
this.log(` - Address: ${reg.regAddress}`);
this.log(` - Count: ${reg.regCount}`);
});
// Build the polling configuration from the parsed registers.
pollConfig = perfMonitor.measure("poll_config_creation", () =>
createModbusPollConfigFromRegisters(registers)
);
this.logger.info("🔄 Polling configuration created", {
totalTasks: pollConfig.length,
tasks: pollConfig.map(config => ({
function: config.fn,
startAddress: config.start,
length: config.len,
mqttKey: config.mqttKey
}))
});
this.log(`🔄 Created ${pollConfig.length} polling tasks from register definitions:`);
pollConfig.forEach((config, index) => {
this.log(` [${index + 1}] ${config.fn}:`);
this.log(` - Start Address: ${config.start}`);
this.log(` - Length: ${config.len}`);
this.log(` - MQTT Key: ${config.mqttKey}`);
});
} else {
this.logger.warn("❌ No registers configuration found");
this.log("❌ No registers configuration found");
this.log("💡 Expected registers configuration in deviceInfo.registers or actionParameters");
this.log("📝 Example: [{\"fnCode\":\"6\",\"name\":\"button1\",\"regCount\":\"1\",\"regAddress\":\"1\"}]");
pollConfig = [];
}
// Assemble the Modbus configuration; this overwrites any `modbus` section passed in the config.
this.config.modbus = {
host: host,
port: port,
unitId: unitId,
poll: pollConfig,
pollInterval: pollInterval
};
this.modbus = new ModbusManager(
this.config.modbus.host,
this.config.modbus.port,
this.config.modbus.unitId,
(msg) => this.log(`[Modbus] ${msg}`)
);
try {
await this.modbus.init();
} catch (error) {
const errorMessage = (error as Error).message;
this.log(`❌ Modbus initialization failed: ${errorMessage}`);
// Report initialization error without auto-reset
this.sendMessage("error", { error: `Modbus initialization error: ${errorMessage}` });
throw error;
}
// Start the periodic read task only when there is something to poll.
if (this.config.modbus.poll.length > 0) {
this.modbusPollTimer = setInterval(
// `void` marks the returned promise as intentionally not awaited.
() => { void this.pollModbus(); },
this.config.modbus.pollInterval
);
this.log(`Modbus setup completed - polling ${this.config.modbus.poll.length} register groups every ${this.config.modbus.pollInterval}ms`);
} else {
this.log(`Modbus setup completed - no polling configured, only write operations will be available`);
}
}
/**
 * Run one polling cycle: read every configured register group in order and
 * store each result in `state.information`.
 *
 * A failed read is logged and the cycle moves on, unless the error text looks
 * like a connection failure; then a "reset" request is posted and the cycle
 * stops. Errors outside the per-group handling are logged and, when
 * connection-related, also trigger a "reset" request. Nothing is rethrown.
 */
private async pollModbus() {
  try {
    // Bail out when no Modbus configuration is present.
    const modbusConfig = this.config.modbus;
    if (!modbusConfig) {
      this.logger.error("❌ Modbus configuration not available for polling");
      this.log("❌ Modbus configuration not available for polling");
      return;
    }

    const groups = modbusConfig.poll;
    const groupCount = groups.length;
    const cycleStartedAt = performance.now();
    this.logger.debug("🔄 Starting Modbus polling cycle", {
      totalGroups: groupCount,
      pollInterval: modbusConfig.pollInterval
    });
    this.log(`🔄 Starting Modbus polling cycle - ${groupCount} register groups to read`);

    let ordinal = 0;
    for (const group of groups) {
      ordinal += 1;
      const groupStartedAt = performance.now();
      try {
        this.logger.trace(`📖 Reading register group [${ordinal}/${groupCount}]`, {
          function: group.fn,
          startAddress: group.start,
          length: group.len,
          mqttKey: group.mqttKey
        });
        this.log(`📖 [${ordinal}/${groupCount}] Reading ${group.fn} - Address: ${group.start}, Length: ${group.len}`);

        const values = await perfMonitor.measureAsync(`modbus_read_${group.mqttKey}`, async () =>
          this.modbus.enqueueRead(group.fn, group.start, group.len)
        );
        const elapsed = performance.now() - groupStartedAt;
        this.logger.debug("✅ Modbus read successful", {
          mqttKey: group.mqttKey,
          values,
          duration: `${elapsed.toFixed(2)}ms`,
          valuesCount: Array.isArray(values) ? values.length : 1
        });
        this.log(`✅ Read successful: ${JSON.stringify(values)}`);

        // Record the reading in the information list, keyed by the last topic segment.
        const keySegments = group.mqttKey.split('/');
        const registerName = keySegments[keySegments.length - 1] || `register_${group.start}`;
        const bind = group.bind || registerName;
        const registerValues = Array.isArray(values) ? values : [values];
        const readAt = new Date().toISOString();
        this.updateRegisterInformation(registerName, bind, registerValues, group.start, group.len, group.fn, readAt);
        this.logger.trace("📦 Register data stored in information", {
          registerName,
          address: group.start,
          count: group.len,
          values: registerValues,
          function: group.fn,
          totalInformation: this.state.information.length
        });
        this.log(`📦 Stored register data in information: ${registerName} = ${JSON.stringify(registerValues)}`);
      } catch (error) {
        const elapsed = performance.now() - groupStartedAt;
        const errorMessage = (error as Error).message;
        this.logger.error("❌ Modbus read error", {
          mqttKey: group.mqttKey,
          function: group.fn,
          startAddress: group.start,
          length: group.len,
          error: errorMessage,
          duration: `${elapsed.toFixed(2)}ms`
        });
        this.log(`❌ Modbus read error for ${group.mqttKey}: ${errorMessage}`);
        this.log(`🔧 Failed operation: ${group.fn} at address ${group.start}, length ${group.len}`);

        // A dead connection makes the rest of the cycle pointless: ask for a reset and stop.
        if (this.isConnectionError(errorMessage)) {
          this.log(`❌ Modbus connection failure detected, requesting worker reset`);
          this.sendMessage("reset", {
            reason: "modbus_connection_failure",
            error: `Modbus connection error: ${errorMessage}`
          });
          return;
        }
      }
    }

    const cycleElapsed = performance.now() - cycleStartedAt;
    this.logger.debug("🏁 Modbus polling cycle completed", {
      totalDuration: `${cycleElapsed.toFixed(2)}ms`,
      averagePerGroup: `${(cycleElapsed / groupCount).toFixed(2)}ms`,
      groupsProcessed: groupCount
    });
    this.log(`🏁 Modbus polling cycle completed`);

    // Sample memory usage on roughly one cycle in ten.
    if (Math.random() < 0.1) {
      logMemoryUsage("DEVICE_SIMULATOR");
    }
  } catch (error) {
    // Anything that escaped the per-group handling above.
    const errorMessage = (error as Error).message;
    this.logger.error("❌ Unexpected error in Modbus polling cycle", {
      error: errorMessage,
      stack: (error as Error).stack
    });
    this.log(`❌ Unexpected error in Modbus polling cycle: ${errorMessage}`);
    if (this.isConnectionError(errorMessage)) {
      this.log(`❌ Polling cycle connection failure detected, requesting worker reset`);
      this.sendMessage("reset", {
        reason: "modbus_polling_failure",
        error: `Modbus polling error: ${errorMessage}`
      });
    }
  }
}
/**
 * Decide whether an error message describes a connection-level failure.
 *
 * The check is a case-insensitive substring match against a fixed list of
 * phrases and errno-style codes.
 *
 * @param errorMessage Text of the error to classify.
 * @returns `true` when any known marker occurs in the message.
 */
private isConnectionError(errorMessage: string): boolean {
  const markers = [
    "no connection",
    "connection lost",
    "connection closed",
    "connection refused",
    "timeout",
    "timed out",
    "ECONNRESET",
    "ENOTCONN",
    "ECONNREFUSED",
    "ETIMEDOUT",
    "EPIPE",
    "ENETUNREACH",
    "EHOSTUNREACH"
  ];
  const haystack = errorMessage.toLowerCase();
  for (const marker of markers) {
    if (haystack.includes(marker.toLowerCase())) {
      return true;
    }
  }
  return false;
}
/**
 * Subscribe to this device's instantActions topic and attach the runtime
 * MQTT listeners.
 *
 * - "message": decodes the payload as JSON and routes topics ending in
 *   "/instantActions" to `handleInstantActions`; parse failures are logged.
 * - "disconnect": clears `isConnected` and posts a "reset" request.
 * - "error": posts a "reset" request.
 */
private setupMqttHandlers() {
  const { vdaInterface, vehicle } = this.config;
  const instantActionsTopic = `${vdaInterface}/${vehicle.vdaVersion}/${vehicle.manufacturer}/${vehicle.serialNumber}/instantActions`;

  // Subscribe to the instantActions topic.
  this.client.subscribe(instantActionsTopic, (err: Error | null) => {
    const outcome = err
      ? `Failed to subscribe to ${instantActionsTopic}: ${err.message}`
      : `Subscribed to ${instantActionsTopic}`;
    this.log(outcome);
  });

  this.client.on("message", (topic: string, message: Uint8Array) => {
    try {
      const text = new TextDecoder().decode(message);
      const payload = JSON.parse(text);
      this.log(`📥 Received message on ${topic}`);
      if (!topic.endsWith("/instantActions")) {
        return;
      }
      this.handleInstantActions(payload);
    } catch (error) {
      this.log(`Error parsing message: ${(error as Error).message}`);
    }
  });

  this.client.on("disconnect", () => {
    this.isConnected = false;
    this.log("❌ Disconnected from MQTT broker, requesting worker reset");
    this.sendMessage("reset", { reason: "mqtt_disconnect", error: "MQTT broker disconnected" });
  });

  this.client.on("error", (error: Error) => {
    const detail = error.message;
    this.log(`❌ MQTT runtime error: ${detail}, requesting worker reset`);
    this.sendMessage("reset", { reason: "mqtt_runtime_error", error: detail });
  });
}
/**
 * Build the initial state document from the vehicle configuration.
 *
 * Identity fields come from `config.vehicle`; all other values are fixed
 * placeholders (idle, automatic mode, origin pose, empty lists). Consumes one
 * value of the shared header counter.
 */
private initializeState() {
  const { vdaVersion, manufacturer, serialNumber } = this.config.vehicle;
  const createdAt = new Date().toISOString();

  const agvPosition: AgvPosition = {
    x: 0,
    y: 0,
    theta: 0,
    mapId: "map_1",
    mapDescription: "Device Map",
    positionInitialized: true,
    localizationScore: 1.0,
    deviationRange: 0.1,
  };
  const velocity: Velocity = { vx: 0, vy: 0, omega: 0 };
  const batteryState: BatteryState = {
    batteryCharge: 85.5,
    batteryVoltage: 24.2,
    batteryHealth: 95.0,
    charging: false,
    reach: 8500,
  };
  const safetyState: SafetyState = { eStop: "NONE", fieldViolation: false };

  this.state = {
    headerId: this.headerId++,
    timestamp: createdAt,
    version: vdaVersion,
    manufacturer,
    serialNumber,
    orderId: "order_0",
    orderUpdateId: 0,
    zoneSetId: "zone_0",
    lastNodeId: "node_0",
    lastNodeSequenceId: 0,
    driving: false,
    paused: false,
    newBaseRequest: false,
    distanceSinceLastNode: 0,
    operatingMode: "AUTOMATIC",
    nodeStates: [],
    edgeStates: [],
    agvPosition,
    velocity,
    loads: [],
    batteryState,
    errors: [],
    information: [],
    safetyState,
    actionStates: [],
    waitingForInteractionZoneRelease: false,
  };
}
/**
 * Build the initial connection document, marked "ONLINE", from the vehicle
 * configuration. Consumes one value of the shared header counter.
 */
private initializeConnection() {
  const { vdaVersion, manufacturer, serialNumber } = this.config.vehicle;
  const createdAt = new Date().toISOString();
  const document: Connection = {
    headerId: this.headerId++,
    timestamp: createdAt,
    version: vdaVersion,
    manufacturer,
    serialNumber,
    connectionState: "ONLINE",
  };
  this.connection = document;
}
/**
 * Start the recurring publishers (state every 2 s, connection every 1 s) and
 * publish both documents once immediately.
 */
private startPeriodicReporting() {
  const statePeriodMs = 2000;
  const connectionPeriodMs = 1000;

  // Recurring state report.
  this.stateInterval = setInterval(() => this.publishState(), statePeriodMs);
  // Recurring connection report.
  this.connectionInterval = setInterval(() => this.publishConnection(), connectionPeriodMs);

  // Do not wait for the first tick.
  this.publishState();
  this.publishConnection();
}
/**
 * Refresh the header id, timestamp and action list of the state document and
 * publish it as pretty-printed JSON on the `.../state` topic.
 * Skipped while the MQTT client is not connected; publish errors are logged.
 */
private publishState() {
  if (!this.isConnected) {
    return;
  }
  const snapshot = this.state;
  snapshot.headerId = this.headerId++;
  snapshot.timestamp = new Date().toISOString();
  // Publish a copy so later pushes to deviceActions do not alias the state list.
  snapshot.actionStates = [...this.deviceActions];

  const { vdaInterface, vehicle } = this.config;
  const topic = `${vdaInterface}/${vehicle.vdaVersion}/${vehicle.manufacturer}/${vehicle.serialNumber}/state`;
  const body = JSON.stringify(snapshot, null, 2);
  this.client.publish(topic, body, (err?: Error) => {
    if (!err) {
      return;
    }
    this.log(`Failed to publish state: ${err.message}`);
  });
}
/**
 * Refresh the header id and timestamp of the connection document and publish
 * it as pretty-printed JSON on the `.../connection` topic.
 * Skipped while the MQTT client is not connected; publish errors are logged.
 */
private publishConnection() {
  if (!this.isConnected) {
    return;
  }
  const snapshot = this.connection;
  snapshot.headerId = this.headerId++;
  snapshot.timestamp = new Date().toISOString();

  const { vdaInterface, vehicle } = this.config;
  const topic = `${vdaInterface}/${vehicle.vdaVersion}/${vehicle.manufacturer}/${vehicle.serialNumber}/connection`;
  const body = JSON.stringify(snapshot, null, 2);
  this.client.publish(topic, body, (err?: Error) => {
    if (!err) {
      return;
    }
    this.log(`Failed to publish connection: ${err.message}`);
  });
}
/**
 * Dispatch every action contained in an instantActions message.
 *
 * The action list is read from `message.instantActions` or, failing that,
 * `message.actions`. "deviceWrite" actions go to `handleModbusWrite`; all
 * others go to `processDeviceAction`.
 *
 * Changes from the previous version:
 * - The promise returned by `handleModbusWrite` is no longer left floating;
 *   a rejection is logged instead of becoming an unhandled rejection.
 * - Each action is handled in its own try/catch, so one malformed entry no
 *   longer stops the remaining actions in the batch.
 * - A null/undefined message is treated as an empty batch.
 *
 * @param message Decoded JSON payload received on the instantActions topic.
 */
private handleInstantActions(message: any) {
  this.log(`Processing instantActions message`);
  // Accept both the standard `instantActions` field and the `actions` field used in practice.
  const actions = message?.instantActions || message?.actions || [];
  if (!Array.isArray(actions)) {
    this.log("❌ Invalid actions format - not an array");
    return;
  }
  const describe = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  for (const action of actions) {
    try {
      if (action?.actionType === "deviceWrite") {
        // Real Modbus write: runs asynchronously; failures are reported here.
        this.handleModbusWrite(action).catch((error: unknown) => {
          this.log(`❌ Error processing instantActions: ${describe(error)}`);
        });
      } else {
        this.processDeviceAction(action);
      }
    } catch (error) {
      this.log(`❌ Error processing instantActions: ${describe(error)}`);
    }
  }
}
/**
 * Execute a "deviceWrite" action: write each register listed in the action's
 * "registers" parameter over Modbus and publish a summary on `.../writeResult`.
 *
 * The action is tracked in `deviceActions` as RUNNING, then set to FINISHED
 * (all or some writes succeeded) or FAILED (none succeeded, or the batch was
 * aborted). Function code "6" writes a single register; "16" (or "10") writes
 * a one-element register array.
 *
 * Changes from the previous version:
 * - A connection failure mid-batch now marks the action FAILED before the
 *   worker reset is requested; it used to stay RUNNING indefinitely.
 * - `fnCode` is normalized with `String(...)`, so a numeric 6/16 in the JSON
 *   is accepted instead of being rejected as unsupported.
 * - A non-numeric address or value is rejected for that register instead of
 *   passing NaN to the Modbus layer.
 * - Error text is extracted without assuming an `Error` instance, and the
 *   per-register failure path no longer dereferences a null register entry.
 *
 * NOTE(review): a partially successful batch is still reported as FINISHED;
 * confirm that is what downstream consumers expect.
 *
 * @param action Instant action whose `actionParameters` carry the device
 *   details and a JSON-encoded "registers" array.
 */
private async handleModbusWrite(action: any) {
  this.log(`🔧 Processing Modbus write action: ${action.actionId}`);
  console.log("收到 Modbus 写入动作消息", action);
  const describe = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);
  const resultTopic = `${this.config.vdaInterface}/${this.config.vehicle.vdaVersion}/${this.config.vehicle.manufacturer}/${this.config.vehicle.serialNumber}/writeResult`;

  try {
    // Extract the device details carried in the action parameters.
    const getParam = (key: string) => action.actionParameters?.find((p: any) => p.key === key)?.value;
    const protocolType = getParam("protocolType");
    const brandName = getParam("brandName");
    const ip = getParam("ip");
    const port = getParam("port");
    const deviceName = getParam("deviceName");
    const slaveId = getParam("slaveId");
    const registersParam = getParam("registers");
    this.log(`📋 设备信息: ${deviceName} (${brandName}) - ${ip}:${port}, SlaveID: ${slaveId}, 协议: ${protocolType}`);
    if (!registersParam) {
      this.log(`❌ Missing registers parameter in Modbus write action`);
      return;
    }

    // Parse the register list.
    let registers: any[];
    try {
      registers = JSON.parse(registersParam);
      if (!Array.isArray(registers)) {
        throw new Error("Registers parameter is not an array");
      }
    } catch (error) {
      this.log(`❌ Failed to parse registers parameter: ${describe(error)}`);
      return;
    }
    this.log(`📝 解析到 ${registers.length} 个寄存器写入操作:`);
    registers.forEach((reg, index) => {
      this.log(` [${index + 1}] ${reg.name}: 功能码${reg.fnCode}, 地址${reg.regAddress}, 值${reg.regValue}`);
    });

    // Track the action so it shows up in published state.
    const actionState: ActionState = {
      actionType: action.actionType,
      actionId: action.actionId,
      actionDescription: `Modbus write ${registers.length} registers to ${deviceName}`,
      actionStatus: "RUNNING",
      actionParameters: action.actionParameters || [],
      blockingType: action.blockingType || "NONE",
    };
    this.deviceActions.push(actionState);

    // Perform the writes one after another.
    const writeResults: any[] = [];
    let successCount = 0;
    let failureCount = 0;
    for (const [index, register] of registers.entries()) {
      try {
        const address = Number(register.regAddress);
        const value = Number(register.regValue);
        // JSON may carry the code as a number; compare as text.
        const fnCode = String(register.fnCode);
        const registerName = register.name;
        if (!Number.isFinite(address) || !Number.isFinite(value)) {
          throw new Error(`Invalid register address or value: address=${register.regAddress}, value=${register.regValue}`);
        }
        this.log(`🔧 [${index + 1}/${registers.length}] 写入寄存器 ${registerName}: 地址${address}, 值${value}, 功能码${fnCode}`);

        // Pick the write function and payload shape from the function code.
        let functionType: "writeSingleRegister" | "writeMultipleRegisters";
        let payload: number | number[];
        if (fnCode === "6") {
          functionType = "writeSingleRegister";
          payload = value;
        } else if (fnCode === "16" || fnCode === "10") {
          functionType = "writeMultipleRegisters";
          payload = [value];
        } else {
          throw new Error(`Unsupported function code: ${fnCode}`);
        }
        this.log(`📡 执行 ${functionType}(地址${address}, 数据${JSON.stringify(payload)})`);

        await this.modbus.enqueueWrite(functionType, address, payload);
        writeResults.push({
          registerName,
          address,
          value,
          fnCode,
          status: "success"
        });
        successCount++;
        this.log(`✅ 寄存器 ${registerName} 写入成功`);
      } catch (error) {
        const errorMsg = describe(error);
        this.log(`❌ 寄存器 ${register?.name} 写入失败: ${errorMsg}`);

        // A dead connection aborts the batch: close out the action, then ask for a reset.
        if (this.isConnectionError(errorMsg)) {
          actionState.actionStatus = "FAILED";
          actionState.resultDescription = `Modbus write connection error: ${errorMsg}`;
          this.log(`❌ Modbus write connection failure detected, requesting worker reset`);
          this.sendMessage("reset", {
            reason: "modbus_write_connection_failure",
            error: `Modbus write connection error: ${errorMsg}`
          });
          return;
        }
        writeResults.push({
          registerName: register?.name,
          address: register?.regAddress,
          value: register?.regValue,
          fnCode: register?.fnCode,
          status: "error",
          error: errorMsg
        });
        failureCount++;
      }
    }

    // Summarize the outcome on the tracked action.
    if (failureCount === 0) {
      actionState.actionStatus = "FINISHED";
      actionState.resultDescription = `All ${successCount} registers written successfully`;
      this.log(`✅ 所有 ${successCount} 个寄存器写入完成`);
    } else if (successCount === 0) {
      actionState.actionStatus = "FAILED";
      actionState.resultDescription = `All ${failureCount} registers failed to write`;
      this.log(`❌ 所有 ${failureCount} 个寄存器写入失败`);
    } else {
      actionState.actionStatus = "FINISHED";
      actionState.resultDescription = `Partial success: ${successCount} succeeded, ${failureCount} failed`;
      this.log(`⚠️ 部分成功: ${successCount} 个成功, ${failureCount} 个失败`);
    }

    // Publish the per-register results.
    const resultPayload = JSON.stringify({
      timestamp: new Date().toISOString(),
      deviceId: this.config.deviceId,
      actionId: action.actionId,
      deviceInfo: {
        deviceName,
        brandName,
        ip,
        port,
        slaveId,
        protocolType
      },
      writeResults,
      summary: {
        total: registers.length,
        success: successCount,
        failure: failureCount
      },
      status: failureCount === 0 ? "success" : (successCount === 0 ? "error" : "partial")
    });
    this.client.publish(resultTopic, resultPayload);
  } catch (error) {
    const reason = describe(error);
    this.log(`❌ Modbus write action failed: ${reason}`);

    // Mark the tracked action (if it was already registered) as failed.
    const actionState = this.deviceActions.find(a => a.actionId === action.actionId);
    if (actionState) {
      actionState.actionStatus = "FAILED";
      actionState.resultDescription = `Modbus write action failed: ${reason}`;
    }

    // Publish the failure.
    const resultPayload = JSON.stringify({
      timestamp: new Date().toISOString(),
      deviceId: this.config.deviceId,
      actionId: action.actionId,
      status: "error",
      error: reason
    });
    this.client.publish(resultTopic, resultPayload);
  }
}
/**
 * Register an incoming action as RUNNING in `deviceActions` and hand it to
 * the simulation dispatcher.
 *
 * @param action Raw action object; missing description, parameters and
 *   blocking type are replaced with defaults.
 */
private processDeviceAction(action: any) {
  const { actionType, actionId } = action;
  this.log(`🔧 Processing device action: ${actionType}`);

  // Tracking record for the action.
  const tracked: ActionState = {
    actionType,
    actionId,
    actionDescription: action.actionDescription || `Device action: ${actionType}`,
    actionStatus: "RUNNING",
    actionParameters: action.actionParameters || [],
    blockingType: action.blockingType || "NONE",
  };

  // Record it, then simulate the operation.
  this.deviceActions.push(tracked);
  this.simulateDeviceOperation(tracked);
}
/**
 * Route a tracked action to the simulation routine matching its type.
 * Unknown types are marked FAILED with an explanatory result description.
 *
 * @param actionState Tracked action to simulate; mutated by the chosen routine.
 */
private simulateDeviceOperation(actionState: ActionState) {
  const kind = actionState.actionType;
  this.log(`🎯 Simulating device operation: ${kind}`);

  // Action type -> simulation routine.
  const routines = new Map<string, (state: ActionState) => void>([
    ["deviceSetup", (state) => this.simulateDeviceSetup(state)],
    ["deviceWrite", (state) => this.simulateDeviceWrite(state)],
    ["deviceRead", (state) => this.simulateDeviceRead(state)],
    ["deviceStop", (state) => this.simulateDeviceStop(state)],
    ["deviceDelete", (state) => this.simulateDeviceDelete(state)],
  ]);

  const routine = routines.get(kind);
  if (routine) {
    routine(actionState);
    return;
  }
  this.log(`⚠️ Unknown device action type: ${kind}`);
  actionState.actionStatus = "FAILED";
  actionState.resultDescription = `Unknown action type: ${kind}`;
}
/**
 * Simulate a device setup: log the parameters, then mark the action FINISHED
 * after 2 seconds.
 *
 * @param actionState Tracked action to complete.
 */
private simulateDeviceSetup(actionState: ActionState) {
  this.log(`📱 Setting up device with parameters:`);
  actionState.actionParameters?.forEach(({ key, value }) => {
    this.log(` ${key}: ${value}`);
  });

  // Pretend the setup takes a while.
  const finish = () => {
    actionState.actionStatus = "FINISHED";
    actionState.resultDescription = "Device setup completed successfully";
    this.log(`✅ Device setup completed for action ${actionState.actionId}`);
  };
  setTimeout(finish, 2000);
}
/**
 * Simulate a register write: mark the action FINISHED after 1 second.
 *
 * @param actionState Tracked action to complete.
 */
private simulateDeviceWrite(actionState: ActionState) {
  this.log(`✍️ Writing to device registers`);

  // Pretend the write takes a while.
  const finish = () => {
    actionState.actionStatus = "FINISHED";
    actionState.resultDescription = "Device write operation completed";
    this.log(`✅ Device write completed for action ${actionState.actionId}`);
  };
  setTimeout(finish, 1000);
}
/**
 * Simulate a register read: mark the action FINISHED after 500 ms.
 *
 * @param actionState Tracked action to complete.
 */
private simulateDeviceRead(actionState: ActionState) {
  this.log(`📖 Reading from device registers`);

  // Pretend the read takes a while.
  const finish = () => {
    actionState.actionStatus = "FINISHED";
    actionState.resultDescription = "Device read operation completed";
    this.log(`✅ Device read completed for action ${actionState.actionId}`);
  };
  setTimeout(finish, 500);
}
/**
 * Simulate a device stop: log the parameters (when present), then mark the
 * action FINISHED after 1 second.
 *
 * @param actionState Tracked action to complete.
 */
private simulateDeviceStop(actionState: ActionState) {
  this.log(`🛑 Processing device stop operation`);
  const params = actionState.actionParameters;
  if (params) {
    this.log(`📱 Stopping device with parameters:`);
    params.forEach(({ key, value }) => {
      this.log(` ${key}: ${value}`);
    });
  }

  // Only the device-level acknowledgement is simulated here; per the original
  // author's note, the actual stop is carried out by the main-process manager.
  const finish = () => {
    actionState.actionStatus = "FINISHED";
    actionState.resultDescription = "Device stop operation completed successfully";
    this.log(`✅ Device stop completed for action ${actionState.actionId}`);
  };
  setTimeout(finish, 1000);
}
/**
 * Simulates a device delete action and then shuts this simulator down.
 *
 * Sequence: log parameters, wait 1 s, mark the action FINISHED, publish one
 * last state, wait a further 500 ms, then call `close()`.
 *
 * @param actionState Action record mutated in place when the first timer fires.
 */
private simulateDeviceDelete(actionState: ActionState) {
  this.log(`🗑️ Processing device delete operation`);

  const deleteParams = actionState.actionParameters;
  if (deleteParams) {
    this.log(`📱 Deleting device with parameters:`);
    for (const { key, value } of deleteParams) {
      this.log(` ${key}: ${value}`);
    }
  }

  // Second stage: actively close the device simulator worker.
  const shutdownWorker = async () => {
    this.log(`🔄 Device delete action completed, initiating worker shutdown...`);
    await this.close();
  };

  // First stage: simulated delete latency, then a final state publication.
  const finishDelete = async () => {
    actionState.actionStatus = "FINISHED";
    actionState.resultDescription = "Device delete operation completed successfully";
    this.log(`✅ Device delete completed for action ${actionState.actionId}`);

    // Send one last state update that carries the finished delete action.
    this.publishState();

    // Short grace period so the state message can go out before shutdown.
    setTimeout(shutdownWorker, 500);
  };
  setTimeout(finishDelete, 1000);
}
/**
 * Entry point for an action delivered from outside the simulator.
 *
 * Logs the requested action type and passes the message on to
 * `processDeviceAction`; the call is not awaited.
 *
 * @param actionMessage Action payload; only `actionType` is read here.
 */
async handleAction(actionMessage: any) {
  const requestedType = actionMessage.actionType;
  this.log(`Handling external action: ${requestedType}`);
  this.processDeviceAction(actionMessage);
}
/**
 * Drops the current MQTT client (if any) and connects again.
 *
 * On success a `status: "reconnected"` message is sent; on failure the
 * error is logged and reported through `sendMessage("error", …)` instead of
 * being rethrown.
 */
async reconnect() {
  this.log("Reconnecting to MQTT broker...");
  try {
    if (this.client) {
      await this.client.endAsync();
    }
    await this.connectMqtt();
    this.log("Reconnected successfully");
    this.sendMessage("status", { status: "reconnected" });
  } catch (error: unknown) {
    // A thrown value is not guaranteed to be an Error; narrow it so the
    // report never degrades to the literal text "undefined".
    const reason = error instanceof Error ? error.message : String(error);
    this.log(`Reconnection failed: ${reason}`);
    this.sendMessage("error", { error: reason });
  }
}
/**
 * Shuts the simulator down: stops the periodic timers, closes the Modbus
 * connection and the MQTT client, then sends a `status: "closed"` message.
 *
 * Errors while closing Modbus or MQTT are logged and do not abort the rest
 * of the shutdown.
 */
async close() {
  this.log("Closing device simulator...");

  // A thrown value is not guaranteed to be an Error; narrow before reading
  // `.message` so the log never shows "undefined".
  const describe = (thrown: unknown): string =>
    thrown instanceof Error ? thrown.message : String(thrown);

  // Stop periodic timers.
  if (this.stateInterval) {
    clearInterval(this.stateInterval);
  }
  if (this.connectionInterval) {
    clearInterval(this.connectionInterval);
  }

  // Stop Modbus polling and close its connection.
  if (this.modbusPollTimer) {
    clearInterval(this.modbusPollTimer);
  }
  if (this.modbus) {
    try {
      await this.modbus.close();
      this.log("Modbus connection closed");
    } catch (error: unknown) {
      this.log(`Error closing Modbus connection: ${describe(error)}`);
    }
  }

  // Close the MQTT connection.
  if (this.client) {
    try {
      await this.client.endAsync();
      this.log("MQTT connection closed");
    } catch (error: unknown) {
      this.log(`Error closing MQTT connection: ${describe(error)}`);
    }
  }

  this.sendMessage("status", { status: "closed" });
}
}
// Worker message handling
const simulator = new DeviceSimulator();

// Lower-cased substrings that mark an error message as connection-related.
const WORKER_CONNECTION_ERROR_KEYWORDS: readonly string[] = [
  "no connection", "connection lost", "connection closed", "connection refused",
  "timeout", "timed out", "econnreset", "enotconn", "econnrefused",
  "etimedout", "epipe", "enetunreach", "ehostunreach",
];

/**
 * Reports whether `message` contains any connection-failure keyword
 * (case-insensitive substring match).
 */
function isWorkerConnectionError(message: string): boolean {
  const haystack = message.toLowerCase();
  return WORKER_CONNECTION_ERROR_KEYWORDS.some((keyword) => haystack.includes(keyword));
}

/**
 * Returns the string `message` property of a thrown value, or `""` when the
 * value is not an object or carries no string message.
 */
function extractWorkerErrorMessage(thrown: unknown): string {
  if (typeof thrown === "object" && thrown !== null && "message" in thrown) {
    const { message } = thrown;
    return typeof message === "string" ? message : "";
  }
  return "";
}

/**
 * Posts a failure to the worker's owner: a `"reset"` request when the
 * message looks connection-related, otherwise a plain `"error"`.
 *
 * @param errorMessage Human-readable failure text.
 * @param origin.label  Handler name used in the console log line.
 * @param origin.reason Machine-readable reason placed in the reset payload.
 * @param origin.prefix Text prepended to `errorMessage` in the reset payload.
 */
function reportWorkerFailure(
  errorMessage: string,
  origin: { label: string; reason: string; prefix: string },
): void {
  if (isWorkerConnectionError(errorMessage)) {
    console.log(`❌ ${origin.label} detected connection failure, requesting worker reset`);
    const resetRequest: DeviceMainMessage = {
      type: "reset",
      data: {
        reason: origin.reason,
        error: `${origin.prefix}: ${errorMessage}`,
      },
    };
    self.postMessage(resetRequest);
    return;
  }

  const errorReport: DeviceMainMessage = {
    type: "error",
    data: { error: errorMessage },
  };
  self.postMessage(errorReport);
}

// Global error handler
self.addEventListener('error', (event) => {
  console.error('❌ Uncaught error in device simulator worker:', event.error);
  // Prefer the thrown object's own message, then the event's message.
  const errorMessage = String(
    extractWorkerErrorMessage(event.error) || event.message || 'Unknown error',
  );
  reportWorkerFailure(errorMessage, {
    label: "Global error handler",
    reason: "uncaught_connection_error",
    prefix: "Uncaught connection error",
  });
});

// Global promise-rejection handler
self.addEventListener('unhandledrejection', (event) => {
  console.error('❌ Unhandled promise rejection in device simulator worker:', event.reason);
  const rejection: unknown = event.reason;
  // A rejection reason may be any value; fall back to its string form.
  const errorMessage =
    extractWorkerErrorMessage(rejection) ||
    (rejection ? String(rejection) : '') ||
    'Unknown promise rejection';
  reportWorkerFailure(errorMessage, {
    label: "Global promise rejection handler",
    reason: "uncaught_promise_rejection",
    prefix: "Uncaught promise rejection",
  });
  // Suppress the default error handling.
  event.preventDefault();
});

// Dispatches commands received from the worker's owner to the simulator.
self.onmessage = async (event: MessageEvent<DeviceWorkerMessage>) => {
  const { type, data } = event.data;
  try {
    switch (type) {
      case "init":
        await simulator.initialize(data);
        break;
      case "action":
        await simulator.handleAction(data);
        break;
      case "reconnect":
        await simulator.reconnect();
        break;
      case "close":
        await simulator.close();
        break;
      default:
        console.warn(`Unknown message type: ${type}`);
    }
  } catch (error: unknown) {
    console.error(`Error handling message ${type}:`, error);
    // Narrow the thrown value: a non-Error throw must not crash this handler
    // while it is trying to report the original failure.
    const errorMessage = extractWorkerErrorMessage(error) || String(error);
    reportWorkerFailure(errorMessage, {
      label: "Message handler",
      reason: "message_handler_connection_error",
      prefix: "Message handler connection error",
    });
  }
};