Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/inbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ async function deliverMeshtasticReply(params: {
target: string;
accountId: string;
channelIndex?: number;
channelName?: string;
chunkLimit?: number;
sendReply?: (target: string, text: string) => Promise<void>;
statusSink?: (patch: { lastOutboundAt?: number }) => void;
Expand All @@ -104,7 +103,6 @@ async function deliverMeshtasticReply(params: {
await sendMessageMeshtastic(params.target, chunk, {
accountId: params.accountId,
channelIndex: params.channelIndex,
channelName: params.channelName,
});
}
// Small delay between chunks to avoid overwhelming the radio queue.
Expand Down Expand Up @@ -383,7 +381,6 @@ export async function handleMeshtasticInbound(params: {
target: peerId,
accountId: account.accountId,
channelIndex: message.isGroup ? message.channelIndex : undefined,
channelName: message.isGroup ? message.channelName : undefined,
chunkLimit: account.config.textChunkLimit,
sendReply: params.sendReply,
statusSink,
Expand Down
11 changes: 7 additions & 4 deletions src/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,11 @@ async function monitorMqtt(params: {
if (!mqttClient) {
return;
}
const channelName = message.isGroup ? message.channelName : undefined;
await mqttClient.sendText(text, message.isGroup ? undefined : target, channelName);
await mqttClient.sendText(
text,
message.isGroup ? undefined : target,
message.isGroup ? message.channelIndex : undefined,
);
opts.statusSink?.({ lastOutboundAt: Date.now() });
core.channel.activity.record({
channel: "meshtastic",
Expand All @@ -304,8 +307,8 @@ async function monitorMqtt(params: {
});

// Register active send function for `openclaw message send`.
setActiveMqttSend((text, destination, channelName) =>
mqttClient ? mqttClient.sendText(text, destination, channelName) : Promise.resolve(),
setActiveMqttSend((text, destination, channelIndex) =>
mqttClient ? mqttClient.sendText(text, destination, channelIndex) : Promise.resolve(),
);

logger.info(
Expand Down
84 changes: 53 additions & 31 deletions src/mqtt-client.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import mqtt from "mqtt";
import { nodeNumToHex } from "./normalize.js";
import { hexToNodeNum, looksLikeMeshtasticNodeId, nodeNumToHex } from "./normalize.js";
import type { MeshtasticMqttConfig } from "./types.js";

/** Derive a publish topic from a subscribe topic.
* Standard pattern: "msh/REGION/NUM/json/#" → "msh/REGION/NUM/json/mqtt".
* If the topic has no wildcard suffix, appends "/mqtt" as the publish leaf. */
/** Derive the JSON downlink topic from a subscribe topic.
* Standard pattern: "msh/REGION/NUM/json/#" → "msh/REGION/NUM/json/mqtt/".
* Meshtastic firmware accepts JSON downlinks on the ".../2/json/mqtt/" topic;
* a channel named "mqtt" (with downlink enabled) must exist on the gateway.
* See https://meshtastic.org/docs/software/integrations/mqtt/ */
function derivePublishTopic(subscribeTopic: string): string {
if (subscribeTopic.endsWith("/#")) {
return subscribeTopic.slice(0, -2) + "/mqtt";
return subscribeTopic.slice(0, -2) + "/mqtt/";
}
return subscribeTopic + "/mqtt";
return subscribeTopic + "/mqtt/";
}

export type MeshtasticMqttTextEvent = {
Expand All @@ -32,13 +34,15 @@ export type MeshtasticMqttClientOptions = {
};

export type MeshtasticMqttClient = {
sendText: (text: string, destination?: string, channelName?: string) => Promise<void>;
sendText: (text: string, destination?: string, channelIndex?: number) => Promise<void>;
close: () => void;
};

/**
* Meshtastic MQTT JSON message format.
* Messages on the JSON topic contain: sender, from, type, payload, channel.
* Meshtastic MQTT JSON *uplink* message (received from the broker).
* Text packets arrive as { type: "text", payload: { text }, from, to, channel,
* sender }, where `from` is the originating node and `sender` is the gateway
* node that published the packet to MQTT.
*/
type MqttJsonMessage = {
sender?: string;
Expand All @@ -50,6 +54,19 @@ type MqttJsonMessage = {
channel_name?: string;
};

/**
* Meshtastic MQTT JSON *downlink* message (published to ".../2/json/mqtt/").
* `payload` is a plain string and `from` is the numeric node ID of the gateway
* that will transmit the message. `channel` (index) and `to` are optional.
*/
type MqttJsonDownlink = {
from: number;
type: "sendtext";
payload: string;
channel?: number;
to?: number;
};

/** Connect to a Meshtastic mesh via MQTT broker. */
export async function connectMeshtasticMqtt(
options: MeshtasticMqttClientOptions,
Expand Down Expand Up @@ -109,16 +126,18 @@ export async function connectMeshtasticMqtt(
return;
}

// Only handle text messages.
if (msg.type !== "sendtext" || !msg.payload?.text) {
// Only handle text messages. Received text packets use type "text"
// ("sendtext" is the downlink verb and never appears on uplink).
if (msg.type !== "text" || !msg.payload?.text) {
return;
}

// Skip own messages.
const senderNodeId = msg.sender
? msg.sender.toLowerCase()
: msg.from
? nodeNumToHex(msg.from)
// Identify the originating node. `from` is the actual author; `sender` is
// only the gateway that uploaded the packet to MQTT, so prefer `from`.
const senderNodeId = msg.from !== undefined
? nodeNumToHex(msg.from)
: msg.sender
? msg.sender.toLowerCase()
: undefined;
if (!senderNodeId) {
return;
Expand All @@ -134,13 +153,10 @@ export async function connectMeshtasticMqtt(
&& msg.to !== 0xffffffff
&& nodeNumToHex(msg.to).toLowerCase() === myNodeId;

const senderName = msg.sender && msg.sender !== senderNodeId
? msg.sender
: undefined;

// The JSON envelope carries no display name (`sender` is the gateway node
// ID, not a name), so leave senderName unset and let the node ID stand in.
const event: MeshtasticMqttTextEvent = {
senderNodeId: senderNodeId.startsWith("!") ? senderNodeId : `!${senderNodeId}`,
senderName,
text: msg.payload.text,
channelIndex: msg.channel ?? 0,
channelName: msg.channel_name,
Expand All @@ -166,18 +182,24 @@ export async function connectMeshtasticMqtt(
}

return {
sendText: async (text, destination, channelName) => {
const outboundTopic = channelName
? publishTopic.replace(/\/[^/]*$/, `/${channelName}`)
: publishTopic;
const message: MqttJsonMessage = {
sender: myNodeId ?? options.myNodeId,
sendText: async (text, destination, channelIndex) => {
// Standard Meshtastic JSON downlink: publish to ".../2/json/mqtt/" with
// { from, type: "sendtext", payload: <string>, channel?, to? }. `payload`
// is a plain string (not an object) and `from` is the numeric node ID of
// the gateway that transmits. The gateway needs a channel named "mqtt"
// with downlink enabled and JSON output on.
// https://meshtastic.org/docs/software/integrations/mqtt/
const fromNum = myNodeId ? hexToNodeNum(myNodeId) : 0;
const message: MqttJsonDownlink = {
from: fromNum,
type: "sendtext",
payload: { text },
...(destination ? { to: Number.parseInt(destination.replace("!", ""), 16) } : {}),
...(channelName ? { channel_name: channelName } : {}),
payload: text,
...(channelIndex !== undefined ? { channel: channelIndex } : {}),
...(destination && looksLikeMeshtasticNodeId(destination)
? { to: hexToNodeNum(destination) }
: {}),
};
client.publish(outboundTopic, JSON.stringify(message));
client.publish(publishTopic, JSON.stringify(message));
},
close: () => {
client.end(true);
Expand Down
7 changes: 3 additions & 4 deletions src/send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type { CoreConfig } from "./types.js";
type SendMeshtasticOptions = {
accountId?: string;
channelIndex?: number;
channelName?: string;
};

export type SendMeshtasticResult = {
Expand All @@ -23,7 +22,7 @@ let activeSerialSend:
| ((text: string, destination?: number, channelIndex?: number) => Promise<number>)
| null = null;
let activeMqttSend:
| ((text: string, destination?: string, channelName?: string) => Promise<void>)
| ((text: string, destination?: string, channelIndex?: number) => Promise<void>)
| null = null;

export function setActiveSerialSend(
Expand All @@ -33,7 +32,7 @@ export function setActiveSerialSend(
}

export function setActiveMqttSend(
fn: ((text: string, destination?: string, channelName?: string) => Promise<void>) | null,
fn: ((text: string, destination?: string, channelIndex?: number) => Promise<void>) | null,
) {
activeMqttSend = fn;
}
Expand Down Expand Up @@ -79,7 +78,7 @@ export async function sendMessageMeshtastic(

if (transport === "mqtt") {
if (activeMqttSend) {
await activeMqttSend(stripped, target, opts.channelName);
await activeMqttSend(stripped, target, opts.channelIndex);
} else {
throw new Error("No active MQTT connection. Run 'openclaw gateway start' to connect.");
}
Expand Down