Add FilterHook infrastructure and DATA phase buffering#107
Conversation
Introduce FilterHook interface for pre-relay message filtering (e.g. rspamd integration). Messages are buffered during the SMTP DATA phase, passed to BeforeRelay() for inspection, then relayed, rejected, or modified based on the filter result. Backward compatible: without a FilterHook, streaming relay is unchanged. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cover isDataCommand, isActionCompletedResponse, handleDataPhaseUpstream (Relay/Reject/AddHeader/Buffering/BufferOverflow), mediateOnUpstream (DATA command detection, no-filter bypass, DATA phase delegation), mediateOnDownstream (354 detection, reject reply substitution), and metadata extraction with filter hook enabled. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add explicit error return value handling for Write/Close calls in handleDataPhaseUpstream, mediateOnDownstream, and test helpers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR adds a FilterHook extension point to allow synchronous inspection/modification of SMTP messages by buffering the DATA phase before relaying, enabling actions like relay-as-is, reject, or header modification (e.g., future rspamd integration).
Changes:
- Introduces
FilterHook/BeforeRelayData/FilterResulttypes and wiring to detect/inject a filter hook into the connection pipe. - Implements DATA-phase buffering in
Pipeand callsBeforeRelayat end-of-data to decide relay/reject/modify behavior. - Adds unit/E2E test coverage for filter behaviors and buffering mechanics.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| server.go | Extracts sender IP and injects FilterHook.BeforeRelay into Pipe when present. |
| pipe.go | Adds DATA buffering state, filter invocation, and downstream reply substitution logic. |
| filter.go | Defines filter hook/action/result types used by the new buffering flow. |
| pipe_test.go | Adds unit tests for DATA detection, buffering, and filter action handling. |
| e2e_test.go | Adds end-to-end tests validating relay/reject/add-header and integrity scenarios. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // End-of-data detected: extract message (without terminator) | ||
| fullData := p.dataBuffer.Bytes() | ||
| termIdx := bytes.Index(fullData, dataTerminator) | ||
| message := fullData[:termIdx+2] // Include trailing \r\n but not .\r\n | ||
|
|
||
| go p.afterCommHook(p.removeMailBody(message), srcToDst) |
There was a problem hiding this comment.
When the DATA terminator is detected, the code relays message but discards any bytes after the first \r\n.\r\n that may have arrived in the same read (e.g., client SMTP pipelining / next command). This can corrupt the session by dropping subsequent commands. Consider splitting fullData at the terminator and feeding the remainder back into normal upstream processing (or buffering it for the next read).
| if p.dataBufferSize > 0 && p.dataBuffer.Len()+len(data) > p.dataBufferSize { | ||
| // Buffer overflow: send error to client, empty terminator to server | ||
| _, _ = p.sConn.Write([]byte("552 5.3.4 Message too big for filter" + crlf)) | ||
| _, _ = p.rConn.Write(dataTerminator) | ||
| p.inDataPhase = false | ||
| p.dataBuffer = nil | ||
| go p.afterCommHook([]byte("filter buffer overflow"), onPxy) | ||
| return b, i, true | ||
| } |
There was a problem hiding this comment.
On buffer overflow you immediately write a 552 to the client and send the terminator to the server, but nothing suppresses the server’s subsequent 250 completion reply. This can result in the client observing a later 250 OK after already receiving 552, which is protocol-confusing. Consider setting a flag to suppress the next action-completed response (similar to filterRejectReply handling) or substituting the server reply consistently.
pipe.go
Outdated
| // FilterHook: detect 354 response to enter DATA phase | ||
| if p.beforeRelayHook != nil && p.dataCommandSent { | ||
| if bytes.Contains(data, []byte(fmt.Sprint(codeStartingMailInput))) { | ||
| p.inDataPhase = true | ||
| p.dataCommandSent = false | ||
| } | ||
| } |
There was a problem hiding this comment.
dataCommandSent is only cleared when a downstream packet contains "354" via bytes.Contains. If the server responds to DATA with a non-354 (e.g., 503/451), dataCommandSent (and the allocated dataBuffer) stay set, and the bytes.Contains check can also false-trigger if "354" appears in text. Consider parsing status codes at the start of each response line (like setTimeAtDataStarting does) and clearing dataCommandSent (and dataBuffer) on any non-354 response.
pipe.go
Outdated
|
|
||
| // isActionCompletedResponse checks if the response indicates action completed (250). | ||
| func (p *Pipe) isActionCompletedResponse(b []byte) bool { | ||
| return bytes.Contains(b, []byte(fmt.Sprint(codeActionCompleted))) |
There was a problem hiding this comment.
isActionCompletedResponse uses bytes.Contains(..., "250"), which can match unrelated text (e.g., enhanced status codes or message text containing "250") and cause incorrect reject-reply substitution. Consider checking the SMTP reply code at the start of the line (first 3 bytes) and handling multi-line replies (250- / 250 ).
| return bytes.Contains(b, []byte(fmt.Sprint(codeActionCompleted))) | |
| codeBytes := []byte(fmt.Sprint(codeActionCompleted)) | |
| // Split into lines to properly handle multi-line replies (e.g., "250-" / "250 "). | |
| lines := bytes.Split(b, []byte("\n")) | |
| for _, line := range lines { | |
| // Trim trailing '\r' if present. | |
| line = bytes.TrimRight(line, "\r") | |
| if len(line) < 3 { | |
| continue | |
| } | |
| // Check that the line starts with the expected 3-digit code. | |
| if !bytes.HasPrefix(line, codeBytes) { | |
| continue | |
| } | |
| // If the line is exactly the code, or the next character is a valid separator | |
| // (' ' for final line, '-' for continuation), treat it as a 250 response. | |
| if len(line) == 3 { | |
| return true | |
| } | |
| if line[3] == ' ' || line[3] == '-' { | |
| return true | |
| } | |
| } | |
| return false |
pipe.go
Outdated
| // FilterHook: replace server's 250 OK with reject reply | ||
| if p.filterRejectReply != "" && p.isActionCompletedResponse(data) { | ||
| reply := p.filterRejectReply | ||
| p.filterRejectReply = "" | ||
| _, _ = p.sConn.Write([]byte(reply + crlf)) | ||
| go p.afterCommHook([]byte(reply), pxyToSrc) | ||
| return b, i, true // Suppress server's 250 OK |
There was a problem hiding this comment.
filterRejectReply is written back to the client verbatim. If a hook returns a reply containing CR/LF, it could inject multiple SMTP responses. Consider validating/sanitizing the reply to a single line (no \r/\n) and defaulting to a safe 550 if invalid.
filter.go
Outdated
| Message []byte // Full message (headers + body) | ||
| } | ||
|
|
||
| // FilterResult represents the result of a filter hook. | ||
| type FilterResult struct { | ||
| Action FilterAction | ||
| Message []byte // Modified message for FilterAddHeader |
There was a problem hiding this comment.
The BeforeRelayData.Message/FilterResult.Message contract is ambiguous about SMTP dot-stuffing and terminator handling. Since handleDataPhaseUpstream writes the returned bytes directly to the server, a filter implementation that returns un-dot-stuffed content containing \r\n.\r\n could prematurely terminate DATA and potentially inject subsequent SMTP commands. Consider documenting that messages are in SMTP DATA wire format (dot-stuffed, no terminator) and/or enforcing dot-stuffing/terminator stripping before relaying modified messages.
| Message []byte // Full message (headers + body) | |
| } | |
| // FilterResult represents the result of a filter hook. | |
| type FilterResult struct { | |
| Action FilterAction | |
| Message []byte // Modified message for FilterAddHeader | |
| // Message is the full message (headers + body) in SMTP DATA wire format. | |
| // It is dot-stuffed as received on the DATA stream and does NOT include | |
| // the terminating "\r\n.\r\n" sequence. Filters that de-dot-stuff for | |
| // inspection must re-apply correct SMTP dot-stuffing before relaying. | |
| Message []byte | |
| } | |
| // FilterResult represents the result of a filter hook. | |
| type FilterResult struct { | |
| Action FilterAction | |
| // Message is the modified message to relay when Action == FilterAddHeader | |
| // (or any action that relays message data). It MUST be in SMTP DATA wire | |
| // format: dot-stuffed as required by SMTP, and MUST NOT contain the final | |
| // "\r\n.\r\n" terminator. Returning un-dot-stuffed data containing | |
| // "\r\n.\r\n" could prematurely terminate the DATA phase and is unsafe. | |
| Message []byte |
| result := p.beforeRelayHook(&BeforeRelayData{ | ||
| ConnID: p.id, | ||
| MailFrom: p.sMailAddr, | ||
| MailTo: p.rMailAddr, | ||
| SenderIP: p.senderIP, | ||
| Helo: p.sServerName, | ||
| Message: message, | ||
| }) | ||
|
|
||
| p.inDataPhase = false | ||
| p.dataBuffer = nil | ||
|
|
||
| switch result.Action { | ||
| case FilterRelay: | ||
| // Relay original message + terminator to server | ||
| _, _ = p.rConn.Write(message) | ||
| _, _ = p.rConn.Write(dataTerminator[2:]) // .\r\n | ||
| go p.afterCommHook([]byte("filter: relay"), onPxy) | ||
|
|
||
| case FilterAddHeader: | ||
| // Relay modified message + terminator to server | ||
| _, _ = p.rConn.Write(result.Message) | ||
| if !bytes.HasSuffix(result.Message, []byte(crlf)) { | ||
| _, _ = p.rConn.Write([]byte(crlf)) | ||
| } | ||
| _, _ = p.rConn.Write(dataTerminator[2:]) // .\r\n | ||
| go p.afterCommHook([]byte("filter: add header"), onPxy) | ||
|
|
||
| case FilterReject: | ||
| // Send empty terminator to server, set reject reply for downstream | ||
| _, _ = p.rConn.Write(dataTerminator) | ||
| p.filterRejectReply = result.Reply | ||
| go p.afterCommHook([]byte(fmt.Sprintf("filter: reject (%s)", result.Reply)), onPxy) | ||
| } |
There was a problem hiding this comment.
handleDataPhaseUpstream assumes beforeRelayHook always returns a non-nil *FilterResult and that result.Action is one of the known values. If the hook returns nil or an unknown action, this will panic or silently drop the message (no relay/terminator), leaving the SMTP session in a bad state. Consider defaulting nil/unknown to FilterRelay, and validating required fields for FilterReject/FilterAddHeader before acting.
- Suppress server's 250 OK after buffer overflow via suppressNextResponse flag - Clear dataCommandSent/dataBuffer on non-354 response to DATA command - Replace bytes.Contains with line-start SMTP code check (hasResponseCode) - Sanitize CR/LF from reject reply to prevent SMTP response injection - Fallback to FilterRelay on nil result or unknown FilterAction - Improve documentation for Message field dot-stuffing contract - Add unit tests for all new safety checks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Fix errcheck warnings for Close, Flush, Read calls - Replace deprecated io/ioutil with os.ReadDir - Fix Yoda condition in cmd/warp/main.go - Use net.JoinHostPort for IPv6 compatibility in smtp.go - Remove trailing newlines from error format strings in plugins - Use t.Setenv instead of os.Setenv in tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…roval Previously, DATA was relayed to the server immediately, and on reject the proxy sent an empty terminator, leaving a garbage message in the server's queue. New approach: - Proxy intercepts DATA command and sends fake 354 to client - Client message is buffered entirely before filter invocation - Reject: reply sent directly to client, server never sees DATA - Relay/AddHeader: DATA sent to server only after filter approval, downstream handles 354 and relays buffered message - Buffer overflow: 552 sent to client with discard mode until terminator, no server interaction needed This eliminates garbage messages on the MX server for rejected emails and removes the need for suppressNextResponse/filterRejectReply fields. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The `p.locked` check in copy() was blocking both upstream and downstream goroutines during TLS negotiation. This caused a deadlock because the downstream goroutine couldn't read the server's "220 Ready to start TLS" response needed by connectTLS() to complete the handshake. Restrict the lock to upstream only, since downstream must continue reading from the server during the TLS transition. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
FilterHookinterface extendingHookwith aBeforeRelay()method for pre-relay message filtering (rspamd integration prep)pipe.go: messages are fully buffered before relay, enabling synchronous filter inspectionFilterRelay(pass-through),FilterReject(SMTP error response),FilterAddHeader(modify and relay)FilterHookfrom the hooks list inserver.go, passing sender IP and connection metadataTestE2EFilter) covering Relay, Reject, AddHeader, WithoutFilterHook, and MessageIntegrity scenariosFilterHookregistered, streaming relay behavior is unchangedArchitecture
Test plan
go test -short— existing unit tests passTestIntegration— backward compatibility confirmedTestE2E— existing E2E tests pass unchangedTestE2EFilter/Relay— FilterRelay relays message normallyTestE2EFilter/Reject— FilterReject returns SMTP 550 error to clientTestE2EFilter/AddHeader— FilterAddHeader delivers modified messageTestE2EFilter/WithoutFilterHook— no filter hook = standard streaming relayTestE2EFilter/MessageIntegrity— large message preserved through filter🤖 Generated with Claude Code