// api-amr/vda5050_transformer_worker.ts
// Snapshot metadata from the repository viewer: 428 lines, 12 KiB, TypeScript,
// last changed 2025-06-04 19:15:02 +08:00.
import mqtt, { MqttClient, IClientOptions } from "npm:mqtt";
// ==== Message / configuration types ====
/**
 * "init" message posted by the main thread. Its fields are copied into the
 * worker's configuration before the MQTT connection is started.
 */
interface InitMessage {
type: "init";
// MQTT connection settings.
mqtt: {
brokerUrl: string;
clientId: string;
username?: string;
password?: string;
// Delay before the worker's own reconnect attempt; it is handed to
// setTimeout, so presumably milliseconds.
reconnectInterval?: number;
// QoS level used both when subscribing and when publishing.
qos?: 0 | 1 | 2;
};
/**
 * Topic mappings: each entry ties a subscribed source topic to a target topic
 * template and to the rules used to transform the payload.
 */
mappings: Array<{
/** Source topic to subscribe to; may contain the MQTT wildcards + or #. */
sourceTopic: string;
/** Target topic template used when publishing; may contain placeholders such as {agvId}. */
targetTopicTemplate: string;
/** Mapping rules consumed by transformGeneric. */
mapping: Record<string, MappingRule>;
}>;
// Substituted for the {instanceId} placeholder in source and target topics.
instanceId: string;
}
/** Message asking the worker to close its MQTT client and cancel any pending reconnect. */
interface StopMessage {
type: "stop" | "shutdown"; // both "stop" and "shutdown" are accepted
}
// Rules understood by transformGeneric. Each rule describes how one output
// key (a dot-separated path) is produced from the incoming payload.
type MappingRule =
| string // plain string rule: copy the value found at this source path
| {
// Constant rule: always writes `value`; transformGeneric does not read `source` here.
op: "const";
value: any;
source?: string;
}
| {
// Nested object rule: applies `mapping` to the value at `source`
// (or to the current payload when `source` is omitted).
type: "object";
op?: "object";
source?: string;
mapping: Record<string, MappingRule>;
}
| {
// Array rule: applies `mapping` to every element of the array at `source`.
type: "array";
op?: "array";
source: string;
mapping: Record<string, MappingRule>;
}
| {
// String conversion rule: strings are copied, other values are JSON-serialized.
op: "toString";
source: string;
}
| {
// Generic object mapping (no `type` field); handled like the nested object rule.
mapping: Record<string, MappingRule>;
source?: string;
};
// ==== Generic helpers ====
/**
 * Reads a nested value from `obj` using a dot-separated path such as "a.b.c".
 *
 * @param obj  Value to read from; may be null or undefined.
 * @param path Dot-separated property path.
 * @returns The value at the path, or undefined once an intermediate value is
 *          null or undefined.
 */
function getByPath(obj: any, path: string): any {
  let current = obj;
  for (const segment of path.split(".")) {
    // Never index into null/undefined; the rest of the walk then stays undefined.
    current = current != null ? current[segment] : undefined;
  }
  return current;
}
/**
 * Writes `value` into `target` at a dot-separated path such as "a.b.c",
 * creating intermediate objects as needed.
 *
 * An intermediate entry that is missing, a primitive, or null is replaced by a
 * fresh object. Existing objects (including arrays) are descended into as-is.
 *
 * @param target Object that is mutated in place.
 * @param path   Dot-separated property path.
 * @param value  Value stored at the final path segment.
 */
function setByPath(target: any, path: string, value: any) {
  const keys = path.split(".");
  let cur = target;
  for (let i = 0; i < keys.length - 1; i++) {
    const k = keys[i];
    const next = cur[k];
    // `typeof null` is "object", so null needs its own check; descending into
    // it would throw on the next property access.
    if (next === null || typeof next !== "object") {
      cur[k] = {};
    }
    cur = cur[k];
  }
  cur[keys[keys.length - 1]] = value;
}
/**
 * Generic JSON transformer: builds a new object from `raw` according to `mapping`.
 *
 * Every key of `mapping` is a dot-separated output path and every value is a
 * MappingRule. A rule that throws is logged with console.error and skipped, so
 * one bad rule does not abort the rest. A rule that matches no known shape is
 * reported with console.warn.
 *
 * @param raw     Source payload; rule source paths are resolved against it.
 * @param mapping Output path to rule table.
 * @returns A freshly created object holding the mapped values.
 */
function transformGeneric(raw: any, mapping: Record<string, MappingRule>): any {
  const result: any = {};

  for (const [outKey, rule] of Object.entries(mapping)) {
    try {
      // Plain string rule: copy the value at that source path when it exists.
      if (typeof rule === "string") {
        const copied = getByPath(raw, rule);
        if (copied !== undefined) {
          setByPath(result, outKey, copied);
        }
        continue;
      }

      const handled =
        typeof rule === "object" &&
        rule !== null &&
        applyObjectRule(raw, result, outKey, rule);
      if (!handled) {
        console.warn(`Unknown mapping rule for key "${outKey}":`, rule);
      }
    } catch (error) {
      console.error(`Error processing mapping rule for key "${outKey}":`, error);
    }
  }

  return result;
}

/**
 * Applies one object-shaped rule and writes its result into `out`.
 * The rule shapes are tried in the order: const, toString, object, array,
 * generic nested mapping.
 *
 * @returns true when the rule matched a known shape, false otherwise.
 */
function applyObjectRule(
  raw: any,
  out: any,
  outKey: string,
  rule: Exclude<MappingRule, string>,
): boolean {
  if ("op" in rule) {
    // Constant value.
    if (rule.op === "const") {
      setByPath(out, outKey, rule.value);
      return true;
    }
    // String conversion: strings pass through, everything else is JSON-serialized.
    if (rule.op === "toString") {
      const found = getByPath(raw, rule.source);
      if (found !== undefined) {
        const asText = typeof found === "string" ? found : JSON.stringify(found);
        setByPath(out, outKey, asText);
      }
      return true;
    }
  }

  if ("type" in rule) {
    // Nested object: map the sub-value, or the whole payload when no source is given.
    if (rule.type === "object") {
      const scope = rule.source ? getByPath(raw, rule.source) : raw;
      if (scope !== undefined) {
        const nestedObject = transformGeneric(scope, rule.mapping);
        setByPath(out, outKey, nestedObject);
      }
      return true;
    }
    // Array: map each element; anything that is not an array yields [].
    if (rule.type === "array") {
      const items = getByPath(raw, rule.source);
      if (Array.isArray(items)) {
        const itemMapping = rule.mapping;
        const mappedItems = items.map((item) => transformGeneric(item, itemMapping));
        setByPath(out, outKey, mappedItems);
      } else {
        setByPath(out, outKey, []);
      }
      return true;
    }
  }

  // Object without a recognised op/type: treat it as a nested mapping.
  if ("mapping" in rule && typeof rule.mapping === "object") {
    const nestedRule = rule as { mapping: Record<string, MappingRule>; source?: string };
    const scope = nestedRule.source ? getByPath(raw, nestedRule.source) : raw;
    if (scope !== undefined) {
      const nestedObject = transformGeneric(scope, nestedRule.mapping);
      setByPath(out, outKey, nestedObject);
    }
    return true;
  }

  return false;
}
/**
 * Checks whether a concrete topic matches a topic pattern that may contain the
 * MQTT wildcards "+" (exactly one level) and "#" (this level and everything below).
 *
 * @param pattern Topic pattern, levels separated by "/".
 * @param topic   Concrete topic to test.
 * @returns true when the topic matches the pattern.
 */
function topicMatches(pattern: string, topic: string): boolean {
  const patternLevels = pattern.split("/");
  const topicLevels = topic.split("/");

  let idx = 0;
  while (idx < patternLevels.length) {
    const level = patternLevels[idx];
    // "#" accepts whatever remains of the topic.
    if (level === "#") {
      return true;
    }
    // A literal level must equal the topic level; a missing topic level
    // (undefined) can never equal it. "+" skips the comparison.
    if (level !== "+" && level !== topicLevels[idx]) {
      return false;
    }
    idx += 1;
  }

  // Without "#", both sides must have the same number of levels.
  return patternLevels.length === topicLevels.length;
}
/**
 * Extracts the topic level sitting at the position of the first "+" wildcard
 * in `pattern`. The caller uses it to fill the {agvId} placeholder.
 *
 * @param pattern Topic pattern, levels separated by "/".
 * @param topic   Concrete topic that was received.
 * @returns The matching topic level, or null when the pattern has no "+" or
 *          the topic has no level at that position.
 */
function extractAgvId(pattern: string, topic: string): string | null {
  const wildcardIndex = pattern.split("/").indexOf("+");
  if (wildcardIndex === -1) {
    return null;
  }
  // A topic shorter than the pattern has no level here; honour the declared
  // `string | null` return type instead of leaking undefined.
  return topic.split("/")[wildcardIndex] ?? null;
}
// ==== Global state ====
/** MQTT client currently in use, or null when none exists (not started, closed or stopped). */
let mqttClient: MqttClient | null = null;
/** Handle of the pending reconnect timer, or null when no reconnect is scheduled. */
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
/**
 * Effective worker configuration. The values below are defaults; the "init"
 * message overwrites the MQTT settings and the topic mappings.
 */
const CONFIG: {
  mqtt: {
    brokerUrl: string;
    clientId: string;
    username?: string;
    password?: string;
    reconnectInterval?: number;
    qos?: 0 | 1 | 2;
  };
  mappings: Array<{
    sourceTopic: string;
    targetTopicTemplate: string;
    mapping: Record<string, MappingRule>;
  }>;
} = {
  mqtt: {
    brokerUrl: "",
    clientId: "",
    username: undefined,
    password: undefined,
    reconnectInterval: 5000,
    qos: 1,
  },
  mappings: [],
};
// Instance identifier taken from the "init" message and substituted for
// {instanceId} in topics. NOTE(review): an earlier comment said it is generated
// from config.manufacturer and a UUID; that happens outside this file, confirm.
let INSTANCE_ID: string = "";
// Number of MQTT errors since the last successful connect. onMqttError forces a
// reconnect once this reaches its threshold (5 in the current code).
let consecutiveErrors = 0;
// ==== MQTT logic ====
/**
 * Creates a new MQTT client from CONFIG, wires up the event handlers and, once
 * connected, subscribes to every configured source topic.
 *
 * Outcomes are reported to the main thread with postMessage:
 * - { type: "status", status: "connected" } after all subscriptions succeeded
 * - { type: "error", error: "subscribe_failed" | "connect_failed", details }
 * A failure also schedules a reconnect through scheduleReconnect.
 */
async function connectAndSubscribe() {
  // A retry (or a second "init") can arrive while an earlier client still
  // exists. The shared handlers act on the module-level `mqttClient`, so a late
  // event from the old client would otherwise tear down the new one.
  if (mqttClient) {
    const stale = mqttClient;
    mqttClient = null;
    stale.removeListener("error", onMqttError);
    stale.removeListener("disconnect", onMqttDisconnect);
    stale.removeListener("close", onMqttClose);
    stale.removeListener("message", onMqttMessage);
    // An "error" event with no listener would throw, so keep a no-op one.
    stale.on("error", () => {});
    stale.end(true);
  }

  try {
    const opts: IClientOptions = {
      clientId: CONFIG.mqtt.clientId,
      username: CONFIG.mqtt.username,
      password: CONFIG.mqtt.password,
      // NOTE(review): the original comment said "we reconnect ourselves", yet a
      // non-zero value leaves the library's own auto-reconnect enabled. Confirm
      // whether 0 was intended.
      reconnectPeriod: 3000,
    };
    // Keep a local reference: the module-level variable may be reset to null by
    // the close/disconnect/error handlers while the code below is awaiting.
    const client = mqtt.connect(CONFIG.mqtt.brokerUrl, opts);
    mqttClient = client;

    // Register the handlers up front, exactly once per client.
    client.on("error", onMqttError);
    client.on("disconnect", onMqttDisconnect);
    client.on("close", onMqttClose);
    client.on("message", onMqttMessage);

    client.on("connect", async () => {
      // Ignore a client that has been replaced or closed in the meantime.
      if (mqttClient !== client) return;

      // A successful connect resets the error counter.
      consecutiveErrors = 0;

      // Subscribe to the source topic of every configured mapping.
      for (const m of CONFIG.mappings) {
        const inTopic = m.sourceTopic
          .replace("{instanceId}", INSTANCE_ID);
        try {
          // subscribeAsync returns a promise, so a rejected subscription
          // actually reaches the catch block below.
          await client.subscribeAsync(inTopic, { qos: CONFIG.mqtt.qos! });
        } catch (err) {
          console.error(`✗ MQTT subscribe failed for ${inTopic}:`, err);
          self.postMessage({
            type: "error",
            error: "subscribe_failed",
            details: err instanceof Error ? err.message : String(err),
          });
          scheduleReconnect();
          return; // stop here and let the scheduled reconnect retry
        }
      }

      // The client may have been closed while the subscriptions were pending.
      if (mqttClient !== client) return;

      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }
      self.postMessage({ type: "status", status: "connected" });
    });
  } catch (err) {
    console.error("✗ MQTT connect failed:", err);
    self.postMessage({
      type: "error",
      error: "connect_failed",
      details: err instanceof Error ? err.message : String(err),
    });
    scheduleReconnect();
  }
}
/**
 * Handles one incoming MQTT message. The JSON payload is parsed, transformed
 * with every mapping whose source topic matches, and each result is published
 * to that mapping's target topic.
 *
 * For every successful publish a { type: "published", ... } message is posted.
 * Any failure (invalid JSON, missing client, rejected publish) is posted as
 * { type: "error", error: "process_message", details } and ends processing of
 * this message.
 *
 * @param topic   Topic the message was received on.
 * @param payload Raw message bytes, expected to contain UTF-8 encoded JSON.
 */
async function onMqttMessage(topic: string, payload: Uint8Array) {
  try {
    const raw = JSON.parse(new TextDecoder().decode(payload));

    // Transform and publish once per matching mapping.
    for (const m of CONFIG.mappings) {
      const inTopic = m.sourceTopic
        .replace("{instanceId}", INSTANCE_ID);
      if (!topicMatches(inTopic, topic)) continue;

      // Topic level at the "+" position; empty string when there is none.
      const agvId = extractAgvId(inTopic, topic) ?? "";

      const transformed = transformGeneric(raw, m.mapping);

      // Fill the target topic. The agvId comes from the received topic, so it
      // is inserted through a replacer function: a plain string replacement
      // would interpret sequences such as "$&" inside it.
      const outTopic = m.targetTopicTemplate
        .replace("{instanceId}", INSTANCE_ID)
        .replace("{agvId}", () => agvId);

      // Re-read the client for each publish: the close/disconnect/error
      // handlers may have reset it to null during an earlier await.
      const client = mqttClient;
      if (!client) {
        throw new Error("MQTT client is not available");
      }

      // publishAsync resolves only after the publish completed, so "published"
      // is never reported for a message that failed to go out.
      await client.publishAsync(outTopic, JSON.stringify(transformed), {
        qos: CONFIG.mqtt.qos!,
      });

      self.postMessage({
        type: "published",
        topic: outTopic,
        agvId,
        sourceTopic: topic,
      });
    }
  } catch (err) {
    console.error("✗ onMqttMessage error:", err);
    self.postMessage({
      type: "error",
      error: "process_message",
      details: err instanceof Error ? err.message : String(err),
    });
  }
}
/**
 * Handler for the client's "close" event. Reports the closed status, force-ends
 * the current client and posts a "reconnect-down" message to the main thread.
 * No reconnect is started from this handler itself.
 */
function onMqttClose() {
  console.log("vda onMqttClose");
  self.postMessage({ type: "status", status: "closed" });

  // Force-end the client so it does not linger after the close.
  const client = mqttClient;
  if (client !== null) {
    client.end(true);
    mqttClient = null;
  }

  self.postMessage({ type: "reconnect-down" });
}
/**
 * Handler for the client's "disconnect" event. Reports the disconnected status,
 * force-ends the current client and schedules a reconnect.
 */
