Prevent duplicate message notifications on service restart#650
Prevent duplicate message notifications on service restart#650torlando-tech merged 6 commits intomainfrom
Conversation
Greptile SummaryThis PR implements a two-pronged deduplication mechanism to prevent spurious notifications on service restart: the Kotlin Key changes:
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant App as App Startup
participant MC as MessageCollector
participant CR as ConversationRepository
participant DB as MessageDao (DB)
participant Flow as reticulumProtocol.observeMessages()
participant NH as NotificationHelper
App->>MC: startCollecting()
MC->>MC: scope.launch { ... }
note over MC: Pre-seed phase (sequential, before subscribe)
MC->>CR: getReceivedMessageIds(since = now - 30d)
CR->>DB: getReceivedMessageIds(identityHash, since)
DB-->>CR: List<String> (recent msg IDs)
CR-->>MC: existingIds
MC->>MC: processedMessageIds.addAll(existingIds)
note over MC: Subscribe phase
MC->>Flow: observeMessages().collect { ... }
Flow-->>MC: replay: receivedMessage (hash = "X")
alt hash in processedMessageIds (pre-seeded)
MC->>MC: skip (return@collect) — no notification
else hash NOT in cache
MC->>DB: getMessageById(hash)
alt already in DB
MC->>MC: add to cache
alt isRead == false
MC->>NH: notifyMessageReceived(...)
end
else new message
MC->>DB: saveMessage(...)
MC->>NH: notifyMessageReceived(...)
end
end
Last reviewed commit: 2c22108 |
app/src/main/java/com/lxmf/messenger/service/MessageCollector.kt
Outdated
Show resolved
Hide resolved
…owth Two root causes of duplicate notifications: 1. pending_inbound queue grows forever because messages are only cleared during poll_received_messages() which is rarely called in event-driven mode. On service restart, drainPendingMessages() re-broadcasts all accumulated messages, triggering duplicate notifications. 2. MessageCollector's in-memory processedMessageIds cache is lost on app process restart, so replayed/re-broadcast messages trigger notifications for unread messages that were already notified. Fixes: - Python: Remove message from pending_inbound after successful Kotlin callback, keeping it only as fallback when callback fails or isn't registered - Kotlin: Pre-seed processedMessageIds from DB at startup before subscribing to message flow, preventing re-notification for existing messages - Add MessageDao.getReceivedMessageIds() for efficient bulk ID lookup - Add ConversationRepository.getReceivedMessageIds() to expose the query https://claude.ai/code/session_012tF2vga1ic7rinmS5E4Ndd
…on test The pre-seeded message dedup test verifies absence of notifications, which is inherently a verify-only pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The NoVerifyOnlyTests rule is non-suppressable. Add assertEquals on messagesCollected counter to verify the dedup early-return path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
150def5 to
5a25a56
Compare
…onditions Tests now explicitly set up the message in pending_inbound before calling _on_lxmf_delivery, rather than relying on the implicit append side-effect. Also set _columba_rssi/_columba_snr to None to prevent MagicMock auto-attributes from causing JSON serialization failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Prevents unbounded growth of processedMessageIds for long-running users by only pre-seeding message IDs from the last 30 days. Adds a since parameter to getReceivedMessageIds in DAO and repository. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| * Used to pre-seed duplicate notification prevention cache at startup. | ||
| * Bounded to recent messages to avoid unbounded memory growth. | ||
| */ | ||
| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since") |
There was a problem hiding this comment.
Bug: The getReceivedMessageIds query filters by timestamp (sender's clock) instead of receivedAt (local time), which can cause messages to be missed by the notification deduplication logic.
Severity: MEDIUM
Suggested Fix
The query should be updated to filter on COALESCE(receivedAt, timestamp) >= :since. This aligns with the established pattern in MessageDao.kt and correctly filters messages by their local arrival time, making the deduplication logic robust against sender clock skew.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt#L170
Potential issue: The database query at `MessageDao.kt:170` filters messages using the
sender's `timestamp` instead of the local reception time, `receivedAt`. This is
inconsistent with other queries in the file that use `COALESCE(receivedAt, timestamp)`
to protect against sender clock skew. Because the new notification deduplication feature
relies on this query to pre-seed a cache of recent messages, any message from a sender
with a significantly skewed clock will be missed. This will result in a duplicate
notification for that message upon app restart, defeating the purpose of the change.
Did we get this right? 👍 / 👎 to inform future reviews.
| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since") | ||
| suspend fun getReceivedMessageIds( | ||
| identityHash: String, | ||
| since: Long, | ||
| ): List<String> |
There was a problem hiding this comment.
Pre-seed window filters on sender timestamp, not local receipt time
The query uses timestamp >= :since where timestamp is the sender's clock value. For messages from peers whose clocks are significantly wrong (e.g., set months in the past), timestamp may be much older than 30 days even though the message was received locally yesterday. Those messages would be excluded from pre-seeding, and if they are re-broadcast by drainPendingMessages() on restart while still unread, the fallback DB-check path in MessageCollector (lines 111–167) would fire a notification for them again — exactly the duplicate this PR aims to prevent.
Using receivedAt (local reception time) would be more accurate here. Since receivedAt is nullable for older/imported messages, a COALESCE fallback keeps backward compatibility:
| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since") | |
| suspend fun getReceivedMessageIds( | |
| identityHash: String, | |
| since: Long, | |
| ): List<String> | |
| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND COALESCE(receivedAt, timestamp) >= :since") | |
| suspend fun getReceivedMessageIds( | |
| identityHash: String, | |
| since: Long, | |
| ): List<String> |
Prompt To Fix With AI
This is a comment left during a code review.
Path: data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt
Line: 170-174
Comment:
**Pre-seed window filters on sender timestamp, not local receipt time**
The query uses `timestamp >= :since` where `timestamp` is the sender's clock value. For messages from peers whose clocks are significantly wrong (e.g., set months in the past), `timestamp` may be much older than 30 days even though the message was received locally yesterday. Those messages would be excluded from pre-seeding, and if they are re-broadcast by `drainPendingMessages()` on restart while still unread, the fallback DB-check path in `MessageCollector` (lines 111–167) would fire a notification for them again — exactly the duplicate this PR aims to prevent.
Using `receivedAt` (local reception time) would be more accurate here. Since `receivedAt` is nullable for older/imported messages, a `COALESCE` fallback keeps backward compatibility:
```suggestion
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND COALESCE(receivedAt, timestamp) >= :since")
suspend fun getReceivedMessageIds(
identityHash: String,
since: Long,
): List<String>
```
How can I resolve this? If you propose a fix, please make it concise.… removal Successful callbacks now remove messages from pending_inbound, so the queue should be empty (0) after processing, not still holding the message (1). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| * Used to pre-seed duplicate notification prevention cache at startup. | ||
| * Bounded to recent messages to avoid unbounded memory growth. | ||
| */ | ||
| @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since") |
There was a problem hiding this comment.
Bug: The query in getReceivedMessageIds uses the sender's timestamp instead of the local receivedAt timestamp, which can cause deduplication to fail for senders with significant clock skew.
Severity: MEDIUM
Suggested Fix
Update the getReceivedMessageIds query to filter by receivedAt or COALESCE(receivedAt, timestamp) instead of timestamp. This will ensure the time comparison is made against the receiver's local clock, consistent with how the 30-day window is calculated and how other temporal queries in the app handle clock skew.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt#L170
Potential issue: The `getReceivedMessageIds` query filters messages using the sender's
`timestamp` against a 30-day window calculated from the receiver's local clock. If a
sender's device clock is set significantly in the past, a recently received message can
have a `timestamp` that falls outside this 30-day window. This causes the message to be
excluded from the pre-seeded deduplication cache. As a result, when the service
restarts, the message may be re-processed, leading to a duplicate notification for the
user. This is inconsistent with other temporal queries in the codebase which use
`COALESCE(receivedAt, timestamp)` to handle clock skew.
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
Summary
This PR implements a comprehensive deduplication mechanism to prevent duplicate message notifications when messages are replayed or re-broadcast after a service restart. The solution involves pre-seeding the in-memory dedup cache with existing messages from the database at startup, and cleaning up the pending message queue after successful callback processing.
Key Changes
Python (Reticulum Wrapper)
_on_lxmf_delivery(), the message is now removed fromrouter.pending_inboundto prevent queue growth and duplicate processing during restart recoveryKotlin (Message Collector)
MessageCollector.startCollecting()now pre-seedsprocessedMessageIdswith all existing received messages from the database viaconversationRepository.getReceivedMessageIds()drainPendingMessages()after restart won't trigger duplicate notificationspre-seeded message IDs prevent duplicate notifications on restartverifies the dedup behaviorData Layer (Repository & DAO)
ConversationRepository.getReceivedMessageIds()retrieves all received message IDs for the active identityMessageDao.getReceivedMessageIds()queries the database for all non-self messages, used to populate the dedup cache at startupImplementation Details
https://claude.ai/code/session_012tF2vga1ic7rinmS5E4Ndd