Add flush_after_seconds option to streaming_bulk()#3064
Merged
miguelgrinberg merged 7 commits intoelastic:mainfrom Oct 15, 2025
Merged
Add flush_after_seconds option to streaming_bulk()#3064miguelgrinberg merged 7 commits intoelastic:mainfrom
flush_after_seconds option to streaming_bulk()#3064miguelgrinberg merged 7 commits intoelastic:mainfrom
Conversation
cd73776 to
12762fb
Compare
miguelgrinberg
commented
Sep 9, 2025
miguelgrinberg
commented
Sep 9, 2025
miguelgrinberg
commented
Sep 9, 2025
miguelgrinberg
commented
Sep 9, 2025
miguelgrinberg
commented
Sep 9, 2025
pquentin
reviewed
Sep 10, 2025
ce6fd31 to
96b3469
Compare
96b3469 to
9d734a1
Compare
miguelgrinberg
commented
Sep 16, 2025
| timestamps.append(time.time()) | ||
|
|
||
| # make sure there is a pause between the writing of the 2nd and 3rd items | ||
| assert timestamps[2] - timestamps[1] > (timestamps[1] - timestamps[0]) * 2 |
Contributor
Author
There was a problem hiding this comment.
This assert may look strange. The reason I'm validating flushing by looking at timestamp differences is to attempt to not use any fixed timings and avoid a flaky test.
flush_after_seconds option to streaming_bulk()
pquentin
approved these changes
Oct 14, 2025
Comment on lines
+88
to
+89
| The thread is automatically joined when the block ends. If the thread raised | ||
| an exception, it is raised in the caller's context. |
Member
There was a problem hiding this comment.
Out of curiosity, do you have an example traceback that we get with this?
Contributor
Author
There was a problem hiding this comment.
Example test script:
import time
from elasticsearch.compat import safe_thread
def my_thread():
print("my_thread started")
1/0
def main():
print("main started")
with safe_thread(target=my_thread):
time.sleep(1)
main()Output:
❯ python threadtest.py
main started
my_thread started
Traceback (most recent call last):
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/threadtest.py", line 14, in <module>
main()
~~~~^^
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/threadtest.py", line 10, in main
with safe_thread(target=my_thread):
~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File "/Users/miguelgrinberg/.local/share/mise/installs/python/3.14.0/lib/python3.14/contextlib.py", line 148, in __exit__
next(self.gen)
~~~~^^^^^^^^^^
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/elasticsearch/compat.py", line 105, in safe_thread
raise captured_exception
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/elasticsearch/compat.py", line 95, in run
target(*args, **kwargs)
~~~~~~^^^^^^^^^^^^^^^^^
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/s.py", line 6, in my_thread
1/0
~^~
ZeroDivisionError: division by zero
github-actions bot
pushed a commit
that referenced
this pull request
Oct 15, 2025
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb)
github-actions bot
pushed a commit
that referenced
this pull request
Oct 15, 2025
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb)
github-actions bot
pushed a commit
that referenced
this pull request
Oct 15, 2025
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb)
miguelgrinberg
added a commit
that referenced
this pull request
Oct 15, 2025
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb) Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
miguelgrinberg
added a commit
that referenced
this pull request
Oct 15, 2025
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb) Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
miguelgrinberg
added a commit
that referenced
this pull request
Oct 16, 2025
#3115) * Add `flush_after_seconds` option to `streaming_bulk()` (#3064) * Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb) * fix safe_task type hint for Python 3.8 --------- Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This change adds a timeout controlled flush to the streaming bulk helper.
Fixes #3051
Tasks: