
Prevent duplicate message notifications on service restart#650

Merged
torlando-tech merged 6 commits into main from
claude/fix-message-queue-issues-Kvd9z
Mar 11, 2026

Conversation

@torlando-tech (Owner)

Summary

This PR implements a comprehensive deduplication mechanism to prevent duplicate message notifications when messages are replayed or re-broadcast after a service restart. The solution involves pre-seeding the in-memory dedup cache with existing messages from the database at startup, and cleaning up the pending message queue after successful callback processing.

Key Changes

Python (Reticulum Wrapper)

  • Message queue cleanup: After a successful Kotlin callback in _on_lxmf_delivery(), the message is now removed from router.pending_inbound to prevent queue growth and duplicate processing during restart recovery
  • Added tests: Two new test cases verify that messages are removed on successful callback and retained on callback failure (for polling fallback)
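The cleanup behavior described above can be sketched in plain Python. This is a hedged, minimal rendering: `pending_inbound` and `_on_lxmf_delivery` are names from the PR, but the surrounding callback plumbing is simplified and assumed, not the real `reticulum_wrapper.py` code.

```python
# Minimal sketch of the post-callback queue cleanup. `router.pending_inbound`
# and `_on_lxmf_delivery` are names from the PR; the rest is illustrative.

class WrapperSketch:
    def __init__(self, router, kotlin_callback):
        self.router = router                  # exposes a `pending_inbound` list
        self.kotlin_callback = kotlin_callback

    def _on_lxmf_delivery(self, message):
        # Queue first, so the polling fallback can always see the message.
        self.router.pending_inbound.append(message)
        try:
            self.kotlin_callback(message)
            # Success: drop the entry so a restart's drainPendingMessages()
            # cannot re-broadcast it and trigger a duplicate notification.
            if message in self.router.pending_inbound:
                self.router.pending_inbound.remove(message)
        except Exception:
            # Failure: keep the entry for the polling fallback.
            pass
```

A failed callback leaves the queue untouched, which is exactly what the second new Python test verifies.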

Kotlin (Message Collector)

  • Pre-seeding dedup cache: On startup, MessageCollector.startCollecting() now pre-seeds processedMessageIds with all existing received messages from the database via conversationRepository.getReceivedMessageIds()
  • Prevents replay duplicates: This ensures that messages replayed via SharedFlow or re-broadcast by drainPendingMessages() after restart won't trigger duplicate notifications
  • Added test: A new test case, `pre-seeded message IDs prevent duplicate notifications on restart`, verifies the dedup behavior
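The pre-seed-then-subscribe pattern above can be illustrated with a small Python sketch. This is not the Kotlin `MessageCollector`; the repository and stream names are stand-ins for the coroutine-based code, and the try/except mirrors the logged-warning graceful degradation.

```python
# Sketch of the pre-seed-then-subscribe pattern used by MessageCollector.
# Names are illustrative stand-ins for the Kotlin coroutine code.

class CollectorSketch:
    def __init__(self, repository):
        self.repository = repository          # stand-in for ConversationRepository
        self.processed_message_ids = set()
        self.notified = []

    def start_collecting(self, message_stream):
        # Pre-seed phase: runs to completion before any message is consumed,
        # mirroring the sequential steps inside the collection coroutine.
        try:
            self.processed_message_ids.update(
                self.repository.get_received_message_ids()
            )
        except Exception:
            pass  # graceful degradation: warn-and-continue in the real code

        # Subscribe phase: replayed messages with pre-seeded IDs are skipped.
        for message_id in message_stream:
            if message_id in self.processed_message_ids:
                continue
            self.processed_message_ids.add(message_id)
            self.notified.append(message_id)
```

Because the pre-seed step finishes before iteration starts, there is no window in which a replayed message can race past an empty cache.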

Data Layer (Repository & DAO)

  • New repository method: ConversationRepository.getReceivedMessageIds() retrieves all received message IDs for the active identity
  • New DAO query: MessageDao.getReceivedMessageIds() queries the database for all non-self messages, used to populate the dedup cache at startup
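The new DAO query can be tried out standalone with SQLite. The column names (`id`, `identityHash`, `isFromMe`, `timestamp`) come from the query text visible in the PR; the schema here is a stripped-down stand-in for the real Room entity.

```python
import sqlite3

# Standalone illustration of the new getReceivedMessageIds query.
# Schema is a minimal stand-in for the Room `messages` entity.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE messages (
        id TEXT PRIMARY KEY,
        identityHash TEXT,
        isFromMe INTEGER,
        timestamp INTEGER
    )
""")
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?)",
    [
        ("m1", "me", 0, 1000),  # received, inside the window -> pre-seeded
        ("m2", "me", 1, 1000),  # sent by us -> excluded
        ("m3", "me", 0, 10),    # older than the window -> excluded
    ],
)

def get_received_message_ids(identity_hash, since):
    rows = conn.execute(
        "SELECT id FROM messages "
        "WHERE identityHash = ? AND isFromMe = 0 AND timestamp >= ?",
        (identity_hash, since),
    )
    return [r[0] for r in rows]

print(get_received_message_ids("me", 500))  # -> ['m1']
```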

Implementation Details

  • Pre-seeding happens inside the collection coroutine to ensure it completes before message subscription begins
  • Error handling is in place: if pre-seeding fails, a warning is logged but collection continues (graceful degradation)
  • The solution handles both event-driven callbacks (Python) and polling fallback (Kotlin) scenarios
  • Messages are only removed from the pending queue after successful callback, maintaining reliability for failed callbacks

https://claude.ai/code/session_012tF2vga1ic7rinmS5E4Ndd

@greptile-apps (Contributor)

greptile-apps bot commented Mar 11, 2026

Greptile Summary

This PR implements a two-pronged deduplication mechanism to prevent spurious notifications on service restart: the Kotlin MessageCollector now pre-seeds its in-memory processedMessageIds cache with message IDs from the last 30 days before subscribing to the message flow, and the Python wrapper removes messages from pending_inbound after a successful Kotlin callback so the queue doesn't grow and replay stale entries on restart.

Key changes:

  • MessageCollector.startCollecting() pre-seeds processedMessageIds from the DB inside the collection coroutine (ensuring completion before the collect subscription begins), with graceful degradation on failure
  • reticulum_wrapper._on_lxmf_delivery() removes the message from pending_inbound after a successful callback; a failed callback preserves the entry for the polling fallback
  • MessageDao and ConversationRepository expose a new getReceivedMessageIds(since) bounded query
  • All new paths have accompanying unit tests; the Python tests now correctly pre-populate pending_inbound before calling _on_lxmf_delivery, making them independent of internal append side-effects
  • One issue found: MessageDao.getReceivedMessageIds filters on the sender's timestamp rather than the local receivedAt column; messages from peers with significantly wrong clocks (>30 days off) could be excluded from pre-seeding and trigger a duplicate notification after restart

Confidence Score: 4/5

  • This PR is safe to merge; the deduplication logic is sound and well-tested, with one minor correctness issue in the DAO timestamp filter that only affects edge-case peers with incorrect clocks.
  • The core dedup approach is correct: pre-seeding is sequential within the coroutine (no race before subscription), graceful degradation is in place, and the Python queue cleanup is properly scoped inside the try block. The single issue — using timestamp (sender clock) instead of COALESCE(receivedAt, timestamp) (local receipt time) for the 30-day window — is a narrow edge case that only affects messages from peers with clocks set more than 30 days in the past and could cause a duplicate notification on restart for those messages while unread.
  • data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt — the getReceivedMessageIds query timestamp filter

Important Files Changed

Filename Overview
app/src/main/java/com/lxmf/messenger/service/MessageCollector.kt Adds sequential pre-seed of processedMessageIds (bounded to 30 days) before subscribing to observeMessages(); logic and error-handling are sound.
app/src/test/java/com/lxmf/messenger/service/MessageCollectorTest.kt New test correctly mocks getReceivedMessageIds in setUp and verifies that a pre-seeded ID blocks both the counter increment and the notification.
data/src/main/java/com/lxmf/messenger/data/db/dao/MessageDao.kt New getReceivedMessageIds query filters on sender timestamp instead of local receivedAt, which can exclude recently-received messages whose sender clock is off by >30 days.
data/src/main/java/com/lxmf/messenger/data/repository/ConversationRepository.kt New getReceivedMessageIds(since) correctly delegates to DAO and gracefully returns empty list when no active identity is found.
python/reticulum_wrapper.py Post-callback removal from pending_inbound is correctly guarded by hasattr + membership check and placed inside the try block so failures preserve the queue entry.
python/test_wrapper_messaging.py Both new tests now correctly pre-populate pending_inbound before calling _on_lxmf_delivery, making them independent of the internal append side-effect.

Sequence Diagram

sequenceDiagram
    participant App as App Startup
    participant MC as MessageCollector
    participant CR as ConversationRepository
    participant DB as MessageDao (DB)
    participant Flow as reticulumProtocol.observeMessages()
    participant NH as NotificationHelper

    App->>MC: startCollecting()
    MC->>MC: scope.launch { ... }
    note over MC: Pre-seed phase (sequential, before subscribe)
    MC->>CR: getReceivedMessageIds(since = now - 30d)
    CR->>DB: getReceivedMessageIds(identityHash, since)
    DB-->>CR: List<String> (recent msg IDs)
    CR-->>MC: existingIds
    MC->>MC: processedMessageIds.addAll(existingIds)

    note over MC: Subscribe phase
    MC->>Flow: observeMessages().collect { ... }

    Flow-->>MC: replay: receivedMessage (hash = "X")
    alt hash in processedMessageIds (pre-seeded)
        MC->>MC: skip (return@collect) — no notification
    else hash NOT in cache
        MC->>DB: getMessageById(hash)
        alt already in DB
            MC->>MC: add to cache
            alt isRead == false
                MC->>NH: notifyMessageReceived(...)
            end
        else new message
            MC->>DB: saveMessage(...)
            MC->>NH: notifyMessageReceived(...)
        end
    end
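The per-message decision path in the diagram above can be rendered as a plain Python function. Here `cache`, `db`, and `notify` are hypothetical stand-ins for `processedMessageIds`, `MessageDao`, and `NotificationHelper`.

```python
# Python rendering of the per-message decision path from the sequence
# diagram. `cache`, `db`, and `notify` are hypothetical stand-ins for
# processedMessageIds, MessageDao, and NotificationHelper.

def handle_replayed_message(msg, cache, db, notify):
    if msg["id"] in cache:                 # pre-seeded or already processed
        return
    existing = db.get(msg["id"])           # fallback DB check
    cache.add(msg["id"])
    if existing is not None:
        if not existing["isRead"]:
            notify(msg["id"])              # already stored but unread
    else:
        db[msg["id"]] = {"isRead": False}  # new message: save, then notify
        notify(msg["id"])
```

The `existing is not None and not isRead` branch is the path that fires a duplicate notification when pre-seeding misses a message, which is why the cache seeding (and the timestamp-filter issue flagged below) matters.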

Last reviewed commit: 2c22108

claude and others added 3 commits March 10, 2026 22:43
…owth

Two root causes of duplicate notifications:
1. pending_inbound queue grows forever because messages are only cleared
   during poll_received_messages() which is rarely called in event-driven mode.
   On service restart, drainPendingMessages() re-broadcasts all accumulated
   messages, triggering duplicate notifications.
2. MessageCollector's in-memory processedMessageIds cache is lost on app
   process restart, so replayed/re-broadcast messages trigger notifications
   for unread messages that were already notified.

Fixes:
- Python: Remove message from pending_inbound after successful Kotlin callback,
  keeping it only as fallback when callback fails or isn't registered
- Kotlin: Pre-seed processedMessageIds from DB at startup before subscribing
  to message flow, preventing re-notification for existing messages
- Add MessageDao.getReceivedMessageIds() for efficient bulk ID lookup
- Add ConversationRepository.getReceivedMessageIds() to expose the query

https://claude.ai/code/session_012tF2vga1ic7rinmS5E4Ndd
…on test

The pre-seeded message dedup test verifies absence of notifications,
which is inherently a verify-only pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The NoVerifyOnlyTests rule is non-suppressable. Add assertEquals on
messagesCollected counter to verify the dedup early-return path.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
torlando-tech and others added 2 commits March 10, 2026 22:54
…onditions

Tests now explicitly set up the message in pending_inbound before calling
_on_lxmf_delivery, rather than relying on the implicit append side-effect.
Also set _columba_rssi/_columba_snr to None to prevent MagicMock
auto-attributes from causing JSON serialization failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Prevents unbounded growth of processedMessageIds for long-running users
by only pre-seeding message IDs from the last 30 days. Adds a since
parameter to getReceivedMessageIds in DAO and repository.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@torlando-tech
Owner Author

@greptileai

```kotlin
 * Used to pre-seed duplicate notification prevention cache at startup.
 * Bounded to recent messages to avoid unbounded memory growth.
 */
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since")
```

Bug: The getReceivedMessageIds query filters by timestamp (sender's clock) instead of receivedAt (local time), which can cause messages to be missed by the notification deduplication logic.
Severity: MEDIUM

Suggested Fix

The query should be updated to filter on COALESCE(receivedAt, timestamp) >= :since. This aligns with the established pattern in MessageDao.kt and correctly filters messages by their local arrival time, making the deduplication logic robust against sender clock skew.


Comment on lines +170 to +174
```kotlin
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since")
suspend fun getReceivedMessageIds(
    identityHash: String,
    since: Long,
): List<String>
```

Pre-seed window filters on sender timestamp, not local receipt time

The query uses timestamp >= :since where timestamp is the sender's clock value. For messages from peers whose clocks are significantly wrong (e.g., set months in the past), timestamp may be much older than 30 days even though the message was received locally yesterday. Those messages would be excluded from pre-seeding, and if they are re-broadcast by drainPendingMessages() on restart while still unread, the fallback DB-check path in MessageCollector (lines 111–167) would fire a notification for them again — exactly the duplicate this PR aims to prevent.

Using receivedAt (local reception time) would be more accurate here. Since receivedAt is nullable for older/imported messages, a COALESCE fallback keeps backward compatibility:

Suggested change

```kotlin
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since")
suspend fun getReceivedMessageIds(
    identityHash: String,
    since: Long,
): List<String>
```

becomes

```kotlin
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND COALESCE(receivedAt, timestamp) >= :since")
suspend fun getReceivedMessageIds(
    identityHash: String,
    since: Long,
): List<String>
```
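The clock-skew failure mode can be demonstrated concretely with SQLite. The column names follow the PR; the schema and the fixed "now" are illustrative stand-ins, and timestamps are in milliseconds as the day constant implies.

```python
import sqlite3

# Demonstrates the clock-skew edge case: a message received locally
# yesterday from a peer whose clock is 90 days behind. Schema is a
# minimal stand-in for the Room `messages` entity.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE messages (
        id TEXT PRIMARY KEY, identityHash TEXT, isFromMe INTEGER,
        timestamp INTEGER, receivedAt INTEGER
    )
""")
DAY = 86_400_000                 # one day in milliseconds
now = 1_000 * DAY                # fixed "now" for reproducibility
conn.execute(
    "INSERT INTO messages VALUES (?, ?, ?, ?, ?)",
    ("skewed", "me", 0, now - 90 * DAY, now - 1 * DAY),
)
since = now - 30 * DAY           # the 30-day pre-seed window

by_timestamp = conn.execute(
    "SELECT id FROM messages WHERE identityHash='me' AND isFromMe=0 "
    "AND timestamp >= ?", (since,)).fetchall()
by_coalesce = conn.execute(
    "SELECT id FROM messages WHERE identityHash='me' AND isFromMe=0 "
    "AND COALESCE(receivedAt, timestamp) >= ?", (since,)).fetchall()

assert by_timestamp == []             # missed: duplicate notification risk
assert by_coalesce == [("skewed",)]   # caught by local receipt time
```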

… removal

Successful callbacks now remove messages from pending_inbound, so the
queue should be empty (0) after processing, not still holding the message (1).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
```kotlin
 * Used to pre-seed duplicate notification prevention cache at startup.
 * Bounded to recent messages to avoid unbounded memory growth.
 */
@Query("SELECT id FROM messages WHERE identityHash = :identityHash AND isFromMe = 0 AND timestamp >= :since")
```

Bug: The query in getReceivedMessageIds uses the sender's timestamp instead of the local receivedAt timestamp, which can cause deduplication to fail for senders with significant clock skew.
Severity: MEDIUM

Suggested Fix

Update the getReceivedMessageIds query to filter by receivedAt or COALESCE(receivedAt, timestamp) instead of timestamp. This will ensure the time comparison is made against the receiver's local clock, consistent with how the 30-day window is calculated and how other temporal queries in the app handle clock skew.


@sentry

sentry bot commented Mar 11, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.


@torlando-tech merged commit 5fbf312 into main Mar 11, 2026
14 checks passed
@torlando-tech deleted the claude/fix-message-queue-issues-Kvd9z branch March 11, 2026 03:53