Add log throttling per file#2702
Conversation
Remove debug log Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
a4ec3f2 to
8ab733c
Compare
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
…otify Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
8ab733c to
416693c
Compare
ganmacs
left a comment
There was a problem hiding this comment.
test fails. so could you fix the test first?
| refresh_watchers unless @skip_refresh_on_startup | ||
| timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers)) | ||
|
|
||
| @threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval| |
There was a problem hiding this comment.
| @threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads. | ||
|
|
||
| @threads.each { |thr| | ||
| thr.join |
There was a problem hiding this comment.
if it blocks here, all code after this is blocking.
There was a problem hiding this comment.
@threads is hash. so thr is Array.
|
|
||
| log.debug "Thread refresh_watchers" | ||
| @threads.each { |thr| | ||
| log.debug "Thread #{thr[0]} #{thr[1].status}" |
| end | ||
| end | ||
| rotated_tw = @tails[path] | ||
|
|
| paths.each { |path| | ||
| tw = remove_watcher ? @tails.delete(path) : @tails[path] | ||
| if remove_watcher | ||
| @threads[path].exit |
There was a problem hiding this comment.
Thread#exit is dangerous. could you finish this thread in a proper way?
| if @lines.size >= @watcher.read_lines_limit | ||
|
|
||
| number_bytes_read += bytes_to_read | ||
| limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0) |
There was a problem hiding this comment.
| limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0) | |
| limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second && @watcher.read_bytes_limit_per_second > 0) |
| # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion | ||
| time_spent_reading = Time.new - start_reading | ||
| @watcher.log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") | ||
| if (time_spent_reading < 1) |
There was a problem hiding this comment.
| if (time_spent_reading < 1) | |
| if time_spent_reading < 1 |
| log.warn "Skip #{path} because unexpected setup error happens: #{e}" | ||
| next | ||
| begin | ||
| tw = setup_watcher(path, pe) |
There was a problem hiding this comment.
It can be a race condition. before passing pe to setup_watcher, L334 should be called. but the current code does not ensure it.
| end | ||
| if @threads[path].nil? | ||
| log.debug "Add Thread #{path}" | ||
| @threads[path] = Thread.new(path) do |path| |
There was a problem hiding this comment.
why did you change these codes to run on new thread?
| @fifo.read_lines(@lines) | ||
| if @lines.size >= @watcher.read_lines_limit | ||
|
|
||
| number_bytes_read += bytes_to_read |
There was a problem hiding this comment.
IO#readpartial does not alway read bytes_to_read bytes. Is this code ok?
Hello, I'm gonna spend more time to add more tests and review your comments. |
|
Ok. I like this feature :) Creating threads per file is not acceptable. Because if there are 1000 files to monitor, it creates 1000 threads. https://github.com/fluent/fluentd/pull/2702/files#diff-1da710c9dcc8d0fc57996df7a9d39695R331 |
|
Hi @ganmacs,
That was one of my worries in term of performance, I've been testing with 200 files and a high number of bytes per file, the number of thread hasn't been an issue but I agree having one thread per file is not ideal, I was thinking about using a thread pool but that would need a bigger re-architecture and I'm not sure the inotify will work as expect.
I agree, however we are more interested to throttle log in a Kubernetes environment per container which means per file, to do not affect container which are sending at a decent rate. In the commit, I've been implementing the log throttling per file but this implementation work nicely only with you use the timer because you it will stop reading and if it did not get notified again then some logs might not be read. |
|
This PR has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this PR will be closed in 30 days |
|
cosmo0920 is now working on this feature on #3185 with newer in_tail code. |
What this PR does / why we need it:
Running in a big cluster with high volume of log, it would be nice to throttle the log shipping to avoid network saturation and make it easier to calculate the max throughput per node for example in a Kubernetes cluster.
Tail plugin is watching files and every second reading from the last pointer to the end of the file.
This change allow to stop reading the file after X number of logs lines read and update the pointer in the pos file as usual.
This commit adds log throttling per bytes for each files, should work only when
watch_timeris enabled and thestat_watcher(inotify) is disabled.In order to have this feature for any watch configuration (timer or inotify..), I've updated to add a sleep when you have been reaching the bytes read limit, the sleep would block process and affect other file ingestion, I've added a basic thread array to have a multi-threading ingestion.
However I've noticed you are relying on
cool.io, and I was wondering if I should use this library instead.Would you be interested in this feature?
Some discussions before submitting the PR.
Docs Changes:
adding read_lines_limit_per_notify which by default is set to -1, so no throttling involve by default.
Release Note: