Skip to content

Refactor/channel system#877

Merged
yinwm merged 66 commits intomainfrom
refactor/channel-system
Feb 27, 2026
Merged

Refactor/channel system#877
yinwm merged 66 commits intomainfrom
refactor/channel-system

Conversation

@alexhoshina
Copy link
Collaborator

@alexhoshina alexhoshina commented Feb 27, 2026

Refactor: Channel System Architecture Overhaul

Summary

This PR is a comprehensive refactoring of the channel (chat platform) subsystem, introducing a modular subpackage architecture, unified
abstractions, and several new infrastructure components. The goal is to eliminate duplicated logic across 13+ chat platform integrations, fix
known concurrency issues, and provide a clean extension pattern for future channels.

87 files changed, +10,926 / −1,608 lines

Motivation

The previous channel system had each platform implemented as a standalone file in pkg/channels/, leading to:

  • Duplicated message splitting, allow-list filtering, lifecycle management, and error handling across every channel
  • Goroutine/context leaks in Start/Stop paths
  • A deadlock in MessageBus under high concurrency
  • No retry or rate-limiting on outbound sends — a single API failure would silently drop messages
  • No unified way to handle media files, typing indicators, or placeholder messages

Architecture Changes

  1. Channel Subpackages & Factory Registry

Each channel implementation is now in its own subpackage (pkg/channels/telegram/, pkg/channels/discord/, etc.) and self-registers via init() →
channels.RegisterFactory(). The Manager discovers channels through this registry instead of hardcoded constructor calls.

New files: pkg/channels/registry.go, pkg/channels/{telegram,discord,slack,...}/init.go

  1. Enhanced BaseChannel

BaseChannel now provides:

  • atomic.Bool running state (was plain bool)
  • Functional options: WithMaxMessageLength(), WithGroupTrigger(), WithReasoningChannelID()
  • Unified allow-list filtering via IsAllowedSender(bus.SenderInfo) with canonical identity matching
  • Auto-orchestration of typing indicators, placeholder messages, and reactions via capability interfaces
  1. Manager Overhaul (pkg/channels/manager.go)

The Manager is now a full message routing and delivery engine:

  • Per-channel worker queues — each channel gets a dedicated goroutine with a buffered queue, eliminating cross-channel blocking
  • Automatic message splitting — queries MessageLengthProvider interface and splits via SplitMessage() before sending
  • Per-channel rate limiting — configurable per platform (e.g. Telegram 20/s, Discord 1/s)
  • Retry with error classification — ErrRateLimit → fixed delay, ErrTemporary → exponential backoff, ErrNotRunning/ErrSendFailed → no retry
  • Shared HTTP server — channels that need webhooks register on a single http.ServeMux instead of each starting their own listener
  • TTL janitor — cleans up stale typing/placeholder/reaction state to prevent memory leaks on error paths
  • Placeholder/Typing/Reaction lifecycle — preSend() automatically stops typing, undoes reactions, and edits placeholders before sending the real
    response
  1. Capability Interfaces (pkg/channels/interfaces.go)

Channels opt into advanced features via interface implementation:

  • TypingCapable — show typing indicator
  • MessageEditor — edit existing messages
  • PlaceholderCapable — send "Thinking..." placeholder
  • ReactionCapable — add/remove reactions
  • MediaSender — send outbound media (images, files)
  • MessageLengthProvider — advertise max message length
  • WebhookHandler / HealthChecker — register on shared HTTP server
  1. Error Classification (pkg/channels/errors.go, errutil.go)

Sentinel error types (ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed) with ClassifyHTTPError() helper that maps HTTP status codes to
the appropriate sentinel.

  1. Message Splitting (pkg/channels/split.go)

