Skip to content

Conversation

@miguelgrinberg
Copy link
Contributor

@miguelgrinberg miguelgrinberg commented Sep 8, 2025

This change adds a timeout controlled flush to the streaming bulk helper.

Fixes #3051

Tasks:

  • flush sentinel object
  • flush unit tests
  • flush integration tests
  • timeout async support
  • timeout sync support
  • timeout integration tests
  • docstring updates

@miguelgrinberg miguelgrinberg force-pushed the bulk-flush branch 3 times, most recently from cd73776 to 12762fb Compare September 9, 2025 16:05
@miguelgrinberg miguelgrinberg force-pushed the bulk-flush branch 5 times, most recently from ce6fd31 to 96b3469 Compare September 16, 2025 14:37
timestamps.append(time.time())

# make sure there is a pause between the writing of the 2nd and 3rd items
assert timestamps[2] - timestamps[1] > (timestamps[1] - timestamps[0]) * 2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert may look strange. The reason I'm validating flushing by looking at timestamp differences is to attempt to not use any fixed timings and avoid a flaky test.

@miguelgrinberg miguelgrinberg marked this pull request as ready for review September 16, 2025 15:56
@miguelgrinberg miguelgrinberg changed the title Add timeout option to streaming_bulk() Add flush_after_seconds option to streaming_bulk() Sep 16, 2025
Copy link
Member

@pquentin pquentin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! LGTM.

Comment on lines +88 to +89
The thread is automatically joined when the block ends. If the thread raised
an exception, it is raised in the caller's context.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, do you have an example traceback that we get with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example test script:

import time
from elasticsearch.compat import safe_thread

def my_thread():
    print("my_thread started")
    1/0

def main():
    print("main started")
    with safe_thread(target=my_thread):
        time.sleep(1)

main()

Output:

❯ python threadtest.py
main started
my_thread started
Traceback (most recent call last):
  File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/threadtest.py", line 14, in <module>
    main()
    ~~~~^^
  File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/threadtest.py", line 10, in main
    with safe_thread(target=my_thread):
         ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "/Users/miguelgrinberg/.local/share/mise/installs/python/3.14.0/lib/python3.14/contextlib.py", line 148, in __exit__
    next(self.gen)
    ~~~~^^^^^^^^^^
  File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/elasticsearch/compat.py", line 105, in safe_thread
    raise captured_exception
  File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/elasticsearch/compat.py", line 95, in run
    target(*args, **kwargs)
    ~~~~~~^^^^^^^^^^^^^^^^^
  File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/s.py", line 6, in my_thread
    1/0
    ~^~
ZeroDivisionError: division by zero

@miguelgrinberg miguelgrinberg merged commit 6fbdecb into elastic:main Oct 15, 2025
18 checks passed
@miguelgrinberg miguelgrinberg deleted the bulk-flush branch October 15, 2025 11:56
github-actions bot pushed a commit that referenced this pull request Oct 15, 2025
* Add flush option to streaming_bulk()

* unit tests

* bulk timeouts

* use context manager to run the timeout background tasks

* format code

* integration tests

* docstrings

(cherry picked from commit 6fbdecb)
github-actions bot pushed a commit that referenced this pull request Oct 15, 2025
* Add flush option to streaming_bulk()

* unit tests

* bulk timeouts

* use context manager to run the timeout background tasks

* format code

* integration tests

* docstrings

(cherry picked from commit 6fbdecb)
github-actions bot pushed a commit that referenced this pull request Oct 15, 2025
* Add flush option to streaming_bulk()

* unit tests

* bulk timeouts

* use context manager to run the timeout background tasks

* format code

* integration tests

* docstrings

(cherry picked from commit 6fbdecb)
miguelgrinberg added a commit that referenced this pull request Oct 15, 2025
* Add flush option to streaming_bulk()

* unit tests

* bulk timeouts

* use context manager to run the timeout background tasks

* format code

* integration tests

* docstrings

(cherry picked from commit 6fbdecb)

Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
miguelgrinberg added a commit that referenced this pull request Oct 15, 2025
* Add flush option to streaming_bulk()

* unit tests

* bulk timeouts

* use context manager to run the timeout background tasks

* format code

* integration tests

* docstrings

(cherry picked from commit 6fbdecb)

Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
miguelgrinberg added a commit that referenced this pull request Oct 16, 2025
#3115)

* Add `flush_after_seconds` option to `streaming_bulk()` (#3064)

* Add flush option to streaming_bulk()

* unit tests

* bulk timeouts

* use context manager to run the timeout background tasks

* format code

* integration tests

* docstrings

(cherry picked from commit 6fbdecb)

* fix safe_task type hint for Python 3.8

---------

Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support timeout-based flushing in async_streaming_bulk

2 participants