Skip to content

fix(mqtt): consolidate channels by name instead of splitting by slot/hash#3708

Merged
Yeraze merged 3 commits into
mainfrom
investigate/mqtt-channel-consolidation
Jun 24, 2026
Merged

fix(mqtt): consolidate channels by name instead of splitting by slot/hash#3708
Yeraze merged 3 commits into
mainfrom
investigate/mqtt-channel-consolidation

Conversation

@Yeraze

@Yeraze Yeraze commented Jun 24, 2026

Copy link
Copy Markdown
Owner

Summary

For MQTT sources, one logical channel (e.g. LongFast) was fragmenting into many across both the per-source Channels tab and Unified Messages, because channel identity was keyed off the per-packet channel byte — which on MQTT is a channel hash (0–255), not a stable 0–7 slot.

Confirmed in live dev data (Official MQTT): LongFast rows at 0/1/8/40, MediumFast at 0/31, plus duplicate channel_database rows and messages stranded on raw hashes.

Three splitters fixed

  1. recordChannelFromEnvelope wrote a channels row per hash. → Stop writing hash-keyed channels rows for MQTT entirely. The channel now surfaces by name through its channel_database row: message rows on CHANNEL_DB_OFFSET + dbId and the virtual-channel path in /api/unified/channels.
  2. findOrCreatePassiveByNameAsync raced in duplicate rows (non-atomic find-then-create, no unique constraint) → byte-identical pairs (Primary 3&4, Wong 5&6, …). → Serialize concurrent creates per lower(name) via an in-flight promise cache (the race is in-process; a cross-backend functional/text unique index is awkward).
  3. Messages stranded on the raw hash when the name didn't resolve at ingest.

Migration 102 (consolidate_mqtt_channels)

Backfills existing data across SQLite/PostgreSQL/MySQL:

  • Merge channel_database rows identical in (lower(name), psk) — repointing messages + channel_database_permissions, keeping the lowest id.
  • For MQTT/bridge sources only (never TCP slots 0–7): collapse every hash-keyed channels row, and repoint unambiguous raw-hash (< CHANNEL_DB_OFFSET) messages onto CHANNEL_DB_OFFSET + dbId.

Idempotent; scoped to mqtt_bridge/mqtt_broker source types.

Design notes

  • Canonical identity = the channel_database row. Decrypted packets stay key-verified (decoded.channelDatabaseId); name-only packets resolve by channelId. So "same channel only if the key aligns" is honoured.
  • Dedup key is (lower(name), psk), not name alone — preserves the decryption feature's ability to hold same-named/different-key entries.
  • Channel hashes span 0–255, so the migration collapses all MQTT channels rows (not just < OFFSET) but only repoints messages on unambiguous < OFFSET hashes (a raw hash ≥ OFFSET is indistinguishable from a real CHANNEL_DB_OFFSET+dbId).

Verification

  • Full Vitest suite: 7391 passed / 0 failed. New tests: migration consolidation + idempotency (incl. hash ≥ OFFSET and TCP-untouched), and the passive-create race guard.
  • Live data (deployed dev container, migration ran on real split data):
    • channel_database duplicate names: 0 remaining
    • MQTT-source channels rows: 0 remaining (all hash junk collapsed)
    • Official MQTT messages re-consolidated: LongFast 20→27 (stranded raw 8+1 folded in), MediumFast 479→485 (stranded raw 31 folded in); raw channels 31/8/1 gone.
    • /api/unified/channels: LongFast a single entry @ 102, MediumFast a single entry @ 101, zero duplicate names.

See docs/internal/dev-notes/MQTT_CHANNEL_CONSOLIDATION.md for the full analysis.

🤖 Generated with Claude Code

…hash

MQTT sources fragmented one logical channel (e.g. LongFast) across the
per-source Channels tab and Unified Messages because channel identity was
keyed off the per-packet `channel` byte — a channel *hash* (0-255) on MQTT,
not a stable 0-7 slot. Three splitters, all observed in live data:

1. `recordChannelFromEnvelope` wrote a `channels` row keyed by the hash, so
   LongFast landed on rows 0/1/8/40, MediumFast on 0/31, etc.
