Add FilterHook infrastructure and DATA phase buffering#107

Merged
linyows merged 7 commits into main from filter-hook-infrastructure
Feb 19, 2026

linyows (Owner) commented Feb 18, 2026

Summary

  • Introduce FilterHook interface extending Hook with a BeforeRelay() method for pre-relay message filtering (rspamd integration prep)
  • Implement DATA phase buffering in pipe.go: messages are fully buffered before relay, enabling synchronous filter inspection
  • Support three filter actions: FilterRelay (pass-through), FilterReject (SMTP error response), FilterAddHeader (modify and relay)
  • Detect and inject FilterHook from the hooks list in server.go, passing sender IP and connection metadata
  • Add E2E tests (TestE2EFilter) covering Relay, Reject, AddHeader, WithoutFilterHook, and MessageIntegrity scenarios
  • Fully backward compatible: without a FilterHook registered, streaming relay behavior is unchanged

Architecture

Client → warp → [DATA phase buffering]
                      ↓
              FilterHook.BeforeRelay()
                      ↓
              Relay / Reject / AddHeader
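
A filter plugged into this flow could look roughly like the sketch below. It is not the code from this PR: the type and field names follow the summary and the review excerpts, but the constant values, the field types other than `Message`, and the `tagFilter` type with its header strings are assumptions made for illustration. The methods of the base `Hook` interface are not shown here, so they are left out.

```go
package main

import "bytes"

// FilterAction mirrors the three actions named in the summary.
// The underlying type and constant values are assumptions.
type FilterAction int

const (
	FilterRelay FilterAction = iota
	FilterReject
	FilterAddHeader
)

// BeforeRelayData carries what the filter inspects. Only a subset of the
// fields is shown; the SenderIP type is an assumption.
type BeforeRelayData struct {
	SenderIP string
	Message  []byte // buffered DATA payload
}

// FilterResult tells the proxy what to do with the buffered message.
type FilterResult struct {
	Action  FilterAction
	Message []byte // replacement message for FilterAddHeader
	Reply   string // SMTP reply for FilterReject
}

// tagFilter is a hypothetical filter used only for this example.
type tagFilter struct{}

// BeforeRelay rejects messages carrying a made-up marker header and
// prepends a made-up trace header to everything else.
func (tagFilter) BeforeRelay(d *BeforeRelayData) *FilterResult {
	if bytes.Contains(d.Message, []byte("X-Test-Spam: yes")) {
		return &FilterResult{Action: FilterReject, Reply: "550 5.7.1 Message rejected by filter"}
	}
	out := append([]byte("X-Filtered-By: example\r\n"), d.Message...)
	return &FilterResult{Action: FilterAddHeader, Message: out}
}
```

Returning a pointer matches the call site quoted later in the review, where a nil result is a possibility the proxy has to handle.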

Test plan

  • go test -short — existing unit tests pass
  • TestIntegration — backward compatibility confirmed
  • TestE2E — existing E2E tests pass unchanged
  • TestE2EFilter/Relay — FilterRelay relays message normally
  • TestE2EFilter/Reject — FilterReject returns SMTP 550 error to client
  • TestE2EFilter/AddHeader — FilterAddHeader delivers modified message
  • TestE2EFilter/WithoutFilterHook — no filter hook = standard streaming relay
  • TestE2EFilter/MessageIntegrity — large message preserved through filter

🤖 Generated with Claude Code

linyows and others added 3 commits February 18, 2026 14:12
Introduce FilterHook interface for pre-relay message filtering (e.g. rspamd integration).
Messages are buffered during the SMTP DATA phase, passed to BeforeRelay() for inspection,
then relayed, rejected, or modified based on the filter result. Backward compatible:
without a FilterHook, streaming relay is unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Cover isDataCommand, isActionCompletedResponse, handleDataPhaseUpstream
(Relay/Reject/AddHeader/Buffering/BufferOverflow), mediateOnUpstream
(DATA command detection, no-filter bypass, DATA phase delegation),
mediateOnDownstream (354 detection, reject reply substitution), and
metadata extraction with filter hook enabled.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Add explicit error return value handling for Write/Close calls
in handleDataPhaseUpstream, mediateOnDownstream, and test helpers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

This PR adds a FilterHook extension point to allow synchronous inspection/modification of SMTP messages by buffering the DATA phase before relaying, enabling actions like relay-as-is, reject, or header modification (e.g., future rspamd integration).

Changes:

  • Introduces FilterHook/BeforeRelayData/FilterResult types and wiring to detect/inject a filter hook into the connection pipe.
  • Implements DATA-phase buffering in Pipe and calls BeforeRelay at end-of-data to decide relay/reject/modify behavior.
  • Adds unit/E2E test coverage for filter behaviors and buffering mechanics.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| server.go | Extracts sender IP and injects FilterHook.BeforeRelay into Pipe when present. |
| pipe.go | Adds DATA buffering state, filter invocation, and downstream reply substitution logic. |
| filter.go | Defines filter hook/action/result types used by the new buffering flow. |
| pipe_test.go | Adds unit tests for DATA detection, buffering, and filter action handling. |
| e2e_test.go | Adds end-to-end tests validating relay/reject/add-header and integrity scenarios. |

Comment on lines +215 to +220
// End-of-data detected: extract message (without terminator)
fullData := p.dataBuffer.Bytes()
termIdx := bytes.Index(fullData, dataTerminator)
message := fullData[:termIdx+2] // Include trailing \r\n but not .\r\n

go p.afterCommHook(p.removeMailBody(message), srcToDst)

Copilot AI Feb 18, 2026

When the DATA terminator is detected, the code relays message but discards any bytes after the first \r\n.\r\n that may have arrived in the same read (e.g., client SMTP pipelining / next command). This can corrupt the session by dropping subsequent commands. Consider splitting fullData at the terminator and feeding the remainder back into normal upstream processing (or buffering it for the next read).
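
A minimal sketch of the split suggested above, assuming the buffered bytes are available as one slice. `splitAtTerminator` is a hypothetical helper, not a function from this PR; `dataTerminator` is redefined here so the example stands alone.

```go
package main

import "bytes"

// dataTerminator is the SMTP end-of-data sequence.
var dataTerminator = []byte("\r\n.\r\n")

// splitAtTerminator splits buf at the first end-of-data sequence.
// message keeps its trailing CRLF but not the ".\r\n"; rest holds any
// bytes that followed the terminator in the same read (for example a
// pipelined command). Both results alias buf, so copy them if buf is reused.
func splitAtTerminator(buf []byte) (message, rest []byte, found bool) {
	idx := bytes.Index(buf, dataTerminator)
	if idx < 0 {
		return nil, nil, false
	}
	return buf[:idx+2], buf[idx+len(dataTerminator):], true
}
```

The caller would relay `message`, then hand `rest` back to the normal upstream command path instead of dropping it.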

Comment on lines +197 to +205
if p.dataBufferSize > 0 && p.dataBuffer.Len()+len(data) > p.dataBufferSize {
	// Buffer overflow: send error to client, empty terminator to server
	_, _ = p.sConn.Write([]byte("552 5.3.4 Message too big for filter" + crlf))
	_, _ = p.rConn.Write(dataTerminator)
	p.inDataPhase = false
	p.dataBuffer = nil
	go p.afterCommHook([]byte("filter buffer overflow"), onPxy)
	return b, i, true
}

Copilot AI Feb 18, 2026

On buffer overflow you immediately write a 552 to the client and send the terminator to the server, but nothing suppresses the server’s subsequent 250 completion reply. This can result in the client observing a later 250 OK after already receiving 552, which is protocol-confusing. Consider setting a flag to suppress the next action-completed response (similar to filterRejectReply handling) or substituting the server reply consistently.

pipe.go Outdated
Comment on lines +287 to +293
// FilterHook: detect 354 response to enter DATA phase
if p.beforeRelayHook != nil && p.dataCommandSent {
	if bytes.Contains(data, []byte(fmt.Sprint(codeStartingMailInput))) {
		p.inDataPhase = true
		p.dataCommandSent = false
	}
}

Copilot AI Feb 18, 2026

dataCommandSent is only cleared when a downstream packet contains "354" via bytes.Contains. If the server responds to DATA with a non-354 (e.g., 503/451), dataCommandSent (and the allocated dataBuffer) stay set, and the bytes.Contains check can also false-trigger if "354" appears in text. Consider parsing status codes at the start of each response line (like setTimeAtDataStarting does) and clearing dataCommandSent (and dataBuffer) on any non-354 response.
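
One way to do this, sketched under assumptions: the flag and buffer names match the excerpt above, while `replyCode`, `dataState`, `newDataState`, and `onReply` are hypothetical names invented for the example, and each call is assumed to receive a single reply line.

```go
package main

import "bytes"

// replyCode parses the three-digit SMTP reply code at the start of one
// reply line. The code must be followed by ' ', '-', or end of line.
func replyCode(line []byte) (code int, ok bool) {
	line = bytes.TrimRight(line, "\r\n")
	if len(line) < 3 {
		return 0, false
	}
	for _, c := range line[:3] {
		if c < '0' || c > '9' {
			return 0, false
		}
		code = code*10 + int(c-'0')
	}
	if len(line) > 3 && line[3] != ' ' && line[3] != '-' {
		return 0, false
	}
	return code, true
}

// dataState holds only the fields relevant to the DATA handshake.
type dataState struct {
	dataCommandSent bool
	inDataPhase     bool
	dataBuffer      *bytes.Buffer
}

// newDataState returns the state right after DATA was forwarded.
func newDataState() *dataState {
	return &dataState{dataCommandSent: true, dataBuffer: new(bytes.Buffer)}
}

// onReply handles the server's reply to DATA: 354 enters the DATA phase,
// any other code clears the pending flag and releases the buffer.
func (s *dataState) onReply(line []byte) {
	if !s.dataCommandSent {
		return
	}
	code, ok := replyCode(line)
	if !ok {
		return
	}
	s.dataCommandSent = false
	if code == 354 {
		s.inDataPhase = true
		return
	}
	s.dataBuffer = nil
}
```

Because the code is read from the first three bytes only, a "354" appearing later in the reply text no longer triggers the DATA phase.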

pipe.go Outdated

// isActionCompletedResponse checks if the response indicates action completed (250).
func (p *Pipe) isActionCompletedResponse(b []byte) bool {
	return bytes.Contains(b, []byte(fmt.Sprint(codeActionCompleted)))

Copilot AI Feb 18, 2026

isActionCompletedResponse uses bytes.Contains(..., "250"), which can match unrelated text (e.g., enhanced status codes or message text containing "250") and cause incorrect reject-reply substitution. Consider checking the SMTP reply code at the start of the line (first 3 bytes) and handling multi-line replies (250- / 250 ).

Suggested change
-	return bytes.Contains(b, []byte(fmt.Sprint(codeActionCompleted)))
+	codeBytes := []byte(fmt.Sprint(codeActionCompleted))
+	// Split into lines to properly handle multi-line replies (e.g., "250-" / "250 ").
+	lines := bytes.Split(b, []byte("\n"))
+	for _, line := range lines {
+		// Trim trailing '\r' if present.
+		line = bytes.TrimRight(line, "\r")
+		if len(line) < 3 {
+			continue
+		}
+		// Check that the line starts with the expected 3-digit code.
+		if !bytes.HasPrefix(line, codeBytes) {
+			continue
+		}
+		// If the line is exactly the code, or the next character is a valid separator
+		// (' ' for final line, '-' for continuation), treat it as a 250 response.
+		if len(line) == 3 {
+			return true
+		}
+		if line[3] == ' ' || line[3] == '-' {
+			return true
+		}
+	}
+	return false

pipe.go Outdated
Comment on lines +295 to +301
// FilterHook: replace server's 250 OK with reject reply
if p.filterRejectReply != "" && p.isActionCompletedResponse(data) {
	reply := p.filterRejectReply
	p.filterRejectReply = ""
	_, _ = p.sConn.Write([]byte(reply + crlf))
	go p.afterCommHook([]byte(reply), pxyToSrc)
	return b, i, true // Suppress server's 250 OK

Copilot AI Feb 18, 2026

filterRejectReply is written back to the client verbatim. If a hook returns a reply containing CR/LF, it could inject multiple SMTP responses. Consider validating/sanitizing the reply to a single line (no \r/\n) and defaulting to a safe 550 if invalid.
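
A possible shape for that check, as a sketch only. `sanitizeReply` and `defaultRejectReply` are hypothetical names, and the exact fallback text is an assumption. The helper collapses any CR/LF (and other whitespace runs) into single spaces and falls back to a fixed 550 when the result does not start with a 4xx or 5xx code.

```go
package main

import "strings"

// defaultRejectReply is the fallback used when a hook's reply is unusable.
// The wording is an assumption for this example.
const defaultRejectReply = "550 5.7.1 Message rejected"

// sanitizeReply forces reply onto a single line and verifies that it
// begins with a 4xx or 5xx SMTP reply code.
func sanitizeReply(reply string) string {
	// strings.Fields splits on all whitespace, including CR and LF,
	// so the joined result cannot contain a line break.
	reply = strings.Join(strings.Fields(reply), " ")
	if len(reply) < 3 {
		return defaultRejectReply
	}
	if reply[0] != '4' && reply[0] != '5' {
		return defaultRejectReply
	}
	for _, c := range []byte(reply[1:3]) {
		if c < '0' || c > '9' {
			return defaultRejectReply
		}
	}
	return reply
}
```

With this in place, a reply such as "550 5.7.1 Spam\r\n250 OK" reaches the client as one line rather than as two separate SMTP responses.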

filter.go Outdated
Comment on lines +19 to +25
	Message []byte // Full message (headers + body)
}

// FilterResult represents the result of a filter hook.
type FilterResult struct {
	Action  FilterAction
	Message []byte // Modified message for FilterAddHeader

Copilot AI Feb 18, 2026

The BeforeRelayData.Message/FilterResult.Message contract is ambiguous about SMTP dot-stuffing and terminator handling. Since handleDataPhaseUpstream writes the returned bytes directly to the server, a filter implementation that returns un-dot-stuffed content containing \r\n.\r\n could prematurely terminate DATA and potentially inject subsequent SMTP commands. Consider documenting that messages are in SMTP DATA wire format (dot-stuffed, no terminator) and/or enforcing dot-stuffing/terminator stripping before relaying modified messages.

Suggested change
-	Message []byte // Full message (headers + body)
-}
-
-// FilterResult represents the result of a filter hook.
-type FilterResult struct {
-	Action FilterAction
-	Message []byte // Modified message for FilterAddHeader
+	// Message is the full message (headers + body) in SMTP DATA wire format.
+	// It is dot-stuffed as received on the DATA stream and does NOT include
+	// the terminating "\r\n.\r\n" sequence. Filters that de-dot-stuff for
+	// inspection must re-apply correct SMTP dot-stuffing before relaying.
+	Message []byte
+}
+
+// FilterResult represents the result of a filter hook.
+type FilterResult struct {
+	Action FilterAction
+	// Message is the modified message to relay when Action == FilterAddHeader
+	// (or any action that relays message data). It MUST be in SMTP DATA wire
+	// format: dot-stuffed as required by SMTP, and MUST NOT contain the final
+	// "\r\n.\r\n" terminator. Returning un-dot-stuffed data containing
+	// "\r\n.\r\n" could prematurely terminate the DATA phase and is unsafe.
+	Message []byte
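
For filters that build or rewrite content in un-stuffed form, a dot-stuffing pass along these lines could be applied before returning the message. `dotStuff` is a hypothetical helper, not part of this PR, and it must only be applied to content that is not already dot-stuffed, since running it twice doubles the dots again.

```go
package main

import "bytes"

// dotStuff applies SMTP transparency: every line that begins with '.'
// gets one extra leading '.'. Input is assumed to be un-stuffed and to
// use CRLF line endings; a line is considered to start after each '\n'.
func dotStuff(msg []byte) []byte {
	var out bytes.Buffer
	atLineStart := true
	for _, c := range msg {
		if atLineStart && c == '.' {
			out.WriteByte('.')
		}
		out.WriteByte(c)
		atLineStart = c == '\n'
	}
	return out.Bytes()
}
```

After stuffing, a body line consisting of a single "." becomes "..", so the payload can no longer contain the "\r\n.\r\n" sequence that would end the DATA phase early.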

Comment on lines +223 to +256
result := p.beforeRelayHook(&BeforeRelayData{
	ConnID:   p.id,
	MailFrom: p.sMailAddr,
	MailTo:   p.rMailAddr,
	SenderIP: p.senderIP,
	Helo:     p.sServerName,
	Message:  message,
})

p.inDataPhase = false
p.dataBuffer = nil

switch result.Action {
case FilterRelay:
	// Relay original message + terminator to server
	_, _ = p.rConn.Write(message)
	_, _ = p.rConn.Write(dataTerminator[2:]) // .\r\n
	go p.afterCommHook([]byte("filter: relay"), onPxy)

case FilterAddHeader:
	// Relay modified message + terminator to server
	_, _ = p.rConn.Write(result.Message)
	if !bytes.HasSuffix(result.Message, []byte(crlf)) {
		_, _ = p.rConn.Write([]byte(crlf))
	}
	_, _ = p.rConn.Write(dataTerminator[2:]) // .\r\n
	go p.afterCommHook([]byte("filter: add header"), onPxy)

case FilterReject:
	// Send empty terminator to server, set reject reply for downstream
	_, _ = p.rConn.Write(dataTerminator)
	p.filterRejectReply = result.Reply
	go p.afterCommHook([]byte(fmt.Sprintf("filter: reject (%s)", result.Reply)), onPxy)
}

Copilot AI Feb 18, 2026

handleDataPhaseUpstream assumes beforeRelayHook always returns a non-nil *FilterResult and that result.Action is one of the known values. If the hook returns nil or an unknown action, this will panic or silently drop the message (no relay/terminator), leaving the SMTP session in a bad state. Consider defaulting nil/unknown to FilterRelay, and validating required fields for FilterReject/FilterAddHeader before acting.
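
The defaulting described here could be isolated in a small normalization step that runs before the switch. This is a sketch: `normalizeResult` and `fallbackReply` are hypothetical, the types are re-declared with assumed constant values so the example compiles alone, and treating an empty `FilterAddHeader` message as a plain relay is a design assumption.

```go
package main

// FilterAction and its constants are re-declared for a standalone example;
// the concrete values are assumptions.
type FilterAction int

const (
	FilterRelay FilterAction = iota
	FilterReject
	FilterAddHeader
)

// FilterResult mirrors the fields used in the excerpt above.
type FilterResult struct {
	Action  FilterAction
	Message []byte
	Reply   string
}

// fallbackReply is used when a reject carries no reply text (assumed wording).
const fallbackReply = "550 5.7.1 Message rejected"

// normalizeResult guarantees a non-nil result with a known action and the
// fields that action requires. Nil and unknown actions fall back to relay.
func normalizeResult(r *FilterResult) *FilterResult {
	if r == nil {
		return &FilterResult{Action: FilterRelay}
	}
	switch r.Action {
	case FilterRelay:
		return r
	case FilterReject:
		if r.Reply == "" {
			return &FilterResult{Action: FilterReject, Reply: fallbackReply}
		}
		return r
	case FilterAddHeader:
		if len(r.Message) == 0 {
			// Nothing to relay in place of the original: relay as-is.
			return &FilterResult{Action: FilterRelay}
		}
		return r
	default:
		return &FilterResult{Action: FilterRelay}
	}
}
```

Falling back to relay means a misbehaving filter fails open; a deployment that prefers to fail closed would return a temporary 4xx reject instead.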

linyows and others added 4 commits February 18, 2026 15:49
- Suppress server's 250 OK after buffer overflow via suppressNextResponse flag
- Clear dataCommandSent/dataBuffer on non-354 response to DATA command
- Replace bytes.Contains with line-start SMTP code check (hasResponseCode)
- Sanitize CR/LF from reject reply to prevent SMTP response injection
- Fallback to FilterRelay on nil result or unknown FilterAction
- Improve documentation for Message field dot-stuffing contract
- Add unit tests for all new safety checks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

- Fix errcheck warnings for Close, Flush, Read calls
- Replace deprecated io/ioutil with os.ReadDir
- Fix Yoda condition in cmd/warp/main.go
- Use net.JoinHostPort for IPv6 compatibility in smtp.go
- Remove trailing newlines from error format strings in plugins
- Use t.Setenv instead of os.Setenv in tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

…roval

Previously, DATA was relayed to the server immediately, and on reject
the proxy sent an empty terminator, leaving a garbage message in the
server's queue.

New approach:
- Proxy intercepts DATA command and sends fake 354 to client
- Client message is buffered entirely before filter invocation
- Reject: reply sent directly to client, server never sees DATA
- Relay/AddHeader: DATA sent to server only after filter approval,
  downstream handles 354 and relays buffered message
- Buffer overflow: 552 sent to client with discard mode until
  terminator, no server interaction needed

This eliminates garbage messages on the MX server for rejected emails
and removes the need for suppressNextResponse/filterRejectReply fields.
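
The buffering and discard behaviour in this approach can be pictured with a small collector. This is an illustrative sketch, not the code in pipe.go: `collector`, `feed`, and the field names are hypothetical, and it only tracks state, leaving the 354/552 replies and the later relay to the caller.

```go
package main

import "bytes"

// terminator is the SMTP end-of-data sequence.
var terminator = []byte("\r\n.\r\n")

// collector buffers a client's DATA payload up to limit bytes. Once the
// limit is exceeded it stops buffering and only watches for the terminator.
type collector struct {
	limit      int    // maximum buffered bytes; 0 means unlimited
	buf        []byte // bytes buffered so far
	tail       []byte // last few bytes seen while discarding
	discarding bool
}

// feed consumes one chunk read from the client. done reports that the
// terminator has been seen; overflow reports that the limit was exceeded,
// in which case the caller would answer with 552 and never contact the server.
func (c *collector) feed(chunk []byte) (done, overflow bool) {
	keep := len(terminator) - 1
	if c.discarding {
		// Join the remembered tail with the new chunk so a terminator
		// split across two reads is still detected.
		window := append(c.tail, chunk...)
		if bytes.Contains(window, terminator) {
			return true, true
		}
		if len(window) > keep {
			window = window[len(window)-keep:]
		}
		c.tail = append([]byte(nil), window...)
		return false, true
	}
	c.buf = append(c.buf, chunk...)
	found := bytes.Contains(c.buf, terminator)
	if c.limit > 0 && len(c.buf) > c.limit {
		if len(c.buf) < keep {
			keep = len(c.buf)
		}
		c.tail = append([]byte(nil), c.buf[len(c.buf)-keep:]...)
		c.buf = nil
		c.discarding = true
		return found, true
	}
	return found, false
}
```

When `feed` returns done without overflow, the caller would run the filter on the buffered bytes and only then send DATA to the server, which is what keeps rejected messages out of the server's queue.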

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

The `p.locked` check in copy() was blocking both upstream and downstream
goroutines during TLS negotiation. This caused a deadlock because the
downstream goroutine couldn't read the server's "220 Ready to start TLS"
response needed by connectTLS() to complete the handshake. Restrict the
lock to upstream only, since downstream must continue reading from the
server during the TLS transition.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@linyows linyows merged commit c8881d5 into main Feb 19, 2026
11 checks passed
@linyows linyows deleted the filter-hook-infrastructure branch February 19, 2026 08:16
@linyows linyows self-assigned this Feb 19, 2026