[Bug]: ParseDataStream should tolerate "data:" without space for compatibility #162

@vegetable-boy

Support SSE streams that omit the space after the "data:" prefix, for compatibility with widely deployed servers

Description

The current ParseDataStream function strictly requires the SSE data: prefix to be followed by a space (data: ). The WHATWG SSE specification treats that space as optional: a parser takes everything after the first colon as the field value and strips a single leading space only if one is present. Some widely used frameworks emit data lines without the space, and ParseDataStream silently drops them: no data is yielded and no error is reported.

Problem

When connecting to SSE endpoints that omit the space after data:, the parser silently ignores all data lines:

Format without a space (currently fails):
data:{"jsonrpc":"2.0","result":"..."}

Format with a space (currently works):
data: {"jsonrpc":"2.0","result":"..."}
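The failure can be reproduced in isolation with a minimal sketch. The helper collectDataLines below is hypothetical (it is not part of the library); it only mimics the bytes.HasPrefix check that ParseDataStream applies to each scanned line, and the sample payloads are made up for illustration.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// collectDataLines is a hypothetical helper, not part of the library.
// It mimics the prefix check in ParseDataStream and returns the payload
// of every line in stream that starts with prefix.
func collectDataLines(stream, prefix string) []string {
	var out []string
	p := []byte(prefix)
	scanner := bufio.NewScanner(strings.NewReader(stream))
	for scanner.Scan() {
		line := scanner.Bytes()
		if bytes.HasPrefix(line, p) {
			// Copy into a string because the scanner reuses its buffer.
			out = append(out, string(line[len(p):]))
		}
	}
	return out
}

func main() {
	noSpace := "data:{\"id\":1}\n\ndata:{\"id\":2}\n\n"
	withSpace := "data: {\"id\":1}\n\ndata: {\"id\":2}\n\n"

	// Strict "data: " prefix against a stream without the space.
	fmt.Println(len(collectDataLines(noSpace, "data: "))) // prints 0
	// Strict "data: " prefix against a stream with the space.
	fmt.Println(len(collectDataLines(withSpace, "data: "))) // prints 2
}
```

Both streams carry the same two events, but the strict prefix yields nothing for the first one and the scanner reports no error, which matches the silent failure described above.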

Real-World Impact

Spring Framework's SseEmitter is one of the most widely used SSE implementations in the Java ecosystem, and it writes data lines with no space after the colon.

Source code reference:
spring-framework/SseEmitter.java#L238

@Override
public SseEventBuilder data(Object object, @Nullable MediaType mediaType) {
    append("data:");  // ❌ No space after colon
    saveAppendedText();
    this.dataToSend.add(new DataWithMediaType(object, mediaType));
    append('\n');
    return this;
}

Affected versions:

Spring Framework 5.x, 6.x
Spring Boot 2.x, 3.x

These are the release lines in current production use, so a large number of Java-based SSE endpoints are likely to emit this format.

Proposed Solution

Modify ParseDataStream to support both formats:

Current implementation:

func ParseDataStream(body io.Reader) iter.Seq2[[]byte, error] {
    return func(yield func([]byte, error) bool) {
        scanner := bufio.NewScanner(body)
        buf := make([]byte, 0, bufio.MaxScanTokenSize)
        scanner.Buffer(buf, MaxSSETokenSize)
        prefixBytes := []byte(sseDataPrefix)  // "data: "

        for scanner.Scan() {
            lineBytes := scanner.Bytes()
            if bytes.HasPrefix(lineBytes, prefixBytes) {
                data := lineBytes[len(prefixBytes):]
                if !yield(data, nil) {
                    return
                }
            }
        }
        if err := scanner.Err(); err != nil {
            yield(nil, fmt.Errorf("SSE stream error: %w", err))
        }
    }
}

Proposed implementation:

func ParseDataStream(body io.Reader) iter.Seq2[[]byte, error] {
    return func(yield func([]byte, error) bool) {
        scanner := bufio.NewScanner(body)
        buf := make([]byte, 0, bufio.MaxScanTokenSize)
        scanner.Buffer(buf, MaxSSETokenSize)

        // Accept the "data:" field with or without the optional space.
        prefixWithSpace := []byte("data: ")
        prefixWithoutSpace := []byte("data:")

        for scanner.Scan() {
            lineBytes := scanner.Bytes()

            var data []byte
            if bytes.HasPrefix(lineBytes, prefixWithSpace) {
                // "data: ..." (space present): drop the prefix and the space.
                data = lineBytes[len(prefixWithSpace):]
            } else if bytes.HasPrefix(lineBytes, prefixWithoutSpace) {
                // "data:..." (no space): drop only the prefix.
                data = lineBytes[len(prefixWithoutSpace):]
            } else {
                // Not a data line (comment, event, id, retry, blank line, etc.)
                continue
            }
            
            if !yield(data, nil) {
                return
            }
        }
        
        if err := scanner.Err(); err != nil {
            yield(nil, fmt.Errorf("SSE stream error: %w", err))
        }
    }
}
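An alternative to keeping two prefixes is to follow the field-parsing steps of the SSE specification directly: split the line at the first colon, check the field name, and strip at most one leading space from the value. The sketch below is only an illustration of that approach; dataValue is a hypothetical helper name, and it deliberately ignores the edge case of a bare "data" line with no colon.

```go
package main

import (
	"bytes"
	"fmt"
)

// dataValue is a hypothetical helper, not part of the library.
// It reports whether line is an SSE "data" field and, if so, returns the
// value with at most one leading space removed.
func dataValue(line []byte) ([]byte, bool) {
	// Split at the first colon: the field name is before it, the value after.
	name, value, found := bytes.Cut(line, []byte(":"))
	if !found || string(name) != "data" {
		// Comments (": ..."), other fields, and blank lines land here.
		return nil, false
	}
	// Strip a single leading space only; further spaces belong to the payload.
	return bytes.TrimPrefix(value, []byte(" ")), true
}

func main() {
	lines := []string{"data: a", "data:a", "data:  a", "event: ping", ": comment", "database: x"}
	for _, line := range lines {
		v, ok := dataValue([]byte(line))
		fmt.Printf("%q -> %q %v\n", line, v, ok)
	}
}
```

With this shape, "data: a" and "data:a" both yield "a", while "data:  a" keeps its second space as part of the payload. Lines such as "event: ping" or "database: x" are rejected because the field name is compared exactly.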
