fix(mqtt): consolidate channels by name instead of splitting by slot/hash#3708
Conversation
…hash MQTT sources fragmented one logical channel (e.g. LongFast) across the per-source Channels tab and Unified Messages because channel identity was keyed off the per-packet `channel` byte — a channel *hash* (0-255) on MQTT, not a stable 0-7 slot. Three splitters, all observed in live data: 1. `recordChannelFromEnvelope` wrote a `channels` row keyed by the hash, so LongFast landed on rows 0/1/8/40, MediumFast on 0/31, etc. 2. `findOrCreatePassiveByNameAsync` was a non-atomic find-then-create with no unique constraint, so concurrent MQTT packets raced in byte-identical duplicate `channel_database` rows (Primary 3&4, Wong 5&6, …) — splitting messages across CHANNEL_DB_OFFSET+dupId. 3. Messages whose name didn't resolve at ingest were stranded on the raw hash. Canonical identity is now the `channel_database` row, surfaced everywhere as `CHANNEL_DB_OFFSET + dbId` (decrypted packets stay key-verified via decoded.channelDatabaseId; name-only packets resolve by channelId): - Ingestion: stop writing hash-keyed `channels` rows for MQTT. The channel surfaces by name through its channel_database row (message rows on CHANNEL_DB_OFFSET+dbId + the virtual-channel path in /api/unified/channels). - Repo: `findOrCreatePassiveByNameAsync` serializes concurrent creates per lower(name) via an in-flight promise cache (the race is in-process; a cross-backend unique text index is awkward). - Migration 102: merge duplicate channel_database rows by (lower(name), psk) — repointing messages + permissions — then, for MQTT/bridge sources only (never TCP slots 0-7), collapse every hash-keyed channels row and repoint unambiguous raw-hash (< OFFSET) messages onto CHANNEL_DB_OFFSET+dbId. Tests: migration consolidation + idempotency (incl. hash >= OFFSET and TCP untouched), and the passive-create race guard. Full suite: 7391 pass / 0 fail. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01SJPe6J5vKrcbwzt6vCdtrt
|
Claude finished @Yeraze's task —— View job PR Review: fix(mqtt): consolidate channels by name instead of splitting by slot/hashTasks:
Overall AssessmentThis is a well-reasoned fix for a genuine data-integrity bug. The problem analysis is accurate, the three-pronged fix is correctly scoped, and the migration is idempotent. The test coverage is good. A few issues worth addressing before merge: Migration (
|
…ration counters - Remove docker-compose.dev.local.yml from the PR (machine-specific dev override) and add it to .gitignore so `git add -A` can't re-stage it. - Migration 102 MySQL: build explicit `IN (?, ?)` placeholders from MQTT_TYPES instead of relying on mysql2's nested-array `IN (?)` expansion, removing any ambiguity that Part B could silently no-op on MySQL. (review issue #3) - Migration 102 PostgreSQL + MySQL: log merged/collapsed/repointed counts to match the SQLite path, so post-deploy auditing is consistent. (review issue #1) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01SJPe6J5vKrcbwzt6vCdtrt
|
Thanks for the review — triaged each item. Addressed in Applied
Declined, with reasoning
Full suite still green (7391/0); tsc clean. |
…nel-consolidation # Conflicts: # src/db/migrations.test.ts # src/db/migrations.ts
|
Claude finished @Yeraze's task —— View job PR Review: fix(mqtt): consolidate channels by name instead of splitting by slot/hashTasks:
Overall AssessmentThis is a well-structured fix for a real data integrity problem. The three-pronged approach is correct, the previous review's major concerns were addressed cleanly (MySQL Migration (
|
Summary
For MQTT sources, one logical channel (e.g. LongFast) was fragmenting into many across both the per-source Channels tab and Unified Messages, because channel identity was keyed off the per-packet
channelbyte — which on MQTT is a channel hash (0–255), not a stable 0–7 slot.Confirmed in live dev data (
Official MQTT):LongFastrows at0/1/8/40,MediumFastat0/31, plus duplicatechannel_databaserows and messages stranded on raw hashes.Three splitters fixed
recordChannelFromEnvelopewrote achannelsrow per hash. → Stop writing hash-keyedchannelsrows for MQTT entirely. The channel now surfaces by name through itschannel_databaserow: message rows onCHANNEL_DB_OFFSET + dbIdand the virtual-channel path in/api/unified/channels.findOrCreatePassiveByNameAsyncraced in duplicate rows (non-atomic find-then-create, no unique constraint) → byte-identical pairs (Primary3&4,Wong5&6, …). → Serialize concurrent creates perlower(name)via an in-flight promise cache (the race is in-process; a cross-backend functional/text unique index is awkward).Migration 102 (
consolidate_mqtt_channels)Backfills existing data across SQLite/PostgreSQL/MySQL:
channel_databaserows identical in(lower(name), psk)— repointing messages +channel_database_permissions, keeping the lowest id.channelsrow, and repoint unambiguous raw-hash (< CHANNEL_DB_OFFSET) messages ontoCHANNEL_DB_OFFSET + dbId.Idempotent; scoped to
mqtt_bridge/mqtt_brokersource types.Design notes
channel_databaserow. Decrypted packets stay key-verified (decoded.channelDatabaseId); name-only packets resolve bychannelId. So "same channel only if the key aligns" is honoured.(lower(name), psk), not name alone — preserves the decryption feature's ability to hold same-named/different-key entries.channelsrows (not just< OFFSET) but only repoints messages on unambiguous< OFFSEThashes (a raw hash ≥ OFFSET is indistinguishable from a realCHANNEL_DB_OFFSET+dbId).Verification
channel_databaseduplicate names: 0 remainingchannelsrows: 0 remaining (all hash junk collapsed)LongFast20→27 (stranded raw8+1folded in),MediumFast479→485 (stranded raw31folded in); raw channels31/8/1gone./api/unified/channels:LongFasta single entry @102,MediumFasta single entry @101, zero duplicate names.See
docs/internal/dev-notes/MQTT_CHANNEL_CONSOLIDATION.mdfor the full analysis.🤖 Generated with Claude Code