2. `findOrCreatePassiveByNameAsync` was a non-atomic find-then-create with no
   unique constraint, so concurrent MQTT packets raced in byte-identical
   duplicate `channel_database` rows (Primary 3&4, Wong 5&6, …) — splitting
   messages across CHANNEL_DB_OFFSET+dupId.
3. Messages whose name didn't resolve at ingest were stranded on the raw hash.

Canonical identity is now the `channel_database` row, surfaced everywhere as
`CHANNEL_DB_OFFSET + dbId` (decrypted packets stay key-verified via
decoded.channelDatabaseId; name-only packets resolve by channelId):

- Ingestion: stop writing hash-keyed `channels` rows for MQTT. The channel
  surfaces by name through its channel_database row (message rows on
  CHANNEL_DB_OFFSET+dbId + the virtual-channel path in /api/unified/channels).
- Repo: `findOrCreatePassiveByNameAsync` serializes concurrent creates per
  lower(name) via an in-flight promise cache (the race is in-process; a
  cross-backend unique text index is awkward).
- Migration 102: merge duplicate channel_database rows by (lower(name), psk)
  — repointing messages + permissions — then, for MQTT/bridge sources only
  (never TCP slots 0-7), collapse every hash-keyed channels row and repoint
  unambiguous raw-hash (< OFFSET) messages onto CHANNEL_DB_OFFSET+dbId.

Tests: migration consolidation + idempotency (incl. hash >= OFFSET and TCP
untouched), and the passive-create race guard. Full suite: 7391 pass / 0 fail.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01SJPe6J5vKrcbwzt6vCdtrt
@github-actions

github-actions Bot commented Jun 24, 2026

Copy link
Copy Markdown

Claude finished @Yeraze's task —— View job


PR Review: fix(mqtt): consolidate channels by name instead of splitting by slot/hash

Tasks:

  • Gather context and understand the PR
  • Review migration logic (102_consolidate_mqtt_channels.ts)
  • Review repository changes (channelDatabase.ts)
  • Review MQTT ingestion changes (mqttIngestion.ts)
  • Review test coverage
  • Compile review feedback

Overall Assessment

This is a well-reasoned fix for a genuine data-integrity bug. The problem analysis is accurate, the three-pronged fix is correctly scoped, and the migration is idempotent. The test coverage is good. A few issues worth addressing before merge:


Migration (102_consolidate_mqtt_channels.ts)

Issue 1: PostgreSQL/MySQL missing merge log counters

The SQLite migration path logs mergedDbRows, collapsedChannelRows, and repointedMessages counts. The PostgreSQL and MySQL paths skip all three counters and just log a generic "complete" message. This makes it harder to audit what happened on those databases post-deployment.

Issue 2: PostgreSQL resolveDbId has an unchecked race window

The SQLite resolveDbId runs synchronously inside a transaction-like context (SQLite single-writer), so the find-then-insert is effectively atomic. The PostgreSQL resolveDbId is an async helper that runs within the migration's client but not inside a subtransaction — two concurrent migration runs (e.g. a hot-restart scenario with two Node workers starting simultaneously) could race here. For a one-time migration this is low risk, but worth noting.

Issue 3: MySQL IN (?) syntax for array bind

At line 227:

const [srcRows] = await pool.query(`SELECT id FROM sources WHERE type IN (?)`, [MQTT_TYPES]);

The mysql2 library's query API treats ? as a scalar placeholder, not a multi-value expander. The correct approach for an array is either IN (?) with [MQTT_TYPES] (which mysql2 does expand when passed as a nested array) or using an explicit multi-placeholder. The PostgreSQL version correctly uses ANY($1) with the array. This deserves a quick test against a real MySQL instance to confirm mysql2 expands this correctly — if the bracket is not the outer array wrapper expected by mysql2, the query silently returns 0 rows and the whole Part B is a no-op for MySQL.


Repository (channelDatabase.ts)

Issue 4: getByNameAsync fetches the full table on every call

async getByNameAsync(name: string): Promise<DbChannelDatabase | null> {
  const all = await this.getAllAsync();
  const lower = trimmed.toLowerCase();
  return all.find((c) => (c.name ?? '').toLowerCase() === lower) ?? null;
}

This does a full-table scan in memory. With a small channel_database table (typically < 20 rows) this is fine in practice, but findOrCreatePassiveByNameAsync calls getByNameAsync twice per create path (initial check + recheck under lock). A targeted WHERE lower(name) = ? query would be more correct under load and avoids the getAllAsync result becoming stale between the two calls within a single promise execution (not a real risk today but structurally cleaner).

