fix: prevent duplicate notifications on restart (backport to v0.9.x)#658
Conversation
…owth Two root causes of duplicate notifications: 1. pending_inbound queue grows forever because messages are only cleared during poll_received_messages() which is rarely called in event-driven mode. On service restart, drainPendingMessages() re-broadcasts all accumulated messages, triggering duplicate notifications. 2. MessageCollector's in-memory processedMessageIds cache is lost on app process restart, so replayed/re-broadcast messages trigger notifications for unread messages that were already notified. Fixes: - Python: Remove message from pending_inbound after successful Kotlin callback, keeping it only as fallback when callback fails or isn't registered - Kotlin: Pre-seed processedMessageIds from DB at startup before subscribing to message flow, preventing re-notification for existing messages - Add MessageDao.getReceivedMessageIds() for efficient bulk ID lookup - Add ConversationRepository.getReceivedMessageIds() to expose the query https://claude.ai/code/session_012tF2vga1ic7rinmS5E4Ndd
…on test The pre-seeded message dedup test verifies absence of notifications, which is inherently a verify-only pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The NoVerifyOnlyTests rule is non-suppressable. Add assertEquals on messagesCollected counter to verify the dedup early-return path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| // or re-broadcast by drainPendingMessages() after a service restart. | ||
| // Done inside the collection coroutine to ensure it completes before we subscribe. | ||
| try { |
There was a problem hiding this comment.
Bug: Pre-seeding processedMessageIds with all received message IDs, including unread ones, causes notifications to be permanently skipped for those messages if the app restarts after a crash.
Severity: CRITICAL
Suggested Fix
Modify the query in MessageDao.kt used by getReceivedMessageIds() to only return IDs for messages that should be skipped, such as those already marked as read. For example, add a WHERE isRead = 1 condition to the SQL query to avoid pre-seeding unread messages.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: app/src/main/java/com/lxmf/messenger/service/MessageCollector.kt#L88-L90
Potential issue: On app startup, the `getReceivedMessageIds()` query fetches all
received message IDs, regardless of their read status, and adds them to
`processedMessageIds`. If the app had previously crashed after receiving a message but
before showing a notification, that message's ID will be pre-seeded. When the message is
replayed, an early return check for duplicates in `processedMessageIds` prevents the
notification logic from ever being reached. This results in the permanent suppression of
notifications for unread messages that were pending before the crash.
Did we get this right? 👍 / 👎 to inform future reviews.
Greptile SummaryThis PR backports two targeted fixes from Key changes:
Notable concern: The pre-seeding strategy loads all received message IDs (including unread ones) into Confidence Score: 3/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant App as App Process
participant MC as MessageCollector
participant CR as ConversationRepository
participant DB as Database
participant RF as ReticulumProtocol (SharedFlow)
participant NH as NotificationHelper
App->>MC: startCollecting()
activate MC
MC->>CR: getReceivedMessageIds()
CR->>DB: SELECT id FROM messages WHERE isFromMe=0
DB-->>CR: [id1, id2, id3, ...]
CR-->>MC: existingIds
MC->>MC: processedMessageIds.addAll(existingIds)
Note over MC: Pre-seed complete
MC->>RF: observeMessages().collect { ... }
RF-->>MC: receivedMessage (replayed after restart)
alt messageHash in processedMessageIds
MC->>MC: return@collect (skip entirely)
Note over MC,NH: ⚠️ Unread+unnotified messages<br/>are also skipped here
else messageHash NOT in processedMessageIds
MC->>DB: getMessageById(hash)
alt message already in DB
DB-->>MC: existingMessage
alt !existingMessage.isRead
MC->>NH: notifyMessageReceived(...)
end
else brand-new message
MC->>DB: saveMessage(...)
MC->>NH: notifyMessageReceived(...)
end
MC->>MC: processedMessageIds.add(hash)
end
deactivate MC
Last reviewed commit: a90e2f5 |
| // Pre-seed processedMessageIds with all existing received messages from the DB. | ||
| // This prevents duplicate notifications when messages are replayed via SharedFlow | ||
| // or re-broadcast by drainPendingMessages() after a service restart. | ||
| // Done inside the collection coroutine to ensure it completes before we subscribe. | ||
| try { | ||
| val existingIds = conversationRepository.getReceivedMessageIds() | ||
| processedMessageIds.addAll(existingIds) | ||
| Log.i(TAG, "Pre-seeded ${existingIds.size} existing message IDs for notification dedup") | ||
| } catch (e: Exception) { | ||
| Log.w(TAG, "Failed to pre-seed message IDs - duplicate notifications may occur", e) | ||
| } |
There was a problem hiding this comment.
Pre-seeding bypasses the unread-notification check
Pre-seeding processedMessageIds with every DB-persisted received message causes the early return@collect at line 103 to fire for all of them, completely skipping the existingMessage.isRead notification branch (lines 108–173).
Consider this scenario:
ServicePersistenceManager(running in the separate:reticulumprocess) saves a message to the DB.- The app process crashes before
MessageCollectorreaches thenotifyMessageReceivedcall. - On the next restart, this pre-seeding loads that message's ID into
processedMessageIds. - When the message is replayed via the SharedFlow /
drainPendingMessages, it hits thein processedMessageIdsguard and returns immediately — the user never sees a notification.
Before this patch the replay would fall through to the existingMessage != null branch, check !existingMessage.isRead, and correctly fire the notification. The fix trades away that recovery path entirely.
A safer approach is to only pre-seed IDs for messages that have already been read (or for which a notification has already been shown), so that unread-but-persisted messages can still bubble up through the existing existingMessage.isRead gate.
| // Pre-seed processedMessageIds with all existing received messages from the DB. | |
| // This prevents duplicate notifications when messages are replayed via SharedFlow | |
| // or re-broadcast by drainPendingMessages() after a service restart. | |
| // Done inside the collection coroutine to ensure it completes before we subscribe. | |
| try { | |
| val existingIds = conversationRepository.getReceivedMessageIds() | |
| processedMessageIds.addAll(existingIds) | |
| Log.i(TAG, "Pre-seeded ${existingIds.size} existing message IDs for notification dedup") | |
| } catch (e: Exception) { | |
| Log.w(TAG, "Failed to pre-seed message IDs - duplicate notifications may occur", e) | |
| } | |
| try { | |
| val existingIds = conversationRepository.getReadReceivedMessageIds() | |
| processedMessageIds.addAll(existingIds) | |
| Log.i(TAG, "Pre-seeded ${existingIds.size} read message IDs for notification dedup") | |
| } catch (e: Exception) { | |
| Log.w(TAG, "Failed to pre-seed message IDs - duplicate notifications may occur", e) | |
| } |
(This would require a new DAO query that adds AND isRead = 1 to the existing getReceivedMessageIds query.)
Prompt To Fix With AI
This is a comment left during a code review.
Path: app/src/main/java/com/lxmf/messenger/service/MessageCollector.kt
Line: 86-96
Comment:
**Pre-seeding bypasses the unread-notification check**
Pre-seeding `processedMessageIds` with every DB-persisted received message causes the early `return@collect` at line 103 to fire for all of them, completely skipping the `existingMessage.isRead` notification branch (lines 108–173).
Consider this scenario:
1. `ServicePersistenceManager` (running in the separate `:reticulum` process) saves a message to the DB.
2. The app process crashes before `MessageCollector` reaches the `notifyMessageReceived` call.
3. On the next restart, this pre-seeding loads that message's ID into `processedMessageIds`.
4. When the message is replayed via the SharedFlow / `drainPendingMessages`, it hits the `in processedMessageIds` guard and returns immediately — the user never sees a notification.
Before this patch the replay would fall through to the `existingMessage != null` branch, check `!existingMessage.isRead`, and correctly fire the notification. The fix trades away that recovery path entirely.
A safer approach is to only pre-seed IDs for messages that have already been **read** (or for which a notification has already been shown), so that unread-but-persisted messages can still bubble up through the existing `existingMessage.isRead` gate.
```suggestion
try {
val existingIds = conversationRepository.getReadReceivedMessageIds()
processedMessageIds.addAll(existingIds)
Log.i(TAG, "Pre-seeded ${existingIds.size} read message IDs for notification dedup")
} catch (e: Exception) {
Log.w(TAG, "Failed to pre-seed message IDs - duplicate notifications may occur", e)
}
```
(This would require a new DAO query that adds `AND isRead = 1` to the existing `getReceivedMessageIds` query.)
How can I resolve this? If you propose a fix, please make it concise.| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0") | ||
| suspend fun getReceivedMessageIds(identityHash: String): List<String> |
There was a problem hiding this comment.
No LIMIT on startup ID load
getReceivedMessageIds returns every received message for the identity with no upper bound. For a long-lived account this could be tens of thousands of string IDs, all held in memory inside processedMessageIds for the entire session.
Consider adding a LIMIT clause (e.g. the most recent N messages), since messages older than some threshold are extremely unlikely to be replayed:
| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0") | |
| suspend fun getReceivedMessageIds(identityHash: String): List<String> | |
| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 ORDER BY COALESCE(receivedAt, timestamp) DESC LIMIT 5000") | |
| suspend fun getReceivedMessageIds(identityHash: String): List<String> |
Alternatively, filter by a recency window (e.g. AND COALESCE(receivedAt, timestamp) > :cutoff) to cap memory usage proportionally to message volume.
Prompt To Fix With AI
This is a comment left during a code review.
Path: data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt
Line: 169-170
Comment:
**No LIMIT on startup ID load**
`getReceivedMessageIds` returns every received message for the identity with no upper bound. For a long-lived account this could be tens of thousands of string IDs, all held in memory inside `processedMessageIds` for the entire session.
Consider adding a `LIMIT` clause (e.g. the most recent N messages), since messages older than some threshold are extremely unlikely to be replayed:
```suggestion
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 ORDER BY COALESCE(receivedAt, timestamp) DESC LIMIT 5000")
suspend fun getReceivedMessageIds(identityHash: String): List<String>
```
Alternatively, filter by a recency window (e.g. `AND COALESCE(receivedAt, timestamp) > :cutoff`) to cap memory usage proportionally to message volume.
How can I resolve this? If you propose a fix, please make it concise.| if hasattr(self.router, 'pending_inbound') and lxmf_message in self.router.pending_inbound: | ||
| self.router.pending_inbound.remove(lxmf_message) | ||
| log_debug("ReticulumWrapper", "_on_lxmf_delivery", | ||
| f"Removed message from pending_inbound after successful callback (queue now has {len(self.router.pending_inbound)} messages)") |
There was a problem hiding this comment.
O(n) list scan on every successful delivery
pending_inbound is a plain Python list. Both the membership test (lxmf_message in self.router.pending_inbound) and the subsequent .remove() call scan the entire list linearly. Although the goal of this change is to keep the queue small, under high message rates or after a burst delivery there could be a transient performance penalty.
If pending_inbound is owned by the wrapper (or can be replaced), changing it to a list backed by an OrderedDict/set for O(1) lookup would be more robust. If it is a third-party LXMF router attribute that must stay a list, the current approach is acceptable but worth a note.
No code change is strictly required — just something to be aware of if performance issues arise in high-throughput environments.
Prompt To Fix With AI
This is a comment left during a code review.
Path: python/reticulum_wrapper.py
Line: 3336-3339
Comment:
**O(n) list scan on every successful delivery**
`pending_inbound` is a plain Python `list`. Both the membership test (`lxmf_message in self.router.pending_inbound`) and the subsequent `.remove()` call scan the entire list linearly. Although the goal of this change is to keep the queue small, under high message rates or after a burst delivery there could be a transient performance penalty.
If `pending_inbound` is owned by the wrapper (or can be replaced), changing it to a `list` backed by an `OrderedDict`/`set` for O(1) lookup would be more robust. If it is a third-party LXMF router attribute that must stay a `list`, the current approach is acceptable but worth a note.
No code change is strictly required — just something to be aware of if performance issues arise in high-throughput environments.
How can I resolve this? If you propose a fix, please make it concise.…onditions Tests now explicitly set up the message in pending_inbound before calling _on_lxmf_delivery, rather than relying on the implicit append side-effect. Also set _columba_rssi/_columba_snr to None to prevent MagicMock auto-attributes from causing JSON serialization failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Prevents unbounded growth of processedMessageIds for long-running users by only pre-seeding message IDs from the last 30 days. Adds a since parameter to getReceivedMessageIds in DAO and repository. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
release/v0.9.xprocessedMessageIdsfrom DB at startup to prevent duplicate notifications when messages are replayed after service restartpending_inbounddict growth in Python wrapper to prevent unbounded memory usageTest plan
🤖 Generated with Claude Code