start inputs only when all WorkerLoop are fully initialized#11492

Merged
colinsurprenant merged 1 commit into elastic:master from colinsurprenant:input_startup
Jan 14, 2020

@colinsurprenant
Contributor

@colinsurprenant colinsurprenant commented Jan 13, 2020

Fixes #11170
Relates to #11175

This PR separates the WorkerLoop construction and run parts into two steps so that we can wait for all WorkerLoop construction to complete before ultimately starting the inputs.

This solves the problem where inputs were started before the WorkerLoops were ready to process events, which resulted in either a backlog accumulating in the PQ (persistent queue) or backpressure.
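
The ordering described above can be sketched in plain Ruby. This is only an illustration under stated assumptions, not the code in this PR: `FakeWorkerLoop`, `start_pipeline`, and the event log are hypothetical stand-ins, and the `sleep` merely simulates slow construction.

```ruby
# Hypothetical stand-in for WorkerLoop: construction is slow, running is a
# separate step that can be started later.
class FakeWorkerLoop
  def initialize(id, log)
    sleep 0.05 # simulate expensive construction work
    @id = id
    @log = log
    @log << [:initialized, id]
  end

  def run
    @log << [:running, @id]
  end
end

def start_pipeline(worker_count)
  log = Queue.new # thread-safe, preserves push order

  # Phase 1: construct all loops in parallel. Thread#value joins each thread
  # and returns the constructed loop, so this blocks until all are ready.
  worker_loops = worker_count.times
    .map { |i| Thread.new(i) { |id| FakeWorkerLoop.new(id, log) } }
    .map(&:value)

  # Phase 2: start the threads that actually run the loops.
  worker_threads = worker_loops.map do |worker_loop|
    Thread.new(worker_loop) { |w| w.run }
  end

  # Phase 3: inputs are started only after every loop has been constructed.
  log << [:inputs_started, nil]

  worker_threads.each(&:join)
  events = []
  events << log.pop until log.empty?
  events
end

events = start_pipeline(3)
```

In this sketch every `:initialized` entry is recorded before `:inputs_started`, because phase 1 does not return until each construction thread has finished.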

Question: Should we make that behaviour optional?

@colinsurprenant
Contributor Author

Created a follow-up feature proposal for eagerly starting inputs: #11493

Member

@yaauie yaauie left a comment

Overall, I think that this will work as intended. I've left one note about a red flag having to do with WorkerLoop holding onto thread context (which may be ignored if JRuby doesn't reuse Java threads?), and another about whether lir_execution needs to be sent in the main thread, or whether it can also be handled in the worker loop init threads.

I do think that we are adding a fair bit of complexity to an already complex method, and that adding a helper function that takes the args for initializing a WorkerLoop, combined with Enumerable#map, would make this a lot more readable/maintainable.

Something like:

# @return [WorkerLoop]
def _init_worker_loop
  begin
    org.logstash.execution.WorkerLoop.new(lir_execution,
                                          filter_queue_client,
                                          @events_filtered,
                                          @events_consumed,
                                          @flushRequested,
                                          @flushing,
                                          @shutdownRequested,
                                          @drain_queue)
  rescue => e
    @logger.error(
              "Worker loop initialization error",
              default_logging_keys(:error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n")))
    nil
  end
end

And then:

  worker_loops = pipeline_workers.times
    .map { Thread.new { _init_worker_loop } }
    .map(&:value)
 
  worker_loops.each do |worker_loop|
    fail("Some worker(s) were not correctly initialized") if worker_loop.nil?
  end
  # ...

  worker_loops.each_with_index do |worker_loop, t|
    thread = Thread.new do
      Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
      ThreadContext.put("pipeline.id", pipeline_id)
      worker_loop.run
    end
    @worker_threads << thread
  end

@colinsurprenant
Contributor Author

@yaauie

So for the thread creation and args passing, I probably overdid it by using Thread parameters for the WorkerLoop parameters (and the list is long); I usually prefer to pass all variables used in a thread (except for thread-safe variables) as Thread parameters, as it is safer and avoids any possible confusion about variable scope and thread safety.

@colinsurprenant
Contributor Author

Per @yaauie's comment above #11492 (comment) about WorkerLoop.THREAD_CONTEXT:

  • I think the intent of this was to hold the thread context of the worker, which normally happens at WorkerLoop instantiation which is/was assumed to happen in the worker thread.
  • The problem I see with the current code:

private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);

is that it is a static field, so it will be initialized once for all instances, and I believe that THREAD_CONTEXT is actually holding the main thread context (because, FWIU, a class is initialized when its static fields are assigned, which happens at class creation here, not at class instantiation). So by this logic, the WorkerLoop.THREAD_CONTEXT is always holding the main thread reference. I am not sure how/why this actually works 🤔

@yaauie
Member

yaauie commented Jan 14, 2020

> I usually prefer to pass all variables used in a thread (except for thread-safe variables) as Thread parameters, as it is safer and avoids any possible confusion about variable scope and thread safety.

The additional safety of re-binding parameters in Thread.new here provides protection against the reassignment of the variables in the outer scope (e.g. it is safer for an inner thread to reference a local variable bound solely to its own scope as a block parameter than to reference a variable from an outer scope that may be replaced or dereferenced before it has a chance to access it).
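
A minimal sketch of that difference, using a hypothetical `label` variable and a `Queue` as a gate so that both threads read only after the outer variable has been reassigned:

```ruby
gate = Queue.new # each thread blocks on this until the main thread releases it

label = "original"

# Block parameter: `bound` is a new local variable that holds the object
# `label` referenced at the moment Thread.new was called.
with_param = Thread.new(label) do |bound|
  gate.pop
  bound
end

# Closure: the block looks up the outer `label` variable when it finally runs.
with_closure = Thread.new do
  gate.pop
  label
end

label = "reassigned" # the outer variable now references a different String

2.times { gate << :go } # release both threads

param_result = with_param.value
closure_result = with_closure.value
```

Here `param_result` is `"original"` and `closure_result` is `"reassigned"`. Note that re-binding protects only against reassignment of the variable; mutating the shared object in place would still be visible to both threads.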

In this case, we are re-binding a number of effectively-final instance variables and the result of two method calls (which are each based on effectively-final Java properties), so the overhead provides us no tangible benefit.

Additionally, we are sending JavaBasePipelineExt#lirExecution in the outer thread once per worker, and each call does some work to get us a new IRubyObject wrapper for the effectively-final CompiledPipeline, when we likely should bind the result to a local variable just once.

@colinsurprenant
Contributor Author

Looking further into WorkerLoop.THREAD_CONTEXT, it seems we are basically only using the thread context to create Ruby objects, since the Ruby ThreadContext is necessary for most Ruby API method calls.

This probably has minimal impact, but I think we should fix it so that each WorkerLoop instance has its proper Ruby ThreadContext.

@colinsurprenant
Contributor Author

This new WorkerLoop.THREAD_CONTEXT was introduced in #9401 and per the intent I think the static attribute is an error.

@yaauie
Member

yaauie commented Jan 14, 2020

> So by this logic, the WorkerLoop.THREAD_CONTEXT is always holding the main thread reference. I am not sure how/why this actually works 🤔

Note that WorkerLoop.THREAD_CONTEXT is a ThreadLocal, so each Java thread can pull its own Ruby thread context out. The trouble arises if and when a Java thread ever runs multiple Ruby threads in the course of its lifetime (I do not know whether that can happen), since we only populate it with ThreadLocal#withInitial and then use WorkerLoop.THREAD_CONTEXT to pass a ThreadContext to various Ruby internals that then use it to determine interrupt state.

After thinking on this, I believe that it is an existing situation that is not affected by this changeset either way.

@colinsurprenant
Contributor Author

@yaauie you are absolutely right; the ThreadLocal.withInitial in

public static final ThreadLocal<ThreadContext> THREAD_CONTEXT = ThreadLocal.withInitial(RubyUtil.RUBY::getCurrentContext);

makes it work correctly as a static field.
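
The same idea can be sketched in plain Ruby as a rough analog, not the JRuby/Java mechanism itself. `PerThread` and `Worker` are hypothetical names; the holder is shared at class level (like a static field), but its value is computed lazily and stored per thread via `Thread#thread_variable_get` and `Thread#thread_variable_set`.

```ruby
# Hypothetical analog of ThreadLocal.withInitial: one shared holder, one
# lazily computed value per thread.
class PerThread
  def initialize(&initial)
    @initial = initial
    @key = :"per_thread_#{object_id}"
  end

  def get
    current = Thread.current
    # The initial block runs in whichever thread first calls #get.
    unless current.thread_variable?(@key)
      current.thread_variable_set(@key, @initial.call)
    end
    current.thread_variable_get(@key)
  end
end

class Worker
  # Shared by all instances, like a Java static field.
  CONTEXT = PerThread.new { Thread.current }

  def context
    CONTEXT.get
  end
end

# Constructed in the main thread, used from both the main and another thread.
worker = Worker.new
main_context = worker.context
other_thread, other_context = Thread.new { [Thread.current, worker.context] }.value
```

Even though `CONTEXT` is created once and `worker` is constructed in the main thread, `other_context` is the other thread's own value, because the initial block runs in the thread that first calls `get`.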

@colinsurprenant
Contributor Author

@yaauie I believe all points have been addressed. I created #11502 to follow up on the potential WorkerLoop.THREAD_CONTEXT problem.

Member

@yaauie yaauie left a comment

LGTM 👍

@colinsurprenant
Contributor Author

Thanks @yaauie! Will also merge in 7.6 and 7.5.2.

Successfully merging this pull request may close these issues.

Java execution may start inputs before compilation of filters and outputs has completed.