This repository was archived by the owner on Sep 8, 2018. It is now read-only.

Streaming queries #34

Merged
peterbourgon merged 11 commits into master from stream
Feb 25, 2017

Conversation

@peterbourgon
Member

@peterbourgon peterbourgon commented Jan 27, 2017

This PR adds streaming queries. But how does it work?? 🤔

The flow of data

Stores regularly poll ingesters for their oldest segment. Stores aggregate ingester segments in memory until they reach an age or size threshold. Then, they replicate those aggregate segments to other store nodes. Only replicated segments are available for query. Once replication is successful, the original ingester segments are confirmed and deleted. See Design · Consume segments.

This PR adds a few things to the replication handler. First, it tees the replicated segment to disk and into memory, and then throws the in-memory segment over to a matching function. The matching function sends matching records to registered queries via a channel.

How do queries get registered?

Stream query lifecycle

Users register a stream query by making a GET to /store/stream. This builds a stream.Execute and stream.Deduplicate pipeline, and feeds the results back to the client via a long-lived HTTP connection (HTTP/1.1 with Transfer-Encoding: chunked).

stream.Execute is responsible for making connections to each store node's internal stream endpoint. When the topology of the cluster changes, stream.Execute will detect it, and either create new connections or terminate old ones. If there is a network interruption between the initial store node and any other store node, stream.Execute will detect the error and retry until success.

Recall that each log record will be duplicated on replication_factor store nodes. stream.Deduplicate does a best-effort deduplication of log records based on their ULID over a given time window. Note this produces memory pressure commensurate with throughput; see caveats and risks, below.

The context from the user request is used to control the lifecycle of all components.

Internal stream endpoint

We've described how a user stream query fans out to N internal stream queries. The internal stream endpoint is the one that actually registers the stream query in the query registry and writes the matching records back to the originating node.

Caveats and risks

The following caveats and risks apply, and should be addressed in future work.

  • Record matching does not have robust protection against slow clients.
  • Deduplication is not memory-limited. OOM is possible, especially with broad match expressions.
  • There are no limits on the number of active streaming queries; that's probably bad.
  • It's not been exhaustively profiled. Should we perhaps batch records?
  • The transport should be abstracted; see TLS support #17.

The /stream endpoint now accepts a &window=W parameter, default 3s.
Also, found a bug where the btree in deduplicate will blow the stack if
you try to push too many records into it, by combining a broad query
e.g. &q=A with a short enough window. Haha ;(

When you do a bufio.Scanner.Scan loop that emits the tokens onto a chan,
it's important to copy the token out of the scanner's internal buffer.
Especially if the channel is buffered.
@peterbourgon peterbourgon changed the title [WIP] Streaming queries Streaming queries Feb 13, 2017
@peterbourgon peterbourgon merged commit 04bcec3 into master Feb 25, 2017
@peterbourgon peterbourgon deleted the stream branch February 25, 2017 12:01
@peterbourgon peterbourgon mentioned this pull request Mar 17, 2017