function onMqttDisconnect() {
  self.postMessage({ type: "status", status: "disconnected" });
  console.log("vda onMqttDisconnect");

  // End the client here as well, so unclosed connections do not pile up.
  const client = mqttClient;
  if (client !== null) {
    client.end(true);
    mqttClient = null;
  }

  scheduleReconnect();
}
/**
 * Handler for the client's "error" event.
 *
 * - For ECONNREFUSED / ECONNRESET a "reconnect-all" message is posted to the
 *   main thread.
 * - Every error is counted and reported as { type: "error", error: "mqtt_error" }.
 * - On the 5th counted error the counter is reset, the client is force-ended
 *   and a reconnect is scheduled.
 *
 * @param err Error emitted by the MQTT client.
 */
function onMqttError(err: Error) {
  const code = (err as Error & { code?: unknown }).code;
  const isConnectionFailure = code === "ECONNREFUSED" || code === "ECONNRESET";
  if (isConnectionFailure) {
    console.error("❌ MQTT connection refused, requesting full restart:", err);
    self.postMessage({ type: "reconnect-all" });
  }

  consecutiveErrors += 1;
  console.error(`! MQTT error (#${consecutiveErrors}):`, err);
  self.postMessage({
    type: "error",
    error: "mqtt_error",
    details: err.message,
  });

  // Below the threshold the error is only reported.
  if (consecutiveErrors < 5) {
    return;
  }

  // Threshold reached: start counting afresh and rebuild the connection.
  consecutiveErrors = 0;
  if (mqttClient) {
    mqttClient.end(true);
    mqttClient = null;
  }
  scheduleReconnect();
}
/**
 * Schedules one reconnect attempt after CONFIG.mqtt.reconnectInterval.
 * Does nothing when a reconnect is already pending. When the timer fires it
 * posts a "reconnect-down" message and calls connectAndSubscribe.
 */
function scheduleReconnect() {
  // At most one pending timer.
  if (reconnectTimer) {
    return;
  }

  const delay = CONFIG.mqtt.reconnectInterval;
  const onTimeout = () => {
    // Clear the handle first so a later failure can schedule again.
    reconnectTimer = null;
    console.log("vda schedule Worker 自身重连");
    self.postMessage({ type: "reconnect-down" });
    connectAndSubscribe();
  };
  reconnectTimer = setTimeout(onTimeout, delay);
}
// ==== Main-thread message handling ====
/**
 * Entry point for messages from the main thread.
 *
 * - "init": stores the MQTT settings, the mappings and the instance id, then
 *   starts the connection.
 * - "stop" / "shutdown": force-ends the client, cancels a pending reconnect and
 *   reports { type: "status", status: "stopped" }.
 */
self.onmessage = (e: MessageEvent<InitMessage | StopMessage>) => {
  const msg = e.data;

  switch (msg.type) {
    case "init": {
      const incoming = msg.mqtt;
      const previous = CONFIG.mqtt;
      // Replace the connection settings; interval and QoS keep their previous
      // value when the message does not provide one.
      CONFIG.mqtt = {
        ...previous,
        brokerUrl: incoming.brokerUrl,
        clientId: incoming.clientId,
        username: incoming.username,
        password: incoming.password,
        reconnectInterval: incoming.reconnectInterval ?? previous.reconnectInterval,
        qos: incoming.qos ?? previous.qos,
      };
      CONFIG.mappings = msg.mappings;

      // Remember the instance identifier used in topic placeholders.
      INSTANCE_ID = msg.instanceId;
      console.log("vda5050_transformer_worker: instanceId", INSTANCE_ID);

      // Start connecting.
      connectAndSubscribe();
      break;
    }

    // "stop" and "shutdown" are handled identically.
    case "stop":
    case "shutdown": {
      if (mqttClient) {
        mqttClient.end(true);
        mqttClient = null;
      }
      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }
      self.postMessage({ type: "status", status: "stopped" });
      break;
    }
  }
};