fix(diskqueue): use per-beat paths instead of global paths#48834

Merged
orestisfl merged 23 commits into elastic:main from orestisfl:fix/diskqueue-per-beat-paths
Mar 4, 2026
Conversation

@orestisfl
Contributor

@orestisfl orestisfl commented Feb 12, 2026

Proposed commit message

fix(diskqueue): use per-beat paths instead of global paths

Add a Paths field to diskqueue Settings so each beat receiver gets its own data directory for the disk queue. Thread the per-beat *paths.Path through the pipeline and output factory APIs:

  • outputs.Factory, outputs.Load, outputs.Success, outputs.SuccessNet now accept *paths.Path
  • pipeline.Settings carries Paths and forwards it to queueFactoryForUserConfig
  • All 8 output factory implementations (console, discard, elasticsearch, file, kafka, logstash, redis, otelconsumer) updated to accept and forward paths
  • dockerlogbeat updated to pass paths.New() explicitly. This makes the previously implicit behavior explicit: the global paths were never initialized or used in dockerlogbeat.
  • pipeline/stress test harness updated to use LoadWithSettings with paths

When Settings.Path is explicitly configured, it still takes precedence (backward compatible). When empty, the diskqueue directory is resolved via the per-beat Paths.Resolve() instead of the global paths.Resolve().

Checklist

  • [x] My code follows the style guidelines of this project
  • [x] I have commented my code, particularly in hard-to-understand areas
  • ~~I have made corresponding changes to the documentation~~
  • ~~I have made corresponding changes to the default configuration files~~
  • [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • ~~I have added an entry in ./changelog/fragments using the changelog tool.~~

Disruptive User Impact

None. In the standalone beat flow, b.Paths and paths.Paths point to the same object, so behavior is identical. The change only corrects behavior for beat receivers, where each receiver gets its own paths instance.
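
A small sketch of why the standalone flow is unaffected, assuming simplified stand-in types: `Path`, `Beat`, `globalPaths`, `newStandalone`, and `newReceiver` are hypothetical names used only for illustration, not libbeat APIs.

```go
package main

import "fmt"

// Path is a hypothetical stand-in for libbeat's *paths.Path.
type Path struct {
	Home string
}

// globalPaths plays the role of the package-level paths.Paths.
var globalPaths = &Path{Home: "/tmp/diskqueue-test/standalone"}

// Beat is a hypothetical stand-in for a beat instance carrying its own paths.
type Beat struct {
	Paths *Path
}

// newStandalone reuses the global instance, as in the standalone flow.
func newStandalone() *Beat {
	return &Beat{Paths: globalPaths}
}

// newReceiver allocates a separate instance, as each beat receiver does.
func newReceiver(home string) *Beat {
	return &Beat{Paths: &Path{Home: home}}
}

func main() {
	s := newStandalone()
	ra := newReceiver("/tmp/diskqueue-test/receiver-a")
	rb := newReceiver("/tmp/diskqueue-test/receiver-b")

	fmt.Println(s.Paths == globalPaths) // true: same object, same behavior as before
	fmt.Println(ra.Paths == rb.Paths)   // false: each receiver has its own paths
}
```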

How to test this PR locally

Standalone filebeat

# filebeat-diskqueue-test.yml
path.home: /tmp/diskqueue-test/standalone
path.data: /tmp/diskqueue-test/standalone/data

filebeat.inputs:
  - type: filestream
    id: test-input
    enabled: true
    paths:
      - /tmp/diskqueue-test/input.log
    prospector.scanner.fingerprint.enabled: false
    file_identity.native: ~

queue.disk:
  max_size: 100MB

output.console:
  pretty: true

cd filebeat && mage build
echo '{"message": "hello"}' > /tmp/diskqueue-test/input.log
./filebeat -e -c /path/to/filebeat-diskqueue-test.yml

# Verify: diskqueue created under path.data
ls -la /tmp/diskqueue-test/standalone/data/diskqueue/
# Should contain: state.dat (and .seg files once events flow)

elastic-agent otel

Create /tmp/diskqueue-test/otel-receivers.yml:

receivers:
  filebeatreceiver/a:
    filebeat:
      inputs:
        - type: filestream
          id: stream-a
          enabled: true
          paths:
            - /tmp/diskqueue-test/input.log
          prospector.scanner.fingerprint.enabled: false
          file_identity.native: ~
    path.home: /tmp/diskqueue-test/receiver-a
    queue.disk:
      max_size: 100MB

  filebeatreceiver/b:
    filebeat:
      inputs:
        - type: filestream
          id: stream-b
          enabled: true
          paths:
            - /tmp/diskqueue-test/input.log
          prospector.scanner.fingerprint.enabled: false
          file_identity.native: ~
    path.home: /tmp/diskqueue-test/receiver-b
    queue.disk:
      max_size: 100MB

exporters:
  debug:
    verbosity: detailed

service:
  pipelines:
    logs:
      receivers: [filebeatreceiver/a, filebeatreceiver/b]
      exporters: [debug]
  telemetry:
    metrics:
      level: none

Run:

elastic-agent otel --config /tmp/diskqueue-test/otel-receivers.yml

Verify:

find /tmp/diskqueue-test/receiver-a -name state.dat
# Expected: /tmp/diskqueue-test/receiver-a/data/diskqueue/state.dat

find /tmp/diskqueue-test/receiver-b -name state.dat
# Expected: /tmp/diskqueue-test/receiver-b/data/diskqueue/state.dat

Related issues

Add a Paths field to diskqueue Settings so beat receivers get their own
data directory for the disk queue. The pipeline threads its per-beat
paths through to the diskqueue factory. When Paths is nil (e.g. from
output-level queue overrides), the global paths.Resolve fallback is
preserved for backward compatibility.

Closes elastic#46989

Output factories can configure the queue (including diskqueue). Thread the
per-beat *paths.Path through outputs.Load and outputs.Success/SuccessNet so
queue settings no longer rely on the global paths instance.

This also makes diskqueue settings require Paths when resolving the default
queue directory.
@orestisfl orestisfl self-assigned this Feb 12, 2026
@botelastic botelastic bot added the needs_team label Feb 12, 2026
@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@orestisfl orestisfl force-pushed the fix/diskqueue-per-beat-paths branch from 612419e to dfd18fb Compare February 12, 2026 20:27
@orestisfl orestisfl added the Team:Elastic-Agent-Data-Plane label and removed the needs_team label Feb 12, 2026
@orestisfl orestisfl added the backport-8.19, backport-9.2, and backport-9.3 labels Feb 12, 2026
@elastic elastic deleted a comment from mergify bot Feb 12, 2026
orestisfl added a commit to elastic/elastic-agent-system-metrics that referenced this pull request Feb 19, 2026
## What does this PR do?

Remove per-test context timeouts and rely on the go test -timeout flag
(20m in CI). Bump sleep durations to 1200s to outlast the test run.

## Why is it important?

The container system tests used a shared 5-minute context.WithTimeout
for the entire permission matrix. On slower CI workers the 8-case matrix
in TestProcessAllSettings recently started exceeding this, causing
"context deadline exceeded" failures:
https://buildkite.com/elastic/elastic-agent-system-metrics/builds/929

## Checklist

- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] ~~I have added an entry in `CHANGELOG.md`~~

## Related issues

CI failed in main and elastic/beats#48834
@orestisfl orestisfl changed the title from "Fix/diskqueue per beat paths" to "fix(diskqueue): use per-beat paths instead of global paths" Feb 24, 2026
@elasticmachine
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@orestisfl orestisfl requested a review from leehinman February 24, 2026 15:56
@leehinman leehinman requested a review from faec February 24, 2026 16:02
Contributor

@leehinman leehinman left a comment

LGTM. I would like @faec to take a look before we merge though.

Instead of passing *paths.Path through every method call, store it
on diskQueue, readerLoop, writerLoop, and deleterLoop structs. This
avoids cascading signature changes while keeping paths out of Settings.

Also fixes the missed callsite in outputs/util.go.
@mergify
Contributor

mergify bot commented Feb 27, 2026

This pull request is now in conflicts. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b fix/diskqueue-per-beat-paths upstream/fix/diskqueue-per-beat-paths
git merge upstream/main
git push upstream fix/diskqueue-per-beat-paths

@coderabbitai

coderabbitai bot commented Feb 27, 2026

No actionable comments were generated in the recent review. 🎉

📥 Commits

Reviewing files that changed from the base of the PR and between 35a560e and 2d07916.

📒 Files selected for processing (39)
  • libbeat/cmd/instance/beat.go
  • libbeat/cmd/test/output.go
  • libbeat/outputs/console/console.go
  • libbeat/outputs/discard/discard.go
  • libbeat/outputs/elasticsearch/client_integration_test.go
  • libbeat/outputs/elasticsearch/elasticsearch.go
  • libbeat/outputs/fileout/file.go
  • libbeat/outputs/kafka/client_test.go
  • libbeat/outputs/kafka/kafka.go
  • libbeat/outputs/kafka/kafka_integration_test.go
  • libbeat/outputs/logstash/logstash.go
  • libbeat/outputs/logstash/logstash_integration_test.go
  • libbeat/outputs/logstash/logstash_test.go
  • libbeat/outputs/output_reg.go
  • libbeat/outputs/redis/redis.go
  • libbeat/outputs/redis/redis_integration_test.go
  • libbeat/outputs/redis/redis_test.go
  • libbeat/outputs/util.go
  • libbeat/outputs/util_test.go
  • libbeat/publisher/pipeline/module.go
  • libbeat/publisher/pipeline/pipeline.go
  • libbeat/publisher/pipeline/stress/out.go
  • libbeat/publisher/pipeline/stress/run.go
  • libbeat/publisher/queue/diskqueue/benchmark_test.go
  • libbeat/publisher/queue/diskqueue/config.go
  • libbeat/publisher/queue/diskqueue/config_test.go
  • libbeat/publisher/queue/diskqueue/core_loop.go
  • libbeat/publisher/queue/diskqueue/core_loop_test.go
  • libbeat/publisher/queue/diskqueue/deleter_loop.go
  • libbeat/publisher/queue/diskqueue/queue.go
  • libbeat/publisher/queue/diskqueue/queue_test.go
  • libbeat/publisher/queue/diskqueue/reader_loop.go
  • libbeat/publisher/queue/diskqueue/segments.go
  • libbeat/publisher/queue/diskqueue/segments_test.go
  • libbeat/publisher/queue/diskqueue/writer_loop.go
  • x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
  • x-pack/filebeat/tests/integration/otel_test.go
  • x-pack/libbeat/outputs/otelconsumer/otelconsumer.go
  • x-pack/otel/exporter/logstashexporter/exporter.go

📝 Walkthrough

This pull request threads beat instance-specific path information throughout the outputs and queueing subsystems, replacing implicit global path resolution with explicit parameterization. The paths *paths.Path parameter is added to output factory signatures, output utility functions (Success, SuccessNet), queue initialization (NewQueue, FactoryForSettings), and supporting path-resolution methods in the diskqueue subsystem. All call sites are updated to propagate the paths object from beat initialization through factory construction and segment handling. This enables each beat instance to use its own path configuration rather than relying on global state, with testing and integration layers updated accordingly.
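
The explicit parameterization described in this walkthrough might look roughly like the sketch below. It is a simplified illustration: `Path`, `Group`, the `Factory` signature, and this `RegisterType`/`Load` pair are hypothetical reductions, and the real `outputs.Factory` and `outputs.Load` take additional arguments not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// Path is a hypothetical stand-in for libbeat's *paths.Path.
type Path struct {
	Home string
}

// Group is a hypothetical, minimal result of building an output.
type Group struct {
	Name      string
	QueueHome string // where a queue configured by this output would live
}

// Factory receives the per-beat paths explicitly instead of reading a global.
type Factory func(paths *Path, cfg map[string]string) (Group, error)

var registry = map[string]Factory{}

// RegisterType adds a factory under the given output name.
func RegisterType(name string, f Factory) {
	registry[name] = f
}

// Load looks up the factory and forwards the caller's paths to it.
func Load(paths *Path, name string, cfg map[string]string) (Group, error) {
	f, ok := registry[name]
	if !ok {
		return Group{}, fmt.Errorf("unknown output type %q", name)
	}
	if paths == nil {
		return Group{}, errors.New("paths must not be nil")
	}
	return f(paths, cfg)
}

func main() {
	RegisterType("console", func(paths *Path, cfg map[string]string) (Group, error) {
		return Group{Name: "console", QueueHome: paths.Home}, nil
	})

	for _, home := range []string{"/tmp/diskqueue-test/receiver-a", "/tmp/diskqueue-test/receiver-b"} {
		g, err := Load(&Path{Home: home}, "console", nil)
		fmt.Println(g.Name, g.QueueHome, err)
	}
}
```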

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Linked Issues check | ✅ Passed | PR successfully threads per-beat paths through diskqueue, outputs, and pipeline APIs. All 8 output factories updated; explicit Settings.Path precedence preserved; fallback to per-beat Paths.Resolve implemented. |
| Out of Scope Changes check | ✅ Passed | All changes directly support per-beat diskqueue paths. Minor refactoring in util_test.go and otel_test.go aligns with path-aware queue testing. |

…eat-paths

# Conflicts:
#	libbeat/publisher/queue/diskqueue/segments_test.go
@orestisfl orestisfl merged commit 979f9b4 into elastic:main Mar 4, 2026
203 of 206 checks passed
@orestisfl orestisfl deleted the fix/diskqueue-per-beat-paths branch March 4, 2026 15:09
mergify bot pushed a commit that referenced this pull request Mar 4, 2026
fix(diskqueue): use per-beat paths instead of global paths

(cherry picked from commit 979f9b4)

# Conflicts:
#	libbeat/outputs/logstash/logstash.go
#	x-pack/otel/exporter/logstashexporter/exporter.go
mergify bot pushed a commit that referenced this pull request Mar 4, 2026
fix(diskqueue): use per-beat paths instead of global paths

(cherry picked from commit 979f9b4)
mergify bot pushed a commit that referenced this pull request Mar 4, 2026
fix(diskqueue): use per-beat paths instead of global paths

(cherry picked from commit 979f9b4)
orestisfl added a commit that referenced this pull request Mar 5, 2026
…49276)

fix(diskqueue): use per-beat paths instead of global paths

(cherry picked from commit 979f9b4)

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
orestisfl added a commit that referenced this pull request Mar 5, 2026
…49275)

fix(diskqueue): use per-beat paths instead of global paths

Add a `Paths` field to diskqueue `Settings` so each beat receiver gets its own data directory for the disk queue. Thread the per-beat `*paths.Path` through the pipeline and output factory APIs:

- `outputs.Factory`, `outputs.Load`, `outputs.Success`, `outputs.SuccessNet` now accept `*paths.Path`
- `pipeline.Settings` carries `Paths` and forwards it to `queueFactoryForUserConfig`
- All 8 output factory implementations (console, discard, elasticsearch, file, kafka, logstash, redis, otelconsumer) updated to accept and forward paths
- `dockerlogbeat` updated to pass `paths.New()` explicitly. This is making explicit the previously implied behavior: the global paths were never initialized or used in `dockerlogbeat`.
- `pipeline/stress` test harness updated to use `LoadWithSettings` with paths

When `Settings.Path` is explicitly configured, it still takes precedence (backward compatible). When empty, the diskqueue directory is resolved via the per-beat `Paths.Resolve()` instead of the global `paths.Resolve()`.

## Checklist

- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- ~~I have made corresponding changes to the documentation~~
- ~~I have made corresponding change to the default configuration files~~
- [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the [`stresstest.sh`](https://github.com/elastic/beats/blob/main/script/stresstest.sh) script to run them under stress conditions and race detector to verify their stability.
- ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent-changelog-tool/blob/main/docs/usage.md).~~

## Disruptive User Impact

None. In the standalone beat flow, `b.Paths` and `paths.Paths` point to the same object, so behavior is identical. The change only corrects behavior for beat receivers, where each receiver gets its own paths instance.
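
To illustrate why standalone behavior is unchanged, here is a minimal sketch of the aliasing described above. The `Beat` and `Path` types, the `globalPaths` variable, and both constructor functions are hypothetical names chosen for this example; they only model the relationship between `b.Paths` and the global paths object.

```go
package main

import "fmt"

// Path is a hypothetical stand-in for the real paths type.
type Path struct{ Home string }

// globalPaths plays the role of the process-wide paths object.
var globalPaths = &Path{Home: "/tmp/diskqueue-test/standalone"}

// Beat is a hypothetical, minimal stand-in for a beat instance.
type Beat struct{ Paths *Path }

// newStandaloneBeat aliases the global paths, as in the standalone flow.
func newStandaloneBeat() *Beat { return &Beat{Paths: globalPaths} }

// newReceiverBeat gives each receiver its own paths instance.
func newReceiverBeat(home string) *Beat {
	return &Beat{Paths: &Path{Home: home}}
}

func main() {
	s := newStandaloneBeat()
	a := newReceiverBeat("/tmp/diskqueue-test/receiver-a")
	b := newReceiverBeat("/tmp/diskqueue-test/receiver-b")

	fmt.Println(s.Paths == globalPaths) // true: same object, same behavior
	fmt.Println(a.Paths == b.Paths)     // false: receivers are isolated
}
```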

## How to test this PR locally
### Standalone filebeat

```yaml
# filebeat-diskqueue-test.yml
path.home: /tmp/diskqueue-test/standalone
path.data: /tmp/diskqueue-test/standalone/data

filebeat.inputs:
  - type: filestream
    id: test-input
    enabled: true
    paths:
      - /tmp/diskqueue-test/input.log
    prospector.scanner.fingerprint.enabled: false
    file_identity.native: ~

queue.disk:
  max_size: 100MB

output.console:
  pretty: true
```

```bash
cd filebeat && mage build
echo '{"message": "hello"}' > /tmp/diskqueue-test/input.log
./filebeat -e -c /path/to/filebeat-diskqueue-test.yml

# Verify: diskqueue created under path.data
ls -la /tmp/diskqueue-test/standalone/data/diskqueue/
# Should contain: state.dat (and .seg files once events flow)
```

### `elastic-agent otel`
Create `/tmp/diskqueue-test/otel-receivers.yml`:

```yaml
receivers:
  filebeatreceiver/a:
    filebeat:
      inputs:
        - type: filestream
          id: stream-a
          enabled: true
          paths:
            - /tmp/diskqueue-test/input.log
          prospector.scanner.fingerprint.enabled: false
          file_identity.native: ~
    path.home: /tmp/diskqueue-test/receiver-a
    queue.disk:
      max_size: 100MB

  filebeatreceiver/b:
    filebeat:
      inputs:
        - type: filestream
          id: stream-b
          enabled: true
          paths:
            - /tmp/diskqueue-test/input.log
          prospector.scanner.fingerprint.enabled: false
          file_identity.native: ~
    path.home: /tmp/diskqueue-test/receiver-b
    queue.disk:
      max_size: 100MB

exporters:
  debug:
    verbosity: detailed

service:
  pipelines:
    logs:
      receivers: [filebeatreceiver/a, filebeatreceiver/b]
      exporters: [debug]
  telemetry:
    metrics:
      level: none
```

Run:

```bash
elastic-agent otel --config /tmp/diskqueue-test/otel-receivers.yml
```

Verify:

```bash
find /tmp/diskqueue-test/receiver-a -name state.dat
# Expected: /tmp/diskqueue-test/receiver-a/data/diskqueue/state.dat

find /tmp/diskqueue-test/receiver-b -name state.dat
# Expected: /tmp/diskqueue-test/receiver-b/data/diskqueue/state.dat
```

## Related issues

- Closes #46989

(cherry picked from commit 979f9b4)

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
@github-actions github-actions bot mentioned this pull request Mar 5, 2026
orestisfl added a commit that referenced this pull request Mar 9, 2026
Resolve cherry-pick conflicts from #48834 backport to 8.19:
- logstash.go: keep 8.19 code structure (no MakeLogstashClients
  refactoring), add paths import and pass beatPaths to SuccessNet
- exporter.go: remove file that doesn't exist on 8.19 (the
  logstashexporter package is a main-branch feature)
orestisfl added a commit that referenced this pull request Mar 11, 2026
… global paths (#49274)

* fix(diskqueue): use per-beat paths instead of global paths (#48834)

(cherry picked from commit 979f9b4)

# Conflicts:
#	libbeat/outputs/logstash/logstash.go
#	x-pack/otel/exporter/logstashexporter/exporter.go

* fix: resolve backport merge conflicts for diskqueue paths PR

Resolve cherry-pick conflicts from #48834 backport to 8.19:
- logstash.go: keep 8.19 code structure (no MakeLogstashClients
  refactoring), add paths import and pass beatPaths to SuccessNet
- exporter.go: remove file that doesn't exist on 8.19 (the
  logstashexporter package is a main-branch feature)

---------

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
Labels

- backport-8.19: Automated backport to the 8.19 branch
- backport-9.2: Automated backport to the 9.2 branch
- backport-9.3: Automated backport to the 9.3 branch
- skip-changelog
- Team:Elastic-Agent-Data-Plane: Label for the Agent Data Plane team

Development

Successfully merging this pull request may close these issues.

[beatreceiver] replace global paths in diskqueue