start inputs only when all WorkerLoop are fully initialized#11492
start inputs only when all WorkerLoop are fully initialized#11492colinsurprenant merged 1 commit intoelastic:masterfrom
Conversation
|
Created followup feature proposal for eagerly starting inputs #11493 |
yaauie
left a comment
There was a problem hiding this comment.
Overall, I think that this will work as intended. I've left one note about red flag having to do with WorkerLoop holding onto thread context (which may be ignored if Jruby doesn't reuse java threads?), and another around whether lir_execution needs to be sent in the main thread, or if it also can be handled in the worker loop init threads.
I do think that we are adding a fair bit of complexity to an already complex method, and that by adding a helper function that takes the args for initializing a WorkerLoop, combined with Enumerable#map, would make this a lot more readable/maintainable.
Something like:
# @return [WorkerLoop]
def _init_worker_loop
begin
org.logstash.execution.WorkerLoop.new(lir_execution,
filter_queue_client,
@events_filtered,
@events_consumed,
@flushRequested,
@flushing,
@shutdownRequested,
@drain_queue)
rescue => e
@logger.error(
"Worker loop initialization error",
default_logging_keys(:error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n")))
nil
end
endAnd then:
worker_loops = pipeline_workers.times
.map { Thread.new { _init_worker_loop } }
.map(&:value)
worker_loops.each do |worker_loop|
fail("Some worker(s) were not correctly initialized") if worker_loop.nil?
end
# ...
worker_loops.each_with_index do |worker_loop, t|
thread = Thread.new do
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
worker_loop.run
end
@worker_threads << thread
end|
So for the thread creation and args passing, I probably overdid it by using Thread parameters for the |
|
Per @yaauie's comment above #11492 (comment) about
Is that it is a static field so this field will be initialized once for all instances and I believe that the actual |
The additional safety of re-binding parameters in In this case, we are re-binding a number of effectively-final instance variables and the result of two method calls (which are each based on effectively-final java properties), so the overhead provides us no tangible benefit. Additionally, we are sending |
|
Further looking into the This has probably minimal impact but I think we should fix that so that each |
|
This new |
Note that After thinking on this, I believe that it is an existing situation that is not affected by this changeset either way. |
|
@yaauie you are absolutely right; the makes it work correctly as a static field. |
65a4ed1 to
4ce51a9
Compare
|
Thanks @yaauie! Will also merge in 7.6 and 7.5.2. |
Fixes #11170
Relates to #11175
This PR separate the
WorkerLoopconstruction and run parts in 2 so that we can wait on the completion of all theWorkerLoopconstruction before ultimately starting the inputs.This solves the problem where inputs were started before the
WorkerLoopwere ready to process events which resulted in either accumulating a backlog in PQ or creating backpressure.Question: Should we make that behaviour optional?