[Bug]: ParseDataStream should tolerate "data:" without space for compatibility #162
Description
Support SSE streams without a space after the "data:" prefix, for compatibility with servers that omit it
The current ParseDataStream function only recognizes data lines in which the data: prefix is followed by a space (data: ). The WHATWG SSE specification treats that space as optional: a parser strips a single leading space from the field value if one is present. Some widely used frameworks emit SSE streams without the space, and against those streams parsing fails silently: no data is yielded and no error is reported.
Problem
When connecting to SSE endpoints that omit the space after data:, the parser silently ignores all data lines:
Format without the space (currently fails):
data:{"jsonrpc":"2.0","result":"..."}
Standard format (currently works):
data: {"jsonrpc":"2.0","result":"..."}
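The silent failure can be reproduced without a server. The following is a minimal sketch, assuming the parser's check is a plain prefix match on "data: " as in the current implementation quoted in this issue; strictDataLines is a hypothetical helper written for illustration and is not part of the library.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// strictDataLines mimics the strict check described in this issue: a line
// counts as data only if it starts with "data: " (including the space).
// Hypothetical helper for illustration; not part of the library.
func strictDataLines(stream string) []string {
	prefix := []byte("data: ")
	var out []string
	scanner := bufio.NewScanner(strings.NewReader(stream))
	for scanner.Scan() {
		line := scanner.Bytes()
		if bytes.HasPrefix(line, prefix) {
			// Copy into a string because the scanner reuses its buffer.
			out = append(out, string(line[len(prefix):]))
		}
	}
	// scanner.Err() is ignored here: a strings.Reader cannot fail.
	return out
}

func main() {
	withSpace := "data: {\"id\":1}\n\n"
	withoutSpace := "data:{\"id\":1}\n\n"

	fmt.Println(len(strictDataLines(withSpace)))    // 1
	fmt.Println(len(strictDataLines(withoutSpace))) // 0: dropped, no error
}
```

The second stream carries the same payload, yet the caller sees an empty result and nothing indicates that lines were skipped.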
Real-World Impact
Spring Framework's SseEmitter is one of the most widely-used SSE implementations in the Java ecosystem, and it generates SSE streams without spaces after the colon.
Source code reference:
spring-framework/SseEmitter.java#L238
@Override
public SseEventBuilder data(Object object, @Nullable MediaType mediaType) {
    append("data:"); // ❌ No space after colon
    saveAppendedText();
    this.dataToSend.add(new DataWithMediaType(object, mediaType));
    append('\n');
    return this;
}
Affected versions:
Spring Framework 5.x, 6.x
Spring Boot 2.x, 3.x
All current production versions
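Because Spring is so widely deployed, this potentially affects a large number of Java-based SSE endpoints in production: any client that uses ParseDataStream against one of them receives no data.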
Proposed Solution
Modify ParseDataStream to support both formats:
Current implementation:
func ParseDataStream(body io.Reader) iter.Seq2[[]byte, error] {
	return func(yield func([]byte, error) bool) {
		scanner := bufio.NewScanner(body)
		buf := make([]byte, 0, bufio.MaxScanTokenSize)
		scanner.Buffer(buf, MaxSSETokenSize)
		prefixBytes := []byte(sseDataPrefix) // "data: "
		for scanner.Scan() {
			lineBytes := scanner.Bytes()
			if bytes.HasPrefix(lineBytes, prefixBytes) {
				data := lineBytes[len(prefixBytes):]
				if !yield(data, nil) {
					return
				}
			}
		}
		if err := scanner.Err(); err != nil {
			yield(nil, fmt.Errorf("SSE stream error: %w", err))
		}
	}
}
Proposed implementation:
func ParseDataStream(body io.Reader) iter.Seq2[[]byte, error] {
	return func(yield func([]byte, error) bool) {
		scanner := bufio.NewScanner(body)
		buf := make([]byte, 0, bufio.MaxScanTokenSize)
		scanner.Buffer(buf, MaxSSETokenSize)
		// Accept the prefix with or without the optional space
		prefixWithSpace := []byte("data: ")
		prefixWithoutSpace := []byte("data:")
		for scanner.Scan() {
			lineBytes := scanner.Bytes()
			var data []byte
			if bytes.HasPrefix(lineBytes, prefixWithSpace) {
				// With space: "data: ..."
				data = lineBytes[len(prefixWithSpace):]
			} else if bytes.HasPrefix(lineBytes, prefixWithoutSpace) {
				// Without space: "data:..."
				data = lineBytes[len(prefixWithoutSpace):]
			} else {
				// Not a data line (comment, event, id, retry, etc.)
				continue
			}
			if !yield(data, nil) {
				return
			}
		}
		if err := scanner.Err(); err != nil {
			yield(nil, fmt.Errorf("SSE stream error: %w", err))
		}
	}
}
}
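The two-prefix check in the proposed implementation could also be written as a single match on "data:" followed by removal of at most one leading space. The following is only a sketch of that per-line logic under stated assumptions: dataValue is a hypothetical helper name, not an existing function in this repository, and it relies on bytes.CutPrefix (Go 1.20+, already implied by the use of iter).

```go
package main

import (
	"bytes"
	"fmt"
)

// dataValue reports whether line is an SSE data line and, if so, returns
// its payload. It matches "data:" and then drops at most one leading space,
// so "data: x" and "data:x" both yield "x".
// Hypothetical helper for illustration; not part of the library.
func dataValue(line []byte) ([]byte, bool) {
	value, ok := bytes.CutPrefix(line, []byte("data:"))
	if !ok {
		// Comment, event, id, retry, or blank line.
		return nil, false
	}
	// Strip a single optional space; any further spaces are payload.
	value, _ = bytes.CutPrefix(value, []byte(" "))
	return value, true
}

func main() {
	lines := []string{
		"data: {\"a\":1}",
		"data:{\"a\":1}",
		"data:  x",
		"event: ping",
		": comment",
	}
	for _, line := range lines {
		value, ok := dataValue([]byte(line))
		fmt.Printf("%q -> %q %v\n", line, value, ok)
	}
}
```

For every input this accepts and rejects the same lines as the two-prefix version and returns the same payload. Inside ParseDataStream the loop body would reduce to a call to such a helper followed by yield. A bare "data" line with no colon is not handled by either variant.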