Skip to content

Add log throttling per file#2702

Closed
rewiko wants to merge 5 commits into
fluent:masterfrom
rewiko:add-log-throttling-per-file
Closed

Add log throttling per file#2702
rewiko wants to merge 5 commits into
fluent:masterfrom
rewiko:add-log-throttling-per-file

Conversation

@rewiko

@rewiko rewiko commented Nov 18, 2019

Copy link
Copy Markdown

What this PR does / why we need it:
Running in a big cluster with high volume of log, it would be nice to throttle the log shipping to avoid network saturation and make it easier to calculate the max throughput per node for example in a Kubernetes cluster.

Tail plugin is watching files and every second reading from the last pointer to the end of the file.
This change allow to stop reading the file after X number of logs lines read and update the pointer in the pos file as usual.

This commit adds log throttling per bytes for each files, should work only when watch_timer is enabled and the stat_watcher (inotify) is disabled.

In order to have this feature for any watch configuration (timer or inotify..), I've updated to add a sleep when you have been reaching the bytes read limit, the sleep would block process and affect other file ingestion, I've added a basic thread array to have a multi-threading ingestion.
However I've noticed you are relying on cool.io, and I was wondering if I should use this library instead.

Would you be interested in this feature?

Some discussions before submitting the PR.

Docs Changes:

adding read_lines_limit_per_notify which by default is set to -1, so no throttling involve by default.

Release Note:

Remove debug log

Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
@rewiko rewiko force-pushed the add-log-throttling-per-file branch from a4ec3f2 to 8ab733c Compare November 18, 2019 21:26
Anthony Comtois added 4 commits November 18, 2019 21:27
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
…otify

Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
@rewiko rewiko force-pushed the add-log-throttling-per-file branch from 8ab733c to 416693c Compare November 18, 2019 21:31

@ganmacs ganmacs left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test fails. so could you fix the test first?

refresh_watchers unless @skip_refresh_on_startup
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))

@threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval|

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads.

@threads.each { |thr|
thr.join

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it blocks here, all code after this is blocking.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@threads is hash. so thr is Array.


log.debug "Thread refresh_watchers"
@threads.each { |thr|
log.debug "Thread #{thr[0]} #{thr[1].status}"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent

end
end
rotated_tw = @tails[path]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary

paths.each { |path|
tw = remove_watcher ? @tails.delete(path) : @tails[path]
if remove_watcher
@threads[path].exit

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread#exit is dangerous. could you finish this thread in a proper way?

if @lines.size >= @watcher.read_lines_limit

number_bytes_read += bytes_to_read
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second && @watcher.read_bytes_limit_per_second > 0)

# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Time.new - start_reading
@watcher.log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if (time_spent_reading < 1)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (time_spent_reading < 1)
if time_spent_reading < 1

log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
begin
tw = setup_watcher(path, pe)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be a race condition. before passing pe to setup_watcher, L334 should be called. but the current code does not ensure it.

end
if @threads[path].nil?
log.debug "Add Thread #{path}"
@threads[path] = Thread.new(path) do |path|

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you change these codes to run on new thread?

@fifo.read_lines(@lines)
if @lines.size >= @watcher.read_lines_limit

number_bytes_read += bytes_to_read

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IO#readpartial does not alway read bytes_to_read bytes. Is this code ok?

@rewiko

rewiko commented Nov 27, 2019

Copy link
Copy Markdown
Author

test fails. so could you fix the test first?

Hello,
I will definitely add more test and fix those failing, I've created the PR to be able to discuss in term of design and making sure you would be interested by this kind of feature.

I'm gonna spend more time to add more tests and review your comments.
Thanks

@ganmacs

ganmacs commented Nov 27, 2019

Copy link
Copy Markdown
Member

Ok. I like this feature :)
but, There are some considering points.

Creating threads per file is not acceptable. Because if there are 1000 files to monitor, it creates 1000 threads. https://github.com/fluent/fluentd/pull/2702/files#diff-1da710c9dcc8d0fc57996df7a9d39695R331
This patch restricts the bytes size per file, right? I think that restring all data(not per file) could be better for the situation like "to avoid network saturation and make it easier to calculate the max throughput per node".

@rewiko

rewiko commented Nov 27, 2019

Copy link
Copy Markdown
Author

Hi @ganmacs,

Creating threads per file is not acceptable. Because if there are 1000 files to monitor, it creates 1000 threads.

That was one of my worries in term of performance, I've been testing with 200 files and a high number of bytes per file, the number of thread hasn't been an issue but I agree having one thread per file is not ideal, I was thinking about using a thread pool but that would need a bigger re-architecture and I'm not sure the inotify will work as expect.

I think that restring all data(not per file) could be better for the situation like "to avoid network saturation and make it easier to calculate the max throughput per node".

I agree, however we are more interested to throttle log in a Kubernetes environment per container which means per file, to do not affect container which are sending at a decent rate.

In the commit, I've been implementing the log throttling per file but this implementation work nicely only with you use the timer because you it will stop reading and if it did not get notified again then some logs might not be read.
Maybe I could reuse that concept and more breaking I can push the function with a sleep and a notify which will be consume by a thread pool. In that case only the file throttled will create thread.

@github-actions

Copy link
Copy Markdown

This PR has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this PR will be closed in 30 days

@github-actions github-actions Bot added the stale label Dec 18, 2020
@repeatedly

Copy link
Copy Markdown
Member

cosmo0920 is now working on this feature on #3185 with newer in_tail code.
Thanks for the idea!

@repeatedly repeatedly closed this Dec 18, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants