
fix: prevent duplicate notifications on restart (backport to v0.9.x)#658

Merged
torlando-tech merged 5 commits into release/v0.9.x from fix/duplicate-notifications-v0.9.x on Mar 11, 2026
Conversation

@torlando-tech
Owner

Summary

Test plan

🤖 Generated with Claude Code

claude and others added 3 commits March 10, 2026 22:51
…owth

Two root causes of duplicate notifications:
1. pending_inbound queue grows forever because messages are only cleared
   during poll_received_messages(), which is rarely called in event-driven mode.
   On service restart, drainPendingMessages() re-broadcasts all accumulated
   messages, triggering duplicate notifications.
2. MessageCollector's in-memory processedMessageIds cache is lost on app
   process restart, so replayed/re-broadcast messages trigger notifications
   for unread messages that were already notified.

Fixes:
- Python: Remove message from pending_inbound after successful Kotlin callback,
  keeping it only as fallback when callback fails or isn't registered
- Kotlin: Pre-seed processedMessageIds from DB at startup before subscribing
  to message flow, preventing re-notification for existing messages
- Add MessageDao.getReceivedMessageIds() for efficient bulk ID lookup
- Add ConversationRepository.getReceivedMessageIds() to expose the query

https://claude.ai/code/session_012tF2vga1ic7rinmS5E4Ndd
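The Python-side fix described above can be sketched as follows. This is a minimal sketch, not the actual `reticulum_wrapper.py` implementation: the class, the callback parameter, and the method name are stand-ins for the wrapper's internals, assuming only that the router exposes a plain `pending_inbound` list.

```python
class WrapperSketch:
    """Minimal sketch of the pending_inbound cleanup on successful delivery."""

    def __init__(self, router, kotlin_callback=None):
        self.router = router                # owns the pending_inbound list
        self.kotlin_callback = kotlin_callback

    def on_lxmf_delivery(self, lxmf_message):
        delivered = False
        if self.kotlin_callback is not None:
            try:
                self.kotlin_callback(lxmf_message)
                delivered = True
            except Exception:
                delivered = False  # keep the message queued as a fallback
        # Drop the message only after the Kotlin callback has confirmed
        # receipt; otherwise it stays in pending_inbound so a later
        # drainPendingMessages() can retry it.
        if delivered and lxmf_message in self.router.pending_inbound:
            self.router.pending_inbound.remove(lxmf_message)
```

The key property is that the fallback path (callback missing or raising) leaves the queue untouched, so delivery is at-least-once and the dedup layer on the Kotlin side handles any resulting replays.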
…on test

The pre-seeded message dedup test verifies absence of notifications,
which is inherently a verify-only pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The NoVerifyOnlyTests rule is non-suppressable. Add assertEquals on
messagesCollected counter to verify the dedup early-return path.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment on lines +88 to +90
// or re-broadcast by drainPendingMessages() after a service restart.
// Done inside the collection coroutine to ensure it completes before we subscribe.
try {
Contributor


Bug: Pre-seeding processedMessageIds with all received message IDs, including unread ones, causes notifications to be permanently skipped for those messages if the app restarts after a crash.
Severity: CRITICAL

Suggested Fix

Modify the query in MessageDao.kt used by getReceivedMessageIds() to only return IDs for messages that should be skipped, such as those already marked as read. For example, add a WHERE isRead = 1 condition to the SQL query to avoid pre-seeding unread messages.
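The effect of the suggested `WHERE isRead = 1` restriction can be illustrated with a small sqlite3 sketch. The table and column names mirror the review's description of the `MessageDao` query and are assumptions about the real Room schema:

```python
import sqlite3

def get_read_received_message_ids(conn, identity_hash):
    """Return only IDs that are safe to pre-seed: received AND already read.

    Unread received messages are deliberately excluded so that a replay
    after a crash can still reach the notification path.
    """
    rows = conn.execute(
        "SELECT id FROM messages "
        "WHERE identityHash = ? AND isFromMe = 0 AND isRead = 1",
        (identity_hash,),
    )
    return [r[0] for r in rows]

# Demo schema and data (hypothetical, mirroring the described messages table)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages "
    "(id TEXT, identityHash TEXT, isFromMe INTEGER, isRead INTEGER)"
)
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?)",
    [
        ("m1", "abc", 0, 1),  # received + read  -> safe to pre-seed
        ("m2", "abc", 0, 0),  # received, unread -> must still notify
        ("m3", "abc", 1, 1),  # sent by us       -> irrelevant to dedup
    ],
)
```

Here only `m1` would be pre-seeded; `m2` falls through to the existing unread-notification branch on replay.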

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: app/src/main/java/com/lxmf/messenger/service/MessageCollector.kt#L88-L90

Potential issue: On app startup, the `getReceivedMessageIds()` query fetches all
received message IDs, regardless of their read status, and adds them to
`processedMessageIds`. If the app had previously crashed after receiving a message but
before showing a notification, that message's ID will be pre-seeded. When the message is
replayed, an early return check for duplicates in `processedMessageIds` prevents the
notification logic from ever being reached. This results in the permanent suppression of
notifications for unread messages that were pending before the crash.


@greptile-apps
Contributor

greptile-apps bot commented Mar 11, 2026

Greptile Summary

This PR backports two targeted fixes from main to release/v0.9.x: (1) pre-seeding processedMessageIds in MessageCollector from the database at startup so that messages replayed via SharedFlow or drainPendingMessages after a service restart don't trigger duplicate notifications, and (2) removing a successfully-delivered lxmf_message from pending_inbound in the Python wrapper to cap unbounded list growth.

Key changes:

  • MessageCollector.kt — loads all received message IDs from the DB into processedMessageIds before subscribing to the message flow, so replayed messages are immediately filtered out.
  • MessageDao.kt / ConversationRepository.kt — new getReceivedMessageIds() query and repository method to support the above.
  • reticulum_wrapper.py — removes the message object from router.pending_inbound after a successful Kotlin callback, preventing the queue from growing unbounded.
  • Both sides include new unit tests covering the happy path.

Notable concern: The pre-seeding strategy loads all received message IDs (including unread ones) into processedMessageIds. Because the early-exit guard (return@collect) fires before the existingMessage.isRead notification branch, messages that were persisted to the DB (e.g. by ServicePersistenceManager in a separate process) but never notified before a crash will silently receive no notification after restart — a regression compared to the previous behaviour. Consider restricting the pre-seed to read messages only, or messages for which a notification has already been issued.

Confidence Score: 3/5

  • Safe to merge for the common case, but introduces a regression where unread persisted messages may never generate a notification after a crash.
  • The Python-side change is straightforward and low-risk. The Kotlin pre-seeding correctly solves the duplicate-notification problem but uses an overly broad predicate (all received messages) that inadvertently swallows the existing unread-message notification recovery path. The new tests cover the duplicate-prevention happy path but do not cover the crash-recovery regression scenario. Confidence is reduced accordingly.
  • app/src/main/java/com/lxmf/messenger/service/MessageCollector.kt and data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt need attention for the notification regression and unbounded ID load respectively.

Important Files Changed

Filename Overview
app/src/main/java/com/lxmf/messenger/service/MessageCollector.kt Adds DB pre-seeding of processedMessageIds at startup, but the approach skips the existingMessage.isRead notification recovery path, which can cause missed notifications for messages that were persisted (e.g. by ServicePersistenceManager) but not yet notified before a crash.
app/src/test/java/com/lxmf/messenger/service/MessageCollectorTest.kt Adds a pre-seed dedup test and default mock for getReceivedMessageIds; covers the happy-path duplicate-prevention scenario but does not test the crash-recovery regression (pre-seeded unread message that should still fire a notification).
data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt Adds getReceivedMessageIds query; correct SQL but no LIMIT clause, which could load unbounded rows into memory for large message histories.
data/src/main/java/com/lxmf/messenger/data/repository/ConversationRepository.kt Thin wrapper around the new DAO method; correctly handles missing active identity by returning emptyList().
python/reticulum_wrapper.py Removes message from pending_inbound after a successful Kotlin callback, capping unbounded list growth; uses O(n) list.remove() which is fine for small queues but worth monitoring under high throughput.
python/test_wrapper_messaging.py Two new tests cover both the success-path removal and failure-path retention of messages in pending_inbound; logic and assertions are correct.

Sequence Diagram

sequenceDiagram
    participant App as App Process
    participant MC as MessageCollector
    participant CR as ConversationRepository
    participant DB as Database
    participant RF as ReticulumProtocol (SharedFlow)
    participant NH as NotificationHelper

    App->>MC: startCollecting()
    activate MC

    MC->>CR: getReceivedMessageIds()
    CR->>DB: SELECT id FROM messages WHERE isFromMe=0
    DB-->>CR: [id1, id2, id3, ...]
    CR-->>MC: existingIds
    MC->>MC: processedMessageIds.addAll(existingIds)
    Note over MC: Pre-seed complete

    MC->>RF: observeMessages().collect { ... }

    RF-->>MC: receivedMessage (replayed after restart)

    alt messageHash in processedMessageIds
        MC->>MC: return@collect (skip entirely)
        Note over MC,NH: ⚠️ Unread+unnotified messages<br/>are also skipped here
    else messageHash NOT in processedMessageIds
        MC->>DB: getMessageById(hash)
        alt message already in DB
            DB-->>MC: existingMessage
            alt !existingMessage.isRead
                MC->>NH: notifyMessageReceived(...)
            end
        else brand-new message
            MC->>DB: saveMessage(...)
            MC->>NH: notifyMessageReceived(...)
        end
        MC->>MC: processedMessageIds.add(hash)
    end

    deactivate MC

Last reviewed commit: a90e2f5

Comment on lines +86 to +96
// Pre-seed processedMessageIds with all existing received messages from the DB.
// This prevents duplicate notifications when messages are replayed via SharedFlow
// or re-broadcast by drainPendingMessages() after a service restart.
// Done inside the collection coroutine to ensure it completes before we subscribe.
try {
val existingIds = conversationRepository.getReceivedMessageIds()
processedMessageIds.addAll(existingIds)
Log.i(TAG, "Pre-seeded ${existingIds.size} existing message IDs for notification dedup")
} catch (e: Exception) {
Log.w(TAG, "Failed to pre-seed message IDs - duplicate notifications may occur", e)
}
Contributor


Pre-seeding bypasses the unread-notification check

Pre-seeding processedMessageIds with every DB-persisted received message causes the early return@collect at line 103 to fire for all of them, completely skipping the existingMessage.isRead notification branch (lines 108–173).

Consider this scenario:

  1. ServicePersistenceManager (running in the separate :reticulum process) saves a message to the DB.
  2. The app process crashes before MessageCollector reaches the notifyMessageReceived call.
  3. On the next restart, this pre-seeding loads that message's ID into processedMessageIds.
  4. When the message is replayed via the SharedFlow / drainPendingMessages, it hits the in processedMessageIds guard and returns immediately — the user never sees a notification.

Before this patch the replay would fall through to the existingMessage != null branch, check !existingMessage.isRead, and correctly fire the notification. The fix trades away that recovery path entirely.

A safer approach is to only pre-seed IDs for messages that have already been read (or for which a notification has already been shown), so that unread-but-persisted messages can still bubble up through the existing existingMessage.isRead gate.

Suggested change — replace:

    // Pre-seed processedMessageIds with all existing received messages from the DB.
    // This prevents duplicate notifications when messages are replayed via SharedFlow
    // or re-broadcast by drainPendingMessages() after a service restart.
    // Done inside the collection coroutine to ensure it completes before we subscribe.
    try {
        val existingIds = conversationRepository.getReceivedMessageIds()
        processedMessageIds.addAll(existingIds)
        Log.i(TAG, "Pre-seeded ${existingIds.size} existing message IDs for notification dedup")
    } catch (e: Exception) {
        Log.w(TAG, "Failed to pre-seed message IDs - duplicate notifications may occur", e)
    }

with:

    try {
        val existingIds = conversationRepository.getReadReceivedMessageIds()
        processedMessageIds.addAll(existingIds)
        Log.i(TAG, "Pre-seeded ${existingIds.size} read message IDs for notification dedup")
    } catch (e: Exception) {
        Log.w(TAG, "Failed to pre-seed message IDs - duplicate notifications may occur", e)
    }

(This would require a new DAO query that adds AND isRead = 1 to the existing getReceivedMessageIds query.)


Comment on lines +169 to +170
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0")
suspend fun getReceivedMessageIds(identityHash: String): List<String>
Contributor


No LIMIT on startup ID load

getReceivedMessageIds returns every received message for the identity with no upper bound. For a long-lived account this could be tens of thousands of string IDs, all held in memory inside processedMessageIds for the entire session.

Consider adding a LIMIT clause (e.g. the most recent N messages), since messages older than some threshold are extremely unlikely to be replayed:

Suggested change — replace:

    @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0")
    suspend fun getReceivedMessageIds(identityHash: String): List<String>

with:

    @Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 ORDER BY COALESCE(receivedAt, timestamp) DESC LIMIT 5000")
    suspend fun getReceivedMessageIds(identityHash: String): List<String>

Alternatively, filter by a recency window (e.g. AND COALESCE(receivedAt, timestamp) > :cutoff) to cap memory usage proportionally to message volume.


Comment on lines +3336 to +3339
if hasattr(self.router, 'pending_inbound') and lxmf_message in self.router.pending_inbound:
self.router.pending_inbound.remove(lxmf_message)
log_debug("ReticulumWrapper", "_on_lxmf_delivery",
f"Removed message from pending_inbound after successful callback (queue now has {len(self.router.pending_inbound)} messages)")
Contributor


O(n) list scan on every successful delivery

pending_inbound is a plain Python list. Both the membership test (lxmf_message in self.router.pending_inbound) and the subsequent .remove() call scan the entire list linearly. Although the goal of this change is to keep the queue small, under high message rates or after a burst delivery there could be a transient performance penalty.

If pending_inbound is owned by the wrapper (or can be replaced), backing it with an OrderedDict or set for O(1) lookup would be more robust. If it is a third-party LXMF router attribute that must remain a plain list, the current approach is acceptable but worth a note.

No code change is strictly required — just something to be aware of if performance issues arise in high-throughput environments.
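If the queue were owned by the wrapper, the O(1) alternative mentioned above could look like the sketch below. This is hypothetical: the real `pending_inbound` belongs to the LXMF router and is a plain list, and the `PendingInbound` class and its method names are inventions for illustration. It relies on Python dicts preserving insertion order, so iteration still yields messages in arrival order like the original list.

```python
class PendingInbound:
    """Insertion-ordered pending queue with O(1) membership and removal."""

    def __init__(self):
        self._by_hash = {}  # message_hash -> message, in arrival order

    def add(self, message, message_hash):
        self._by_hash[message_hash] = message

    def remove(self, message_hash):
        # O(1) pop by key, instead of list.remove()'s O(n) scan
        return self._by_hash.pop(message_hash, None)

    def __contains__(self, message_hash):
        return message_hash in self._by_hash

    def drain(self):
        # Return queued messages in arrival order and clear the queue,
        # mirroring a drainPendingMessages()-style re-broadcast.
        messages = list(self._by_hash.values())
        self._by_hash.clear()
        return messages
```

Keying by message hash also sidesteps equality comparisons on whole message objects, which is what makes the list-based membership test O(n) in the first place.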


torlando-tech and others added 2 commits March 10, 2026 23:00
…onditions

Tests now explicitly set up the message in pending_inbound before calling
_on_lxmf_delivery, rather than relying on the implicit append side-effect.
Also set _columba_rssi/_columba_snr to None to prevent MagicMock
auto-attributes from causing JSON serialization failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Prevents unbounded growth of processedMessageIds for long-running users
by only pre-seeding message IDs from the last 30 days. Adds a since
parameter to getReceivedMessageIds in DAO and repository.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
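The 30-day recency window described in this commit can be sketched with sqlite3. The column names follow the earlier review comments and are assumptions about the real schema; the actual query lives in the Room DAO (`MessageDao.kt`), not in Python:

```python
import sqlite3
import time

THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000

def get_received_message_ids_since(conn, identity_hash, since_ms):
    """Pre-seed only recent IDs, bounding memory for long-lived accounts.

    COALESCE falls back to the send timestamp when receivedAt is NULL.
    """
    rows = conn.execute(
        "SELECT id FROM messages "
        "WHERE identityHash = ? AND isFromMe = 0 "
        "AND COALESCE(receivedAt, timestamp) > ?",
        (identity_hash, since_ms),
    )
    return [r[0] for r in rows]

# Demo schema and data (hypothetical)
now_ms = int(time.time() * 1000)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages "
    "(id TEXT, identityHash TEXT, isFromMe INTEGER, "
    "receivedAt INTEGER, timestamp INTEGER)"
)
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?, ?)",
    [
        ("recent", "abc", 0, now_ms - 1000, now_ms - 1000),
        ("ancient", "abc", 0, now_ms - 90 * 24 * 60 * 60 * 1000, 0),
    ],
)
cutoff = now_ms - THIRTY_DAYS_MS
```

Messages older than the cutoff are excluded from the pre-seed on the theory that they are extremely unlikely to be replayed, trading a tiny reopened-dedup window for bounded memory.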
@torlando-tech torlando-tech merged commit 5bd0a3d into release/v0.9.x Mar 11, 2026
1 check passed
@torlando-tech torlando-tech deleted the fix/duplicate-notifications-v0.9.x branch March 11, 2026 03:53