Issue 5: passiveCreateInFlight is instance-level, not process-level

The in-flight promise map is on the ChannelDatabaseRepository instance. If ChannelDatabaseRepository is instantiated more than once per process (e.g., different sources constructing separate repository objects), the race guard won't work across instances. Looking at src/services/database.ts / DatabaseService, this appears to be a singleton pattern so there's likely only one ChannelDatabaseRepository instance per process — but this assumption should be explicitly documented or enforced.


MQTT Ingestion (mqttIngestion.ts)

Issue 6: bootstrapMqttChannelDatabase doesn't go through findOrCreatePassiveByNameAsync

The bootstrap path at line 42–56 does its own getAllAsync + set-membership check + createAsync for DEFAULT_MQTT_CHANNELS. If two MQTT sources start simultaneously, this path races exactly like the old findOrCreatePassiveByNameAsync did — it does not use the newly serialized create path. For the current list of just LongFast, this is low risk (two sources starting at the same time would both find the row missing on the first getAllAsync call and both insert). If DEFAULT_MQTT_CHANNELS grows, this race becomes more likely. Consider routing through findOrCreatePassiveByNameAsync here instead.

Issue 7: channelNameToDbIdCache is module-level and never has a size limit

The cache at line 902 is an unbounded Map. In a long-lived process serving a busy public MQTT broker (where channel names might include node IDs or other dynamic strings), this could grow without bound. A simple max-size eviction or a TTL would prevent this.

Non-issue (acknowledged): The logger import

The bootstrapMqttChannelDatabase function uses logger but the import is below it in the file (line 85). JavaScript/TypeScript hoist function declarations but not const assignments — this works because logger is at module scope and resolved before the function is called, but the ordering is unconventional and could confuse readers. Not a bug, but worth cleaning up.


Test Coverage

The tests are solid:

  • Migration idempotency ✓
  • Case-insensitive dedup ✓
  • TCP source left untouched ✓
  • Hash >= OFFSET left untouched (ambiguous messages) ✓
  • findOrCreatePassiveByNameAsync concurrent race guard ✓

Gap: No test for bootstrapMqttChannelDatabase concurrent race (Issue 6 above). Given Issue 4's noted subtlety and Issue 7's cache behavior, a test exercising two concurrent bootstrapMqttChannelDatabase(sourceId) calls for the same source with an empty DB would be a useful regression guard.


Minor

  • docker-compose.dev.local.yml is a dev convenience file. If it's meant to be committed it should be in .gitignore or clearly marked as a checked-in override template — currently .gitignore would need to explicitly allow it if docker-compose.dev.*.yml is in the ignore list.

Summary

The core logic is correct and the migration is the right approach for backfilling. The most actionable issues are #3 (MySQL IN binding), #6 (bootstrap race), and #4 (full-table scan in getByNameAsync). The rest are minor/informational.

…ration counters

- Remove docker-compose.dev.local.yml from the PR (machine-specific dev
  override) and add it to .gitignore so `git add -A` can't re-stage it.
