import mqtt, { MqttClient, IClientOptions } from "npm:mqtt"; // ==== 消息/配置类型 ==== /** init 消息接口,从主线程传入 */ interface InitMessage { type: "init"; mqtt: { brokerUrl: string; clientId: string; username?: string; password?: string; reconnectInterval?: number; qos?: 0 | 1 | 2; }; /** * 这个数组里每项配置一对 topic 和对应的映射规则 */ mappings: Array<{ /** MQTT 订阅的源 topic,可以包含 + 或 # 通配符 */ sourceTopic: string; /** 发布时使用的目标 topic 模板,可包含 {agvId} 等占位符 */ targetTopicTemplate: string; /** transformGeneric 用到的映射规则 */ mapping: Record; }>; instanceId: string; } /** 停止 Worker 的消息 */ interface StopMessage { type: "stop" | "shutdown"; // 支持 stop 和 shutdown } // transformGeneric 中用到的规则 type MappingRule = | string // 简单字符串映射,直接从源路径复制 | { op: "const"; value: any; source?: string; } | { type: "object"; op?: "object"; source?: string; mapping: Record; } | { type: "array"; op?: "array"; source: string; mapping: Record; } | { op: "toString"; source: string; } | { // 通用对象映射(没有 type 字段) mapping: Record; source?: string; }; // ==== 通用工具函数 ==== /** 按路径 a.b.c 从 obj 取值 */ function getByPath(obj: any, path: string): any { return path.split(".").reduce((o, key) => (o != null ? o[key] : undefined), obj); } /** 按路径 a.b.c 在 target 上写入 value */ function setByPath(target: any, path: string, value: any) { const keys = path.split("."); let cur = target; for (let i = 0; i < keys.length - 1; i++) { const k = keys[i]; if (!(k in cur) || typeof cur[k] !== "object") { cur[k] = {}; } cur = cur[k]; } cur[keys[keys.length - 1]] = value; } /** 通用的 JSON 转换器 */ function transformGeneric(raw: any, mapping: Record): any { const out: any = {}; for (const [outKey, rule] of Object.entries(mapping)) { try { // 处理简单字符串映射 if (typeof rule === "string") { const value = getByPath(raw, rule); if (value !== undefined) { setByPath(out, outKey, value); } continue; } // 处理对象规则 if (typeof rule === "object" && rule !== null) { // 常量值映射 if ("op" in rule && rule.op === "const") { setByPath(out, outKey, rule.value); continue; } // toString 操作 if ("op" in rule && rule.op === "toString") { const sourceValue = getByPath(raw, rule.source); if (sourceValue !== undefined) { // 如果已经是字符串,直接使用;否则进行 JSON 序列化 const stringValue = typeof sourceValue === "string" ? sourceValue : JSON.stringify(sourceValue); setByPath(out, outKey, stringValue); } continue; } // 对象映射 if ("type" in rule && rule.type === "object") { const subRaw = rule.source ? getByPath(raw, rule.source) : raw; if (subRaw !== undefined) { const transformedObj = transformGeneric(subRaw, rule.mapping); setByPath(out, outKey, transformedObj); } continue; } // 数组映射 if ("type" in rule && rule.type === "array") { const sourceArray = getByPath(raw, rule.source); if (Array.isArray(sourceArray)) { const transformedArray = sourceArray.map(item => transformGeneric(item, rule.mapping) ); setByPath(out, outKey, transformedArray); } else { setByPath(out, outKey, []); } continue; } // 如果是对象但没有特殊操作,尝试作为嵌套映射处理 if ("mapping" in rule && typeof rule.mapping === "object") { const mappingRule = rule as { mapping: Record; source?: string }; const subRaw = mappingRule.source ? getByPath(raw, mappingRule.source) : raw; if (subRaw !== undefined) { const transformedObj = transformGeneric(subRaw, mappingRule.mapping); setByPath(out, outKey, transformedObj); } continue; } } console.warn(`Unknown mapping rule for key "${outKey}":`, rule); } catch (error) { console.error(`Error processing mapping rule for key "${outKey}":`, error); } } return out; } /** * 判断一个 topic 是否匹配源 topic 模式(含 +, #) */ function topicMatches(pattern: string, topic: string): boolean { const patSeg = pattern.split("/"); const topSeg = topic.split("/"); for (let i = 0; i < patSeg.length; i++) { const p = patSeg[i]; const t = topSeg[i]; if (p === "#") return true; // 后面的全部都匹配 if (p === "+") continue; // 单层通配符 if (t === undefined) return false; if (p !== t) return false; } return patSeg.length === topSeg.length; } /** * 从 topic 根据 pattern 抽取 '+' 通配符对应的位置上的参数 * 目前只支持单个 {agvId} 占位 */ function extractAgvId(pattern: string, topic: string): string | null { const patSeg = pattern.split("/"); const topSeg = topic.split("/"); for (let i = 0; i < patSeg.length; i++) { if (patSeg[i] === "+") { return topSeg[i]; } } return null; } // ==== 全局状态 ==== let mqttClient: MqttClient | null = null; let reconnectTimer: any = null; const CONFIG: { mqtt: { brokerUrl: string; clientId: string; username?: string; password?: string; reconnectInterval?: number; qos?: 0 | 1 | 2; }; mappings: Array<{ sourceTopic: string; targetTopicTemplate: string; mapping: Record; }>; } = { mqtt: { brokerUrl: "", clientId: "", username: undefined, password: undefined, reconnectInterval: 5000, qos: 1, }, mappings: [], }; // instance identifier generated from config.manufacturer and UUID let INSTANCE_ID: string = ""; // 连续 MQTT 错误计数,只有超过10次才触发重连 let consecutiveErrors = 0; // ==== MQTT 逻辑 ==== async function connectAndSubscribe() { try { const opts: IClientOptions = { clientId: CONFIG.mqtt.clientId, username: CONFIG.mqtt.username, password: CONFIG.mqtt.password, reconnectPeriod: 3000, // 我们自己重连 }; mqttClient = mqtt.connect(CONFIG.mqtt.brokerUrl, opts); // 预先注册错误、断开和关闭处理器 mqttClient.on("error", onMqttError); mqttClient.on("disconnect", onMqttDisconnect); mqttClient.on("close", onMqttClose); mqttClient.on("connect", async () => { // 连接成功后重置错误计数 consecutiveErrors = 0; // 订阅所有 mappings 里配置的源 topic for (const m of CONFIG.mappings) { const inTopic = m.sourceTopic .replace("{instanceId}", INSTANCE_ID); // console.log("inTopic", inTopic); try { await mqttClient!.subscribe(inTopic, { qos: CONFIG.mqtt.qos! }); } catch (err) { console.error(`✗ MQTT subscribe failed for ${inTopic}:`, err); self.postMessage({ type: "error", error: "subscribe_failed", details: (err as Error).message }); scheduleReconnect(); return; // 订阅失败,中断后续处理,尝试重连 } } // 订阅完成后监听消息 mqttClient!.on("message", onMqttMessage); if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } self.postMessage({ type: "status", status: "connected" }); }); } catch (err) { console.error("✗ MQTT connect failed:", err); self.postMessage({ type: "error", error: "connect_failed", details: err instanceof Error ? err.message : String(err), }); scheduleReconnect(); } } async function onMqttMessage(topic: string, payload: Uint8Array) { try { const raw = JSON.parse(new TextDecoder().decode(payload)); // 对每个匹配的 mappingConfig,做一次转换并发布 for (const m of CONFIG.mappings) { const inTopic = m.sourceTopic .replace("{instanceId}", INSTANCE_ID); if (!topicMatches(inTopic, topic)) continue; // 提取占位符 agvId const agvId = extractAgvId(inTopic, topic) ?? ""; // 转换 const transformed = transformGeneric(raw, m.mapping); // 填充目标 topic const outTopic = m.targetTopicTemplate .replace("{instanceId}", INSTANCE_ID) .replace("{agvId}", agvId); // 发布 // console.log("outTopic", outTopic); await mqttClient!.publish(outTopic, JSON.stringify(transformed), { qos: CONFIG.mqtt.qos!, }); self.postMessage({ type: "published", topic: outTopic, agvId, sourceTopic: topic, }); } } catch (err) { console.error("✗ onMqttMessage error:", err); self.postMessage({ type: "error", error: "process_message", details: err instanceof Error ? err.message : String(err), }); } } function onMqttClose() { console.log("vda onMqttClose"); self.postMessage({ type: "status", status: "closed" }); // 确保在 close 时释放底层 socket if (mqttClient) { mqttClient.end(true); mqttClient = null; } self.postMessage({ type: "reconnect-down" }); // connectAndSubscribe(); } function onMqttDisconnect() { self.postMessage({ type: "status", status: "disconnected" }); console.log("vda onMqttDisconnect"); // 断开时也关闭客户端,避免累积未关闭的连接 if (mqttClient) { mqttClient.end(true); mqttClient = null; } scheduleReconnect(); } function onMqttError(err: Error) { // 如果是连接被拒绝错误,通知主线程重启所有 worker const anyErr = err as any; if (anyErr.code === 'ECONNREFUSED' || anyErr.code === 'ECONNRESET') { console.error('❌ MQTT connection refused, requesting full restart:', anyErr); self.postMessage({ type: 'reconnect-all' }); } // 增加错误计数,只在超过10次后才重连 consecutiveErrors++; console.error(`! MQTT error (#${consecutiveErrors}):`, err); self.postMessage({ type: "error", error: "mqtt_error", details: err.message, }); if (consecutiveErrors >= 5) { // 重置错误计数 consecutiveErrors = 0; if (mqttClient) { mqttClient.end(true); mqttClient = null; } scheduleReconnect(); } } function scheduleReconnect() { if (!reconnectTimer) { const t = CONFIG.mqtt.reconnectInterval; reconnectTimer = setTimeout(() => { reconnectTimer = null; console.log("vda schedule Worker 自身重连"); self.postMessage({ type: "reconnect-down" }); connectAndSubscribe(); }, t); } } // ==== 主线程消息处理 ==== self.onmessage = (e: MessageEvent) => { const msg = e.data; if (msg.type === "init") { // 覆盖配置 CONFIG.mqtt = { ...CONFIG.mqtt, brokerUrl: msg.mqtt.brokerUrl, clientId: msg.mqtt.clientId, username: msg.mqtt.username, password: msg.mqtt.password, reconnectInterval: msg.mqtt.reconnectInterval ?? CONFIG.mqtt.reconnectInterval, qos: msg.mqtt.qos ?? CONFIG.mqtt.qos, }; CONFIG.mappings = msg.mappings; // set our instance identifier INSTANCE_ID = msg.instanceId; console.log("vda5050_transformer_worker: instanceId", INSTANCE_ID); // 启动连接 connectAndSubscribe(); } else if (msg.type === "stop" || msg.type === "shutdown") { // 支持 stop 和 shutdown,统一关闭 if (mqttClient) { mqttClient.end(true); mqttClient = null; } if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } self.postMessage({ type: "status", status: "stopped" }); } };