[INGEST] Interrupt the current thread if evaluation grok expressions take too long#31024
Conversation
…take too long This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search() This method can hang forever if the regex expression is too complex. The thread interrupter in the background checks every 3 seconds whether there are threads execution the org.joni.Matcher#search() method for longer than 5 seconds and if so interrupts these threads. Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and if so returns org.joni.Matcher#INTERRUPTED Closes elastic#28731
|
Pinging @elastic/es-core-infra |
talevy
left a comment
There was a problem hiding this comment.
Left some comments, looks great overall!
| if (result == Matcher.INTERRUPTED) { | ||
| throw new IllegalArgumentException("grok pattern matching is too complex and takes too long to execute"); | ||
| } else if (result == Matcher.FAILED) { | ||
| // I think we should throw an error here? |
There was a problem hiding this comment.
and change current behavior?
There was a problem hiding this comment.
Possibly but I think it should be a follow-up so let's go with a // TODO: here?
There was a problem hiding this comment.
Agreed, this should be done in a follow up.
I don't think is different behaviour, because in GrokProcessor we throw an error when null is returned.
| /** | ||
| * Provides a thread pool | ||
| */ | ||
| // TODO: do we really want to expose ThreadPool here? Or a BiFunction<Long, Runnable, ScheduledFuture<?>> to just handle scheduling? |
There was a problem hiding this comment.
is this something you want to discuss?
I see no issue with leaving the Threadpool available to processors
There was a problem hiding this comment.
It is a shame that we have to expose anything at all so let us at least go with the minimum necessary.
There was a problem hiding this comment.
So I had doubts about directly exposing TP to all processors, because this will send to wrong message to processor implementors, there should be no need to fork a thread. I will expose the minimum necessary.
| public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin { | ||
|
|
||
| static final Map<String, String> GROK_PATTERNS = Grok.getBuiltinPatterns(); | ||
| static final Setting<TimeValue> GUARD_INTERVAL = |
There was a problem hiding this comment.
we should add documentation for these settings
| static final Setting<TimeValue> GUARD_INTERVAL = | ||
| Setting.timeSetting("ingest.grok.guard.interval", TimeValue.timeValueSeconds(3), Setting.Property.NodeScope); | ||
| static final Setting<TimeValue> GUARD_MAX_EXECUTION_TIME = | ||
| Setting.timeSetting("ingest.grok.guard.max_execution_time", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); |
There was a problem hiding this comment.
Is discussing these timeout values worth doing?
I took a look at some other defaults of ours, like in the low-level rest client. It sets its timeout to 1sec by default. Should we be more aggressive here?
There was a problem hiding this comment.
I change the defaults to 1 second. I think that should be good enough for now?
Also now I think about if maybe timeout is better than max_execution_time?
jasontedor
left a comment
There was a problem hiding this comment.
The overall implementation looks good. I left some comment about some details.
| * @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions. | ||
| */ | ||
| static ThreadInterrupter noop() { | ||
| return new Noop(); |
There was a problem hiding this comment.
I think this can reference a singleton instance?
| /** | ||
| * De-registers the current thread and prevents it from being interrupted. | ||
| */ | ||
| void deregister(); |
There was a problem hiding this comment.
I think this should be unregister.
| * | ||
| * This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because | ||
| * it can end up spinning endlessly if the regular expression is too complex. Joni has checks | ||
| * that that for every 30k iterations it checks if the current thread is interrupted and if so |
| try { | ||
| final long currentRelativeTime = relativeTimeSupplier.getAsLong(); | ||
| for (Map.Entry<Thread, Long> entry : registry.entrySet()) { | ||
| long threadTime = entry.getValue(); |
There was a problem hiding this comment.
This local looks unnecessary?
| return new Noop(); | ||
| } | ||
|
|
||
| class Noop implements ThreadInterrupter { |
There was a problem hiding this comment.
I find it odd that the interface is named ThreadInterrupted and then we have a Noop implementation that does nothing; that is, it implements an interface that indicates that it interrupts threads yet it doesn't interrupt threads. I think a name that avoids this confusion is ThreadWatchdog. This is not an uncommon name for this idea.
| } | ||
| } | ||
| } finally { | ||
| scheduler.apply(interval, this::interruptLongRunningExecutions); |
There was a problem hiding this comment.
This is potentially risky. What if the try block throws an OutOfMemoryError and the scheduler#apply throws a RejectedExecutionException? We lose the OutOfMemoryError and miss dying with dignity. Now, the scheduler#apply should really on throw a RejectedExecutionException if the scheduler is shutdown which should only happen when we are shutting down, but let's not take any chances here that we lose a fatal error?
| BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> { | ||
| try { | ||
| Thread.sleep(delay); | ||
| } catch (InterruptedException e) { |
There was a problem hiding this comment.
I think this should not happen so we can rethrow this as an AssertionError?
| ThreadInterrupter guard = ThreadInterrupter.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { | ||
| try { | ||
| Thread.sleep(delay); | ||
| } catch (InterruptedException e) { |
There was a problem hiding this comment.
Same comment here: rethrow as an AssertionError.
| // need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted | ||
| Thread thread = new Thread(() -> { | ||
| guard.register(); | ||
| while (run.get()) { |
There was a problem hiding this comment.
Maybe have this thread check it's interrupt status and break the loop if it detect it is interrupted? That would be a more realistic implementation of cooperative interruption?
| /** | ||
| * Provides a thread pool | ||
| */ | ||
| // TODO: do we really want to expose ThreadPool here? Or a BiFunction<Long, Runnable, ScheduledFuture<?>> to just handle scheduling? |
There was a problem hiding this comment.
It is a shame that we have to expose anything at all so let us at least go with the minimum necessary.
|
@talevy @jasontedor Thanks for reviewing. I've updated this PR. |
|
retest this please |
jasontedor
left a comment
There was a problem hiding this comment.
I left a few more comments.
| [[grok-watchdog]] | ||
| ==== Grok watchdog | ||
|
|
||
| Grok expression that take too long to execute are interrupted and |
| ==== Grok watchdog | ||
|
|
||
| Grok expression that take too long to execute are interrupted and | ||
| the the grok processor then fails with an exception. The grok |
| Grok expression that take too long to execute are interrupted and | ||
| the the grok processor then fails with an exception. The grok | ||
| processor has a watchdog thread that determines when evaluation | ||
| a grok expression takes too long and is controlled by the following |
There was a problem hiding this comment.
evaluation a -> evaluation of a
| * @return The maximum allowed time for a thread to invoke {@link #unregister()} after {@link #register()} | ||
| * has been invoked before this ThreadWatchDog starts to interrupting that thread. | ||
| */ | ||
| long maxExecutionTime(); |
There was a problem hiding this comment.
TimeValue is not accessible in the grok module. I don't think we want to make grok module depend on elasticsearch-core module?
There was a problem hiding this comment.
Ah, that's a shame. I agree. Can we make the name of this method reflect that the time is represented in milliseconds then?
| } | ||
| if (result == Matcher.INTERRUPTED) { | ||
| throw new IllegalArgumentException("grok pattern matching was interrupted after [" + | ||
| threadWatchdog.maxExecutionTime() + "] ms"); |
There was a problem hiding this comment.
I think that if we keep ThreadWatchdog#maxExecutionTime as TimeValue then we can let TimeValue do the formatting for us?
| threadWatchdog.unregister(); | ||
| } | ||
| if (result == Matcher.INTERRUPTED) { | ||
| throw new IllegalArgumentException("grok pattern matching was interrupted after [" + |
There was a problem hiding this comment.
IllegalArgumentException feels wrong to me? I think a generic RuntimeException is okay?
There was a problem hiding this comment.
I think that now too. We're just not able to digest a grok expression here and interrupting a thread was our last resort, so it is not the user's fault.
…take too long (#31024) This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search() This method can hang forever if the regex expression is too complex. The thread interrupter in the background checks every 3 seconds whether there are threads execution the org.joni.Matcher#search() method for longer than 5 seconds and if so interrupts these threads. Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and if so returns org.joni.Matcher#INTERRUPTED Closes #28731
* master: Remove RestGetAllAliasesAction (#31308) Temporary fix for broken build Reenable Checkstyle's unused import rule (#31270) Remove remaining unused imports before merging #31270 Fix non-REST doc snippet [DOC] Extend SQL docs Immediately flush channel after writing to buffer (#31301) [DOCS] Shortens ML API intros Use quotes in the call invocation (#31249) move security ingest processors to a sub ingest directory (#31306) Add 5.6.11 version constant. Fix version detection. SQL: Whitelist SQL utility class for better scripting (#30681) [Docs] All Rollup docs experimental, agg limitations, clarify DeleteJob (#31299) CCS: don't proxy requests for already connected node (#31273) Mute ScriptedMetricAggregatorTests testSelfReferencingAggStateAfterMap [test] opensuse packaging turn up debug logging Add unreleased version 6.3.1 Removes experimental tag from scripted_metric aggregation (#31298) [Rollup] Metric config parser must use builder so validation runs (#31159) [ML] Check licence when datafeeds use cross cluster search (#31247) Add notion of internal index settings (#31286) Test: Remove broken yml test feature (#31255) REST hl client: cluster health to default to cluster level (#31268) [ML] Update test thresholds to account for changes to memory control (#31289) Log warnings when cluster state publication failed to some nodes (#31233) Fix AntFixture waiting condition (#31272) Ignore numeric shard count if waiting for ALL (#31265) [ML] Implement new rules design (#31110) index_prefixes back-compat should test 6.3 (#30951) Core: Remove plain execute method on TransportAction (#30998) Update checkstyle to 8.10.1 (#31269) Set analyzer version in PreBuiltAnalyzerProviderFactory (#31202) Modify pipelining handlers to require full requests (#31280) Revert upgrade to Netty 4.1.25.Final (#31282) Use armored input stream for reading public key (#31229) Fix Netty 4 Server Transport tests. Again. REST hl client: adjust wait_for_active_shards param in cluster health (#31266) REST high-level Client: remove deprecated API methods (#31200) [DOCS] Mark SQL feature as experimental [DOCS] Updates machine learning custom URL screenshots (#31222) Fix naming conventions check for XPackTestCase Fix security Netty 4 transport tests Fix race in clear scroll (#31259) [DOCS] Clarify audit index settings when remote indexing (#30923) Delete typos in SAML docs (#31199) REST high-level client: add Cluster Health API (#29331) [ML][TEST] Mute tests using rules (#31204) Support RequestedAuthnContext (#31238) SyncedFlushResponse to implement ToXContentObject (#31155) Add Get Aliases API to the high-level REST client (#28799) Remove some line length supressions (#31209) Validate xContentType in PutWatchRequest. (#31088) [INGEST] Interrupt the current thread if evaluation grok expressions take too long (#31024) Suppress extras FS on caching directory tests Revert "[DOCS] Added 6.3 info & updated the upgrade table. (#30940)" Revert "Fix snippets in upgrade docs" Fix snippets in upgrade docs [DOCS] Added 6.3 info & updated the upgrade table. (#30940) LLClient: Support host selection (#30523) Upgrade to Netty 4.1.25.Final (#31232) Enable custom credentials for core REST tests (#31235) Move ESIndexLevelReplicationTestCase to test framework (#31243) Encapsulate Translog in Engine (#31220) HLRest: Add get index templates API (#31161) Remove all unused imports and fix CRLF (#31207) [Tests] Fix self-referencing tests [TEST] Fix testRecoveryAfterPrimaryPromotion [Docs] Remove mention pattern files in Grok processor (#31170) Use stronger write-once semantics for Azure repository (#30437) Don't swallow exceptions on replication (#31179) Limit the number of concurrent requests per node (#31206) Call ensureNoSelfReferences() on _agg state variable after scripted metric agg script executions (#31044) Move java version checker back to its own jar (#30708) [test] add fix for rare virtualbox error (#31212)
* 6.x: SQL: Fix build on Java 10 [Tests] Mutualize fixtures code in BaseHttpFixture (#31210) [TEST] Fix RemoteClusterClientTests#testEnsureWeReconnect [ML] Update test thresholds to account for changes to memory control (#31289) Reenable Checkstyle's unused import rule (#31270) [ML] Check licence when datafeeds use cross cluster search (#31247) Fix non-REST doc snippet [DOC] Extend SQL docs [DOCS] Shortens ML API intros Use quotes in the call invocation (#31249) move security ingest processors to a sub ingest directory (#31306) SQL: Whitelist SQL utility class for better scripting (#30681) Add 5.6.11 version constant. Fix version detection. [Docs] All Rollup docs experimental, agg limitations, clarify DeleteJob (#31299) Add missing release notes. Security: fix token bwc with pre 6.0.0-beta2 (#31254) Fix compilation error in UpdateSettingsIT (#31304) Test: Remove broken yml test feature (#31255) Add unreleased version 6.3.1 [Rollup] Metric config parser must use builder so validation runs (#31159) Removes experimental tag from scripted_metric aggregation (#31298) [DOCS] Removes coming tag from 6.3.0 release notes 6.3 release notes. Add notion of internal index settings (#31286) REST high-level client: add Cluster Health API (#29331) Remove leftover usage of deprecated client API SyncedFlushResponse to implement ToXContentObject (#31155) Add Get Aliases API to the high-level REST client (#28799) HLRest: Add get index templates API (#31161) Log warnings when cluster state publication failed to some nodes (#31233) Fix AntFixture waiting condition (#31272) [TEST] Mute RecoveryIT.testHistoryUUIDIsGenerated Ignore numeric shard count if waiting for ALL (#31265) Update checkstyle to 8.10.1 (#31269) Set analyzer version in PreBuiltAnalyzerProviderFactory (#31202) Revert upgrade to Netty 4.1.25.Final (#31282) Use armored input stream for reading public key (#31229) [DOCS] Added 'fail_on_unsupported_field' param to MLT. Closes #28008 (#31160) Fix Netty 4 Server Transport tests. Again. [DOCS] Fixed typo. [DOCS] Added release highlights for 6.3 (#31256) [DOCS] Mark SQL feature as experimental [DOCS] Updates machine learning custom URL screenshots (#31222) Fix naming conventions check for XPackTestCase Fix security Netty 4 transport tests Fix race in clear scroll (#31259) [DOCS] Clarify audit index settings when remote indexing (#30923) [ML][TEST] Mute tests using rules (#31204) Support RequestedAuthnContext (#31238) Validate xContentType in PutWatchRequest. (#31088) [INGEST] Interrupt the current thread if evaluation grok expressions take too long (#31024) Upgrade to Netty 4.1.25.Final (#31232) Suppress extras FS on caching directory tests Revert "[DOCS] Added 6.3 info & updated the upgrade table. (#30940)" Revert "Fix snippets in upgrade docs" Fix snippets in upgrade docs [DOCS] Added 6.3 info & updated the upgrade table. (#30940) Enable custom credentials for core REST tests (#31235) Move ESIndexLevelReplicationTestCase to test framework (#31243) Encapsulate Translog in Engine (#31220) [DOCS] Adds machine learning 6.3.0 release notes (#31217) Remove all unused imports and fix CRLF (#31207) [TEST] Fix testRecoveryAfterPrimaryPromotion [Docs] Remove mention pattern files in Grok processor (#31170) Use stronger write-once semantics for Azure repository (#30437) Don't swallow exceptions on replication (#31179) Compliant SAML Response destination check (#31175) Move java version checker back to its own jar (#30708) TEST: Retry synced-flush if ongoing ops on primary (#30978) [test] add fix for rare virtualbox error (#31212)
This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search()
This method can hang forever if the regex expression is too complex.
The thread interrupter in the background checks every 3 seconds whether there are threads
execution the
org.joni.Matcher#search(...)method for longer than 5 seconds andif so interrupts these threads.
Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and
if so returns
org.joni.Matcher#INTERRUPTEDPR for #28731