-
Notifications
You must be signed in to change notification settings - Fork 13
Support terminate processor. #345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
This pull request does not have a backport label. Could you fix it @mashhurs? 🙏
|
| '{ | ||
| "terminate": { | ||
| "if": "ctx.error != null", | ||
| "tag": "terminated_ingest_pipeline" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wished this tag be available in _ingest_document so that I can expect(event.get("[tag]")).to eql? "terminated_ingest_pipeline" but no tag exists.
| org.elasticsearch.ingest.common.SetProcessor.TYPE, | ||
| org.elasticsearch.ingest.common.SortProcessor.TYPE, | ||
| org.elasticsearch.ingest.common.SplitProcessor.TYPE, | ||
| "terminate", // note: upstream constant is package-private |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TerminateProcessor is also package private like dot_expander https://github.com/elastic/logstash-filter-elastic_integration/blob/main/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java#L103
| "data_stream" => data_stream)] | ||
|
|
||
| subject.multi_filter(events).each do |event| | ||
| expect(event.get("[@metadata][target_ingest_pipeline]")).to include("_none") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you help me understand what exactly this assertion proves WRT the termination? Looks like with "if": "ctx.error != null" the terminate will trigger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After plugin successfully executes the pipeline, it let's know (set the [@metadata][target_ingest_pipeline] to _none) es-output to avoid ingest pipeline execution in Elasticsearch side.
what exactly this assertion proves WRT the termination?
Yeah, this one I tried to highlight here that terminate processor doesn't attach any specific fields we can validate (it seems tag is for debug purpose). I have updated the integration test which adds append processor after terminate and validates to make sure append processor didn't execute.
Correspond ES query as an integration test would like this:
POST /_ingest/pipeline/_simulate
{
"pipeline" :
{
"processors": [
{
"terminate": {
"if": "ctx.error != null",
"tag": "terminated_pipeline"
}
},
{
"append": {
"field": "message",
"value": "my message value"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id1",
"_source": {
"foo": "bar",
"error": "some exception"
}
},
{
"_index": "index",
"_id": "id2",
"_source": {
"foo": "rab"
}
}
]
}
In the result, we can see id1 will not have message since pipeline was terminated by terminate processor:
{
"docs": [
{
"doc": {
"_index": "index",
"_version": "-3",
"_id": "id1",
"_source": {
"error": "some exception",
"foo": "bar"
},
"_ingest": {
"timestamp": "2025-07-15T23:56:43.427866851Z"
}
}
},
{
"doc": {
"_index": "index",
"_version": "-3",
"_id": "id2",
"_source": {
"message": [
"my message value"
],
"foo": "rab"
},
"_ingest": {
"timestamp": "2025-07-15T23:56:43.427882279Z"
}
}
}
]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! That is very helpful. Love that idea!
…xpectation that after terminate processor no other processors should run.
💚 Build Succeeded
History
|
donoghuc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the detailed breakdown. That is very helpful. Appreciate the extra context.
|
Just FYI: |
|
@Mergifyio backport 8.17 8.18 8.19 9.0 9.1 |
✅ Backports have been created
|
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and version bump. --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> Co-authored-by: Mashhur <mashhur.sattorov@elastic.co>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and version bump. --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> Co-authored-by: Mashhur <mashhur.sattorov@elastic.co>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and bump version. --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> Co-authored-by: Mashhur <mashhur.sattorov@elastic.co>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and bump version. --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> Co-authored-by: Mashhur <mashhur.sattorov@elastic.co>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and version bump. --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> Co-authored-by: Mashhur <mashhur.sattorov@elastic.co>
Registers terminate processor to the support list.