- Migration 102 MySQL: build explicit `IN (?, ?)` placeholders from MQTT_TYPES
  instead of relying on mysql2's nested-array `IN (?)` expansion, removing any
  ambiguity that Part B could silently no-op on MySQL. (review issue #3)
- Migration 102 PostgreSQL + MySQL: log merged/collapsed/repointed counts to
  match the SQLite path, so post-deploy auditing is consistent. (review issue #1)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01SJPe6J5vKrcbwzt6vCdtrt
@Yeraze

Yeraze commented Jun 24, 2026

Copy link
Copy Markdown
Owner Author

Thanks for the review — triaged each item. Addressed in f363be93:

Applied

  • "Update Claude PR Assistant workflow" #3 (MySQL IN binding) — switched to explicit IN (?, ?) placeholders built from MQTT_TYPES rather than relying on mysql2's nested-array expansion, so Part B can't silently no-op on MySQL.
  • "Claude PR Assistant workflow" #1 (PG/MySQL log counters) — both paths now log merged/collapsed/repointed counts, matching SQLite.
  • Minor (docker-compose.dev.local.yml) — good catch; it's a machine-specific dev override that slipped into git add -A (it wasn't in .gitignore). Untracked it and added it to .gitignore.

Declined, with reasoning

  • feat: traceroute highlighting and UI improvements #2 (PG resolveDbId race across concurrent migration runs) — migrations run once, serialized by the registry/settingsKey in a single process; there's no two-worker concurrent-migration path. Acknowledged low-risk as you noted.
  • feat: traceroute request tracking and auth documentation #4 (getByNameAsync full-table scan) — pre-existing; channel_database is tiny (<40 rows) and the double call is per create, not per packet. Out of scope for this PR.
  • feat: add telemetry graphs, node sorting, and fix duplication #5 (passiveCreateInFlight instance-level)ChannelDatabaseRepository is a singleton on DatabaseService, so there's one instance per process; the existing comment already scopes the guard to "this single Node process." No multi-instance path exists today.
  • feat: telemetry parsing, direct messages, danger zone, and UX improvements #6 (bootstrapMqttChannelDatabase race) — the literal suggestion (route through findOrCreatePassiveByNameAsync) would be incorrect: bootstrap seeds an enabled, keyed decryption row (psk='AQ=='), whereas findOrCreatePassive creates a disabled, empty-psk permission row — routing through it would break server-side decryption. The race is also far lower-probability (once per source at startup vs. per-packet), and migration 102 already merges any (lower(name), psk)-identical duplicates it could produce. Reasonable as a separate follow-up if desired, but out of scope here.
  • feat: Messages UI redesign, telemetry fixes, and emoji support #7 (channelNameToDbIdCache unbounded) — pre-existing; channel names (not per-node strings) are the keys, so the set is bounded in practice. Fair future hardening, but not this PR.

Full suite still green (7391/0); tsc clean.

…nel-consolidation

# Conflicts:
#	src/db/migrations.test.ts
#	src/db/migrations.ts
@github-actions

github-actions Bot commented Jun 24, 2026

Copy link
Copy Markdown

Claude finished @Yeraze's task —— View job


PR Review: fix(mqtt): consolidate channels by name instead of splitting by slot/hash

Tasks:

  • Gather context and understand the PR
  • Review migration logic (103_consolidate_mqtt_channels.ts)
  • Review repository changes (channelDatabase.ts)
  • Review MQTT ingestion changes (mqttIngestion.ts)
  • Review test coverage
  • Compile review feedback

Overall Assessment

This is a well-structured fix for a real data integrity problem. The three-pronged approach is correct, the previous review's major concerns were addressed cleanly (MySQL IN binding, PG/MySQL log counters, gitignore), and the test coverage is solid. A few remaining observations below.


Migration (103_consolidate_mqtt_channels.ts)

Observation 1: SQLite messages update is unscoped by sourceId in Part A

The Part A merge step updates messages channel when consolidating duplicate channel_database rows:

// line 65
db.prepare(`UPDATE messages SET channel = ? WHERE channel = ?`).run(OFFSET + keep, OFFSET + dup);

This updates messages across all sources, not scoped to any particular sourceId. This is intentional by design since channel_database is global (not per-source), and CHANNEL_DB_OFFSET + id is the same across sources — but worth confirming: if two sources independently have messages at OFFSET + dupId, both get repointed to OFFSET + keepId, which is correct behavior. The PostgreSQL Part A counterpart at line 154 is similarly unscoped. This is fine as designed, but the code comment could make this explicit.

Observation 2: No transaction wrapping in SQLite or PostgreSQL paths

SQLite's migration.up runs multiple prepare().run() calls without a BEGIN TRANSACTION wrapper. A mid-migration failure would leave the database in a partially-migrated state (some channel_database dups deleted, others not). SQLite's journal mode provides atomicity per-statement but not across the migration as a whole. For a one-time migration this is low-risk if your migration runner is resilient, but other migrations in this codebase may warrant checking whether they share this pattern or wrap in transactions.

Observation 3: resolveDbId in Part B creates a new channel_database row with is_enabled=0 but no sort_order

The SQLite resolveDbId inserts with sort_order = 0 (line 89). The PostgreSQL/MySQL counterparts do the same. A newly created passive row landing at sortOrder=0 will sort before any manually ordered entries. This might be surprising to admins who see a new entry appearing at the top of their channel list. This is arguably acceptable for passive/disabled rows (they're not "real" decryption entries), but worth noting as a UX edge case.


Repository (channelDatabase.ts)

Observation 4: Logical error path in findOrCreatePassiveByNameAsync

At line 163-164, the promise is registered after it is already created, but the comment says "Register before the first await above has a chance to clear it":

const promise = (async (): Promise<number | null> => {
  const recheck = await this.getByNameAsync(trimmed);  // ← first await is HERE
  ...
})();
this.passiveCreateInFlight.set(key, promise);  // ← registered AFTER the IIFE is created

The IIFE starts executing synchronously until the first await (getByNameAsync), at which point control yields. The passiveCreateInFlight.set line runs after the IIFE is constructed but before the first await returns — so the race guard IS correctly installed before any concurrent caller could enter. This is subtle JavaScript microtask sequencing. The existing comment is slightly misleading ("before the first await above has a chance to clear it" — it's not about clearing, it's about racing callers). Minor documentation issue, not a bug.

Observation 5: findOrCreatePassiveByNameAsync never demotes existing enabled rows, but the JSDoc says so

The JSDoc at line 131 says: "we never demote a real decryption entry by reusing its slot here" — confirmed correct by the implementation (line 138: if (existing && typeof existing.id === 'number') return existing.id). Good defensive design.


MQTT Ingestion (mqttIngestion.ts)

Observation 6: logger import ordering (cosmetic)

Lines 14-16 import are at the top, but logger itself (used by bootstrapMqttChannelDatabase at line 55) is imported at line 85. This works fine in JavaScript module scope (all module-level const declarations are resolved before any function calls execute at call-site), but the ordering is non-standard and could confuse a reader who traces the function's dependencies top-to-bottom. A previous review noted this; it's a cosmetic cleanup deferred intentionally.

Observation 7: bootstrapMqttChannelDatabase uses sourceId in the log but the row is not scoped to that source

// line 55-58
logger.info(
  `MQTT source ${sourceId} bootstrapped channel_database entry "${ch.name}" with default key`,
);

The log says "MQTT source X bootstrapped..." but channel_database is global — the first source to trigger bootstrap owns the row, subsequent sources are no-ops. This is correct behavior (global table by design), but the log message could be misread as if the entry belongs to sourceId. A minor wording nit.

Observation 8: channelNameToDbIdCache invalidation is one-directional

The cache at line 902 is populated on findOrCreatePassiveByNameAsync success (line 935), but if an admin deletes a channel_database row via the UI, the cache retains the stale ID until process restart. This would cause subsequent packets with that channel name to be stored with a dangling channel_database_id. This is a pre-existing structural concern (not introduced by this PR), and the admin would need to restart the server to purge the stale cache entry. Acceptable for the current scale, but worth a TODO comment.


Test Coverage

The migration tests are excellent — edge cases covered include:

  • Duplicate channel_database rows with permission repointing ✓
  • Case-insensitive channel name matching ✓
  • Hashes >= OFFSET left untouched ✓
  • TCP source completely isolated ✓
  • Idempotency ✓
  • findOrCreatePassiveByNameAsync concurrent race guard ✓

Minor test gap: The migration test seeds data but doesn't verify the logger.info output line matches expected counts (e.g. merged 1, collapsed 5, repointed 4). This is very low value to enforce in tests, but a quick assertion on the final DB state count (which the test already does) is sufficient.

Missing: No test for resolveChannelDatabaseIdForMqtt module-level cache (channelNameToDbIdCache) interaction with _resetMqttIngestCachesForTest(). The export exists for testing but doesn't appear to be used in the new tests.


Minor / Cosmetic

  • PASSIVE_DESC constant at line 41 of the migration matches the string in channelDatabase.ts line 158. These two strings must stay in sync; a shared constant in meshtastic.ts or a similar constants file would eliminate that drift risk, though it would require importing application code into migrations (currently migrations are self-contained).

Summary

The core fix is correct and well-executed. The previous review's actionable items were properly addressed. Remaining observations are either pre-existing, by-design, or cosmetic — none are blocking. This is ready to merge from a code review perspective.

View job

@Yeraze Yeraze merged commit 0b293b2 into main Jun 24, 2026
20 checks passed
@Yeraze Yeraze deleted the investigate/mqtt-channel-consolidation branch June 24, 2026 13:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant