-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add flush_after_seconds option to streaming_bulk()
#3064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cd73776 to
12762fb
Compare
ce6fd31 to
96b3469
Compare
96b3469 to
9d734a1
Compare
| timestamps.append(time.time()) | ||
|
|
||
| # make sure there is a pause between the writing of the 2nd and 3rd items | ||
| assert timestamps[2] - timestamps[1] > (timestamps[1] - timestamps[0]) * 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assert may look strange. The reason I'm validating flushing by looking at timestamp differences is to attempt to not use any fixed timings and avoid a flaky test.
flush_after_seconds option to streaming_bulk()
pquentin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! LGTM.
| The thread is automatically joined when the block ends. If the thread raised | ||
| an exception, it is raised in the caller's context. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, do you have an example traceback that we get with this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example test script:
import time
from elasticsearch.compat import safe_thread
def my_thread():
print("my_thread started")
1/0
def main():
print("main started")
with safe_thread(target=my_thread):
time.sleep(1)
main()Output:
❯ python threadtest.py
main started
my_thread started
Traceback (most recent call last):
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/threadtest.py", line 14, in <module>
main()
~~~~^^
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/threadtest.py", line 10, in main
with safe_thread(target=my_thread):
~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File "/Users/miguelgrinberg/.local/share/mise/installs/python/3.14.0/lib/python3.14/contextlib.py", line 148, in __exit__
next(self.gen)
~~~~^^^^^^^^^^
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/elasticsearch/compat.py", line 105, in safe_thread
raise captured_exception
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/elasticsearch/compat.py", line 95, in run
target(*args, **kwargs)
~~~~~~^^^^^^^^^^^^^^^^^
File "/Users/miguelgrinberg/Documents/dev/elasticsearch-py/s.py", line 6, in my_thread
1/0
~^~
ZeroDivisionError: division by zero
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb)
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb)
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb)
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb) Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
* Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb) Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
#3115) * Add `flush_after_seconds` option to `streaming_bulk()` (#3064) * Add flush option to streaming_bulk() * unit tests * bulk timeouts * use context manager to run the timeout background tasks * format code * integration tests * docstrings (cherry picked from commit 6fbdecb) * fix safe_task type hint for Python 3.8 --------- Co-authored-by: Miguel Grinberg <miguel.grinberg@gmail.com>
This change adds a timeout controlled flush to the streaming bulk helper.
Fixes #3051
Tasks: