api-amr/master_manager.ts

309 lines
11 KiB
TypeScript
Raw Permalink Normal View History

2025-06-04 19:15:02 +08:00
// master_manager.ts
// Module to setup and handle the Master Controller (VDA 5050) Worker
import { initAgvWorker, setupAgvWorker } from './agv_manager.ts';
/**
* Sets up the Master Controller Worker, wiring message handlers and returning the instance.
* @param kv Deno KV instance for storing/retrieving AGV state
* @param webWorker Web Worker for UI updates
* @param masterWorker The master VDA 5050 Worker to forward AGV events to
* @param agvWorker The AGV Worker instance
* @returns The initialized Master Controller Worker
*/
export function setupMasterWorker(
kv: Deno.Kv,
webWorker: Worker,
masterWorker: Worker,
agvWorker: Worker,
config: any,
mappings: any,
instanceId: string
): Worker {
console.log("VDA 5050 Master Controller Worker 已启动", instanceId);
masterWorker.onmessage = async (event: MessageEvent) => {
const message = event.data;
try {
switch (message.type) {
case "started":
// 同步通知 Master Worker 初始化完成
masterWorker.postMessage({ type: "started" });
break;
case "factsheet": {
// console.log("收到 master Worker factsheet 消息", message);
const { agvId, factsheet, timestamp } = message.data;
const deviceKey = `device-factsheet:${agvId.manufacturer}/${agvId.serialNumber}`;
const res = await kv.get([deviceKey]);
const newData = { ...(res.value || {}), agvId, factsheet, timestamp };
await kv.set([deviceKey], newData);
agvWorker.postMessage({ type: "factsheetResponse", data: { agvId, factsheet, timestamp } });
break
}
case "connectionState": {
const { agvId, state, timestamp } = message.data;
const deviceKey = `device-online:${agvId.manufacturer}/${agvId.serialNumber}`;
const res = await kv.get([deviceKey]);
const newData = { ...(res.value || {}), agvId, state, lastSeen: timestamp };
let needsUpdate = false;
if (!res.value) {
needsUpdate = true;
} else {
const existing = res.value as any;
if (existing.state !== state || existing.lastSeen !== timestamp ||
JSON.stringify(existing.agvId) !== JSON.stringify(agvId)) {
needsUpdate = true;
}
}
if (needsUpdate) {
// console.log("1----------------更新设备状态---------------<",deviceKey, newData);
await kv.set([deviceKey], newData);
}
masterWorker.postMessage({ type: "connectionState", data: { agvId, state, timestamp } });
break;
}
case "stateUpdate": {
const { agvId, state, timestamp } = message.data;
const deviceKey = `device:${agvId.manufacturer}/${agvId.serialNumber}`;
const res = await kv.get([deviceKey]);
const newData = { ...(res.value || {}), agvId, state, lastSeen: timestamp };
let needsUpdate = false;
if (!res.value) {
needsUpdate = true;
} else {
const existing = res.value as any;
if (existing.lastSeen !== timestamp ||
JSON.stringify(existing.state) !== JSON.stringify(state)) {
needsUpdate = true;
}
}
if (needsUpdate) {
// console.log("2----------------更新设备状态---------------<",deviceKey, newData);
await kv.set([deviceKey], newData);
}
masterWorker.postMessage({ type: "stateUpdate", data: { agvId, state, timestamp } });
webWorker.postMessage({ type: "positionUpdate", data: {
agvId: { manufacturer: agvId.manufacturer, serialNumber: agvId.serialNumber },
position: { x: state.agvPosition.x, y: state.agvPosition.y, theta: state.agvPosition.theta }
}});
break;
}
case "deviceDiscovered": {
// console.log("收到 deviceDiscovered 消息", message);
const { agvId, timestamp, isOnline } = message.data;
const deviceKey = `device-discovered:${agvId.manufacturer}/${agvId.serialNumber}`;
const res = await kv.get([deviceKey]);
const newData = { agvId, lastSeen: timestamp, isOnline };
let needsUpdate = false;
if (!res.value) {
needsUpdate = true;
} else {
const existing = res.value as any;
if (existing.lastSeen !== timestamp || existing.isOnline !== isOnline ||
JSON.stringify(existing.agvId) !== JSON.stringify(agvId)) {
needsUpdate = true;
}
}
if (needsUpdate) {
// console.log("3----------------更新设备状态---------------<",deviceKey, newData);
await kv.set([deviceKey], newData);
}
masterWorker.postMessage({ type: "deviceDiscovered", data: { agvId, timestamp, isOnline } });
break;
}
case "devicesList":
masterWorker.postMessage({ type: "devicesList", data: message.data });
break;
case "orderSent":
masterWorker.postMessage({ type: "orderSent", orderId: message.orderId });
break;
case "orderCompleted":
masterWorker.postMessage({
type: "orderCompleted",
orderId: message.orderId,
withError: message.withError,
byCancelation: message.byCancelation,
active: message.active
});
break;
case "nodeTraversed":
masterWorker.postMessage({ type: "nodeTraversed", data: message.data });
break;
case "edgeTraversing":
masterWorker.postMessage({ type: "edgeTraversing", data: message.data });
break;
case "edgeTraversed":
masterWorker.postMessage({ type: "edgeTraversed", data: message.data });
break;
case "orderCancelled":
masterWorker.postMessage({ type: "orderCancelled", orderId: message.orderId });
break;
case "commandSettings":
masterWorker.postMessage({ type: "commandSettings", data: message.data });
break;
case "reconnect-all":
console.log("收到 master1 Worker reconnect-all 消息,准备重连");
reconnectAllWorker(kv, webWorker, masterWorker, agvWorker, config, mappings, instanceId);
break;
case "deviceRemoved": {
const { manufacturer, serialNumber, deviceKey, remainingDevices } = message.data;
console.log(`🗑️ 设备已从VDA Worker中移除: ${manufacturer}/${serialNumber}`);
console.log(`📊 剩余设备数量: ${remainingDevices}`);
// 清理KV存储中的设备数据
try {
const keysToDelete = [
`device:${manufacturer}/${serialNumber}`,
`device-online:${manufacturer}/${serialNumber}`,
`device-discovered:${manufacturer}/${serialNumber}`,
`device-factsheet:${manufacturer}/${serialNumber}`
];
for (const key of keysToDelete) {
await kv.delete([key]);
console.log(`🧹 已清理KV数据: ${key}`);
}
} catch (error) {
console.error(`❌ 清理KV数据失败: ${error}`);
}
break;
}
case "deviceListUpdated": {
const { total, added, updated, removed, devices } = message.data;
console.log(`📊 设备列表更新统计: 总数=${total}, 新增=${added}, 更新=${updated}, 移除=${removed}`);
break;
}
case "shutdown":
masterWorker.postMessage({ type: "shutdown" });
break;
default:
console.warn("收到未知类型的消息:", message);
}
} catch (err) {
console.error("处理消息时发生异常:", err);
}
};
masterWorker.onerror = (error: any) => {
console.error("Master Controller Worker 执行错误:", error);
};
return masterWorker;
}
// 定义读取并发送设备列表更新消息的函数
// 直接移除单个设备(不依赖文件更新)
export async function removeDeviceFromWorker(masterWorker: Worker, manufacturer: string, serialNumber: string) {
console.log(`🗑️ 正在从VDA Worker中移除设备: ${manufacturer}/${serialNumber}`);
masterWorker.postMessage({
type: "removeDevice",
data: { manufacturer, serialNumber }
});
}
export async function updateDeviceListFromConfig(masterWorker: Worker, config: any, instanceId: string) {
try {
console.log("🔄 开始读取设备配置文件...");
const text = await Deno.readTextFile("./devices.json");
const devices = JSON.parse(text);
if (Array.isArray(devices)) {
console.log(`📱 读取到 ${devices.length} 个设备配置:`);
devices.forEach((device, index) => {
console.log(` ${index + 1}. ${device.manufacturer}/${device.serialNumber}`);
});
// 发送初始化消息(如果需要)
masterWorker.postMessage({
type: "init",
data: {
brokerUrl: config.mqtt.brokerUrl,
interfaceName: config.interfaceName,
manufacturer: config.manufacturer,
instanceId: instanceId
},
});
// 将最新设备列表发送给 masterWorker
masterWorker.postMessage({ type: "updateDeviceList", data: devices });
console.log("✅ 设备列表已成功发送到 VDA Worker");
} else {
console.error("❌ 配置文件格式错误,要求为数组格式");
}
} catch (error) {
console.error("❌ 读取设备配置文件失败:", error);
if (error instanceof Deno.errors.NotFound) {
console.log("💡 提示devices.json 文件不存在,将使用空设备列表");
// 发送空设备列表
masterWorker.postMessage({ type: "updateDeviceList", data: [] });
}
}
}
// Flag to prevent concurrent full-worker reconnects
let isReconnectingAll = false;
export function reconnectAllWorker(
kv: Deno.Kv,
webWorker: Worker,
masterWorker: Worker,
agvWorker: Worker,
config: any,
mappings: any,
instanceId: string
): Worker {
if (isReconnectingAll) {
console.log("reconnectAllWorker: 已在重连中,忽略重复调用");
return masterWorker;
}
isReconnectingAll = true;
console.log("vda 收到 master Worker reconnect 消息,准备重连");
agvWorker.postMessage({
type: "shutdown"
});
masterWorker.postMessage({
type: "shutdown"
});
masterWorker.terminate();
agvWorker.terminate();
setTimeout( () => {
masterWorker.terminate();
agvWorker.terminate();
setTimeout( () => {
const agvWorker = new Worker(
new URL("./agv_worker.ts", import.meta.url).href,
{ type: "module" }
);
const masterWorker = new Worker(
new URL("./vda_worker.ts", import.meta.url).href,
{ type: "module" }
);
setupAgvWorker(kv, config, masterWorker, agvWorker, webWorker, mappings, instanceId);
initAgvWorker(agvWorker, config);
setupMasterWorker(kv, webWorker, masterWorker, agvWorker, config, mappings, instanceId);
updateDeviceListFromConfig(masterWorker, config, instanceId);
// Reset flag after reconnect completes
isReconnectingAll = false;
}, 2000);
}, 2000);
return masterWorker;
}