
Restoring chunks with same metadata #2698

@fleprovost

Describe the bug
When Fluentd restarts, it restores all chunks stored in the buffer files and is expected to flush them, but only the last restored chunk is flushed.

To Reproduce

  • Use any input that can ingest thousands of events per second
  • Use an output that supports file buffering
  • Set chunk_limit_records to 10
  • Send 1000 {"timestamp": 1} JSON events
    (100 buffer files are created)
  • Before the events are flushed, reload the Fluentd service
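One way to produce the 1000 test events is a small Ruby helper that appends them, one JSON object per line, to the file tailed by in_tail. This is a sketch only: `write_test_events` is a hypothetical name, and the target path is taken from the `<source>` section of the configuration.

```ruby
require "json"

# Hypothetical helper: append `count` copies of {"timestamp": 1} to `path`,
# one JSON object per line, so that in_tail + the json parser pick them up.
def write_test_events(path, count = 1000)
  File.open(path, "a") do |f|
    count.times { f.puts(JSON.generate({ "timestamp" => 1 })) }
  end
  count
end
```

Usage: `write_test_events("/fluentd/log/foo.log")`, then reload Fluentd before the 100 chunks are flushed.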

Expected behavior
All chunks are restored and flushed when Fluentd restarts

Your Environment

  • Fluentd or td-agent version: fluentd 1.7.4
  • Operating system:
    NAME="Alpine Linux"
    VERSION_ID=3.9.4
  • Kernel version: 4.4.0-154-generic

Your Configuration

<system>
  log_level trace
</system>

<source>
  @type tail
  path /fluentd/log/foo.log
  pos_file /fluentd/log/td-agent/foo.log.pos
  tag foo.bar
  <parse>
    @type json
  </parse>
</source>

<filter foo.bar>
    @type record_transformer
    enable_ruby
    <record>
      newtime ${record["timestamp"]}
    </record>
    renew_time_key newtime
</filter>

<match foo.bar>
  @type amqp
  exchange foo
  key ""
  addresses 172.18.0.2:5672
  user guest
  pass guest
  vhost /
  <buffer time>
    @type file
    timekey 30
    timekey_wait 0
    chunk_limit_records 10
    path /fluentd/buffer/
  </buffer>
</match>
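The configuration explains why these chunks end up with identical metadata. With `renew_time_key newtime`, each event's time becomes `record["timestamp"]`, which is 1 for every test event, so with `timekey 30` all 1000 events fall into the same time bucket. Since `<buffer time>` has no other chunk keys, they share one metadata value and are split only by `chunk_limit_records`. The sketch below models that arithmetic; the truncation formula is a simplified assumption, not Fluentd's exact code (which also deals with time zones).

```ruby
TIMEKEY             = 30 # timekey 30
CHUNK_LIMIT_RECORDS = 10 # chunk_limit_records 10

# Simplified model (assumption): an event goes into the bucket obtained by
# truncating its time down to a multiple of timekey.
def timekey_for(event_time, timekey = TIMEKEY)
  event_time - (event_time % timekey)
end

# renew_time_key newtime sets every event time to record["timestamp"] == 1.
event_times = Array.new(1000, 1)
buckets = event_times.map { |t| timekey_for(t) }.uniq
# Integer ceiling division: number of chunks needed for all records.
chunks_needed = (event_times.size + CHUNK_LIMIT_RECORDS - 1) / CHUNK_LIMIT_RECORDS

puts "distinct timekeys: #{buckets.inspect}" # => distinct timekeys: [0]
puts "chunks needed: #{chunks_needed}"       # => chunks needed: 100
```

A single timekey plus 100 chunks is exactly the situation in which every chunk carries the same metadata.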

Your Error Log

Before Fluentd is reloaded, all chunks are flushed correctly, but after a restart:

2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0c58935521fbf76d5e691216.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ecc0a41d98d0fccef5a906ab4.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0afa28e73d5e198227f834a6.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78eceeb22cfddb0516e29fad0bf.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0eee72d351461588ead60233.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ee5955c88f80ff89826031ced.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f11d9e4162a6f52933635699a.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78eced24d53777f80807064ca49.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef312163e22da0dcea03eaad8.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0b5e73337978add08578ff79.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ed9429bf7afc4010636424da6.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0b8fba5bd26a5015402bf334.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef3b2c338e22bc016e57f0bf2.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ee90e69cfef38e857e16e1d29.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f1016e3d1cc0ffa8d05282742.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0e9feddc6605e140d41c0263.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef3d02b6ece5008cd80cca186.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0c259bf7bbe07eebe33b6929.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0d6a81bce065426511dbfe3c.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef53c9c7e9e0bcf36a9f10e15.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ed65d3c9ba7ac526d1d5f9e22.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0aa814d119979ae181b7a2a9.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f11a54d42c6d731084b06592d.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ee8ab8ab0d2d502b617621901.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f11f1917ac030e7e2c3b84a4c.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78ef4494d94ee2ce3f1b426b5de.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78eec5cc5ba112991296cf533df.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: restoring buffer file: path = /fluentd/buffer/buffer.b5974f78f0f2054c18053b186689eca90.log
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: buffer started instance=47070979384460 stage_size=310 queue_size=0
2019-11-14 14:42:44 +0000 [info]: #0 fluent/log.rb:322:info: Connecting to RabbitMQ...
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: tailing paths: target = /fluentd/log/foo.log | existing = 
2019-11-14 14:42:44 +0000 [info]: #0 fluent/log.rb:322:info: following tail of /fluentd/log/foo.log
2019-11-14 14:42:44 +0000 [info]: #0 fluent/log.rb:322:info: fluentd worker is now running worker=0
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: flush_thread actually running
2019-11-14 14:42:44 +0000 [debug]: #0 fluent/log.rb:302:debug: enqueue_thread actually running
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo
2019-11-14 14:42:45 +0000 [debug]: #0 fluent/log.rb:302:debug: Sending message {"timestamp":1,"newtime":1}, :key => , :headers => {} into foo

Only the last restored chunk is flushed (10 messages, i.e. one chunk's worth of records). Reloading once per remaining chunk eventually flushes them all.
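To check a log for this symptom, one can count the "restoring buffer file" lines against the "Sending message" lines. A sketch, assuming the log format of the excerpt above (the "Sending message" line comes from the AMQP output at debug level; other outputs log differently); `restore_summary` is a hypothetical helper.

```ruby
# Patterns matching the log lines in the excerpt above.
RESTORE_PATTERN = /restoring buffer file: path = /
SEND_PATTERN    = /Sending message /

# Count restored buffer files and sent messages in a Fluentd log text.
def restore_summary(log_text, chunk_limit_records)
  lines    = log_text.lines
  restored = lines.count { |l| l.match?(RESTORE_PATTERN) }
  sent     = lines.count { |l| l.match?(SEND_PATTERN) }
  {
    restored_files: restored,
    sent_messages: sent,
    # Upper bound if every restored chunk were full and fully flushed.
    expected_max: restored * chunk_limit_records,
  }
end
```

For the excerpt above this yields 28 restored files against 10 sent messages, far below the upper bound of 280.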

Additional context
When chunk_limit_records is reached, a new chunk is created, but it has the same metadata as the previous one. If those chunks have not been flushed before a restart, then on resume the @stage map

@stage, @queue = resume

contains only one chunk, because all the remaining chunks share the same metadata and are stored with

stage[chunk.metadata] = chunk

so each restored chunk overwrites the previous entry and only the last one restored is kept.
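The overwrite can be reproduced with plain Ruby hashes. In this sketch `Metadata` is a simplified stand-in for Fluentd's buffer metadata (only the timekey/tag/variables fields are modeled) and `Chunk` is a hypothetical minimal chunk; Struct values with equal members are equal hash keys, so later chunks replace earlier ones.

```ruby
# Simplified stand-ins (assumptions), not Fluentd's real classes.
Metadata = Struct.new(:timekey, :tag, :variables)
Chunk    = Struct.new(:path, :metadata)

# Three chunk files split by chunk_limit_records: equal metadata values.
chunks = %w[a b c].map do |name|
  Chunk.new("buffer.#{name}.log", Metadata.new(0, nil, nil))
end

# Mirrors `stage[chunk.metadata] = chunk` in the resume step.
stage = {}
chunks.each { |chunk| stage[chunk.metadata] = chunk }

puts stage.size              # => 1
puts stage.values.first.path # => buffer.c.log
```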

Maybe we could add information to the metadata indicating that a chunk was created because chunk_limit_records was reached, so that each of these chunks has unique metadata?
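A minimal sketch of that proposal, under the assumption that the metadata gains an extra sequence field: `SeqMetadata`, `RestoredChunk`, and `build_stage` are hypothetical names, and real Fluentd would also need to assign the field when chunks are written, not only when they are restored.

```ruby
# Hypothetical metadata with an extra `seq` field to disambiguate chunks.
SeqMetadata   = Struct.new(:timekey, :tag, :variables, :seq)
RestoredChunk = Struct.new(:path, :metadata)

# Give colliding chunks seq = 0, 1, 2, ... so each keeps its own stage entry.
def build_stage(chunks)
  next_seq = Hash.new(0) # [timekey, tag, variables] => next free seq
  stage = {}
  chunks.each do |chunk|
    m = chunk.metadata
    base = [m.timekey, m.tag, m.variables]
    m.seq = next_seq[base]
    next_seq[base] += 1
    stage[m] = chunk
  end
  stage
end

chunks = %w[a b c].map do |name|
  RestoredChunk.new("buffer.#{name}.log", SeqMetadata.new(0, nil, nil, nil))
end
stage = build_stage(chunks)
puts stage.size # => 3
```

An equally hypothetical alternative that leaves the key unchanged would be to move colliding chunks directly to the queue during resume.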

Labels: bug