Moved from pkg/utils/ to pkg/channels/, rewritten to be code-block-aware: preserves ``` fenced code blocks by extending or injecting
close/reopen fences at split boundaries.

New Packages

pkg/identity/ — Unified User Identity

Canonical "platform:id" format with backward-compatible matching against all legacy allow-list formats ("123456", "@alice", "123456|alice",
"telegram:123456").

pkg/media/ — Media File Lifecycle

MediaStore interface with FileMediaStore implementation: scope-based file registration, ref-based resolution (media://), and optional
background TTL cleanup.

New Channels

Pico Protocol (pkg/channels/pico/)

A native WebSocket channel serving as the reference implementation for all capability interfaces. Token-authenticated, supports multiple
concurrent connections per session.

WhatsApp Native (pkg/channels/whatsapp_native/)

Direct WhatsApp integration via whatsmeow library (no bridge required). Build-tag gated (whatsapp_native) to keep the default binary small.
Includes QR code terminal login flow.

Bus & Agent Changes

MessageBus (pkg/bus/)

  • Buffer size increased from 16 → 64
  • Fixed a deadlock in concurrent Publish/Subscribe paths
  • Peer and MessageID promoted from metadata map to structured fields on InboundMessage
  • Added OutboundMediaMessage type and corresponding Pub/Sub methods
  • Context cancellation handling improvements

Agent Loop (pkg/agent/loop.go)

  • Removed agentMu mutex (was unnecessary, agent loop is single-goroutine)
  • LLM Reasoning field support — reasoning content is routed to a dedicated channel via reasoning_channel_id config
  • Reasoning messages flow through the same Manager → worker → SplitMessage → sendWithRetry pipeline as normal messages, so they are
    automatically split per platform limits

Other Changes

  • Config: Added reasoning_channel_id, use_native, session_store_path, group_trigger fields across all channel configs
  • Makefile: Added build-whatsapp and build-all-whatsapp targets for WhatsApp native builds
  • Routing: SessionKey() helper for consistent "channel:chatID" session key generation
  • Docs: Bilingual (EN/ZH) architecture documentation in pkg/channels/README.md

Breaking Changes

  • Channel constructors are no longer exported from pkg/channels/ — use the factory registry or subpackage imports
  • pkg/utils.SplitMessage removed — use pkg/channels.SplitMessage
  • bus.InboundMessage.Metadata["peer"] / Metadata["message_id"] → use .Peer / .MessageID fields directly

alexhoshina and others added 30 commits February 20, 2026 23:18
…tructured fields

Add bus.Peer struct and explicit Peer/MessageID fields to InboundMessage,
replacing the implicit peer_kind/peer_id/message_id metadata convention.

- Add Peer{Kind, ID} type to pkg/bus/types.go
- Extend InboundMessage with Peer and MessageID fields
- Change BaseChannel.HandleMessage signature to accept peer and messageID
- Adapt all 12 channel implementations to pass structured peer/messageID
- Simplify agent extractPeer() to read msg.Peer directly
- extractParentPeer unchanged (parent_peer still via metadata)
…ext leaks

- OneBot: remove close(ch) race in Stop() pending cleanup; add WriteDeadline to Send/sendAPIRequest
- Telegram: add cancelCtx; Stop() now calls bh.Stop(), cancel(), and cleans up thinking CancelFuncs
- Discord: add cancelCtx via WithCancel; Stop() calls cancel(); remove unused getContext()
- WhatsApp: add cancelCtx; Send() adds WriteDeadline; replace stdlib log with project logger
- MaixCam: add cancelCtx; Send() adds WriteDeadline; Stop() calls cancel() before closing
…r queues

Move message splitting from individual channels (Discord) to the Manager
layer via per-channel worker goroutines. Each channel now declares its
max message length through BaseChannelOption/MessageLengthProvider, and
the Manager automatically splits oversized outbound messages before
dispatch. This prevents one slow channel from blocking all others.

- Add WithMaxMessageLength option and MessageLengthProvider interface
- Set platform-specific limits (Discord 2000, Telegram 4096, Slack 40000, etc.)
- Convert SplitMessage to rune-aware counting for correct Unicode handling
- Replace single dispatcher goroutine with per-channel buffered worker queues
- Remove Discord's internal SplitMessage call (now handled centrally)
…gement

Channels previously deleted downloaded media files via defer os.Remove,
racing with the async Agent consumer. Introduce MediaStore to decouple
file ownership: channels register files on download, Agent releases them
after processing via ReleaseAll(scope).

- New pkg/media with MediaStore interface + FileMediaStore implementation
- InboundMessage gains MediaScope field for lifecycle tracking
- BaseChannel gains SetMediaStore/GetMediaStore + BuildMediaScope helper
- Manager injects MediaStore into channels; AgentLoop releases on completion
- Telegram, Discord, Slack, OneBot, LINE channels migrated from defer
  os.Remove to store.Store() with media:// refs
… error classification

Define sentinel error types (ErrNotRunning, ErrRateLimit, ErrTemporary,
ErrSendFailed) so the Manager can classify Send failures and choose the
right retry strategy: permanent errors bail immediately, rate-limit
errors use a fixed 1s delay, and temporary/unknown errors use exponential
backoff (500ms→1s→2s, capped at 8s, up to 3 retries). A per-channel
token-bucket rate limiter (golang.org/x/time/rate) throttles outbound
sends before they hit the platform API.
PublishInbound/PublishOutbound held RLock during blocking channel sends,
deadlocking against Close() which needs a write lock when the buffer is
full. ConsumeInbound/SubscribeOutbound used bare receives instead of
comma-ok, causing zero-value processing or busy loops after close.

Replace sync.RWMutex+bool with atomic.Bool+done channel so Publish
methods use a lock-free 3-way select (send / done / ctx.Done). Add
context.Context parameter to both Publish methods so callers can cancel
or timeout blocked sends. Close() now only sets the atomic flag and
closes the done channel—never closes the data channels—eliminating
send-on-closed-channel panics.

- Remove dead code: RegisterHandler, GetHandler, handlers map,
  MessageHandler type (zero callers across the whole repo)
- Add ErrBusClosed sentinel error
- Update all 10 caller sites to pass context
- Add msgBus.Close() to gateway and agent shutdown flows
- Add pkg/bus/bus_test.go with 11 test cases covering basic round-trip,
  context cancellation, closed-bus behavior, concurrent publish+close,
  full-buffer timeout, and idempotent Close
…el types

All 12 channel Send methods now return proper sentinel errors (ErrNotRunning,
ErrTemporary, ErrRateLimit, ErrSendFailed) instead of plain fmt.Errorf strings,
enabling Manager's sendWithRetry classification logic to actually work.

- Add ClassifySendError/ClassifyNetError helpers in errutil.go for HTTP-based channels
- LINE/WeCom Bot/WeCom App: use ClassifySendError for HTTP status-based classification
- SDK channels (Telegram/Discord/Slack/QQ/DingTalk/Feishu): wrap errors as ErrTemporary
- WebSocket channels (OneBot/WhatsApp/MaixCam): wrap write errors as ErrTemporary
- WhatsApp: add missing IsRunning() check in Send
- WhatsApp/OneBot/MaixCam: add ctx.Done() check before entering write path
- Telegram Stop: clean up placeholders sync.Map to prevent state leaks
…ed by Manager

Merge 3 independent channel HTTP servers (LINE :18791, WeCom Bot :18793,
WeCom App :18792) and the health server (:18790) into a single shared
HTTP server on the Gateway address. Channels implement WebhookHandler
and/or HealthChecker interfaces to register their handlers on the shared
mux. Also change Gateway default host from 0.0.0.0 to 127.0.0.1 for
security.
Add outbound media sending capability so the agent can publish media
attachments (images, files, audio, video) through channels via the bus.

- Add MediaPart and OutboundMediaMessage types to bus
- Add PublishOutboundMedia/SubscribeOutboundMedia bus methods
- Add MediaSender interface discovered via type assertion by Manager
- Add media dispatch/worker in Manager with shared retry logic
- Extend ToolResult with Media field and MediaResult constructor
- Publish outbound media from agent loop on tool results
- Implement SendMedia for Telegram, Discord, Slack, LINE, OneBot, WeCom
Remove SetTranscriber and inline transcription logic from 4 channels
(Telegram, Discord, Slack, OneBot) and the gateway wiring. Voice/audio
files are still downloaded and stored in MediaStore with simple text
annotations ([voice], [audio: filename], [file: name]). The pkg/voice
package is preserved for future Agent-level transcription middleware.
Add unified ShouldRespondInGroup to BaseChannel, replacing scattered
per-channel group filtering logic. Introduce GroupTriggerConfig (with
mention_only + prefixes), TypingConfig, and PlaceholderConfig types.
Migrate Discord MentionOnly, OneBot checkGroupTrigger, and LINE
hardcoded mention-only to the shared mechanism. Add group trigger
entry points for Slack, Telegram, QQ, Feishu, DingTalk, and WeCom.
Legacy config fields are preserved with automatic migration.
…hannel (Phase 10 + 7)

Phase 10: Define TypingCapable, MessageEditor, PlaceholderRecorder interfaces.
Manager orchestrates outbound typing stop and placeholder editing via preSend.
Migrate Telegram, Discord, Slack, OneBot to register state with Manager instead
of handling locally in Send. Phase 7: Add native WebSocket Pico Protocol channel
as reference implementation of all optional capability interfaces.
Message splitting is exclusively a Manager responsibility. Moving it
into the channels package eliminates the cross-package dependency and
aligns with the refactoring plan.
- MediaStore: use full UUID to prevent ref collisions, preserve and
  expose metadata via ResolveWithMeta, include underlying OS errors
- Agent loop: populate MediaPart Type/Filename/ContentType from
  MediaStore metadata so channels can dispatch media correctly
- SplitMessage: fix byte-vs-rune index mixup in code block header
  parsing, remove dead candidateStr variable
- Pico auth: restrict query-param token behind AllowTokenQuery config
  flag (default false) to prevent token leakage via logs/referer
- HandleMessage: replace context.TODO with caller-propagated ctx,
  log PublishInbound failures instead of silently discarding
- Gateway shutdown: use fresh 15s timeout context for StopAll so
  graceful shutdown is not short-circuited by the cancelled parent ctx
…format

Introduce SenderInfo struct and pkg/identity package to standardize user
identification across all channels. Each channel now constructs structured
sender info (platform, platformID, canonicalID, username, displayName)
instead of ad-hoc string IDs. Allow-list matching supports all legacy
formats (numeric ID, @username, id|username) plus the new canonical
"platform:id" format. Session key resolution also handles canonical
peerIDs for backward-compatible identity link matching.
@alexhoshina alexhoshina marked this pull request as ready for review February 27, 2026 12:45
Copilot AI review requested due to automatic review settings February 27, 2026 12:45
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Large refactor of the channel subsystem to a modular, capability-based architecture with unified routing, identity, media lifecycle management, and improved bus reliability/backpressure handling.

Changes:

  • Introduces channel subpackages + factory registry, capability interfaces (typing/placeholder/reaction/media/webhooks), and moves message splitting into pkg/channels/ with rune-aware + code-block-aware behavior.
  • Adds unified identity matching (platform:id) and a media store with scope-based lifecycle + optional TTL cleanup; tools can now return media refs which publish as outbound media messages.
  • Overhauls the message bus API (ctx-aware publish with error returns, deadlock fixes) and updates agent loop + multiple tools/services/channels to use the new routing fields and publish semantics.

Reviewed changes

Copilot reviewed 85 out of 87 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
pkg/utils/string.go Adds SanitizeMessageContent helper for stripping non-graphic/control Unicode.
pkg/utils/string_test.go Adds tests for SanitizeMessageContent.
pkg/utils/message.go Removes legacy SplitMessage implementation (moved/refactored elsewhere).
pkg/utils/message_test.go Removes legacy SplitMessage tests (moved to channels).
pkg/tools/subagent.go Publishes subagent completion via bus with timeout context.
pkg/tools/result.go Adds tool result media refs + helper constructor.
pkg/tools/cron.go Publishes cron output via bus with timeout contexts.
pkg/routing/session_key.go Improves identity link resolution for canonical platform:id peer IDs.
pkg/routing/session_key_test.go Adds tests for canonical/bare peer ID identity link resolution.
pkg/providers/protocoltypes/types.go Extends provider response schema with reasoning fields/details.
pkg/providers/openai_compat/provider.go Parses OpenAI-compat reasoning fields/details.
pkg/migrate/config.go Migrates WhatsApp native config fields (use_native, session_store_path).
pkg/media/store.go Adds in-memory media store with scoped lifecycle + TTL cleanup goroutine.
pkg/identity/identity.go Adds canonical identity utilities + backward-compatible allowlist matching.
pkg/identity/identity_test.go Adds test coverage for canonical ID and allowlist matching.
pkg/heartbeat/service.go Publishes heartbeat responses via bus with timeout context.
pkg/health/server.go Adds RegisterOnMux for shared HTTP server mounting.
pkg/devices/service.go Publishes device notifications via bus with timeout context.
pkg/config/defaults.go Adds defaults for new channel fields (typing/placeholder/group trigger/pico) + media cleanup config.
pkg/channels/registry.go Adds factory registry for channel discovery/creation.
pkg/channels/interfaces.go Adds capability interfaces (typing/edit/reaction/placeholder recorder).
pkg/channels/webhook.go Adds webhook/health optional interfaces for shared HTTP server.
pkg/channels/media.go Adds MediaSender interface for outbound media routing.
pkg/channels/split.go New rune-aware message splitter with fenced-code-block preservation.
pkg/channels/split_test.go Adds extensive tests for new splitting helpers + behavior.
pkg/channels/errors.go Introduces sentinel channel send errors for retry classification.
pkg/channels/errors_test.go Tests sentinel error behavior/messages.
pkg/channels/errutil.go Adds HTTP/network error classification helpers.
pkg/channels/errutil_test.go Tests send/network error classification helpers.
pkg/channels/base.go Refactors BaseChannel: atomic running, options, group trigger logic, structured allowlist, media scope, capability orchestration, ctx-aware publish.
pkg/channels/base_test.go Adds tests for group trigger + structured allowlist logic.
pkg/bus/types.go Adds structured Peer, SenderInfo, message IDs, and outbound media message types.
pkg/bus/bus.go Refactors bus to ctx-aware publishing with error returns + close signaling/drain behavior.
pkg/bus/bus_test.go Adds concurrency/backpressure/close behavior tests for new bus semantics.
pkg/agent/loop.go Updates agent loop for new bus API, reasoning routing, tool media publishing, and peer extraction from structured fields.
pkg/agent/loop_test.go Adds tests for reasoning routing + updates for channel interface changes.
pkg/channels/pico/protocol.go Defines Pico Protocol message types and wire format.
pkg/channels/pico/pico.go Implements Pico WebSocket channel + capabilities (typing/placeholder/edit).
pkg/channels/pico/init.go Registers Pico channel factory.
pkg/channels/telegram/telegram_commands.go Moves Telegram implementation into telegram subpackage.
pkg/channels/telegram/init.go Registers Telegram channel factory.
pkg/channels/slack/slack.go Moves Slack into subpackage; adds media sending + reaction capability; updates allowlist/group trigger.
pkg/channels/slack/slack_test.go Updates Slack tests package location.
pkg/channels/slack/init.go Registers Slack channel factory.
pkg/channels/line/line.go Moves LINE into subpackage; migrates to shared HTTP server hooks; adds typing + media store integration.
pkg/channels/line/init.go Registers LINE channel factory.
pkg/channels/wecom/common.go Extracts shared WeCom crypto/signature helpers.
pkg/channels/wecom/bot.go Moves WeCom bot into subpackage; migrates to shared HTTP server hooks; adds allowlist/group trigger + error classification.
pkg/channels/wecom/bot_test.go Updates tests to new helper function names/package.
pkg/channels/wecom/app_test.go Updates tests to new helper function names/package.
pkg/channels/wecom/init.go Registers WeCom bot + app factories.
pkg/channels/dingtalk/dingtalk.go Moves DingTalk into subpackage; adds group trigger + structured allowlist.
pkg/channels/dingtalk/init.go Registers DingTalk channel factory.
pkg/channels/feishu/feishu_64.go Moves Feishu into subpackage; adds group trigger + structured allowlist + updated message handling.
pkg/channels/feishu/feishu_32.go Updates Feishu 32-bit stub to new package/base channel types.
pkg/channels/feishu/common.go Adds shared stringValue helper for Feishu.
pkg/channels/feishu/init.go Registers Feishu channel factory.
pkg/channels/qq/qq.go Moves QQ into subpackage; adds group trigger + structured allowlist + updated routing fields.
pkg/channels/qq/init.go Registers QQ channel factory.
pkg/channels/maixcam/maixcam.go Moves MaixCam into subpackage; adds ctx cancellation and structured allowlist/peer routing.
pkg/channels/maixcam/init.go Registers MaixCam channel factory.
pkg/channels/onebot/init.go Registers OneBot channel factory.
pkg/channels/discord/init.go Registers Discord channel factory.
pkg/channels/whatsapp/whatsapp.go Moves WhatsApp bridge channel into subpackage; adds ctx cancel handling, structured allowlist, and error classification.
pkg/channels/whatsapp/init.go Registers WhatsApp (bridge) channel factory.
pkg/channels/whatsapp_native/whatsapp_native.go Adds build-tagged native WhatsApp channel using whatsmeow (QR login + reconnect backoff).
pkg/channels/whatsapp_native/whatsapp_native_stub.go Adds non-tag stub returning a helpful error when native support isn’t compiled in.
pkg/channels/whatsapp_native/init.go Registers WhatsApp native channel factory.
cmd/picoclaw/internal/gateway/helpers.go Wires in registry-based channel imports, shared HTTP server setup, and media store creation/injection.
cmd/picoclaw/internal/agent/helpers.go Ensures MessageBus is closed in CLI agent mode.
config/config.example.json Updates config example for new channel fields + reasoning channel IDs.
docs/troubleshooting.md Adds troubleshooting docs for OpenRouter model configuration.
README.md Documents WhatsApp native option and adds Pi Zero build notes.
Makefile Adds WhatsApp-native build target + Pi Zero build helpers; extends build-all to include linux/arm.
go.mod Adds deps for whatsmeow/sqlite/protobuf/qrterminal and related indirect deps.
.goreleaser.yaml Minor formatting cleanup for arm goarm list.
Comments suppressed due to low confidence (3)

pkg/channels/slack/slack.go:223

  • ReactToMessage ignores errors from AddReaction/RemoveReaction and always returns nil error. If reactions fail (missing permissions, invalid timestamp, rate limits), Manager will assume the reaction was applied and will later attempt to undo it. Consider checking/returning the Slack API errors (and making the undo function swallow errors to remain idempotent).
    pkg/agent/loop.go:540
  • PublishOutbound now returns an error (ctx canceled, bus closed, buffer full timeout if ctx has deadline), but the return value is ignored here. This can silently drop user-visible responses. Consider handling/logging the error (and possibly using a short timeout context for publishes to avoid blocking the agent loop under backpressure).
	if opts.SendResponse {
		al.bus.PublishOutbound(ctx, bus.OutboundMessage{
			Channel: opts.Channel,
			ChatID:  opts.ChatID,
			Content: finalContent,
		})

pkg/channels/slack/slack.go:174

  • SendMedia continues when a media ref fails to resolve, but still returns nil overall. This can silently drop attachments while the agent thinks delivery succeeded. Consider returning an error (ideally classified as ErrSendFailed/ErrTemporary) when any part cannot be resolved, or explicitly reporting partial failure.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +160 to +169
for _, p := range paths {
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
logger.WarnCF("media", "release: failed to remove file", map[string]any{
"path": p,
"error": err.Error(),
})
}
}

return nil
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReleaseAll returns nil even when os.Remove fails with an error other than IsNotExist (it only logs). This makes it impossible for callers to detect cleanup failures, despite the method signature returning error and the comment only mentioning ignoring file-not-exist errors. Consider returning a (possibly aggregated) error for non-ENOENT failures, or update the contract/comment if failures are intentionally ignored.

Copilot uses AI. Check for mistakes.
Comment on lines +32 to +40
// writeJSON sends a JSON message to the connection with write locking.
func (pc *picoConn) writeJSON(v any) error {
if pc.closed.Load() {
return fmt.Errorf("connection closed")
}
pc.writeMu.Lock()
defer pc.writeMu.Unlock()
return pc.conn.WriteJSON(v)
}
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

picoConn.writeJSON writes to the websocket without any write deadline or ctx-aware cancellation. If a client stops reading, WriteJSON can block indefinitely, potentially stalling outbound delivery and leaking goroutines. Consider setting a per-write deadline (e.g. using c.config.WriteTimeout) and/or honoring ctx by aborting writes when ctx is done.

Copilot uses AI. Check for mistakes.
Comment on lines +566 to +581
func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, channelName, channelID string) {
if reasoningContent == "" || channelName == "" || channelID == "" {
return
}

// Check context cancellation before attempting to publish,
// since PublishOutbound's select may race between send and ctx.Done().
if ctx.Err() != nil {
return
}

al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: channelName,
ChatID: channelID,
Content: reasoningContent,
})
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleReasoning is launched in a goroutine and calls PublishOutbound with the parent ctx (typically long-lived, no timeout). If the bus is backpressured, this goroutine can block indefinitely and accumulate over time. Consider publishing synchronously, or wrapping the publish in a short timeout context (and logging on failure).

Copilot uses AI. Check for mistakes.
Copy link
Collaborator

@Huaaudio Huaaudio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very important refactor that brings an overhaul to the current messy channel system. LGTM. Many thanks for the contribution!

@yinwm yinwm merged commit 5b96923 into main Feb 27, 2026
6 checks passed
@yinwm
Copy link
Collaborator

yinwm commented Feb 27, 2026

LGTM

@yinwm yinwm mentioned this pull request Feb 27, 2026
3 tasks
achton added a commit to achton/picoclaw that referenced this pull request Feb 28, 2026
Adapt the Signal channel (PR sipeed#630) to the upstream channel system
refactor (sipeed#662, sipeed#877). Fresh implementation on current main rather
than rebasing 5 commits across 155 upstream changes.

Changes from the original sipeed#630:
- Moved from flat pkg/channels/signal.go to pkg/channels/signal/ subpackage
- Factory registration via init() + blank import in gateway
- New HandleMessage signature with bus.Peer, bus.SenderInfo, identity.BuildCanonicalID
- IsAllowedSender() replaces IsAllowed() for structured identity matching
- Manager-handled message splitting via WithMaxMessageLength(6000)
- Typed errors (ErrNotRunning, ErrTemporary) per Phase 4 lifecycle contract
- Proper goroutine tracking with sync.WaitGroup in Start/Stop
- TypingCapable interface wraps existing typing indicator feature
- ReactionCapable interface (first channel to implement — 👀 on inbound, undo on reply)
- WithReasoningChannelID() option for routing LLM reasoning to separate channel

All original features preserved: SSE inbound, JSON-RPC sending,
markdown-to-Signal text styles, voice transcription, attachment
handling, group/DM filtering, typing indicators.

28 tests passing (24 ported + 4 new for parseMessageID).
achton added a commit to achton/picoclaw that referenced this pull request Feb 28, 2026
Adapt the Signal channel (PR sipeed#630) to the upstream channel system
refactor (sipeed#662, sipeed#877). Fresh implementation on current main rather
than rebasing 5 commits across 155 upstream changes.

Changes from the original sipeed#630:
- Moved from flat pkg/channels/signal.go to pkg/channels/signal/ subpackage
- Factory registration via init() + blank import in gateway
- New HandleMessage signature with bus.Peer, bus.SenderInfo, identity.BuildCanonicalID
- IsAllowedSender() replaces IsAllowed() for structured identity matching
- Manager-handled message splitting via WithMaxMessageLength(6000)
- Typed errors (ErrNotRunning, ErrTemporary) per Phase 4 lifecycle contract
- Proper goroutine tracking with sync.WaitGroup in Start/Stop
- TypingCapable interface wraps existing typing indicator feature
- ReactionCapable interface (first channel to implement — 👀 on inbound, undo on reply)
- WithReasoningChannelID() option for routing LLM reasoning to separate channel

All original features preserved: SSE inbound, JSON-RPC sending,
markdown-to-Signal text styles, voice transcription, attachment
handling, group/DM filtering, typing indicators.

28 tests passing (24 ported + 4 new for parseMessageID).
achton added a commit to achton/picoclaw that referenced this pull request Mar 4, 2026
Adapt the Signal channel (PR sipeed#630) to the upstream channel system
refactor (sipeed#662, sipeed#877). Fresh implementation on current main rather
than rebasing 5 commits across 155 upstream changes.

Changes from the original sipeed#630:
- Moved from flat pkg/channels/signal.go to pkg/channels/signal/ subpackage
- Factory registration via init() + blank import in gateway
- New HandleMessage signature with bus.Peer, bus.SenderInfo, identity.BuildCanonicalID
- IsAllowedSender() replaces IsAllowed() for structured identity matching
- Manager-handled message splitting via WithMaxMessageLength(6000)
- Typed errors (ErrNotRunning, ErrTemporary) per Phase 4 lifecycle contract
- Proper goroutine tracking with sync.WaitGroup in Start/Stop
- TypingCapable interface wraps existing typing indicator feature
- ReactionCapable interface (first channel to implement — 👀 on inbound, undo on reply)
- WithReasoningChannelID() option for routing LLM reasoning to separate channel

All original features preserved: SSE inbound, JSON-RPC sending,
markdown-to-Signal text styles, voice transcription, attachment
handling, group/DM filtering, typing indicators.

28 tests passing (24 ported + 4 new for parseMessageID).
hyperwd pushed a commit to hyperwd/picoclaw that referenced this pull request Mar 5, 2026
Pluckypan pushed a commit to Pluckypan/picoclaw that referenced this pull request Mar 6, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants