Describe the bug
When Fluentd restarts, it restores all chunks stored in buffer files and is expected to flush them, but only the last restored chunk is flushed.
To Reproduce
- Use any input that can ship thousands of events/s
- Use an output that supports file buffering
- Set chunk_limit_records to 10
- Send 1,000 {"timestamp": 1} JSON events (100 buffer files are created)
- Before the events are flushed, reload the Fluentd service
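The "send 1,000 events" step can be done by appending lines to the tailed file. Below is a minimal sketch; `write_events` is a hypothetical helper (not part of Fluentd) that writes one {"timestamp":1} JSON object per line, which in_tail with the json parser reads as one event each.

```ruby
require "json"
require "tmpdir"

# Hypothetical helper, not part of Fluentd: appends `count` JSON lines of the
# form {"timestamp":1} to the file that the tail source is following.
def write_events(path, count: 1000, timestamp: 1)
  File.open(path, "a") do |f|
    count.times { f.puts(JSON.generate({ "timestamp" => timestamp })) }
  end
  count
end
```

For example, calling write_events("/fluentd/log/foo.log") (the path from the <source> section) and then reloading Fluentd before the chunks are flushed should reproduce the issue.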
Expected behavior
All chunks are restored and flushed when Fluentd restarts
Your Environment
- Fluentd or td-agent version: fluentd 1.7.4
- Operating system:
NAME="Alpine Linux"
VERSION_ID=3.9.4
- Kernel version: 4.4.0-154-generic
Your Configuration
<system>
log_level trace
</system>
<source>
@type tail
path /fluentd/log/foo.log
pos_file /fluentd/log/td-agent/foo.log.pos
tag foo.bar
<parse>
@type json
</parse>
</source>
<filter foo.bar>
@type record_transformer
enable_ruby
<record>
newtime ${record["timestamp"]}
</record>
renew_time_key newtime
</filter>
<match foo.bar>
@type amqp
exchange foo
key ""
addresses 172.18.0.2:5672
user guest
pass guest
vhost /
<buffer time>
@type file
timekey 30
timekey_wait 0
chunk_limit_records 10
path /fluentd/buffer/
</buffer>
</match>
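With this configuration every event ends up in the same time bucket: renew_time_key sets each event's time to record["newtime"], which is 1, and with timekey 30 that falls in the bucket starting at 0. The sketch below assumes the timekey is the event time floored to a multiple of `timekey` (UTC, no offset); `timekey_for` is a hypothetical name used only for illustration.

```ruby
# Assumption: the chunk timekey is the event time floored to a multiple of
# `timekey`; timezone handling is ignored here.
def timekey_for(event_time, timekey)
  event_time - (event_time % timekey)
end

# renew_time_key gives every one of the 1,000 events the time 1.
event_times = Array.new(1000, 1)
timekeys = event_times.map { |t| timekey_for(t, 30) }.uniq
# timekeys == [0]: a single bucket, so every chunk gets the same metadata.

chunk_limit_records = 10
chunk_count = (event_times.size.to_f / chunk_limit_records).ceil
# chunk_count == 100, matching the 100 buffer files from the reproduction.
```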
Your Error Log
Before Fluentd is reloaded, all chunks are flushed correctly, but on restart:
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0c58935521fbf76d5e691216.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ecc0a41d98d0fccef5a906ab4.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0afa28e73d5e198227f834a6.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78eceeb22cfddb0516e29fad0bf.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0eee72d351461588ead60233.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ee5955c88f80ff89826031ced.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f11d9e4162a6f52933635699a.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78eced24d53777f80807064ca49.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef312163e22da0dcea03eaad8.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0b5e73337978add08578ff79.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ed9429bf7afc4010636424da6.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0b8fba5bd26a5015402bf334.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef3b2c338e22bc016e57f0bf2.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ee90e69cfef38e857e16e1d29.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f1016e3d1cc0ffa8d05282742.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0e9feddc6605e140d41c0263.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef3d02b6ece5008cd80cca186.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0c259bf7bbe07eebe33b6929.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0d6a81bce065426511dbfe3c.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef53c9c7e9e0bcf36a9f10e15.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ed65d3c9ba7ac526d1d5f9e22.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0aa814d119979ae181b7a2a9.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f11a54d42c6d731084b06592d.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ee8ab8ab0d2d502b617621901.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f11f1917ac030e7e2c3b84a4c.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef4494d94ee2ce3f1b426b5de.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78eec5cc5ba112991296cf533df.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0f2054c18053b186689eca90.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: buffer started instance=47070979384460 stage_size=310 queue_size=0
2019-11-14 14:42:44 +0000 [info]: #0 fluent/log.rb:322:info: Connecting to RabbitMQ...
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: tailing paths: target = /fluentd/log/foo.log | existing =
2019-11-14 14:42:44 +0000 [info]: #0 fluent/log.rb:322:info: following tail of /fluentd/log/foo.log
2019-11-14 14:42:44 +0000 [info]: #0 fluent/log.rb:322:info: fluentd worker is now running worker=0
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: flush_thread actually running
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: enqueue_thread actually running
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
Only the last restored chunk is flushed (the 10 messages above, i.e. one chunk). If we reload once for each remaining chunk, all chunks are eventually flushed.
Additional context
When chunk_limit_records is reached, a new chunk is created, but it has the same metadata as the previous one. If those chunks have not been flushed before a restart, then on resume the @stage map contains only one chunk, because all remaining chunks share the same metadata and each restored chunk is stored with
stage[chunk.metadata] = chunk
so only the last one restored is kept in the map.
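The overwrite can be illustrated in plain Ruby. This is a sketch, not Fluentd code: `Metadata` and `Chunk` below are hypothetical stand-ins for the buffer's metadata and chunk objects. Because Struct instances with equal fields are equal Hash keys, every restored chunk lands on the same key and only the last one survives.

```ruby
# Hypothetical stand-ins for Fluentd's buffer metadata and chunk objects.
# Struct#hash and Struct#eql? compare field values, so equal metadata means
# the same Hash key.
Metadata = Struct.new(:timekey, :tag, :variables)
Chunk = Struct.new(:path, :metadata)

# Three staged chunk files restored from disk, all cut because
# chunk_limit_records was reached within the same timekey.
restored = (1..3).map { |i| Chunk.new("buffer.b#{i}.log", Metadata.new(0, nil, nil)) }

stage = {}
restored.each do |chunk|
  stage[chunk.metadata] = chunk # same key every time: earlier chunks are overwritten
end

stage.size              # => 1
stage.values.first.path # => "buffer.b3.log"
```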
Maybe we could add information to the metadata indicating that a chunk was created because chunk_limit_records was reached, so that each of these chunks would have unique metadata?
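A rough sketch of what that proposal would change, under the assumption (hypothetical, not an existing Fluentd field) that metadata gains a `seq` counter incremented each time chunk_limit_records forces a new chunk. `MetadataWithSeq` and `RestoredChunk` are illustrative names only.

```ruby
# Hypothetical: metadata extended with a `seq` field that is bumped each time
# a new chunk is cut because chunk_limit_records was reached.
MetadataWithSeq = Struct.new(:timekey, :tag, :variables, :seq)
RestoredChunk = Struct.new(:path, :metadata)

# Same timekey for every chunk, but a distinct seq per chunk.
restored = (0...3).map do |seq|
  RestoredChunk.new("buffer.b#{seq}.log", MetadataWithSeq.new(0, nil, nil, seq))
end

stage = {}
restored.each { |chunk| stage[chunk.metadata] = chunk }
stage.size # => 3: every restored chunk keeps its own entry and can be flushed
```

One trade-off to consider: metadata is also used on the write path to find the chunk to append to, so with a seq field the writer would need to know which seq is current for a given timekey.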
Relevant code (commit e1c8ed5):
- fluentd/lib/fluent/plugin/buffer.rb, line 185
- fluentd/lib/fluent/plugin/buf_file.rb, line 165