Conversation
💻 Deploy preview available (loki.write: implement sharding): |
Force-pushed from `4326897` to `6f05b9e`
Pull Request Overview
This PR implements queue-based sharding for the loki.write component, introducing a new architecture that distributes log entries across multiple parallel queues based on label fingerprints. The implementation unifies the handling of both normal and WAL-enabled clients through a shared shards structure, eliminating significant code duplication. The queue_config block, previously WAL-only, now applies to all endpoints and controls both queue capacity and shard count.
Key changes:
- Introduces `shards.go` with a new sharding architecture for parallel processing via multiple queues
- Refactors WAL and fanout clients to use the shared shards implementation, removing ~500 lines of duplicated code
- Adds a `min_shards` configuration parameter to control the parallelism level
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/component/loki/write/types.go | Adds MinShards field to QueueConfig and updates documentation to reflect queue config is now always used |
| internal/component/common/loki/client/shards.go | New file implementing the core sharding logic with queue management and parallel batch sending |
| internal/component/common/loki/client/shards_test.go | Comprehensive test coverage for queue operations including append, drain, and flush/shutdown scenarios |
| internal/component/common/loki/client/consumer_wal.go | Refactored to delegate batching and sending to the shards implementation, significantly simplified |
| internal/component/common/loki/client/consumer_fanout.go | Refactored to use shards implementation, removing duplicated send/batch logic |
| internal/component/common/loki/client/config.go | Adds MinShards field definition to QueueConfig struct |
| docs/sources/reference/components/loki/loki.write.md | Documents the new min_shards parameter and clarifies queue config usage |
thampiotr
left a comment
I still think we need to get a coherent story with naming established and then make sure it is reflected in docs and in the code. But we're on the right track now.
```markdown
### `queue_config`

{{< docs/shared lookup="stability/experimental_feature.md" source="alloy" version="<ALLOY_VERSION>" >}}
```
Shouldn't this continue to be experimental?
Yeah, we could keep this as experimental, but we would always use this config after this PR.
What would be considered experimental would be the naming and changing the defaults, I guess.
```markdown
| Name | Type | Description | Default | Required |
| ---- | ---- | ----------- | ------- | -------- |
| `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
```
```diff
- | `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
+ | `capacity` | `string` | Controls the size of the underlying send queue buffer of each shard. Consider this setting as the worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
```
What does it even mean 'all enqueued batches are full'? Shouldn't it say that it's the total size of all the enqueued batches instead?
Yeah, this was there before and I did not check or alter it.
But essentially, whenever the capacity is reached, the queue of batches is full and we cannot enqueue another one, so we would block here until we get more capacity.
This setting is per-shard, right? We should clarify this here.
In ffb2bec I renamed the two client implementations we have to `endpoint` and `walEndpoint`. I will look into the other ways we discussed structuring this, but will keep this as a fallback if those don't work out.
Pull Request Overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.
Comments suppressed due to low confidence (1)
internal/component/common/loki/client/consumer_wal.go:135
`sync.WaitGroup` does not have a `Go` method. The standard library's `sync.WaitGroup` has `Add()`, `Done()`, and `Wait()` methods. Instead of:

```go
stopWG.Go(func() {
	pair.Stop(drain)
})
```

this should be:

```go
stopWG.Add(1)
go func() {
	defer stopWG.Done()
	pair.Stop(drain)
}()
```
Force-pushed from `6132ec6` to `eaac392`
@thampiotr I updated the PR with an attempt to share code between the non-WAL and WAL implementations. We now have one endpoint struct that handles shards. This implementation has one method that we can use directly in Fanout, so we no longer need channels between fanout and endpoint. For the WAL implementation, I renamed it `walEndpointAdapter`. This implements the interface that the watcher expects and just calls enqueue on endpoint, which internally handles retries etc. Naming could be a bit off, but this would be an option we can go with. Let me know what you think :)
Pull Request Overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
internal/component/common/loki/client/consumer_wal.go:134
`sync.WaitGroup` does not have a `Go()` method. This code will not compile:

```go
stopWG.Go(func() {
	pair.Stop(drain)
})
```

You should either:
- Use the `stopWG.Add(1)` and `go func(p endpointWatcherPair) { defer stopWG.Done(); p.Stop(drain) }(pair)` pattern (note: capture `pair` as a parameter to avoid closure issues)
- Or use `golang.org/x/sync/errgroup`, which has a `Go()` method
Force-pushed from `c889a4a` to `c55de0c`
Nothing really stands out for the small doc changes. It's good as-is. |
Force-pushed from `9454993` to `bf7d130`
Force-pushed from `bf7d130` to `e94d632`
queue. This would deadlock, because we would not be able to drain, and a hard shutdown would not cancel it.
Force-pushed from `e94d632` to `ea9f263`
💻 Deploy preview available (feat: add sharding for loki.write): |
```go
var defaultQueueConfig = QueueConfig{
	Capacity:     10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10
	MinShards:    1,
	DrainTimeout: 15 * time.Second,
}
```
No: `BatchSize` is configured to 1MB, but capacity is 10MB in the docs.
```go
if err := validateConfigStabilityLevel(o, args); err != nil {
	return nil, err
}
```
Don't we call `Update` anyway, where this check will happen?
```go
for !c.shards.enqueue(entry, segmentNum) {
	backoff.Wait()
	if !backoff.Ongoing() {
		return false
	}
}
```
If this blocks forever due to some issues in enqueue or other functions downstream from here, what would be the symptoms? Would we have any logs or metrics or some other way of debugging this?
Not more than we have today. If we get stuck here for whatever reason, `loki_write_sent_bytes_total` would go down to 0 and `loki_write_dropped_bytes_total` would be 0.
I have an issue for adding more useful metrics for sharding here: #4838. I could also add useful logs in the same PR.
```go
case <-s.softShutdown:
	return false
default:
	return s.queues[shard].append(tenantID, entry, segmentNum)
```
Do we still need to hold the lock when calling `append` here? Or does it not really matter?
We don't, because `append` has its own locking, but it doesn't matter right now because we can only enqueue one item at a time, due to the limitations of the pipeline.
I think it's safest to hold the lock for the full duration for now. I'll need to revisit this if we change the pipeline.
The capacity required for all queues scales with the number of shards.
💻 Deploy preview deleted (feat: Add sharding for loki.write). |
thampiotr
left a comment
I can't see any obvious problems here, so I think we should merge it and then try to validate thoroughly in dev. Hope it's going to work well! As mentioned offline, I'd like to invest more in integration tests to be able to make such changes with more confidence.
PR Description
This PR implements `queue_config` for the `loki.write` component, enabling users to configure queue-based batching and parallel processing. The implementation introduces a new sharding architecture that distributes log entries across multiple parallel queues based on label fingerprints. This implementation is based on Prometheus remote-write sharding. The shards implementation is used with both "normal" clients and "WAL" clients, so we get rid of a lot of duplicated logic.

Before this PR we had a `queue_config` block that was only used when WAL was enabled. It is now always used and will affect clients regardless.

Currently no automatic "resharding" is implemented. Implementing this without the WAL will most likely be pretty primitive, so for now `min_shards` is the only configurable value until we address this.

Ideally we would move a couple of attributes from the `endpoint` block to the `queue_config` block to closer match `prometheus.remote_write`, but we can't do that without a breaking change. These attributes are:
- `retry_on_http_429`
- `max_backoff_period`
- `min_backoff_period`
- `batch_size`
- `batch_wait`

Which issue(s) this PR fixes
Part of: #4728
Notes to the Reviewer
I moved WAL writer ownership into `client.Manager`. No need to expose it to the `loki.write` component.

I plan to work on resharding in a follow-up PR.
PR Checklist